self.next_pfx ()
- def set_parameters (self, password, paramversion=None, nacl=None,
- counter=None, nextpfx=None):
+ def set_parameters (self, password=None, key=None, paramversion=None,
+ nacl=None, counter=None, nextpfx=None):
if nextpfx is not None:
self.next_pfx = nextpfx
self.next_pfx ()
self.set_object_counter (counter)
+
+ if key is not None:
+ self.key, self.nacl = key, nacl
+ return
+
if isinstance (password, bytes) is False: password = str.encode (password)
self.password = password
if paramversion is None and nacl is None:
if ( self.paramversion != paramversion
or self.password != password
or self.nacl != nacl):
- self.set_parameters (password, paramversion, nacl)
+ self.set_parameters (password=password, paramversion=paramversion,
+ nacl=nacl)
def counters (self):
version = None
paramenc = None
- def __init__ (self, password, version, paramversion, nacl=None,
+ def __init__ (self, version, paramversion, password=None, key=None, nacl=None,
counter=AES_GCM_IV_CNT_DATA):
- # passwort
- if isinstance (password, str) is False:
- raise InvalidParameter ("__init__: password must be a string, not %s"
- % type (password))
- if len (password) == 0:
- raise InvalidParameter ("__init__: refusing to encrypt with empty "
- "password")
+ if password is None and key is None \
+ or password is not None and key is not None :
+ raise InvalidParameter ("__init__: need either key or password")
+
+ if key is not None:
+ if isinstance (key, bytes) is False:
+ raise InvalidParameter ("__init__: key must be provided as "
+ "bytes, not %s" % type (key))
+ if nacl is None:
+ raise InvalidParameter ("__init__: salt must be provided along "
+ "with encryption key")
+ else: # password, no key
+ if isinstance (password, str) is False:
+ raise InvalidParameter ("__init__: password must be a string, not %s"
+ % type (password))
+ if len (password) == 0:
+ raise InvalidParameter ("__init__: supplied empty password but not "
+ "permitted for PDT encrypted files")
# version
if isinstance (version, int) is False:
raise InvalidParameter ("__init__: version number must be an "
self.version = version
self.paramenc = ENCRYPTION_PARAMETERS.get (paramversion) ["enc"]
- super().__init__ (password, paramversion, nacl, counter=counter,
+ super().__init__ (password, key, paramversion, nacl, counter=counter,
nextpfx=lambda: self.fixed.append (os.urandom(8)))
strict_ivs = False # if True, panic on duplicate object IV
last_iv = None # check consecutive ivs in strict mode
- def __init__ (self, password, counter=None, fixedparts=None,
+ def __init__ (self, password=None, key=None, counter=None, fixedparts=None,
strict_ivs=False):
- # passwort
- if isinstance (password, str) is False:
- raise InvalidParameter ("__init__: password must be a string, not %s"
- % type (password))
- if len (password) == 0:
- raise InvalidParameter ("__init__: supplied empty password but not "
- "permitted for PDT encrypted files")
+ if password is None and key is None \
+ or password is not None and key is not None :
+ raise InvalidParameter ("__init__: need either key or password")
+
+ if key is not None:
+ if isinstance (key, bytes) is False:
+ raise InvalidParameter ("__init__: key must be provided as "
+ "bytes, not %s" % type (key))
+ else: # password, no key
+ if isinstance (password, str) is False:
+ raise InvalidParameter ("__init__: password must be a string, not %s"
+ % type (password))
+ if len (password) == 0:
+ raise InvalidParameter ("__init__: supplied empty password but not "
+ "permitted for PDT encrypted files")
# fixed parts
if fixedparts is not None:
if isinstance (fixedparts, list) is False:
% type (fixedparts))
self.fixed = fixedparts
self.fixed.sort ()
- super().__init__ (password, counter=counter)
self.used_ivs = set ()
self.strict_ivs = strict_ivs
- super().__init__ (password, counter=counter)
+ super().__init__ (password, key, counter=counter)
def valid_fixed_part (self, iv):
outfile = None # Python file object for output
if mode & PDTCRYPT_DECRYPT: # decryptor
- decr = Decrypt (pw, strict_ivs=PDTCRYPT_STRICTIVS)
+ decr = Decrypt (password=pw, strict_ivs=PDTCRYPT_STRICTIVS)
else:
decr = PassthroughDecryptor ()
# used together with aes modes to encrypt and decrypt backups.
password = None
+ crypto_key = None
+ nacl = None
# parameter version to use when encrypting; note that this has no effect
# on decryption since the required settings are determined from the headers
def __init__(self, excluded_files=[], included_files=[],
filter_func=None, mode="", password=None,
+ crypto_key=None, nacl=None,
crypto_paramversion=DELTATAR_PARAMETER_VERSION,
logger=None, index_mode=None, index_name_func=None,
volume_name_func=None):
'|bz2' open a bzip2 compressed stream of tar blocks
'#gz' open a stream of gzip compressed tar blocks
+ - crypto_key: used to encrypt and decrypt backups. Encryption will
+ be enabled automatically if a key is supplied. Requires a salt to be
+ passed as well.
+
+ - nacl: salt that was used to derive the encryption key for embedding
+ in the PDTCRYPT header. Not needed when decrypting and when
+ encrypting with password.
+
- password: used to encrypt and decrypt backups. Encryption will be
enabled automatically if a password is supplied.
self.logger.addHandler(logger)
self.mode = mode
+ if crypto_key is not None:
+ self.crypto_key = crypto_key
+ self.nacl = nacl # encryption only
+
if password is not None:
self.password = password
def initialize_encryption (self, mode):
password = self.password
+ key = self.crypto_key
+ nacl = self.nacl
- if password is None:
+ if key is None and password is None:
return
if mode == CRYPTO_MODE_ENCRYPT:
- return crypto.Encrypt (password,
+ return crypto.Encrypt (password=password,
+ key=key,
+ nacl=nacl,
version=DELTATAR_HEADER_VERSION,
paramversion=self.crypto_paramversion)
if mode == CRYPTO_MODE_DECRYPT:
- return crypto.Decrypt (password)
+ return crypto.Decrypt (password=password, key=key)
raise Exception ("invalid encryption mode [%r]" % mode)
if self.tar_obj is None:
decryptor = None
if self.delta_tar.password is not None:
- decryptor = crypto.Decrypt (self.delta_tar.password)
+ decryptor = crypto.Decrypt \
+ (password=self.delta_tar.password,
+ key=self.delta_tar.crypto_key)
self.tar_obj = tarfile.TarFile.open(self.tar_path,
mode='r' + self.delta_tar.mode,
format=tarfile.GNU_FORMAT,
self._deltatar = deltatar
self._cwd = cwd
self._password = deltatar.password
+ self._crypto_key = deltatar.crypto_key
self._decryptors = []
try:
decryptor = None
if self._password is not None:
- decryptor = crypto.Decrypt (self._password)
+ decryptor = crypto.Decrypt (password=self._password,
+ key=self._crypto_key)
# make paths absolute to avoid cwd problems
if not os.path.isabs(index):
def test_crypto_aes_gcm_enc_ctor (self):
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
- TEST_PARAMVERSION, nacl=TEST_STATIC_NACL)
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=password,
+ nacl=TEST_STATIC_NACL)
+
+
+ def test_crypto_aes_gcm_enc_ctor_key (self):
+ key = os.urandom (42)
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ key=key,
+ nacl=TEST_STATIC_NACL)
+
+
+ def test_crypto_aes_gcm_enc_ctor_no_key_pw (self):
+ """
+ Either key (+nacl) or password must be supplied, not both.
+ """
+ try:
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ nacl=TEST_STATIC_NACL)
+ except crypto.InvalidParameter: # neither key nor pw
+ pass
+
+ password = str (os.urandom (42))
+ key = os.urandom (16) # scrypt sized
+ try:
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=password,
+ key=key,
+ nacl=TEST_STATIC_NACL)
+ except crypto.InvalidParameter: # both key and pw
+ pass
+
+ try:
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ key=key,
+ nacl=None)
+ except crypto.InvalidParameter: # key, but salt missing
+ pass
+
+ try:
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=b"",
+ nacl=TEST_STATIC_NACL)
+ except crypto.InvalidParameter: # empty pw
+ pass
def test_crypto_aes_gcm_enc_header_size (self):
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
def test_crypto_aes_gcm_enc_chunk_size (self):
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
assert len (rest) == 0
+ def test_crypto_aes_gcm_dec_ctor (self):
+ """
+ Ensure that only either key or password is accepted.
+ """
+ password = str (os.urandom (42))
+ key = os.urandom (16) # scrypt sized
+
+ decryptor = crypto.Decrypt (password=password)
+ decryptor = crypto.Decrypt (key=key)
+
+ try:
+ decryptor = crypto.Decrypt (password=password, key=key)
+ except crypto.InvalidParameter: # both password and key
+ pass
+
+ try:
+ decryptor = crypto.Decrypt (password=None, key=None)
+ except crypto.InvalidParameter: # neither password nor key
+ pass
+
+ try:
+ decryptor = crypto.Decrypt (password="")
+ except crypto.InvalidParameter: # empty password
+ pass
+
def test_crypto_aes_gcm_dec_simple (self):
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
rest, header, fixed = encryptor.done (header_dummy)
ciphertext += rest
- decryptor = crypto.Decrypt (password, fixedparts=fixed)
+ decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
decryptor.next (header)
plaintext = decryptor.process (ciphertext)
rest = decryptor.done ()
def test_crypto_aes_gcm_dec_bad_tag (self):
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
header = bytes (mut_header)
- decryptor = crypto.Decrypt (password, fixedparts=fixed)
+ decryptor = crypto.Decrypt (password=password, fixedparts=fixed)
decryptor.next (header)
plaintext = decryptor.process (ciphertext)
try:
cnksiz = 1 << 10
pt = fill_mod (1 << 14)
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cnksiz = 1 << 10
orig_pt = fill_mod (1 << 14)
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
cnk, header, fixed = encryptor.done (header_dummy)
ct += cnk
- decryptor = crypto.Decrypt (password, fixedparts=fixed)
+ decryptor = crypto.Decrypt (password=password,
+ fixedparts=fixed)
decryptor.next (header)
off = 0
pt = b""
cnksiz = 1 << 10
orig_pt = fill_mod (1 << 14)
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
mut_header_vw [crypto.HDR_OFF_TAG + 2] = (second_byte + 0x2a) % 256
header = bytes (mut_header)
- decryptor = crypto.Decrypt (password, fixedparts=fixed)
+ decryptor = crypto.Decrypt (password=password,
+ fixedparts=fixed)
decryptor.next (header)
off = 0
pt = b""
orig_pt_1 = fill_mod (1 << 10)
orig_pt_2 = fill_mod (1 << 10, 42)
password = str (os.urandom (42))
- encryptor = crypto.Encrypt (password, TEST_VERSION,
+ encryptor = crypto.Encrypt (TEST_VERSION,
TEST_PARAMVERSION,
+ password=password,
nacl=TEST_STATIC_NACL)
def enc (pt):
# transplant into other header
mut_hdr_2_vw [iv_lo : iv_hi] = iv_1
hdr_2_mod = bytes (mut_hdr_2)
- decryptor = crypto.Decrypt (password, fixedparts=fixed,
+ decryptor = crypto.Decrypt (password=password, fixedparts=fixed,
strict_ivs=True)
def dec (hdr, ct):
encryptor = None
if password is not None:
- encryptor = crypto.Encrypt (password, 1, 1)
+ encryptor = crypto.Encrypt (1, 1, password=password)
tarobj = TarFile.open(temp_name, mode=mode,
max_volume_size=volume_size*MiB,
print(prefix + 'extracting:')
decryptor = None
if password is not None:
- decryptor = crypto.Decrypt (password)
+ decryptor = crypto.Decrypt (password=password)
tarobj = TarFile.open(temp_name, mode=mode.replace('w', 'r'),
new_volume_handler=new_volume_handler,
encryption=decryptor, debug=debug_level)