allow passing keys directly to CLI crypto.py
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Thu, 18 May 2017 15:47:25 +0000 (17:47 +0200)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Mon, 2 Apr 2018 11:34:09 +0000 (13:34 +0200)
Keys may now be passed as command line argument or environment
variable.

The only valid format is 16 bytes in hexadecimal.

deltatar/crypto.py
deltatar/deltatar.py
testing/test_crypto.py
testing/test_encryption.py

index 429cdbe..f4b330a 100755 (executable)
@@ -394,7 +394,7 @@ def hdr_from_params (version, paramversion, nacl, iv, ctsize, tag):
                           PDTCRYPT_HDR_MAGIC,
                           version, paramversion, nacl, iv, ctsize, tag)
     except Exception as exn:
-        return False, "error writing header: %s" % str (exn)
+        return False, "error assembling header: %s" % str (exn)
 
     return True, bytes (buf)
 
@@ -709,22 +709,25 @@ class Crypto (object):
         """
         self.next_fixed ()
         self.set_object_counter (counter)
-
         self.strict_ivs = strict_ivs
 
+        if paramversion is not None:
+            self.paramversion = paramversion
+
         if key is not None:
             self.key, self.nacl = key, nacl
             return
 
-        if isinstance (password, bytes) is False: password = str.encode (password)
-        self.password = password
-        if paramversion is None and nacl is None:
-            # postpone until first header is available
-            return
-        kdf = kdf_by_version (paramversion)
-        if kdf is not None:
-            self.key, self.nacl = kdf (password, nacl)
-        self.paramversion = paramversion
+        if password is not None:
+            if isinstance (password, bytes) is False:
+                password = str.encode (password)
+            self.password = password
+            if paramversion is None and nacl is None:
+                # postpone key setup until first header is available
+                return
+            kdf = kdf_by_version (paramversion)
+            if kdf is not None:
+                self.key, self.nacl = kdf (password, nacl)
 
 
     def process (self, buf):
@@ -1052,7 +1055,8 @@ class Decrypt (Crypto):
             self.fixed = fixedparts
             self.fixed.sort ()
 
-        super().__init__ (password, key, counter=counter, strict_ivs=strict_ivs)
+        super().__init__ (password=password, key=key, counter=counter,
+                          strict_ivs=strict_ivs)
 
 
     def valid_fixed_part (self, iv):
@@ -1201,6 +1205,9 @@ PDTCRYPT_SUB = \
         { "process" : PDTCRYPT_SUB_PROCESS
         , "scrypt"  : PDTCRYPT_SUB_SCRYPT }
 
+PDTCRYPT_SECRET_PW   = 0
+PDTCRYPT_SECRET_KEY  = 1
+
 PDTCRYPT_DECRYPT   = 1 << 0 # decrypt archive with password
 PDTCRYPT_SPLIT     = 1 << 1 # split archive into individual objects
 PDTCRYPT_HASH      = 1 << 2 # output scrypt hash for file and given password
@@ -1253,10 +1260,10 @@ class PassthroughDecryptor (object):
         return d
 
 
-def depdtcrypt (mode, pw, ins, outs):
+def depdtcrypt (mode, secret, ins, outs):
     """
