###############################################################################
+## passthrough / null encryption
+###############################################################################
+
+class PassthroughCipher (object):
+
+ tag = struct.pack ("<QQ", 0, 0)
+
+ def __init__ (self) : pass
+
+ def update (self, b) : return b
+
+ def finalize (self, ) : return b""
+
+ def finalize_with_tag (self, _) : return b""
+
+###############################################################################
## convenience wrapper
###############################################################################
"""
Encryption context to remain alive throughout an entire tarfile pass.
"""
- aes = None
+ enc = None
nacl = None
key = None
pfx = None # 64 bit fixed parts of IV
state = STATE_FRESH
ctsize = -1
ptsize = -1
+ paramversion = None
def __init__ (self, *al, **akv):
self.set_parameters (*al, **akv)
if paramversion is None and nacl is None:
# postpone until first header is available
return
- kdf = kdf_by_version (paramversion)
+ kdf = kdf_by_version (paramversion)
if kdf is not None:
self.key, self.nacl = kdf (password, nacl)
if self.pfx is None:
self.pfx = os.urandom(8)
- self.nacl = nacl
self.paramversion = paramversion
def process (self, buf):
- if self.aes is not None:
+ if self.enc is not None:
self.stats ["in"] += len (buf)
- out = self.aes.update (buf)
+ out = self.enc.update (buf)
self.stats ["out"] += len (out)
return out
return b""
- def next (self):
+ def next (self, password, paramversion, nacl):
self.ctsize = 0
self.ptsize = 0
self.stats ["obj"] += 1
self.state = STATE_LIVE
+ if ( self.paramversion != paramversion
+ or self.password != password
+ or self.nacl != nacl):
+ self.set_parameters (password, paramversion, nacl)
def counters (self):
self.iv = self.iv_make()
self.curobj = (filename, version, paramversion, nacl or self.nacl)
self.cnt += 1
- self.aes = Cipher \
- ( algorithms.AES (self.key)
- , modes.GCM (self.iv)
- , backend = default_backend ()) \
- .encryptor ()
- # XXX figure out what we want for AAD. Filename (not known to stream)?
- # Size?
- #aad = "%s" % filename
- #self.aes.authenticate_additional_data (str.encode (aad))
-
+ defs = ENCRYPTION_PARAMETERS.get(paramversion)
+ enc = defs ["enc"]
+ if enc == "aes-gcm":
+ self.enc = Cipher \
+ ( algorithms.AES (self.key)
+ , modes.GCM (self.iv)
+ , backend = default_backend ()) \
+ .encryptor ()
+ elif enc == "passthrough":
+ self.enc = PassthroughCipher ()
+ else:
+ raise Exception ("XXX garbage encryption parameter %d → %r"
+ % (paramversion, enc))
self.hdrdum = hdr_make_dummy (filename)
- super().next ()
+ super().next (self.password, paramversion, nacl)
return self.hdrdum
def done (self, cmpdata):
if cmpdata != self.hdrdum:
raise Exception ("XXX bad sync for writing header") ## we need to converge on a sensible error handling strategy
- data = self.aes.finalize ()
+ data = self.enc.finalize ()
self.stats ["out"] += len (data)
self.ctsize += len (data)
(filename, version, paramversion, nacl) = self.curobj
ok, hdr = hdr_from_params (version, paramversion, nacl, self.iv,
- self.ctsize, self.aes.tag)
+ self.ctsize, self.enc.tag)
if ok is False:
raise Exception ("XXX error constructing header: %r" % hdr) ## we need to converge on a sensible error handling strategy
self.state = STATE_DEAD
def next (self, hdr):
if self.key is None:
- super().__init__ (self.password, hdr ["paramversion"], hdr ["nacl"])
+ super().next (self.password, hdr ["paramversion"], hdr ["nacl"])
self.cnt += 1
iv = hdr ["iv"]
self.tag = hdr ["tag"]
- self.aes = Cipher \
- ( algorithms.AES (self.key)
- , modes.GCM (hdr["iv"], tag=self.tag)
- , backend = default_backend ()) \
- . decryptor ()
- # XXX figure out what we want for AAD. Filename (not known to stream)?
- # Size?
- #self.aes.authenticate_additional_data (str.encode (aad))
-
- def next_in_source (self, tarinfo, source):
- ok, hdr = hdr_read_stream (source)
- if ok is False:
- raise DecryptionError("Irrecoverable error reading header from "
- "%r" % source)
- return self.next(hdr)
+ if enc == "aes-gcm":
+ self.enc = Cipher \
+ ( algorithms.AES (self.key)
+ , modes.GCM (hdr["iv"], tag=self.tag)
+ , backend = default_backend ()) \
+ . decryptor ()
+ elif enc == "passthrough":
+ self.enc = PassthroughCipher ()
+ else:
+ raise Exception ("XXX garbage encryption parameter %d → %r"
+ % (paramversion, enc))
def done (self, tag=None):
data = b""
try:
if tag is None:
- ret, data = True, self.aes.finalize ()
+ ret, data = True, self.enc.finalize ()
else:
- ret, data = self.aes.finalize_with_tag (self.tag)
+ ret, data = self.enc.finalize_with_tag (self.tag)
except crypto.cryptography.exceptions.InvalidTag as exn:
return False, repr (exn)
self.stats ["out"] += len (data)