7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
# Fixtures shared by the crypto layer tests in this file.
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
# Name handed to ``Encrypt.next()`` when a test starts a new object.
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
# Value passed as ``nacl=`` to ``crypto.Encrypt``; drawn from os.urandom, so
# it differs between runs.
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
# Build header metadata for the tests (fields visible here: "paramversion",
# "nacl", "iv", "tag"). ``ctsize`` defaults to 1337; when ``iv`` is not given
# (or is otherwise falsy) a fixed hex-decoded value is used instead.
# NOTE(review): the remaining fields and the return statement are on lines
# outside this excerpt; presumably the result is what callers feed to
# crypto.hdr_make() -- confirm.
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
# Produce the test buffer for size ``n`` and offset ``off``.
# A previously stored result is looked up in FILL_MOD_MEMO under key ``k``
# (the key construction is on a line not visible here).
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
# Store one single-byte value at index ``i`` of the buffer.
# NOTE(review): chr(c).encode("UTF-8") is one byte long only for c < 128,
# which is what the "c" format requires -- confirm the range of ``c``.
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
# Common base class of the test cases in this file (AESGCMTest and HeaderTest
# derive from it).
61 class CryptoLayerTest (unittest.TestCase):
# Tests for the AES-GCM encryption and decryption contexts of deltatar.crypto.
65 class AESGCMTest (CryptoLayerTest):
# Remember the genuine os.urandom so it can be reinstated after a test has
# swapped it for a deterministic replacement.
67 os_urandom = os.urandom
70 """Reset globals altered for testing."""
# The _testing_set_* hooks are called here without a new value; presumably
# that restores the defaults -- confirm against crypto.py.
71 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72 ("I am fully aware that this will void my warranty.")
73 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74 ("I am fully aware that this will void my warranty.")
# Put the original random source back in place.
75 os.urandom = self.os_urandom
# Smoke test: prepare a random password and set up an Encrypt context with
# the static salt. Further constructor arguments sit on lines outside this
# excerpt.
77 def test_crypto_aes_gcm_enc_ctor (self):
# str() of a bytes object yields its repr text ("b'...'"), which serves as a
# throwaway random password string.
78 password = str (os.urandom (42))
79 encryptor = crypto.Encrypt (TEST_VERSION,
82 nacl=TEST_STATIC_NACL)
# Set up an Encrypt context for the key-based variant named in the test
# title; the key setup and the key argument are on lines not visible here.
85 def test_crypto_aes_gcm_enc_ctor_key (self):
87 encryptor = crypto.Encrypt (TEST_VERSION,
90 nacl=TEST_STATIC_NACL)
# Every constructor call in this test is expected to raise
# crypto.InvalidParameter; the trailing comment on each ``with`` line names
# the offending combination of inputs.
93 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
95 Either key (+nacl) or password must be supplied, not both.
97 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
98 encryptor = crypto.Encrypt (TEST_VERSION,
100 nacl=TEST_STATIC_NACL)
102 password = str (os.urandom (42))
103 key = os.urandom (16) # scrypt sized
104 with self.assertRaises (crypto.InvalidParameter): # both key and pw
105 encryptor = crypto.Encrypt (TEST_VERSION,
109 nacl=TEST_STATIC_NACL)
111 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
112 encryptor = crypto.Encrypt (TEST_VERSION,
117 with self.assertRaises (crypto.InvalidParameter): # empty pw
118 encryptor = crypto.Encrypt (TEST_VERSION,
121 nacl=TEST_STATIC_NACL)
# Both the preliminary header returned by next() and the final header
# returned by done() must be exactly crypto.PDTCRYPT_HDR_SIZE bytes long.
124 def test_crypto_aes_gcm_enc_header_size (self):
125 password = str (os.urandom (42))
126 encryptor = crypto.Encrypt (TEST_VERSION,
129 nacl=TEST_STATIC_NACL)
# next() starts a new object and yields the preliminary header.
131 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
132 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
133 _, _ = encryptor.process (TEST_PLAINTEXT)
# done() takes the preliminary header and returns the final one as the
# second element of its result.
134 _, header, _ = encryptor.done (header_dummy)
135 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
# process() must return exactly as many ciphertext bytes as plaintext bytes
# were passed in, and done() must not return any leftover data.
138 def test_crypto_aes_gcm_enc_chunk_size (self):
139 password = str (os.urandom (42))
140 encryptor = crypto.Encrypt (TEST_VERSION,
143 nacl=TEST_STATIC_NACL)
145 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
146 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
147 assert len (ciphertext) == len (TEST_PLAINTEXT)
148 rest, header, fixed = encryptor.done (header_dummy)
149 assert len (rest) == 0
# Decrypt must accept exactly one of password or key; supplying both,
# neither, or an empty password is expected to raise
# crypto.InvalidParameter.
152 def test_crypto_aes_gcm_dec_ctor (self):
154 Ensure that only either key or password is accepted.
156 password = str (os.urandom (42))
157 key = os.urandom (16) # scrypt sized
# The two valid forms: password only, key only.
159 decryptor = crypto.Decrypt (password=password)
160 decryptor = crypto.Decrypt (key=key)
162 with self.assertRaises (crypto.InvalidParameter): # both password and key
163 decryptor = crypto.Decrypt (password=password, key=key)
165 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
166 decryptor = crypto.Decrypt (password=None, key=None)
168 with self.assertRaises (crypto.InvalidParameter): # empty password
169 decryptor = crypto.Decrypt (password="")
# Round trip: encrypt TEST_PLAINTEXT, then decrypt the result with the same
# password and the fixed parts reported by the encryptor; the decrypted data
# must equal the input.
172 def test_crypto_aes_gcm_dec_simple (self):
173 password = str (os.urandom (42))
174 encryptor = crypto.Encrypt (TEST_VERSION,
177 nacl=TEST_STATIC_NACL)
179 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
180 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
181 rest, header, fixed = encryptor.done (header_dummy)
# The decryptor gets the final header first, then the ciphertext.
184 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
185 decryptor.next (header)
186 plaintext = decryptor.process (ciphertext)
187 rest = decryptor.done ()
190 assert plaintext == TEST_PLAINTEXT
# A header whose tag bytes were altered after encryption must make the
# decryptor's done() raise crypto.InvalidGCMTag.
193 def test_crypto_aes_gcm_dec_bad_tag (self):
194 password = str (os.urandom (42))
195 encryptor = crypto.Encrypt (TEST_VERSION,
198 nacl=TEST_STATIC_NACL)
200 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
201 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
202 ciphertext2, header, fixed = encryptor.done (header_dummy)
# Work on a mutable copy; the header is converted back to bytes below.
204 mut_header = bytearray (header)
205 mut_header_vw = memoryview (mut_header)
206 # replace one byte in the tag part of the header
207 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
# Adding 0x2a modulo 256 always produces a value different from the original.
208 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
209 header = bytes (mut_header)
211 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
212 decryptor.next (header)
213 plaintext = decryptor.process (ciphertext)
# The failure is expected from done(), not from next() or process().
214 with self.assertRaises (crypto.InvalidGCMTag):
215 _ = decryptor.done ()
# Encrypt the buffer from fill_mod (1 << 14) through several process() calls
# and check that the collected ciphertext is as long as the plaintext.
# The chunk size ``cnksiz``, the offset bookkeeping and the accumulation into
# ``ct`` are on lines not visible in this excerpt.
218 def test_crypto_aes_gcm_enc_multicnk (self):
220 pt = fill_mod (1 << 14)
221 password = str (os.urandom (42))
222 encryptor = crypto.Encrypt (TEST_VERSION,
225 nacl=TEST_STATIC_NACL)
226 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
230 while off < len (pt):
# Clamp the slice end so the last chunk does not run past the buffer.
231 upto = min (off + cnksiz, len (pt))
232 _, cnk = encryptor.process (pt [off:upto])
235 cnk, header, fixed = encryptor.done (header_dummy)
238 assert len (pt) == len (ct)
# Chunk-wise encryption of the fill_mod (1 << 14) buffer; the ciphertext must
# end up as long as the plaintext.
# NOTE(review): the constructor call continues past ``nacl=`` with an
# argument on a line not shown; presumably the strict-IV switch named in the
# test title -- confirm.
241 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
243 pt = fill_mod (1 << 14)
244 password = str (os.urandom (42))
245 encryptor = crypto.Encrypt (TEST_VERSION,
248 nacl=TEST_STATIC_NACL,
250 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 while off < len (pt):
# Clamp the slice end so the last chunk does not run past the buffer.
255 upto = min (off + cnksiz, len (pt))
256 _, cnk = encryptor.process (pt [off:upto])
259 cnk, header, fixed = encryptor.done (header_dummy)
262 assert len (pt) == len (ct)
# Encrypt five objects with a single Encrypt context through the helper
# ``addobj`` (its ``def`` line is outside this excerpt); afterwards
# ``encryptor.fixed`` must hold exactly one entry.
265 def test_crypto_aes_gcm_enc_multiobj (self):
267 password = str (os.urandom (42))
268 encryptor = crypto.Encrypt (TEST_VERSION,
271 nacl=TEST_STATIC_NACL,
# Each object gets its own payload (offset ``i``) and its own file name.
275 pt = fill_mod (1 << 14, off=i)
276 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
280 while off < len (pt):
281 upto = min (off + cnksiz, len (pt))
282 _, cnk = encryptor.process (pt [off:upto])
285 cnk, header, fixed = encryptor.done (header_dummy)
288 assert len (pt) == len (ct)
290 for i in range (5): addobj (i)
292 assert len (encryptor.fixed) == 1
# Encrypt five objects with a single Encrypt context and check that the
# fixed parts returned by done() stay equal to the remembered value and that
# ``encryptor.fixed`` holds exactly one entry at the end.
# The helper ``addobj`` is defined on a line outside this excerpt.
295 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
301 nacl=TEST_STATIC_NACL,
303 curfixed = None # must remain constant after first
306 pt = fill_mod (1 << 14, off=i)
307 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
311 while off < len (pt):
312 upto = min (off + cnksiz, len (pt))
313 _, cnk = encryptor.process (pt [off:upto])
316 cnk, header, fixed = encryptor.done (header_dummy)
# NOTE(review): the lines that initialise ``curfixed`` from the first object
# are not visible here; this assertion presumably applies to later objects.
321 assert fixed == curfixed
324 assert len (pt) == len (ct)
326 for i in range (5): addobj (i)
328 assert len (encryptor.fixed) == 1
# Lower the IV counter maximum through the crypto testing hook, then add
# objects until the counter has wrapped twice; three fixed parts must have
# been recorded by the end. ``new_max``, ``minimum`` and ``last_iv`` are set
# up on lines outside this excerpt.
331 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
333 Test behavior when the file counter tops out.
335 Artificially lower the maximum possible file counter. Considering
336 invalid (0) and reserved (1, 2) values, the smallest possible file counter
337 for normal objects is 3. Starting from that, the header of the (max -
338 3)rd object must have both a different IV fixed part and a counter.
342 crypto._testing_set_AES_GCM_IV_CNT_MAX \
343 ("I am fully aware that this will void my warranty.", new_max)
345 password = str (os.urandom (42))
346 encryptor = crypto.Encrypt (TEST_VERSION,
349 nacl=TEST_STATIC_NACL,
355 def addobj (i, wrap=False):
358 pt = fill_mod (1 << 14, off=i)
359 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
363 while off < len (pt):
364 upto = min (off + cnksiz, len (pt))
365 _, cnk = encryptor.process (pt [off:upto])
368 cnk, header, fixed = encryptor.done (header_dummy)
# Pull the IV out of the final header to compare it with the previous one.
369 this_iv = crypto.hdr_read (header) ["iv"]
370 if last_iv is not None:
# Split each IV into its fixed part and its counter.
371 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
372 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
# Presumably the no-wrap case (branch line not shown): same fixed part,
# counter advanced by one.
374 assert last_fixed == this_fixed
375 assert last_cnt == this_cnt - 1
# Presumably the wrap case (branch line not shown): new fixed part, counter
# back at the minimum.
377 assert last_fixed != this_fixed
378 assert this_cnt == minimum
382 assert len (pt) == len (ct)
384 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
385 addobj (i + 1, True) # counter wraps to 3
387 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
388 addobj (j + 1, True) # counter wraps to 3 again
390 assert len (encryptor.fixed) == 3
# With os.urandom swapped for a deterministic function and the IV counter
# maximum lowered, the encryptor is expected to raise
# crypto.IVFixedPartError. The statement inside the final assertRaises block
# is on a line outside this excerpt.
393 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
395 Test behavior when the file counter tops out and the transition to
396 the next IV fixed part fails on account of a bad random generator.
398 Replaces the ``urandom`` reference in ``os`` with a deterministic
399 function. The encryptor context must communicate this condition with an
400 ``IVFixedPartError``.
404 crypto._testing_set_AES_GCM_IV_CNT_MAX \
405 ("I am fully aware that this will void my warranty.", new_max)
# Deterministic stand-in: n bytes, each with the value n % 256. This assigns
# to the ``os`` module itself, so it affects every caller until it is undone.
407 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
408 password = str (os.urandom (42))
409 encryptor = crypto.Encrypt (TEST_VERSION,
412 nacl=TEST_STATIC_NACL,
416 pt = fill_mod (1 << 14, off=i)
417 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
420 while off < len (pt):
421 upto = min (off + cnksiz, len (pt))
422 _, cnk = encryptor.process (pt [off:upto])
# These objects are added outside the assertRaises block below.
425 for i in range (minimum, new_max): addobj (42 + i)
427 with self.assertRaises (crypto.IVFixedPartError):
432 def test_crypto_aes_gcm_enc_length_cap (self):
434 Artificially lower the maximum allowable data length and attempt to
435 encrypt a larger object. Verify that the crypto handler only encrypts
436 data up to the size limit. A downstream user detects that condition by
437 testing whether the encryption step yielded fewer bytes than the
440 The sibling to this test is test_restore_backup_max_file_length()
441 in test_deltatar.py. Deltatar will transparently create a split object
442 with an increased IV file counter.
445 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
446 ("I am fully aware that this will void my warranty.", new_max)
448 password = str (os.urandom (42))
449 encryptor = crypto.Encrypt (TEST_VERSION,
452 nacl=TEST_STATIC_NACL)
455 pt, ct = fill_mod (s), None
456 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
458 n, ct = encryptor.process (pt)
459 rest, _, _ = encryptor.done (header_dummy)
461 # NB: If this check *ever* fails, then something changed in the
462 # encoding layer. AES-GCM is a stream cipher so each encoding
463 # step will yield the exact number of ciphertext bytes that
464 # was provided as plaintext. Thus there cannot be any encoded
465 # data left when calling the finalizers. None of the crypto code
466 # depends on that assumption but nevertheless we check it here
467 # in case anything changes upstream in the Cryptography
468 # library. In case there actually is a rest, replace the
469 # assertion below with ``ct += rest``.
470 assert (len (rest) == 0)
472 if len (pt) > new_max:
473 # If the plaintext was longer than the artificially lowered
474 # maximum, then the number of ciphertext bytes must be clamped
478 assert n == len (pt) == len (ct)
480 for i in range (16): encobj (1 << i)
# Build a header whose ``ctsize`` is one above crypto.PDTCRYPT_MAX_OBJ_SIZE
# and check that Decrypt.next() rejects it with crypto.InvalidHeader.
483 def test_crypto_aes_gcm_dec_length_cap (self):
485 The decryptor must reject headers with an object size that exceeds
486 the PDTCRYPT maximum. Longer files split into multiple objects.
488 password = str (os.urandom (42))
# ``meta`` is created on a line not visible in this excerpt.
490 meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
491 ok, header = crypto.hdr_make (meta)
495 # Set up decryption with bogus header.
496 decryptor = crypto.Decrypt (password=password, fixedparts=[])
498 with self.assertRaises (crypto.InvalidHeader):
499 decryptor.next (header)
# Encrypt a buffer, run the chunk-wise decryption loop over it, then feed one
# further byte into process(); that extra call must raise
# crypto.CiphertextTooLong. The accumulation of ``ct`` and the chunk size
# ``cnksiz`` are on lines outside this excerpt.
502 def test_crypto_aes_gcm_dec_length_mismatch (self):
504 Catch attempts at decrypting more data than what was stated in the
508 orig_pt = fill_mod (1 << 14)
509 password = str (os.urandom (42))
510 encryptor = crypto.Encrypt (TEST_VERSION,
513 nacl=TEST_STATIC_NACL)
514 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
518 while off < len (orig_pt):
519 upto = min (off + cnksiz, len (orig_pt))
520 _n, cnk = encryptor.process (orig_pt [off:upto])
523 cnk, header, fixed = encryptor.done (header_dummy)
526 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
528 decryptor.next (header)
# Decryption loop; it is bounded by len (orig_pt) while slicing ``ct``.
531 while off < len (orig_pt):
532 upto = min (off + cnksiz, len (orig_pt))
533 cnk = decryptor.process (ct [off:upto])
537 with self.assertRaises (crypto.CiphertextTooLong):
538 # Try and decrypt one byte more than was encrypted.
539 # This must be caught in crypto.py.
540 _ = decryptor.process (ct [0:1])
# Encrypt a buffer chunk by chunk, then decrypt the ciphertext chunk by
# chunk and append whatever done() returns to the recovered plaintext.
# The final comparison and the accumulation of ``ct`` / ``pt`` are on lines
# not visible in this excerpt.
543 def test_crypto_aes_gcm_dec_multicnk (self):
545 orig_pt = fill_mod (1 << 14)
546 password = str (os.urandom (42))
547 encryptor = crypto.Encrypt (TEST_VERSION,
550 nacl=TEST_STATIC_NACL)
551 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
555 while off < len (orig_pt):
556 upto = min (off + cnksiz, len (orig_pt))
557 _n, cnk = encryptor.process (orig_pt [off:upto])
560 cnk, header, fixed = encryptor.done (header_dummy)
563 decryptor = crypto.Decrypt (password=password,
565 decryptor.next (header)
568 while off < len (orig_pt):
569 upto = min (off + cnksiz, len (orig_pt))
570 cnk = decryptor.process (ct [off:upto])
# done() finalizes decryption; its return value is appended to the plaintext.
575 pt += decryptor.done ()
# Chunk-wise variant of the corrupted-tag check: after encrypting a buffer in
# chunks, one tag byte in the header is altered; decrypting chunk by chunk
# must then end with done() raising crypto.InvalidGCMTag.
579 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
581 orig_pt = fill_mod (1 << 14)
582 password = str (os.urandom (42))
583 encryptor = crypto.Encrypt (TEST_VERSION,
586 nacl=TEST_STATIC_NACL)
587 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
591 while off < len (orig_pt):
592 upto = min (off + cnksiz, len (orig_pt))
593 _n, cnk = encryptor.process (orig_pt [off:upto])
596 cnk, header, fixed = encryptor.done (header_dummy)
# Work on a mutable copy; the header is converted back to bytes below.
599 mut_header = bytearray (header)
600 mut_header_vw = memoryview (mut_header)
601 # replace one byte in the tag part of the header
602 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
# Adding 0x2a modulo 256 always produces a value different from the original.
603 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
604 header = bytes (mut_header)
606 decryptor = crypto.Decrypt (password=password,
608 decryptor.next (header)
611 while off < len (orig_pt):
612 upto = min (off + cnksiz, len (orig_pt))
613 cnk = decryptor.process (ct [off:upto])
# The failure is expected from done(), after all chunks went through process().
617 with self.assertRaises (crypto.InvalidGCMTag):
618 _ = decryptor.done ()
# Encrypt two objects, copy the IV bytes from the first header into a copy
# of the second, and check that decrypting with the doctored header raises
# crypto.DuplicateIV. The ``def`` lines of the local helpers ``enc`` and
# ``dec`` are outside this excerpt.
621 def test_crypto_aes_gcm_dec_iv_reuse (self):
623 Meddle with encrypted content: extract the IV from one object
624 and inject it into the header of another. This must be rejected
628 orig_pt_1 = fill_mod (1 << 10)
629 orig_pt_2 = fill_mod (1 << 10, 42)
630 password = str (os.urandom (42))
631 encryptor = crypto.Encrypt (TEST_VERSION,
634 nacl=TEST_STATIC_NACL)
637 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
641 while off < len (pt):
642 upto = min (off + cnksiz, len (pt))
643 _n, cnk = encryptor.process (pt [off:upto])
646 cnk, header, fixed = encryptor.done (header_dummy)
647 return ct + cnk, header, fixed
# Only the fixed parts returned for the second object are kept.
649 ct_1, hdr_1, _____ = enc (orig_pt_1)
650 ct_2, hdr_2, fixed = enc (orig_pt_2)
652 mut_hdr_2 = bytearray (hdr_2)
653 mut_hdr_2_vw = memoryview (mut_hdr_2)
# Byte range of the IV field inside a header.
655 iv_lo = crypto.HDR_OFF_IV
656 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
657 iv_1 = hdr_1 [iv_lo : iv_hi]
658 # transplant into other header
659 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
660 hdr_2_mod = bytes (mut_hdr_2)
661 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
668 while off < len (ct):
669 upto = min (off + cnksiz, len (ct))
670 cnk = decryptor.process (ct [off:upto])
673 return pt + decryptor.done ()
# Both genuine headers are decrypted first, so the IV of object 1 has already
# been used by the time the doctored header is presented.
675 decr_pt_1 = dec (hdr_1, ct_1)
676 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
677 with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
678 decr_pt_2 = dec (hdr_2_mod, ct_2)
# Tests for building (crypto.hdr_make) and parsing (crypto.hdr_read) headers.
681 class HeaderTest (CryptoLayerTest):
# A header built by hdr_make() must be exactly crypto.PDTCRYPT_HDR_SIZE
# bytes long. ``meta`` is created on a line not visible in this excerpt.
683 def test_crypto_fmt_hdr_make (self):
685 ok, hdr = crypto.hdr_make (meta)
687 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
# Given a dict without usable header fields, hdr_make() must hand back an
# error message as the second element of its result instead of a header.
690 def test_crypto_fmt_hdr_make_useless (self):
691 ok, ret = crypto.hdr_make ({ 42: "x" })
693 assert ret.startswith ("error assembling header:")
696 def test_crypto_fmt_hdr_read (self):
698 ok, hdr = crypto.hdr_make (meta)
700 assert hdr is not None
701 mmeta = crypto.hdr_read (hdr)
702 assert mmeta is not None
704 if meta [k] != mmeta [k]:
705 raise "header mismatch after reading: expected %r, got %r" \
706 % (meta [k], mmeta [k])
# hdr_read() is expected to raise crypto.InvalidHeader for the header
# prepared here; going by the test name, extra bytes are appended to it on a
# line not visible in this excerpt.
709 def test_crypto_fmt_hdr_read_trailing_garbage (self):
# NOTE(review): hdr_make() is called twice in a row with the same argument
# and the same targets; the first call looks redundant -- confirm and drop.
711 ok, hdr = crypto.hdr_make (meta)
712 ok, hdr = crypto.hdr_make (meta)
714 assert hdr is not None
716 with self.assertRaises (crypto.InvalidHeader):
717 _ = crypto.hdr_read (hdr)
# hdr_read() is expected to raise crypto.InvalidHeader for the header
# prepared here; going by the test name, extra bytes are put in front of it
# on a line not visible in this excerpt.
720 def test_crypto_fmt_hdr_read_leading_garbage (self):
# NOTE(review): hdr_make() is called twice in a row with the same argument
# and the same targets; the first call looks redundant -- confirm and drop.
722 ok, hdr = crypto.hdr_make (meta)
723 ok, hdr = crypto.hdr_make (meta)
725 assert hdr is not None
727 with self.assertRaises (crypto.InvalidHeader):
728 _ = crypto.hdr_read (hdr)
# Splice extra bytes into the middle of a freshly built header; hdr_read()
# must reject the result with crypto.InvalidHeader.
731 def test_crypto_fmt_hdr_inner_garbage (self):
733 ok, hdr = crypto.hdr_make (meta)
# Cut the header at its midpoint and insert b"junk-" between the halves.
735 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
736 with self.assertRaises (crypto.InvalidHeader):
737 _ = crypto.hdr_read (data)