deny insecure parameters by default
[python-delta-tar] / testing / test_crypto.py
CommitLineData
5133232d
PG
1import binascii
2import os
3import pylibscrypt
c2d1c3ec 4import struct
5133232d
PG
5import unittest
6
7import deltatar.crypto as crypto
8
e2a4e4f0
PG
9import cryptography
10
5133232d
PG
11def b(s):
12 return s.encode("UTF-8")
13
90ee2a74 14CRYPTO_NACL_SIZE = 16
5133232d 15CRYPTO_KEY_SIZE = 16
90ee2a74
PG
16
17TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18TEST_PASSPHRASE = b"test1234"
19TEST_AES_GCM_AAD = b"authenticated plain text"
20TEST_DUMMY_FILENAME = "insurance-file.txt"
21TEST_VERSION = 1
22TEST_PARAMVERSION = 1
23TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
6110ef14 24PLAIN_PARAMVERSION = 0
5133232d
PG
25
26def faux_hdr (ctsize=1337, iv=None):
27 return \
28 { "version" : 42
29 , "paramversion" : 2187
30 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 b"8899aabbccddeeff")
32 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
33 b"8899aabb")
34 , "ctsize" : ctsize
90ee2a74
PG
35 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
36 b"b1eedc0ffeedea15")
5133232d
PG
37 }
38
30019abf 39FILL_MOD_MEMO = { }
5133232d 40
fd10b44a 41def fill_mod (n, off=0):
30019abf
PG
42 global FILL_MOD_MEMO
43 k = (n, off)
44 m = FILL_MOD_MEMO.get (k, None)
45 if m is not None:
46 return m
c2d1c3ec
PG
47 buf = bytearray (n)
48 bufv = memoryview (buf)
49 for i in range (n):
fd10b44a
PG
50 off += 1
51 c = off % 64 + 32
c2d1c3ec 52 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
30019abf
PG
53 m = bytes (buf)
54 FILL_MOD_MEMO [k] = m
55 return m
c2d1c3ec
PG
56
57
5133232d
PG
58def faux_payload ():
59 return "abcd" * 42
60
cb7397d5 61
5133232d 62class CryptoLayerTest (unittest.TestCase):
cb7397d5
PG
63 pass
64
65
66class AESGCMTest (CryptoLayerTest):
5133232d 67
be124bca
PG
68 os_urandom = os.urandom
69
cb7a3911
PG
70 def tearDown (self):
71 """Reset globals altered for testing."""
72 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73 ("I am fully aware that this will void my warranty.")
74 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75 ("I am fully aware that this will void my warranty.")
be124bca 76 os.urandom = self.os_urandom
cb7a3911 77
90ee2a74
PG
78 def test_crypto_aes_gcm_enc_ctor (self):
79 password = str (os.urandom (42))
1f3fd7b0
PG
80 encryptor = crypto.Encrypt (TEST_VERSION,
81 TEST_PARAMVERSION,
82 password=password,
83 nacl=TEST_STATIC_NACL)
84
6110ef14
PG
85 def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86 """Refuse plaintext passthrough mode by default."""
87 password = str (os.urandom (42))
88 with self.assertRaises (crypto.InvalidParameter):
89 encryptor = crypto.Encrypt (TEST_VERSION,
90 PLAIN_PARAMVERSION,
91 password=password,
92 nacl=TEST_STATIC_NACL)
93
94
95 def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96 """
97 Comply with request for plaintext passthrough mode if the
98 *insecure* flag is passed.
99 """
100 password = str (os.urandom (42))
101 encryptor = crypto.Encrypt (TEST_VERSION,
102 PLAIN_PARAMVERSION,
103 password=password,
104 nacl=TEST_STATIC_NACL,
105 insecure=True)
106
1f3fd7b0
PG
107
108 def test_crypto_aes_gcm_enc_ctor_key (self):
109 key = os.urandom (42)
110 encryptor = crypto.Encrypt (TEST_VERSION,
111 TEST_PARAMVERSION,
112 key=key,
113 nacl=TEST_STATIC_NACL)
114
115
116 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117 """
118 Either key (+nacl) or password must be supplied, not both.
119 """
1c2f7f07 120 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
1f3fd7b0
PG
121 encryptor = crypto.Encrypt (TEST_VERSION,
122 TEST_PARAMVERSION,
123 nacl=TEST_STATIC_NACL)
1f3fd7b0
PG
124
125 password = str (os.urandom (42))
126 key = os.urandom (16) # scrypt sized
1c2f7f07 127 with self.assertRaises (crypto.InvalidParameter): # both key and pw
1f3fd7b0
PG
128 encryptor = crypto.Encrypt (TEST_VERSION,
129 TEST_PARAMVERSION,
130 password=password,
131 key=key,
132 nacl=TEST_STATIC_NACL)
1f3fd7b0 133
1c2f7f07 134 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
1f3fd7b0
PG
135 encryptor = crypto.Encrypt (TEST_VERSION,
136 TEST_PARAMVERSION,
137 key=key,
138 nacl=None)
1f3fd7b0 139
1c2f7f07 140 with self.assertRaises (crypto.InvalidParameter): # empty pw
1f3fd7b0
PG
141 encryptor = crypto.Encrypt (TEST_VERSION,
142 TEST_PARAMVERSION,
143 password=b"",
144 nacl=TEST_STATIC_NACL)
90ee2a74
PG
145
146
147 def test_crypto_aes_gcm_enc_header_size (self):
148 password = str (os.urandom (42))
1f3fd7b0 149 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 150 TEST_PARAMVERSION,
1f3fd7b0 151 password=password,
90ee2a74
PG
152 nacl=TEST_STATIC_NACL)
153
154 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
155 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
cb7a3911 156 _, _ = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
157 _, header, _ = encryptor.done (header_dummy)
158 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
5133232d 159
e2a4e4f0 160
5133232d 161 def test_crypto_aes_gcm_enc_chunk_size (self):
90ee2a74 162 password = str (os.urandom (42))
1f3fd7b0 163 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 164 TEST_PARAMVERSION,
1f3fd7b0 165 password=password,
90ee2a74
PG
166 nacl=TEST_STATIC_NACL)
167
168 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 169 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
170 assert len (ciphertext) == len (TEST_PLAINTEXT)
171 rest, header, fixed = encryptor.done (header_dummy)
172 assert len (rest) == 0
5133232d 173
e2a4e4f0 174
1f3fd7b0
PG
175 def test_crypto_aes_gcm_dec_ctor (self):
176 """
177 Ensure that only either key or password is accepted.
178 """
179 password = str (os.urandom (42))
180 key = os.urandom (16) # scrypt sized
181
182 decryptor = crypto.Decrypt (password=password)
183 decryptor = crypto.Decrypt (key=key)
184
1c2f7f07 185 with self.assertRaises (crypto.InvalidParameter): # both password and key
1f3fd7b0 186 decryptor = crypto.Decrypt (password=password, key=key)
1f3fd7b0 187
1c2f7f07 188 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
1f3fd7b0 189 decryptor = crypto.Decrypt (password=None, key=None)
1f3fd7b0 190
1c2f7f07 191 with self.assertRaises (crypto.InvalidParameter): # # empty password
1f3fd7b0 192 decryptor = crypto.Decrypt (password="")
1f3fd7b0 193
30019abf 194
e2a4e4f0 195 def test_crypto_aes_gcm_dec_simple (self):
90ee2a74 196 password = str (os.urandom (42))
1f3fd7b0 197 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 198 TEST_PARAMVERSION,
1f3fd7b0 199 password=password,
90ee2a74
PG
200 nacl=TEST_STATIC_NACL)
201
202 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 203 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
204 rest, header, fixed = encryptor.done (header_dummy)
205 ciphertext += rest
206
1f3fd7b0 207 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
90ee2a74
PG
208 decryptor.next (header)
209 plaintext = decryptor.process (ciphertext)
3ba1441c 210 rest = decryptor.done ()
90ee2a74
PG
211 plaintext += rest
212
90ee2a74 213 assert plaintext == TEST_PLAINTEXT
7c32c176
PG
214
215
6110ef14
PG
216 def test_crypto_aes_gcm_dec_plain_bad (self):
217 """
218 Downgrade to plaintext must not be allowed in parameters
219 obtained from headers.
220 """
221 password = str (os.urandom (42))
222 encryptor = crypto.Encrypt (TEST_VERSION,
223 TEST_PARAMVERSION,
224 password=password,
225 nacl=TEST_STATIC_NACL)
226
227 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
228 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
229 rest, header, fixed = encryptor.done (header_dummy)
230 ciphertext += rest
231
232 header = crypto.hdr_read (header)
233 header ["paramversion"] = PLAIN_PARAMVERSION
234 ok, header = crypto.hdr_make (header)
235 assert ok
236
237 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
238 with self.assertRaises (crypto.InvalidParameter):
239 decryptor.next (header)
240
241
242 def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
243 """
244 Allow plaintext crypto mode if *insecure* flag is passed.
245 """
246 password = str (os.urandom (42))
247 encryptor = crypto.Encrypt (TEST_VERSION,
248 PLAIN_PARAMVERSION,
249 password=password,
250 nacl=TEST_STATIC_NACL,
251 insecure=True)
252
253 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
254 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
255 rest, header, fixed = encryptor.done (header_dummy)
256 ciphertext += rest
257
258 header = crypto.hdr_read (header)
259 header ["paramversion"] = PLAIN_PARAMVERSION
260 ok, header = crypto.hdr_make (header)
261 assert ok
262
263 decryptor = crypto.Decrypt (password=password,
264 fixedparts=fixed,
265 insecure=True)
266 decryptor.next (header)
267 plaintext = decryptor.process (ciphertext)
268 rest = decryptor.done ()
269 plaintext += rest
270
271 assert plaintext == TEST_PLAINTEXT
272
273
e2a4e4f0 274 def test_crypto_aes_gcm_dec_bad_tag (self):
90ee2a74 275 password = str (os.urandom (42))
1f3fd7b0 276 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 277 TEST_PARAMVERSION,
1f3fd7b0 278 password=password,
90ee2a74
PG
279 nacl=TEST_STATIC_NACL)
280
281 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 282 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
283 ciphertext2, header, fixed = encryptor.done (header_dummy)
284
285 mut_header = bytearray (header)
286 mut_header_vw = memoryview (mut_header)
287 # replace one byte in the tag part of the header
288 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
289 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
290 header = bytes (mut_header)
291
1f3fd7b0 292 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
90ee2a74
PG
293 decryptor.next (header)
294 plaintext = decryptor.process (ciphertext)
1c2f7f07 295 with self.assertRaises (crypto.InvalidGCMTag):
3ba1441c 296 _ = decryptor.done ()
e2a4e4f0
PG
297
298
c2d1c3ec
PG
299 def test_crypto_aes_gcm_enc_multicnk (self):
300 cnksiz = 1 << 10
90ee2a74
PG
301 pt = fill_mod (1 << 14)
302 password = str (os.urandom (42))
1f3fd7b0 303 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 304 TEST_PARAMVERSION,
1f3fd7b0 305 password=password,
90ee2a74
PG
306 nacl=TEST_STATIC_NACL)
307 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
c2d1c3ec
PG
308
309 off = 0
310 ct = b""
90ee2a74
PG
311 while off < len (pt):
312 upto = min (off + cnksiz, len (pt))
cb7a3911 313 _, cnk = encryptor.process (pt [off:upto])
c2d1c3ec
PG
314 ct += cnk
315 off += cnksiz
90ee2a74
PG
316 cnk, header, fixed = encryptor.done (header_dummy)
317 ct += cnk
318
319 assert len (pt) == len (ct)
c2d1c3ec
PG
320
321
30019abf
PG
322 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
323 cnksiz = 1 << 10
324 pt = fill_mod (1 << 14)
325 password = str (os.urandom (42))
326 encryptor = crypto.Encrypt (TEST_VERSION,
327 TEST_PARAMVERSION,
328 password=password,
329 nacl=TEST_STATIC_NACL,
330 strict_ivs=True)
331 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
332
333 off = 0
334 ct = b""
335 while off < len (pt):
336 upto = min (off + cnksiz, len (pt))
cb7a3911 337 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
338 ct += cnk
339 off += cnksiz
340 cnk, header, fixed = encryptor.done (header_dummy)
341 ct += cnk
342
343 assert len (pt) == len (ct)
344
345
346 def test_crypto_aes_gcm_enc_multiobj (self):
347 cnksiz = 1 << 10
348 password = str (os.urandom (42))
349 encryptor = crypto.Encrypt (TEST_VERSION,
350 TEST_PARAMVERSION,
351 password=password,
352 nacl=TEST_STATIC_NACL,
353 strict_ivs=False)
354
355 def addobj (i):
356 pt = fill_mod (1 << 14, off=i)
357 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
358
359 off = 0
360 ct = b""
361 while off < len (pt):
362 upto = min (off + cnksiz, len (pt))
cb7a3911 363 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
364 ct += cnk
365 off += cnksiz
366 cnk, header, fixed = encryptor.done (header_dummy)
367 ct += cnk
368
369 assert len (pt) == len (ct)
370
371 for i in range (5): addobj (i)
372
be124bca
PG
373 assert len (encryptor.fixed) == 1
374
30019abf
PG
375
376 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
377 cnksiz = 1 << 10
378 password = str (os.urandom (42))
379 encryptor = crypto.Encrypt (TEST_VERSION,
380 TEST_PARAMVERSION,
381 password=password,
382 nacl=TEST_STATIC_NACL,
383 strict_ivs=True)
be124bca 384 curfixed = None # must remain constant after first
30019abf
PG
385
386 def addobj (i):
387 pt = fill_mod (1 << 14, off=i)
388 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
389
390 off = 0
391 ct = b""
392 while off < len (pt):
393 upto = min (off + cnksiz, len (pt))
cb7a3911 394 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
395 ct += cnk
396 off += cnksiz
397 cnk, header, fixed = encryptor.done (header_dummy)
be124bca
PG
398 nonlocal curfixed
399 if curfixed is None:
400 curfixed = fixed
401 else:
402 assert fixed == curfixed
30019abf
PG
403 ct += cnk
404
405 assert len (pt) == len (ct)
406
407 for i in range (5): addobj (i)
408
be124bca
PG
409 assert len (encryptor.fixed) == 1
410
30019abf 411
770173c5
PG
412 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
413 """
414 Test behavior when the file counter tops out.
415
416 Artificially lower the maximum possible file counter. Considering
cb7a3911 417 invalid (0) and reserved (1, 2) values, the smallest possible file counter
770173c5
PG
418 for normal objects is 3. Starting from that, the header of the (max -
419 3)rd object must have both a different IV fixed part and a counter.
420 """
421 minimum = 3
422 new_max = 8
cb7a3911
PG
423 crypto._testing_set_AES_GCM_IV_CNT_MAX \
424 ("I am fully aware that this will void my warranty.", new_max)
770173c5
PG
425 cnksiz = 1 << 10
426 password = str (os.urandom (42))
427 encryptor = crypto.Encrypt (TEST_VERSION,
428 TEST_PARAMVERSION,
429 password=password,
430 nacl=TEST_STATIC_NACL,
431 strict_ivs=True)
432
433 last_iv = None
434 last_cnt = minimum
435
436 def addobj (i, wrap=False):
437 nonlocal last_iv
438 nonlocal last_cnt
439 pt = fill_mod (1 << 14, off=i)
440 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
441
442 off = 0
443 ct = b""
444 while off < len (pt):
445 upto = min (off + cnksiz, len (pt))
cb7a3911 446 _, cnk = encryptor.process (pt [off:upto])
770173c5
PG
447 ct += cnk
448 off += cnksiz
449 cnk, header, fixed = encryptor.done (header_dummy)
450 this_iv = crypto.hdr_read (header) ["iv"]
451 if last_iv is not None:
452 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
453 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
454 if wrap is False:
455 assert last_fixed == this_fixed
456 assert last_cnt == this_cnt - 1
457 else:
458 assert last_fixed != this_fixed
459 assert this_cnt == minimum
460 last_iv = this_iv
461 ct += cnk
462
463 assert len (pt) == len (ct)
464
465 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
466 addobj (i + 1, True) # counter wraps to 3
467
468 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
469 addobj (j + 1, True) # counter wraps to 3 again
470
be124bca
PG
471 assert len (encryptor.fixed) == 3
472
473
474 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
475 """
476 Test behavior when the file counter tops out and the transition to
477 the next IV fixed part fails on account of a bad random generator.
478
479 Replaces the ``urandom`` reference in ``os`` with a deterministic
480 function. The encryptor context must communicate this condition with an
481 ``IVFixedPartError``.
482 """
483 minimum = 3
484 new_max = 8
485 crypto._testing_set_AES_GCM_IV_CNT_MAX \
486 ("I am fully aware that this will void my warranty.", new_max)
487 cnksiz = 1 << 10
488 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
489 password = str (os.urandom (42))
490 encryptor = crypto.Encrypt (TEST_VERSION,
491 TEST_PARAMVERSION,
492 password=password,
493 nacl=TEST_STATIC_NACL,
494 strict_ivs=True)
495
496 def addobj (i):
497 pt = fill_mod (1 << 14, off=i)
498 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
499
500 off = 0
501 while off < len (pt):
502 upto = min (off + cnksiz, len (pt))
503 _, cnk = encryptor.process (pt [off:upto])
504 off += cnksiz
505
506 for i in range (minimum, new_max): addobj (42 + i)
507
508 with self.assertRaises (crypto.IVFixedPartError):
509 addobj (42 + i)
510
511
cb7a3911
PG
512
513 def test_crypto_aes_gcm_enc_length_cap (self):
514 """
515 Artificially lower the maximum allowable data length and attempt to
e2f52c53
PG
516 encrypt a larger object. Verify that the crypto handler only encrypts
517 data up to the size limit. A downstream user detects that condition by
518 testing whether the encryption step yielded less bytes than the
519 plaintext.
c7066870
TJ
520
521 The sibling to this test is test_restore_backup_max_file_length()
522 in test_delatar.py. Deltatar will transparently create a splitted object
523 with an increased IV file counter.
cb7a3911
PG
524 """
525 new_max = 2187
526 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
527 ("I am fully aware that this will void my warranty.", new_max)
528 cnksiz = 1 << 10
529 password = str (os.urandom (42))
530 encryptor = crypto.Encrypt (TEST_VERSION,
531 TEST_PARAMVERSION,
532 password=password,
533 nacl=TEST_STATIC_NACL)
534
535 def encobj (s):
536 pt, ct = fill_mod (s), None
537 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
538
539 n, ct = encryptor.process (pt)
540 rest, _, _ = encryptor.done (header_dummy)
366d4b42
PG
541
542 # NB: If this check *ever* fails, then something changed in the
543 # encoding layer. AES-GCM is a stream cipher so each encoding
544 # step will yield the exact number of ciphertext bytes that
545 # was provided as plaintext. Thus there cannot be any encoded
546 # data left when calling the finalizers. None of the crypo code
547 # depends on that assumption but nevertheless we check it here
548 # in case anything changes upstream in the Cryptography
549 # library. In case there actually is a rest, replace the
550 # assertion below with ``ct += rest``.
551 assert (len (rest) == 0)
cb7a3911
PG
552
553 if len (pt) > new_max:
e2f52c53
PG
554 # If the plaintext was longer than the artificially lowered
555 # maximum, then the number of ciphertext bytes must be clamped
556 # to the maximum.
557 assert n == new_max
cb7a3911
PG
558 else:
559 assert n == len (pt) == len (ct)
560
561 for i in range (16): encobj (1 << i)
770173c5
PG
562
563
58ed14b8
PG
564 def test_crypto_aes_gcm_dec_length_cap (self):
565 """
566 The decryptor must reject headers with an object size that exceeds
567 the PDTCRYPT maximum. Longer files split into multiple objects.
568 """
569 password = str (os.urandom (42))
570 meta = faux_hdr()
571 meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
572 ok, header = crypto.hdr_make (meta)
573
574 assert ok
575
576 # Set up decryption with bogus header.
577 decryptor = crypto.Decrypt (password=password, fixedparts=[])
578
579 with self.assertRaises (crypto.InvalidHeader):
580 decryptor.next (header)
581
582
583 def test_crypto_aes_gcm_dec_length_mismatch (self):
584 """
585 Catch attempts at decrypting more data than what was stated in the
586 header.
587 """
588 cnksiz = 1 << 10
589 orig_pt = fill_mod (1 << 14)
590 password = str (os.urandom (42))
591 encryptor = crypto.Encrypt (TEST_VERSION,
592 TEST_PARAMVERSION,
593 password=password,
594 nacl=TEST_STATIC_NACL)
595 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
596
597 off = 0
598 ct = b""
599 while off < len (orig_pt):
600 upto = min (off + cnksiz, len (orig_pt))
601 _n, cnk = encryptor.process (orig_pt [off:upto])
602 ct += cnk
603 off += cnksiz
604 cnk, header, fixed = encryptor.done (header_dummy)
605 ct += cnk
606
607 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
608
609 decryptor.next (header)
610 off = 0
611 pt = b""
612 while off < len (orig_pt):
613 upto = min (off + cnksiz, len (orig_pt))
614 cnk = decryptor.process (ct [off:upto])
615 pt += cnk
616 off += cnksiz
617
618 with self.assertRaises (crypto.CiphertextTooLong):
619 # Try and decrypt one byte more than was encrypted.
620 # This must be caught in crypto.py.
621 _ = decryptor.process (ct [0:1])
622
623
c2d1c3ec 624 def test_crypto_aes_gcm_dec_multicnk (self):
90ee2a74
PG
625 cnksiz = 1 << 10
626 orig_pt = fill_mod (1 << 14)
627 password = str (os.urandom (42))
1f3fd7b0 628 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 629 TEST_PARAMVERSION,
1f3fd7b0 630 password=password,
90ee2a74
PG
631 nacl=TEST_STATIC_NACL)
632 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
c2d1c3ec
PG
633
634 off = 0
635 ct = b""
636 while off < len (orig_pt):
637 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 638 _n, cnk = encryptor.process (orig_pt [off:upto])
c2d1c3ec
PG
639 ct += cnk
640 off += cnksiz
90ee2a74
PG
641 cnk, header, fixed = encryptor.done (header_dummy)
642 ct += cnk
c2d1c3ec 643
1f3fd7b0
PG
644 decryptor = crypto.Decrypt (password=password,
645 fixedparts=fixed)
90ee2a74 646 decryptor.next (header)
c2d1c3ec 647 off = 0
90ee2a74 648 pt = b""
c2d1c3ec
PG
649 while off < len (orig_pt):
650 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 651 cnk = decryptor.process (ct [off:upto])
c2d1c3ec
PG
652 pt += cnk
653 off += cnksiz
c2d1c3ec 654
90ee2a74 655
3ba1441c 656 pt += decryptor.done ()
c2d1c3ec
PG
657 assert pt == orig_pt
658
659
7c32c176 660 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
90ee2a74
PG
661 cnksiz = 1 << 10
662 orig_pt = fill_mod (1 << 14)
663 password = str (os.urandom (42))
1f3fd7b0 664 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 665 TEST_PARAMVERSION,
1f3fd7b0 666 password=password,
90ee2a74
PG
667 nacl=TEST_STATIC_NACL)
668 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
7c32c176
PG
669
670 off = 0
671 ct = b""
672 while off < len (orig_pt):
673 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 674 _n, cnk = encryptor.process (orig_pt [off:upto])
7c32c176
PG
675 ct += cnk
676 off += cnksiz
90ee2a74
PG
677 cnk, header, fixed = encryptor.done (header_dummy)
678 ct += cnk
679
680 mut_header = bytearray (header)
681 mut_header_vw = memoryview (mut_header)
682 # replace one byte in the tag part of the header
683 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
684 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
685 header = bytes (mut_header)
686
1f3fd7b0
PG
687 decryptor = crypto.Decrypt (password=password,
688 fixedparts=fixed)
90ee2a74 689 decryptor.next (header)
7c32c176 690 off = 0
90ee2a74 691 pt = b""
7c32c176
PG
692 while off < len (orig_pt):
693 upto = min (off + cnksiz, len (orig_pt))
90ee2a74 694 cnk = decryptor.process (ct [off:upto])
7c32c176
PG
695 pt += cnk
696 off += cnksiz
697
1c2f7f07 698 with self.assertRaises (crypto.InvalidGCMTag):
3ba1441c 699 _ = decryptor.done ()
da4ea4fa
PG
700
701
2f0b128c
PG
702 def test_crypto_aes_gcm_dec_iv_gap (self):
703 """
704 Encrypt multiple objects using non-consecutive IVs and verify that the
705 decryptor errors out with an exception in strict mode but keeps quiet
706 otherwise.
707 """
708 cnksiz = 1 << 10
709 orig_pt_1 = fill_mod (1 << 10)
710 orig_pt_2 = fill_mod (1 << 10, 23)
711 orig_pt_3 = fill_mod (1 << 10, 42)
712 password = str (os.urandom (42))
713 encryptor = crypto.Encrypt (TEST_VERSION,
714 TEST_PARAMVERSION,
715 password=password,
716 nacl=TEST_STATIC_NACL)
717
718 def enc (pt):
719 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
720
721 off = 0
722 ct = b""
723 while off < len (pt):
724 upto = min (off + cnksiz, len (pt))
725 _n, cnk = encryptor.process (pt [off:upto])
726 ct += cnk
727 off += cnksiz
728 cnk, header, fixed = encryptor.done (header_dummy)
729 return ct + cnk, header, fixed
730
731 ct_1, hdr_1, _____ = enc (orig_pt_1)
732
733 ## Here we bump the iv of the encryptor, breaking the series.
734 encryptor.set_object_counter (encryptor.cnt + 1)
735 ct_2, hdr_2, fixed = enc (orig_pt_2)
736
737 ## IV of final object is again in-sequence.
738 ct_3, hdr_3, fixed = enc (orig_pt_3)
739
740 def decrypt (strict_ivs):
741 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
742 strict_ivs=strict_ivs)
743
744 def dec (hdr, ct):
745 decryptor.next (hdr)
746 off = 0
747 pt = b""
748 while off < len (ct):
749 upto = min (off + cnksiz, len (ct))
750 cnk = decryptor.process (ct [off:upto])
751 pt += cnk
752 off += cnksiz
753 return pt + decryptor.done ()
754
755 decr_pt_1 = dec (hdr_1, ct_1)
756 decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
757 decr_pt_3 = dec (hdr_3, ct_3)
758
759 assert decr_pt_1 == orig_pt_1
760 assert decr_pt_2 == orig_pt_2
761 assert decr_pt_3 == orig_pt_3
762
763 with self.assertRaises (crypto.NonConsecutiveIV):
764 decrypt (True)
765
766 decrypt (False) # Sequence passes
767
768
fd10b44a
PG
769 def test_crypto_aes_gcm_dec_iv_reuse (self):
770 """
771 Meddle with encrypted content: extract the IV from one object
772 and inject it into the header of another. This must be rejected
773 by the decryptor.
774 """
775 cnksiz = 1 << 10
776 orig_pt_1 = fill_mod (1 << 10)
777 orig_pt_2 = fill_mod (1 << 10, 42)
778 password = str (os.urandom (42))
1f3fd7b0 779 encryptor = crypto.Encrypt (TEST_VERSION,
fd10b44a 780 TEST_PARAMVERSION,
1f3fd7b0 781 password=password,
fd10b44a
PG
782 nacl=TEST_STATIC_NACL)
783
784 def enc (pt):
785 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
786
787 off = 0
788 ct = b""
789 while off < len (pt):
790 upto = min (off + cnksiz, len (pt))
cb7a3911 791 _n, cnk = encryptor.process (pt [off:upto])
fd10b44a
PG
792 ct += cnk
793 off += cnksiz
794 cnk, header, fixed = encryptor.done (header_dummy)
795 return ct + cnk, header, fixed
796
797 ct_1, hdr_1, _____ = enc (orig_pt_1)
798 ct_2, hdr_2, fixed = enc (orig_pt_2)
799
800 mut_hdr_2 = bytearray (hdr_2)
801 mut_hdr_2_vw = memoryview (mut_hdr_2)
802 # get IV
803 iv_lo = crypto.HDR_OFF_IV
804 iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
805 iv_1 = hdr_1 [iv_lo : iv_hi]
806 # transplant into other header
807 mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
808 hdr_2_mod = bytes (mut_hdr_2)
1f3fd7b0 809 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
fd10b44a
PG
810 strict_ivs=True)
811
812 def dec (hdr, ct):
813 decryptor.next (hdr)
814 off = 0
815 pt = b""
816 while off < len (ct):
817 upto = min (off + cnksiz, len (ct))
818 cnk = decryptor.process (ct [off:upto])
819 pt += cnk
820 off += cnksiz
821 return pt + decryptor.done ()
822
823 decr_pt_1 = dec (hdr_1, ct_1)
824 decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
1c2f7f07 825 with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
fd10b44a 826 decr_pt_2 = dec (hdr_2_mod, ct_2)
fd10b44a
PG
827
828
cb7397d5
PG
829class HeaderTest (CryptoLayerTest):
830
e2a4e4f0
PG
831 def test_crypto_fmt_hdr_make (self):
832 meta = faux_hdr()
833 ok, hdr = crypto.hdr_make (meta)
834 assert ok
90ee2a74 835 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
e2a4e4f0
PG
836
837
cb7397d5
PG
838 def test_crypto_fmt_hdr_make_useless (self):
839 ok, ret = crypto.hdr_make ({ 42: "x" })
840 assert ok is False
a83fa4ed 841 assert ret.startswith ("error assembling header:")
cb7397d5
PG
842
843
e2a4e4f0
PG
844 def test_crypto_fmt_hdr_read (self):
845 meta = faux_hdr()
846 ok, hdr = crypto.hdr_make (meta)
90ee2a74
PG
847 assert ok is True
848 assert hdr is not None
849 mmeta = crypto.hdr_read (hdr)
850 assert mmeta is not None
e2a4e4f0
PG
851 for k in meta:
852 if meta [k] != mmeta [k]:
853 raise "header mismatch after reading: expected %r, got %r" \
854 % (meta [k], mmeta [k])
855
856
cb7397d5
PG
857 def test_crypto_fmt_hdr_read_trailing_garbage (self):
858 meta = faux_hdr()
859 ok, hdr = crypto.hdr_make (meta)
90ee2a74
PG
860 ok, hdr = crypto.hdr_make (meta)
861 assert ok is True
862 assert hdr is not None
cb7397d5 863 hdr += b"-junk"
1c2f7f07 864 with self.assertRaises (crypto.InvalidHeader):
90ee2a74 865 _ = crypto.hdr_read (hdr)
cb7397d5
PG
866
867
868 def test_crypto_fmt_hdr_read_leading_garbage (self):
869 meta = faux_hdr()
870 ok, hdr = crypto.hdr_make (meta)
90ee2a74
PG
871 ok, hdr = crypto.hdr_make (meta)
872 assert ok is True
873 assert hdr is not None
cb7397d5 874 hdr = b"junk-" + hdr
1c2f7f07 875 with self.assertRaises (crypto.InvalidHeader):
90ee2a74 876 _ = crypto.hdr_read (hdr)
cb7397d5
PG
877
878
879 def test_crypto_fmt_hdr_inner_garbage (self):
880 meta = faux_hdr()
881 ok, hdr = crypto.hdr_make (meta)
c176405d 882 assert ok
90ee2a74 883 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
1c2f7f07 884 with self.assertRaises (crypto.InvalidHeader):
90ee2a74 885 _ = crypto.hdr_read (data)
c176405d 886