7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
61 class CryptoLayerTest (unittest.TestCase):
65 class AESGCMTest (CryptoLayerTest):
67 os_urandom = os.urandom
70 """Reset globals altered for testing."""
71 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72 ("I am fully aware that this will void my warranty.")
73 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74 ("I am fully aware that this will void my warranty.")
75 os.urandom = self.os_urandom
77 def test_crypto_aes_gcm_enc_ctor (self):
78 password = str (os.urandom (42))
79 encryptor = crypto.Encrypt (TEST_VERSION,
82 nacl=TEST_STATIC_NACL)
85 def test_crypto_aes_gcm_enc_ctor_key (self):
87 encryptor = crypto.Encrypt (TEST_VERSION,
90 nacl=TEST_STATIC_NACL)
93 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
95 Either key (+nacl) or password must be supplied, not both.
98 encryptor = crypto.Encrypt (TEST_VERSION,
100 nacl=TEST_STATIC_NACL)
101 except crypto.InvalidParameter: # neither key nor pw
104 password = str (os.urandom (42))
105 key = os.urandom (16) # scrypt sized
107 encryptor = crypto.Encrypt (TEST_VERSION,
111 nacl=TEST_STATIC_NACL)
112 except crypto.InvalidParameter: # both key and pw
116 encryptor = crypto.Encrypt (TEST_VERSION,
120 except crypto.InvalidParameter: # key, but salt missing
124 encryptor = crypto.Encrypt (TEST_VERSION,
127 nacl=TEST_STATIC_NACL)
128 except crypto.InvalidParameter: # empty pw
132 def test_crypto_aes_gcm_enc_header_size (self):
133 password = str (os.urandom (42))
134 encryptor = crypto.Encrypt (TEST_VERSION,
137 nacl=TEST_STATIC_NACL)
139 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
140 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
141 _, _ = encryptor.process (TEST_PLAINTEXT)
142 _, header, _ = encryptor.done (header_dummy)
143 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
146 def test_crypto_aes_gcm_enc_chunk_size (self):
147 password = str (os.urandom (42))
148 encryptor = crypto.Encrypt (TEST_VERSION,
151 nacl=TEST_STATIC_NACL)
153 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
154 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
155 assert len (ciphertext) == len (TEST_PLAINTEXT)
156 rest, header, fixed = encryptor.done (header_dummy)
157 assert len (rest) == 0
160 def test_crypto_aes_gcm_dec_ctor (self):
162 Ensure that only either key or password is accepted.
164 password = str (os.urandom (42))
165 key = os.urandom (16) # scrypt sized
167 decryptor = crypto.Decrypt (password=password)
168 decryptor = crypto.Decrypt (key=key)
171 decryptor = crypto.Decrypt (password=password, key=key)
172 except crypto.InvalidParameter: # both password and key
176 decryptor = crypto.Decrypt (password=None, key=None)
177 except crypto.InvalidParameter: # neither password nor key
181 decryptor = crypto.Decrypt (password="")
182 except crypto.InvalidParameter: # empty password
186 def test_crypto_aes_gcm_dec_simple (self):
187 password = str (os.urandom (42))
188 encryptor = crypto.Encrypt (TEST_VERSION,
191 nacl=TEST_STATIC_NACL)
193 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
194 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
195 rest, header, fixed = encryptor.done (header_dummy)
198 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
199 decryptor.next (header)
200 plaintext = decryptor.process (ciphertext)
201 rest = decryptor.done ()
204 assert plaintext == TEST_PLAINTEXT
207 def test_crypto_aes_gcm_dec_bad_tag (self):
208 password = str (os.urandom (42))
209 encryptor = crypto.Encrypt (TEST_VERSION,
212 nacl=TEST_STATIC_NACL)
214 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
215 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
216 ciphertext2, header, fixed = encryptor.done (header_dummy)
218 mut_header = bytearray (header)
219 mut_header_vw = memoryview (mut_header)
220 # replace one byte in the tag part of the header
221 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
222 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
223 header = bytes (mut_header)
225 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
226 decryptor.next (header)
227 plaintext = decryptor.process (ciphertext)
229 _ = decryptor.done ()
230 except crypto.InvalidGCMTag:
234 def test_crypto_aes_gcm_enc_multicnk (self):
236 pt = fill_mod (1 << 14)
237 password = str (os.urandom (42))
238 encryptor = crypto.Encrypt (TEST_VERSION,
241 nacl=TEST_STATIC_NACL)
242 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
246 while off < len (pt):
247 upto = min (off + cnksiz, len (pt))
248 _, cnk = encryptor.process (pt [off:upto])
251 cnk, header, fixed = encryptor.done (header_dummy)
254 assert len (pt) == len (ct)
257 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
259 pt = fill_mod (1 << 14)
260 password = str (os.urandom (42))
261 encryptor = crypto.Encrypt (TEST_VERSION,
264 nacl=TEST_STATIC_NACL,
266 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
270 while off < len (pt):
271 upto = min (off + cnksiz, len (pt))
272 _, cnk = encryptor.process (pt [off:upto])
275 cnk, header, fixed = encryptor.done (header_dummy)
278 assert len (pt) == len (ct)
281 def test_crypto_aes_gcm_enc_multiobj (self):
283 password = str (os.urandom (42))
284 encryptor = crypto.Encrypt (TEST_VERSION,
287 nacl=TEST_STATIC_NACL,
291 pt = fill_mod (1 << 14, off=i)
292 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
296 while off < len (pt):
297 upto = min (off + cnksiz, len (pt))
298 _, cnk = encryptor.process (pt [off:upto])
301 cnk, header, fixed = encryptor.done (header_dummy)
304 assert len (pt) == len (ct)
306 for i in range (5): addobj (i)
308 assert len (encryptor.fixed) == 1
311 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
313 password = str (os.urandom (42))
314 encryptor = crypto.Encrypt (TEST_VERSION,
317 nacl=TEST_STATIC_NACL,
319 curfixed = None # must remain constant after first
322 pt = fill_mod (1 << 14, off=i)
323 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
327 while off < len (pt):
328 upto = min (off + cnksiz, len (pt))
329 _, cnk = encryptor.process (pt [off:upto])
332 cnk, header, fixed = encryptor.done (header_dummy)
337 assert fixed == curfixed
340 assert len (pt) == len (ct)
342 for i in range (5): addobj (i)
344 assert len (encryptor.fixed) == 1
347 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
349 Test behavior when the file counter tops out.
351 Artificially lower the maximum possible file counter. Considering
352 invalid (0) and reserved (1, 2) values, the smallest possible file counter
353 for normal objects is 3. Starting from that, the header of the (max -
354 3)rd object must have both a different IV fixed part and a counter.
358 crypto._testing_set_AES_GCM_IV_CNT_MAX \
359 ("I am fully aware that this will void my warranty.", new_max)
361 password = str (os.urandom (42))
362 encryptor = crypto.Encrypt (TEST_VERSION,
365 nacl=TEST_STATIC_NACL,
371 def addobj (i, wrap=False):
374 pt = fill_mod (1 << 14, off=i)
375 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
379 while off < len (pt):
380 upto = min (off + cnksiz, len (pt))
381 _, cnk = encryptor.process (pt [off:upto])
384 cnk, header, fixed = encryptor.done (header_dummy)
385 this_iv = crypto.hdr_read (header) ["iv"]
386 if last_iv is not None:
387 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
388 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
390 assert last_fixed == this_fixed
391 assert last_cnt == this_cnt - 1
393 assert last_fixed != this_fixed
394 assert this_cnt == minimum
398 assert len (pt) == len (ct)
400 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
401 addobj (i + 1, True) # counter wraps to 3
403 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
404 addobj (j + 1, True) # counter wraps to 3 again
406 assert len (encryptor.fixed) == 3
409 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
411 Test behavior when the file counter tops out and the transition to
412 the next IV fixed part fails on account of a bad random generator.
414 Replaces the ``urandom`` reference in ``os`` with a deterministic
415 function. The encryptor context must communicate this condition with an
416 ``IVFixedPartError``.
420 crypto._testing_set_AES_GCM_IV_CNT_MAX \
421 ("I am fully aware that this will void my warranty.", new_max)
423 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
424 password = str (os.urandom (42))
425 encryptor = crypto.Encrypt (TEST_VERSION,
428 nacl=TEST_STATIC_NACL,
432 pt = fill_mod (1 << 14, off=i)
433 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
436 while off < len (pt):
437 upto = min (off + cnksiz, len (pt))
438 _, cnk = encryptor.process (pt [off:upto])
441 for i in range (minimum, new_max): addobj (42 + i)
443 with self.assertRaises (crypto.IVFixedPartError):
448 def test_crypto_aes_gcm_enc_length_cap (self):
450 Artificially lower the maximum allowable data length and attempt to
451 encrypt a larger object. Verify that the crypto handler aborts with and
455 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
456 ("I am fully aware that this will void my warranty.", new_max)
458 password = str (os.urandom (42))
459 encryptor = crypto.Encrypt (TEST_VERSION,
462 nacl=TEST_STATIC_NACL)
465 pt, ct = fill_mod (s), None
466 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
468 n, ct = encryptor.process (pt)
469 rest, _, _ = encryptor.done (header_dummy)
472 if len (pt) > new_max:
475 assert n == len (pt) == len (ct)
477 for i in range (16): encobj (1 << i)
480 def test_crypto_aes_gcm_dec_multicnk (self):
482 orig_pt = fill_mod (1 << 14)
483 password = str (os.urandom (42))
484 encryptor = crypto.Encrypt (TEST_VERSION,
487 nacl=TEST_STATIC_NACL)
488 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
492 while off < len (orig_pt):
493 upto = min (off + cnksiz, len (orig_pt))
494 _n, cnk = encryptor.process (orig_pt [off:upto])
497 cnk, header, fixed = encryptor.done (header_dummy)
500 decryptor = crypto.Decrypt (password=password,
502 decryptor.next (header)
505 while off < len (orig_pt):
506 upto = min (off + cnksiz, len (orig_pt))
507 cnk = decryptor.process (ct [off:upto])
512 pt += decryptor.done ()
516 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
518 orig_pt = fill_mod (1 << 14)
519 password = str (os.urandom (42))
520 encryptor = crypto.Encrypt (TEST_VERSION,
523 nacl=TEST_STATIC_NACL)
524 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
528 while off < len (orig_pt):
529 upto = min (off + cnksiz, len (orig_pt))
530 _n, cnk = encryptor.process (orig_pt [off:upto])
533 cnk, header, fixed = encryptor.done (header_dummy)
536 mut_header = bytearray (header)
537 mut_header_vw = memoryview (mut_header)
538 # replace one byte in the tag part of the header
539 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
540 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
541 header = bytes (mut_header)
543 decryptor = crypto.Decrypt (password=password,
545 decryptor.next (header)
548 while off < len (orig_pt):
549 upto = min (off + cnksiz, len (orig_pt))
550 cnk = decryptor.process (ct [off:upto])
555 _ = decryptor.done ()
556 except crypto.InvalidGCMTag:
560 def test_crypto_aes_gcm_dec_iv_reuse (self):
562 Meddle with encrypted content: extract the IV from one object
563 and inject it into the header of another. This must be rejected
567 orig_pt_1 = fill_mod (1 << 10)
568 orig_pt_2 = fill_mod (1 << 10, 42)
569 password = str (os.urandom (42))
570 encryptor = crypto.Encrypt (TEST_VERSION,
573 nacl=TEST_STATIC_NACL)
576 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
580 while off < len (pt):
581 upto = min (off + cnksiz, len (pt))
582 _n, cnk = encryptor.process (pt [off:upto])
585 cnk, header, fixed = encryptor.done (header_dummy)
586 return ct + cnk, header, fixed
588 ct_1, hdr_1, _____ = enc (orig_pt_1)
589 ct_2, hdr_2, fixed = enc (orig_pt_2)
591 mut_hdr_2 = bytearray (hdr_2)
592 mut_hdr_2_vw = memoryview (mut_hdr_2)
594 iv_lo = crypto.HDR_OFF_IV
595 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
596 iv_1 = hdr_1 [iv_lo : iv_hi]
597 # transplant into other header
598 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
599 hdr_2_mod = bytes (mut_hdr_2)
600 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
607 while off < len (ct):
608 upto = min (off + cnksiz, len (ct))
609 cnk = decryptor.process (ct [off:upto])
612 return pt + decryptor.done ()
614 decr_pt_1 = dec (hdr_1, ct_1)
615 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
617 decr_pt_2 = dec (hdr_2_mod, ct_2)
618 except crypto.DuplicateIV: # bad header, reuse detected
622 class HeaderTest (CryptoLayerTest):
624 def test_crypto_fmt_hdr_make (self):
626 ok, hdr = crypto.hdr_make (meta)
628 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
631 def test_crypto_fmt_hdr_make_useless (self):
632 ok, ret = crypto.hdr_make ({ 42: "x" })
634 assert ret.startswith ("error assembling header:")
637 def test_crypto_fmt_hdr_read (self):
639 ok, hdr = crypto.hdr_make (meta)
641 assert hdr is not None
642 mmeta = crypto.hdr_read (hdr)
643 assert mmeta is not None
645 if meta [k] != mmeta [k]:
646 raise "header mismatch after reading: expected %r, got %r" \
647 % (meta [k], mmeta [k])
650 def test_crypto_fmt_hdr_read_trailing_garbage (self):
652 ok, hdr = crypto.hdr_make (meta)
653 ok, hdr = crypto.hdr_make (meta)
655 assert hdr is not None
658 _ = crypto.hdr_read (hdr)
659 except crypto.InvalidHeader:
663 def test_crypto_fmt_hdr_read_leading_garbage (self):
665 ok, hdr = crypto.hdr_make (meta)
666 ok, hdr = crypto.hdr_make (meta)
668 assert hdr is not None
671 _ = crypto.hdr_read (hdr)
672 except crypto.InvalidHeader:
676 def test_crypto_fmt_hdr_inner_garbage (self):
678 ok, hdr = crypto.hdr_make (meta)
680 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
682 _ = crypto.hdr_read (data)
683 except crypto.InvalidHeader: