clarify possible IV reuse with multiple Encrypt handles
[python-delta-tar] / testing / test_crypto.py
1 import binascii
2 import os
3 import pylibscrypt
4 import struct
5 import unittest
6
7 import deltatar.crypto as crypto
8
9 import cryptography
10
11 def b(s):
12     return s.encode("UTF-8")
13
14 CRYPTO_NACL_SIZE  = 16
15 CRYPTO_KEY_SIZE   = 16
16
17 TEST_PLAINTEXT       = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE      = b"test1234"
19 TEST_AES_GCM_AAD     = b"authenticated plain text"
20 TEST_DUMMY_FILENAME  = "insurance-file.txt"
21 TEST_VERSION         = 1
22 TEST_PARAMVERSION    = 1
23 TEST_STATIC_NACL     = os.urandom (CRYPTO_NACL_SIZE)
24 PLAIN_PARAMVERSION   = 0
25
26 def faux_hdr (ctsize=1337, iv=None):
27     return \
28         {      "version" : 42
29         , "paramversion" : 2187
30         ,         "nacl" : binascii.unhexlify(b"0011223344556677"
31                                               b"8899aabbccddeeff")
32         ,           "iv" : iv or binascii.unhexlify(b"0011223344556677"
33                                                     b"8899aabb")
34         ,       "ctsize" : ctsize
35         ,          "tag" : binascii.unhexlify(b"deadbeefbadb100d"
36                                               b"b1eedc0ffeedea15")
37         }
38
39 FILL_MOD_MEMO = { }
40
41 def fill_mod (n, off=0):
42     global FILL_MOD_MEMO
43     k = (n, off)
44     m = FILL_MOD_MEMO.get (k, None)
45     if m is not None:
46         return m
47     buf = bytearray (n)
48     bufv = memoryview (buf)
49     for i in range (n):
50         off += 1
51         c = off % 64 + 32
52         struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
53     m = bytes (buf)
54     FILL_MOD_MEMO [k] = m
55     return m
56
57
58 def faux_payload ():
59     return "abcd" * 42
60
61
62 class CryptoLayerTest (unittest.TestCase):
63     pass
64
65
66 class AESGCMTest (CryptoLayerTest):
67
68     os_urandom = os.urandom
69
70     def tearDown (self):
71         """Reset globals altered for testing."""
72         _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73                   ("I am fully aware that this will void my warranty.")
74         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75                   ("I am fully aware that this will void my warranty.")
76         os.urandom = self.os_urandom
77
78     def test_crypto_aes_gcm_enc_ctor (self):
79         password   = str (os.urandom (42))
80         encryptor  = crypto.Encrypt (TEST_VERSION,
81                                      TEST_PARAMVERSION,
82                                      password=password,
83                                      nacl=TEST_STATIC_NACL)
84
85     def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86         """Refuse plaintext passthrough mode by default."""
87         password   = str (os.urandom (42))
88         with self.assertRaises (crypto.InvalidParameter):
89             encryptor  = crypto.Encrypt (TEST_VERSION,
90                                          PLAIN_PARAMVERSION,
91                                          password=password,
92                                          nacl=TEST_STATIC_NACL)
93
94
95     def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96         """
97         Comply with request for plaintext passthrough mode if the
98         *insecure* flag is passed.
99         """
100         password   = str (os.urandom (42))
101         encryptor  = crypto.Encrypt (TEST_VERSION,
102                                         PLAIN_PARAMVERSION,
103                                         password=password,
104                                         nacl=TEST_STATIC_NACL,
105                                         insecure=True)
106
107
108     def test_crypto_aes_gcm_enc_ctor_key (self):
109         key        = os.urandom (42)
110         encryptor  = crypto.Encrypt (TEST_VERSION,
111                                      TEST_PARAMVERSION,
112                                      key=key,
113                                      nacl=TEST_STATIC_NACL)
114
115
116     def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117         """
118         Either key (+nacl) or password must be supplied, not both.
119         """
120         with self.assertRaises (crypto.InvalidParameter):       # neither key nor pw
121             encryptor = crypto.Encrypt (TEST_VERSION,
122                                         TEST_PARAMVERSION,
123                                         nacl=TEST_STATIC_NACL)
124
125         password = str (os.urandom (42))
126         key      =      os.urandom (16)  # scrypt sized
127         with self.assertRaises (crypto.InvalidParameter):       # both key and pw
128             encryptor = crypto.Encrypt (TEST_VERSION,
129                                         TEST_PARAMVERSION,
130                                         password=password,
131                                         key=key,
132                                         nacl=TEST_STATIC_NACL)
133
134         with self.assertRaises (crypto.InvalidParameter):       # key, but salt missing
135             encryptor = crypto.Encrypt (TEST_VERSION,
136                                         TEST_PARAMVERSION,
137                                         key=key,
138                                         nacl=None)
139
140         with self.assertRaises (crypto.InvalidParameter):       # empty pw
141             encryptor = crypto.Encrypt (TEST_VERSION,
142                                         TEST_PARAMVERSION,
143                                         password=b"",
144                                         nacl=TEST_STATIC_NACL)
145
146
147     def test_crypto_aes_gcm_enc_header_size (self):
148         password       = str (os.urandom (42))
149         encryptor      = crypto.Encrypt (TEST_VERSION,
150                                          TEST_PARAMVERSION,
151                                          password=password,
152                                          nacl=TEST_STATIC_NACL)
153
154         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
155         assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
156         _, _           = encryptor.process (TEST_PLAINTEXT)
157         _, header, _   = encryptor.done (header_dummy)
158         assert len (header) == crypto.PDTCRYPT_HDR_SIZE
159
160
161     def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
162         """
163         Access the list of IVs used during encryption and check reuse. Start
164         from explicit counters so the inputs don’t overlap. IVs must not have
165         been reused.
166         """
167
168         def enc (start_count, data):
169             password       = str (os.urandom (42))
170             encryptor      = crypto.Encrypt (TEST_VERSION,
171                                              TEST_PARAMVERSION,
172                                              password=password,
173                                              nacl=TEST_STATIC_NACL,
174                                              counter=start_count)
175
176             for i, blob in enumerate (data, 1):
177                 fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
178                 header_dummy   = encryptor.next (fname)
179                 _, _           = encryptor.process (blob)
180                 _, header, _   = encryptor.done (header_dummy)
181                 assert len (encryptor.get_used_ivs ()) == i
182
183             return encryptor.get_used_ivs ()
184
185         ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
186         ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])
187
188         # No reuse in general.
189         assert len (ivs1 & ivs2) == 0
190
191         ivs1 = list (ivs1)
192         ivs2 = list (ivs2)
193
194         # Counters of used IVs must match what we passed explicitly.
195         def extract_counters (ivs):
196             def getcount (iv):
197                 _, cnt = struct.unpack (crypto.FMT_I2N_IV, iv)
198                 return cnt
199             return list (map (getcount, ivs))
200
201         cnt1 = extract_counters (ivs1)
202         cnt2 = extract_counters (ivs2)
203
204         assert 0x0042 in cnt1
205         assert 0x0043 in cnt1
206
207         assert 0x1337 in cnt2
208         assert 0x1338 in cnt2
209
210
211     def test_crypto_aes_gcm_enc_chunk_size (self):
212         password       = str (os.urandom (42))
213         encryptor      = crypto.Encrypt (TEST_VERSION,
214                                          TEST_PARAMVERSION,
215                                          password=password,
216                                          nacl=TEST_STATIC_NACL)
217
218         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
219         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
220         assert len (ciphertext) == len (TEST_PLAINTEXT)
221         rest, header, fixed = encryptor.done (header_dummy)
222         assert len (rest) == 0
223
224
225     def test_crypto_aes_gcm_dec_ctor (self):
226         """
227         Ensure that only either key or password is accepted.
228         """
229         password = str (os.urandom (42))
230         key      =      os.urandom (16)  # scrypt sized
231
232         decryptor = crypto.Decrypt (password=password)
233         decryptor = crypto.Decrypt (key=key)
234
235         with self.assertRaises (crypto.InvalidParameter):       # both password and key
236             decryptor = crypto.Decrypt (password=password, key=key)
237
238         with self.assertRaises (crypto.InvalidParameter):       # neither password nor key
239             decryptor = crypto.Decrypt (password=None, key=None)
240
241         with self.assertRaises (crypto.InvalidParameter):       # # empty password
242             decryptor = crypto.Decrypt (password="")
243
244
245     def test_crypto_aes_gcm_dec_simple (self):
246         password       = str (os.urandom (42))
247         encryptor      = crypto.Encrypt (TEST_VERSION,
248                                          TEST_PARAMVERSION,
249                                          password=password,
250                                          nacl=TEST_STATIC_NACL)
251
252         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
253         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
254         rest, header, fixed = encryptor.done (header_dummy)
255         ciphertext    += rest
256
257         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
258         decryptor.next (header)
259         plaintext      = decryptor.process (ciphertext)
260         rest           = decryptor.done ()
261         plaintext     += rest
262
263         assert plaintext == TEST_PLAINTEXT
264
265
266     def test_crypto_aes_gcm_dec_plain_bad (self):
267         """
268         Downgrade to plaintext must not be allowed in parameters
269         obtained from headers.
270         """
271         password       = str (os.urandom (42))
272         encryptor      = crypto.Encrypt (TEST_VERSION,
273                                          TEST_PARAMVERSION,
274                                          password=password,
275                                          nacl=TEST_STATIC_NACL)
276
277         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
278         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
279         rest, header, fixed = encryptor.done (header_dummy)
280         ciphertext    += rest
281
282         header         = crypto.hdr_read (header)
283         header ["paramversion"] = PLAIN_PARAMVERSION
284         ok, header     = crypto.hdr_make (header)
285         assert ok
286
287         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
288         with self.assertRaises (crypto.InvalidParameter):
289             decryptor.next (header)
290
291
292     def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
293         """
294         Allow plaintext crypto mode if *insecure* flag is passed.
295         """
296         password       = str (os.urandom (42))
297         encryptor      = crypto.Encrypt (TEST_VERSION,
298                                          PLAIN_PARAMVERSION,
299                                          password=password,
300                                          nacl=TEST_STATIC_NACL,
301                                          insecure=True)
302
303         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
304         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
305         rest, header, fixed = encryptor.done (header_dummy)
306         ciphertext    += rest
307
308         header         = crypto.hdr_read (header)
309         header ["paramversion"] = PLAIN_PARAMVERSION
310         ok, header     = crypto.hdr_make (header)
311         assert ok
312
313         decryptor      = crypto.Decrypt (password=password,
314                                          fixedparts=fixed,
315                                          insecure=True)
316         decryptor.next (header)
317         plaintext      = decryptor.process (ciphertext)
318         rest           = decryptor.done ()
319         plaintext     += rest
320
321         assert plaintext == TEST_PLAINTEXT
322
323
324     def test_crypto_aes_gcm_dec_bad_tag (self):
325         password       = str (os.urandom (42))
326         encryptor      = crypto.Encrypt (TEST_VERSION,
327                                          TEST_PARAMVERSION,
328                                          password=password,
329                                          nacl=TEST_STATIC_NACL)
330
331         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
332         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
333         ciphertext2, header, fixed = encryptor.done (header_dummy)
334
335         mut_header     = bytearray (header)
336         mut_header_vw  = memoryview (mut_header)
337         # replace one byte in the tag part of the header
338         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
339         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
340         header         = bytes (mut_header)
341
342         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
343         decryptor.next (header)
344         plaintext      = decryptor.process (ciphertext)
345         with self.assertRaises (crypto.InvalidGCMTag):
346             _ = decryptor.done ()
347
348
349     def test_crypto_aes_gcm_enc_multicnk (self):
350         cnksiz = 1 << 10
351         pt    = fill_mod (1 << 14)
352         password       = str (os.urandom (42))
353         encryptor      = crypto.Encrypt (TEST_VERSION,
354                                          TEST_PARAMVERSION,
355                                          password=password,
356                                          nacl=TEST_STATIC_NACL)
357         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
358
359         off = 0
360         ct = b""
361         while off < len (pt):
362             upto = min (off + cnksiz, len (pt))
363             _, cnk = encryptor.process (pt [off:upto])
364             ct += cnk
365             off += cnksiz
366         cnk, header, fixed = encryptor.done (header_dummy)
367         ct += cnk
368
369         assert len (pt) == len (ct)
370
371
372     def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
373         cnksiz = 1 << 10
374         pt    = fill_mod (1 << 14)
375         password       = str (os.urandom (42))
376         encryptor      = crypto.Encrypt (TEST_VERSION,
377                                          TEST_PARAMVERSION,
378                                          password=password,
379                                          nacl=TEST_STATIC_NACL,
380                                          strict_ivs=True)
381         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
382
383         off = 0
384         ct = b""
385         while off < len (pt):
386             upto = min (off + cnksiz, len (pt))
387             _, cnk = encryptor.process (pt [off:upto])
388             ct += cnk
389             off += cnksiz
390         cnk, header, fixed = encryptor.done (header_dummy)
391         ct += cnk
392
393         assert len (pt) == len (ct)
394
395
396     def test_crypto_aes_gcm_enc_multiobj (self):
397         cnksiz    = 1 << 10
398         password  = str (os.urandom (42))
399         encryptor = crypto.Encrypt (TEST_VERSION,
400                                     TEST_PARAMVERSION,
401                                     password=password,
402                                     nacl=TEST_STATIC_NACL,
403                                     strict_ivs=False)
404
405         def addobj (i):
406             pt           = fill_mod (1 << 14, off=i)
407             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
408
409             off = 0
410             ct = b""
411             while off < len (pt):
412                 upto = min (off + cnksiz, len (pt))
413                 _, cnk = encryptor.process (pt [off:upto])
414                 ct += cnk
415                 off += cnksiz
416             cnk, header, fixed = encryptor.done (header_dummy)
417             ct += cnk
418
419             assert len (pt) == len (ct)
420
421         for i in range (5): addobj (i)
422
423         assert len (encryptor.fixed) == 1
424
425
426     def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
427         cnksiz    = 1 << 10
428         password  = str (os.urandom (42))
429         encryptor = crypto.Encrypt (TEST_VERSION,
430                                     TEST_PARAMVERSION,
431                                     password=password,
432                                     nacl=TEST_STATIC_NACL,
433                                     strict_ivs=True)
434         curfixed  = None # must remain constant after first
435
436         def addobj (i):
437             pt           = fill_mod (1 << 14, off=i)
438             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
439
440             off = 0
441             ct = b""
442             while off < len (pt):
443                 upto = min (off + cnksiz, len (pt))
444                 _, cnk = encryptor.process (pt [off:upto])
445                 ct += cnk
446                 off += cnksiz
447             cnk, header, fixed = encryptor.done (header_dummy)
448             nonlocal curfixed
449             if curfixed is None:
450                 curfixed = fixed
451             else:
452                 assert fixed == curfixed
453             ct += cnk
454
455             assert len (pt) == len (ct)
456
457         for i in range (5): addobj (i)
458
459         assert len (encryptor.fixed) == 1
460
461
462     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
463         """
464         Test behavior when the file counter tops out.
465
466         Artificially lower the maximum possible file counter. Considering
467         invalid (0) and reserved (1, 2) values, the smallest possible file counter
468         for normal objects is 3. Starting from that, the header of the (max -
469         3)rd object must have both a different IV fixed part and a counter.
470         """
471         minimum = 3
472         new_max = 8
473         crypto._testing_set_AES_GCM_IV_CNT_MAX \
474                 ("I am fully aware that this will void my warranty.", new_max)
475         cnksiz    = 1 << 10
476         password  = str (os.urandom (42))
477         encryptor = crypto.Encrypt (TEST_VERSION,
478                                     TEST_PARAMVERSION,
479                                     password=password,
480                                     nacl=TEST_STATIC_NACL,
481                                     strict_ivs=True)
482
483         last_iv  = None
484         last_cnt = minimum
485
486         def addobj (i, wrap=False):
487             nonlocal last_iv
488             nonlocal last_cnt
489             pt           = fill_mod (1 << 14, off=i)
490             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
491
492             off = 0
493             ct = b""
494             while off < len (pt):
495                 upto = min (off + cnksiz, len (pt))
496                 _, cnk = encryptor.process (pt [off:upto])
497                 ct += cnk
498                 off += cnksiz
499             cnk, header, fixed = encryptor.done (header_dummy)
500             this_iv = crypto.hdr_read (header) ["iv"]
501             if last_iv is not None:
502                 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
503                 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
504                 if wrap is False:
505                     assert last_fixed == this_fixed
506                     assert last_cnt   == this_cnt - 1
507                 else:
508                     assert last_fixed != this_fixed
509                     assert this_cnt   == minimum
510             last_iv = this_iv
511             ct += cnk
512
513             assert len (pt) == len (ct)
514
515         for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
516         addobj (i + 1, True) # counter wraps to 3
517
518         for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
519         addobj (j + 1, True) # counter wraps to 3 again
520
521         assert len (encryptor.fixed) == 3
522
523
524     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
525         """
526         Test behavior when the file counter tops out and the transition to
527         the next IV fixed part fails on account of a bad random generator.
528
529         Replaces the ``urandom`` reference in ``os`` with a deterministic
530         function. The encryptor context must communicate this condition with an
531         ``IVFixedPartError``.
532         """
533         minimum = 3
534         new_max = 8
535         crypto._testing_set_AES_GCM_IV_CNT_MAX \
536                 ("I am fully aware that this will void my warranty.", new_max)
537         cnksiz    = 1 << 10
538         os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
539         password  = str (os.urandom (42))
540         encryptor = crypto.Encrypt (TEST_VERSION,
541                                     TEST_PARAMVERSION,
542                                     password=password,
543                                     nacl=TEST_STATIC_NACL,
544                                     strict_ivs=True)
545
546         def addobj (i):
547             pt = fill_mod (1 << 14, off=i)
548             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
549
550             off = 0
551             while off < len (pt):
552                 upto = min (off + cnksiz, len (pt))
553                 _, cnk = encryptor.process (pt [off:upto])
554                 off += cnksiz
555
556         for i in range (minimum, new_max): addobj (42 + i)
557
558         with self.assertRaises (crypto.IVFixedPartError):
559             addobj (42 + i)
560
561
562
563     def test_crypto_aes_gcm_enc_length_cap (self):
564         """
565         Artificially lower the maximum allowable data length and attempt to
566         encrypt a larger object. Verify that the crypto handler only encrypts
567         data up to the size limit. A downstream user detects that condition by
568         testing whether the encryption step yielded less bytes than the
569         plaintext.
570
571         The sibling to this test is test_restore_backup_max_file_length()
572         in test_delatar.py. Deltatar will transparently create a splitted object
573         with an increased IV file counter.
574         """
575         new_max = 2187
576         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
577                 ("I am fully aware that this will void my warranty.", new_max)
578         cnksiz    = 1 << 10
579         password  = str (os.urandom (42))
580         encryptor = crypto.Encrypt (TEST_VERSION,
581                                     TEST_PARAMVERSION,
582                                     password=password,
583                                     nacl=TEST_STATIC_NACL)
584
585         def encobj (s):
586             pt, ct       = fill_mod (s), None
587             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
588
589             n, ct = encryptor.process (pt)
590             rest, _, _ = encryptor.done (header_dummy)
591
592             # NB: If this check *ever* fails, then something changed in the
593             #     encoding layer. AES-GCM is a stream cipher so each encoding
594             #     step will yield the exact number of ciphertext bytes that
595             #     was provided as plaintext. Thus there cannot be any encoded
596             #     data left when calling the finalizers. None of the crypo code
597             #     depends on that assumption but nevertheless we check it here
598             #     in case anything changes upstream in the Cryptography
599             #     library. In case there actually is a rest, replace the
600             #     assertion below with ``ct += rest``.
601             assert (len (rest) == 0)
602
603             if len (pt) > new_max:
604                 # If the plaintext was longer than the artificially lowered
605                 # maximum, then the number of ciphertext bytes must be clamped
606                 # to the maximum.
607                 assert n == new_max
608             else:
609                 assert n == len (pt) == len (ct)
610
611         for i in range (16): encobj (1 << i)
612
613
614     def test_crypto_aes_gcm_dec_length_cap (self):
615         """
616         The decryptor must reject headers with an object size that exceeds
617         the PDTCRYPT maximum. Longer files split into multiple objects.
618         """
619         password        = str (os.urandom (42))
620         meta            = faux_hdr()
621         meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
622         ok, header      = crypto.hdr_make (meta)
623
624         assert ok
625
626         # Set up decryption with bogus header.
627         decryptor = crypto.Decrypt (password=password, fixedparts=[])
628
629         with self.assertRaises (crypto.InvalidHeader):
630             decryptor.next (header)
631
632
633     def test_crypto_aes_gcm_dec_length_mismatch (self):
634         """
635         Catch attempts at decrypting more data than what was stated in the
636         header.
637         """
638         cnksiz         = 1 << 10
639         orig_pt        = fill_mod (1 << 14)
640         password       = str (os.urandom (42))
641         encryptor      = crypto.Encrypt (TEST_VERSION,
642                                          TEST_PARAMVERSION,
643                                          password=password,
644                                          nacl=TEST_STATIC_NACL)
645         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
646
647         off = 0
648         ct = b""
649         while off < len (orig_pt):
650             upto = min (off + cnksiz, len (orig_pt))
651             _n, cnk = encryptor.process (orig_pt [off:upto])
652             ct += cnk
653             off += cnksiz
654         cnk, header, fixed = encryptor.done (header_dummy)
655         ct += cnk
656
657         decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
658
659         decryptor.next (header)
660         off = 0
661         pt  = b""
662         while off < len (orig_pt):
663             upto = min (off + cnksiz, len (orig_pt))
664             cnk  = decryptor.process (ct [off:upto])
665             pt += cnk
666             off += cnksiz
667
668         with self.assertRaises (crypto.CiphertextTooLong):
669             # Try and decrypt one byte more than was encrypted.
670             # This must be caught in crypto.py.
671             _ = decryptor.process (ct [0:1])
672
673
674     def test_crypto_aes_gcm_dec_multicnk (self):
675         cnksiz         = 1 << 10
676         orig_pt        = fill_mod (1 << 14)
677         password       = str (os.urandom (42))
678         encryptor      = crypto.Encrypt (TEST_VERSION,
679                                          TEST_PARAMVERSION,
680                                          password=password,
681                                          nacl=TEST_STATIC_NACL)
682         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
683
684         off = 0
685         ct = b""
686         while off < len (orig_pt):
687             upto = min (off + cnksiz, len (orig_pt))
688             _n, cnk = encryptor.process (orig_pt [off:upto])
689             ct += cnk
690             off += cnksiz
691         cnk, header, fixed = encryptor.done (header_dummy)
692         ct += cnk
693
694         decryptor      = crypto.Decrypt (password=password,
695                                          fixedparts=fixed)
696         decryptor.next (header)
697         off = 0
698         pt  = b""
699         while off < len (orig_pt):
700             upto = min (off + cnksiz, len (orig_pt))
701             cnk  = decryptor.process (ct [off:upto])
702             pt += cnk
703             off += cnksiz
704
705
706         pt += decryptor.done ()
707         assert pt == orig_pt
708
709
710     def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
711         cnksiz         = 1 << 10
712         orig_pt        = fill_mod (1 << 14)
713         password       = str (os.urandom (42))
714         encryptor      = crypto.Encrypt (TEST_VERSION,
715                                          TEST_PARAMVERSION,
716                                          password=password,
717                                          nacl=TEST_STATIC_NACL)
718         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
719
720         off = 0
721         ct = b""
722         while off < len (orig_pt):
723             upto = min (off + cnksiz, len (orig_pt))
724             _n, cnk = encryptor.process (orig_pt [off:upto])
725             ct += cnk
726             off += cnksiz
727         cnk, header, fixed = encryptor.done (header_dummy)
728         ct += cnk
729
730         mut_header     = bytearray (header)
731         mut_header_vw  = memoryview (mut_header)
732         # replace one byte in the tag part of the header
733         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
734         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
735         header         = bytes (mut_header)
736
737         decryptor      = crypto.Decrypt (password=password,
738                                          fixedparts=fixed)
739         decryptor.next (header)
740         off = 0
741         pt  = b""
742         while off < len (orig_pt):
743             upto = min (off + cnksiz, len (orig_pt))
744             cnk = decryptor.process (ct [off:upto])
745             pt += cnk
746             off += cnksiz
747
748         with self.assertRaises (crypto.InvalidGCMTag):
749             _ = decryptor.done ()
750
751
752     def test_crypto_aes_gcm_dec_iv_gap (self):
753         """
754         Encrypt multiple objects using non-consecutive IVs and verify that the
755         decryptor errors out with an exception in strict mode but keeps quiet
756         otherwise.
757         """
758         cnksiz         = 1 << 10
759         orig_pt_1      = fill_mod (1 << 10)
760         orig_pt_2      = fill_mod (1 << 10, 23)
761         orig_pt_3      = fill_mod (1 << 10, 42)
762         password       = str (os.urandom (42))
763         encryptor      = crypto.Encrypt (TEST_VERSION,
764                                          TEST_PARAMVERSION,
765                                          password=password,
766                                          nacl=TEST_STATIC_NACL)
767
768         def enc (pt):
769             header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
770
771             off = 0
772             ct = b""
773             while off < len (pt):
774                 upto = min (off + cnksiz, len (pt))
775                 _n, cnk = encryptor.process (pt [off:upto])
776                 ct += cnk
777                 off += cnksiz
778             cnk, header, fixed = encryptor.done (header_dummy)
779             return ct + cnk, header, fixed
780
781         ct_1, hdr_1, _____ = enc (orig_pt_1)
782
783         ## Here we bump the iv of the encryptor, breaking the series.
784         encryptor.set_object_counter (encryptor.cnt + 1)
785         ct_2, hdr_2, fixed = enc (orig_pt_2)
786
787         ## IV of final object is again in-sequence.
788         ct_3, hdr_3, fixed = enc (orig_pt_3)
789
790         def decrypt (strict_ivs):
791             decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
792                                         strict_ivs=strict_ivs)
793
794             def dec (hdr, ct):
795                 decryptor.next (hdr)
796                 off = 0
797                 pt  = b""
798                 while off < len (ct):
799                     upto = min (off + cnksiz, len (ct))
800                     cnk = decryptor.process (ct [off:upto])
801                     pt += cnk
802                     off += cnksiz
803                 return pt + decryptor.done ()
804
805             decr_pt_1 = dec (hdr_1, ct_1)
806             decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
807             decr_pt_3 = dec (hdr_3, ct_3)
808
809             assert decr_pt_1 == orig_pt_1
810             assert decr_pt_2 == orig_pt_2
811             assert decr_pt_3 == orig_pt_3
812
813         with self.assertRaises (crypto.NonConsecutiveIV):
814             decrypt (True)
815
816         decrypt (False) # Sequence passes
817
818
819     def test_crypto_aes_gcm_dec_iv_reuse (self):
820         """
821         Meddle with encrypted content: extract the IV from one object
822         and inject it into the header of another. This must be rejected
823         by the decryptor.
824         """
825         cnksiz         = 1 << 10
826         orig_pt_1      = fill_mod (1 << 10)
827         orig_pt_2      = fill_mod (1 << 10, 42)
828         password       = str (os.urandom (42))
829         encryptor      = crypto.Encrypt (TEST_VERSION,
830                                          TEST_PARAMVERSION,
831                                          password=password,
832                                          nacl=TEST_STATIC_NACL)
833
834         def enc (pt):
835             header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
836
837             off = 0
838             ct = b""
839             while off < len (pt):
840                 upto = min (off + cnksiz, len (pt))
841                 _n, cnk = encryptor.process (pt [off:upto])
842                 ct += cnk
843                 off += cnksiz
844             cnk, header, fixed = encryptor.done (header_dummy)
845             return ct + cnk, header, fixed
846
847         ct_1, hdr_1, _____ = enc (orig_pt_1)
848         encryptor.cnt -= 1 # induce error by forcing an identical IV on next step
849
850         with self.assertRaises (crypto.DuplicateIV): # reuse detected
851             ct_2, hdr_2, fixed = enc (orig_pt_2)
852
853
854 class HeaderTest (CryptoLayerTest):
855
856     def test_crypto_fmt_hdr_make (self):
857         meta = faux_hdr()
858         ok, hdr = crypto.hdr_make (meta)
859         assert ok
860         assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
861
862
863     def test_crypto_fmt_hdr_make_useless (self):
864         ok, ret = crypto.hdr_make ({ 42: "x" })
865         assert ok is False
866         assert ret.startswith ("error assembling header:")
867
868
869     def test_crypto_fmt_hdr_read (self):
870         meta = faux_hdr()
871         ok, hdr = crypto.hdr_make (meta)
872         assert ok is True
873         assert hdr is not None
874         mmeta = crypto.hdr_read (hdr)
875         assert mmeta is not None
876         for k in meta:
877             if meta [k] != mmeta [k]:
878                 raise "header mismatch after reading: expected %r, got %r" \
879                       % (meta [k], mmeta [k])
880
881
882     def test_crypto_fmt_hdr_read_trailing_garbage (self):
883         meta = faux_hdr()
884         ok, hdr = crypto.hdr_make (meta)
885         ok, hdr = crypto.hdr_make (meta)
886         assert ok is True
887         assert hdr is not None
888         hdr += b"-junk"
889         with self.assertRaises (crypto.InvalidHeader):
890             _ = crypto.hdr_read (hdr)
891
892
893     def test_crypto_fmt_hdr_read_leading_garbage (self):
894         meta = faux_hdr()
895         ok, hdr = crypto.hdr_make (meta)
896         ok, hdr = crypto.hdr_make (meta)
897         assert ok is True
898         assert hdr is not None
899         hdr = b"junk-" + hdr
900         with self.assertRaises (crypto.InvalidHeader):
901             _ = crypto.hdr_read (hdr)
902
903
904     def test_crypto_fmt_hdr_inner_garbage (self):
905         meta = faux_hdr()
906         ok, hdr = crypto.hdr_make (meta)
907         assert ok
908         data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
909         with self.assertRaises (crypto.InvalidHeader):
910             _ = crypto.hdr_read (data)
911