1
1
mirror of https://github.com/r4sas/PBinCLI synced 2025-09-01 08:32:16 +00:00

A more robust logic to parse API settings.

All key-value settings in api.py were parsed through the following syntax:

```python
if settings["something"] is None:
  ...
```

This is problematic because it forces downstream implementations to also
provide all the fields, also when they are not required.

In particular, when new fields are added, downstream implementations
also have to provide them, or the integration will break.

The `settings.get("something")` syntax should be preferred, and settings
itself should also be initialized to a dict by default to prevent
dereferencing a `None` (I mean, if no settings are provided at all the
code should probably still break, but with a relevant error instead of a
fuzzier `TypeError`).

Closes: #51
Closes: #52
Closes: https://github.com/Pioverpie/privatebin-api#12
This commit is contained in:
Fabio Manganiello 2025-02-19 00:28:27 +01:00 committed by R4SAS
parent 41bc3b3745
commit 03bdfe6782

View File

@ -2,98 +2,119 @@ import requests
from requests import HTTPError from requests import HTTPError
from pbincli.utils import PBinCLIError from pbincli.utils import PBinCLIError
def _config_requests(settings=None, shortener=False): def _config_requests(settings=None, shortener=False):
if settings['no_insecure_warning']: settings = settings or {}
if settings.get("no_insecure_warning"):
from requests.packages.urllib3.exceptions import InsecureRequestWarning from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
session = requests.Session() session = requests.Session()
session.verify = not settings['no_check_certificate'] session.verify = not settings.get("no_check_certificate")
if settings['auth'] and not shortener: # do not leak PrivateBin authorization to shortener services if (
if settings['auth'] == 'basic' and settings['auth_user'] and settings['auth_pass']: settings.get("auth") and not shortener
session.auth = (settings['auth_user'], settings['auth_pass']) ): # do not leak PrivateBin authorization to shortener services
elif settings['auth'] == 'custom' and settings['auth_custom']: auth = settings["auth"]
if auth == "basic" and all(
[settings.get("auth_user"), settings.get("auth_pass")]
):
session.auth = (settings["auth_user"], settings["auth_pass"])
elif auth == "custom" and settings.get("auth_custom"):
from json import loads as json_loads from json import loads as json_loads
auth = json_loads(settings['auth_custom'])
auth = json_loads(settings["auth_custom"])
session.headers.update(auth) session.headers.update(auth)
else: else:
PBinCLIError("Incorrect authorization configuration") PBinCLIError("Incorrect authorization configuration")
if settings['proxy']: if settings["proxy"]:
scheme = settings['proxy'].split('://')[0] scheme = settings["proxy"].split("://")[0]
if (scheme.startswith("socks")): if scheme.startswith("socks"):
session.proxies.update({ session.proxies.update(
"http": settings['proxy'], {"http": settings["proxy"], "https": settings["proxy"]}
"https": settings['proxy'] )
})
else: else:
session.proxies.update({scheme: settings['proxy']}) session.proxies.update({scheme: settings["proxy"]})
return session return session
class PrivateBin: class PrivateBin:
def __init__(self, settings=None): def __init__(self, settings=None):
self.server = settings['server'] settings = settings or {}
self.headers = {'X-Requested-With': 'JSONHttpRequest'} if not settings.get("server"):
PBinCLIError("No server specified - unable to continue")
self.server = settings["server"]
self.headers = {"X-Requested-With": "JSONHttpRequest"}
self.session = _config_requests(settings, False) self.session = _config_requests(settings, False)
def post(self, request): def post(self, request):
result = self.session.post( result = self.session.post(url=self.server, headers=self.headers, data=request)
url = self.server,
headers = self.headers,
data = request)
try: try:
return result.json() return result.json()
except ValueError: except ValueError:
PBinCLIError("Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text)) PBinCLIError(
"Unable parse response as json. Received (size = {}):\n{}".format(
len(result.text), result.text
)
)
def get(self, request): def get(self, request):
return self.session.get( return self.session.get(
url = self.server + "?" + request, url=self.server + "?" + request, headers=self.headers
headers = self.headers).json() ).json()
def delete(self, request): def delete(self, request):
# using try as workaround for versions < 1.3 due to we cant detect # using try as workaround for versions < 1.3 due to we cant detect
# if server used version 1.2, where auto-deletion is added # if server used version 1.2, where auto-deletion is added
try: try:
result = self.session.post( result = self.session.post(
url = self.server, url=self.server, headers=self.headers, data=request
headers = self.headers, ).json()
data = request).json()
except ValueError: except ValueError:
# unable parse response as json because it can be empty (1.2), so simulate correct answer # unable parse response as json because it can be empty (1.2), so simulate correct answer
print("NOTICE: Received empty response. We interpret that as our paste has already been deleted.") print(
"NOTICE: Received empty response. We interpret that as our paste has already been deleted."
)
from json import loads as json_loads from json import loads as json_loads
result = json_loads('{"status":0}') result = json_loads('{"status":0}')
if not result['status']: if not result["status"]:
print("Paste successfully deleted!") print("Paste successfully deleted!")
elif result['status']: elif result["status"]:
PBinCLIError("Something went wrong...\nError:\t\t{}".format(result['message'])) PBinCLIError(
"Something went wrong...\nError:\t\t{}".format(result["message"])
)
else: else:
PBinCLIError("Something went wrong...\nError: Empty response.") PBinCLIError("Something went wrong...\nError: Empty response.")
def getVersion(self): def getVersion(self):
result = self.session.get( result = self.session.get(
url = self.server + '?jsonld=paste', url=self.server + "?jsonld=paste", headers=self.headers
headers = self.headers) )
try: try:
jsonldSchema = result.json() jsonldSchema = result.json()
return jsonldSchema['@context']['v']['@value'] \ return (
if ('@context' in jsonldSchema and jsonldSchema["@context"]["v"]["@value"]
'v' in jsonldSchema['@context'] and if (
'@value' in jsonldSchema['@context']['v']) \ "@context" in jsonldSchema
and "v" in jsonldSchema["@context"]
and "@value" in jsonldSchema["@context"]["v"]
)
else 1 else 1
)
except ValueError: except ValueError:
PBinCLIError("Unable parse response as json. Received (size = {}):\n{}".format(len(result.text), result.text)) PBinCLIError(
"Unable parse response as json. Received (size = {}):\n{}".format(
len(result.text), result.text
)
)
def getServer(self): def getServer(self):
return self.server return self.server
@ -103,80 +124,98 @@ class Shortener:
"""Some parts of this class was taken from """Some parts of this class was taken from
python-yourls (https://github.com/tflink/python-yourls/) library python-yourls (https://github.com/tflink/python-yourls/) library
""" """
def __init__(self, settings=None):
self.api = settings['short_api']
def __init__(self, settings=None):
settings = settings or {}
self.api = settings.get("short_api")
if self.api is None: if self.api is None:
PBinCLIError("Unable to activate link shortener without short_api.") PBinCLIError("Unable to activate link shortener without short_api.")
# we checking which service is used, because some services doesn't require # we checking which service is used, because some services doesn't require
# any authentication, or have only one domain on which it working # any authentication, or have only one domain on which it working
if self.api == 'yourls': if self.api == "yourls":
self._yourls_init(settings) self._yourls_init(settings)
elif self.api == 'isgd' or self.api == 'vgd': elif self.api == "isgd" or self.api == "vgd":
self._gd_init() self._gd_init()
elif self.api == 'custom': elif self.api == "custom":
self.apiurl = settings['short_url'] self.apiurl = settings.get("short_url")
if not self.apiurl:
PBinCLIError("short_url is required for custom shortener")
self.session = _config_requests(settings, True) self.session = _config_requests(settings, True)
def _yourls_init(self, settings): def _yourls_init(self, settings):
if not settings['short_url']: apiurl = settings["short_url"]
if not apiurl:
PBinCLIError("YOURLS: An API URL is required") PBinCLIError("YOURLS: An API URL is required")
# setting API URL # setting API URL
apiurl = settings['short_url'] if apiurl.endswith("/yourls-api.php"):
if apiurl.endswith('/yourls-api.php'):
self.apiurl = apiurl self.apiurl = apiurl
elif apiurl.endswith('/'): elif apiurl.endswith("/"):
self.apiurl = apiurl + 'yourls-api.php' self.apiurl = apiurl + "yourls-api.php"
else: else:
PBinCLIError("YOURLS: Incorrect URL is provided.\n" + PBinCLIError(
"It must contain full address to 'yourls-api.php' script (like https://example.com/yourls-api.php)\n" + "YOURLS: Incorrect URL is provided.\n"
"or just contain instance URL with '/' at the end (like https://example.com/)") + "It must contain full address to 'yourls-api.php' script (like https://example.com/yourls-api.php)\n"
+ "or just contain instance URL with '/' at the end (like https://example.com/)"
)
# validating for required credentials # validating for required credentials
if settings['short_user'] and settings['short_pass'] and settings['short_token'] is None: if (
self.auth_args = {'username': settings['short_user'], 'password': settings['short_pass']} settings.get("short_user")
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token']: and settings.get("short_pass")
self.auth_args = {'signature': settings['short_token']} and settings.get("short_token") is None
elif settings['short_user'] is None and settings['short_pass'] is None and settings['short_token'] is None: ):
self.auth_args = {
"username": settings["short_user"],
"password": settings["short_pass"],
}
elif (
settings.get("short_user") is None
and settings.get("short_pass") is None
and settings.get("short_token")
):
self.auth_args = {"signature": settings["short_token"]}
elif (
settings.get("short_user") is None
and settings.get("short_pass") is None
and settings.get("short_token") is None
):
self.auth_args = {} self.auth_args = {}
else: else:
PBinCLIError("YOURLS: either username and password or token are required. Otherwise set to default (None)") PBinCLIError(
"YOURLS: either username and password or token are required. Otherwise set to default (None)"
)
def _gd_init(self): def _gd_init(self):
if self.api == 'isgd': if self.api == "isgd":
self.apiurl = 'https://is.gd/' self.apiurl = "https://is.gd/"
else: else:
self.apiurl = 'https://v.gd/' self.apiurl = "https://v.gd/"
self.useragent = 'Mozilla/5.0 (compatible; pbincli - https://github.com/r4sas/pbincli/)' self.useragent = (
"Mozilla/5.0 (compatible; pbincli - https://github.com/r4sas/pbincli/)"
)
def getlink(self, url): def getlink(self, url):
# that is api -> function mapper for running service-related function when getlink() used # that is api -> function mapper for running service-related function when getlink() used
servicesList = { servicesList = {
'yourls': self._yourls, "yourls": self._yourls,
'clckru': self._clckru, "clckru": self._clckru,
'tinyurl': self._tinyurl, "tinyurl": self._tinyurl,
'isgd': self._gd, "isgd": self._gd,
'vgd': self._gd, "vgd": self._gd,
'cuttly': self._cuttly, "cuttly": self._cuttly,
'custom': self._custom "custom": self._custom,
} }
# run function selected by choosen API # run function selected by choosen API
return servicesList[self.api](url) return servicesList[self.api](url)
def _yourls(self, url):
def _yourls(self,url): request = {"action": "shorturl", "format": "json", "url": url}
request = {'action': 'shorturl', 'format': 'json', 'url': url}
request.update(self.auth_args) request.update(self.auth_args)
result = self.session.post( result = self.session.post(url=self.apiurl, data=request)
url = self.apiurl,
data = request)
try: try:
result.raise_for_status() result.raise_for_status()
@ -184,103 +223,115 @@ class Shortener:
try: try:
response = result.json() response = result.json()
except ValueError: except ValueError:
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text)) PBinCLIError(
"YOURLS: Unable parse response. Received (size = {}):\n{}".format(
len(result.text), result.text
)
)
else: else:
PBinCLIError("YOURLS: Received error from API: {} with JSON {}".format(result, response)) PBinCLIError(
"YOURLS: Received error from API: {} with JSON {}".format(
result, response
)
)
else: else:
response = result.json() response = result.json()
if {'status', 'statusCode', 'message'} <= set(response.keys()): if {"status", "statusCode", "message"} <= set(response.keys()):
if response['status'] == 'fail': if response["status"] == "fail":
PBinCLIError("YOURLS: Received error from API: {}".format(response['message'])) PBinCLIError(
if not 'shorturl' in response: "YOURLS: Received error from API: {}".format(
PBinCLIError("YOURLS: Unknown error: {}".format(response['message'])) response["message"]
)
)
if not "shorturl" in response:
PBinCLIError(
"YOURLS: Unknown error: {}".format(response["message"])
)
else: else:
return response['shorturl'] return response["shorturl"]
else: else:
PBinCLIError("YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(response)) PBinCLIError(
"YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(
response
)
)
def _clckru(self, url): def _clckru(self, url):
request = {'url': url} request = {"url": url}
try: try:
result = self.session.post( result = self.session.post(url="https://clck.ru/--", data=request)
url = "https://clck.ru/--",
data = request)
return result.text return result.text
except Exception as ex: except Exception as ex:
PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex)) PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex))
def _tinyurl(self, url): def _tinyurl(self, url):
request = {'url': url} request = {"url": url}
try: try:
result = self.session.post( result = self.session.post(
url = "https://tinyurl.com/api-create.php", url="https://tinyurl.com/api-create.php", data=request
data = request) )
return result.text return result.text
except Exception as ex: except Exception as ex:
PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex)) PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex))
def _gd(self, url): def _gd(self, url):
request = { request = {
'format': 'json', "format": "json",
'url': url, "url": url,
'logstats': 0 # we don't want use any statistics "logstats": 0, # we don't want use any statistics
} }
headers = { 'User-Agent': self.useragent} headers = {"User-Agent": self.useragent}
try: try:
result = self.session.post( result = self.session.post(
url = self.apiurl + "create.php", url=self.apiurl + "create.php", headers=headers, data=request
headers = headers, )
data = request)
response = result.json() response = result.json()
if 'shorturl' in response: if "shorturl" in response:
return response['shorturl'] return response["shorturl"]
else: else:
PBinCLIError("{}: got error {} from API: {}".format( PBinCLIError(
"is.gd" if self.api == 'isgd' else 'v.gd', "{}: got error {} from API: {}".format(
response['errorcode'], "is.gd" if self.api == "isgd" else "v.gd",
response['errormessage'])) response["errorcode"],
response["errormessage"],
)
)
except Exception as ex: except Exception as ex:
PBinCLIError("{}: unexcepted behavior: {}".format( PBinCLIError(
"is.gd" if self.api == 'isgd' else 'v.gd', "{}: unexcepted behavior: {}".format(
ex)) "is.gd" if self.api == "isgd" else "v.gd", ex
)
)
def _cuttly(self, url): def _cuttly(self, url):
request = { request = {"url": url, "domain": 0}
'url': url,
'domain': 0
}
try: try:
result = self.session.post( result = self.session.post(
url = "https://cutt.ly/scripts/shortenUrl.php", url="https://cutt.ly/scripts/shortenUrl.php", data=request
data = request) )
return result.text return result.text
except Exception as ex: except Exception as ex:
PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex)) PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex))
def _custom(self, url): def _custom(self, url):
if self.apiurl is None: if self.apiurl is None:
PBinCLIError("No short_url specified - link will not be shortened.") PBinCLIError("No short_url specified - link will not be shortened.")
from urllib.parse import quote from urllib.parse import quote
qUrl = quote(url, safe="") # urlencoded paste url
qUrl = quote(url, safe="") # urlencoded paste url
rUrl = self.apiurl.replace("{{url}}", qUrl) rUrl = self.apiurl.replace("{{url}}", qUrl)
try: try:
result = self.session.get( result = self.session.get(url=rUrl)
url = rUrl)
return result.text return result.text
except Exception as ex: except Exception as ex:
PBinCLIError("Shorter: unexcepted behavior: {}".format(ex)) PBinCLIError("Shorter: unexcepted behavior: {}".format(ex))