From: Philipp Gesang Date: Thu, 10 Aug 2017 13:32:16 +0000 (+0200) Subject: add test for corruption of encrypted files X-Git-Tag: v2.2~7^2~84 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=96fe639921bd468e6dcd0534252da58417af68f6;p=python-delta-tar add test for corruption of encrypted files --- diff --git a/runtests.py b/runtests.py index 3ea1e6e..3605964 100755 --- a/runtests.py +++ b/runtests.py @@ -22,7 +22,7 @@ import unittest from testing.test_crypto import HeaderTest, AESGCMTest from testing.test_multivol import MultivolGnuFormatTest, MultivolPaxFormatTest from testing.test_concat_compress import ConcatCompressTest -from testing.test_recover import RecoverTest +from testing.test_recover import RecoverTest, RecoverGZTest, RecoverGZAESTest from testing.test_rescue_tar import RescueTarTest from testing.test_encryption import EncryptionTest from testing.test_deltatar import (DeltaTarTest, DeltaTar2Test, @@ -56,7 +56,8 @@ if __name__ == "__main__": , DeltaTarGzipAes128ConcatTest , DeltaTarAes128ConcatTest , HeaderTest, AESGCMTest - , RecoverTest + # testing.test_recover + , RecoverTest, RecoverGZTest, RecoverGZAESTest ]: try: t = group (n) diff --git a/testing/test_recover.py b/testing/test_recover.py index 5c84783..52dcbee 100644 --- a/testing/test_recover.py +++ b/testing/test_recover.py @@ -8,6 +8,12 @@ import deltatar.tarfile as tarfile from . import BaseTest +TEST_PASSWORD = "test1234" + +############################################################################### +## helpers ## +############################################################################### + def flip_bits (fname, off, b=0x01, n=1): """ Open file *fname* at offset *off*, replacing the next *n* bytes with @@ -48,37 +54,59 @@ def gz_header_size (fname, off=0): return off +def is_pdt_encrypted (fname): + """ + Returns true if the file contains at least one PDT header plus enough + space for the object. + """ + try: + with open (fname, "rb") as st: + hdr = crypto.hdr_read_stream (st) + siz = hdr ["ctsize"] + assert (len (st.read (siz)) == siz) + except Exception as exn: + return False + return True + + +############################################################################### +## tests ## +############################################################################### class RecoverTest (BaseTest): """ Disaster recovery: restore corrupt backups. """ - GIT_DIR = '.git' + COMPRESSION = None + PASSWORD = None + FAILURES = 0 + def setUp(self): ''' Create base test data ''' - self.pwd = os.getcwd() + self.pwd = os.getcwd() + self.dst_path = "source_dir" + self.src_path = "%s2" % self.dst_path + self.hash = dict() + os.system('rm -rf target_dir source_dir* backup_dir* huge') - os.makedirs('source_dir/test/test2') - self.hash = dict() - self.hash["source_dir/test/test2"] = '' - self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000) - self.hash["source_dir/small"] = self.create_file("source_dir/small", 100) - self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000) - self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000) + os.makedirs (self.src_path) - self.consoleLogger = logging.StreamHandler() - self.consoleLogger.setLevel(logging.DEBUG) + for i in range (5): + f = "dummy_%rd" % i + self.hash [f] = self.create_file ("%s/%s" + % (self.src_path, f), 5 + i) - if not os.path.isdir(self.GIT_DIR): - # Not running inside git tree, take our - # own testing directory as source. - self.GIT_DIR = 'testing' - if not os.path.isdir(self.GIT_DIR): - raise Exception('No input directory found: ' + self.GIT_DIR) + + def tearDown(self): + ''' + Remove temporal files created by unit tests and reset globals. + ''' + os.chdir(self.pwd) + os.system("rm -rf source_dir source_dir2 backup_dir*") def test_recover_corrupt_byte (self): @@ -88,68 +116,97 @@ class RecoverTest (BaseTest): Expects the extraction to fail in normal mode. With disaster recovery, extraction must succeed, and exactly one file must be missing. """ - mode = "#gz" - dst_path = "source_dir" - src_path = "%s2" % dst_path + mode = self.COMPRESSION or "#" bak_path = "backup_dir" - backup_file = "the_full_backup.tar.gz" + backup_file = "the_full_backup.tar" backup_full = "%s/%s" % (bak_path, backup_file) - index_file = "the_full_index.gz" + index_file = "the_full_index" + + if self.COMPRESSION is not None: + backup_file += ".gz" + backup_full += ".gz" + index_file += ".gz" + + if self.PASSWORD is not None: + backup_file += deltatar.PDTCRYPT_EXTENSION + backup_full += deltatar.PDTCRYPT_EXTENSION + index_file += deltatar.PDTCRYPT_EXTENSION def vname (*a, **kwa): return backup_file - dtar = deltatar.DeltaTar (mode="#gz", - logger=self.consoleLogger, + dtar = deltatar.DeltaTar (mode=mode, + logger=None, + password=self.PASSWORD, index_name_func=lambda _: index_file, volume_name_func=vname) - self.hash = dict () - os.makedirs (src_path) - for i in range (5): - f = "dummy_%rd" % i - self.hash [f] = self.create_file ("%s/%s" % (src_path, f), 5 + i) dtar.create_full_backup \ - (source_path=src_path, backup_path=bak_path) + (source_path=self.src_path, backup_path=bak_path) + + if self.PASSWORD is not None: + # ensure all files are at least superficially in PDT format + for f in os.listdir (bak_path): + assert is_pdt_encrypted ("%s/%s" % (bak_path, f)) # first restore must succeed - dtar.restore_backup(target_path=dst_path, + dtar.restore_backup(target_path=self.dst_path, backup_tar_path=backup_full) for key, value in self.hash.items (): - f = "%s/%s" % (dst_path, key) + f = "%s/%s" % (self.dst_path, key) assert os.path.exists (f) assert value == self.md5sum (f) - shutil.rmtree (dst_path) - shutil.rmtree (src_path) + shutil.rmtree (self.dst_path) + shutil.rmtree (self.src_path) - flip_bits (backup_full, gz_header_size (backup_full) + 1) + if self.PASSWORD is not None: + flip_bits (backup_full, crypto.PDTCRYPT_HDR_SIZE + 1) + elif self.COMPRESSION is not None: + flip_bits (backup_full, gz_header_size (backup_full) + 1) + else: + flip_bits (backup_full, tarfile.BLOCKSIZE + 1) # normal restore must fail - curdir = os.getcwd () # not restored after below failure - with self.assertRaises (Exception) as cm: - dtar.restore_backup(target_path=dst_path, + try: + dtar.restore_backup(target_path=self.dst_path, backup_tar_path=backup_full) - assert type (cm.exception) in [ tarfile.CompressionError - , tarfile.ReadError - ] - - os.chdir (curdir) + except tarfile.CompressionError: + if self.PASSWORD is not None or self.COMPRESSION is not None: + pass + except tarfile.ReadError: + if self.PASSWORD is not None or self.COMPRESSION is not None: + pass + + os.chdir (self.pwd) # not restored due to the error above # but recover will succeed - failed = dtar.recover_backup(target_path=dst_path, + failed = dtar.recover_backup(target_path=self.dst_path, backup_indexes_paths=[ "%s/%s" % (bak_path, index_file) ]) - assert len (failed) == 1 + + assert len (failed) == self.FAILURES # with one file missing missing = [] for key, value in self.hash.items (): - kkey = "%s/%s" % (dst_path, key) + kkey = "%s/%s" % (self.dst_path, key) if os.path.exists (kkey): assert value == self.md5sum (kkey) else: missing.append (key) - assert len (missing) == 1 + assert len (missing) == self.FAILURES + + shutil.rmtree (self.dst_path) + + +class RecoverGZTest (RecoverTest): + COMPRESSION = "#gz" + PASSWORD = None + FAILURES = 1 + - shutil.rmtree (dst_path) +class RecoverGZAESTest (RecoverTest): + COMPRESSION = "#gz" + PASSWORD = TEST_PASSWORD + FAILURES = 1