key = None
cnt = None # file counter (uint32_t != 0)
iv = None # current IV
- fixed = None # accu for 64 bit fixed parts of IV
- used_ivs = None # tracks IVs during decryption
+ fixed = None # accu for 64 bit fixed parts of IV
+ used_ivs = None # tracks IVs
+ strict_ivs = False # if True, panic on duplicate object IV
password = None
paramversion = None
stats = { "in" : 0
index_counter_used = False
def __init__ (self, *al, **akv):
+ self.used_ivs = set ()
self.set_parameters (*al, **akv)
def set_parameters (self, password=None, key=None, paramversion=None,
- nacl=None, counter=None, nextpfx=None):
+ nacl=None, counter=None, nextpfx=None,
+ strict_ivs=False):
if nextpfx is not None:
self.next_pfx = nextpfx
self.next_pfx ()
self.set_object_counter (counter)
+ self.strict_ivs = strict_ivs
+
if key is not None:
self.key, self.nacl = key, nacl
return
return b""
- def next (self, password, paramversion, nacl):
+ def next (self, password, paramversion, nacl, iv):
self.ctsize = 0
self.ptsize = 0
self.stats ["obj"] += 1
+
+ self.check_duplicate_iv (iv)
+
if ( self.paramversion != paramversion
or self.password != password
or self.nacl != nacl):
self.set_parameters (password=password, paramversion=paramversion,
- nacl=nacl)
+ nacl=nacl, strict_ivs=self.strict_ivs)
+
+
+ def check_duplicate_iv (self, iv):
+ if self.strict_ivs is True and iv in self.used_ivs:
+ raise DuplicateIV ("iv %s was reused" % iv_fmt (iv))
+ # vi has not been used before; add to collection
+ self.used_ivs.add (iv)
def counters (self):
paramenc = None
def __init__ (self, version, paramversion, password=None, key=None, nacl=None,
- counter=AES_GCM_IV_CNT_DATA):
+ counter=AES_GCM_IV_CNT_DATA, strict_ivs=True):
if password is None and key is None \
or password is not None and key is not None :
raise InvalidParameter ("__init__: need either key or password")
self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
super().__init__ (password, key, paramversion, nacl, counter=counter,
- nextpfx=lambda: self.fixed.append (os.urandom(8)))
+ nextpfx=lambda: self.fixed.append (os.urandom(8)),
+ strict_ivs=strict_ivs)
def iv_make (self):
% self.paramversion)
hdrdum = hdr_make_dummy (filename)
self.lastinfo = (filename, hdrdum)
- super().next (self.password, self.paramversion, self.nacl)
+ super().next (self.password, self.paramversion, self.nacl, self.iv)
self.set_object_counter (self.cnt + 1)
return hdrdum
class Decrypt (Crypto):
tag = None # GCM tag, part of header
- strict_ivs = False # if True, panic on duplicate object IV
last_iv = None # check consecutive ivs in strict mode
def __init__ (self, password=None, key=None, counter=None, fixedparts=None,
self.fixed = fixedparts
self.fixed.sort ()
- self.used_ivs = set ()
- self.strict_ivs = strict_ivs
-
- super().__init__ (password, key, counter=counter)
+ super().__init__ (password, key, counter=counter, strict_ivs=strict_ivs)
def valid_fixed_part (self, iv):
return i != len (self.fixed) and self.fixed [i] == fixed
- def check_duplicate_iv (self, iv):
- if self.strict_ivs is True and iv in self.used_ivs:
- raise DuplicateIV ("iv %s was reused" % iv_fmt (iv))
- # vi has not been used before; add to collection
- self.used_ivs.add (iv)
-
-
def check_consecutive_iv (self, iv):
fixed, cnt = struct.unpack (FMT_I2N_IV, iv)
if self.strict_ivs is True \
except KeyError:
raise InvalidHeader ("next: not a header %r" % hdr)
- if self.key is None or nacl != self.nacl:
- super().next (self.password, paramversion, nacl)
+ super().next (self.password, paramversion, nacl, iv)
if self.fixed is not None and self.valid_fixed_part (iv) is False:
raise InvalidIVFixedPart ("iv %s has invalid fixed part"
% iv_fmt (iv))
- self.check_duplicate_iv (iv)
self.check_consecutive_iv (iv)
self.tag = tag
b"b1eedc0ffeedea15")
}
+FILL_MOD_MEMO = { }
def fill_mod (n, off=0):
+ global FILL_MOD_MEMO
+ k = (n, off)
+ m = FILL_MOD_MEMO.get (k, None)
+ if m is not None:
+ return m
buf = bytearray (n)
bufv = memoryview (buf)
for i in range (n):
off += 1
c = off % 64 + 32
struct.pack_into ("c", bufv, i, chr(c).encode("UTF-8"))
- return bytes (buf)
+ m = bytes (buf)
+ FILL_MOD_MEMO [k] = m
+ return m
def faux_payload ():
except crypto.InvalidParameter: # empty password
pass
+
def test_crypto_aes_gcm_dec_simple (self):
password = str (os.urandom (42))
encryptor = crypto.Encrypt (TEST_VERSION,
assert len (pt) == len (ct)
+ def test_crypto_aes_gcm_enc_multicnk_strict_ivs (self):
+ cnksiz = 1 << 10
+ pt = fill_mod (1 << 14)
+ password = str (os.urandom (42))
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=password,
+ nacl=TEST_STATIC_NACL,
+ strict_ivs=True)
+ header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
+
+ off = 0
+ ct = b""
+ while off < len (pt):
+ upto = min (off + cnksiz, len (pt))
+ cnk = encryptor.process (pt [off:upto])
+ ct += cnk
+ off += cnksiz
+ cnk, header, fixed = encryptor.done (header_dummy)
+ ct += cnk
+
+ assert len (pt) == len (ct)
+
+
+ def test_crypto_aes_gcm_enc_multiobj (self):
+ cnksiz = 1 << 10
+ password = str (os.urandom (42))
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=password,
+ nacl=TEST_STATIC_NACL,
+ strict_ivs=False)
+
+ def addobj (i):
+ pt = fill_mod (1 << 14, off=i)
+ header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
+
+ off = 0
+ ct = b""
+ while off < len (pt):
+ upto = min (off + cnksiz, len (pt))
+ cnk = encryptor.process (pt [off:upto])
+ ct += cnk
+ off += cnksiz
+ cnk, header, fixed = encryptor.done (header_dummy)
+ ct += cnk
+
+ assert len (pt) == len (ct)
+
+ for i in range (5): addobj (i)
+
+
+ def test_crypto_aes_gcm_enc_multiobj_strict_ivs (self):
+ cnksiz = 1 << 10
+ password = str (os.urandom (42))
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=password,
+ nacl=TEST_STATIC_NACL,
+ strict_ivs=True)
+
+ def addobj (i):
+ pt = fill_mod (1 << 14, off=i)
+ header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, i))
+
+ off = 0
+ ct = b""
+ while off < len (pt):
+ upto = min (off + cnksiz, len (pt))
+ cnk = encryptor.process (pt [off:upto])
+ ct += cnk
+ off += cnksiz
+ cnk, header, fixed = encryptor.done (header_dummy)
+ ct += cnk
+
+ assert len (pt) == len (ct)
+
+ for i in range (5): addobj (i)
+
+
def test_crypto_aes_gcm_dec_multicnk (self):
cnksiz = 1 << 10
orig_pt = fill_mod (1 << 14)