# Copyright (C) 2013 Intra2net AG
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see
# <https://www.gnu.org/licenses/>.
#
# Author: Eduardo Robles Elvira

import errno
import os
import re
import random
import shutil
import logging
import binascii
import json
from datetime import datetime
from functools import partial
from unittest import skip, SkipTest

import deltatar.tarfile as tarfile
from deltatar.tarfile import TarFile
from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
import deltatar.crypto as crypto

from . import BaseTest
from . import new_volume_handler

# Set to True to enable warning messages from deltatar. This lowers the
# signal-to-noise ratio of test runs, as none of the messages are
# meaningful in any way.
VERBOSE_TEST_OUTPUT = False

class DeltaTarTest(BaseTest):
    """
    Test backups: create full and diff backups with DeltaTar and restore
    them again, with and without filtering.

    The class attributes below parametrize every test in this class.
    """
    # Mode string handed to DeltaTar(mode=...); the manual extraction tests
    # also append it to "r" when calling TarFile.open().
    MODE = ''
    # Tests consult this flag to skip themselves or to pick the expected
    # number of volumes.
    MODE_COMPRESSES = False

    ENCRYPTION = None # (password : str, paramversion : int) option

    # Directory copied as test input by tests that need a larger tree;
    # setUp() falls back to 'testing' if it does not exist.
    GIT_DIR = '.git'

    # Optional callable invoked at the end of setUp().
    FSTEST = None
    # (attribute name, value) pairs that tearDown() sets back on *os*.
    FSAPI_SAVED = []

    def setUp(self):
        '''
        Create base test data

        Removes leftovers of earlier runs, creates the ``source_dir`` tree
        with four files and records in ``self.hash`` the value returned by
        ``create_file`` for each of them (the tests later compare it with
        ``md5sum``). The empty string recorded for the directory
        ``source_dir/test/test2`` makes the tests check it for existence
        only.
        '''
        # Remember the working directory so tearDown() can return to it.
        self.pwd = os.getcwd()
        os.system('rm -rf target_dir source_dir* backup_dir* huge')
        os.makedirs('source_dir/test/test2')
        self.hash = dict()
        self.hash["source_dir/test/test2"] = ''
        self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
        self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
        self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
        self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)

        # Only pass a log handler to DeltaTar when verbose output is wanted.
        self.consoleLogger = None
        if VERBOSE_TEST_OUTPUT is True:
            self.consoleLogger = logging.StreamHandler()
            self.consoleLogger.setLevel(logging.DEBUG)

        if not os.path.isdir(self.GIT_DIR):
            # Not running inside git tree, take our
            # own testing directory as source.
            self.GIT_DIR = 'testing'
            if not os.path.isdir(self.GIT_DIR):
                raise Exception('No input directory found: ' + self.GIT_DIR)

        if self.FSTEST is not None:
            self.FSTEST ()

    def tearDown(self):
        '''
        Remove temporary files created by unit tests and restore the API
        functions in *os*.
        '''
        for att, val in self.FSAPI_SAVED:
            setattr (os, att, val)
        os.chdir(self.pwd)
        os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
        # Called without a size here; presumably this resets the object size
        # limit that some tests lower -- TODO confirm against crypto.py.
        _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
                ("I am fully aware that this will void my warranty.")

    def test_restore_simple_full_backup(self):
        '''
        Creates a full backup without any filtering and restores it.
''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") assert os.path.exists("backup_dir") shutil.rmtree("source_dir") tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) deltatar.restore_backup(target_path="source_dir", backup_tar_path=tar_path) for key, value in self.hash.items(): assert os.path.exists(key) if value: assert value == self.md5sum(key) def test_create_backup_max_file_length (self): """ Creates a full backup including one file that exceeds the (purposely lowered) upper bound on GCM encrypted objects. This will yield multiple encrypted objects for one plaintext file. Success is verified by splitting the archive at object boundaries and counting the parts. """ if self.MODE_COMPRESSES is True: raise SkipTest ("GCM file length test not meaningful with compression.") if self.ENCRYPTION is None: raise SkipTest ("GCM file length applies only to encrypted backups.") new_max = 20000 # cannot be less than tar block size crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \ ("I am fully aware that this will void my warranty.", new_max) password, paramversion = self.ENCRYPTION deltatar = DeltaTar (mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) self.hash = dict () os.makedirs ("source_dir2") for f, s in [("empty" , 0) # 1 tar objects ,("slightly_larger", new_max + 1) # 2 ,("twice" , 2 * new_max) # 3 ]: f = "source_dir2/%s" % f self.hash [f] = self.create_file (f, s) deltatar.create_full_backup \ (source_path="source_dir2", backup_path="backup_dir") assert os.path.exists ("backup_dir") shutil.rmtree ("source_dir2") backup_filename = deltatar.volume_name_func ("backup_dir", True, 0) backup_path = os.path.join("backup_dir", backup_filename) # 
split the resulting archive into its constituents without # decrypting ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - " "-o backup_dir/split <\'%s\'" % backup_path) assert os.path.exists ("backup_dir/split") dents = os.listdir ("backup_dir/split") assert len (dents) == 6 def test_restore_backup_max_file_length (self): """ Creates a full backup including one file that exceeds the (purposely lowered) upper bound on GCM encrypted objects. This will yield two encrypted objects for one plaintext file. Success is verified by splitting the archive at object boundaries and counting the parts. """ if self.MODE_COMPRESSES is True: raise SkipTest ("GCM file length test not meaningful with compression.") if self.ENCRYPTION is None: raise SkipTest ("GCM file length applies only to encrypted backups.") new_max = 20000 # cannot be less than tar block size crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \ ("I am fully aware that this will void my warranty.", new_max) password, paramversion = self.ENCRYPTION deltatar = DeltaTar (mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) self.hash = dict () os.makedirs ("source_dir2") for f, s in [("empty" , 0) # 1 tar objects ,("almost_large" , new_max - 1) # 2 ,("large" , new_max) # 3 ,("slightly_larger", new_max + 1) # 4 ,("twice" , 2 * new_max) # 5 ,("twice_plus_one" , (2 * new_max) + 1) # 6 ]: f = "source_dir2/%s" % f self.hash [f] = self.create_file (f, s) deltatar.create_full_backup \ (source_path="source_dir2", backup_path="backup_dir") assert os.path.exists ("backup_dir") shutil.rmtree ("source_dir2") backup_filename = deltatar.volume_name_func ("backup_dir", True, 0) backup_path = os.path.join("backup_dir", backup_filename) tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) deltatar.restore_backup(target_path="source_dir2", backup_tar_path=tar_path) for key, value in self.hash.items(): assert 
os.path.exists(key) if value: assert value == self.md5sum(key) def test_create_backup_index_max_file_length (self): """ Creates a full backup with a too large index file for the upper bound of the GCM encryption. Since the index file has a fixed IV file counter of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort. 60+ GB of (potentially compressed) index file should last for a while... """ if self.MODE_COMPRESSES is True: raise SkipTest ("GCM file length test not meaningful with compression.") if self.ENCRYPTION is None: raise SkipTest ("GCM file length applies only to encrypted backups.") new_max = 5000 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \ ("I am fully aware that this will void my warranty.", new_max) password, paramversion = self.ENCRYPTION deltatar = DeltaTar (mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) self.hash = dict () os.makedirs ("source_dir2") for i in range (42): f = "source_dir2/dummy_%rd" % i self.hash [f] = self.create_file (f, i) with self.assertRaises (crypto.InvalidFileCounter): deltatar.create_full_backup \ (source_path="source_dir2", backup_path="backup_dir") shutil.rmtree ("source_dir2") def test_check_index_checksum(self): ''' Creates a full backup and checks the index' checksum of files ''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") index_filename = deltatar.index_name_func(True) index_path = os.path.join("backup_dir", index_filename) f = open(index_path, 'rb') crc = None checked = False began_list = False while True: l = f.readline() if l == b'': break if b'BEGIN-FILE-LIST' in l: crc = binascii.crc32(l) & 0xFFFFffff began_list = True elif b'END-FILE-LIST' in l: crc = binascii.crc32(l, crc) & 0xffffffff # next line contains the crc data = 
json.loads(f.readline().decode("UTF-8")) assert data['type'] == 'file-list-checksum' assert data['checksum'] == crc checked = True break elif began_list: crc = binascii.crc32(l, crc) & 0xffffffff f.close() def test_restore_multivol(self): ''' Creates a full backup without any filtering with multiple volumes and restore it. ''' if ':gz' in self.MODE: raise SkipTest('compression information is lost when creating ' 'multiple volumes with no Stream') password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) self.hash = dict() os.makedirs('source_dir2') self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000) self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000) # create first backup deltatar.create_full_backup( source_path="source_dir2", backup_path="backup_dir", max_volume_size=1) assert os.path.exists("backup_dir") assert os.path.exists(os.path.join("backup_dir", deltatar.volume_name_func("backup_dir", True, 0))) if self.MODE_COMPRESSES: n_vols = 1 else: n_vols = 2 for i_vol in range(n_vols): assert os.path.exists(os.path.join("backup_dir", deltatar.volume_name_func("backup_dir", True, i_vol))) assert not os.path.exists(os.path.join("backup_dir", deltatar.volume_name_func("backup_dir", True, n_vols))) shutil.rmtree("source_dir2") tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) # this should automatically restore all volumes deltatar.restore_backup(target_path="source_dir2", backup_tar_path=tar_path) for key, value in self.hash.items(): assert os.path.exists(key) if value: assert value == self.md5sum(key) def test_restore_multivol_split(self): ''' Creates a full backup without any filtering with multiple volumes with big files bigger than the max volume size and restore it. 
'''
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                            'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # Every file is larger than the volume size requested below
        # (assuming create_file takes bytes and max_volume_size is in
        # MiB -- TODO confirm).
        self.hash = dict()
        os.makedirs('source_dir2')
        self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
        self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
        self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir",
            max_volume_size=2)

        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))
        if self.MODE_COMPRESSES:
            n_vols = 1
        else:
            n_vols = 6
        # Exactly n_vols volumes must exist: indices 0 .. n_vols - 1 are
        # present, index n_vols is not.
        for i_vol in range(n_vols):
            assert os.path.exists(os.path.join("backup_dir",
                deltatar.volume_name_func("backup_dir", True, i_vol)))
        assert not os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, n_vols)))

        shutil.rmtree("source_dir2")

        # Restore through the index file rather than the first tar volume.
        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        deltatar.restore_backup(target_path="source_dir2",
                                backup_indexes_paths=[index_path])

        for key, value in self.hash.items():
            assert os.path.exists(key)
            if value:
                assert value == self.md5sum(key)

    def test_full_backup_index_extra_data(self):
        '''
        Tests that the index file for a full backup can store extra_data and
        that this data can be retrieved.
