from . import tarfile
-
+# Logging handler whose emit() discards every record it receives.
class NullHandler(logging.Handler):
def emit(self, record):
pass
+# Register a NullHandler on the "deltatar.DeltaTar" logger.
+# NOTE(review): presumably so that applications which configure no logging
+# do not get "no handlers could be found" warnings - confirm.
logging.getLogger("deltatar.DeltaTar").addHandler(NullHandler())
+
+# Match modes used as the result of DeltaTar.filter_path:
+# - NO_MATCH: the path was rejected by included_files or excluded_files
+# - MATCH: the path was accepted
+# - PARENT_MATCH: the path is a directory that is not included itself but
+#   is a leading part of a string entry in included_files
+# NO_MATCH and MATCH keep the previous boolean values (False / True).
+NO_MATCH = False
+MATCH = True
+PARENT_MATCH = 2
+
class DeltaTar(object):
'''
Backup class used to create backups
return "%s-%s-%03d.tar%s" % (prefix, date_str, volume_number + 1, extension)
- def filter_path(self, path, source_path=""):
+ def filter_path(self, path, source_path="", is_dir=None):
'''
Filters a path, given the source_path, using the filtering properties
set in the constructor.
2. excluded_files
3. filter_func (which must return whether the file is accepted or not)
+
+ is_dir tells whether path is a directory. It is only consulted while
+ checking the string entries of included_files, to detect PARENT_MATCH.
+
+ Returns NO_MATCH as soon as the path is rejected by included_files or
+ excluded_files. Otherwise returns the result of filter_func(path) if
+ filter_func is set, else MATCH or PARENT_MATCH.
'''
-
+ # drop the first len(source_path) characters of path (path is assumed
+ # to start with source_path)
if len(source_path) > 0:
path = path[len(source_path):]
# 1. filter included_files
+ match = MATCH
if len(self.included_files) > 0:
- matches = False
+ match = NO_MATCH
for i in self.included_files:
# it can be either a regexp or a string
if isinstance(i, basestring):
# if the string matches, then continue
if i == path:
- matches = True
+ match = MATCH
break
# if the string ends with / it's a directory, and if the
# path does not start with the directory, then it's not
# included
if i.endswith('/') and path.startswith(i):
- matches = True
+ match = MATCH
break
# if the string doesn't end with /, add it and do the same
# check
elif path.startswith(i + '/'):
- matches = True
+ match = MATCH
break
+ # check for PARENT_MATCH: the path is a directory and the included
+ # string lies below it. There is no break here, so a later entry
+ # of included_files can still turn the result into MATCH.
+ if is_dir:
+ dir_path = path
+ if not dir_path.endswith('/'):
+ dir_path += '/'
+
+ if i.startswith(dir_path):
+ match = PARENT_MATCH
+
# if it's a reg exp, then we just check if it matches
elif isinstance(i, re._pattern_type):
if i.match(path):
- matches = True
+ match = MATCH
break
else:
self.logger.warn('Invalid pattern in included_files: %s' % str(i))
- if not matches:
- return False
+ if match == NO_MATCH:
+ return NO_MATCH
+ # when a directory is in PARENT_MATCH, it doesn't matter if it's
+ # excluded. Its subfiles will be excluded, but the directory itself
+ # won't
+ if match != PARENT_MATCH:
for e in self.excluded_files:
# it can be either a regexp or a string
if isinstance(e, basestring):
# if the string matches, then exclude
if e == path:
- return False
+ return NO_MATCH
# if the string ends with / it's a directory, and if the
# path starts with the directory, then exclude
if e.endswith('/') and path.startswith(e):
- return False
+ return NO_MATCH
# if the string doesn't end with /, do the same check with
# the slash added
elif path.startswith(e + '/'):
- return False
+ return NO_MATCH
# if it's a reg exp, then we just check if it matches
elif isinstance(e, re._pattern_type):
if e.match(path):
- return False
+ return NO_MATCH
else:
self.logger.warn('Invalid pattern in excluded_files: %s' % str(e))
+ # NOTE(review): the value returned by filter_func replaces match, so a
+ # PARENT_MATCH computed above is not preserved when filter_func is set
+ # - confirm this is intended.
if self.filter_func:
return self.filter_func(path)
- return True
+ return match
def _recursive_walk_dir(self, source_path):
'''
Walk a directory recursively, yielding each file/directory
-
- TODO: do filtering with self.included_files etc
'''
def walk_dir(dir_path):
'''
for filename in os.listdir(dir_path):
file_path = os.path.join(dir_path, filename)
- if not self.filter_path(file_path, source_path):
+ is_dir = os.path.isdir(file_path)
+ if self.filter_path(file_path, source_path, is_dir) == NO_MATCH:
continue
if not os.access(file_path, os.R_OK):
self.logger.warn('Error accessing possibly locked file %s' % file_path)
while diryield_stack:
try:
cur_path = diryield_stack[-1].next()
+ is_dir = os.path.isdir(cur_path)
+ status = self.filter_path(cur_path, source_path, is_dir)
except StopIteration:
diryield_stack.pop()
if delayed_path_stack:
delayed_path_stack.pop()
continue
- if delayed_path_stack:
- for delayed_path in delayed_path_stack:
- if not self.filter_path(delayed_path, source_path):
- continue
- yield delayed_path
- del delayed_path_stack[:]
+ if status == MATCH:
+ if delayed_path_stack:
+ for delayed_path in delayed_path_stack:
+ is_dir = os.path.isdir(delayed_path)
+ if self.filter_path(delayed_path, source_path, is_dir) == NO_MATCH:
+ continue
+ yield delayed_path
+ del delayed_path_stack[:]
- if not self.filter_path(cur_path, source_path):
- continue
+ yield cur_path
- yield cur_path
+ if os.path.isdir(cur_path):
+ diryield_stack.append(walk_dir(cur_path))
- if os.path.isdir(cur_path):
+ elif status == PARENT_MATCH:
+ delayed_path_stack.append(cur_path)
diryield_stack.append(walk_dir(cur_path))
def _stat_dict(self, path):
new_volume_handler=new_volume_handler)
os.chdir(target_path)
- def filter(cls, path):
- return cls.filter_path(path, '.')
+ def filter(cls, tarinfo):
+ return cls.filter_path(tarinfo.path, '.', tarinfo.isdir()) != NO_MATCH
filter = partial(filter, self)
tarobj.extractall(filter=filter)
# filtering paths
op_path = j.get('path', '')
- if not self.filter_path(op_path, '.'):
+ if self.filter_path(op_path, '.', op_type == 'directory') == NO_MATCH:
continue
vol_no = j.get('volume', -1)
if self.volume_number > 0 and tarinfo.ismultivol():
continue
- if filter and not filter(tarinfo.path):
+ if filter and not filter(tarinfo):
continue
if tarinfo.isdir():
from functools import partial
from deltatar.tarfile import TarFile, GNU_FORMAT
-from deltatar.deltatar import DeltaTar
+from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
import filesplit
from . import BaseTest
'/test/test2'
])
- def test_deltatar_filter_path_regexp(self):
+ def test_filter_path_regexp(self):
'''
Test specifically the deltatar.filter_path function with regular
expressions
assert not deltatar.filter_path('/testing/in_the_field')
assert not deltatar.filter_path('/testing/in_the/field')
+ def test_filter_path_parent(self):
+ '''
+ Test specifically the deltatar.filter_path function for parent matching:
+ directories leading to an included path must yield PARENT_MATCH, the
+ included path and anything below it MATCH, and unrelated paths NO_MATCH
+ '''
+ included_files = [
+ '/testing/path/to/some/thing'
+ ]
+ deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
+
+ # directories that are ancestors of the included path
+ assert deltatar.filter_path('/testing', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('/testing/path/', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('/testing/path/to', is_dir=True) == PARENT_MATCH
+ assert deltatar.filter_path('/testing/path/to/some', is_dir=True) == PARENT_MATCH
+ # the included path itself and a path below it
+ assert deltatar.filter_path('/testing/path/to/some/thing') == MATCH
+ assert deltatar.filter_path('/testing/path/to/some/thing/what&/ever') == MATCH
+ # a path that shares only the first component with the included path
+ assert deltatar.filter_path('/testing/something/else') == NO_MATCH
+
+ def test_parent_matching_simple_full_backup(self):
+ '''
+ Create a full backup using parent matching: included_files is set
+ when creating the backup, the restore is done without filters, and
+ test/huge2 and its parent directory must be restored while test/huge,
+ big and small must not
+ '''
+ included_files = [
+ '/test/huge2'
+ ]
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger,
+ included_files=included_files)
+
+ # create first backup
+ deltatar.create_full_backup(
+ source_path="source_dir",
+ backup_path="backup_dir")
+
+ # remove the source so that everything found later comes from the restore
+ assert os.path.exists("backup_dir")
+ shutil.rmtree("source_dir")
+
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+
+ # restore with a DeltaTar that has no included_files set
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+ deltatar.restore_backup(target_path="source_dir",
+ backup_tar_path=tar_path)
+
+ assert os.path.exists('source_dir/test/huge2')
+ assert os.path.exists('source_dir/test/')
+ assert not os.path.exists('source_dir/test/huge')
+ assert not os.path.exists('source_dir/big')
+ assert not os.path.exists('source_dir/small')
+
+ def test_parent_matching_simple_full_backup_restore(self):
+ '''
+ Create a full backup without filters and restore it using parent
+ matching: included_files is set only on the restoring DeltaTar, and
+ test/huge2 and its parent directory must be restored while test/huge,
+ big and small must not
+ '''
+ included_files = [
+ '/test/huge2'
+ ]
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+
+ # create first backup
+ deltatar.create_full_backup(
+ source_path="source_dir",
+ backup_path="backup_dir")
+
+ # remove the source so that everything found later comes from the restore
+ assert os.path.exists("backup_dir")
+ shutil.rmtree("source_dir")
+
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+
+ # restore with included_files set, so filtering happens at restore time
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger,
+ included_files=included_files)
+ deltatar.restore_backup(target_path="source_dir",
+ backup_tar_path=tar_path)
+
+ assert os.path.exists('source_dir/test/huge2')
+ assert os.path.exists('source_dir/test/')
+ assert not os.path.exists('source_dir/test/huge')
+ assert not os.path.exists('source_dir/big')
+ assert not os.path.exists('source_dir/small')
+
+ def test_parent_matching_index_full_backup_restore(self):
+ '''
+ Create a full backup without filters and restore it using parent
+ matching: included_files is set only on the restoring DeltaTar, and
+ test/huge2 and its parent directory must be restored while test/huge,
+ big and small must not
+ '''
+ # NOTE(review): this body is identical to
+ # test_parent_matching_simple_full_backup_restore and nothing here passes
+ # an index to restore_backup, although the test name mentions one -
+ # confirm whether an index-based restore was intended.
+ included_files = [
+ '/test/huge2'
+ ]
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+
+ # create first backup
+ deltatar.create_full_backup(
+ source_path="source_dir",
+ backup_path="backup_dir")
+
+ # remove the source so that everything found later comes from the restore
+ assert os.path.exists("backup_dir")
+ shutil.rmtree("source_dir")
+
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+
+ # restore with included_files set, so filtering happens at restore time
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger,
+ included_files=included_files)
+ deltatar.restore_backup(target_path="source_dir",
+ backup_tar_path=tar_path)
+
+ assert os.path.exists('source_dir/test/huge2')
+ assert os.path.exists('source_dir/test/')
+ assert not os.path.exists('source_dir/test/huge')
+ assert not os.path.exists('source_dir/big')
+ assert not os.path.exists('source_dir/small')
+
class DeltaTar2Test(DeltaTarTest):
'''