Commit | Line | Data |
---|---|---|
5133232d PG |
1 | import binascii |
2 | import os | |
3 | import pylibscrypt | |
c2d1c3ec | 4 | import struct |
5133232d PG |
5 | import unittest |
6 | ||
7 | import deltatar.crypto as crypto | |
8 | ||
e2a4e4f0 PG |
9 | import cryptography |
10 | ||
5133232d PG |
def b (s):
    """Encode the text *s* as UTF-8 and return the resulting bytes."""
    encoded = s.encode ("UTF-8")
    return encoded
13 | ||
# Sizes, in bytes, used when generating test salts and keys.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE = 16

# Shared fixtures for the test cases in this module.
TEST_PLAINTEXT = b ("gentlemen don’t read each other’s mail")
TEST_PASSPHRASE = b"test1234"
TEST_AES_GCM_AAD = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"
TEST_VERSION = 1
TEST_PARAMVERSION = 1
# Drawn once at import time; every encryptor built by these tests shares it.
TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
5133232d PG |
24 | |
def faux_hdr (ctsize=1337, iv=None):
    """
    Build a dictionary of made-up header fields.

    :param ctsize: value stored under ``"ctsize"``.
    :param iv:     value stored under ``"iv"``; any falsy value (``None``,
                   empty bytes) selects a fixed twelve byte default.
    :returns:      dict with the keys ``version``, ``paramversion``,
                   ``nacl``, ``iv``, ``ctsize`` and ``tag``, in that order.
    """
    if not iv:
        iv = binascii.unhexlify (b"0011223344556677"
                                 b"8899aabb")
    nacl = binascii.unhexlify (b"0011223344556677"
                               b"8899aabbccddeeff")
    tag = binascii.unhexlify (b"deadbeefbadb100d"
                              b"b1eedc0ffeedea15")

    hdr = {}
    hdr ["version"] = 42
    hdr ["paramversion"] = 2187
    hdr ["nacl"] = nacl
    hdr ["iv"] = iv
    hdr ["ctsize"] = ctsize
    hdr ["tag"] = tag
    return hdr
37 | ||
# Cache of buffers already produced by fill_mod, keyed by (n, off).
FILL_MOD_MEMO = { }

def fill_mod (n, off=0):
    """
    Return *n* bytes of a repeating pattern of printable ASCII characters.

    The byte at index ``i`` equals ``(off + i + 1) % 64 + 32``, so all values
    lie in the range 32..95. Results are memoized in ``FILL_MOD_MEMO`` and a
    repeated call with the same arguments returns the cached object.

    :param n:   number of bytes to generate.
    :param off: offset shifting the start of the pattern.
    :returns:   ``bytes`` object of length *n*.
    """
    k = (n, off)
    m = FILL_MOD_MEMO.get (k)
    if m is not None:
        return m
    # The dict is only mutated, never rebound, hence no ``global`` statement.
    m = bytes ((off + i + 1) % 64 + 32 for i in range (n))
    FILL_MOD_MEMO [k] = m
    return m
c2d1c3ec PG |
55 | |
56 | ||
5133232d PG |
def faux_payload ():
    """Return a dummy text payload: ``"abcd"`` repeated 42 times."""
    repetitions = 42
    return repetitions * "abcd"
59 | ||
cb7397d5 | 60 | |
class CryptoLayerTest (unittest.TestCase):
    """Common base class of the crypto test cases; adds nothing of its own."""
