Commit | Line | Data |
---|---|---|
5133232d PG |
1 | import binascii |
2 | import os | |
3 | import pylibscrypt | |
c2d1c3ec | 4 | import struct |
5133232d PG |
5 | import unittest |
6 | ||
7 | import deltatar.crypto as crypto | |
8 | ||
e2a4e4f0 PG |
9 | import cryptography |
10 | ||
5133232d PG |
def b(s):
    """Return the UTF-8 encoding of the text string *s* as ``bytes``."""
    return s.encode("UTF-8")
13 | ||
# Sizes in bytes; CRYPTO_NACL_SIZE is the length of the random salt below.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE = 16

# Shared fixtures for the test cases in this module.
TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
TEST_PASSPHRASE = b"test1234"
TEST_AES_GCM_AAD = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"
TEST_VERSION = 1
TEST_PARAMVERSION = 1
# Drawn once at import time so every test in a run shares the same salt.
TEST_STATIC_NACL = os.urandom(CRYPTO_NACL_SIZE)
# Parameter version used by the plaintext passthrough tests.
PLAIN_PARAMVERSION = 0
5133232d PG |
25 | |
def faux_hdr(ctsize=1337, iv=None):
    """
    Build a dummy header dictionary for the header (de)serialization tests.

    :param ctsize: value stored under ``"ctsize"``.
    :param iv:     value stored under ``"iv"``; any falsy value (``None``,
                   ``b""``) selects the built-in 12 byte default.
    :returns:      dict with the keys ``version``, ``paramversion``,
                   ``nacl``, ``iv``, ``ctsize`` and ``tag``.
    """
    default_iv = binascii.unhexlify(b"0011223344556677"
                                    b"8899aabb")
    return {
        "version": 42,
        "paramversion": 2187,
        "nacl": binascii.unhexlify(b"0011223344556677"
                                   b"8899aabbccddeeff"),
        "iv": iv or default_iv,
        "ctsize": ctsize,
        "tag": binascii.unhexlify(b"deadbeefbadb100d"
                                  b"b1eedc0ffeedea15"),
    }
38 | ||
# Cache of generated buffers, keyed by the ``(n, off)`` argument pair.
FILL_MOD_MEMO = {}


def fill_mod(n, off=0):
    """
    Return *n* bytes of deterministic filler.

    Byte ``i`` has the value ``(off + i + 1) % 64 + 32``, so the buffer
    cycles through the 64 code points 32..95. Results are memoized in
    ``FILL_MOD_MEMO`` so repeated requests return the same object.

    :param n:   number of bytes to produce.
    :param off: offset added to the position before taking the modulus.
    :returns:   ``bytes`` of length *n*.
    """
    key = (n, off)
    cached = FILL_MOD_MEMO.get(key)
    if cached is not None:
        return cached
    filler = bytes((off + i + 1) % 64 + 32 for i in range(n))
    FILL_MOD_MEMO[key] = filler
    return filler
c2d1c3ec PG |
56 | |
57 | ||
5133232d PG |
def faux_payload():
    """Return a fixed dummy payload: the string ``"abcd"`` repeated 42 times."""
    return "abcd" * 42
60 | ||
cb7397d5 | 61 | |
class CryptoLayerTest(unittest.TestCase):
    """Common base class of the crypto test cases in this module."""
