deny insecure parameters by default
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Thu, 16 Apr 2020 13:43:36 +0000 (15:43 +0200)
committerPhilipp Gesang <philipp.gesang@intra2net.com>
Thu, 16 Apr 2020 14:32:43 +0000 (16:32 +0200)
Address Deltatar audit item 2.1: Violating data integrity using
passthrough mode.

The ciphertext is assumed attacker controlled. This allows a
downgrade to plaintext by setting the parameter version to
plaintext passthrough on an encrypted object, bypassing integrity
protection. Reduce the susceptibility to this attack by refusing
to continue if such a header is encountered in the input.
Passthrough mode is still available if the *insecure* flag is
passed to the constructors of the encryption / decryption
handles.

deltatar/crypto.py
testing/test_crypto.py

index c3f8e32..3c3a816 100755 (executable)
@@ -281,6 +281,10 @@ ENCRYPTION_PARAMETERS = \
                    , "NaCl_LEN" : 16 })
         , "enc": "aes-gcm" } }
 
+# Mode zero is unencrypted and only provided for testing purposes. nless
+# the encryptor / decryptor are explicitly instructed to do so.
+MIN_SECURE_PARAMETERS = 1
+
 ###############################################################################
 ## constants
 ###############################################################################
@@ -910,6 +914,7 @@ class Crypto (object):
     strict_ivs   = False # if True, panic on duplicate object IV
     password     = None
     paramversion = None
+    insecure     = False # allow plaintext parameters
     stats = { "in"  : 0
             , "out" : 0
             , "obj" : 0 }
@@ -977,16 +982,28 @@ class Crypto (object):
 
 
     def set_parameters (self, password=None, key=None, paramversion=None,
-                        nacl=None, counter=None, strict_ivs=False):
+                        nacl=None, counter=None, strict_ivs=False,
+                        insecure=False):
         """
         Configure the internal state of a crypto context. Not intended for
         external use.
+
+        A parameter version indicating passthrough (plaintext) mode is rejected
+        with an ``InvalidParameter`` unless ``insecure`` is set.
         """
         self.next_fixed ()
         self.set_object_counter (counter)
         self.strict_ivs = strict_ivs
 
+        self.insecure = insecure
+
         if paramversion is not None:
+            if self.insecure is False \
+                    and paramversion < MIN_SECURE_PARAMETERS:
+                raise InvalidParameter \
+                    ("set_parameters: requested parameter version %d but "
+                     "plaintext encryption disallowed in secure context!"
+                     % paramversion)
             self.paramversion = paramversion
 
         if key is not None:
@@ -1043,7 +1060,8 @@ class Crypto (object):
             or self.password     != password
             or self.nacl         != nacl):
             self.set_parameters (password=password, paramversion=paramversion,
-                                 nacl=nacl, strict_ivs=self.strict_ivs)
+                                 nacl=nacl, strict_ivs=self.strict_ivs,
+                                 insecure=self.insecure)
 
 
     def check_duplicate_iv (self, iv):
@@ -1080,7 +1098,7 @@ class Encrypt (Crypto):
     paramenc     = None
 
     def __init__ (self, version, paramversion, password=None, key=None, nacl=None,
-                  counter=AES_GCM_IV_CNT_DATA, strict_ivs=True):
+                  counter=AES_GCM_IV_CNT_DATA, strict_ivs=True, insecure=False):
         """
         The ctor will throw immediately if one of the parameters does not conform
         to our expectations.
@@ -1098,6 +1116,7 @@ class Encrypt (Crypto):
                             ``AES_GCM_IV_CNT_INDEX`` are unique in each backup set
                             and cannot be reused even with different fixed parts.
         :type   strict_ivs: bool
+        :type     insecure: bool, whether to permit passthrough mode
         """
         if         password is     None and key is     None \
                 or password is not None and key is not None :
@@ -1145,7 +1164,7 @@ class Encrypt (Crypto):
         self.paramenc     = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
 
         super().__init__ (password, key, paramversion, nacl, counter=counter,
-                          strict_ivs=strict_ivs)
+                          strict_ivs=strict_ivs, insecure=insecure)
 
 
     def next_fixed (self, retries=PDTCRYPT_IV_GEN_MAX_RETRIES):
@@ -1299,7 +1318,7 @@ class Decrypt (Crypto):
     hdr_ctsize = -1
 
     def __init__ (self, password=None, key=None, counter=None, fixedparts=None,
-                  strict_ivs=False):
+                  strict_ivs=False, insecure=False):
         """
         Sanitizing ctor for the decryption context. ``fixedparts`` specifies a
         list of IV fixed parts accepted during decryption. If a fixed part is
@@ -1314,6 +1333,8 @@ class Decrypt (Crypto):
                             ``AES_GCM_IV_CNT_INDEX`` are unique in each backup set
                             and cannot be reused even with different fixed parts.
         :type   fixedparts: bytes list
+        :type     insecure: bool, whether to process objects encrypted in
+                            passthrough mode (*``paramversion`` < 1*)
         """
         if         password is     None and key is     None \
                 or password is not None and key is not None :
@@ -1340,7 +1361,7 @@ class Decrypt (Crypto):
             self.fixed.sort ()
 
         super().__init__ (password=password, key=key, counter=counter,
-                          strict_ivs=strict_ivs)
+                          strict_ivs=strict_ivs, insecure=insecure)
 
 
     def valid_fixed_part (self, iv):
index 41b38d7..f47c7c7 100644 (file)
@@ -21,6 +21,7 @@ TEST_DUMMY_FILENAME  = "insurance-file.txt"
 TEST_VERSION         = 1
 TEST_PARAMVERSION    = 1
 TEST_STATIC_NACL     = os.urandom (CRYPTO_NACL_SIZE)
+PLAIN_PARAMVERSION   = 0
 
 def faux_hdr (ctsize=1337, iv=None):
     return \
@@ -81,6 +82,28 @@ class AESGCMTest (CryptoLayerTest):
                                      password=password,
                                      nacl=TEST_STATIC_NACL)
 
+    def test_crypto_aes_gcm_enc_ctor_bad_plainparams (self):
+        """Refuse plaintext passthrough mode by default."""
+        password   = str (os.urandom (42))
+        with self.assertRaises (crypto.InvalidParameter):
+            encryptor  = crypto.Encrypt (TEST_VERSION,
+                                         PLAIN_PARAMVERSION,
+                                         password=password,
+                                         nacl=TEST_STATIC_NACL)
+
+
+    def test_crypto_aes_gcm_enc_ctor_ok_insecure_plainparams (self):
+        """
+        Comply with request for plaintext passthrough mode if the
+        *insecure* flag is passed.
+        """
+        password   = str (os.urandom (42))
+        encryptor  = crypto.Encrypt (TEST_VERSION,
+                                        PLAIN_PARAMVERSION,
+                                        password=password,
+                                        nacl=TEST_STATIC_NACL,
+                                        insecure=True)
+
 
     def test_crypto_aes_gcm_enc_ctor_key (self):
         key        = os.urandom (42)
@@ -190,6 +213,64 @@ class AESGCMTest (CryptoLayerTest):
         assert plaintext == TEST_PLAINTEXT
 
 
+    def test_crypto_aes_gcm_dec_plain_bad (self):
+        """
+        Downgrade to plaintext must not be allowed in parameters
+        obtained from headers.
+        """
+        password       = str (os.urandom (42))
+        encryptor      = crypto.Encrypt (TEST_VERSION,
+                                         TEST_PARAMVERSION,
+                                         password=password,
+                                         nacl=TEST_STATIC_NACL)
+
+        header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
+        _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
+        rest, header, fixed = encryptor.done (header_dummy)
+        ciphertext    += rest
+
+        header         = crypto.hdr_read (header)
+        header ["paramversion"] = PLAIN_PARAMVERSION
+        ok, header     = crypto.hdr_make (header)
+        assert ok
+
+        decryptor      = crypto.Decrypt (password=password, fixedparts=fixed)
+        with self.assertRaises (crypto.InvalidParameter):
+            decryptor.next (header)
+
+
+    def test_crypto_aes_gcm_dec_plain_ok_insecure (self):
+        """
+        Allow plaintext crypto mode if *insecure* flag is passed.
+        """
+        password       = str (os.urandom (42))
+        encryptor      = crypto.Encrypt (TEST_VERSION,
+                                         PLAIN_PARAMVERSION,
+                                         password=password,
+                                         nacl=TEST_STATIC_NACL,
+                                         insecure=True)
+
+        header_dummy   = encryptor.next (TEST_DUMMY_FILENAME)
+        _, ciphertext  = encryptor.process (TEST_PLAINTEXT)
+        rest, header, fixed = encryptor.done (header_dummy)
+        ciphertext    += rest
+
+        header         = crypto.hdr_read (header)
+        header ["paramversion"] = PLAIN_PARAMVERSION
+        ok, header     = crypto.hdr_make (header)
+        assert ok
+
+        decryptor      = crypto.Decrypt (password=password,
+                                         fixedparts=fixed,
+                                         insecure=True)
+        decryptor.next (header)
+        plaintext      = decryptor.process (ciphertext)
+        rest           = decryptor.done ()
+        plaintext     += rest
+
+        assert plaintext == TEST_PLAINTEXT
+
+
     def test_crypto_aes_gcm_dec_bad_tag (self):
         password       = str (os.urandom (42))
         encryptor      = crypto.Encrypt (TEST_VERSION,