63 | ||
64 | ||
class AESGCMTest (CryptoLayerTest):
    """
    Tests driving ``crypto.Encrypt`` and ``crypto.Decrypt``: construction,
    single-shot and chunked processing, tag verification, handling of the IV
    counter and the object size cap.
    """

    # Reference to the genuine generator so that tearDown can undo the
    # replacement of ``os.urandom`` performed by individual tests.
    os_urandom = os.urandom

    # Phrase passed to the ``crypto._testing_set_*`` hooks.
    _VOID_WARRANTY = "I am fully aware that this will void my warranty."

    def tearDown (self):
        """Reset globals altered for testing."""
        crypto._testing_set_AES_GCM_IV_CNT_MAX (self._VOID_WARRANTY)
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (self._VOID_WARRANTY)
        os.urandom = self.os_urandom


    # ------------------------------------------------------------------
    # helpers (not collected by unittest: names do not start with "test")
    # ------------------------------------------------------------------

    @staticmethod
    def _new_encryptor (password, **kwargs):
        """
        Create a password based encryptor with the test version, parameter
        version and static salt. Extra keyword arguments (e. g.
        ``strict_ivs``) are handed through to ``crypto.Encrypt`` unchanged.
        """
        return crypto.Encrypt (TEST_VERSION,
                               TEST_PARAMVERSION,
                               password=password,
                               nacl=TEST_STATIC_NACL,
                               **kwargs)


    @staticmethod
    def _encrypt_chunked (encryptor, pt, cnksiz, name):
        """
        Encrypt *pt* as one object called *name*, feeding the encryptor
        slices of at most *cnksiz* bytes.

        :returns: tuple ``(ciphertext, header, fixed)`` where *ciphertext*
                  includes whatever ``done()`` returned as remainder.
        """
        header_dummy = encryptor.next (name)
        parts = []
        for off in range (0, len (pt), cnksiz):
            _n, cnk = encryptor.process (pt [off:off + cnksiz])
            parts.append (cnk)
        rest, header, fixed = encryptor.done (header_dummy)
        parts.append (rest)
        return b"".join (parts), header, fixed


    @staticmethod
    def _decrypt_chunked (decryptor, header, ct, cnksiz, length=None):
        """
        Start a new object on *decryptor* using *header*, then feed it the
        first *length* bytes of *ct* (default: all of it) in slices of at
        most *cnksiz* bytes.

        ``done()`` is deliberately left to the caller so a test can wrap
        just that call in ``assertRaises``.

        :returns: the plaintext returned by the ``process()`` calls.
        """
        if length is None:
            length = len (ct)
        decryptor.next (header)
        parts = []
        for off in range (0, length, cnksiz):
            upto = min (off + cnksiz, length)
            parts.append (decryptor.process (ct [off:upto]))
        return b"".join (parts)


    @staticmethod
    def _corrupt_tag (header):
        """Return a copy of *header* with one byte of the tag field altered."""
        mut_header = bytearray (header)
        pos = crypto.HDR_OFF_TAG + 2
        mut_header [pos] = (mut_header [pos] + 0x2a) % 256
        return bytes (mut_header)


    # ------------------------------------------------------------------
    # encryptor construction
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_ctor (self):
        """An encryptor can be constructed from a password."""
        password = str (os.urandom (42))
        self._new_encryptor (password)


    def test_crypto_aes_gcm_enc_ctor_key (self):
        """An encryptor can be constructed from a key plus salt."""
        key = os.urandom (42)
        crypto.Encrypt (TEST_VERSION,
                        TEST_PARAMVERSION,
                        key=key,
                        nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
        """
        Either key (+nacl) or password must be supplied, not both.
        """
        with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
            crypto.Encrypt (TEST_VERSION,
                            TEST_PARAMVERSION,
                            nacl=TEST_STATIC_NACL)

        password = str (os.urandom (42))
        key = os.urandom (16) # scrypt sized
        with self.assertRaises (crypto.InvalidParameter): # both key and pw
            crypto.Encrypt (TEST_VERSION,
                            TEST_PARAMVERSION,
                            password=password,
                            key=key,
                            nacl=TEST_STATIC_NACL)

        with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
            crypto.Encrypt (TEST_VERSION,
                            TEST_PARAMVERSION,
                            key=key,
                            nacl=None)

        with self.assertRaises (crypto.InvalidParameter): # empty pw
            crypto.Encrypt (TEST_VERSION,
                            TEST_PARAMVERSION,
                            password=b"",
                            nacl=TEST_STATIC_NACL)


    # ------------------------------------------------------------------
    # single-shot encryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_header_size (self):
        """Dummy and final header both have ``PDTCRYPT_HDR_SIZE`` bytes."""
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        self.assertEqual (len (header_dummy), crypto.PDTCRYPT_HDR_SIZE)
        _, _ = encryptor.process (TEST_PLAINTEXT)
        _, header, _ = encryptor.done (header_dummy)
        self.assertEqual (len (header), crypto.PDTCRYPT_HDR_SIZE)


    def test_crypto_aes_gcm_enc_chunk_size (self):
        """Ciphertext is as long as the plaintext; ``done`` adds nothing."""
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        self.assertEqual (len (ciphertext), len (TEST_PLAINTEXT))
        rest, _header, _fixed = encryptor.done (header_dummy)
        self.assertEqual (len (rest), 0)


    # ------------------------------------------------------------------
    # decryptor construction and single-shot decryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_ctor (self):
        """
        Ensure that only either key or password is accepted.
        """
        password = str (os.urandom (42))
        key = os.urandom (16) # scrypt sized

        crypto.Decrypt (password=password)
        crypto.Decrypt (key=key)

        with self.assertRaises (crypto.InvalidParameter): # both password and key
            crypto.Decrypt (password=password, key=key)

        with self.assertRaises (crypto.InvalidParameter): # neither password nor key
            crypto.Decrypt (password=None, key=None)

        with self.assertRaises (crypto.InvalidParameter): # empty password
            crypto.Decrypt (password="")


    def test_crypto_aes_gcm_dec_simple (self):
        """Round trip: decrypting the ciphertext restores the plaintext."""
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        rest, header, fixed = encryptor.done (header_dummy)
        ciphertext += rest

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        decryptor.next (header)
        plaintext = decryptor.process (ciphertext)
        plaintext += decryptor.done ()

        self.assertEqual (plaintext, TEST_PLAINTEXT)


    def test_crypto_aes_gcm_dec_bad_tag (self):
        """A header with a modified tag makes ``done`` raise ``InvalidGCMTag``."""
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        _rest, header, fixed = encryptor.done (header_dummy)

        header = self._corrupt_tag (header)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        decryptor.next (header)
        decryptor.process (ciphertext)
        with self.assertRaises (crypto.InvalidGCMTag):
            decryptor.done ()


    # ------------------------------------------------------------------
    # chunked encryption, one or more objects
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_multicnk (self):
        """Chunked encryption yields ciphertext of the plaintext's length."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        ct, _header, _fixed = \
            self._encrypt_chunked (encryptor, pt, cnksiz, TEST_DUMMY_FILENAME)

        self.assertEqual (len (pt), len (ct))


    def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
        """Same as the multi chunk test, with ``strict_ivs`` enabled."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password, strict_ivs=True)

        ct, _header, _fixed = \
            self._encrypt_chunked (encryptor, pt, cnksiz, TEST_DUMMY_FILENAME)

        self.assertEqual (len (pt), len (ct))


    def test_crypto_aes_gcm_enc_multiobj (self):
        """Five objects in a row leave one entry in ``encryptor.fixed``."""
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password, strict_ivs=False)

        for i in range (5):
            pt = fill_mod (1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _header, _fixed = \
                self._encrypt_chunked (encryptor, pt, cnksiz, name)
            self.assertEqual (len (pt), len (ct))

        self.assertEqual (len (encryptor.fixed), 1)


    def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
        """
        With ``strict_ivs`` enabled, the third value returned by ``done``
        stays the same for all five objects.
        """
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password, strict_ivs=True)
        curfixed = None # must remain constant after first

        for i in range (5):
            pt = fill_mod (1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _header, fixed = \
                self._encrypt_chunked (encryptor, pt, cnksiz, name)
            if curfixed is None:
                curfixed = fixed
            else:
                self.assertEqual (fixed, curfixed)
            self.assertEqual (len (pt), len (ct))

        self.assertEqual (len (encryptor.fixed), 1)


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
        """
        Test behavior when the file counter tops out.

        Artificially lower the maximum possible file counter. Considering
        invalid (0) and reserved (1, 2) values, the smallest possible file counter
        for normal objects is 3. Starting from that, the header of the (max -
        3)rd object must have both a different IV fixed part and a counter.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX (self._VOID_WARRANTY, new_max)
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password, strict_ivs=True)

        last_iv = None

        def addobj (i, wrap=False):
            """Encrypt object *i*; compare its IV with the previous one."""
            nonlocal last_iv
            pt = fill_mod (1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, header, _fixed = \
                self._encrypt_chunked (encryptor, pt, cnksiz, name)

            this_iv = crypto.hdr_read (header) ["iv"]
            if last_iv is not None:
                this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
                last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
                if wrap is False:
                    self.assertEqual (last_fixed, this_fixed)
                    self.assertEqual (last_cnt, this_cnt - 1)
                else:
                    self.assertNotEqual (last_fixed, this_fixed)
                    self.assertEqual (this_cnt, minimum)
            last_iv = this_iv

            self.assertEqual (len (pt), len (ct))

        for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
        addobj (i + 1, True) # counter wraps to 3

        for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
        addobj (j + 1, True) # counter wraps to 3 again

        self.assertEqual (len (encryptor.fixed), 3)


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
        """
        Test behavior when the file counter tops out and the transition to
        the next IV fixed part fails on account of a bad random generator.

        Replaces the ``urandom`` reference in ``os`` with a deterministic
        function. The encryptor context must communicate this condition with an
        ``IVFixedPartError``.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX (self._VOID_WARRANTY, new_max)
        cnksiz = 1 << 10
        # tearDown restores the real generator.
        os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password, strict_ivs=True)

        def addobj (i):
            """Start object *i* and feed it chunkwise; ``done`` is not called."""
            pt = fill_mod (1 << 14, off=i)
            encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
            for off in range (0, len (pt), cnksiz):
                _n, _cnk = encryptor.process (pt [off:off + cnksiz])

        for i in range (minimum, new_max): addobj (42 + i)

        with self.assertRaises (crypto.IVFixedPartError):
            addobj (42 + i)


    def test_crypto_aes_gcm_enc_length_cap (self):
        """
        Artificially lower the maximum allowable data length and attempt to
        encrypt larger objects. For inputs above the cap, the first value
        returned by ``process`` must be smaller than the input length; for
        inputs within the cap it must equal both the input and the ciphertext
        length.
        """
        new_max = 2187
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (self._VOID_WARRANTY, new_max)
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        def encobj (s):
            """Encrypt *s* bytes in a single ``process`` call and check sizes."""
            pt = fill_mod (s)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))

            n, ct = encryptor.process (pt)
            rest, _, _ = encryptor.done (header_dummy)
            ct += rest

            if len (pt) > new_max:
                self.assertLess (n, len (pt))
            else:
                self.assertEqual (n, len (pt))
                self.assertEqual (len (pt), len (ct))

        for i in range (16): encobj (1 << i)


    # ------------------------------------------------------------------
    # chunked decryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_multicnk (self):
        """Chunked encryption followed by chunked decryption round trips."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        ct, header, fixed = \
            self._encrypt_chunked (encryptor, orig_pt, cnksiz,
                                   TEST_DUMMY_FILENAME)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed)
        pt = self._decrypt_chunked (decryptor, header, ct, cnksiz,
                                    length=len (orig_pt))
        pt += decryptor.done ()

        self.assertEqual (pt, orig_pt)


    def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
        """Chunked decryption with a modified tag fails in ``done``."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        ct, header, fixed = \
            self._encrypt_chunked (encryptor, orig_pt, cnksiz,
                                   TEST_DUMMY_FILENAME)

        header = self._corrupt_tag (header)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed)
        self._decrypt_chunked (decryptor, header, ct, cnksiz,
                               length=len (orig_pt))

        with self.assertRaises (crypto.InvalidGCMTag):
            decryptor.done ()


    def test_crypto_aes_gcm_dec_iv_reuse (self):
        """
        Meddle with encrypted content: extract the IV from one object
        and inject it into the header of another. This must be rejected
        by the decryptor.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod (1 << 10)
        orig_pt_2 = fill_mod (1 << 10, 42)
        password = str (os.urandom (42))
        encryptor = self._new_encryptor (password)

        ct_1, hdr_1, _____ = \
            self._encrypt_chunked (encryptor, orig_pt_1, cnksiz,
                                   TEST_DUMMY_FILENAME)
        ct_2, hdr_2, fixed = \
            self._encrypt_chunked (encryptor, orig_pt_2, cnksiz,
                                   TEST_DUMMY_FILENAME)

        # get IV of the first object ...
        iv_lo = crypto.HDR_OFF_IV
        iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
        iv_1 = hdr_1 [iv_lo : iv_hi]
        # ... and transplant it into the other header; going through a
        # memoryview guarantees the header cannot change size.
        mut_hdr_2 = bytearray (hdr_2)
        memoryview (mut_hdr_2) [iv_lo : iv_hi] = iv_1
        hdr_2_mod = bytes (mut_hdr_2)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
                                    strict_ivs=True)

        def dec (hdr, ct):
            """Decrypt one complete object with the shared decryptor."""
            pt = self._decrypt_chunked (decryptor, hdr, ct, cnksiz)
            return pt + decryptor.done ()

        dec (hdr_1, ct_1)
        dec (hdr_2, ct_2) # good header, different IV
        with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
            dec (hdr_2_mod, ct_2)
fd10b44a PG |
600 | |
601 | ||
cb7397d5 PG |
class HeaderTest (CryptoLayerTest):
    """
    Tests for ``crypto.hdr_make`` and ``crypto.hdr_read`` using the fields
    supplied by ``faux_hdr``.
    """

    def test_crypto_fmt_hdr_make (self):
        """A header built from valid fields has ``PDTCRYPT_HDR_SIZE`` bytes."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertTrue (ok)
        self.assertEqual (len (hdr), crypto.PDTCRYPT_HDR_SIZE)


    def test_crypto_fmt_hdr_make_useless (self):
        """Unusable input yields ``False`` plus an error message."""
        ok, ret = crypto.hdr_make ({ 42: "x" })
        self.assertIs (ok, False)
        self.assertTrue (ret.startswith ("error assembling header:"))


    def test_crypto_fmt_hdr_read (self):
        """Reading a freshly made header gives back every input field."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        mmeta = crypto.hdr_read (hdr)
        self.assertIsNotNone (mmeta)
        for k in meta:
            # Raising a plain string is a TypeError in Python 3; report the
            # mismatch as a regular test failure instead.
            self.assertEqual (meta [k], mmeta [k],
                              "header mismatch after reading: expected %r, got %r"
                              % (meta [k], mmeta [k]))


    def test_crypto_fmt_hdr_read_trailing_garbage (self):
        """Bytes appended to a valid header make ``hdr_read`` fail."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        hdr += b"-junk"
        with self.assertRaises (crypto.InvalidHeader):
            crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_read_leading_garbage (self):
        """Bytes prepended to a valid header make ``hdr_read`` fail."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        hdr = b"junk-" + hdr
        with self.assertRaises (crypto.InvalidHeader):
            crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_inner_garbage (self):
        """Bytes spliced into the middle of a header make ``hdr_read`` fail."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertTrue (ok)
        middle = len (hdr) // 2
        data = hdr [:middle] + b"junk-" + hdr [middle:]
        with self.assertRaises (crypto.InvalidHeader):
            crypto.hdr_read (data)
c176405d | 659 |