from . import BaseTest
+TEST_PASSWORD = "test1234"
+
+###############################################################################
+## helpers ##
+###############################################################################
+
def flip_bits (fname, off, b=0x01, n=1):
"""
Open file *fname* at offset *off*, replacing the next *n* bytes with
return off
+def is_pdt_encrypted (fname):
+ """
+ Returns true if the file contains at least one PDT header plus enough
+ space for the object.
+ """
+ try:
+ with open (fname, "rb") as st:
+ hdr = crypto.hdr_read_stream (st)
+ siz = hdr ["ctsize"]
+ assert (len (st.read (siz)) == siz)
+ except Exception as exn:
+ return False
+ return True
+
+
+###############################################################################
+## tests ##
+###############################################################################
class RecoverTest (BaseTest):
"""
Disaster recovery: restore corrupt backups.
"""
- GIT_DIR = '.git'
+ COMPRESSION = None
+ PASSWORD = None
+ FAILURES = 0
+
def setUp(self):
'''
Create base test data
'''
- self.pwd = os.getcwd()
+ self.pwd = os.getcwd()
+ self.dst_path = "source_dir"
+ self.src_path = "%s2" % self.dst_path
+ self.hash = dict()
+
os.system('rm -rf target_dir source_dir* backup_dir* huge')
- os.makedirs('source_dir/test/test2')
- self.hash = dict()
- self.hash["source_dir/test/test2"] = ''
- self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
- self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
- self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
- self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
+ os.makedirs (self.src_path)
- self.consoleLogger = logging.StreamHandler()
- self.consoleLogger.setLevel(logging.DEBUG)
+ for i in range (5):
+ f = "dummy_%rd" % i
+ self.hash [f] = self.create_file ("%s/%s"
+ % (self.src_path, f), 5 + i)
- if not os.path.isdir(self.GIT_DIR):
- # Not running inside git tree, take our
- # own testing directory as source.
- self.GIT_DIR = 'testing'
- if not os.path.isdir(self.GIT_DIR):
- raise Exception('No input directory found: ' + self.GIT_DIR)
+
+ def tearDown(self):
+ '''
+ Remove temporal files created by unit tests and reset globals.
+ '''
+ os.chdir(self.pwd)
+ os.system("rm -rf source_dir source_dir2 backup_dir*")
def test_recover_corrupt_byte (self):
Expects the extraction to fail in normal mode. With disaster recovery,
extraction must succeed, and exactly one file must be missing.
"""
- mode = "#gz"
- dst_path = "source_dir"
- src_path = "%s2" % dst_path
+ mode = self.COMPRESSION or "#"
bak_path = "backup_dir"
- backup_file = "the_full_backup.tar.gz"
+ backup_file = "the_full_backup.tar"
backup_full = "%s/%s" % (bak_path, backup_file)
- index_file = "the_full_index.gz"
+ index_file = "the_full_index"
+
+ if self.COMPRESSION is not None:
+ backup_file += ".gz"
+ backup_full += ".gz"
+ index_file += ".gz"
+
+ if self.PASSWORD is not None:
+ backup_file += deltatar.PDTCRYPT_EXTENSION
+ backup_full += deltatar.PDTCRYPT_EXTENSION
+ index_file += deltatar.PDTCRYPT_EXTENSION
def vname (*a, **kwa):
return backup_file
- dtar = deltatar.DeltaTar (mode="#gz",
- logger=self.consoleLogger,
+ dtar = deltatar.DeltaTar (mode=mode,
+ logger=None,
+ password=self.PASSWORD,
index_name_func=lambda _: index_file,
volume_name_func=vname)
- self.hash = dict ()
- os.makedirs (src_path)
- for i in range (5):
- f = "dummy_%rd" % i
- self.hash [f] = self.create_file ("%s/%s" % (src_path, f), 5 + i)
dtar.create_full_backup \
- (source_path=src_path, backup_path=bak_path)
+ (source_path=self.src_path, backup_path=bak_path)
+
+ if self.PASSWORD is not None:
+ # ensure all files are at least superficially in PDT format
+ for f in os.listdir (bak_path):
+ assert is_pdt_encrypted ("%s/%s" % (bak_path, f))
# first restore must succeed
- dtar.restore_backup(target_path=dst_path,
+ dtar.restore_backup(target_path=self.dst_path,
backup_tar_path=backup_full)
for key, value in self.hash.items ():
- f = "%s/%s" % (dst_path, key)
+ f = "%s/%s" % (self.dst_path, key)
assert os.path.exists (f)
assert value == self.md5sum (f)
- shutil.rmtree (dst_path)
- shutil.rmtree (src_path)
+ shutil.rmtree (self.dst_path)
+ shutil.rmtree (self.src_path)
- flip_bits (backup_full, gz_header_size (backup_full) + 1)
+ if self.PASSWORD is not None:
+ flip_bits (backup_full, crypto.PDTCRYPT_HDR_SIZE + 1)
+ elif self.COMPRESSION is not None:
+ flip_bits (backup_full, gz_header_size (backup_full) + 1)
+ else:
+ flip_bits (backup_full, tarfile.BLOCKSIZE + 1)
# normal restore must fail
- curdir = os.getcwd () # not restored after below failure
- with self.assertRaises (Exception) as cm:
- dtar.restore_backup(target_path=dst_path,
+ try:
+ dtar.restore_backup(target_path=self.dst_path,
backup_tar_path=backup_full)
- assert type (cm.exception) in [ tarfile.CompressionError
- , tarfile.ReadError
- ]
-
- os.chdir (curdir)
+ except tarfile.CompressionError:
+ if self.PASSWORD is not None or self.COMPRESSION is not None:
+ pass
+ except tarfile.ReadError:
+ if self.PASSWORD is not None or self.COMPRESSION is not None:
+ pass
+
+ os.chdir (self.pwd) # not restored due to the error above
# but recover will succeed
- failed = dtar.recover_backup(target_path=dst_path,
+ failed = dtar.recover_backup(target_path=self.dst_path,
backup_indexes_paths=[
"%s/%s" % (bak_path, index_file)
])
- assert len (failed) == 1
+
+ assert len (failed) == self.FAILURES
# with one file missing
missing = []
for key, value in self.hash.items ():
- kkey = "%s/%s" % (dst_path, key)
+ kkey = "%s/%s" % (self.dst_path, key)
if os.path.exists (kkey):
assert value == self.md5sum (kkey)
else:
missing.append (key)
- assert len (missing) == 1
+ assert len (missing) == self.FAILURES
+
+ shutil.rmtree (self.dst_path)
+
+
+class RecoverGZTest (RecoverTest):
+ COMPRESSION = "#gz"
+ PASSWORD = None
+ FAILURES = 1
+
- shutil.rmtree (dst_path)
+class RecoverGZAESTest (RecoverTest):
+ COMPRESSION = "#gz"
+ PASSWORD = TEST_PASSWORD
+ FAILURES = 1