'''
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # Mix of string, list and nested dict values (with int and float)
        # that has to come back unchanged from the index.
        extra_data = dict(
            hola="caracola",
            otra_cosa=[1, "lista"],
            y_otra=dict(bola=1.1)
        )

        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir",
            extra_data=extra_data)

        index_filename = deltatar.index_name_func(is_full=True)
        index_path = os.path.join("backup_dir", index_filename)

        # iterate_index_path retrieves extra_data, and thus we can then compare
        index_it = deltatar.iterate_index_path(index_path)
        self.assertEqual(index_it.extra_data, extra_data)

    def test_diff_backup_index_extra_data(self):
        '''
        Tests that the index file for a diff backup can store extra_data and
        that this data can be retrieved.

        A full backup without extra_data is created first; the diff backup
        against its index is the one that carries the extra_data.
        '''
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        extra_data = dict(
            hola="caracola",
            otra_cosa=[1, "lista"],
            y_otra=dict(bola=1.1)
        )

        # do first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        prev_index_filename = deltatar.index_name_func(is_full=True)
        prev_index_path = os.path.join("backup_dir", prev_index_filename)

        # create empty diff backup
        deltatar.create_diff_backup("source_dir", "backup_dir2",
                                    prev_index_path, extra_data=extra_data)

        # The diff backup went to backup_dir2, so its index lives there.
        index_filename = deltatar.index_name_func(is_full=False)
        index_path = os.path.join("backup_dir2", index_filename)

        # iterate_index_path retrieves extra_data, and thus we can then compare
        index_it = deltatar.iterate_index_path(index_path)
        self.assertEqual(index_it.extra_data, extra_data)

    def test_restore_multivol2(self):
        '''
        Creates a full backup without any filtering with multiple volumes and
        restore it.
