Commit | Line | Data |
---|---|---|
5133232d PG |
1 | import binascii |
2 | import os | |
3 | import pylibscrypt | |
c2d1c3ec | 4 | import struct |
5133232d PG |
5 | import unittest |
6 | ||
7 | import deltatar.crypto as crypto | |
8 | ||
e2a4e4f0 PG |
9 | import cryptography |
10 | ||
5133232d PG |
def b(s):
    """Return the text *s* encoded as UTF-8 bytes."""
    encoded = s.encode(encoding="UTF-8")
    return encoded
13 | ||
# Sizes in bytes of the salt ("nacl") and of the key used by the tests.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE = 16

# Sample inputs shared by the test cases below.
TEST_PLAINTEXT = "gentlemen don’t read each other’s mail".encode("UTF-8")
TEST_PASSPHRASE = b"test1234"
TEST_AES_GCM_AAD = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"

# Format version and parameter version handed to ``crypto.Encrypt``.
TEST_VERSION = 1
TEST_PARAMVERSION = 1

# Salt drawn once at import time and reused by every encryptor.
TEST_STATIC_NACL = os.urandom(CRYPTO_NACL_SIZE)

# Parameter version that the tests treat as plaintext passthrough mode.
PLAIN_PARAMVERSION = 0
5133232d PG |
25 | |
def faux_hdr (ctsize=1337, iv=None):
    """
    Build a dictionary of made-up header fields.

    :param ctsize: value stored under ``"ctsize"``.
    :param iv:     value stored under ``"iv"``; any falsy value (``None``,
                   ``b""``) selects the built-in twelve byte dummy IV.
    :returns:      dict with the keys ``version``, ``paramversion``,
                   ``nacl``, ``iv``, ``ctsize`` and ``tag``.
    """
    hdr = {}
    hdr ["version"] = 42
    hdr ["paramversion"] = 2187
    hdr ["nacl"] = binascii.unhexlify (b"0011223344556677"
                                       b"8899aabbccddeeff")
    if iv:
        hdr ["iv"] = iv
    else:
        hdr ["iv"] = binascii.unhexlify (b"0011223344556677"
                                         b"8899aabb")
    hdr ["ctsize"] = ctsize
    hdr ["tag"] = binascii.unhexlify (b"deadbeefbadb100d"
                                      b"b1eedc0ffeedea15")
    return hdr
38 | ||
# Cache of buffers already produced by fill_mod(), keyed by ``(n, off)``.
FILL_MOD_MEMO = { }


def fill_mod (n, off=0):
    """
    Return *n* bytes of deterministic filler.

    Byte number ``i`` (counting from zero) has the value
    ``(off + i + 1) % 64 + 32``, so the output cycles through the 64 ASCII
    characters from space (32) up to underscore (95).

    :param n:   number of bytes to produce.
    :param off: offset added to the byte position before the modulus is
                taken; shifts the starting point within the cycle.
    :returns:   *bytes* of length *n*. Results are memoized in
                ``FILL_MOD_MEMO`` so repeated calls with the same arguments
                hand back the very same object.
    :raises ValueError: if *n* is negative.
    """
    key = (n, off)
    cached = FILL_MOD_MEMO.get (key)
    if cached is not None:
        return cached

    if n < 0:
        # Same failure the former ``bytearray (n)`` allocation produced.
        raise ValueError ("negative count")

    # Build the whole buffer in one go instead of packing it byte by byte.
    filler = bytes ((off + pos) % 64 + 32 for pos in range (1, n + 1))
    FILL_MOD_MEMO [key] = filler
    return filler
c2d1c3ec PG |
56 | |
57 | ||
5133232d PG |
def faux_payload ():
    """Return a dummy text payload: the string ``abcd`` repeated 42 times."""
    repetitions = 42
    return "".join (["abcd"] * repetitions)
60 | ||
cb7397d5 | 61 | |
class CryptoLayerTest (unittest.TestCase):
    """
    Common base class of the crypto layer test cases. It adds nothing to
    ``unittest.TestCase``; it only gives the test classes a shared ancestor.
    """