-    Remove PDTCRYPT layer from obj encrypted with pw. Used on a Deltatar
-    backup this will yield a (possibly Gzip compressed) tarball.
+    Remove PDTCRYPT layer from all objects encrypted with the secret. Used on a
+    Deltatar backup this will yield a (possibly Gzip compressed) tarball.
     """
     ctleft     = -1              # length of ciphertext to consume
     ctcurrent  = 0               # total ciphertext of current object
@@ -1267,7 +1274,15 @@ def depdtcrypt (mode, pw, ins, outs):
     outfile    = None            # Python file object for output
 
     if mode & PDTCRYPT_DECRYPT:  # decryptor
-        decr = Decrypt (password=pw, strict_ivs=PDTCRYPT_STRICTIVS)
+        ks = secret [0]
+        if ks == PDTCRYPT_SECRET_PW:
+            decr = Decrypt (password=secret [1], strict_ivs=PDTCRYPT_STRICTIVS)
+        elif ks == PDTCRYPT_SECRET_KEY:
+            key = binascii.unhexlify (secret [1])
+            decr = Decrypt (key=key, strict_ivs=PDTCRYPT_STRICTIVS)
+        else:
+            raise InternalError ("‘%d’ does not specify a valid kind of secret"
+                                 % ks)
     else:
         decr = PassthroughDecryptor ()
 
@@ -1434,28 +1449,16 @@ def deptdcrypt_mk_stream (kind, path):
     raise ValueError ("bogus stream “%s” / %s" % (kind, path))
 
 
-def depdtcrypt_file (pw, spath, dpath):
-    """
-    Remove PDTCRYPT layer from file at ``spath`` using password ``pw``, writing
-    the decrypted result to dpath.
-    """
-    if PDTCRYPT_VERBOSE is True:
-        noise ("PDT: decrypt %s → %s" % (spath, dpath), file=sys.stderr)
-    with deptdcrypt_mk_stream (PDTCRYPT_SOURCE, spath) as ins:
-        with deptdcrypt_mk_stream (PDTCRYPT_SINK, dpath) as outs:
-            return depdtcrypt (pw, ins, outs)
-
-
-def mode_depdtcrypt (mode, pw, ins, outs):
+def mode_depdtcrypt (mode, secret, ins, outs):
     try:
         total_read, total_obj, total_ct, total_pt = \
-            depdtcrypt (mode, pw, ins, outs)
+            depdtcrypt (mode, secret, ins, outs)
     except DecryptionError as exn:
         noise ("PDT: Decryption failed:")
         noise ("PDT:")
         noise ("PDT:    “%s”" % exn)
         noise ("PDT:")
-        noise ("PDT: Did you specify the correct password?")
+        noise ("PDT: Did you specify the correct key / password?")
         noise ("")
         return 1
     except PDTSplitError as exn:
@@ -1463,7 +1466,7 @@ def mode_depdtcrypt (mode, pw, ins, outs):
         noise ("PDT:")
         noise ("PDT:    “%s”" % exn)
         noise ("PDT:")
-        noise ("PDT: Hint: target directory should to be empty.")
+        noise ("PDT: Hint: target directory should be empty.")
         noise ("")
         return 1
 
@@ -1496,16 +1499,18 @@ def usage (err=False):
     if err is True:
         out = noise
     out ("usage: %s SUBCOMMAND { --help" % SELF)
-    out ("                     | [ -v ] { PASSWORD } { { -i | --in }  { - | SOURCE } }")
-    out ("                                           { { -o | --out } { - | DESTINATION } }")
-    out ("                                           { -D | --no-decrypt } { -S | --split }")
+    out ("                     | [ -v ] { -p PASSWORD | -k KEY }")
+    out ("                       { { -i | --in }  { - | SOURCE } }")
+    out ("                       { { -o | --out } { - | DESTINATION } }")
+    out ("                       { -D | --no-decrypt } { -S | --split }")
     out ("")
     out ("\twhere")
     out ("\t\tSUBCOMMAND      main mode: { process | scrypt }")
     out ("\t\t                where:")
     out ("\t\t                   process: extract objects from PDT archive")
     out ("\t\t                   scrypt:  calculate hash from password and first object")
-    out ("\t\tPASSWORD        password to derive the encryption key from")
+    out ("\t\t-p PASSWORD     password to derive the encryption key from")
+    out ("\t\t-k KEY          encryption key as 16 bytes in hexadecimal notation")
     out ("\t\t-s              enforce strict handling of initialization vectors")
     out ("\t\t-i SOURCE       file name to read from")
     out ("\t\t-o DESTINATION  file to write output to")
@@ -1519,10 +1524,17 @@ def usage (err=False):
     sys.exit ((err is True) and 42 or 0)
 
 
+def bail (msg):
+    noise (msg)
+    noise ("")
+    usage (err=True)
+    raise Unreachable
+
+
 def parse_argv (argv):
     global SELF
     mode      = PDTCRYPT_DECRYPT
-    pw        = None
+    secret    = None
     insspec   = None
     outsspec  = None
 
@@ -1533,26 +1545,16 @@ def parse_argv (argv):
         rawsubcmd  = next (argvi)
         subcommand = PDTCRYPT_SUB [rawsubcmd]
     except StopIteration:
-        noise ("ERROR: subcommand required")
-        noise ("")
-        usage (err=True)
-        raise Unreachable
+        bail ("ERROR: subcommand required")
     except KeyError:
-        noise ("ERROR: invalid subcommand “%s” specified" % rawsubcmd)
-        noise ("")
-        usage (err=True)
-        raise Unreachable
+        bail ("ERROR: invalid subcommand “%s” specified" % rawsubcmd)
 
-    def checked_pw (arg):
-        nonlocal pw
-        if pw is None:
-            pw = arg
+    def checked_secret (t, arg):
+        nonlocal secret
+        if secret is None:
+            secret = (t, arg)
         else:
-            noise ("ERROR: unqualified argument “%s” but password already "
-                "given" % arg)
-            noise ("")
-            usage (err=True)
-            raise Unreachable
+            bail ("ERROR: encountered “%s” but secret already given" % arg)
 
     for arg in argvi:
         if arg in [ "-h", "--help" ]:
@@ -1567,6 +1569,10 @@ def parse_argv (argv):
         elif arg in [ "-o", "--out", "--dest", "--sink" ]:
             outsspec = next (argvi)
             if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt to %s" % outsspec)
+        elif arg in [ "-p", "--password" ]:
+            arg = next (argvi)
+            checked_secret (PDTCRYPT_SECRET_PW, arg)
+            if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with password")
         else:
             if subcommand == PDTCRYPT_SUB_PROCESS:
                 if arg in [ "-s", "--strict-ivs" ]:
@@ -1582,44 +1588,47 @@ def parse_argv (argv):
                 elif arg in [ "-D", "--no-decrypt" ]:
                     mode &= ~PDTCRYPT_DECRYPT
                     if PDTCRYPT_VERBOSE is True: noise ("PDT: not decrypting")
+                elif arg in [ "-k", "--key" ]:
+                    arg = next (argvi)
+                    checked_secret (PDTCRYPT_SECRET_KEY, arg)
+                    if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypting with key")
                 else:
-                    checked_pw (arg)
+                    bail ("ERROR: unexpected positional argument “%s”" % arg)
             elif subcommand == PDTCRYPT_SUB_SCRYPT:
-                checked_pw (arg)
-            else:
-                raise Unreachable
+                bail ("ERROR: unexpected positional argument “%s”" % arg)
 
-    if pw is None:
+    if secret is None:
         if PDTCRYPT_VERBOSE is True:
-            noise ("ERROR: no password specified, trying $PDTCRYPT_PASSWORD")
+            noise ("ERROR: no password or key specified, trying $PDTCRYPT_PASSWORD")
         epw = os.getenv ("PDTCRYPT_PASSWORD")
         if epw is not None:
-            pw = epw.strip ()
+            checked_secret (PDTCRYPT_SECRET_PW, epw.strip ())
+
+    if secret is None:
+        if PDTCRYPT_VERBOSE is True:
+            noise ("ERROR: no password or key specified, trying $PDTCRYPT_KEY")
+        ek = os.getenv ("PDTCRYPT_KEY")
+        if ek is not None:
+            checked_secret (PDTCRYPT_SECRET_KEY, ek.strip ())
 
-    if pw is None:
+    if secret is None:
         if subcommand == PDTCRYPT_SUB_SCRYPT:
-            noise ("ERROR: scrypt hash mode requested but no password given")
-            noise ("")
-            usage (err=True)
-            raise Unreachable
+            bail ("ERROR: scrypt hash mode requested but no password given")
         elif mode & PDTCRYPT_DECRYPT:
-            noise ("ERROR: encryption requested but no password given")
-            noise ("")
-            usage (err=True)
-            raise Unreachable
+            bail ("ERROR: encryption requested but no password given")
+
+    if subcommand == PDTCRYPT_SUB_SCRYPT and secret [0] == PDTCRYPT_SECRET_KEY:
+        bail ("ERROR: scrypt mode requires a password")
 
     # default to stdout
     ins = deptdcrypt_mk_stream (PDTCRYPT_SOURCE, insspec  or "-")
 
     if subcommand == PDTCRYPT_SUB_SCRYPT:
-        return True, partial (mode_scrypt, pw.encode (), ins)
+        return True, partial (mode_scrypt, secret [1].encode (), ins)
 
     if mode & PDTCRYPT_SPLIT: # destination must be directory
         if outsspec is None or outsspec == "-":
-            noise ("ERROR: split mode is incompatible with stdout sink")
-            noise ("")
-            usage (err=True)
-            raise Unreachable
+            bail ("ERROR: split mode is incompatible with stdout sink")
 
         try:
             try:
@@ -1630,20 +1639,14 @@ def parse_argv (argv):
                 pass
             outs = os.open (outsspec, os.O_DIRECTORY, 0o600)
         except FileNotFoundError as exn:
-            noise ("ERROR: cannot create target directory “%s”" % outsspec)
-            noise ("")
-            usage (err=True)
-            raise Unreachable
+            bail ("ERROR: cannot create target directory “%s”" % outsspec)
         except NotADirectoryError as exn:
-            noise ("ERROR: target path “%s” is not a directory" % outsspec)
-            noise ("")
-            usage (err=True)
-            raise Unreachable
+            bail ("ERROR: target path “%s” is not a directory" % outsspec)
 
     else:
         outs = deptdcrypt_mk_stream (PDTCRYPT_SINK, outsspec or "-")
 
-    return True, partial (mode_depdtcrypt, mode, pw, ins, outs)
+    return True, partial (mode_depdtcrypt, mode, secret, ins, outs)
 
 
 def main (argv):
index 46c9155..8b8c19d 100644 (file)
@@ -79,7 +79,7 @@ class DeltaTar(object):
     # list of files to include in the backup creation or restore operation. It
     # can contain python regular expressions. If empty, all files in the source
     # path will be backed up (when creating a backup) or all the files in the
-    # backup will be restored (when restoring a backuup), but if included_files
+    # backup will be restored (when restoring a backup), but if included_files
     # is set then only the files include in the list will be processed.
     included_files = []
 
@@ -168,7 +168,7 @@ class DeltaTar(object):
           restore operation. It can contain python regular expressions. If
           empty, all files in the source path will be backed up (when creating a
           backup) or all the files in the backup will be restored (when
-          restoring a backuup), but if included_files is set then only the files
+          restoring a backup), but if included_files is set then only the files
           include in the list will be processed.
 
         - filter_func: custom filter of files to be backed up (or restored).
@@ -279,7 +279,7 @@ class DeltaTar(object):
         if mode is None:
             mode = self.__index_extensions_dict [self.index_mode]
         ret += mode
-        if self.password is not None:
+        if self.crypto_key is not None or self.password is not None:
             ret += "." + PDTCRYPT_EXTENSION
         return ret
 
index 0b04c2b..8ed4748 100644 (file)
@@ -631,7 +631,7 @@ class HeaderTest (CryptoLayerTest):
     def test_crypto_fmt_hdr_make_useless (self):
         ok, ret = crypto.hdr_make ({ 42: "x" })
         assert ok is False
-        assert ret.startswith ("error writing header:")
+        assert ret.startswith ("error assembling header:")
 
 
     def test_crypto_fmt_hdr_read (self):
index dfbc603..76568eb 100644 (file)
@@ -15,6 +15,8 @@
 # <http://www.gnu.org/licenses/lgpl-3.0.html>
 
 
+import binascii
+import hashlib
 import os
 
 from deltatar import crypto
@@ -32,7 +34,7 @@ class EncryptionTest(BaseTest):
     Test encryption after compression in tarfiles
     """
 
