validate data lengths against value in header
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Wed, 29 Jan 2020 14:27:17 +0000 (15:27 +0100)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Sat, 1 Feb 2020 13:42:43 +0000 (14:42 +0100)
When decrypting we need to ensure that the decryptor will not
blindly accept data past the size that was specified by the
current object's header. Note that processing trailing data would
fail eventually when the GCM tag is checked; this check just
catches that case earlier.

deltatar/crypto.py
testing/test_crypto.py

index 8928737..53150f1 100755 (executable)
@@ -233,6 +233,14 @@ class NonConsecutiveIV (Exception):
     pass
 
 
+class CiphertextTooLong (Exception):
+    """
+    An attempt was made to decrypt more data than the ciphertext size declared
+    in the object header.
+    """
+    pass
+
+
 class FormatError (Exception):
     """Unusable parameters in header."""
     pass
@@ -1288,6 +1296,7 @@ class Decrypt (Crypto):
 
     tag        = None   # GCM tag, part of header
     last_iv    = None   # check consecutive ivs in strict mode
+    hdr_ctsize = -1
 
     def __init__ (self, password=None, key=None, counter=None, fixedparts=None,
                   strict_ivs=False):
@@ -1379,9 +1388,17 @@ class Decrypt (Crypto):
             nacl         = hdr ["nacl"]
             iv           = hdr ["iv"]
             tag          = hdr ["tag"]
+            ctsize       = hdr ["ctsize"]
         except KeyError:
             raise InvalidHeader ("next: not a header %r" % hdr)
 
+        if ctsize > PDTCRYPT_MAX_OBJ_SIZE:
+            raise InvalidHeader ("next: ciphertext size %d exceeds maximum "
+                                 "object size (%d)"
+                                 % (ctsize, PDTCRYPT_MAX_OBJ_SIZE))
+
+        self.hdr_ctsize = ctsize
+
         super().next (self.password, paramversion, nacl, iv)
         if self.fixed is not None and self.valid_fixed_part (iv) is False:
             raise InvalidIVFixedPart ("iv %s has invalid fixed part"
@@ -1430,8 +1447,11 @@ class Decrypt (Crypto):
             raise InvalidGCMTag ("done: tag mismatch of object %d: %s "
                                   "rejected by finalize ()"
                                   % (self.cnt, binascii.hexlify (self.tag)))
-        self.ctsize += len (data)
+        self.ptsize += len (data)
         self.stats ["out"] += len (data)
+
+        assert self.ctsize == self.ptsize == self.hdr_ctsize
+
         return data
 
 
@@ -1443,6 +1463,11 @@ class Decrypt (Crypto):
             raise InvalidParameter ("process: expected byte buffer, not %s"
                                     % type (buf))
         self.ctsize += len (buf)
+        if self.ctsize > self.hdr_ctsize:
+            raise CiphertextTooLong ("process: object length exceeded: got "
+                                     "%d B but header specfiies %d B"
+                                     % (self.ctsize, self.hdr_ctsize))
+
         data = super().process (buf)
         self.ptsize += len (data)
         return data
index 7efb195..dc652e2 100644 (file)
@@ -480,6 +480,66 @@ class AESGCMTest (CryptoLayerTest):
         for i in range (16): encobj (1 << i)
 
 
+    def test_crypto_aes_gcm_dec_length_cap (self):
+        """
+        The decryptor must reject headers with an object size that exceeds
+        the PDTCRYPT maximum. Longer files split into multiple objects.
+        """
+        password        = str (os.urandom (42))
+        meta            = faux_hdr()
+        meta ["ctsize"] = crypto.PDTCRYPT_MAX_OBJ_SIZE + 1
+        ok, header      = crypto.hdr_make (meta)
+
+        assert ok
+
+        # Set up decryption with bogus header.
+        decryptor = crypto.Decrypt (password=password, fixedparts=[])
+
+        with self.assertRaises (crypto.InvalidHeader):
+            decryptor.next (header)
+
+
+    def test_crypto_aes_gcm_dec_length_mismatch (self):
+        """
+        Catch attempts at decrypting more data than what was stated in the
+        header.
+        """
+        cnksiz         = 1 << 10
+        orig_pt        = fill_mod (1 << 14)
+        password       = str (os.urandom (42))
+        encryptor      = crypto.Encrypt (TEST_VERSION,
+                                         TEST_PARAMVERSION,
+                                         password=password,
+                                         nacl=TEST_STATIC_NACL)
+        header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
+
+        off = 0
+        ct = b""
+        while off < len (orig_pt):
+            upto = min (off + cnksiz, len (orig_pt))
+            _n, cnk = encryptor.process (orig_pt [off:upto])
+            ct += cnk
+            off += cnksiz
+        cnk, header, fixed = encryptor.done (header_dummy)
+        ct += cnk
+
+        decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
+
+        decryptor.next (header)
+        off = 0
+        pt  = b""
+        while off < len (orig_pt):
+            upto = min (off + cnksiz, len (orig_pt))
+            cnk  = decryptor.process (ct [off:upto])
+            pt += cnk
+            off += cnksiz
+
+        with self.assertRaises (crypto.CiphertextTooLong):
+            # Try and decrypt one byte more than was encrypted.
+            # This must be caught in crypto.py.
+            _ = decryptor.process (ct [0:1])
+
+
     def test_crypto_aes_gcm_dec_multicnk (self):
         cnksiz         = 1 << 10
         orig_pt        = fill_mod (1 << 14)