import binascii
import os
import pylibscrypt
import struct
import unittest

import deltatar.crypto as crypto

import cryptography


def b (s):
    """Encode the string *s* as UTF-8 bytes."""
    return s.encode ("UTF-8")


CRYPTO_NACL_SIZE  = 16
CRYPTO_KEY_SIZE   = 16

TEST_PLAINTEXT      = b("gentlemen don’t read each other’s mail")
TEST_PASSPHRASE     = b"test1234"
TEST_AES_GCM_AAD    = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"
TEST_VERSION        = 1
TEST_PARAMVERSION   = 1
TEST_STATIC_NACL    = os.urandom (CRYPTO_NACL_SIZE)
PLAIN_PARAMVERSION  = 0


def faux_hdr (ctsize=1337, iv=None):
    """
    Build a dict of bogus header fields suitable for ``crypto.hdr_make``.

    :param ctsize: value for the ``ctsize`` field
    :param iv:     value for the ``iv`` field; a fixed twelve byte string is
                   used if this is ``None`` (or otherwise falsy)
    """
    return {
        "version"      : 42,
        "paramversion" : 2187,
        "nacl"         : binascii.unhexlify (b"0011223344556677"
                                             b"8899aabbccddeeff"),
        "iv"           : iv or binascii.unhexlify (b"0011223344556677"
                                                   b"8899aabb"),
        "ctsize"       : ctsize,
        "tag"          : binascii.unhexlify (b"deadbeefbadb100d"
                                             b"b1eedc0ffeedea15"),
    }


# Cache of buffers handed out by fill_mod(), keyed by (n, off).
FILL_MOD_MEMO = { }

def fill_mod (n, off=0):
    """
    Return *n* bytes of deterministic filler data.

    The byte at index ``i`` has the value ``(off + i + 1) % 64 + 32``, i. e.
    the output cycles through the 64 values 32…95. Results are memoized in
    ``FILL_MOD_MEMO`` so that repeated requests for the same ``(n, off)``
    pair return the very same ``bytes`` object.

    :param n:   number of bytes to generate
    :param off: offset added to the index before taking the modulus
    """
    k = (n, off)
    m = FILL_MOD_MEMO.get (k, None)
    if m is not None:
        return m

    m = bytes (o % 64 + 32 for o in range (off + 1, off + n + 1))
    FILL_MOD_MEMO [k] = m
    return m


def faux_payload ():
    """Return a short dummy payload string."""
    return "abcd" * 42


class CryptoLayerTest (unittest.TestCase):
    """Common base class of the crypto layer test cases."""
    pass