'''
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # Use a copy of the input directory chosen in setUp() as payload.
        shutil.copytree(self.GIT_DIR, "source_dir2")

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir",
            max_volume_size=1)

        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        # this should automatically restore all volumes
        deltatar.restore_backup(target_path="source_dir2",
                                backup_tar_path=tar_path)

        self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)

    def test_restore_multivol_manual_from_index(self):
        '''
        Creates a full backup without any filtering with multiple volumes,
        looks up the offset of the file "huge" in the index and extracts just
        that member by hand with TarFile, using a volume handler that opens
        the follow-up volume.
        '''
        # this test only works for uncompressed or concat compressed modes
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                            'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        self.hash = dict()
        os.makedirs('source_dir2')
        self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
        self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir",
            max_volume_size=1)

        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))
        if self.MODE_COMPRESSES:
            n_vols = 1
        else:
            n_vols = 2
        # Exactly n_vols volumes must exist: indices 0 .. n_vols - 1 are
        # present, index n_vols is not.
        for i_vol in range(n_vols):
            assert os.path.exists(os.path.join("backup_dir",
                deltatar.volume_name_func("backup_dir", True, i_vol)))
        assert not os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, n_vols)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # this should automatically restore the huge file
        # Scan the index line by line for the "file" entry of "huge" and take
        # its offset into the first volume.
        # NOTE(review): *f* is never closed in this test.
        f = deltatar.open_auxiliary_file(index_path, 'r')
        offset = None
        while True:
            l = f.readline()
            if not len(l):
                break
            data = json.loads(l.decode('UTF-8'))
            if data.get('type', '') == 'file' and\
                    deltatar.unprefixed(data['path']) == "huge":
                offset = data['offset']
                break

        assert offset is not None

        # Position the raw volume at the member found above.
        fo = open(tar_path, 'rb')
        fo.seek(offset)
        # Shadows the module-level new_volume_handler import. The file name
        # opened is hard-coded to the "-002" volume of a backup stamped with
        # the current minute.
        def new_volume_handler(mode, tarobj, base_name, volume_number):
            suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
            if self.ENCRYPTION is not None:
                # deltatar module is shadowed here
                suf += "." + deltatar_PDTCRYPT_EXTENSION
            tarobj.open_volume(datetime.now().strftime(
                "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
        new_volume_handler = partial(new_volume_handler, self.MODE)

        crypto_ctx = None
        if self.ENCRYPTION is not None:
            crypto_ctx = crypto.Decrypt (password)

        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              new_volume_handler=new_volume_handler,
                              encryption=crypto_ctx)
        member = tarobj.next()
        # Strip the prefix from the member so that it is extracted as "huge"
        # into the current directory.
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)
        tarobj.extract(member)
        # NOTE(review): tarobj and fo stay open if anything above raises.
        tarobj.close()
        fo.close()
        assert self.hash['source_dir2/huge'] == self.md5sum('huge')

        os.unlink("huge")

    def test_restore_manual_from_index_twice (self):
        """
        Creates a full backup and restore the same file twice. This *must* fail
        when encryption is active.

        Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
        backwards within the same file. This prevents the encryption layer from
        exploding due to a reused IV in an overall valid archive.

        This test anticipates possible future mistakes since it’s entirely
        feasible to implement backward seeks for *_Stream* with concat mode.
        """
        # this test only works for uncompressed or concat compressed modes
        if self.MODE.startswith("|") or self.MODE_COMPRESSES:
            raise SkipTest("this test only works for uncompressed "
                            "or concat compressed modes")

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        self.hash = dict()
        os.makedirs("source_dir2")
        self.hash["source_dir2/samefile"] = \
            self.create_file("source_dir2/samefile", 1 * 1024)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # Find the offset of "samefile" in the index.
        # NOTE(review): *f* is never closed in this test.
        f = deltatar.open_auxiliary_file(index_path, "r")
        offset = None
        while True:
            l = f.readline()
            if not len(l):
                break
            data = json.loads(l.decode("UTF-8"))
            if data.get("type", "") == "file" and\
                    deltatar.unprefixed(data["path"]) == "samefile":
                offset = data["offset"]
                break

        assert offset is not None

        fo = open(tar_path, "rb")
        fo.seek(offset)

        crypto_ctx = None
        if self.ENCRYPTION is not None:
            crypto_ctx = crypto.Decrypt (password)

        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              encryption=crypto_ctx)
        member = tarobj.next()
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)

        # extract once …
        tarobj.extract(member)
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        # … and twice
        # A StreamError is tolerated only with encryption; without it the
        # error is re-raised. A second extraction that succeeds is accepted
        # in either case.
        try:
            tarobj.extract(member)
        except tarfile.StreamError:
            if crypto_ctx is not None:
                pass # good: seeking backwards not allowed
            else:
                raise
        tarobj.close()
        fo.close()
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        os.unlink("samefile")

    def test_restore_from_index(self):
        '''
        Restores a full backup using an index file.
        '''
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                            'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        shutil.rmtree("source_dir")

        # this should automatically restore all volumes
        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_indexes_paths=[index_path])

        for key, value in self.hash.items():
            assert os.path.exists(key)
            if value:
                assert value == self.md5sum(key)

    def test_restore_multivol_from_index(self):
        '''
        Restores a full multivolume backup using an index file.
