from cryptography.hazmat.backends import default_backend
-__all__ = [ "ENCRYPT", "DECRYPT"
- , "hdr_make", "hdr_read", "hdr_fmt", "hdr_fmt_pretty"
+__all__ = [ "hdr_make", "hdr_read", "hdr_fmt", "hdr_fmt_pretty"
, "I2N_HDR_SIZE", "I2N_TLR_SIZE_TAG" ]
return tag_read (data)
-
###############################################################################
## convenience wrapper
###############################################################################
+KEY_MEMO = { } # static because needed for both the info file and the archive
+
+
+def kdf_by_version (paramversion):
+ defs = ENCRYPTION_PARAMETERS.get(paramversion)
+ if defs is None:
+ raise ValueError ("no encryption parameters for version %r"
+ % paramversion)
+ (kdf, params) = defs["kdf"]
+ if kdf != "scrypt":
+ raise ValueError ("key derivation method %r unknown" % kdf)
+ return kdf, params
+
+
class Crypto (object):
"""
pfx = None # 64 bit fixed parts of IV
cnt = None
- key_memo = { }
-
- def __init__ (self):
+ def __init__ (self, *al, **akv):
self.cnt = 1
+ self.set_parameters (*al, **akv)
- def set_parameters (self, pw, nacl, paramversion, pfx=None):
- self.pw = pw
+ def set_parameters (self, password, paramversion, nacl, pfx=None):
+ if isinstance (password, bytes) is False: password = str.encode (password)
+ self.password = password
self.nacl = nacl
self.paramversion = paramversion
-
- defs = ENCRYPTION_PARAMETERS.get(paramversion)
- if defs is None:
- raise ValueError ("no encryption parameters for version %r"
- % paramversion)
- (kdf, params) = defs["hash"]
- if kdf != "scrypt":
- raise ValueError ("key derivation method %r unknown" % kdf)
-
- if nacl is None:
- nacl = os.urandom (params["NaCl_LEN"])
+ (kdf, params) = kdf_by_version (paramversion)
N = params["N"]
r = params["r"]
p = params["p"]
dkLen = params["dkLen"]
- key_parms = (pw, nacl, N, r, p, dkLen)
- if key_parms not in key_memo:
- key_memo [key_parms] = pylibscrypt.scrypt (pw, nacl, N, r, p, dkLen)
- self.key = key_memo [key_parms]
+ key_parms = (password, nacl, N, r, p, dkLen)
+ global KEY_MEMO
+ if key_parms not in KEY_MEMO:
+ print(">> new key, memoize parms %s" % repr (key_parms))
+ KEY_MEMO [key_parms] = pylibscrypt.scrypt (password, nacl, N, r, p, dkLen)
+ else:
+ print(">> use memoized key for parms %s" % repr (key_parms))
+ self.key = KEY_MEMO [key_parms]
if pfx is not None:
self.pfx = pfx
def set_parameters_from_header (self, hdr):
- self.pw = pw
+ self.password = password
self.nacl = nacl
self.paramversion = paramversion
self.pfx = pfx
def next (self, aad=None, iv=None):
- ctx.authenticate_additional_data (aad)
+ self.aes.authenticate_additional_data (aad)
+
def process (self, buf):
- self.ctx.update (buf)
+ self.aes.update (buf)
class Encrypt (Crypto):
- def __init__ (self, pw, nacl, paramversion):
- super().__init__ (ENCRYPT, pw, nacl, paramversion)
+ def __init__ (self, password, paramversion, nacl=None):
+ if nacl is None:
+ _, params = kdf_by_version (paramversion)
+ nacl = os.urandom (params["NaCl_LEN"])
+ super().__init__ (password, paramversion, nacl)
def iv_make (self):
return struct.pack("<8sL", self.pfx, self.cnt % 0xffFFffFF)
- def next (filename, ctsize):
+ def next (self, filename, ctsize):
self.cnt += 1
aad = "%.20x|%s" % (ctsize, filename)
iv = iv_make()
def done (self):
- return self.ctx.finalize ()
+ return self.aes.finalize ()
class Decrypt (Crypto):
pfx = None
- def __init__ (self, pw, nacl, paramversion, pfx=None):
- super().__init__ (DECRYPT, pw, nacl, paramversion)
- self.pfx = pfx # XXX not needed, right?
+ def __init__ (self, password, paramversion, nacl):
+ super().__init__ (password, paramversion, nacl)
- def next (filename, hdr):
+ def next (self, filename, hdr):
self.cnt += 1
aad = "%0.20x|%s" % (hdr["ctsize"], filename)
print("I2N: got header ā%sā" % crypto.hdr_fmt (hdr))
return super().next(aad)
- def next_in_source (tarinfo, source):
+ def next_in_source (self, tarinfo, source):
ok, hdr = hdr_read_stream (source)
if ok is False:
raise DecryptionError("Irrecoverable error reading header from "
def done (self, tag):
- return self.ctx.finalize_with_tag (tag)
+ return self.aes.finalize_with_tag (tag)
###############################################################################
# used together with aes modes to encrypt and decrypt backups.
password = None
+ # When encrypting, the salt is created by the first crypto operation, i. e.
+ # opening the index for writing. The subsequently opened archive uses the
+ # same salt.
+ nacl = None
+
# python logger object.
logger = None
# init index
index_name = self.index_name_func(True)
index_path = os.path.join(backup_path, index_name)
- index_fd = self.open_index(index_path, 'w')
+ index_fd = self.open_index(index_path, 'w') # **NOT** an fd
+ if index_fd.encryption is not None:
+ self.nacl = index_fd.encryption.nacl
cwd = os.getcwd()
format=tarfile.GNU_FORMAT,
concat_compression='#gz' in self.mode,
password=self.password,
+ nacl=self.nacl,
max_volume_size=max_volume_size,
new_volume_handler=new_volume_handler,
save_to_members=False,
dereference=True)
-
-
os.chdir(source_path)
# for each file to be in the backup, do:
# init index
index_name = self.index_name_func(is_full=False)
index_path = os.path.join(backup_path, index_name)
- index_fd = self.open_index(index_path, 'w')
+ index_fd = self.open_index(index_path, 'w') # **NOT** an fd
+ if index_fd.encryption is not None:
+ self.nacl = index_fd.encryption.nacl
cwd = os.getcwd()
format=tarfile.GNU_FORMAT,
concat_compression='#gz' in self.mode,
password=self.password,
+ nacl=self.nacl,
max_volume_size=max_volume_size,
new_volume_handler=new_volume_handler,
save_to_members=False,
format=tarfile.GNU_FORMAT,
concat_compression='#gz' in self.delta_tar.mode,
password=self.delta_tar.password,
+ nacl=self.delta_tar.nacl,
new_volume_handler=self.new_volume_handler,
save_to_members=False,
dereference=True)
format=tarfile.GNU_FORMAT,
concat_compression='#gz' in self.mode,
password=self.password,
+ nacl=self.nacl,
new_volume_handler=new_volume_handler,
save_to_members=False,
dereference=True)
format=tarfile.GNU_FORMAT,
concat_compression='#gz' in self._deltatar.mode,
password=self._deltatar.password,
+ nacl=self._deltatar.nacl,
new_volume_handler=index_data['new_volume_handler'],
save_to_members=False)