class AESGCMTest (CryptoLayerTest):
    """Tests of the ``Encrypt`` and ``Decrypt`` handlers in ``crypto``."""

    # Reference to the genuine ``os.urandom`` so that tests which replace it
    # can be undone in tearDown().
    os_urandom = os.urandom

    def tearDown (self):
        """Reset globals altered for testing."""
        _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
                ("I am fully aware that this will void my warranty.")
        _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
                ("I am fully aware that this will void my warranty.")
        os.urandom = self.os_urandom


    def test_crypto_aes_gcm_enc_ctor (self):
        """Construct an encryptor from a password and a salt."""
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
        """Refuse plaintext passthrough mode by default."""
        password = str (os.urandom (42))
        with self.assertRaises (crypto.InvalidParameter):
            encryptor = crypto.Encrypt (TEST_VERSION,
                                        PLAIN_PARAMVERSION,
                                        password=password,
                                        nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
        """
        Comply with request for plaintext passthrough mode if the
        *insecure* flag is passed.
        """
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    PLAIN_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    insecure=True)


    def test_crypto_aes_gcm_enc_ctor_key (self):
        """Construct an encryptor from a key and a salt."""
        key = os.urandom (42)
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    key=key,
                                    nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
        """
        Either key (+nacl) or password must be supplied, not both.
        """
        with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
            encryptor = crypto.Encrypt (TEST_VERSION,
                                        TEST_PARAMVERSION,
                                        nacl=TEST_STATIC_NACL)

        password = str (os.urandom (42))
        key      = os.urandom (16) # scrypt sized
        with self.assertRaises (crypto.InvalidParameter): # both key and pw
            encryptor = crypto.Encrypt (TEST_VERSION,
                                        TEST_PARAMVERSION,
                                        password=password,
                                        key=key,
                                        nacl=TEST_STATIC_NACL)

        with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
            encryptor = crypto.Encrypt (TEST_VERSION,
                                        TEST_PARAMVERSION,
                                        key=key,
                                        nacl=None)

        with self.assertRaises (crypto.InvalidParameter): # empty pw
            encryptor = crypto.Encrypt (TEST_VERSION,
                                        TEST_PARAMVERSION,
                                        password=b"",
                                        nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_header_size (self):
        """Dummy and final header must both be PDTCRYPT_HDR_SIZE long."""
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
        _, _ = encryptor.process (TEST_PLAINTEXT)
        _, header, _ = encryptor.done (header_dummy)
        assert len (header) == crypto.PDTCRYPT_HDR_SIZE


    def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
        """
        Access the list of IVs used during encryption and check reuse. Start
        from explicit counters so the inputs don’t overlap. IVs must not have
        been reused.
        """
        def enc (start_count, data):
            password = str (os.urandom (42))
            encryptor = crypto.Encrypt (TEST_VERSION,
                                        TEST_PARAMVERSION,
                                        password=password,
                                        nacl=TEST_STATIC_NACL,
                                        counter=start_count,
                                        strict_ivs=True)

            for i, blob in enumerate (data, 1):
                fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
                header_dummy = encryptor.next (fname)
                _, _ = encryptor.process (blob)
                _, header, _ = encryptor.done (header_dummy)
                assert len (encryptor.get_used_ivs ()) == i

            return encryptor.get_used_ivs ()

        ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
        ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])

        # No reuse in general.
        assert len (ivs1 & ivs2) == 0

        ivs1 = list (ivs1)
        ivs2 = list (ivs2)

        # Counters of used IVs must match what we passed explicitly.
        def extract_counters (ivs):
            def getcount (iv):
                _, cnt = struct.unpack (crypto.FMT_I2N_IV, iv)
                return cnt
            return list (map (getcount, ivs))

        cnt1 = extract_counters (ivs1)
        cnt2 = extract_counters (ivs2)

        assert 0x0042 in cnt1
        assert 0x0043 in cnt1
        assert 0x1337 in cnt2
        assert 0x1338 in cnt2


    def test_crypto_aes_gcm_enc_chunk_size (self):
        """Ciphertext has the length of the plaintext; nothing is left over."""
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        assert len (ciphertext) == len (TEST_PLAINTEXT)
        rest, header, fixed = encryptor.done (header_dummy)
        assert len (rest) == 0


    def test_crypto_aes_gcm_dec_ctor (self):
        """
        Ensure that only either key or password is accepted.
        """
        password = str (os.urandom (42))
        key      = os.urandom (16) # scrypt sized

        decryptor = crypto.Decrypt (password=password)
        decryptor = crypto.Decrypt (key=key)

        with self.assertRaises (crypto.InvalidParameter): # both password and key
            decryptor = crypto.Decrypt (password=password, key=key)

        with self.assertRaises (crypto.InvalidParameter): # neither password nor key
            decryptor = crypto.Decrypt (password=None, key=None)

        with self.assertRaises (crypto.InvalidParameter): # empty password
            decryptor = crypto.Decrypt (password="")


    def test_crypto_aes_gcm_dec_simple (self):
        """Round trip: decrypting an encrypted object yields the plaintext."""
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        rest, header, fixed = encryptor.done (header_dummy)
        ciphertext += rest

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        decryptor.next (header)
        plaintext = decryptor.process (ciphertext)
        rest = decryptor.done ()
        plaintext += rest

        assert plaintext == TEST_PLAINTEXT


    def test_crypto_aes_gcm_dec_plain_bad (self):
        """
        Downgrade to plaintext must not be allowed in parameters
        obtained from headers.
        """
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        rest, header, fixed = encryptor.done (header_dummy)
        ciphertext += rest

        # Rewrite the header so that it requests the plaintext parameters.
        header = crypto.hdr_read (header)
        header ["paramversion"] = PLAIN_PARAMVERSION
        ok, header = crypto.hdr_make (header)
        assert ok

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        with self.assertRaises (crypto.InvalidParameter):
            decryptor.next (header)


    def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
        """
        Allow plaintext crypto mode if *insecure* flag is passed.
        """
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    PLAIN_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    insecure=True)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        rest, header, fixed = encryptor.done (header_dummy)
        ciphertext += rest

        header = crypto.hdr_read (header)
        header ["paramversion"] = PLAIN_PARAMVERSION
        ok, header = crypto.hdr_make (header)
        assert ok

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed,
                                    insecure=True)
        decryptor.next (header)
        plaintext = decryptor.process (ciphertext)
        rest = decryptor.done ()
        plaintext += rest

        assert plaintext == TEST_PLAINTEXT


    def test_crypto_aes_gcm_dec_bad_tag (self):
        """A modified tag in the header must raise ``InvalidGCMTag``."""
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        ciphertext2, header, fixed = encryptor.done (header_dummy)

        mut_header = bytearray (header)
        mut_header_vw = memoryview (mut_header)
        # replace one byte in the tag part of the header
        second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
        mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
        header = bytes (mut_header)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        decryptor.next (header)
        plaintext = decryptor.process (ciphertext)
        with self.assertRaises (crypto.InvalidGCMTag):
            _ = decryptor.done ()


    def test_crypto_aes_gcm_enc_multicnk (self):
        """Encrypt one object chunk by chunk; lengths must agree."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

        off = 0
        ct = b""
        while off < len (pt):
            upto = min (off + cnksiz, len (pt))
            _, cnk = encryptor.process (pt [off:upto])
            ct += cnk
            off += cnksiz
        cnk, header, fixed = encryptor.done (header_dummy)
        ct += cnk

        assert len (pt) == len (ct)


    def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
        """Chunkwise encryption of one object with *strict_ivs* enabled."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

        off = 0
        ct = b""
        while off < len (pt):
            upto = min (off + cnksiz, len (pt))
            _, cnk = encryptor.process (pt [off:upto])
            ct += cnk
            off += cnksiz
        cnk, header, fixed = encryptor.done (header_dummy)
        ct += cnk

        assert len (pt) == len (ct)


    def test_crypto_aes_gcm_enc_multiobj (self):
        """Encrypt five objects with a single encryptor context."""
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=False)

        def addobj (i):
            pt = fill_mod (1 << 14, off=i)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            off = 0
            ct = b""
            while off < len (pt):
                upto = min (off + cnksiz, len (pt))
                _, cnk = encryptor.process (pt [off:upto])
                ct += cnk
                off += cnksiz
            cnk, header, fixed = encryptor.done (header_dummy)
            ct += cnk

            assert len (pt) == len (ct)

        for i in range (5):
            addobj (i)

        assert len (encryptor.fixed) == 1


    def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
        """
        Encrypt five objects with *strict_ivs* enabled; the value returned
        as third element by ``done()`` must not change between objects.
        """
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        curfixed = None # must remain constant after first

        def addobj (i):
            nonlocal curfixed

            pt = fill_mod (1 << 14, off=i)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            off = 0
            ct = b""
            while off < len (pt):
                upto = min (off + cnksiz, len (pt))
                _, cnk = encryptor.process (pt [off:upto])
                ct += cnk
                off += cnksiz
            cnk, header, fixed = encryptor.done (header_dummy)

            if curfixed is None:
                curfixed = fixed
            else:
                assert fixed == curfixed
            ct += cnk

            assert len (pt) == len (ct)

        for i in range (5):
            addobj (i)

        assert len (encryptor.fixed) == 1


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
        """
        Test behavior when the file counter tops out.

        Artificially lower the maximum possible file counter. Considering
        invalid (0) and reserved (1, 2) values, the smallest possible file
        counter for normal objects is 3. Starting from that, the header of the
        (max - 3)rd object must have both a different IV fixed part and a
        counter.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX \
                ("I am fully aware that this will void my warranty.", new_max)
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        last_iv  = None
        last_cnt = minimum

        def addobj (i, wrap=False):
            nonlocal last_iv
            nonlocal last_cnt

            pt = fill_mod (1 << 14, off=i)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            off = 0
            ct = b""
            while off < len (pt):
                upto = min (off + cnksiz, len (pt))
                _, cnk = encryptor.process (pt [off:upto])
                ct += cnk
                off += cnksiz
            cnk, header, fixed = encryptor.done (header_dummy)

            this_iv = crypto.hdr_read (header) ["iv"]
            if last_iv is not None:
                this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
                last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
                if wrap is False:
                    assert last_fixed == this_fixed
                    assert last_cnt   == this_cnt - 1
                else:
                    assert last_fixed != this_fixed
                    assert this_cnt   == minimum

            last_iv = this_iv
            ct += cnk

            assert len (pt) == len (ct)

        for i in range (minimum, new_max + 1):
            addobj (i)                  # counter range: [3, 8]
        addobj (i + 1, True)            # counter wraps to 3

        for j in range (i + 2, i + new_max - 1):
            addobj (j)                  # counter range: [4, 8]
        addobj (j + 1, True)            # counter wraps to 3 again

        assert len (encryptor.fixed) == 3


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
        """
        Test behavior when the file counter tops out and the transition to
        the next IV fixed part fails on account of a bad random generator.

        Replaces the ``urandom`` reference in ``os`` with a deterministic
        function. The encryptor context must communicate this condition with an
        ``IVFixedPartError``.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX \
                ("I am fully aware that this will void my warranty.", new_max)
        cnksiz = 1 << 10

        # Deterministic stand-in; the original is restored in tearDown().
        os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        def addobj (i):
            pt = fill_mod (1 << 14, off=i)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            off = 0
            while off < len (pt):
                upto = min (off + cnksiz, len (pt))
                _, cnk = encryptor.process (pt [off:upto])
                off += cnksiz

        for i in range (minimum, new_max):
            addobj (42 + i)

        with self.assertRaises (crypto.IVFixedPartError):
            addobj (42 + i)


    def test_crypto_aes_gcm_enc_length_cap (self):
        """
        Artificially lower the maximum allowable data length and attempt to
        encrypt a larger object. Verify that the crypto handler only encrypts
        data up to the size limit. A downstream user detects that condition
        by testing whether the encryption step yielded fewer bytes than the
        plaintext.

        The sibling to this test is test_restore_backup_max_file_length()
        in test_deltatar.py. Deltatar will transparently create a split object
        with an increased IV file counter.
        """
        new_max = 2187
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
                ("I am fully aware that this will void my warranty.", new_max)
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        def encobj (s):
            pt, ct = fill_mod (s), None
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))

            n, ct = encryptor.process (pt)
            rest, _, _ = encryptor.done (header_dummy)

            # NB: If this check *ever* fails, then something changed in the
            #     encoding layer. AES-GCM is a stream cipher so each encoding
            #     step will yield the exact number of ciphertext bytes that
            #     was provided as plaintext. Thus there cannot be any encoded
            #     data left when calling the finalizers. None of the crypto code
            #     depends on that assumption but nevertheless we check it here
            #     in case anything changes upstream in the Cryptography
            #     library. In case there actually is a rest, replace the
            #     assertion below with ``ct += rest``.
            assert (len (rest) == 0)

            if len (pt) > new_max:
                # If the plaintext was longer than the artificially lowered
                # maximum, then the number of ciphertext bytes must be clamped
                # to the maximum.
                assert n == new_max
            else:
                assert n == len (pt) == len (ct)

        for i in range (16):
            encobj (1 << i)


    def test_crypto_aes_gcm_dec_length_cap (self):
        """
        The decryptor must reject headers with an object size that exceeds
        the PDTCRYPT maximum. Longer files split into multiple objects.
        """
        password = str (os.urandom (42))
        meta = faux_hdr()
        meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
        ok, header = crypto.hdr_make (meta)

        assert ok

        # Set up decryption with bogus header.
        decryptor = crypto.Decrypt (password=password, fixedparts=[])

        with self.assertRaises (crypto.InvalidHeader):
            decryptor.next (header)


    def test_crypto_aes_gcm_dec_length_mismatch (self):
        """
        Catch attempts at decrypting more data than what was stated in the
        header.
        """
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

        off = 0
        ct = b""
        while off < len (orig_pt):
            upto = min (off + cnksiz, len (orig_pt))
            _n, cnk = encryptor.process (orig_pt [off:upto])
            ct += cnk
            off += cnksiz
        cnk, header, fixed = encryptor.done (header_dummy)
        ct += cnk

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)

        decryptor.next (header)
        off = 0
        pt = b""
        while off < len (orig_pt):
            upto = min (off + cnksiz, len (orig_pt))
            cnk = decryptor.process (ct [off:upto])
            pt += cnk
            off += cnksiz

        with self.assertRaises (crypto.CiphertextTooLong):
            # Try and decrypt one byte more than was encrypted.
            # This must be caught in crypto.py.
            _ = decryptor.process (ct [0:1])


    def test_crypto_aes_gcm_dec_multicnk (self):
        """Chunkwise round trip of a single object."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

        off = 0
        ct = b""
        while off < len (orig_pt):
            upto = min (off + cnksiz, len (orig_pt))
            _n, cnk = encryptor.process (orig_pt [off:upto])
            ct += cnk
            off += cnksiz
        cnk, header, fixed = encryptor.done (header_dummy)
        ct += cnk

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)

        decryptor.next (header)
        off = 0
        pt = b""
        while off < len (orig_pt):
            upto = min (off + cnksiz, len (orig_pt))
            cnk = decryptor.process (ct [off:upto])
            pt += cnk
            off += cnksiz

        pt += decryptor.done ()

        assert pt == orig_pt


    def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
        """Chunkwise decryption with a modified tag raises ``InvalidGCMTag``."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

        off = 0
        ct = b""
        while off < len (orig_pt):
            upto = min (off + cnksiz, len (orig_pt))
            _n, cnk = encryptor.process (orig_pt [off:upto])
            ct += cnk
            off += cnksiz
        cnk, header, fixed = encryptor.done (header_dummy)
        ct += cnk

        mut_header = bytearray (header)
        mut_header_vw = memoryview (mut_header)
        # replace one byte in the tag part of the header
        second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
        mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
        header = bytes (mut_header)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)

        decryptor.next (header)
        off = 0
        pt = b""
        while off < len (orig_pt):
            upto = min (off + cnksiz, len (orig_pt))
            cnk = decryptor.process (ct [off:upto])
            pt += cnk
            off += cnksiz

        with self.assertRaises (crypto.InvalidGCMTag):
            _ = decryptor.done ()


    def test_crypto_aes_gcm_dec_iv_gap (self):
        """
        Encrypt multiple objects using non-consecutive IVs and verify that the
        decryptor errors out with an exception in strict mode but keeps quiet
        otherwise.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod (1 << 10)
        orig_pt_2 = fill_mod (1 << 10, 23)
        orig_pt_3 = fill_mod (1 << 10, 42)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL)

        def enc (pt):
            header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

            off = 0
            ct = b""
            while off < len (pt):
                upto = min (off + cnksiz, len (pt))
                _n, cnk = encryptor.process (pt [off:upto])
                ct += cnk
                off += cnksiz
            cnk, header, fixed = encryptor.done (header_dummy)
            return ct + cnk, header, fixed

        ct_1, hdr_1, _____ = enc (orig_pt_1)

        ## Here we bump the iv of the encryptor, breaking the series.
        encryptor.set_object_counter (encryptor.cnt + 1)
        ct_2, hdr_2, fixed = enc (orig_pt_2)

        ## IV of final object is again in-sequence.
        ct_3, hdr_3, fixed = enc (orig_pt_3)

        def decrypt (strict_ivs):
            decryptor = crypto.Decrypt (password=password,
                                        fixedparts=fixed,
                                        strict_ivs=strict_ivs)

            def dec (hdr, ct):
                decryptor.next (hdr)
                off = 0
                pt = b""
                while off < len (ct):
                    upto = min (off + cnksiz, len (ct))
                    cnk = decryptor.process (ct [off:upto])
                    pt += cnk
                    off += cnksiz
                return pt + decryptor.done ()

            decr_pt_1 = dec (hdr_1, ct_1)
            decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
            decr_pt_3 = dec (hdr_3, ct_3)

            assert decr_pt_1 == orig_pt_1
            assert decr_pt_2 == orig_pt_2
            assert decr_pt_3 == orig_pt_3

        with self.assertRaises (crypto.NonConsecutiveIV):
            decrypt (True)

        decrypt (False) # Sequence passes


    def test_crypto_aes_gcm_dec_iv_reuse (self):
        """
        Meddle with encrypted content: extract the IV from one object
        and inject it into the header of another. This must be rejected
        by the decryptor with paranoid IV checking enabled.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod (1 << 10)
        orig_pt_2 = fill_mod (1 << 10, 42)
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        def enc (pt):
            header_dummy = encryptor.next (TEST_DUMMY_FILENAME)

            off = 0
            ct = b""
            while off < len (pt):
                upto = min (off + cnksiz, len (pt))
                _n, cnk = encryptor.process (pt [off:upto])
                ct += cnk
                off += cnksiz
            cnk, header, fixed = encryptor.done (header_dummy)
            return ct + cnk, header, fixed

        ct_1, hdr_1, _____ = enc (orig_pt_1)
        encryptor.cnt -= 1 # induce error by forcing an identical IV on next step

        with self.assertRaises (crypto.DuplicateIV): # reuse detected
            ct_2, hdr_2, fixed = enc (orig_pt_2)


