bpo-32713: Fix tarfile.itn for large/negative float values. (GH-5434)
[python-delta-tar] / testing / test_crypto.py
... / ...
CommitLineData
1import binascii
2import os
3import pylibscrypt
4import struct
5import unittest
6
7import deltatar.crypto as crypto
8
9import cryptography
10
def b(s):
    """Return the text *s* encoded as UTF-8 ``bytes``."""
    encoded = s.encode(encoding="UTF-8")
    return encoded
13
# Byte counts; CRYPTO_NACL_SIZE sizes the random salt generated below.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE = 16

# Fixed inputs shared by the test cases in this module.
TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
TEST_PASSPHRASE = b"test1234"
TEST_AES_GCM_AAD = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"
TEST_VERSION = 1
TEST_PARAMVERSION = 1
# Drawn once at import time and handed to the encryptors as ``nacl=``.
TEST_STATIC_NACL = os.urandom(CRYPTO_NACL_SIZE)
# Parameter version the tests use for the plaintext passthrough mode.
PLAIN_PARAMVERSION = 0
25
def faux_hdr(ctsize=1337, iv=None):
    """
    Build a dictionary of made-up header fields.

    :param ctsize: value stored under ``"ctsize"``.
    :param iv: value stored under ``"iv"``; any falsy value (``None``,
               empty bytes) selects a built-in twelve byte default.
    :returns: a new dict with the keys ``version``, ``paramversion``,
              ``nacl``, ``iv``, ``ctsize`` and ``tag``.
    """
    default_iv = bytes.fromhex("0011223344556677" "8899aabb")
    fields = {
        "version": 42,
        "paramversion": 2187,
        "nacl": bytes.fromhex("0011223344556677" "8899aabbccddeeff"),
        "iv": iv if iv else default_iv,
        "ctsize": ctsize,
        "tag": bytes.fromhex("deadbeefbadb100d" "b1eedc0ffeedea15"),
    }
    return fields
38
# Cache of buffers already produced by fill_mod(), keyed by ``(n, off)``.
FILL_MOD_MEMO = {}


def fill_mod(n, off=0):
    """
    Return *n* bytes of deterministic filler data.

    Byte ``i`` (zero based) has the value ``(off + i + 1) % 64 + 32``, so
    the output cycles through the 64 byte values from 32 to 95. Results
    are memoized in ``FILL_MOD_MEMO``; repeated calls with the same
    arguments return the very same ``bytes`` object.

    :param n: number of bytes to produce.
    :param off: offset added to the position before the modulo is taken.
    :returns: ``bytes`` of length *n*.
    :raises ValueError: if *n* is negative.
    """
    key = (n, off)
    cached = FILL_MOD_MEMO.get(key)
    if cached is not None:
        return cached
    if n < 0:
        # The previous bytearray based implementation rejected negative
        # sizes as well; keep doing so instead of returning b"".
        raise ValueError("negative count")
    # One pass over the positions; no per-byte struct packing needed.
    data = bytes((off + pos) % 64 + 32 for pos in range(1, n + 1))
    FILL_MOD_MEMO[key] = data
    return data
56
57
def faux_payload():
    """Return a fixed dummy payload: the text ``abcd`` repeated 42 times."""
    repetitions = 42
    return "".join(["abcd"] * repetitions)
60
61
class CryptoLayerTest(unittest.TestCase):
    """Common base class of the crypto test cases; adds nothing itself."""
