extend strict iv tracking to encryption
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Tue, 9 May 2017 08:22:43 +0000 (10:22 +0200)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Mon, 2 Apr 2018 11:34:08 +0000 (13:34 +0200)
This is just an extra soundness check to prevent accidental reuse
if IVs when handled incorrectly (same initial counters passed
twice to the same context). In normal usage this case cannot
happen.

deltatar/crypto.py
testing/test_crypto.py

index b8ad82d..e33de7a 100755 (executable)
@@ -447,8 +447,9 @@ class Crypto (object):
     key  = None
     cnt  = None # file counter (uint32_t != 0)
     iv   = None # current IV
-    fixed        = None # accu for 64 bit fixed parts of IV
-    used_ivs     = None # tracks IVs during decryption
+    fixed        = None  # accu for 64 bit fixed parts of IV
+    used_ivs     = None  # tracks IVs
+    strict_ivs   = False # if True, panic on duplicate object IV
     password     = None
     paramversion = None
     stats = { "in"  : 0
@@ -461,6 +462,7 @@ class Crypto (object):
     index_counter_used = False
 
     def __init__ (self, *al, **akv):
+        self.used_ivs = set ()
         self.set_parameters (*al, **akv)
 
 
@@ -495,12 +497,15 @@ class Crypto (object):
 
 
     def set_parameters (self, password=None, key=None, paramversion=None,
-                        nacl=None, counter=None, nextpfx=None):
+                        nacl=None, counter=None, nextpfx=None,
+                        strict_ivs=False):
         if nextpfx is not None:
             self.next_pfx = nextpfx
             self.next_pfx ()
         self.set_object_counter (counter)
 
+        self.strict_ivs = strict_ivs
+
         if key is not None:
             self.key, self.nacl = key, nacl
             return
@@ -525,15 +530,25 @@ class Crypto (object):
         return b""
 
 
-    def next (self, password, paramversion, nacl):
+    def next (self, password, paramversion, nacl, iv):
         self.ctsize = 0
         self.ptsize = 0
         self.stats ["obj"] += 1
+
+        self.check_duplicate_iv (iv)
+
         if (   self.paramversion != paramversion
             or self.password     != password
             or self.nacl         != nacl):
             self.set_parameters (password=password, paramversion=paramversion,
-                                 nacl=nacl)
+                                 nacl=nacl, strict_ivs=self.strict_ivs)
+
+
+    def check_duplicate_iv (self, iv):
+        if self.strict_ivs is True and iv in self.used_ivs:
+            raise DuplicateIV ("iv %s was reused" % iv_fmt (iv))
+        # vi has not been used before; add to collection
+        self.used_ivs.add (iv)
 
 
     def counters (self):
@@ -547,7 +562,7 @@ class Encrypt (Crypto):
     paramenc     = None
 
     def __init__ (self, version, paramversion, password=None, key=None, nacl=None,
-                  counter=AES_GCM_IV_CNT_DATA):
+                  counter=AES_GCM_IV_CNT_DATA, strict_ivs=True):
         if         password is     None and key is     None \
                 or password is not None and key is not None :
             raise InvalidParameter ("__init__: need either key or password")
@@ -594,7 +609,8 @@ class Encrypt (Crypto):
         self.paramenc     = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
 
         super().__init__ (password, key, paramversion, nacl, counter=counter,
-                          nextpfx=lambda: self.fixed.append (os.urandom(8)))
+                          nextpfx=lambda: self.fixed.append (os.urandom(8)),
+                          strict_ivs=strict_ivs)
 
 
     def iv_make (self):
@@ -625,7 +641,7 @@ class Encrypt (Crypto):
                                     % self.paramversion)
         hdrdum = hdr_make_dummy (filename)
         self.lastinfo = (filename, hdrdum)
-        super().next (self.password, self.paramversion, self.nacl)
+        super().next (self.password, self.paramversion, self.nacl, self.iv)
 
         self.set_object_counter (self.cnt + 1)
         return hdrdum
@@ -664,7 +680,6 @@ class Encrypt (Crypto):
 class Decrypt (Crypto):
 
     tag        = None   # GCM tag, part of header
