From 432675f2e6ea935c047b73b33c402bca8e12003a Mon Sep 17 00:00:00 2001 From: r4sas Date: Tue, 17 Sep 2019 10:54:58 +0000 Subject: [PATCH] url shortener support (#19) Currently tested only with clck.ru service. YOURLS test needed. Signed-off-by: r4sas --- pbincli/actions.py | 18 ++++++++--- pbincli/api.py | 77 ++++++++++++++++++++++++---------------------- pbincli/cli.py | 57 +++++++++++++++++++++------------- 3 files changed, 90 insertions(+), 62 deletions(-) diff --git a/pbincli/actions.py b/pbincli/actions.py index ab67eab..1eaaa4b 100644 --- a/pbincli/actions.py +++ b/pbincli/actions.py @@ -1,7 +1,11 @@ from pbincli.format import Paste from pbincli.utils import PBinCLIError -def send(args, api_client): +def send(args, api_client, settings=None): + from pbincli.api import Shortener + if args.short: + shortener = Shortener(settings) + if not args.notext: if args.text: text = args.text @@ -60,7 +64,7 @@ def send(args, api_client): result['id'], passphrase, result['deletetoken'], - api_client.server, + settings['server'], result['id'], passphrase)) elif result['status']: # return code is other then zero @@ -68,8 +72,14 @@ def send(args, api_client): else: # or here no status field in response or it is empty PBinCLIError("Something went wrong...\nError: Empty response.") + if args.short: + shortener.getlink("{}?{}#{}".format( + settings['server'], + result['id'], + passphrase)) + -def get(args, api_client): +def get(args, api_client, settings=None): from pbincli.utils import check_writable, json_encode try: @@ -140,7 +150,7 @@ def get(args, api_client): PBinCLIError("Something went wrong...\nError: Empty response.") -def delete(args, api_client): +def delete(args, api_client, settings=None): from pbincli.utils import json_encode pasteid = args.paste diff --git a/pbincli/api.py b/pbincli/api.py index a0d579a..c27dc5c 100644 --- a/pbincli/api.py +++ b/pbincli/api.py @@ -11,12 +11,12 @@ class PrivateBin: else: self.proxy = {} - if settings['noinsecurewarn']: + if settings['no_insecure_warning']: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) self.session = requests.Session() - self.session.verify = settings['nocheckcert'] + self.session.verify = not settings['no_check_certificate'] def post(self, request): result = self.session.post( @@ -75,61 +75,64 @@ class Shortener: """Some parts of this class was taken from python-yourls (https://github.com/tflink/python-yourls/) library """ - def __init__(self, apiurl, username=None, password=None, token=None, settings=None): + def __init__(self, settings=None): + self.api = settings['short_api'] + # we checking which service is used, because some services doesn't require # any authentication, or have only one domain on which it working - if settings['api'] == 'yourls': - if not apiurl: + if self.api == 'yourls': + if not settings['short_url']: PBinCLIError("YOURLS: An API URL is required") - self.apiurl = apiurl + self.apiurl = settings['short_url'] - if not username or not password: - if not self.token: + if not settings['short_user'] or not settings['short_pass']: + if not settings['short_token']: PBinCLIError("YOURLS: username and password or token are required") else: - self.auth_args = {'signature': token} + self.auth_args = {'signature': settings['short_token']} else: - self.auth_args = {'username': self.username, 'password': self.password} + self.auth_args = {'username': settings['short_user'], 'password': settings['short_pass']} if settings['proxy']: self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']} else: self.proxy = {} - if settings['noinsecurewarn']: + if settings['no_insecure_warning']: from requests.packages.urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(InsecureRequestWarning) self.session = requests.Session() - self.session.verify = settings['nocheckcert'] + self.session.verify = not settings['no_check_certificate'] - def yourls(self, url): - data_format = 'json' - args = {'action': 'shorturl', 'format': data_format, 'url': url} - reqest = dict(args.items() + self.auth_args.items()) + def getlink(self, url): + if self.api == 'yourls': + data_format = 'json' + args = {'action': 'shorturl', 'format': data_format, 'url': url} + reqest = dict(args.items() + self.auth_args.items()) - result = self.session.post( - url = self.apiurl, - proxies = self.proxy, - data = request) + result = self.session.post( + url = self.apiurl, + proxies = self.proxy, + data = request) - try: - result.json() - if result['status'] == 'fail' and result['code'] == 'error:keyword': - PBinCLIError("YOURLS: Received error from API: {}".format(result['message'])) - if not 'shorturl' in result: - PBinCLIError("YOURLS: Unknown error: {}".format(result['message'])) + try: + result.json() + if result['status'] == 'fail' and result['code'] == 'error:keyword': + PBinCLIError("YOURLS: Received error from API: {}".format(result['message'])) + if not 'shorturl' in result: + PBinCLIError("YOURLS: Unknown error: {}".format(result['message'])) - print("Paste short URL: {}".format(result['shorturl'])) - except ValueError: - PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text)) + print("Short Link:\t{}".format(result['shorturl'])) + except ValueError: + PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text)) - def clck(self, url): - # from urllib.parse import quote_plus - request = {'url': url} + elif self.api == 'clckru': + # from urllib.parse import quote_plus + request = {'url': url} - result = self.session.post( - url = "https://clck.ru/--", - proxies = self.proxy, - data = request) - print("Paste short URL: {}".format(result)) + result = self.session.post( + url = "https://clck.ru/--", + proxies = self.proxy, + data = request) + print("Short Link:\t{}".format(result.text)) diff --git a/pbincli/cli.py b/pbincli/cli.py index a126d36..4f72194 100755 --- a/pbincli/cli.py +++ b/pbincli/cli.py @@ -37,15 +37,16 @@ def main(): send_parser.add_argument("-c", "--compression", default="zlib", action="store", choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format") # URL shortener - send_parser.add_argument("-s", "--shorten", default=False, action="store_true", help="use URL shortener") - send_parser.add_argument("--shorten-api", default="yourls", action="store", - choices=["clckru", "yourls"], help="API used by shortener service (default: YOURLS)") - send_parser.add_argument("--shorten-url", help="URL of shortener service API") - send_parser.add_argument("--shorten-user", help="Shortener username") - send_parser.add_argument("--shorten-pass", help="Shortener password") - send_parser.add_argument("--shorten-token", help="Shortener token") - # Certificates check bypass - send_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") + send_parser.add_argument("-S", "--short", default=False, action="store_true", help="use URL shortener") + send_parser.add_argument("--short-api", default=argparse.SUPPRESS, action="store", choices=["clckru", "yourls"], help="API used by shortener service") + send_parser.add_argument("--short-url", default=argparse.SUPPRESS, help="URL of shortener service API") + send_parser.add_argument("--short-user", default=argparse.SUPPRESS, help="Shortener username") + send_parser.add_argument("--short-pass", default=argparse.SUPPRESS, help="Shortener password") + send_parser.add_argument("--short-token", default=argparse.SUPPRESS, help="Shortener token") + # Connection options + send_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)") + send_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)") + send_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation") send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") # send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") @@ -57,7 +58,7 @@ def main(): get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance") get_parser.add_argument("pasteinfo", help="example: aabb#cccddd") get_parser.add_argument("-p", "--password", help="password for decrypting paste") - get_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") + get_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation") get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") get_parser.set_defaults(func=pbincli.actions.get) @@ -66,7 +67,7 @@ def main(): delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token") delete_parser.add_argument("-p", "--paste", required=True, help="paste id") delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token") - delete_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") + delete_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation") delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") delete_parser.set_defaults(func=pbincli.actions.delete) @@ -75,10 +76,24 @@ def main(): args = parser.parse_args() CONFIG = { - "server": "https://paste.i2pd.xyz/", - "proxy": None + 'server': 'https://paste.i2pd.xyz/', + 'proxy': None, + 'short_api': None, + 'short_url': None, + 'short_user': None, + 'short_pass': None, + 'short_token': None, + 'no_check_certificate': False, + 'no_insecure_warning': False } + """Configuration preference order: + 1. Command line switches + 2. Environment variables + 3. Configuration file + 4. Default values below + """ + for p in CONFIG_PATHS: if os.path.exists(p): CONFIG.update(read_config(p)) @@ -87,19 +102,19 @@ def main(): for key in CONFIG.keys(): var = "PRIVATEBIN_{}".format(key.upper()) if var in os.environ: CONFIG[key] = os.getenv(var) + # values from command line switches are preferred + args_var = vars(args) + if key in args_var: + CONFIG[key] = args_var[key] - SETTINGS = { - "server" : validate_url(CONFIG["server"]), - "proxy": CONFIG["proxy"], - "nocheckcert": args.no_check_certificate, - "noinsecurewarn": args.no_insecure_warning - } + # Re-validate PrivateBin instance URL + CONFIG['server'] = validate_url(CONFIG['server']) - api_client = PrivateBin(SETTINGS) + api_client = PrivateBin(CONFIG) if hasattr(args, "func"): try: - args.func(args, api_client) + args.func(args, api_client, settings=CONFIG) except PBinCLIException as pe: raise PBinCLIException("error: {}".format(pe)) else: