dc652e2516b0c5b9b64849883cc530916a991162
[python-delta-tar] / testing / test_crypto.py
1 import binascii
2 import os
3 import pylibscrypt
4 import struct
5 import unittest
6
7 import deltatar.crypto as crypto
8
9 import cryptography
10
11 def b(s):
12     return s.encode("UTF-8")
13
14 CRYPTO_NACL_SIZE  = 16
15 CRYPTO_KEY_SIZE   = 16
16
17 TEST_PLAINTEXT       = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE      = b"test1234"
19 TEST_AES_GCM_AAD     = b"authenticated plain text"
20 TEST_DUMMY_FILENAME  = "insurance-file.txt"
21 TEST_VERSION         = 1
22 TEST_PARAMVERSION    = 1
23 TEST_STATIC_NACL     = os.urandom (CRYPTO_NACL_SIZE)
24
25 def faux_hdr (ctsize=1337, iv=None):
26     return \
27         {      "version" : 42
28         , "paramversion" : 2187
29         ,         "nacl" : binascii.unhexlify(b"0011223344556677"
30                                               b"8899aabbccddeeff")
31         ,           "iv" : iv or binascii.unhexlify(b"0011223344556677"
32                                                     b"8899aabb")
33         ,       "ctsize" : ctsize
34         ,          "tag" : binascii.unhexlify(b"deadbeefbadb100d"
35                                               b"b1eedc0ffeedea15")
36         }
37
38 FILL_MOD_MEMO = { }
39
40 def fill_mod (n, off=0):
41     global FILL_MOD_MEMO
42     k = (n, off)
43     m = FILL_MOD_MEMO.get (k, None)
44     if m is not None:
45         return m
46     buf = bytearray (n)
47     bufv = memoryview (buf)
48     for i in range (n):
49         off += 1
50         c = off % 64 + 32
51         struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
52     m = bytes (buf)
53     FILL_MOD_MEMO [k] = m
54     return m
55
56
57 def faux_payload ():
58     return "abcd" * 42
59
60
61 class CryptoLayerTest (unittest.TestCase):
62     pass
63
64
65 class AESGCMTest (CryptoLayerTest):
66
67     os_urandom = os.urandom
68
69     def tearDown (self):
70         """Reset globals altered for testing."""
71         _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
72                   ("I am fully aware that this will void my warranty.")
73         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
74                   ("I am fully aware that this will void my warranty.")
75         os.urandom = self.os_urandom
76
77     def test_crypto_aes_gcm_enc_ctor (self):
78         password   = str (os.urandom (42))
79         encryptor  = crypto.Encrypt (TEST_VERSION,
80                                      TEST_PARAMVERSION,
81                                      password=password,
82                                      nacl=TEST_STATIC_NACL)
83
84
85     def test_crypto_aes_gcm_enc_ctor_key (self):
86         key        = os.urandom (42)
87         encryptor  = crypto.Encrypt (TEST_VERSION,
88                                      TEST_PARAMVERSION,
89                                      key=key,
90                                      nacl=TEST_STATIC_NACL)
91
92
93     def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
94         """
95         Either key (+nacl) or password must be supplied, not both.
96         """
97         with self.assertRaises (crypto.InvalidParameter):       # neither key nor pw
98             encryptor = crypto.Encrypt (TEST_VERSION,
99                                         TEST_PARAMVERSION,
100                                         nacl=TEST_STATIC_NACL)
101
102         password = str (os.urandom (42))
103         key      =      os.urandom (16)  # scrypt sized
104         with self.assertRaises (crypto.InvalidParameter):       # both key and pw
105             encryptor = crypto.Encrypt (TEST_VERSION,
106                                         TEST_PARAMVERSION,
107                                         password=password,
108                                         key=key,
109                                         nacl=TEST_STATIC_NACL)
110
111         with self.assertRaises (crypto.InvalidParameter):       # key, but salt missing
112             encryptor = crypto.Encrypt (TEST_VERSION,
113                                         TEST_PARAMVERSION,
114                                         key=key,
115                                         nacl=None)
116
117         with self.assertRaises (crypto.InvalidParameter):       # empty pw
118             encryptor = crypto.Encrypt (TEST_VERSION,
119                                         TEST_PARAMVERSION,
120                                         password=b"",
121                                         nacl=TEST_STATIC_NACL)
122
123
124     def test_crypto_aes_gcm_enc_header_size (self):
125         password       = str (os.urandom (42))
126         encryptor      = crypto.Encrypt (TEST_VERSION,
127                                          TEST_PARAMVERSION,
128                                          password=password,
129                                          nacl=TEST_STATIC_NACL)
130
131         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
132         assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
133         _, _           = encryptor.process (TEST_PLAINTEXT)
134         _, header, _   = encryptor.done (header_dummy)
135         assert len (header) == crypto.PDTCRYPT_HDR_SIZE
136
137
138     def test_crypto_aes_gcm_enc_chunk_size (self):
139         password       = str (os.urandom (42))
140         encryptor      = crypto.Encrypt (TEST_VERSION,
141                                          TEST_PARAMVERSION,
142                                          password=password,
143                                          nacl=TEST_STATIC_NACL)
144
145         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
146         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
147         assert len (ciphertext) == len (TEST_PLAINTEXT)
148         rest, header, fixed = encryptor.done (header_dummy)
149         assert len (rest) == 0
150
151
152     def test_crypto_aes_gcm_dec_ctor (self):
153         """
154         Ensure that only either key or password is accepted.
155         """
156         password = str (os.urandom (42))
157         key      =      os.urandom (16)  # scrypt sized
158
159         decryptor = crypto.Decrypt (password=password)
160         decryptor = crypto.Decrypt (key=key)
161
162         with self.assertRaises (crypto.InvalidParameter):       # both password and key
163             decryptor = crypto.Decrypt (password=password, key=key)
164
165         with self.assertRaises (crypto.InvalidParameter):       # neither password nor key
166             decryptor = crypto.Decrypt (password=None, key=None)
167
168         with self.assertRaises (crypto.InvalidParameter):       # # empty password
169             decryptor = crypto.Decrypt (password="")
170
171
172     def test_crypto_aes_gcm_dec_simple (self):
173         password       = str (os.urandom (42))
174         encryptor      = crypto.Encrypt (TEST_VERSION,
175                                          TEST_PARAMVERSION,
176                                          password=password,
177                                          nacl=TEST_STATIC_NACL)
178
179         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
180         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
181         rest, header, fixed = encryptor.done (header_dummy)
182         ciphertext    += rest
183
184         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
185         decryptor.next (header)
186         plaintext      = decryptor.process (ciphertext)
187         rest           = decryptor.done ()
188         plaintext     += rest
189
190         assert plaintext == TEST_PLAINTEXT
191
192
193     def test_crypto_aes_gcm_dec_bad_tag (self):
194         password       = str (os.urandom (42))
195         encryptor      = crypto.Encrypt (TEST_VERSION,
196                                          TEST_PARAMVERSION,
197                                          password=password,
198                                          nacl=TEST_STATIC_NACL)
199
200         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
201         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
202         ciphertext2, header, fixed = encryptor.done (header_dummy)
203
204         mut_header     = bytearray (header)
205         mut_header_vw  = memoryview (mut_header)
206         # replace one byte in the tag part of the header
207         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
208         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
209         header         = bytes (mut_header)
210
211         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
212         decryptor.next (header)
213         plaintext      = decryptor.process (ciphertext)
214         with self.assertRaises (crypto.InvalidGCMTag):
215             _ = decryptor.done ()
216
217
218     def test_crypto_aes_gcm_enc_multicnk (self):
219         cnksiz = 1 << 10
220         pt    = fill_mod (1 << 14)
221         password       = str (os.urandom (42))
222         encryptor      = crypto.Encrypt (TEST_VERSION,
223                                          TEST_PARAMVERSION,
224                                          password=password,
225                                          nacl=TEST_STATIC_NACL)
226         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
227
228         off = 0
229         ct = b""
230         while off < len (pt):
231             upto = min (off + cnksiz, len (pt))
232             _, cnk = encryptor.process (pt [off:upto])
233             ct += cnk
234             off += cnksiz
235         cnk, header, fixed = encryptor.done (header_dummy)
236         ct += cnk
237
238         assert len (pt) == len (ct)
239
240
241     def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
242         cnksiz = 1 << 10
243         pt    = fill_mod (1 << 14)
244         password       = str (os.urandom (42))
245         encryptor      = crypto.Encrypt (TEST_VERSION,
246                                          TEST_PARAMVERSION,
247                                          password=password,
248                                          nacl=TEST_STATIC_NACL,
249                                          strict_ivs=True)
250         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
251
252         off = 0
253         ct = b""
254         while off < len (pt):
255             upto = min (off + cnksiz, len (pt))
256             _, cnk = encryptor.process (pt [off:upto])
257             ct += cnk
258             off += cnksiz
259         cnk, header, fixed = encryptor.done (header_dummy)
260         ct += cnk
261
262         assert len (pt) == len (ct)
263
264
265     def test_crypto_aes_gcm_enc_multiobj (self):
266         cnksiz    = 1 << 10
267         password  = str (os.urandom (42))
268         encryptor = crypto.Encrypt (TEST_VERSION,
269                                     TEST_PARAMVERSION,
270                                     password=password,
271                                     nacl=TEST_STATIC_NACL,
272                                     strict_ivs=False)
273
274         def addobj (i):
275             pt           = fill_mod (1 << 14, off=i)
276             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
277
278             off = 0
279             ct = b""
280             while off < len (pt):
281                 upto = min (off + cnksiz, len (pt))
282                 _, cnk = encryptor.process (pt [off:upto])
283                 ct += cnk
284                 off += cnksiz
285             cnk, header, fixed = encryptor.done (header_dummy)
286             ct += cnk
287
288             assert len (pt) == len (ct)
289
290         for i in range (5): addobj (i)
291
292         assert len (encryptor.fixed) == 1
293
294
295     def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
296         cnksiz    = 1 << 10
297         password  = str (os.urandom (42))
298         encryptor = crypto.Encrypt (TEST_VERSION,
299                                     TEST_PARAMVERSION,
300                                     password=password,
301                                     nacl=TEST_STATIC_NACL,
302                                     strict_ivs=True)
303         curfixed  = None # must remain constant after first
304
305         def addobj (i):
306             pt           = fill_mod (1 << 14, off=i)
307             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
308
309             off = 0
310             ct = b""
311             while off < len (pt):
312                 upto = min (off + cnksiz, len (pt))
313                 _, cnk = encryptor.process (pt [off:upto])
314                 ct += cnk
315                 off += cnksiz
316             cnk, header, fixed = encryptor.done (header_dummy)
317             nonlocal curfixed
318             if curfixed is None:
319                 curfixed = fixed
320             else:
321                 assert fixed == curfixed
322             ct += cnk
323
324             assert len (pt) == len (ct)
325
326         for i in range (5): addobj (i)
327
328         assert len (encryptor.fixed) == 1
329
330
331     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
332         """
333         Test behavior when the file counter tops out.
334
335         Artificially lower the maximum possible file counter. Considering
336         invalid (0) and reserved (1, 2) values, the smallest possible file counter
337         for normal objects is 3. Starting from that, the header of the (max -
338         3)rd object must have both a different IV fixed part and a counter.
339         """
340         minimum = 3
341         new_max = 8
342         crypto._testing_set_AES_GCM_IV_CNT_MAX \
343                 ("I am fully aware that this will void my warranty.", new_max)
344         cnksiz    = 1 << 10
345         password  = str (os.urandom (42))
346         encryptor = crypto.Encrypt (TEST_VERSION,
347                                     TEST_PARAMVERSION,
348                                     password=password,
349                                     nacl=TEST_STATIC_NACL,
350                                     strict_ivs=True)
351
352         last_iv  = None
353         last_cnt = minimum
354
355         def addobj (i, wrap=False):
356             nonlocal last_iv
357             nonlocal last_cnt
358             pt           = fill_mod (1 << 14, off=i)
359             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
360
361             off = 0
362             ct = b""
363             while off < len (pt):
364                 upto = min (off + cnksiz, len (pt))
365                 _, cnk = encryptor.process (pt [off:upto])
366                 ct += cnk
367                 off += cnksiz
368             cnk, header, fixed = encryptor.done (header_dummy)
369             this_iv = crypto.hdr_read (header) ["iv"]
370             if last_iv is not None:
371                 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
372                 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
373                 if wrap is False:
374                     assert last_fixed == this_fixed
375                     assert last_cnt   == this_cnt - 1
376                 else:
377                     assert last_fixed != this_fixed
378                     assert this_cnt   == minimum
379             last_iv = this_iv
380             ct += cnk
381
382             assert len (pt) == len (ct)
383
384         for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
385         addobj (i + 1, True) # counter wraps to 3
386
387         for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
388         addobj (j + 1, True) # counter wraps to 3 again
389
390         assert len (encryptor.fixed) == 3
391
392
393     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
394         """
395         Test behavior when the file counter tops out and the transition to
396         the next IV fixed part fails on account of a bad random generator.
397
398         Replaces the ``urandom`` reference in ``os`` with a deterministic
399         function. The encryptor context must communicate this condition with an
400         ``IVFixedPartError``.
401         """
402         minimum = 3
403         new_max = 8
404         crypto._testing_set_AES_GCM_IV_CNT_MAX \
405                 ("I am fully aware that this will void my warranty.", new_max)
406         cnksiz    = 1 << 10
407         os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
408         password  = str (os.urandom (42))
409         encryptor = crypto.Encrypt (TEST_VERSION,
410                                     TEST_PARAMVERSION,
411                                     password=password,
412                                     nacl=TEST_STATIC_NACL,
413                                     strict_ivs=True)
414
415         def addobj (i):
416             pt = fill_mod (1 << 14, off=i)
417             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
418
419             off = 0
420             while off < len (pt):
421                 upto = min (off + cnksiz, len (pt))
422                 _, cnk = encryptor.process (pt [off:upto])
423                 off += cnksiz
424
425         for i in range (minimum, new_max): addobj (42 + i)
426
427         with self.assertRaises (crypto.IVFixedPartError):
428             addobj (42 + i)
429
430
431
432     def test_crypto_aes_gcm_enc_length_cap (self):
433         """
434         Artificially lower the maximum allowable data length and attempt to
435         encrypt a larger object. Verify that the crypto handler only encrypts
436         data up to the size limit. A downstream user detects that condition by
437         testing whether the encryption step yielded less bytes than the
438         plaintext.
439
440         The sibling to this test is test_restore_backup_max_file_length()
441         in test_delatar.py. Deltatar will transparently create a splitted object
442         with an increased IV file counter.
443         """
444         new_max = 2187
445         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
446                 ("I am fully aware that this will void my warranty.", new_max)
447         cnksiz    = 1 << 10
448         password  = str (os.urandom (42))
449         encryptor = crypto.Encrypt (TEST_VERSION,
450                                     TEST_PARAMVERSION,
451                                     password=password,
452                                     nacl=TEST_STATIC_NACL)
453
454         def encobj (s):
455             pt, ct       = fill_mod (s), None
456             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
457
458             n, ct = encryptor.process (pt)
459             rest, _, _ = encryptor.done (header_dummy)
460
461             # NB: If this check *ever* fails, then something changed in the
462             #     encoding layer. AES-GCM is a stream cipher so each encoding
463             #     step will yield the exact number of ciphertext bytes that
464             #     was provided as plaintext. Thus there cannot be any encoded
465             #     data left when calling the finalizers. None of the crypo code
466             #     depends on that assumption but nevertheless we check it here
467             #     in case anything changes upstream in the Cryptography
468             #     library. In case there actually is a rest, replace the
469             #     assertion below with ``ct += rest``.
470             assert (len (rest) == 0)
471
472             if len (pt) > new_max:
473                 # If the plaintext was longer than the artificially lowered
474                 # maximum, then the number of ciphertext bytes must be clamped
475                 # to the maximum.
476                 assert n == new_max
477             else:
478                 assert n == len (pt) == len (ct)
479
480         for i in range (16): encobj (1 << i)
481
482
483     def test_crypto_aes_gcm_dec_length_cap (self):
484         """
485         The decryptor must reject headers with an object size that exceeds
486         the PDTCRYPT maximum. Longer files split into multiple objects.
487         """
488         password        = str (os.urandom (42))
489         meta            = faux_hdr()
490         meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
491         ok, header      = crypto.hdr_make (meta)
492
493         assert ok
494
495         # Set up decryption with bogus header.
496         decryptor = crypto.Decrypt (password=password, fixedparts=[])
497
498         with self.assertRaises (crypto.InvalidHeader):
499             decryptor.next (header)
500
501
502     def test_crypto_aes_gcm_dec_length_mismatch (self):
503         """
504         Catch attempts at decrypting more data than what was stated in the
505         header.
506         """
507         cnksiz         = 1 << 10
508         orig_pt        = fill_mod (1 << 14)
509         password       = str (os.urandom (42))
510         encryptor      = crypto.Encrypt (TEST_VERSION,
511                                          TEST_PARAMVERSION,
512                                          password=password,
513                                          nacl=TEST_STATIC_NACL)
514         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
515
516         off = 0
517         ct = b""
518         while off < len (orig_pt):
519             upto = min (off + cnksiz, len (orig_pt))
520             _n, cnk = encryptor.process (orig_pt [off:upto])
521             ct += cnk
522             off += cnksiz
523         cnk, header, fixed = encryptor.done (header_dummy)
524         ct += cnk
525
526         decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
527
528         decryptor.next (header)
529         off = 0
530         pt  = b""
531         while off < len (orig_pt):
532             upto = min (off + cnksiz, len (orig_pt))
533             cnk  = decryptor.process (ct [off:upto])
534             pt += cnk
535             off += cnksiz
536
537         with self.assertRaises (crypto.CiphertextTooLong):
538             # Try and decrypt one byte more than was encrypted.
539             # This must be caught in crypto.py.
540             _ = decryptor.process (ct [0:1])
541
542
543     def test_crypto_aes_gcm_dec_multicnk (self):
544         cnksiz         = 1 << 10
545         orig_pt        = fill_mod (1 << 14)
546         password       = str (os.urandom (42))
547         encryptor      = crypto.Encrypt (TEST_VERSION,
548                                          TEST_PARAMVERSION,
549                                          password=password,
550                                          nacl=TEST_STATIC_NACL)
551         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
552
553         off = 0
554         ct = b""
555         while off < len (orig_pt):
556             upto = min (off + cnksiz, len (orig_pt))
557             _n, cnk = encryptor.process (orig_pt [off:upto])
558             ct += cnk
559             off += cnksiz
560         cnk, header, fixed = encryptor.done (header_dummy)
561         ct += cnk
562
563         decryptor      = crypto.Decrypt (password=password,
564                                          fixedparts=fixed)
565         decryptor.next (header)
566         off = 0
567         pt  = b""
568         while off < len (orig_pt):
569             upto = min (off + cnksiz, len (orig_pt))
570             cnk  = decryptor.process (ct [off:upto])
571             pt += cnk
572             off += cnksiz
573
574
575         pt += decryptor.done ()
576         assert pt == orig_pt
577
578
579     def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
580         cnksiz         = 1 << 10
581         orig_pt        = fill_mod (1 << 14)
582         password       = str (os.urandom (42))
583         encryptor      = crypto.Encrypt (TEST_VERSION,
584                                          TEST_PARAMVERSION,
585                                          password=password,
586                                          nacl=TEST_STATIC_NACL)
587         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
588
589         off = 0
590         ct = b""
591         while off < len (orig_pt):
592             upto = min (off + cnksiz, len (orig_pt))
593             _n, cnk = encryptor.process (orig_pt [off:upto])
594             ct += cnk
595             off += cnksiz
596         cnk, header, fixed = encryptor.done (header_dummy)
597         ct += cnk
598
599         mut_header     = bytearray (header)
600         mut_header_vw  = memoryview (mut_header)
601         # replace one byte in the tag part of the header
602         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
603         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
604         header         = bytes (mut_header)
605
606         decryptor      = crypto.Decrypt (password=password,
607                                          fixedparts=fixed)
608         decryptor.next (header)
609         off = 0
610         pt  = b""
611         while off < len (orig_pt):
612             upto = min (off + cnksiz, len (orig_pt))
613             cnk = decryptor.process (ct [off:upto])
614             pt += cnk
615             off += cnksiz
616
617         with self.assertRaises (crypto.InvalidGCMTag):
618             _ = decryptor.done ()
619
620
621     def test_crypto_aes_gcm_dec_iv_reuse (self):
622         """
623         Meddle with encrypted content: extract the IV from one object
624         and inject it into the header of another. This must be rejected
625         by the decryptor.
626         """
627         cnksiz         = 1 << 10
628         orig_pt_1      = fill_mod (1 << 10)
629         orig_pt_2      = fill_mod (1 << 10, 42)
630         password       = str (os.urandom (42))
631         encryptor      = crypto.Encrypt (TEST_VERSION,
632                                          TEST_PARAMVERSION,
633                                          password=password,
634                                          nacl=TEST_STATIC_NACL)
635
636         def enc (pt):
637             header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
638
639             off = 0
640             ct = b""
641             while off < len (pt):
642                 upto = min (off + cnksiz, len (pt))
643                 _n, cnk = encryptor.process (pt [off:upto])
644                 ct += cnk
645                 off += cnksiz
646             cnk, header, fixed = encryptor.done (header_dummy)
647             return ct + cnk, header, fixed
648
649         ct_1, hdr_1, _____ = enc (orig_pt_1)
650         ct_2, hdr_2, fixed = enc (orig_pt_2)
651
652         mut_hdr_2    = bytearray (hdr_2)
653         mut_hdr_2_vw = memoryview (mut_hdr_2)
654         # get IV
655         iv_lo        = crypto.HDR_OFF_IV
656         iv_hi        = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
657         iv_1         = hdr_1 [iv_lo : iv_hi]
658         # transplant into other header
659         mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
660         hdr_2_mod    = bytes (mut_hdr_2)
661         decryptor    = crypto.Decrypt (password=password, fixedparts=fixed,
662                                        strict_ivs=True)
663
664         def dec (hdr, ct):
665             decryptor.next (hdr)
666             off = 0
667             pt  = b""
668             while off < len (ct):
669                 upto = min (off + cnksiz, len (ct))
670                 cnk = decryptor.process (ct [off:upto])
671                 pt += cnk
672                 off += cnksiz
673             return pt + decryptor.done ()
674
675         decr_pt_1 = dec (hdr_1, ct_1)
676         decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
677         with self.assertRaises (crypto.DuplicateIV):        # bad header, reuse detected
678             decr_pt_2 = dec (hdr_2_mod, ct_2)
679
680
681 class HeaderTest (CryptoLayerTest):
682
683     def test_crypto_fmt_hdr_make (self):
684         meta = faux_hdr()
685         ok, hdr = crypto.hdr_make (meta)
686         assert ok
687         assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
688
689
690     def test_crypto_fmt_hdr_make_useless (self):
691         ok, ret = crypto.hdr_make ({ 42: "x" })
692         assert ok is False
693         assert ret.startswith ("error assembling header:")
694
695
696     def test_crypto_fmt_hdr_read (self):
697         meta = faux_hdr()
698         ok, hdr = crypto.hdr_make (meta)
699         assert ok is True
700         assert hdr is not None
701         mmeta = crypto.hdr_read (hdr)
702         assert mmeta is not None
703         for k in meta:
704             if meta [k] != mmeta [k]:
705                 raise "header mismatch after reading: expected %r, got %r" \
706                       % (meta [k], mmeta [k])
707
708
709     def test_crypto_fmt_hdr_read_trailing_garbage (self):
710         meta = faux_hdr()
711         ok, hdr = crypto.hdr_make (meta)
712         ok, hdr = crypto.hdr_make (meta)
713         assert ok is True
714         assert hdr is not None
715         hdr += b"-junk"
716         with self.assertRaises (crypto.InvalidHeader):
717             _ = crypto.hdr_read (hdr)
718
719
720     def test_crypto_fmt_hdr_read_leading_garbage (self):
721         meta = faux_hdr()
722         ok, hdr = crypto.hdr_make (meta)
723         ok, hdr = crypto.hdr_make (meta)
724         assert ok is True
725         assert hdr is not None
726         hdr = b"junk-" + hdr
727         with self.assertRaises (crypto.InvalidHeader):
728             _ = crypto.hdr_read (hdr)
729
730
731     def test_crypto_fmt_hdr_inner_garbage (self):
732         meta = faux_hdr()
733         ok, hdr = crypto.hdr_make (meta)
734         assert ok
735         data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
736         with self.assertRaises (crypto.InvalidHeader):
737             _ = crypto.hdr_read (data)
738