""" Intra2net 2017 =============================================================================== test_recover.py – behavior facing file corruption =============================================================================== Corruptors have the signature ``(unittest × string × bool × bool) → void``, where the *string* argument is the name of the file to modify, the *booleans* specialize the operation for compressed and encrypted data. Issues are communicated upward by throwing. - corrupt_header (): Modify the first object header where it hurts. With encryption, the tag is corrupted to cause authentication of the decrypted data to fail. For compressed data, the two byte magic is altered, for uncompressed archives, the tar header checksum field. - corrupt_truncate (): Drop the file’s content after two thirds, causing extraction of later objects to fail. Since the operation preserves the offsets of objects before the cutoff, this yields the same results regardless of whether restore or rescue mode is used. - corrupt_ctsize (): Modify the *ctsize* field of a PDTCRYPT header. The goal is to have decryption continue past the end of the object, causing data authentication to fail and file reads to be at odds with the offsets in the index. Only applicable to encrypted archives; will raise *UndefinedTest* otherwise. - corrupt_entire_header (): Invert all bits of the first object header (PDTCRYPT, gzip, tar) without affecting the payload. This renders the object unreadable; the result will resemble a file with arbitrary leading data but all the remaining object offsets intact, so the contents can still be extracted with index based recovery. - corrupt_payload_start (): For all header variants, skip to the first byte past the header and corrupt it. Encrypted objects will fail to authenticate. Compressed objects will yield a bad CRC32. The Tar layer will take no notice but the extracted object will fail an independent checksum comparison with that of the original file. - corrupt_leading_garbage (): Prepend random data to an otherwise valid file. Creates a situation that index based recovery cannot handle by shifting the offsets of all objects in the file. In rescue mode, these objects must be located and extracted regardless. - corrupt_trailing_data (): Append data to an otherwise valid file. Both the recovery and rescue modes must be able to retrieve all objects from that file. - corrupt_volume (): Zero out an entire backup file. This is interesting for multivolume tests: all files from the affected volume must be missing but objects that span volume bounds will still be partially recoverable. - corrupt_hole (): Remove a region from a file. Following the damaged part, no object can be recovered in index mode, but rescue mode will still find those. The object containing the start of the hole will fail checksum tests because of the missing part and the overlap with the subsequent object. """ import logging import os import shutil import stat import sys import unittest from functools import partial import deltatar.deltatar as deltatar import deltatar.crypto as crypto import deltatar.tarfile as tarfile from . 

TEST_PASSWORD    = "test1234"
TEST_VOLSIZ      = 2 # MB
TEST_FILESPERVOL = 3

VOLUME_OVERHEAD  = 1.4 # account for tar overhead when fitting files into
                       # volumes; this is black magic

TEST_BLOCKSIZE   = 4096

###############################################################################
## helpers                                                                   ##
###############################################################################

def flip_bits (fname, off, b=0x01, n=1):
    """
    Open file *fname* at offset *off*, replacing the next *n* bytes with
    their values xor’ed with *b*.
    """
    fd = os.open (fname, os.O_RDWR)
    try:
        pos = os.lseek (fd, off, os.SEEK_SET)
        assert pos == off
        chunk = os.read (fd, n)
        chunk = bytes (map (lambda v: v ^ b, chunk))
        pos = os.lseek (fd, off, os.SEEK_SET)
        assert pos == off
        os.write (fd, chunk)
    finally:
        os.close (fd)


def gz_header_size (fname, off=0):
    """
    Determine the length of the gzip header starting at *off* in file
    *fname*. The header is of variable length because it may contain the
    file name as a NUL terminated byte string, so we need to determine
    where the actual payload starts.
    """
    off = tarfile.GZ_HEADER_SIZE
    fd = os.open (fname, os.O_RDONLY)
    try:
        pos = os.lseek (fd, off, os.SEEK_SET)
        assert pos == off
        while os.read (fd, 1)[0] != 0:
            off += 1
            pos = os.lseek (fd, off, os.SEEK_SET)
            assert pos == off
    finally:
        os.close (fd)
    return off


def is_pdt_encrypted (fname):
    """
    Returns true if the file contains at least one PDT header plus enough
    space for the object.
    """
    try:
        with open (fname, "rb") as st:
            hdr = crypto.hdr_read_stream (st)
            siz = hdr ["ctsize"]
            assert len (st.read (siz)) == siz
    except Exception as exn:
        return False
    return True


###############################################################################
## corruption simulators                                                     ##
###############################################################################

class UndefinedTest (Exception):
    """No test available for the asked combination of parameters."""


def corrupt_header (_, fname, compress, encrypt):
    """
    Modify a significant byte in the object header of the format.
    """
    if encrypt is True:
        # damage GCM tag
        flip_bits (fname, crypto.HDR_OFF_TAG + 1)
    elif compress is True:
        # invalidate magic
        flip_bits (fname, 1)
    else:
        # Fudge checksum. From tar(5):
        #
        #   struct header_gnu_tar {
        #           char name[100];
        #           char mode[8];
        #           char uid[8];
        #           char gid[8];
        #           char size[12];
        #           char mtime[12];
        #           char checksum[8];
        #           …
        flip_bits (fname, 100 + 8 + 8 + 8 + 12 + 12 + 1)
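# For reference, the offset used above in the uncompressed case follows
# directly from the tar(5) struct quoted in the comment: 100 + 8 + 8 + 8 +
# 12 + 12 = 148 bytes precede the checksum field, so 148 + 1 points at its
# second byte.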
""" if encrypt is True: flip_bits (fname, 0, 0xff, crypto.PDTCRYPT_HDR_SIZE) elif compress is True: flip_bits (fname, 0, 0xff, gz_header_size (fname)) else: flip_bits (fname, 0, 0xff, tarfile.BLOCKSIZE) def corrupt_payload_start (_, fname, compress, encrypt): """ Modify the byte following the object header structure of the format. """ if encrypt is True: flip_bits (fname, crypto.PDTCRYPT_HDR_SIZE + 1) elif compress is True: flip_bits (fname, gz_header_size (fname) + 1) else: flip_bits (fname, tarfile.BLOCKSIZE + 1) def corrupt_leading_garbage (_, fname, compress, encrypt): """ Prepend junk to file. """ aname = os.path.abspath (fname) infd = os.open (fname, os.O_RDONLY) size = os.lseek (infd, 0, os.SEEK_END) assert os.lseek (infd, 0, os.SEEK_SET) == 0 outfd = os.open (os.path.dirname (aname), os.O_WRONLY | os.O_TMPFILE, stat.S_IRUSR | stat.S_IWUSR) junk = os.urandom (42) # write new file with garbage prepended done = 0 os.write (outfd, junk) # junk first done += len (junk) while done < size: data = os.read (infd, TEST_BLOCKSIZE) os.write (outfd, data) done += len (data) assert os.lseek (outfd, 0, os.SEEK_CUR) == done # close and free old file os.close (infd) os.unlink (fname) # install the new file in its place, atomically path = "/proc/self/fd/%d" % outfd os.link (path, aname, src_dir_fd=0, follow_symlinks=True) os.close (outfd) def corrupt_trailing_data (_, fname, compress, encrypt): """ Append random data to file. """ junk = os.urandom (42) fd = os.open (fname, os.O_WRONLY | os.O_APPEND) os.write (fd, junk) os.close (fd) def corrupt_volume (_, fname, compress, encrypt): """ Zero out an entire volume. """ fd = os.open (fname, os.O_WRONLY) size = os.lseek (fd, 0, os.SEEK_END) assert os.lseek (fd, 0, os.SEEK_SET) == 0 zeros = bytes (b'\x00' * TEST_BLOCKSIZE) while size > 0: todo = min (size, TEST_BLOCKSIZE) os.write (fd, zeros [:todo]) size -= todo os.close (fd) def corrupt_hole (_, fname, compress, encrypt): """ Cut file in three pieces, reassemble without the middle one. """ aname = os.path.abspath (fname) infd = os.open (fname, os.O_RDONLY) size = os.lseek (infd, 0, os.SEEK_END) assert os.lseek (infd, 0, os.SEEK_SET) == 0 assert size > 3 * TEST_BLOCKSIZE hole = (size / 3, size * 2 / 3) outfd = os.open (os.path.dirname (aname), os.O_WRONLY | os.O_TMPFILE, stat.S_IRUSR | stat.S_IWUSR) done = 0 while done < size: data = os.read (infd, TEST_BLOCKSIZE) if done < hole [0] or hole [1] < done: # only copy from outside hole os.write (outfd, data) done += len (data) os.close (infd) os.unlink (fname) path = "/proc/self/fd/%d" % outfd os.link (path, aname, src_dir_fd=0, follow_symlinks=True) os.close (outfd) def immaculate (_, _fname, _compress, _encrypt): """ No-op dummy. """ pass ############################################################################### ## tests ## ############################################################################### class DefectiveTest (BaseTest): """ Disaster recovery: restore corrupt backups. 
""" COMPRESSION = None PASSWORD = None FAILURES = 0 # files that could not be restored MISMATCHES = 0 # files that were restored but corrupted CORRUPT = corrupt_payload_start VOLUMES = 1 MISSING = None # normally the number of failures def setUp(self): ''' Create base test data ''' self.pwd = os.getcwd() self.dst_path = "source_dir" self.src_path = "%s2" % self.dst_path self.hash = dict() os.system('rm -rf target_dir source_dir* backup_dir* huge') os.makedirs (self.src_path) for i in range (5): f = "dummy_%d" % i self.hash [f] = self.create_file ("%s/%s" % (self.src_path, f), 5 + i) def tearDown(self): ''' Remove temporal files created by unit tests and reset globals. ''' os.chdir(self.pwd) os.system("rm -rf source_dir source_dir2 backup_dir*") @staticmethod def default_volume_name (backup_file, _x, _y, n, *a, **kwa): return backup_file % n def gen_file_names (self, comp, pw): bak_path = "backup_dir" backup_file = "the_full_backup_%0.2d.tar" backup_full = ("%s/%s" % (bak_path, backup_file)) % 0 index_file = "the_full_index" if self.COMPRESSION is not None: backup_file += ".gz" backup_full += ".gz" index_file += ".gz" if self.PASSWORD is not None: backup_file = "%s.%s" % (backup_file, deltatar.PDTCRYPT_EXTENSION) backup_full = "%s.%s" % (backup_full, deltatar.PDTCRYPT_EXTENSION) index_file = "%s.%s" % (index_file , deltatar.PDTCRYPT_EXTENSION) return bak_path, backup_file, backup_full, index_file def gen_multivol (self, nvol): # add n files for one nth the volume size each, corrected # for metadata and tar block overhead fsiz = int ( ( TEST_VOLSIZ / (TEST_FILESPERVOL * VOLUME_OVERHEAD)) * 1024 * 1024) fcnt = (self.VOLUMES - 1) * TEST_FILESPERVOL for i in range (fcnt): nvol, invol = divmod(i, TEST_FILESPERVOL) f = "dummy_vol_%d_n_%0.2d" % (nvol, invol) self.hash [f] = self.create_file ("%s/%s" % (self.src_path, f), fsiz, random=True) class RecoverTest (DefectiveTest): """ Recover: restore corrupt backups from index file information. """ def test_recover_corrupt (self): """ Perform various damaging actions that cause unreadable objects. Expects the extraction to fail in normal mode. With disaster recovery, extraction must succeed, and exactly one file must be missing. 
""" mode = self.COMPRESSION or "#" bak_path, backup_file, backup_full, index_file = \ self.gen_file_names (self.COMPRESSION, self.PASSWORD) if self.VOLUMES > 1: self.gen_multivol (self.VOLUMES) vname = partial (self.default_volume_name, backup_file) dtar = deltatar.DeltaTar (mode=mode, logger=None, password=self.PASSWORD, index_name_func=lambda _: index_file, volume_name_func=vname) dtar.create_full_backup \ (source_path=self.src_path, backup_path=bak_path, max_volume_size=1) if self.PASSWORD is not None: # ensure all files are at least superficially in PDT format for f in os.listdir (bak_path): assert is_pdt_encrypted ("%s/%s" % (bak_path, f)) # first restore must succeed dtar.restore_backup(target_path=self.dst_path, backup_indexes_paths=[ "%s/%s" % (bak_path, index_file) ], disaster=tarfile.TOLERANCE_RECOVER, strict_validation=False) for key, value in self.hash.items (): f = "%s/%s" % (self.dst_path, key) assert os.path.exists (f) assert value == self.md5sum (f) shutil.rmtree (self.dst_path) shutil.rmtree (self.src_path) self.CORRUPT (backup_full, self.COMPRESSION is not None, self.PASSWORD is not None) # normal restore must fail try: dtar.restore_backup(target_path=self.dst_path, backup_tar_path=backup_full) except tarfile.CompressionError: if self.PASSWORD is not None or self.COMPRESSION is not None: pass else: raise except tarfile.ReadError: # can happen with all three modes pass except tarfile.DecryptionError: if self.PASSWORD is not None: pass else: raise os.chdir (self.pwd) # not restored due to the error above # but recover will succeed failed = dtar.recover_backup(target_path=self.dst_path, backup_indexes_paths=[ "%s/%s" % (bak_path, index_file) ]) assert len (failed) == self.FAILURES # with one file missing missing = [] mismatch = [] for key, value in self.hash.items (): kkey = "%s/%s" % (self.dst_path, key) if os.path.exists (kkey): if value != self.md5sum (kkey): mismatch.append (key) else: missing.append (key) # usually, an object whose extraction fails will not be found on # disk afterwards so the number of failures equals that of missing # files. however, some modes will create partial files for objects # spanning multiple volumes that contain the parts whose checksums # were valid. assert len (missing) == (self.MISSING if self.MISSING is not None else self.FAILURES) assert len (mismatch) == self.MISMATCHES shutil.rmtree (self.dst_path) class RescueTest (DefectiveTest): """ Rescue: restore corrupt backups from backup set that is damaged to a degree that the index file is worthless. """ def test_rescue_corrupt (self): """ Perform various damaging actions that cause unreadable objects, then attempt to extract objects regardless. 
""" mode = self.COMPRESSION or "#" bak_path, backup_file, backup_full, index_file = \ self.gen_file_names (self.COMPRESSION, self.PASSWORD) if self.VOLUMES > 1: self.gen_multivol (self.VOLUMES) vname = partial (self.default_volume_name, backup_file) dtar = deltatar.DeltaTar (mode=mode, logger=None, password=self.PASSWORD, index_name_func=lambda _: index_file, volume_name_func=vname) dtar.create_full_backup \ (source_path=self.src_path, backup_path=bak_path, max_volume_size=1) if self.PASSWORD is not None: # ensure all files are at least superficially in PDT format for f in os.listdir (bak_path): assert is_pdt_encrypted ("%s/%s" % (bak_path, f)) # first restore must succeed dtar.restore_backup(target_path=self.dst_path, backup_indexes_paths=[ "%s/%s" % (bak_path, index_file) ], disaster=tarfile.TOLERANCE_RECOVER, strict_validation=False) for key, value in self.hash.items (): f = "%s/%s" % (self.dst_path, key) assert os.path.exists (f) assert value == self.md5sum (f) shutil.rmtree (self.dst_path) shutil.rmtree (self.src_path) self.CORRUPT (backup_full, self.COMPRESSION is not None, self.PASSWORD is not None) # normal restore must fail try: dtar.restore_backup(target_path=self.dst_path, backup_tar_path=backup_full) except tarfile.CompressionError: if self.PASSWORD is not None or self.COMPRESSION is not None: pass else: raise except tarfile.ReadError: # can happen with all three modes pass except tarfile.DecryptionError: if self.PASSWORD is not None: pass else: raise os.chdir (self.pwd) # not restored due to the error above # but recover will succeed failed = dtar.rescue_backup(target_path=self.dst_path, backup_tar_path=backup_full) # with one file missing missing = [] mismatch = [] for key, value in self.hash.items (): kkey = "%s/%s" % (self.dst_path, key) if os.path.exists (kkey): if value != self.md5sum (kkey): mismatch.append (key) else: missing.append (key) assert len (failed) == self.FAILURES assert len (missing) == (self.MISSING if self.MISSING is not None else self.FAILURES) assert len (mismatch) == self.MISMATCHES shutil.rmtree (self.dst_path) class GenIndexTest (DefectiveTest): """ Deducing an index for a backup with tarfile. """ def test_gen_index (self): """ Create backup, leave it unharmed, then generate an index. """ mode = self.COMPRESSION or "#" bak_path, backup_file, backup_full, index_file = \ self.gen_file_names (self.COMPRESSION, self.PASSWORD) if self.VOLUMES > 1: self.gen_multivol (self.VOLUMES) vname = partial (self.default_volume_name, backup_file) dtar = deltatar.DeltaTar (mode=mode, logger=None, password=self.PASSWORD, index_name_func=lambda _: index_file, volume_name_func=vname) dtar.create_full_backup \ (source_path=self.src_path, backup_path=bak_path, max_volume_size=1) def gen_volume_name (nvol): return os.path.join (bak_path, vname (backup_full, True, nvol)) psidx = tarfile.gen_rescue_index (gen_volume_name, mode, password=self.PASSWORD) # correct for objects spanning volumes: these are treated as separate # in the index! 
        assert len (psidx) - self.VOLUMES + 1 == len (self.hash)


###############################################################################
# recover
###############################################################################

class RecoverCorruptPayloadTestBase (RecoverTest):
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0 # tarfile will restore but corrupted, as
    MISMATCHES  = 1 # revealed by the hash

class RecoverCorruptPayloadSingleTest (RecoverCorruptPayloadTestBase):
    VOLUMES = 1

class RecoverCorruptPayloadMultiTest (RecoverCorruptPayloadTestBase):
    VOLUMES = 3


class RecoverCorruptPayloadGZTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = None
    FAILURES    = 1
    MISMATCHES  = 0

class RecoverCorruptPayloadGZSingleTest (RecoverCorruptPayloadGZTestBase):
    VOLUMES = 1

class RecoverCorruptPayloadGZMultiTest (RecoverCorruptPayloadGZTestBase):
    VOLUMES = 3


class RecoverCorruptPayloadGZAESTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    FAILURES    = 1
    MISMATCHES  = 0

class RecoverCorruptPayloadGZAESSingleTest (RecoverCorruptPayloadGZAESTestBase):
    VOLUMES = 1

class RecoverCorruptPayloadGZAESMultiTest (RecoverCorruptPayloadGZAESTestBase):
    VOLUMES = 3


class RecoverCorruptHeaderTestBase (RecoverTest):
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 1
    CORRUPT     = corrupt_header
    MISMATCHES  = 0

class RecoverCorruptHeaderSingleTest (RecoverCorruptHeaderTestBase):
    VOLUMES = 1

class RecoverCorruptHeaderMultiTest (RecoverCorruptHeaderTestBase):
    VOLUMES = 3


class RecoverCorruptHeaderGZTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = None
    FAILURES    = 1
    CORRUPT     = corrupt_header
    MISMATCHES  = 0

class RecoverCorruptHeaderGZSingleTest (RecoverCorruptHeaderGZTestBase):
    VOLUMES = 1

class RecoverCorruptHeaderGZMultiTest (RecoverCorruptHeaderGZTestBase):
    VOLUMES = 3


class RecoverCorruptHeaderGZAESTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    FAILURES    = 1
    CORRUPT     = corrupt_header
    MISMATCHES  = 0

class RecoverCorruptHeaderGZAESSingleTest (RecoverCorruptHeaderGZAESTestBase):
    VOLUMES = 1

class RecoverCorruptHeaderGZAESMultiTest (RecoverCorruptHeaderGZAESTestBase):
    VOLUMES = 3


class RecoverCorruptTruncateTestBase (RecoverTest):
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0
    CORRUPT     = corrupt_truncate
    MISMATCHES  = 0

class RecoverCorruptTruncateTest (RecoverCorruptTruncateTestBase):
    pass

class RecoverCorruptTruncateGZTest (RecoverCorruptTruncateTestBase):
    """Two files that failed are missing."""
    COMPRESSION = "#gz"
    FAILURES    = 2

class RecoverCorruptTruncateGZAESTest (RecoverCorruptTruncateTestBase):
    """Two files that failed are missing."""
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    FAILURES    = 2


class RecoverCorruptEntireHeaderTestBase (RecoverTest):
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 1
    CORRUPT     = corrupt_entire_header
    MISMATCHES  = 0

class RecoverCorruptEntireHeaderSingleTest (RecoverCorruptEntireHeaderTestBase):
    VOLUMES = 1

class RecoverCorruptEntireHeaderMultiTest (RecoverCorruptEntireHeaderTestBase):
    VOLUMES = 3


class RecoverCorruptEntireHeaderGZTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = None
    FAILURES    = 1
    CORRUPT     = corrupt_entire_header
    MISMATCHES  = 0

class RecoverCorruptEntireHeaderGZSingleTest (RecoverCorruptEntireHeaderGZTestBase):
    VOLUMES = 1

class RecoverCorruptEntireHeaderGZMultiTest (RecoverCorruptEntireHeaderGZTestBase):
    VOLUMES = 3


class RecoverCorruptEntireHeaderGZAESTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    FAILURES    = 1
    CORRUPT     = corrupt_entire_header
    MISMATCHES  = 0

class RecoverCorruptEntireHeaderGZAESSingleTest (RecoverCorruptEntireHeaderGZAESTestBase):
    VOLUMES = 1
class RecoverCorruptEntireHeaderGZAESMultiTest (RecoverCorruptEntireHeaderGZAESTestBase):
    VOLUMES = 3


class RecoverCorruptTrailingDataTestBase (RecoverTest):
    # plain Tar is indifferent to trailing data and the results are
    # consistent
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0
    CORRUPT     = corrupt_trailing_data
    MISMATCHES  = 0

class RecoverCorruptTrailingDataSingleTest (RecoverCorruptTrailingDataTestBase):
    VOLUMES = 1

class RecoverCorruptTrailingDataMultiTest (RecoverCorruptTrailingDataTestBase):
    # the last object in the first archive has extra bytes somewhere in the
    # middle because tar itself performs no data checksumming.
    MISMATCHES = 1
    VOLUMES    = 3


class RecoverCorruptTrailingDataGZTestBase (RecoverTest):
    # reading past the final object will cause a decompression failure;
    # all objects except for the last survive unharmed though
    COMPRESSION = "#gz"
    PASSWORD    = None
    FAILURES    = 1
    CORRUPT     = corrupt_trailing_data
    MISMATCHES  = 0

class RecoverCorruptTrailingDataGZSingleTest (RecoverCorruptTrailingDataGZTestBase):
    VOLUMES = 1

class RecoverCorruptTrailingDataGZMultiTest (RecoverCorruptTrailingDataGZTestBase):
    VOLUMES = 3
    # the last file of the first volume will only contain the data of the
    # second part which is contained in the second volume. this happens
    # because the CRC32 is wrong for the first part so it gets discarded,
    # then the object is recreated from the first header of the second
    # volume, containing only the remainder of the data.
    MISMATCHES = 1
    MISSING    = 0


class RecoverCorruptTrailingDataGZAESTestBase (RecoverTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    FAILURES    = 0
    CORRUPT     = corrupt_trailing_data
    MISMATCHES  = 0

class RecoverCorruptTrailingDataGZAESSingleTest (RecoverCorruptTrailingDataGZAESTestBase):
    VOLUMES = 1

class RecoverCorruptTrailingDataGZAESMultiTest (RecoverCorruptTrailingDataGZAESTestBase):
    VOLUMES = 3


class RecoverCorruptVolumeBaseTest (RecoverTest):
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 8
    CORRUPT     = corrupt_volume
    VOLUMES     = 3

class RecoverCorruptVolumeTest (RecoverCorruptVolumeBaseTest):
    pass

class RecoverCorruptVolumeGZTest (RecoverCorruptVolumeBaseTest):
    COMPRESSION = "#gz"

class RecoverCorruptVolumeGZAESTest (RecoverCorruptVolumeBaseTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD


@unittest.skipIf(sys.version_info < (3, 4), "requires recent os library")
class RecoverCorruptHoleBaseTest (RecoverTest):
    """
    Cut bytes from the middle of a volume. Index-based recovery works only
    up to the hole.
""" COMPRESSION = None PASSWORD = None FAILURES = 3 CORRUPT = corrupt_hole VOLUMES = 2 # request two vols to swell up the first one MISMATCHES = 1 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RecoverCorruptHoleTest (RecoverCorruptHoleBaseTest): FAILURES = 2 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RecoverCorruptHoleGZTest (RecoverCorruptHoleBaseTest): COMPRESSION = "#gz" MISSING = 2 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RecoverCorruptHoleGZAESTest (RecoverCorruptHoleBaseTest): COMPRESSION = "#gz" PASSWORD = TEST_PASSWORD MISSING = 2 ############################################################################### # rescue ############################################################################### class RescueCorruptTruncateTestBase (RescueTest): COMPRESSION = None PASSWORD = None FAILURES = 0 CORRUPT = corrupt_truncate MISMATCHES = 0 class RescueCorruptTruncateTest (RescueCorruptTruncateTestBase): pass class RescueCorruptTruncateGZTest (RescueCorruptTruncateTestBase): """Two files that failed missing.""" COMPRESSION = "#gz" MISSING = 2 class RescueCorruptTruncateGZAESTest (RescueCorruptTruncateTestBase): """Two files missing but didn’t fail on account of their absence.""" COMPRESSION = "#gz" PASSWORD = TEST_PASSWORD MISSING = 2 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptHoleBaseTest (RescueTest): """ Cut bytes from the middle of a volume. """ COMPRESSION = None PASSWORD = None FAILURES = 0 CORRUPT = corrupt_hole VOLUMES = 2 # request two vols to swell up the first one MISMATCHES = 2 # intersected by hole MISSING = 1 # excised by hole @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptHoleTest (RescueCorruptHoleBaseTest): pass @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptHoleGZTest (RescueCorruptHoleBaseTest): COMPRESSION = "#gz" # the decompressor explodes in our face processing the first dummy, nothing # we can do to recover FAILURES = 1 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptHoleGZAESTest (RescueCorruptHoleBaseTest): COMPRESSION = "#gz" PASSWORD = TEST_PASSWORD # again, ignoring the crypto errors yields a bad zlib stream causing the # decompressor to abort where the hole begins; the file is extracted up # to this point though FAILURES = 1 class RescueCorruptHeaderCTSizeGZAESTest (RescueTest): COMPRESSION = "#gz" PASSWORD = TEST_PASSWORD FAILURES = 0 CORRUPT = corrupt_ctsize MISMATCHES = 0 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptLeadingGarbageTestBase (RescueTest): # plain Tar is indifferent against traling data and the results # are consistent COMPRESSION = None PASSWORD = None FAILURES = 0 CORRUPT = corrupt_leading_garbage MISMATCHES = 0 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptLeadingGarbageSingleTest (RescueCorruptLeadingGarbageTestBase): VOLUMES = 1 @unittest.skipIf(sys.version_info < (3, 4), "requires recent os library") class RescueCorruptLeadingGarbageMultiTest (RescueCorruptLeadingGarbageTestBase): # the last object in first archive has extra bytes somewhere in the # middle because tar itself performs no data checksumming. 
    MISMATCHES = 2
    VOLUMES    = 3


###############################################################################
# index
###############################################################################

class GenIndexIntactBaseTest (GenIndexTest):
    """
    Recreate an index from an undamaged backup.
    """
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0
    CORRUPT     = immaculate
    VOLUMES     = 1
    MISMATCHES  = 1

class GenIndexIntactSingleTest (GenIndexIntactBaseTest):
    pass

class GenIndexIntactSingleGZTest (GenIndexIntactBaseTest):
    COMPRESSION = "#gz"
    MISSING     = 2

class GenIndexIntactSingleGZAESTest (GenIndexIntactBaseTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    MISSING     = 2

class GenIndexIntactMultiTest (GenIndexIntactBaseTest):
    VOLUMES = 3

class GenIndexIntactMultiGZTest (GenIndexIntactBaseTest):
    VOLUMES     = 3
    COMPRESSION = "#gz"
    MISSING     = 2

class GenIndexIntactMultiGZAESTest (GenIndexIntactBaseTest):
    VOLUMES     = 3
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    MISSING     = 2


class GenIndexCorruptTruncateBaseTest (GenIndexTest):
    """
    Recreate index from a file that lacks the latter portion.
    """
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0
    CORRUPT     = corrupt_truncate
    MISSING     = 2

class GenIndexCorruptTruncateTest (GenIndexCorruptTruncateBaseTest):
    pass

class GenIndexCorruptTruncateGZTest (GenIndexCorruptTruncateBaseTest):
    COMPRESSION = "#gz"

class GenIndexCorruptTruncateGZAESTest (GenIndexCorruptTruncateBaseTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD


class GenIndexCorruptHoleBaseTest (GenIndexTest):
    """
    Recreate index from a file with a hole.
    """
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0
    CORRUPT     = corrupt_hole
    VOLUMES     = 1
    MISMATCHES  = 1

class GenIndexCorruptHoleTest (GenIndexCorruptHoleBaseTest):
    pass

class GenIndexCorruptHoleGZTest (GenIndexCorruptHoleBaseTest):
    COMPRESSION = "#gz"
    MISSING     = 2

class GenIndexCorruptHoleGZAESTest (GenIndexCorruptHoleBaseTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    MISSING     = 2


class GenIndexCorruptEntireHeaderBaseTest (GenIndexTest):
    """
    Recreate index from a file with defective headers.
    """
    COMPRESSION = None
    PASSWORD    = None
    FAILURES    = 0
    CORRUPT     = corrupt_entire_header
    VOLUMES     = 1
    MISMATCHES  = 1

class GenIndexCorruptEntireHeaderTest (GenIndexCorruptEntireHeaderBaseTest):
    pass

class GenIndexCorruptEntireHeaderGZTest (GenIndexCorruptEntireHeaderBaseTest):
    COMPRESSION = "#gz"
    MISSING     = 2

class GenIndexCorruptEntireHeaderGZAESTest (GenIndexCorruptEntireHeaderBaseTest):
    COMPRESSION = "#gz"
    PASSWORD    = TEST_PASSWORD
    MISSING     = 2
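

# Convenience entry point for running this module directly; the project's
# regular test runner discovers the classes above on its own, so this is
# only a small addition for ad-hoc use (assumption: no runner-specific
# setup is required beyond what BaseTest provides).
if __name__ == "__main__":
    unittest.main()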