f47c7c7efb8278fa93381e85d48cb0c2c0635084
[python-delta-tar] / testing / test_crypto.py
1 import binascii
2 import os
3 import pylibscrypt
4 import struct
5 import unittest
6
7 import deltatar.crypto as crypto
8
9 import cryptography
10
11 def b(s):
12     return s.encode("UTF-8")
13
14 CRYPTO_NACL_SIZE  = 16
15 CRYPTO_KEY_SIZE   = 16
16
17 TEST_PLAINTEXT       = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE      = b"test1234"
19 TEST_AES_GCM_AAD     = b"authenticated plain text"
20 TEST_DUMMY_FILENAME  = "insurance-file.txt"
21 TEST_VERSION         = 1
22 TEST_PARAMVERSION    = 1
23 TEST_STATIC_NACL     = os.urandom (CRYPTO_NACL_SIZE)
24 PLAIN_PARAMVERSION   = 0
25
26 def faux_hdr (ctsize=1337, iv=None):
27     return \
28         {      "version" : 42
29         , "paramversion" : 2187
30         ,         "nacl" : binascii.unhexlify(b"0011223344556677"
31                                               b"8899aabbccddeeff")
32         ,           "iv" : iv or binascii.unhexlify(b"0011223344556677"
33                                                     b"8899aabb")
34         ,       "ctsize" : ctsize
35         ,          "tag" : binascii.unhexlify(b"deadbeefbadb100d"
36                                               b"b1eedc0ffeedea15")
37         }
38
39 FILL_MOD_MEMO = { }
40
41 def fill_mod (n, off=0):
42     global FILL_MOD_MEMO
43     k = (n, off)
44     m = FILL_MOD_MEMO.get (k, None)
45     if m is not None:
46         return m
47     buf = bytearray (n)
48     bufv = memoryview (buf)
49     for i in range (n):
50         off += 1
51         c = off % 64 + 32
52         struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
53     m = bytes (buf)
54     FILL_MOD_MEMO [k] = m
55     return m
56
57
58 def faux_payload ():
59     return "abcd" * 42
60
61
62 class CryptoLayerTest (unittest.TestCase):
63     pass
64
65
66 class AESGCMTest (CryptoLayerTest):
67
68     os_urandom = os.urandom
69
70     def tearDown (self):
71         """Reset globals altered for testing."""
72         _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73                   ("I am fully aware that this will void my warranty.")
74         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75                   ("I am fully aware that this will void my warranty.")
76         os.urandom = self.os_urandom
77
78     def test_crypto_aes_gcm_enc_ctor (self):
79         password   = str (os.urandom (42))
80         encryptor  = crypto.Encrypt (TEST_VERSION,
81                                      TEST_PARAMVERSION,
82                                      password=password,
83                                      nacl=TEST_STATIC_NACL)
84
85     def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86         """Refuse plaintext passthrough mode by default."""
87         password   = str (os.urandom (42))
88         with self.assertRaises (crypto.InvalidParameter):
89             encryptor  = crypto.Encrypt (TEST_VERSION,
90                                          PLAIN_PARAMVERSION,
91                                          password=password,
92                                          nacl=TEST_STATIC_NACL)
93
94
95     def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96         """
97         Comply with request for plaintext passthrough mode if the
98         *insecure* flag is passed.
99         """
100         password   = str (os.urandom (42))
101         encryptor  = crypto.Encrypt (TEST_VERSION,
102                                         PLAIN_PARAMVERSION,
103                                         password=password,
104                                         nacl=TEST_STATIC_NACL,
105                                         insecure=True)
106
107
108     def test_crypto_aes_gcm_enc_ctor_key (self):
109         key        = os.urandom (42)
110         encryptor  = crypto.Encrypt (TEST_VERSION,
111                                      TEST_PARAMVERSION,
112                                      key=key,
113                                      nacl=TEST_STATIC_NACL)
114
115
116     def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117         """
118         Either key (+nacl) or password must be supplied, not both.
119         """
120         with self.assertRaises (crypto.InvalidParameter):       # neither key nor pw
121             encryptor = crypto.Encrypt (TEST_VERSION,
122                                         TEST_PARAMVERSION,
123                                         nacl=TEST_STATIC_NACL)
124
125         password = str (os.urandom (42))
126         key      =      os.urandom (16)  # scrypt sized
127         with self.assertRaises (crypto.InvalidParameter):       # both key and pw
128             encryptor = crypto.Encrypt (TEST_VERSION,
129                                         TEST_PARAMVERSION,
130                                         password=password,
131                                         key=key,
132                                         nacl=TEST_STATIC_NACL)
133
134         with self.assertRaises (crypto.InvalidParameter):       # key, but salt missing
135             encryptor = crypto.Encrypt (TEST_VERSION,
136                                         TEST_PARAMVERSION,
137                                         key=key,
138                                         nacl=None)
139
140         with self.assertRaises (crypto.InvalidParameter):       # empty pw
141             encryptor = crypto.Encrypt (TEST_VERSION,
142                                         TEST_PARAMVERSION,
143                                         password=b"",
144                                         nacl=TEST_STATIC_NACL)
145
146
147     def test_crypto_aes_gcm_enc_header_size (self):
148         password       = str (os.urandom (42))
149         encryptor      = crypto.Encrypt (TEST_VERSION,
150                                          TEST_PARAMVERSION,
151                                          password=password,
152                                          nacl=TEST_STATIC_NACL)
153
154         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
155         assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
156         _, _           = encryptor.process (TEST_PLAINTEXT)
157         _, header, _   = encryptor.done (header_dummy)
158         assert len (header) == crypto.PDTCRYPT_HDR_SIZE
159
160
161     def test_crypto_aes_gcm_enc_chunk_size (self):
162         password       = str (os.urandom (42))
163         encryptor      = crypto.Encrypt (TEST_VERSION,
164                                          TEST_PARAMVERSION,
165                                          password=password,
166                                          nacl=TEST_STATIC_NACL)
167
168         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
169         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
170         assert len (ciphertext) == len (TEST_PLAINTEXT)
171         rest, header, fixed = encryptor.done (header_dummy)
172         assert len (rest) == 0
173
174
175     def test_crypto_aes_gcm_dec_ctor (self):
176         """
177         Ensure that only either key or password is accepted.
178         """
179         password = str (os.urandom (42))
180         key      =      os.urandom (16)  # scrypt sized
181
182         decryptor = crypto.Decrypt (password=password)
183         decryptor = crypto.Decrypt (key=key)
184
185         with self.assertRaises (crypto.InvalidParameter):       # both password and key
186             decryptor = crypto.Decrypt (password=password, key=key)
187
188         with self.assertRaises (crypto.InvalidParameter):       # neither password nor key
189             decryptor = crypto.Decrypt (password=None, key=None)
190
191         with self.assertRaises (crypto.InvalidParameter):       # # empty password
192             decryptor = crypto.Decrypt (password="")
193
194
195     def test_crypto_aes_gcm_dec_simple (self):
196         password       = str (os.urandom (42))
197         encryptor      = crypto.Encrypt (TEST_VERSION,
198                                          TEST_PARAMVERSION,
199                                          password=password,
200                                          nacl=TEST_STATIC_NACL)
201
202         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
203         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
204         rest, header, fixed = encryptor.done (header_dummy)
205         ciphertext    += rest
206
207         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
208         decryptor.next (header)
209         plaintext      = decryptor.process (ciphertext)
210         rest           = decryptor.done ()
211         plaintext     += rest
212
213         assert plaintext == TEST_PLAINTEXT
214
215
216     def test_crypto_aes_gcm_dec_plain_bad (self):
217         """
218         Downgrade to plaintext must not be allowed in parameters
219         obtained from headers.
220         """
221         password       = str (os.urandom (42))
222         encryptor      = crypto.Encrypt (TEST_VERSION,
223                                          TEST_PARAMVERSION,
224                                          password=password,
225                                          nacl=TEST_STATIC_NACL)
226
227         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
228         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
229         rest, header, fixed = encryptor.done (header_dummy)
230         ciphertext    += rest
231
232         header         = crypto.hdr_read (header)
233         header ["paramversion"] = PLAIN_PARAMVERSION
234         ok, header     = crypto.hdr_make (header)
235         assert ok
236
237         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
238         with self.assertRaises (crypto.InvalidParameter):
239             decryptor.next (header)
240
241
242     def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
243         """
244         Allow plaintext crypto mode if *insecure* flag is passed.
245         """
246         password       = str (os.urandom (42))
247         encryptor      = crypto.Encrypt (TEST_VERSION,
248                                          PLAIN_PARAMVERSION,
249                                          password=password,
250                                          nacl=TEST_STATIC_NACL,
251                                          insecure=True)
252
253         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
254         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
255         rest, header, fixed = encryptor.done (header_dummy)
256         ciphertext    += rest
257
258         header         = crypto.hdr_read (header)
259         header ["paramversion"] = PLAIN_PARAMVERSION
260         ok, header     = crypto.hdr_make (header)
261         assert ok
262
263         decryptor      = crypto.Decrypt (password=password,
264                                          fixedparts=fixed,
265                                          insecure=True)
266         decryptor.next (header)
267         plaintext      = decryptor.process (ciphertext)
268         rest           = decryptor.done ()
269         plaintext     += rest
270
271         assert plaintext == TEST_PLAINTEXT
272
273
274     def test_crypto_aes_gcm_dec_bad_tag (self):
275         password       = str (os.urandom (42))
276         encryptor      = crypto.Encrypt (TEST_VERSION,
277                                          TEST_PARAMVERSION,
278                                          password=password,
279                                          nacl=TEST_STATIC_NACL)
280
281         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
282         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
283         ciphertext2, header, fixed = encryptor.done (header_dummy)
284
285         mut_header     = bytearray (header)
286         mut_header_vw  = memoryview (mut_header)
287         # replace one byte in the tag part of the header
288         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
289         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
290         header         = bytes (mut_header)
291
292         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
293         decryptor.next (header)
294         plaintext      = decryptor.process (ciphertext)
295         with self.assertRaises (crypto.InvalidGCMTag):
296             _ = decryptor.done ()
297
298
299     def test_crypto_aes_gcm_enc_multicnk (self):
300         cnksiz = 1 << 10
301         pt    = fill_mod (1 << 14)
302         password       = str (os.urandom (42))
303         encryptor      = crypto.Encrypt (TEST_VERSION,
304                                          TEST_PARAMVERSION,
305                                          password=password,
306                                          nacl=TEST_STATIC_NACL)
307         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
308
309         off = 0
310         ct = b""
311         while off < len (pt):
312             upto = min (off + cnksiz, len (pt))
313             _, cnk = encryptor.process (pt [off:upto])
314             ct += cnk
315             off += cnksiz
316         cnk, header, fixed = encryptor.done (header_dummy)
317         ct += cnk
318
319         assert len (pt) == len (ct)
320
321
322     def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
323         cnksiz = 1 << 10
324         pt    = fill_mod (1 << 14)
325         password       = str (os.urandom (42))
326         encryptor      = crypto.Encrypt (TEST_VERSION,
327                                          TEST_PARAMVERSION,
328                                          password=password,
329                                          nacl=TEST_STATIC_NACL,
330                                          strict_ivs=True)
331         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
332
333         off = 0
334         ct = b""
335         while off < len (pt):
336             upto = min (off + cnksiz, len (pt))
337             _, cnk = encryptor.process (pt [off:upto])
338             ct += cnk
339             off += cnksiz
340         cnk, header, fixed = encryptor.done (header_dummy)
341         ct += cnk
342
343         assert len (pt) == len (ct)
344
345
346     def test_crypto_aes_gcm_enc_multiobj (self):
347         cnksiz    = 1 << 10
348         password  = str (os.urandom (42))
349         encryptor = crypto.Encrypt (TEST_VERSION,
350                                     TEST_PARAMVERSION,
351                                     password=password,
352                                     nacl=TEST_STATIC_NACL,
353                                     strict_ivs=False)
354
355         def addobj (i):
356             pt           = fill_mod (1 << 14, off=i)
357             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
358
359             off = 0
360             ct = b""
361             while off < len (pt):
362                 upto = min (off + cnksiz, len (pt))
363                 _, cnk = encryptor.process (pt [off:upto])
364                 ct += cnk
365                 off += cnksiz
366             cnk, header, fixed = encryptor.done (header_dummy)
367             ct += cnk
368
369             assert len (pt) == len (ct)
370
371         for i in range (5): addobj (i)
372
373         assert len (encryptor.fixed) == 1
374
375
376     def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
377         cnksiz    = 1 << 10
378         password  = str (os.urandom (42))
379         encryptor = crypto.Encrypt (TEST_VERSION,
380                                     TEST_PARAMVERSION,
381                                     password=password,
382                                     nacl=TEST_STATIC_NACL,
383                                     strict_ivs=True)
384         curfixed  = None # must remain constant after first
385
386         def addobj (i):
387             pt           = fill_mod (1 << 14, off=i)
388             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
389
390             off = 0
391             ct = b""
392             while off < len (pt):
393                 upto = min (off + cnksiz, len (pt))
394                 _, cnk = encryptor.process (pt [off:upto])
395                 ct += cnk
396                 off += cnksiz
397             cnk, header, fixed = encryptor.done (header_dummy)
398             nonlocal curfixed
399             if curfixed is None:
400                 curfixed = fixed
401             else:
402                 assert fixed == curfixed
403             ct += cnk
404
405             assert len (pt) == len (ct)
406
407         for i in range (5): addobj (i)
408
409         assert len (encryptor.fixed) == 1
410
411
412     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
413         """
414         Test behavior when the file counter tops out.
415
416         Artificially lower the maximum possible file counter. Considering
417         invalid (0) and reserved (1, 2) values, the smallest possible file counter
418         for normal objects is 3. Starting from that, the header of the (max -
419         3)rd object must have both a different IV fixed part and a counter.
420         """
421         minimum = 3
422         new_max = 8
423         crypto._testing_set_AES_GCM_IV_CNT_MAX \
424                 ("I am fully aware that this will void my warranty.", new_max)
425         cnksiz    = 1 << 10
426         password  = str (os.urandom (42))
427         encryptor = crypto.Encrypt (TEST_VERSION,
428                                     TEST_PARAMVERSION,
429                                     password=password,
430                                     nacl=TEST_STATIC_NACL,
431                                     strict_ivs=True)
432
433         last_iv  = None
434         last_cnt = minimum
435
436         def addobj (i, wrap=False):
437             nonlocal last_iv
438             nonlocal last_cnt
439             pt           = fill_mod (1 << 14, off=i)
440             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
441
442             off = 0
443             ct = b""
444             while off < len (pt):
445                 upto = min (off + cnksiz, len (pt))
446                 _, cnk = encryptor.process (pt [off:upto])
447                 ct += cnk
448                 off += cnksiz
449             cnk, header, fixed = encryptor.done (header_dummy)
450             this_iv = crypto.hdr_read (header) ["iv"]
451             if last_iv is not None:
452                 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
453                 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
454                 if wrap is False:
455                     assert last_fixed == this_fixed
456                     assert last_cnt   == this_cnt - 1
457                 else:
458                     assert last_fixed != this_fixed
459                     assert this_cnt   == minimum
460             last_iv = this_iv
461             ct += cnk
462
463             assert len (pt) == len (ct)
464
465         for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
466         addobj (i + 1, True) # counter wraps to 3
467
468         for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
469         addobj (j + 1, True) # counter wraps to 3 again
470
471         assert len (encryptor.fixed) == 3
472
473
474     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
475         """
476         Test behavior when the file counter tops out and the transition to
477         the next IV fixed part fails on account of a bad random generator.
478
479         Replaces the ``urandom`` reference in ``os`` with a deterministic
480         function. The encryptor context must communicate this condition with an
481         ``IVFixedPartError``.
482         """
483         minimum = 3
484         new_max = 8
485         crypto._testing_set_AES_GCM_IV_CNT_MAX \
486                 ("I am fully aware that this will void my warranty.", new_max)
487         cnksiz    = 1 << 10
488         os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
489         password  = str (os.urandom (42))
490         encryptor = crypto.Encrypt (TEST_VERSION,
491                                     TEST_PARAMVERSION,
492                                     password=password,
493                                     nacl=TEST_STATIC_NACL,
494                                     strict_ivs=True)
495
496         def addobj (i):
497             pt = fill_mod (1 << 14, off=i)
498             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
499
500             off = 0
501             while off < len (pt):
502                 upto = min (off + cnksiz, len (pt))
503                 _, cnk = encryptor.process (pt [off:upto])
504                 off += cnksiz
505
506         for i in range (minimum, new_max): addobj (42 + i)
507
508         with self.assertRaises (crypto.IVFixedPartError):
509             addobj (42 + i)
510
511
512
513     def test_crypto_aes_gcm_enc_length_cap (self):
514         """
515         Artificially lower the maximum allowable data length and attempt to
516         encrypt a larger object. Verify that the crypto handler only encrypts
517         data up to the size limit. A downstream user detects that condition by
518         testing whether the encryption step yielded less bytes than the
519         plaintext.
520
521         The sibling to this test is test_restore_backup_max_file_length()
522         in test_delatar.py. Deltatar will transparently create a splitted object
523         with an increased IV file counter.
524         """
525         new_max = 2187
526         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
527                 ("I am fully aware that this will void my warranty.", new_max)
528         cnksiz    = 1 << 10
529         password  = str (os.urandom (42))
530         encryptor = crypto.Encrypt (TEST_VERSION,
531                                     TEST_PARAMVERSION,
532                                     password=password,
533                                     nacl=TEST_STATIC_NACL)
534
535         def encobj (s):
536             pt, ct       = fill_mod (s), None
537             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
538
539             n, ct = encryptor.process (pt)
540             rest, _, _ = encryptor.done (header_dummy)
541
542             # NB: If this check *ever* fails, then something changed in the
543             #     encoding layer. AES-GCM is a stream cipher so each encoding
544             #     step will yield the exact number of ciphertext bytes that
545             #     was provided as plaintext. Thus there cannot be any encoded
546             #     data left when calling the finalizers. None of the crypo code
547             #     depends on that assumption but nevertheless we check it here
548             #     in case anything changes upstream in the Cryptography
549             #     library. In case there actually is a rest, replace the
550             #     assertion below with ``ct += rest``.
551             assert (len (rest) == 0)
552
553             if len (pt) > new_max:
554                 # If the plaintext was longer than the artificially lowered
555                 # maximum, then the number of ciphertext bytes must be clamped
556                 # to the maximum.
557                 assert n == new_max
558             else:
559                 assert n == len (pt) == len (ct)
560
561         for i in range (16): encobj (1 << i)
562
563
564     def test_crypto_aes_gcm_dec_length_cap (self):
565         """
566         The decryptor must reject headers with an object size that exceeds
567         the PDTCRYPT maximum. Longer files split into multiple objects.
568         """
569         password        = str (os.urandom (42))
570         meta            = faux_hdr()
571         meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
572         ok, header      = crypto.hdr_make (meta)
573
574         assert ok
575
576         # Set up decryption with bogus header.
577         decryptor = crypto.Decrypt (password=password, fixedparts=[])
578
579         with self.assertRaises (crypto.InvalidHeader):
580             decryptor.next (header)
581
582
583     def test_crypto_aes_gcm_dec_length_mismatch (self):
584         """
585         Catch attempts at decrypting more data than what was stated in the
586         header.
587         """
588         cnksiz         = 1 << 10
589         orig_pt        = fill_mod (1 << 14)
590         password       = str (os.urandom (42))
591         encryptor      = crypto.Encrypt (TEST_VERSION,
592                                          TEST_PARAMVERSION,
593                                          password=password,
594                                          nacl=TEST_STATIC_NACL)
595         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
596
597         off = 0
598         ct = b""
599         while off < len (orig_pt):
600             upto = min (off + cnksiz, len (orig_pt))
601             _n, cnk = encryptor.process (orig_pt [off:upto])
602             ct += cnk
603             off += cnksiz
604         cnk, header, fixed = encryptor.done (header_dummy)
605         ct += cnk
606
607         decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
608
609         decryptor.next (header)
610         off = 0
611         pt  = b""
612         while off < len (orig_pt):
613             upto = min (off + cnksiz, len (orig_pt))
614             cnk  = decryptor.process (ct [off:upto])
615             pt += cnk
616             off += cnksiz
617
618         with self.assertRaises (crypto.CiphertextTooLong):
619             # Try and decrypt one byte more than was encrypted.
620             # This must be caught in crypto.py.
621             _ = decryptor.process (ct [0:1])
622
623
624     def test_crypto_aes_gcm_dec_multicnk (self):
625         cnksiz         = 1 << 10
626         orig_pt        = fill_mod (1 << 14)
627         password       = str (os.urandom (42))
628         encryptor      = crypto.Encrypt (TEST_VERSION,
629                                          TEST_PARAMVERSION,
630                                          password=password,
631                                          nacl=TEST_STATIC_NACL)
632         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
633
634         off = 0
635         ct = b""
636         while off < len (orig_pt):
637             upto = min (off + cnksiz, len (orig_pt))
638             _n, cnk = encryptor.process (orig_pt [off:upto])
639             ct += cnk
640             off += cnksiz
641         cnk, header, fixed = encryptor.done (header_dummy)
642         ct += cnk
643
644         decryptor      = crypto.Decrypt (password=password,
645                                          fixedparts=fixed)
646         decryptor.next (header)
647         off = 0
648         pt  = b""
649         while off < len (orig_pt):
650             upto = min (off + cnksiz, len (orig_pt))
651             cnk  = decryptor.process (ct [off:upto])
652             pt += cnk
653             off += cnksiz
654
655
656         pt += decryptor.done ()
657         assert pt == orig_pt
658
659
660     def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
661         cnksiz         = 1 << 10
662         orig_pt        = fill_mod (1 << 14)
663         password       = str (os.urandom (42))
664         encryptor      = crypto.Encrypt (TEST_VERSION,
665                                          TEST_PARAMVERSION,
666                                          password=password,
667                                          nacl=TEST_STATIC_NACL)
668         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
669
670         off = 0
671         ct = b""
672         while off < len (orig_pt):
673             upto = min (off + cnksiz, len (orig_pt))
674             _n, cnk = encryptor.process (orig_pt [off:upto])
675             ct += cnk
676             off += cnksiz
677         cnk, header, fixed = encryptor.done (header_dummy)
678         ct += cnk
679
680         mut_header     = bytearray (header)
681         mut_header_vw  = memoryview (mut_header)
682         # replace one byte in the tag part of the header
683         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
684         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
685         header         = bytes (mut_header)
686
687         decryptor      = crypto.Decrypt (password=password,
688                                          fixedparts=fixed)
689         decryptor.next (header)
690         off = 0
691         pt  = b""
692         while off < len (orig_pt):
693             upto = min (off + cnksiz, len (orig_pt))
694             cnk = decryptor.process (ct [off:upto])
695             pt += cnk
696             off += cnksiz
697
698         with self.assertRaises (crypto.InvalidGCMTag):
699             _ = decryptor.done ()
700
701
702     def test_crypto_aes_gcm_dec_iv_gap (self):
703         """
704         Encrypt multiple objects using non-consecutive IVs and verify that the
705         decryptor errors out with an exception in strict mode but keeps quiet
706         otherwise.
707         """
708         cnksiz         = 1 << 10
709         orig_pt_1      = fill_mod (1 << 10)
710         orig_pt_2      = fill_mod (1 << 10, 23)
711         orig_pt_3      = fill_mod (1 << 10, 42)
712         password       = str (os.urandom (42))
713         encryptor      = crypto.Encrypt (TEST_VERSION,
714                                          TEST_PARAMVERSION,
715                                          password=password,
716                                          nacl=TEST_STATIC_NACL)
717
718         def enc (pt):
719             header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
720
721             off = 0
722             ct = b""
723             while off < len (pt):
724                 upto = min (off + cnksiz, len (pt))
725                 _n, cnk = encryptor.process (pt [off:upto])
726                 ct += cnk
727                 off += cnksiz
728             cnk, header, fixed = encryptor.done (header_dummy)
729             return ct + cnk, header, fixed
730
731         ct_1, hdr_1, _____ = enc (orig_pt_1)
732
733         ## Here we bump the iv of the encryptor, breaking the series.
734         encryptor.set_object_counter (encryptor.cnt + 1)
735         ct_2, hdr_2, fixed = enc (orig_pt_2)
736
737         ## IV of final object is again in-sequence.
738         ct_3, hdr_3, fixed = enc (orig_pt_3)
739
740         def decrypt (strict_ivs):
741             decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
742                                         strict_ivs=strict_ivs)
743
744             def dec (hdr, ct):
745                 decryptor.next (hdr)
746                 off = 0
747                 pt  = b""
748                 while off < len (ct):
749                     upto = min (off + cnksiz, len (ct))
750                     cnk = decryptor.process (ct [off:upto])
751                     pt += cnk
752                     off += cnksiz
753                 return pt + decryptor.done ()
754
755             decr_pt_1 = dec (hdr_1, ct_1)
756             decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
757             decr_pt_3 = dec (hdr_3, ct_3)
758
759             assert decr_pt_1 == orig_pt_1
760             assert decr_pt_2 == orig_pt_2
761             assert decr_pt_3 == orig_pt_3
762
763         with self.assertRaises (crypto.NonConsecutiveIV):
764             decrypt (True)
765
766         decrypt (False) # Sequence passes
767
768
769     def test_crypto_aes_gcm_dec_iv_reuse (self):
770         """
771         Meddle with encrypted content: extract the IV from one object
772         and inject it into the header of another. This must be rejected
773         by the decryptor.
774         """
775         cnksiz         = 1 << 10
776         orig_pt_1      = fill_mod (1 << 10)
777         orig_pt_2      = fill_mod (1 << 10, 42)
778         password       = str (os.urandom (42))
779         encryptor      = crypto.Encrypt (TEST_VERSION,
780                                          TEST_PARAMVERSION,
781                                          password=password,
782                                          nacl=TEST_STATIC_NACL)
783
784         def enc (pt):
785             header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
786
787             off = 0
788             ct = b""
789             while off < len (pt):
790                 upto = min (off + cnksiz, len (pt))
791                 _n, cnk = encryptor.process (pt [off:upto])
792                 ct += cnk
793                 off += cnksiz
794             cnk, header, fixed = encryptor.done (header_dummy)
795             return ct + cnk, header, fixed
796
797         ct_1, hdr_1, _____ = enc (orig_pt_1)
798         ct_2, hdr_2, fixed = enc (orig_pt_2)
799
800         mut_hdr_2    = bytearray (hdr_2)
801         mut_hdr_2_vw = memoryview (mut_hdr_2)
802         # get IV
803         iv_lo        = crypto.HDR_OFF_IV
804         iv_hi        = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
805         iv_1         = hdr_1 [iv_lo : iv_hi]
806         # transplant into other header
807         mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
808         hdr_2_mod    = bytes (mut_hdr_2)
809         decryptor    = crypto.Decrypt (password=password, fixedparts=fixed,
810                                        strict_ivs=True)
811
812         def dec (hdr, ct):
813             decryptor.next (hdr)
814             off = 0
815             pt  = b""
816             while off < len (ct):
817                 upto = min (off + cnksiz, len (ct))
818                 cnk = decryptor.process (ct [off:upto])
819                 pt += cnk
820                 off += cnksiz
821             return pt + decryptor.done ()
822
823         decr_pt_1 = dec (hdr_1, ct_1)
824         decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
825         with self.assertRaises (crypto.DuplicateIV):        # bad header, reuse detected
826             decr_pt_2 = dec (hdr_2_mod, ct_2)
827
828
829 class HeaderTest (CryptoLayerTest):
830
831     def test_crypto_fmt_hdr_make (self):
832         meta = faux_hdr()
833         ok, hdr = crypto.hdr_make (meta)
834         assert ok
835         assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
836
837
838     def test_crypto_fmt_hdr_make_useless (self):
839         ok, ret = crypto.hdr_make ({ 42: "x" })
840         assert ok is False
841         assert ret.startswith ("error assembling header:")
842
843
844     def test_crypto_fmt_hdr_read (self):
845         meta = faux_hdr()
846         ok, hdr = crypto.hdr_make (meta)
847         assert ok is True
848         assert hdr is not None
849         mmeta = crypto.hdr_read (hdr)
850         assert mmeta is not None
851         for k in meta:
852             if meta [k] != mmeta [k]:
853                 raise "header mismatch after reading: expected %r, got %r" \
854                       % (meta [k], mmeta [k])
855
856
857     def test_crypto_fmt_hdr_read_trailing_garbage (self):
858         meta = faux_hdr()
859         ok, hdr = crypto.hdr_make (meta)
860         ok, hdr = crypto.hdr_make (meta)
861         assert ok is True
862         assert hdr is not None
863         hdr += b"-junk"
864         with self.assertRaises (crypto.InvalidHeader):
865             _ = crypto.hdr_read (hdr)
866
867
868     def test_crypto_fmt_hdr_read_leading_garbage (self):
869         meta = faux_hdr()
870         ok, hdr = crypto.hdr_make (meta)
871         ok, hdr = crypto.hdr_make (meta)
872         assert ok is True
873         assert hdr is not None
874         hdr = b"junk-" + hdr
875         with self.assertRaises (crypto.InvalidHeader):
876             _ = crypto.hdr_read (hdr)
877
878
879     def test_crypto_fmt_hdr_inner_garbage (self):
880         meta = faux_hdr()
881         ok, hdr = crypto.hdr_make (meta)
882         assert ok
883         data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
884         with self.assertRaises (crypto.InvalidHeader):
885             _ = crypto.hdr_read (data)
886