Browse Source

[wip] v2 support code (#13)

dependabot/add-v2-config-file
R4SAS 5 years ago
parent
commit
f7fae450a0
  1. 200
      pbincli/actions.py
  2. 50
      pbincli/api.py
  3. 226
      pbincli/format.py
  4. 30
      pbincli/utils.py
  5. 1
      requirements.txt

200
pbincli/actions.py

@ -1,14 +1,7 @@ @@ -1,14 +1,7 @@
import json, hashlib, os, sys
import pbincli.actions
from sjcl import SJCL
from base64 import b64encode, b64decode
from pbincli.utils import PBinCLIException
from sys import exit
from pbincli.format import Paste
def send(args, api_client):
from pbincli.utils import check_readable, compress, path_leaf
from mimetypes import guess_type
if not args.notext:
if args.text:
text = args.text
@ -16,202 +9,159 @@ def send(args, api_client): @@ -16,202 +9,159 @@ def send(args, api_client):
text = args.stdin.read()
elif not args.file:
print("Nothing to send!")
sys.exit(1)
exit(1)
else:
text = ""
# Formatting request
request = {'expire':args.expire,'formatter':args.format,'burnafterreading':int(args.burn),'opendiscussion':int(args.discus)}
passphrase = b64encode(os.urandom(32))
if args.debug: print("Passphrase:\t{}".format(passphrase))
paste = Paste(args.debug)
paste.setVersion(api_client.getVersion())
paste.setText(text)
# If we set PASSWORD variable
if args.password:
digest = hashlib.sha256(args.password.encode("UTF-8")).hexdigest()
password = passphrase + digest.encode("UTF-8")
else:
password = passphrase
if args.debug: print("Password:\t{}".format(password))
# Encrypting text
cipher = SJCL().encrypt(compress(text.encode('utf-8')), password, mode='gcm')
# TODO: should be implemented in upstream
for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode()
request['data'] = json.dumps(cipher, ensure_ascii=False).replace(' ','')
paste.setPassword(args.password)
# If we set FILE variable
if args.file:
check_readable(args.file)
with open(args.file, "rb") as f:
contents = f.read()
f.close()
mime = guess_type(args.file, strict=False)[0]
# MIME fallback
if not mime: mime = "application/octet-stream"
paste.setAttachment(args.file)
if args.debug: print("Filename:\t{}\nMIME-type:\t{}".format(path_leaf(args.file), mime))
paste.encrypt(
formatter = args.format,
burnafterreading = args.burn,
discussion = args.discus,
expiration = args.expire)
file = "data:" + mime[0] + ";base64," + b64encode(contents).decode()
filename = path_leaf(args.file)
cipherfile = SJCL().encrypt(compress(file.encode('utf-8')), password, mode='gcm')
# TODO: should be implemented in upstream
for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode()
cipherfilename = SJCL().encrypt(compress(filename.encode('utf-8')), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode()
request['attachment'] = json.dumps(cipherfile, ensure_ascii=False).replace(' ','')
request['attachmentname'] = json.dumps(cipherfilename, ensure_ascii=False).replace(' ','')
request = paste.getJSON()
if args.debug: print("Request:\t{}".format(request))
# If we use dry option, exit now
if args.dry: sys.exit(0)
if args.dry: exit(0)
result = api_client.post(request)
if args.debug: print("Response:\t{}\n".format(result))
try:
result = json.loads(result)
except ValueError as e:
print("PBinCLI Error: {}".format(e))
sys.exit(1)
if 'status' in result and not result['status']:
print("Paste uploaded!\nPasteID:\t{}\nPassword:\t{}\nDelete token:\t{}\n\nLink:\t\t{}?{}#{}".format(result['id'], passphrase.decode(), result['deletetoken'], api_client.server, result['id'], passphrase.decode()))
passphrase = paste.getHash()
print("Paste uploaded!\nPasteID:\t{}\nPassword:\t{}\nDelete token:\t{}\n\nLink:\t\t{}?{}#{}".format(
result['id'],
passphrase,
result['deletetoken'],
api_client.server,
result['id'],
passphrase))
elif 'status' in result and result['status']:
print("Something went wrong...\nError:\t\t{}".format(result['message']))
sys.exit(1)
exit(1)
else:
print("Something went wrong...\nError: Empty response.")
sys.exit(1)
exit(1)
def get(args, api_client):
from pbincli.utils import check_writable, decompress
from pbincli.utils import check_writable
pasteid, passphrase = args.pasteinfo.split("#")
try:
pasteid, passphrase = args.pasteinfo.split("#")
except ValueError as err:
print("PBinCLI error: provided info hasn't contain valid PasteID#Passphrase string")
exit(1)
if not (pasteid and passphrase):
print("PBinCLI error: Incorrect request")
exit(1)
if pasteid and passphrase:
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase))
if args.password:
digest = hashlib.sha256(args.password.encode("UTF-8")).hexdigest()
password = passphrase + digest.encode("UTF-8")
else:
password = passphrase
paste = Paste()
if args.password:
paste.setPassword(args.password)
if args.debug: print("Password:\t{}".format(password))
result = api_client.get(pasteid)
else:
print("PBinCLI error: Incorrect request")
sys.exit(1)
result = api_client.get(pasteid)
if args.debug: print("Response:\t{}\n".format(result))
try:
result = json.loads(result)
except ValueError as e:
print("PBinCLI Error: {}".format(e))
sys.exit(1)
if 'status' in result and not result['status']:
print("Paste received! Text inside:")
data = json.loads(result['data'])
print("Paste received!")
if args.debug: print("Text:\t{}\n".format(data))
version = result['v'] if 'v' in result else 1
paste.setVersion(version)
if version == 2:
if args.debug: print("Message:\t{}\nAuthentication data:\t{}".format(result['ct'], result['adata']))
text = SJCL().decrypt(data, password)
paste.setHash(passphrase)
paste.loadJSON(result)
paste.decrypt()
text = paste.getText()
if args.debug: print("Decoded text size: {}\n".format(len(text)))
if len(text):
print("{}\n".format(decompress(text.decode())))
if args.debug: print("{}\n".format(text.decode()))
filename = "paste-" + pasteid + ".txt"
print("Found text in paste. Saving it to {}".format(filename))
check_writable("paste.txt")
with open("paste.txt", "wb") as f:
f.write(decompress(text.decode()))
check_writable(filename)
with open(filename, "wb") as f:
f.write(text)
f.close
if 'attachment' in result and 'attachmentname' in result:
print("Found file, attached to paste. Decoding it and saving")
cipherfile = json.loads(result['attachment'])
cipherfilename = json.loads(result['attachmentname'])
attachment, attachment_name = paste.getAttachment()
if args.debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile))
if attachment:
print("Found file, attached to paste. Saving it to {}\n".format(attachment_name))
attachmentf = SJCL().decrypt(cipherfile, password)
attachmentname = SJCL().decrypt(cipherfilename, password)
attachment = decompress(attachmentf.decode('utf-8')).decode('utf-8').split(',', 1)[1]
file = b64decode(attachment)
filename = decompress(attachmentname.decode('utf-8')).decode('utf-8')
print("Filename:\t{}\n".format(filename))
check_writable(filename)
with open(filename, "wb") as f:
f.write(file)
check_writable(attachment_name)
with open(attachment_name, "wb") as f:
f.write(attachment)
f.close
if 'burnafterreading' in result['meta'] and result['meta']['burnafterreading']:
if version == 1 and 'meta' in result and 'burnafterreading' in result['meta'] and result['meta']['burnafterreading']:
print("Burn afrer reading flag found. Deleting paste...")
result = api_client.delete(pasteid, 'burnafterreading')
result = api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'}))
if args.debug: print("Delete response:\t{}\n".format(result))
try:
result = json.loads(result)
except ValueError as e:
print("PBinCLI Error: {}".format(e))
sys.exit(1)
if 'status' in result and not result['status']:
print("Paste successfully deleted!")
elif 'status' in result and result['status']:
print("Something went wrong...\nError:\t\t{}".format(result['message']))
sys.exit(1)
exit(1)
else:
print("Something went wrong...\nError: Empty response.")
sys.exit(1)
exit(1)
elif 'status' in result and result['status']:
print("Something went wrong...\nError:\t\t{}".format(result['message']))
sys.exit(1)
exit(1)
else:
print("Something went wrong...\nError: Empty response.")
sys.exit(1)
exit(1)
def delete(args, api_client):
from pbincli.utils import json_encode
pasteid = args.paste
token = args.token
if args.debug: print("PasteID:\t{}\nToken:\t\t{}".format(pasteid, token))
result = api_client.delete(pasteid, token)
result = api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':token}))
if args.debug: print("Response:\t{}\n".format(result))
try:
result = json.loads(result)
except ValueError as e:
print("PBinCLI Error: {}".format(e))
sys.exit(1)
if 'status' in result and not result['status']:
print("Paste successfully deleted!")
elif 'status' in result and result['status']:
print("Something went wrong...\nError:\t\t{}".format(result['message']))
sys.exit(1)
exit(1)
else:
print("Something went wrong...\nError: Empty response.")
sys.exit(1)
exit(1)

