#!/usr/bin/env python3 """ Intra2net 2017 =============================================================================== crypto -- Encryption Layer for the Deltatar Backup =============================================================================== Crypto stack: - AES-GCM for the symmetric encryption; - Scrypt as KDF. References: - NIST Recommendation for Block Cipher Modes of Operation: Galois/Counter Mode (GCM) and GMAC http://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-38d.pdf - AES-GCM v1: https://cryptome.org/2014/01/aes-gcm-v1.pdf - Authentication weaknesses in GCM http://csrc.nist.gov/groups/ST/toolkit/BCM/documents/comments/CWC-GCM/Ferguson2.pdf Errors ------------------------------------------------------------------------------- Errors fall into roughly three categories: - Cryptographical errors or invalid data. - ``InvalidGCMTag`` (decryption failed on account of an invalid GCM tag), - ``InvalidIVFixedPart`` (IV fixed part of object not found in list), - ``DuplicateIV`` (the IV of an object encrypted earlier was reused), - ``NonConsecutiveIV`` (IVs of two encrypted objects are not consecutive), - ``DecryptionError`` (used in CLI decryption for presenting error conditions to the user). - Incorrect usage of the library. - ``InvalidParameter`` (non-conforming user supplied parameter), - ``InvalidHeader`` (data passed for reading not parsable into header), - ``FormatError`` (cannot handle header or parameter version), - ``RuntimeError``. - Bad internal state. If one of these is encountered it means that a state was reached that shouldn’t occur during normal processing. - ``InternalError``, - ``Unreachable``. Also, ``EndOfFile`` is used as a sentinel to communicate that a stream supplied for reading is exhausted. Initialization Vectors ------------------------------------------------------------------------------- Initialization vectors are checked for reuse during the lifetime of a decryptor. 
The fixed counters for metadata files cannot be reused and attempts to do so will cause a DuplicateIV error. This means the length of objects encrypted with a metadata counter is capped at 63 GB. For ordinary, non-metadata payload, there is an optional mode with strict IV checking that causes a crypto context to fail if an IV encountered or created was already used for decrypting or encrypting, respectively, an earlier object. Note that this mode can trigger false positives when decrypting non-linearly, e. g. when traversing the same object multiple times. Since the crypto context has no notion of a position in a PDT encrypted archive, this condition must be sorted out downstream. When encrypting with more than one Encrypt context special care must be taken to prevent accidental reuse of IVs. The builtin protection against reuse is only effective for objects encrypted with the same Encrypt handle. If multiple Encrypt handles are used to encrypt with the same combination of password and salt, the encryption becomes susceptible to birthday attacks (bound = 2^32 due to the 64-bit random iv). Thus the use of multiple handles is discouraged. Command Line Utility ------------------------------------------------------------------------------- ``crypto.py`` may be invoked as a script for decrypting, validating, and splitting PDT encrypted files. Consult the usage message for details. 
Usage examples:

Decrypt from stdin using the password ‘foo’: ::

    $ crypto.py process foo -i - -o - some-file.tar.gz

Output verbose information about the encrypted objects in the archive: ::

    $ crypto.py process foo -v -i some-file.tar.gz.pdtcrypt -o /dev/null
    PDT: decrypt from some-file.tar.gz.pdtcrypt
    PDT: decrypt to /dev/null
    PDT: source: file some-file.tar.gz.pdtcrypt
    PDT: sink: file /dev/null
    PDT: 0 hdr
    PDT: · version = 1 : 0100
    PDT: · paramversion = 1 : 0100
    PDT: · nacl : d270 b031 00d1 87e2 c946 610d 7b7f 7e5f
    PDT: · iv : 02ee 3dd7 a963 1eb1 0100 0000
    PDT: · ctsize = 591 : 4f02 0000 0000 0000
    PDT: · tag : 5b2d 6d8b 8f82 4842 12fd 0b10 b6e3 369b
    PDT: 64 decrypt obj no. 1, 591 B
    PDT: · [64] 0% done, read block (591 B of 591 B remaining)
    PDT: · decrypt ciphertext 591 B
    PDT: · decrypt plaintext 591 B
    PDT: 655 finalize
    …

Also, the mode *scrypt* allows deriving encryption keys. To calculate the
encryption key from the password ‘foo’ and the salt of the first object in a
PDT encrypted file: ::

    $ crypto.py scrypt foo -i some-file.pdtcrypt
    {"paramversion": 1, "salt": "Cqzbk48e3peEjzWto8D0yA==", "key": "JH9EkMwaM4x9F5aim5gK/Q=="}

The computed 16 byte key is given base64 encoded in the value to ``key`` and
can be fed into Python’s ``base64.b64decode()`` to obtain the corresponding
binary representation.

Note that in Scrypt hashing mode, no data integrity checks are performed. If
the wrong password is given, a wrong key will be derived. Whether the password
was indeed correct can only be determined by decrypting.

Note that since PDT archives essentially consist of a stream of independent
objects, the salt and other parameters may change. Thus a key derived using
the above method from the first object doesn’t necessarily apply to any of the
subsequent objects.

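The JSON object printed by the *scrypt* mode can be consumed from Python. A
minimal sketch, assuming that the ``salt`` and ``key`` values are base64
encoded, as the padding in the sample output above suggests; the helper name
``parse_scrypt_output`` is hypothetical and not part of this module:

```python
import base64
import json

def parse_scrypt_output (line):
    # Parse one line of JSON as printed by the scrypt mode (format assumed
    # from the sample output) and decode the base64 fields into bytes.
    obj = json.loads (line)
    return { "paramversion" : obj ["paramversion"]
           , "salt"         : base64.b64decode (obj ["salt"])
           , "key"          : base64.b64decode (obj ["key"]) }

sample = ('{"paramversion": 1, "salt": "Cqzbk48e3peEjzWto8D0yA==", '
          '"key": "JH9EkMwaM4x9F5aim5gK/Q=="}')
info = parse_scrypt_output (sample)
print (len (info ["salt"]), len (info ["key"]))
```

Both decoded values should come out at 16 bytes, matching the salt and key
sizes of parameter version 1.
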
Future Developments ------------------------------------------------------------------------------- As of 2020 with the format version 1, Deltatar encryption uses the AES-GCM mode which requires meticulous bookkeeping of initialization vectors. A future version could simplify this aspect of the encryption by switching to the more recent AES-GCM-SIV mode (RFC 8452). """ import base64 import binascii import bisect import ctypes import io from functools import reduce, partial import mmap import os import struct import stat import sys import time import types import errno try: import enum34 except ImportError as exn: pass if __name__ == "__main__": ## Work around the import mechanism lest Python’s pwd = os.getcwd() ## preference for local imports causes a cyclical ## import (crypto → pylibscrypt → […] → ./tarfile → crypto). sys.path = [ p for p in sys.path if p.find ("deltatar") < 0 ] import pylibscrypt from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend import cryptography __all__ = [ "hdr_make", "hdr_read", "hdr_fmt", "hdr_fmt_pretty" , "scrypt_hashfile" , "PDTCRYPT_HDR_SIZE", "AES_GCM_IV_CNT_DATA" , "AES_GCM_IV_CNT_INFOFILE", "AES_GCM_IV_CNT_INDEX" ] ############################################################################### ## exceptions ############################################################################### class EndOfFile (Exception): """Reached EOF.""" remainder = 0 msg = 0 def __init__ (self, n=None, msg=None): if n is not None: self.remainder = n self.msg = msg class InvalidParameter (Exception): """Inputs not valid for PDT encryption.""" pass class InvalidHeader (Exception): """Header not valid.""" pass class InvalidGCMTag (Exception): """ The GCM tag calculated during decryption differs from that in the object header. 
""" pass class InvalidIVFixedPart (Exception): """ IV fixed part not in supplied list: either the backup is corrupt or the current object does not belong to it. """ pass class IVFixedPartError (Exception): """ Error creating a unique IV fixed part: repeated calls to system RNG yielded the same sequence of bytes as the last IV used. """ pass class InvalidFileCounter (Exception): """ When encrypting, an attempted reuse of a dedicated counter (info file, index file) was caught. """ pass class DuplicateIV (Exception): """ During encryption, the current IV fixed part is identical to an already existing IV (same prefix and file counter). This indicates tampering or programmer error and cannot be recovered from. """ pass class NonConsecutiveIV (Exception): """ IVs not numbered consecutively. This is a hard error with strict IV checking. Precludes random access to the encrypted objects. """ pass class CiphertextTooLong (Exception): """ An attempt was made to decrypt more data than the ciphertext size declared in the object header. """ pass class FormatError (Exception): """Unusable parameters in header.""" pass class DecryptionError (Exception): """Error during decryption with ``crypto.py`` on the command line.""" pass class Unreachable (Exception): """ Makeshift __builtin_unreachable(); always a programmer error if thrown. """ pass class InternalError (Exception): """Errors not ascribable to bad user inputs or cryptography.""" pass ############################################################################### ## crypto layer version ############################################################################### ENCRYPTION_PARAMETERS = \ { 0: \ { "kdf": ("dummy", 16) , "enc": "passthrough" } , 1: \ { "kdf": ( "scrypt" , { "dkLen" : 16 , "N" : 1 << 16 , "r" : 8 , "p" : 1 , "NaCl_LEN" : 16 }) , "enc": "aes-gcm" } } # Mode zero is unencrypted and only provided for testing purposes. nless # the encryptor / decryptor are explicitly instructed to do so. 
MIN_SECURE_PARAMETERS = 1 ############################################################################### ## constants ############################################################################### PDTCRYPT_HDR_MAGIC = b"PDTCRYPT" PDTCRYPT_HDR_SIZE_MAGIC = 8 # 8 PDTCRYPT_HDR_SIZE_VERSION = 2 # 10 PDTCRYPT_HDR_SIZE_PARAMVERSION = 2 # 12 PDTCRYPT_HDR_SIZE_NACL = 16 # 28 PDTCRYPT_HDR_SIZE_IV = 12 # 40 PDTCRYPT_HDR_SIZE_CTSIZE = 8 # 48 PDTCRYPT_HDR_SIZE_TAG = 16 # 64 GCM auth tag PDTCRYPT_HDR_SIZE = PDTCRYPT_HDR_SIZE_MAGIC + PDTCRYPT_HDR_SIZE_VERSION \ + PDTCRYPT_HDR_SIZE_PARAMVERSION + PDTCRYPT_HDR_SIZE_NACL \ + PDTCRYPT_HDR_SIZE_IV + PDTCRYPT_HDR_SIZE_CTSIZE \ + PDTCRYPT_HDR_SIZE_TAG # = 64 # precalculate offsets since Python can’t do constant folding over names HDR_OFF_VERSION = PDTCRYPT_HDR_SIZE_MAGIC HDR_OFF_PARAMVERSION = HDR_OFF_VERSION + PDTCRYPT_HDR_SIZE_VERSION HDR_OFF_NACL = HDR_OFF_PARAMVERSION + PDTCRYPT_HDR_SIZE_PARAMVERSION HDR_OFF_IV = HDR_OFF_NACL + PDTCRYPT_HDR_SIZE_NACL HDR_OFF_CTSIZE = HDR_OFF_IV + PDTCRYPT_HDR_SIZE_IV HDR_OFF_TAG = HDR_OFF_CTSIZE + PDTCRYPT_HDR_SIZE_CTSIZE FMT_UINT16_LE = " hdrinfo; # fn hdr_make (f : handle, h : hdrinfo) -> IOResult; # fn hdr_fmt (h : hdrinfo) -> String; # def hdr_read (data): """ Read bytes as header structure. If the input could not be interpreted as a header, fail with ``InvalidHeader``. """ try: mag, version, paramversion, nacl, iv, ctsize, tag = \ struct.unpack (FMT_I2N_HDR, data) except Exception as exn: raise InvalidHeader ("error unpacking header from [%r]: %s" % (binascii.hexlify (data), str (exn))) if mag != PDTCRYPT_HDR_MAGIC: raise InvalidHeader ("bad magic in header: expected [%s], got [%s]" % (PDTCRYPT_HDR_MAGIC, mag)) return \ { "version" : version , "paramversion" : paramversion , "nacl" : nacl , "iv" : iv , "ctsize" : ctsize , "tag" : tag } def hdr_read_stream (instr): """ Read header from stream at the current position. 
    Fail with ``InvalidHeader`` if insufficient bytes were read from the
    stream, or if the content could not be interpreted as a header. An
    exhausted stream (zero bytes read) raises ``EndOfFile`` instead.
    """
    data = instr.read(PDTCRYPT_HDR_SIZE)
    ldata = len (data)

    if ldata == 0:
        raise EndOfFile
    elif ldata != PDTCRYPT_HDR_SIZE:
        raise InvalidHeader ("hdr_read_stream: expected %d B, received %d B"
                             % (PDTCRYPT_HDR_SIZE, ldata))

    return hdr_read (data)


def hdr_from_params (version, paramversion, nacl, iv, ctsize, tag):
    """
    Assemble the necessary values into a PDTCRYPT header.

    :type      version: int to fit   uint16_t
    :type paramversion: int to fit   uint16_t
    :type         nacl: bytes to fit uint8_t[16]
    :type           iv: bytes to fit uint8_t[12]
    :type       ctsize: int to fit   uint64_t
    :type          tag: bytes to fit uint8_t[16]
    :returns: ``(True, header bytes)`` on success, ``(False, error message)``
              otherwise.
    """
    buf  = bytearray (PDTCRYPT_HDR_SIZE)
    bufv = memoryview (buf)

    try:
        struct.pack_into (FMT_I2N_HDR, bufv, 0,
                          PDTCRYPT_HDR_MAGIC,
                          version, paramversion, nacl, iv, ctsize, tag)
    except Exception as exn:
        return False, "error assembling header: %s" % str (exn)

    return True, bytes (buf)


def hdr_make_dummy (s):
    """
    Create a header sized block of bytes initialized to a value derived from a
    string. Used to verify we’ve jumped back correctly to the actual position
    of the object header.
    """
    c = reduce (lambda a, c: a + ord(c), s, 0) % 0xFF
    return bytes (bytearray (struct.pack ("B", c)) * PDTCRYPT_HDR_SIZE)


def hdr_make (hdr):
    """
    Assemble a header from the given header structure.
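
    The header layout can be illustrated with plain ``struct`` calls. A
    minimal sketch, assuming a little-endian format string derived from the
    field sizes in the constants section (8 + 2 + 2 + 16 + 12 + 8 + 16 = 64
    bytes); the name ``EXAMPLE_HDR_FMT`` and the field values are illustrative
    and not taken from this module:

```python
import struct

# Assumed layout: magic, version, paramversion, salt, IV, ctsize, tag.
EXAMPLE_HDR_FMT = "<8sHH16s12sQ16s"

# Placeholder values: all-zero salt, IV and tag; ctsize as in the sample
# output of the verbose CLI run.
fields = (b"PDTCRYPT", 1, 1, bytes (16), bytes (12), 591, bytes (16))
raw = struct.pack (EXAMPLE_HDR_FMT, *fields)

print (struct.calcsize (EXAMPLE_HDR_FMT))         # size of a packed header
print (struct.unpack (EXAMPLE_HDR_FMT, raw) [5])  # ctsize round-trips
```

    Under this assumed layout the ciphertext size occupies bytes 40 to 47 of
    the header, least significant byte first.
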
""" return hdr_from_params (version=hdr.get("version"), paramversion=hdr.get("paramversion"), nacl=hdr.get("nacl"), iv=hdr.get("iv"), ctsize=hdr.get("ctsize"), tag=hdr.get("tag")) HDR_FMT = "I2n_header { version: %d, paramversion: %d, nacl: %s[%d]," \ " iv: %s[%d], ctsize: %d, tag: %s[%d] }" def hdr_fmt (h): """Format a header structure into readable output.""" return HDR_FMT % (h["version"], h["paramversion"], binascii.hexlify (h["nacl"]), len(h["nacl"]), binascii.hexlify (h["iv"]), len(h["iv"]), h["ctsize"], binascii.hexlify (h["tag"]), len(h["tag"])) def hex_spaced_of_bytes (b): """Format bytes object, hexdump style.""" return " ".join ([ "%.2x%.2x" % (c1, c2) for c1, c2 in zip (b[0::2], b[1::2]) ]) \ + (len (b) | 1 == len (b) and " %.2x" % b[-1] or "") # odd lengths def hdr_iv_counter (h): """Extract the variable part of the IV of the given header.""" _fixed, cnt = struct.unpack (FMT_I2N_IV, h ["iv"]) return cnt def hdr_iv_fixed (h): """Extract the fixed part of the IV of the given header.""" fixed, _cnt = struct.unpack (FMT_I2N_IV, h ["iv"]) return fixed hdr_dump = hex_spaced_of_bytes HDR_FMT_PRETTY = \ """version = %-4d : %s paramversion = %-4d : %s nacl : %s iv : %s ctsize = %-20d : %s tag : %s """ def hdr_fmt_pretty (h): """ Format header structure into multi-line representation of its contents and their raw representation. (Omit the implicit “PDTCRYPT” magic bytes that precede every header.) 
""" return HDR_FMT_PRETTY \ % (h["version"], hex_spaced_of_bytes (struct.pack (FMT_UINT16_LE, h["version"])), h["paramversion"], hex_spaced_of_bytes (struct.pack (FMT_UINT16_LE, h["paramversion"])), hex_spaced_of_bytes (h["nacl"]), hex_spaced_of_bytes (h["iv"]), h["ctsize"], hex_spaced_of_bytes (struct.pack (FMT_UINT64_LE, h["ctsize"])), hex_spaced_of_bytes (h["tag"])) IV_FMT = "((f %s) (c %d))" def iv_fmt (iv): """Format the two components of an IV in a readable fashion.""" fixed, cnt = struct.unpack (FMT_I2N_IV, iv) return IV_FMT % (binascii.hexlify (fixed).decode (), cnt) ############################################################################### ## restoration ############################################################################### class Location (object): n = 0 offset = 0 def restore_loc_fmt (loc): return "%d off:%d" \ % (loc.n, loc.offset) def locate_hdr_candidates (fd): """ Walk over instances of the magic string in the payload, collecting their positions. If the offset of the first found instance is not zero, the file begins with leading garbage. Used by desaster recovery. :return: The list of offsets in the file. """ cands = [] mm = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_READ) pos = 0 while True: pos = mm.find (PDTCRYPT_HDR_MAGIC, pos) if pos == -1: break cands.append (pos) pos += 1 return cands HDR_CAND_GOOD = 0 # header marks begin of valid object HDR_CAND_FISHY = 1 # inconclusive (tag mismatch, obj overlap etc.) HDR_CAND_JUNK = 2 # not a header / object unreadable HDR_VERDICT_NAME = \ { HDR_CAND_GOOD : "valid" , HDR_CAND_FISHY : "fishy" , HDR_CAND_JUNK : "junk" } def verdict_fmt (vdt): return HDR_VERDICT_NAME [vdt] def inspect_hdr (fd, off): """ Attempt to parse a header in *fd* at position *off*. Returns a verdict about the quality of that header plus the parsed header when readable. 
""" _ = os.lseek (fd, off, os.SEEK_SET) if os.lseek (fd, 0, os.SEEK_CUR) != off: if PDTCRYPT_VERBOSE is True: noise ("PDT: %d → dismissed (lseek() past EOF)" % off) return HDR_CAND_JUNK, None raw = os.read (fd, PDTCRYPT_HDR_SIZE) if len (raw) != PDTCRYPT_HDR_SIZE: if PDTCRYPT_VERBOSE is True: noise ("PDT: %d → dismissed (EOF inside header)" % off) return HDR_CAND_JUNK, None try: hdr = hdr_read (raw) except InvalidHeader as exn: if PDTCRYPT_VERBOSE is True: noise ("PDT: %d → dismissed (invalid: [%s])" % (off, str (exn))) return HDR_CAND_JUNK, None obj0 = off + PDTCRYPT_HDR_SIZE objX = obj0 + hdr ["ctsize"] eof = os.lseek (fd, 0, os.SEEK_END) if eof < objX: if PDTCRYPT_VERBOSE is True: noise ("PDT: %d → EOF inside object (%d≤%d≤%d); adjusting size to " "%d" % (off, obj0, eof, objX, (eof - obj0))) # try reading up to the end hdr ["ctsize"] = eof - obj0 return HDR_CAND_FISHY, hdr return HDR_CAND_GOOD, hdr def try_decrypt (ifd, off, hdr, secret, ofd=-1): """ Attempt to decrypt the object in the (seekable) descriptor *ifd* starting at *off* using the metadata in *hdr* and *secret*. An output fd can be specified with *ofd*; if it is *-1* – the default –, the decrypted payload will be discarded. Always creates a fresh decryptor, so validation steps across objects don’t apply. Errors during GCM tag validation are ignored. Used by desaster recovery. 
""" ctleft = hdr ["ctsize"] pos = off ks = secret [0] if ks == PDTCRYPT_SECRET_PW: decr = Decrypt (password=secret [1]) elif ks == PDTCRYPT_SECRET_KEY: key = secret [1] decr = Decrypt (key=key) else: raise RuntimeError decr.next (hdr) try: os.lseek (ifd, pos, os.SEEK_SET) pt = b"" while ctleft > 0: cnksiz = min (ctleft, PDTCRYPT_BLOCKSIZE) cnk = os.read (ifd, cnksiz) ctleft -= cnksiz pos += cnksiz pt = decr.process (cnk) if ofd != -1: os.write (ofd, pt) try: pt = decr.done () except InvalidGCMTag: noise ("PDT: GCM tag mismatch for object %d–%d" % (off, off + hdr ["ctsize"])) if len (pt) > 0 and ofd != -1: os.write (ofd, pt) except Exception as exn: noise ("PDT: error decrypting object %d–%d@%d, %d B remaining [%s]" % (off, off + hdr ["ctsize"], pos, ctleft, exn)) raise return pos - off def readable_objects_offsets (ifd, secret, cands): """ From a list of candidates, locate the ones that mark the start of actual readable PDTCRYPT objects. """ good = [] for i, cand in enumerate (cands): vdt, hdr = inspect_hdr (ifd, cand) if vdt == HDR_CAND_JUNK: pass # ignore unreadable ones elif vdt in [HDR_CAND_GOOD, HDR_CAND_FISHY]: ctsize = hdr ["ctsize"] off0 = cand + PDTCRYPT_HDR_SIZE ok = try_decrypt (ifd, off0, hdr, secret) == ctsize if ok is True: good.append ((cand, off0 + ctsize)) overlap = find_overlaps (good) return [ g [0] for g in good ] def reconstruct_offsets (fname, secret): ifd = os.open (fname, os.O_RDONLY) try: cands = locate_hdr_candidates (ifd) return readable_objects_offsets (ifd, secret, cands) finally: os.close (ifd) ############################################################################### ## helpers ############################################################################### def make_secret (password=None, key=None): """ Safely create a “secret” value that consists either of a key or a password. 
Inputs are validated: the password is accepted as (UTF-8 encoded) bytes or string; for the key only a bytes object of the proper size or a base64 encoded string thereof is accepted. If both are provided, the key is preferred over the password; no checks are performed whether the key is derived from the password. :returns: secret value if inputs were acceptable | None otherwise. """ if key is not None: if isinstance (key, str) is True: key = key.encode ("utf-8") if isinstance (key, bytes) is True: if len (key) == AES_KEY_SIZE: return (PDTCRYPT_SECRET_KEY, key) if len (key) == AES_KEY_SIZE * 2: try: key = binascii.unhexlify (key) return (PDTCRYPT_SECRET_KEY, key) except binascii.Error: # garbage in string pass if len (key) == AES_KEY_SIZE_B64: try: key = base64.b64decode (key) # the base64 processor is very tolerant and allows for # arbitrary trailing and leading data thus the data obtained # must be checked for the proper length if len (key) == AES_KEY_SIZE: return (PDTCRYPT_SECRET_KEY, key) except binascii.Error: # “incorrect padding” pass elif password is not None: if isinstance (password, str) is True: return (PDTCRYPT_SECRET_PW, password) elif isinstance (password, bytes) is True: try: password = password.decode ("utf-8") return (PDTCRYPT_SECRET_PW, password) except UnicodeDecodeError: pass return None ############################################################################### ## passthrough / null encryption ############################################################################### class PassthroughCipher (object): tag = struct.pack (" str """ if paramversion is not None: defs = ENCRYPTION_PARAMETERS.get(paramversion, None) if defs is None: raise InvalidParameter ("no encryption parameters for version %r" % paramversion) (kdf, params) = defs["kdf"] fn = None if kdf == "scrypt" : fn = kdf_scrypt elif kdf == "dummy" : fn = kdf_dummy if fn is None: raise ValueError ("key derivation method %r unknown" % kdf) return partial (fn, params) 
############################################################################### ## SCRYPT hashing ############################################################################### def scrypt_hashsource (pw, ins): """ Calculate the SCRYPT hash from the password and the information contained in the first header found in ``ins``. This does not validate whether the first object is encrypted correctly. """ if isinstance (pw, str) is True: pw = str.encode (pw) elif isinstance (pw, bytes) is False: raise InvalidParameter ("password must be a string, not %s" % type (pw)) if isinstance (ins, io.BufferedReader) is False and \ isinstance (ins, io.FileIO) is False: raise InvalidParameter ("file to hash must be opened in “binary” mode") hdr = None try: hdr = hdr_read_stream (ins) except EndOfFile as exn: noise ("PDT: malformed input: end of file reading first object header") noise ("PDT:") return 1 nacl = hdr ["nacl"] pver = hdr ["paramversion"] if PDTCRYPT_VERBOSE is True: noise ("PDT: salt of first object : %s" % binascii.hexlify (nacl)) noise ("PDT: parameter version of archive : %d" % pver) try: defs = ENCRYPTION_PARAMETERS.get(pver, None) kdfname, params = defs ["kdf"] if kdfname != "scrypt": noise ("PDT: input is not an SCRYPT archive") noise ("") return 1 kdf = kdf_by_version (None, defs) except ValueError as exn: noise ("PDT: object has unknown parameter version %d" % pver) hsh, _void = kdf (pw, nacl) return hsh, nacl, hdr ["version"], pver def scrypt_hashfile (pw, fname): """ Calculate the SCRYPT hash from the password and the information contained in the first header found in the given file. The header is read only at offset zero. 
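
    For cross-checking a derived key outside of this module, the parameters
    of parameter version 1 can be fed to the standard library. A sketch under
    assumptions: it uses ``hashlib.scrypt`` rather than the ``pylibscrypt``
    binding this module imports, it assumes that password and salt are passed
    to the KDF unmodified, and the function name ``example_scrypt_key`` is
    hypothetical:

```python
import hashlib

def example_scrypt_key (pw, salt, n=1 << 16, r=8, p=1, dklen=16):
    # Defaults mirror the scrypt entry of ENCRYPTION_PARAMETERS [1].
    # scrypt needs roughly 128 * n * r bytes of memory, which exceeds the
    # default OpenSSL limit for these parameters, so the limit is raised.
    maxmem = 128 * n * r * 2
    return hashlib.scrypt (pw, salt=salt, n=n, r=r, p=p,
                           maxmem=maxmem, dklen=dklen)
```

    Both ``pw`` and ``salt`` must be bytes. Whether the result matches the
    key printed by the *scrypt* CLI mode depends on the assumptions stated
    above and should be verified against a known archive.
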
""" with deptdcrypt_mk_stream (PDTCRYPT_SOURCE, fname or "-") as ins: hsh, _void, _void, _void = scrypt_hashsource (pw, ins) return hsh ############################################################################### ## AES-GCM context ############################################################################### class Crypto (object): """ Encryption context to remain alive throughout an entire tarfile pass. """ enc = None nacl = None key = None cnt = None # file counter (uint32_t != 0) iv = None # current IV fixed = None # accu for 64 bit fixed parts of IV used_ivs = None # tracks IVs strict_ivs = False # if True, panic on duplicate or non-consecutive object IV password = None paramversion = None insecure = False # allow plaintext parameters stats = { "in" : 0 , "out" : 0 , "obj" : 0 } ctsize = -1 ptsize = -1 info_counter_used = False index_counter_used = False def __init__ (self, *al, **akv): self.used_ivs = set () self.set_parameters (*al, **akv) def next_fixed (self): # NOP for decryption pass def set_object_counter (self, cnt=None): """ Safely set the internal counter of encrypted objects. Numerous constraints apply: The same counter may not be reused in combination with one IV fixed part. This is validated elsewhere in the IV handling. Counter zero is invalid. The first two counters are reserved for metadata. The implementation does not allow for splitting metadata files over multiple encrypted objects. (This would be possible by assigning new fixed parts.) Thus in a Deltatar backup there is at most one object with a counter value of one and two. On creation of a context, the initial counter may be chosen. The globals ``AES_GCM_IV_CNT_INFOFILE`` and ``AES_GCM_IV_CNT_INDEX`` can be used to request one of the reserved values. If one of these values has been used, any further attempt of setting the counter to that value will be rejected with an ``InvalidFileCounter`` exception. Out of bounds values (i. e. 
below one and more than the maximum of 2³²) cause an ``InvalidParameter`` exception to be thrown. """ if cnt is None: self.cnt = AES_GCM_IV_CNT_DATA return if cnt == 0 or cnt > AES_GCM_IV_CNT_MAX + 1: raise InvalidParameter ("invalid counter value %d requested: " "acceptable values are from 1 to %d" % (cnt, AES_GCM_IV_CNT_MAX)) if cnt == AES_GCM_IV_CNT_INFOFILE: if self.info_counter_used is True: raise InvalidFileCounter ("attempted to reuse info file " "counter %d: must be unique" % cnt) self.info_counter_used = True elif cnt == AES_GCM_IV_CNT_INDEX: if self.index_counter_used is True: raise InvalidFileCounter ("attempted to reuse index file " "counter %d: must be unique" % cnt) self.index_counter_used = True if cnt <= AES_GCM_IV_CNT_MAX: self.cnt = cnt return # cnt == AES_GCM_IV_CNT_MAX + 1 → wrap self.cnt = AES_GCM_IV_CNT_DATA self.next_fixed () def set_parameters (self, password=None, key=None, paramversion=None, nacl=None, counter=None, strict_ivs=False, insecure=False): """ Configure the internal state of a crypto context. Not intended for external use. A parameter version indicating passthrough (plaintext) mode is rejected with an ``InvalidParameter`` unless ``insecure`` is set. """ self.next_fixed () self.set_object_counter (counter) self.strict_ivs = strict_ivs self.insecure = insecure if paramversion is not None: if self.insecure is False \ and paramversion < MIN_SECURE_PARAMETERS: raise InvalidParameter \ ("set_parameters: requested parameter version %d but " "plaintext encryption disallowed in secure context!" 
% paramversion) self.paramversion = paramversion if key is not None: self.key, self.nacl = key, nacl return if password is not None: if isinstance (password, bytes) is False: password = str.encode (password) self.password = password if paramversion is None and nacl is None: # postpone key setup until first header is available return kdf = kdf_by_version (paramversion) if kdf is not None: self.key, self.nacl = kdf (password, nacl) def process (self, buf): """ Encrypt / decrypt a buffer. Invokes the ``.update()`` method on the wrapped encryptor or decryptor, respectively. The Cryptography exception ``AlreadyFinalized`` is translated to an ``InternalError`` at this point. It may occur in sound code when the GC closes an encrypting stream after an error. Everywhere else it must be treated as a bug. """ if self.enc is None: raise RuntimeError ("process: context not initialized") self.stats ["in"] += len (buf) try: out = self.enc.update (buf) except cryptography.exceptions.AlreadyFinalized as exn: raise InternalError (exn) self.stats ["out"] += len (out) return out def next (self, password, paramversion, nacl): """ Prepare for encrypting another object: Reset the data counters and change the configuration in case one of the variable parameters differs from the last object. """ self.ctsize = 0 self.ptsize = 0 self.stats ["obj"] += 1 if ( self.paramversion != paramversion or self.password != password or self.nacl != nacl): self.set_parameters (password=password, paramversion=paramversion, nacl=nacl, strict_ivs=self.strict_ivs, insecure=self.insecure) def counters (self): """ Access the data counters. """ return self.stats ["obj"], self.stats ["in"], self.stats ["out"] def drop (self): """ Clear the current context regardless of its finalization state. The next operation must be ``.next()``. """ self.enc = None def get_used_ivs (self): """ Get the set of IVs that were used so far during the lifetime of this context. 
Useful to check for IV reuse if multiple encryption contexts were used independently. """ return self.used_ivs def reset_last_iv (self): """ Implemented only for decryptor; no-op otherwise. """ pass class Encrypt (Crypto): lastinfo = None version = None paramenc = None def __init__ (self, version, paramversion, password=None, key=None, nacl=None, counter=AES_GCM_IV_CNT_DATA, strict_ivs=False, insecure=False): """ The ctor will throw immediately if one of the parameters does not conform to our expectations. :type version: int to fit uint16_t :type paramversion: int to fit uint16_t :param password: mutually exclusive with ``key`` :type password: bytes :param key: mutually exclusive with ``password`` :type key: bytes :type nacl: bytes :type counter: initial object counter the values ``AES_GCM_IV_CNT_INFOFILE`` and ``AES_GCM_IV_CNT_INDEX`` are unique in each backup set and cannot be reused even with different fixed parts. :type strict_ivs: bool :param strict_ivs: Enable paranoid tracking of IVs. :type insecure: bool :param insecure: whether to permit passthrough mode *Security considerations*: The ``class Encrypt`` handle guarantees that all random parts (first eight bytes) of the IVs used for encrypting objects are unique. This guarantee does *not* apply across handles if multiple handles are used with the same combination of password and salt. Thus, use of multiple handles with the same combination of password and salt is subject to birthday attacks with a bound of 2^32. To avoid collisions, the application should keep the number of handles as low as possible and check for reuse by comparing the set of IVs used of all handles that were created (accessible using the ``get_used_ivs`` method). 
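
        The cross-handle check suggested above amounts to intersecting the IV
        sets of all handles. A minimal sketch operating on plain sets of
        12 byte IVs as returned by ``get_used_ivs``; the helper ``shared_ivs``
        is hypothetical, and since IVs are only recorded with ``strict_ivs``
        enabled, the sets are assumed to stem from handles created in strict
        mode:

```python
from itertools import combinations

def shared_ivs (iv_sets):
    # Collect every IV that occurs in more than one of the given sets.
    dupes = set ()
    for a, b in combinations (iv_sets, 2):
        dupes |= a & b
    return dupes

# Made-up IVs: eight bytes fixed part followed by a four byte counter.
h1 = { bytes.fromhex ("02ee3dd7a9631eb103000000") }
h2 = { bytes.fromhex ("02ee3dd7a9631eb103000000")
     , bytes.fromhex ("02ee3dd7a9631eb104000000") }
print (len (shared_ivs ([h1, h2])))
```

        A non-empty result means that at least one IV was used by two handles
        and the affected objects should be considered compromised.
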
""" if password is None and key is None \ or password is not None and key is not None : raise InvalidParameter ("__init__: need either key or password") if key is not None: if isinstance (key, bytes) is False: raise InvalidParameter ("__init__: key must be provided as " "bytes, not %s" % type (key)) if nacl is None: raise InvalidParameter ("__init__: salt must be provided along " "with encryption key") else: # password, no key if isinstance (password, str) is False: raise InvalidParameter ("__init__: password must be a string, not %s" % type (password)) if len (password) == 0: raise InvalidParameter ("__init__: supplied empty password but not " "permitted for PDT encrypted files") # version if isinstance (version, int) is False: raise InvalidParameter ("__init__: version number must be an " "integer, not %s" % type (version)) if version < 0: raise InvalidParameter ("__init__: version number must be a " "nonnegative integer, not %d" % version) # paramversion if isinstance (paramversion, int) is False: raise InvalidParameter ("__init__: crypto parameter version number " "must be an integer, not %s" % type (paramversion)) if paramversion < 0: raise InvalidParameter ("__init__: crypto parameter version number " "must be a nonnegative integer, not %d" % paramversion) # salt if nacl is not None: if isinstance (nacl, bytes) is False: raise InvalidParameter ("__init__: salt given, but of type %s " "instead of bytes" % type (nacl)) # salt length would depend on the actual encryption so it can’t be # validated at this point self.fixed = [ ] self.version = version self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"] super().__init__ (password, key, paramversion, nacl, counter=counter, strict_ivs=strict_ivs, insecure=insecure) def next_fixed (self, retries=PDTCRYPT_IV_GEN_MAX_RETRIES): """ Generate the next IV fixed part by reading eight bytes from ``/dev/urandom``. 
        The buffer so obtained is tested against the fixed parts used so far to
        prevent accidental reuse of IVs. After a configurable number of
        attempts to create a unique fixed part, it will refuse to continue with
        an ``IVFixedPartError``. This is unlikely to ever happen on a normal
        system but may detect an issue with the random generator.

        The list of fixed parts that were used by the context at hand can be
        accessed through the ``.fixed`` list. Its last element is the fixed
        part currently in use.
        """
        i = 0
        while i < retries:
            fp = os.urandom (PDTCRYPT_IV_FIXEDPART_SIZE)
            if fp not in self.fixed:
                self.fixed.append (fp)
                return
            i += 1
        raise IVFixedPartError ("error obtaining a unique IV fixed part from "
                                "/dev/urandom; giving up after %d tries" % i)


    def iv_make (self):
        """
        Construct a 12-byte IV from the current fixed part and the object
        counter.
        """
        return struct.pack(FMT_I2N_IV, self.fixed [-1], self.cnt)


    def next (self, filename=None, counter=None):
        """
        Prepare for encrypting the next incoming object. Update the counter
        and put together the IV, possibly changing prefixes. Then create the
        new encryptor.

        The argument ``counter`` can be used to specify a file counter for this
        object. Unless it is one of the reserved values, the counter of
        subsequent objects will be computed from this one.

        If this is the first object in a series, ``filename`` is required,
        otherwise it is reused if not present. The value is used to derive a
        header sized placeholder to use until after encryption when all the
        inputs to construct the final header are available. This is then
        matched in ``.done()`` against the value found at the position of the
        header. The motivation for this extra check is primarily to assist
        format debugging: It makes stray headers easy to spot in malformed
        PDTCRYPT files.
        """
        if filename is None:
            if self.lastinfo is None:
                raise InvalidParameter ("next: filename is mandatory for "
                                        "first object")
            filename, _dummy = self.lastinfo
        else:
            if isinstance (filename, str) is False:
                raise InvalidParameter ("next: filename must be a string, not %s"
                                        % type (filename))
        if counter is not None:
            if isinstance (counter, int) is False:
                raise InvalidParameter ("next: the supplied counter is of "
                                        "invalid type %s; please pass an "
                                        "integer instead" % type (counter))
            self.set_object_counter (counter)

        self.iv = self.iv_make ()
        if self.paramenc == "aes-gcm":
            self.enc = Cipher \
                            ( algorithms.AES (self.key)
                            , modes.GCM (self.iv)
                            , backend = default_backend ()) \
                            .encryptor ()
        elif self.paramenc == "passthrough":
            self.enc = PassthroughCipher ()
        else:
            raise InvalidParameter ("next: parameter version %d not known"
                                    % self.paramversion)
        hdrdum = hdr_make_dummy (filename)
        self.lastinfo = (filename, hdrdum)

        self.check_duplicate_iv (self.iv)

        super().next (self.password, self.paramversion, self.nacl)

        self.set_object_counter (self.cnt + 1)
        return hdrdum


    def check_duplicate_iv (self, iv):
        """
        Add an IV (the 12 byte representation as in the header) to the list.
        With strict checking enabled, this will throw a ``DuplicateIV`` if the
        IV is already present. Depending on the context, this may indicate a
        serious error (IV reuse).

        IVs are only tracked in strict_ivs mode.
        """
        if self.strict_ivs is False:
            return

        if iv in self.used_ivs:
            raise DuplicateIV ("iv %s was reused" % iv_fmt (iv))
        # iv has not been used before; add to collection
        self.used_ivs.add (iv)


    def done (self, cmpdata):
        """
        Complete encryption of an object. After this has been called, attempts
        of encrypting further data will cause an error until ``.next()`` is
        invoked properly.

        Returns a triple consisting of the final chunk of ciphertext, a 64 byte
        buffer containing the object header with all values including the
        “late” ones, e. g. the ciphertext size and the GCM tag, and the list of
        IV fixed parts used by this context.
        """
        if isinstance (cmpdata, bytes) is False:
            raise InvalidParameter ("done: comparison input expected as bytes, "
                                    "not %s" % type (cmpdata))
        if self.lastinfo is None:
            raise RuntimeError ("done: encryption context not initialized")
        filename, hdrdum = self.lastinfo
        if cmpdata != hdrdum:
            raise RuntimeError ("done: bad sync of header for object %d: "
                                "preliminary data does not match; this likely "
                                "indicates a wrongly repositioned stream"
                                % self.cnt)
        data = self.enc.finalize ()
        self.stats ["out"] += len (data)
        self.ctsize += len (data)
        ok, hdr = hdr_from_params (self.version, self.paramversion,
                                   self.nacl, self.iv, self.ctsize,
                                   self.enc.tag)
        if ok is False:
            raise InternalError ("error constructing header: %r" % hdr)
        return data, hdr, self.fixed


    def process (self, buf):
        """
        Encrypt a chunk of plaintext with the active encryptor. Returns a pair
        of the size of the input consumed and the ciphertext produced from it.
        The size **must** be checked downstream. If the maximum possible
        object size has been reached, the current context must be finalized
        and a new one established before any further data can be encrypted.
        Input beyond the consumed size has not been encrypted; the caller is
        expected to pass that remainder in again as soon as the new context is
        ready.
        """
        if isinstance (buf, bytes) is False:
            raise InvalidParameter ("process: expected byte buffer, not %s"
                                    % type (buf))
        bsize = len (buf)
        newptsize = self.ptsize + bsize
        diff = newptsize - PDTCRYPT_MAX_OBJ_SIZE
        if diff > 0:
            # only consume what still fits into the current object
            bsize -= diff
            newptsize = PDTCRYPT_MAX_OBJ_SIZE
        self.ptsize = newptsize
        data = super().process (buf [:bsize])
        self.ctsize += len (data)
        return bsize, data


class Decrypt (Crypto):

    tag        = None   # GCM tag, part of header
    last_iv    = None   # check consecutive ivs in strict mode
    hdr_ctsize = -1

    def __init__ (self, password=None, key=None, counter=None,
                  fixedparts=None, strict_ivs=True, insecure=False):
        """
        Sanitizing ctor for the decryption context. ``fixedparts`` specifies a
        list of IV fixed parts accepted during decryption.
        If a fixed part is encountered that is not in the list, decryption
        will fail.

        :param password:   mutually exclusive with ``key``
        :type  password:   str
        :param key:        mutually exclusive with ``password``
        :type  key:        bytes
        :param counter:    initial object counter; the values
                           ``AES_GCM_IV_CNT_INFOFILE`` and
                           ``AES_GCM_IV_CNT_INDEX`` are unique in each backup
                           set and cannot be reused even with different fixed
                           parts.
        :type  fixedparts: bytes list
        :type  strict_ivs: bool
        :param strict_ivs: fail if IVs of decrypted objects are not linearly
                           increasing
        :type  insecure:   bool
        :param insecure:   whether to process objects encrypted in
                           passthrough mode (*``paramversion`` < 1*)

        *Security considerations*: The ``strict_ivs`` setting protects against
        ciphertext reordering and injection attacks. For this to work it relies
        on a property of how the object counters are created during
        encryption. If multiple ``Encrypt`` handles have been used during
        encryption, this property is unlikely to apply as it would require
        manual management of counters across ``Encrypt`` handles. In these
        cases it may thus be necessary to disable the ``strict_ivs``
        protection.
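
        A minimal standalone sketch of the two checks described above, not the
        code path of this module: it assumes an IV layout of an eight byte
        fixed part followed by a 32 bit little endian counter (the real layout
        is whatever ``FMT_I2N_IV`` specifies). ``IV_LAYOUT``,
        ``known_fixed_part`` and ``is_successor`` are hypothetical names used
        for illustration only.

```python
import bisect
import struct

IV_LAYOUT = "<8sL"  # assumption: 8 B fixed part + 32 bit counter = 12 B

def known_fixed_part (fixedparts, iv):
    # binary search for the fixed part; fixedparts must be sorted
    fixed, _cnt = struct.unpack (IV_LAYOUT, iv)
    i = bisect.bisect_left (fixedparts, fixed)
    return i != len (fixedparts) and fixedparts [i] == fixed

def is_successor (last, iv):
    # last: (fixed, cnt) of the previous object, or None at sequence start
    fixed, cnt = struct.unpack (IV_LAYOUT, iv)
    if last is None or last [0] != fixed:
        return True  # new sequence, nothing to compare against
    return last [1] + 1 == cnt

# made-up fixed parts and IVs
parts = sorted ([ b"bbbbbbbb", b"aaaaaaaa" ])
iv1   = struct.pack (IV_LAYOUT, b"aaaaaaaa", 1)
iv2   = struct.pack (IV_LAYOUT, b"aaaaaaaa", 2)
```

        With these values, ``iv2`` is accepted after ``iv1`` but not after
        itself, which is the situation strict checking is meant to flag.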
""" if password is None and key is None \ or password is not None and key is not None : raise InvalidParameter ("__init__: need either key or password") if key is not None: if isinstance (key, bytes) is False: raise InvalidParameter ("__init__: key must be provided as " "bytes, not %s" % type (key)) else: # password, no key if isinstance (password, str) is False: raise InvalidParameter ("__init__: password must be a string, not %s" % type (password)) if len (password) == 0: raise InvalidParameter ("__init__: supplied empty password but not " "permitted for PDT encrypted files") # fixed parts if fixedparts is not None: if isinstance (fixedparts, list) is False: raise InvalidParameter ("__init__: IV fixed parts must be " "supplied as list, not %s" % type (fixedparts)) self.fixed = fixedparts self.fixed.sort () super().__init__ (password=password, key=key, counter=counter, strict_ivs=strict_ivs, insecure=insecure) def valid_fixed_part (self, iv): """ Check if a fixed part was already seen. """ # check if fixed part is known fixed, _cnt = struct.unpack (FMT_I2N_IV, iv) i = bisect.bisect_left (self.fixed, fixed) return i != len (self.fixed) and self.fixed [i] == fixed def reset_last_iv (self): """ Force a new IV sequence start. The last IV counter will be set from the next IV encountered and the check for consecutive IVs will be suppressed. The intended use is backup volume boundaries or handling batches of objects encrypted with ``Encrypt`` handles initialized with different initial counter values. """ self.last_iv = None def check_consecutive_iv (self, iv): """ Check whether the counter part of the given IV is indeed the successor of the currently present counter. This should always be the case for the objects in a well formed PDT archive but should not be enforced when decrypting out-of-order. 
""" fixed, cnt = struct.unpack (FMT_I2N_IV, iv) if self.strict_ivs is True \ and self.last_iv is not None \ and self.last_iv [0] == fixed \ and self.last_iv [1] + 1 != cnt: raise NonConsecutiveIV ("iv %s counter not successor of " "last object (expected %d, found %d)" % (iv_fmt (iv), self.last_iv [1] + 1, cnt)) self.last_iv = (fixed, cnt) def next (self, hdr): """ Start decrypting the next object. The PDTCRYPT header for the object can be given either as already parsed object or as bytes. """ if isinstance (hdr, bytes) is True: hdr = hdr_read (hdr) elif isinstance (hdr, dict) is False: # this won’t catch malformed specs though raise InvalidParameter ("next: wrong type of parameter hdr: " "expected bytes or spec, got %s" % type (hdr)) try: paramversion = hdr ["paramversion"] nacl = hdr ["nacl"] iv = hdr ["iv"] tag = hdr ["tag"] ctsize = hdr ["ctsize"] except KeyError: raise InvalidHeader ("next: not a header %r" % hdr) if ctsize > PDTCRYPT_MAX_OBJ_SIZE: raise InvalidHeader ("next: ciphertext size %d exceeds maximum " "object size (%d)" % (ctsize, PDTCRYPT_MAX_OBJ_SIZE)) self.hdr_ctsize = ctsize super().next (self.password, paramversion, nacl) if self.fixed is not None and self.valid_fixed_part (iv) is False: raise InvalidIVFixedPart ("iv %s has invalid fixed part" % iv_fmt (iv)) self.check_consecutive_iv (iv) self.tag = tag defs = ENCRYPTION_PARAMETERS.get (paramversion, None) if defs is None: raise FormatError ("header contains unknown parameter version %d; " "maybe the file was created by a more recent " "version of Deltatar" % paramversion) enc = defs ["enc"] if enc == "aes-gcm": self.enc = Cipher \ ( algorithms.AES (self.key) , modes.GCM (iv, tag=self.tag) , backend = default_backend ()) \ . 
decryptor ()
        elif enc == "passthrough":
            self.enc = PassthroughCipher ()
        else:
            raise InternalError ("encryption parameter set %d refers to unknown "
                                 "mode %r" % (paramversion, enc))
        self.set_object_counter (self.cnt + 1)


    def done (self, tag=None):
        """
        Stop decryption of the current object and finalize it with the active
        context. This will throw an *InvalidGCMTag* exception to indicate that
        the authentication tag does not match the data. If the tag is correct,
        the rest of the plaintext is returned.
        """
        data = b""
        try:
            if tag is None:
                data = self.enc.finalize ()
            else:
                if isinstance (tag, bytes) is False:
                    raise InvalidParameter ("done: wrong type of parameter "
                                            "tag: expected bytes, got %s"
                                            % type (tag))
                # finalize with the tag passed by the caller; it was
                # validated above but previously ignored in favor of
                # ``self.tag``
                data = self.enc.finalize_with_tag (tag)
        except cryptography.exceptions.InvalidTag:
            raise InvalidGCMTag ("done: tag mismatch of object %d: %s "
                                 "rejected by finalize ()"
                                 % (self.cnt, binascii.hexlify (self.tag)))
        self.ptsize += len (data)
        self.stats ["out"] += len (data)

        assert self.ctsize == self.ptsize == self.hdr_ctsize

        return data


    def process (self, buf):
        """
        Decrypt the bytes object *buf* with the active decryptor.
        """
        if isinstance (buf, bytes) is False:
            raise InvalidParameter ("process: expected byte buffer, not %s"
                                    % type (buf))
        self.ctsize += len (buf)
        if self.ctsize > self.hdr_ctsize:
            raise CiphertextTooLong ("process: object length exceeded: got "
                                     "%d B but header specifies %d B"
                                     % (self.ctsize, self.hdr_ctsize))

        data = super().process (buf)
        self.ptsize += len (data)
        return data


###############################################################################
## testing helpers
###############################################################################

def _patch_global (glob, vow, n=None):
    """
    Adapt upper file counter bound for testing IV logic. Completely unsafe.
    """
    assert vow == "I am fully aware that this will void my warranty."
r = globals () [glob] if n is None: n = globals () [glob + "_DEFAULT"] globals () [glob] = n return r _testing_set_AES_GCM_IV_CNT_MAX = \ partial (_patch_global, "AES_GCM_IV_CNT_MAX") _testing_set_PDTCRYPT_MAX_OBJ_SIZE = \ partial (_patch_global, "PDTCRYPT_MAX_OBJ_SIZE") def open2_dump_file (fname, dir_fd, force=False): outfd = -1 oflags = os.O_CREAT | os.O_WRONLY if force is True: oflags |= os.O_TRUNC else: oflags |= os.O_EXCL try: outfd = os.open (fname, oflags, stat.S_IRUSR | stat.S_IWUSR, dir_fd=dir_fd) except FileExistsError as exn: noise ("PDT: refusing to overwrite existing file %s" % fname) noise ("") raise RuntimeError ("destination file %s already exists" % fname) if PDTCRYPT_VERBOSE is True: noise ("PDT: new output file %s (fd=%d)" % (fname, outfd)) return outfd ############################################################################### ## freestanding invocation ############################################################################### PDTCRYPT_SUB_PROCESS = 0 PDTCRYPT_SUB_SCRYPT = 1 PDTCRYPT_SUB_SCAN = 2 PDTCRYPT_SUB_IVCHECK = 3 PDTCRYPT_SUB = \ { "process" : PDTCRYPT_SUB_PROCESS , "scrypt" : PDTCRYPT_SUB_SCRYPT , "scan" : PDTCRYPT_SUB_SCAN , "ivcheck" : PDTCRYPT_SUB_IVCHECK } PDTCRYPT_DECRYPT = 1 << 0 # decrypt archive with password PDTCRYPT_SPLIT = 1 << 1 # split archive into individual objects PDTCRYPT_HASH = 1 << 2 # output scrypt hash for file and given password PDTCRYPT_SPLITNAME = "pdtcrypt-object-%d.bin" PDTCRYPT_RESCUENAME = "pdtcrypt-rescue-object-%0.5d.bin" PDTCRYPT_VERBOSE = False PDTCRYPT_STRICTIVS = False PDTCRYPT_OVERWRITE = False PDTCRYPT_BLOCKSIZE = 1 << 12 PDTCRYPT_SINK = 0 PDTCRYPT_SOURCE = 1 SELF = None PDTCRYPT_DEFAULT_VER = 1 PDTCRYPT_DEFAULT_PVER = 1 # scrypt hashing output control PDTCRYPT_SCRYPT_INTRANATOR = 0 PDTCRYPT_SCRYPT_PARAMETERS = 1 PDTCRYPT_SCRYPT_DEFAULT = PDTCRYPT_SCRYPT_INTRANATOR PDTCRYPT_SCRYPT_FORMAT = \ { "i2n" : PDTCRYPT_SCRYPT_INTRANATOR , "params" : PDTCRYPT_SCRYPT_PARAMETERS } PDTCRYPT_TT_COLUMNS = 
80 # assume standard terminal

class PDTDecryptionError (Exception):
    """Decryption failed."""

class PDTSplitError (Exception):
    """Splitting failed."""


def noise (*a, **b):
    print (file=sys.stderr, *a, **b)


class PassthroughDecryptor (object):

    curhdr = None # write current header on first data write

    def __init__ (self):
        if PDTCRYPT_VERBOSE is True:
            noise ("PDT: no encryption; data passthrough")

    def next (self, hdr):
        ok, curhdr = hdr_make (hdr)
        if ok is False:
            raise PDTDecryptionError ("bad header %r" % hdr)
        self.curhdr = curhdr

    def done (self):
        if self.curhdr is not None:
            return self.curhdr
        return b""

    def process (self, d):
        if self.curhdr is not None:
            d = self.curhdr + d
            self.curhdr = None
        return d


def check_ivs (ifs):
    """
    Walk the objects in the given reader, validating uniqueness and
    consecutiveness of the IVs in the object headers.

    As the IVs are metadata this does not require decryption.
    """
    objs = 0
    seen = set ()
    last = None

    while True:
        try:
            hdr = hdr_read_stream (ifs)
        except EndOfFile as exn:
            break # done

        objs += 1
        cur = hdr ["iv"]

        fixed, cnt = struct.unpack (FMT_I2N_IV, cur)

        if PDTCRYPT_VERBOSE is True:
            noise ("PDT: obj %d, iv %s" % (objs, iv_fmt (cur)))

        if last is not None:
            if fixed != last [0]:
                noise ("PDT: obj %d, fixed part changed last: %s → this: %s"
                       % (objs,
                          binascii.hexlify (last [0]),
                          binascii.hexlify (fixed)))
            if cnt != last [1] + 1:
                raise NonConsecutiveIV ("iv %s counter not successor of "
                                        "last object (expected %d, found %d)"
                                        % (iv_fmt (cur), last [1] + 1, cnt))

        if cur in seen:
            raise DuplicateIV ("iv %s was reused" % iv_fmt (cur))

        seen.add (cur)
        last = (fixed, cnt)

        # skip the payload; only the headers matter here
        ifs.read (hdr ["ctsize"])

    return objs


def depdtcrypt (mode, secret, ins, outs):
    """
    Remove PDTCRYPT layer from all objects encrypted with the secret. Used on a
    Deltatar backup this will yield a (possibly Gzip compressed) tarball.
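
    The header-then-payload traversal and the byte accounting performed before
    the single good exit can be sketched in isolation as follows. This is a toy
    model under loud assumptions: ``TOY_HDR`` is a made-up four byte header that
    only carries the payload size and has nothing to do with the real PDTCRYPT
    header layout; ``walk`` is a hypothetical helper and performs no
    decryption.

```python
import io
import struct

TOY_HDR = struct.Struct ("<L")  # toy header: payload size only (assumption)

def walk (ins):
    # returns (bytes read, number of objects, payload bytes)
    total_read = total_obj = total_ct = 0
    while True:
        raw = ins.read (TOY_HDR.size)
        if len (raw) == 0:
            break  # clean end of input at an object boundary
        if len (raw) < TOY_HDR.size:
            raise EOFError ("truncated header")
        (ctsize,) = TOY_HDR.unpack (raw)
        ct = ins.read (ctsize)
        if len (ct) < ctsize:
            raise EOFError ("truncated payload")
        total_obj  += 1
        total_ct   += len (ct)
        total_read += TOY_HDR.size + len (ct)
    # payload plus per-object header overhead must add up to the bytes read
    assert total_ct + total_obj * TOY_HDR.size == total_read
    return total_read, total_obj, total_ct

# two toy objects: a three byte payload followed by an empty one
stream = io.BytesIO (TOY_HDR.pack (3) + b"abc" + TOY_HDR.pack (0))
result = walk (stream)
```

    An input that ends in the middle of a header or payload is reported as an
    error here, whereas an end of input right at an object boundary is the
    regular way out.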
""" ctleft = -1 # length of ciphertext to consume ctcurrent = 0 # total ciphertext of current object total_obj = 0 # total number of objects read total_pt = 0 # total plaintext bytes total_ct = 0 # total ciphertext bytes total_read = 0 # total bytes read outfile = None # Python file object for output if mode & PDTCRYPT_DECRYPT: # decryptor ks = secret [0] if ks == PDTCRYPT_SECRET_PW: decr = Decrypt (password=secret [1], strict_ivs=PDTCRYPT_STRICTIVS) elif ks == PDTCRYPT_SECRET_KEY: key = secret [1] decr = Decrypt (key=key, strict_ivs=PDTCRYPT_STRICTIVS) else: raise InternalError ("‘%d’ does not specify a valid kind of secret" % ks) else: decr = PassthroughDecryptor () def nextout (_): """Dummy for non-split mode: output file does not vary.""" return outs if mode & PDTCRYPT_SPLIT: def nextout (outfile): """ We were passed an fd as outs for accessing the destination directory where extracted archive components are supposed to end up in. """ if outfile is None: if PDTCRYPT_VERBOSE is True: noise ("PDT: no output file to close at this point") else: if PDTCRYPT_VERBOSE is True: noise ("PDT: release output file %r" % outfile) # cleanup happens automatically by the GC; the next # line will error out on account of an invalid fd #outfile.close () assert total_obj > 0 fname = PDTCRYPT_SPLITNAME % total_obj try: outfd = open2_dump_file (fname, outs, force=PDTCRYPT_OVERWRITE) except RuntimeError as exn: raise PDTSplitError (exn) return os.fdopen (outfd, "wb", closefd=True) def tell (s): """ESPIPE is normal on non-seekable stdio stream.""" try: return s.tell () except OSError as exn: if exn.errno == errno.ESPIPE: return -1 def out (pt, outfile): npt = len (pt) nonlocal total_pt total_pt += npt if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· decrypt plaintext %d B" % (npt)) try: nn = outfile.write (pt) except OSError as exn: # probably ENOSPC raise DecryptionError ("error (%s)" % exn) if nn != npt: raise DecryptionError ("write aborted after %d of %d B" % (nn, npt)) while True: if 
ctleft <= 0:
            # current object completed; in a valid archive this marks either
            # the start of a new header or the end of the input
            if ctleft == 0:
                # current object requires finalization
                if PDTCRYPT_VERBOSE is True:
                    noise ("PDT: %d finalize" % tell (ins))
                try:
                    pt = decr.done ()
                except InvalidGCMTag as exn:
                    # report the size of the object at hand; ``pt`` may be
                    # stale or unbound at this point
                    raise DecryptionError ("error finalizing object %d (%d B): "
                                           "%r" % (total_obj, ctcurrent, exn)) \
                          from exn
                out (pt, outfile)
                if PDTCRYPT_VERBOSE is True:
                    noise ("PDT:\t· object validated")

            if PDTCRYPT_VERBOSE is True:
                noise ("PDT: %d hdr" % tell (ins))
            try:
                hdr = hdr_read_stream (ins)
                total_read += PDTCRYPT_HDR_SIZE
            except EndOfFile as exn:
                total_read += exn.remainder
                if total_ct + total_obj * PDTCRYPT_HDR_SIZE != total_read:
                    raise PDTDecryptionError ("ciphertext processed (%d B) plus "
                                              "overhead (%d × %d B) does not match "
                                              "the number of bytes read (%d B)"
                                              % (total_ct, total_obj,
                                                 PDTCRYPT_HDR_SIZE, total_read))
                # the single good exit
                return total_read, total_obj, total_ct, total_pt
            except InvalidHeader as exn:
                raise PDTDecryptionError ("invalid header at position %d in %r "
                                          "(%s)" % (tell (ins), ins, exn))
            if PDTCRYPT_VERBOSE is True:
                pretty = hdr_fmt_pretty (hdr)
                noise (reduce (lambda a, e: (a + "\n" if a else "") + "PDT:\t· " + e,
                               pretty.splitlines (), ""))
            ctcurrent = ctleft = hdr ["ctsize"]

            decr.next (hdr)

            total_obj += 1 # used in file counter with split mode

            # finalization complete or skipped in case of first object in
            # stream; create a new output file if necessary
            outfile = nextout (outfile)

            if PDTCRYPT_VERBOSE is True:
                noise ("PDT: %d decrypt obj no. 
%d, %d B" % (tell (ins), total_obj, ctleft)) # always allocate a new buffer since python-cryptography doesn’t allow # passing a bytearray :/ nexpect = min (ctleft, PDTCRYPT_BLOCKSIZE) if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· [%d] %d%% done, read block (%d B of %d B remaining)" % (tell (ins), 100 - ctleft * 100 / (ctcurrent > 0 and ctcurrent or 1), nexpect, ctleft)) ct = ins.read (nexpect) nct = len (ct) if nct < nexpect: off = tell (ins) raise EndOfFile (nct, "hit EOF after %d of %d B in block [%d:%d); " "%d B ciphertext remaining for object no %d" % (nct, nexpect, off, off + nexpect, ctleft, total_obj)) ctleft -= nct total_ct += nct total_read += nct if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· decrypt ciphertext %d B" % (nct)) pt = decr.process (ct) out (pt, outfile) def deptdcrypt_mk_stream (kind, path): """Create stream from file or stdio descriptor.""" if kind == PDTCRYPT_SINK: if path == "-": if PDTCRYPT_VERBOSE is True: noise ("PDT: sink: stdout") return sys.stdout.buffer else: if PDTCRYPT_VERBOSE is True: noise ("PDT: sink: file %s" % path) return io.FileIO (path, "w") if kind == PDTCRYPT_SOURCE: if path == "-": if PDTCRYPT_VERBOSE is True: noise ("PDT: source: stdin") return sys.stdin.buffer else: if PDTCRYPT_VERBOSE is True: noise ("PDT: source: file %s" % path) return io.FileIO (path, "r") raise ValueError ("bogus stream “%s” / %s" % (kind, path)) def mode_depdtcrypt (mode, secret, ins, outs): try: total_read, total_obj, total_ct, total_pt = \ depdtcrypt (mode, secret, ins, outs) except DecryptionError as exn: noise ("PDT: Decryption failed:") noise ("PDT:") noise ("PDT: “%s”" % exn) noise ("PDT:") noise ("PDT: Did you specify the correct key / password?") noise ("") return 1 except PDTSplitError as exn: noise ("PDT: Split operation failed:") noise ("PDT:") noise ("PDT: “%s”" % exn) noise ("PDT:") noise ("PDT: Hint: target directory should be empty.") noise ("") return 1 if PDTCRYPT_VERBOSE is True: noise ("PDT: decryption successful" ) noise ("PDT: 
%.10d bytes read" % total_read) noise ("PDT: %.10d objects decrypted" % total_obj ) noise ("PDT: %.10d bytes ciphertext" % total_ct ) noise ("PDT: %.10d bytes plaintext" % total_pt ) noise ("" ) return 0 def mode_scrypt (pw, ins=None, nacl=None, fmt=PDTCRYPT_SCRYPT_INTRANATOR): hsh = None paramversion = PDTCRYPT_DEFAULT_PVER if ins is not None: hsh, nacl, version, paramversion = scrypt_hashsource (pw, ins) defs = ENCRYPTION_PARAMETERS.get(paramversion, None) else: nacl = binascii.unhexlify (nacl) defs = ENCRYPTION_PARAMETERS.get(paramversion, None) version = PDTCRYPT_DEFAULT_VER kdfname, params = defs ["kdf"] if hsh is None: kdf = kdf_by_version (None, defs) hsh, _void = kdf (pw, nacl) import json if fmt == PDTCRYPT_SCRYPT_INTRANATOR: out = json.dumps ({ "salt" : base64.b64encode (nacl).decode () , "key" : base64.b64encode (hsh) .decode () , "paramversion" : paramversion }) elif fmt == PDTCRYPT_SCRYPT_PARAMETERS: out = json.dumps ({ "salt" : binascii.hexlify (nacl).decode () , "key" : binascii.hexlify (hsh) .decode () , "version" : version , "scrypt_params" : { "N" : params ["N"] , "r" : params ["r"] , "p" : params ["p"] , "dkLen" : params ["dkLen"] } }) else: raise RuntimeError ("bad scrypt output scheme %r" % fmt) print (out) def noise_output_candidates (cands, indent=8, cols=PDTCRYPT_TT_COLUMNS): """ Print a list of offsets without garbling the terminal too much. The indent is counted from column zero; if it is wide enough, the “PDT: ” marker will be prepended, considered part of the indentation. 
""" wd = cols - 1 nc = len (cands) idt = " " * indent if indent < 5 else "PDT: " + " " * (indent - 5) line = idt lpos = indent sep = "," lsep = len (sep) init = True # prevent leading separator if indent >= wd: raise ValueError ("the requested indentation exceeds the line " "width by %d" % (indent - wd)) for n in cands: ns = "%d" % n lns = len (ns) if init is False: line += sep lpos += lsep lpos += lns if lpos > wd: # line break noise (line) line = idt lpos = indent + lns elif init is True: init = False else: # space line += ' ' lpos += 1 line += ns if lpos != indent: noise (line) SLICE_START = 1 # ordering is important to have starts of intervals SLICE_END = 0 # sorted before equal ends def find_overlaps (slices): """ Find overlapping slices: iterate open/close points of intervals, tracking the ones open at any time. """ bounds = [] inside = set () # of indices into bounds ovrlp = set () # of indices into bounds for i, s in enumerate (slices): bounds.append ((s [0], SLICE_START, i)) bounds.append ((s [1], SLICE_END , i)) bounds = sorted (bounds) for val in bounds: i = val [2] if val [1] == SLICE_START: inside.add (i) else: if len (inside) > 1: # closing one that overlapped ovrlp |= inside inside.remove (i) return [ slices [i] for i in ovrlp ] def mode_ivcheck (ifd): total_obj = 0 try: total_obj = check_ivs (ifd) except (NonConsecutiveIV, DuplicateIV) as exn: noise ("PDT: Detected inconsistent initialization vectors") noise ("PDT:") noise ("PDT: “%s”" % exn) noise ("PDT:") noise ("") return 1 except Exception as exn: noise ("PDT: Hit an error unrelated to checking IVs") noise ("PDT:") noise ("PDT: “%s”" % exn) noise ("PDT:") return 1 noise ("PDT: Successfully traversed %d encrypted objects in input." % total_obj) noise ("PDT:") noise ("PDT: All IVs consecutive and unique.") def mode_scan (secret, fname, outs=None, nacl=None): """ Dissect a binary file, looking for PDTCRYPT headers and objects. 
    If *outs* is supplied, recoverable data will be dumped into the specified
    directory.
    """
    try:
        ifd = os.open (fname, os.O_RDONLY)
    except FileNotFoundError:
        noise ("PDT: failed to open %s readonly" % fname)
        noise ("")
        usage (err=True)

    try:
        if PDTCRYPT_VERBOSE is True:
            noise ("PDT: scan for potential sync points")
        cands = locate_hdr_candidates (ifd)
        if len (cands) == 0:
            noise ("PDT: scan complete: input does not contain potential PDT "
                   "headers; giving up.")
            return -1
        if PDTCRYPT_VERBOSE is True:
            noise ("PDT: scan complete: found %d candidates:" % len (cands))
            noise_output_candidates (cands)
    except:
        os.close (ifd)
        raise

    junk, todo, slices = [], [], []
    try:
        nobj = 0
        for cand in cands:
            nobj += 1
            vdt, hdr = inspect_hdr (ifd, cand)

            vdts = verdict_fmt (vdt)

            if vdt == HDR_CAND_JUNK:
                # both the object number and the verdict are interpolated
                noise ("PDT: obj %d: %s object: bad header, skipping"
                       % (nobj, vdts))
                junk.append (cand)
            else:
                off0 = cand + PDTCRYPT_HDR_SIZE
                if PDTCRYPT_VERBOSE is True:
                    noise ("PDT: obj %d: read payload @%d" % (nobj, off0))
                    pretty = hdr_fmt_pretty (hdr)
                    noise (reduce (lambda a, e: (a + "\n" if a else "") + "PDT:\t· " + e,
                                   pretty.splitlines (), ""))

                ofd = -1
                if outs is not None:
                    ofname = PDTCRYPT_RESCUENAME % nobj
                    ofd = open2_dump_file (ofname, outs, force=PDTCRYPT_OVERWRITE)

                ctsize = hdr ["ctsize"]
                try:
                    l = try_decrypt (ifd, off0, hdr, secret, ofd=ofd)
                    ok = l == ctsize
                    slices.append ((off0, off0 + l))
                finally:
                    if ofd != -1:
                        os.close (ofd)
                if vdt == HDR_CAND_GOOD and ok is True:
                    noise ("PDT: %d → ✓ %s object %d–%d"
                           % (cand, vdts, off0, off0 + ctsize))
                elif vdt == HDR_CAND_FISHY and ok is True:
                    noise ("PDT: %d → × %s object %d–%d, corrupt header"
                           % (cand, vdts, off0, off0 + ctsize))
                elif vdt == HDR_CAND_GOOD and ok is False:
                    noise ("PDT: %d → × %s object %d–%d, problematic payload"
                           % (cand, vdts, off0, off0 + ctsize))
                elif vdt == HDR_CAND_FISHY and ok is False:
                    noise ("PDT: %d → × %s object %d–%d, corrupt header, problematic "
                           "ciphertext" % (cand, vdts, off0, off0 + ctsize))
                else:
                    raise Unreachable
    finally:
        os.close
(ifd)

    if len (junk) == 0:
        noise ("PDT: all headers ok")
    else:
        noise ("PDT: %d candidates not parseable as headers:" % len (junk))
        noise_output_candidates (junk)

    overlap = find_overlaps (slices)
    if len (overlap) > 0:
        noise ("PDT: %d objects overlapping others" % len (overlap))
        for slice in overlap:
            noise ("PDT: × %d→%d" % (slice [0], slice [1]))


def usage (err=False):
    out = print
    if err is True:
        out = noise
    indent = ' ' * len (SELF)
    out ("usage: %s SUBCOMMAND { --help" % SELF)
    out (" %s | [ -v ] { -p PASSWORD | -k KEY }" % indent)
    out (" %s [ { -i | --in } { - | SOURCE } ]" % indent)
    out (" %s [ { -n | --nacl } { SALT } ]" % indent)
    out (" %s [ { -o | --out } { - | DESTINATION } ]" % indent)
    out (" %s [ -D | --no-decrypt ] [ -S | --split ]" % indent)
    out (" %s [ -f | --format ]" % indent)
    out ("")
    out ("\twhere")
    out ("\t\tSUBCOMMAND main mode: { process | scrypt | scan | ivcheck }")
    out ("\t\t where:")
    out ("\t\t process: extract objects from PDT archive")
    out ("\t\t scrypt: calculate hash from password and first object")
    out ("\t\t scan: scan input for PDTCRYPT headers")
    out ("\t\t ivcheck: check whether IVs are consecutive")
    out ("\t\t-p PASSWORD password to derive the encryption key from")
    out ("\t\t-k KEY encryption key as 16 bytes in hexadecimal notation")
    out ("\t\t-s enforce strict handling of initialization vectors")
    out ("\t\t-i SOURCE file name to read from")
    out ("\t\t-o DESTINATION file to write output to")
    out ("\t\t-n SALT provide salt for scrypt mode in hex encoding")
    out ("\t\t-v print extra info")
    out ("\t\t-S split into files at object boundaries; this")
    out ("\t\t requires DESTINATION to refer to directory")
    out ("\t\t-D PDT header and ciphertext passthrough")
    out ("\t\t-f format of SCRYPT hash output (“i2n”, the default, or “params”)")
    out ("")
    out ("\tinstead of filenames, “-” may be used to specify stdin / stdout")
    out ("")
    sys.exit ((err is True) and 42 or 0)


def bail (msg):
    noise (msg)
    noise ("")
    usage (err=True)
    raise Unreachable


def parse_argv
(argv): global PDTCRYPT_OVERWRITE global SELF mode = PDTCRYPT_DECRYPT secret = None insspec = None outsspec = None outs = None nacl = None scrypt_format = PDTCRYPT_SCRYPT_DEFAULT argvi = iter (argv) SELF = os.path.basename (next (argvi)) try: rawsubcmd = next (argvi) subcommand = PDTCRYPT_SUB [rawsubcmd] except StopIteration: bail ("ERROR: subcommand required") except KeyError: bail ("ERROR: invalid subcommand “%s” specified" % rawsubcmd) def checked_arg (): nonlocal argvi try: return next (argvi) except StopIteration: bail ("ERROR: argument list incomplete") def checked_secret (s): nonlocal secret if secret is None: secret = s else: bail ("ERROR: encountered “%s” but secret already given" % arg) for arg in argvi: if arg in [ "-h", "--help" ]: usage () raise Unreachable elif arg in [ "-v", "--verbose", "--wtf" ]: global PDTCRYPT_VERBOSE PDTCRYPT_VERBOSE = True elif arg in [ "-i", "--in", "--source" ]: insspec = checked_arg () if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt from %s" % insspec) elif arg in [ "-p", "--password" ]: arg = checked_arg () checked_secret (make_secret (password=arg)) if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with password") else: if subcommand == PDTCRYPT_SUB_PROCESS: if arg in [ "-s", "--strict-ivs" ]: global PDTCRYPT_STRICTIVS PDTCRYPT_STRICTIVS = True elif arg in [ "-o", "--out", "--dest", "--sink" ]: outsspec = checked_arg () if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt to %s" % outsspec) elif arg in [ "-f", "--force" ]: PDTCRYPT_OVERWRITE = True if PDTCRYPT_VERBOSE is True: noise ("PDT: overwrite existing files") elif arg in [ "-S", "--split" ]: mode |= PDTCRYPT_SPLIT if PDTCRYPT_VERBOSE is True: noise ("PDT: split files") elif arg in [ "-D", "--no-decrypt" ]: mode &= ~PDTCRYPT_DECRYPT if PDTCRYPT_VERBOSE is True: noise ("PDT: not decrypting") elif arg in [ "-k", "--key" ]: arg = checked_arg () checked_secret (make_secret (key=arg)) if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with key") else: bail ("ERROR: 
unexpected positional argument “%s”" % arg) elif subcommand == PDTCRYPT_SUB_SCRYPT: if arg in [ "-n", "--nacl", "--salt" ]: nacl = checked_arg () if PDTCRYPT_VERBOSE is True: noise ("PDT: salt key with %s" % nacl) elif arg in [ "-f", "--format" ]: arg = checked_arg () try: scrypt_format = PDTCRYPT_SCRYPT_FORMAT [arg] except KeyError: bail ("ERROR: invalid scrypt output format %s" % arg) if PDTCRYPT_VERBOSE is True: noise ("PDT: scrypt output format “%s”" % scrypt_format) else: bail ("ERROR: unexpected positional argument “%s”" % arg) elif subcommand == PDTCRYPT_SUB_SCAN: if arg in [ "-o", "--out", "--dest", "--sink" ]: outsspec = checked_arg () if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt to %s" % outsspec) elif arg in [ "-f", "--force" ]: PDTCRYPT_OVERWRITE = True if PDTCRYPT_VERBOSE is True: noise ("PDT: overwrite existing files") else: bail ("ERROR: unexpected positional argument “%s”" % arg) if secret is None: if PDTCRYPT_VERBOSE is True: noise ("ERROR: no password or key specified, trying $PDTCRYPT_PASSWORD") epw = os.getenv ("PDTCRYPT_PASSWORD") if epw is not None: checked_secret (make_secret (password=epw.strip ())) if secret is None: if PDTCRYPT_VERBOSE is True: noise ("ERROR: no password or key specified, trying $PDTCRYPT_KEY") ek = os.getenv ("PDTCRYPT_KEY") if ek is not None: checked_secret (make_secret (key=ek.strip ())) if secret is None: if subcommand == PDTCRYPT_SUB_IVCHECK: pass elif subcommand == PDTCRYPT_SUB_SCRYPT: bail ("ERROR: scrypt hash mode requested but no password given") elif mode & PDTCRYPT_DECRYPT: bail ("ERROR: decryption requested but no password given") if mode & PDTCRYPT_SPLIT and outsspec is None: bail ("ERROR: split mode is incompatible with stdout sink " "(the default)") if subcommand == PDTCRYPT_SUB_SCAN and outsspec is None: pass # no output by default in scan mode elif mode & PDTCRYPT_SPLIT or subcommand == PDTCRYPT_SUB_SCAN: # destination must be directory if outsspec == "-": bail ("ERROR: mode is incompatible with 
stdout sink") try: try: os.makedirs (outsspec, 0o700) except FileExistsError: # if it’s a directory with appropriate perms, everything is # good; otherwise, below invocation of open(2) will fail pass outs = os.open (outsspec, os.O_DIRECTORY, 0o600) except FileNotFoundError as exn: bail ("ERROR: cannot create target directory “%s”" % outsspec) except NotADirectoryError as exn: bail ("ERROR: target path “%s” is not a directory" % outsspec) else: outs = deptdcrypt_mk_stream (PDTCRYPT_SINK, outsspec or "-") if subcommand == PDTCRYPT_SUB_SCAN: if insspec is None: bail ("ERROR: please supply an input file for scanning") if insspec == '-': bail ("ERROR: input must be seekable; please specify a file") return True, partial (mode_scan, secret, insspec, outs, nacl=nacl) if subcommand == PDTCRYPT_SUB_IVCHECK: if insspec is None: bail ("ERROR: please supply an input file for checking ivs") if subcommand == PDTCRYPT_SUB_SCRYPT: if secret [0] == PDTCRYPT_SECRET_KEY: bail ("ERROR: scrypt mode requires a password") if insspec is not None and nacl is not None \ or insspec is None and nacl is None : bail ("ERROR: please supply either an input file or " "the salt") # default to stdout ins = None if insspec is not None or subcommand != PDTCRYPT_SUB_SCRYPT: ins = deptdcrypt_mk_stream (PDTCRYPT_SOURCE, insspec or "-") if subcommand == PDTCRYPT_SUB_IVCHECK: return True, partial (mode_ivcheck, ins) if subcommand == PDTCRYPT_SUB_SCRYPT: return True, partial (mode_scrypt, secret [1].encode (), ins, nacl, fmt=scrypt_format) return True, partial (mode_depdtcrypt, mode, secret, ins, outs) def main (argv): ok, runner = parse_argv (argv) if ok is True: return runner () return 1 if __name__ == "__main__": sys.exit (main (sys.argv))