'''
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                            'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir",
            max_volume_size=2)

        shutil.rmtree("source_dir")

        # this should automatically restore all volumes
        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_indexes_paths=[index_path])

        for key, value in self.hash.items():
            assert os.path.exists(key)
            if value:
                assert value == self.md5sum(key)

    def test_create_basic_filtering(self):
        '''
        Tests create backup basic filtering.

        The include and exclude lists are applied while creating the backup;
        the restore itself uses the same DeltaTar object.
        '''
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger,
                            included_files=["test", "small"],
                            excluded_files=["test/huge"])

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

        # Included paths are back, the excluded file and the file that was
        # never included are not.
        assert os.path.exists("source_dir/small")
        assert os.path.exists("source_dir/test")
        assert os.path.exists("source_dir/test/huge2")
        assert os.path.exists("source_dir/test/test2")
        assert not os.path.exists("source_dir/test/huge")
        assert not os.path.exists("source_dir/big")

    def test_create_filter_func(self):
        '''
        Tests create backup filtering with a filter function that accepts
        every path it is asked about and records it. The recorded paths must
        be exactly those that pass the include and exclude lists.
        '''
        visited_paths = []
        def filter_func(visited_paths, path):
            if path not in visited_paths:
                visited_paths.append(path)
            return True

        # Bind the recording list so the callable takes the path only.
        filter_func = partial(filter_func, visited_paths)
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger,
                            included_files=["test", "small"],
                            excluded_files=["test/huge"],
                            filter_func=filter_func)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)
        assert set(visited_paths) == set([
            'small',
            'test',
            'test/huge2',
            'test/test2'
        ])

    def test_create_filter_out_func(self):
        '''
        Tests create backup filtering with a filter function that rejects
        every path it is asked about. Nothing may end up in the backup, and
        only the top-level included paths are expected to reach the function.
        '''
        visited_paths = []
        def filter_func(visited_paths, path):
            '''
            Filter out everything
            '''
            if path not in visited_paths:
                visited_paths.append(path)
            return False

        # Bind the recording list so the callable takes the path only.
        filter_func = partial(filter_func, visited_paths)
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger,
                            included_files=["test", "small"],
                            excluded_files=["test/huge"],
                            filter_func=filter_func)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)
        assert set(visited_paths) == set([
            'small',
            'test'
        ])

        # check that effectively no file was backed up
        assert not os.path.exists("source_dir/small")
        assert not os.path.exists("source_dir/big")
        assert not os.path.exists("source_dir/test")

    def test_restore_index_basic_filtering(self):
        '''
        Creates a backup, and then filter when doing the index based restore.
        '''
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                            'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # The filters are set only after the unfiltered backup was created,
        # so they affect just the restore.
        deltatar.included_files = ["test", "small"]
        deltatar.excluded_files = ["test/huge"]
        deltatar.restore_backup(target_path="source_dir",
                                backup_indexes_paths=[index_path])

        assert os.path.exists("source_dir/small")
        assert os.path.exists("source_dir/test")
        assert os.path.exists("source_dir/test/huge2")
        assert os.path.exists("source_dir/test/test2")
        assert not os.path.exists("source_dir/test/huge")
        assert not os.path.exists("source_dir/big")

    def test_restore_index_filter_func(self):
        '''
        Creates a backup, and then filter when doing the index based restore,
        using the filter function.
'''
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                            'or concat compressed modes')

        visited_paths = []
        # Accepts every path and records each one once.
        def filter_func(visited_paths, path):
            if path not in visited_paths:
                visited_paths.append(path)
            return True
        # Bind the recording list so the callable takes the path only.
        filter_func = partial(filter_func, visited_paths)

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # The filters are set only after the unfiltered backup was created,
        # so they affect just the restore.
        deltatar.included_files = ["test", "small"]
        deltatar.excluded_files = ["test/huge"]
        deltatar.filter_func = filter_func
        deltatar.restore_backup(target_path="source_dir",
                                backup_indexes_paths=[index_path])

        assert set(visited_paths) == set([
            'small',
            'test',
            'test/huge2',
            'test/test2'
        ])

    def test_restore_tar_basic_filtering(self):
        '''
        Creates a backup, and then filter when doing the tar based restore.
'''
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        # The filters are set only after the unfiltered backup was created,
        # so they affect just the restore.
        deltatar.included_files = ["test", "small"]
        deltatar.excluded_files = ["test/huge"]

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

        # Included paths are back, the excluded file and the file that was
        # never included are not.
        assert os.path.exists("source_dir/small")
        assert os.path.exists("source_dir/test")
        assert os.path.exists("source_dir/test/huge2")
        assert os.path.exists("source_dir/test/test2")
        assert not os.path.exists("source_dir/test/huge")
        assert not os.path.exists("source_dir/big")

    def test_restore_tar_filter_func(self):
        '''
        Creates a backup, and then filter when doing the tar based restore,
        using the filter function.