-    def test_cli_decrypt(self):
+    def test_cli_decrypt_pw (self):
         """
         Create a tar file with only one file inside, using concat
         compression and encryption mode. Then decrypt with crypto.py,
@@ -49,12 +51,52 @@ class EncryptionTest(BaseTest):
         ``crypto.py`` to decrypt on the command line; the rest of the
         data pipeline (→ gzip → tar → files) remains the same.
         """
+        pw = "Where is my cow?"
 
         # create the content of the file to compress and hash it
         hash = self.create_file("big", 50000)
 
         # create the encryption handler
-        encryptor = crypto.Encrypt (password="key",
+        encryptor = crypto.Encrypt (password=pw,
+                                    version=DELTATAR_HEADER_VERSION,
+                                    paramversion=DELTATAR_PARAMETER_VERSION)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.pdtcrypt",
+                              mode="w#gz",
+                              format=GNU_FORMAT,
+                              encryption=encryptor)
+        tarobj.add("big")
+        tarobj.close()
+        os.unlink("big")
+
+        # decrypt outer archive layer with crypto.py
+        assert os.path.exists("sample.tar.gz.pdtcrypt")
+        ret = os.system("python3 ./deltatar/crypto.py process -p '%s' "
+                        "<sample.tar.gz.pdtcrypt >sample.tar.gz" % pw)
+        assert ret == 0
+        assert os.path.exists("sample.tar.gz")
+
+        # extract with normal tar and check output
+        os.system("zcat sample.tar.gz 2>/dev/null > sample.tar")
+        os.system("tar xf sample.tar")
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")
+
+
+    def test_cli_decrypt_key_argv (self):
+        """
+        Encrypt, then decrypt using the crypto.py → gzip → tar pipe,
+        supplying the encryption key on the command line.
+        """
+        key  = os.urandom (16)
+        nacl = hashlib.md5 ("Stráðu á mig salti".encode ()).digest ()
+
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 50000)
+
+        # create the encryption handler
+        encryptor = crypto.Encrypt (key=key, nacl=nacl,
                                     version=DELTATAR_HEADER_VERSION,
                                     paramversion=DELTATAR_PARAMETER_VERSION)
 