class HeaderTest (CryptoLayerTest):
    """Tests of the header helpers ``hdr_make`` and ``hdr_read``."""

    def test_crypto_fmt_hdr_make (self):
        """A header built from valid fields is PDTCRYPT_HDR_SIZE long."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok
        assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE


    def test_crypto_fmt_hdr_make_useless (self):
        """Unusable input yields ``False`` plus an error message."""
        ok, ret = crypto.hdr_make ({ 42: "x" })
        assert ok is False
        assert ret.startswith ("error assembling header:")


    def test_crypto_fmt_hdr_read (self):
        """Reading back a freshly made header yields the original fields."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok is True
        assert hdr is not None
        mmeta = crypto.hdr_read (hdr)
        assert mmeta is not None
        for k in meta:
            if meta [k] != mmeta [k]:
                # A plain string cannot be raised in Python 3 (TypeError);
                # signal the mismatch as a proper test failure instead.
                raise AssertionError (
                    "header mismatch after reading: expected %r, got %r"
                    % (meta [k], mmeta [k]))


    def test_crypto_fmt_hdr_read_trailing_garbage (self):
        """Extra bytes after the header must raise ``InvalidHeader``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok is True
        assert hdr is not None
        hdr += b"-junk"
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_read_leading_garbage (self):
        """Extra bytes before the header must raise ``InvalidHeader``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok is True
        assert hdr is not None
        hdr = b"junk-" + hdr
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_inner_garbage (self):
        """Bytes spliced into the middle must raise ``InvalidHeader``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok
        data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (data)