"""
import binascii
+import bisect
import ctypes
import io
from functools import reduce, partial
FMT_UINT16_LE = "<H"
FMT_UINT64_LE = "<Q"
+FMT_I2N_IV = "<8sL" # 8 random bytes ‖ 32 bit counter
FMT_I2N_HDR = ("<" # host byte order
"8s" # magic
"H" # version
AES_GCM_IV_CNT_INFOFILE = 1 # constant
AES_GCM_IV_CNT_DATA = AES_GCM_IV_CNT_INFOFILE + 1 # also for multivolume
+AES_GCM_IV_CNT_MAX = 0xffFFffFF
###############################################################################
def update (self, b) : return b
- def finalize (self, ) : return b""
+ def finalize (self) : return b""
def finalize_with_tag (self, _) : return b""
enc = None
nacl = None
key = None
- pfx = None # 64 bit fixed parts of IV
- cnt = None
- iv = None
+ cnt = None # file counter (uint32_t != 0)
+ iv = None # current IV
+ pfx = None # accu for 64 bit fixed parts of IV
password = None
stats = { "in" : 0
, "out" : 0
, "obj" : 0 }
- state = STATE_FRESH
ctsize = -1
ptsize = -1
paramversion = None
+ info_counter_used = False
def __init__ (self, *al, **akv):
self.set_parameters (*al, **akv)
- def set_parameters (self, password, paramversion, nacl=None, pfx=None,
- counter=None):
- self.cnt = counter or AES_GCM_IV_CNT_DATA
+ def next_pfx (self):
+ pass
+
+
+ def set_object_counter (self, cnt=None):
+ if cnt is None:
+ self.cnt = AES_GCM_IV_CNT_DATA
+ return
+ if cnt == 0 or cnt > AES_GCM_IV_CNT_MAX + 1:
+ raise Exception ("XXX invalid counter value %d requested" % cnt)
+ if cnt == AES_GCM_IV_CNT_INFOFILE:
+ if self.info_counter_used is True:
+ raise Exception ("XXX attempted to reuse info file counter")
+ self.info_counter_used = True
+ return
+ if cnt <= AES_GCM_IV_CNT_MAX:
+ self.cnt = cnt
+ return
+ # cnt == AES_GCM_IV_CNT_MAX + 1 → wrap
+ self.cnt = AES_GCM_IV_CNT_DATA
+ self.next_pfx ()
+
+
+ def set_parameters (self, password, paramversion, nacl=None, counter=None,
+ nextpfx=None):
+ if nextpfx is not None:
+ self.next_pfx = nextpfx
+ self.next_pfx ()
+ self.set_object_counter (counter)
if isinstance (password, bytes) is False: password = str.encode (password)
self.password = password
if paramversion is None and nacl is None:
if kdf is not None:
self.key, self.nacl = kdf (password, nacl)
- if pfx is not None and isinstance (pfx, bytes) is True:
- self.pfx = pfx
- if self.pfx is None:
- self.pfx = os.urandom(8)
-
self.paramversion = paramversion
self.ctsize = 0
self.ptsize = 0
self.stats ["obj"] += 1
- self.state = STATE_LIVE
if ( self.paramversion != paramversion
or self.password != password
or self.nacl != nacl):
return self.stats ["obj"], self.stats ["in"], self.stats ["out"]
- def currentstate (self): return self.state
-
-
class Encrypt (Crypto):
curobj = None
hdrdum = None
+
def __init__ (self, password, paramversion, nacl=None,
counter=AES_GCM_IV_CNT_DATA):
- super().__init__ (password, paramversion, nacl, counter=counter)
+ self.pfx = [ ]
+ super().__init__ (password, paramversion, nacl, counter=counter,
+ nextpfx=lambda: self.pfx.append (os.urandom(8)))
def iv_make (self):
- return struct.pack("<8sL", self.pfx, self.cnt % 0xffFFffFF)
+ return struct.pack(FMT_I2N_IV, self.pfx [-1], self.cnt)
def next (self, filename, version, paramversion, nacl):
- self.iv = self.iv_make()
self.curobj = (filename, version, paramversion, nacl or self.nacl)
- self.cnt += 1
+ self.set_object_counter (self.cnt + 1)
+ self.iv = self.iv_make ()
defs = ENCRYPTION_PARAMETERS.get(paramversion)
enc = defs ["enc"]
if enc == "aes-gcm":
self.ctsize, self.enc.tag)
if ok is False:
raise Exception ("XXX error constructing header: %r" % hdr) ## we need to converge on a sensible error handling strategy
- self.state = STATE_DEAD
return data, hdr
class Decrypt (Crypto):
- pfx = None
- tag = None # GCM tag, part of header
+ tag = None # GCM tag, part of header
- def __init__ (self, password, paramversion=None, nacl=None, counter=None):
+ def __init__ (self, password, paramversion=None, nacl=None, counter=None,
+ fixedparts=None):
+ if fixedparts is not None:
+ self.pfx = fixedparts
+ self.pfx.sort ()
+ super().__init__ (password, paramversion, nacl, counter=counter,
+ nextpfx=lambda: self.pfx.pop())
super().__init__ (password, paramversion, nacl, counter=counter)
+ def valid_pfx (self, iv):
+ # check if fixed part is known
+ pfx, _cnt = struct.unpack (FMT_I2N_IV, iv)
+ i = bisect.bisect_left (self.pfx, pfx)
+ return i != len (self.pfx) and self.pfx [i] == pfx
+
+
def next (self, hdr):
+ paramversion = hdr ["paramversion"]
if self.key is None:
- super().next (self.password, hdr ["paramversion"], hdr ["nacl"])
- self.cnt += 1
+ super().next (self.password, paramversion, hdr ["nacl"])
+ self.set_object_counter (self.cnt + 1)
iv = hdr ["iv"]
+ if self.pfx is not None and self.valid_pfx (iv) is False:
+ raise Exception ("XXX iv %r has invalid fixed part" % iv)
self.tag = hdr ["tag"]
+ defs = ENCRYPTION_PARAMETERS.get(paramversion)
+ enc = defs ["enc"]
if enc == "aes-gcm":
self.enc = Cipher \
( algorithms.AES (self.key)
ret, data = self.enc.finalize_with_tag (self.tag)
except crypto.cryptography.exceptions.InvalidTag as exn:
return False, repr (exn)
+ self.ctsize += len (data)
self.stats ["out"] += len (data)
- self.state = STATE_DEAD
return ret, data