add strict IV validation to decryption handler
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Tue, 18 Apr 2017 14:07:29 +0000 (16:07 +0200)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Mon, 2 Apr 2018 11:34:08 +0000 (13:34 +0200)
Optionally (on CLI, with the “-s” flag) check for additional IV
properties:

    - Accidental reuse: in GCM, the same IV used more than once
      means that the plaintext is compromised.

    - Unstructured archive: In the headers of a normal PDT
      encrypted archive, the variable parts of the IVs are
      consecutive unless the fixed part changes.

deltatar/crypto.py

index dfdb1d6..5460dec 100755 (executable)
@@ -124,6 +124,16 @@ class InvalidIVFixedPart (Exception):
     pass
 
 
+class DuplicateIV (Exception):
+    """IV reused."""
+    pass
+
+
+class NonConsecutiveIV (Exception):
+    """IV reused."""
+    pass
+
+
 class FormatError (Exception):
     """Unusable parameters in header."""
     pass
@@ -605,9 +615,12 @@ class Encrypt (Crypto):
 
 class Decrypt (Crypto):
 
-    tag = None # GCM tag, part of header
+    tag      = None # GCM tag, part of header
+    used_ivs = None # if a set, panic on duplicate object IV
+    last_iv  = None # check consecutive ivs in strict mode
 
-    def __init__ (self, password, counter=None, fixedparts=None):
+    def __init__ (self, password, counter=None, fixedparts=None,
+                  strict_ivs=False):
         # passwort
         if isinstance (password, str) is False:
             raise InvalidParameter ("__init__: password must be a string, not %s"
@@ -624,6 +637,10 @@ class Decrypt (Crypto):
             self.fixed = fixedparts
             self.fixed.sort ()
             super().__init__ (password, counter=counter)
+
+        if strict_ivs is True:
+            self.used_ivs = set ()
+
         super().__init__ (password, counter=counter)
 
 
@@ -634,6 +651,24 @@ class Decrypt (Crypto):
         return i != len (self.fixed) and self.fixed [i] == fixed
 
 
+    def check_duplicate_iv (self, iv):
+        if iv in self.used_ivs:
+            raise DuplicateIV ("iv [%r] was reused" % iv)
+        # vi has not been used before; add to collection
+        self.used_ivs.add (iv)
+
+
+    def check_consecutive_iv (self, iv):
+        fixed, cnt = struct.unpack (FMT_I2N_IV, iv)
+        if self.last_iv is not None \
+                and self.last_iv [0] == fixed \
+                and self.last_iv [1] != cnt - 1:
+            raise NonConsecutiveIV ("iv [%r] counter not successor of "
+                                    "last object (expected %d, found %d)"
+                                    % (self.last_iv [1], cnt))
+        self.last_iv = (iv, cnt)
+
+
     def next (self, hdr):
         if isinstance (hdr, bytes) is True:
             hdr = hdr_read (hdr)
@@ -656,6 +691,10 @@ class Decrypt (Crypto):
             fixed, _ = struct.unpack (FMT_I2N_IV, iv)
             raise InvalidIVFixedPart ("iv [%r] has invalid fixed part [%r]"
                                       % (iv, fixed))
+        if self.used_ivs is not None:
+            self.check_duplicate_iv   (iv)
+            self.check_consecutive_iv (iv)
+
         self.tag = tag
         defs = ENCRYPTION_PARAMETERS.get (paramversion, None)
         if defs is None:
@@ -712,6 +751,7 @@ class Decrypt (Crypto):
 ###############################################################################
 
 PDTCRYPT_VERBOSE   = False
+PDTCRYPT_STRICTIVS = False
 PDTCRYPT_BLOCKSIZE = 1 << 12
 PDTCRYPT_SINK      = 0
 PDTCRYPT_SOURCE    = 1
@@ -733,7 +773,7 @@ def depdtcrypt (pw, ins, outs):
     """
     ctleft     = -1              # length of ciphertext to consume
     ctcurrent  = 0               # total ciphertext of current object
-    decr       = Decrypt (pw)    # decryptor
+    decr       = Decrypt (pw, strict_ivs=PDTCRYPT_STRICTIVS)  # decryptor
     total_obj  = 0               # total number of objects read
     total_pt   = 0               # total plaintext bytes
     total_ct   = 0               # total ciphertext bytes
@@ -794,7 +834,7 @@ def depdtcrypt (pw, ins, outs):
                 return total_read, total_obj, total_ct, total_pt
             except InvalidHeader as exn:
                 raise PDTDecryptionError ("invalid header at position %d in %r "
-                                          "(%s)" % (exn, tell (ins), ins))
+                                          "(%s)" % (tell (ins), exn, ins))
             if PDTCRYPT_VERBOSE is True:
                 pretty = hdr_fmt_pretty (hdr)
                 noise (reduce (lambda a, e: (a + "\n" if a else "") + "PDT:\t· " + e,
@@ -900,6 +940,9 @@ def parse_argv (argv):
         elif arg in [ "-v", "--verbose", "--wtf" ]:
             global PDTCRYPT_VERBOSE
             PDTCRYPT_VERBOSE = True
+        elif arg in [ "-s", "--strict-ivs" ]:
+            global PDTCRYPT_STRICTIVS
+            PDTCRYPT_STRICTIVS = True
         elif arg in [ "-i", "--in", "--source" ]:
             insspec = next (argvi)
             if PDTCRYPT_VERBOSE is True: noise ("PDT: decrypt from %s" % insspec)