-    strict_ivs = False  # if True, panic on duplicate object IV
     last_iv    = None   # check consecutive ivs in strict mode
 
     def __init__ (self, password=None, key=None, counter=None, fixedparts=None,
@@ -693,10 +708,7 @@ class Decrypt (Crypto):
             self.fixed = fixedparts
             self.fixed.sort ()
 
-        self.used_ivs   = set ()
-        self.strict_ivs = strict_ivs
-
-        super().__init__ (password, key, counter=counter)
+        super().__init__ (password, key, counter=counter, strict_ivs=strict_ivs)
 
 
     def valid_fixed_part (self, iv):
@@ -706,13 +718,6 @@ class Decrypt (Crypto):
         return i != len (self.fixed) and self.fixed [i] == fixed
 
 
-    def check_duplicate_iv (self, iv):
-        if self.strict_ivs is True and iv in self.used_ivs:
-            raise DuplicateIV ("iv %s was reused" % iv_fmt (iv))
-        # vi has not been used before; add to collection
-        self.used_ivs.add (iv)
-
-
     def check_consecutive_iv (self, iv):
         fixed, cnt = struct.unpack (FMT_I2N_IV, iv)
         if self.strict_ivs is True \
@@ -741,12 +746,10 @@ class Decrypt (Crypto):
         except KeyError:
             raise InvalidHeader ("next: not a header %r" % hdr)
 
-        if self.key is None or nacl != self.nacl:
-            super().next (self.password, paramversion, nacl)
+        super().next (self.password, paramversion, nacl, iv)
         if self.fixed is not None and self.valid_fixed_part (iv) is False:
             raise InvalidIVFixedPart ("iv %s has invalid fixed part"
                                       % iv_fmt (iv))
-        self.check_duplicate_iv   (iv)
         self.check_consecutive_iv (iv)
 
         self.tag = tag
index 50e3f2e..30ea1d5 100644 (file)
@@ -35,15 +35,23 @@ def faux_hdr (ctsize=1337, iv=None):
                                               b"b1eedc0ffeedea15")
         }
 
+FILL_MOD_MEMO = { }
 
 def fill_mod (n, off=0):
+    global FILL_MOD_MEMO
+    k = (n, off)
+    m = FILL_MOD_MEMO.get (k, None)
+    if m is not None:
+        return m
     buf = bytearray (n)
     bufv = memoryview (buf)
     for i in range (n):
         off += 1
         c = off % 64 + 32
         struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
-    return bytes (buf)
+    m = bytes (buf)
+    FILL_MOD_MEMO [k] = m
+    return m
 
 
 def faux_payload ():
@@ -164,6 +172,7 @@ class AESGCMTest (CryptoLayerTest):
         except crypto.InvalidParameter: # empty password
             pass
 
+
     def test_crypto_aes_gcm_dec_simple (self):
         password       = str (os.urandom (42))
         encryptor      = crypto.Encrypt (TEST_VERSION,
@@ -235,6 +244,86 @@ class AESGCMTest (CryptoLayerTest):
         assert len (pt) == len (ct)
 
 
+    def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
+        cnksiz = 1 << 10
+        pt    = fill_mod (1 << 14)
+        password       = str (os.urandom (42))
+        encryptor      = crypto.Encrypt (TEST_VERSION,
+                                         TEST_PARAMVERSION,
+                                         password=password,
+                                         nacl=TEST_STATIC_NACL,
+                                         strict_ivs=True)
+        header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
+
+        off = 0
+        ct = b""
+        while off < len (pt):
+            upto = min (off + cnksiz, len (pt))
+            cnk = encryptor.process (pt [off:upto])
+            ct += cnk
+            off += cnksiz
+        cnk, header, fixed = encryptor.done (header_dummy)
+        ct += cnk
+
+        assert len (pt) == len (ct)
+
+
+    def test_crypto_aes_gcm_enc_multiobj (self):
+        cnksiz    = 1 << 10
+        password  = str (os.urandom (42))
+        encryptor = crypto.Encrypt (TEST_VERSION,
+                                    TEST_PARAMVERSION,
+                                    password=password,
+                                    nacl=TEST_STATIC_NACL,
+                                    strict_ivs=False)
+
+        def addobj (i):
+            pt           = fill_mod (1 << 14, off=i)
+            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
+
+            off = 0
+            ct = b""
+            while off < len (pt):
+                upto = min (off + cnksiz, len (pt))
+                cnk = encryptor.process (pt [off:upto])
+                ct += cnk
+                off += cnksiz
+            cnk, header, fixed = encryptor.done (header_dummy)
+            ct += cnk
+
+            assert len (pt) == len (ct)
+
+        for i in range (5): addobj (i)
+
+
+    def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
+        cnksiz    = 1 << 10
+        password  = str (os.urandom (42))
+        encryptor = crypto.Encrypt (TEST_VERSION,
+                                    TEST_PARAMVERSION,
+                                    password=password,
+                                    nacl=TEST_STATIC_NACL,
+                                    strict_ivs=True)
+
+        def addobj (i):
+            pt           = fill_mod (1 << 14, off=i)
+            header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
+
+            off = 0
+            ct = b""
+            while off < len (pt):
+                upto = min (off + cnksiz, len (pt))
+                cnk = encryptor.process (pt [off:upto])
+                ct += cnk
+                off += cnksiz
+            cnk, header, fixed = encryptor.done (header_dummy)
+            ct += cnk
+
+            assert len (pt) == len (ct)
+
+        for i in range (5): addobj (i)
+
+
     def test_crypto_aes_gcm_dec_multicnk (self):
         cnksiz         = 1 << 10
         orig_pt        = fill_mod (1 << 14)