Commit | Line | Data |
---|---|---|
5133232d PG |
1 | import binascii |
2 | import os | |
3 | import pylibscrypt | |
c2d1c3ec | 4 | import struct |
5133232d PG |
5 | import unittest |
6 | ||
7 | import deltatar.crypto as crypto | |
8 | ||
e2a4e4f0 PG |
9 | import cryptography |
10 | ||
5133232d PG |
def b(s):
    """Return the UTF-8 encoded bytes of the text string *s*."""
    return bytes (s, "UTF-8")
13 | ||
# CRYPTO_NACL_SIZE is the byte count of the random salt drawn below;
# CRYPTO_KEY_SIZE is presumably the key length in bytes -- TODO confirm
# against deltatar.crypto.
CRYPTO_NACL_SIZE = 16
CRYPTO_KEY_SIZE = 16

# Fixtures shared by the test cases in this module.
TEST_PLAINTEXT = b("gentlemen don’t read each other’s mail")
TEST_PASSPHRASE = b"test1234"
TEST_AES_GCM_AAD = b"authenticated plain text"
TEST_DUMMY_FILENAME = "insurance-file.txt"
TEST_VERSION = 1
TEST_PARAMVERSION = 1
# One salt per test run: drawn once at import time and reused by every test.
TEST_STATIC_NACL = os.urandom (CRYPTO_NACL_SIZE)
# Parameter version the tests use to request plaintext passthrough mode.
PLAIN_PARAMVERSION = 0
5133232d PG |
25 | |
def faux_hdr (ctsize=1337, iv=None):
    """
    Build a dictionary of made-up header fields.

    *ctsize* is stored under ``"ctsize"``. *iv* is stored under ``"iv"``;
    when it is ``None`` (or otherwise falsy) a fixed twelve byte value is
    used instead. The remaining fields are constants.
    """
    if not iv:
        iv = binascii.unhexlify (b"00112233445566778899aabb")

    fields = {}
    fields ["version"] = 42
    fields ["paramversion"] = 2187
    fields ["nacl"] = binascii.unhexlify (b"00112233445566778899aabbccddeeff")
    fields ["iv"] = iv
    fields ["ctsize"] = ctsize
    fields ["tag"] = binascii.unhexlify (b"deadbeefbadb100db1eedc0ffeedea15")
    return fields
38 | ||
# Cache of fill_mod() results, keyed by the argument pair ``(n, off)``.
FILL_MOD_MEMO = { }

def fill_mod (n, off=0):
    """
    Return *n* bytes of deterministic, printable filler.

    The byte at (zero based) position ``i`` has the value
    ``(off + i + 1) % 64 + 32``, so the output cycles through the 64 ASCII
    characters from ``" "`` (32) to ``"_"`` (95); *off* shifts the point of
    the cycle at which the sequence starts.

    Results are memoized in ``FILL_MOD_MEMO``: calling again with the same
    arguments returns the very same ``bytes`` object.
    """
    k = (n, off)
    m = FILL_MOD_MEMO.get (k)
    if m is None:
        # All values are below 128, hence one byte each; build the buffer in
        # a single pass instead of packing byte by byte.
        m = bytes ((off + i + 1) % 64 + 32 for i in range (n))
        FILL_MOD_MEMO [k] = m
    return m
c2d1c3ec PG |
56 | |
57 | ||
5133232d PG |
def faux_payload ():
    """Return the text ``abcd`` repeated 42 times (168 characters)."""
    return "".join (["abcd"] * 42)
60 | ||
cb7397d5 | 61 | |
class CryptoLayerTest (unittest.TestCase):
    """Common base class of the test cases in this module; adds nothing."""
64 | ||
65 | ||
class AESGCMTest (CryptoLayerTest):
    """
    Tests of ``crypto.Encrypt`` and ``crypto.Decrypt``: constructor argument
    checks, header sizes, chunked processing, IV bookkeeping and the size
    limits.
    """

    # Untouched reference to ``os.urandom``. One test replaces the function
    # inside ``os`` with a deterministic stand-in; tearDown() restores it
    # from here.
    os_urandom = os.urandom

    def tearDown (self):
        """Reset globals altered for testing."""
        waiver = "I am fully aware that this will void my warranty."
        _ = crypto._testing_set_AES_GCM_IV_CNT_MAX (waiver)
        _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (waiver)
        os.urandom = self.os_urandom


    def test_crypto_aes_gcm_enc_ctor (self):
        """An encryptor can be constructed from a password and a salt."""
        pw = str (os.urandom (42))
        crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                        password=pw, nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
        """
        Plaintext passthrough parameters are rejected unless explicitly
        permitted.
        """
        pw = str (os.urandom (42))
        with self.assertRaises (crypto.InvalidParameter):
            crypto.Encrypt (TEST_VERSION, PLAIN_PARAMVERSION,
                            password=pw, nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
        """
        With the *insecure* flag set, the constructor accepts the plaintext
        passthrough parameters.
        """
        pw = str (os.urandom (42))
        crypto.Encrypt (TEST_VERSION, PLAIN_PARAMVERSION,
                        password=pw, nacl=TEST_STATIC_NACL, insecure=True)


    def test_crypto_aes_gcm_enc_ctor_key (self):
        """An encryptor can be constructed from a raw key and a salt."""
        raw_key = os.urandom (42)
        crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                        key=raw_key, nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
        """
        The constructor wants exactly one secret: a non-empty password, or a
        key together with its salt. Every other combination must raise
        ``InvalidParameter``.
        """
        def must_reject (**secrets):
            with self.assertRaises (crypto.InvalidParameter):
                crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION, **secrets)

        # neither key nor pw
        must_reject (nacl=TEST_STATIC_NACL)

        pw = str (os.urandom (42))
        raw_key = os.urandom (16) # scrypt sized

        # both key and pw
        must_reject (password=pw, key=raw_key, nacl=TEST_STATIC_NACL)
        # key, but salt missing
        must_reject (key=raw_key, nacl=None)
        # empty pw
        must_reject (password=b"", nacl=TEST_STATIC_NACL)


    def test_crypto_aes_gcm_enc_header_size (self):
        """
        Both the placeholder header handed out by ``next()`` and the final
        header returned by ``done()`` have the fixed header size.
        """
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        placeholder = encryptor.next (TEST_DUMMY_FILENAME)
        assert len (placeholder) == crypto.PDTCRYPT_HDR_SIZE
        _, _ = encryptor.process (TEST_PLAINTEXT)
        _, final_hdr, _ = encryptor.done (placeholder)
        assert len (final_hdr) == crypto.PDTCRYPT_HDR_SIZE


    def test_crypto_aes_gcm_enc_multi_ivs_ok_explicit_counter (self):
        """
        Encrypt two batches of objects, each starting from its own explicit
        counter, and inspect the IVs the encryptors report as used: the two
        sets must be disjoint and must contain the expected counter values.
        """

        def enc (start_count, data):
            pw = str (os.urandom (42))
            encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                        password=pw,
                                        nacl=TEST_STATIC_NACL,
                                        counter=start_count,
                                        strict_ivs=True)

            for nth, blob in enumerate (data, 1):
                placeholder = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, nth))
                _, _ = encryptor.process (blob)
                _, header, _ = encryptor.done (placeholder)
                # Exactly one more IV is on record after every object.
                assert len (encryptor.get_used_ivs ()) == nth

            return encryptor.get_used_ivs ()

        def counters_of (ivs):
            found = []
            for iv in ivs:
                _fixed, counter = struct.unpack (crypto.FMT_I2N_IV, iv)
                found.append (counter)
            return found

        first_ivs = enc (0x0042, [TEST_PLAINTEXT, b"none of your business"])
        second_ivs = enc (0x1337, [b"read me if you can", b"for British eyes only!"])

        # No reuse in general.
        assert len (first_ivs & second_ivs) == 0

        # Counters of used IVs must match what we passed explicitly.
        first_counters = counters_of (first_ivs)
        second_counters = counters_of (second_ivs)

        assert 0x0042 in first_counters
        assert 0x0043 in first_counters

        assert 0x1337 in second_counters
        assert 0x1338 in second_counters


    def test_crypto_aes_gcm_enc_chunk_size (self):
        """
        ``process()`` returns as many ciphertext bytes as it was given
        plaintext bytes and ``done()`` has nothing left to flush.
        """
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        placeholder = encryptor.next (TEST_DUMMY_FILENAME)
        _, encrypted = encryptor.process (TEST_PLAINTEXT)
        assert len (encrypted) == len (TEST_PLAINTEXT)
        leftover, header, fixed = encryptor.done (placeholder)
        assert len (leftover) == 0


    def test_crypto_aes_gcm_dec_ctor (self):
        """
        Ensure that only either key or password is accepted.
        """
        pw = str (os.urandom (42))
        raw_key = os.urandom (16) # scrypt sized

        crypto.Decrypt (password=pw)
        crypto.Decrypt (key=raw_key)

        # both password and key
        with self.assertRaises (crypto.InvalidParameter):
            crypto.Decrypt (password=pw, key=raw_key)

        # neither password nor key
        with self.assertRaises (crypto.InvalidParameter):
            crypto.Decrypt (password=None, key=None)

        # empty password
        with self.assertRaises (crypto.InvalidParameter):
            crypto.Decrypt (password="")


    def test_crypto_aes_gcm_dec_simple (self):
        """Single object round trip: decrypting yields the plaintext again."""
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        placeholder = encryptor.next (TEST_DUMMY_FILENAME)
        _, encrypted = encryptor.process (TEST_PLAINTEXT)
        leftover, header, fixed = encryptor.done (placeholder)
        encrypted = encrypted + leftover

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed)
        decryptor.next (header)
        decrypted = decryptor.process (encrypted)
        decrypted = decrypted + decryptor.done ()

        assert decrypted == TEST_PLAINTEXT


    def test_crypto_aes_gcm_dec_plain_bad (self):
        """
        A header whose parameter version was rewritten to the plaintext mode
        must be refused by a decryptor created without the *insecure* flag.
        """
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        placeholder = encryptor.next (TEST_DUMMY_FILENAME)
        _, encrypted = encryptor.process (TEST_PLAINTEXT)
        leftover, header, fixed = encryptor.done (placeholder)
        encrypted = encrypted + leftover

        # Downgrade the genuine header to plaintext parameters.
        fields = crypto.hdr_read (header)
        fields ["paramversion"] = PLAIN_PARAMVERSION
        ok, forged = crypto.hdr_make (fields)
        assert ok

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed)
        with self.assertRaises (crypto.InvalidParameter):
            decryptor.next (forged)


    def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
        """
        Allow plaintext crypto mode if *insecure* flag is passed.
        """
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, PLAIN_PARAMVERSION,
                                    password=pw,
                                    nacl=TEST_STATIC_NACL,
                                    insecure=True)

        placeholder = encryptor.next (TEST_DUMMY_FILENAME)
        _, encrypted = encryptor.process (TEST_PLAINTEXT)
        leftover, header, fixed = encryptor.done (placeholder)
        encrypted = encrypted + leftover

        fields = crypto.hdr_read (header)
        fields ["paramversion"] = PLAIN_PARAMVERSION
        ok, rebuilt = crypto.hdr_make (fields)
        assert ok

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed,
                                    insecure=True)
        decryptor.next (rebuilt)
        decrypted = decryptor.process (encrypted)
        decrypted = decrypted + decryptor.done ()

        assert decrypted == TEST_PLAINTEXT


    def test_crypto_aes_gcm_dec_bad_tag (self):
        """
        Corrupt one byte of the tag stored in the header; finalizing the
        decryption must then raise ``InvalidGCMTag``.
        """
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        placeholder = encryptor.next (TEST_DUMMY_FILENAME)
        _, encrypted = encryptor.process (TEST_PLAINTEXT)
        leftover, header, fixed = encryptor.done (placeholder)

        # replace one byte in the tag part of the header
        tampered = bytearray (header)
        victim = crypto.HDR_OFF_TAG + 2
        tampered [victim] = (tampered [victim] + 0x2a) % 256
        header = bytes (tampered)

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed)
        decryptor.next (header)
        decryptor.process (encrypted)
        with self.assertRaises (crypto.InvalidGCMTag):
            _ = decryptor.done ()


    def test_crypto_aes_gcm_enc_multicnk (self):
        """
        Encrypt 16 KiB of filler in 1 KiB pieces; the ciphertext is as long
        as the plaintext.
        """
        step = 1 << 10
        plain = fill_mod (1 << 14)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)
        placeholder = encryptor.next (TEST_DUMMY_FILENAME)

        encrypted = b""
        for start in range (0, len (plain), step):
            _, piece = encryptor.process (plain [start:start + step])
            encrypted += piece
        tail, header, fixed = encryptor.done (placeholder)
        encrypted += tail

        assert len (plain) == len (encrypted)


    def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
        """
        Same as the plain multi chunk test, but with strict IV checking
        enabled on the encryptor.
        """
        step = 1 << 10
        plain = fill_mod (1 << 14)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)
        placeholder = encryptor.next (TEST_DUMMY_FILENAME)

        encrypted = b""
        for start in range (0, len (plain), step):
            _, piece = encryptor.process (plain [start:start + step])
            encrypted += piece
        tail, header, fixed = encryptor.done (placeholder)
        encrypted += tail

        assert len (plain) == len (encrypted)


    def test_crypto_aes_gcm_enc_multiobj (self):
        """
        Encrypt five objects with one encryptor; afterwards the encryptor
        must have recorded exactly one IV fixed part.
        """
        step = 1 << 10
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=False)

        def addobj (i):
            plain = fill_mod (1 << 14, off=i)
            placeholder = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            encrypted = b""
            for start in range (0, len (plain), step):
                _, piece = encryptor.process (plain [start:start + step])
                encrypted += piece
            tail, header, fixed = encryptor.done (placeholder)
            encrypted += tail

            assert len (plain) == len (encrypted)

        for i in range (5):
            addobj (i)

        assert len (encryptor.fixed) == 1


    def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
        """
        Encrypt five objects with strict IV checking; every ``done()`` must
        report the same fixed parts and only one fixed part may be on record
        at the end.
        """
        step = 1 << 10
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)
        first_fixed = None # must remain constant after first

        def addobj (i):
            nonlocal first_fixed
            plain = fill_mod (1 << 14, off=i)
            placeholder = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            encrypted = b""
            for start in range (0, len (plain), step):
                _, piece = encryptor.process (plain [start:start + step])
                encrypted += piece
            tail, header, fixed = encryptor.done (placeholder)
            if first_fixed is None:
                first_fixed = fixed
            else:
                assert fixed == first_fixed
            encrypted += tail

            assert len (plain) == len (encrypted)

        for i in range (5):
            addobj (i)

        assert len (encryptor.fixed) == 1


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
        """
        Test behavior when the file counter tops out.

        Artificially lower the maximum possible file counter. Considering
        invalid (0) and reserved (1, 2) values, the smallest possible file counter
        for normal objects is 3. Starting from that, the header of the (max -
        3)rd object must have both a different IV fixed part and a counter.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX \
            ("I am fully aware that this will void my warranty.", new_max)
        cnksiz = 1 << 10
        password = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION,
                                    TEST_PARAMVERSION,
                                    password=password,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        # IV of the previously encrypted object; None before the first one.
        last_iv = None
        last_cnt = minimum

        def addobj (i, wrap=False):
            nonlocal last_iv
            nonlocal last_cnt
            pt = fill_mod (1 << 14, off=i)
            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            ct = b""
            for off in range (0, len (pt), cnksiz):
                _, cnk = encryptor.process (pt [off:off + cnksiz])
                ct += cnk
            cnk, header, fixed = encryptor.done (header_dummy)
            this_iv = crypto.hdr_read (header) ["iv"]
            if last_iv is not None:
                this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
                last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
                if wrap is False:
                    # Same fixed part, counter advanced by exactly one.
                    assert last_fixed == this_fixed
                    assert last_cnt == this_cnt - 1
                else:
                    # New fixed part, counter restarted at the minimum.
                    assert last_fixed != this_fixed
                    assert this_cnt == minimum
            last_iv = this_iv
            ct += cnk

            assert len (pt) == len (ct)

        for i in range (minimum, new_max + 1):
            addobj (i) # counter range: [3, 8]
        addobj (i + 1, True) # counter wraps to 3

        for j in range (i + 2, i + new_max - 1):
            addobj (j) # counter range: [4, 8]
        addobj (j + 1, True) # counter wraps to 3 again

        assert len (encryptor.fixed) == 3


    def test_crypto_aes_gcm_enc_multiobj_cnt_wrap_badfixed (self):
        """
        Test behavior when the file counter tops out and the transition to
        the next IV fixed part fails on account of a bad random generator.

        Replaces the ``urandom`` reference in ``os`` with a deterministic
        function. The encryptor context must communicate this condition with an
        ``IVFixedPartError``.
        """
        minimum = 3
        new_max = 8
        crypto._testing_set_AES_GCM_IV_CNT_MAX \
            ("I am fully aware that this will void my warranty.", new_max)
        step = 1 << 10

        def predictable (n):
            # Always the same *n* bytes for a given *n*.
            return bytes (bytearray ([n % 256] * n))

        os.urandom = predictable # undone by tearDown()
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        def addobj (i):
            plain = fill_mod (1 << 14, off=i)
            encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))

            for start in range (0, len (plain), step):
                _, _piece = encryptor.process (plain [start:start + step])

        for seq in range (42 + minimum, 42 + new_max):
            addobj (seq)

        with self.assertRaises (crypto.IVFixedPartError):
            addobj (seq)


    def test_crypto_aes_gcm_enc_length_cap (self):
        """
        Artificially lower the maximum allowable data length and attempt to
        encrypt a larger object. Verify that the crypto handler only encrypts
        data up to the size limit. A downstream user detects that condition by
        testing whether the encryption step yielded fewer bytes than the
        plaintext.

        The sibling to this test is test_restore_backup_max_file_length()
        in test_delatar.py. Deltatar will transparently create a split object
        with an increased IV file counter.
        """
        new_max = 2187
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
            ("I am fully aware that this will void my warranty.", new_max)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        def encobj (s):
            plain = fill_mod (s)
            placeholder = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))

            n, encrypted = encryptor.process (plain)
            leftover, _, _ = encryptor.done (placeholder)

            # NB: If this check *ever* fails, then something changed in the
            # encoding layer. AES-GCM is a stream cipher so each encoding
            # step will yield the exact number of ciphertext bytes that
            # was provided as plaintext. Thus there cannot be any encoded
            # data left when calling the finalizers. None of the crypto code
            # depends on that assumption but nevertheless we check it here
            # in case anything changes upstream in the Cryptography
            # library. In case there actually is a rest, replace the
            # assertion below with ``encrypted += leftover``.
            assert (len (leftover) == 0)

            if len (plain) > new_max:
                # If the plaintext was longer than the artificially lowered
                # maximum, then the number of ciphertext bytes must be clamped
                # to the maximum.
                assert n == new_max
            else:
                assert n == len (plain) == len (encrypted)

        for exponent in range (16):
            encobj (1 << exponent)


    def test_crypto_aes_gcm_dec_length_cap (self):
        """
        The decryptor must reject headers with an object size that exceeds
        the PDTCRYPT maximum. Longer files split into multiple objects.
        """
        pw = str (os.urandom (42))
        fields = faux_hdr (ctsize=crypto.PDTCRYPT_MAX_OBJ_SIZE + 1)
        ok, header = crypto.hdr_make (fields)

        assert ok

        # Set up decryption with bogus header.
        decryptor = crypto.Decrypt (password=pw, fixedparts=[])

        with self.assertRaises (crypto.InvalidHeader):
            decryptor.next (header)


    def test_crypto_aes_gcm_dec_length_mismatch (self):
        """
        Catch attempts at decrypting more data than what was stated in the
        header.
        """
        step = 1 << 10
        plain = fill_mod (1 << 14)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)
        placeholder = encryptor.next (TEST_DUMMY_FILENAME)

        encrypted = b""
        for start in range (0, len (plain), step):
            _n, piece = encryptor.process (plain [start:start + step])
            encrypted += piece
        tail, header, fixed = encryptor.done (placeholder)
        encrypted += tail

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed)

        decryptor.next (header)
        decrypted = b""
        for start in range (0, len (plain), step):
            decrypted += decryptor.process (encrypted [start:start + step])

        with self.assertRaises (crypto.CiphertextTooLong):
            # Try and decrypt one byte more than was encrypted.
            # This must be caught in crypto.py.
            _ = decryptor.process (encrypted [0:1])


    def test_crypto_aes_gcm_dec_multicnk (self):
        """Chunked round trip of 16 KiB: the decrypted data equals the input."""
        step = 1 << 10
        plain = fill_mod (1 << 14)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)
        placeholder = encryptor.next (TEST_DUMMY_FILENAME)

        encrypted = b""
        for start in range (0, len (plain), step):
            _n, piece = encryptor.process (plain [start:start + step])
            encrypted += piece
        tail, header, fixed = encryptor.done (placeholder)
        encrypted += tail

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed)
        decryptor.next (header)
        decrypted = b""
        for start in range (0, len (plain), step):
            decrypted += decryptor.process (encrypted [start:start + step])

        decrypted += decryptor.done ()
        assert decrypted == plain


    def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
        """
        Chunked decryption with one byte of the tag in the header altered;
        ``done()`` must raise ``InvalidGCMTag``.
        """
        step = 1 << 10
        plain = fill_mod (1 << 14)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)
        placeholder = encryptor.next (TEST_DUMMY_FILENAME)

        encrypted = b""
        for start in range (0, len (plain), step):
            _n, piece = encryptor.process (plain [start:start + step])
            encrypted += piece
        tail, header, fixed = encryptor.done (placeholder)
        encrypted += tail

        # replace one byte in the tag part of the header
        tampered = bytearray (header)
        victim = crypto.HDR_OFF_TAG + 2
        tampered [victim] = (tampered [victim] + 0x2a) % 256
        header = bytes (tampered)

        decryptor = crypto.Decrypt (password=pw, fixedparts=fixed)
        decryptor.next (header)
        decrypted = b""
        for start in range (0, len (plain), step):
            decrypted += decryptor.process (encrypted [start:start + step])

        with self.assertRaises (crypto.InvalidGCMTag):
            _ = decryptor.done ()


    def test_crypto_aes_gcm_dec_iv_gap (self):
        """
        Encrypt multiple objects using non-consecutive IVs and verify that the
        decryptor errors out with an exception in strict mode but keeps quiet
        otherwise.
        """
        step = 1 << 10
        plain_1 = fill_mod (1 << 10)
        plain_2 = fill_mod (1 << 10, 23)
        plain_3 = fill_mod (1 << 10, 42)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw, nacl=TEST_STATIC_NACL)

        def enc (pt):
            placeholder = encryptor.next (TEST_DUMMY_FILENAME)

            ct = b""
            for start in range (0, len (pt), step):
                _n, piece = encryptor.process (pt [start:start + step])
                ct += piece
            tail, header, fixed = encryptor.done (placeholder)
            return ct + tail, header, fixed

        ct_1, hdr_1, _____ = enc (plain_1)

        ## Here we bump the iv of the encryptor, breaking the series.
        encryptor.set_object_counter (encryptor.cnt + 1)
        ct_2, hdr_2, fixed = enc (plain_2)

        ## IV of final object is again in-sequence.
        ct_3, hdr_3, fixed = enc (plain_3)

        def decrypt (strict_ivs):
            decryptor = crypto.Decrypt (password=pw, fixedparts=fixed,
                                        strict_ivs=strict_ivs)

            def dec (hdr, ct):
                decryptor.next (hdr)
                pt = b""
                for start in range (0, len (ct), step):
                    pt += decryptor.process (ct [start:start + step])
                return pt + decryptor.done ()

            # Decrypt all three first, compare afterwards.
            got_1 = dec (hdr_1, ct_1)
            got_2 = dec (hdr_2, ct_2) ## ← good header, non-consecutive IV
            got_3 = dec (hdr_3, ct_3)

            assert got_1 == plain_1
            assert got_2 == plain_2
            assert got_3 == plain_3

        with self.assertRaises (crypto.NonConsecutiveIV):
            decrypt (True)

        decrypt (False) # Sequence passes


    def test_crypto_aes_gcm_dec_iv_reuse (self):
        """
        After encrypting one object, step the object counter of the encryptor
        back by one so the next object would get the IV just used. With
        paranoid IV checking enabled the encryptor must refuse this with
        ``DuplicateIV``.
        """
        step = 1 << 10
        plain_1 = fill_mod (1 << 10)
        plain_2 = fill_mod (1 << 10, 42)
        pw = str (os.urandom (42))
        encryptor = crypto.Encrypt (TEST_VERSION, TEST_PARAMVERSION,
                                    password=pw,
                                    nacl=TEST_STATIC_NACL,
                                    strict_ivs=True)

        def enc (pt):
            placeholder = encryptor.next (TEST_DUMMY_FILENAME)

            ct = b""
            for start in range (0, len (pt), step):
                _n, piece = encryptor.process (pt [start:start + step])
                ct += piece
            tail, header, fixed = encryptor.done (placeholder)
            return ct + tail, header, fixed

        ct_1, hdr_1, _____ = enc (plain_1)
        encryptor.cnt -= 1 # induce error by forcing an identical IV on next step

        with self.assertRaises (crypto.DuplicateIV): # reuse detected
            ct_2, hdr_2, fixed = enc (plain_2)
fd10b44a PG |
854 | |
855 | ||
cb7397d5 PG |
class HeaderTest (CryptoLayerTest):
    """
    Tests of the header helpers ``crypto.hdr_make`` (fields to bytes) and
    ``crypto.hdr_read`` (bytes to fields).
    """

    def test_crypto_fmt_hdr_make (self):
        """A complete set of fields serializes to ``PDTCRYPT_HDR_SIZE`` bytes."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok
        assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE


    def test_crypto_fmt_hdr_make_useless (self):
        """
        Nonsensical input is reported through the return value: the status
        is ``False`` and the second element carries an error message.
        """
        ok, ret = crypto.hdr_make ({ 42: "x" })
        assert ok is False
        assert ret.startswith ("error assembling header:")


    def test_crypto_fmt_hdr_read (self):
        """Every field written by ``hdr_make`` is read back unchanged."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok is True
        assert hdr is not None
        mmeta = crypto.hdr_read (hdr)
        assert mmeta is not None
        for k in meta:
            if meta [k] != mmeta [k]:
                # Python 3 cannot raise a plain string (that is a TypeError
                # which hides the message), so report the mismatch as a
                # regular test failure.
                self.fail ("header mismatch after reading: expected %r, got %r"
                           % (meta [k], mmeta [k]))


    def test_crypto_fmt_hdr_read_trailing_garbage (self):
        """Extra bytes after a valid header make ``hdr_read`` raise."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok is True
        assert hdr is not None
        hdr += b"-junk"
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_read_leading_garbage (self):
        """Extra bytes in front of a valid header make ``hdr_read`` raise."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok is True
        assert hdr is not None
        hdr = b"junk-" + hdr
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (hdr)


    def test_crypto_fmt_hdr_inner_garbage (self):
        """Extra bytes spliced into the middle of a header make ``hdr_read`` raise."""
        meta = faux_hdr()
        ok, hdr = crypto.hdr_make (meta)
        assert ok
        middle = len (hdr) // 2
        data = hdr [:middle] + b"junk-" + hdr [middle:]
        with self.assertRaises (crypto.InvalidHeader):
            _ = crypto.hdr_read (data)
c176405d | 913 |