import binascii
import ctypes
import io
+from functools import reduce
import os
import struct
import sys
I2N_HDR_MAGIC = b"PDTCRYPT"
-I2N_HDR_SIZE_MAGIC = 8
-I2N_HDR_SIZE_VERSION = 2
-I2N_HDR_SIZE_PARAMVERSION = 2
-I2N_HDR_SIZE_NACL = 16
-I2N_HDR_SIZE_IV = 12
-I2N_HDR_SIZE_CTSIZE = 8
+I2N_HDR_SIZE_MAGIC = 8 # 8
+I2N_HDR_SIZE_VERSION = 2 # 10
+I2N_HDR_SIZE_PARAMVERSION = 2 # 12
+I2N_HDR_SIZE_NACL = 16 # 28
+I2N_HDR_SIZE_IV = 12 # 40
+I2N_HDR_SIZE_CTSIZE = 8 # 48
I2N_TLR_SIZE_TAG = 16 # GCM auth tag, appended to data
I2N_HDR_SIZE = I2N_HDR_SIZE_MAGIC + I2N_HDR_SIZE_VERSION \
return True, bytes (buf)
+def hdr_make_dummy (s):
+ """
+ Create a header sized block of bytes initialized to a value derived from a
+ string. Used to verify we’ve jumped back correctly to the actual position
+ of the object header.
+ """
+ c = reduce (lambda a, c: a + ord(c), s, 0) % 0xFF
+ return bytearray (struct.pack ("B", c)) * I2N_HDR_SIZE
+
+
def hdr_make (hdr):
return hdr_from_params (version=hdr.get("version"),
paramversion=hdr.get("paramversion"),
def process (self, buf):
- self.aes.update (buf)
+ if self.aes is not None:
+ return self.aes.update (buf)
+ # this branch is taken when .close() is called on the fileobj
+ return b""
class Encrypt (Crypto):
+ curobj = None
+ hdrdum = None
+
def __init__ (self, password, paramversion, nacl=None):
if nacl is None:
_, params = kdf_by_version (paramversion)
return struct.pack("<8sL", self.pfx, self.cnt % 0xffFFffFF)
- def next (self, filename, version, paramversion, nacl, ctsize=None):
+ def next (self, filename, version, paramversion, nacl):
iv = self.iv_make()
- ok, hdr = hdr_from_params (version, paramversion, nacl, iv, ctsize)
- if ok is False:
- return None
+ self.curobj = (filename, version, paramversion, nacl)
self.cnt += 1
aad = "%s" % filename
self.aes = Cipher \
, modes.GCM (iv)
, backend = default_backend ()) \
.encryptor ()
- return hdr
-
- return self.aes.authenticate_additional_data (str.encode (aad))
+ self.aes.authenticate_additional_data (str.encode (aad))
+ self.hdrdum = hdr_make_dummy (filename)
+ return self.hdrdum
- def done (self):
- return self.aes.finalize ()
+ def done (self, cmpdata, ctsize):
+ if cmpdata != self.hdrdum:
+ raise "XXX bad sync for writing header" ## we need to converge on a sensible error handling strategy
+ data, tag = self.aes.finalize () ## XXX we could also put the tag in the header
+ ctsize += len (data)
+ (filename, version, paramversion, nacl) = self.curobj
+ ok, hdr = hdr_from_params (version, paramversion, nacl, iv, ctsize)
+ if ok is False:
+ raise "XXX error constructing header" ## we need to converge on a sensible error handling strategy
+ return data, tag, hdr
class Decrypt (Crypto):