import pylibscrypt
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
+import cryptography
__all__ = [ "hdr_make", "hdr_read", "hdr_fmt", "hdr_fmt_pretty"
###############################################################################
+## exceptions
+###############################################################################
+
+class EndOfFile (Exception):
+ """Reached EOF."""
+ pass
+
+class InvalidHeader (Exception):
+ """Header not valid."""
+ pass
+
+class DecryptionError (Exception):
+ """Error during decryption."""
+ pass
+
+
+###############################################################################
## crypto layer version
###############################################################################
mag, version, paramversion, nacl, iv, ctsize, tag = \
struct.unpack (FMT_I2N_HDR, data)
except Exception as exn:
- return False, "error reading header from [%r]: %s" % (data, str (exn))
+ raise InvalidHeader ("error unpacking header from [%r]: %s"
+ % (binascii.hexlify (data), str (exn)))
if mag != I2N_HDR_MAGIC:
- return False, "bad magic in header: expected [%s], got [%s]" \
- % (I2N_HDR_MAGIC, mag)
+ raise InvalidHeader ("bad magic in header: expected [%s], got [%s]"
+ % (I2N_HDR_MAGIC, mag))
- return True, \
+ return \
{ "version" : version
, "paramversion" : paramversion
, "nacl" : nacl
def hdr_read_stream (instr):
data = instr.read(I2N_HDR_SIZE)
if len (data) != I2N_HDR_SIZE:
- return False, "error reading from [%r]: expected %d B, received %d B" \
- % (instr, I2N_HDR_SIZE, len (data))
+ raise EndOfFile ("read: expected %d B, received %d B"
+ % (I2N_HDR_SIZE, len (data)))
return hdr_read (data)
hdr_dump = hex_spaced_of_bytes
-HDR_FMT_PRETTY = """
-version = %-4d : %s
-paramversion = %-4d : %s
-nacl : %s
-iv : %s
-ctsize = %-20d : %s
+HDR_FMT_PRETTY = \
+"""version = %-4d : %s
+paramversion = %-4d : %s
+nacl : %s
+iv : %s
+ctsize = %-20d : %s
+tag : %s
"""
def hdr_fmt_pretty (h):
hex_spaced_of_bytes (h["nacl"]),
hex_spaced_of_bytes (h["iv"]),
h["ctsize"],
- hex_spaced_of_bytes (struct.pack (FMT_UINT64_LE, h["ctsize"])))
+ hex_spaced_of_bytes (struct.pack (FMT_UINT64_LE, h["ctsize"])),
+ hex_spaced_of_bytes (h["tag"]))
def tag_fmt (t):
ret, data = True, self.enc.finalize ()
else:
ret, data = self.enc.finalize_with_tag (self.tag)
- except crypto.cryptography.exceptions.InvalidTag as exn:
+ except cryptography.exceptions.InvalidTag as exn:
return False, repr (exn)
self.ctsize += len (data)
self.stats ["out"] += len (data)
## freestanding invocation
###############################################################################
+PDTCRYPT_BLOCKSIZE = 1 << 12
+
+class PDTDecryptionError (Exception):
+ """Decryption failed."""
+
+
+def noise (*a, **b):
+ print (file=sys.stderr, *a, *b)
+
+
+def depdtcrypt (pw, ins, outs, verbose):
+ """
+ Remove PDTCRYPT layer from obj encrypted with pw. Used on a Deltatar
+ backup this will yield a (possibly Gzip compressed) tarball.
+ """
+ ctleft = -1 # length of ciphertext to consume
+ ctcurrent = 0 # total ciphertext of current object
+ decr = Decrypt (pw) # decryptor
+ total_obj = 0 # total number of objects read
+ total_pt = 0 # total plaintext bytes
+ total_ct = 0 # total ciphertext bytes
+ total_read = 0 # total bytes read
+
+ def out (pt):
+ npt = len (pt)
+ nonlocal total_pt
+ total_pt += npt
+ if verbose is True:
+ noise ("PDT:\t· decrypt plaintext %d B" % (npt))
+ try:
+ nn = outs.write (pt)
+ except OSError as exn: # probably ENOSPC
+ raise DecryptionError ("error (%s)" % exn)
+ if nn != npt:
+ raise DecryptionError ("write aborted after %d of %d B" % (nn, npt))
+
+ while True:
+ if ctleft <= 0:
+ # current object completed; in a valid archive this marks either
+ # the start of a new header or the end of the input
+ if ctleft == 0: # current object requires finalization
+ if verbose is True:
+ noise ("PDT: %d finalize" % ins.tell ())
+ ret, pt = decr.done ()
+ if ret is False:
+ raise DecryptionError ("error finalizing object (%s)"
+ % pt)
+ out (pt)
+ if verbose is True:
+ noise ("PDT:\t· object validated")
+
+ if verbose is True:
+ noise ("PDT: %d hdr" % ins.tell ())
+ try:
+ hdr = hdr_read_stream (ins)
+ total_read += I2N_HDR_SIZE
+ except EndOfFile:
+ if total_ct + total_obj * I2N_HDR_SIZE != total_read:
+ raise PDTDecryptionError ("ciphertext processed (%d B) plus "
+ "overhead (%d × %d B) does not match "
+ "the number of bytes read (%d )"
+ % (total_ct, total_obj, I2N_HDR_SIZE,
+ total_read))
+ # the single good exit
+ return total_read, total_obj, total_ct, total_pt
+ except InvalidHeader as exn:
+ raise PDTDecryptionError ("invalid header at position %d in %r "
+ "(%s)" % (exn, ins.tell (), ins))
+ if verbose is True:
+ pretty = hdr_fmt_pretty (hdr)
+ noise (reduce (lambda a, e: (a + "\n" if a else "") + "PDT:\t· " + e,
+ pretty.splitlines (), ""))
+ ctcurrent = ctleft = hdr ["ctsize"]
+ decr.next (hdr)
+ total_obj += 1
+
+ if verbose is True:
+ noise ("PDT: %d decrypt obj no. %d, %d B"
+ % (ins.tell (), total_obj, ctleft))
+
+ # always allocate a new buffer since python-cryptography doesn’t allow
+ # passing a bytearray :/
+ nexpect = min (ctleft, PDTCRYPT_BLOCKSIZE)
+ if verbose is True:
+ noise ("PDT:\t· [%d] %d%% done, read block (%d B of %d B remaining)"
+ % (ins.tell (),
+ 100 - ctleft * 100 / (ctcurrent > 0 and ctcurrent or 1),
+ nexpect, ctleft))
+ ct = ins.read (nexpect)
+ nct = len (ct)
+ if nct < nexpect:
+ off = ins.tell ()
+ raise EndOfFile ("hit EOF after %d of %d B in block [%d:%d); "
+ "%d B ciphertext remaining for object no %d"
+ % (nct, nexpect, off, off + nexpect, ctleft,
+ total_obj))
+ ctleft -= nct
+ total_ct += nct
+ total_read += nct
+
+ if verbose is True:
+ noise ("PDT:\t· decrypt ciphertext %d B" % (nct))
+ pt = decr.process (ct)
+ out (pt)
+
+
+def depdtcrypt_file (pw, spath, dpath, verbose=False):
+ """
+ Remove PDTCRYPT layer from file at ``spath`` using password ``pw``, writing
+ the decrypted result to dpath.
+ """
+ if verbose is True:
+ noise ("PDT: decrypt %s → %s" % (spath, dpath), file=sys.stderr)
+ with io.FileIO (spath, "r") as ins:
+ with io.FileIO (dpath, "w") as outs:
+ return depdtcrypt (pw, ins, outs, verbose)
+
+
def main (argv):
- print("For testing run unit tests, kthxbye!")
- return -1
+ total_read, total_obj, total_ct, total_pt = \
+ depdtcrypt_file (argv [1], argv [2], argv [3], verbose=True)
+ noise ("PDT: decryption successful" )
+ noise ("PDT: %.10d bytes read" % total_read)
+ noise ("PDT: %.10d objects decrypted" % total_obj )
+ noise ("PDT: %.10d bytes ciphertext" % total_ct )
+ noise ("PDT: %.10d bytes plaintext" % total_pt )
+ noise ("" )
+
if __name__ == "__main__":
sys.exit (main (sys.argv))