import deltatar.deltatar as deltatar
import deltatar.crypto as crypto
+import deltatar.tarfile as tarfile
from . import BaseTest
their values xor’ed with *b*.
"""
fd = os.open (fname, os.O_RDWR)
+
try:
pos = os.lseek (fd, off, os.SEEK_SET)
assert pos == off
finally:
os.close (fd)
+
+def gz_header_size (fname, off=0):
+ """
+ Determine the length of the gzip header starting at *off* in file fname.
+
+ The header is variable length because it may contain the filename as NUL
+ terminated bytes.
+ """
+ # length so we need to determine where the actual payload starts
+ off = tarfile.GZ_HEADER_SIZE
+ fd = os.open (fname, os.O_RDONLY)
+
+ try:
+ pos = os.lseek (fd, off, os.SEEK_SET)
+ assert pos == off
+ while os.read (fd, 1)[0] != 0:
+ off += 1
+ pos = os.lseek (fd, off, os.SEEK_SET)
+ assert pos == off
+ finally:
+ os.close (fd)
+
+ return off
+
+
class RecoverTest (BaseTest):
"""
Disaster recovery: restore corrupt backups.
Expects the extraction to fail in normal mode. With disaster recovery,
extraction must succeed, and exactly one file must be missing.
"""
- src_path = "source_dir2"
- dst_path = "backup_dir"
- backup_file = "the_full_backup.gz"
- backup_full = "%s/%s" % (dst_path, backup_file)
-
- fulldiff = lambda f: f and "full" or "diff"
+ mode = "#gz"
+ dst_path = "source_dir"
+ src_path = "%s2" % dst_path
+ bak_path = "backup_dir"
+ backup_file = "the_full_backup.tar.gz"
+ backup_full = "%s/%s" % (bak_path, backup_file)
+ index_file = "the_full_index.gz"
def vname (*a, **kwa):
return backup_file
dtar = deltatar.DeltaTar (mode="#gz",
logger=self.consoleLogger,
- index_name_func=lambda f: \
- "the_%s_index" % fulldiff (f),
+ index_name_func=lambda _: index_file,
volume_name_func=vname)
-
self.hash = dict ()
os.makedirs (src_path)
for i in range (5):
f = "source_dir2/dummy_%rd" % i
- self.hash [f] = self.create_file (f, i)
+ self.hash [f] = self.create_file (f, 5 + i)
dtar.create_full_backup \
- (source_path=src_path, backup_path=dst_path)
+ (source_path=src_path, backup_path=bak_path)
+
+ # first restore must succeed
+ dtar.restore_backup(target_path=dst_path,
+ backup_tar_path=backup_full)
+ for key, value in self.hash.items ():
+ assert os.path.exists (key)
+ assert value == self.md5sum (key)
+ shutil.rmtree (src_path)
+
+ flip_bits (backup_full, gz_header_size (backup_full) + 1)
+
+ # normal restore must fail
+ curdir = os.getcwd () # not restored after below failure
+ with self.assertRaises (tarfile.ReadError):
+ dtar.restore_backup(target_path=dst_path,
+ backup_tar_path=backup_full)
+
+ os.chdir (curdir)
+ # but recover will succeed
+ dtar.restore_backup(target_path=dst_path,
+ backup_tar_path=backup_full)
+
+ # with one file missing
+ missing = 0
+ for key, value in self.hash.items ():
+ if os.path.exists (key):
+ assert value == self.md5sum (key)
+ else:
+ missing += 1
+ assert missing == 1
- # damage byte just after first object header
- flip_bits (backup_full, crypto.PDTCRYPT_HDR_SIZE + 1)
shutil.rmtree (src_path)