"16s") # GCM tag
# aes+gcm
-AES_GCM_IV_LEN = 12
-AES_GCM_MAX_SIZE = (1 << 36) - (1 << 5) # 2^39 - 2^8 b ≅ 64 GB
-AES_GCM_FMT_TAG = "<16s"
+AES_GCM_MAX_SIZE = (1 << 36) - (1 << 5) # 2^39 - 2^8 b ≅ 64 GB
+PDTCRYPT_MAX_OBJ_SIZE_DEFAULT = 63 * (1 << 30) # 63 GB
+PDTCRYPT_MAX_OBJ_SIZE = PDTCRYPT_MAX_OBJ_SIZE_DEFAULT
# index and info files are written on-the fly while encrypting so their
# counters must be available inadvance
-AES_GCM_IV_CNT_INFOFILE = 1 # constant
-AES_GCM_IV_CNT_INDEX = AES_GCM_IV_CNT_INFOFILE + 1
-AES_GCM_IV_CNT_DATA = AES_GCM_IV_CNT_INDEX + 1 # also for multivolume
-AES_GCM_IV_CNT_MAX = 0xffFFffFF
+AES_GCM_IV_CNT_INFOFILE = 1 # constant
+AES_GCM_IV_CNT_INDEX = AES_GCM_IV_CNT_INFOFILE + 1
+AES_GCM_IV_CNT_DATA = AES_GCM_IV_CNT_INDEX + 1 # also for multivolume
+AES_GCM_IV_CNT_MAX_DEFAULT = 0xffFFffFF
+AES_GCM_IV_CNT_MAX = AES_GCM_IV_CNT_MAX_DEFAULT
###############################################################################
def process (self, buf):
- if self.enc is not None:
- self.stats ["in"] += len (buf)
- out = self.enc.update (buf)
- self.stats ["out"] += len (out)
- return out
- return b""
+ if self.enc is None:
+ raise RuntimeError ("process: context not initialized")
+ self.stats ["in"] += len (buf)
+ out = self.enc.update (buf)
+ self.stats ["out"] += len (out)
+ return out
def next (self, password, paramversion, nacl, iv):
return struct.pack(FMT_I2N_IV, self.fixed [-1], self.cnt)
- def next (self, filename, counter=None):
- if isinstance (filename, str) is False:
- raise InvalidParameter ("next: filename must be a string, no %s"
- % type (filename))
+ def next (self, filename=None, counter=None):
+ if filename is None:
+ if self.lastinfo is None:
+ raise InvalidParameter ("next: filename is mandatory for "
+ "first object")
+ filename, _dummy = self.lastinfo
+ else:
+ if isinstance (filename, str) is False:
+ raise InvalidParameter ("next: filename must be a string, no %s"
+ % type (filename))
if counter is not None:
if isinstance (counter, int) is False:
raise InvalidParameter ("next: the supplied counter is of "
if isinstance (cmpdata, bytes) is False:
raise InvalidParameter ("done: comparison input expected as bytes, "
"not %s" % type (cmpdata))
+ if self.lastinfo is None:
+ raise RuntimeError ("done: encryption context not initialized")
filename, hdrdum = self.lastinfo
if cmpdata != hdrdum:
raise RuntimeError ("done: bad sync of header for object %d: "
if isinstance (buf, bytes) is False:
raise InvalidParameter ("process: expected byte buffer, not %s"
% type (buf))
- self.ptsize += len (buf)
- data = super().process (buf)
+ bsize = len (buf)
+ newptsize = self.ptsize + bsize
+ diff = newptsize - PDTCRYPT_MAX_OBJ_SIZE
+ if diff > 0:
+ bsize -= diff
+ newptsize = PDTCRYPT_MAX_OBJ_SIZE
+ self.ptsize = newptsize
+ data = super().process (buf [:bsize])
self.ctsize += len (data)
- return data
+ return bsize, data
class Decrypt (Crypto):
## testing helpers
###############################################################################
-def _testing_set_AES_GCM_IV_CNT_MAX (vow, n):
+def _patch_global (glob, vow, n=None):
"""
Adapt upper file counter bound for testing IV logic. Completely unsafe.
"""
assert vow == "I am fully aware that this will void my warranty."
- global AES_GCM_IV_CNT_MAX
- r = AES_GCM_IV_CNT_MAX
- AES_GCM_IV_CNT_MAX = n
+ r = globals () [glob]
+ if n is None:
+ n = globals () [glob + "_DEFAULT"]
+ globals () [glob] = n
return r
+_testing_set_AES_GCM_IV_CNT_MAX = \
+ partial (_patch_global, "AES_GCM_IV_CNT_MAX")
+
+_testing_set_PDTCRYPT_MAX_OBJ_SIZE = \
+ partial (_patch_global, "PDTCRYPT_MAX_OBJ_SIZE")
+
###############################################################################
## freestanding invocation
###############################################################################