Commit | Line | Data |
---|---|---|
5133232d PG |
1 | import binascii |
2 | import os | |
3 | import pylibscrypt | |
c2d1c3ec | 4 | import struct |
5133232d PG |
5 | import unittest |
6 | ||
7 | import deltatar.crypto as crypto | |
8 | ||
e2a4e4f0 PG |
9 | import cryptography |
10 | ||
5133232d PG |
def b(s):
    """Return the text *s* encoded as UTF-8 bytes."""
    encoded = s.encode(encoding="UTF-8")
    return encoded
13 | ||
# Sizes in bytes. CRYPTO_NACL_SIZE determines the length of the random salt
# ("nacl") generated below; CRYPTO_KEY_SIZE is not referenced in this part of
# the file.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE  = 16

# Format version numbers handed to ``crypto.Encrypt`` by the tests.
TEST_VERSION      = 1
TEST_PARAMVERSION = 1

# Payload and naming fixtures shared by the test cases.
TEST_PLAINTEXT      = b("gentlemen don’t read each other’s mail")
TEST_PASSPHRASE     = b"test1234"
TEST_AES_GCM_AAD    = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"

# One salt per test run: random, but identical for every encryptor created.
TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
5133232d PG |
24 | |
def faux_hdr (ctsize=1337, iv=None):
    """
    Build a dictionary of made-up header fields for the header tests.

    :param ctsize: value stored under ``"ctsize"``.
    :param iv:     value stored under ``"iv"``; any falsy value (``None``,
                   ``b""``) selects a fixed 12 byte default.
    :return:       dict with the keys ``version``, ``paramversion``,
                   ``nacl``, ``iv``, ``ctsize`` and ``tag``, in that order.
    """
    if not iv:
        iv = binascii.unhexlify(b"0011223344556677"
                                b"8899aabb")
    nacl = binascii.unhexlify(b"0011223344556677"
                              b"8899aabbccddeeff")
    tag = binascii.unhexlify(b"deadbeefbadb100d"
                             b"b1eedc0ffeedea15")

    fields = {}
    fields ["version"]      = 42
    fields ["paramversion"] = 2187
    fields ["nacl"]         = nacl
    fields ["iv"]           = iv
    fields ["ctsize"]       = ctsize
    fields ["tag"]          = tag
    return fields
37 | ||
# Buffers already produced by fill_mod(), keyed by ``(n, off)``.
FILL_MOD_MEMO = { }

def fill_mod (n, off=0):
    """
    Return *n* bytes of a deterministic, repeating ASCII pattern.

    The byte at index ``i`` has the value ``(off + i + 1) % 64 + 32``, so the
    pattern cycles through the 64 code points from 32 (space) to 95
    (underscore). Results are memoized in ``FILL_MOD_MEMO`` so repeated
    requests for the same ``(n, off)`` pair return the same object.

    :param n:   number of bytes to produce; expected to be non-negative.
    :param off: offset added to the position before taking the modulus,
                shifting where in the cycle the pattern starts.
    :return:    ``bytes`` object of length *n*.
    """
    key = (n, off)
    cached = FILL_MOD_MEMO.get (key)
    if cached is None:
        # Positions count from 1 because the pattern advances the offset
        # *before* emitting each byte.
        cached = bytes ((off + pos) % 64 + 32 for pos in range (1, n + 1))
        FILL_MOD_MEMO [key] = cached
    return cached
c2d1c3ec PG |
55 | |
56 | ||
5133232d PG |
def faux_payload ():
    """Return a fixed dummy payload: the text ``abcd`` repeated 42 times."""
    repetitions = 42
    return "".join (["abcd"] * repetitions)
59 | ||
cb7397d5 | 60 | |
class CryptoLayerTest (unittest.TestCase):
    """Common base class of the crypto test cases; adds nothing by itself."""
