graciously handle GCM data length limit
[python-delta-tar] / testing / test_crypto.py
1 import binascii
2 import os
3 import pylibscrypt
4 import struct
5 import unittest
6
7 import deltatar.crypto as crypto
8
9 import cryptography
10
11 def b(s):
12     return s.encode("UTF-8")
13
14 CRYPTO_NACL_SIZE  = 16
15 CRYPTO_KEY_SIZE   = 16
16
17 TEST_PLAINTEXT       = b("gentlemen don’t read each other’s mail")
18 TEST_PASSPHRASE      = b"test1234"
19 TEST_AES_GCM_AAD     = b"authenticated plain text"
20 TEST_DUMMY_FILENAME  = "insurance-file.txt"
21 TEST_VERSION         = 1
22 TEST_PARAMVERSION    = 1
23 TEST_STATIC_NACL     = os.urandom (CRYPTO_NACL_SIZE)
24
25 def faux_hdr (ctsize=1337, iv=None):
26     return \
27         {      "version" : 42
28         , "paramversion" : 2187
29         ,         "nacl" : binascii.unhexlify(b"0011223344556677"
30                                               b"8899aabbccddeeff")
31         ,           "iv" : iv or binascii.unhexlify(b"0011223344556677"
32                                                     b"8899aabb")
33         ,       "ctsize" : ctsize
34         ,          "tag" : binascii.unhexlify(b"deadbeefbadb100d"
35                                               b"b1eedc0ffeedea15")
36         }
37
38 FILL_MOD_MEMO = { }
39
40 def fill_mod (n, off=0):
41     global FILL_MOD_MEMO
42     k = (n, off)
43     m = FILL_MOD_MEMO.get (k, None)
44     if m is not None:
45         return m
46     buf = bytearray (n)
47     bufv = memoryview (buf)
48     for i in range (n):
49         off += 1
50         c = off % 64 + 32
51         struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
52     m = bytes (buf)
53     FILL_MOD_MEMO [k] = m
54     return m
55
56
57 def faux_payload ():
58     return "abcd" * 42
59
60
61 class CryptoLayerTest (unittest.TestCase):
62     pass
63
64
65 class AESGCMTest (CryptoLayerTest):
66
67     def tearDown (self):
68         """Reset globals altered for testing."""
69         _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
70                   ("I am fully aware that this will void my warranty.")
71         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
72                   ("I am fully aware that this will void my warranty.")
73
74     def test_crypto_aes_gcm_enc_ctor (self):
75         password   = str (os.urandom (42))
76         encryptor  = crypto.Encrypt (TEST_VERSION,
77                                      TEST_PARAMVERSION,
78                                      password=password,
79                                      nacl=TEST_STATIC_NACL)
80
81
82     def test_crypto_aes_gcm_enc_ctor_key (self):
83         key        = os.urandom (42)
84         encryptor  = crypto.Encrypt (TEST_VERSION,
85                                      TEST_PARAMVERSION,
86                                      key=key,
87                                      nacl=TEST_STATIC_NACL)
88
89
90     def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
91         """
92         Either key (+nacl) or password must be supplied, not both.
93         """
94         try:
95             encryptor = crypto.Encrypt (TEST_VERSION,
96                                         TEST_PARAMVERSION,
97                                         nacl=TEST_STATIC_NACL)
98         except crypto.InvalidParameter: # neither key nor pw
99             pass
100
101         password = str (os.urandom (42))
102         key      =      os.urandom (16)  # scrypt sized
103         try:
104             encryptor = crypto.Encrypt (TEST_VERSION,
105                                         TEST_PARAMVERSION,
106                                         password=password,
107                                         key=key,
108                                         nacl=TEST_STATIC_NACL)
109         except crypto.InvalidParameter: # both key and pw
110             pass
111
112         try:
113             encryptor = crypto.Encrypt (TEST_VERSION,
114                                         TEST_PARAMVERSION,
115                                         key=key,
116                                         nacl=None)
117         except crypto.InvalidParameter: # key, but salt missing
118             pass
119
120         try:
121             encryptor = crypto.Encrypt (TEST_VERSION,
122                                         TEST_PARAMVERSION,
123                                         password=b"",
124                                         nacl=TEST_STATIC_NACL)
125         except crypto.InvalidParameter: # empty pw
126             pass
127
128
129     def test_crypto_aes_gcm_enc_header_size (self):
130         password       = str (os.urandom (42))
131         encryptor      = crypto.Encrypt (TEST_VERSION,
132                                          TEST_PARAMVERSION,
133                                          password=password,
134                                          nacl=TEST_STATIC_NACL)
135
136         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
137         assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
138         _, _           = encryptor.process (TEST_PLAINTEXT)
139         _, header, _   = encryptor.done (header_dummy)
140         assert len (header) == crypto.PDTCRYPT_HDR_SIZE
141
142
143     def test_crypto_aes_gcm_enc_chunk_size (self):
144         password       = str (os.urandom (42))
145         encryptor      = crypto.Encrypt (TEST_VERSION,
146                                          TEST_PARAMVERSION,
147                                          password=password,
148                                          nacl=TEST_STATIC_NACL)
149
150         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
151         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
152         assert len (ciphertext) == len (TEST_PLAINTEXT)
153         rest, header, fixed = encryptor.done (header_dummy)
154         assert len (rest) == 0
155
156
157     def test_crypto_aes_gcm_dec_ctor (self):
158         """
159         Ensure that only either key or password is accepted.
160         """
161         password = str (os.urandom (42))
162         key      =      os.urandom (16)  # scrypt sized
163
164         decryptor = crypto.Decrypt (password=password)
165         decryptor = crypto.Decrypt (key=key)
166
167         try:
168             decryptor = crypto.Decrypt (password=password, key=key)
169         except crypto.InvalidParameter: # both password and key
170             pass
171
172         try:
173             decryptor = crypto.Decrypt (password=None, key=None)
174         except crypto.InvalidParameter: # neither password nor key
175             pass
176
177         try:
178             decryptor = crypto.Decrypt (password="")
179         except crypto.InvalidParameter: # empty password
180             pass
181
182
183     def test_crypto_aes_gcm_dec_simple (self):
184         password       = str (os.urandom (42))
185         encryptor      = crypto.Encrypt (TEST_VERSION,
186                                          TEST_PARAMVERSION,
187                                          password=password,
188                                          nacl=TEST_STATIC_NACL)
189
190         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
191         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
192         rest, header, fixed = encryptor.done (header_dummy)
193         ciphertext    += rest
194
195         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
196         decryptor.next (header)
197         plaintext      = decryptor.process (ciphertext)
198         rest           = decryptor.done ()
199         plaintext     += rest
200
201         assert plaintext == TEST_PLAINTEXT
202
203
204     def test_crypto_aes_gcm_dec_bad_tag (self):
205         password       = str (os.urandom (42))
206         encryptor      = crypto.Encrypt (TEST_VERSION,
207                                          TEST_PARAMVERSION,
208                                          password=password,
209                                          nacl=TEST_STATIC_NACL)
210
211         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
212         _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
213         ciphertext2, header, fixed = encryptor.done (header_dummy)
214
215         mut_header     = bytearray (header)
216         mut_header_vw  = memoryview (mut_header)
217         # replace one byte in the tag part of the header
218         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
219         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
220         header         = bytes (mut_header)
221
222         decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
223         decryptor.next (header)
224         plaintext      = decryptor.process (ciphertext)
225         try:
226             _ = decryptor.done ()
227         except crypto.InvalidGCMTag:
228             pass
229
230
231     def test_crypto_aes_gcm_enc_multicnk (self):
232         cnksiz = 1 << 10
233         pt    = fill_mod (1 << 14)
234         password       = str (os.urandom (42))
235         encryptor      = crypto.Encrypt (TEST_VERSION,
236                                          TEST_PARAMVERSION,
237                                          password=password,
238                                          nacl=TEST_STATIC_NACL)
239         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
240
241         off = 0
242         ct = b""
243         while off < len (pt):
244             upto = min (off + cnksiz, len (pt))
245             _, cnk = encryptor.process (pt [off:upto])
246             ct += cnk
247             off += cnksiz
248         cnk, header, fixed = encryptor.done (header_dummy)
249         ct += cnk
250
251         assert len (pt) == len (ct)
252
253
254     def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
255         cnksiz = 1 << 10
256         pt    = fill_mod (1 << 14)
257         password       = str (os.urandom (42))
258         encryptor      = crypto.Encrypt (TEST_VERSION,
259                                          TEST_PARAMVERSION,
260                                          password=password,
261                                          nacl=TEST_STATIC_NACL,
262                                          strict_ivs=True)
263         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
264
265         off = 0
266         ct = b""
267         while off < len (pt):
268             upto = min (off + cnksiz, len (pt))
269             _, cnk = encryptor.process (pt [off:upto])
270             ct += cnk
271             off += cnksiz
272         cnk, header, fixed = encryptor.done (header_dummy)
273         ct += cnk
274
275         assert len (pt) == len (ct)
276
277
278     def test_crypto_aes_gcm_enc_multiobj (self):
279         cnksiz    = 1 << 10
280         password  = str (os.urandom (42))
281         encryptor = crypto.Encrypt (TEST_VERSION,
282                                     TEST_PARAMVERSION,
283                                     password=password,
284                                     nacl=TEST_STATIC_NACL,
285                                     strict_ivs=False)
286
287         def addobj (i):
288             pt           = fill_mod (1 << 14, off=i)
289             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
290
291             off = 0
292             ct = b""
293             while off < len (pt):
294                 upto = min (off + cnksiz, len (pt))
295                 _, cnk = encryptor.process (pt [off:upto])
296                 ct += cnk
297                 off += cnksiz
298             cnk, header, fixed = encryptor.done (header_dummy)
299             ct += cnk
300
301             assert len (pt) == len (ct)
302
303         for i in range (5): addobj (i)
304
305
306     def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
307         cnksiz    = 1 << 10
308         password  = str (os.urandom (42))
309         encryptor = crypto.Encrypt (TEST_VERSION,
310                                     TEST_PARAMVERSION,
311                                     password=password,
312                                     nacl=TEST_STATIC_NACL,
313                                     strict_ivs=True)
314
315         def addobj (i):
316             pt           = fill_mod (1 << 14, off=i)
317             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
318
319             off = 0
320             ct = b""
321             while off < len (pt):
322                 upto = min (off + cnksiz, len (pt))
323                 _, cnk = encryptor.process (pt [off:upto])
324                 ct += cnk
325                 off += cnksiz
326             cnk, header, fixed = encryptor.done (header_dummy)
327             ct += cnk
328
329             assert len (pt) == len (ct)
330
331         for i in range (5): addobj (i)
332
333
334     def test_crypto_aes_gcm_enc_multiobj_cnt_wrap (self):
335         """
336         Test behavior when the file counter tops out.
337
338         Artificially lower the maximum possible file counter. Considering
339         invalid (0) and reserved (1, 2) values, the smallest possible file counter
340         for normal objects is 3. Starting from that, the header of the (max -
341         3)rd object must have both a different IV fixed part and a counter.
342         """
343         minimum = 3
344         new_max = 8
345         crypto._testing_set_AES_GCM_IV_CNT_MAX \
346                 ("I am fully aware that this will void my warranty.", new_max)
347         cnksiz    = 1 << 10
348         password  = str (os.urandom (42))
349         encryptor = crypto.Encrypt (TEST_VERSION,
350                                     TEST_PARAMVERSION,
351                                     password=password,
352                                     nacl=TEST_STATIC_NACL,
353                                     strict_ivs=True)
354
355         last_iv  = None
356         last_cnt = minimum
357
358         def addobj (i, wrap=False):
359             nonlocal last_iv
360             nonlocal last_cnt
361             pt           = fill_mod (1 << 14, off=i)
362             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
363
364             off = 0
365             ct = b""
366             while off < len (pt):
367                 upto = min (off + cnksiz, len (pt))
368                 _, cnk = encryptor.process (pt [off:upto])
369                 ct += cnk
370                 off += cnksiz
371             cnk, header, fixed = encryptor.done (header_dummy)
372             this_iv = crypto.hdr_read (header) ["iv"]
373             if last_iv is not None:
374                 this_fixed, this_cnt = struct.unpack (crypto.FMT_I2N_IV, this_iv)
375                 last_fixed, last_cnt = struct.unpack (crypto.FMT_I2N_IV, last_iv)
376                 if wrap is False:
377                     assert last_fixed == this_fixed
378                     assert last_cnt   == this_cnt - 1
379                 else:
380                     assert last_fixed != this_fixed
381                     assert this_cnt   == minimum
382             last_iv = this_iv
383             ct += cnk
384
385             assert len (pt) == len (ct)
386
387         for i in range (minimum, new_max + 1): addobj (i) # counter range: [3, 8]
388         addobj (i + 1, True) # counter wraps to 3
389
390         for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
391         addobj (j + 1, True) # counter wraps to 3 again
392
393
394     def test_crypto_aes_gcm_enc_length_cap (self):
395         """
396         Artificially lower the maximum allowable data length and attempt to
397         encrypt a larger object. Verify that the crypto handler aborts with and
398         exception.
399         """
400         new_max = 2187
401         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
402                 ("I am fully aware that this will void my warranty.", new_max)
403         cnksiz    = 1 << 10
404         password  = str (os.urandom (42))
405         encryptor = crypto.Encrypt (TEST_VERSION,
406                                     TEST_PARAMVERSION,
407                                     password=password,
408                                     nacl=TEST_STATIC_NACL)
409
410         def encobj (s):
411             pt, ct       = fill_mod (s), None
412             header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
413
414             n, ct = encryptor.process (pt)
415             rest, _, _ = encryptor.done (header_dummy)
416             ct += rest
417
418             if len (pt) > new_max:
419                 assert n < len (pt)
420             else:
421                 assert n == len (pt) == len (ct)
422
423         for i in range (16): encobj (1 << i)
424
425
426     def test_crypto_aes_gcm_dec_multicnk (self):
427         cnksiz         = 1 << 10
428         orig_pt        = fill_mod (1 << 14)
429         password       = str (os.urandom (42))
430         encryptor      = crypto.Encrypt (TEST_VERSION,
431                                          TEST_PARAMVERSION,
432                                          password=password,
433                                          nacl=TEST_STATIC_NACL)
434         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
435
436         off = 0
437         ct = b""
438         while off < len (orig_pt):
439             upto = min (off + cnksiz, len (orig_pt))
440             _n, cnk = encryptor.process (orig_pt [off:upto])
441             ct += cnk
442             off += cnksiz
443         cnk, header, fixed = encryptor.done (header_dummy)
444         ct += cnk
445
446         decryptor      = crypto.Decrypt (password=password,
447                                          fixedparts=fixed)
448         decryptor.next (header)
449         off = 0
450         pt  = b""
451         while off < len (orig_pt):
452             upto = min (off + cnksiz, len (orig_pt))
453             cnk  = decryptor.process (ct [off:upto])
454             pt += cnk
455             off += cnksiz
456
457
458         pt += decryptor.done ()
459         assert pt == orig_pt
460
461
462     def test_crypto_aes_gcm_dec_multicnk_bad_tag (self):
463         cnksiz         = 1 << 10
464         orig_pt        = fill_mod (1 << 14)
465         password       = str (os.urandom (42))
466         encryptor      = crypto.Encrypt (TEST_VERSION,
467                                          TEST_PARAMVERSION,
468                                          password=password,
469                                          nacl=TEST_STATIC_NACL)
470         header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
471
472         off = 0
473         ct = b""
474         while off < len (orig_pt):
475             upto = min (off + cnksiz, len (orig_pt))
476             _n, cnk = encryptor.process (orig_pt [off:upto])
477             ct += cnk
478             off += cnksiz
479         cnk, header, fixed = encryptor.done (header_dummy)
480         ct += cnk
481
482         mut_header     = bytearray (header)
483         mut_header_vw  = memoryview (mut_header)
484         # replace one byte in the tag part of the header
485         second_byte    = mut_header_vw [crypto.HDR_OFF_TAG + 2]
486         mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
487         header         = bytes (mut_header)
488
489         decryptor      = crypto.Decrypt (password=password,
490                                          fixedparts=fixed)
491         decryptor.next (header)
492         off = 0
493         pt  = b""
494         while off < len (orig_pt):
495             upto = min (off + cnksiz, len (orig_pt))
496             cnk = decryptor.process (ct [off:upto])
497             pt += cnk
498             off += cnksiz
499
500         try:
501             _ = decryptor.done ()
502         except crypto.InvalidGCMTag:
503             pass
504
505
506     def test_crypto_aes_gcm_dec_iv_reuse (self):
507         """
508         Meddle with encrypted content: extract the IV from one object
509         and inject it into the header of another. This must be rejected
510         by the decryptor.
511         """
512         cnksiz         = 1 << 10
513         orig_pt_1      = fill_mod (1 << 10)
514         orig_pt_2      = fill_mod (1 << 10, 42)
515         password       = str (os.urandom (42))
516         encryptor      = crypto.Encrypt (TEST_VERSION,
517                                          TEST_PARAMVERSION,
518                                          password=password,
519                                          nacl=TEST_STATIC_NACL)
520
521         def enc (pt):
522             header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
523
524             off = 0
525             ct = b""
526             while off < len (pt):
527                 upto = min (off + cnksiz, len (pt))
528                 _n, cnk = encryptor.process (pt [off:upto])
529                 ct += cnk
530                 off += cnksiz
531             cnk, header, fixed = encryptor.done (header_dummy)
532             return ct + cnk, header, fixed
533
534         ct_1, hdr_1, _____ = enc (orig_pt_1)
535         ct_2, hdr_2, fixed = enc (orig_pt_2)
536
537         mut_hdr_2    = bytearray (hdr_2)
538         mut_hdr_2_vw = memoryview (mut_hdr_2)
539         # get IV
540         iv_lo        = crypto.HDR_OFF_IV
541         iv_hi        = crypto.HDR_OFF_IV + crypto.PDTCRYPT_HDR_SIZE_IV
542         iv_1         = hdr_1 [iv_lo : iv_hi]
543         # transplant into other header
544         mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
545         hdr_2_mod    = bytes (mut_hdr_2)
546         decryptor    = crypto.Decrypt (password=password, fixedparts=fixed,
547                                        strict_ivs=True)
548
549         def dec (hdr, ct):
550             decryptor.next (hdr)
551             off = 0
552             pt  = b""
553             while off < len (ct):
554                 upto = min (off + cnksiz, len (ct))
555                 cnk = decryptor.process (ct [off:upto])
556                 pt += cnk
557                 off += cnksiz
558             return pt + decryptor.done ()
559
560         decr_pt_1 = dec (hdr_1, ct_1)
561         decr_pt_2 = dec (hdr_2, ct_2) # good header, different IV
562         try:
563             decr_pt_2 = dec (hdr_2_mod, ct_2)
564         except crypto.DuplicateIV: # bad header, reuse detected
565             pass
566
567
568 class HeaderTest (CryptoLayerTest):
569
570     def test_crypto_fmt_hdr_make (self):
571         meta = faux_hdr()
572         ok, hdr = crypto.hdr_make (meta)
573         assert ok
574         assert len (hdr) == crypto.PDTCRYPT_HDR_SIZE
575
576
577     def test_crypto_fmt_hdr_make_useless (self):
578         ok, ret = crypto.hdr_make ({ 42: "x" })
579         assert ok is False
580         assert ret.startswith ("error writing header:")
581
582
583     def test_crypto_fmt_hdr_read (self):
584         meta = faux_hdr()
585         ok, hdr = crypto.hdr_make (meta)
586         assert ok is True
587         assert hdr is not None
588         mmeta = crypto.hdr_read (hdr)
589         assert mmeta is not None
590         for k in meta:
591             if meta [k] != mmeta [k]:
592                 raise "header mismatch after reading: expected %r, got %r" \
593                       % (meta [k], mmeta [k])
594
595
596     def test_crypto_fmt_hdr_read_trailing_garbage (self):
597         meta = faux_hdr()
598         ok, hdr = crypto.hdr_make (meta)
599         ok, hdr = crypto.hdr_make (meta)
600         assert ok is True
601         assert hdr is not None
602         hdr += b"-junk"
603         try:
604             _ = crypto.hdr_read (hdr)
605         except crypto.InvalidHeader:
606             pass
607
608
609     def test_crypto_fmt_hdr_read_leading_garbage (self):
610         meta = faux_hdr()
611         ok, hdr = crypto.hdr_make (meta)
612         ok, hdr = crypto.hdr_make (meta)
613         assert ok is True
614         assert hdr is not None
615         hdr = b"junk-" + hdr
616         try:
617             _ = crypto.hdr_read (hdr)
618         except crypto.InvalidHeader:
619             pass
620
621
622     def test_crypto_fmt_hdr_inner_garbage (self):
623         meta = faux_hdr()
624         ok, hdr = crypto.hdr_make (meta)
625         assert ok
626         data = hdr[:len(hdr)//2] + b"junk-" + hdr[len(hdr)//2:]
627         try:
628             _ = crypto.hdr_read (data)
629         except crypto.InvalidHeader:
630             pass
631