7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
61 class CryptoLayerTest (unittest.TestCase):
65 class AESGCMTest (CryptoLayerTest):
67 os_urandom = os.urandom
70 """Reset globals altered for testing."""
71 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72 ("I am fully aware that this will void my warranty.")
73 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74 ("I am fully aware that this will void my warranty.")
75 os.urandom = self.os_urandom
77 def test_crypto_aes_gcm_enc_ctor (self):
78 password = str (os.urandom (42))
79 encryptor = crypto.Encrypt (TEST_VERSION,
82 nacl=TEST_STATIC_NACL)
85 def test_crypto_aes_gcm_enc_ctor_key (self):
87 encryptor = crypto.Encrypt (TEST_VERSION,
90 nacl=TEST_STATIC_NACL)
93 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
95 Either key (+nacl) or password must be supplied, not both.
97 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
98 encryptor = crypto.Encrypt (TEST_VERSION,
100 nacl=TEST_STATIC_NACL)
102 password = str (os.urandom (42))
103 key = os.urandom (16) # scrypt sized
104 with self.assertRaises (crypto.InvalidParameter): # both key and pw
105 encryptor = crypto.Encrypt (TEST_VERSION,
109 nacl=TEST_STATIC_NACL)
111 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
112 encryptor = crypto.Encrypt (TEST_VERSION,
117 with self.assertRaises (crypto.InvalidParameter): # empty pw
118 encryptor = crypto.Encrypt (TEST_VERSION,
121 nacl=TEST_STATIC_NACL)
124 def test_crypto_aes_gcm_enc_header_size (self):
125 password = str (os.urandom (42))
126 encryptor = crypto.Encrypt (TEST_VERSION,
129 nacl=TEST_STATIC_NACL)
131 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
132 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
133 _, _ = encryptor.process (TEST_PLAINTEXT)
134 _, header, _ = encryptor.done (header_dummy)
135 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
138 def test_crypto_aes_gcm_enc_chunk_size (self):
139 password = str (os.urandom (42))
140 encryptor = crypto.Encrypt (TEST_VERSION,
143 nacl=TEST_STATIC_NACL)
145 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
146 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
147 assert len (ciphertext) == len (TEST_PLAINTEXT)
148 rest, header, fixed = encryptor.done (header_dummy)
149 assert len (rest) == 0
152 def test_crypto_aes_gcm_dec_ctor (self):
154 Ensure that only either key or password is accepted.
156 password = str (os.urandom (42))
157 key = os.urandom (16) # scrypt sized
159 decryptor = crypto.Decrypt (password=password)
160 decryptor = crypto.Decrypt (key=key)
162 with self.assertRaises (crypto.InvalidParameter): # both password and key
163 decryptor = crypto.Decrypt (password=password, key=key)
165 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
166 decryptor = crypto.Decrypt (password=None, key=None)
168 with self.assertRaises (crypto.InvalidParameter): # # empty password
169 decryptor = crypto.Decrypt (password="")
172 def test_crypto_aes_gcm_dec_simple (self):
173 password = str (os.urandom (42))
174 encryptor = crypto.Encrypt (TEST_VERSION,
177 nacl=TEST_STATIC_NACL)
179 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
180 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
181 rest, header, fixed = encryptor.done (header_dummy)
184 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
185 decryptor.next (header)
186 plaintext = decryptor.process (ciphertext)
187 rest = decryptor.done ()
190 assert plaintext == TEST_PLAINTEXT
193 def test_crypto_aes_gcm_dec_bad_tag (self):
194 password = str (os.urandom (42))
195 encryptor = crypto.Encrypt (TEST_VERSION,
198 nacl=TEST_STATIC_NACL)
200 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
201 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
202 ciphertext2, header, fixed = encryptor.done (header_dummy)
204 mut_header = bytearray (header)
205 mut_header_vw = memoryview (mut_header)
206 # replace one byte in the tag part of the header
207 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
208 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
209 header = bytes (mut_header)
211 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
212 decryptor.next (header)
213 plaintext = decryptor.process (ciphertext)
214 with self.assertRaises (crypto.InvalidGCMTag):
215 _ = decryptor.done ()
218 def test_crypto_aes_gcm_enc_multicnk (self):
220 pt = fill_mod (1 << 14)
221 password = str (os.urandom (42))
222 encryptor = crypto.Encrypt (TEST_VERSION,
225 nacl=TEST_STATIC_NACL)
226 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
230 while off < len (pt):
231 upto = min (off + cnksiz, len (pt))
232 _, cnk = encryptor.process (pt [off:upto])
235 cnk, header, fixed = encryptor.done (header_dummy)
238 assert len (pt) == len (ct)
241 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
243 pt = fill_mod (1 << 14)
244 password = str (os.urandom (42))
245 encryptor = crypto.Encrypt (TEST_VERSION,
248 nacl=TEST_STATIC_NACL,
250 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 while off < len (pt):
255 upto = min (off + cnksiz, len (pt))
256 _, cnk = encryptor.process (pt [off:upto])
259 cnk, header, fixed = encryptor.done (header_dummy)
262 assert len (pt) == len (ct)
265 def test_crypto_aes_gcm_enc_multiobj (self):
267 password = str (os.urandom (42))
268 encryptor = crypto.Encrypt (TEST_VERSION,
271 nacl=TEST_STATIC_NACL,
275 pt = fill_mod (1 << 14, off=i)
276 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
280 while off < len (pt):
281 upto = min (off + cnksiz, len (pt))
282 _, cnk = encryptor.process (pt [off:upto])
285 cnk, header, fixed = encryptor.done (header_dummy)
288 assert len (pt) == len (ct)
290 for i in range (5): addobj (i)
292 assert len (encryptor.fixed) == 1
295 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
301 nacl=TEST_STATIC_NACL,
303 curfixed = None # must remain constant after first
306 pt = fill_mod (1 << 14, off=i)
307 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
311 while off < len (pt):
312 upto = min (off + cnksiz, len (pt))
313 _, cnk = encryptor.process (pt [off:upto])
316 cnk, header, fixed = encryptor.done (header_dummy)
321 assert fixed == curfixed
324 assert len (pt) == len (ct)
326 for i in range (5): addobj (i)
328 assert len (encryptor.fixed) == 1
331 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
333 Test behavior when the file counter tops out.
335 Artificially lower the maximum possible file counter. Considering
336 invalid (0) and reserved (1, 2) values, the smallest possible file counter
337 for normal objects is 3. Starting from that, the header of the (max -
338 3)rd object must have both a different IV fixed part and a counter.
342 crypto._testing_set_AES_GCM_IV_CNT_MAX \
343 ("I am fully aware that this will void my warranty.", new_max)
345 password = str (os.urandom (42))
346 encryptor = crypto.Encrypt (TEST_VERSION,
349 nacl=TEST_STATIC_NACL,
355 def addobj (i, wrap=False):
358 pt = fill_mod (1 << 14, off=i)
359 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
363 while off < len (pt):
364 upto = min (off + cnksiz, len (pt))
365 _, cnk = encryptor.process (pt [off:upto])
368 cnk, header, fixed = encryptor.done (header_dummy)
369 this_iv = crypto.hdr_read (header) ["iv"]
370 if last_iv is not None:
371 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
372 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
374 assert last_fixed == this_fixed
375 assert last_cnt == this_cnt - 1
377 assert last_fixed != this_fixed
378 assert this_cnt == minimum
382 assert len (pt) == len (ct)
384 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
385 addobj (i + 1, True) # counter wraps to 3
387 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
388 addobj (j + 1, True) # counter wraps to 3 again
390 assert len (encryptor.fixed) == 3
393 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
395 Test behavior when the file counter tops out and the transition to
396 the next IV fixed part fails on account of a bad random generator.
398 Replaces the ``urandom`` reference in ``os`` with a deterministic
399 function. The encryptor context must communicate this condition with an
400 ``IVFixedPartError``.
404 crypto._testing_set_AES_GCM_IV_CNT_MAX \
405 ("I am fully aware that this will void my warranty.", new_max)
407 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
408 password = str (os.urandom (42))
409 encryptor = crypto.Encrypt (TEST_VERSION,
412 nacl=TEST_STATIC_NACL,
416 pt = fill_mod (1 << 14, off=i)
417 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
420 while off < len (pt):
421 upto = min (off + cnksiz, len (pt))
422 _, cnk = encryptor.process (pt [off:upto])
425 for i in range (minimum, new_max): addobj (42 + i)
427 with self.assertRaises (crypto.IVFixedPartError):
432 def test_crypto_aes_gcm_enc_length_cap (self):
434 Artificially lower the maximum allowable data length and attempt to
435 encrypt a larger object. Verify that the crypto handler aborts with and
438 The sibling to this test is test_restore_backup_max_file_length()
439 in test_deltatar.py. Deltatar will transparently create a split object
440 with an increased IV file counter.
443 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
444 ("I am fully aware that this will void my warranty.", new_max)
446 password = str (os.urandom (42))
447 encryptor = crypto.Encrypt (TEST_VERSION,
450 nacl=TEST_STATIC_NACL)
453 pt, ct = fill_mod (s), None
454 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
456 n, ct = encryptor.process (pt)
457 rest, _, _ = encryptor.done (header_dummy)
459 # NB: If this check *ever* fails, then something changed in the
460 # encoding layer. AES-GCM is a stream cipher so each encoding
461 # step will yield the exact number of ciphertext bytes that
462 # was provided as plaintext. Thus there cannot be any encoded
463 # data left when calling the finalizers. None of the crypto code
464 # depends on that assumption but nevertheless we check it here
465 # in case anything changes upstream in the Cryptography
466 # library. In case there actually is a rest, replace the
467 # assertion below with ``ct += rest``.
468 assert (len (rest) == 0)
470 if len (pt) > new_max:
473 assert n == len (pt) == len (ct)
475 for i in range (16): encobj (1 << i)
478 def test_crypto_aes_gcm_dec_multicnk (self):
480 orig_pt = fill_mod (1 << 14)
481 password = str (os.urandom (42))
482 encryptor = crypto.Encrypt (TEST_VERSION,
485 nacl=TEST_STATIC_NACL)
486 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
490 while off < len (orig_pt):
491 upto = min (off + cnksiz, len (orig_pt))
492 _n, cnk = encryptor.process (orig_pt [off:upto])
495 cnk, header, fixed = encryptor.done (header_dummy)
498 decryptor = crypto.Decrypt (password=password,
500 decryptor.next (header)
503 while off < len (orig_pt):
504 upto = min (off + cnksiz, len (orig_pt))
505 cnk = decryptor.process (ct [off:upto])
510 pt += decryptor.done ()
514 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
516 orig_pt = fill_mod (1 << 14)
517 password = str (os.urandom (42))
518 encryptor = crypto.Encrypt (TEST_VERSION,
521 nacl=TEST_STATIC_NACL)
522 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
526 while off < len (orig_pt):
527 upto = min (off + cnksiz, len (orig_pt))
528 _n, cnk = encryptor.process (orig_pt [off:upto])
531 cnk, header, fixed = encryptor.done (header_dummy)
534 mut_header = bytearray (header)
535 mut_header_vw = memoryview (mut_header)
536 # replace one byte in the tag part of the header
537 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
538 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
539 header = bytes (mut_header)
541 decryptor = crypto.Decrypt (password=password,
543 decryptor.next (header)
546 while off < len (orig_pt):
547 upto = min (off + cnksiz, len (orig_pt))
548 cnk = decryptor.process (ct [off:upto])
552 with self.assertRaises (crypto.InvalidGCMTag):
553 _ = decryptor.done ()
556 def test_crypto_aes_gcm_dec_iv_reuse (self):
558 Meddle with encrypted content: extract the IV from one object
559 and inject it into the header of another. This must be rejected
563 orig_pt_1 = fill_mod (1 << 10)
564 orig_pt_2 = fill_mod (1 << 10, 42)
565 password = str (os.urandom (42))
566 encryptor = crypto.Encrypt (TEST_VERSION,
569 nacl=TEST_STATIC_NACL)
572 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
576 while off < len (pt):
577 upto = min (off + cnksiz, len (pt))
578 _n, cnk = encryptor.process (pt [off:upto])
581 cnk, header, fixed = encryptor.done (header_dummy)
582 return ct + cnk, header, fixed
584 ct_1, hdr_1, _____ = enc (orig_pt_1)
585 ct_2, hdr_2, fixed = enc (orig_pt_2)
587 mut_hdr_2 = bytearray (hdr_2)
588 mut_hdr_2_vw = memoryview (mut_hdr_2)
590 iv_lo = crypto.HDR_OFF_IV
591 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
592 iv_1 = hdr_1 [iv_lo : iv_hi]
593 # transplant into other header
594 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
595 hdr_2_mod = bytes (mut_hdr_2)
596 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
603 while off < len (ct):
604 upto = min (off + cnksiz, len (ct))
605 cnk = decryptor.process (ct [off:upto])
608 return pt + decryptor.done ()
610 decr_pt_1 = dec (hdr_1, ct_1)
611 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
612 with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
613 decr_pt_2 = dec (hdr_2_mod, ct_2)
616 class HeaderTest (CryptoLayerTest):
618 def test_crypto_fmt_hdr_make (self):
620 ok, hdr = crypto.hdr_make (meta)
622 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
625 def test_crypto_fmt_hdr_make_useless (self):
626 ok, ret = crypto.hdr_make ({ 42: "x" })
628 assert ret.startswith ("error assembling header:")
631 def test_crypto_fmt_hdr_read (self):
633 ok, hdr = crypto.hdr_make (meta)
635 assert hdr is not None
636 mmeta = crypto.hdr_read (hdr)
637 assert mmeta is not None
639 if meta [k] != mmeta [k]:
640 raise "header mismatch after reading: expected %r, got %r" \
641 % (meta [k], mmeta [k])
644 def test_crypto_fmt_hdr_read_trailing_garbage (self):
646 ok, hdr = crypto.hdr_make (meta)
647 ok, hdr = crypto.hdr_make (meta)
649 assert hdr is not None
651 with self.assertRaises (crypto.InvalidHeader):
652 _ = crypto.hdr_read (hdr)
655 def test_crypto_fmt_hdr_read_leading_garbage (self):
657 ok, hdr = crypto.hdr_make (meta)
658 ok, hdr = crypto.hdr_make (meta)
660 assert hdr is not None
662 with self.assertRaises (crypto.InvalidHeader):
663 _ = crypto.hdr_read (hdr)
666 def test_crypto_fmt_hdr_inner_garbage (self):
668 ok, hdr = crypto.hdr_make (meta)
670 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
671 with self.assertRaises (crypto.InvalidHeader):
672 _ = crypto.hdr_read (data)