7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
61 class CryptoLayerTest (unittest.TestCase):
65 class AESGCMTest (CryptoLayerTest):
67 os_urandom = os.urandom
70 """Reset globals altered for testing."""
71 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72 ("I am fully aware that this will void my warranty.")
73 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74 ("I am fully aware that this will void my warranty.")
75 os.urandom = self.os_urandom
77 def test_crypto_aes_gcm_enc_ctor (self):
78 password = str (os.urandom (42))
79 encryptor = crypto.Encrypt (TEST_VERSION,
82 nacl=TEST_STATIC_NACL)
85 def test_crypto_aes_gcm_enc_ctor_key (self):
87 encryptor = crypto.Encrypt (TEST_VERSION,
90 nacl=TEST_STATIC_NACL)
93 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
95 Either key (+nacl) or password must be supplied, not both.
97 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
98 encryptor = crypto.Encrypt (TEST_VERSION,
100 nacl=TEST_STATIC_NACL)
102 password = str (os.urandom (42))
103 key = os.urandom (16) # scrypt sized
104 with self.assertRaises (crypto.InvalidParameter): # both key and pw
105 encryptor = crypto.Encrypt (TEST_VERSION,
109 nacl=TEST_STATIC_NACL)
111 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
112 encryptor = crypto.Encrypt (TEST_VERSION,
117 with self.assertRaises (crypto.InvalidParameter): # empty pw
118 encryptor = crypto.Encrypt (TEST_VERSION,
121 nacl=TEST_STATIC_NACL)
124 def test_crypto_aes_gcm_enc_header_size (self):
125 password = str (os.urandom (42))
126 encryptor = crypto.Encrypt (TEST_VERSION,
129 nacl=TEST_STATIC_NACL)
131 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
132 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
133 _, _ = encryptor.process (TEST_PLAINTEXT)
134 _, header, _ = encryptor.done (header_dummy)
135 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
138 def test_crypto_aes_gcm_enc_chunk_size (self):
139 password = str (os.urandom (42))
140 encryptor = crypto.Encrypt (TEST_VERSION,
143 nacl=TEST_STATIC_NACL)
145 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
146 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
147 assert len (ciphertext) == len (TEST_PLAINTEXT)
148 rest, header, fixed = encryptor.done (header_dummy)
149 assert len (rest) == 0
152 def test_crypto_aes_gcm_dec_ctor (self):
154 Ensure that only either key or password is accepted.
156 password = str (os.urandom (42))
157 key = os.urandom (16) # scrypt sized
159 decryptor = crypto.Decrypt (password=password)
160 decryptor = crypto.Decrypt (key=key)
162 with self.assertRaises (crypto.InvalidParameter): # both password and key
163 decryptor = crypto.Decrypt (password=password, key=key)
165 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
166 decryptor = crypto.Decrypt (password=None, key=None)
168 with self.assertRaises (crypto.InvalidParameter): # # empty password
169 decryptor = crypto.Decrypt (password="")
172 def test_crypto_aes_gcm_dec_simple (self):
173 password = str (os.urandom (42))
174 encryptor = crypto.Encrypt (TEST_VERSION,
177 nacl=TEST_STATIC_NACL)
179 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
180 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
181 rest, header, fixed = encryptor.done (header_dummy)
184 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
185 decryptor.next (header)
186 plaintext = decryptor.process (ciphertext)
187 rest = decryptor.done ()
190 assert plaintext == TEST_PLAINTEXT
193 def test_crypto_aes_gcm_dec_bad_tag (self):
194 password = str (os.urandom (42))
195 encryptor = crypto.Encrypt (TEST_VERSION,
198 nacl=TEST_STATIC_NACL)
200 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
201 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
202 ciphertext2, header, fixed = encryptor.done (header_dummy)
204 mut_header = bytearray (header)
205 mut_header_vw = memoryview (mut_header)
206 # replace one byte in the tag part of the header
207 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
208 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
209 header = bytes (mut_header)
211 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
212 decryptor.next (header)
213 plaintext = decryptor.process (ciphertext)
214 with self.assertRaises (crypto.InvalidGCMTag):
215 _ = decryptor.done ()
218 def test_crypto_aes_gcm_enc_multicnk (self):
220 pt = fill_mod (1 << 14)
221 password = str (os.urandom (42))
222 encryptor = crypto.Encrypt (TEST_VERSION,
225 nacl=TEST_STATIC_NACL)
226 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
230 while off < len (pt):
231 upto = min (off + cnksiz, len (pt))
232 _, cnk = encryptor.process (pt [off:upto])
235 cnk, header, fixed = encryptor.done (header_dummy)
238 assert len (pt) == len (ct)
241 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
243 pt = fill_mod (1 << 14)
244 password = str (os.urandom (42))
245 encryptor = crypto.Encrypt (TEST_VERSION,
248 nacl=TEST_STATIC_NACL,
250 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 while off < len (pt):
255 upto = min (off + cnksiz, len (pt))
256 _, cnk = encryptor.process (pt [off:upto])
259 cnk, header, fixed = encryptor.done (header_dummy)
262 assert len (pt) == len (ct)
265 def test_crypto_aes_gcm_enc_multiobj (self):
267 password = str (os.urandom (42))
268 encryptor = crypto.Encrypt (TEST_VERSION,
271 nacl=TEST_STATIC_NACL,
275 pt = fill_mod (1 << 14, off=i)
276 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
280 while off < len (pt):
281 upto = min (off + cnksiz, len (pt))
282 _, cnk = encryptor.process (pt [off:upto])
285 cnk, header, fixed = encryptor.done (header_dummy)
288 assert len (pt) == len (ct)
290 for i in range (5): addobj (i)
292 assert len (encryptor.fixed) == 1
295 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
301 nacl=TEST_STATIC_NACL,
303 curfixed = None # must remain constant after first
306 pt = fill_mod (1 << 14, off=i)
307 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
311 while off < len (pt):
312 upto = min (off + cnksiz, len (pt))
313 _, cnk = encryptor.process (pt [off:upto])
316 cnk, header, fixed = encryptor.done (header_dummy)
321 assert fixed == curfixed
324 assert len (pt) == len (ct)
326 for i in range (5): addobj (i)
328 assert len (encryptor.fixed) == 1
331 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
333 Test behavior when the file counter tops out.
335 Artificially lower the maximum possible file counter. Considering
336 invalid (0) and reserved (1, 2) values, the smallest possible file counter
337 for normal objects is 3. Starting from that, the header of the (max -
338 3)rd object must have both a different IV fixed part and a counter.
342 crypto._testing_set_AES_GCM_IV_CNT_MAX \
343 ("I am fully aware that this will void my warranty.", new_max)
345 password = str (os.urandom (42))
346 encryptor = crypto.Encrypt (TEST_VERSION,
349 nacl=TEST_STATIC_NACL,
355 def addobj (i, wrap=False):
358 pt = fill_mod (1 << 14, off=i)
359 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
363 while off < len (pt):
364 upto = min (off + cnksiz, len (pt))
365 _, cnk = encryptor.process (pt [off:upto])
368 cnk, header, fixed = encryptor.done (header_dummy)
369 this_iv = crypto.hdr_read (header) ["iv"]
370 if last_iv is not None:
371 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
372 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
374 assert last_fixed == this_fixed
375 assert last_cnt == this_cnt - 1
377 assert last_fixed != this_fixed
378 assert this_cnt == minimum
382 assert len (pt) == len (ct)
384 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
385 addobj (i + 1, True) # counter wraps to 3
387 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
388 addobj (j + 1, True) # counter wraps to 3 again
390 assert len (encryptor.fixed) == 3
393 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
395 Test behavior when the file counter tops out and the transition to
396 the next IV fixed part fails on account of a bad random generator.
398 Replaces the ``urandom`` reference in ``os`` with a deterministic
399 function. The encryptor context must communicate this condition with an
400 ``IVFixedPartError``.
404 crypto._testing_set_AES_GCM_IV_CNT_MAX \
405 ("I am fully aware that this will void my warranty.", new_max)
407 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
408 password = str (os.urandom (42))
409 encryptor = crypto.Encrypt (TEST_VERSION,
412 nacl=TEST_STATIC_NACL,
416 pt = fill_mod (1 << 14, off=i)
417 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
420 while off < len (pt):
421 upto = min (off + cnksiz, len (pt))
422 _, cnk = encryptor.process (pt [off:upto])
425 for i in range (minimum, new_max): addobj (42 + i)
427 with self.assertRaises (crypto.IVFixedPartError):
432 def test_crypto_aes_gcm_enc_length_cap (self):
434 Artificially lower the maximum allowable data length and attempt to
435 encrypt a larger object. Verify that the crypto handler aborts with and
439 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
440 ("I am fully aware that this will void my warranty.", new_max)
442 password = str (os.urandom (42))
443 encryptor = crypto.Encrypt (TEST_VERSION,
446 nacl=TEST_STATIC_NACL)
449 pt, ct = fill_mod (s), None
450 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
452 n, ct = encryptor.process (pt)
453 rest, _, _ = encryptor.done (header_dummy)
456 if len (pt) > new_max:
459 assert n == len (pt) == len (ct)
461 for i in range (16): encobj (1 << i)
464 def test_crypto_aes_gcm_dec_multicnk (self):
466 orig_pt = fill_mod (1 << 14)
467 password = str (os.urandom (42))
468 encryptor = crypto.Encrypt (TEST_VERSION,
471 nacl=TEST_STATIC_NACL)
472 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
476 while off < len (orig_pt):
477 upto = min (off + cnksiz, len (orig_pt))
478 _n, cnk = encryptor.process (orig_pt [off:upto])
481 cnk, header, fixed = encryptor.done (header_dummy)
484 decryptor = crypto.Decrypt (password=password,
486 decryptor.next (header)
489 while off < len (orig_pt):
490 upto = min (off + cnksiz, len (orig_pt))
491 cnk = decryptor.process (ct [off:upto])
496 pt += decryptor.done ()
500 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
502 orig_pt = fill_mod (1 << 14)
503 password = str (os.urandom (42))
504 encryptor = crypto.Encrypt (TEST_VERSION,
507 nacl=TEST_STATIC_NACL)
508 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
512 while off < len (orig_pt):
513 upto = min (off + cnksiz, len (orig_pt))
514 _n, cnk = encryptor.process (orig_pt [off:upto])
517 cnk, header, fixed = encryptor.done (header_dummy)
520 mut_header = bytearray (header)
521 mut_header_vw = memoryview (mut_header)
522 # replace one byte in the tag part of the header
523 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
524 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
525 header = bytes (mut_header)
527 decryptor = crypto.Decrypt (password=password,
529 decryptor.next (header)
532 while off < len (orig_pt):
533 upto = min (off + cnksiz, len (orig_pt))
534 cnk = decryptor.process (ct [off:upto])
538 with self.assertRaises (crypto.InvalidGCMTag):
539 _ = decryptor.done ()
542 def test_crypto_aes_gcm_dec_iv_reuse (self):
544 Meddle with encrypted content: extract the IV from one object
545 and inject it into the header of another. This must be rejected
549 orig_pt_1 = fill_mod (1 << 10)
550 orig_pt_2 = fill_mod (1 << 10, 42)
551 password = str (os.urandom (42))
552 encryptor = crypto.Encrypt (TEST_VERSION,
555 nacl=TEST_STATIC_NACL)
558 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
562 while off < len (pt):
563 upto = min (off + cnksiz, len (pt))
564 _n, cnk = encryptor.process (pt [off:upto])
567 cnk, header, fixed = encryptor.done (header_dummy)
568 return ct + cnk, header, fixed
570 ct_1, hdr_1, _____ = enc (orig_pt_1)
571 ct_2, hdr_2, fixed = enc (orig_pt_2)
573 mut_hdr_2 = bytearray (hdr_2)
574 mut_hdr_2_vw = memoryview (mut_hdr_2)
576 iv_lo = crypto.HDR_OFF_IV
577 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
578 iv_1 = hdr_1 [iv_lo : iv_hi]
579 # transplant into other header
580 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
581 hdr_2_mod = bytes (mut_hdr_2)
582 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
589 while off < len (ct):
590 upto = min (off + cnksiz, len (ct))
591 cnk = decryptor.process (ct [off:upto])
594 return pt + decryptor.done ()
596 decr_pt_1 = dec (hdr_1, ct_1)
597 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
598 with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
599 decr_pt_2 = dec (hdr_2_mod, ct_2)
602 class HeaderTest (CryptoLayerTest):
604 def test_crypto_fmt_hdr_make (self):
606 ok, hdr = crypto.hdr_make (meta)
608 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
611 def test_crypto_fmt_hdr_make_useless (self):
612 ok, ret = crypto.hdr_make ({ 42: "x" })
614 assert ret.startswith ("error assembling header:")
617 def test_crypto_fmt_hdr_read (self):
619 ok, hdr = crypto.hdr_make (meta)
621 assert hdr is not None
622 mmeta = crypto.hdr_read (hdr)
623 assert mmeta is not None
625 if meta [k] != mmeta [k]:
626 raise "header mismatch after reading: expected %r, got %r" \
627 % (meta [k], mmeta [k])
630 def test_crypto_fmt_hdr_read_trailing_garbage (self):
632 ok, hdr = crypto.hdr_make (meta)
633 ok, hdr = crypto.hdr_make (meta)
635 assert hdr is not None
637 with self.assertRaises (crypto.InvalidHeader):
638 _ = crypto.hdr_read (hdr)
641 def test_crypto_fmt_hdr_read_leading_garbage (self):
643 ok, hdr = crypto.hdr_make (meta)
644 ok, hdr = crypto.hdr_make (meta)
646 assert hdr is not None
648 with self.assertRaises (crypto.InvalidHeader):
649 _ = crypto.hdr_read (hdr)
652 def test_crypto_fmt_hdr_inner_garbage (self):
654 ok, hdr = crypto.hdr_make (meta)
656 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
657 with self.assertRaises (crypto.InvalidHeader):
658 _ = crypto.hdr_read (data)