63 | ||
64 | ||
class AESGCMTest (CryptoLayerTest):
    """
    Tests for the AES-GCM contexts ``crypto.Encrypt`` and ``crypto.Decrypt``.

    The chunked encrypt / decrypt loops and the header tampering shared by
    many tests live in the private helpers at the top of the class.
    """

    # Handle on the genuine ``os.urandom`` so ``tearDown`` can undo the
    # replacement made by the ``cnt_wrap_badfixed`` test.
    os_urandom = os.urandom

    # Confirmation string handed to the ``crypto._testing_set_*`` hooks.
    _WARRANTY_VOID = "I am fully aware that this will void my warranty."

    def tearDown (self):
        """Reset globals altered for testing."""
        _ = crypto._testing_set_AES_GCM_IV_CNT_MAX (self._WARRANTY_VOID)
        _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (self._WARRANTY_VOID)
        os.urandom = self.os_urandom

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _random_password ():
        """Return a throwaway password: the ``str()`` of 42 random bytes."""
        return str (os.urandom (42))

    @staticmethod
    def _password_encryptor (password, **kwargs):
        """Create a password based encryptor using the shared test salt."""
        return crypto.Encrypt (TEST_VERSION,
                               TEST_PARAMVERSION,
                               password=password,
                               nacl=TEST_STATIC_NACL,
                               **kwargs)

    @staticmethod
    def _encrypt_chunked (encryptor, plaintext, cnksiz):
        """
        Feed *plaintext* to ``encryptor.process`` in pieces of at most
        *cnksiz* bytes and return the concatenated ciphertext. Neither
        ``next()`` nor ``done()`` is called here.
        """
        pieces = []
        for off in range (0, len (plaintext), cnksiz):
            _n, cnk = encryptor.process (plaintext [off : off + cnksiz])
            pieces.append (cnk)
        return b"".join (pieces)

    @classmethod
    def _encrypt_object (cls, encryptor, name, plaintext, cnksiz):
        """
        Encrypt one complete object: ``next()``, chunked ``process()`` calls,
        then ``done()``.

        :return: tuple ``(ciphertext, header, fixed)`` where *ciphertext*
                 includes whatever ``done()`` returned as remainder.
        """
        header_dummy = encryptor.next (name)
        ct = cls._encrypt_chunked (encryptor, plaintext, cnksiz)
        rest, header, fixed = encryptor.done (header_dummy)
        return ct + rest, header, fixed

    @staticmethod
    def _decrypt_chunked (decryptor, ciphertext, cnksiz):
        """
        Feed *ciphertext* to ``decryptor.process`` in pieces of at most
        *cnksiz* bytes and return the concatenated result. The caller is
        responsible for ``next()`` and ``done()``.
        """
        pieces = []
        for off in range (0, len (ciphertext), cnksiz):
            pieces.append (decryptor.process (ciphertext [off : off + cnksiz]))
        return b"".join (pieces)

    @staticmethod
    def _corrupt_tag (header):
        """Return a copy of *header* with one byte of the tag field altered."""
        mutable = bytearray (header)
        pos = crypto.HDR_OFF_TAG + 2
        mutable [pos] = (mutable [pos] + 0x2a) % 256
        return bytes (mutable)

    # ------------------------------------------------------------------
    # encryptor construction
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_ctor (self):
        """An encryptor can be constructed from a password and a salt."""
        _ = self._password_encryptor (self._random_password ())


    def test_crypto_aes_gcm_enc_ctor_key (self):
        """An encryptor can be constructed from a key and a salt."""
        key = os.urandom (42)
        _ = crypto.Encrypt (TEST_VERSION,
                            TEST_PARAMVERSION,
                            key=key,
                            nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
        """
        Either key (+nacl) or password must be supplied, not both.
        """
        password = self._random_password ()
        key = os.urandom (16) # scrypt sized

        with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
            _ = crypto.Encrypt (TEST_VERSION,
                                TEST_PARAMVERSION,
                                nacl=TEST_STATIC_NACL)

        with self.assertRaises (crypto.InvalidParameter): # both key and pw
            _ = crypto.Encrypt (TEST_VERSION,
                                TEST_PARAMVERSION,
                                password=password,
                                key=key,
                                nacl=TEST_STATIC_NACL)

        with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
            _ = crypto.Encrypt (TEST_VERSION,
                                TEST_PARAMVERSION,
                                key=key,
                                nacl=None)

        with self.assertRaises (crypto.InvalidParameter): # empty pw
            _ = crypto.Encrypt (TEST_VERSION,
                                TEST_PARAMVERSION,
                                password=b"",
                                nacl=TEST_STATIC_NACL)

    # ------------------------------------------------------------------
    # single object encryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_header_size (self):
        """Dummy and final header both have ``PDTCRYPT_HDR_SIZE`` bytes."""
        encryptor = self._password_encryptor (self._random_password ())

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        self.assertEqual (len (header_dummy), crypto.PDTCRYPT_HDR_SIZE)
        _, _ = encryptor.process (TEST_PLAINTEXT)
        _, header, _ = encryptor.done (header_dummy)
        self.assertEqual (len (header), crypto.PDTCRYPT_HDR_SIZE)


    def test_crypto_aes_gcm_enc_chunk_size (self):
        """Ciphertext is as long as the plaintext; ``done()`` adds nothing."""
        encryptor = self._password_encryptor (self._random_password ())

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        self.assertEqual (len (ciphertext), len (TEST_PLAINTEXT))
        rest, _header, _fixed = encryptor.done (header_dummy)
        self.assertEqual (len (rest), 0)

    # ------------------------------------------------------------------
    # decryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_ctor (self):
        """
        Ensure that only either key or password is accepted.
        """
        password = self._random_password ()
        key = os.urandom (16) # scrypt sized

        _ = crypto.Decrypt (password=password)
        _ = crypto.Decrypt (key=key)

        with self.assertRaises (crypto.InvalidParameter): # both password and key
            _ = crypto.Decrypt (password=password, key=key)

        with self.assertRaises (crypto.InvalidParameter): # neither password nor key
            _ = crypto.Decrypt (password=None, key=None)

        with self.assertRaises (crypto.InvalidParameter): # empty password
            _ = crypto.Decrypt (password="")


    def test_crypto_aes_gcm_dec_simple (self):
        """Round trip: decrypting the ciphertext yields the plaintext."""
        password = self._random_password ()
        encryptor = self._password_encryptor (password)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        rest, header, fixed = encryptor.done (header_dummy)
        ciphertext += rest

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        decryptor.next (header)
        plaintext = decryptor.process (ciphertext)
        plaintext += decryptor.done ()

        self.assertEqual (plaintext, TEST_PLAINTEXT)


    def test_crypto_aes_gcm_dec_bad_tag (self):
        """A header with a modified tag makes ``done()`` raise."""
        password = self._random_password ()
        encryptor = self._password_encryptor (password)

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        _rest, header, fixed = encryptor.done (header_dummy)

        # replace one byte in the tag part of the header
        header = self._corrupt_tag (header)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        decryptor.next (header)
        _ = decryptor.process (ciphertext)
        with self.assertRaises (crypto.InvalidGCMTag):
            _ = decryptor.done ()

    # ------------------------------------------------------------------
    # chunked and multi object encryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_multicnk (self):
        """Encrypt one object in 1 KiB chunks; sizes must match."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        encryptor = self._password_encryptor (self._random_password ())

        ct, _header, _fixed = \
            self._encrypt_object (encryptor, TEST_DUMMY_FILENAME, pt, cnksiz)

        self.assertEqual (len (pt), len (ct))


    def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
        """Same as the multi chunk test, with ``strict_ivs`` enabled."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        encryptor = self._password_encryptor (self._random_password (),
                                              strict_ivs=True)

        ct, _header, _fixed = \
            self._encrypt_object (encryptor, TEST_DUMMY_FILENAME, pt, cnksiz)

        self.assertEqual (len (pt), len (ct))


    def test_crypto_aes_gcm_enc_multiobj (self):
        """Encrypt five objects with one context; one fixed part is used."""
        cnksiz = 1 << 10
        encryptor = self._password_encryptor (self._random_password (),
                                              strict_ivs=False)

        def addobj (i):
            pt = fill_mod (1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _header, _fixed = \
                self._encrypt_object (encryptor, name, pt, cnksiz)
            self.assertEqual (len (pt), len (ct))

        for i in range (5):
            addobj (i)

        self.assertEqual (len (encryptor.fixed), 1)


    def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
        """
        Encrypt five objects with ``strict_ivs``; the fixed parts returned by
        ``done()`` must stay the same from the first object on.
        """
        cnksiz = 1 << 10
        encryptor = self._password_encryptor (self._random_password (),
                                              strict_ivs=True)
        curfixed = None # must remain constant after first

        def addobj (i):
            nonlocal curfixed
            pt = fill_mod (1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _header, fixed = \
                self._encrypt_object (encryptor, name, pt, cnksiz)
            if curfixed is None:
                curfixed = fixed
            else:
                self.assertEqual (fixed, curfixed)
            self.assertEqual (len (pt), len (ct))

        for i in range (5):
            addobj (i)

        self.assertEqual (len (encryptor.fixed), 1)


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
        """
        Test behavior when the file counter tops out.

        Artificially lower the maximum possible file counter. Considering
        invalid (0) and reserved (1, 2) values, the smallest possible file
        counter for normal objects is 3. Starting from that, the header of
        the (max - 3)rd object must have both a different IV fixed part and
        a counter.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX (self._WARRANTY_VOID, new_max)
        cnksiz = 1 << 10
        encryptor = self._password_encryptor (self._random_password (),
                                              strict_ivs=True)

        last_iv = None

        def addobj (i, wrap=False):
            nonlocal last_iv
            pt = fill_mod (1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, header, _fixed = \
                self._encrypt_object (encryptor, name, pt, cnksiz)

            this_iv = crypto.hdr_read (header) ["iv"]
            if last_iv is not None:
                this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
                last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
                if wrap is False:
                    # same fixed part, counter advanced by exactly one
                    self.assertEqual (last_fixed, this_fixed)
                    self.assertEqual (last_cnt, this_cnt - 1)
                else:
                    # fresh fixed part, counter restarted
                    self.assertNotEqual (last_fixed, this_fixed)
                    self.assertEqual (this_cnt, minimum)
            last_iv = this_iv

            self.assertEqual (len (pt), len (ct))

        for i in range (minimum, new_max + 1):
            addobj (i)                      # counter range: [3, 8]
        addobj (i + 1, True)                # counter wraps to 3

        for j in range (i + 2, i + new_max - 1):
            addobj (j)                      # counter range: [4, 8]
        addobj (j + 1, True)                # counter wraps to 3 again

        self.assertEqual (len (encryptor.fixed), 3)


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
        """
        Test behavior when the file counter tops out and the transition to
        the next IV fixed part fails on account of a bad random generator.

        Replaces the ``urandom`` reference in ``os`` with a deterministic
        function. The encryptor context must communicate this condition with
        an ``IVFixedPartError``.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX (self._WARRANTY_VOID, new_max)
        cnksiz = 1 << 10
        # Undone in tearDown().
        os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
        encryptor = self._password_encryptor (self._random_password (),
                                              strict_ivs=True)

        def addobj (i):
            # NB: objects are started and fed but never finalized with done().
            pt = fill_mod (1 << 14, off=i)
            _ = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
            _ = self._encrypt_chunked (encryptor, pt, cnksiz)

        for i in range (minimum, new_max):
            addobj (42 + i)

        with self.assertRaises (crypto.IVFixedPartError):
            addobj (42 + i)


    def test_crypto_aes_gcm_enc_length_cap (self):
        """
        Artificially lower the maximum allowable data length and attempt to
        encrypt a larger object. Verify that the crypto handler only encrypts
        data up to the size limit. A downstream user detects that condition
        by testing whether the encryption step yielded fewer bytes than the
        plaintext.

        The sibling to this test is test_restore_backup_max_file_length()
        in test_deltatar.py. Deltatar will transparently create a split
        object with an increased IV file counter.
        """
        new_max = 2187
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (self._WARRANTY_VOID, new_max)
        encryptor = self._password_encryptor (self._random_password ())

        def encobj (s):
            pt = fill_mod (s)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))

            n, ct = encryptor.process (pt)
            rest, _, _ = encryptor.done (header_dummy)

            # NB: If this check *ever* fails, then something changed in the
            #     encoding layer. AES-GCM is a stream cipher so each encoding
            #     step will yield the exact number of ciphertext bytes that
            #     was provided as plaintext. Thus there cannot be any encoded
            #     data left when calling the finalizers. None of the crypto
            #     code depends on that assumption but nevertheless we check
            #     it here in case anything changes upstream in the
            #     Cryptography library. In case there actually is a rest,
            #     replace the assertion below with ``ct += rest``.
            self.assertEqual (len (rest), 0)

            if len (pt) > new_max:
                # If the plaintext was longer than the artificially lowered
                # maximum, then the number of ciphertext bytes must be
                # clamped to the maximum.
                self.assertEqual (n, new_max)
            else:
                self.assertEqual (n, len (pt))
                self.assertEqual (len (pt), len (ct))

        for i in range (16):
            encobj (1 << i)

    # ------------------------------------------------------------------
    # chunked decryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_multicnk (self):
        """Chunked round trip of a 16 KiB object."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = self._random_password ()
        encryptor = self._password_encryptor (password)

        ct, header, fixed = \
            self._encrypt_object (encryptor, TEST_DUMMY_FILENAME, orig_pt, cnksiz)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed)
        decryptor.next (header)
        pt = self._decrypt_chunked (decryptor, ct, cnksiz)
        pt += decryptor.done ()

        self.assertEqual (pt, orig_pt)


    def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
        """Chunked decryption with a modified tag makes ``done()`` raise."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = self._random_password ()
        encryptor = self._password_encryptor (password)

        ct, header, fixed = \
            self._encrypt_object (encryptor, TEST_DUMMY_FILENAME, orig_pt, cnksiz)

        # replace one byte in the tag part of the header
        header = self._corrupt_tag (header)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed)
        decryptor.next (header)
        _ = self._decrypt_chunked (decryptor, ct, cnksiz)

        with self.assertRaises (crypto.InvalidGCMTag):
            _ = decryptor.done ()


    def test_crypto_aes_gcm_dec_iv_reuse (self):
        """
        Meddle with encrypted content: extract the IV from one object
        and inject it into the header of another. This must be rejected
        by the decryptor.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod (1 << 10)
        orig_pt_2 = fill_mod (1 << 10, 42)
        password = self._random_password ()
        encryptor = self._password_encryptor (password)

        ct_1, hdr_1, _____ = \
            self._encrypt_object (encryptor, TEST_DUMMY_FILENAME, orig_pt_1, cnksiz)
        ct_2, hdr_2, fixed = \
            self._encrypt_object (encryptor, TEST_DUMMY_FILENAME, orig_pt_2, cnksiz)

        # get IV of the first object and transplant it into the header of
        # the second one
        iv_lo = crypto.HDR_OFF_IV
        iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
        mut_hdr_2 = bytearray (hdr_2)
        memoryview (mut_hdr_2) [iv_lo : iv_hi] = hdr_1 [iv_lo : iv_hi]
        hdr_2_mod = bytes (mut_hdr_2)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
                                    strict_ivs=True)

        def dec (hdr, ct):
            decryptor.next (hdr)
            pt = self._decrypt_chunked (decryptor, ct, cnksiz)
            return pt + decryptor.done ()

        _ = dec (hdr_1, ct_1)
        _ = dec (hdr_2, ct_2) # good header, different IV
        with self.assertRaises (crypto.DuplicateIV): # bad header, reuse detected
            _ = dec (hdr_2_mod, ct_2)
fd10b44a PG |
619 | |
620 | ||
cb7397d5 PG |
class HeaderTest (CryptoLayerTest):
    """
    Tests for ``crypto.hdr_make`` and ``crypto.hdr_read``: assembling a
    header from a dict of fields, reading it back, and rejecting input that
    has extra bytes before, after or inside a valid header.
    """

    def _valid_header (self):
        """Assemble a header from ``faux_hdr()``; return ``(meta, hdr)``."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        return meta, hdr


    def test_crypto_fmt_hdr_make (self):
        """A header built from valid fields has ``PDTCRYPT_HDR_SIZE`` bytes."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        self.assertTrue (ok)
        self.assertEqual (len (hdr), crypto.PDTCRYPT_HDR_SIZE)


    def test_crypto_fmt_hdr_make_useless (self):
        """Unusable input yields ``False`` and an error message."""
        ok, ret = crypto.hdr_make ({ 42: "x" })
        self.assertIs (ok, False)
        self.assertTrue (ret.startswith ("error assembling header:"))


    def test_crypto_fmt_hdr_read (self):
        """Every field put into a header comes back out unchanged."""
        meta, hdr = self._valid_header ()
        mmeta = crypto.hdr_read (hdr)
        self.assertIsNotNone (mmeta)
        for k in meta:
            # A failing comparison is reported as a regular test failure;
            # raising a plain string is itself a TypeError in Python 3.
            self.assertEqual (meta [k], mmeta [k],
                              "header mismatch after reading: expected %r, got %r"
                              % (meta [k], mmeta [k]))


    def test_crypto_fmt_hdr_read_trailing_garbage (self):
        """Bytes appended to a valid header are rejected."""
        _meta, hdr = self._valid_header ()
        hdr += b"-junk"
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_read_leading_garbage (self):
        """Bytes prepended to a valid header are rejected."""
        _meta, hdr = self._valid_header ()
        hdr = b"junk-" + hdr
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_inner_garbage (self):
        """Bytes spliced into the middle of a valid header are rejected."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        self.assertTrue (ok)
        middle = len (hdr) // 2
        data = hdr [:middle] + b"junk-" + hdr [middle:]
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (data)
c176405d | 678 |