, RecoverCorruptVolumeGZAESTest \
, RecoverCorruptHoleTest \
, RecoverCorruptHoleGZTest \
- , RecoverCorruptHoleGZAESTest
+ , RecoverCorruptHoleGZAESTest \
+ , RescueCorruptHoleTest \
+ , RescueCorruptHoleGZTest \
+ , RescueCorruptHoleGZAESTest \
+ , GenIndexIntactTest \
+ , GenIndexIntactGZTest \
+ , GenIndexIntactGZAESTest
from testing.test_rescue_tar import RescueTarTest
from testing.test_encryption import EncryptionTest
from testing.test_deltatar import (DeltaTarTest, DeltaTar2Test,
, RecoverCorruptHoleTest
, RecoverCorruptHoleGZTest
, RecoverCorruptHoleGZAESTest
+ , RescueCorruptHoleTest
+ , RescueCorruptHoleGZTest
+ , RescueCorruptHoleGZAESTest
+ , GenIndexIntactTest
+ , GenIndexIntactGZTest
+ , GenIndexIntactGZAESTest
]:
try:
t = group (n)
import shutil
import stat
+from functools import partial
+
import deltatar.deltatar as deltatar
import deltatar.crypto as crypto
import deltatar.tarfile as tarfile
os.link (path, aname, src_dir_fd=0, follow_symlinks=True)
os.close (outfd)
+def immaculate (_, _fname, _compress, _encrypt):
+ """
+ No-op dummy.
+ """
+ pass
###############################################################################
## tests ##
os.system("rm -rf source_dir source_dir2 backup_dir*")
-class RecoverTest (DefectiveTest):
- """
- Recover: restore corrupt backups from index file information.
- """
+ @staticmethod
+ def default_volume_name (backup_file, _x, _y, n, *a, **kwa):
+ return backup_file % n
- def test_recover_corrupt (self):
- """
- Perform various damaging actions that cause unreadable objects.
-
- Expects the extraction to fail in normal mode. With disaster recovery,
- extraction must succeed, and exactly one file must be missing.
- """
- mode = self.COMPRESSION or "#"
+ def gen_file_names (self, comp, pw):
bak_path = "backup_dir"
backup_file = "the_full_backup_%0.2d.tar"
backup_full = ("%s/%s" % (bak_path, backup_file)) % 0
backup_full = "%s.%s" % (backup_full, deltatar.PDTCRYPT_EXTENSION)
index_file = "%s.%s" % (index_file , deltatar.PDTCRYPT_EXTENSION)
+ return bak_path, backup_file, backup_full, index_file
+
+
+class RecoverTest (DefectiveTest):
+ """
+ Recover: restore corrupt backups from index file information.
+ """
+
+ def test_recover_corrupt (self):
+ """
+ Perform various damaging actions that cause unreadable objects.
+
+ Expects the extraction to fail in normal mode. With disaster recovery,
+ extraction must succeed, and exactly one file must be missing.
+ """
+ mode = self.COMPRESSION or "#"
+ bak_path, backup_file, backup_full, index_file = \
+ self.gen_file_names (self.COMPRESSION, self.PASSWORD)
+
if self.VOLUMES > 1:
# add n files for one nth the volume size each, corrected
# for metadata and tar block overhead
fsiz,
random=True)
- def vname (_x, _y, n, *a, **kwa):
- return backup_file % n
-
+ vname = partial (self.default_volume_name, backup_file)
dtar = deltatar.DeltaTar (mode=mode,
logger=None,
password=self.PASSWORD,
Perform various damaging actions that cause unreadable objects, then
attempt to extract objects regardless.
"""
- mode = self.COMPRESSION or "#"
- bak_path = "backup_dir"
- backup_file = "the_full_backup_%0.2d.tar"
- backup_full = ("%s/%s" % (bak_path, backup_file)) % 0
- index_file = "the_full_index"
-
- if self.COMPRESSION is not None:
- backup_file += ".gz"
- backup_full += ".gz"
- index_file += ".gz"
-
- if self.PASSWORD is not None:
- backup_file = "%s.%s" % (backup_file, deltatar.PDTCRYPT_EXTENSION)
- backup_full = "%s.%s" % (backup_full, deltatar.PDTCRYPT_EXTENSION)
- index_file = "%s.%s" % (index_file , deltatar.PDTCRYPT_EXTENSION)
+ mode = self.COMPRESSION or "#"
+ bak_path, backup_file, backup_full, index_file = \
+ self.gen_file_names (self.COMPRESSION, self.PASSWORD)
if self.VOLUMES > 1:
# add n files for one nth the volume size each, corrected
fsiz,
random=True)
- def vname (_x, _y, n, *a, **kwa):
- return backup_file % n
-
+ vname = partial (self.default_volume_name, backup_file)
dtar = deltatar.DeltaTar (mode=mode,
logger=None,
password=self.PASSWORD,
os.chdir (self.pwd) # not restored due to the error above
# but recover will succeed
failed = dtar.rescue_backup(target_path=self.dst_path,
- backup_indexes_paths=[
- "%s/%s" % (bak_path, index_file)
- ])
+ backup_tar_path=backup_full)
assert len (failed) == self.FAILURES
else:
missing.append (key)
- ssert len (missing) == (self.MISSING if self.MISSING is not None
- else self.FAILURES)
+ assert len (missing) == (self.MISSING if self.MISSING is not None
+ else self.FAILURES)
assert len (mismatch) == self.MISMATCHES
shutil.rmtree (self.dst_path)
+class GenIndexTest (DefectiveTest):
+ """
+ Deducing an index for a backup with tarfile.
+ """
+
+ def test_gen_index (self):
+ """
+ Create backup, leave it unharmed, then generate an index.
+ """
+ mode = self.COMPRESSION or "#"
+ bak_path, backup_file, backup_full, index_file = \
+ self.gen_file_names (self.COMPRESSION, self.PASSWORD)
+
+ vname = partial (self.default_volume_name, backup_file)
+ dtar = deltatar.DeltaTar (mode=mode,
+ logger=None,
+ password=self.PASSWORD,
+ index_name_func=lambda _: index_file,
+ volume_name_func=vname)
+
+ dtar.create_full_backup \
+ (source_path=self.src_path, backup_path=bak_path,
+ max_volume_size=1)
+
+ psidx = tarfile.gen_rescue_index (backup_full, mode, password=self.PASSWORD)
+
+ assert len (psidx) == len (self.hash)
+
+
+###############################################################################
+# rescue
+###############################################################################
+
class RecoverCorruptPayloadTestBase (RecoverTest):
COMPRESSION = None
PASSWORD = None
PASSWORD = TEST_PASSWORD
MISSING = 2
+###############################################################################
+# rescue
+###############################################################################
+
+class RescueCorruptHoleBaseTest (RescueTest):
+ """
+ Cut bytes from the middle of a volume.
+ """
+ COMPRESSION = None
+ PASSWORD = None
+ FAILURES = 3
+ CORRUPT = corrupt_hole
+ VOLUMES = 2 # request two vols to swell up the first one
+ MISMATCHES = 1
+
+class RescueCorruptHoleTest (RescueCorruptHoleBaseTest):
+ FAILURES = 2
+
+class RescueCorruptHoleGZTest (RescueCorruptHoleBaseTest):
+ COMPRESSION = "#gz"
+ MISSING = 2
+
+class RescueCorruptHoleGZAESTest (RescueCorruptHoleBaseTest):
+ COMPRESSION = "#gz"
+ PASSWORD = TEST_PASSWORD
+ MISSING = 2
+
+###############################################################################
+# index
+###############################################################################
+
+class GenIndexIntactBaseTest (GenIndexTest):
+ """
+ """
+ COMPRESSION = None
+ PASSWORD = None
+ FAILURES = 0
+ CORRUPT = immaculate
+ VOLUMES = 1
+ MISMATCHES = 1
+
+
+class GenIndexIntactTest (GenIndexIntactBaseTest):
+ pass
+
+class GenIndexIntactGZTest (GenIndexIntactBaseTest):
+ COMPRESSION = "#gz"
+ MISSING = 2
+
+class GenIndexIntactGZAESTest (GenIndexIntactBaseTest):
+ COMPRESSION = "#gz"
+ PASSWORD = TEST_PASSWORD
+ MISSING = 2
+