64 | ||
65 | ||
class AESGCMTest (CryptoLayerTest):
    """
    Tests for the AES-GCM encryption and decryption contexts provided by
    ``deltatar.crypto``.
    """

    # Reference to the genuine ``os.urandom`` so that tearDown() can undo the
    # monkey-patching performed by individual tests.
    os_urandom = os.urandom

    # Passphrase demanded by the ``crypto._testing_set_*`` hooks.
    _TESTING_PASSPHRASE = "I am fully aware that this will void my warranty."


    def tearDown (self):
        """Reset globals altered for testing."""
        # Restore the random source first so that a failure in one of the
        # reset hooks cannot leave the patched function in place.
        os.urandom = self.os_urandom
        crypto._testing_set_AES_GCM_IV_CNT_MAX (self._TESTING_PASSPHRASE)
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (self._TESTING_PASSPHRASE)


    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _random_password ():
        """
        Return a throwaway password: the stringified result of
        ``os.urandom (42)``. The function is looked up at call time so a
        test that replaced ``os.urandom`` beforehand gets its replacement.
        """
        return str (os.urandom (42))


    @staticmethod
    def _make_encryptor (paramversion=TEST_PARAMVERSION,
                         nacl=TEST_STATIC_NACL, **kwargs):
        """
        Create a ``crypto.Encrypt`` context for ``TEST_VERSION``. Remaining
        keyword arguments (*password*, *key*, *counter*, *strict_ivs*,
        *insecure*) are passed through unchanged.
        """
        return crypto.Encrypt (TEST_VERSION, paramversion, nacl=nacl, **kwargs)


    @staticmethod
    def _encrypt_chunks (encryptor, pt, cnksiz):
        """
        Feed *pt* to ``encryptor.process ()`` in slices of *cnksiz* bytes and
        return the concatenated ciphertext. Neither ``next ()`` nor
        ``done ()`` is called.
        """
        return b"".join (encryptor.process (pt [off:off + cnksiz]) [1]
                         for off in range (0, len (pt), cnksiz))


    @classmethod
    def _encrypt_object (cls, encryptor, pt, cnksiz=None,
                         name=TEST_DUMMY_FILENAME):
        """
        Encrypt *pt* as one complete object: ``next ()``, ``process ()``,
        ``done ()``.

        :param cnksiz: slice size for ``process ()``; ``None`` passes the
                       whole plaintext in a single call.
        :returns:      tuple *(ciphertext, header, fixed)* where the
                       ciphertext includes whatever ``done ()`` returned as
                       remainder.
        """
        header_dummy = encryptor.next (name)
        if cnksiz is None:
            _, ct = encryptor.process (pt)
        else:
            ct = cls._encrypt_chunks (encryptor, pt, cnksiz)
        rest, header, fixed = encryptor.done (header_dummy)
        return ct + rest, header, fixed


    @staticmethod
    def _decrypt_chunks (decryptor, header, ct, cnksiz=None, length=None):
        """
        Start a new object on *decryptor* with *header* and push *ct* through
        ``process ()``. ``done ()`` is deliberately left to the caller since
        several tests expect it to raise.

        :param cnksiz: slice size for ``process ()``; ``None`` passes the
                       whole ciphertext in a single call.
        :param length: number of bytes to walk over; defaults to
                       ``len (ct)``.
        :returns:      the plaintext produced by the ``process ()`` calls.
        """
        decryptor.next (header)
        if cnksiz is None:
            return decryptor.process (ct)
        if length is None:
            length = len (ct)
        return b"".join (decryptor.process (ct [off:off + cnksiz])
                         for off in range (0, length, cnksiz))


    @staticmethod
    def _corrupt_tag (header):
        """
        Return a copy of *header* in which the byte at offset
        ``crypto.HDR_OFF_TAG + 2`` has been changed.
        """
        mutable = bytearray (header)
        pos = crypto.HDR_OFF_TAG + 2
        mutable [pos] = (mutable [pos] + 0x2a) % 256
        return bytes (mutable)


    # ------------------------------------------------------------------
    # encryptor construction
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_ctor (self):
        """An encryptor can be constructed from a password and a salt."""
        password = self._random_password ()
        self._make_encryptor (password=password)


    def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
        """Refuse plaintext passthrough mode by default."""
        password = self._random_password ()
        with self.assertRaises (crypto.InvalidParameter):
            self._make_encryptor (PLAIN_PARAMVERSION, password=password)


    def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
        """
        Comply with request for plaintext passthrough mode if the
        *insecure* flag is passed.
        """
        password = self._random_password ()
        self._make_encryptor (PLAIN_PARAMVERSION,
                              password=password,
                              insecure=True)


    def test_crypto_aes_gcm_enc_ctor_key (self):
        """An encryptor can be constructed from a key and a salt."""
        key = os.urandom (42)
        self._make_encryptor (key=key)


    def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
        """
        Either key (+nacl) or password must be supplied, not both.
        """
        with self.assertRaises (crypto.InvalidParameter): # neither key nor pw
            self._make_encryptor ()

        password = self._random_password ()
        key = os.urandom (16) # scrypt sized
        with self.assertRaises (crypto.InvalidParameter): # both key and pw
            self._make_encryptor (password=password, key=key)

        with self.assertRaises (crypto.InvalidParameter): # key, but salt missing
            self._make_encryptor (key=key, nacl=None)

        with self.assertRaises (crypto.InvalidParameter): # empty pw
            self._make_encryptor (password=b"")


    # ------------------------------------------------------------------
    # encryption
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_header_size (self):
        """Dummy and final header both have the size of a PDTCRYPT header."""
        encryptor = self._make_encryptor (password=self._random_password ())

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        self.assertEqual (len (header_dummy), crypto.PDTCRYPT_HDR_SIZE)
        _, _ = encryptor.process (TEST_PLAINTEXT)
        _, header, _ = encryptor.done (header_dummy)
        self.assertEqual (len (header), crypto.PDTCRYPT_HDR_SIZE)


    def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
        """
        Access the list of IVs used during encryption and check reuse. Start
        from explicit counters so the inputs don’t overlap. IVs must not have
        been reused.
        """

        def enc (start_count, data):
            encryptor = self._make_encryptor (password=self._random_password (),
                                              counter=start_count)

            for i, blob in enumerate (data, 1):
                fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
                header_dummy = encryptor.next (fname)
                _, _ = encryptor.process (blob)
                _, _, _ = encryptor.done (header_dummy)
                # One more IV on record after each completed object.
                self.assertEqual (len (encryptor.get_used_ivs ()), i)

            return encryptor.get_used_ivs ()

        ivs1 = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
        ivs2 = enc (0x1337, [b"read me if you can", b"for British eyes only!"])

        # No reuse in general.
        self.assertEqual (len (ivs1 & ivs2), 0)

        # Counters of used IVs must match what we passed explicitly.
        def extract_counters (ivs):
            unpacked = (struct.unpack (crypto.FMT_I2N_IV, iv) for iv in ivs)
            return [ cnt for _, cnt in unpacked ]

        cnt1 = extract_counters (ivs1)
        cnt2 = extract_counters (ivs2)

        self.assertIn (0x0042, cnt1)
        self.assertIn (0x0043, cnt1)

        self.assertIn (0x1337, cnt2)
        self.assertIn (0x1338, cnt2)


    def test_crypto_aes_gcm_enc_chunk_size (self):
        """
        The ciphertext is as long as the plaintext and finalizing the object
        yields no further data.
        """
        encryptor = self._make_encryptor (password=self._random_password ())

        header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
        _, ciphertext = encryptor.process (TEST_PLAINTEXT)
        self.assertEqual (len (ciphertext), len (TEST_PLAINTEXT))
        rest, _, _ = encryptor.done (header_dummy)
        self.assertEqual (len (rest), 0)


    # ------------------------------------------------------------------
    # decryption, single chunk
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_ctor (self):
        """
        Ensure that only either key or password is accepted.
        """
        password = self._random_password ()
        key = os.urandom (16) # scrypt sized

        crypto.Decrypt (password=password)
        crypto.Decrypt (key=key)

        with self.assertRaises (crypto.InvalidParameter): # both password and key
            crypto.Decrypt (password=password, key=key)

        with self.assertRaises (crypto.InvalidParameter): # neither password nor key
            crypto.Decrypt (password=None, key=None)

        with self.assertRaises (crypto.InvalidParameter): # empty password
            crypto.Decrypt (password="")


    def test_crypto_aes_gcm_dec_simple (self):
        """Decrypting an encrypted object restores the plaintext."""
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)
        ciphertext, header, fixed = \
            self._encrypt_object (encryptor, TEST_PLAINTEXT)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        plaintext = self._decrypt_chunks (decryptor, header, ciphertext)
        plaintext += decryptor.done ()

        self.assertEqual (plaintext, TEST_PLAINTEXT)


    def test_crypto_aes_gcm_dec_plain_bad (self):
        """
        Downgrade to plaintext must not be allowed in parameters
        obtained from headers.
        """
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)
        _, header, fixed = self._encrypt_object (encryptor, TEST_PLAINTEXT)

        # Rewrite the header so it claims the plaintext parameter version.
        meta = crypto.hdr_read (header)
        meta ["paramversion"] = PLAIN_PARAMVERSION
        ok, header = crypto.hdr_make (meta)
        self.assertTrue (ok)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        with self.assertRaises (crypto.InvalidParameter):
            decryptor.next (header)


    def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
        """
        Allow plaintext crypto mode if *insecure* flag is passed.
        """
        password = self._random_password ()
        encryptor = self._make_encryptor (PLAIN_PARAMVERSION,
                                          password=password,
                                          insecure=True)
        ciphertext, header, fixed = \
            self._encrypt_object (encryptor, TEST_PLAINTEXT)

        meta = crypto.hdr_read (header)
        meta ["paramversion"] = PLAIN_PARAMVERSION
        ok, header = crypto.hdr_make (meta)
        self.assertTrue (ok)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed,
                                    insecure=True)
        plaintext = self._decrypt_chunks (decryptor, header, ciphertext)
        plaintext += decryptor.done ()

        self.assertEqual (plaintext, TEST_PLAINTEXT)


    def test_crypto_aes_gcm_dec_bad_tag (self):
        """A header with a modified tag makes ``done ()`` raise."""
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)
        ciphertext, header, fixed = \
            self._encrypt_object (encryptor, TEST_PLAINTEXT)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        self._decrypt_chunks (decryptor, self._corrupt_tag (header), ciphertext)
        with self.assertRaises (crypto.InvalidGCMTag):
            decryptor.done ()


    # ------------------------------------------------------------------
    # encryption, multiple chunks / objects
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_enc_multicnk (self):
        """Encrypting in 1 KiB chunks preserves the overall length."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        encryptor = self._make_encryptor (password=self._random_password ())

        ct, _, _ = self._encrypt_object (encryptor, pt, cnksiz)

        self.assertEqual (len (pt), len (ct))


    def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
        """Same as the multi-chunk test, with *strict_ivs* enabled."""
        cnksiz = 1 << 10
        pt = fill_mod (1 << 14)
        encryptor = self._make_encryptor (password=self._random_password (),
                                          strict_ivs=True)

        ct, _, _ = self._encrypt_object (encryptor, pt, cnksiz)

        self.assertEqual (len (pt), len (ct))


    def test_crypto_aes_gcm_enc_multiobj (self):
        """
        Encrypt five objects with one encryptor; afterwards the encryptor
        must hold exactly one IV fixed part.
        """
        cnksiz = 1 << 10
        encryptor = self._make_encryptor (password=self._random_password (),
                                          strict_ivs=False)

        for i in range (5):
            pt = fill_mod (1 << 14, off=i)
            fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _, _ = self._encrypt_object (encryptor, pt, cnksiz, fname)
            self.assertEqual (len (pt), len (ct))

        self.assertEqual (len (encryptor.fixed), 1)


    def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
        """
        Encrypt five objects with *strict_ivs* enabled; the fixed parts
        returned by ``done ()`` must compare equal for every object.
        """
        cnksiz = 1 << 10
        encryptor = self._make_encryptor (password=self._random_password (),
                                          strict_ivs=True)
        curfixed = None # must remain constant after first

        for i in range (5):
            pt = fill_mod (1 << 14, off=i)
            fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, _, fixed = self._encrypt_object (encryptor, pt, cnksiz, fname)
            if curfixed is None:
                curfixed = fixed
            else:
                self.assertEqual (fixed, curfixed)
            self.assertEqual (len (pt), len (ct))

        self.assertEqual (len (encryptor.fixed), 1)


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
        """
        Test behavior when the file counter tops out.

        Artificially lower the maximum possible file counter. Considering
        invalid (0) and reserved (1, 2) values, the smallest possible file
        counter for normal objects is 3. Starting from that, the header of
        the (max - 3)rd object must have both a different IV fixed part and
        a counter.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX \
            (self._TESTING_PASSPHRASE, new_max)
        cnksiz = 1 << 10
        encryptor = self._make_encryptor (password=self._random_password (),
                                          strict_ivs=True)

        last_iv = None

        def addobj (i, wrap=False):
            nonlocal last_iv
            pt = fill_mod (1 << 14, off=i)
            fname = "%s_%d" % (TEST_DUMMY_FILENAME, i)
            ct, header, _ = self._encrypt_object (encryptor, pt, cnksiz, fname)

            this_iv = crypto.hdr_read (header) ["iv"]
            if last_iv is not None:
                this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
                last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
                if wrap:
                    # New fixed part, counter starts over.
                    self.assertNotEqual (last_fixed, this_fixed)
                    self.assertEqual (this_cnt, minimum)
                else:
                    # Same fixed part, counter advanced by one.
                    self.assertEqual (last_fixed, this_fixed)
                    self.assertEqual (last_cnt, this_cnt - 1)
            last_iv = this_iv

            self.assertEqual (len (pt), len (ct))

        for i in range (minimum, new_max + 1):
            addobj (i)                      # counter range: [3, 8]
        addobj (new_max + 1, True)          # counter wraps to 3

        for j in range (new_max + 2, 2 * new_max - 1):
            addobj (j)                      # counter range: [4, 8]
        addobj (2 * new_max - 1, True)      # counter wraps to 3 again

        self.assertEqual (len (encryptor.fixed), 3)


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
        """
        Test behavior when the file counter tops out and the transition to
        the next IV fixed part fails on account of a bad random generator.

        Replaces the ``urandom`` reference in ``os`` with a deterministic
        function. The encryptor context must communicate this condition with an
        ``IVFixedPartError``.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX \
            (self._TESTING_PASSPHRASE, new_max)
        cnksiz = 1 << 10
        # Undone again by tearDown().
        os.urandom = lambda n: bytes (bytearray ([n % 256] * n))
        encryptor = self._make_encryptor (password=self._random_password (),
                                          strict_ivs=True)

        def addobj (i):
            # NB: the object is started and fed but never finalized with
            # ``done ()``.
            pt = fill_mod (1 << 14, off=i)
            encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
            self._encrypt_chunks (encryptor, pt, cnksiz)

        for i in range (minimum, new_max):
            addobj (42 + i)

        with self.assertRaises (crypto.IVFixedPartError):
            addobj (42 + new_max - 1)


    def test_crypto_aes_gcm_enc_length_cap (self):
        """
        Artificially lower the maximum allowable data length and attempt to
        encrypt a larger object. Verify that the crypto handler only encrypts
        data up to the size limit. A downstream user detects that condition by
        testing whether the encryption step yielded less bytes than the
        plaintext.

        The sibling to this test is test_restore_backup_max_file_length()
        in test_deltatar.py. Deltatar will transparently create a split object
        with an increased IV file counter.
        """
        new_max = 2187
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
            (self._TESTING_PASSPHRASE, new_max)
        encryptor = self._make_encryptor (password=self._random_password ())

        def encobj (s):
            pt = fill_mod (s)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))

            n, ct = encryptor.process (pt)
            rest, _, _ = encryptor.done (header_dummy)

            # NB: If this check *ever* fails, then something changed in the
            #     encoding layer. AES-GCM is a stream cipher so each encoding
            #     step will yield the exact number of ciphertext bytes that
            #     was provided as plaintext. Thus there cannot be any encoded
            #     data left when calling the finalizers. None of the crypto
            #     code depends on that assumption but nevertheless we check it
            #     here in case anything changes upstream in the Cryptography
            #     library. In case there actually is a rest, replace the
            #     assertion below with ``ct += rest``.
            self.assertEqual (len (rest), 0)

            if len (pt) > new_max:
                # If the plaintext was longer than the artificially lowered
                # maximum, then the number of ciphertext bytes must be clamped
                # to the maximum.
                self.assertEqual (n, new_max)
            else:
                self.assertEqual (n, len (pt))
                self.assertEqual (len (pt), len (ct))

        for i in range (16):
            encobj (1 << i)


    # ------------------------------------------------------------------
    # decryption, multiple chunks / objects
    # ------------------------------------------------------------------

    def test_crypto_aes_gcm_dec_length_cap (self):
        """
        The decryptor must reject headers with an object size that exceeds
        the PDTCRYPT maximum. Longer files split into multiple objects.
        """
        password = self._random_password ()
        meta = faux_hdr ()
        meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
        ok, header = crypto.hdr_make (meta)

        self.assertTrue (ok)

        # Set up decryption with bogus header.
        decryptor = crypto.Decrypt (password=password, fixedparts=[])

        with self.assertRaises (crypto.InvalidHeader):
            decryptor.next (header)


    def test_crypto_aes_gcm_dec_length_mismatch (self):
        """
        Catch attempts at decrypting more data than what was stated in the
        header.
        """
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)
        ct, header, fixed = self._encrypt_object (encryptor, orig_pt, cnksiz)

        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
        self._decrypt_chunks (decryptor, header, ct, cnksiz,
                              length=len (orig_pt))

        with self.assertRaises (crypto.CiphertextTooLong):
            # Try and decrypt one byte more than was encrypted.
            # This must be caught in crypto.py.
            decryptor.process (ct [0:1])


    def test_crypto_aes_gcm_dec_multicnk (self):
        """Chunk-wise decryption restores a chunk-wise encrypted object."""
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)
        ct, header, fixed = self._encrypt_object (encryptor, orig_pt, cnksiz)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed)
        pt = self._decrypt_chunks (decryptor, header, ct, cnksiz,
                                   length=len (orig_pt))
        pt += decryptor.done ()

        self.assertEqual (pt, orig_pt)


    def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
        """
        Chunk-wise decryption with a modified tag in the header makes
        ``done ()`` raise.
        """
        cnksiz = 1 << 10
        orig_pt = fill_mod (1 << 14)
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)
        ct, header, fixed = self._encrypt_object (encryptor, orig_pt, cnksiz)

        decryptor = crypto.Decrypt (password=password,
                                    fixedparts=fixed)
        self._decrypt_chunks (decryptor, self._corrupt_tag (header), ct,
                              cnksiz, length=len (orig_pt))

        with self.assertRaises (crypto.InvalidGCMTag):
            decryptor.done ()


    def test_crypto_aes_gcm_dec_iv_gap (self):
        """
        Encrypt multiple objects using non-consecutive IVs and verify that the
        decryptor errors out with an exception in strict mode but keeps quiet
        otherwise.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod (1 << 10)
        orig_pt_2 = fill_mod (1 << 10, 23)
        orig_pt_3 = fill_mod (1 << 10, 42)
        password = self._random_password ()
        encryptor = self._make_encryptor (password=password)

        ct_1, hdr_1, _ = self._encrypt_object (encryptor, orig_pt_1, cnksiz)

        ## Here we bump the iv of the encryptor, breaking the series.
        encryptor.set_object_counter (encryptor.cnt + 1)
        ct_2, hdr_2, _ = self._encrypt_object (encryptor, orig_pt_2, cnksiz)

        ## IV of final object is again in-sequence.
        ct_3, hdr_3, fixed = self._encrypt_object (encryptor, orig_pt_3, cnksiz)

        objects = [ (hdr_1, ct_1, orig_pt_1)
                  , (hdr_2, ct_2, orig_pt_2) ## ← good header, non-consecutive IV
                  , (hdr_3, ct_3, orig_pt_3)
                  ]

        def decrypt (strict_ivs):
            decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
                                        strict_ivs=strict_ivs)

            def dec (hdr, ct):
                pt = self._decrypt_chunks (decryptor, hdr, ct, cnksiz)
                return pt + decryptor.done ()

            # Decrypt the whole series first, compare afterwards.
            decrypted = [ dec (hdr, ct) for hdr, ct, _ in objects ]

            for decr_pt, (_, _, orig_pt) in zip (decrypted, objects):
                self.assertEqual (decr_pt, orig_pt)

        with self.assertRaises (crypto.NonConsecutiveIV):
            decrypt (True)

        decrypt (False) # Sequence passes


    def test_crypto_aes_gcm_dec_iv_reuse (self):
        """
        Force the reuse of an IV: after encrypting one object, rewind the
        object counter of the encryptor so the next object would receive the
        same IV again. The encryptor must refuse that with ``DuplicateIV``.
        """
        cnksiz = 1 << 10
        orig_pt_1 = fill_mod (1 << 10)
        orig_pt_2 = fill_mod (1 << 10, 42)
        encryptor = self._make_encryptor (password=self._random_password ())

        self._encrypt_object (encryptor, orig_pt_1, cnksiz)
        encryptor.cnt -= 1 # induce error by forcing an identical IV on next step

        with self.assertRaises (crypto.DuplicateIV): # reuse detected
            self._encrypt_object (encryptor, orig_pt_2, cnksiz)
