clarify possible IV reuse with multiple Encrypt handles
[python-delta-tar] / testing / test_crypto.py
CommitLineData
5133232d
PG
1import binascii
2import os
3import pylibscrypt
c2d1c3ec 4import struct
5133232d
PG
5import unittest
6
7import deltatar.crypto as crypto
8
e2a4e4f0
PG
9import cryptography
10
5133232d
PG
11def b(s):
12 return s.encode("UTF-8")
13
90ee2a74 14CRYPTO_NACL_SIZE = 16
5133232d 15CRYPTO_KEY_SIZE = 16
90ee2a74
PG
16
17TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
18TEST_PASSPHRASE = b"test1234"
19TEST_AES_GCM_AAD = b"authenticated plain text"
20TEST_DUMMY_FILENAME = "insurance-file.txt"
21TEST_VERSION = 1
22TEST_PARAMVERSION = 1
23TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
6110ef14 24PLAIN_PARAMVERSION = 0
5133232d
PG
25
26def faux_hdr (ctsize=1337, iv=None):
27 return \
28 { "version" : 42
29 , "paramversion" : 2187
30 , "nacl" : binascii.unhexlify(b"0011223344556677"
31 b"8899aabbccddeeff")
32 , "iv" : iv or binascii.unhexlify(b"0011223344556677"
33 b"8899aabb")
34 , "ctsize" : ctsize
90ee2a74
PG
35 , "tag" : binascii.unhexlify(b"deadbeefbadb100d"
36 b"b1eedc0ffeedea15")
5133232d
PG
37 }
38
30019abf 39FILL_MOD_MEMO = { }
5133232d 40
fd10b44a 41def fill_mod (n, off=0):
30019abf
PG
42 global FILL_MOD_MEMO
43 k = (n, off)
44 m = FILL_MOD_MEMO.get (k, None)
45 if m is not None:
46 return m
c2d1c3ec
PG
47 buf = bytearray (n)
48 bufv = memoryview (buf)
49 for i in range (n):
fd10b44a
PG
50 off += 1
51 c = off % 64 + 32
c2d1c3ec 52 struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
30019abf
PG
53 m = bytes (buf)
54 FILL_MOD_MEMO [k] = m
55 return m
c2d1c3ec
PG
56
57
5133232d
PG
58def faux_payload ():
59 return "abcd" * 42
60
cb7397d5 61
5133232d 62class CryptoLayerTest (unittest.TestCase):
cb7397d5
PG
63 pass
64
65
66class AESGCMTest (CryptoLayerTest):
5133232d 67
be124bca
PG
68 os_urandom = os.urandom
69
cb7a3911
PG
70 def tearDown (self):
71 """Reset globals altered for testing."""
72 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73 ("I am fully aware that this will void my warranty.")
74 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75 ("I am fully aware that this will void my warranty.")
be124bca 76 os.urandom = self.os_urandom
cb7a3911 77
90ee2a74
PG
78 def test_crypto_aes_gcm_enc_ctor (self):
79 password = str (os.urandom (42))
1f3fd7b0
PG
80 encryptor = crypto.Encrypt (TEST_VERSION,
81 TEST_PARAMVERSION,
82 password=password,
83 nacl=TEST_STATIC_NACL)
84
6110ef14
PG
85 def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86 """Refuse plaintext passthrough mode by default."""
87 password = str (os.urandom (42))
88 with self.assertRaises (crypto.InvalidParameter):
89 encryptor = crypto.Encrypt (TEST_VERSION,
90 PLAIN_PARAMVERSION,
91 password=password,
92 nacl=TEST_STATIC_NACL)
93
94
95 def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96 """
97 Comply with request for plaintext passthrough mode if the
98 *insecure* flag is passed.
99 """
100 password = str (os.urandom (42))
101 encryptor = crypto.Encrypt (TEST_VERSION,
102 PLAIN_PARAMVERSION,
103 password=password,
104 nacl=TEST_STATIC_NACL,
105 insecure=True)
106
1f3fd7b0
PG
107
108 def test_crypto_aes_gcm_enc_ctor_key (self):
109 key = os.urandom (42)
110 encryptor = crypto.Encrypt (TEST_VERSION,
111 TEST_PARAMVERSION,
112 key=key,
113 nacl=TEST_STATIC_NACL)
114
115
116 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117 """
118 Either key (+nacl) or password must be supplied, not both.
119 """
1c2f7f07 120 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
1f3fd7b0
PG
121 encryptor = crypto.Encrypt (TEST_VERSION,
122 TEST_PARAMVERSION,
123 nacl=TEST_STATIC_NACL)
1f3fd7b0
PG
124
125 password = str (os.urandom (42))
126 key = os.urandom (16) # scrypt sized
1c2f7f07 127 with self.assertRaises (crypto.InvalidParameter): # both key and pw
1f3fd7b0
PG
128 encryptor = crypto.Encrypt (TEST_VERSION,
129 TEST_PARAMVERSION,
130 password=password,
131 key=key,
132 nacl=TEST_STATIC_NACL)
1f3fd7b0 133
1c2f7f07 134 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
1f3fd7b0
PG
135 encryptor = crypto.Encrypt (TEST_VERSION,
136 TEST_PARAMVERSION,
137 key=key,
138 nacl=None)
1f3fd7b0 139
1c2f7f07 140 with self.assertRaises (crypto.InvalidParameter): # empty pw
1f3fd7b0
PG
141 encryptor = crypto.Encrypt (TEST_VERSION,
142 TEST_PARAMVERSION,
143 password=b"",
144 nacl=TEST_STATIC_NACL)
90ee2a74
PG
145
146
147 def test_crypto_aes_gcm_enc_header_size (self):
148 password = str (os.urandom (42))
1f3fd7b0 149 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 150 TEST_PARAMVERSION,
1f3fd7b0 151 password=password,
90ee2a74
PG
152 nacl=TEST_STATIC_NACL)
153
154 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
155 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
cb7a3911 156 _, _ = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
157 _, header, _ = encryptor.done (header_dummy)
158 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
5133232d 159
e2a4e4f0 160
66b1c6f4
PG
161 def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
162 """
163 Access the list of IVs used during encryption and check reuse. Start
164 from explicit counters so the inputs don’t overlap. IVs must not have
165 been reused.
166 """
167
168 def enc (start_count, data):
169 password = str (os.urandom (42))
170 encryptor = crypto.Encrypt (TEST_VERSION,
171 TEST_PARAMVERSION,
172 password=password,
173 nacl=TEST_STATIC_NACL,
174 counter=start_count)
175
176 for i, blob in enumerate (data, 1):
177 fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
178 header_dummy = encryptor.next (fname)
179 _, _ = encryptor.process (blob)
180 _, header, _ = encryptor.done (header_dummy)
181 assert len (encryptor.get_used_ivs ()) == i
182
183 return encryptor.get_used_ivs ()
184
185 ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
186 ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])
187
188 # No reuse in general.
189 assert len (ivs1 & ivs2) == 0
190
191 ivs1 = list (ivs1)
192 ivs2 = list (ivs2)
193
194 # Counters of used IVs must match what we passed explicitly.
195 def extract_counters (ivs):
196 def getcount (iv):
197 _, cnt = struct.unpack (crypto.FMT_I2N_IV, iv)
198 return cnt
199 return list (map (getcount, ivs))
200
201 cnt1 = extract_counters (ivs1)
202 cnt2 = extract_counters (ivs2)
203
204 assert 0x0042 in cnt1
205 assert 0x0043 in cnt1
206
207 assert 0x1337 in cnt2
208 assert 0x1338 in cnt2
209
210
5133232d 211 def test_crypto_aes_gcm_enc_chunk_size (self):
90ee2a74 212 password = str (os.urandom (42))
1f3fd7b0 213 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 214 TEST_PARAMVERSION,
1f3fd7b0 215 password=password,
90ee2a74
PG
216 nacl=TEST_STATIC_NACL)
217
218 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 219 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
220 assert len (ciphertext) == len (TEST_PLAINTEXT)
221 rest, header, fixed = encryptor.done (header_dummy)
222 assert len (rest) == 0
5133232d 223
e2a4e4f0 224
1f3fd7b0
PG
225 def test_crypto_aes_gcm_dec_ctor (self):
226 """
227 Ensure that only either key or password is accepted.
228 """
229 password = str (os.urandom (42))
230 key = os.urandom (16) # scrypt sized
231
232 decryptor = crypto.Decrypt (password=password)
233 decryptor = crypto.Decrypt (key=key)
234
1c2f7f07 235 with self.assertRaises (crypto.InvalidParameter): # both password and key
1f3fd7b0 236 decryptor = crypto.Decrypt (password=password, key=key)
1f3fd7b0 237
1c2f7f07 238 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
1f3fd7b0 239 decryptor = crypto.Decrypt (password=None, key=None)
1f3fd7b0 240
1c2f7f07 241 with self.assertRaises (crypto.InvalidParameter): # # empty password
1f3fd7b0 242 decryptor = crypto.Decrypt (password="")
1f3fd7b0 243
30019abf 244
e2a4e4f0 245 def test_crypto_aes_gcm_dec_simple (self):
90ee2a74 246 password = str (os.urandom (42))
1f3fd7b0 247 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 248 TEST_PARAMVERSION,
1f3fd7b0 249 password=password,
90ee2a74
PG
250 nacl=TEST_STATIC_NACL)
251
252 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 253 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
254 rest, header, fixed = encryptor.done (header_dummy)
255 ciphertext += rest
256
1f3fd7b0 257 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
90ee2a74
PG
258 decryptor.next (header)
259 plaintext = decryptor.process (ciphertext)
3ba1441c 260 rest = decryptor.done ()
90ee2a74
PG
261 plaintext += rest
262
90ee2a74 263 assert plaintext == TEST_PLAINTEXT
7c32c176
PG
264
265
6110ef14
PG
266 def test_crypto_aes_gcm_dec_plain_bad (self):
267 """
268 Downgrade to plaintext must not be allowed in parameters
269 obtained from headers.
270 """
271 password = str (os.urandom (42))
272 encryptor = crypto.Encrypt (TEST_VERSION,
273 TEST_PARAMVERSION,
274 password=password,
275 nacl=TEST_STATIC_NACL)
276
277 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
278 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
279 rest, header, fixed = encryptor.done (header_dummy)
280 ciphertext += rest
281
282 header = crypto.hdr_read (header)
283 header ["paramversion"] = PLAIN_PARAMVERSION
284 ok, header = crypto.hdr_make (header)
285 assert ok
286
287 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
288 with self.assertRaises (crypto.InvalidParameter):
289 decryptor.next (header)
290
291
292 def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
293 """
294 Allow plaintext crypto mode if *insecure* flag is passed.
295 """
296 password = str (os.urandom (42))
297 encryptor = crypto.Encrypt (TEST_VERSION,
298 PLAIN_PARAMVERSION,
299 password=password,
300 nacl=TEST_STATIC_NACL,
301 insecure=True)
302
303 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
304 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
305 rest, header, fixed = encryptor.done (header_dummy)
306 ciphertext += rest
307
308 header = crypto.hdr_read (header)
309 header ["paramversion"] = PLAIN_PARAMVERSION
310 ok, header = crypto.hdr_make (header)
311 assert ok
312
313 decryptor = crypto.Decrypt (password=password,
314 fixedparts=fixed,
315 insecure=True)
316 decryptor.next (header)
317 plaintext = decryptor.process (ciphertext)
318 rest = decryptor.done ()
319 plaintext += rest
320
321 assert plaintext == TEST_PLAINTEXT
322
323
e2a4e4f0 324 def test_crypto_aes_gcm_dec_bad_tag (self):
90ee2a74 325 password = str (os.urandom (42))
1f3fd7b0 326 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 327 TEST_PARAMVERSION,
1f3fd7b0 328 password=password,
90ee2a74
PG
329 nacl=TEST_STATIC_NACL)
330
331 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 332 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
333 ciphertext2, header, fixed = encryptor.done (header_dummy)
334
335 mut_header = bytearray (header)
336 mut_header_vw = memoryview (mut_header)
337 # replace one byte in the tag part of the header
338 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
339 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
340 header = bytes (mut_header)
341
1f3fd7b0 342 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
90ee2a74
PG
343 decryptor.next (header)
344 plaintext = decryptor.process (ciphertext)
1c2f7f07 345 with self.assertRaises (crypto.InvalidGCMTag):
3ba1441c 346 _ = decryptor.done ()
e2a4e4f0
PG
347
348
c2d1c3ec
PG
349 def test_crypto_aes_gcm_enc_multicnk (self):
350 cnksiz = 1 << 10
90ee2a74
PG
351 pt = fill_mod (1 << 14)
352 password = str (os.urandom (42))
1f3fd7b0 353 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 354 TEST_PARAMVERSION,
1f3fd7b0 355 password=password,
90ee2a74
PG
356 nacl=TEST_STATIC_NACL)
357 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
c2d1c3ec
PG
358
359 off = 0
360 ct = b""
90ee2a74
PG
361 while off < len (pt):
362 upto = min (off + cnksiz, len (pt))
cb7a3911 363 _, cnk = encryptor.process (pt [off:upto])
c2d1c3ec
PG
364 ct += cnk
365 off += cnksiz
90ee2a74
PG
366 cnk, header, fixed = encryptor.done (header_dummy)
367 ct += cnk
368
369 assert len (pt) == len (ct)
c2d1c3ec
PG
370
371
30019abf
PG
372 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
373 cnksiz = 1 << 10
374 pt = fill_mod (1 << 14)
375 password = str (os.urandom (42))
376 encryptor = crypto.Encrypt (TEST_VERSION,
377 TEST_PARAMVERSION,
378 password=password,
379 nacl=TEST_STATIC_NACL,
380 strict_ivs=True)
381 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
382
383 off = 0
384 ct = b""
385 while off < len (pt):
386 upto = min (off + cnksiz, len (pt))
cb7a3911 387 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
388 ct += cnk
389 off += cnksiz
390 cnk, header, fixed = encryptor.done (header_dummy)
391 ct += cnk
392
393 assert len (pt) == len (ct)
394
395
396 def test_crypto_aes_gcm_enc_multiobj (self):
397 cnksiz = 1 << 10
398 password = str (os.urandom (42))
399 encryptor = crypto.Encrypt (TEST_VERSION,
400 TEST_PARAMVERSION,
401 password=password,
402 nacl=TEST_STATIC_NACL,
403 strict_ivs=False)
404
405 def addobj (i):
406 pt = fill_mod (1 << 14, off=i)
407 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
408
409 off = 0
410 ct = b""
411 while off < len (pt):
412 upto = min (off + cnksiz, len (pt))
cb7a3911 413 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
414 ct += cnk
415 off += cnksiz
416 cnk, header, fixed = encryptor.done (header_dummy)
417 ct += cnk
418
419 assert len (pt) == len (ct)
420
421 for i in range (5): addobj (i)
422
be124bca
PG
423 assert len (encryptor.fixed) == 1
424
30019abf
PG
425
426 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
427 cnksiz = 1 << 10
428 password = str (os.urandom (42))
429 encryptor = crypto.Encrypt (TEST_VERSION,
430 TEST_PARAMVERSION,
431 password=password,
432 nacl=TEST_STATIC_NACL,
433 strict_ivs=True)
be124bca 434 curfixed = None # must remain constant after first
30019abf
PG
435
436 def addobj (i):
437 pt = fill_mod (1 << 14, off=i)
438 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
439
440 off = 0
441 ct = b""
442 while off < len (pt):
443 upto = min (off + cnksiz, len (pt))
cb7a3911 444 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
445 ct += cnk
446 off += cnksiz
447 cnk, header, fixed = encryptor.done (header_dummy)
be124bca
PG
448 nonlocal curfixed
449 if curfixed is None:
450 curfixed = fixed
451 else:
452 assert fixed == curfixed
30019abf
PG
453 ct += cnk
454
455 assert len (pt) == len (ct)
456
457 for i in range (5): addobj (i)
458
be124bca
PG
459 assert len (encryptor.fixed) == 1
460
30019abf 461
770173c5
PG
462 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
463 """
464 Test behavior when the file counter tops out.
465
466 Artificially lower the maximum possible file counter. Considering
cb7a3911 467 invalid (0) and reserved (1, 2) values, the smallest possible file counter
770173c5
PG
468 for normal objects is 3. Starting from that, the header of the (max -
469 3)rd object must have both a different IV fixed part and a counter.
470 """
471 minimum = 3
472 new_max = 8
cb7a3911
PG
473 crypto._testing_set_AES_GCM_IV_CNT_MAX \
474 ("I am fully aware that this will void my warranty.", new_max)
770173c5
PG
475 cnksiz = 1 << 10
476 password = str (os.urandom (42))
477 encryptor = crypto.Encrypt (TEST_VERSION,
478 TEST_PARAMVERSION,
479 password=password,
480 nacl=TEST_STATIC_NACL,
481 strict_ivs=True)
482
483 last_iv = None
484 last_cnt = minimum
485
486 def addobj (i, wrap=False):
487 nonlocal last_iv
488 nonlocal last_cnt
489 pt = fill_mod (1 << 14, off=i)
490 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
491
492 off = 0
493 ct = b""
494 while off < len (pt):
495 upto = min (off + cnksiz, len (pt))
cb7a3911 496 _, cnk = encryptor.process (pt [off:upto])
770173c5
PG
497 ct += cnk
498 off += cnksiz
499 cnk, header, fixed = encryptor.done (header_dummy)
500 this_iv = crypto.hdr_read (header) ["iv"]
501 if last_iv is not None:
502 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
503 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
504 if wrap is False:
505 assert last_fixed == this_fixed
506 assert last_cnt == this_cnt - 1
507 else:
508 assert last_fixed != this_fixed
509 assert this_cnt == minimum
510 last_iv = this_iv
511 ct += cnk
512
513 assert len (pt) == len (ct)
514
515 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
516 addobj (i + 1, True) # counter wraps to 3
517
518 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
519 addobj (j + 1, True) # counter wraps to 3 again
520
be124bca
PG
521 assert len (encryptor.fixed) == 3
522
523
524 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
525 """
526 Test behavior when the file counter tops out and the transition to
527 the next IV fixed part fails on account of a bad random generator.
528
529 Replaces the ``urandom`` reference in ``os`` with a deterministic
530 function. The encryptor context must communicate this condition with an
531 ``IVFixedPartError``.
532 """
533 minimum = 3
534 new_max = 8
535 crypto._testing_set_AES_GCM_IV_CNT_MAX \
536 ("I am fully aware that this will void my warranty.", new_max)
537 cnksiz = 1 << 10
538 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
539 password = str (os.urandom (42))
540 encryptor = crypto.Encrypt (TEST_VERSION,
541 TEST_PARAMVERSION,
542 password=password,
543 nacl=TEST_STATIC_NACL,
544 strict_ivs=True)
545
546 def addobj (i):
547 pt = fill_mod (1 << 14, off=i)
548 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
549
550 off = 0
551 while off < len (pt):
552 upto = min (off + cnksiz, len (pt))
553 _, cnk = encryptor.process (pt [off:upto])
554 off += cnksiz
555
556 for i in range (minimum, new_max): addobj (42 + i)
557
558 with self.assertRaises (crypto.IVFixedPartError):
559 addobj (42 + i)
560
561
cb7a3911
PG
562
563 def test_crypto_aes_gcm_enc_length_cap (self):
564 """
565 Artificially lower the maximum allowable data length and attempt to
e2f52c53
PG
566 encrypt a larger object. Verify that the crypto handler only encrypts
567 data up to the size limit. A downstream user detects that condition by
568 testing whether the encryption step yielded less bytes than the
569 plaintext.
c7066870
TJ
570
571 The sibling to this test is test_restore_backup_max_file_length()
572 in test_delatar.py. Deltatar will transparently create a splitted object
573 with an increased IV file counter.
cb7a3911
PG
574 """
575 new_max = 2187
576 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
577 ("I am fully aware that this will void my warranty.", new_max)
578 cnksiz = 1 << 10
579 password = str (os.urandom (42))
580 encryptor = crypto.Encrypt (TEST_VERSION,
581 TEST_PARAMVERSION,
582 password=password,
583 nacl=TEST_STATIC_NACL)
584
585 def encobj (s):
586 pt, ct = fill_mod (s), None
587 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
588
589 n, ct = encryptor.process (pt)
590 rest, _, _ = encryptor.done (header_dummy)
366d4b42
PG
591
592 # NB: If this check *ever* fails, then something changed in the
593 # encoding layer. AES-GCM is a stream cipher so each encoding
594 # step will yield the exact number of ciphertext bytes that
595 # was provided as plaintext. Thus there cannot be any encoded
596 # data left when calling the finalizers. None of the crypo code
597 # depends on that assumption but nevertheless we check it here
598 # in case anything changes upstream in the Cryptography
599 # library. In case there actually is a rest, replace the
600 # assertion below with ``ct += rest``.
601 assert (len (rest) == 0)
cb7a3911
PG
602
603 if len (pt) > new_max:
e2f52c53
PG
604 # If the plaintext was longer than the artificially lowered
605 # maximum, then the number of ciphertext bytes must be clamped
606 # to the maximum.
607 assert n == new_max
cb7a3911
PG
608 else:
609 assert n == len (pt) == len (ct)
610
611 for i in range (16): encobj (1 << i)
770173c5
PG
612
613
58ed14b8
PG
614 def test_crypto_aes_gcm_dec_length_cap (self):
615 """
616 The decryptor must reject headers with an object size that exceeds
617 the PDTCRYPT maximum. Longer files split into multiple objects.
618 """
619 password = str (os.urandom (42))
620 meta = faux_hdr()
621 meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
622 ok, header = crypto.hdr_make (meta)
623
624 assert ok
625
626 # Set up decryption with bogus header.
627 decryptor = crypto.Decrypt (password=password, fixedparts=[])
628
629 with self.assertRaises (crypto.InvalidHeader):
630 decryptor.next (header)
631
632
633 def test_crypto_aes_gcm_dec_length_mismatch (self):
634 """
635 Catch attempts at decrypting more data than what was stated in the
636 header.
637 """
638 cnksiz = 1 << 10
639 orig_pt = fill_mod (1 << 14)
640 password = str (os.urandom (42))
641 encryptor = crypto.Encrypt (TEST_VERSION,
642 TEST_PARAMVERSION,
643 password=password,
644 nacl=TEST_STATIC_NACL)
645 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
646
647 off = 0
648 ct = b""
649 while off < len (orig_pt):
650 upto = min (off + cnksiz, len (orig_pt))
651 _n, cnk = encryptor.process (orig_pt [off:upto])
652 ct += cnk
653 off += cnksiz
654 cnk, header, fixed = encryptor.done (header_dummy)
655 ct += cnk
656
657 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
658
659 decryptor.next (header)
660 off = 0
661 pt = b""
662 while off < len (orig_pt):
663 upto = min (off + cnksiz, len (orig_pt))
664 cnk = decryptor.process (ct [off:upto])
665 pt += cnk
666 off += cnksiz
667
668 with self.assertRaises (crypto.CiphertextTooLong):
669 # Try and decrypt one byte more than was encrypted.
670 # This must be caught in crypto.py.
671 _ = decryptor.process (ct [0:1])
672
673
c2d1c3ec 674 def test_crypto_aes_gcm_dec_multicnk (self):
90ee2a74
PG
675 cnksiz = 1 << 10
676 orig_pt = fill_mod (1 << 14)
677 password = str (os.urandom (42))
1f3fd7b0 678 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 679 TEST_PARAMVERSION,
1f3fd7b0 680 password=password,
90ee2a74
PG
681 nacl=TEST_STATIC_NACL)
682 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
c2d1c3ec
PG
683
684 off = 0
685 ct = b""
686 while off < len (orig_pt):
687 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 688 _n, cnk = encryptor.process (orig_pt [off:upto])
c2d1c3ec
PG
689 ct += cnk
690 off += cnksiz
90ee2a74
PG
691 cnk, header, fixed = encryptor.done (header_dummy)
692 ct += cnk
c2d1c3ec 693
1f3fd7b0
PG
694 decryptor = crypto.Decrypt (password=password,
695 fixedparts=fixed)
90ee2a74 696 decryptor.next (header)
c2d1c3ec 697 off = 0
90ee2a74 698 pt = b""
c2d1c3ec
PG
699 while off < len (orig_pt):
700 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 701 cnk = decryptor.process (ct [off:upto])
c2d1c3ec
PG
702 pt += cnk
703 off += cnksiz
c2d1c3ec 704
90ee2a74 705
3ba1441c 706 pt += decryptor.done ()
c2d1c3ec
PG
707 assert pt == orig_pt
708
709
7c32c176 710 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
90ee2a74
PG
711 cnksiz = 1 << 10
712 orig_pt = fill_mod (1 << 14)
713 password = str (os.urandom (42))
1f3fd7b0 714 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 715 TEST_PARAMVERSION,
1f3fd7b0 716 password=password,
90ee2a74
PG
717 nacl=TEST_STATIC_NACL)
718 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
7c32c176
PG
719
720 off = 0
721 ct = b""
722 while off < len (orig_pt):
723 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 724 _n, cnk = encryptor.process (orig_pt [off:upto])
7c32c176
PG
725 ct += cnk
726 off += cnksiz
90ee2a74
PG
727 cnk, header, fixed = encryptor.done (header_dummy)
728 ct += cnk
729
730 mut_header = bytearray (header)
731 mut_header_vw = memoryview (mut_header)
732 # replace one byte in the tag part of the header
733 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
734 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
735 header = bytes (mut_header)
736
1f3fd7b0
PG
737 decryptor = crypto.Decrypt (password=password,
738 fixedparts=fixed)
90ee2a74 739 decryptor.next (header)
7c32c176 740 off = 0
90ee2a74 741 pt = b""
7c32c176
PG
742 while off < len (orig_pt):
743 upto = min (off + cnksiz, len (orig_pt))
90ee2a74 744 cnk = decryptor.process (ct [off:upto])
7c32c176
PG
745 pt += cnk
746 off += cnksiz
747
1c2f7f07 748 with self.assertRaises (crypto.InvalidGCMTag):
3ba1441c 749 _ = decryptor.done ()
da4ea4fa
PG
750
751
2f0b128c
PG
752 def test_crypto_aes_gcm_dec_iv_gap (self):
753 """
754 Encrypt multiple objects using non-consecutive IVs and verify that the
755 decryptor errors out with an exception in strict mode but keeps quiet
756 otherwise.
757 """
758 cnksiz = 1 << 10
759 orig_pt_1 = fill_mod (1 << 10)
760 orig_pt_2 = fill_mod (1 << 10, 23)
761 orig_pt_3 = fill_mod (1 << 10, 42)
762 password = str (os.urandom (42))
763 encryptor = crypto.Encrypt (TEST_VERSION,
764 TEST_PARAMVERSION,
765 password=password,
766 nacl=TEST_STATIC_NACL)
767
768 def enc (pt):
769 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
770
771 off = 0
772 ct = b""
773 while off < len (pt):
774 upto = min (off + cnksiz, len (pt))
775 _n, cnk = encryptor.process (pt [off:upto])
776 ct += cnk
777 off += cnksiz
778 cnk, header, fixed = encryptor.done (header_dummy)
779 return ct + cnk, header, fixed
780
781 ct_1, hdr_1, _____ = enc (orig_pt_1)
782
783 ## Here we bump the iv of the encryptor, breaking the series.
784 encryptor.set_object_counter (encryptor.cnt + 1)
785 ct_2, hdr_2, fixed = enc (orig_pt_2)
786
787 ## IV of final object is again in-sequence.
788 ct_3, hdr_3, fixed = enc (orig_pt_3)
789
790 def decrypt (strict_ivs):
791 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
792 strict_ivs=strict_ivs)
793
794 def dec (hdr, ct):
795 decryptor.next (hdr)
796 off = 0
797 pt = b""
798 while off < len (ct):
799 upto = min (off + cnksiz, len (ct))
800 cnk = decryptor.process (ct [off:upto])
801 pt += cnk
802 off += cnksiz
803 return pt + decryptor.done ()
804
805 decr_pt_1 = dec (hdr_1, ct_1)
806 decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
807 decr_pt_3 = dec (hdr_3, ct_3)
808
809 assert decr_pt_1 == orig_pt_1
810 assert decr_pt_2 == orig_pt_2
811 assert decr_pt_3 == orig_pt_3
812
813 with self.assertRaises (crypto.NonConsecutiveIV):
814 decrypt (True)
815
816 decrypt (False) # Sequence passes
817
818
fd10b44a
PG
819 def test_crypto_aes_gcm_dec_iv_reuse (self):
820 """
821 Meddle with encrypted content: extract the IV from one object
822 and inject it into the header of another. This must be rejected
823 by the decryptor.
824 """
825 cnksiz = 1 << 10
826 orig_pt_1 = fill_mod (1 << 10)
827 orig_pt_2 = fill_mod (1 << 10, 42)
828 password = str (os.urandom (42))
1f3fd7b0 829 encryptor = crypto.Encrypt (TEST_VERSION,
fd10b44a 830 TEST_PARAMVERSION,
1f3fd7b0 831 password=password,
fd10b44a
PG
832 nacl=TEST_STATIC_NACL)
833
834 def enc (pt):
835 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
836
837 off = 0
838 ct = b""
839 while off < len (pt):
840 upto = min (off + cnksiz, len (pt))
cb7a3911 841 _n, cnk = encryptor.process (pt [off:upto])
fd10b44a
PG
842 ct += cnk
843 off += cnksiz
844 cnk, header, fixed = encryptor.done (header_dummy)
845 return ct + cnk, header, fixed
846
847 ct_1, hdr_1, _____ = enc (orig_pt_1)
66b1c6f4 848 encryptor.cnt -= 1 # induce error by forcing an identical IV on next step
fd10b44a 849
66b1c6f4
PG
850 with self.assertRaises (crypto.DuplicateIV): # reuse detected
851 ct_2, hdr_2, fixed = enc (orig_pt_2)
fd10b44a
PG
852
853
cb7397d5
PG
854class HeaderTest (CryptoLayerTest):
855
e2a4e4f0
PG
856 def test_crypto_fmt_hdr_make (self):
857 meta = faux_hdr()
858 ok, hdr = crypto.hdr_make (meta)
859 assert ok
90ee2a74 860 assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
e2a4e4f0
PG
861
862
cb7397d5
PG
863 def test_crypto_fmt_hdr_make_useless (self):
864 ok, ret = crypto.hdr_make ({ 42: "x" })
865 assert ok is False
a83fa4ed 866 assert ret.startswith ("error assembling header:")
cb7397d5
PG
867
868
e2a4e4f0
PG
869 def test_crypto_fmt_hdr_read (self):
870 meta = faux_hdr()
871 ok, hdr = crypto.hdr_make (meta)
90ee2a74
PG
872 assert ok is True
873 assert hdr is not None
874 mmeta = crypto.hdr_read (hdr)
875 assert mmeta is not None
e2a4e4f0
PG
876 for k in meta:
877 if meta [k] != mmeta [k]:
878 raise "header mismatch after reading: expected %r, got %r" \
879 % (meta [k], mmeta [k])
880
881
cb7397d5
PG
882 def test_crypto_fmt_hdr_read_trailing_garbage (self):
883 meta = faux_hdr()
884 ok, hdr = crypto.hdr_make (meta)
90ee2a74
PG
885 ok, hdr = crypto.hdr_make (meta)
886 assert ok is True
887 assert hdr is not None
cb7397d5 888 hdr += b"-junk"
1c2f7f07 889 with self.assertRaises (crypto.InvalidHeader):
90ee2a74 890 _ = crypto.hdr_read (hdr)
cb7397d5
PG
891
892
893 def test_crypto_fmt_hdr_read_leading_garbage (self):
894 meta = faux_hdr()
895 ok, hdr = crypto.hdr_make (meta)
90ee2a74
PG
896 ok, hdr = crypto.hdr_make (meta)
897 assert ok is True
898 assert hdr is not None
cb7397d5 899 hdr = b"junk-" + hdr
1c2f7f07 900 with self.assertRaises (crypto.InvalidHeader):
90ee2a74 901 _ = crypto.hdr_read (hdr)
cb7397d5
PG
902
903
904 def test_crypto_fmt_hdr_inner_garbage (self):
905 meta = faux_hdr()
906 ok, hdr = crypto.hdr_make (meta)
c176405d 907 assert ok
90ee2a74 908 data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
1c2f7f07 909 with self.assertRaises (crypto.InvalidHeader):
90ee2a74 910 _ = crypto.hdr_read (data)
c176405d 911