mirror of
https://github.com/r4sas/PBinCLI
synced 2025-01-09 22:37:53 +00:00
[wip] url shortener support (#19)
Signed-off-by: r4sas <r4sas@i2pmail.org>
This commit is contained in:
parent
6b6c33e545
commit
635c87dabd
@ -2,6 +2,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
__author__ = "R4SAS <r4sas@i2pmail.org>"
|
__author__ = "R4SAS <r4sas@i2pmail.org>"
|
||||||
__version__ = "0.2.1"
|
__version__ = "0.2.2b1"
|
||||||
__copyright__ = "Copyright (c) R4SAS"
|
__copyright__ = "Copyright (c) R4SAS"
|
||||||
__license__ = "MIT"
|
__license__ = "MIT"
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
from pbincli.format import Paste
|
from pbincli.format import Paste
|
||||||
|
from pbincli.utils import PBinCLIError
|
||||||
|
|
||||||
def send(args, api_client):
|
def send(args, api_client):
|
||||||
if not args.notext:
|
if not args.notext:
|
||||||
@ -7,8 +8,7 @@ def send(args, api_client):
|
|||||||
elif args.stdin:
|
elif args.stdin:
|
||||||
text = args.stdin.read()
|
text = args.stdin.read()
|
||||||
elif not args.file:
|
elif not args.file:
|
||||||
print("Nothing to send!")
|
PBinCLIError("Nothing to send!")
|
||||||
exit(1)
|
|
||||||
else:
|
else:
|
||||||
text = ""
|
text = ""
|
||||||
|
|
||||||
@ -64,11 +64,9 @@ def send(args, api_client):
|
|||||||
result['id'],
|
result['id'],
|
||||||
passphrase))
|
passphrase))
|
||||||
elif result['status']: # return code is other then zero
|
elif result['status']: # return code is other then zero
|
||||||
print("Something went wrong...\nError:\t\t{}".format(result['message']))
|
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||||
exit(1)
|
|
||||||
else: # or here no status field in response or it is empty
|
else: # or here no status field in response or it is empty
|
||||||
print("Something went wrong...\nError: Empty response.")
|
PBinCLIError("Something went wrong...\nError: Empty response.")
|
||||||
exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
def get(args, api_client):
|
def get(args, api_client):
|
||||||
@ -77,12 +75,10 @@ def get(args, api_client):
|
|||||||
try:
|
try:
|
||||||
pasteid, passphrase = args.pasteinfo.split("#")
|
pasteid, passphrase = args.pasteinfo.split("#")
|
||||||
except ValueError:
|
except ValueError:
|
||||||
print("PBinCLI error: provided info hasn't contain valid PasteID#Passphrase string")
|
PBinCLIError("Provided info hasn't contain valid PasteID#Passphrase string")
|
||||||
exit(1)
|
|
||||||
|
|
||||||
if not (pasteid and passphrase):
|
if not (pasteid and passphrase):
|
||||||
print("PBinCLI error: Incorrect request")
|
PBinCLIError("Incorrect request")
|
||||||
exit(1)
|
|
||||||
|
|
||||||
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
|
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
|
||||||
|
|
||||||
@ -139,11 +135,9 @@ def get(args, api_client):
|
|||||||
api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'}))
|
api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'}))
|
||||||
|
|
||||||
elif result['status']: # return code is other then zero
|
elif result['status']: # return code is other then zero
|
||||||
print("Something went wrong...\nError:\t\t{}".format(result['message']))
|
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||||
exit(1)
|
|
||||||
else: # or here no status field in response or it is empty
|
else: # or here no status field in response or it is empty
|
||||||
print("Something went wrong...\nError: Empty response.")
|
PBinCLIError("Something went wrong...\nError: Empty response.")
|
||||||
exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
def delete(args, api_client):
|
def delete(args, api_client):
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import requests
|
import requests
|
||||||
|
from pbincli.utils import PBinCLIError
|
||||||
|
|
||||||
class PrivateBin:
|
class PrivateBin:
|
||||||
def __init__(self, settings=None):
|
def __init__(self, settings=None):
|
||||||
@ -27,8 +28,7 @@ class PrivateBin:
|
|||||||
try:
|
try:
|
||||||
return result.json()
|
return result.json()
|
||||||
except ValueError:
|
except ValueError:
|
||||||
print("ERROR: Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text))
|
PBinCLIError("Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text))
|
||||||
exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
def get(self, request):
|
def get(self, request):
|
||||||
@ -56,11 +56,9 @@ class PrivateBin:
|
|||||||
if not result['status']:
|
if not result['status']:
|
||||||
print("Paste successfully deleted!")
|
print("Paste successfully deleted!")
|
||||||
elif result['status']:
|
elif result['status']:
|
||||||
print("Something went wrong...\nError:\t\t{}".format(result['message']))
|
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message']))
|
||||||
exit(1)
|
|
||||||
else:
|
else:
|
||||||
print("Something went wrong...\nError: Empty response.")
|
PBinCLIError("Something went wrong...\nError: Empty response.")
|
||||||
exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
def getVersion(self):
|
def getVersion(self):
|
||||||
@ -74,9 +72,24 @@ class PrivateBin:
|
|||||||
else 1
|
else 1
|
||||||
|
|
||||||
class Shortener:
|
class Shortener:
|
||||||
def __init__(self, settings=None):
|
"""Some parts of this class was taken from
|
||||||
self.server = settings['server']
|
python-yourls (https://github.com/tflink/python-yourls/) library
|
||||||
self.headers = {'X-Requested-With': 'JSONHttpRequest'}
|
"""
|
||||||
|
def __init__(self, apiurl, username=None, password=None, token=None, settings=None):
|
||||||
|
# we checking which service is used, because some services doesn't require
|
||||||
|
# any authentication, or have only one domain on which it working
|
||||||
|
if settings['api'] == 'yourls':
|
||||||
|
if not apiurl:
|
||||||
|
PBinCLIError("YOURLS: An API URL is required")
|
||||||
|
self.apiurl = apiurl
|
||||||
|
|
||||||
|
if not username or not password:
|
||||||
|
if not self.token:
|
||||||
|
PBinCLIError("YOURLS: username and password or token are required")
|
||||||
|
else:
|
||||||
|
self.auth_args = {'signature': token}
|
||||||
|
else:
|
||||||
|
self.auth_args = {'username': self.username, 'password': self.password}
|
||||||
|
|
||||||
if settings['proxy']:
|
if settings['proxy']:
|
||||||
self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
|
self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
|
||||||
@ -90,3 +103,33 @@ class Shortener:
|
|||||||
self.session = requests.Session()
|
self.session = requests.Session()
|
||||||
self.session.verify = settings['nocheckcert']
|
self.session.verify = settings['nocheckcert']
|
||||||
|
|
||||||
|
def yourls(self, url):
|
||||||
|
data_format = 'json'
|
||||||
|
args = {'action': 'shorturl', 'format': data_format, 'url': url}
|
||||||
|
reqest = dict(args.items() + self.auth_args.items())
|
||||||
|
|
||||||
|
result = self.session.post(
|
||||||
|
url = self.apiurl,
|
||||||
|
proxies = self.proxy,
|
||||||
|
data = request)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result.json()
|
||||||
|
if result['status'] == 'fail' and result['code'] == 'error:keyword':
|
||||||
|
PBinCLIError("YOURLS: Received error from API: {}".format(result['message']))
|
||||||
|
if not 'shorturl' in result:
|
||||||
|
PBinCLIError("YOURLS: Unknown error: {}".format(result['message']))
|
||||||
|
|
||||||
|
print("Paste short URL: {}".format(result['shorturl']))
|
||||||
|
except ValueError:
|
||||||
|
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
|
||||||
|
|
||||||
|
def clck(self, url):
|
||||||
|
# from urllib.parse import quote_plus
|
||||||
|
request = {'url': url}
|
||||||
|
|
||||||
|
result = self.session.post(
|
||||||
|
url = "https://clck.ru/--",
|
||||||
|
proxies = self.proxy,
|
||||||
|
data = request)
|
||||||
|
print("Paste short URL: {}".format(result))
|
||||||
|
@ -39,7 +39,7 @@ def main():
|
|||||||
# URL shortener
|
# URL shortener
|
||||||
send_parser.add_argument("-s", "--shorten", default=False, action="store_true", help="use URL shortener")
|
send_parser.add_argument("-s", "--shorten", default=False, action="store_true", help="use URL shortener")
|
||||||
send_parser.add_argument("--shorten-api", default="yourls", action="store",
|
send_parser.add_argument("--shorten-api", default="yourls", action="store",
|
||||||
choices=["yourls"], help="select API used for selected URL shortener service")
|
choices=["clckru", "yourls"], help="API used by shortener service (default: YOURLS)")
|
||||||
send_parser.add_argument("--shorten-url", help="URL of shortener service API")
|
send_parser.add_argument("--shorten-url", help="URL of shortener service API")
|
||||||
send_parser.add_argument("--shorten-user", help="Shortener username")
|
send_parser.add_argument("--shorten-user", help="Shortener username")
|
||||||
send_parser.add_argument("--shorten-pass", help="Shortener password")
|
send_parser.add_argument("--shorten-pass", help="Shortener password")
|
||||||
@ -89,7 +89,7 @@ def main():
|
|||||||
if var in os.environ: CONFIG[key] = os.getenv(var)
|
if var in os.environ: CONFIG[key] = os.getenv(var)
|
||||||
|
|
||||||
SETTINGS = {
|
SETTINGS = {
|
||||||
"server" : validate_url(CONFIG["server"])
|
"server" : validate_url(CONFIG["server"]),
|
||||||
"proxy": CONFIG["proxy"],
|
"proxy": CONFIG["proxy"],
|
||||||
"nocheckcert": args.no_check_certificate,
|
"nocheckcert": args.no_check_certificate,
|
||||||
"noinsecurewarn": args.no_insecure_warning
|
"noinsecurewarn": args.no_insecure_warning
|
||||||
@ -101,8 +101,7 @@ def main():
|
|||||||
try:
|
try:
|
||||||
args.func(args, api_client)
|
args.func(args, api_client)
|
||||||
except PBinCLIException as pe:
|
except PBinCLIException as pe:
|
||||||
print("PBinCLI error: {}".format(pe))
|
raise PBinCLIException("error: {}".format(pe))
|
||||||
sys.exit(1)
|
|
||||||
else:
|
else:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
|
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
from Crypto.Random import get_random_bytes
|
from Crypto.Random import get_random_bytes
|
||||||
from Crypto.Cipher import AES
|
from Crypto.Cipher import AES
|
||||||
from base64 import b64encode, b64decode
|
from base64 import b64encode, b64decode
|
||||||
from pbincli.utils import PBinCLIException
|
from pbincli.utils import PBinCLIError
|
||||||
import zlib
|
import zlib
|
||||||
|
|
||||||
CIPHER_ITERATION_COUNT = 100000
|
CIPHER_ITERATION_COUNT = 100000
|
||||||
@ -143,7 +143,7 @@ class Paste:
|
|||||||
elif self._version == 1:
|
elif self._version == 1:
|
||||||
return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
|
return zlib.decompress(bytearray(map(lambda c:ord(c)&255, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
|
||||||
else:
|
else:
|
||||||
raise PBinCLIException('Unknown compression type provided in paste!')
|
PBinCLIError('Unknown compression type provided in paste!')
|
||||||
|
|
||||||
|
|
||||||
def __compress(self, s):
|
def __compress(self, s):
|
||||||
@ -160,7 +160,7 @@ class Paste:
|
|||||||
b = co.compress(s) + co.flush()
|
b = co.compress(s) + co.flush()
|
||||||
return b64encode(''.join(map(chr, b)).encode('utf-8'))
|
return b64encode(''.join(map(chr, b)).encode('utf-8'))
|
||||||
else:
|
else:
|
||||||
raise PBinCLIException('Unknown compression type provided!')
|
PBinCLIError('Unknown compression type provided!')
|
||||||
|
|
||||||
|
|
||||||
def decrypt(self):
|
def decrypt(self):
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
import json, ntpath, os
|
import json, ntpath, os, sys
|
||||||
|
|
||||||
class PBinCLIException(Exception):
|
class PBinCLIException(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def PBinCLIError(message):
|
||||||
|
print("PBinCLI Error: {}".format(message), file=sys.stderr)
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
|
||||||
def path_leaf(path):
|
def path_leaf(path):
|
||||||
head, tail = ntpath.split(path)
|
head, tail = ntpath.split(path)
|
||||||
return tail or ntpath.basename(head)
|
return tail or ntpath.basename(head)
|
||||||
@ -12,13 +17,13 @@ def path_leaf(path):
|
|||||||
def check_readable(f):
|
def check_readable(f):
|
||||||
# Checks if path exists and readable
|
# Checks if path exists and readable
|
||||||
if not os.path.exists(f) or not os.access(f, os.R_OK):
|
if not os.path.exists(f) or not os.access(f, os.R_OK):
|
||||||
raise PBinCLIException("Error accessing path: {}".format(f))
|
PBinCLIError("Error accessing path: {}".format(f))
|
||||||
|
|
||||||
|
|
||||||
def check_writable(f):
|
def check_writable(f):
|
||||||
# Checks if path is writable
|
# Checks if path is writable
|
||||||
if not os.access(os.path.dirname(f) or ".", os.W_OK):
|
if not os.access(os.path.dirname(f) or ".", os.W_OK):
|
||||||
raise PBinCLIException("Path is not writable: {}".format(f))
|
PBinCLIError("Path is not writable: {}".format(f))
|
||||||
|
|
||||||
|
|
||||||
def json_encode(s):
|
def json_encode(s):
|
||||||
|
Loading…
Reference in New Issue
Block a user