From a83fa4eddc0ea2113a733df3473c6d37c53bda55 Mon Sep 17 00:00:00 2001 From: Philipp Gesang Date: Thu, 18 May 2017 17:47:25 +0200 Subject: [PATCH] allow passing keys directly to CLI crypto.py Keys may now be passed as command line argument or environment variable. The only valid format is 16 bytes in hexadecimal. --- deltatar/crypto.py | 171 ++++++++++++++++++++++--------------------- deltatar/deltatar.py | 6 +- testing/test_crypto.py | 2 +- testing/test_encryption.py | 156 +++++++++++++++++++++++++++++++++++++--- 4 files changed, 236 insertions(+), 99 deletions(-) diff --git a/deltatar/crypto.py b/deltatar/crypto.py index 429cdbe..f4b330a 100755 --- a/deltatar/crypto.py +++ b/deltatar/crypto.py @@ -394,7 +394,7 @@ def hdr_from_params (version, paramversion, nacl, iv, ctsize, tag): PDTCRYPT_HDR_MAGIC, version, paramversion, nacl, iv, ctsize, tag) except Exception as exn: - return False, "error writing header: %s" % str (exn) + return False, "error assembling header: %s" % str (exn) return True, bytes (buf) @@ -709,22 +709,25 @@ class Crypto (object): """ self.next_fixed () self.set_object_counter (counter) - self.strict_ivs = strict_ivs + if paramversion is not None: + self.paramversion = paramversion + if key is not None: self.key, self.nacl = key, nacl return - if isinstance (password, bytes) is False: password = str.encode (password) - self.password = password - if paramversion is None and nacl is None: - # postpone until first header is available - return - kdf = kdf_by_version (paramversion) - if kdf is not None: - self.key, self.nacl = kdf (password, nacl) - self.paramversion = paramversion + if password is not None: + if isinstance (password, bytes) is False: + password = str.encode (password) + self.password = password + if paramversion is None and nacl is None: + # postpone key setup until first header is available + return + kdf = kdf_by_version (paramversion) + if kdf is not None: + self.key, self.nacl = kdf (password, nacl) def process (self, buf): @@ -1052,7 +1055,8 @@ class Decrypt (Crypto): self.fixed = fixedparts self.fixed.sort () - super().__init__ (password, key, counter=counter, strict_ivs=strict_ivs) + super().__init__ (password=password, key=key, counter=counter, + strict_ivs=strict_ivs) def valid_fixed_part (self, iv): @@ -1201,6 +1205,9 @@ PDTCRYPT_SUB = \ { "process" : PDTCRYPT_SUB_PROCESS , "scrypt" : PDTCRYPT_SUB_SCRYPT } +PDTCRYPT_SECRET_PW = 0 +PDTCRYPT_SECRET_KEY = 1 + PDTCRYPT_DECRYPT = 1 << 0 # decrypt archive with password PDTCRYPT_SPLIT = 1 << 1 # split archive into individual objects PDTCRYPT_HASH = 1 << 2 # output scrypt hash for file and given password @@ -1253,10 +1260,10 @@ class PassthroughDecryptor (object): return d -def depdtcrypt (mode, pw, ins, outs): +def depdtcrypt (mode, secret, ins, outs): """ - Remove PDTCRYPT layer from obj encrypted with pw. Used on a Deltatar - backup this will yield a (possibly Gzip compressed) tarball. + Remove PDTCRYPT layer from all objects encrypted with the secret. Used on a + Deltatar backup this will yield a (possibly Gzip compressed) tarball. """ ctleft = -1 # length of ciphertext to consume ctcurrent = 0 # total ciphertext of current object @@ -1267,7 +1274,15 @@ def depdtcrypt (mode, pw, ins, outs): outfile = None # Python file object for output if mode & PDTCRYPT_DECRYPT: # decryptor - decr = Decrypt (password=pw, strict_ivs=PDTCRYPT_STRICTIVS) + ks = secret [0] + if ks == PDTCRYPT_SECRET_PW: + decr = Decrypt (password=secret [1], strict_ivs=PDTCRYPT_STRICTIVS) + elif ks == PDTCRYPT_SECRET_KEY: + key = binascii.unhexlify (secret [1]) + decr = Decrypt (key=key, strict_ivs=PDTCRYPT_STRICTIVS) + else: + raise InternalError ("‘%d’ does not specify a valid kind of secret" + % ks) else: decr = PassthroughDecryptor () @@ -1434,28 +1449,16 @@ def deptdcrypt_mk_stream (kind, path): raise ValueError ("bogus stream “%s” / %s" % (kind, path)) -def depdtcrypt_file (pw, spath, dpath): - """ - Remove PDTCRYPT layer from file at ``spath`` using password ``pw``, writing - the decrypted result to dpath. - """ - if PDTCRYPT_VERBOSE is True: - noise ("PDT: decrypt %s → %s" % (spath, dpath), file=sys.stderr) - with deptdcrypt_mk_stream (PDTCRYPT_SOURCE, spath) as ins: - with deptdcrypt_mk_stream (PDTCRYPT_SINK, dpath) as outs: - return depdtcrypt (pw, ins, outs) - - -def mode_depdtcrypt (mode, pw, ins, outs): +def mode_depdtcrypt (mode, secret, ins, outs): try: total_read, total_obj, total_ct, total_pt = \ - depdtcrypt (mode, pw, ins, outs) + depdtcrypt (mode, secret, ins, outs) except DecryptionError as exn: noise ("PDT: Decryption failed:") noise ("PDT:") noise ("PDT: “%s”" % exn) noise ("PDT:") - noise ("PDT: Did you specify the correct password?") + noise ("PDT: Did you specify the correct key / password?") noise ("") return 1 except PDTSplitError as exn: @@ -1463,7 +1466,7 @@ def mode_depdtcrypt (mode, pw, ins, outs): noise ("PDT:") noise ("PDT: “%s”" % exn) noise ("PDT:") - noise ("PDT: Hint: target directory should to be empty.") + noise ("PDT: Hint: target directory should be empty.") noise ("") return 1 @@ -1496,16 +1499,18 @@ def usage (err=False): if err is True: out = noise out ("usage: %s SUBCOMMAND { --help" % SELF) - out (" | [ -v ] { PASSWORD } { { -i | --in } { - | SOURCE } }") - out (" { { -o | --out } { - | DESTINATION } }") - out (" { -D | --no-decrypt } { -S | --split }") + out (" | [ -v ] { -p PASSWORD | -k KEY }") + out (" { { -i | --in } { - | SOURCE } }") + out (" { { -o | --out } { - | DESTINATION } }") + out (" { -D | --no-decrypt } { -S | --split }") out ("") out ("\twhere") out ("\t\tSUBCOMMAND main mode: { process | scrypt }") out ("\t\t where:") out ("\t\t process: extract objects from PDT archive") out ("\t\t scrypt: calculate hash from password and first object") - out ("\t\tPASSWORD password to derive the encryption key from") + out ("\t\t-p PASSWORD password to derive the encryption key from") + out ("\t\t-k KEY encryption key as 16 bytes in hexadecimal notation") out ("\t\t-s enforce strict handling of initialization vectors") out ("\t\t-i SOURCE file name to read from") out ("\t\t-o DESTINATION file to write output to") @@ -1519,10 +1524,17 @@ def usage (err=False): sys.exit ((err is True) and 42 or 0) +def bail (msg): + noise (msg) + noise ("") + usage (err=True) + raise Unreachable + + def parse_argv (argv): global SELF mode = PDTCRYPT_DECRYPT - pw = None + secret = None insspec = None outsspec = None @@ -1533,26 +1545,16 @@ def parse_argv (argv): rawsubcmd = next (argvi) subcommand = PDTCRYPT_SUB [rawsubcmd] except StopIteration: - noise ("ERROR: subcommand required") - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: subcommand required") except KeyError: - noise ("ERROR: invalid subcommand “%s” specified" % rawsubcmd) - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: invalid subcommand “%s” specified" % rawsubcmd) - def checked_pw (arg): - nonlocal pw - if pw is None: - pw = arg + def checked_secret (t, arg): + nonlocal secret + if secret is None: + secret = (t, arg) else: - noise ("ERROR: unqualified argument “%s” but password already " - "given" % arg) - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: encountered “%s” but secret already given" % arg) for arg in argvi: if arg in [ "-h", "--help" ]: @@ -1567,6 +1569,10 @@ def parse_argv (argv): elif arg in [ "-o", "--out", "--dest", "--sink" ]: outsspec = next (argvi) if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt to %s" % outsspec) + elif arg in [ "-p", "--password" ]: + arg = next (argvi) + checked_secret (PDTCRYPT_SECRET_PW, arg) + if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with password") else: if subcommand == PDTCRYPT_SUB_PROCESS: if arg in [ "-s", "--strict-ivs" ]: @@ -1582,44 +1588,47 @@ def parse_argv (argv): elif arg in [ "-D", "--no-decrypt" ]: mode &= ~PDTCRYPT_DECRYPT if PDTCRYPT_VERBOSE is True: noise ("PDT: not decrypting") + elif arg in [ "-k", "--key" ]: + arg = next (argvi) + checked_secret (PDTCRYPT_SECRET_KEY, arg) + if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with key") else: - checked_pw (arg) + bail ("ERROR: unexpected positional argument “%s”" % arg) elif subcommand == PDTCRYPT_SUB_SCRYPT: - checked_pw (arg) - else: - raise Unreachable + bail ("ERROR: unexpected positional argument “%s”" % arg) - if pw is None: + if secret is None: if PDTCRYPT_VERBOSE is True: - noise ("ERROR: no password specified, trying $PDTCRYPT_PASSWORD") + noise ("ERROR: no password or key specified, trying $PDTCRYPT_PASSWORD") epw = os.getenv ("PDTCRYPT_PASSWORD") if epw is not None: - pw = epw.strip () + checked_secret (PDTCRYPT_SECRET_PW, epw.strip ()) + + if secret is None: + if PDTCRYPT_VERBOSE is True: + noise ("ERROR: no password or key specified, trying $PDTCRYPT_KEY") + ek = os.getenv ("PDTCRYPT_KEY") + if ek is not None: + checked_secret (PDTCRYPT_SECRET_KEY, ek.strip ()) - if pw is None: + if secret is None: if subcommand == PDTCRYPT_SUB_SCRYPT: - noise ("ERROR: scrypt hash mode requested but no password given") - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: scrypt hash mode requested but no password given") elif mode & PDTCRYPT_DECRYPT: - noise ("ERROR: encryption requested but no password given") - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: encryption requested but no password given") + + if subcommand == PDTCRYPT_SUB_SCRYPT and secret [0] == PDTCRYPT_SECRET_KEY: + bail ("ERROR: scrypt mode requires a password") # default to stdout ins = deptdcrypt_mk_stream (PDTCRYPT_SOURCE, insspec or "-") if subcommand == PDTCRYPT_SUB_SCRYPT: - return True, partial (mode_scrypt, pw.encode (), ins) + return True, partial (mode_scrypt, secret [1].encode (), ins) if mode & PDTCRYPT_SPLIT: # destination must be directory if outsspec is None or outsspec == "-": - noise ("ERROR: split mode is incompatible with stdout sink") - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: split mode is incompatible with stdout sink") try: try: @@ -1630,20 +1639,14 @@ def parse_argv (argv): pass outs = os.open (outsspec, os.O_DIRECTORY, 0o600) except FileNotFoundError as exn: - noise ("ERROR: cannot create target directory “%s”" % outsspec) - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: cannot create target directory “%s”" % outsspec) except NotADirectoryError as exn: - noise ("ERROR: target path “%s” is not a directory" % outsspec) - noise ("") - usage (err=True) - raise Unreachable + bail ("ERROR: target path “%s” is not a directory" % outsspec) else: outs = deptdcrypt_mk_stream (PDTCRYPT_SINK, outsspec or "-") - return True, partial (mode_depdtcrypt, mode, pw, ins, outs) + return True, partial (mode_depdtcrypt, mode, secret, ins, outs) def main (argv): diff --git a/deltatar/deltatar.py b/deltatar/deltatar.py index 46c9155..8b8c19d 100644 --- a/deltatar/deltatar.py +++ b/deltatar/deltatar.py @@ -79,7 +79,7 @@ class DeltaTar(object): # list of files to include in the backup creation or restore operation. It # can contain python regular expressions. If empty, all files in the source # path will be backed up (when creating a backup) or all the files in the - # backup will be restored (when restoring a backuup), but if included_files + # backup will be restored (when restoring a backup), but if included_files # is set then only the files include in the list will be processed. included_files = [] @@ -168,7 +168,7 @@ class DeltaTar(object): restore operation. It can contain python regular expressions. If empty, all files in the source path will be backed up (when creating a backup) or all the files in the backup will be restored (when - restoring a backuup), but if included_files is set then only the files + restoring a backup), but if included_files is set then only the files include in the list will be processed. - filter_func: custom filter of files to be backed up (or restored). @@ -279,7 +279,7 @@ class DeltaTar(object): if mode is None: mode = self.__index_extensions_dict [self.index_mode] ret += mode - if self.password is not None: + if self.crypto_key is not None or self.password is not None: ret += "." + PDTCRYPT_EXTENSION return ret diff --git a/testing/test_crypto.py b/testing/test_crypto.py index 0b04c2b..8ed4748 100644 --- a/testing/test_crypto.py +++ b/testing/test_crypto.py @@ -631,7 +631,7 @@ class HeaderTest (CryptoLayerTest): def test_crypto_fmt_hdr_make_useless (self): ok, ret = crypto.hdr_make ({ 42: "x" }) assert ok is False - assert ret.startswith ("error writing header:") + assert ret.startswith ("error assembling header:") def test_crypto_fmt_hdr_read (self): diff --git a/testing/test_encryption.py b/testing/test_encryption.py index dfbc603..76568eb 100644 --- a/testing/test_encryption.py +++ b/testing/test_encryption.py @@ -15,6 +15,8 @@ # +import binascii +import hashlib import os from deltatar import crypto @@ -32,7 +34,7 @@ class EncryptionTest(BaseTest): Test encryption after compression in tarfiles """ - def test_cli_decrypt(self): + def test_cli_decrypt_pw (self): """ Create a tar file with only one file inside, using concat compression and encryption mode. Then decrypt with crypto.py, @@ -49,12 +51,52 @@ class EncryptionTest(BaseTest): ``crypto.py`` to decrypt on the command line; the rest of the data pipeline (→ gzip → tar → files) remains the same. """ + pw = "Where is my cow?" # create the content of the file to compress and hash it hash = self.create_file("big", 50000) # create the encryption handler - encryptor = crypto.Encrypt (password="key", + encryptor = crypto.Encrypt (password=pw, + version=DELTATAR_HEADER_VERSION, + paramversion=DELTATAR_PARAMETER_VERSION) + + # create the tar file with volumes + tarobj = TarFile.open("sample.tar.gz.pdtcrypt", + mode="w#gz", + format=GNU_FORMAT, + encryption=encryptor) + tarobj.add("big") + tarobj.close() + os.unlink("big") + + # decrypt outer archive layer with crypto.py + assert os.path.exists("sample.tar.gz.pdtcrypt") + ret = os.system("python3 ./deltatar/crypto.py process -p '%s' " + "sample.tar.gz" % pw) + assert ret == 0 + assert os.path.exists("sample.tar.gz") + + # extract with normal tar and check output + os.system("zcat sample.tar.gz 2>/dev/null > sample.tar") + os.system("tar xf sample.tar") + assert os.path.exists("big") + assert hash == self.md5sum("big") + + + def test_cli_decrypt_key_argv (self): + """ + Encrypt, then decrypt using the crypto.py → gzip → tar pipe, + supplying the encryption key on the command line. + """ + key = os.urandom (16) + nacl = hashlib.md5 ("Stráðu á mig salti".encode ()).digest () + + # create the content of the file to compress and hash it + hash = self.create_file("big", 50000) + + # create the encryption handler + encryptor = crypto.Encrypt (key=key, nacl=nacl, version=DELTATAR_HEADER_VERSION, paramversion=DELTATAR_PARAMETER_VERSION) @@ -67,11 +109,51 @@ class EncryptionTest(BaseTest): tarobj.close() os.unlink("big") - #filesplit.split_file(b'Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128") + # decrypt outer archive layer with crypto.py + assert os.path.exists("sample.tar.gz.pdtcrypt") + ret = os.system("python3 ./deltatar/crypto.py process -k '%s' " + "sample.tar.gz" + % binascii.hexlify (key).decode ()) + assert ret == 0 + assert os.path.exists("sample.tar.gz") + + # extract with normal tar and check output + os.system("zcat sample.tar.gz 2>/dev/null > sample.tar") + os.system("tar xf sample.tar") + assert os.path.exists("big") + assert hash == self.md5sum("big") + + + def test_cli_decrypt_key_envp (self): + """ + Encrypt, then decrypt using the crypto.py → gzip → tar pipe, + supplying the encryption key through the process environment. + """ + key = os.urandom (16) + nacl = hashlib.md5 ("Stráðu á mig salti".encode ()).digest () + + # create the content of the file to compress and hash it + hash = self.create_file("big", 50000) + + # create the encryption handler + encryptor = crypto.Encrypt (key=key, nacl=nacl, + version=DELTATAR_HEADER_VERSION, + paramversion=DELTATAR_PARAMETER_VERSION) + + # create the tar file with volumes + tarobj = TarFile.open("sample.tar.gz.pdtcrypt", + mode="w#gz", + format=GNU_FORMAT, + encryption=encryptor) + tarobj.add("big") + tarobj.close() + os.unlink("big") # decrypt outer archive layer with crypto.py assert os.path.exists("sample.tar.gz.pdtcrypt") - ret = os.system("python3 -s ./deltatar/crypto.py process key sample.tar.gz") + ret = os.system("PDTCRYPT_KEY='%s' python3 ./deltatar/crypto.py process " + "sample.tar.gz" + % binascii.hexlify (key).decode ()) assert ret == 0 assert os.path.exists("sample.tar.gz") @@ -81,12 +163,15 @@ class EncryptionTest(BaseTest): assert os.path.exists("big") assert hash == self.md5sum("big") - def test_cli_multiple_files_decrypt(self): + + def test_cli_multiple_files_decrypt_pw_argv (self): """ Create a tar file with multiple files inside, using concat compression and encryption mode. Then decrypt and split with ``crypto.py``, decompress it with zcat and untar it with gnu tar. + The password is specified as command line argument. """ + pw = "Is that my cow?" # create sample data hash = dict() @@ -95,7 +180,7 @@ class EncryptionTest(BaseTest): hash["small2"] = self.create_file("small2", 354) # create the encryption handler - encryptor = crypto.Encrypt (password="key", + encryptor = crypto.Encrypt (password=pw, version=DELTATAR_HEADER_VERSION, paramversion=DELTATAR_PARAMETER_VERSION) @@ -113,8 +198,8 @@ class EncryptionTest(BaseTest): os.unlink(k) assert os.path.exists("sample.tar.gz.pdtcrypt") - ret = os.system("python3 -s ./deltatar/crypto.py process key --split " - "-i sample.tar.gz.pdtcrypt -o .") + ret = os.system("python3 ./deltatar/crypto.py process -p '%s' -s --split " + "-i sample.tar.gz.pdtcrypt -o ." % pw) assert ret == 0 for i in range (len (hash)): fname = "pdtcrypt-object-%d.bin" % (i + 1) @@ -122,9 +207,58 @@ class EncryptionTest(BaseTest): os.system("zcat '%s' 2>/dev/null > sample.tar" % fname) os.system("tar xf sample.tar") - for key, value in hash.items(): - assert os.path.exists(key) - assert value == self.md5sum(key) + for fname, digest in hash.items(): + assert os.path.exists(fname) + assert digest == self.md5sum(fname) + + + def test_cli_multiple_files_decrypt_envp (self): + """ + Create a tar file with multiple files inside, using concat + compression and encryption mode. Then decrypt and split with + ``crypto.py``, decompress it with zcat and untar it with gnu tar. + The password is given as environment variable. + """ + pw = "Is that my cow?" + + # create sample data + hash = dict() + hash["big"] = self.create_file("big", 50000) + hash["small"] = self.create_file("small", 100) + hash["small2"] = self.create_file("small2", 354) + + # create the encryption handler + encryptor = crypto.Encrypt (password=pw, + version=DELTATAR_HEADER_VERSION, + paramversion=DELTATAR_PARAMETER_VERSION) + + # create the tar file with volumes + tarobj = TarFile.open("sample.tar.gz.pdtcrypt", + mode="w#gz", + format=GNU_FORMAT, + encryption=encryptor) + + for k in hash: + tarobj.add(k) + tarobj.close() + + for k in hash: + os.unlink(k) + + assert os.path.exists("sample.tar.gz.pdtcrypt") + ret = os.system("PDTCRYPT_PASSWORD='%s' python3 ./deltatar/crypto.py " + "process -s --split -i sample.tar.gz.pdtcrypt -o ." + % pw) + assert ret == 0 + for i in range (len (hash)): + fname = "pdtcrypt-object-%d.bin" % (i + 1) + assert os.path.exists(fname) + os.system("zcat '%s' 2>/dev/null > sample.tar" % fname) + os.system("tar xf sample.tar") + + for fname, digest in hash.items(): + assert os.path.exists(fname) + assert digest == self.md5sum(fname) def test_decrypt(self): -- 1.7.1