Commit | Line | Data |
---|---|---|
5133232d PG |
1 | import binascii |
2 | import os | |
3 | import pylibscrypt | |
c2d1c3ec | 4 | import struct |
5133232d PG |
5 | import unittest |
6 | ||
7 | import deltatar.crypto as crypto | |
8 | ||
e2a4e4f0 PG |
9 | import cryptography |
10 | ||
5133232d PG |
11 | def b(s): |
12 | return s.encode("UTF-8") | |
13 | ||
90ee2a74 | 14 | CRYPTO_NACL_SIZE = 16 |
5133232d | 15 | CRYPTO_KEY_SIZE = 16 |
90ee2a74 PG |
16 | |
17 | TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail") | |
18 | TEST_PASSPHRASE = b"test1234" | |
19 | TEST_AES_GCM_AAD = b"authenticated plain text" | |
20 | TEST_DUMMY_FILENAME = "insurance-file.txt" | |
21 | TEST_VERSION = 1 | |
22 | TEST_PARAMVERSION = 1 | |
23 | TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE) | |
5133232d PG |
24 | |
25 | def faux_hdr (ctsize=1337, iv=None): | |
26 | return \ | |
27 | { "version" : 42 | |
28 | , "paramversion" : 2187 | |
29 | , "nacl" : binascii.unhexlify(b"0011223344556677" | |
30 | b"8899aabbccddeeff") | |
31 | , "iv" : iv or binascii.unhexlify(b"0011223344556677" | |
32 | b"8899aabb") | |
33 | , "ctsize" : ctsize | |
90ee2a74 PG |
34 | , "tag" : binascii.unhexlify(b"deadbeefbadb100d" |
35 | b"b1eedc0ffeedea15") | |
5133232d PG |
36 | } |
37 | ||
30019abf | 38 | FILL_MOD_MEMO = { } |
5133232d | 39 | |
fd10b44a | 40 | def fill_mod (n, off=0): |
30019abf PG |
41 | global FILL_MOD_MEMO |
42 | k = (n, off) | |
43 | m = FILL_MOD_MEMO.get (k, None) | |
44 | if m is not None: | |
45 | return m | |
c2d1c3ec PG |
46 | buf = bytearray (n) |
47 | bufv = memoryview (buf) | |
48 | for i in range (n): | |
fd10b44a PG |
49 | off += 1 |
50 | c = off % 64 + 32 | |
c2d1c3ec | 51 | struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8")) |
30019abf PG |
52 | m = bytes (buf) |
53 | FILL_MOD_MEMO [k] = m | |
54 | return m | |
c2d1c3ec PG |
55 | |
56 | ||
5133232d PG |
57 | def faux_payload (): |
58 | return "abcd" * 42 | |
59 | ||
cb7397d5 | 60 | |
5133232d | 61 | class CryptoLayerTest (unittest.TestCase): |
cb7397d5 PG |
62 | pass |
63 | ||
64 | ||
65 | class AESGCMTest (CryptoLayerTest): | |
5133232d | 66 | |
cb7a3911 PG |
67 | def tearDown (self): |
68 | """Reset globals altered for testing.""" | |
69 | _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \ | |
70 | ("I am fully aware that this will void my warranty.") | |
71 | _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \ | |
72 | ("I am fully aware that this will void my warranty.") | |
73 | ||
90ee2a74 PG |
74 | def test_crypto_aes_gcm_enc_ctor (self): |
75 | password = str (os.urandom (42)) | |
1f3fd7b0 PG |
76 | encryptor = crypto.Encrypt (TEST_VERSION, |
77 | TEST_PARAMVERSION, | |
78 | password=password, | |
79 | nacl=TEST_STATIC_NACL) | |
80 | ||
81 | ||
82 | def test_crypto_aes_gcm_enc_ctor_key (self): | |
83 | key = os.urandom (42) | |
84 | encryptor = crypto.Encrypt (TEST_VERSION, | |
85 | TEST_PARAMVERSION, | |
86 | key=key, | |
87 | nacl=TEST_STATIC_NACL) | |
88 | ||
89 | ||
90 | def test_crypto_aes_gcm_enc_ctor_no_key_pw (self): | |
91 | """ | |
92 | Either key (+nacl) or password must be supplied, not both. | |
93 | """ | |
94 | try: | |
95 | encryptor = crypto.Encrypt (TEST_VERSION, | |
96 | TEST_PARAMVERSION, | |
97 | nacl=TEST_STATIC_NACL) | |
98 | except crypto.InvalidParameter: # neither key nor pw | |
99 | pass | |
100 | ||
101 | password = str (os.urandom (42)) | |
102 | key = os.urandom (16) # scrypt sized | |
103 | try: | |
104 | encryptor = crypto.Encrypt (TEST_VERSION, | |
105 | TEST_PARAMVERSION, | |
106 | password=password, | |
107 | key=key, | |
108 | nacl=TEST_STATIC_NACL) | |
109 | except crypto.InvalidParameter: # both key and pw | |
110 | pass | |
111 | ||
112 | try: | |
113 | encryptor = crypto.Encrypt (TEST_VERSION, | |
114 | TEST_PARAMVERSION, | |
115 | key=key, | |
116 | nacl=None) | |
117 | except crypto.InvalidParameter: # key, but salt missing | |
118 | pass | |
119 | ||
120 | try: | |
121 | encryptor = crypto.Encrypt (TEST_VERSION, | |
122 | TEST_PARAMVERSION, | |
123 | password=b"", | |
124 | nacl=TEST_STATIC_NACL) | |
125 | except crypto.InvalidParameter: # empty pw | |
126 | pass | |
90ee2a74 PG |
127 | |
128 | ||
129 | def test_crypto_aes_gcm_enc_header_size (self): | |
130 | password = str (os.urandom (42)) | |
1f3fd7b0 | 131 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 132 | TEST_PARAMVERSION, |
1f3fd7b0 | 133 | password=password, |
90ee2a74 PG |
134 | nacl=TEST_STATIC_NACL) |
135 | ||
136 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
137 | assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE | |
cb7a3911 | 138 | _, _ = encryptor.process (TEST_PLAINTEXT) |
90ee2a74 PG |
139 | _, header, _ = encryptor.done (header_dummy) |
140 | assert len (header) == crypto.PDTCRYPT_HDR_SIZE | |
5133232d | 141 | |
e2a4e4f0 | 142 | |
5133232d | 143 | def test_crypto_aes_gcm_enc_chunk_size (self): |
90ee2a74 | 144 | password = str (os.urandom (42)) |
1f3fd7b0 | 145 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 146 | TEST_PARAMVERSION, |
1f3fd7b0 | 147 | password=password, |
90ee2a74 PG |
148 | nacl=TEST_STATIC_NACL) |
149 | ||
150 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
cb7a3911 | 151 | _, ciphertext = encryptor.process (TEST_PLAINTEXT) |
90ee2a74 PG |
152 | assert len (ciphertext) == len (TEST_PLAINTEXT) |
153 | rest, header, fixed = encryptor.done (header_dummy) | |
154 | assert len (rest) == 0 | |
5133232d | 155 | |
e2a4e4f0 | 156 | |
1f3fd7b0 PG |
157 | def test_crypto_aes_gcm_dec_ctor (self): |
158 | """ | |
159 | Ensure that only either key or password is accepted. | |
160 | """ | |
161 | password = str (os.urandom (42)) | |
162 | key = os.urandom (16) # scrypt sized | |
163 | ||
164 | decryptor = crypto.Decrypt (password=password) | |
165 | decryptor = crypto.Decrypt (key=key) | |
166 | ||
167 | try: | |
168 | decryptor = crypto.Decrypt (password=password, key=key) | |
169 | except crypto.InvalidParameter: # both password and key | |
170 | pass | |
171 | ||
172 | try: | |
173 | decryptor = crypto.Decrypt (password=None, key=None) | |
174 | except crypto.InvalidParameter: # neither password nor key | |
175 | pass | |
176 | ||
177 | try: | |
178 | decryptor = crypto.Decrypt (password="") | |
179 | except crypto.InvalidParameter: # empty password | |
180 | pass | |
181 | ||
30019abf | 182 | |
e2a4e4f0 | 183 | def test_crypto_aes_gcm_dec_simple (self): |
90ee2a74 | 184 | password = str (os.urandom (42)) |
1f3fd7b0 | 185 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 186 | TEST_PARAMVERSION, |
1f3fd7b0 | 187 | password=password, |
90ee2a74 PG |
188 | nacl=TEST_STATIC_NACL) |
189 | ||
190 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
cb7a3911 | 191 | _, ciphertext = encryptor.process (TEST_PLAINTEXT) |
90ee2a74 PG |
192 | rest, header, fixed = encryptor.done (header_dummy) |
193 | ciphertext += rest | |
194 | ||
1f3fd7b0 | 195 | decryptor = crypto.Decrypt (password=password, fixedparts=fixed) |
90ee2a74 PG |
196 | decryptor.next (header) |
197 | plaintext = decryptor.process (ciphertext) | |
3ba1441c | 198 | rest = decryptor.done () |
90ee2a74 PG |
199 | plaintext += rest |
200 | ||
90ee2a74 | 201 | assert plaintext == TEST_PLAINTEXT |
7c32c176 PG |
202 | |
203 | ||
e2a4e4f0 | 204 | def test_crypto_aes_gcm_dec_bad_tag (self): |
90ee2a74 | 205 | password = str (os.urandom (42)) |
1f3fd7b0 | 206 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 207 | TEST_PARAMVERSION, |
1f3fd7b0 | 208 | password=password, |
90ee2a74 PG |
209 | nacl=TEST_STATIC_NACL) |
210 | ||
211 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
cb7a3911 | 212 | _, ciphertext = encryptor.process (TEST_PLAINTEXT) |
90ee2a74 PG |
213 | ciphertext2, header, fixed = encryptor.done (header_dummy) |
214 | ||
215 | mut_header = bytearray (header) | |
216 | mut_header_vw = memoryview (mut_header) | |
217 | # replace one byte in the tag part of the header | |
218 | second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2] | |
219 | mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256 | |
220 | header = bytes (mut_header) | |
221 | ||
1f3fd7b0 | 222 | decryptor = crypto.Decrypt (password=password, fixedparts=fixed) |
90ee2a74 PG |
223 | decryptor.next (header) |
224 | plaintext = decryptor.process (ciphertext) | |
3ba1441c PG |
225 | try: |
226 | _ = decryptor.done () | |
227 | except crypto.InvalidGCMTag: | |
228 | pass | |
e2a4e4f0 PG |
229 | |
230 | ||
c2d1c3ec PG |
231 | def test_crypto_aes_gcm_enc_multicnk (self): |
232 | cnksiz = 1 << 10 | |
90ee2a74 PG |
233 | pt = fill_mod (1 << 14) |
234 | password = str (os.urandom (42)) | |
1f3fd7b0 | 235 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 236 | TEST_PARAMVERSION, |
1f3fd7b0 | 237 | password=password, |
90ee2a74 PG |
238 | nacl=TEST_STATIC_NACL) |
239 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
c2d1c3ec PG |
240 | |
241 | off = 0 | |
242 | ct = b"" | |
90ee2a74 PG |
243 | while off < len (pt): |
244 | upto = min (off + cnksiz, len (pt)) | |
cb7a3911 | 245 | _, cnk = encryptor.process (pt [off:upto]) |
c2d1c3ec PG |
246 | ct += cnk |
247 | off += cnksiz | |
90ee2a74 PG |
248 | cnk, header, fixed = encryptor.done (header_dummy) |
249 | ct += cnk | |
250 | ||
251 | assert len (pt) == len (ct) | |
c2d1c3ec PG |
252 | |
253 | ||
30019abf PG |
254 | def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self): |
255 | cnksiz = 1 << 10 | |
256 | pt = fill_mod (1 << 14) | |
257 | password = str (os.urandom (42)) | |
258 | encryptor = crypto.Encrypt (TEST_VERSION, | |
259 | TEST_PARAMVERSION, | |
260 | password=password, | |
261 | nacl=TEST_STATIC_NACL, | |
262 | strict_ivs=True) | |
263 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
264 | ||
265 | off = 0 | |
266 | ct = b"" | |
267 | while off < len (pt): | |
268 | upto = min (off + cnksiz, len (pt)) | |
cb7a3911 | 269 | _, cnk = encryptor.process (pt [off:upto]) |
30019abf PG |
270 | ct += cnk |
271 | off += cnksiz | |
272 | cnk, header, fixed = encryptor.done (header_dummy) | |
273 | ct += cnk | |
274 | ||
275 | assert len (pt) == len (ct) | |
276 | ||
277 | ||
278 | def test_crypto_aes_gcm_enc_multiobj (self): | |
279 | cnksiz = 1 << 10 | |
280 | password = str (os.urandom (42)) | |
281 | encryptor = crypto.Encrypt (TEST_VERSION, | |
282 | TEST_PARAMVERSION, | |
283 | password=password, | |
284 | nacl=TEST_STATIC_NACL, | |
285 | strict_ivs=False) | |
286 | ||
287 | def addobj (i): | |
288 | pt = fill_mod (1 << 14, off=i) | |
289 | header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i)) | |
290 | ||
291 | off = 0 | |
292 | ct = b"" | |
293 | while off < len (pt): | |
294 | upto = min (off + cnksiz, len (pt)) | |
cb7a3911 | 295 | _, cnk = encryptor.process (pt [off:upto]) |
30019abf PG |
296 | ct += cnk |
297 | off += cnksiz | |
298 | cnk, header, fixed = encryptor.done (header_dummy) | |
299 | ct += cnk | |
300 | ||
301 | assert len (pt) == len (ct) | |
302 | ||
303 | for i in range (5): addobj (i) | |
304 | ||
305 | ||
306 | def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self): | |
307 | cnksiz = 1 << 10 | |
308 | password = str (os.urandom (42)) | |
309 | encryptor = crypto.Encrypt (TEST_VERSION, | |
310 | TEST_PARAMVERSION, | |
311 | password=password, | |
312 | nacl=TEST_STATIC_NACL, | |
313 | strict_ivs=True) | |
314 | ||
315 | def addobj (i): | |
316 | pt = fill_mod (1 << 14, off=i) | |
317 | header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i)) | |
318 | ||
319 | off = 0 | |
320 | ct = b"" | |
321 | while off < len (pt): | |
322 | upto = min (off + cnksiz, len (pt)) | |
cb7a3911 | 323 | _, cnk = encryptor.process (pt [off:upto]) |
30019abf PG |
324 | ct += cnk |
325 | off += cnksiz | |
326 | cnk, header, fixed = encryptor.done (header_dummy) | |
327 | ct += cnk | |
328 | ||
329 | assert len (pt) == len (ct) | |
330 | ||
331 | for i in range (5): addobj (i) | |
332 | ||
333 | ||
770173c5 PG |
334 | def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self): |
335 | """ | |
336 | Test behavior when the file counter tops out. | |
337 | ||
338 | Artificially lower the maximum possible file counter. Considering | |
cb7a3911 | 339 | invalid (0) and reserved (1, 2) values, the smallest possible file counter |
770173c5 PG |
340 | for normal objects is 3. Starting from that, the header of the (max - |
341 | 3)rd object must have both a different IV fixed part and a counter. | |
342 | """ | |
343 | minimum = 3 | |
344 | new_max = 8 | |
cb7a3911 PG |
345 | crypto._testing_set_AES_GCM_IV_CNT_MAX \ |
346 | ("I am fully aware that this will void my warranty.", new_max) | |
770173c5 PG |
347 | cnksiz = 1 << 10 |
348 | password = str (os.urandom (42)) | |
349 | encryptor = crypto.Encrypt (TEST_VERSION, | |
350 | TEST_PARAMVERSION, | |
351 | password=password, | |
352 | nacl=TEST_STATIC_NACL, | |
353 | strict_ivs=True) | |
354 | ||
355 | last_iv = None | |
356 | last_cnt = minimum | |
357 | ||
358 | def addobj (i, wrap=False): | |
359 | nonlocal last_iv | |
360 | nonlocal last_cnt | |
361 | pt = fill_mod (1 << 14, off=i) | |
362 | header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i)) | |
363 | ||
364 | off = 0 | |
365 | ct = b"" | |
366 | while off < len (pt): | |
367 | upto = min (off + cnksiz, len (pt)) | |
cb7a3911 | 368 | _, cnk = encryptor.process (pt [off:upto]) |
770173c5 PG |
369 | ct += cnk |
370 | off += cnksiz | |
371 | cnk, header, fixed = encryptor.done (header_dummy) | |
372 | this_iv = crypto.hdr_read (header) ["iv"] | |
373 | if last_iv is not None: | |
374 | this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv) | |
375 | last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv) | |
376 | if wrap is False: | |
377 | assert last_fixed == this_fixed | |
378 | assert last_cnt == this_cnt - 1 | |
379 | else: | |
380 | assert last_fixed != this_fixed | |
381 | assert this_cnt == minimum | |
382 | last_iv = this_iv | |
383 | ct += cnk | |
384 | ||
385 | assert len (pt) == len (ct) | |
386 | ||
387 | for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8] | |
388 | addobj (i + 1, True) # counter wraps to 3 | |
389 | ||
390 | for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8] | |
391 | addobj (j + 1, True) # counter wraps to 3 again | |
392 | ||
cb7a3911 PG |
393 | |
394 | def test_crypto_aes_gcm_enc_length_cap (self): | |
395 | """ | |
396 | Artificially lower the maximum allowable data length and attempt to | |
397 | encrypt a larger object. Verify that the crypto handler aborts with and | |
398 | exception. | |
399 | """ | |
400 | new_max = 2187 | |
401 | crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \ | |
402 | ("I am fully aware that this will void my warranty.", new_max) | |
403 | cnksiz = 1 << 10 | |
404 | password = str (os.urandom (42)) | |
405 | encryptor = crypto.Encrypt (TEST_VERSION, | |
406 | TEST_PARAMVERSION, | |
407 | password=password, | |
408 | nacl=TEST_STATIC_NACL) | |
409 | ||
410 | def encobj (s): | |
411 | pt, ct = fill_mod (s), None | |
412 | header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s)) | |
413 | ||
414 | n, ct = encryptor.process (pt) | |
415 | rest, _, _ = encryptor.done (header_dummy) | |
416 | ct += rest | |
417 | ||
418 | if len (pt) > new_max: | |
419 | assert n < len (pt) | |
420 | else: | |
421 | assert n == len (pt) == len (ct) | |
422 | ||
423 | for i in range (16): encobj (1 << i) | |
770173c5 PG |
424 | |
425 | ||
c2d1c3ec | 426 | def test_crypto_aes_gcm_dec_multicnk (self): |
90ee2a74 PG |
427 | cnksiz = 1 << 10 |
428 | orig_pt = fill_mod (1 << 14) | |
429 | password = str (os.urandom (42)) | |
1f3fd7b0 | 430 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 431 | TEST_PARAMVERSION, |
1f3fd7b0 | 432 | password=password, |
90ee2a74 PG |
433 | nacl=TEST_STATIC_NACL) |
434 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
c2d1c3ec PG |
435 | |
436 | off = 0 | |
437 | ct = b"" | |
438 | while off < len (orig_pt): | |
439 | upto = min (off + cnksiz, len (orig_pt)) | |
cb7a3911 | 440 | _n, cnk = encryptor.process (orig_pt [off:upto]) |
c2d1c3ec PG |
441 | ct += cnk |
442 | off += cnksiz | |
90ee2a74 PG |
443 | cnk, header, fixed = encryptor.done (header_dummy) |
444 | ct += cnk | |
c2d1c3ec | 445 | |
1f3fd7b0 PG |
446 | decryptor = crypto.Decrypt (password=password, |
447 | fixedparts=fixed) | |
90ee2a74 | 448 | decryptor.next (header) |
c2d1c3ec | 449 | off = 0 |
90ee2a74 | 450 | pt = b"" |
c2d1c3ec PG |
451 | while off < len (orig_pt): |
452 | upto = min (off + cnksiz, len (orig_pt)) | |
cb7a3911 | 453 | cnk = decryptor.process (ct [off:upto]) |
c2d1c3ec PG |
454 | pt += cnk |
455 | off += cnksiz | |
c2d1c3ec | 456 | |
90ee2a74 | 457 | |
3ba1441c | 458 | pt += decryptor.done () |
c2d1c3ec PG |
459 | assert pt == orig_pt |
460 | ||
461 | ||
7c32c176 | 462 | def test_crypto_aes_gcm_dec_multicnk_bad_tag (self): |
90ee2a74 PG |
463 | cnksiz = 1 << 10 |
464 | orig_pt = fill_mod (1 << 14) | |
465 | password = str (os.urandom (42)) | |
1f3fd7b0 | 466 | encryptor = crypto.Encrypt (TEST_VERSION, |
90ee2a74 | 467 | TEST_PARAMVERSION, |
1f3fd7b0 | 468 | password=password, |
90ee2a74 PG |
469 | nacl=TEST_STATIC_NACL) |
470 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
7c32c176 PG |
471 | |
472 | off = 0 | |
473 | ct = b"" | |
474 | while off < len (orig_pt): | |
475 | upto = min (off + cnksiz, len (orig_pt)) | |
cb7a3911 | 476 | _n, cnk = encryptor.process (orig_pt [off:upto]) |
7c32c176 PG |
477 | ct += cnk |
478 | off += cnksiz | |
90ee2a74 PG |
479 | cnk, header, fixed = encryptor.done (header_dummy) |
480 | ct += cnk | |
481 | ||
482 | mut_header = bytearray (header) | |
483 | mut_header_vw = memoryview (mut_header) | |
484 | # replace one byte in the tag part of the header | |
485 | second_byte = mut_header_vw [crypto.HDR_OFF_TAG + 2] | |
486 | mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256 | |
487 | header = bytes (mut_header) | |
488 | ||
1f3fd7b0 PG |
489 | decryptor = crypto.Decrypt (password=password, |
490 | fixedparts=fixed) | |
90ee2a74 | 491 | decryptor.next (header) |
7c32c176 | 492 | off = 0 |
90ee2a74 | 493 | pt = b"" |
7c32c176 PG |
494 | while off < len (orig_pt): |
495 | upto = min (off + cnksiz, len (orig_pt)) | |
90ee2a74 | 496 | cnk = decryptor.process (ct [off:upto]) |
7c32c176 PG |
497 | pt += cnk |
498 | off += cnksiz | |
499 | ||
3ba1441c PG |
500 | try: |
501 | _ = decryptor.done () | |
502 | except crypto.InvalidGCMTag: | |
503 | pass | |
da4ea4fa PG |
504 | |
505 | ||
fd10b44a PG |
506 | def test_crypto_aes_gcm_dec_iv_reuse (self): |
507 | """ | |
508 | Meddle with encrypted content: extract the IV from one object | |
509 | and inject it into the header of another. This must be rejected | |
510 | by the decryptor. | |
511 | """ | |
512 | cnksiz = 1 << 10 | |
513 | orig_pt_1 = fill_mod (1 << 10) | |
514 | orig_pt_2 = fill_mod (1 << 10, 42) | |
515 | password = str (os.urandom (42)) | |
1f3fd7b0 | 516 | encryptor = crypto.Encrypt (TEST_VERSION, |
fd10b44a | 517 | TEST_PARAMVERSION, |
1f3fd7b0 | 518 | password=password, |
fd10b44a PG |
519 | nacl=TEST_STATIC_NACL) |
520 | ||
521 | def enc (pt): | |
522 | header_dummy = encryptor.next (TEST_DUMMY_FILENAME) | |
523 | ||
524 | off = 0 | |
525 | ct = b"" | |
526 | while off < len (pt): | |
527 | upto = min (off + cnksiz, len (pt)) | |
cb7a3911 | 528 | _n, cnk = encryptor.process (pt [off:upto]) |
fd10b44a PG |
529 | ct += cnk |
530 | off += cnksiz | |
531 | cnk, header, fixed = encryptor.done (header_dummy) | |
532 | return ct + cnk, header, fixed | |
533 | ||
534 | ct_1, hdr_1, _____ = enc (orig_pt_1) | |
535 | ct_2, hdr_2, fixed = enc (orig_pt_2) | |
536 | ||
537 | mut_hdr_2 = bytearray (hdr_2) | |
538 | mut_hdr_2_vw = memoryview (mut_hdr_2) | |
539 | # get IV | |
540 | iv_lo = crypto.HDR_OFF_IV | |
541 | iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV | |
542 | iv_1 = hdr_1 [iv_lo : iv_hi] | |
543 | # transplant into other header | |
544 | mut_hdr_2_vw [iv_lo : iv_hi] = iv_1 | |
545 | hdr_2_mod = bytes (mut_hdr_2) | |
1f3fd7b0 | 546 | decryptor = crypto.Decrypt (password=password, fixedparts=fixed, |
fd10b44a PG |
547 | strict_ivs=True) |
548 | ||
549 | def dec (hdr, ct): | |
550 | decryptor.next (hdr) | |
551 | off = 0 | |
552 | pt = b"" | |
553 | while off < len (ct): | |
554 | upto = min (off + cnksiz, len (ct)) | |
555 | cnk = decryptor.process (ct [off:upto]) | |
556 | pt += cnk | |
557 | off += cnksiz | |
558 | return pt + decryptor.done () | |
559 | ||
560 | decr_pt_1 = dec (hdr_1, ct_1) | |
561 | decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV | |
562 | try: | |
563 | decr_pt_2 = dec (hdr_2_mod, ct_2) | |
564 | except crypto.DuplicateIV: # bad header, reuse detected | |
565 | pass | |
566 | ||
567 | ||
cb7397d5 PG |
568 | class HeaderTest (CryptoLayerTest): |
569 | ||
e2a4e4f0 PG |
570 | def test_crypto_fmt_hdr_make (self): |
571 | meta = faux_hdr() | |
572 | ok, hdr = crypto.hdr_make (meta) | |
573 | assert ok | |
90ee2a74 | 574 | assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE |
e2a4e4f0 PG |
575 | |
576 | ||
cb7397d5 PG |
577 | def test_crypto_fmt_hdr_make_useless (self): |
578 | ok, ret = crypto.hdr_make ({ 42: "x" }) | |
579 | assert ok is False | |
580 | assert ret.startswith ("error writing header:") | |
581 | ||
582 | ||
e2a4e4f0 PG |
583 | def test_crypto_fmt_hdr_read (self): |
584 | meta = faux_hdr() | |
585 | ok, hdr = crypto.hdr_make (meta) | |
90ee2a74 PG |
586 | assert ok is True |
587 | assert hdr is not None | |
588 | mmeta = crypto.hdr_read (hdr) | |
589 | assert mmeta is not None | |
e2a4e4f0 PG |
590 | for k in meta: |
591 | if meta [k] != mmeta [k]: | |
592 | raise "header mismatch after reading: expected %r, got %r" \ | |
593 | % (meta [k], mmeta [k]) | |
594 | ||
595 | ||
cb7397d5 PG |
596 | def test_crypto_fmt_hdr_read_trailing_garbage (self): |
597 | meta = faux_hdr() | |
598 | ok, hdr = crypto.hdr_make (meta) | |
90ee2a74 PG |
599 | ok, hdr = crypto.hdr_make (meta) |
600 | assert ok is True | |
601 | assert hdr is not None | |
cb7397d5 | 602 | hdr += b"-junk" |
97ba515a | 603 | try: |
90ee2a74 | 604 | _ = crypto.hdr_read (hdr) |
97ba515a PG |
605 | except crypto.InvalidHeader: |
606 | pass | |
cb7397d5 PG |
607 | |
608 | ||
609 | def test_crypto_fmt_hdr_read_leading_garbage (self): | |
610 | meta = faux_hdr() | |
611 | ok, hdr = crypto.hdr_make (meta) | |
90ee2a74 PG |
612 | ok, hdr = crypto.hdr_make (meta) |
613 | assert ok is True | |
614 | assert hdr is not None | |
cb7397d5 | 615 | hdr = b"junk-" + hdr |
97ba515a | 616 | try: |
90ee2a74 | 617 | _ = crypto.hdr_read (hdr) |
97ba515a PG |
618 | except crypto.InvalidHeader: |
619 | pass | |
cb7397d5 PG |
620 | |
621 | ||
622 | def test_crypto_fmt_hdr_inner_garbage (self): | |
623 | meta = faux_hdr() | |
624 | ok, hdr = crypto.hdr_make (meta) | |
c176405d | 625 | assert ok |
90ee2a74 | 626 | data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:] |
97ba515a | 627 | try: |
90ee2a74 | 628 | _ = crypto.hdr_read (data) |
97ba515a PG |
629 | except crypto.InvalidHeader: |
630 | pass | |
c176405d | 631 |