bpo-32713: Fix tarfile.itn for large/negative float values. (GH-5434)
[python-delta-tar] / testing / test_crypto.py
1 import binascii
2 import os
3 import pylibscrypt
4 import struct
5 import unittest
6
7 import deltatar.crypto as crypto
8
9 import cryptography
10
11 def b(s):
12     return s.encode("UTF-8")
13
14 CRYPTO_NACL_SIZE  = 16
15 CRYPTO_KEY_SIZE   = 16
16
17 TEST_PLAINTEXT       = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE      = b"test1234"
19 TEST_AES_GCM_AAD     = b"authenticated plain text"
20 TEST_DUMMY_FILENAME  = "insurance-file.txt"
21 TEST_VERSION         = 1
22 TEST_PARAMVERSION    = 1
23 TEST_STATIC_NACL     = os.urandom (CRYPTO_NACL_SIZE)
24 PLAIN_PARAMVERSION   = 0
25
26 def faux_hdr (ctsize=1337, iv=None):
27     return \
28         {      "version" : 42
29         , "paramversion" : 2187
30         ,         "nacl" : binascii.unhexlify(b"0011223344556677"
31                                               b"8899aabbccddeeff")
32         ,           "iv" : iv or binascii.unhexlify(b"0011223344556677"
33                                                     b"8899aabb")
34         ,       "ctsize" : ctsize
35         ,          "tag" : binascii.unhexlify(b"deadbeefbadb100d"
36                                               b"b1eedc0ffeedea15")
37         }
38
39 FILL_MOD_MEMO = { }
40
41 def fill_mod (n, off=0):
42     global FILL_MOD_MEMO
43     k = (n, off)
44     m = FILL_MOD_MEMO.get (k, None)
45     if m is not None:
46         return m
47     buf = bytearray (n)
48     bufv = memoryview (buf)
49     for i in range (n):
50         off += 1
51         c = off % 64 + 32
52         struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
53     m = bytes (buf)
54     FILL_MOD_MEMO [k] = m
55     return m
56
57
58 def faux_payload ():
59     return "abcd" * 42
60
61
62 class CryptoLayerTest (unittest.TestCase):
63     pass
64
65
66 class AESGCMTest (CryptoLayerTest):
67
68     os_urandom = os.urandom
69
70     def tearDown (self):
71         """Reset globals altered for testing."""
72         _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73                   ("I am fully aware that this will void my warranty.")
74         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75                   ("I am fully aware that this will void my warranty.")
76         os.urandom = self.os_urandom
77
78     def test_crypto_aes_gcm_enc_ctor (self):
79         password   = str (os.urandom (42))
80         encryptor  = crypto.Encrypt (TEST_VERSION,
81                                      TEST_PARAMVERSION,
82                                      password=password,
83                                      nacl=TEST_STATIC_NACL)
84
85     def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86         """Refuse plaintext passthrough mode by default."""
87         password   = str (os.urandom (42))
88         with self.assertRaises (crypto.InvalidParameter):
89             encryptor  = crypto.Encrypt (TEST_VERSION,
90                                          PLAIN_PARAMVERSION,
91                                          password=password,
92                                          nacl=TEST_STATIC_NACL)
93
94
95     def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96         """
97         Comply with request for plaintext passthrough mode if the
98         *insecure* flag is passed.
99         """
100         password   = str (os.urandom (42))
101         encryptor  = crypto.Encrypt (TEST_VERSION,
102                                         PLAIN_PARAMVERSION,
103                                         password=password,
104                                         nacl=TEST_STATIC_NACL,
105                                         insecure=True)
106
107
108     def test_crypto_aes_gcm_enc_ctor_key (self):
109         key        = os.urandom (42)
110         encryptor  = crypto.Encrypt (TEST_VERSION,
111                                      TEST_PARAMVERSION,
112                                      key=key,
113                                      nacl=TEST_STATIC_NACL)
114
115
116     def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117         """
118         Either key (+nacl) or password must be supplied, not both.
119         """
120         with self.assertRaises (crypto.InvalidParameter):       # neither key nor pw
121             encryptor = crypto.Encrypt (TEST_VERSION,
122                                         TEST_PARAMVERSION,
123                                         nacl=TEST_STATIC_NACL)
124
125         password = str (os.urandom (42))
126         key      =      os.urandom (16)  # scrypt sized
127         with self.assertRaises (crypto.InvalidParameter):       # both key and pw
128             encryptor = crypto.Encrypt (TEST_VERSION,
129                                         TEST_PARAMVERSION,
130                                         password=password,
131                                         key=key,
132                                         nacl=TEST_STATIC_NACL)
133
134         with self.assertRaises (crypto.InvalidParameter):       # key, but salt missing
135             encryptor = crypto.Encrypt (TEST_VERSION,
136                                         TEST_PARAMVERSION,
137                                         key=key,
138                                         nacl=None)
139
140         with self.assertRaises (crypto.InvalidParameter):       # empty pw
141             encryptor = crypto.Encrypt (TEST_VERSION,
142                                         TEST_PARAMVERSION,
143                                         password=b"",
144                                         nacl=TEST_STATIC_NACL)
145
146
147     def test_crypto_aes_gcm_enc_header_size (self):
148         password       = str (os.urandom (42))
149         encryptor      = crypto.Encrypt (TEST_VERSION,
150                                          TEST_PARAMVERSION,
151                                          password=password,
152                                          nacl=TEST_STATIC_NACL)
153
154         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
155         assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
156         _, _           = encryptor.process (TEST_PLAINTEXT)
157         _, header, _   = encryptor.done (header_dummy)
158         assert len (header) == crypto.PDTCRYPT_HDR_SIZE
159
160
161     def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
162         """
163         Access the list of IVs used during encryption and check reuse. Start
164         from explicit counters so the inputs don’t overlap. IVs must not have
165         been reused.
166         """
167
168         def enc (start_count, data):
169             password       = str (os.urandom (42))
170             encryptor      = crypto.Encrypt (TEST_VERSION,
171                                              TEST_PARAMVERSION,
172                                              password=password,
173                                              nacl=TEST_STATIC_NACL,
174                                              counter=start_count,
175                                              strict_ivs=True)
176
177             for i, blob in enumerate (data, 1):
178                 fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
179                 header_dummy   = encryptor.next (fname)
180                 _, _           = encryptor.process (blob)
181                 _, header, _   = encryptor.done (header_dummy)
182                 assert len (encryptor.get_used_ivs ()) == i
183
184             return encryptor.get_used_ivs ()
185
186         ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
187         ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])
188
189         # No reuse in general.
190         assert len (ivs1 & ivs2) == 0
191
192         ivs1 = list (ivs1)
193         ivs2 = list (ivs2)
194
195         # Counters of used IVs must match what we passed explicitly.
196         def extract_counters (ivs):
197             def getcount (iv):
198                 _, cnt = struct.unpack (crypto.FMT_I2N_IV, iv)
199                 return cnt
200             return list (map (getcount, ivs))
201
202         cnt1 = extract_counters (ivs1)
203         cnt2 = extract_counters (ivs2)
204
205         assert 0x0042 in cnt1
206         assert 0x0043 in cnt1
207
208         assert 0x1337 in cnt2
209         assert 0x1338 in cnt2
210
211
212     def test_crypto_aes_gcm_enc_chunk_size (self):
213         password       = str (os.urandom (42))
214         encryptor      = crypto.Encrypt (TEST_VERSION,
215                                          TEST_PARAMVERSION,
216                                          password=password,
217                                          nacl=TEST_STATIC_NACL)
218
219         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
220         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
221         assert len (ciphertext) == len (TEST_PLAINTEXT)
222         rest, header, fixed = encryptor.done (header_dummy)
223         assert len (rest) == 0
224
225
226     def test_crypto_aes_gcm_dec_ctor (self):
227         """
228         Ensure that only either key or password is accepted.
229         """
230         password = str (os.urandom (42))
231         key      =      os.urandom (16)  # scrypt sized
232
233         decryptor = crypto.Decrypt (password=password)
234         decryptor = crypto.Decrypt (key=key)
235
236         with self.assertRaises (crypto.InvalidParameter):       # both password and key
237             decryptor = crypto.Decrypt (password=password, key=key)
238
239         with self.assertRaises (crypto.InvalidParameter):       # neither password nor key
240             decryptor = crypto.Decrypt (password=None, key=None)
241
242         with self.assertRaises (crypto.InvalidParameter):       # # empty password
243             decryptor = crypto.Decrypt (password="")
244
245
246     def test_crypto_aes_gcm_dec_simple (self):
247         password       = str (os.urandom (42))
248         encryptor      = crypto.Encrypt (TEST_VERSION,
249                                          TEST_PARAMVERSION,
250                                          password=password,
251                                          nacl=TEST_STATIC_NACL)
252
253         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
254         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
255         rest, header, fixed = encryptor.done (header_dummy)
256         ciphertext    += rest
257
258         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
259         decryptor.next (header)
260         plaintext      = decryptor.process (ciphertext)
261         rest           = decryptor.done ()
262         plaintext     += rest
263
264         assert plaintext == TEST_PLAINTEXT
265
266
267     def test_crypto_aes_gcm_dec_plain_bad (self):
268         """
269         Downgrade to plaintext must not be allowed in parameters
270         obtained from headers.
271         """
272         password       = str (os.urandom (42))
273         encryptor      = crypto.Encrypt (TEST_VERSION,
274                                          TEST_PARAMVERSION,
275                                          password=password,
276                                          nacl=TEST_STATIC_NACL)
277
278         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
279         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
280         rest, header, fixed = encryptor.done (header_dummy)
281         ciphertext    += rest
282
283         header         = crypto.hdr_read (header)
284         header ["paramversion"] = PLAIN_PARAMVERSION
285         ok, header     = crypto.hdr_make (header)
286         assert ok
287
288         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
289         with self.assertRaises (crypto.InvalidParameter):
290             decryptor.next (header)
291
292
293     def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
294         """
295         Allow plaintext crypto mode if *insecure* flag is passed.
296         """
297         password       = str (os.urandom (42))
298         encryptor      = crypto.Encrypt (TEST_VERSION,
299                                          PLAIN_PARAMVERSION,
300                                          password=password,
301                                          nacl=TEST_STATIC_NACL,
302                                          insecure=True)
303
304         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
305         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
306         rest, header, fixed = encryptor.done (header_dummy)
307         ciphertext    += rest
308
309         header         = crypto.hdr_read (header)
310         header ["paramversion"] = PLAIN_PARAMVERSION
311         ok, header     = crypto.hdr_make (header)
312         assert ok
313
314         decryptor      = crypto.Decrypt (password=password,
315                                          fixedparts=fixed,
316                                          insecure=True)
317         decryptor.next (header)
318         plaintext      = decryptor.process (ciphertext)
319         rest           = decryptor.done ()
320         plaintext     += rest
321
322         assert plaintext == TEST_PLAINTEXT
323
324
325     def test_crypto_aes_gcm_dec_bad_tag (self):
326         password       = str (os.urandom (42))
327         encryptor      = crypto.Encrypt (TEST_VERSION,
328                                          TEST_PARAMVERSION,
329                                          password=password,
330                                          nacl=TEST_STATIC_NACL)
331
332         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
333         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
334         ciphertext2, header, fixed = encryptor.done (header_dummy)
335
336         mut_header     = bytearray (header)
337         mut_header_vw  = memoryview (mut_header)
338         # replace one byte in the tag part of the header
339         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
340         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
341         header         = bytes (mut_header)
342
343         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
344         decryptor.next (header)
345         plaintext      = decryptor.process (ciphertext)
346         with self.assertRaises (crypto.InvalidGCMTag):
347             _ = decryptor.done ()
348
349
350     def test_crypto_aes_gcm_enc_multicnk (self):
351         cnksiz = 1 << 10
352         pt    = fill_mod (1 << 14)
353         password       = str (os.urandom (42))
354         encryptor      = crypto.Encrypt (TEST_VERSION,
355                                          TEST_PARAMVERSION,
356                                          password=password,
357                                          nacl=TEST_STATIC_NACL)
358         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
359
360         off = 0
361         ct = b""
362         while off < len (pt):
363             upto = min (off + cnksiz, len (pt))
364             _, cnk = encryptor.process (pt [off:upto])
365             ct += cnk
366             off += cnksiz
367         cnk, header, fixed = encryptor.done (header_dummy)
368         ct += cnk
369
370         assert len (pt) == len (ct)
371
372
373     def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
374         cnksiz = 1 << 10
375         pt    = fill_mod (1 << 14)
376         password       = str (os.urandom (42))
377         encryptor      = crypto.Encrypt (TEST_VERSION,
378                                          TEST_PARAMVERSION,
379                                          password=password,
380                                          nacl=TEST_STATIC_NACL,
381                                          strict_ivs=True)
382         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
383
384         off = 0
385         ct = b""
386         while off < len (pt):
387             upto = min (off + cnksiz, len (pt))
388             _, cnk = encryptor.process (pt [off:upto])
389             ct += cnk
390             off += cnksiz
391         cnk, header, fixed = encryptor.done (header_dummy)
392         ct += cnk
393
394         assert len (pt) == len (ct)
395
396
397     def test_crypto_aes_gcm_enc_multiobj (self):
398         cnksiz    = 1 << 10
399         password  = str (os.urandom (42))
400         encryptor = crypto.Encrypt (TEST_VERSION,
401                                     TEST_PARAMVERSION,
402                                     password=password,
403                                     nacl=TEST_STATIC_NACL,
404                                     strict_ivs=False)
405
406         def addobj (i):
407             pt           = fill_mod (1 << 14, off=i)
408             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
409
410             off = 0
411             ct = b""
412             while off < len (pt):
413                 upto = min (off + cnksiz, len (pt))
414                 _, cnk = encryptor.process (pt [off:upto])
415                 ct += cnk
416                 off += cnksiz
417             cnk, header, fixed = encryptor.done (header_dummy)
418             ct += cnk
419
420             assert len (pt) == len (ct)
421
422         for i in range (5): addobj (i)
423
424         assert len (encryptor.fixed) == 1
425
426
427     def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
428         cnksiz    = 1 << 10
429         password  = str (os.urandom (42))
430         encryptor = crypto.Encrypt (TEST_VERSION,
431                                     TEST_PARAMVERSION,
432                                     password=password,
433                                     nacl=TEST_STATIC_NACL,
434                                     strict_ivs=True)
435         curfixed  = None # must remain constant after first
436
437         def addobj (i):
438             pt           = fill_mod (1 << 14, off=i)
439             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
440
441             off = 0
442             ct = b""
443             while off < len (pt):
444                 upto = min (off + cnksiz, len (pt))
445                 _, cnk = encryptor.process (pt [off:upto])
446                 ct += cnk
447                 off += cnksiz
448             cnk, header, fixed = encryptor.done (header_dummy)
449             nonlocal curfixed
450             if curfixed is None:
451                 curfixed = fixed
452             else:
453                 assert fixed == curfixed
454             ct += cnk
455
456             assert len (pt) == len (ct)
457
458         for i in range (5): addobj (i)
459
460         assert len (encryptor.fixed) == 1
461
462
463     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
464         """
465         Test behavior when the file counter tops out.
466
467         Artificially lower the maximum possible file counter. Considering
468         invalid (0) and reserved (1, 2) values, the smallest possible file counter
469         for normal objects is 3. Starting from that, the header of the (max -
470         3)rd object must have both a different IV fixed part and a counter.
471         """
472         minimum = 3
473         new_max = 8
474         crypto._testing_set_AES_GCM_IV_CNT_MAX \
475                 ("I am fully aware that this will void my warranty.", new_max)
476         cnksiz    = 1 << 10
477         password  = str (os.urandom (42))
478         encryptor = crypto.Encrypt (TEST_VERSION,
479                                     TEST_PARAMVERSION,
480                                     password=password,
481                                     nacl=TEST_STATIC_NACL,
482                                     strict_ivs=True)
483
484         last_iv  = None
485         last_cnt = minimum
486
487         def addobj (i, wrap=False):
488             nonlocal last_iv
489             nonlocal last_cnt
490             pt           = fill_mod (1 << 14, off=i)
491             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
492
493             off = 0
494             ct = b""
495             while off < len (pt):
496                 upto = min (off + cnksiz, len (pt))
497                 _, cnk = encryptor.process (pt [off:upto])
498                 ct += cnk
499                 off += cnksiz
500             cnk, header, fixed = encryptor.done (header_dummy)
501             this_iv = crypto.hdr_read (header) ["iv"]
502             if last_iv is not None:
503                 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
504                 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
505                 if wrap is False:
506                     assert last_fixed == this_fixed
507                     assert last_cnt   == this_cnt - 1
508                 else:
509                     assert last_fixed != this_fixed
510                     assert this_cnt   == minimum
511             last_iv = this_iv
512             ct += cnk
513
514             assert len (pt) == len (ct)
515
516         for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
517         addobj (i + 1, True) # counter wraps to 3
518
519         for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
520         addobj (j + 1, True) # counter wraps to 3 again
521
522         assert len (encryptor.fixed) == 3
523
524
525     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
526         """
527         Test behavior when the file counter tops out and the transition to
528         the next IV fixed part fails on account of a bad random generator.
529
530         Replaces the ``urandom`` reference in ``os`` with a deterministic
531         function. The encryptor context must communicate this condition with an
532         ``IVFixedPartError``.
533         """
534         minimum = 3
535         new_max = 8
536         crypto._testing_set_AES_GCM_IV_CNT_MAX \
537                 ("I am fully aware that this will void my warranty.", new_max)
538         cnksiz    = 1 << 10
539         os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
540         password  = str (os.urandom (42))
541         encryptor = crypto.Encrypt (TEST_VERSION,
542                                     TEST_PARAMVERSION,
543                                     password=password,
544                                     nacl=TEST_STATIC_NACL,
545                                     strict_ivs=True)
546
547         def addobj (i):
548             pt = fill_mod (1 << 14, off=i)
549             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
550
551             off = 0
552             while off < len (pt):
553                 upto = min (off + cnksiz, len (pt))
554                 _, cnk = encryptor.process (pt [off:upto])
555                 off += cnksiz
556
557         for i in range (minimum, new_max): addobj (42 + i)
558
559         with self.assertRaises (crypto.IVFixedPartError):
560             addobj (42 + i)
561
562
563
564     def test_crypto_aes_gcm_enc_length_cap (self):
565         """
566         Artificially lower the maximum allowable data length and attempt to
567         encrypt a larger object. Verify that the crypto handler only encrypts
568         data up to the size limit. A downstream user detects that condition by
569         testing whether the encryption step yielded less bytes than the
570         plaintext.
571
572         The sibling to this test is test_restore_backup_max_file_length()
573         in test_delatar.py. Deltatar will transparently create a splitted object
574         with an increased IV file counter.
575         """
576         new_max = 2187
577         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
578                 ("I am fully aware that this will void my warranty.", new_max)
579         cnksiz    = 1 << 10
580         password  = str (os.urandom (42))
581         encryptor = crypto.Encrypt (TEST_VERSION,
582                                     TEST_PARAMVERSION,
583                                     password=password,
584                                     nacl=TEST_STATIC_NACL)
585
586         def encobj (s):
587             pt, ct       = fill_mod (s), None
588             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
589
590             n, ct = encryptor.process (pt)
591             rest, _, _ = encryptor.done (header_dummy)
592
593             # NB: If this check *ever* fails, then something changed in the
594             #     encoding layer. AES-GCM is a stream cipher so each encoding
595             #     step will yield the exact number of ciphertext bytes that
596             #     was provided as plaintext. Thus there cannot be any encoded
597             #     data left when calling the finalizers. None of the crypo code
598             #     depends on that assumption but nevertheless we check it here
599             #     in case anything changes upstream in the Cryptography
600             #     library. In case there actually is a rest, replace the
601             #     assertion below with ``ct += rest``.
602             assert (len (rest) == 0)
603
604             if len (pt) > new_max:
605                 # If the plaintext was longer than the artificially lowered
606                 # maximum, then the number of ciphertext bytes must be clamped
607                 # to the maximum.
608                 assert n == new_max
609             else:
610                 assert n == len (pt) == len (ct)
611
612         for i in range (16): encobj (1 << i)
613
614
615     def test_crypto_aes_gcm_dec_length_cap (self):
616         """
617         The decryptor must reject headers with an object size that exceeds
618         the PDTCRYPT maximum. Longer files split into multiple objects.
619         """
620         password        = str (os.urandom (42))
621         meta            = faux_hdr()
622         meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
623         ok, header      = crypto.hdr_make (meta)
624
625         assert ok
626
627         # Set up decryption with bogus header.
628         decryptor = crypto.Decrypt (password=password, fixedparts=[])
629
630         with self.assertRaises (crypto.InvalidHeader):
631             decryptor.next (header)
632
633
634     def test_crypto_aes_gcm_dec_length_mismatch (self):
635         """
636         Catch attempts at decrypting more data than what was stated in the
637         header.
638         """
639         cnksiz         = 1 << 10
640         orig_pt        = fill_mod (1 << 14)
641         password       = str (os.urandom (42))
642         encryptor      = crypto.Encrypt (TEST_VERSION,
643                                          TEST_PARAMVERSION,
644                                          password=password,
645                                          nacl=TEST_STATIC_NACL)
646         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
647
648         off = 0
649         ct = b""
650         while off < len (orig_pt):
651             upto = min (off + cnksiz, len (orig_pt))
652             _n, cnk = encryptor.process (orig_pt [off:upto])
653             ct += cnk
654             off += cnksiz
655         cnk, header, fixed = encryptor.done (header_dummy)
656         ct += cnk
657
658         decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
659
660         decryptor.next (header)
661         off = 0
662         pt  = b""
663         while off < len (orig_pt):
664             upto = min (off + cnksiz, len (orig_pt))
665             cnk  = decryptor.process (ct [off:upto])
666             pt += cnk
667             off += cnksiz
668
669         with self.assertRaises (crypto.CiphertextTooLong):
670             # Try and decrypt one byte more than was encrypted.
671             # This must be caught in crypto.py.
672             _ = decryptor.process (ct [0:1])
673
674
675     def test_crypto_aes_gcm_dec_multicnk (self):
676         cnksiz         = 1 << 10
677         orig_pt        = fill_mod (1 << 14)
678         password       = str (os.urandom (42))
679         encryptor      = crypto.Encrypt (TEST_VERSION,
680                                          TEST_PARAMVERSION,
681                                          password=password,
682                                          nacl=TEST_STATIC_NACL)
683         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
684
685         off = 0
686         ct = b""
687         while off < len (orig_pt):
688             upto = min (off + cnksiz, len (orig_pt))
689             _n, cnk = encryptor.process (orig_pt [off:upto])
690             ct += cnk
691             off += cnksiz
692         cnk, header, fixed = encryptor.done (header_dummy)
693         ct += cnk
694
695         decryptor      = crypto.Decrypt (password=password,
696                                          fixedparts=fixed)
697         decryptor.next (header)
698         off = 0
699         pt  = b""
700         while off < len (orig_pt):
701             upto = min (off + cnksiz, len (orig_pt))
702             cnk  = decryptor.process (ct [off:upto])
703             pt += cnk
704             off += cnksiz
705
706
707         pt += decryptor.done ()
708         assert pt == orig_pt
709
710
711     def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
712         cnksiz         = 1 << 10
713         orig_pt        = fill_mod (1 << 14)
714         password       = str (os.urandom (42))
715         encryptor      = crypto.Encrypt (TEST_VERSION,
716                                          TEST_PARAMVERSION,
717                                          password=password,
718                                          nacl=TEST_STATIC_NACL)
719         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
720
721         off = 0
722         ct = b""
723         while off < len (orig_pt):
724             upto = min (off + cnksiz, len (orig_pt))
725             _n, cnk = encryptor.process (orig_pt [off:upto])
726             ct += cnk
727             off += cnksiz
728         cnk, header, fixed = encryptor.done (header_dummy)
729         ct += cnk
730
731         mut_header     = bytearray (header)
732         mut_header_vw  = memoryview (mut_header)
733         # replace one byte in the tag part of the header
734         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
735         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
736         header         = bytes (mut_header)
737
738         decryptor      = crypto.Decrypt (password=password,
739                                          fixedparts=fixed)
740         decryptor.next (header)
741         off = 0
742         pt  = b""
743         while off < len (orig_pt):
744             upto = min (off + cnksiz, len (orig_pt))
745             cnk = decryptor.process (ct [off:upto])
746             pt += cnk
747             off += cnksiz
748
749         with self.assertRaises (crypto.InvalidGCMTag):
750             _ = decryptor.done ()
751
752
753     def test_crypto_aes_gcm_dec_iv_gap (self):
754         """
755         Encrypt multiple objects using non-consecutive IVs and verify that the
756         decryptor errors out with an exception in strict mode but keeps quiet
757         otherwise.
758         """
759         cnksiz         = 1 << 10
760         orig_pt_1      = fill_mod (1 << 10)
761         orig_pt_2      = fill_mod (1 << 10, 23)
762         orig_pt_3      = fill_mod (1 << 10, 42)
763         password       = str (os.urandom (42))
764         encryptor      = crypto.Encrypt (TEST_VERSION,
765                                          TEST_PARAMVERSION,
766                                          password=password,
767                                          nacl=TEST_STATIC_NACL)
768
769         def enc (pt):
770             header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
771
772             off = 0
773             ct = b""
774             while off < len (pt):
775                 upto = min (off + cnksiz, len (pt))
776                 _n, cnk = encryptor.process (pt [off:upto])
777                 ct += cnk
778                 off += cnksiz
779             cnk, header, fixed = encryptor.done (header_dummy)
780             return ct + cnk, header, fixed
781
782         ct_1, hdr_1, _____ = enc (orig_pt_1)
783
784         ## Here we bump the iv of the encryptor, breaking the series.
785         encryptor.set_object_counter (encryptor.cnt + 1)
786         ct_2, hdr_2, fixed = enc (orig_pt_2)
787
788         ## IV of final object is again in-sequence.
789         ct_3, hdr_3, fixed = enc (orig_pt_3)
790
791         def decrypt (strict_ivs):
792             decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
793                                         strict_ivs=strict_ivs)
794
795             def dec (hdr, ct):
796                 decryptor.next (hdr)
797                 off = 0
798                 pt  = b""
799                 while off < len (ct):
800                     upto = min (off + cnksiz, len (ct))
801                     cnk = decryptor.process (ct [off:upto])
802                     pt += cnk
803                     off += cnksiz
804                 return pt + decryptor.done ()
805
806             decr_pt_1 = dec (hdr_1, ct_1)
807             decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
808             decr_pt_3 = dec (hdr_3, ct_3)
809
810             assert decr_pt_1 == orig_pt_1
811             assert decr_pt_2 == orig_pt_2
812             assert decr_pt_3 == orig_pt_3
813
814         with self.assertRaises (crypto.NonConsecutiveIV):
815             decrypt (True)
816
817         decrypt (False) # Sequence passes
818
819
820     def test_crypto_aes_gcm_dec_iv_reuse (self):
821         """
822         Meddle with encrypted content: extract the IV from one object
823         and inject it into the header of another. This must be rejected
824         by the decryptor with paranoid IV checking enabled.
825         """
826         cnksiz         = 1 << 10
827         orig_pt_1      = fill_mod (1 << 10)
828         orig_pt_2      = fill_mod (1 << 10, 42)
829         password       = str (os.urandom (42))
830         encryptor      = crypto.Encrypt (TEST_VERSION,
831                                          TEST_PARAMVERSION,
832                                          password=password,
833                                          nacl=TEST_STATIC_NACL,
834                                          strict_ivs=True)
835
836         def enc (pt):
837             header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
838
839             off = 0
840             ct = b""
841             while off < len (pt):
842                 upto = min (off + cnksiz, len (pt))
843                 _n, cnk = encryptor.process (pt [off:upto])
844                 ct += cnk
845                 off += cnksiz
846             cnk, header, fixed = encryptor.done (header_dummy)
847             return ct + cnk, header, fixed
848
849         ct_1, hdr_1, _____ = enc (orig_pt_1)
850         encryptor.cnt -= 1 # induce error by forcing an identical IV on next step
851
852         with self.assertRaises (crypto.DuplicateIV): # reuse detected
853             ct_2, hdr_2, fixed = enc (orig_pt_2)
854
855
856 class HeaderTest (CryptoLayerTest):
857
858     def test_crypto_fmt_hdr_make (self):
859         meta = faux_hdr()
860         ok, hdr = crypto.hdr_make (meta)
861         assert ok
862         assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
863
864
865     def test_crypto_fmt_hdr_make_useless (self):
866         ok, ret = crypto.hdr_make ({ 42: "x" })
867         assert ok is False
868         assert ret.startswith ("error assembling header:")
869
870
871     def test_crypto_fmt_hdr_read (self):
872         meta = faux_hdr()
873         ok, hdr = crypto.hdr_make (meta)
874         assert ok is True
875         assert hdr is not None
876         mmeta = crypto.hdr_read (hdr)
877         assert mmeta is not None
878         for k in meta:
879             if meta [k] != mmeta [k]:
880                 raise "header mismatch after reading: expected %r, got %r" \
881                       % (meta [k], mmeta [k])
882
883
884     def test_crypto_fmt_hdr_read_trailing_garbage (self):
885         meta = faux_hdr()
886         ok, hdr = crypto.hdr_make (meta)
887         ok, hdr = crypto.hdr_make (meta)
888         assert ok is True
889         assert hdr is not None
890         hdr += b"-junk"
891         with self.assertRaises (crypto.InvalidHeader):
892             _ = crypto.hdr_read (hdr)
893
894
895     def test_crypto_fmt_hdr_read_leading_garbage (self):
896         meta = faux_hdr()
897         ok, hdr = crypto.hdr_make (meta)
898         ok, hdr = crypto.hdr_make (meta)
899         assert ok is True
900         assert hdr is not None
901         hdr = b"junk-" + hdr
902         with self.assertRaises (crypto.InvalidHeader):
903             _ = crypto.hdr_read (hdr)
904
905
906     def test_crypto_fmt_hdr_inner_garbage (self):
907         meta = faux_hdr()
908         ok, hdr = crypto.hdr_make (meta)
909         assert ok
910         data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
911         with self.assertRaises (crypto.InvalidHeader):
912             _ = crypto.hdr_read (data)
913