From 98fc07a53d3f86b5104d1c7c69087560a2022ffe Mon Sep 17 00:00:00 2001 From: Philipp Gesang Date: Tue, 4 Apr 2017 09:43:23 +0200 Subject: [PATCH] improve parameter handling of crypto.py --- deltatar/crypto.py | 131 ++++++++++++++++++++++++++++++++++++++++------------ 1 files changed, 101 insertions(+), 30 deletions(-) diff --git a/deltatar/crypto.py b/deltatar/crypto.py index 838c681..a0dd19a 100755 --- a/deltatar/crypto.py +++ b/deltatar/crypto.py @@ -75,6 +75,10 @@ class DecryptionError (Exception): """Error during decryption.""" pass +class Unreachable (Exception): + """Makeshift __builtin_unreachable().""" + pass + ############################################################################### ## crypto layer version @@ -575,7 +579,12 @@ class Decrypt (Crypto): ## freestanding invocation ############################################################################### +PDTCRYPT_VERBOSE = False PDTCRYPT_BLOCKSIZE = 1 << 12 +PDTCRYPT_SINK = 0 +PDTCRYPT_SOURCE = 1 +SELF = None + class PDTDecryptionError (Exception): """Decryption failed.""" @@ -585,7 +594,7 @@ def noise (*a, **b): print (file=sys.stderr, *a, *b) -def depdtcrypt (pw, ins, outs, verbose): +def depdtcrypt (pw, ins, outs): """ Remove PDTCRYPT layer from obj encrypted with pw. Used on a Deltatar backup this will yield a (possibly Gzip compressed) tarball. @@ -602,7 +611,7 @@ def depdtcrypt (pw, ins, outs, verbose): npt = len (pt) nonlocal total_pt total_pt += npt - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· decrypt plaintext %d B" % (npt)) try: nn = outs.write (pt) @@ -616,17 +625,17 @@ def depdtcrypt (pw, ins, outs, verbose): # current object completed; in a valid archive this marks either # the start of a new header or the end of the input if ctleft == 0: # current object requires finalization - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT: %d finalize" % ins.tell ()) ret, pt = decr.done () if ret is False: raise DecryptionError ("error finalizing object (%s)" % pt) out (pt) - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· object validated") - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT: %d hdr" % ins.tell ()) try: hdr = hdr_read_stream (ins) @@ -643,7 +652,7 @@ def depdtcrypt (pw, ins, outs, verbose): except InvalidHeader as exn: raise PDTDecryptionError ("invalid header at position %d in %r " "(%s)" % (exn, ins.tell (), ins)) - if verbose is True: + if PDTCRYPT_VERBOSE is True: pretty = hdr_fmt_pretty (hdr) noise (reduce (lambda a, e: (a + "\n" if a else "") + "PDT:\t· " + e, pretty.splitlines (), "")) @@ -651,14 +660,14 @@ def depdtcrypt (pw, ins, outs, verbose): decr.next (hdr) total_obj += 1 - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT: %d decrypt obj no. %d, %d B" % (ins.tell (), total_obj, ctleft)) # always allocate a new buffer since python-cryptography doesn’t allow # passing a bytearray :/ nexpect = min (ctleft, PDTCRYPT_BLOCKSIZE) - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· [%d] %d%% done, read block (%d B of %d B remaining)" % (ins.tell (), 100 - ctleft * 100 / (ctcurrent > 0 and ctcurrent or 1), @@ -675,55 +684,117 @@ def depdtcrypt (pw, ins, outs, verbose): total_ct += nct total_read += nct - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT:\t· decrypt ciphertext %d B" % (nct)) pt = decr.process (ct) out (pt) -PDT_SINK = 0 -PDT_SOURCE = 1 -def deptdcrypt_mk_stream (kind, path, verbose=False): +def deptdcrypt_mk_stream (kind, path): """Create stream from file or stdio descriptor.""" - if kind == PDT_SINK: + if kind == PDTCRYPT_SINK: if path == "-": - if verbose is True: noise ("PDT: sink: stdout") + if PDTCRYPT_VERBOSE is True: noise ("PDT: sink: stdout") return sys.stdout.buffer else: - if verbose is True: noise ("PDT: sink: file %s" % path) + if PDTCRYPT_VERBOSE is True: noise ("PDT: sink: file %s" % path) return io.FileIO (path, "w") - if kind == PDT_SOURCE: + if kind == PDTCRYPT_SOURCE: if path == "-": - if verbose is True: noise ("PDT: source: stdin") + if PDTCRYPT_VERBOSE is True: noise ("PDT: source: stdin") return sys.stdin.buffer else: - if verbose is True: noise ("PDT: source: file %s" % path) + if PDTCRYPT_VERBOSE is True: noise ("PDT: source: file %s" % path) return io.FileIO (path, "r") raise ValueError ("bogus stream “%s” / %s" % (kind, path)) -def depdtcrypt_file (pw, spath, dpath, verbose=False): +def depdtcrypt_file (pw, spath, dpath): """ Remove PDTCRYPT layer from file at ``spath`` using password ``pw``, writing the decrypted result to dpath. """ - if verbose is True: + if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt %s → %s" % (spath, dpath), file=sys.stderr) - with deptdcrypt_mk_stream (PDT_SOURCE, spath) as ins: - with deptdcrypt_mk_stream (PDT_SINK, dpath) as outs: - return depdtcrypt (pw, ins, outs, verbose) + with deptdcrypt_mk_stream (PDTCRYPT_SOURCE, spath) as ins: + with deptdcrypt_mk_stream (PDTCRYPT_SINK, dpath) as outs: + return depdtcrypt (pw, ins, outs) + + +def usage (err=False): + out = print + if err is True: + out = noise + out ("usage: %s { --help | [ -v ] PASSWORD -i { - | SOURCE } -o { - | DESTINATION } }" + % SELF) + out ("") + out ("\twhere") + out ("\t\tPASSWORD password to derive the encryption key from") + out ("\t\t-i SOURCE file name to read from") + out ("\t\t-o DESTINATION file to write output to") + out ("\t\t-v print extra info") + out ("") + out ("\tinstead of filenames, “-” may used to specify stdin / stdout") + out ("") + sys.exit ((err is True) and 42 or 0) + + +def parse_argv (argv): + global SELF + pw = None + insspec = None + outsspec = None + + argvi = iter (argv) + SELF = os.path.basename (next (argvi)) + + for arg in argvi: + if arg in [ "-h", "--help" ]: + usage () + raise Unreachable + elif arg in [ "-v", "--verbose", "--wtf" ]: + global PDTCRYPT_VERBOSE + PDTCRYPT_VERBOSE = True + elif arg in [ "-i", "--in", "--source" ]: + insspec = next (argvi) + if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt from %s" % insspec) + elif arg in [ "-o", "--out", "--dest", "--sink" ]: + outsspec = next (argvi) + if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt to %s" % outsspec) + else: + if pw is None: + pw = arg + else: + noise ("ERROR: unqualified argument “%s” but password already " + "given" % arg) + noise ("") + usage (err=True) + raise Unreachable + + if pw is None: + noise ("ERROR: no password given") + noise ("") + usage (err=True) + raise Unreachable + + # default to stdout + ins = deptdcrypt_mk_stream (PDTCRYPT_SOURCE, insspec or "-") + outs = deptdcrypt_mk_stream (PDTCRYPT_SINK , outsspec or "-") + return pw, ins, outs def main (argv): + pw, ins, outs = parse_argv (argv) total_read, total_obj, total_ct, total_pt = \ - depdtcrypt_file (argv [1], argv [2], argv [3], verbose=True) - noise ("PDT: decryption successful" ) - noise ("PDT: %.10d bytes read" % total_read) - noise ("PDT: %.10d objects decrypted" % total_obj ) - noise ("PDT: %.10d bytes ciphertext" % total_ct ) - noise ("PDT: %.10d bytes plaintext" % total_pt ) - noise ("" ) + depdtcrypt (pw, ins, outs) + if PDTCRYPT_VERBOSE is True: + noise ("PDT: decryption successful" ) + noise ("PDT: %.10d bytes read" % total_read) + noise ("PDT: %.10d objects decrypted" % total_obj ) + noise ("PDT: %.10d bytes ciphertext" % total_ct ) + noise ("PDT: %.10d bytes plaintext" % total_pt ) + noise ("" ) if __name__ == "__main__": -- 1.7.1