50
pbincli/api.py

@ -11,17 +11,49 @@ class PrivateBin: @@ -11,17 +11,49 @@ class PrivateBin:
def post(self, request):
r = requests.post(url = self.server, headers = self.headers, proxies = self.proxy, data = request)
return r.text
result = requests.post(
url = self.server,
headers = self.headers,
proxies = self.proxy,
data = request)
try:
return result.json()
except ValueError as e:
print("ERROR: Unable parse response as json. Received (size = {}):\n".format(len(result.text), result.text))
exit(1)
def get(self, request):
url = self.server + "?" + request
r = requests.get(url = url, headers = self.headers, proxies = self.proxy)
return r.text
return requests.get(
url = self.server + "?" + request,
headers = self.headers,
proxies = self.proxy).json()
def delete(self, request):
result = requests.post(
url = self.server,
headers = self.headers,
proxies = self.proxy,
data = request)
# using try as workaround for versions < 1.3 due to we cant detect
# if server used version 1.2, where auto-deletion is added
try:
return result.json()
except ValueError as e:
# unable parse response as json because it can be empty (1.2), so simulate correct answer
from json import loads as json_loads
return json_loads('{"status":0}')
def delete(self, pasteid, token):
request = {'pasteid':pasteid,'deletetoken':token}
r = requests.post(url = self.server, headers = self.headers, proxies = self.proxy, data = request)
return r.text
def getVersion(self):
jsonldSchema = requests.get(
url = self.server + '?jsonld=paste',
proxies = self.proxy).json()
return jsonldSchema['@context']['v']['@value'] \
if ('@context' in jsonldSchema and
'v' in jsonldSchema['@context'] and
'@value' in jsonldSchema['@context']['v']) \
else 1

