'''
Return if the dicts are equal in the stat keys
'''
- keys = [u'gid', u'type', u'mode',u'size', u'uid', u'mtime', u'ctime',
+ keys = [u'gid', u'type', u'mode',u'size', u'uid', u'mtime',
# TODO: check how to restore this correctly if possible
- # u'inode'
+ # u'inode', u'ctime'
]
if (not d1 and d2 != None) or (d1 != None and not d2):
return False
- if not listsnapshot_equal:
- if self.prefixed(d1.get('path', -1)) != self.prefixed(d2.get('path', -2)):
- return False
- else:
- if self.prefixed(d1.get('path', -1)) != self.prefixed(d2.get('path', -2)):
- return False
+ if self.prefixed(d1.get('path', -1), listsnapshot_equal) != self.prefixed(d2.get('path', -2), listsnapshot_equal):
+ return False
for key in keys:
if d1.get(key, -1) != d2.get(key, -2):
import os
import re
+import random
import shutil
import logging
import binascii
'''
Create base test data
'''
- os.system('rm -rf target_dir source_dir source_dir2 backup_dir backup_dir2 huge')
+ os.system('rm -rf target_dir source_dir* backup_dir* huge')
os.makedirs('source_dir/test/test2')
self.hash = dict()
self.hash["source_dir/test/test2"] = ''
'''
Remove temporal files created by unit tests
'''
- os.system("rm -rf source_dir target_dir source_dir2 backup_dir backup_dir2 huge")
+ os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
def test_restore_simple_full_backup(self):
'''
# then compare the two directories source_dir and target_dir and check
# they are the same
- source_it = deltatar._recursive_walk_dir('source_dir')
- source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
- target_it = deltatar._recursive_walk_dir('target_dir')
- target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
- while True:
- try:
- sitem = source_it.next()
- titem = target_it.next()
- except StopIteration:
- try:
- titem = target_it.next()
- raise Exception("iterators do not stop at the same time")
- except StopIteration:
- break
- assert deltatar._equal_stat_dicts(sitem, titem)
+ self.check_equal_dirs('source_dir', 'target_dir', deltatar)
def test_restore_from_index_diff_backup2(self):
'''
# then compare the two directories source_dir and target_dir and check
# they are the same
- source_it = deltatar._recursive_walk_dir('source_dir')
+ self.check_equal_dirs('source_dir', 'target_dir', deltatar)
+
+ def test_restore_from_index_diff_backup3(self):
+ '''
+ Creates a full backup of .git, modifies some random files, creates a
+ diff backup, then restores the diff backup with the full backup as a
+ starting point.
+ '''
+ # this test only works for uncompressed or concat compressed modes
+ if self.MODE.startswith(':') or self.MODE.startswith('|'):
+ return
+
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+
+ shutil.rmtree("source_dir")
+ shutil.copytree(".git", "source_dir")
+ shutil.copytree(".git", "source_dir_diff")
+
+ # create first backup
+ deltatar.create_full_backup(
+ source_path="source_dir",
+ backup_path="backup_dir")
+
+ prev_index_filename = deltatar.index_name_func(is_full=True)
+ prev_index_path = os.path.join("backup_dir", prev_index_filename)
+
+ # alter the source_dir randomly
+ source_it = deltatar._recursive_walk_dir('source_dir_diff')
+
+ for path in source_it:
+ # if path doesn't exist (might have previously removed) ignore it.
+ # also ignore it (i.e. do not change it) 70% of the time
+ if not os.path.exists(path) or random.random() < 0.7:
+ continue
+
+ # remove the file
+ if os.path.isdir(path):
+ shutil.rmtree(path)
+ else:
+ os.unlink(path)
+
+ try:
+ deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
+ prev_index_path)
+ except Exception, e:
+ import ipdb; ipdb.set_trace()
+
+ # first restore initial backup in target_dir
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+ deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
+
+ # and check that target_dir equals to source_dir (which is the same as
+ # ".git" initially)
+ self.check_equal_dirs('source_dir', 'target_dir', deltatar)
+
+ # then apply diff backup in target_dir
+ index_filename = deltatar.index_name_func(is_full=True)
+ index_path = os.path.join("backup_dir2", index_filename)
+ deltatar.restore_backup("target_dir",
+ backup_indexes_paths=[index_path, prev_index_path])
+
+ # and check that target_dir equals to source_dir_diff (the randomly
+ # altered ".git" directory)
+ self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
+
+ # then delete target_dir and apply diff backup from zero and check again
+ shutil.rmtree("target_dir")
+ deltatar.restore_backup("target_dir",
+ backup_indexes_paths=[index_path, prev_index_path])
+
+ # and check that target_dir equals to source_dir_diff (the randomly
+ # altered ".git" directory)
+ self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
+
+ def check_equal_dirs(self, path1, path2, deltatar):
+ '''
+ compare the two directories source_dir and target_dir and check
+ # they are the same
+ '''
+ source_it = deltatar._recursive_walk_dir(path1)
source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
- target_it = deltatar._recursive_walk_dir('target_dir')
+ target_it = deltatar._recursive_walk_dir(path2)
target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
while True:
try:
break
assert deltatar._equal_stat_dicts(sitem, titem)
-
class DeltaTar2Test(DeltaTarTest):
'''
Same as DeltaTar but with specific ":" mode