Merge branch 'crypto-review'
[python-delta-tar] / testing / test_crypto.py
CommitLineData
5133232d
PG
1import binascii
2import os
3import pylibscrypt
c2d1c3ec 4import struct
5133232d
PG
5import unittest
6
7import deltatar.crypto as crypto
8
e2a4e4f0
PG
9import cryptography
10
5133232d
PG
def b(s):
    """Return the text *s* encoded as UTF-8 bytes."""
    encoded = s.encode("UTF-8")
    return encoded
13
# Length of the random salt generated below.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE = 16

# Fixtures shared by the test cases in this module.
TEST_PLAINTEXT = "gentlemen don’t read each other’s mail".encode("UTF-8")
TEST_PASSPHRASE = b"test1234"
TEST_AES_GCM_AAD = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"
TEST_VERSION = 1
TEST_PARAMVERSION = 1
# A fresh random salt is drawn once per import of this module.
TEST_STATIC_NACL = os.urandom(CRYPTO_NACL_SIZE)
# Parameter version that the tests use to request plaintext passthrough.
PLAIN_PARAMVERSION = 0
5133232d
PG
25
def faux_hdr(ctsize=1337, iv=None):
    """
    Build a dictionary of made-up header fields for the header tests.

    :param ctsize: value stored under ``"ctsize"``.
    :param iv:     value stored under ``"iv"``; any falsy value (``None``,
                   ``b""``) selects a canned twelve byte IV instead.
    :returns:      dict with the keys ``version``, ``paramversion``,
                   ``nacl``, ``iv``, ``ctsize`` and ``tag``.
    """
    if not iv:
        iv = bytes.fromhex("0011223344556677" "8899aabb")

    fields = {}
    fields["version"] = 42
    fields["paramversion"] = 2187
    fields["nacl"] = bytes.fromhex("0011223344556677" "8899aabbccddeeff")
    fields["iv"] = iv
    fields["ctsize"] = ctsize
    fields["tag"] = bytes.fromhex("deadbeefbadb100d" "b1eedc0ffeedea15")
    return fields
38
# Cache of generated filler patterns, keyed by ``(n, off)``.
FILL_MOD_MEMO = {}


def fill_mod(n, off=0):
    """
    Return *n* bytes of deterministic, printable ASCII filler.

    The byte at (zero based) position ``i`` has the value
    ``(off + i + 1) % 64 + 32``, i.e. the pattern cycles through the codes
    32..95. Results are memoized in ``FILL_MOD_MEMO`` so repeated requests
    for the same ``(n, off)`` return the very same ``bytes`` object.

    :param n:   number of bytes to produce.
    :param off: offset added to the position before taking the modulus;
                shifts where in the cycle the pattern starts.
    :returns:   ``bytes`` of length *n*.
    """
    k = (n, off)
    m = FILL_MOD_MEMO.get(k)
    if m is None:
        # One pass in C instead of packing the buffer byte by byte.
        m = bytes((off + i + 1) % 64 + 32 for i in range(n))
        FILL_MOD_MEMO[k] = m
    return m
c2d1c3ec
PG
56
57
5133232d
PG
def faux_payload():
    """Return a fixed dummy payload: the text ``abcd`` repeated 42 times."""
    repetitions = 42
    return "abcd" * repetitions
60
cb7397d5 61
class CryptoLayerTest(unittest.TestCase):
    """Common base class of the crypto test cases; adds nothing of its own."""