64
65
66class AESGCMTest (CryptoLayerTest):
67
68 os_urandom = os.urandom
69
70 def tearDown (self):
71 """Reset globals altered for testing."""
72 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73 ("I am fully aware that this will void my warranty.")
74 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75 ("I am fully aware that this will void my warranty.")
76 os.urandom = self.os_urandom
77
78 def test_crypto_aes_gcm_enc_ctor (self):
79 password = str (os.urandom (42))
80 encryptor = crypto.Encrypt (TEST_VERSION,
81 TEST_PARAMVERSION,
82 password=password,
83 nacl=TEST_STATIC_NACL)
84
85 def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86 """Refuse plaintext passthrough mode by default."""
87 password = str (os.urandom (42))
88 with self.assertRaises (crypto.InvalidParameter):
89 encryptor = crypto.Encrypt (TEST_VERSION,
90 PLAIN_PARAMVERSION,
91 password=password,
92 nacl=TEST_STATIC_NACL)
93
94
95 def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96 """
97 Comply with request for plaintext passthrough mode if the
98 *insecure* flag is passed.
99 """
100 password = str (os.urandom (42))
101 encryptor = crypto.Encrypt (TEST_VERSION,
102 PLAIN_PARAMVERSION,
103 password=password,
104 nacl=TEST_STATIC_NACL,
105 insecure=True)
106
107
108 def test_crypto_aes_gcm_enc_ctor_key (self):
109 key = os.urandom (42)
110 encryptor = crypto.Encrypt (TEST_VERSION,
111 TEST_PARAMVERSION,
112 key=key,
113 nacl=TEST_STATIC_NACL)
114
115
116 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117 """
118 Either key (+nacl) or password must be supplied, not both.
119 """
120 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
121 encryptor = crypto.Encrypt (TEST_VERSION,
122 TEST_PARAMVERSION,
123 nacl=TEST_STATIC_NACL)
124
125 password = str (os.urandom (42))
126 key = os.urandom (16) # scrypt sized
127 with self.assertRaises (crypto.InvalidParameter): # both key and pw
128 encryptor = crypto.Encrypt (TEST_VERSION,
129 TEST_PARAMVERSION,
130 password=password,
131 key=key,
132 nacl=TEST_STATIC_NACL)
133
134 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
135 encryptor = crypto.Encrypt (TEST_VERSION,
136 TEST_PARAMVERSION,
137 key=key,
138 nacl=None)
139
140 with self.assertRaises (crypto.InvalidParameter): # empty pw
141 encryptor = crypto.Encrypt (TEST_VERSION,
142 TEST_PARAMVERSION,
143 password=b"",
144 nacl=TEST_STATIC_NACL)
145
146
147 def test_crypto_aes_gcm_enc_header_size (self):
148 password = str (os.urandom (42))
149 encryptor = crypto.Encrypt (TEST_VERSION,
150 TEST_PARAMVERSION,
151 password=password,
152 nacl=TEST_STATIC_NACL)
153
154 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
155 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
156 _, _ = encryptor.process (TEST_PLAINTEXT)
157 _, header, _ = encryptor.done (header_dummy)
158 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
159
160
161 def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
162 """
163 Access the list of IVs used during encryption and check reuse. Start
164 from explicit counters so the inputs don’t overlap. IVs must not have
165 been reused.
166 """
167
168 def enc (start_count, data):
169 password = str (os.urandom (42))
170 encryptor = crypto.Encrypt (TEST_VERSION,
171 TEST_PARAMVERSION,
172 password=password,
173 nacl=TEST_STATIC_NACL,
174 counter=start_count,
175 strict_ivs=True)
176
177 for i, blob in enumerate (data, 1):
178 fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
179 header_dummy = encryptor.next (fname)
180 _, _ = encryptor.process (blob)
181 _, header, _ = encryptor.done (header_dummy)
182 assert len (encryptor.get_used_ivs ()) == i
183
184 return encryptor.get_used_ivs ()
185
186 ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
187 ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])
188
189 # No reuse in general.
190 assert len (ivs1 & ivs2) == 0
191
192 ivs1 = list (ivs1)
193 ivs2 = list (ivs2)
194
195 # Counters of used IVs must match what we passed explicitly.
196 def extract_counters (ivs):
197 def getcount (iv):
198 _, cnt = struct.unpack (crypto.FMT_I2N_IV, iv)
199 return cnt
200 return list (map (getcount, ivs))
201
202 cnt1 = extract_counters (ivs1)
203 cnt2 = extract_counters (ivs2)
204
205 assert 0x0042 in cnt1
206 assert 0x0043 in cnt1
207
208 assert 0x1337 in cnt2
209 assert 0x1338 in cnt2
210
211
212 def test_crypto_aes_gcm_enc_chunk_size (self):
213 password = str (os.urandom (42))
214 encryptor = crypto.Encrypt (TEST_VERSION,
215 TEST_PARAMVERSION,
216 password=password,
217 nacl=TEST_STATIC_NACL)
218
219 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
220 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
221 assert len (ciphertext) == len (TEST_PLAINTEXT)
222 rest, header, fixed = encryptor.done (header_dummy)
223 assert len (rest) == 0
224
225
226 def test_crypto_aes_gcm_dec_ctor (self):
227 """
228 Ensure that only either key or password is accepted.
229 """
230 password = str (os.urandom (42))
231 key = os.urandom (16) # scrypt sized
232
233 decryptor = crypto.Decrypt (password=password)
234 decryptor = crypto.Decrypt (key=key)
235
236 with self.assertRaises (crypto.InvalidParameter): # both password and key
237 decryptor = crypto.Decrypt (password=password, key=key)
238
239 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
240 decryptor = crypto.Decrypt (password=None, key=None)
241
242 with self.assertRaises (crypto.InvalidParameter): # # empty password
243 decryptor = crypto.Decrypt (password="")
244
245
246 def test_crypto_aes_gcm_dec_simple (self):
247 password = str (os.urandom (42))
248 encryptor = crypto.Encrypt (TEST_VERSION,
249 TEST_PARAMVERSION,
250 password=password,
251 nacl=TEST_STATIC_NACL)
252
253 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
255 rest, header, fixed = encryptor.done (header_dummy)
256 ciphertext += rest
257
258 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
259 decryptor.next (header)
260 plaintext = decryptor.process (ciphertext)
261 rest = decryptor.done ()
262 plaintext += rest
263
264 assert plaintext == TEST_PLAINTEXT
265
266
267 def test_crypto_aes_gcm_dec_plain_bad (self):
268 """
269 Downgrade to plaintext must not be allowed in parameters
270 obtained from headers.
271 """
272 password = str (os.urandom (42))
273 encryptor = crypto.Encrypt (TEST_VERSION,
274 TEST_PARAMVERSION,
275 password=password,
276 nacl=TEST_STATIC_NACL)
277
278 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
279 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
280 rest, header, fixed = encryptor.done (header_dummy)
281 ciphertext += rest
282
283 header = crypto.hdr_read (header)
284 header ["paramversion"] = PLAIN_PARAMVERSION
285 ok, header = crypto.hdr_make (header)
286 assert ok
287
288 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
289 with self.assertRaises (crypto.InvalidParameter):
290 decryptor.next (header)
291
292
293 def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
294 """
295 Allow plaintext crypto mode if *insecure* flag is passed.
296 """
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
299 PLAIN_PARAMVERSION,
300 password=password,
301 nacl=TEST_STATIC_NACL,
302 insecure=True)
303
304 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
305 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
306 rest, header, fixed = encryptor.done (header_dummy)
307 ciphertext += rest
308
309 header = crypto.hdr_read (header)
310 header ["paramversion"] = PLAIN_PARAMVERSION
311 ok, header = crypto.hdr_make (header)
312 assert ok
313
314 decryptor = crypto.Decrypt (password=password,
315 fixedparts=fixed,
316 insecure=True)
317 decryptor.next (header)
318 plaintext = decryptor.process (ciphertext)
319 rest = decryptor.done ()
320 plaintext += rest
321
322 assert plaintext == TEST_PLAINTEXT
323
324
325 def test_crypto_aes_gcm_dec_bad_tag (self):
326 password = str (os.urandom (42))
327 encryptor = crypto.Encrypt (TEST_VERSION,
328 TEST_PARAMVERSION,
329 password=password,
330 nacl=TEST_STATIC_NACL)
331
332 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
333 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
334 ciphertext2, header, fixed = encryptor.done (header_dummy)
335
336 mut_header = bytearray (header)
337 mut_header_vw = memoryview (mut_header)
338 # replace one byte in the tag part of the header
339 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
340 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
341 header = bytes (mut_header)
342
343 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
344 decryptor.next (header)
345 plaintext = decryptor.process (ciphertext)
346 with self.assertRaises (crypto.InvalidGCMTag):
347 _ = decryptor.done ()
348
349
350 def test_crypto_aes_gcm_enc_multicnk (self):
351 cnksiz = 1 << 10
352 pt = fill_mod (1 << 14)
353 password = str (os.urandom (42))
354 encryptor = crypto.Encrypt (TEST_VERSION,
355 TEST_PARAMVERSION,
356 password=password,
357 nacl=TEST_STATIC_NACL)
358 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
359
360 off = 0
361 ct = b""
362 while off < len (pt):
363 upto = min (off + cnksiz, len (pt))
364 _, cnk = encryptor.process (pt [off:upto])
365 ct += cnk
366 off += cnksiz
367 cnk, header, fixed = encryptor.done (header_dummy)
368 ct += cnk
369
370 assert len (pt) == len (ct)
371
372
373 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
374 cnksiz = 1 << 10
375 pt = fill_mod (1 << 14)
376 password = str (os.urandom (42))
377 encryptor = crypto.Encrypt (TEST_VERSION,
378 TEST_PARAMVERSION,
379 password=password,
380 nacl=TEST_STATIC_NACL,
381 strict_ivs=True)
382 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
383
384 off = 0
385 ct = b""
386 while off < len (pt):
387 upto = min (off + cnksiz, len (pt))
388 _, cnk = encryptor.process (pt [off:upto])
389 ct += cnk
390 off += cnksiz
391 cnk, header, fixed = encryptor.done (header_dummy)
392 ct += cnk
393
394 assert len (pt) == len (ct)
395
396
397 def test_crypto_aes_gcm_enc_multiobj (self):
398 cnksiz = 1 << 10
399 password = str (os.urandom (42))
400 encryptor = crypto.Encrypt (TEST_VERSION,
401 TEST_PARAMVERSION,
402 password=password,
403 nacl=TEST_STATIC_NACL,
404 strict_ivs=False)
405
406 def addobj (i):
407 pt = fill_mod (1 << 14, off=i)
408 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
409
410 off = 0
411 ct = b""
412 while off < len (pt):
413 upto = min (off + cnksiz, len (pt))
414 _, cnk = encryptor.process (pt [off:upto])
415 ct += cnk
416 off += cnksiz
417 cnk, header, fixed = encryptor.done (header_dummy)
418 ct += cnk
419
420 assert len (pt) == len (ct)
421
422 for i in range (5): addobj (i)
423
424 assert len (encryptor.fixed) == 1
425
426
427 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
428 cnksiz = 1 << 10
429 password = str (os.urandom (42))
430 encryptor = crypto.Encrypt (TEST_VERSION,
431 TEST_PARAMVERSION,
432 password=password,
433 nacl=TEST_STATIC_NACL,
434 strict_ivs=True)
435 curfixed = None # must remain constant after first
436
437 def addobj (i):
438 pt = fill_mod (1 << 14, off=i)
439 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
440
441 off = 0
442 ct = b""
443 while off < len (pt):
444 upto = min (off + cnksiz, len (pt))
445 _, cnk = encryptor.process (pt [off:upto])
446 ct += cnk
447 off += cnksiz
448 cnk, header, fixed = encryptor.done (header_dummy)
449 nonlocal curfixed
450 if curfixed is None:
451 curfixed = fixed
452 else:
453 assert fixed == curfixed
454 ct += cnk
455
456 assert len (pt) == len (ct)
457
458 for i in range (5): addobj (i)
459
460 assert len (encryptor.fixed) == 1
461
462
463 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
464 """
465 Test behavior when the file counter tops out.
466
467 Artificially lower the maximum possible file counter. Considering
468 invalid (0) and reserved (1, 2) values, the smallest possible file counter
469 for normal objects is 3. Starting from that, the header of the (max -
470 3)rd object must have both a different IV fixed part and a counter.
471 """
472 minimum = 3
473 new_max = 8
474 crypto._testing_set_AES_GCM_IV_CNT_MAX \
475 ("I am fully aware that this will void my warranty.", new_max)
476 cnksiz = 1 << 10
477 password = str (os.urandom (42))
478 encryptor = crypto.Encrypt (TEST_VERSION,
479 TEST_PARAMVERSION,
480 password=password,
481 nacl=TEST_STATIC_NACL,
482 strict_ivs=True)
483
484 last_iv = None
485 last_cnt = minimum
486
487 def addobj (i, wrap=False):
488 nonlocal last_iv
489 nonlocal last_cnt
490 pt = fill_mod (1 << 14, off=i)
491 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
492
493 off = 0
494 ct = b""
495 while off < len (pt):
496 upto = min (off + cnksiz, len (pt))
497 _, cnk = encryptor.process (pt [off:upto])
498 ct += cnk
499 off += cnksiz
500 cnk, header, fixed = encryptor.done (header_dummy)
501 this_iv = crypto.hdr_read (header) ["iv"]
502 if last_iv is not None:
503 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
504 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
505 if wrap is False:
506 assert last_fixed == this_fixed
507 assert last_cnt == this_cnt - 1
508 else:
509 assert last_fixed != this_fixed
510 assert this_cnt == minimum
511 last_iv = this_iv
512 ct += cnk
513
514 assert len (pt) == len (ct)
515
516 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
517 addobj (i + 1, True) # counter wraps to 3
518
519 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
520 addobj (j + 1, True) # counter wraps to 3 again
521
522 assert len (encryptor.fixed) == 3
523
524
525 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
526 """
527 Test behavior when the file counter tops out and the transition to
528 the next IV fixed part fails on account of a bad random generator.
529
530 Replaces the ``urandom`` reference in ``os`` with a deterministic
531 function. The encryptor context must communicate this condition with an
532 ``IVFixedPartError``.
533 """
534 minimum = 3
535 new_max = 8
536 crypto._testing_set_AES_GCM_IV_CNT_MAX \
537 ("I am fully aware that this will void my warranty.", new_max)
538 cnksiz = 1 << 10
539 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
540 password = str (os.urandom (42))
541 encryptor = crypto.Encrypt (TEST_VERSION,
542 TEST_PARAMVERSION,
543 password=password,
544 nacl=TEST_STATIC_NACL,
545 strict_ivs=True)
546
547 def addobj (i):
548 pt = fill_mod (1 << 14, off=i)
549 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
550
551 off = 0
552 while off < len (pt):
553 upto = min (off + cnksiz, len (pt))
554 _, cnk = encryptor.process (pt [off:upto])
555 off += cnksiz
556
557 for i in range (minimum, new_max): addobj (42 + i)
558
559 with self.assertRaises (crypto.IVFixedPartError):
560 addobj (42 + i)
561
562
563
564 def test_crypto_aes_gcm_enc_length_cap (self):
565 """
566 Artificially lower the maximum allowable data length and attempt to
567 encrypt a larger object. Verify that the crypto handler only encrypts
568 data up to the size limit. A downstream user detects that condition by
569 testing whether the encryption step yielded less bytes than the
570 plaintext.
571
572 The sibling to this test is test_restore_backup_max_file_length()
573 in test_delatar.py. Deltatar will transparently create a splitted object
574 with an increased IV file counter.
575 """
576 new_max = 2187
577 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
578 ("I am fully aware that this will void my warranty.", new_max)
579 cnksiz = 1 << 10
580 password = str (os.urandom (42))
581 encryptor = crypto.Encrypt (TEST_VERSION,
582 TEST_PARAMVERSION,
583 password=password,
584 nacl=TEST_STATIC_NACL)
585
586 def encobj (s):
587 pt, ct = fill_mod (s), None
588 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
589
590 n, ct = encryptor.process (pt)
591 rest, _, _ = encryptor.done (header_dummy)
592
593 # NB: If this check *ever* fails, then something changed in the
594 # encoding layer. AES-GCM is a stream cipher so each encoding
595 # step will yield the exact number of ciphertext bytes that
596 # was provided as plaintext. Thus there cannot be any encoded
597 # data left when calling the finalizers. None of the crypo code
598 # depends on that assumption but nevertheless we check it here
599 # in case anything changes upstream in the Cryptography
600 # library. In case there actually is a rest, replace the
601 # assertion below with ``ct += rest``.
602 assert (len (rest) == 0)
603
604 if len (pt) > new_max:
605 # If the plaintext was longer than the artificially lowered
606 # maximum, then the number of ciphertext bytes must be clamped
607 # to the maximum.
608 assert n == new_max
609 else:
610 assert n == len (pt) == len (ct)
611
612 for i in range (16): encobj (1 << i)
613
614
615 def test_crypto_aes_gcm_dec_length_cap (self):
616 """
617 The decryptor must reject headers with an object size that exceeds
618 the PDTCRYPT maximum. Longer files split into multiple objects.
619 """
620 password = str (os.urandom (42))
621 meta = faux_hdr()
622 meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
623 ok, header = crypto.hdr_make (meta)
624
625 assert ok
626
627 # Set up decryption with bogus header.
628 decryptor = crypto.Decrypt (password=password, fixedparts=[])
629
630 with self.assertRaises (crypto.InvalidHeader):
631 decryptor.next (header)
632
633
634 def test_crypto_aes_gcm_dec_length_mismatch (self):
635 """
636 Catch attempts at decrypting more data than what was stated in the
637 header.
638 """
639 cnksiz = 1 << 10
640 orig_pt = fill_mod (1 << 14)
641 password = str (os.urandom (42))
642 encryptor = crypto.Encrypt (TEST_VERSION,
643 TEST_PARAMVERSION,
644 password=password,
645 nacl=TEST_STATIC_NACL)
646 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
647
648 off = 0
649 ct = b""
650 while off < len (orig_pt):
651 upto = min (off + cnksiz, len (orig_pt))
652 _n, cnk = encryptor.process (orig_pt [off:upto])
653 ct += cnk
654 off += cnksiz
655 cnk, header, fixed = encryptor.done (header_dummy)
656 ct += cnk
657
658 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
659
660 decryptor.next (header)
661 off = 0
662 pt = b""
663 while off < len (orig_pt):
664 upto = min (off + cnksiz, len (orig_pt))
665 cnk = decryptor.process (ct [off:upto])
666 pt += cnk
667 off += cnksiz
668
669 with self.assertRaises (crypto.CiphertextTooLong):
670 # Try and decrypt one byte more than was encrypted.
671 # This must be caught in crypto.py.
672 _ = decryptor.process (ct [0:1])
673
674
675 def test_crypto_aes_gcm_dec_multicnk (self):
676 cnksiz = 1 << 10
677 orig_pt = fill_mod (1 << 14)
678 password = str (os.urandom (42))
679 encryptor = crypto.Encrypt (TEST_VERSION,
680 TEST_PARAMVERSION,
681 password=password,
682 nacl=TEST_STATIC_NACL)
683 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
684
685 off = 0
686 ct = b""
687 while off < len (orig_pt):
688 upto = min (off + cnksiz, len (orig_pt))
689 _n, cnk = encryptor.process (orig_pt [off:upto])
690 ct += cnk
691 off += cnksiz
692 cnk, header, fixed = encryptor.done (header_dummy)
693 ct += cnk
694
695 decryptor = crypto.Decrypt (password=password,
696 fixedparts=fixed)
697 decryptor.next (header)
698 off = 0
699 pt = b""
700 while off < len (orig_pt):
701 upto = min (off + cnksiz, len (orig_pt))
702 cnk = decryptor.process (ct [off:upto])
703 pt += cnk
704 off += cnksiz
705
706
707 pt += decryptor.done ()
708 assert pt == orig_pt
709
710
711 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
712 cnksiz = 1 << 10
713 orig_pt = fill_mod (1 << 14)
714 password = str (os.urandom (42))
715 encryptor = crypto.Encrypt (TEST_VERSION,
716 TEST_PARAMVERSION,
717 password=password,
718 nacl=TEST_STATIC_NACL)
719 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
720
721 off = 0
722 ct = b""
723 while off < len (orig_pt):
724 upto = min (off + cnksiz, len (orig_pt))
725 _n, cnk = encryptor.process (orig_pt [off:upto])
726 ct += cnk
727 off += cnksiz
728 cnk, header, fixed = encryptor.done (header_dummy)
729 ct += cnk
730
731 mut_header = bytearray (header)
732 mut_header_vw = memoryview (mut_header)
733 # replace one byte in the tag part of the header
734 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
735 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
736 header = bytes (mut_header)
737
738 decryptor = crypto.Decrypt (password=password,
739 fixedparts=fixed)
740 decryptor.next (header)
741 off = 0
742 pt = b""
743 while off < len (orig_pt):
744 upto = min (off + cnksiz, len (orig_pt))
745 cnk = decryptor.process (ct [off:upto])
746 pt += cnk
747 off += cnksiz
748
749 with self.assertRaises (crypto.InvalidGCMTag):
750 _ = decryptor.done ()
751
752
753 def test_crypto_aes_gcm_dec_iv_gap (self):
754 """
755 Encrypt multiple objects using non-consecutive IVs and verify that the
756 decryptor errors out with an exception in strict mode but keeps quiet
757 otherwise.
758 """
759 cnksiz = 1 << 10
760 orig_pt_1 = fill_mod (1 << 10)
761 orig_pt_2 = fill_mod (1 << 10, 23)
762 orig_pt_3 = fill_mod (1 << 10, 42)
763 password = str (os.urandom (42))
764 encryptor = crypto.Encrypt (TEST_VERSION,
765 TEST_PARAMVERSION,
766 password=password,
767 nacl=TEST_STATIC_NACL)
768
769 def enc (pt):
770 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
771
772 off = 0
773 ct = b""
774 while off < len (pt):
775 upto = min (off + cnksiz, len (pt))
776 _n, cnk = encryptor.process (pt [off:upto])
777 ct += cnk
778 off += cnksiz
779 cnk, header, fixed = encryptor.done (header_dummy)
780 return ct + cnk, header, fixed
781
782 ct_1, hdr_1, _____ = enc (orig_pt_1)
783
784 ## Here we bump the iv of the encryptor, breaking the series.
785 encryptor.set_object_counter (encryptor.cnt + 1)
786 ct_2, hdr_2, fixed = enc (orig_pt_2)
787
788 ## IV of final object is again in-sequence.
789 ct_3, hdr_3, fixed = enc (orig_pt_3)
790
791 def decrypt (strict_ivs):
792 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
793 strict_ivs=strict_ivs)
794
795 def dec (hdr, ct):
796 decryptor.next (hdr)
797 off = 0
798 pt = b""
799 while off < len (ct):
800 upto = min (off + cnksiz, len (ct))
801 cnk = decryptor.process (ct [off:upto])
802 pt += cnk
803 off += cnksiz
804 return pt + decryptor.done ()
805
806 decr_pt_1 = dec (hdr_1, ct_1)
807 decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
808 decr_pt_3 = dec (hdr_3, ct_3)
809
810 assert decr_pt_1 == orig_pt_1
811 assert decr_pt_2 == orig_pt_2
812 assert decr_pt_3 == orig_pt_3
813
814 with self.assertRaises (crypto.NonConsecutiveIV):
815 decrypt (True)
816
817 decrypt (False) # Sequence passes
818
819
820 def test_crypto_aes_gcm_dec_iv_reuse (self):
821 """
822 Meddle with encrypted content: extract the IV from one object
823 and inject it into the header of another. This must be rejected
824 by the decryptor with paranoid IV checking enabled.
825 """
826 cnksiz = 1 << 10
827 orig_pt_1 = fill_mod (1 << 10)
828 orig_pt_2 = fill_mod (1 << 10, 42)
829 password = str (os.urandom (42))
830 encryptor = crypto.Encrypt (TEST_VERSION,
831 TEST_PARAMVERSION,
832 password=password,
833 nacl=TEST_STATIC_NACL,
834 strict_ivs=True)
835
836 def enc (pt):
837 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
838
839 off = 0
840 ct = b""
841 while off < len (pt):
842 upto = min (off + cnksiz, len (pt))
843 _n, cnk = encryptor.process (pt [off:upto])
844 ct += cnk
845 off += cnksiz
846 cnk, header, fixed = encryptor.done (header_dummy)
847 return ct + cnk, header, fixed
848
849 ct_1, hdr_1, _____ = enc (orig_pt_1)
850 encryptor.cnt -= 1 # induce error by forcing an identical IV on next step
851
852 with self.assertRaises (crypto.DuplicateIV): # reuse detected
853 ct_2, hdr_2, fixed = enc (orig_pt_2)
854
855
class HeaderTest(CryptoLayerTest):
    """Tests for assembling (``hdr_make``) and parsing (``hdr_read``) headers."""

    def test_crypto_fmt_hdr_make(self):
        """A header built from valid fields has the fixed header size."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        self.assertTrue(ok)
        self.assertEqual(len(hdr), crypto.PDTCRYPT_HDR_SIZE)

    def test_crypto_fmt_hdr_make_useless(self):
        """Unusable input yields ``(False, <error message>)``."""
        ok, ret = crypto.hdr_make({42: "x"})
        self.assertIs(ok, False)
        self.assertTrue(ret.startswith("error assembling header:"))

    def test_crypto_fmt_hdr_read(self):
        """Every field survives a ``hdr_make`` / ``hdr_read`` round trip."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        self.assertIs(ok, True)
        self.assertIsNotNone(hdr)
        mmeta = crypto.hdr_read(hdr)
        self.assertIsNotNone(mmeta)
        for k in meta:
            # Raising a plain string is a TypeError in Python 3; report the
            # mismatch as a regular test failure carrying the same message.
            self.assertEqual(
                meta[k], mmeta[k],
                "header mismatch after reading: expected %r, got %r"
                % (meta[k], mmeta[k]))

    def test_crypto_fmt_hdr_read_trailing_garbage(self):
        """Extra bytes after a valid header must raise ``InvalidHeader``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        self.assertIs(ok, True)
        self.assertIsNotNone(hdr)
        hdr += b"-junk"
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(hdr)

    def test_crypto_fmt_hdr_read_leading_garbage(self):
        """Extra bytes before a valid header must raise ``InvalidHeader``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        self.assertIs(ok, True)
        self.assertIsNotNone(hdr)
        hdr = b"junk-" + hdr
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(hdr)

    def test_crypto_fmt_hdr_inner_garbage(self):
        """Bytes spliced into the middle of a header must raise ``InvalidHeader``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        self.assertTrue(ok)
        middle = len(hdr) // 2
        data = hdr[:middle] + b"junk-" + hdr[middle:]
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(data)
913