From: Philipp Gesang Date: Mon, 8 May 2017 14:27:13 +0000 (+0200) Subject: expand crypto api to accept precomputed key X-Git-Tag: v2.2~7^2~120 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=1f3fd7b02fd7fdd34b531bf53a2ae524312c4af2;p=python-delta-tar expand crypto api to accept precomputed key --- diff --git a/deltatar/crypto.py b/deltatar/crypto.py index fed8e56..b8ad82d 100755 --- a/deltatar/crypto.py +++ b/deltatar/crypto.py @@ -494,12 +494,17 @@ class Crypto (object): self.next_pfx () - def set_parameters (self, password, paramversion=None, nacl=None, - counter=None, nextpfx=None): + def set_parameters (self, password=None, key=None, paramversion=None, + nacl=None, counter=None, nextpfx=None): if nextpfx is not None: self.next_pfx = nextpfx self.next_pfx () self.set_object_counter (counter) + + if key is not None: + self.key, self.nacl = key, nacl + return + if isinstance (password, bytes) is False: password = str.encode (password) self.password = password if paramversion is None and nacl is None: @@ -527,7 +532,8 @@ class Crypto (object): if ( self.paramversion != paramversion or self.password != password or self.nacl != nacl): - self.set_parameters (password, paramversion, nacl) + self.set_parameters (password=password, paramversion=paramversion, + nacl=nacl) def counters (self): @@ -540,15 +546,26 @@ class Encrypt (Crypto): version = None paramenc = None - def __init__ (self, password, version, paramversion, nacl=None, + def __init__ (self, version, paramversion, password=None, key=None, nacl=None, counter=AES_GCM_IV_CNT_DATA): - # passwort - if isinstance (password, str) is False: - raise InvalidParameter ("__init__: password must be a string, not %s" - % type (password)) - if len (password) == 0: - raise InvalidParameter ("__init__: refusing to encrypt with empty " - "password") + if password is None and key is None \ + or password is not None and key is not None : + raise InvalidParameter ("__init__: need either key or password") + + if key is not None: + if isinstance (key, bytes) is False: + raise InvalidParameter ("__init__: key must be provided as " + "bytes, not %s" % type (key)) + if nacl is None: + raise InvalidParameter ("__init__: salt must be provided along " + "with encryption key") + else: # password, no key + if isinstance (password, str) is False: + raise InvalidParameter ("__init__: password must be a string, not %s" + % type (password)) + if len (password) == 0: + raise InvalidParameter ("__init__: supplied empty password but not " + "permitted for PDT encrypted files") # version if isinstance (version, int) is False: raise InvalidParameter ("__init__: version number must be an " @@ -576,7 +593,7 @@ class Encrypt (Crypto): self.version = version self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"] - super().__init__ (password, paramversion, nacl, counter=counter, + super().__init__ (password, key, paramversion, nacl, counter=counter, nextpfx=lambda: self.fixed.append (os.urandom(8))) @@ -650,15 +667,23 @@ class Decrypt (Crypto): strict_ivs = False # if True, panic on duplicate object IV last_iv = None # check consecutive ivs in strict mode - def __init__ (self, password, counter=None, fixedparts=None, + def __init__ (self, password=None, key=None, counter=None, fixedparts=None, strict_ivs=False): - # passwort - if isinstance (password, str) is False: - raise InvalidParameter ("__init__: password must be a string, not %s" - % type (password)) - if len (password) == 0: - raise InvalidParameter ("__init__: supplied empty password but not " - "permitted for PDT encrypted files") + if password is None and key is None \ + or password is not None and key is not None : + raise InvalidParameter ("__init__: need either key or password") + + if key is not None: + if isinstance (key, bytes) is False: + raise InvalidParameter ("__init__: key must be provided as " + "bytes, not %s" % type (key)) + else: # password, no key + if isinstance (password, str) is False: + raise InvalidParameter ("__init__: password must be a string, not %s" + % type (password)) + if len (password) == 0: + raise InvalidParameter ("__init__: supplied empty password but not " + "permitted for PDT encrypted files") # fixed parts if fixedparts is not None: if isinstance (fixedparts, list) is False: @@ -667,12 +692,11 @@ class Decrypt (Crypto): % type (fixedparts)) self.fixed = fixedparts self.fixed.sort () - super().__init__ (password, counter=counter) self.used_ivs = set () self.strict_ivs = strict_ivs - super().__init__ (password, counter=counter) + super().__init__ (password, key, counter=counter) def valid_fixed_part (self, iv): @@ -845,7 +869,7 @@ def depdtcrypt (mode, pw, ins, outs): outfile = None # Python file object for output if mode & PDTCRYPT_DECRYPT: # decryptor - decr = Decrypt (pw, strict_ivs=PDTCRYPT_STRICTIVS) + decr = Decrypt (password=pw, strict_ivs=PDTCRYPT_STRICTIVS) else: decr = PassthroughDecryptor () diff --git a/deltatar/deltatar.py b/deltatar/deltatar.py index d0d6b4e..0e42260 100644 --- a/deltatar/deltatar.py +++ b/deltatar/deltatar.py @@ -93,6 +93,8 @@ class DeltaTar(object): # used together with aes modes to encrypt and decrypt backups. password = None + crypto_key = None + nacl = None # parameter version to use when encrypting; note that this has no effect # on decryption since the required settings are determined from the headers @@ -151,6 +153,7 @@ class DeltaTar(object): def __init__(self, excluded_files=[], included_files=[], filter_func=None, mode="", password=None, + crypto_key=None, nacl=None, crypto_paramversion=DELTATAR_PARAMETER_VERSION, logger=None, index_mode=None, index_name_func=None, volume_name_func=None): @@ -185,6 +188,14 @@ class DeltaTar(object): '|bz2' open a bzip2 compressed stream of tar blocks '#gz' open a stream of gzip compressed tar blocks + - crypto_key: used to encrypt and decrypt backups. Encryption will + be enabled automatically if a key is supplied. Requires a salt to be + passed as well. + + - nacl: salt that was used to derive the encryption key for embedding + in the PDTCRYPT header. Not needed when decrypting and when + encrypting with password. + - password: used to encrypt and decrypt backups. Encryption will be enabled automatically if a password is supplied. @@ -227,6 +238,10 @@ class DeltaTar(object): self.logger.addHandler(logger) self.mode = mode + if crypto_key is not None: + self.crypto_key = crypto_key + self.nacl = nacl # encryption only + if password is not None: self.password = password @@ -519,15 +534,19 @@ class DeltaTar(object): def initialize_encryption (self, mode): password = self.password + key = self.crypto_key + nacl = self.nacl - if password is None: + if key is None and password is None: return if mode == CRYPTO_MODE_ENCRYPT: - return crypto.Encrypt (password, + return crypto.Encrypt (password=password, + key=key, + nacl=nacl, version=DELTATAR_HEADER_VERSION, paramversion=self.crypto_paramversion) if mode == CRYPTO_MODE_DECRYPT: - return crypto.Decrypt (password) + return crypto.Decrypt (password=password, key=key) raise Exception ("invalid encryption mode [%r]" % mode) @@ -1060,7 +1079,9 @@ class DeltaTar(object): if self.tar_obj is None: decryptor = None if self.delta_tar.password is not None: - decryptor = crypto.Decrypt (self.delta_tar.password) + decryptor = crypto.Decrypt \ + (password=self.delta_tar.password, + key=self.delta_tar.crypto_key) self.tar_obj = tarfile.TarFile.open(self.tar_path, mode='r' + self.delta_tar.mode, format=tarfile.GNU_FORMAT, @@ -1467,6 +1488,7 @@ class RestoreHelper(object): self._deltatar = deltatar self._cwd = cwd self._password = deltatar.password + self._crypto_key = deltatar.crypto_key self._decryptors = [] try: @@ -1485,7 +1507,8 @@ class RestoreHelper(object): decryptor = None if self._password is not None: - decryptor = crypto.Decrypt (self._password) + decryptor = crypto.Decrypt (password=self._password, + key=self._crypto_key) # make paths absolute to avoid cwd problems if not os.path.isabs(index): diff --git a/testing/test_crypto.py b/testing/test_crypto.py index 98100a8..50e3f2e 100644 --- a/testing/test_crypto.py +++ b/testing/test_crypto.py @@ -58,14 +58,64 @@ class AESGCMTest (CryptoLayerTest): def test_crypto_aes_gcm_enc_ctor (self): password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, - TEST_PARAMVERSION, nacl=TEST_STATIC_NACL) + encryptor = crypto.Encrypt (TEST_VERSION, + TEST_PARAMVERSION, + password=password, + nacl=TEST_STATIC_NACL) + + + def test_crypto_aes_gcm_enc_ctor_key (self): + key = os.urandom (42) + encryptor = crypto.Encrypt (TEST_VERSION, + TEST_PARAMVERSION, + key=key, + nacl=TEST_STATIC_NACL) + + + def test_crypto_aes_gcm_enc_ctor_no_key_pw (self): + """ + Either key (+nacl) or password must be supplied, not both. + """ + try: + encryptor = crypto.Encrypt (TEST_VERSION, + TEST_PARAMVERSION, + nacl=TEST_STATIC_NACL) + except crypto.InvalidParameter: # neither key nor pw + pass + + password = str (os.urandom (42)) + key = os.urandom (16) # scrypt sized + try: + encryptor = crypto.Encrypt (TEST_VERSION, + TEST_PARAMVERSION, + password=password, + key=key, + nacl=TEST_STATIC_NACL) + except crypto.InvalidParameter: # both key and pw + pass + + try: + encryptor = crypto.Encrypt (TEST_VERSION, + TEST_PARAMVERSION, + key=key, + nacl=None) + except crypto.InvalidParameter: # key, but salt missing + pass + + try: + encryptor = crypto.Encrypt (TEST_VERSION, + TEST_PARAMVERSION, + password=b"", + nacl=TEST_STATIC_NACL) + except crypto.InvalidParameter: # empty pw + pass def test_crypto_aes_gcm_enc_header_size (self): password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -77,8 +127,9 @@ class AESGCMTest (CryptoLayerTest): def test_crypto_aes_gcm_enc_chunk_size (self): password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -88,10 +139,36 @@ class AESGCMTest (CryptoLayerTest): assert len (rest) == 0 + def test_crypto_aes_gcm_dec_ctor (self): + """ + Ensure that only either key or password is accepted. + """ + password = str (os.urandom (42)) + key = os.urandom (16) # scrypt sized + + decryptor = crypto.Decrypt (password=password) + decryptor = crypto.Decrypt (key=key) + + try: + decryptor = crypto.Decrypt (password=password, key=key) + except crypto.InvalidParameter: # both password and key + pass + + try: + decryptor = crypto.Decrypt (password=None, key=None) + except crypto.InvalidParameter: # neither password nor key + pass + + try: + decryptor = crypto.Decrypt (password="") + except crypto.InvalidParameter: # empty password + pass + def test_crypto_aes_gcm_dec_simple (self): password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -99,7 +176,7 @@ class AESGCMTest (CryptoLayerTest): rest, header, fixed = encryptor.done (header_dummy) ciphertext += rest - decryptor = crypto.Decrypt (password, fixedparts=fixed) + decryptor = crypto.Decrypt (password=password, fixedparts=fixed) decryptor.next (header) plaintext = decryptor.process (ciphertext) rest = decryptor.done () @@ -110,8 +187,9 @@ class AESGCMTest (CryptoLayerTest): def test_crypto_aes_gcm_dec_bad_tag (self): password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -125,7 +203,7 @@ class AESGCMTest (CryptoLayerTest): mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256 header = bytes (mut_header) - decryptor = crypto.Decrypt (password, fixedparts=fixed) + decryptor = crypto.Decrypt (password=password, fixedparts=fixed) decryptor.next (header) plaintext = decryptor.process (ciphertext) try: @@ -138,8 +216,9 @@ class AESGCMTest (CryptoLayerTest): cnksiz = 1 << 10 pt = fill_mod (1 << 14) password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -160,8 +239,9 @@ class AESGCMTest (CryptoLayerTest): cnksiz = 1 << 10 orig_pt = fill_mod (1 << 14) password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -175,7 +255,8 @@ class AESGCMTest (CryptoLayerTest): cnk, header, fixed = encryptor.done (header_dummy) ct += cnk - decryptor = crypto.Decrypt (password, fixedparts=fixed) + decryptor = crypto.Decrypt (password=password, + fixedparts=fixed) decryptor.next (header) off = 0 pt = b"" @@ -194,8 +275,9 @@ class AESGCMTest (CryptoLayerTest): cnksiz = 1 << 10 orig_pt = fill_mod (1 << 14) password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) header_dummy = encryptor.next (TEST_DUMMY_FILENAME) @@ -216,7 +298,8 @@ class AESGCMTest (CryptoLayerTest): mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256 header = bytes (mut_header) - decryptor = crypto.Decrypt (password, fixedparts=fixed) + decryptor = crypto.Decrypt (password=password, + fixedparts=fixed) decryptor.next (header) off = 0 pt = b"" @@ -242,8 +325,9 @@ class AESGCMTest (CryptoLayerTest): orig_pt_1 = fill_mod (1 << 10) orig_pt_2 = fill_mod (1 << 10, 42) password = str (os.urandom (42)) - encryptor = crypto.Encrypt (password, TEST_VERSION, + encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, + password=password, nacl=TEST_STATIC_NACL) def enc (pt): @@ -271,7 +355,7 @@ class AESGCMTest (CryptoLayerTest): # transplant into other header mut_hdr_2_vw [iv_lo : iv_hi] = iv_1 hdr_2_mod = bytes (mut_hdr_2) - decryptor = crypto.Decrypt (password, fixedparts=fixed, + decryptor = crypto.Decrypt (password=password, fixedparts=fixed, strict_ivs=True) def dec (hdr, ct): diff --git a/testing/test_multivol_compression_sizes.py b/testing/test_multivol_compression_sizes.py index b2fbc93..a6e8f01 100644 --- a/testing/test_multivol_compression_sizes.py +++ b/testing/test_multivol_compression_sizes.py @@ -195,7 +195,7 @@ def test(volume_size, input_size_factor, mode, password, temp_dir, prefix='', encryptor = None if password is not None: - encryptor = crypto.Encrypt (password, 1, 1) + encryptor = crypto.Encrypt (1, 1, password=password) tarobj = TarFile.open(temp_name, mode=mode, max_volume_size=volume_size*MiB, @@ -257,7 +257,7 @@ def test(volume_size, input_size_factor, mode, password, temp_dir, prefix='', print(prefix + 'extracting:') decryptor = None if password is not None: - decryptor = crypto.Decrypt (password) + decryptor = crypto.Decrypt (password=password) tarobj = TarFile.open(temp_name, mode=mode.replace('w', 'r'), new_volume_handler=new_volume_handler, encryption=decryptor, debug=debug_level)