'''
        visited_paths = []
        # Accepts every path and records each one once.
        def filter_func(visited_paths, path):
            if path not in visited_paths:
                visited_paths.append(path)
            return True
        # Bind the recording list so the callable takes the path only.
        filter_func = partial(filter_func, visited_paths)

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        # NOTE(review): index_path is computed but not used; the restore
        # below goes through the tar volume.
        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        deltatar.included_files = ["test", "small"]
        deltatar.excluded_files = ["test/huge"]
        deltatar.filter_func = filter_func

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

        assert set(visited_paths) == set([
            'small',
            'test',
            'test/huge2',
            'test/test2'
        ])

    def test_filter_path_regexp(self):
        '''
        Test specifically the deltatar.filter_path function with regular
        expressions

        The include list mixes compiled patterns with a plain string; the
        exclude list holds a single compiled pattern.
        '''
        included_files = [
            re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
            re.compile('^yes$'),
            'testing'
        ]
        excluded_files = [
            re.compile('^testing/in_the'),
        ]
        deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
                            excluded_files=excluded_files)

        # assert valid and invalid paths
        assert deltatar.filter_path('test/hola')
        assert deltatar.filter_path('test/hola/any/thing')
        assert deltatar.filter_path('test/caracola/caracolero')
        assert deltatar.filter_path('test/caracola/caracolero/yeah')
        assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
        assert deltatar.filter_path('yes')
        assert deltatar.filter_path('testing')
        assert deltatar.filter_path('testing/yes')
        # One character short of the excluded prefix, hence still included.
        assert deltatar.filter_path('testing/in_th')

        assert not deltatar.filter_path('something')
        assert not deltatar.filter_path('other/thing')
        assert not deltatar.filter_path('test_ing')
        assert not deltatar.filter_path('test/hola_lala')
        assert not deltatar.filter_path('test/agur')
        assert not deltatar.filter_path('testing_something')
        assert not deltatar.filter_path('yeso')
        assert not deltatar.filter_path('yes/o')
        assert not deltatar.filter_path('yes_o')
        assert not deltatar.filter_path('testing/in_the')
        assert not deltatar.filter_path('testing/in_the_field')
        assert not deltatar.filter_path('testing/in_the/field')

    def test_filter_path_parent(self):
        '''
        Test specifically the deltatar.filter_path function for parent matching

        Directories leading to an included path must yield PARENT_MATCH, the
        included path and anything below it MATCH, and unrelated paths
        NO_MATCH.
        '''
        included_files = [
            'testing/path/to/some/thing'
        ]
        deltatar = DeltaTar(mode=self.MODE, included_files=included_files)

        # assert valid and invalid paths
        assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
        assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
        assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
        assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
        assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
        assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
        assert deltatar.filter_path('testing/something/else') == NO_MATCH

    def test_parent_matching_simple_full_backup(self):
        '''
        Create a full backup using parent matching

        Only 'test/huge2' is included when creating the backup; the restore
        is done unfiltered with a second DeltaTar object.
        '''
        included_files = [
            'test/huge2'
        ]
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger,
                            included_files=included_files)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        # Fresh object without include list (and without
        # crypto_paramversion) for the restore.
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger)
        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

        assert os.path.exists('source_dir/test/huge2')
        assert os.path.exists('source_dir/test/')
        assert not os.path.exists('source_dir/test/huge')
        assert not os.path.exists('source_dir/big')
        assert not os.path.exists('source_dir/small')

    def test_parent_matching_simple_full_backup_restore(self):
        '''
        Create a full backup and restores it using parent matching

        The backup is unfiltered; only 'test/huge2' is included when
        restoring from the tar volume.
        '''
        included_files = [
            'test/huge2'
        ]
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        # Fresh object carrying the include list for the restore.
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger,
                            included_files=included_files)
        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

        assert os.path.exists('source_dir/test/huge2')
        assert os.path.exists('source_dir/test/')
        assert not os.path.exists('source_dir/test/huge')
        assert not os.path.exists('source_dir/big')
        assert not os.path.exists('source_dir/small')

    def test_parent_matching_index_full_backup_restore(self):
        '''
        Create a full backup and restores it using parent matching
        '''
        # NOTE(review): despite "index" in its name this test restores via
        # backup_tar_path, which makes it identical to
        # test_parent_matching_simple_full_backup_restore -- confirm whether
        # an index based restore was intended.
        included_files = [
            'test/huge2'
        ]
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        shutil.rmtree("source_dir")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        deltatar = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger,
                            included_files=included_files)
        deltatar.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

        assert os.path.exists('source_dir/test/huge2')
        assert os.path.exists('source_dir/test/')
        assert not os.path.exists('source_dir/test/huge')
        assert not os.path.exists('source_dir/big')
        assert not os.path.exists('source_dir/small')

    def test_collate_iterators(self):
        '''
        Tests the collate iterators functionality with two exact directories,
        using an index iterator from a backup and the exact same source dir.
        '''
        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")

        # Absolute index path, because the working directory changes below.
        cwd = os.getcwd()
        index_filename = deltatar.index_name_func(is_full=True)
        index_path = os.path.join(cwd, "backup_dir", index_filename)
        index_it = deltatar.iterate_index_path(index_path)

        # Walk the source tree relative to itself.
        os.chdir('source_dir')
        dir_it = deltatar._recursive_walk_dir('.')
        path_it = deltatar.jsonize_path_iterator(dir_it)

        try:
            for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
                assert deltatar._equal_stat_dicts(path1, path2)
        finally:
            # Return to the original directory even if an assertion fails.
            os.chdir(cwd)

    def test_collate_iterators_diffdirs(self):
        '''
        Use the collate iterators functionality with two different directories.
        It must behave in an expected way.
