Browse Source

[shortener] separate services related code in functions

Signed-off-by: r4sas <r4sas@i2pmail.org>
dependabot/add-v2-config-file
R4SAS 5 years ago
parent
commit
ebfe0c48a0
Signed by untrusted user: r4sas
GPG Key ID: 66F6C87B98EBCFE2
  1. 237
      pbincli/api.py

237
pbincli/api.py

@ -111,133 +111,150 @@ class Shortener: @@ -111,133 +111,150 @@ class Shortener:
self.session, self.proxy = _config_requests(settings)
def getlink(self, url):
if self.api == 'yourls':
request = {'action': 'shorturl', 'format': 'json', 'url': url}
request.update(self.auth_args)
# that is api -> function mapper for running service-related function when getlink() used
servicesList = {
'yourls': self._yourls,
'clckru': self._clckru,
'tinyurl': self._tinyurl,
'isgd': self._gd,
'vgd': self._gd,
'cuttly': self._cuttly
}
# run function selected by choosen API
servicesList[self.api](url)
def _yourls(self,url):
request = {'action': 'shorturl', 'format': 'json', 'url': url}
request.update(self.auth_args)
result = self.session.post(
url = self.apiurl,
proxies = self.proxy,
data = request)
result = self.session.post(
url = self.apiurl,
proxies = self.proxy,
data = request)
try:
result.raise_for_status()
except HTTPError:
try:
result.raise_for_status()
except HTTPError:
try:
response = result.json()
except ValueError:
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
else:
PBinCLIError("YOURLS: Received error from API: {} with JSON {}".format(result, response))
else:
response = result.json()
except ValueError:
PBinCLIError("YOURLS: Unable parse response. Received (size = {}):\n{}".format(len(result.text), result.text))
else:
PBinCLIError("YOURLS: Received error from API: {} with JSON {}".format(result, response))
else:
response = result.json()
if {'status', 'statusCode', 'message'} <= set(response.keys()):
if response['status'] == 'fail':
PBinCLIError("YOURLS: Received error from API: {}".format(response['message']))
if not 'shorturl' in response:
PBinCLIError("YOURLS: Unknown error: {}".format(response['message']))
else:
print("Short Link:\t{}".format(response['shorturl']))
if {'status', 'statusCode', 'message'} <= set(response.keys()):
if response['status'] == 'fail':
PBinCLIError("YOURLS: Received error from API: {}".format(response['message']))
if not 'shorturl' in response:
PBinCLIError("YOURLS: Unknown error: {}".format(response['message']))
else:
PBinCLIError("YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(response))
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("YOURLS: No status, statusCode or message fields in response! Received:\n{}".format(response))
elif self.api == 'clckru':
request = {'url': url}
try:
result = self.session.post(
url = "https://clck.ru/--",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex))
def _clckru(self, url):
request = {'url': url}
elif self.api == 'tinyurl':
request = {'url': url}
try:
result = self.session.post(
url = "https://clck.ru/--",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("clck.ru: unexcepted behavior: {}".format(ex))
try:
result = self.session.post(
url = "https://tinyurl.com/api-create.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex))
elif self.api == 'isgd' or self.api == 'vgd':
request = {
'format': 'json',
'url': url,
'logstats': 0 # we don't want use any statistics
}
headers = { 'User-Agent': self.useragent}
def _tinyurl(self, url):
request = {'url': url}
try:
result = self.session.post(
url = self.apiurl + "create.php",
headers = headers,
proxies = self.proxy,
data = request)
try:
result = self.session.post(
url = "https://tinyurl.com/api-create.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("TinyURL: unexcepted behavior: {}".format(ex))
response = result.json()
if 'shorturl' in response:
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("{}: got error {} from API: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
response['errorcode'],
response['errormessage']))
def _gd(self, url):
request = {
'format': 'json',
'url': url,
'logstats': 0 # we don't want use any statistics
}
headers = { 'User-Agent': self.useragent}
except Exception as ex:
PBinCLIError("{}: unexcepted behavior: {}".format(
try:
result = self.session.post(
url = self.apiurl + "create.php",
headers = headers,
proxies = self.proxy,
data = request)
response = result.json()
if 'shorturl' in response:
print("Short Link:\t{}".format(response['shorturl']))
else:
PBinCLIError("{}: got error {} from API: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
ex))
response['errorcode'],
response['errormessage']))
elif self.api == 'cuttly':
request = {
'url': url,
'domain': 0
}
except Exception as ex:
PBinCLIError("{}: unexcepted behavior: {}".format(
"is.gd" if self.api == 'isgd' else 'v.gd',
ex))
try:
result = self.session.post(
url = "https://cutt.ly/scripts/shortenUrl.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex))
'''
# That code needs testing. API requires username and apiKey or accessToken to work.
elif self.api == 'bitly':
request = {'url': url}
headers = {'X-Requested-With': 'XMLHttpRequest'}
try:
result = self.session.post(
url = "https://bitly.com/",
headers = headers,
proxies = self.proxy,
data = request)
response = result.json()
if response['data'] and response['status_code'] == 200:
print("Short Link:\t{}".format(response['data']['anon_shorten']['link']))
elif response['status_txt']:
errcode = response['status_txt'] or 'DEFAULT'
friendlyError = {
'RATE_LIMIT_EXCEEDED': 'Whoa - you\'ve exceeded your quota. Create a free account to keep shortening.',
'INVALID_ARG_URL': 'Unable to shorten that link. It is not a valid url.',
'INVALID_ARG_LONGURL': 'Unable to shorten that link. It is not a valid url.',
'ALREADY_A_BITLY_LINK': 'That is already a Bitly link',
'UNKNOWN_ERROR': 'Woops. Something went wrong. Please try again.',
'DEFAULT': 'An error occurred'
}
PBinCLIError("bitly: got error from API: {}".format(friendlyError[errcode]))
except Exception as ex:
PBinCLIError("bitly: unexcepted behavior: {}".format(ex))
'''
def _cuttly(self, url):
request = {
'url': url,
'domain': 0
}
try:
result = self.session.post(
url = "https://cutt.ly/scripts/shortenUrl.php",
proxies = self.proxy,
data = request)
print("Short Link:\t{}".format(result.text))
except Exception as ex:
PBinCLIError("cutt.ly: unexcepted behavior: {}".format(ex))
# [WIP] That code needs testing. API requires username and apiKey or accessToken to work.
def _bitly(self, url):
request = {'url': url}
headers = {'X-Requested-With': 'XMLHttpRequest'}
try:
result = self.session.post(
url = "https://bitly.com/",
headers = headers,
proxies = self.proxy,
data = request)
response = result.json()
if response['data'] and response['status_code'] == 200:
print("Short Link:\t{}".format(response['data']['anon_shorten']['link']))
elif response['status_txt']:
errcode = response['status_txt'] or 'DEFAULT'
friendlyError = {
'RATE_LIMIT_EXCEEDED': 'Whoa - you\'ve exceeded your quota. Create a free account to keep shortening.',
'INVALID_ARG_URL': 'Unable to shorten that link. It is not a valid url.',
'INVALID_ARG_LONGURL': 'Unable to shorten that link. It is not a valid url.',
'ALREADY_A_BITLY_LINK': 'That is already a Bitly link',
'UNKNOWN_ERROR': 'Woops. Something went wrong. Please try again.',
'DEFAULT': 'An error occurred'
}
PBinCLIError("bitly: got error from API: {}".format(friendlyError[errcode]))
except Exception as ex:
PBinCLIError("bitly: unexcepted behavior: {}".format(ex))

Loading…
Cancel
Save