fd10b44a PG |
852 | |
853 | ||
cb7397d5 PG |
class HeaderTest (CryptoLayerTest):
    """
    Tests for assembling (``crypto.hdr_make``) and parsing
    (``crypto.hdr_read``) of object headers.
    """

    def test_crypto_fmt_hdr_make (self):
        """A header built from valid fields has the PDTCRYPT header size."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertTrue (ok)
        self.assertEqual (len (hdr), crypto.PDTCRYPT_HDR_SIZE)


    def test_crypto_fmt_hdr_make_useless (self):
        """
        Unusable input makes ``hdr_make`` return ``False`` along with an
        error message instead of a header.
        """
        ok, ret = crypto.hdr_make ({ 42: "x" })
        self.assertIs (ok, False)
        self.assertTrue (ret.startswith ("error assembling header:"))


    def test_crypto_fmt_hdr_read (self):
        """Reading back a freshly made header yields the original fields."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        mmeta = crypto.hdr_read (hdr)
        self.assertIsNotNone (mmeta)
        for k in meta:
            if meta [k] != mmeta [k]:
                # Report through the test framework; raising a plain string
                # is itself a TypeError and would hide the actual mismatch.
                self.fail ("header mismatch after reading: expected %r, got %r"
                           % (meta [k], mmeta [k]))


    def test_crypto_fmt_hdr_read_trailing_garbage (self):
        """Extra bytes after the header must be rejected."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        hdr += b"-junk"
        with self.assertRaises (crypto.InvalidHeader):
            crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_read_leading_garbage (self):
        """Extra bytes in front of the header must be rejected."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertIs (ok, True)
        self.assertIsNotNone (hdr)
        hdr = b"junk-" + hdr
        with self.assertRaises (crypto.InvalidHeader):
            crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_inner_garbage (self):
        """Extra bytes spliced into the middle of the header must be rejected."""
        meta = faux_hdr ()
        ok, hdr = crypto.hdr_make (meta)
        self.assertTrue (ok)
        middle = len (hdr) // 2
        data = hdr [:middle] + b"junk-" + hdr [middle:]
        with self.assertRaises (crypto.InvalidHeader):
            crypto.hdr_read (data)
c176405d | 911 |