From f7fae450a02156a5795aa7e86487f397263a9113 Mon Sep 17 00:00:00 2001 From: r4sas Date: Sun, 2 Jun 2019 14:04:38 +0000 Subject: [PATCH] [wip] v2 support code (#13) --- pbincli/actions.py | 200 +++++++++++++++------------------------ pbincli/api.py | 50 ++++++++-- pbincli/format.py | 226 +++++++++++++++++++++++++++++++++++++++++++++ pbincli/utils.py | 30 ++++-- requirements.txt | 1 + 5 files changed, 364 insertions(+), 143 deletions(-) create mode 100644 pbincli/format.py diff --git a/pbincli/actions.py b/pbincli/actions.py index 24e8e33..9c67d60 100644 --- a/pbincli/actions.py +++ b/pbincli/actions.py @@ -1,14 +1,7 @@ -import json, hashlib, os, sys -import pbincli.actions -from sjcl import SJCL - -from base64 import b64encode, b64decode -from pbincli.utils import PBinCLIException +from sys import exit +from pbincli.format import Paste def send(args, api_client): - from pbincli.utils import check_readable, compress, path_leaf - from mimetypes import guess_type - if not args.notext: if args.text: text = args.text @@ -16,202 +9,159 @@ def send(args, api_client): text = args.stdin.read() elif not args.file: print("Nothing to send!") - sys.exit(1) + exit(1) else: text = "" - # Formatting request - request = {'expire':args.expire,'formatter':args.format,'burnafterreading':int(args.burn),'opendiscussion':int(args.discus)} - - passphrase = b64encode(os.urandom(32)) - if args.debug: print("Passphrase:\t{}".format(passphrase)) + paste = Paste(args.debug) + paste.setVersion(api_client.getVersion()) + paste.setText(text) # If we set PASSWORD variable if args.password: - digest = hashlib.sha256(args.password.encode("UTF-8")).hexdigest() - password = passphrase + digest.encode("UTF-8") - else: - password = passphrase - - if args.debug: print("Password:\t{}".format(password)) - - # Encrypting text - cipher = SJCL().encrypt(compress(text.encode('utf-8')), password, mode='gcm') - - # TODO: should be implemented in upstream - for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode() - - request['data'] = json.dumps(cipher, ensure_ascii=False).replace(' ','') + paste.setPassword(args.password) # If we set FILE variable if args.file: - check_readable(args.file) - with open(args.file, "rb") as f: - contents = f.read() - f.close() - mime = guess_type(args.file, strict=False)[0] - - # MIME fallback - if not mime: mime = "application/octet-stream" + paste.setAttachment(args.file) - if args.debug: print("Filename:\t{}\nMIME-type:\t{}".format(path_leaf(args.file), mime)) + paste.encrypt( + formatter = args.format, + burnafterreading = args.burn, + discussion = args.discus, + expiration = args.expire) - file = "data:" + mime[0] + ";base64," + b64encode(contents).decode() - filename = path_leaf(args.file) - - cipherfile = SJCL().encrypt(compress(file.encode('utf-8')), password, mode='gcm') - # TODO: should be implemented in upstream - for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode() - cipherfilename = SJCL().encrypt(compress(filename.encode('utf-8')), password, mode='gcm') - for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode() - - request['attachment'] = json.dumps(cipherfile, ensure_ascii=False).replace(' ','') - request['attachmentname'] = json.dumps(cipherfilename, ensure_ascii=False).replace(' ','') + request = paste.getJSON() if args.debug: print("Request:\t{}".format(request)) # If we use dry option, exit now - if args.dry: sys.exit(0) + if args.dry: exit(0) result = api_client.post(request) if args.debug: print("Response:\t{}\n".format(result)) - try: - result = json.loads(result) - except ValueError as e: - print("PBinCLI Error: {}".format(e)) - sys.exit(1) - if 'status' in result and not result['status']: - print("Paste uploaded!\nPasteID:\t{}\nPassword:\t{}\nDelete token:\t{}\n\nLink:\t\t{}?{}#{}".format(result['id'], passphrase.decode(), result['deletetoken'], api_client.server, result['id'], passphrase.decode())) + passphrase = paste.getHash() + + print("Paste uploaded!\nPasteID:\t{}\nPassword:\t{}\nDelete token:\t{}\n\nLink:\t\t{}?{}#{}".format( + result['id'], + passphrase, + result['deletetoken'], + api_client.server, + result['id'], + passphrase)) elif 'status' in result and result['status']: print("Something went wrong...\nError:\t\t{}".format(result['message'])) - sys.exit(1) + exit(1) else: print("Something went wrong...\nError: Empty response.") - sys.exit(1) + exit(1) def get(args, api_client): - from pbincli.utils import check_writable, decompress + from pbincli.utils import check_writable - pasteid, passphrase = args.pasteinfo.split("#") + try: + pasteid, passphrase = args.pasteinfo.split("#") + except ValueError as err: + print("PBinCLI error: provided info hasn't contain valid PasteID#Passphrase string") + exit(1) + + if not (pasteid and passphrase): + print("PBinCLI error: Incorrect request") + exit(1) - if pasteid and passphrase: - if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase)) + if args.debug: print("PasteID:\t{}\nPassphrase:\t{}".format(pasteid, passphrase)) - if args.password: - digest = hashlib.sha256(args.password.encode("UTF-8")).hexdigest() - password = passphrase + digest.encode("UTF-8") - else: - password = passphrase + paste = Paste() + if args.password: + paste.setPassword(args.password) if args.debug: print("Password:\t{}".format(password)) - result = api_client.get(pasteid) - else: - print("PBinCLI error: Incorrect request") - sys.exit(1) + result = api_client.get(pasteid) if args.debug: print("Response:\t{}\n".format(result)) - try: - result = json.loads(result) - except ValueError as e: - print("PBinCLI Error: {}".format(e)) - sys.exit(1) - if 'status' in result and not result['status']: - print("Paste received! Text inside:") - data = json.loads(result['data']) + print("Paste received!") - if args.debug: print("Text:\t{}\n".format(data)) + version = result['v'] if 'v' in result else 1 + paste.setVersion(version) + + if version == 2: + if args.debug: print("Message:\t{}\nAuthentication data:\t{}".format(result['ct'], result['adata'])) - text = SJCL().decrypt(data, password) + paste.setHash(passphrase) + paste.loadJSON(result) + paste.decrypt() + + text = paste.getText() if args.debug: print("Decoded text size: {}\n".format(len(text))) if len(text): - print("{}\n".format(decompress(text.decode()))) + if args.debug: print("{}\n".format(text.decode())) + filename = "paste-" + pasteid + ".txt" + print("Found text in paste. Saving it to {}".format(filename)) - check_writable("paste.txt") - with open("paste.txt", "wb") as f: - f.write(decompress(text.decode())) + check_writable(filename) + with open(filename, "wb") as f: + f.write(text) f.close - if 'attachment' in result and 'attachmentname' in result: - print("Found file, attached to paste. Decoding it and saving") - - cipherfile = json.loads(result['attachment']) - cipherfilename = json.loads(result['attachmentname']) + attachment, attachment_name = paste.getAttachment() - if args.debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile)) + if attachment: + print("Found file, attached to paste. Saving it to {}\n".format(attachment_name)) - attachmentf = SJCL().decrypt(cipherfile, password) - attachmentname = SJCL().decrypt(cipherfilename, password) - - attachment = decompress(attachmentf.decode('utf-8')).decode('utf-8').split(',', 1)[1] - file = b64decode(attachment) - filename = decompress(attachmentname.decode('utf-8')).decode('utf-8') - - print("Filename:\t{}\n".format(filename)) - - check_writable(filename) - with open(filename, "wb") as f: - f.write(file) + check_writable(attachment_name) + with open(attachment_name, "wb") as f: + f.write(attachment) f.close - if 'burnafterreading' in result['meta'] and result['meta']['burnafterreading']: + if version == 1 and 'meta' in result and 'burnafterreading' in result['meta'] and result['meta']['burnafterreading']: print("Burn afrer reading flag found. Deleting paste...") - result = api_client.delete(pasteid, 'burnafterreading') + result = api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':'burnafterreading'})) if args.debug: print("Delete response:\t{}\n".format(result)) - try: - result = json.loads(result) - except ValueError as e: - print("PBinCLI Error: {}".format(e)) - sys.exit(1) - if 'status' in result and not result['status']: print("Paste successfully deleted!") elif 'status' in result and result['status']: print("Something went wrong...\nError:\t\t{}".format(result['message'])) - sys.exit(1) + exit(1) else: print("Something went wrong...\nError: Empty response.") - sys.exit(1) + exit(1) elif 'status' in result and result['status']: print("Something went wrong...\nError:\t\t{}".format(result['message'])) - sys.exit(1) + exit(1) else: print("Something went wrong...\nError: Empty response.") - sys.exit(1) + exit(1) def delete(args, api_client): + from pbincli.utils import json_encode + pasteid = args.paste token = args.token if args.debug: print("PasteID:\t{}\nToken:\t\t{}".format(pasteid, token)) - result = api_client.delete(pasteid, token) + result = api_client.delete(json_encode({'pasteid':pasteid,'deletetoken':token})) if args.debug: print("Response:\t{}\n".format(result)) - try: - result = json.loads(result) - except ValueError as e: - print("PBinCLI Error: {}".format(e)) - sys.exit(1) - if 'status' in result and not result['status']: print("Paste successfully deleted!") elif 'status' in result and result['status']: print("Something went wrong...\nError:\t\t{}".format(result['message'])) - sys.exit(1) + exit(1) else: print("Something went wrong...\nError: Empty response.") - sys.exit(1) + exit(1) diff --git a/pbincli/api.py b/pbincli/api.py index 476ace3..1a136f8 100644 --- a/pbincli/api.py +++ b/pbincli/api.py @@ -11,17 +11,49 @@ class PrivateBin: def post(self, request): - r = requests.post(url = self.server, headers = self.headers, proxies = self.proxy, data = request) - return r.text + result = requests.post( + url = self.server, + headers = self.headers, + proxies = self.proxy, + data = request) + + try: + return result.json() + except ValueError as e: + print("ERROR: Unable parse response as json. Received (size = {}):\n".format(len(result.text), result.text)) + exit(1) def get(self, request): - url = self.server + "?" + request - r = requests.get(url = url, headers = self.headers, proxies = self.proxy) - return r.text + return requests.get( + url = self.server + "?" + request, + headers = self.headers, + proxies = self.proxy).json() + + + def delete(self, request): + result = requests.post( + url = self.server, + headers = self.headers, + proxies = self.proxy, + data = request) + + # using try as workaround for versions < 1.3 due to we cant detect + # if server used version 1.2, where auto-deletion is added + try: + return result.json() + except ValueError as e: + # unable parse response as json because it can be empty (1.2), so simulate correct answer + from json import loads as json_loads + return json_loads('{"status":0}') - def delete(self, pasteid, token): - request = {'pasteid':pasteid,'deletetoken':token} - r = requests.post(url = self.server, headers = self.headers, proxies = self.proxy, data = request) - return r.text + def getVersion(self): + jsonldSchema = requests.get( + url = self.server + '?jsonld=paste', + proxies = self.proxy).json() + return jsonldSchema['@context']['v']['@value'] \ + if ('@context' in jsonldSchema and + 'v' in jsonldSchema['@context'] and + '@value' in jsonldSchema['@context']['v']) \ + else 1 diff --git a/pbincli/format.py b/pbincli/format.py new file mode 100644 index 0000000..4d8d238 --- /dev/null +++ b/pbincli/format.py @@ -0,0 +1,226 @@ +from Crypto.Random import get_random_bytes +from Crypto.Cipher import AES +from base64 import b64encode, b64decode + +CIPHER_ITERATION_COUNT = 100000 +CIPHER_SALT_BYTES = 8 +CIPHER_BLOCK_BITS = 256 +CIPHER_BLOCK_BYTES = int(CIPHER_BLOCK_BITS/8) +CIPHER_TAG_BITS = int(CIPHER_BLOCK_BITS/2) +CIPHER_TAG_BYTES = int(CIPHER_TAG_BITS/8) + +class Paste: + def __init__(self, debug=False): + self._version = 2 + self._data = "" + self._text = "" + self._attachment = "" + self._attachment_name = "" + self._key = get_random_bytes(CIPHER_BLOCK_BYTES) + self._password = "" + self._debug = debug + + + def setVersion(self, version): + self._version = version + + + def setPassword(self, password): + self._password = password + + + def setText(self, text): + self._text = text + + + def setAttachment(self, path): + from pbincli.utils import check_readable, path_leaf + from mimetypes import guess_type + + check_readable(path) + with open(path, 'rb') as f: + contents = f.read() + f.close() + mime = guess_type(path, strict=False)[0] + + # MIME fallback + if not mime: mime = 'application/octet-stream' + + if self._debug: print("Filename:\t{}\nMIME-type:\t{}".format(path_leaf(path), mime)) + + self._attachment = 'data:' + mime + ';base64,' + b64encode(contents).decode() + self._attachment_name = path_leaf(path) + + + def getText(self): + return self._text + + + def getAttachment(self): + return [b64decode(self._attachment.split(',', 1)[1]), self._attachment_name] \ + if self._attachment \ + else [False,False] + + + def getJSON(self): + return self._data + + + def loadJSON(self, data): + self._data = data + + + def getHash(self): + if self._version == 2: + from base58 import b58encode + return b58encode(self._key).decode() + else: + return b64encode(self._key).decode() + + + def setHash(self, hash): + if self._version == 2: + from base58 import b58decode + self._key = b58decode(hash) + else: + self._key = b64decode(hash) + + + def __deriveKey(self, salt): + from Crypto.Protocol.KDF import PBKDF2 + from Crypto.Hash import HMAC, SHA256 + # Key derivation, using PBKDF2 and SHA256 HMAC + return PBKDF2( + self._key + self._password.encode(), + salt, + dkLen = CIPHER_BLOCK_BYTES, + count = CIPHER_ITERATION_COUNT, + prf = lambda password, salt: HMAC.new( + password, + salt, + SHA256 + ).digest()) + + + def __initializeCipher(self, key, iv, adata): + from pbincli.utils import json_encode + cipher = AES.new(key, AES.MODE_GCM, nonce=iv, mac_len=CIPHER_TAG_BYTES) + cipher.update(json_encode(adata)) + return cipher + + + def decrypt(self): + from pbincli.utils import decompress + from json import loads as json_decode + + if self._version == 2: + iv = b64decode(self._paste['adata'][0][0]) + salt = b64decode(self._paste['adata'][0][1]) + key = self.__deriveKey(salt) + + cipher = self.__initializeCipher(key, iv, self._paste['adata']) + # Cut the cipher text into message and tag + cipher_text_tag = b64decode(self._paste['ct']) + cipher_text = cipher_text_tag[:-CIPHER_TAG_BYTES] + cipher_tag = cipher_text_tag[-CIPHER_TAG_BYTES:] + cipher_message = json_decode(decompress(cipher.decrypt_and_verify(cipher_text, cipher_tag), self._version)) + + self._text = cipher_message['paste'].encode() + if 'attachment' in cipher_message and 'attachment_name' in cipher_message: + self._attachment = cipher_message['attachment'] + self._attachment_name = cipher_message['attachment_name'] + else: + from hashlib import sha256 + from sjcl import SJCL + + if self._password: + digest = sha256(self._password.encode("UTF-8")).hexdigest() + password = b64encode(self._key) + digest.encode("UTF-8") + else: + password = b64encode(self._key) + + cipher_text = json_decode(self._data['data']) + + if self._debug: print("Text:\t{}\n".format(data)) + + text = SJCL().decrypt(cipher_text, password) + + if len(text): + self._text = decompress(text.decode(), self._version) + + if 'attachment' in self._data and 'attachmentname' in self._data: + cipherfile = json_decode(self._data['attachment']) + cipherfilename = json_decode(self._data['attachmentname']) + + if self._debug: print("Name:\t{}\nData:\t{}".format(cipherfilename, cipherfile)) + + attachment = SJCL().decrypt(cipherfile, password) + attachmentname = SJCL().decrypt(cipherfilename, password) + + self._attachment = decompress(attachment.decode('utf-8'), self._version).decode('utf-8') + self._attachment_name = decompress(attachmentname.decode('utf-8'), self._version).decode('utf-8') + + + def encrypt(self, formatter, burnafterreading, discussion, expiration): + from pbincli.utils import compress, json_encode + if self._version == 2: + iv = get_random_bytes(CIPHER_TAG_BYTES) + salt = get_random_bytes(CIPHER_SALT_BYTES) + key = self.__deriveKey(salt) + + # prepare encryption authenticated data and message + adata = [ + [ + b64encode(iv).decode(), + b64encode(salt).decode(), + CIPHER_ITERATION_COUNT, + CIPHER_BLOCK_BITS, + CIPHER_TAG_BITS, + 'aes', + 'gcm', + 'zlib' + ], + formatter, + int(burnafterreading), + int(discussion) + ] + cipher_message = {'paste':self._text} + if self._attachment: + cipher_message['attachment'] = self._attachment + cipher_message['attachment_name'] = self._attachment_name + + cipher = self.__initializeCipher(key, iv, adata) + ciphertext, tag = cipher.encrypt_and_digest(compress(json_encode(cipher_message), self._version)) + + self._data = {'v':2,'adata':adata,'ct':b64encode(ciphertext + tag).decode(),'meta':{'expire':expiration}} + + else: + from hashlib import sha256 + from sjcl import SJCL + + self._data = {'expire':expiration,'formatter':formatter,'burnafterreading':int(burnafterreading),'opendiscussion':int(discussion)} + + if self._password: + digest = sha256(self._password.encode("UTF-8")).hexdigest() + password = b64encode(self._key) + digest.encode("UTF-8") + else: + password = b64encode(self._key) + + if self._debug: print("Password:\t{}".format(password)) + + # Encrypting text + cipher = SJCL().encrypt(compress(self._text.encode('utf-8'), self._version), password, mode='gcm') + for k in ['salt', 'iv', 'ct']: cipher[k] = cipher[k].decode() + + self._data['data'] = json_encode(cipher) + + if self._attachment: + cipherfile = SJCL().encrypt(compress(self._attachment.encode('utf-8'), self._version), password, mode='gcm') + for k in ['salt', 'iv', 'ct']: cipherfile[k] = cipherfile[k].decode() + + cipherfilename = SJCL().encrypt(compress(self._attachment_name.encode('utf-8'), self._version), password, mode='gcm') + for k in ['salt', 'iv', 'ct']: cipherfilename[k] = cipherfilename[k].decode() + + self._data['attachment'] = json_encode(cipherfile) + self._data['attachmentname'] = json_encode(cipherfilename) + diff --git a/pbincli/utils.py b/pbincli/utils.py index 534f1c0..2028860 100644 --- a/pbincli/utils.py +++ b/pbincli/utils.py @@ -22,12 +22,24 @@ def check_writable(f): raise PBinCLIException("Path is not writable: {}".format(f)) -def decompress(s): - return zlib.decompress(bytearray(map(ord, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS) - - -def compress(s): - co = zlib.compressobj(wbits=-zlib.MAX_WBITS) - b = co.compress(s) + co.flush() - - return b64encode(''.join(map(chr, b)).encode('utf-8')) +def decompress(s, ver = 1): + if ver == 2: + return zlib.decompress(s, -zlib.MAX_WBITS) + else: + return zlib.decompress(bytearray(map(ord, b64decode(s.encode('utf-8')).decode('utf-8'))), -zlib.MAX_WBITS) + + +def compress(s, ver = 1): + if ver == 2: + # using compressobj as compress doesn't let us specify wbits + # needed to get the raw stream without headers + co = zlib.compressobj(wbits=-zlib.MAX_WBITS) + return co.compress(s) + co.flush() + else: + co = zlib.compressobj(wbits=-zlib.MAX_WBITS) + b = co.compress(s) + co.flush() + return b64encode(''.join(map(chr, b)).encode('utf-8')) + + +def json_encode(s): + return json.dumps(s, separators=(',',':')).encode() diff --git a/requirements.txt b/requirements.txt index c4f4f7c..29a9b41 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ pycryptodome sjcl +base58 requests