64
65
66class AESGCMTest (CryptoLayerTest):
5133232d 67
be124bca
PG
68 os_urandom = os.urandom
69
cb7a3911
PG
70 def tearDown (self):
71 """Reset globals altered for testing."""
72 _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
73 ("I am fully aware that this will void my warranty.")
74 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
75 ("I am fully aware that this will void my warranty.")
be124bca 76 os.urandom = self.os_urandom
cb7a3911 77
90ee2a74
PG
78 def test_crypto_aes_gcm_enc_ctor (self):
79 password = str (os.urandom (42))
1f3fd7b0
PG
80 encryptor = crypto.Encrypt (TEST_VERSION,
81 TEST_PARAMVERSION,
82 password=password,
83 nacl=TEST_STATIC_NACL)
84
6110ef14
PG
85 def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
86 """Refuse plaintext passthrough mode by default."""
87 password = str (os.urandom (42))
88 with self.assertRaises (crypto.InvalidParameter):
89 encryptor = crypto.Encrypt (TEST_VERSION,
90 PLAIN_PARAMVERSION,
91 password=password,
92 nacl=TEST_STATIC_NACL)
93
94
95 def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
96 """
97 Comply with request for plaintext passthrough mode if the
98 *insecure* flag is passed.
99 """
100 password = str (os.urandom (42))
101 encryptor = crypto.Encrypt (TEST_VERSION,
102 PLAIN_PARAMVERSION,
103 password=password,
104 nacl=TEST_STATIC_NACL,
105 insecure=True)
106
1f3fd7b0
PG
107
108 def test_crypto_aes_gcm_enc_ctor_key (self):
109 key = os.urandom (42)
110 encryptor = crypto.Encrypt (TEST_VERSION,
111 TEST_PARAMVERSION,
112 key=key,
113 nacl=TEST_STATIC_NACL)
114
115
116 def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
117 """
118 Either key (+nacl) or password must be supplied, not both.
119 """
1c2f7f07 120 with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
1f3fd7b0
PG
121 encryptor = crypto.Encrypt (TEST_VERSION,
122 TEST_PARAMVERSION,
123 nacl=TEST_STATIC_NACL)
1f3fd7b0
PG
124
125 password = str (os.urandom (42))
126 key = os.urandom (16) # scrypt sized
1c2f7f07 127 with self.assertRaises (crypto.InvalidParameter): # both key and pw
1f3fd7b0
PG
128 encryptor = crypto.Encrypt (TEST_VERSION,
129 TEST_PARAMVERSION,
130 password=password,
131 key=key,
132 nacl=TEST_STATIC_NACL)
1f3fd7b0 133
1c2f7f07 134 with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
1f3fd7b0
PG
135 encryptor = crypto.Encrypt (TEST_VERSION,
136 TEST_PARAMVERSION,
137 key=key,
138 nacl=None)
1f3fd7b0 139
1c2f7f07 140 with self.assertRaises (crypto.InvalidParameter): # empty pw
1f3fd7b0
PG
141 encryptor = crypto.Encrypt (TEST_VERSION,
142 TEST_PARAMVERSION,
143 password=b"",
144 nacl=TEST_STATIC_NACL)
90ee2a74
PG
145
146
147 def test_crypto_aes_gcm_enc_header_size (self):
148 password = str (os.urandom (42))
1f3fd7b0 149 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 150 TEST_PARAMVERSION,
1f3fd7b0 151 password=password,
90ee2a74
PG
152 nacl=TEST_STATIC_NACL)
153
154 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
155 assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
cb7a3911 156 _, _ = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
157 _, header, _ = encryptor.done (header_dummy)
158 assert len (header) == crypto.PDTCRYPT_HDR_SIZE
5133232d 159
e2a4e4f0 160
66b1c6f4
PG
161 def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
162 """
163 Access the list of IVs used during encryption and check reuse. Start
164 from explicit counters so the inputs don’t overlap. IVs must not have
165 been reused.
166 """
167
168 def enc (start_count, data):
169 password = str (os.urandom (42))
170 encryptor = crypto.Encrypt (TEST_VERSION,
171 TEST_PARAMVERSION,
172 password=password,
173 nacl=TEST_STATIC_NACL,
b750b280
PG
174 counter=start_count,
175 strict_ivs=True)
66b1c6f4
PG
176
177 for i, blob in enumerate (data, 1):
178 fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
179 header_dummy = encryptor.next (fname)
180 _, _ = encryptor.process (blob)
181 _, header, _ = encryptor.done (header_dummy)
182 assert len (encryptor.get_used_ivs ()) == i
183
184 return encryptor.get_used_ivs ()
185
186 ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
187 ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])
188
189 # No reuse in general.
190 assert len (ivs1 & ivs2) == 0
191
192 ivs1 = list (ivs1)
193 ivs2 = list (ivs2)
194
195 # Counters of used IVs must match what we passed explicitly.
196 def extract_counters (ivs):
197 def getcount (iv):
198 _, cnt = struct.unpack (crypto.FMT_I2N_IV, iv)
199 return cnt
200 return list (map (getcount, ivs))
201
202 cnt1 = extract_counters (ivs1)
203 cnt2 = extract_counters (ivs2)
204
205 assert 0x0042 in cnt1
206 assert 0x0043 in cnt1
207
208 assert 0x1337 in cnt2
209 assert 0x1338 in cnt2
210
211
5133232d 212 def test_crypto_aes_gcm_enc_chunk_size (self):
90ee2a74 213 password = str (os.urandom (42))
1f3fd7b0 214 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 215 TEST_PARAMVERSION,
1f3fd7b0 216 password=password,
90ee2a74
PG
217 nacl=TEST_STATIC_NACL)
218
219 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 220 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
221 assert len (ciphertext) == len (TEST_PLAINTEXT)
222 rest, header, fixed = encryptor.done (header_dummy)
223 assert len (rest) == 0
5133232d 224
e2a4e4f0 225
1f3fd7b0
PG
226 def test_crypto_aes_gcm_dec_ctor (self):
227 """
228 Ensure that only either key or password is accepted.
229 """
230 password = str (os.urandom (42))
231 key = os.urandom (16) # scrypt sized
232
233 decryptor = crypto.Decrypt (password=password)
234 decryptor = crypto.Decrypt (key=key)
235
1c2f7f07 236 with self.assertRaises (crypto.InvalidParameter): # both password and key
1f3fd7b0 237 decryptor = crypto.Decrypt (password=password, key=key)
1f3fd7b0 238
1c2f7f07 239 with self.assertRaises (crypto.InvalidParameter): # neither password nor key
1f3fd7b0 240 decryptor = crypto.Decrypt (password=None, key=None)
1f3fd7b0 241
1c2f7f07 242 with self.assertRaises (crypto.InvalidParameter): # # empty password
1f3fd7b0 243 decryptor = crypto.Decrypt (password="")
1f3fd7b0 244
30019abf 245
e2a4e4f0 246 def test_crypto_aes_gcm_dec_simple (self):
90ee2a74 247 password = str (os.urandom (42))
1f3fd7b0 248 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 249 TEST_PARAMVERSION,
1f3fd7b0 250 password=password,
90ee2a74
PG
251 nacl=TEST_STATIC_NACL)
252
253 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 254 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
255 rest, header, fixed = encryptor.done (header_dummy)
256 ciphertext += rest
257
1f3fd7b0 258 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
90ee2a74
PG
259 decryptor.next (header)
260 plaintext = decryptor.process (ciphertext)
3ba1441c 261 rest = decryptor.done ()
90ee2a74
PG
262 plaintext += rest
263
90ee2a74 264 assert plaintext == TEST_PLAINTEXT
7c32c176
PG
265
266
6110ef14
PG
267 def test_crypto_aes_gcm_dec_plain_bad (self):
268 """
269 Downgrade to plaintext must not be allowed in parameters
270 obtained from headers.
271 """
272 password = str (os.urandom (42))
273 encryptor = crypto.Encrypt (TEST_VERSION,
274 TEST_PARAMVERSION,
275 password=password,
276 nacl=TEST_STATIC_NACL)
277
278 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
279 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
280 rest, header, fixed = encryptor.done (header_dummy)
281 ciphertext += rest
282
283 header = crypto.hdr_read (header)
284 header ["paramversion"] = PLAIN_PARAMVERSION
285 ok, header = crypto.hdr_make (header)
286 assert ok
287
288 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
289 with self.assertRaises (crypto.InvalidParameter):
290 decryptor.next (header)
291
292
293 def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
294 """
295 Allow plaintext crypto mode if *insecure* flag is passed.
296 """
297 password = str (os.urandom (42))
298 encryptor = crypto.Encrypt (TEST_VERSION,
299 PLAIN_PARAMVERSION,
300 password=password,
301 nacl=TEST_STATIC_NACL,
302 insecure=True)
303
304 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
305 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
306 rest, header, fixed = encryptor.done (header_dummy)
307 ciphertext += rest
308
309 header = crypto.hdr_read (header)
310 header ["paramversion"] = PLAIN_PARAMVERSION
311 ok, header = crypto.hdr_make (header)
312 assert ok
313
314 decryptor = crypto.Decrypt (password=password,
315 fixedparts=fixed,
316 insecure=True)
317 decryptor.next (header)
318 plaintext = decryptor.process (ciphertext)
319 rest = decryptor.done ()
320 plaintext += rest
321
322 assert plaintext == TEST_PLAINTEXT
323
324
e2a4e4f0 325 def test_crypto_aes_gcm_dec_bad_tag (self):
90ee2a74 326 password = str (os.urandom (42))
1f3fd7b0 327 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 328 TEST_PARAMVERSION,
1f3fd7b0 329 password=password,
90ee2a74
PG
330 nacl=TEST_STATIC_NACL)
331
332 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cb7a3911 333 _, ciphertext = encryptor.process (TEST_PLAINTEXT)
90ee2a74
PG
334 ciphertext2, header, fixed = encryptor.done (header_dummy)
335
336 mut_header = bytearray (header)
337 mut_header_vw = memoryview (mut_header)
338 # replace one byte in the tag part of the header
339 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
340 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
341 header = bytes (mut_header)
342
1f3fd7b0 343 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
90ee2a74
PG
344 decryptor.next (header)
345 plaintext = decryptor.process (ciphertext)
1c2f7f07 346 with self.assertRaises (crypto.InvalidGCMTag):
3ba1441c 347 _ = decryptor.done ()
e2a4e4f0
PG
348
349
c2d1c3ec
PG
350 def test_crypto_aes_gcm_enc_multicnk (self):
351 cnksiz = 1 << 10
90ee2a74
PG
352 pt = fill_mod (1 << 14)
353 password = str (os.urandom (42))
1f3fd7b0 354 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 355 TEST_PARAMVERSION,
1f3fd7b0 356 password=password,
90ee2a74
PG
357 nacl=TEST_STATIC_NACL)
358 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
c2d1c3ec
PG
359
360 off = 0
361 ct = b""
90ee2a74
PG
362 while off < len (pt):
363 upto = min (off + cnksiz, len (pt))
cb7a3911 364 _, cnk = encryptor.process (pt [off:upto])
c2d1c3ec
PG
365 ct += cnk
366 off += cnksiz
90ee2a74
PG
367 cnk, header, fixed = encryptor.done (header_dummy)
368 ct += cnk
369
370 assert len (pt) == len (ct)
c2d1c3ec
PG
371
372
30019abf
PG
373 def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
374 cnksiz = 1 << 10
375 pt = fill_mod (1 << 14)
376 password = str (os.urandom (42))
377 encryptor = crypto.Encrypt (TEST_VERSION,
378 TEST_PARAMVERSION,
379 password=password,
380 nacl=TEST_STATIC_NACL,
381 strict_ivs=True)
382 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
383
384 off = 0
385 ct = b""
386 while off < len (pt):
387 upto = min (off + cnksiz, len (pt))
cb7a3911 388 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
389 ct += cnk
390 off += cnksiz
391 cnk, header, fixed = encryptor.done (header_dummy)
392 ct += cnk
393
394 assert len (pt) == len (ct)
395
396
397 def test_crypto_aes_gcm_enc_multiobj (self):
398 cnksiz = 1 << 10
399 password = str (os.urandom (42))
400 encryptor = crypto.Encrypt (TEST_VERSION,
401 TEST_PARAMVERSION,
402 password=password,
403 nacl=TEST_STATIC_NACL,
404 strict_ivs=False)
405
406 def addobj (i):
407 pt = fill_mod (1 << 14, off=i)
408 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
409
410 off = 0
411 ct = b""
412 while off < len (pt):
413 upto = min (off + cnksiz, len (pt))
cb7a3911 414 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
415 ct += cnk
416 off += cnksiz
417 cnk, header, fixed = encryptor.done (header_dummy)
418 ct += cnk
419
420 assert len (pt) == len (ct)
421
422 for i in range (5): addobj (i)
423
be124bca
PG
424 assert len (encryptor.fixed) == 1
425
30019abf
PG
426
427 def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
428 cnksiz = 1 << 10
429 password = str (os.urandom (42))
430 encryptor = crypto.Encrypt (TEST_VERSION,
431 TEST_PARAMVERSION,
432 password=password,
433 nacl=TEST_STATIC_NACL,
434 strict_ivs=True)
be124bca 435 curfixed = None # must remain constant after first
30019abf
PG
436
437 def addobj (i):
438 pt = fill_mod (1 << 14, off=i)
439 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
440
441 off = 0
442 ct = b""
443 while off < len (pt):
444 upto = min (off + cnksiz, len (pt))
cb7a3911 445 _, cnk = encryptor.process (pt [off:upto])
30019abf
PG
446 ct += cnk
447 off += cnksiz
448 cnk, header, fixed = encryptor.done (header_dummy)
be124bca
PG
449 nonlocal curfixed
450 if curfixed is None:
451 curfixed = fixed
452 else:
453 assert fixed == curfixed
30019abf
PG
454 ct += cnk
455
456 assert len (pt) == len (ct)
457
458 for i in range (5): addobj (i)
459
be124bca
PG
460 assert len (encryptor.fixed) == 1
461
30019abf 462
770173c5
PG
463 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
464 """
465 Test behavior when the file counter tops out.
466
467 Artificially lower the maximum possible file counter. Considering
cb7a3911 468 invalid (0) and reserved (1, 2) values, the smallest possible file counter
770173c5
PG
469 for normal objects is 3. Starting from that, the header of the (max -
470 3)rd object must have both a different IV fixed part and a counter.
471 """
472 minimum = 3
473 new_max = 8
cb7a3911
PG
474 crypto._testing_set_AES_GCM_IV_CNT_MAX \
475 ("I am fully aware that this will void my warranty.", new_max)
770173c5
PG
476 cnksiz = 1 << 10
477 password = str (os.urandom (42))
478 encryptor = crypto.Encrypt (TEST_VERSION,
479 TEST_PARAMVERSION,
480 password=password,
481 nacl=TEST_STATIC_NACL,
482 strict_ivs=True)
483
484 last_iv = None
485 last_cnt = minimum
486
487 def addobj (i, wrap=False):
488 nonlocal last_iv
489 nonlocal last_cnt
490 pt = fill_mod (1 << 14, off=i)
491 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
492
493 off = 0
494 ct = b""
495 while off < len (pt):
496 upto = min (off + cnksiz, len (pt))
cb7a3911 497 _, cnk = encryptor.process (pt [off:upto])
770173c5
PG
498 ct += cnk
499 off += cnksiz
500 cnk, header, fixed = encryptor.done (header_dummy)
501 this_iv = crypto.hdr_read (header) ["iv"]
502 if last_iv is not None:
503 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
504 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
505 if wrap is False:
506 assert last_fixed == this_fixed
507 assert last_cnt == this_cnt - 1
508 else:
509 assert last_fixed != this_fixed
510 assert this_cnt == minimum
511 last_iv = this_iv
512 ct += cnk
513
514 assert len (pt) == len (ct)
515
516 for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
517 addobj (i + 1, True) # counter wraps to 3
518
519 for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
520 addobj (j + 1, True) # counter wraps to 3 again
521
be124bca
PG
522 assert len (encryptor.fixed) == 3
523
524
525 def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
526 """
527 Test behavior when the file counter tops out and the transition to
528 the next IV fixed part fails on account of a bad random generator.
529
530 Replaces the ``urandom`` reference in ``os`` with a deterministic
531 function. The encryptor context must communicate this condition with an
532 ``IVFixedPartError``.
533 """
534 minimum = 3
535 new_max = 8
536 crypto._testing_set_AES_GCM_IV_CNT_MAX \
537 ("I am fully aware that this will void my warranty.", new_max)
538 cnksiz = 1 << 10
539 os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
540 password = str (os.urandom (42))
541 encryptor = crypto.Encrypt (TEST_VERSION,
542 TEST_PARAMVERSION,
543 password=password,
544 nacl=TEST_STATIC_NACL,
545 strict_ivs=True)
546
547 def addobj (i):
548 pt = fill_mod (1 << 14, off=i)
549 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
550
551 off = 0
552 while off < len (pt):
553 upto = min (off + cnksiz, len (pt))
554 _, cnk = encryptor.process (pt [off:upto])
555 off += cnksiz
556
557 for i in range (minimum, new_max): addobj (42 + i)
558
559 with self.assertRaises (crypto.IVFixedPartError):
560 addobj (42 + i)
561
562
cb7a3911
PG
563
564 def test_crypto_aes_gcm_enc_length_cap (self):
565 """
566 Artificially lower the maximum allowable data length and attempt to
e2f52c53
PG
567 encrypt a larger object. Verify that the crypto handler only encrypts
568 data up to the size limit. A downstream user detects that condition by
569 testing whether the encryption step yielded less bytes than the
570 plaintext.
c7066870
TJ
571
572 The sibling to this test is test_restore_backup_max_file_length()
573 in test_delatar.py. Deltatar will transparently create a splitted object
574 with an increased IV file counter.
cb7a3911
PG
575 """
576 new_max = 2187
577 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
578 ("I am fully aware that this will void my warranty.", new_max)
579 cnksiz = 1 << 10
580 password = str (os.urandom (42))
581 encryptor = crypto.Encrypt (TEST_VERSION,
582 TEST_PARAMVERSION,
583 password=password,
584 nacl=TEST_STATIC_NACL)
585
586 def encobj (s):
587 pt, ct = fill_mod (s), None
588 header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
589
590 n, ct = encryptor.process (pt)
591 rest, _, _ = encryptor.done (header_dummy)
366d4b42
PG
592
593 # NB: If this check *ever* fails, then something changed in the
594 # encoding layer. AES-GCM is a stream cipher so each encoding
595 # step will yield the exact number of ciphertext bytes that
596 # was provided as plaintext. Thus there cannot be any encoded
597 # data left when calling the finalizers. None of the crypo code
598 # depends on that assumption but nevertheless we check it here
599 # in case anything changes upstream in the Cryptography
600 # library. In case there actually is a rest, replace the
601 # assertion below with ``ct += rest``.
602 assert (len (rest) == 0)
cb7a3911
PG
603
604 if len (pt) > new_max:
e2f52c53
PG
605 # If the plaintext was longer than the artificially lowered
606 # maximum, then the number of ciphertext bytes must be clamped
607 # to the maximum.
608 assert n == new_max
cb7a3911
PG
609 else:
610 assert n == len (pt) == len (ct)
611
612 for i in range (16): encobj (1 << i)
770173c5
PG
613
614
58ed14b8
PG
615 def test_crypto_aes_gcm_dec_length_cap (self):
616 """
617 The decryptor must reject headers with an object size that exceeds
618 the PDTCRYPT maximum. Longer files split into multiple objects.
619 """
620 password = str (os.urandom (42))
621 meta = faux_hdr()
622 meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
623 ok, header = crypto.hdr_make (meta)
624
625 assert ok
626
627 # Set up decryption with bogus header.
628 decryptor = crypto.Decrypt (password=password, fixedparts=[])
629
630 with self.assertRaises (crypto.InvalidHeader):
631 decryptor.next (header)
632
633
634 def test_crypto_aes_gcm_dec_length_mismatch (self):
635 """
636 Catch attempts at decrypting more data than what was stated in the
637 header.
638 """
639 cnksiz = 1 << 10
640 orig_pt = fill_mod (1 << 14)
641 password = str (os.urandom (42))
642 encryptor = crypto.Encrypt (TEST_VERSION,
643 TEST_PARAMVERSION,
644 password=password,
645 nacl=TEST_STATIC_NACL)
646 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
647
648 off = 0
649 ct = b""
650 while off < len (orig_pt):
651 upto = min (off + cnksiz, len (orig_pt))
652 _n, cnk = encryptor.process (orig_pt [off:upto])
653 ct += cnk
654 off += cnksiz
655 cnk, header, fixed = encryptor.done (header_dummy)
656 ct += cnk
657
658 decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
659
660 decryptor.next (header)
661 off = 0
662 pt = b""
663 while off < len (orig_pt):
664 upto = min (off + cnksiz, len (orig_pt))
665 cnk = decryptor.process (ct [off:upto])
666 pt += cnk
667 off += cnksiz
668
669 with self.assertRaises (crypto.CiphertextTooLong):
670 # Try and decrypt one byte more than was encrypted.
671 # This must be caught in crypto.py.
672 _ = decryptor.process (ct [0:1])
673
674
c2d1c3ec 675 def test_crypto_aes_gcm_dec_multicnk (self):
90ee2a74
PG
676 cnksiz = 1 << 10
677 orig_pt = fill_mod (1 << 14)
678 password = str (os.urandom (42))
1f3fd7b0 679 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 680 TEST_PARAMVERSION,
1f3fd7b0 681 password=password,
90ee2a74
PG
682 nacl=TEST_STATIC_NACL)
683 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
c2d1c3ec
PG
684
685 off = 0
686 ct = b""
687 while off < len (orig_pt):
688 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 689 _n, cnk = encryptor.process (orig_pt [off:upto])
c2d1c3ec
PG
690 ct += cnk
691 off += cnksiz
90ee2a74
PG
692 cnk, header, fixed = encryptor.done (header_dummy)
693 ct += cnk
c2d1c3ec 694
1f3fd7b0
PG
695 decryptor = crypto.Decrypt (password=password,
696 fixedparts=fixed)
90ee2a74 697 decryptor.next (header)
c2d1c3ec 698 off = 0
90ee2a74 699 pt = b""
c2d1c3ec
PG
700 while off < len (orig_pt):
701 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 702 cnk = decryptor.process (ct [off:upto])
c2d1c3ec
PG
703 pt += cnk
704 off += cnksiz
c2d1c3ec 705
90ee2a74 706
3ba1441c 707 pt += decryptor.done ()
c2d1c3ec
PG
708 assert pt == orig_pt
709
710
7c32c176 711 def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
90ee2a74
PG
712 cnksiz = 1 << 10
713 orig_pt = fill_mod (1 << 14)
714 password = str (os.urandom (42))
1f3fd7b0 715 encryptor = crypto.Encrypt (TEST_VERSION,
90ee2a74 716 TEST_PARAMVERSION,
1f3fd7b0 717 password=password,
90ee2a74
PG
718 nacl=TEST_STATIC_NACL)
719 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
7c32c176
PG
720
721 off = 0
722 ct = b""
723 while off < len (orig_pt):
724 upto = min (off + cnksiz, len (orig_pt))
cb7a3911 725 _n, cnk = encryptor.process (orig_pt [off:upto])
7c32c176
PG
726 ct += cnk
727 off += cnksiz
90ee2a74
PG
728 cnk, header, fixed = encryptor.done (header_dummy)
729 ct += cnk
730
731 mut_header = bytearray (header)
732 mut_header_vw = memoryview (mut_header)
733 # replace one byte in the tag part of the header
734 second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2]
735 mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
736 header = bytes (mut_header)
737
1f3fd7b0
PG
738 decryptor = crypto.Decrypt (password=password,
739 fixedparts=fixed)
90ee2a74 740 decryptor.next (header)
7c32c176 741 off = 0
90ee2a74 742 pt = b""
7c32c176
PG
743 while off < len (orig_pt):
744 upto = min (off + cnksiz, len (orig_pt))
90ee2a74 745 cnk = decryptor.process (ct [off:upto])
7c32c176
PG
746 pt += cnk
747 off += cnksiz
748
1c2f7f07 749 with self.assertRaises (crypto.InvalidGCMTag):
3ba1441c 750 _ = decryptor.done ()
da4ea4fa
PG
751
752
2f0b128c
PG
753 def test_crypto_aes_gcm_dec_iv_gap (self):
754 """
755 Encrypt multiple objects using non-consecutive IVs and verify that the
756 decryptor errors out with an exception in strict mode but keeps quiet
757 otherwise.
758 """
759 cnksiz = 1 << 10
760 orig_pt_1 = fill_mod (1 << 10)
761 orig_pt_2 = fill_mod (1 << 10, 23)
762 orig_pt_3 = fill_mod (1 << 10, 42)
763 password = str (os.urandom (42))
764 encryptor = crypto.Encrypt (TEST_VERSION,
765 TEST_PARAMVERSION,
766 password=password,
767 nacl=TEST_STATIC_NACL)
768
769 def enc (pt):
770 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
771
772 off = 0
773 ct = b""
774 while off < len (pt):
775 upto = min (off + cnksiz, len (pt))
776 _n, cnk = encryptor.process (pt [off:upto])
777 ct += cnk
778 off += cnksiz
779 cnk, header, fixed = encryptor.done (header_dummy)
780 return ct + cnk, header, fixed
781
782 ct_1, hdr_1, _____ = enc (orig_pt_1)
783
784 ## Here we bump the iv of the encryptor, breaking the series.
785 encryptor.set_object_counter (encryptor.cnt + 1)
786 ct_2, hdr_2, fixed = enc (orig_pt_2)
787
788 ## IV of final object is again in-sequence.
789 ct_3, hdr_3, fixed = enc (orig_pt_3)
790
791 def decrypt (strict_ivs):
792 decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
793 strict_ivs=strict_ivs)
794
795 def dec (hdr, ct):
796 decryptor.next (hdr)
797 off = 0
798 pt = b""
799 while off < len (ct):
800 upto = min (off + cnksiz, len (ct))
801 cnk = decryptor.process (ct [off:upto])
802 pt += cnk
803 off += cnksiz
804 return pt + decryptor.done ()
805
806 decr_pt_1 = dec (hdr_1, ct_1)
807 decr_pt_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
808 decr_pt_3 = dec (hdr_3, ct_3)
809
810 assert decr_pt_1 == orig_pt_1
811 assert decr_pt_2 == orig_pt_2
812 assert decr_pt_3 == orig_pt_3
813
814 with self.assertRaises (crypto.NonConsecutiveIV):
815 decrypt (True)
816
817 decrypt (False) # Sequence passes
818
819
fd10b44a
PG
820 def test_crypto_aes_gcm_dec_iv_reuse (self):
821 """
822 Meddle with encrypted content: extract the IV from one object
823 and inject it into the header of another. This must be rejected
b750b280 824 by the decryptor with paranoid IV checking enabled.
fd10b44a
PG
825 """
826 cnksiz = 1 << 10
827 orig_pt_1 = fill_mod (1 << 10)
828 orig_pt_2 = fill_mod (1 << 10, 42)
829 password = str (os.urandom (42))
1f3fd7b0 830 encryptor = crypto.Encrypt (TEST_VERSION,
fd10b44a 831 TEST_PARAMVERSION,
1f3fd7b0 832 password=password,
b750b280
PG
833 nacl=TEST_STATIC_NACL,
834 strict_ivs=True)
fd10b44a
PG
835
836 def enc (pt):
837 header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
838
839 off = 0
840 ct = b""
841 while off < len (pt):
842 upto = min (off + cnksiz, len (pt))
cb7a3911 843 _n, cnk = encryptor.process (pt [off:upto])
fd10b44a
PG
844 ct += cnk
845 off += cnksiz
846 cnk, header, fixed = encryptor.done (header_dummy)
847 return ct + cnk, header, fixed
848
849 ct_1, hdr_1, _____ = enc (orig_pt_1)
66b1c6f4 850 encryptor.cnt -= 1 # induce error by forcing an identical IV on next step
fd10b44a 851
66b1c6f4
PG
852 with self.assertRaises (crypto.DuplicateIV): # reuse detected
853 ct_2, hdr_2, fixed = enc (orig_pt_2)
fd10b44a
PG
854
855
cb7397d5
PG
class HeaderTest(CryptoLayerTest):
    """Tests for assembling (``hdr_make``) and parsing (``hdr_read``) headers."""

    def test_crypto_fmt_hdr_make(self):
        """A complete set of fields yields a header of ``PDTCRYPT_HDR_SIZE``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok
        assert len(hdr) == crypto.PDTCRYPT_HDR_SIZE


    def test_crypto_fmt_hdr_make_useless(self):
        """Unusable input is reported through the return value."""
        ok, ret = crypto.hdr_make({42: "x"})
        assert ok is False
        assert ret.startswith("error assembling header:")


    def test_crypto_fmt_hdr_read(self):
        """Every field put into a header is read back unchanged."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok is True
        assert hdr is not None
        mmeta = crypto.hdr_read(hdr)
        assert mmeta is not None
        for k in meta:
            # Use a real assertion: raising a plain string is itself a
            # TypeError in Python 3 and would hide this message.
            self.assertEqual(
                meta[k], mmeta[k],
                "header mismatch after reading: expected %r, got %r"
                % (meta[k], mmeta[k]))


    def test_crypto_fmt_hdr_read_trailing_garbage(self):
        """Extra bytes after a valid header make ``hdr_read`` raise."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok is True
        assert hdr is not None
        hdr += b"-junk"
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(hdr)


    def test_crypto_fmt_hdr_read_leading_garbage(self):
        """Extra bytes before a valid header make ``hdr_read`` raise."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok is True
        assert hdr is not None
        hdr = b"junk-" + hdr
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(hdr)


    def test_crypto_fmt_hdr_inner_garbage(self):
        """Extra bytes spliced into the middle make ``hdr_read`` raise."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok
        middle = len(hdr) // 2
        data = hdr[:middle] + b"junk-" + hdr[middle:]
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(data)
c176405d 913