"""Reached EOF."""
pass
+class InvalidParameter (Exception):
+ """Inputs not valid for PDT encryption."""
+ pass
+
class InvalidHeader (Exception):
"""Header not valid."""
pass
"""IV fixed part not in supplied list."""
pass
+class FormatError (Exception):
+ """Unusable parameters in header."""
+ pass
+
class DecryptionError (Exception):
"""Error during decryption."""
pass
"""Makeshift __builtin_unreachable()."""
pass
+class InternalError (Exception):
+ """Errors not ascribable to bad user inputs or cryptography."""
+ pass
+
###############################################################################
## crypto layer version
key = None
cnt = None # file counter (uint32_t != 0)
iv = None # current IV
- pfx = None # accu for 64 bit fixed parts of IV
+ fixed = None # accu for 64 bit fixed parts of IV
password = None
paramversion = None
stats = { "in" : 0
self.cnt = AES_GCM_IV_CNT_DATA
return
if cnt == 0 or cnt > AES_GCM_IV_CNT_MAX + 1:
- raise Exception ("XXX invalid counter value %d requested" % cnt)
+ raise InvalidParameter ("invalid counter value %d requested: "
+ "acceptable values are from 1 to %d"
+ % (cnt, AES_GCM_IV_CNT_MAX))
if cnt == AES_GCM_IV_CNT_INFOFILE:
if self.info_counter_used is True:
- raise Exception ("XXX attempted to reuse info file counter")
+ raise InvalidParameter ("attempted to reuse info file counter "
+ "%d: must be unique" % cnt)
self.info_counter_used = True
return
if cnt <= AES_GCM_IV_CNT_MAX:
def __init__ (self, password, version, paramversion, nacl=None,
counter=AES_GCM_IV_CNT_DATA):
if len (password) == 0:
- raise Exception ("XXX refusing to encrypt with empty password")
- self.pfx = [ ]
+ raise InvalidParameter ("__init__: refusing to encrypt with empty "
+ "password")
+ self.fixed = [ ]
self.version = version
self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
super().__init__ (password, paramversion, nacl, counter=counter,
- nextpfx=lambda: self.pfx.append (os.urandom(8)))
+ nextpfx=lambda: self.fixed.append (os.urandom(8)))
def iv_make (self):
- return struct.pack(FMT_I2N_IV, self.pfx [-1], self.cnt)
+ return struct.pack(FMT_I2N_IV, self.fixed [-1], self.cnt)
def next (self, filename, counter=None):
elif self.paramenc == "passthrough":
self.enc = PassthroughCipher ()
else:
- raise Exception ("XXX garbage encryption parameter %d → %r"
- % (self.paramversion, enc))
+ raise InvalidParameter ("next: parameter version %d not known"
+ % self.paramversion)
hdrdum = hdr_make_dummy (filename)
self.lastinfo = (filename, hdrdum)
super().next (self.password, self.paramversion, self.nacl)
def done (self, cmpdata):
filename, hdrdum = self.lastinfo
if cmpdata != hdrdum:
- raise Exception ("XXX bad sync for writing header") ## we need to converge on a sensible error handling strategy
+ raise RuntimeError ("done: bad sync of header for object %d: "
+ "preliminary data does not match; this likely "
+ "indicates a wrongly repositioned stream"
+ % self.cnt)
data = self.enc.finalize ()
self.stats ["out"] += len (data)
self.ctsize += len (data)
ok, hdr = hdr_from_params (self.version, self.paramversion, self.nacl,
self.iv, self.ctsize, self.enc.tag)
if ok is False:
- raise Exception ("XXX error constructing header: %r" % hdr) ## we need to converge on a sensible error handling strategy
- return data, hdr, self.pfx
+ raise InternalError ("error constructing header: %r" % hdr)
+ return data, hdr, self.fixed
def process (self, buf):
def __init__ (self, password, paramversion=None, nacl=None, counter=None,
fixedparts=None):
if fixedparts is not None:
- self.pfx = fixedparts
- self.pfx.sort ()
+ self.fixed = fixedparts
+ self.fixed.sort ()
super().__init__ (password, paramversion, nacl, counter=counter)
super().__init__ (password, paramversion, nacl, counter=counter)
- def valid_pfx (self, iv):
+ def valid_fixed_part (self, iv):
# check if fixed part is known
- pfx, _cnt = struct.unpack (FMT_I2N_IV, iv)
- i = bisect.bisect_left (self.pfx, pfx)
- return i != len (self.pfx) and self.pfx [i] == pfx
+ fixed, _cnt = struct.unpack (FMT_I2N_IV, iv)
+ i = bisect.bisect_left (self.fixed, fixed)
+ return i != len (self.fixed) and self.fixed [i] == fixed
def next (self, hdr):
if self.key is None:
super().next (self.password, paramversion, hdr ["nacl"])
iv = hdr ["iv"]
- if self.pfx is not None and self.valid_pfx (iv) is False:
+ if self.fixed is not None and self.valid_fixed_part (iv) is False:
fixed, _ = struct.unpack (FMT_I2N_IV, iv)
raise InvalidIVFixedPart ("iv [%r] has invalid fixed part [%r]"
% (iv, fixed))
self.tag = hdr ["tag"]
- defs = ENCRYPTION_PARAMETERS.get(paramversion)
+ defs = ENCRYPTION_PARAMETERS.get (paramversion, None)
+ if defs is None:
+ raise FormatError ("header contains unknown parameter version %d; "
+ "maybe the file was created by a more recent "
+ "version of Deltatar" % paramversion)
enc = defs ["enc"]
if enc == "aes-gcm":
self.enc = Cipher \
elif enc == "passthrough":
self.enc = PassthroughCipher ()
else:
- raise Exception ("XXX garbage encryption parameter %d → %r"
- % (paramversion, enc))
+ raise InternalError ("encryption parameter set %d refers to unknown "
+ "mode %r" % (paramversion, enc))
self.set_object_counter (self.cnt + 1)