7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
61 class CryptoLayerTest (unittest.TestCase):
65 class AESGCMTest (CryptoLayerTest):
67 os_urandom = os.urandom
70 """Reset globals altered for testing."""
71 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72 ("I am fully aware that this will void my warranty.")
73 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74 ("I am fully aware that this will void my warranty.")
75 os.urandom = self.os_urandom
77 def test_crypto_aes_gcm_enc_ctor (self):
78 password = str (os.urandom (42))
79 encryptor = crypto.Encrypt (TEST_VERSION,
82 nacl=TEST_STATIC_NACL)
85 def test_crypto_aes_gcm_enc_ctor_key (self):
87 encryptor = crypto.Encrypt (TEST_VERSION,
90 nacl=TEST_STATIC_NACL)
93 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
95 Either key (+nacl) or password must be supplied, not both.
97 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
98 encryptor = crypto.Encrypt (TEST_VERSION,
100 nacl=TEST_STATIC_NACL)
102 password = str (os.urandom (42))
103 key = os.urandom (16) # scrypt sized
104 with self.assertRaises (crypto.InvalidParameter): # both key and pw
105 encryptor = crypto.Encrypt (TEST_VERSION,
109 nacl=TEST_STATIC_NACL)
111 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
112 encryptor = crypto.Encrypt (TEST_VERSION,
117 with self.assertRaises (crypto.InvalidParameter): # empty pw
118 encryptor = crypto.Encrypt (TEST_VERSION,
121 nacl=TEST_STATIC_NACL)
124 def test_crypto_aes_gcm_enc_header_size (self):
125 password = str (os.urandom (42))
126 encryptor = crypto.Encrypt (TEST_VERSION,
129 nacl=TEST_STATIC_NACL)
131 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
132 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
133 _, _ = encryptor.process (TEST_PLAINTEXT)
134 _, header, _ = encryptor.done (header_dummy)
135 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
138 def test_crypto_aes_gcm_enc_chunk_size (self):
139 password = str (os.urandom (42))
140 encryptor = crypto.Encrypt (TEST_VERSION,
143 nacl=TEST_STATIC_NACL)
145 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
146 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
147 assert len (ciphertext) == len (TEST_PLAINTEXT)
148 rest, header, fixed = encryptor.done (header_dummy)
149 assert len (rest) == 0
152 def test_crypto_aes_gcm_dec_ctor (self):
154 Ensure that only either key or password is accepted.
156 password = str (os.urandom (42))
157 key = os.urandom (16) # scrypt sized
159 decryptor = crypto.Decrypt (password=password)
160 decryptor = crypto.Decrypt (key=key)
162 with self.assertRaises (crypto.InvalidParameter): # both password and key
163 decryptor = crypto.Decrypt (password=password, key=key)
165 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
166 decryptor = crypto.Decrypt (password=None, key=None)
168 with self.assertRaises (crypto.InvalidParameter): # # empty password
169 decryptor = crypto.Decrypt (password="")
172 def test_crypto_aes_gcm_dec_simple (self):
173 password = str (os.urandom (42))
174 encryptor = crypto.Encrypt (TEST_VERSION,
177 nacl=TEST_STATIC_NACL)
179 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
180 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
181 rest, header, fixed = encryptor.done (header_dummy)
184 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
185 decryptor.next (header)
186 plaintext = decryptor.process (ciphertext)
187 rest = decryptor.done ()
190 assert plaintext == TEST_PLAINTEXT
193 def test_crypto_aes_gcm_dec_bad_tag (self):
194 password = str (os.urandom (42))
195 encryptor = crypto.Encrypt (TEST_VERSION,
198 nacl=TEST_STATIC_NACL)
200 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
201 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
202 ciphertext2, header, fixed = encryptor.done (header_dummy)
204 mut_header = bytearray (header)
205 mut_header_vw = memoryview (mut_header)
206 # replace one byte in the tag part of the header
207 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
208 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
209 header = bytes (mut_header)
211 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
212 decryptor.next (header)
213 plaintext = decryptor.process (ciphertext)
214 with self.assertRaises (crypto.InvalidGCMTag):
215 _ = decryptor.done ()
218 def test_crypto_aes_gcm_enc_multicnk (self):
220 pt = fill_mod (1 << 14)
221 password = str (os.urandom (42))
222 encryptor = crypto.Encrypt (TEST_VERSION,
225 nacl=TEST_STATIC_NACL)
226 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
230 while off < len (pt):
231 upto = min (off + cnksiz, len (pt))
232 _, cnk = encryptor.process (pt [off:upto])
235 cnk, header, fixed = encryptor.done (header_dummy)
238 assert len (pt) == len (ct)
241 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
243 pt = fill_mod (1 << 14)
244 password = str (os.urandom (42))
245 encryptor = crypto.Encrypt (TEST_VERSION,
248 nacl=TEST_STATIC_NACL,
250 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 while off < len (pt):
255 upto = min (off + cnksiz, len (pt))
256 _, cnk = encryptor.process (pt [off:upto])
259 cnk, header, fixed = encryptor.done (header_dummy)
262 assert len (pt) == len (ct)
265 def test_crypto_aes_gcm_enc_multiobj (self):
267 password = str (os.urandom (42))
268 encryptor = crypto.Encrypt (TEST_VERSION,
271 nacl=TEST_STATIC_NACL,
275 pt = fill_mod (1 << 14, off=i)
276 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
280 while off < len (pt):
281 upto = min (off + cnksiz, len (pt))
282 _, cnk = encryptor.process (pt [off:upto])
285 cnk, header, fixed = encryptor.done (header_dummy)
288 assert len (pt) == len (ct)
290 for i in range (5): addobj (i)
292 assert len (encryptor.fixed) == 1
295 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
301 nacl=TEST_STATIC_NACL,
303 curfixed = None # must remain constant after first
306 pt = fill_mod (1 << 14, off=i)
307 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
311 while off < len (pt):
312 upto = min (off + cnksiz, len (pt))
313 _, cnk = encryptor.process (pt [off:upto])
316 cnk, header, fixed = encryptor.done (header_dummy)
321 assert fixed == curfixed
324 assert len (pt) == len (ct)
326 for i in range (5): addobj (i)
328 assert len (encryptor.fixed) == 1
331 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
333 Test behavior when the file counter tops out.
335 Artificially lower the maximum possible file counter. Considering
336 invalid (0) and reserved (1, 2) values, the smallest possible file counter
337 for normal objects is 3. Starting from that, the header of the (max -
338 3)rd object must have both a different IV fixed part and a counter.
342 crypto._testing_set_AES_GCM_IV_CNT_MAX \
343 ("I am fully aware that this will void my warranty.", new_max)
345 password = str (os.urandom (42))
346 encryptor = crypto.Encrypt (TEST_VERSION,
349 nacl=TEST_STATIC_NACL,
355 def addobj (i, wrap=False):
358 pt = fill_mod (1 << 14, off=i)
359 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
363 while off < len (pt):
364 upto = min (off + cnksiz, len (pt))
365 _, cnk = encryptor.process (pt [off:upto])
368 cnk, header, fixed = encryptor.done (header_dummy)
369 this_iv = crypto.hdr_read (header) ["iv"]
370 if last_iv is not None:
371 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
372 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
374 assert last_fixed == this_fixed
375 assert last_cnt == this_cnt - 1
377 assert last_fixed != this_fixed
378 assert this_cnt == minimum
382 assert len (pt) == len (ct)
384 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
385 addobj (i + 1, True) # counter wraps to 3
387 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
388 addobj (j + 1, True) # counter wraps to 3 again
390 assert len (encryptor.fixed) == 3
393 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
395 Test behavior when the file counter tops out and the transition to
396 the next IV fixed part fails on account of a bad random generator.
398 Replaces the ``urandom`` reference in ``os`` with a deterministic
399 function. The encryptor context must communicate this condition with an
400 ``IVFixedPartError``.
404 crypto._testing_set_AES_GCM_IV_CNT_MAX \
405 ("I am fully aware that this will void my warranty.", new_max)
407 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
408 password = str (os.urandom (42))
409 encryptor = crypto.Encrypt (TEST_VERSION,
412 nacl=TEST_STATIC_NACL,
416 pt = fill_mod (1 << 14, off=i)
417 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
420 while off < len (pt):
421 upto = min (off + cnksiz, len (pt))
422 _, cnk = encryptor.process (pt [off:upto])
425 for i in range (minimum, new_max): addobj (42 + i)
427 with self.assertRaises (crypto.IVFixedPartError):
432 def test_crypto_aes_gcm_enc_length_cap (self):
434 Artificially lower the maximum allowable data length and attempt to
435 encrypt a larger object. Verify that the crypto handler only encrypts
436 data up to the size limit. A downstream user detects that condition by
437 testing whether the encryption step yielded fewer bytes than the
440 The sibling to this test is test_restore_backup_max_file_length()
441 in test_deltatar.py. Deltatar will transparently create a split object
442 with an increased IV file counter.
445 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
446 ("I am fully aware that this will void my warranty.", new_max)
448 password = str (os.urandom (42))
449 encryptor = crypto.Encrypt (TEST_VERSION,
452 nacl=TEST_STATIC_NACL)
455 pt, ct = fill_mod (s), None
456 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
458 n, ct = encryptor.process (pt)
459 rest, _, _ = encryptor.done (header_dummy)
461 # NB: If this check *ever* fails, then something changed in the
462 # encoding layer. AES-GCM is a stream cipher so each encoding
463 # step will yield the exact number of ciphertext bytes that
464 # was provided as plaintext. Thus there cannot be any encoded
465 # data left when calling the finalizers. None of the crypto code
466 # depends on that assumption but nevertheless we check it here
467 # in case anything changes upstream in the Cryptography
468 # library. In case there actually is a rest, replace the
469 # assertion below with ``ct += rest``.
470 assert (len (rest) == 0)
472 if len (pt) > new_max:
473 # If the plaintext was longer than the artificially lowered
474 # maximum, then the number of ciphertext bytes must be clamped
478 assert n == len (pt) == len (ct)
480 for i in range (16): encobj (1 << i)
483 def test_crypto_aes_gcm_dec_multicnk (self):
485 orig_pt = fill_mod (1 << 14)
486 password = str (os.urandom (42))
487 encryptor = crypto.Encrypt (TEST_VERSION,
490 nacl=TEST_STATIC_NACL)
491 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
495 while off < len (orig_pt):
496 upto = min (off + cnksiz, len (orig_pt))
497 _n, cnk = encryptor.process (orig_pt [off:upto])
500 cnk, header, fixed = encryptor.done (header_dummy)
503 decryptor = crypto.Decrypt (password=password,
505 decryptor.next (header)
508 while off < len (orig_pt):
509 upto = min (off + cnksiz, len (orig_pt))
510 cnk = decryptor.process (ct [off:upto])
515 pt += decryptor.done ()
519 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
521 orig_pt = fill_mod (1 << 14)
522 password = str (os.urandom (42))
523 encryptor = crypto.Encrypt (TEST_VERSION,
526 nacl=TEST_STATIC_NACL)
527 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
531 while off < len (orig_pt):
532 upto = min (off + cnksiz, len (orig_pt))
533 _n, cnk = encryptor.process (orig_pt [off:upto])
536 cnk, header, fixed = encryptor.done (header_dummy)
539 mut_header = bytearray (header)
540 mut_header_vw = memoryview (mut_header)
541 # replace one byte in the tag part of the header
542 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
543 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
544 header = bytes (mut_header)
546 decryptor = crypto.Decrypt (password=password,
548 decryptor.next (header)
551 while off < len (orig_pt):
552 upto = min (off + cnksiz, len (orig_pt))
553 cnk = decryptor.process (ct [off:upto])
557 with self.assertRaises (crypto.InvalidGCMTag):
558 _ = decryptor.done ()
561 def test_crypto_aes_gcm_dec_iv_reuse (self):
563 Meddle with encrypted content: extract the IV from one object
564 and inject it into the header of another. This must be rejected
568 orig_pt_1 = fill_mod (1 << 10)
569 orig_pt_2 = fill_mod (1 << 10, 42)
570 password = str (os.urandom (42))
571 encryptor = crypto.Encrypt (TEST_VERSION,
574 nacl=TEST_STATIC_NACL)
577 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
581 while off < len (pt):
582 upto = min (off + cnksiz, len (pt))
583 _n, cnk = encryptor.process (pt [off:upto])
586 cnk, header, fixed = encryptor.done (header_dummy)
587 return ct + cnk, header, fixed
589 ct_1, hdr_1, _____ = enc (orig_pt_1)
590 ct_2, hdr_2, fixed = enc (orig_pt_2)
592 mut_hdr_2 = bytearray (hdr_2)
593 mut_hdr_2_vw = memoryview (mut_hdr_2)
595 iv_lo = crypto.HDR_OFF_IV
596 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
597 iv_1 = hdr_1 [iv_lo : iv_hi]
598 # transplant into other header
599 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
600 hdr_2_mod = bytes (mut_hdr_2)
601 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
608 while off < len (ct):
609 upto = min (off + cnksiz, len (ct))
610 cnk = decryptor.process (ct [off:upto])
613 return pt + decryptor.done ()
615 decr_pt_1 = dec (hdr_1, ct_1)
616 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
617 with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
618 decr_pt_2 = dec (hdr_2_mod, ct_2)
621 class HeaderTest (CryptoLayerTest):
623 def test_crypto_fmt_hdr_make (self):
625 ok, hdr = crypto.hdr_make (meta)
627 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
630 def test_crypto_fmt_hdr_make_useless (self):
631 ok, ret = crypto.hdr_make ({ 42: "x" })
633 assert ret.startswith ("error assembling header:")
636 def test_crypto_fmt_hdr_read (self):
638 ok, hdr = crypto.hdr_make (meta)
640 assert hdr is not None
641 mmeta = crypto.hdr_read (hdr)
642 assert mmeta is not None
644 if meta [k] != mmeta [k]:
645 raise "header mismatch after reading: expected %r, got %r" \
646 % (meta [k], mmeta [k])
649 def test_crypto_fmt_hdr_read_trailing_garbage (self):
651 ok, hdr = crypto.hdr_make (meta)
652 ok, hdr = crypto.hdr_make (meta)
654 assert hdr is not None
656 with self.assertRaises (crypto.InvalidHeader):
657 _ = crypto.hdr_read (hdr)
660 def test_crypto_fmt_hdr_read_leading_garbage (self):
662 ok, hdr = crypto.hdr_make (meta)
663 ok, hdr = crypto.hdr_make (meta)
665 assert hdr is not None
667 with self.assertRaises (crypto.InvalidHeader):
668 _ = crypto.hdr_read (hdr)
671 def test_crypto_fmt_hdr_inner_garbage (self):
673 ok, hdr = crypto.hdr_make (meta)
675 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
676 with self.assertRaises (crypto.InvalidHeader):
677 _ = crypto.hdr_read (data)