"16s") # GCM tag
# aes+gcm
-AES_GCM_IV_LEN = 12
-AES_GCM_MAX_SIZE = (1 << 36) - (1 << 5) # 2^39 - 2^8 b ≅ 64 GB
-AES_GCM_FMT_TAG = "<16s"
+AES_GCM_MAX_SIZE = (1 << 36) - (1 << 5) # 2^39 - 2^8 b ≅ 64 GB
+PDTCRYPT_MAX_OBJ_SIZE_DEFAULT = 63 * (1 << 30) # 63 GB
+PDTCRYPT_MAX_OBJ_SIZE = PDTCRYPT_MAX_OBJ_SIZE_DEFAULT
# index and info files are written on-the fly while encrypting so their
# counters must be available inadvance
-AES_GCM_IV_CNT_INFOFILE = 1 # constant
-AES_GCM_IV_CNT_INDEX = AES_GCM_IV_CNT_INFOFILE + 1
-AES_GCM_IV_CNT_DATA = AES_GCM_IV_CNT_INDEX + 1 # also for multivolume
-AES_GCM_IV_CNT_MAX = 0xffFFffFF
+AES_GCM_IV_CNT_INFOFILE = 1 # constant
+AES_GCM_IV_CNT_INDEX = AES_GCM_IV_CNT_INFOFILE + 1
+AES_GCM_IV_CNT_DATA = AES_GCM_IV_CNT_INDEX + 1 # also for multivolume
+AES_GCM_IV_CNT_MAX_DEFAULT = 0xffFFffFF
+AES_GCM_IV_CNT_MAX = AES_GCM_IV_CNT_MAX_DEFAULT
###############################################################################
def process (self, buf):
- if self.enc is not None:
- self.stats ["in"] += len (buf)
- out = self.enc.update (buf)
- self.stats ["out"] += len (out)
- return out
- return b""
+ if self.enc is None:
+ raise RuntimeError ("process: context not initialized")
+ self.stats ["in"] += len (buf)
+ out = self.enc.update (buf)
+ self.stats ["out"] += len (out)
+ return out
def next (self, password, paramversion, nacl, iv):
return struct.pack(FMT_I2N_IV, self.fixed [-1], self.cnt)
- def next (self, filename, counter=None):
- if isinstance (filename, str) is False:
- raise InvalidParameter ("next: filename must be a string, no %s"
- % type (filename))
+ def next (self, filename=None, counter=None):
+ if filename is None:
+ if self.lastinfo is None:
+ raise InvalidParameter ("next: filename is mandatory for "
+ "first object")
+ filename, _dummy = self.lastinfo
+ else:
+ if isinstance (filename, str) is False:
+ raise InvalidParameter ("next: filename must be a string, no %s"
+ % type (filename))
if counter is not None:
if isinstance (counter, int) is False:
raise InvalidParameter ("next: the supplied counter is of "
if isinstance (cmpdata, bytes) is False:
raise InvalidParameter ("done: comparison input expected as bytes, "
"not %s" % type (cmpdata))
+ if self.lastinfo is None:
+ raise RuntimeError ("done: encryption context not initialized")
filename, hdrdum = self.lastinfo
if cmpdata != hdrdum:
raise RuntimeError ("done: bad sync of header for object %d: "
if isinstance (buf, bytes) is False:
raise InvalidParameter ("process: expected byte buffer, not %s"
% type (buf))
- self.ptsize += len (buf)
- data = super().process (buf)
+ bsize = len (buf)
+ newptsize = self.ptsize + bsize
+ diff = newptsize - PDTCRYPT_MAX_OBJ_SIZE
+ if diff > 0:
+ bsize -= diff
+ newptsize = PDTCRYPT_MAX_OBJ_SIZE
+ self.ptsize = newptsize
+ data = super().process (buf [:bsize])
self.ctsize += len (data)
- return data
+ return bsize, data
class Decrypt (Crypto):
## testing helpers
###############################################################################
-def _testing_set_AES_GCM_IV_CNT_MAX (vow, n):
+def _patch_global (glob, vow, n=None):
"""
Adapt upper file counter bound for testing IV logic. Completely unsafe.
"""
assert vow == "I am fully aware that this will void my warranty."
- global AES_GCM_IV_CNT_MAX
- r = AES_GCM_IV_CNT_MAX
- AES_GCM_IV_CNT_MAX = n
+ r = globals () [glob]
+ if n is None:
+ n = globals () [glob + "_DEFAULT"]
+ globals () [glob] = n
return r
+_testing_set_AES_GCM_IV_CNT_MAX = \
+ partial (_patch_global, "AES_GCM_IV_CNT_MAX")
+
+_testing_set_PDTCRYPT_MAX_OBJ_SIZE = \
+ partial (_patch_global, "PDTCRYPT_MAX_OBJ_SIZE")
+
###############################################################################
## freestanding invocation
###############################################################################
if getattr (self, "cmp", None) is not None:
self._finalize_write_gz ()
if self.arcmode & ARCMODE_ENCRYPT:
+ self.__sync ()
self._finalize_write_encrypt ()
self._init_write_encrypt (name, set_last_block_offset=True)
if self.arcmode & ARCMODE_COMPRESS:
Returns the list of IV fixed parts as used during encryption.
"""
if self.lasthdr is not None:
- self.__sync ()
pos0 = self.fileobj.tell ()
self.fileobj.seek_set (self.lasthdr)
dummy = self.fileobj.read (crypto.PDTCRYPT_HDR_SIZE)
self.__write(s)
def __sync(self):
- """Write what’s self in the buffer to the stream."""
+ """Write what’s left in the buffer to the stream."""
self.__write (b"") # → len (buf) <= bufsiz
self.__enc_write (self.buf)
self.buf = b""
def __write_to_file(self, s, pos=None):
'''
Writes directly to the fileobj; updates self.bytes_written. If “pos” is
- given, the streem will seek to that position first and back afterwards,
+ given, the stream will seek to that position first and back afterwards,
and the total of bytes written is not updated.
'''
self.fileobj.write(s, pos)
def __enc_write(self, s):
- '''
- If there's encryption, the string s is encrypted before write it to
- the file
- '''
- tow = s
+ """
+ If encryption is active, the string s is encrypted before being written
+ to the file.
+ """
+ if len (s) == 0:
+ return
if self.arcmode & ARCMODE_ENCRYPT:
- tow = self.encryption.process(s)
- self.__write_to_file(tow)
+ buf = s
+ while len (buf) > 0:
+ n, ct = self.encryption.process(buf)
+ self.__write_to_file(ct)
+ buf = buf [n:]
+ if len (buf) > 0:
+ # The entire plaintext was not consumed: The size limit
+ # for encrypted objects was reached. Transparently create
+ # a new encrypted object and continue processing the input.
+ self._finalize_write_encrypt ()
+ self._init_write_encrypt ()
+ else:
+ self.__write_to_file(s)
+
def estim_file_size(self):
""" estimates size of file if closing it now
if self.arcmode & ARCMODE_COMPRESS:
self._finalize_write_gz ()
# end of Tar archive marker (two empty blocks) was written
- self.__sync()
# finalize encryption last; no writes may be performed after
# this point
+ self.__sync ()
if self.arcmode & ARCMODE_ENCRYPT:
self._finalize_write_encrypt ()
import os, unittest, hashlib, string
import random
+from deltatar import crypto
+
import sys
def new_volume_handler(tarobj, base_name, volume_number, encryption=None):
class AESGCMTest (CryptoLayerTest):
+ def tearDown (self):
+ """Reset globals altered for testing."""
+ _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
+ ("I am fully aware that this will void my warranty.")
+ _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
+ ("I am fully aware that this will void my warranty.")
+
def test_crypto_aes_gcm_enc_ctor (self):
password = str (os.urandom (42))
encryptor = crypto.Encrypt (TEST_VERSION,
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
assert len (header_dummy) == crypto.PDTCRYPT_HDR_SIZE
- _ = encryptor.process (TEST_PLAINTEXT)
+ _, _ = encryptor.process (TEST_PLAINTEXT)
_, header, _ = encryptor.done (header_dummy)
assert len (header) == crypto.PDTCRYPT_HDR_SIZE
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
- ciphertext = encryptor.process (TEST_PLAINTEXT)
+ _, ciphertext = encryptor.process (TEST_PLAINTEXT)
assert len (ciphertext) == len (TEST_PLAINTEXT)
rest, header, fixed = encryptor.done (header_dummy)
assert len (rest) == 0
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
- ciphertext = encryptor.process (TEST_PLAINTEXT)
+ _, ciphertext = encryptor.process (TEST_PLAINTEXT)
rest, header, fixed = encryptor.done (header_dummy)
ciphertext += rest
nacl=TEST_STATIC_NACL)
header_dummy = encryptor.next (TEST_DUMMY_FILENAME)
- ciphertext = encryptor.process (TEST_PLAINTEXT)
+ _, ciphertext = encryptor.process (TEST_PLAINTEXT)
ciphertext2, header, fixed = encryptor.done (header_dummy)
mut_header = bytearray (header)
ct = b""
while off < len (pt):
upto = min (off + cnksiz, len (pt))
- cnk = encryptor.process (pt [off:upto])
+ _, cnk = encryptor.process (pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
ct = b""
while off < len (pt):
upto = min (off + cnksiz, len (pt))
- cnk = encryptor.process (pt [off:upto])
+ _, cnk = encryptor.process (pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
ct = b""
while off < len (pt):
upto = min (off + cnksiz, len (pt))
- cnk = encryptor.process (pt [off:upto])
+ _, cnk = encryptor.process (pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
ct = b""
while off < len (pt):
upto = min (off + cnksiz, len (pt))
- cnk = encryptor.process (pt [off:upto])
+ _, cnk = encryptor.process (pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
Test behavior when the file counter tops out.
Artificially lower the maximum possible file counter. Considering
- invalid (0) and reserved (1, 2) values, the least possible file counter
+ invalid (0) and reserved (1, 2) values, the smallest possible file counter
for normal objects is 3. Starting from that, the header of the (max -
3)rd object must have both a different IV fixed part and a counter.
"""
minimum = 3
new_max = 8
- old_max = crypto._testing_set_AES_GCM_IV_CNT_MAX \
- ("I am fully aware that this will void my warranty.",
- new_max)
+ crypto._testing_set_AES_GCM_IV_CNT_MAX \
+ ("I am fully aware that this will void my warranty.", new_max)
cnksiz = 1 << 10
password = str (os.urandom (42))
encryptor = crypto.Encrypt (TEST_VERSION,
ct = b""
while off < len (pt):
upto = min (off + cnksiz, len (pt))
- cnk = encryptor.process (pt [off:upto])
+ _, cnk = encryptor.process (pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
for j in range (i + 2, i + new_max - 1): addobj (j) # counter range: [4, 8]
addobj (j + 1, True) # counter wraps to 3 again
- _ = crypto._testing_set_AES_GCM_IV_CNT_MAX \
- ("I am fully aware that this will void my warranty.",
- old_max)
+
+ def test_crypto_aes_gcm_enc_length_cap (self):
+ """
+ Artificially lower the maximum allowable data length and attempt to
+ encrypt a larger object. Verify that the crypto handler aborts with and
+ exception.
+ """
+ new_max = 2187
+ crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
+ ("I am fully aware that this will void my warranty.", new_max)
+ cnksiz = 1 << 10
+ password = str (os.urandom (42))
+ encryptor = crypto.Encrypt (TEST_VERSION,
+ TEST_PARAMVERSION,
+ password=password,
+ nacl=TEST_STATIC_NACL)
+
+ def encobj (s):
+ pt, ct = fill_mod (s), None
+ header_dummy = encryptor.next ("%s_%d" % (TEST_DUMMY_FILENAME, s))
+
+ n, ct = encryptor.process (pt)
+ rest, _, _ = encryptor.done (header_dummy)
+ ct += rest
+
+ if len (pt) > new_max:
+ assert n < len (pt)
+ else:
+ assert n == len (pt) == len (ct)
+
+ for i in range (16): encobj (1 << i)
def test_crypto_aes_gcm_dec_multicnk (self):
ct = b""
while off < len (orig_pt):
upto = min (off + cnksiz, len (orig_pt))
- cnk = encryptor.process (orig_pt [off:upto])
+ _n, cnk = encryptor.process (orig_pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
pt = b""
while off < len (orig_pt):
upto = min (off + cnksiz, len (orig_pt))
- cnk = decryptor.process (ct [off:upto])
+ cnk = decryptor.process (ct [off:upto])
pt += cnk
off += cnksiz
ct = b""
while off < len (orig_pt):
upto = min (off + cnksiz, len (orig_pt))
- cnk = encryptor.process (orig_pt [off:upto])
+ _n, cnk = encryptor.process (orig_pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
ct = b""
while off < len (pt):
upto = min (off + cnksiz, len (pt))
- cnk = encryptor.process (pt [off:upto])
+ _n, cnk = encryptor.process (pt [off:upto])
ct += cnk
off += cnksiz
cnk, header, fixed = encryptor.done (header_dummy)
def tearDown(self):
'''
- Remove temporal files created by unit tests
+ Remove temporal files created by unit tests and reset globals.
'''
os.chdir(self.pwd)
os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
+ _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
+ ("I am fully aware that this will void my warranty.")
def test_restore_simple_full_backup(self):
'''
if value:
assert value == self.md5sum(key)
+
+ def test_create_backup_max_file_length (self):
+ """
+ Creates a full backup including one file that exceeds the (purposely
+ lowered) upper bound on GCM encrypted objects. This will yield multiple
+ encrypted objects for one plaintext file.
+
+ Success is verified by splitting the archive at object boundaries and
+ counting the parts.
+ """
+ if self.MODE_COMPRESSES is True:
+ raise SkipTest ("GCM file length test not meaningful with compression.")
+ if self.ENCRYPTION is None:
+ raise SkipTest ("GCM file length applies only to encrypted backups.")
+
+ new_max = 20000 # cannot be less than tar block size
+ crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
+ ("I am fully aware that this will void my warranty.",
+ new_max)
+
+ password, paramversion = self.ENCRYPTION
+ deltatar = DeltaTar (mode=self.MODE, password=password,
+ crypto_paramversion=paramversion,
+ logger=self.consoleLogger)
+
+ self.hash = dict ()
+ os.makedirs ("source_dir2")
+ for f, s in [("empty" , 0) # 1 tar objects
+ ,("slightly_larger", new_max + 1) # 2
+ ,("twice" , 2 * new_max) # 3
+ ]:
+ f = "source_dir2/%s" % f
+ self.hash [f] = self.create_file (f, s)
+
+ deltatar.create_full_backup \
+ (source_path="source_dir2", backup_path="backup_dir")
+
+ assert os.path.exists ("backup_dir")
+ shutil.rmtree ("source_dir2")
+
+ backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
+ backup_path = os.path.join("backup_dir", backup_filename)
+
+ # split the resulting archive into its constituents without
+ # decrypting
+ ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
+ "-o backup_dir/split <\'%s\'" % backup_path)
+
+ assert os.path.exists ("backup_dir/split")
+
+ dents = os.listdir ("backup_dir/split")
+ assert len (dents) == 6
+
+
+ def test_restore_backup_max_file_length (self):
+ """
+ Creates a full backup including one file that exceeds the (purposely
+ lowered) upper bound on GCM encrypted objects. This will yield two
+ encrypted objects for one plaintext file.
+
+ Success is verified by splitting the archive at object boundaries and
+ counting the parts.
+ """
+ if self.MODE_COMPRESSES is True:
+ raise SkipTest ("GCM file length test not meaningful with compression.")
+ if self.ENCRYPTION is None:
+ raise SkipTest ("GCM file length applies only to encrypted backups.")
+
+ new_max = 20000 # cannot be less than tar block size
+ crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
+ ("I am fully aware that this will void my warranty.",
+ new_max)
+
+ password, paramversion = self.ENCRYPTION
+ deltatar = DeltaTar (mode=self.MODE, password=password,
+ crypto_paramversion=paramversion,
+ logger=self.consoleLogger)
+
+ self.hash = dict ()
+ os.makedirs ("source_dir2")
+ for f, s in [("empty" , 0) # 1 tar objects
+ ,("slightly_larger", new_max + 1) # 2
+ ,("twice" , 2 * new_max) # 3
+ ]:
+ f = "source_dir2/%s" % f
+ self.hash [f] = self.create_file (f, s)
+
+ deltatar.create_full_backup \
+ (source_path="source_dir2", backup_path="backup_dir")
+
+ assert os.path.exists ("backup_dir")
+ shutil.rmtree ("source_dir2")
+
+ backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
+ backup_path = os.path.join("backup_dir", backup_filename)
+
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+
+ deltatar.restore_backup(target_path="source_dir2",
+ backup_tar_path=tar_path)
+
+ for key, value in self.hash.items():
+ assert os.path.exists(key)
+ if value:
+ assert value == self.md5sum(key)
+
+
def test_check_index_checksum(self):
'''
Creates a full backup and checks the index' checksum of files
for size_number in range(4,n_sizes):
for order in 1,-1: # small files first or big files first
encryptor = None
- if password is not None:
+ if password is None:
+ encryptor = None # could leak due to scoping
+ else:
encryptor = crypto.Encrypt (password=password, version=1,
paramversion=1)
tarobj = TarFile.open(tar_file_name,