From: Philipp Gesang Date: Fri, 17 Apr 2020 09:27:48 +0000 (+0200) Subject: enable strict IV checking by default during decryption X-Git-Tag: v2.2~1^2~6 X-Git-Url: http://developer.intra2net.com/git/?p=python-delta-tar;a=commitdiff_plain;h=b750b280c78f7b62ba30249c68a8542a3421534c enable strict IV checking by default during decryption Address Deltatar audit item 2.3: Ciphertexts can be re-ordered or dropped when decryption is non-linear Set *decryption* contexts to validate IVs unless the user opts out. This requires sprinkling options for non-strict behavior wherever a decryption context might be created, and disabling the strict checking for APIs intended for use with possibly corrupted inputs. During a normal *encryption* when only a single Encrypt handle is used, the IVs are guaranteed unique. Strict tracking of IVs is only necessary in testing and when working with multiple Encrypt handles so used IVs are to be compared across handlers afterwards. Thus we need not enable the ``strict_ivs'' option for Encrypt handles. --- diff --git a/deltatar/crypto.py b/deltatar/crypto.py index 388e9c7..178a469 100755 --- a/deltatar/crypto.py +++ b/deltatar/crypto.py @@ -919,7 +919,7 @@ class Crypto (object): iv = None # current IV fixed = None # accu for 64 bit fixed parts of IV used_ivs = None # tracks IVs - strict_ivs = False # if True, panic on duplicate object IV + strict_ivs = False # if True, panic on duplicate or non-consecutive object IV password = None paramversion = None insecure = False # allow plaintext parameters @@ -1051,19 +1051,16 @@ class Crypto (object): return out - def next (self, password, paramversion, nacl, iv): + def next (self, password, paramversion, nacl): """ Prepare for encrypting another object: Reset the data counters and change the configuration in case one of the variable parameters differs - from the last object. Also check the IV for duplicates and error out - if strict checking was requested. + from the last object. """ self.ctsize = 0 self.ptsize = 0 self.stats ["obj"] += 1 - self.check_duplicate_iv (iv) - if ( self.paramversion != paramversion or self.password != password or self.nacl != nacl): @@ -1072,18 +1069,6 @@ class Crypto (object): insecure=self.insecure) - def check_duplicate_iv (self, iv): - """ - Add an IV (the 12 byte representation as in the header) to the list. With - strict checking enabled, this will throw a ``DuplicateIV``. Depending on - the context, this may indicate a serious error (IV reuse). - """ - if self.strict_ivs is True and iv in self.used_ivs: - raise DuplicateIV ("iv %s was reused" % iv_fmt (iv)) - # vi has not been used before; add to collection - self.used_ivs.add (iv) - - def counters (self): """ Access the data counters. @@ -1108,6 +1093,13 @@ class Crypto (object): return self.used_ivs + def reset_last_iv (self): + """ + Implemented only for decryptor; no-op otherwise. + """ + pass + + class Encrypt (Crypto): lastinfo = None @@ -1115,7 +1107,7 @@ class Encrypt (Crypto): paramenc = None def __init__ (self, version, paramversion, password=None, key=None, nacl=None, - counter=AES_GCM_IV_CNT_DATA, strict_ivs=True, insecure=False): + counter=AES_GCM_IV_CNT_DATA, strict_ivs=False, insecure=False): """ The ctor will throw immediately if one of the parameters does not conform to our expectations. @@ -1132,7 +1124,9 @@ class Encrypt (Crypto): ``AES_GCM_IV_CNT_INDEX`` are unique in each backup set and cannot be reused even with different fixed parts. :type strict_ivs: bool - :type insecure: bool, whether to permit passthrough mode + :param strict_ivs: Enable paranoid tracking of IVs. + :type insecure: bool + :param insecure: whether to permit passthrough mode *Security considerations*: The ``class Encrypt`` handle guarantees that all random parts (first eight bytes) of the IVs used for encrypting @@ -1275,12 +1269,32 @@ class Encrypt (Crypto): % self.paramversion) hdrdum = hdr_make_dummy (filename) self.lastinfo = (filename, hdrdum) - super().next (self.password, self.paramversion, self.nacl, self.iv) + + self.check_duplicate_iv (self.iv) + + super().next (self.password, self.paramversion, self.nacl) self.set_object_counter (self.cnt + 1) return hdrdum + def check_duplicate_iv (self, iv): + """ + Add an IV (the 12 byte representation as in the header) to the list. With + strict checking enabled, this will throw a ``DuplicateIV``. Depending on + the context, this may indicate a serious error (IV reuse). + + IVs are only tracked in strict_ivs mode. + """ + if self.strict_ivs is False: + return + + if iv in self.used_ivs: + raise DuplicateIV ("iv %s was reused" % iv_fmt (iv)) + # vi has not been used before; add to collection + self.used_ivs.add (iv) + + def done (self, cmpdata): """ Complete encryption of an object. After this has been called, attempts @@ -1344,7 +1358,7 @@ class Decrypt (Crypto): hdr_ctsize = -1 def __init__ (self, password=None, key=None, counter=None, fixedparts=None, - strict_ivs=False, insecure=False): + strict_ivs=True, insecure=False): """ Sanitizing ctor for the decryption context. ``fixedparts`` specifies a list of IV fixed parts accepted during decryption. If a fixed part is @@ -1359,9 +1373,20 @@ class Decrypt (Crypto): ``AES_GCM_IV_CNT_INDEX`` are unique in each backup set and cannot be reused even with different fixed parts. :type fixedparts: bytes list + :type strict_ivs: bool + :param strict_ivs: fail if IVs of decrypted objects are not linearly + increasing :type insecure: bool :param insecure: whether to process objects encrypted in passthrough mode (*``paramversion`` < 1*) + + *Security considerations*: The ``strict_ivs`` setting protects against + ciphertext reordering and injection attacks. For this to work it relies + on a property of how the object counters are created during encryption. + If multiple ``Encrypt`` handles have been used during encryption, this + is property is unlikely to apply as it would require manual management + of counters across Encrypt handles. In these cases it may thus be + necessary to disable the ```strict_ivs`` protection. """ if password is None and key is None \ or password is not None and key is not None : @@ -1401,6 +1426,17 @@ class Decrypt (Crypto): return i != len (self.fixed) and self.fixed [i] == fixed + def reset_last_iv (self): + """ + Force a new IV sequence start. The last IV counter will be set from the + next IV encountered and the check for consecutive IVs will be suppressed. + + The intended use is backup volume boundaries or handling batches of + objects encrypted with ``Encrypt`` handles initialized with different + initial counter values. + """ + self.last_iv = None + def check_consecutive_iv (self, iv): """ Check whether the counter part of the given IV is indeed the successor @@ -1412,10 +1448,10 @@ class Decrypt (Crypto): if self.strict_ivs is True \ and self.last_iv is not None \ and self.last_iv [0] == fixed \ - and self.last_iv [1] != cnt - 1: + and self.last_iv [1] + 1 != cnt: raise NonConsecutiveIV ("iv %s counter not successor of " "last object (expected %d, found %d)" - % (iv_fmt (iv), self.last_iv [1], cnt)) + % (iv_fmt (iv), self.last_iv [1] + 1, cnt)) self.last_iv = (fixed, cnt) @@ -1447,10 +1483,11 @@ class Decrypt (Crypto): self.hdr_ctsize = ctsize - super().next (self.password, paramversion, nacl, iv) + super().next (self.password, paramversion, nacl) if self.fixed is not None and self.valid_fixed_part (iv) is False: raise InvalidIVFixedPart ("iv %s has invalid fixed part" % iv_fmt (iv)) + self.check_consecutive_iv (iv) self.tag = tag diff --git a/deltatar/deltatar.py b/deltatar/deltatar.py index a9f8a9c..9ebe6d7 100644 --- a/deltatar/deltatar.py +++ b/deltatar/deltatar.py @@ -555,7 +555,13 @@ class DeltaTar(object): return path - def initialize_encryption (self, mode): + def initialize_encryption (self, mode, strict_validation=True): + """ + :type strict_validation: bool + :param strict_validation: Enable strict IV checking in the crypto + layer. Should be disabled when dealing with + potentially corrupted data. + """ password = self.password key = self.crypto_key nacl = self.nacl @@ -569,12 +575,14 @@ class DeltaTar(object): version=self.crypto_version, paramversion=self.crypto_paramversion) if mode == CRYPTO_MODE_DECRYPT: - return crypto.Decrypt (password=password, key=key) + return crypto.Decrypt (password=password, key=key, + strict_ivs=strict_validation) raise Exception ("invalid encryption mode [%r]" % mode) - def open_auxiliary_file(self, path, mode='r', kind=AUXILIARY_FILE_INDEX): + def open_auxiliary_file(self, path, mode='r', kind=AUXILIARY_FILE_INDEX, + strict_validation=True): ''' Given the specified configuration, opens a file for reading or writing, inheriting the encryption and compression settings from the backup. @@ -600,7 +608,8 @@ class DeltaTar(object): if mode == "w": crypto_ctx = self.initialize_encryption (CRYPTO_MODE_ENCRYPT) elif mode == "r": - crypto_ctx = self.initialize_encryption (CRYPTO_MODE_DECRYPT) + crypto_ctx = self.initialize_encryption (CRYPTO_MODE_DECRYPT, + strict_validation=strict_validation) if crypto_ctx is not None: if kind == AUXILIARY_FILE_INFO: @@ -1004,7 +1013,7 @@ class DeltaTar(object): index_sink.close() - def iterate_index_path(self, index_path): + def iterate_index_path(self, index_path, strict_validation=True): ''' Returns an index iterator. Internally, it uses a classic iterator class. We do that instead of just yielding so that the iterator object can have @@ -1032,7 +1041,10 @@ class DeltaTar(object): Allows this iterator to be used with the "with" statement ''' if self.f is None: - self.f = self.delta_tar.open_auxiliary_file(self.index_path, 'r') + self.f = self.delta_tar.open_auxiliary_file \ + (self.index_path, + 'r', + strict_validation=strict_validation) # check index header j, l_no = self.delta_tar._parse_json_line(self.f, 0) if j.get("type", '') != 'python-delta-tar-index' or\ @@ -1116,7 +1128,8 @@ class DeltaTar(object): if self.delta_tar.password is not None: decryptor = crypto.Decrypt \ (password=self.delta_tar.password, - key=self.delta_tar.crypto_key) + key=self.delta_tar.crypto_key, + strict_ivs=False) self.tar_obj = tarfile.TarFile.open(self.tar_path, mode='r' + self.delta_tar.mode, format=tarfile.GNU_FORMAT, @@ -1325,7 +1338,9 @@ class DeltaTar(object): tarobj.open_volume(volume_path, encryption=encryption) if self.decryptor is None: - self.decryptor = self.initialize_encryption (CRYPTO_MODE_DECRYPT) + self.decryptor = \ + self.initialize_encryption (CRYPTO_MODE_DECRYPT, + strict_validation=False) backup_path = os.path.dirname(backup_tar_path) if not os.path.isabs(backup_path): @@ -1354,7 +1369,8 @@ class DeltaTar(object): def restore_backup(self, target_path, backup_indexes_paths=[], backup_tar_path=None, restore_callback=None, - disaster=tarfile.TOLERANCE_STRICT, backup_index=None): + disaster=tarfile.TOLERANCE_STRICT, backup_index=None, + strict_validation=True): ''' Restores a backup. @@ -1438,7 +1454,9 @@ class DeltaTar(object): # setup for decrypting payload if self.decryptor is None: - self.decryptor = self.initialize_encryption (CRYPTO_MODE_DECRYPT) + self.decryptor = \ + self.initialize_encryption (CRYPTO_MODE_DECRYPT, + strict_validation=strict_validation) if mode == 'tar': index_it = self.iterate_tar_path(backup_tar_path) @@ -1450,7 +1468,9 @@ class DeltaTar(object): try: # get iterator from newest index at _data[0] index1 = helper._data[0]["path"] - index_it = self.iterate_index_path(index1) + index_it = \ + self.iterate_index_path(index1, + strict_validation=strict_validation) except tarfile.DecryptionError as exn: self.logger.error("failed to decrypt file [%s]: %s; is this an " "actual encrypted index file?" @@ -1467,6 +1487,7 @@ class DeltaTar(object): backup_index=backup_index, disaster=disaster) + index_decryptor = helper._data[0]["decryptor"] dir_it = self._recursive_walk_dir('.') dir_path_it = self.jsonize_path_iterator(dir_it) @@ -1552,7 +1573,8 @@ class DeltaTar(object): """ return self.restore_backup(target_path, backup_indexes_paths=backup_indexes_paths, - disaster=tarfile.TOLERANCE_RECOVER) + disaster=tarfile.TOLERANCE_RECOVER, + strict_validation=False) def rescue_backup(self, target_path, backup_tar_path, @@ -1576,7 +1598,8 @@ class DeltaTar(object): return self.restore_backup(target_path, backup_index=backup_index, backup_tar_path=backup_tar_path, - disaster=tarfile.TOLERANCE_RESCUE) + disaster=tarfile.TOLERANCE_RESCUE, + strict_validation=False) def _parse_json_line(self, f, l_no): @@ -1649,6 +1672,10 @@ class RestoreHelper(object): self._decryptors = [] self._disaster = disaster + # Disable strict checking for linearly increasing IVs when running + # in rescue or recover mode. + strict_validation = disaster == tarfile.TOLERANCE_STRICT + try: import grp, pwd except ImportError: @@ -1684,7 +1711,8 @@ class RestoreHelper(object): decryptor = None if self._password is not None: decryptor = crypto.Decrypt (password=self._password, - key=self._crypto_key) + key=self._crypto_key, + strict_ivs=strict_validation) # make paths absolute to avoid cwd problems if not os.path.isabs(index): @@ -1911,9 +1939,10 @@ class RestoreHelper(object): self._deltatar.logger.warning('tarfile: %s' % e) @staticmethod - def new_volume_handler(deltarobj, cwd, is_full, backup_path, encryption, tarobj, base_name, volume_number): + def new_volume_handler(deltarobj, cwd, is_full, backup_path, decryptor, tarobj, base_name, volume_number): ''' - Handles the new volumes + Set up a new volume and perform the tasks necessary for transitioning + to the next one. ''' volume_name = deltarobj.volume_name_func(backup_path, is_full, volume_number, guess_name=True) @@ -1922,7 +1951,8 @@ class RestoreHelper(object): # we convert relative paths into absolute because CWD is changed if not os.path.isabs(volume_path): volume_path = os.path.join(cwd, volume_path) - tarobj.open_volume(volume_path, encryption=encryption) + + tarobj.open_volume(volume_path, encryption=decryptor) def restore_file(self, file_data, index_data, path, l_no, unprefixed_path): ''' diff --git a/deltatar/tarfile.py b/deltatar/tarfile.py index 325feb8..8fc47d6 100644 --- a/deltatar/tarfile.py +++ b/deltatar/tarfile.py @@ -526,6 +526,9 @@ class _Stream: self.encryption = encryption self.lasthdr = None + if encryption is not None: + encryption.reset_last_iv () + try: if comptype == "gz": try: @@ -937,8 +940,13 @@ class _Stream: """Set the stream's file pointer to pos. Negative seeking is forbidden. """ - if pos - self.pos >= 0: + if pos == self.pos: + pass # nothing to do + elif pos - self.pos >= 0: blocks, remainder = divmod(pos - self.pos, self.bufsize) + if self.encryption is not None: + # IV succession is only preserved between successive objects. + self.encryption.reset_last_iv () for i in range(blocks): self.read(self.bufsize) self.read(remainder) @@ -3720,17 +3728,26 @@ def reconstruct_offsets_tar (fname): os.close (ifd) -def read_tarobj_at_offset (fileobj, offset, mode, secret=None): +def read_tarobj_at_offset (fileobj, offset, mode, secret=None, + strict_validation=True): + """ + :type strict_validation: bool + :param strict_validation: Enable strict IV checking in the crypto + layer. Should be disabled when dealing with + potentially corrupted data. + """ decr = None if secret is not None: ks = secret [0] if ks == crypto.PDTCRYPT_SECRET_PW: - decr = crypto.Decrypt (password=secret [1]) + decr = crypto.Decrypt (password=secret [1], + strict_ivs=strict_validation) elif ks == crypto.PDTCRYPT_SECRET_KEY: key = binascii.unhexlify (secret [1]) - decr = crypto.Decrypt (key=key) + decr = crypto.Decrypt (key=key, + strict_ivs=strict_validation) else: raise RuntimeError @@ -3816,7 +3833,8 @@ def gen_rescue_index (gen_volume_name, mode, maxvol=None, password=None, key=Non fileobj = bltn_open (vpath, "rb") def aux (acc, off): - obj = read_tarobj_at_offset (fileobj, off, mode, secret=secret) + obj = read_tarobj_at_offset (fileobj, off, mode, secret=secret, + strict_validation=False) if obj is not None: acc.append ((off, nvol, obj)) return acc diff --git a/testing/test_crypto.py b/testing/test_crypto.py index 3f444f9..496f2e5 100644 --- a/testing/test_crypto.py +++ b/testing/test_crypto.py @@ -171,7 +171,8 @@ class AESGCMTest (CryptoLayerTest): TEST_PARAMVERSION, password=password, nacl=TEST_STATIC_NACL, - counter=start_count) + counter=start_count, + strict_ivs=True) for i, blob in enumerate (data, 1): fname = "%s_%d" % (TEST_DUMMY_FILENAME, i) @@ -820,7 +821,7 @@ class AESGCMTest (CryptoLayerTest): """ Meddle with encrypted content: extract the IV from one object and inject it into the header of another. This must be rejected - by the decryptor. + by the decryptor with paranoid IV checking enabled. """ cnksiz = 1 << 10 orig_pt_1 = fill_mod (1 << 10) @@ -829,7 +830,8 @@ class AESGCMTest (CryptoLayerTest): encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, password=password, - nacl=TEST_STATIC_NACL) + nacl=TEST_STATIC_NACL, + strict_ivs=True) def enc (pt): header_dummy = encryptor.next (TEST_DUMMY_FILENAME) diff --git a/testing/test_recover.py b/testing/test_recover.py index 9edea9d..0a47d21 100644 --- a/testing/test_recover.py +++ b/testing/test_recover.py @@ -441,7 +441,9 @@ class RecoverTest (DefectiveTest): dtar.restore_backup(target_path=self.dst_path, backup_indexes_paths=[ "%s/%s" % (bak_path, index_file) - ]) + ], + disaster=tarfile.TOLERANCE_RECOVER, + strict_validation=False) for key, value in self.hash.items (): f = "%s/%s" % (self.dst_path, key) assert os.path.exists (f) @@ -541,7 +543,9 @@ class RescueTest (DefectiveTest): dtar.restore_backup(target_path=self.dst_path, backup_indexes_paths=[ "%s/%s" % (bak_path, index_file) - ]) + ], + disaster=tarfile.TOLERANCE_RECOVER, + strict_validation=False) for key, value in self.hash.items (): f = "%s/%s" % (self.dst_path, key) assert os.path.exists (f)