64 | ||
65 | ||
class AESGCMTest(CryptoLayerTest):
    """
    Tests driving ``crypto.Encrypt`` and ``crypto.Decrypt`` through their
    ``next()`` / ``process()`` / ``done()`` cycle.
    """

    # Reference to the genuine ``os.urandom``; some tests replace the
    # function in ``os`` and tearDown() puts this one back. Wrapped in
    # ``staticmethod`` so that access through ``self`` can never bind it.
    os_urandom = staticmethod(os.urandom)

    # Passphrase demanded by the ``crypto._testing_set_*`` hooks.
    _WARRANTY = "I am fully aware that this will void my warranty."

    def tearDown(self):
        """Reset globals altered for testing."""
        crypto._testing_set_AES_GCM_IV_CNT_MAX(self._WARRANTY)
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(self._WARRANTY)
        os.urandom = self.os_urandom

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _new_encryptor(paramversion=TEST_PARAMVERSION, **kwargs):
        """
        Create an encryptor keyed by a fresh random password.

        ``os.urandom`` is looked up at call time, so a test that has
        replaced it gets a password from the replacement.

        :returns: tuple ``(password, encryptor)``.
        """
        password = str(os.urandom(42))
        encryptor = crypto.Encrypt(TEST_VERSION,
                                   paramversion,
                                   password=password,
                                   nacl=TEST_STATIC_NACL,
                                   **kwargs)
        return password, encryptor

    @staticmethod
    def _encrypt_chunks(encryptor, pt, cnksiz=None):
        """
        Feed *pt* to ``encryptor.process()`` and return the ciphertext.

        With *cnksiz* unset the plaintext is passed in a single call,
        otherwise in consecutive slices of at most *cnksiz* bytes.
        """
        if cnksiz is None:
            _n, ct = encryptor.process(pt)
            return ct
        chunks = []
        for lo in range(0, len(pt), cnksiz):
            _n, cnk = encryptor.process(pt[lo:lo + cnksiz])
            chunks.append(cnk)
        return b"".join(chunks)

    @classmethod
    def _encrypt_object(cls, encryptor, pt, cnksiz=None,
                        name=TEST_DUMMY_FILENAME):
        """
        Encrypt *pt* as one complete object: ``next()``, the ``process()``
        calls, then ``done()``.

        :returns: tuple ``(ciphertext, header, fixed)`` where *header* and
                  *fixed* are what ``encryptor.done()`` returned.
        """
        header_dummy = encryptor.next(name)
        ct = cls._encrypt_chunks(encryptor, pt, cnksiz)
        rest, header, fixed = encryptor.done(header_dummy)
        return ct + rest, header, fixed

    @staticmethod
    def _decrypt_chunks(decryptor, ct, cnksiz=None):
        """
        Feed *ct* to ``decryptor.process()`` and return the plaintext
        produced so far. Does not call ``next()`` or ``done()``.
        """
        if cnksiz is None:
            return decryptor.process(ct)
        chunks = [decryptor.process(ct[lo:lo + cnksiz])
                  for lo in range(0, len(ct), cnksiz)]
        return b"".join(chunks)

    @classmethod
    def _decrypt_object(cls, decryptor, header, ct, cnksiz=None):
        """Decrypt one complete object and return its plaintext."""
        decryptor.next(header)
        pt = cls._decrypt_chunks(decryptor, ct, cnksiz)
        return pt + decryptor.done()

    @staticmethod
    def _corrupt_tag(header):
        """Return a copy of *header* with one byte of the tag field altered."""
        mutable = bytearray(header)
        pos = crypto.HDR_OFF_TAG + 2
        mutable[pos] = (mutable[pos] + 0x2a) % 256
        return bytes(mutable)

    # ------------------------------------------------------------------
    # encryptor construction
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_ctor(self):
        """An encryptor can be constructed from password and salt."""
        self._new_encryptor()

    def test_crypto_aes_gcm_enc_ctor_bad_plainparams(self):
        """Refuse plaintext passthrough mode by default."""
        with self.assertRaises(crypto.InvalidParameter):
            self._new_encryptor(PLAIN_PARAMVERSION)

    def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams(self):
        """
        Comply with request for plaintext passthrough mode if the
        *insecure* flag is passed.
        """
        self._new_encryptor(PLAIN_PARAMVERSION, insecure=True)

    def test_crypto_aes_gcm_enc_ctor_key(self):
        """An encryptor can be constructed from key and salt."""
        key = os.urandom(42)
        crypto.Encrypt(TEST_VERSION,
                       TEST_PARAMVERSION,
                       key=key,
                       nacl=TEST_STATIC_NACL)

    def test_crypto_aes_gcm_enc_ctor_no_key_pw(self):
        """
        Either key (+nacl) or password must be supplied, not both.
        """
        with self.assertRaises(crypto.InvalidParameter):  # neither key nor pw
            crypto.Encrypt(TEST_VERSION,
                           TEST_PARAMVERSION,
                           nacl=TEST_STATIC_NACL)

        password = str(os.urandom(42))
        key = os.urandom(16)  # scrypt sized
        with self.assertRaises(crypto.InvalidParameter):  # both key and pw
            crypto.Encrypt(TEST_VERSION,
                           TEST_PARAMVERSION,
                           password=password,
                           key=key,
                           nacl=TEST_STATIC_NACL)

        with self.assertRaises(crypto.InvalidParameter):  # key, but salt missing
            crypto.Encrypt(TEST_VERSION,
                           TEST_PARAMVERSION,
                           key=key,
                           nacl=None)

        with self.assertRaises(crypto.InvalidParameter):  # empty pw
            crypto.Encrypt(TEST_VERSION,
                           TEST_PARAMVERSION,
                           password=b"",
                           nacl=TEST_STATIC_NACL)

    # ------------------------------------------------------------------
    # single object encryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_header_size(self):
        """Dummy and final header both have ``PDTCRYPT_HDR_SIZE`` bytes."""
        _, encryptor = self._new_encryptor()

        header_dummy = encryptor.next(TEST_DUMMY_FILENAME)
        assert len(header_dummy) == crypto.PDTCRYPT_HDR_SIZE
        _, _ = encryptor.process(TEST_PLAINTEXT)
        _, header, _ = encryptor.done(header_dummy)
        assert len(header) == crypto.PDTCRYPT_HDR_SIZE

    def test_crypto_aes_gcm_enc_chunk_size(self):
        """Ciphertext is as long as the plaintext; ``done()`` adds nothing."""
        _, encryptor = self._new_encryptor()

        header_dummy = encryptor.next(TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process(TEST_PLAINTEXT)
        assert len(ciphertext) == len(TEST_PLAINTEXT)
        rest, _header, _fixed = encryptor.done(header_dummy)
        assert len(rest) == 0

    # ------------------------------------------------------------------
    # decryptor construction and single object decryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_ctor(self):
        """
        Ensure that only either key or password is accepted.
        """
        password = str(os.urandom(42))
        key = os.urandom(16)  # scrypt sized

        crypto.Decrypt(password=password)
        crypto.Decrypt(key=key)

        with self.assertRaises(crypto.InvalidParameter):  # both password and key
            crypto.Decrypt(password=password, key=key)

        with self.assertRaises(crypto.InvalidParameter):  # neither password nor key
            crypto.Decrypt(password=None, key=None)

        with self.assertRaises(crypto.InvalidParameter):  # empty password
            crypto.Decrypt(password="")

    def test_crypto_aes_gcm_dec_simple(self):
        """Round trip of a single object processed in one piece."""
        password, encryptor = self._new_encryptor()
        ciphertext, header, fixed = \
            self._encrypt_object(encryptor, TEST_PLAINTEXT)

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed)
        plaintext = self._decrypt_object(decryptor, header, ciphertext)

        assert plaintext == TEST_PLAINTEXT

    def test_crypto_aes_gcm_dec_plain_bad(self):
        """
        Downgrade to plaintext must not be allowed in parameters
        obtained from headers.
        """
        password, encryptor = self._new_encryptor()
        _ciphertext, header, fixed = \
            self._encrypt_object(encryptor, TEST_PLAINTEXT)

        # Rewrite the header so that it requests the plaintext parameters.
        header = crypto.hdr_read(header)
        header["paramversion"] = PLAIN_PARAMVERSION
        ok, header = crypto.hdr_make(header)
        assert ok

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed)
        with self.assertRaises(crypto.InvalidParameter):
            decryptor.next(header)

    def test_crypto_aes_gcm_dec_plain_ok_insecure(self):
        """
        Allow plaintext crypto mode if *insecure* flag is passed.
        """
        password, encryptor = \
            self._new_encryptor(PLAIN_PARAMVERSION, insecure=True)
        ciphertext, header, fixed = \
            self._encrypt_object(encryptor, TEST_PLAINTEXT)

        header = crypto.hdr_read(header)
        header["paramversion"] = PLAIN_PARAMVERSION
        ok, header = crypto.hdr_make(header)
        assert ok

        decryptor = crypto.Decrypt(password=password,
                                   fixedparts=fixed,
                                   insecure=True)
        plaintext = self._decrypt_object(decryptor, header, ciphertext)

        assert plaintext == TEST_PLAINTEXT

    def test_crypto_aes_gcm_dec_bad_tag(self):
        """A header with a modified tag makes ``done()`` raise."""
        password, encryptor = self._new_encryptor()
        ciphertext, header, fixed = \
            self._encrypt_object(encryptor, TEST_PLAINTEXT)
        header = self._corrupt_tag(header)

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed)
        decryptor.next(header)
        _ = decryptor.process(ciphertext)
        with self.assertRaises(crypto.InvalidGCMTag):
            _ = decryptor.done()

    # ------------------------------------------------------------------
    # multi chunk / multi object encryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_multicnk(self):
        """Encrypt one object in 1 KiB chunks; lengths must match."""
        cnksiz = 1 << 10
        pt = fill_mod(1 << 14)
        _, encryptor = self._new_encryptor()

        ct, _header, _fixed = self._encrypt_object(encryptor, pt, cnksiz)

        assert len(pt) == len(ct)

    def test_crypto_aes_gcm_enc_multicnk_strict_ivs(self):
        """Same as the multi chunk test but with ``strict_ivs=True``."""
        cnksiz = 1 << 10
        pt = fill_mod(1 << 14)
        _, encryptor = self._new_encryptor(strict_ivs=True)

        ct, _header, _fixed = self._encrypt_object(encryptor, pt, cnksiz)

        assert len(pt) == len(ct)

    def test_crypto_aes_gcm_enc_multiobj(self):
        """Five objects through one encryptor leave one IV fixed part."""
        cnksiz = 1 << 10
        _, encryptor = self._new_encryptor(strict_ivs=False)

        for i in range(5):
            pt = fill_mod(1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _header, _fixed = \
                self._encrypt_object(encryptor, pt, cnksiz, name)
            assert len(pt) == len(ct)

        assert len(encryptor.fixed) == 1

    def test_crypto_aes_gcm_enc_multiobj_strict_ivs(self):
        """
        Five objects with ``strict_ivs=True``: the *fixed* value returned
        by ``done()`` must remain constant after the first object.
        """
        cnksiz = 1 << 10
        _, encryptor = self._new_encryptor(strict_ivs=True)
        curfixed = None  # must remain constant after first

        for i in range(5):
            pt = fill_mod(1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _header, fixed = \
                self._encrypt_object(encryptor, pt, cnksiz, name)
            if curfixed is None:
                curfixed = fixed
            else:
                assert fixed == curfixed
            assert len(pt) == len(ct)

        assert len(encryptor.fixed) == 1

    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap(self):
        """
        Test behavior when the file counter tops out.

        Artificially lower the maximum possible file counter. Considering
        invalid (0) and reserved (1, 2) values, the smallest possible file
        counter for normal objects is 3. Starting from that, the header of
        the (max - 3)rd object must have both a different IV fixed part and
        a counter.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX(self._WARRANTY, new_max)
        cnksiz = 1 << 10
        _, encryptor = self._new_encryptor(strict_ivs=True)

        last_iv = None

        def addobj(i, wrap=False):
            nonlocal last_iv
            pt = fill_mod(1 << 14, off=i)
            name = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, header, _fixed = \
                self._encrypt_object(encryptor, pt, cnksiz, name)
            this_iv = crypto.hdr_read(header)["iv"]
            if last_iv is not None:
                this_fixed, this_cnt = \
                    struct.unpack(crypto.FMT_I2N_IV, this_iv)
                last_fixed, last_cnt = \
                    struct.unpack(crypto.FMT_I2N_IV, last_iv)
                if wrap is False:
                    assert last_fixed == this_fixed
                    assert last_cnt == this_cnt - 1
                else:
                    assert last_fixed != this_fixed
                    assert this_cnt == minimum
            last_iv = this_iv

            assert len(pt) == len(ct)

        for i in range(minimum, new_max + 1):  # counter range: [3, 8]
            addobj(i)
        addobj(i + 1, True)  # counter wraps to 3

        for j in range(i + 2, i + new_max - 1):  # counter range: [4, 8]
            addobj(j)
        addobj(j + 1, True)  # counter wraps to 3 again

        assert len(encryptor.fixed) == 3

    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed(self):
        """
        Test behavior when the file counter tops out and the transition to
        the next IV fixed part fails on account of a bad random generator.

        Replaces the ``urandom`` reference in ``os`` with a deterministic
        function. The encryptor context must communicate this condition with
        an ``IVFixedPartError``.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX(self._WARRANTY, new_max)
        cnksiz = 1 << 10
        # Undone by tearDown(). Must be installed before the encryptor is
        # created so that the password is drawn from the fake as well.
        os.urandom = lambda n: bytes(bytearray([n % 256] * n))
        _, encryptor = self._new_encryptor(strict_ivs=True)

        def addobj(i):
            # NB: the object is deliberately left unfinished; ``done()`` is
            # never called here.
            pt = fill_mod(1 << 14, off=i)
            encryptor.next("%s_%d" % (TEST_DUMMY_FILENAME, i))
            self._encrypt_chunks(encryptor, pt, cnksiz)

        for i in range(minimum, new_max):
            addobj(42 + i)

        with self.assertRaises(crypto.IVFixedPartError):
            addobj(42 + i)

    def test_crypto_aes_gcm_enc_length_cap(self):
        """
        Artificially lower the maximum allowable data length and attempt to
        encrypt a larger object. Verify that the crypto handler only encrypts
        data up to the size limit. A downstream user detects that condition
        by testing whether the encryption step yielded fewer bytes than the
        plaintext.

        The sibling to this test is test_restore_backup_max_file_length()
        in the deltatar tests. Deltatar will transparently create a split
        object with an increased IV file counter.
        """
        new_max = 2187
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(self._WARRANTY, new_max)
        _, encryptor = self._new_encryptor()

        def encobj(s):
            pt = fill_mod(s)
            header_dummy = encryptor.next("%s_%d" % (TEST_DUMMY_FILENAME, s))

            n, ct = encryptor.process(pt)
            rest, _, _ = encryptor.done(header_dummy)

            # NB: If this check *ever* fails, then something changed in the
            #     encoding layer. AES-GCM is a stream cipher so each encoding
            #     step will yield the exact number of ciphertext bytes that
            #     was provided as plaintext. Thus there cannot be any encoded
            #     data left when calling the finalizers. None of the crypto
            #     code depends on that assumption but nevertheless we check
            #     it here in case anything changes upstream in the
            #     Cryptography library. In case there actually is a rest,
            #     replace the assertion below with ``ct += rest``.
            assert len(rest) == 0

            if len(pt) > new_max:
                # If the plaintext was longer than the artificially lowered
                # maximum, then the number of ciphertext bytes must be
                # clamped to the maximum.
                assert n == new_max
            else:
                assert n == len(pt) == len(ct)

        for i in range(16):
            encobj(1 << i)

    # ------------------------------------------------------------------
    # multi chunk / multi object decryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_length_cap(self):
        """
        The decryptor must reject headers with an object size that exceeds
        the PDTCRYPT maximum. Longer files split into multiple objects.
        """
        password = str(os.urandom(42))
        meta = faux_hdr()
        meta["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
        ok, header = crypto.hdr_make(meta)

        assert ok

        # Set up decryption with bogus header.
        decryptor = crypto.Decrypt(password=password, fixedparts=[])

        with self.assertRaises(crypto.InvalidHeader):
            decryptor.next(header)

    def test_crypto_aes_gcm_dec_length_mismatch(self):
        """
        Catch attempts at decrypting more data than what was stated in the
        header.
        """
        cnksiz = 1 << 10
        orig_pt = fill_mod(1 << 14)
        password, encryptor = self._new_encryptor()
        ct, header, fixed = self._encrypt_object(encryptor, orig_pt, cnksiz)

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed)

        decryptor.next(header)
        self._decrypt_chunks(decryptor, ct, cnksiz)

        with self.assertRaises(crypto.CiphertextTooLong):
            # Try and decrypt one byte more than was encrypted.
            # This must be caught in crypto.py.
            _ = decryptor.process(ct[0:1])

    def test_crypto_aes_gcm_dec_multicnk(self):
        """Round trip of one object processed in 1 KiB chunks."""
        cnksiz = 1 << 10
        orig_pt = fill_mod(1 << 14)
        password, encryptor = self._new_encryptor()
        ct, header, fixed = self._encrypt_object(encryptor, orig_pt, cnksiz)

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed)
        pt = self._decrypt_object(decryptor, header, ct, cnksiz)

        assert pt == orig_pt

    def test_crypto_aes_gcm_dec_multicnk_bad_tag(self):
        """Chunked decryption with a modified tag fails in ``done()``."""
        cnksiz = 1 << 10
        orig_pt = fill_mod(1 << 14)
        password, encryptor = self._new_encryptor()
        ct, header, fixed = self._encrypt_object(encryptor, orig_pt, cnksiz)
        header = self._corrupt_tag(header)

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed)
        decryptor.next(header)
        self._decrypt_chunks(decryptor, ct, cnksiz)

        with self.assertRaises(crypto.InvalidGCMTag):
            _ = decryptor.done()

    def test_crypto_aes_gcm_dec_iv_gap(self):
        """
        Encrypt multiple objects using non-consecutive IVs and verify that the
        decryptor errors out with an exception in strict mode but keeps quiet
        otherwise.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod(1 << 10)
        orig_pt_2 = fill_mod(1 << 10, 23)
        orig_pt_3 = fill_mod(1 << 10, 42)
        password, encryptor = self._new_encryptor()

        ct_1, hdr_1, _ = self._encrypt_object(encryptor, orig_pt_1, cnksiz)

        # Here we bump the iv of the encryptor, breaking the series.
        encryptor.set_object_counter(encryptor.cnt + 1)
        ct_2, hdr_2, fixed = self._encrypt_object(encryptor, orig_pt_2, cnksiz)

        # IV of final object is again in-sequence.
        ct_3, hdr_3, fixed = self._encrypt_object(encryptor, orig_pt_3, cnksiz)

        def decrypt(strict_ivs):
            decryptor = crypto.Decrypt(password=password, fixedparts=fixed,
                                       strict_ivs=strict_ivs)

            decr_pt_1 = self._decrypt_object(decryptor, hdr_1, ct_1, cnksiz)
            # good header, non-consecutive IV
            decr_pt_2 = self._decrypt_object(decryptor, hdr_2, ct_2, cnksiz)
            decr_pt_3 = self._decrypt_object(decryptor, hdr_3, ct_3, cnksiz)

            assert decr_pt_1 == orig_pt_1
            assert decr_pt_2 == orig_pt_2
            assert decr_pt_3 == orig_pt_3

        with self.assertRaises(crypto.NonConsecutiveIV):
            decrypt(True)

        decrypt(False)  # Sequence passes

    def test_crypto_aes_gcm_dec_iv_reuse(self):
        """
        Meddle with encrypted content: extract the IV from one object
        and inject it into the header of another. This must be rejected
        by the decryptor.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod(1 << 10)
        orig_pt_2 = fill_mod(1 << 10, 42)
        password, encryptor = self._new_encryptor()

        ct_1, hdr_1, _ = self._encrypt_object(encryptor, orig_pt_1, cnksiz)
        ct_2, hdr_2, fixed = self._encrypt_object(encryptor, orig_pt_2, cnksiz)

        # Transplant the IV of the first header into a copy of the second.
        iv_lo = crypto.HDR_OFF_IV
        iv_hi = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
        mut_hdr_2 = bytearray(hdr_2)
        mut_hdr_2[iv_lo:iv_hi] = hdr_1[iv_lo:iv_hi]
        hdr_2_mod = bytes(mut_hdr_2)

        decryptor = crypto.Decrypt(password=password, fixedparts=fixed,
                                   strict_ivs=True)

        self._decrypt_object(decryptor, hdr_1, ct_1, cnksiz)
        # good header, different IV
        self._decrypt_object(decryptor, hdr_2, ct_2, cnksiz)
        # bad header, reuse detected
        with self.assertRaises(crypto.DuplicateIV):
            self._decrypt_object(decryptor, hdr_2_mod, ct_2, cnksiz)
