break
data = json.loads(l.decode('UTF-8'))
if data.get('type', '') == 'file' and\
- deltatar.unprefixed(data['path']) == "./huge":
+ deltatar.unprefixed(data['path']) == "huge":
offset = data['offset']
break
'''
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger,
- included_files=["/test", "/small"],
- excluded_files=["/test/huge"])
+ included_files=["test", "small"],
+ excluded_files=["test/huge"])
# create first backup
deltatar.create_full_backup(
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
- assert os.path.exists("./source_dir/small")
- assert os.path.exists("./source_dir/test")
- assert os.path.exists("./source_dir/test/huge2")
- assert os.path.exists("./source_dir/test/test2")
+ assert os.path.exists("source_dir/small")
+ assert os.path.exists("source_dir/test")
+ assert os.path.exists("source_dir/test/huge2")
+ assert os.path.exists("source_dir/test/test2")
- assert not os.path.exists("./source_dir/test/huge")
- assert not os.path.exists("./source_dir/big")
+ assert not os.path.exists("source_dir/test/huge")
+ assert not os.path.exists("source_dir/big")
def test_create_filter_func(self):
'''
filter_func = partial(filter_func, visited_paths)
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger,
- included_files=["/test", "/small"],
- excluded_files=["/test/huge"],
+ included_files=["test", "small"],
+ excluded_files=["test/huge"],
filter_func=filter_func)
# create first backup
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
assert set(visited_paths) == set([
- '/small',
- '/test',
- '/test/huge2',
- '/test/test2'
+ 'small',
+ 'test',
+ 'test/huge2',
+ 'test/test2'
])
def test_create_filter_out_func(self):
filter_func = partial(filter_func, visited_paths)
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger,
- included_files=["/test", "/small"],
- excluded_files=["/test/huge"],
+ included_files=["test", "small"],
+ excluded_files=["test/huge"],
filter_func=filter_func)
# create first backup
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
assert set(visited_paths) == set([
- '/small',
- '/test'
+ 'small',
+ 'test'
])
# check that effectively no file was backed up
- assert not os.path.exists("./source_dir/small")
- assert not os.path.exists("./source_dir/big")
- assert not os.path.exists("./source_dir/test")
+ assert not os.path.exists("source_dir/small")
+ assert not os.path.exists("source_dir/big")
+ assert not os.path.exists("source_dir/test")
def test_restore_index_basic_filtering(self):
'''
index_filename = deltatar.index_name_func(True)
index_path = os.path.join("backup_dir", index_filename)
- deltatar.included_files = ["/test", "/small"]
- deltatar.excluded_files = ["/test/huge"]
+ deltatar.included_files = ["test", "small"]
+ deltatar.excluded_files = ["test/huge"]
deltatar.restore_backup(target_path="source_dir",
backup_indexes_paths=[index_path])
- assert os.path.exists("./source_dir/small")
- assert os.path.exists("./source_dir/test")
- assert os.path.exists("./source_dir/test/huge2")
- assert os.path.exists("./source_dir/test/test2")
+ assert os.path.exists("source_dir/small")
+ assert os.path.exists("source_dir/test")
+ assert os.path.exists("source_dir/test/huge2")
+ assert os.path.exists("source_dir/test/test2")
- assert not os.path.exists("./source_dir/test/huge")
- assert not os.path.exists("./source_dir/big")
+ assert not os.path.exists("source_dir/test/huge")
+ assert not os.path.exists("source_dir/big")
def test_restore_index_filter_func(self):
'''
index_filename = deltatar.index_name_func(True)
index_path = os.path.join("backup_dir", index_filename)
- deltatar.included_files = ["/test", "/small"]
- deltatar.excluded_files = ["/test/huge"]
+ deltatar.included_files = ["test", "small"]
+ deltatar.excluded_files = ["test/huge"]
deltatar.filter_func = filter_func
deltatar.restore_backup(target_path="source_dir",
backup_indexes_paths=[index_path])
assert set(visited_paths) == set([
- '/small',
- '/test',
- '/test/huge2',
- '/test/test2'
+ 'small',
+ 'test',
+ 'test/huge2',
+ 'test/test2'
])
def test_restore_tar_basic_filtering(self):
assert os.path.exists("backup_dir")
shutil.rmtree("source_dir")
- deltatar.included_files = ["/test", "/small"]
- deltatar.excluded_files = ["/test/huge"]
+ deltatar.included_files = ["test", "small"]
+ deltatar.excluded_files = ["test/huge"]
tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
tar_path = os.path.join("backup_dir", tar_filename)
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
- assert os.path.exists("./source_dir/small")
- assert os.path.exists("./source_dir/test")
- assert os.path.exists("./source_dir/test/huge2")
- assert os.path.exists("./source_dir/test/test2")
+ assert os.path.exists("source_dir/small")
+ assert os.path.exists("source_dir/test")
+ assert os.path.exists("source_dir/test/huge2")
+ assert os.path.exists("source_dir/test/test2")
- assert not os.path.exists("./source_dir/test/huge")
- assert not os.path.exists("./source_dir/big")
+ assert not os.path.exists("source_dir/test/huge")
+ assert not os.path.exists("source_dir/big")
def test_restore_tar_filter_func(self):
'''
index_filename = deltatar.index_name_func(True)
index_path = os.path.join("backup_dir", index_filename)
- deltatar.included_files = ["/test", "/small"]
- deltatar.excluded_files = ["/test/huge"]
+ deltatar.included_files = ["test", "small"]
+ deltatar.excluded_files = ["test/huge"]
deltatar.filter_func = filter_func
tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
assert set(visited_paths) == set([
- '/small',
- '/test',
- '/test/huge2',
- '/test/test2'
+ 'small',
+ 'test',
+ 'test/huge2',
+ 'test/test2'
])
def test_filter_path_regexp(self):
expressions
'''
included_files = [
- re.compile('^/test/(hola|caracola/caracolero)(|/.*)$'),
- re.compile('^/yes$'),
- '/testing'
+ re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
+ re.compile('^yes$'),
+ 'testing'
]
excluded_files = [
- re.compile('^/testing/in_the'),
+ re.compile('^testing/in_the'),
]
deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
excluded_files=excluded_files)
# assert valid and invalid paths
- assert deltatar.filter_path('/test/hola')
- assert deltatar.filter_path('/test/hola/any/thing')
- assert deltatar.filter_path('/test/caracola/caracolero')
- assert deltatar.filter_path('/test/caracola/caracolero/yeah')
- assert deltatar.filter_path('/test/caracola/caracolero/whatever/aa')
- assert deltatar.filter_path('/yes')
- assert deltatar.filter_path('/testing')
- assert deltatar.filter_path('/testing/yes')
- assert deltatar.filter_path('/testing/in_th')
-
- assert not deltatar.filter_path('/something')
- assert not deltatar.filter_path('/other/thing')
- assert not deltatar.filter_path('/test_ing')
- assert not deltatar.filter_path('/test/hola_lala')
- assert not deltatar.filter_path('/test/agur')
- assert not deltatar.filter_path('/testing_something')
- assert not deltatar.filter_path('/yeso')
- assert not deltatar.filter_path('/yes/o')
- assert not deltatar.filter_path('/yes_o')
- assert not deltatar.filter_path('/testing/in_the')
- assert not deltatar.filter_path('/testing/in_the_field')
- assert not deltatar.filter_path('/testing/in_the/field')
+ assert deltatar.filter_path('test/hola')
+ assert deltatar.filter_path('test/hola/any/thing')
+ assert deltatar.filter_path('test/caracola/caracolero')
+ assert deltatar.filter_path('test/caracola/caracolero/yeah')
+ assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
+ assert deltatar.filter_path('yes')
+ assert deltatar.filter_path('testing')
+ assert deltatar.filter_path('testing/yes')
+ assert deltatar.filter_path('testing/in_th')
+
+ assert not deltatar.filter_path('something')
+ assert not deltatar.filter_path('other/thing')
+ assert not deltatar.filter_path('test_ing')
+ assert not deltatar.filter_path('test/hola_lala')
+ assert not deltatar.filter_path('test/agur')
+ assert not deltatar.filter_path('testing_something')
+ assert not deltatar.filter_path('yeso')
+ assert not deltatar.filter_path('yes/o')
+ assert not deltatar.filter_path('yes_o')
+ assert not deltatar.filter_path('testing/in_the')
+ assert not deltatar.filter_path('testing/in_the_field')
+ assert not deltatar.filter_path('testing/in_the/field')
def test_filter_path_parent(self):
'''
Test specifically the deltatar.filter_path function for parent matching
'''
included_files = [
- '/testing/path/to/some/thing'
+ 'testing/path/to/some/thing'
]
deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
# assert valid and invalid paths
- assert deltatar.filter_path('/testing', is_dir=True) == PARENT_MATCH
- assert deltatar.filter_path('/testing/path/', is_dir=True) == PARENT_MATCH
- assert deltatar.filter_path('/testing/path/to', is_dir=True) == PARENT_MATCH
- assert deltatar.filter_path('/testing/path/to/some', is_dir=True) == PARENT_MATCH
- assert deltatar.filter_path('/testing/path/to/some/thing') == MATCH
- assert deltatar.filter_path('/testing/path/to/some/thing/what&/ever') == MATCH
- assert deltatar.filter_path('/testing/something/else') == NO_MATCH
+ assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
+ assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
+ assert deltatar.filter_path('testing/something/else') == NO_MATCH
def test_parent_matching_simple_full_backup(self):
'''
Create a full backup using parent matching
'''
included_files = [
- '/test/huge2'
+ 'test/huge2'
]
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger,
Create a full backup and restores it using parent matching
'''
included_files = [
- '/test/huge2'
+ 'test/huge2'
]
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger)
Create a full backup and restores it using parent matching
'''
included_files = [
- '/test/huge2'
+ 'test/huge2'
]
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger)
try:
for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
- if path2['path'] == './z':
+ if path2['path'] == 'z':
assert not path1
else:
assert deltatar._equal_stat_dicts(path1, path2)
)
finally:
assert visited_pairs == [
- (u'./big', u'./big'),
- (None, u'./bigdir'),
- (u'./small', u'./small'),
- (u'./test', u'./test'),
- (None, u'./zzzz'),
- (None, u'./bigdir/a'),
- (None, u'./bigdir/b'),
- (u'./test/huge', u'./test/huge'),
- (u'./test/huge2', u'./test/huge2'),
- (u'./test/test2', u'./test/test2'),
+ (u'big', u'big'),
+ (None, u'bigdir'),
+ (u'small', u'small'),
+ (u'test', u'test'),
+ (None, u'zzzz'),
+ (None, u'bigdir/a'),
+ (None, u'bigdir/b'),
+ (u'test/huge', u'test/huge'),
+ (u'test/huge2', u'test/huge2'),
+ (u'test/test2', u'test/test2'),
]
os.chdir(cwd)
l = [i[0]['path'] for i in index_it]
assert l == [
- 'list://./big',
- 'snapshot://./bigdir',
- 'delete://./small',
- 'list://./test',
- 'snapshot://./zzzz',
- 'snapshot://./bigdir/a',
- 'snapshot://./bigdir/b',
- 'list://./test/huge',
- 'list://./test/huge2',
- 'list://./test/test2',
+ 'list://big',
+ 'snapshot://bigdir',
+ 'delete://small',
+ 'list://test',
+ 'snapshot://zzzz',
+ 'snapshot://bigdir/a',
+ 'snapshot://bigdir/b',
+ 'list://test/huge',
+ 'list://test/huge2',
+ 'list://test/test2',
]
# check the tar file
compare the two directories source_dir and target_dir and check
# they are the same
'''
- source_it = deltatar._recursive_walk_dir(path1)
+ source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
- target_it = deltatar._recursive_walk_dir(path2)
+ target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
while True:
try: