expand crypto api to accept precomputed key
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Mon, 8 May 2017 14:27:13 +0000 (16:27 +0200)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Mon, 2 Apr 2018 11:34:08 +0000 (13:34 +0200)
deltatar/crypto.py
deltatar/deltatar.py
testing/test_crypto.py
testing/test_multivol_compression_sizes.py

index fed8e56..b8ad82d 100755 (executable)
@@ -494,12 +494,17 @@ class Crypto (object):
         self.next_pfx ()
 
 
-    def set_parameters (self, password, paramversion=None, nacl=None,
-                        counter=None, nextpfx=None):
+    def set_parameters (self, password=None, key=None, paramversion=None,
+                        nacl=None, counter=None, nextpfx=None):
         if nextpfx is not None:
             self.next_pfx = nextpfx
             self.next_pfx ()
         self.set_object_counter (counter)
+
+        if key is not None:
+            self.key, self.nacl = key, nacl
+            return
+
         if isinstance (password, bytes) is False: password = str.encode (password)
         self.password = password
         if paramversion is None and nacl is None:
@@ -527,7 +532,8 @@ class Crypto (object):
         if (   self.paramversion != paramversion
             or self.password     != password
             or self.nacl         != nacl):
-            self.set_parameters (password, paramversion, nacl)
+            self.set_parameters (password=password, paramversion=paramversion,
+                                 nacl=nacl)
 
 
     def counters (self):
@@ -540,15 +546,26 @@ class Encrypt (Crypto):
     version      = None
     paramenc     = None
 
-    def __init__ (self, password, version, paramversion, nacl=None,
+    def __init__ (self, version, paramversion, password=None, key=None, nacl=None,
                   counter=AES_GCM_IV_CNT_DATA):
-        # passwort
-        if isinstance (password, str) is False:
-            raise InvalidParameter ("__init__: password must be a string, not %s"
-                                    % type (password))
-        if len (password) == 0:
-            raise InvalidParameter ("__init__: refusing to encrypt with empty "
-                                    "password")
+        if         password is     None and key is     None \
+                or password is not None and key is not None :
+            raise InvalidParameter ("__init__: need either key or password")
+
+        if key is not None:
+            if isinstance (key, bytes) is False:
+                raise InvalidParameter ("__init__: key must be provided as "
+                                        "bytes, not %s" % type (key))
+            if nacl is None:
+                raise InvalidParameter ("__init__: salt must be provided along "
+                                        "with encryption key")
+        else: # password, no key
+            if isinstance (password, str) is False:
+                raise InvalidParameter ("__init__: password must be a string, not %s"
+                                        % type (password))
+            if len (password) == 0:
+                raise InvalidParameter ("__init__: supplied empty password but not "
+                                        "permitted for PDT encrypted files")
         # version
         if isinstance (version, int) is False:
             raise InvalidParameter ("__init__: version number must be an "
@@ -576,7 +593,7 @@ class Encrypt (Crypto):
         self.version      = version
         self.paramenc     = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
 
-        super().__init__ (password, paramversion, nacl, counter=counter,
+        super().__init__ (password, key, paramversion, nacl, counter=counter,
                           nextpfx=lambda: self.fixed.append (os.urandom(8)))
 
 
@@ -650,15 +667,23 @@ class Decrypt (Crypto):
     strict_ivs = False  # if True, panic on duplicate object IV
     last_iv    = None   # check consecutive ivs in strict mode
 
-    def __init__ (self, password, counter=None, fixedparts=None,
+    def __init__ (self, password=None, key=None, counter=None, fixedparts=None,
                   strict_ivs=False):
-        # passwort
-        if isinstance (password, str) is False:
-            raise InvalidParameter ("__init__: password must be a string, not %s"
-                                    % type (password))
-        if len (password) == 0:
-            raise InvalidParameter ("__init__: supplied empty password but not "
-                                    "permitted for PDT encrypted files")
+        if         password is     None and key is     None \
+                or password is not None and key is not None :
+            raise InvalidParameter ("__init__: need either key or password")
+
+        if key is not None:
+            if isinstance (key, bytes) is False:
+                raise InvalidParameter ("__init__: key must be provided as "
+                                        "bytes, not %s" % type (key))
+        else: # password, no key
+            if isinstance (password, str) is False:
+                raise InvalidParameter ("__init__: password must be a string, not %s"
+                                        % type (password))
+            if len (password) == 0:
+                raise InvalidParameter ("__init__: supplied empty password but not "
+                                        "permitted for PDT encrypted files")
         # fixed parts
         if fixedparts is not None:
             if isinstance (fixedparts, list) is False:
@@ -667,12 +692,11 @@ class Decrypt (Crypto):
                                         % type (fixedparts))
             self.fixed = fixedparts
             self.fixed.sort ()
-            super().__init__ (password, counter=counter)
 
         self.used_ivs   = set ()
         self.strict_ivs = strict_ivs
 
-        super().__init__ (password, counter=counter)
+        super().__init__ (password, key, counter=counter)
 
 
     def valid_fixed_part (self, iv):
@@ -845,7 +869,7 @@ def depdtcrypt (mode, pw, ins, outs):
     outfile    = None            # Python file object for output
 
     if mode & PDTCRYPT_DECRYPT:  # decryptor
-        decr = Decrypt (pw, strict_ivs=PDTCRYPT_STRICTIVS)
+        decr = Decrypt (password=pw, strict_ivs=PDTCRYPT_STRICTIVS)
     else:
         decr = PassthroughDecryptor ()
 
index d0d6b4e..0e42260 100644 (file)
@@ -93,6 +93,8 @@ class DeltaTar(object):
 
     # used together with aes modes to encrypt and decrypt backups.
     password = None
+    crypto_key = None
+    nacl = None
 
     # parameter version to use when encrypting; note that this has no effect
     # on decryption since the required settings are determined from the headers
@@ -151,6 +153,7 @@ class DeltaTar(object):
 
     def __init__(self, excluded_files=[], included_files=[],
                  filter_func=None, mode="", password=None,
+                 crypto_key=None, nacl=None,
                  crypto_paramversion=DELTATAR_PARAMETER_VERSION,
                  logger=None, index_mode=None, index_name_func=None,
                  volume_name_func=None):
@@ -185,6 +188,14 @@ class DeltaTar(object):
            '|bz2'      open a bzip2 compressed stream of tar blocks
            '#gz'       open a stream of gzip compressed tar blocks
 
+        - crypto_key: used to encrypt and decrypt backups. Encryption will
+          be enabled automatically if a key is supplied. Requires a salt to be
+          passed as well.
+
+        - nacl: salt that was used to derive the encryption key for embedding
+          in the PDTCRYPT header. Not needed when decrypting and when
+          encrypting with password.
+
         - password: used to encrypt and decrypt backups. Encryption will be
           enabled automatically if a password is supplied.
 
@@ -227,6 +238,10 @@ class DeltaTar(object):
             self.logger.addHandler(logger)
         self.mode = mode
 
+        if crypto_key is not None:
+            self.crypto_key = crypto_key
+            self.nacl = nacl # encryption only
+
         if password is not None:
             self.password = password
 
@@ -519,15 +534,19 @@ class DeltaTar(object):
 
     def initialize_encryption (self, mode):
         password = self.password
+        key      = self.crypto_key
+        nacl     = self.nacl
 
-        if password is None:
+        if key is None and password is None:
             return
         if mode == CRYPTO_MODE_ENCRYPT:
-            return crypto.Encrypt (password,
+            return crypto.Encrypt (password=password,
+                                   key=key,
+                                   nacl=nacl,
                                    version=DELTATAR_HEADER_VERSION,
                                    paramversion=self.crypto_paramversion)
         if mode == CRYPTO_MODE_DECRYPT:
-            return crypto.Decrypt (password)
+            return crypto.Decrypt (password=password, key=key)
 
         raise Exception ("invalid encryption mode [%r]" % mode)
 
@@ -1060,7 +1079,9 @@ class DeltaTar(object):
                 if self.tar_obj is None:
                     decryptor = None
                     if self.delta_tar.password is not None:
-                        decryptor = crypto.Decrypt (self.delta_tar.password)
+                        decryptor = crypto.Decrypt \
+                                        (password=self.delta_tar.password,
+                                         key=self.delta_tar.crypto_key)
                     self.tar_obj = tarfile.TarFile.open(self.tar_path,
                         mode='r' + self.delta_tar.mode,
                         format=tarfile.GNU_FORMAT,
@@ -1467,6 +1488,7 @@ class RestoreHelper(object):
         self._deltatar = deltatar
         self._cwd = cwd
         self._password = deltatar.password
+        self._crypto_key = deltatar.crypto_key
         self._decryptors = []
 
         try:
@@ -1485,7 +1507,8 @@ class RestoreHelper(object):
 
                 decryptor = None
                 if self._password is not None:
-                    decryptor = crypto.Decrypt (self._password)
+                    decryptor = crypto.Decrypt (password=self._password,
+                                                key=self._crypto_key)
 
                 # make paths absolute to avoid cwd problems
                 if not os.path.isabs(index):
index 98100a8..50e3f2e 100644 (file)
@@ -58,14 +58,64 @@ class AESGCMTest (CryptoLayerTest):
 
     def test_crypto_aes_gcm_enc_ctor (self):
         password   = str (os.urandom (42))
-        encryptor  = crypto.Encrypt (password, TEST_VERSION,
-                                     TEST_PARAMVERSION, nacl=TEST_STATIC_NACL)
+        encryptor  = crypto.Encrypt (TEST_VERSION,
+                                     TEST_PARAMVERSION,
+                                     password=password,
+                                     nacl=TEST_STATIC_NACL)
+
+
+    def test_crypto_aes_gcm_enc_ctor_key (self):
+        key        = os.urandom (42)
+        encryptor  = crypto.Encrypt (TEST_VERSION,
+                                     TEST_PARAMVERSION,
+                                     key=key,
+                                     nacl=TEST_STATIC_NACL)
+
+
+    def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
+        """
+        Either key (+nacl) or password must be supplied, not both.
+        """
+        try:
+            encryptor = crypto.Encrypt (TEST_VERSION,
+                                        TEST_PARAMVERSION,
+                                        nacl=TEST_STATIC_NACL)
+        except crypto.InvalidParameter: # neither key nor pw
+            pass
+
+        password = str (os.urandom (42))
+        key      =      os.urandom (16)  # scrypt sized
+        try:
+            encryptor = crypto.Encrypt (TEST_VERSION,
+                                        TEST_PARAMVERSION,
+                                        password=password,
+                                        key=key,
+                                        nacl=TEST_STATIC_NACL)
+        except crypto.InvalidParameter: # both key and pw
+            pass
+
+        try:
+            encryptor = crypto.Encrypt (TEST_VERSION,
+                                        TEST_PARAMVERSION,
+                                        key=key,
+                                        nacl=None)
+        except crypto.InvalidParameter: # key, but salt missing
+            pass
+
+        try:
+            encryptor = crypto.Encrypt (TEST_VERSION,
+                                        TEST_PARAMVERSION,
+                                        password=b"",
+                                        nacl=TEST_STATIC_NACL)
+        except crypto.InvalidParameter: # empty pw
+            pass
 
 
     def test_crypto_aes_gcm_enc_header_size (self):
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
 
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
@@ -77,8 +127,9 @@ class AESGCMTest (CryptoLayerTest):
 
     def test_crypto_aes_gcm_enc_chunk_size (self):
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
 
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
@@ -88,10 +139,36 @@ class AESGCMTest (CryptoLayerTest):
         assert len (rest) == 0
 
 
+    def test_crypto_aes_gcm_dec_ctor (self):
+        """
+        Ensure that only either key or password is accepted.
+        """
+        password = str (os.urandom (42))
+        key      =      os.urandom (16)  # scrypt sized
+
+        decryptor = crypto.Decrypt (password=password)
+        decryptor = crypto.Decrypt (key=key)
+
+        try:
+            decryptor = crypto.Decrypt (password=password, key=key)
+        except crypto.InvalidParameter: # both password and key
+            pass
+
+        try:
+            decryptor = crypto.Decrypt (password=None, key=None)
+        except crypto.InvalidParameter: # neither password nor key
+            pass
+
+        try:
+            decryptor = crypto.Decrypt (password="")
+        except crypto.InvalidParameter: # empty password
+            pass
+
     def test_crypto_aes_gcm_dec_simple (self):
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
 
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
@@ -99,7 +176,7 @@ class AESGCMTest (CryptoLayerTest):
         rest, header, fixed = encryptor.done (header_dummy)
         ciphertext    += rest
 
-        decryptor      = crypto.Decrypt (password, fixedparts=fixed)
+        decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
         decryptor.next (header)
         plaintext      = decryptor.process (ciphertext)
         rest           = decryptor.done ()
@@ -110,8 +187,9 @@ class AESGCMTest (CryptoLayerTest):
 
     def test_crypto_aes_gcm_dec_bad_tag (self):
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
 
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
@@ -125,7 +203,7 @@ class AESGCMTest (CryptoLayerTest):
         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
         header         = bytes (mut_header)
 
-        decryptor      = crypto.Decrypt (password, fixedparts=fixed)
+        decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
         decryptor.next (header)
         plaintext      = decryptor.process (ciphertext)
         try:
@@ -138,8 +216,9 @@ class AESGCMTest (CryptoLayerTest):
         cnksiz = 1 << 10
         pt    = fill_mod (1 << 14)
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
 
@@ -160,8 +239,9 @@ class AESGCMTest (CryptoLayerTest):
         cnksiz         = 1 << 10
         orig_pt        = fill_mod (1 << 14)
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
 
@@ -175,7 +255,8 @@ class AESGCMTest (CryptoLayerTest):
         cnk, header, fixed = encryptor.done (header_dummy)
         ct += cnk
 
-        decryptor      = crypto.Decrypt (password, fixedparts=fixed)
+        decryptor      = crypto.Decrypt (password=password,
+                                         fixedparts=fixed)
         decryptor.next (header)
         off = 0
         pt  = b""
@@ -194,8 +275,9 @@ class AESGCMTest (CryptoLayerTest):
         cnksiz         = 1 << 10
         orig_pt        = fill_mod (1 << 14)
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
 
@@ -216,7 +298,8 @@ class AESGCMTest (CryptoLayerTest):
         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
         header         = bytes (mut_header)
 
-        decryptor      = crypto.Decrypt (password, fixedparts=fixed)
+        decryptor      = crypto.Decrypt (password=password,
+                                         fixedparts=fixed)
         decryptor.next (header)
         off = 0
         pt  = b""
@@ -242,8 +325,9 @@ class AESGCMTest (CryptoLayerTest):
         orig_pt_1      = fill_mod (1 << 10)
         orig_pt_2      = fill_mod (1 << 10, 42)
         password       = str (os.urandom (42))
-        encryptor      = crypto.Encrypt (password, TEST_VERSION,
+        encryptor      = crypto.Encrypt (TEST_VERSION,
                                          TEST_PARAMVERSION,
+                                         password=password,
                                          nacl=TEST_STATIC_NACL)
 
         def enc (pt):
@@ -271,7 +355,7 @@ class AESGCMTest (CryptoLayerTest):
         # transplant into other header
         mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
         hdr_2_mod    = bytes (mut_hdr_2)
-        decryptor    = crypto.Decrypt (password, fixedparts=fixed,
+        decryptor    = crypto.Decrypt (password=password, fixedparts=fixed,
                                        strict_ivs=True)
 
         def dec (hdr, ct):
index b2fbc93..a6e8f01 100644 (file)
@@ -195,7 +195,7 @@ def test(volume_size, input_size_factor, mode, password, temp_dir, prefix='',
 
         encryptor = None
         if password is not None:
-            encryptor = crypto.Encrypt (password, 1, 1)
+            encryptor = crypto.Encrypt (1, 1, password=password)
 
         tarobj = TarFile.open(temp_name, mode=mode,
                               max_volume_size=volume_size*MiB,
@@ -257,7 +257,7 @@ def test(volume_size, input_size_factor, mode, password, temp_dir, prefix='',
             print(prefix + 'extracting:')
         decryptor = None
         if password is not None:
-            decryptor = crypto.Decrypt (password)
+            decryptor = crypto.Decrypt (password=password)
         tarobj = TarFile.open(temp_name, mode=mode.replace('w', 'r'),
                               new_volume_handler=new_volume_handler,
                               encryption=decryptor, debug=debug_level)