fd10b44a PG |
827 | |
828 | ||
cb7397d5 PG |
class HeaderTest(CryptoLayerTest):
    """Tests for ``crypto.hdr_make()`` and ``crypto.hdr_read()``."""

    def test_crypto_fmt_hdr_make(self):
        """A well-formed dict serializes to ``PDTCRYPT_HDR_SIZE`` bytes."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok
        assert len(hdr) == crypto.PDTCRYPT_HDR_SIZE

    def test_crypto_fmt_hdr_make_useless(self):
        """Unusable input yields ``False`` plus an error message."""
        ok, ret = crypto.hdr_make({42: "x"})
        assert ok is False
        assert ret.startswith("error assembling header:")

    def test_crypto_fmt_hdr_read(self):
        """Every field written by ``hdr_make()`` is read back unchanged."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok is True
        assert hdr is not None
        mmeta = crypto.hdr_read(hdr)
        assert mmeta is not None
        for k in meta:
            # Raising a plain string is itself a TypeError on Python 3 and
            # would mask the mismatch, so report through the test case.
            self.assertIn(k, mmeta)
            self.assertEqual(
                mmeta[k], meta[k],
                "header mismatch after reading: expected %r, got %r"
                % (meta[k], mmeta[k]))

    def test_crypto_fmt_hdr_read_trailing_garbage(self):
        """Bytes appended to a valid header are rejected."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok is True
        assert hdr is not None
        hdr += b"-junk"
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(hdr)

    def test_crypto_fmt_hdr_read_leading_garbage(self):
        """Bytes prepended to a valid header are rejected."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok is True
        assert hdr is not None
        hdr = b"junk-" + hdr
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(hdr)

    def test_crypto_fmt_hdr_inner_garbage(self):
        """Bytes spliced into the middle of a valid header are rejected."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make(meta)
        assert ok
        middle = len(hdr) // 2
        data = hdr[:middle] + b"junk-" + hdr[middle:]
        with self.assertRaises(crypto.InvalidHeader):
            _ = crypto.hdr_read(data)
c176405d | 886 |