''' self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100) password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") assert os.path.exists("backup_dir") self.hash["source_dir/z"] = self.create_file("source_dir/z", 100) cwd = os.getcwd() index_filename = deltatar.index_name_func(is_full=True) index_path = os.path.join(cwd, "backup_dir", index_filename) index_it = deltatar.iterate_index_path(index_path) os.chdir('source_dir') dir_it = deltatar._recursive_walk_dir('.') path_it = deltatar.jsonize_path_iterator(dir_it) try: for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it): if path2['path'] == 'z': assert not path1 else: assert deltatar._equal_stat_dicts(path1, path2) finally: os.chdir(cwd) def test_collate_iterators_diffdirs2(self): ''' Use the collate iterators functionality with two different directories. It must behave in an expected way. 
''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") assert os.path.exists("backup_dir") # add some new files and directories os.makedirs('source_dir/bigdir') self.hash["source_dir/bigdir"] = "" self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100) self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500) self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100) cwd = os.getcwd() index_filename = deltatar.index_name_func(is_full=True) index_path = os.path.join(cwd, "backup_dir", index_filename) index_it = deltatar.iterate_index_path(index_path) os.chdir('source_dir') dir_it = deltatar._recursive_walk_dir('.') path_it = deltatar.jsonize_path_iterator(dir_it) visited_pairs = [] try: for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it): visited_pairs.append( (deltatar.unprefixed(path1['path']) if path1 else None, path2['path'] if path2 else None) ) finally: assert visited_pairs == [ (u'big', u'big'), (None, u'bigdir'), (u'small', u'small'), (u'test', u'test'), (None, u'zzzz'), (None, u'bigdir/a'), (None, u'bigdir/b'), (u'test/huge', u'test/huge'), (u'test/huge2', u'test/huge2'), (u'test/test2', u'test/test2'), ] os.chdir(cwd) def test_create_empty_diff_backup(self): ''' Creates an empty (no changes) backup diff ''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") prev_index_filename = deltatar.index_name_func(is_full=True) prev_index_path = os.path.join("backup_dir", prev_index_filename) deltatar.create_diff_backup("source_dir", "backup_dir2", 
prev_index_path) # check index items index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False)) index_it = deltatar.iterate_index_path(index_path) n = 0 for i in index_it: n += 1 assert i[0]['path'].startswith("list://") assert n == 6 # check the tar file assert os.path.exists("backup_dir2") shutil.rmtree("source_dir") tar_filename = deltatar.volume_name_func('backup_dir2', is_full=False, volume_number=0) tar_path = os.path.join("backup_dir2", tar_filename) # no file restored, because the diff was empty deltatar.restore_backup(target_path="source_dir", backup_tar_path=tar_path) assert len(os.listdir("source_dir")) == 0 def test_create_diff_backup1(self): ''' Creates a diff backup when there are new files ''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") prev_index_filename = deltatar.index_name_func(is_full=True) prev_index_path = os.path.join("backup_dir", prev_index_filename) # add some new files and directories os.makedirs('source_dir/bigdir') self.hash["source_dir/bigdir"] = "" os.unlink("source_dir/small") self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100) self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500) self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100) deltatar.create_diff_backup("source_dir", "backup_dir2", prev_index_path) # check index items index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False)) index_it = deltatar.iterate_index_path(index_path) l = [i[0]['path'] for i in index_it] assert l == [ 'list://big', 'snapshot://bigdir', 'delete://small', 'list://test', 'snapshot://zzzz', 'snapshot://bigdir/a', 'snapshot://bigdir/b', 'list://test/huge', 'list://test/huge2', 'list://test/test2', ] # check the 
tar file assert os.path.exists("backup_dir2") shutil.rmtree("source_dir") # create source_dir with the small file, that will be then deleted by # the restore_backup os.mkdir("source_dir") open("source_dir/small", 'wb').close() tar_filename = deltatar.volume_name_func('backup_dir2', is_full=False, volume_number=0) tar_path = os.path.join("backup_dir2", tar_filename) # restore the backup, this will create only the new files deltatar.restore_backup(target_path="source_dir", backup_tar_path=tar_path) # the order doesn't matter assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir']) def test_restore_from_index_diff_backup(self): ''' Creates a full backup, modifies some files, creates a diff backup, then restores the diff backup from zero. ''' if self.MODE.startswith(':') or self.MODE.startswith('|'): raise SkipTest('this test only works for uncompressed ' 'or concat compressed modes') password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") prev_index_filename = deltatar.index_name_func(is_full=True) prev_index_path = os.path.join("backup_dir", prev_index_filename) # add some new files and directories os.makedirs('source_dir/bigdir') self.hash["source_dir/bigdir"] = "" os.unlink("source_dir/small") self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100) self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500) self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100) deltatar.create_diff_backup("source_dir", "backup_dir2", prev_index_path) # apply diff backup in target_dir index_filename = deltatar.index_name_func(is_full=False) index_path = os.path.join("backup_dir2", index_filename) deltatar.restore_backup("target_dir", backup_indexes_paths=[index_path, prev_index_path]) # then compare 
the two directories source_dir and target_dir and check # they are the same self.check_equal_dirs('source_dir', 'target_dir', deltatar) def test_restore_from_index_diff_backup2(self): ''' Creates a full backup, modifies some files, creates a diff backup, then restores the diff backup with the full backup as a starting point. ''' if self.MODE.startswith(':') or self.MODE.startswith('|'): raise SkipTest('this test only works for uncompressed ' 'or concat compressed modes') password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") prev_index_filename = deltatar.index_name_func(is_full=True) prev_index_path = os.path.join("backup_dir", prev_index_filename) # add some new files and directories os.makedirs('source_dir/bigdir') self.hash["source_dir/bigdir"] = "" os.unlink("source_dir/small") self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100) self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500) self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100) shutil.rmtree("source_dir/test") deltatar.create_diff_backup("source_dir", "backup_dir2", prev_index_path) # first restore initial backup in target_dir tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0) tar_path = os.path.join("backup_dir", tar_filename) deltatar.restore_backup("target_dir", backup_tar_path=tar_path) # then apply diff backup in target_dir index_filename = deltatar.index_name_func(is_full=False) index_path = os.path.join("backup_dir2", index_filename) try: deltatar.restore_backup("target_dir", backup_indexes_paths=[index_path, prev_index_path]) # then compare the two directories source_dir and target_dir and check # they are the same self.check_equal_dirs('source_dir', 'target_dir', deltatar) 
except FileNotFoundError as exn: if self.FSTEST is None: # fs traversal may fail here raise exn def test_restore_from_index_diff_backup3(self): ''' Creates a full backup of self.GIT_DIR, modifies some random files, creates a diff backup, then restores the diff backup with the full backup as a starting point. ''' if self.MODE.startswith(':') or self.MODE.startswith('|'): raise SkipTest('this test only works for uncompressed ' 'or concat compressed modes') password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) shutil.rmtree("source_dir") shutil.copytree(self.GIT_DIR, "source_dir") shutil.copytree(self.GIT_DIR, "source_dir_diff") # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir") prev_index_filename = deltatar.index_name_func(is_full=True) prev_index_path = os.path.join("backup_dir", prev_index_filename) # alter the source_dir randomly source_it = deltatar._recursive_walk_dir('source_dir_diff') for path in source_it: # if path doesn't exist (might have previously removed) ignore it. # also ignore it (i.e. 
do not change it) 70% of the time if not os.path.exists(path) or random.random() < 0.7: continue # remove the file if os.path.isdir(path): shutil.rmtree(path) else: os.unlink(path) deltatar.create_diff_backup("source_dir_diff", "backup_dir2", prev_index_path) # first restore initial backup in target_dir tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0) tar_path = os.path.join("backup_dir", tar_filename) deltatar.restore_backup("target_dir", backup_tar_path=tar_path) # and check that target_dir equals to source_dir (which is the same as # self.GIT_DIR initially) self.check_equal_dirs('source_dir', 'target_dir', deltatar) # then apply diff backup in target_dir index_filename = deltatar.index_name_func(is_full=False) index_path = os.path.join("backup_dir2", index_filename) deltatar.restore_backup("target_dir", backup_indexes_paths=[index_path, prev_index_path]) # and check that target_dir equals to source_dir_diff (the randomly # altered self.GIT_DIR directory) self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar) # then delete target_dir and apply diff backup from zero and check again shutil.rmtree("target_dir") deltatar.restore_backup("target_dir", backup_indexes_paths=[index_path, prev_index_path]) # and check that target_dir equals to source_dir_diff (the randomly # altered self.GIT_DIR directory) self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar) def test_restore_from_index_diff_backup3_multivol(self): ''' Creates a full backup of self.GIT_DIR, modifies some random files, creates a diff backup, then restores the diff backup with the full backup as a starting point. 
''' if self.MODE.startswith(':') or self.MODE.startswith('|'): raise SkipTest('this test only works for uncompressed ' 'or concat compressed modes') password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) shutil.rmtree("source_dir") shutil.copytree(self.GIT_DIR, "source_dir") shutil.copytree(self.GIT_DIR, "source_dir_diff") # create first backup deltatar.create_full_backup( source_path="source_dir", backup_path="backup_dir", max_volume_size=1) prev_index_filename = deltatar.index_name_func(is_full=True) prev_index_path = os.path.join("backup_dir", prev_index_filename) # alter the source_dir randomly source_it = deltatar._recursive_walk_dir('source_dir_diff') for path in source_it: # if path doesn't exist (might have previously removed) ignore it. # also ignore it (i.e. do not change it) 70% of the time if not os.path.exists(path) or random.random() < 0.7: continue # remove the file if os.path.isdir(path): shutil.rmtree(path) else: os.unlink(path) deltatar.create_diff_backup("source_dir_diff", "backup_dir2", prev_index_path, max_volume_size=1) # first restore initial backup in target_dir tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) if self.FSTEST is not None: return # the below will fail in stat checks, but that is expected deltatar.restore_backup("target_dir", backup_tar_path=tar_path) # and check that target_dir equals to source_dir (which is the same as # self.GIT_DIR initially) self.check_equal_dirs('source_dir', 'target_dir', deltatar) # then apply diff backup in target_dir index_filename = deltatar.index_name_func(is_full=False) index_path = os.path.join("backup_dir2", index_filename) deltatar.restore_backup("target_dir", backup_indexes_paths=[index_path, prev_index_path]) # and check that target_dir equals to source_dir_diff (the randomly # altered self.GIT_DIR directory) 
self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar) # then delete target_dir and apply diff backup from zero and check again shutil.rmtree("target_dir") deltatar.restore_backup("target_dir", backup_indexes_paths=[index_path, prev_index_path]) # and check that target_dir equals to source_dir_diff (the randomly # altered self.GIT_DIR directory) self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar) def check_equal_dirs(self, path1, path2, deltatar): ''' compare the two directories source_dir and target_dir and check # they are the same ''' source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True) source_it = deltatar.jsonize_path_iterator(source_it, strip=1) target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True) target_it = deltatar.jsonize_path_iterator(target_it, strip=1) while True: try: sitem = next(source_it) titem = next(target_it) except StopIteration: try: titem = next(target_it) raise Exception("iterators do not stop at the same time") except StopIteration: break try: assert deltatar._equal_stat_dicts(sitem[0], titem[0]) except Exception as e: print("SITEM: " + str(sitem)) print("TITEM: " + str(titem)) raise e def test_create_no_symlinks(self): ''' Creates a full backup from different varieties of symlinks. 
The extracted archive may not contain any symlinks but the file contents ''' os.system("rm -rf source_dir") os.makedirs("source_dir/symlinks") fd = os.open("source_dir/symlinks/valid_linkname", os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) os.write(fd, b"valid link target for symlink tests; please ignore\n") os.close(fd) # first one is good, the rest points nowhere self.create_symlink("valid_linkname", "source_dir/symlinks/whatever") self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy") self.create_symlink("burp/../buzz", "source_dir/symlinks/blup") self.create_symlink("../../../../biz", "source_dir/symlinks/bleep") password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup(source_path="source_dir", backup_path="backup_dir") assert os.path.exists("backup_dir") shutil.rmtree("source_dir") assert not os.path.exists("source_dir") tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) deltatar.restore_backup(target_path="source_dir", backup_tar_path=tar_path) for _r, _ds, fs in os.walk("source_dir/symlinks"): # only the valid link plus the linked file may be found in the # extracted archive assert len(fs) == 2 for f in fs: # the link must have been resolved and file contents must match # the linked file assert not os.path.islink(f) with open("source_dir/symlinks/valid_linkname") as a: with open("source_dir/symlinks/whatever") as b: assert a.read() == b.read() def test_restore_with_symlinks(self): ''' Creates a full backup containing different varieties of symlinks. All of them must be filtered out. 
''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup(source_path="source_dir", backup_path="backup_dir") assert os.path.exists("backup_dir") shutil.rmtree("source_dir") tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) # add symlinks to existing archive def add_symlink (a, name, dst): l = tarfile.TarInfo("snapshot://%s" % name) l.type = tarfile.SYMTYPE l.linkname = dst a.addfile(l) return name try: with tarfile.open(tar_path,mode="a") as a: checkme = \ [ add_symlink(a, "symlinks/foo", "internal-file") , add_symlink(a, "symlinks/bar", "/absolute/path") , add_symlink(a, "symlinks/baz", "../parent/../../paths") ] except tarfile.ReadError as e: if self.MODE == '#' or self.MODE.endswith ("gz"): checkme = [] else: raise except ValueError as e: if self.MODE.startswith ('#'): checkme = [] else: raise deltatar.restore_backup(target_path="source_dir", backup_tar_path=tar_path) # check what happened to our symlinks for name in checkme: fullpath = os.path.join("source_dir", name) assert not os.path.exists(fullpath) def test_restore_malicious_symlinks(self): ''' Creates a full backup containing a symlink and a file of the same name. This simulates a symlink attack with a link pointing to some external path that is abused to write outside the extraction prefix. 
''' password, paramversion = self.ENCRYPTION or (None, None) deltatar = DeltaTar(mode=self.MODE, password=password, crypto_paramversion=paramversion, logger=self.consoleLogger) # create first backup deltatar.create_full_backup(source_path="source_dir", backup_path="backup_dir") assert os.path.exists("backup_dir") shutil.rmtree("source_dir") tar_filename = deltatar.volume_name_func('backup_dir', True, 0) tar_path = os.path.join("backup_dir", tar_filename) # add symlinks to existing archive def add_symlink (a, name, dst): l = tarfile.TarInfo("snapshot://%s" % name) l.type = tarfile.SYMTYPE l.linkname = dst a.addfile(l) def add_file (a, name): f = tarfile.TarInfo("snapshot://%s" % name) f.type = tarfile.REGTYPE a.addfile(f) testpath = "symlinks/pernicious-link" testdst = "/tmp/does/not/exist" try: with tarfile.open(tar_path, mode="a") as a: add_symlink(a, testpath, testdst) add_symlink(a, testpath, testdst+"X") add_symlink(a, testpath, testdst+"XXX") add_file(a, testpath) except tarfile.ReadError as e: if self.MODE == '#' or self.MODE.endswith ("gz"): pass else: raise except ValueError as e: if self.MODE.startswith ('#'): pass # O_APPEND of concat archives not feasible else: raise deltatar.restore_backup(target_path="source_dir", backup_tar_path=tar_path) # check whether the link was extracted; deltatar seems to only ever # retrieve the first item it finds for a given path which in the case # at hand is a symlink to some non-existent path fullpath = os.path.join("source_dir", testpath) assert not os.path.lexists(fullpath) class TarfileTest(BaseTest): pwd = None def setUp(self): self.pwd = os.getcwd() os.makedirs("backup_dir", exist_ok=True) def tearDown(self): ''' Remove temporary files created by unit tests and restore the API functions in *os*. ''' os.chdir(self.pwd) shutil.rmtree("backup_dir") def test_extract_malicious_symlinks_unlink(self): ''' Test symlink mitigation: The destination must be deleted prior to extraction. 
''' tar_path = os.path.join("backup_dir", "malicious-archive") # add symlinks to existing archive def add_symlink (a, name, dst): l = tarfile.TarInfo(name) l.type = tarfile.SYMTYPE l.linkname = dst a.addfile(l) def add_file (a, name): f = tarfile.TarInfo(name) f.type = tarfile.REGTYPE a.addfile(f) # Add a symlink pointing to must-not-exist, then append a file # object at the same path. The file must not end up at # “must-not-exist” (the pointee) but at “not-as-symlink” (the # pointer) that was unlinked prior to extraction. testpath = "test/not-a-symlink" testdst = "must-not-exist" try: with tarfile.open(tar_path, mode="w") as a: add_symlink(a, testpath, testdst) add_file(a, testpath) except tarfile.ReadError as e: if self.MODE == '#' or self.MODE.endswith ("gz"): pass else: raise except ValueError as e: if self.MODE.startswith ('#'): pass # O_APPEND of concat archives not feasible else: raise def test_extract(dst, unlink): with tarfile.open(tar_path, mode="r") as a: os.makedirs(dst, exist_ok=True) olddir = os.getcwd() try: os.chdir(dst) a.extractall(unlink=unlink) finally: os.chdir(olddir) fullpath = os.path.join(dst, testpath) fulldst = os.path.join(dst, "test/%s" % testdst) if unlink is True: # Check whether the file was extracted. The object at the # symlink location (source) must be the file. The must not # be an object at the symlink destination. assert not os.path.islink(fullpath) assert not os.path.exists(fulldst) else: # Without unlink protection, the file must be found at the # symlink destination with the symlink intact. assert os.path.islink(fullpath) assert os.path.exists(fulldst) test_extract("test_dst_unlinked" , True) test_extract("test_dst_symlinked", False) def fsapi_access_true (self): """ Chicanery for testing improper use of the *os* module. 
""" def yes (*_a, **_ka): return True self.FSAPI_SAVED.append (("access", getattr (os, "access"))) setattr (os, "access", yes) class DeltaTar2Test(DeltaTarTest): ''' Same as DeltaTar but with specific ":" mode ''' MODE = ':' class DeltaTarStreamTest(DeltaTarTest): ''' Same as DeltaTar but with specific uncompressed stream mode ''' MODE = '|' class DeltaTarGzipTest(DeltaTarTest): ''' Same as DeltaTar but with specific gzip mode ''' MODE = ':gz' MODE_COMPRESSES = True class DeltaTarGzipStreamTest(DeltaTarTest): ''' Same as DeltaTar but with specific gzip stream mode ''' MODE = '|gz' MODE_COMPRESSES = True @skip('Bz2 tests are too slow..') class DeltaTarBz2Test(DeltaTarTest): ''' Same as DeltaTar but with specific bz2 mode ''' MODE = ':bz2' MODE_COMPRESSES = True @skip('Bz2 tests are too slow..') class DeltaTarBz2StreamTest(DeltaTarTest): ''' Same as DeltaTar but with specific bz2 stream mode ''' MODE = '|bz2' MODE_COMPRESSES = True class DeltaTarGzipConcatTest(DeltaTarTest): ''' Same as DeltaTar but with specific gzip concat stream mode ''' MODE = '#gz' MODE_COMPRESSES = True class DeltaTarGzipAes128ConcatTest(DeltaTarTest): ''' Same as DeltaTar but with specific gzip aes128 concat stream mode ''' MODE = '#gz' ENCRYPTION = ('some magic key', 1) MODE_COMPRESSES = True class DeltaTarAes128ConcatTest(DeltaTarTest): ''' Same as DeltaTar but with specific aes128 concat stream mode ''' MODE = '#' ENCRYPTION = ('some magic key', 1) class DeltaTarFilesystemHandlingTestBase(BaseTest): ''' Mess with filesystem APIs. ''' FSTEST = fsapi_access_true class DeltaTarFSGzipTest(DeltaTarFilesystemHandlingTestBase, DeltaTarGzipTest): pass class DeltaTarFSGzipConcatTest(DeltaTarFilesystemHandlingTestBase, DeltaTarGzipConcatTest): pass class DeltaTarFSAes128ConcatTest(DeltaTarFilesystemHandlingTestBase, DeltaTarAes128ConcatTest): pass class DeltaTarFSGzipAes128ConcatTest(DeltaTarFilesystemHandlingTestBase, DeltaTarGzipAes128ConcatTest): pass