@@ -67,11 +109,51 @@ class EncryptionTest(BaseTest):
         tarobj.close()
         os.unlink("big")
 
-        #filesplit.split_file(b'Salted__', "sample.tar.gz.aes.", "sample.tar.gz.aes128")
+        # decrypt outer archive layer with crypto.py
+        assert os.path.exists("sample.tar.gz.pdtcrypt")
+        ret = os.system("python3 ./deltatar/crypto.py process -k '%s' "
+                        "<sample.tar.gz.pdtcrypt >sample.tar.gz"
+                        % binascii.hexlify (key).decode ())
+        assert ret == 0
+        assert os.path.exists("sample.tar.gz")
+
+        # extract with normal tar and check output
+        os.system("zcat sample.tar.gz 2>/dev/null > sample.tar")
+        os.system("tar xf sample.tar")
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")
+
+
+    def test_cli_decrypt_key_envp (self):
+        """
+        Encrypt, then decrypt using the crypto.py → gzip → tar pipe,
+        supplying the encryption key through the process environment.
+        """
+        key  = os.urandom (16)
+        nacl = hashlib.md5 ("Stráðu á mig salti".encode ()).digest ()
+
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 50000)
+
+        # create the encryption handler
+        encryptor = crypto.Encrypt (key=key, nacl=nacl,
+                                    version=DELTATAR_HEADER_VERSION,
+                                    paramversion=DELTATAR_PARAMETER_VERSION)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.pdtcrypt",
+                              mode="w#gz",
+                              format=GNU_FORMAT,
+                              encryption=encryptor)
+        tarobj.add("big")
+        tarobj.close()
+        os.unlink("big")
 
         # decrypt outer archive layer with crypto.py
         assert os.path.exists("sample.tar.gz.pdtcrypt")
