Browse Source

url shortener support (#19)

Currently tested only with clck.ru service. YOURLS test needed.

Signed-off-by: r4sas <r4sas@i2pmail.org>
dependabot/add-v2-config-file
R4SAS 5 years ago
parent
commit
432675f2e6
Signed by untrusted user: r4sas
GPG Key ID: 66F6C87B98EBCFE2
  1. 18
      pbincli/actions.py
  2. 77
      pbincli/api.py
  3. 57
      pbincli/cli.py

18
pbincli/actions.py

@ -1,7 +1,11 @@
from pbincli.format import Paste from pbincli.format import Paste
from pbincli.utils import PBinCLIError from pbincli.utils import PBinCLIError
def send(args, api_client): def send(args, api_client, settings=None):
from pbincli.api import Shortener
if args.short:
shortener = Shortener(settings)
if not args.notext: if not args.notext:
if args.text: if args.text:
text = args.text text = args.text
@ -60,7 +64,7 @@ def send(args, api_client):
result['id'], result['id'],
passphrase, passphrase,
result['deletetoken'], result['deletetoken'],
api_client.server, settings['server'],
result['id'], result['id'],
passphrase)) passphrase))
elif result['status']: # return code is other then zero elif result['status']: # return code is other then zero
@ -68,8 +72,14 @@ def send(args, api_client):
else: # or here no status field in response or it is empty else: # or here no status field in response or it is empty
PBinCLIError("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
if args.short:
shortener.getlink("{}?{}#{}".format(
settings['server'],
result['id'],
passphrase))
def get(args, api_client): def get(args, api_client, settings=None):
from pbincli.utils import check_writable, json_encode from pbincli.utils import check_writable, json_encode
try: try:
@ -140,7 +150,7 @@ def get(args, api_client):
PBinCLIError("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
def delete(args, api_client): def delete(args, api_client, settings=None):
from pbincli.utils import json_encode from pbincli.utils import json_encode
pasteid = args.paste pasteid = args.paste

77
pbincli/api.py

@ -11,12 +11,12 @@ class PrivateBin:
else: else:
self.proxy = {} self.proxy = {}
if settings['noinsecurewarn']: if settings['no_insecure_warning']:
from requests.packages.urllib3.exceptions import InsecureRequestWarning from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
self.session = requests.Session() self.session = requests.Session()
self.session.verify = settings['nocheckcert'] self.session.verify = not settings['no_check_certificate']
def post(self, request): def post(self, request):
result = self.session.post( result = self.session.post(
@ -75,61 +75,64 @@ class Shortener:
"""Some parts of this class was taken from """Some parts of this class was taken from
python-yourls (https://github.com/tflink/python-yourls/) library python-yourls (https://github.com/tflink/python-yourls/) library
""" """
def __init__(self, apiurl, username=None, password=None, token=None, settings=None): def __init__(self, settings=None):
self.api = settings['short_api']
# we checking which service is used, because some services doesn't require # we checking which service is used, because some services doesn't require
# any authentication, or have only one domain on which it working # any authentication, or have only one domain on which it working
if settings['api'] == 'yourls': if self.api == 'yourls':
if not apiurl: if not settings['short_url']:
PBinCLIError("YOURLS: An API URL is required") PBinCLIError("YOURLS: An API URL is required")
self.apiurl = apiurl self.apiurl = settings['short_url']
if not username or not password: if not settings['short_user'] or not settings['short_pass']:
if not self.token: if not settings['short_token']:
PBinCLIError("YOURLS: username and password or token are required") PBinCLIError("YOURLS: username and password or token are required")
else: else:
self.auth_args = {'signature': token} self.auth_args = {'signature': settings['short_token']}
else: else:
self.auth_args = {'username': self.username, 'password': self.password} self.auth_args = {'username': settings['short_user'], 'password': settings['short_pass']}
if settings['proxy']: if settings['proxy']:
self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']} self.proxy = {settings['proxy'].split('://')[0]: settings['proxy']}
else: else:
self.proxy = {} self.proxy = {}
if settings['noinsecurewarn']: if settings['no_insecure_warning']:
from requests.packages.urllib3.exceptions import InsecureRequestWarning from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
self.session = requests.Session() self.session = requests.Session()
self.session.verify = settings['nocheckcert'] self.session.verify = not settings['no_check_certificate']
def yourls(self, url): def getlink(self, url):
data_format = 'json' if self.api == 'yourls':
args = {'action': 'shorturl', 'format': data_format, 'url': url} data_format = 'json'
reqest = dict(args.items() + self.auth_args.items()) args = {'action': 'shorturl', 'format': data_format, 'url': url}
reqest = dict(args.items() + self.auth_args.items())
result = self.session.post( result = self.session.post(
url = self.apiurl, url = self.apiurl,
proxies = self.proxy, proxies = self.proxy,
data = request) data = request)
try: try:
result.json() result.json()
if result['status'] == 'fail' and result['code'] == 'error:keyword': if result['status'] == 'fail' and result['code'] == 'error:keyword':
PBinCLIError("YOURLS: Received error from API: {}".format(result['message'])) PBinCLIError("YOURLS: Received error from API: {}".format(result['message']))
if not 'shorturl' in result: if not 'shorturl' in result:
PBinCLIError("YOURLS: Unknown error: {}".format(result['message'])) PBinCLIError("YOURLS: Unknown error: {}".format(result['message']))
print("Paste short URL: {}".format(result['shorturl'])) print("Short Link:\t{}".format(result['shorturl']))
except ValueError: except ValueError:
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text)) PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
def clck(self, url): elif self.api == 'clckru':
# from urllib.parse import quote_plus # from urllib.parse import quote_plus
request = {'url': url} request = {'url': url}
result = self.session.post( result = self.session.post(
url = "https://clck.ru/--", url = "https://clck.ru/--",
proxies = self.proxy, proxies = self.proxy,
data = request) data = request)
print("Paste short URL: {}".format(result)) print("Short Link:\t{}".format(result.text))

57
pbincli/cli.py

@ -37,15 +37,16 @@ def main():
send_parser.add_argument("-c", "--compression", default="zlib", action="store", send_parser.add_argument("-c", "--compression", default="zlib", action="store",
choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format") choices=["zlib", "none"], help="set compression for paste (default: zlib). Note: works only on v2 paste format")
# URL shortener # URL shortener
send_parser.add_argument("-s", "--shorten", default=False, action="store_true", help="use URL shortener") send_parser.add_argument("-S", "--short", default=False, action="store_true", help="use URL shortener")
send_parser.add_argument("--shorten-api", default="yourls", action="store", send_parser.add_argument("--short-api", default=argparse.SUPPRESS, action="store", choices=["clckru", "yourls"], help="API used by shortener service")
choices=["clckru", "yourls"], help="API used by shortener service (default: YOURLS)") send_parser.add_argument("--short-url", default=argparse.SUPPRESS, help="URL of shortener service API")
send_parser.add_argument("--shorten-url", help="URL of shortener service API") send_parser.add_argument("--short-user", default=argparse.SUPPRESS, help="Shortener username")
send_parser.add_argument("--shorten-user", help="Shortener username") send_parser.add_argument("--short-pass", default=argparse.SUPPRESS, help="Shortener password")
send_parser.add_argument("--shorten-pass", help="Shortener password") send_parser.add_argument("--short-token", default=argparse.SUPPRESS, help="Shortener token")
send_parser.add_argument("--shorten-token", help="Shortener token") # Connection options
# Certificates check bypass send_parser.add_argument("-s", "--server", default=argparse.SUPPRESS, help="PrivateBin service URL (default: https://paste.i2pd.xyz/)")
send_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") send_parser.add_argument("-x", "--proxy", default=argparse.SUPPRESS, help="Proxy server address (default: None)")
send_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") send_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
# #
send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") send_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
@ -57,7 +58,7 @@ def main():
get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance") get_parser = subparsers.add_parser("get", description="Get data from PrivateBin instance")
get_parser.add_argument("pasteinfo", help="example: aabb#cccddd") get_parser.add_argument("pasteinfo", help="example: aabb#cccddd")
get_parser.add_argument("-p", "--password", help="password for decrypting paste") get_parser.add_argument("-p", "--password", help="password for decrypting paste")
get_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") get_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") get_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") get_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
get_parser.set_defaults(func=pbincli.actions.get) get_parser.set_defaults(func=pbincli.actions.get)
@ -66,7 +67,7 @@ def main():
delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token") delete_parser = subparsers.add_parser("delete", description="Delete paste from PrivateBin instance using token")
delete_parser.add_argument("-p", "--paste", required=True, help="paste id") delete_parser.add_argument("-p", "--paste", required=True, help="paste id")
delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token") delete_parser.add_argument("-t", "--token", required=True, help="paste deletion token")
delete_parser.add_argument("--no-check-certificate", default=True, action="store_false", help="disable certificate validation") delete_parser.add_argument("--no-check-certificate", default=False, action="store_true", help="disable certificate validation")
delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)") delete_parser.add_argument("--no-insecure-warning", default=False, action="store_true", help="suppress InsecureRequestWarning (only with --no-check-certificate)")
delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug") delete_parser.add_argument("-d", "--debug", default=False, action="store_true", help="enable debug")
delete_parser.set_defaults(func=pbincli.actions.delete) delete_parser.set_defaults(func=pbincli.actions.delete)
@ -75,10 +76,24 @@ def main():
args = parser.parse_args() args = parser.parse_args()
CONFIG = { CONFIG = {
"server": "https://paste.i2pd.xyz/", 'server': 'https://paste.i2pd.xyz/',
"proxy": None 'proxy': None,
'short_api': None,
'short_url': None,
'short_user': None,
'short_pass': None,
'short_token': None,
'no_check_certificate': False,
'no_insecure_warning': False
} }
"""Configuration preference order:
1. Command line switches
2. Environment variables
3. Configuration file
4. Default values below
"""
for p in CONFIG_PATHS: for p in CONFIG_PATHS:
if os.path.exists(p): if os.path.exists(p):
CONFIG.update(read_config(p)) CONFIG.update(read_config(p))
@ -87,19 +102,19 @@ def main():
for key in CONFIG.keys(): for key in CONFIG.keys():
var = "PRIVATEBIN_{}".format(key.upper()) var = "PRIVATEBIN_{}".format(key.upper())
if var in os.environ: CONFIG[key] = os.getenv(var) if var in os.environ: CONFIG[key] = os.getenv(var)
# values from command line switches are preferred
args_var = vars(args)
if key in args_var:
CONFIG[key] = args_var[key]
SETTINGS = { # Re-validate PrivateBin instance URL
"server" : validate_url(CONFIG["server"]), CONFIG['server'] = validate_url(CONFIG['server'])
"proxy": CONFIG["proxy"],
"nocheckcert": args.no_check_certificate,
"noinsecurewarn": args.no_insecure_warning
}
api_client = PrivateBin(SETTINGS) api_client = PrivateBin(CONFIG)
if hasattr(args, "func"): if hasattr(args, "func"):
try: try:
args.func(args, api_client) args.func(args, api_client, settings=CONFIG)
except PBinCLIException as pe: except PBinCLIException as pe:
raise PBinCLIException("error: {}".format(pe)) raise PBinCLIException("error: {}".format(pe))
else: else:

Loading…
Cancel
Save