226
pbincli/format.py

@ -0,0 +1,226 @@ @@ -0,0 +1,226 @@
from Crypto.Random import get_random_bytes
from Crypto.Cipher import AES
from base64 import b64encode, b64decode
CIPHER_ITERATION_COUNT = 100000
CIPHER_SALT_BYTES = 8
CIPHER_BLOCK_BITS = 256
CIPHER_BLOCK_BYTES = int(CIPHER_BLOCK_BITS/8)
CIPHER_TAG_BITS = int(CIPHER_BLOCK_BITS/2)
CIPHER_TAG_BYTES = int(CIPHER_TAG_BITS/8)
class Paste:
def __init__(self, debug=False):
self._version = 2
self._data = ""
self._text = ""
self._attachment = ""
self._attachment_name = ""
self._key = get_random_bytes(CIPHER_BLOCK_BYTES)
self._password = ""
self._debug = debug
def setVersion(self, version):
self._version = version
def setPassword(self, password):
self._password = password
def setText(self, text):
self._text = text
def setAttachment(self, path):
from pbincli.utils import check_readable, path_leaf
from mimetypes import guess_type
check_readable(path)
with open(path, 'rb') as f:
contents = f.read()
f.close()
mime = guess_type(path, strict=False)[0]
# MIME fallback
if not mime: mime = 'application/octet-stream'
if self._debug: print("Filename:\t{}\nMIME-type:\t{}".format(path_leaf(path), mime))
self._attachment = 'data:' + mime + ';base64,' + b64encode(contents).decode()
self._attachment_name = path_leaf(path)
def getText(self):
return self._text
def getAttachment(self):
return [b64decode(self._attachment.split(',', 1)[1]), self._attachment_name] \
if self._attachment \
else [False,False]
def getJSON(self):
return self._data
def loadJSON(self, data):
self._data = data
def getHash(self):
if self._version == 2:
from base58 import b58encode
return b58encode(self._key).decode()
else:
return b64encode(self._key).decode()
def setHash(self, hash):
if self._version == 2:
from base58 import b58decode
self._key = b58decode(hash)
else:
self._key = b64decode(hash)
def __deriveKey(self, salt):
from Crypto.Protocol.KDF import PBKDF2
from Crypto.Hash import HMAC, SHA256
# Key derivation, using PBKDF2 and SHA256 HMAC
return PBKDF2(
self._key + self._password.encode(),
salt,
dkLen = CIPHER_BLOCK_BYTES,
count = CIPHER_ITERATION_COUNT,
prf = lambda password, salt: HMAC.new(
password,
salt,
SHA256
).digest())
def __initializeCipher(self, key, iv, adata):
from pbincli.utils import json_encode
cipher = AES.new(key, AES.MODE_GCM, nonce=iv, mac_len=CIPHER_TAG_BYTES)
cipher.update(json_encode(adata))
return cipher
def decrypt(self):
from pbincli.utils import decompress
from json import loads as json_decode
if self._version == 2:
iv = b64decode(self._paste['adata'][0][0])
salt = b64decode(self._paste['adata'][0][1])
key = self.__deriveKey(salt)
cipher = self.__initializeCipher(key, iv, self._paste['adata'])
# Cut the cipher text into message and tag
cipher_text_tag = b64decode(self._paste['ct'])
cipher_text = cipher_text_tag[:-CIPHER_TAG_BYTES]
cipher_tag = cipher_text_tag[-CIPHER_TAG_BYTES:]
cipher_message = json_decode(decompress(cipher.decrypt_and_verify(cipher_text, cipher_tag), self._version))
self._text = cipher_message['paste'].encode()
if 'attachment' in cipher_message and 'attachment_name' in cipher_message:
self._attachment = cipher_message['attachment']
self._attachment_name = cipher_message['attachment_name']
else:
from hashlib import sha256
from sjcl import SJCL
if self._password:
digest = sha256(self._password.encode("UTF-8")).hexdigest()
password = b64encode(self._key) + digest.encode("UTF-8")
else:
password = b64encode(self._key)
cipher_text = json_decode(self._data['data'])
if self._debug: print("Text:\t{}\n".format(data))
text = SJCL().decrypt(cipher_text, password)
if len(text):
self._text = decompress(text.decode(), self._version)
if 'attachment' in self._data and 'attachmentname' in self._data:
cipherfile = json_decode(self._data['attachment'])
cipherfilename = json_decode(self._data['attachmentname'])
if self._debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile))
attachment = SJCL().decrypt(cipherfile, password)
attachmentname = SJCL().decrypt(cipherfilename, password)
self._attachment = decompress(attachment.decode('utf-8'), self._version).decode('utf-8')
self._attachment_name = decompress(attachmentname.decode('utf-8'), self._version).decode('utf-8')
def encrypt(self, formatter, burnafterreading, discussion, expiration):
from pbincli.utils import compress, json_encode
if self._version == 2:
iv = get_random_bytes(CIPHER_TAG_BYTES)
salt = get_random_bytes(CIPHER_SALT_BYTES)
key = self.__deriveKey(salt)
# prepare encryption authenticated data and message
adata = [
[
b64encode(iv).decode(),
b64encode(salt).decode(),
CIPHER_ITERATION_COUNT,
CIPHER_BLOCK_BITS,
CIPHER_TAG_BITS,
'aes',
'gcm',
'zlib'
],
formatter,
int(burnafterreading),
int(discussion)
]
cipher_message = {'paste':self._text}
if self._attachment:
cipher_message['attachment'] = self._attachment
cipher_message['attachment_name'] = self._attachment_name
cipher = self.__initializeCipher(key, iv, adata)
ciphertext, tag = cipher.encrypt_and_digest(compress(json_encode(cipher_message), self._version))
self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':expiration}}
else:
from hashlib import sha256
from sjcl import SJCL
self._data = {'expire':expiration,'formatter':formatter,'burnafterreading':int(burnafterreading),'opendiscussion':int(discussion)}
if self._password:
digest = sha256(self._password.encode("UTF-8")).hexdigest()
password = b64encode(self._key) + digest.encode("UTF-8")
else:
password = b64encode(self._key)
if self._debug: print("Password:\t{}".format(password))
# Encrypting text
cipher = SJCL().encrypt(compress(self._text.encode('utf-8'), self._version), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode()
self._data['data'] = json_encode(cipher)
if self._attachment:
cipherfile = SJCL().encrypt(compress(self._attachment.encode('utf-8'), self._version), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode()
cipherfilename = SJCL().encrypt(compress(self._attachment_name.encode('utf-8'), self._version), password, mode='gcm')
for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode()
self._data['attachment'] = json_encode(cipherfile)
self._data['attachmentname'] = json_encode(cipherfilename)

30
pbincli/utils.py

@ -22,12 +22,24 @@ def check_writable(f): @@ -22,12 +22,24 @@ def check_writable(f):
raise PBinCLIException("Path is not writable: {}".format(f))
def decompress(s):
return zlib.decompress(bytearray(map(ord, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
def compress(s):
co = zlib.compressobj(wbits=-zlib.MAX_WBITS)
b = co.compress(s) + co.flush()
return b64encode(''.join(map(chr, b)).encode('utf-8'))
def decompress(s, ver = 1):
if ver == 2:
return zlib.decompress(s, -zlib.MAX_WBITS)
else:
return zlib.decompress(bytearray(map(ord, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS)
def compress(s, ver = 1):
if ver == 2:
# using compressobj as compress doesn't let us specify wbits
# needed to get the raw stream without headers
co = zlib.compressobj(wbits=-zlib.MAX_WBITS)
return co.compress(s) + co.flush()
else:
co = zlib.compressobj(wbits=-zlib.MAX_WBITS)
b = co.compress(s) + co.flush()
return b64encode(''.join(map(chr, b)).encode('utf-8'))
def json_encode(s):
return json.dumps(s, separators=(',',':')).encode()

1
requirements.txt

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
pycryptodome
sjcl
base58
requests

Loading…
Cancel
Save