-        ret = os.system("python3 -s ./deltatar/crypto.py process key <sample.tar.gz.pdtcrypt >sample.tar.gz")
+        ret = os.system("PDTCRYPT_KEY='%s' python3 ./deltatar/crypto.py process "
+                        "<sample.tar.gz.pdtcrypt >sample.tar.gz"
+                        % binascii.hexlify (key).decode ())
         assert ret == 0
         assert os.path.exists("sample.tar.gz")
 
@@ -81,12 +163,15 @@ class EncryptionTest(BaseTest):
         assert os.path.exists("big")
         assert hash == self.md5sum("big")
 
-    def test_cli_multiple_files_decrypt(self):
+
+    def test_cli_multiple_files_decrypt_pw_argv (self):
         """
         Create a tar file with multiple files inside, using concat
         compression and encryption mode. Then decrypt and split with
         ``crypto.py``, decompress it with zcat and untar it with gnu tar.
+        The password is specified as command line argument.
         """
+        pw = "Is that my cow?"
 
         # create sample data
         hash = dict()
@@ -95,7 +180,7 @@ class EncryptionTest(BaseTest):
         hash["small2"] = self.create_file("small2", 354)
 
         # create the encryption handler
-        encryptor = crypto.Encrypt (password="key",
+        encryptor = crypto.Encrypt (password=pw,
                                     version=DELTATAR_HEADER_VERSION,
                                     paramversion=DELTATAR_PARAMETER_VERSION)
 
@@ -113,8 +198,8 @@ class EncryptionTest(BaseTest):
             os.unlink(k)
 
         assert os.path.exists("sample.tar.gz.pdtcrypt")
-        ret = os.system("python3 -s ./deltatar/crypto.py process key --split "
-                        "-i sample.tar.gz.pdtcrypt -o .")
+        ret = os.system("python3 ./deltatar/crypto.py process -p '%s' -s --split "
+                        "-i sample.tar.gz.pdtcrypt -o ." % pw)
         assert ret == 0
         for i in range (len (hash)):
             fname = "pdtcrypt-object-%d.bin" % (i + 1)
@@ -122,9 +207,58 @@ class EncryptionTest(BaseTest):
             os.system("zcat '%s' 2>/dev/null > sample.tar" % fname)
             os.system("tar xf sample.tar")
 
-        for key, value in hash.items():
-            assert os.path.exists(key)
-            assert value == self.md5sum(key)
+        for fname, digest in hash.items():
+            assert os.path.exists(fname)
+            assert digest == self.md5sum(fname)
+
+
+    def test_cli_multiple_files_decrypt_envp (self):
+        """
+        Create a tar file with multiple files inside, using concat
+        compression and encryption mode. Then decrypt and split with
+        ``crypto.py``, decompress it with zcat and untar it with gnu tar.
+        The password is given as environment variable.
+        """
+        pw = "Is that my cow?"
+
+        # create sample data
+        hash = dict()
+        hash["big"] = self.create_file("big", 50000)
+        hash["small"] = self.create_file("small", 100)
+        hash["small2"] = self.create_file("small2", 354)
+
+        # create the encryption handler
+        encryptor = crypto.Encrypt (password=pw,
+                                    version=DELTATAR_HEADER_VERSION,
+                                    paramversion=DELTATAR_PARAMETER_VERSION)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar.gz.pdtcrypt",
+                              mode="w#gz",
+                              format=GNU_FORMAT,
+                              encryption=encryptor)
+
+        for k in hash:
+            tarobj.add(k)
+        tarobj.close()
+
+        for k in hash:
+            os.unlink(k)
+
+        assert os.path.exists("sample.tar.gz.pdtcrypt")
+        ret = os.system("PDTCRYPT_PASSWORD='%s' python3 ./deltatar/crypto.py "
+                        "process -s --split -i sample.tar.gz.pdtcrypt -o ."
+                        % pw)
+        assert ret == 0
+        for i in range (len (hash)):
+            fname = "pdtcrypt-object-%d.bin" % (i + 1)
+            assert os.path.exists(fname)
+            os.system("zcat '%s' 2>/dev/null > sample.tar" % fname)
+            os.system("tar xf sample.tar")
+
+        for fname, digest in hash.items():
+            assert os.path.exists(fname)
+            assert digest == self.md5sum(fname)
 
 
     def test_decrypt(self):