7 import deltatar.crypto as crypto
12 return s.encode("UTF-8")
17 TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE = b"test1234"
19 TEST_AES_GCM_AAD = b"authenticated plain text"
20 TEST_DUMMY_FILENAME = "insurance-file.txt"
23 TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
25 def faux_hdr (ctsize=1337, iv=None):
28 , "paramversion" : 2187
29 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
34 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
40 def fill_mod (n, off=0):
43 m = FILL_MOD_MEMO.get (k, None)
47 bufv = memoryview (buf)
51 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
61 class CryptoLayerTest (unittest.TestCase):
65 class AESGCMTest (CryptoLayerTest):
68 """Reset globals altered for testing."""
69 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
70 ("I am fully aware that this will void my warranty.")
71 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
72 ("I am fully aware that this will void my warranty.")
74 def test_crypto_aes_gcm_enc_ctor (self):
75 password = str (os.urandom (42))
76 encryptor = crypto.Encrypt (TEST_VERSION,
79 nacl=TEST_STATIC_NACL)
82 def test_crypto_aes_gcm_enc_ctor_key (self):
84 encryptor = crypto.Encrypt (TEST_VERSION,
87 nacl=TEST_STATIC_NACL)
90 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
92 Either key (+nacl) or password must be supplied, not both.
95 encryptor = crypto.Encrypt (TEST_VERSION,
97 nacl=TEST_STATIC_NACL)
98 except crypto.InvalidParameter: # neither key nor pw
101 password = str (os.urandom (42))
102 key = os.urandom (16) # scrypt sized
104 encryptor = crypto.Encrypt (TEST_VERSION,
108 nacl=TEST_STATIC_NACL)
109 except crypto.InvalidParameter: # both key and pw
113 encryptor = crypto.Encrypt (TEST_VERSION,
117 except crypto.InvalidParameter: # key, but salt missing
121 encryptor = crypto.Encrypt (TEST_VERSION,
124 nacl=TEST_STATIC_NACL)
125 except crypto.InvalidParameter: # empty pw
129 def test_crypto_aes_gcm_enc_header_size (self):
130 password = str (os.urandom (42))
131 encryptor = crypto.Encrypt (TEST_VERSION,
134 nacl=TEST_STATIC_NACL)
136 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
137 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
138 _, _ = encryptor.process (TEST_PLAINTEXT)
139 _, header, _ = encryptor.done (header_dummy)
140 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
143 def test_crypto_aes_gcm_enc_chunk_size (self):
144 password = str (os.urandom (42))
145 encryptor = crypto.Encrypt (TEST_VERSION,
148 nacl=TEST_STATIC_NACL)
150 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
151 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
152 assert len (ciphertext) == len (TEST_PLAINTEXT)
153 rest, header, fixed = encryptor.done (header_dummy)
154 assert len (rest) == 0
157 def test_crypto_aes_gcm_dec_ctor (self):
159 Ensure that exactly one of key or password is accepted.
161 password = str (os.urandom (42))
162 key = os.urandom (16) # scrypt sized
164 decryptor = crypto.Decrypt (password=password)
165 decryptor = crypto.Decrypt (key=key)
168 decryptor = crypto.Decrypt (password=password, key=key)
169 except crypto.InvalidParameter: # both password and key
173 decryptor = crypto.Decrypt (password=None, key=None)
174 except crypto.InvalidParameter: # neither password nor key
178 decryptor = crypto.Decrypt (password="")
179 except crypto.InvalidParameter: # empty password
183 def test_crypto_aes_gcm_dec_simple (self):
184 password = str (os.urandom (42))
185 encryptor = crypto.Encrypt (TEST_VERSION,
188 nacl=TEST_STATIC_NACL)
190 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
191 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
192 rest, header, fixed = encryptor.done (header_dummy)
195 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
196 decryptor.next (header)
197 plaintext = decryptor.process (ciphertext)
198 rest = decryptor.done ()
201 assert plaintext == TEST_PLAINTEXT
204 def test_crypto_aes_gcm_dec_bad_tag (self):
205 password = str (os.urandom (42))
206 encryptor = crypto.Encrypt (TEST_VERSION,
209 nacl=TEST_STATIC_NACL)
211 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
212 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
213 ciphertext2, header, fixed = encryptor.done (header_dummy)
215 mut_header = bytearray (header)
216 mut_header_vw = memoryview (mut_header)
217 # replace one byte in the tag part of the header
218 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
219 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
220 header = bytes (mut_header)
222 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
223 decryptor.next (header)
224 plaintext = decryptor.process (ciphertext)
226 _ = decryptor.done ()
227 except crypto.InvalidGCMTag:
231 def test_crypto_aes_gcm_enc_multicnk (self):
233 pt = fill_mod (1 << 14)
234 password = str (os.urandom (42))
235 encryptor = crypto.Encrypt (TEST_VERSION,
238 nacl=TEST_STATIC_NACL)
239 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
243 while off < len (pt):
244 upto = min (off + cnksiz, len (pt))
245 _, cnk = encryptor.process (pt [off:upto])
248 cnk, header, fixed = encryptor.done (header_dummy)
251 assert len (pt) == len (ct)
254 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
256 pt = fill_mod (1 << 14)
257 password = str (os.urandom (42))
258 encryptor = crypto.Encrypt (TEST_VERSION,
261 nacl=TEST_STATIC_NACL,
263 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
267 while off < len (pt):
268 upto = min (off + cnksiz, len (pt))
269 _, cnk = encryptor.process (pt [off:upto])
272 cnk, header, fixed = encryptor.done (header_dummy)
275 assert len (pt) == len (ct)
278 def test_crypto_aes_gcm_enc_multiobj (self):
280 password = str (os.urandom (42))
281 encryptor = crypto.Encrypt (TEST_VERSION,
284 nacl=TEST_STATIC_NACL,
288 pt = fill_mod (1 << 14, off=i)
289 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
293 while off < len (pt):
294 upto = min (off + cnksiz, len (pt))
295 _, cnk = encryptor.process (pt [off:upto])
298 cnk, header, fixed = encryptor.done (header_dummy)
301 assert len (pt) == len (ct)
303 for i in range (5): addobj (i)
306 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
308 password = str (os.urandom (42))
309 encryptor = crypto.Encrypt (TEST_VERSION,
312 nacl=TEST_STATIC_NACL,
316 pt = fill_mod (1 << 14, off=i)
317 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
321 while off < len (pt):
322 upto = min (off + cnksiz, len (pt))
323 _, cnk = encryptor.process (pt [off:upto])
326 cnk, header, fixed = encryptor.done (header_dummy)
329 assert len (pt) == len (ct)
331 for i in range (5): addobj (i)
334 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
336 Test behavior when the file counter tops out.
338 Artificially lower the maximum possible file counter. Considering
339 invalid (0) and reserved (1, 2) values, the smallest possible file counter
340 for normal objects is 3. Starting from that, the header of the (max -
341 3)rd object must have both a different IV fixed part and a counter.
345 crypto._testing_set_AES_GCM_IV_CNT_MAX \
346 ("I am fully aware that this will void my warranty.", new_max)
348 password = str (os.urandom (42))
349 encryptor = crypto.Encrypt (TEST_VERSION,
352 nacl=TEST_STATIC_NACL,
358 def addobj (i, wrap=False):
361 pt = fill_mod (1 << 14, off=i)
362 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
366 while off < len (pt):
367 upto = min (off + cnksiz, len (pt))
368 _, cnk = encryptor.process (pt [off:upto])
371 cnk, header, fixed = encryptor.done (header_dummy)
372 this_iv = crypto.hdr_read (header) ["iv"]
373 if last_iv is not None:
374 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
375 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
377 assert last_fixed == this_fixed
378 assert last_cnt == this_cnt - 1
380 assert last_fixed != this_fixed
381 assert this_cnt == minimum
385 assert len (pt) == len (ct)
387 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
388 addobj (i + 1, True) # counter wraps to 3
390 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
391 addobj (j + 1, True) # counter wraps to 3 again
394 def test_crypto_aes_gcm_enc_length_cap (self):
396 Artificially lower the maximum allowable data length and attempt to
397 encrypt a larger object. Verify that the crypto handler aborts with and
401 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
402 ("I am fully aware that this will void my warranty.", new_max)
404 password = str (os.urandom (42))
405 encryptor = crypto.Encrypt (TEST_VERSION,
408 nacl=TEST_STATIC_NACL)
411 pt, ct = fill_mod (s), None
412 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
414 n, ct = encryptor.process (pt)
415 rest, _, _ = encryptor.done (header_dummy)
418 if len (pt) > new_max:
421 assert n == len (pt) == len (ct)
423 for i in range (16): encobj (1 << i)
426 def test_crypto_aes_gcm_dec_multicnk (self):
428 orig_pt = fill_mod (1 << 14)
429 password = str (os.urandom (42))
430 encryptor = crypto.Encrypt (TEST_VERSION,
433 nacl=TEST_STATIC_NACL)
434 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
438 while off < len (orig_pt):
439 upto = min (off + cnksiz, len (orig_pt))
440 _n, cnk = encryptor.process (orig_pt [off:upto])
443 cnk, header, fixed = encryptor.done (header_dummy)
446 decryptor = crypto.Decrypt (password=password,
448 decryptor.next (header)
451 while off < len (orig_pt):
452 upto = min (off + cnksiz, len (orig_pt))
453 cnk = decryptor.process (ct [off:upto])
458 pt += decryptor.done ()
462 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
464 orig_pt = fill_mod (1 << 14)
465 password = str (os.urandom (42))
466 encryptor = crypto.Encrypt (TEST_VERSION,
469 nacl=TEST_STATIC_NACL)
470 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
474 while off < len (orig_pt):
475 upto = min (off + cnksiz, len (orig_pt))
476 _n, cnk = encryptor.process (orig_pt [off:upto])
479 cnk, header, fixed = encryptor.done (header_dummy)
482 mut_header = bytearray (header)
483 mut_header_vw = memoryview (mut_header)
484 # replace one byte in the tag part of the header
485 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
486 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
487 header = bytes (mut_header)
489 decryptor = crypto.Decrypt (password=password,
491 decryptor.next (header)
494 while off < len (orig_pt):
495 upto = min (off + cnksiz, len (orig_pt))
496 cnk = decryptor.process (ct [off:upto])
501 _ = decryptor.done ()
502 except crypto.InvalidGCMTag:
506 def test_crypto_aes_gcm_dec_iv_reuse (self):
508 Meddle with encrypted content: extract the IV from one object
509 and inject it into the header of another. This must be rejected
513 orig_pt_1 = fill_mod (1 << 10)
514 orig_pt_2 = fill_mod (1 << 10, 42)
515 password = str (os.urandom (42))
516 encryptor = crypto.Encrypt (TEST_VERSION,
519 nacl=TEST_STATIC_NACL)
522 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
526 while off < len (pt):
527 upto = min (off + cnksiz, len (pt))
528 _n, cnk = encryptor.process (pt [off:upto])
531 cnk, header, fixed = encryptor.done (header_dummy)
532 return ct + cnk, header, fixed
534 ct_1, hdr_1, _____ = enc (orig_pt_1)
535 ct_2, hdr_2, fixed = enc (orig_pt_2)
537 mut_hdr_2 = bytearray (hdr_2)
538 mut_hdr_2_vw = memoryview (mut_hdr_2)
540 iv_lo = crypto.HDR_OFF_IV
541 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
542 iv_1 = hdr_1 [iv_lo : iv_hi]
543 # transplant into other header
544 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
545 hdr_2_mod = bytes (mut_hdr_2)
546 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
553 while off < len (ct):
554 upto = min (off + cnksiz, len (ct))
555 cnk = decryptor.process (ct [off:upto])
558 return pt + decryptor.done ()
560 decr_pt_1 = dec (hdr_1, ct_1)
561 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
563 decr_pt_2 = dec (hdr_2_mod, ct_2)
564 except crypto.DuplicateIV: # bad header, reuse detected
568 class HeaderTest (CryptoLayerTest):
570 def test_crypto_fmt_hdr_make (self):
572 ok, hdr = crypto.hdr_make (meta)
574 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
577 def test_crypto_fmt_hdr_make_useless (self):
578 ok, ret = crypto.hdr_make ({ 42: "x" })
580 assert ret.startswith ("error writing header:")
583 def test_crypto_fmt_hdr_read (self):
585 ok, hdr = crypto.hdr_make (meta)
587 assert hdr is not None
588 mmeta = crypto.hdr_read (hdr)
589 assert mmeta is not None
591 if meta [k] != mmeta [k]:
592 raise "header mismatch after reading: expected %r, got %r" \
593 % (meta [k], mmeta [k])
596 def test_crypto_fmt_hdr_read_trailing_garbage (self):
598 ok, hdr = crypto.hdr_make (meta)
599 ok, hdr = crypto.hdr_make (meta)
601 assert hdr is not None
604 _ = crypto.hdr_read (hdr)
605 except crypto.InvalidHeader:
609 def test_crypto_fmt_hdr_read_leading_garbage (self):
611 ok, hdr = crypto.hdr_make (meta)
612 ok, hdr = crypto.hdr_make (meta)
614 assert hdr is not None
617 _ = crypto.hdr_read (hdr)
618 except crypto.InvalidHeader:
622 def test_crypto_fmt_hdr_inner_garbage (self):
624 ok, hdr = crypto.hdr_make (meta)
626 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
628 _ = crypto.hdr_read (data)
629 except crypto.InvalidHeader: