From 36b9932a9d37d2728a5b55df7869fa9601dc254f Mon Sep 17 00:00:00 2001 From: Philipp Gesang Date: Mon, 10 Apr 2017 11:15:09 +0200 Subject: [PATCH] add input checks at API boundaries MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Verify conformance of user-supplied inputs on a very basic level, communicating violations via InvalidParameter exception. Of course due to the limitations of the type systems these can’t be made exhaustive. E. g. no effort is being made to inspect a (passing) list or dict test for well-formed contents. --- deltatar/crypto.py | 97 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 92 insertions(+), 5 deletions(-) diff --git a/deltatar/crypto.py b/deltatar/crypto.py index 0a65741..3e512ba 100755 --- a/deltatar/crypto.py +++ b/deltatar/crypto.py @@ -494,9 +494,36 @@ class Encrypt (Crypto): def __init__ (self, password, version, paramversion, nacl=None, counter=AES_GCM_IV_CNT_DATA): + # passwort + if isinstance (password, str) is False: + raise InvalidParameter ("__init__: password must be a string, not %s" + % type (password)) if len (password) == 0: raise InvalidParameter ("__init__: refusing to encrypt with empty " "password") + # version + if isinstance (version, int) is False: + raise InvalidParameter ("__init__: version number must be an " + "integer, not %s" % type (version)) + if version < 0: + raise InvalidParameter ("__init__: version number must be a " + "nonnegative integer, not %d" % version) + # paramversion + if isinstance (paramversion, int) is False: + raise InvalidParameter ("__init__: crypto parameter version number " + "must be an integer, not %s" + % type (paramversion)) + if paramversion < 0: + raise InvalidParameter ("__init__: crypto parameter version number " + "must be a nonnegative integer, not %d" + % paramversion) + # salt + if nacl is not None: + if isinstance (nacl, bytes) is False: + raise InvalidParameter ("__init__: salt given, but of type %s " + "instead of bytes" % type (nacl)) + # salt length would depend on the actual encryption so it can’t be + # validated at this point self.fixed = [ ] self.version = version self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"] @@ -510,6 +537,14 @@ class Encrypt (Crypto): def next (self, filename, counter=None): + if isinstance (filename, str) is False: + raise InvalidParameter ("next: filename must be a string, no %s" + % type (filename)) + if counter is not None \ + and isinstance (counter, int) is False: + raise InvalidParameter ("next: the supplied counter is of invalid " + "type %s; please pass an integer instead" + % type (counter)) self.iv = self.iv_make () if self.paramenc == "aes-gcm": self.enc = Cipher \ @@ -531,6 +566,9 @@ class Encrypt (Crypto): def done (self, cmpdata): + if isinstance (cmpdata, bytes) is False: + raise InvalidParameter ("done: comparison input expected as bytes, " + "not %s" % type (cmpdata)) filename, hdrdum = self.lastinfo if cmpdata != hdrdum: raise RuntimeError ("done: bad sync of header for object %d: " @@ -548,6 +586,9 @@ class Encrypt (Crypto): def process (self, buf): + if isinstance (buf, bytes) is False: + raise InvalidParameter ("process: expected byte buffer, not %s" + % type (buf)) self.ptsize += len (buf) data = super().process (buf) self.ctsize += len (data) @@ -560,7 +601,35 @@ class Decrypt (Crypto): def __init__ (self, password, paramversion=None, nacl=None, counter=None, fixedparts=None): + # passwort + if isinstance (password, str) is False: + raise InvalidParameter ("__init__: password must be a string, not %s" + % type (password)) + if len (password) == 0: + raise InvalidParameter ("__init__: supplied empty password but not " + "permitted for PDT encrypted files") + # version + if isinstance (version, int) is False: + raise InvalidParameter ("__init__: version number must be an " + "integer, not %s" % type (version)) + if version < 0: + raise InvalidParameter ("__init__: version number must be a " + "nonnegative integer, not %d" % version) + # paramversion + if isinstance (paramversion, int) is False: + raise InvalidParameter ("__init__: crypto parameter version number " + "must be an integer, not %s" + % type (paramversion)) + if paramversion < 0: + raise InvalidParameter ("__init__: crypto parameter version number " + "must be a nonnegative integer, not %d" + % paramversion) + # fixed parts if fixedparts is not None: + if isinstance (fixedparts, list) is False: + raise InvalidParameter ("__init__: IV fixed parts must be " + "supplied as list, not %s" + % type (fixedparts)) self.fixed = fixedparts self.fixed.sort () super().__init__ (password, paramversion, nacl, counter=counter) @@ -577,15 +646,26 @@ class Decrypt (Crypto): def next (self, hdr): if isinstance (hdr, bytes) is True: hdr = hdr_read (hdr) - paramversion = hdr ["paramversion"] + elif isinstance (hdr, dict) is False: + # this won’t catch malformed specs though + raise InvalidParameter ("next: wrong type of parameter hdr: " + "expected bytes or spec, got %s" + % type (tag)) + try: + paramversion = hdr ["paramversion"] + nacl = hdr ["nacl"] + iv = hdr ["iv"] + tag = hdr ["tag"] + except KeyError: + raise InvalidHeader ("next: not a header %r" % hdr) + if self.key is None: - super().next (self.password, paramversion, hdr ["nacl"]) - iv = hdr ["iv"] + super().next (self.password, paramversion, nacl) if self.fixed is not None and self.valid_fixed_part (iv) is False: fixed, _ = struct.unpack (FMT_I2N_IV, iv) raise InvalidIVFixedPart ("iv [%r] has invalid fixed part [%r]" % (iv, fixed)) - self.tag = hdr ["tag"] + self.tag = tag defs = ENCRYPTION_PARAMETERS.get (paramversion, None) if defs is None: raise FormatError ("header contains unknown parameter version %d; " @@ -595,7 +675,7 @@ class Decrypt (Crypto): if enc == "aes-gcm": self.enc = Cipher \ ( algorithms.AES (self.key) - , modes.GCM (hdr["iv"], tag=self.tag) + , modes.GCM (iv, tag=self.tag) , backend = default_backend ()) \ . decryptor () elif enc == "passthrough": @@ -612,6 +692,10 @@ class Decrypt (Crypto): if tag is None: data = self.enc.finalize () else: + if isinstance (tag, bytes) is False: + raise InvalidParameter ("done: wrong type of parameter " + "tag: expected bytes, got %s" + % type (tag)) data = self.enc.finalize_with_tag (self.tag) except cryptography.exceptions.InvalidTag: return InvalidGCMTag ("done: tag mismatch of object %d: %r " @@ -623,6 +707,9 @@ class Decrypt (Crypto): def process (self, buf): + if isinstance (buf, bytes) is False: + raise InvalidParameter ("process: expected byte buffer, not %s" + % type (buf)) self.ctsize += len (buf) data = super().process (buf) self.ptsize += len (data) -- 1.7.1