def __init__ (self, password, version, paramversion, nacl=None,
counter=AES_GCM_IV_CNT_DATA):
+ # passwort
+ if isinstance (password, str) is False:
+ raise InvalidParameter ("__init__: password must be a string, not %s"
+ % type (password))
if len (password) == 0:
raise InvalidParameter ("__init__: refusing to encrypt with empty "
"password")
+ # version
+ if isinstance (version, int) is False:
+ raise InvalidParameter ("__init__: version number must be an "
+ "integer, not %s" % type (version))
+ if version < 0:
+ raise InvalidParameter ("__init__: version number must be a "
+ "nonnegative integer, not %d" % version)
+ # paramversion
+ if isinstance (paramversion, int) is False:
+ raise InvalidParameter ("__init__: crypto parameter version number "
+ "must be an integer, not %s"
+ % type (paramversion))
+ if paramversion < 0:
+ raise InvalidParameter ("__init__: crypto parameter version number "
+ "must be a nonnegative integer, not %d"
+ % paramversion)
+ # salt
+ if nacl is not None:
+ if isinstance (nacl, bytes) is False:
+ raise InvalidParameter ("__init__: salt given, but of type %s "
+ "instead of bytes" % type (nacl))
+ # salt length would depend on the actual encryption so it can’t be
+ # validated at this point
self.fixed = [ ]
self.version = version
self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
def next (self, filename, counter=None):
+ if isinstance (filename, str) is False:
+ raise InvalidParameter ("next: filename must be a string, no %s"
+ % type (filename))
+ if counter is not None \
+ and isinstance (counter, int) is False:
+ raise InvalidParameter ("next: the supplied counter is of invalid "
+ "type %s; please pass an integer instead"
+ % type (counter))
self.iv = self.iv_make ()
if self.paramenc == "aes-gcm":
self.enc = Cipher \
def done (self, cmpdata):
+ if isinstance (cmpdata, bytes) is False:
+ raise InvalidParameter ("done: comparison input expected as bytes, "
+ "not %s" % type (cmpdata))
filename, hdrdum = self.lastinfo
if cmpdata != hdrdum:
raise RuntimeError ("done: bad sync of header for object %d: "
def process (self, buf):
+ if isinstance (buf, bytes) is False:
+ raise InvalidParameter ("process: expected byte buffer, not %s"
+ % type (buf))
self.ptsize += len (buf)
data = super().process (buf)
self.ctsize += len (data)
def __init__ (self, password, paramversion=None, nacl=None, counter=None,
fixedparts=None):
+ # passwort
+ if isinstance (password, str) is False:
+ raise InvalidParameter ("__init__: password must be a string, not %s"
+ % type (password))
+ if len (password) == 0:
+ raise InvalidParameter ("__init__: supplied empty password but not "
+ "permitted for PDT encrypted files")
+ # version
+ if isinstance (version, int) is False:
+ raise InvalidParameter ("__init__: version number must be an "
+ "integer, not %s" % type (version))
+ if version < 0:
+ raise InvalidParameter ("__init__: version number must be a "
+ "nonnegative integer, not %d" % version)
+ # paramversion
+ if isinstance (paramversion, int) is False:
+ raise InvalidParameter ("__init__: crypto parameter version number "
+ "must be an integer, not %s"
+ % type (paramversion))
+ if paramversion < 0:
+ raise InvalidParameter ("__init__: crypto parameter version number "
+ "must be a nonnegative integer, not %d"
+ % paramversion)
+ # fixed parts
if fixedparts is not None:
+ if isinstance (fixedparts, list) is False:
+ raise InvalidParameter ("__init__: IV fixed parts must be "
+ "supplied as list, not %s"
+ % type (fixedparts))
self.fixed = fixedparts
self.fixed.sort ()
super().__init__ (password, paramversion, nacl, counter=counter)
def next (self, hdr):
if isinstance (hdr, bytes) is True:
hdr = hdr_read (hdr)
- paramversion = hdr ["paramversion"]
+ elif isinstance (hdr, dict) is False:
+ # this won’t catch malformed specs though
+ raise InvalidParameter ("next: wrong type of parameter hdr: "
+ "expected bytes or spec, got %s"
+ % type (tag))
+ try:
+ paramversion = hdr ["paramversion"]
+ nacl = hdr ["nacl"]
+ iv = hdr ["iv"]
+ tag = hdr ["tag"]
+ except KeyError:
+ raise InvalidHeader ("next: not a header %r" % hdr)
+
if self.key is None:
- super().next (self.password, paramversion, hdr ["nacl"])
- iv = hdr ["iv"]
+ super().next (self.password, paramversion, nacl)
if self.fixed is not None and self.valid_fixed_part (iv) is False:
fixed, _ = struct.unpack (FMT_I2N_IV, iv)
raise InvalidIVFixedPart ("iv [%r] has invalid fixed part [%r]"
% (iv, fixed))
- self.tag = hdr ["tag"]
+ self.tag = tag
defs = ENCRYPTION_PARAMETERS.get (paramversion, None)
if defs is None:
raise FormatError ("header contains unknown parameter version %d; "
if enc == "aes-gcm":
self.enc = Cipher \
( algorithms.AES (self.key)
- , modes.GCM (hdr["iv"], tag=self.tag)
+ , modes.GCM (iv, tag=self.tag)
, backend = default_backend ()) \
. decryptor ()
elif enc == "passthrough":
if tag is None:
data = self.enc.finalize ()
else:
+ if isinstance (tag, bytes) is False:
+ raise InvalidParameter ("done: wrong type of parameter "
+ "tag: expected bytes, got %s"
+ % type (tag))
data = self.enc.finalize_with_tag (self.tag)
except cryptography.exceptions.InvalidTag:
return InvalidGCMTag ("done: tag mismatch of object %d: %r "
def process (self, buf):
+ if isinstance (buf, bytes) is False:
+ raise InvalidParameter ("process: expected byte buffer, not %s"
+ % type (buf))
self.ctsize += len (buf)
data = super().process (buf)
self.ptsize += len (data)