7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
61 class CryptoLayerTest (unittest.TestCase):
65 class AESGCMTest (CryptoLayerTest):
67 os_urandom = os.urandom
70 """Reset globals altered for testing."""
71 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72 ("I am fully aware that this will void my warranty.")
73 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74 ("I am fully aware that this will void my warranty.")
75 os.urandom = self.os_urandom
77 def test_crypto_aes_gcm_enc_ctor (self):
78 password = str (os.urandom (42))
79 encryptor = crypto.Encrypt (TEST_VERSION,
82 nacl=TEST_STATIC_NACL)
85 def test_crypto_aes_gcm_enc_ctor_key (self):
87 encryptor = crypto.Encrypt (TEST_VERSION,
90 nacl=TEST_STATIC_NACL)
93 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
95 Either key (+nacl) or password must be supplied, not both.
97 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
98 encryptor = crypto.Encrypt (TEST_VERSION,
100 nacl=TEST_STATIC_NACL)
102 password = str (os.urandom (42))
103 key = os.urandom (16) # scrypt sized
104 with self.assertRaises (crypto.InvalidParameter): # both key and pw
105 encryptor = crypto.Encrypt (TEST_VERSION,
109 nacl=TEST_STATIC_NACL)
111 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
112 encryptor = crypto.Encrypt (TEST_VERSION,
117 with self.assertRaises (crypto.InvalidParameter): # empty pw
118 encryptor = crypto.Encrypt (TEST_VERSION,
121 nacl=TEST_STATIC_NACL)
124 def test_crypto_aes_gcm_enc_header_size (self):
125 password = str (os.urandom (42))
126 encryptor = crypto.Encrypt (TEST_VERSION,
129 nacl=TEST_STATIC_NACL)
131 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
132 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
133 _, _ = encryptor.process (TEST_PLAINTEXT)
134 _, header, _ = encryptor.done (header_dummy)
135 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
138 def test_crypto_aes_gcm_enc_chunk_size (self):
139 password = str (os.urandom (42))
140 encryptor = crypto.Encrypt (TEST_VERSION,
143 nacl=TEST_STATIC_NACL)
145 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
146 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
147 assert len (ciphertext) == len (TEST_PLAINTEXT)
148 rest, header, fixed = encryptor.done (header_dummy)
149 assert len (rest) == 0
152 def test_crypto_aes_gcm_dec_ctor (self):
154 Ensure that only either key or password is accepted.
156 password = str (os.urandom (42))
157 key = os.urandom (16) # scrypt sized
159 decryptor = crypto.Decrypt (password=password)
160 decryptor = crypto.Decrypt (key=key)
162 with self.assertRaises (crypto.InvalidParameter): # both password and key
163 decryptor = crypto.Decrypt (password=password, key=key)
165 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
166 decryptor = crypto.Decrypt (password=None, key=None)
168 with self.assertRaises (crypto.InvalidParameter): # # empty password
169 decryptor = crypto.Decrypt (password="")
172 def test_crypto_aes_gcm_dec_simple (self):
173 password = str (os.urandom (42))
174 encryptor = crypto.Encrypt (TEST_VERSION,
177 nacl=TEST_STATIC_NACL)
179 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
180 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
181 rest, header, fixed = encryptor.done (header_dummy)
184 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
185 decryptor.next (header)
186 plaintext = decryptor.process (ciphertext)
187 rest = decryptor.done ()
190 assert plaintext == TEST_PLAINTEXT
193 def test_crypto_aes_gcm_dec_bad_tag (self):
194 password = str (os.urandom (42))
195 encryptor = crypto.Encrypt (TEST_VERSION,
198 nacl=TEST_STATIC_NACL)
200 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
201 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
202 ciphertext2, header, fixed = encryptor.done (header_dummy)
204 mut_header = bytearray (header)
205 mut_header_vw = memoryview (mut_header)
206 # replace one byte in the tag part of the header
207 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
208 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
209 header = bytes (mut_header)
211 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
212 decryptor.next (header)
213 plaintext = decryptor.process (ciphertext)
214 with self.assertRaises (crypto.InvalidGCMTag):
215 _ = decryptor.done ()
218 def test_crypto_aes_gcm_enc_multicnk (self):
220 pt = fill_mod (1 << 14)
221 password = str (os.urandom (42))
222 encryptor = crypto.Encrypt (TEST_VERSION,
225 nacl=TEST_STATIC_NACL)
226 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
230 while off < len (pt):
231 upto = min (off + cnksiz, len (pt))
232 _, cnk = encryptor.process (pt [off:upto])
235 cnk, header, fixed = encryptor.done (header_dummy)
238 assert len (pt) == len (ct)
241 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
243 pt = fill_mod (1 << 14)
244 password = str (os.urandom (42))
245 encryptor = crypto.Encrypt (TEST_VERSION,
248 nacl=TEST_STATIC_NACL,
250 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 while off < len (pt):
255 upto = min (off + cnksiz, len (pt))
256 _, cnk = encryptor.process (pt [off:upto])
259 cnk, header, fixed = encryptor.done (header_dummy)
262 assert len (pt) == len (ct)
265 def test_crypto_aes_gcm_enc_multiobj (self):
267 password = str (os.urandom (42))
268 encryptor = crypto.Encrypt (TEST_VERSION,
271 nacl=TEST_STATIC_NACL,
275 pt = fill_mod (1 << 14, off=i)
276 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
280 while off < len (pt):
281 upto = min (off + cnksiz, len (pt))
282 _, cnk = encryptor.process (pt [off:upto])
285 cnk, header, fixed = encryptor.done (header_dummy)
288 assert len (pt) == len (ct)
290 for i in range (5): addobj (i)
292 assert len (encryptor.fixed) == 1
295 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
301 nacl=TEST_STATIC_NACL,
303 curfixed = None # must remain constant after first
306 pt = fill_mod (1 << 14, off=i)
307 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
311 while off < len (pt):
312 upto = min (off + cnksiz, len (pt))
313 _, cnk = encryptor.process (pt [off:upto])
316 cnk, header, fixed = encryptor.done (header_dummy)
321 assert fixed == curfixed
324 assert len (pt) == len (ct)
326 for i in range (5): addobj (i)
328 assert len (encryptor.fixed) == 1
331 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
333 Test behavior when the file counter tops out.
335 Artificially lower the maximum possible file counter. Considering
336 invalid (0) and reserved (1, 2) values, the smallest possible file counter
337 for normal objects is 3. Starting from that, the header of the (max -
338 3)rd object must have both a different IV fixed part and a counter.
342 crypto._testing_set_AES_GCM_IV_CNT_MAX \
343 ("I am fully aware that this will void my warranty.", new_max)
345 password = str (os.urandom (42))
346 encryptor = crypto.Encrypt (TEST_VERSION,
349 nacl=TEST_STATIC_NACL,
355 def addobj (i, wrap=False):
358 pt = fill_mod (1 << 14, off=i)
359 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
363 while off < len (pt):
364 upto = min (off + cnksiz, len (pt))
365 _, cnk = encryptor.process (pt [off:upto])
368 cnk, header, fixed = encryptor.done (header_dummy)
369 this_iv = crypto.hdr_read (header) ["iv"]
370 if last_iv is not None:
371 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
372 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
374 assert last_fixed == this_fixed
375 assert last_cnt == this_cnt - 1
377 assert last_fixed != this_fixed
378 assert this_cnt == minimum
382 assert len (pt) == len (ct)
384 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
385 addobj (i + 1, True) # counter wraps to 3
387 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
388 addobj (j + 1, True) # counter wraps to 3 again
390 assert len (encryptor.fixed) == 3
393 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
395 Test behavior when the file counter tops out and the transition to
396 the next IV fixed part fails on account of a bad random generator.
398 Replaces the ``urandom`` reference in ``os`` with a deterministic
399 function. The encryptor context must communicate this condition with an
400 ``IVFixedPartError``.
404 crypto._testing_set_AES_GCM_IV_CNT_MAX \
405 ("I am fully aware that this will void my warranty.", new_max)
407 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
408 password = str (os.urandom (42))
409 encryptor = crypto.Encrypt (TEST_VERSION,
412 nacl=TEST_STATIC_NACL,
416 pt = fill_mod (1 << 14, off=i)
417 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
420 while off < len (pt):
421 upto = min (off + cnksiz, len (pt))
422 _, cnk = encryptor.process (pt [off:upto])
425 for i in range (minimum, new_max): addobj (42 + i)
427 with self.assertRaises (crypto.IVFixedPartError):
432 def test_crypto_aes_gcm_enc_length_cap (self):
434 Artificially lower the maximum allowable data length and attempt to
435 encrypt a larger object. Verify that the crypto handler aborts with and
438 The sibling to this test is test_restore_backup_max_file_length()
439 in test_delatar.py. Deltatar will transparently create a splitted object
440 with an increased IV file counter.
443 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
444 ("I am fully aware that this will void my warranty.", new_max)
446 password = str (os.urandom (42))
447 encryptor = crypto.Encrypt (TEST_VERSION,
450 nacl=TEST_STATIC_NACL)
453 pt, ct = fill_mod (s), None
454 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
456 n, ct = encryptor.process (pt)
457 rest, _, _ = encryptor.done (header_dummy)
460 if len (pt) > new_max:
463 assert n == len (pt) == len (ct)
465 for i in range (16): encobj (1 << i)
468 def test_crypto_aes_gcm_dec_multicnk (self):
470 orig_pt = fill_mod (1 << 14)
471 password = str (os.urandom (42))
472 encryptor = crypto.Encrypt (TEST_VERSION,
475 nacl=TEST_STATIC_NACL)
476 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
480 while off < len (orig_pt):
481 upto = min (off + cnksiz, len (orig_pt))
482 _n, cnk = encryptor.process (orig_pt [off:upto])
485 cnk, header, fixed = encryptor.done (header_dummy)
488 decryptor = crypto.Decrypt (password=password,
490 decryptor.next (header)
493 while off < len (orig_pt):
494 upto = min (off + cnksiz, len (orig_pt))
495 cnk = decryptor.process (ct [off:upto])
500 pt += decryptor.done ()
504 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
506 orig_pt = fill_mod (1 << 14)
507 password = str (os.urandom (42))
508 encryptor = crypto.Encrypt (TEST_VERSION,
511 nacl=TEST_STATIC_NACL)
512 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
516 while off < len (orig_pt):
517 upto = min (off + cnksiz, len (orig_pt))
518 _n, cnk = encryptor.process (orig_pt [off:upto])
521 cnk, header, fixed = encryptor.done (header_dummy)
524 mut_header = bytearray (header)
525 mut_header_vw = memoryview (mut_header)
526 # replace one byte in the tag part of the header
527 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
528 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
529 header = bytes (mut_header)
531 decryptor = crypto.Decrypt (password=password,
533 decryptor.next (header)
536 while off < len (orig_pt):
537 upto = min (off + cnksiz, len (orig_pt))
538 cnk = decryptor.process (ct [off:upto])
542 with self.assertRaises (crypto.InvalidGCMTag):
543 _ = decryptor.done ()
546 def test_crypto_aes_gcm_dec_iv_reuse (self):
548 Meddle with encrypted content: extract the IV from one object
549 and inject it into the header of another. This must be rejected
553 orig_pt_1 = fill_mod (1 << 10)
554 orig_pt_2 = fill_mod (1 << 10, 42)
555 password = str (os.urandom (42))
556 encryptor = crypto.Encrypt (TEST_VERSION,
559 nacl=TEST_STATIC_NACL)
562 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
566 while off < len (pt):
567 upto = min (off + cnksiz, len (pt))
568 _n, cnk = encryptor.process (pt [off:upto])
571 cnk, header, fixed = encryptor.done (header_dummy)
572 return ct + cnk, header, fixed
574 ct_1, hdr_1, _____ = enc (orig_pt_1)
575 ct_2, hdr_2, fixed = enc (orig_pt_2)
577 mut_hdr_2 = bytearray (hdr_2)
578 mut_hdr_2_vw = memoryview (mut_hdr_2)
580 iv_lo = crypto.HDR_OFF_IV
581 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
582 iv_1 = hdr_1 [iv_lo : iv_hi]
583 # transplant into other header
584 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
585 hdr_2_mod = bytes (mut_hdr_2)
586 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
593 while off < len (ct):
594 upto = min (off + cnksiz, len (ct))
595 cnk = decryptor.process (ct [off:upto])
598 return pt + decryptor.done ()
600 decr_pt_1 = dec (hdr_1, ct_1)
601 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
602 with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
603 decr_pt_2 = dec (hdr_2_mod, ct_2)
606 class HeaderTest (CryptoLayerTest):
608 def test_crypto_fmt_hdr_make (self):
610 ok, hdr = crypto.hdr_make (meta)
612 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
615 def test_crypto_fmt_hdr_make_useless (self):
616 ok, ret = crypto.hdr_make ({ 42: "x" })
618 assert ret.startswith ("error assembling header:")
621 def test_crypto_fmt_hdr_read (self):
623 ok, hdr = crypto.hdr_make (meta)
625 assert hdr is not None
626 mmeta = crypto.hdr_read (hdr)
627 assert mmeta is not None
629 if meta [k] != mmeta [k]:
630 raise "header mismatch after reading: expected %r, got %r" \
631 % (meta [k], mmeta [k])
634 def test_crypto_fmt_hdr_read_trailing_garbage (self):
636 ok, hdr = crypto.hdr_make (meta)
637 ok, hdr = crypto.hdr_make (meta)
639 assert hdr is not None
641 with self.assertRaises (crypto.InvalidHeader):
642 _ = crypto.hdr_read (hdr)
645 def test_crypto_fmt_hdr_read_leading_garbage (self):
647 ok, hdr = crypto.hdr_make (meta)
648 ok, hdr = crypto.hdr_make (meta)
650 assert hdr is not None
652 with self.assertRaises (crypto.InvalidHeader):
653 _ = crypto.hdr_read (hdr)
656 def test_crypto_fmt_hdr_inner_garbage (self):
658 ok, hdr = crypto.hdr_make (meta)
660 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
661 with self.assertRaises (crypto.InvalidHeader):
662 _ = crypto.hdr_read (data)