__all__ = [ "ENCRYPT", "DECRYPT"
- , "AES_GCM_context"
, "hdr_make", "hdr_read", "hdr_fmt", "hdr_fmt_pretty"
, "I2N_HDR_SIZE", "I2N_TLR_SIZE_TAG" ]
SCRYPT_p = 1
SCRYPT_NaCl_LEN = 16
+
###############################################################################
-## header
+## header, trailer
###############################################################################
#
# Interface:
}
-def hdr_from_params (version, paramversion, nacl, iv, ctsize):
+def hdr_read_stream (instr):
+ data = instr.read(I2N_HDR_SIZE)
+ if len (data) != FMT_I2N_HDR:
+ return False, "error reading from [%r]: expected %d B, received %d" \
+ % (instr, I2N_HDR_SIZE, len (data))
+ return True, hdr_read (data)
+
+
+def hdr_from_params (version, paramversion, nacl, iv, ctsize=None):
+ if ctsize is None:
+ ctsize = 0xffffFFFFffffFFFF # dummy, overwritten later
buf = bytearray (I2N_HDR_SIZE)
bufv = memoryview (buf)
def tag_fmt (t):
return struct.pack (AES_GCM_FMT_TAG, t)
+
def tag_read (data):
try:
tag, = struct.unpack (AES_GCM_FMT_TAG, data)
return False, "error reading tag from [%r]: %s" % (data, str (exn))
return True, tag
-###############################################################################
-## {de,en}cryption
-###############################################################################
-
-
-ENCRYPT = 0
-DECRYPT = 1
-
-def aesgcm_context_create (kind, key, aad, iv):
- if kind == ENCRYPT:
- iv = iv or os.urandom(AES_GCM_IV_LEN)
- ctx = Cipher \
- ( algorithms.AES (key)
- , modes.GCM (iv)
- , backend = default_backend ()) \
- .encryptor ()
- elif kind == DECRYPT:
- ctx = Cipher \
- ( algorithms.AES (key)
- , modes.GCM (iv)
- , backend = default_backend ()) \
- . decryptor ()
- ctx.authenticate_additional_data (aad)
- return ctx
-
-
-class AES_GCM_context (object):
- """
- Thin wrapper context over AES encryption.
- """
-
- ctx = None
- iv = None
-
- def __init__ (self, kind, key, aad, iv=None):
- self.aad = aad
- if not iv:
- iv = os.urandom (AES_GCM_IV_LEN)
- self.iv = iv
- self.ctx = aesgcm_context_create (kind, key, aad, iv)
-
- def process_chunk (self, cnk):
- if self.ctx is None:
- return False, "no valid encryption context"
- return True, self.ctx.update (cnk)
+def tag_read_stream (source):
+ data = source.read (I2N_TLR_SIZE_TAG)
+ return tag_read (data)
- def done (self, tag=None):
- if self.ctx is None:
- return False, "no valid encryption context", None
- if tag is None:
- ret = self.ctx.finalize ()
- return True, ret, self.ctx.tag
- ret = self.ctx.finalize_with_tag (tag) # XXX this raises “InvalidTag” if tags don’t match
- return True, ret, None
-
###############################################################################
## convenience wrapper
"""
Encryption context to remain alive throughout an entire tarfile pass.
"""
- kind = None
aes = None
nacl = None
key = None
pfx = None # 64 bit fixed parts of IV
cnt = None
- def __init__ (self, kind, pw, nacl, paramversion, pfx=None):
+ key_memo = { }
+
+ def __init__ (self):
+ self.cnt = 1
+
+
+ def set_parameters (self, pw, nacl, paramversion, pfx=None):
+ self.pw = pw
+ self.nacl = nacl
+ self.paramversion = paramversion
+
defs = ENCRYPTION_PARAMETERS.get(paramversion)
if defs is None:
raise ValueError ("no encryption parameters for version %r"
(kdf, params) = defs["hash"]
if kdf != "scrypt":
raise ValueError ("key derivation method %r unknown" % kdf)
- if nacl is None: # XXX do we actually want this anywhere?
+
+ if nacl is None:
nacl = os.urandom (params["NaCl_LEN"])
- self.key = pylibscrypt.scrypt (pw, nacl, params["N"], params["r"],
- params["p"], params["dkLen"])
- self.kind = kind
- self.cnt = 1
- self.pfx = [ pfx or os.urandom(8) ]
+
+ N = params["N"]
+ r = params["r"]
+ p = params["p"]
+ dkLen = params["dkLen"]
+
+ key_parms = (pw, nacl, N, r, p, dkLen)
+ if key_parms not in key_memo:
+ key_memo [key_parms] = pylibscrypt.scrypt (pw, nacl, N, r, p, dkLen)
+ self.key = key_memo [key_parms]
+
+ if pfx is not None:
+ self.pfx = pfx
+ if self.pfx is None:
+ self.pfx = os.urandom(8)
+
+
+ def set_parameters_from_header (self, hdr):
+ self.pw = pw
+ self.nacl = nacl
+ self.paramversion = paramversion
+ self.pfx = pfx
+
+
+ def next (self, aad=None, iv=None):
+ ctx.authenticate_additional_data (aad)
+
+ def process (self, buf):
+ self.ctx.update (buf)
+
+
+class Encrypt (Crypto):
+
+ def __init__ (self, pw, nacl, paramversion):
+ super().__init__ (ENCRYPT, pw, nacl, paramversion)
def iv_make (self):
return struct.pack("<8sL", self.pfx, self.cnt % 0xffFFffFF)
- def next (self, aad=None):
- """Set up encryption for new object in stream.
+ def next (filename, ctsize):
+ self.cnt += 1
+ aad = "%.20x|%s" % (ctsize, filename)
+ iv = iv_make()
+ self.aes = Cipher \
+ ( algorithms.AES (self.key)
+ , modes.GCM (iv)
+ , backend = default_backend ()) \
+ .encryptor ()
- if ``aad is None``: end reached, no new context.
- """
- if self.aes is not None:
- (ok, data, tag) = self.aes.done ()
- if ok is False:
- raise
- else:
- data, tag = None, None
+ return super().next(aad)
- if aad is None:
- return
- self.cnt += 1
- try:
- self.aes = AES_GCM_context (self.kind, self.key, aad, iv=iv_make())
- except Exception as exn:
- # XXX devise some error handling strategy
- raise ("write failed with buffer of size %d" % len(buf))
+ def done (self):
+ return self.ctx.finalize ()
- return data, tag
+class Decrypt (Crypto):
- def process (self, buf):
- (ok, res) = self.aes.process_chunk (buf)
- if ok is True:
- return res
- # XXX not clear how tarfile expects to communicate this
- raise IOError ("write failed with buffer of size %d" % len(buf))
+ pfx = None
+
+ def __init__ (self, pw, nacl, paramversion, pfx=None):
+ super().__init__ (DECRYPT, pw, nacl, paramversion)
+ self.pfx = pfx # XXX not needed, right?
+
+
+ def next (filename, hdr):
+ self.cnt += 1
+ aad = "%0.20x|%s" % (hdr["ctsize"], filename)
+ print("I2N: got header “%s”" % crypto.hdr_fmt (hdr))
+ self.aes = Cipher \
+ ( algorithms.AES (key)
+ , modes.GCM (hdr["iv"])
+ , backend = default_backend ()) \
+ . decryptor ()
+ return super().next(aad)
+
+
+ def next_in_source (tarinfo, source):
+ ok, hdr = hdr_read_stream (source)
+ if ok is False:
+ raise DecryptionError("Irrecoverable error reading header from "
+ "%r" % source)
+ return self.next(tarinfo.name, hdr)
+
+
+ def done (self, tag):
+ return self.ctx.finalize_with_tag (tag)
###############################################################################