deltatar: initial implementation of the diff restore engine
author     Eduardo Robles Elvira <edulix@wadobo.com>
           Tue, 6 Aug 2013 12:41:35 +0000 (14:41 +0200)
committer  Eduardo Robles Elvira <edulix@wadobo.com>
           Tue, 6 Aug 2013 12:41:35 +0000 (14:41 +0200)
deltatar/deltatar.py
testing/test_deltatar.py

diff --git a/deltatar/deltatar.py b/deltatar/deltatar.py
index c1fb58b..f5d5609 100644
@@ -362,7 +362,7 @@ class DeltaTar(object):
         keys = [u'gid', u'type', u'mode', u'mtime', u'size', u'inode',
                 u'ctime', u'uid']
 
-        if d1 is None and d2 is not None or d1 is not None and d2 is None:
+        if (not d1 and d2 is not None) or (d1 is not None and not d2):
             return False
 
         if self.prefixed(d1.get('path', -1)) != self.prefixed(d2.get('path', -2)):
@@ -644,7 +644,7 @@ class DeltaTar(object):
         dir_path_it = self.jsonize_path_iterator(dir_it)
 
         # for each file to be in the backup, do:
-        for ipath, dpath in self.collate_iterators(index_it, dir_path_it):
+        for ipath, dpath, l_no in self.collate_iterators(index_it, dir_path_it):
             action = None
             # if file is not in the index, it means it's a new file, so we have
             # to take a snapshot
@@ -781,18 +781,22 @@ class DeltaTar(object):
 
         It assumes that the items in both lists are ordered in the same way.
         '''
+        l_no = 0
         elem1, elem2 = None, None
         while True:
             if not elem1:
                 try:
-                    elem1 = it1.next()[0]
+                    elem1 = it1.next()
+                    l_no += 1
                     if isinstance(elem1, tuple):
                         elem1 = elem1[0]
                 except StopIteration:
                     if elem2:
-                        yield (None, elem2)
+                        yield (None, elem2, l_no)
                     for elem2 in it2:
-                        yield (None, elem2)
+                        if isinstance(elem2, tuple):
+                            elem2 = elem2[0]
+                        yield (None, elem2, l_no)
                     break
                 index1 = self.unprefixed(elem1['path'])
             if not elem2:
@@ -802,9 +806,11 @@ class DeltaTar(object):
                         elem2 = elem2[0]
                 except StopIteration:
                     if elem1:
-                        yield (elem1, None)
+                        yield (elem1, None, l_no)
                     for elem1 in it1:
-                        yield (elem1, None)
+                        if isinstance(elem1, tuple):
+                            elem1 = elem1[0]
+                        yield (elem1, None, l_no)
                     break
                 index2 = self.unprefixed(elem2['path'])
 
@@ -813,17 +819,17 @@ class DeltaTar(object):
                 # it means that there's a new parent directory in index2, so
                 # it goes first
                 if index1.count('/') > index2.count('/'):
-                    yield (None, elem2)
+                    yield (None, elem2, l_no)
                     elem2 = None
                 else:
-                    yield (elem1, None)
+                    yield (elem1, None, l_no)
                     elem1 = None
             elif index1 == index2:
-                yield (elem1, elem2)
+                yield (elem1, elem2, l_no)
                 elem1, elem2 = None, None
             else:
                 # index2 is less
-                yield (None, elem2)
+                yield (None, elem2, l_no)
                 elem2 = None
 
     def restore_backup(self, target_path, backup_indexes_paths=[],
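
To see the new triple-yield shape in isolation: collate_iterators walks two path-sorted streams in lockstep and now yields (elem1, elem2, l_no), where l_no counts entries consumed from the first (index) stream. The sketch below is illustrative only, not the deltatar code: it uses plain string ordering, whereas the real method also pushes new parent directories from the second stream ahead of deeper paths.

# Illustrative sketch only (not deltatar code): collate two path-sorted
# streams of dicts and yield (a, b, l_no) triples, where l_no counts the
# entries taken from the first stream, mirroring the shape used above.
def collate_sketch(sorted_a, sorted_b):
    it_a, it_b = iter(sorted_a), iter(sorted_b)
    a = b = None
    l_no = 0
    while True:
        if a is None:
            try:
                a = next(it_a)
                l_no += 1
            except StopIteration:
                if b is not None:
                    yield (None, b, l_no)
                for b in it_b:
                    yield (None, b, l_no)
                return
        if b is None:
            try:
                b = next(it_b)
            except StopIteration:
                if a is not None:
                    yield (a, None, l_no)
                for a in it_a:
                    yield (a, None, l_no)
                return
        if a['path'] == b['path']:
            yield (a, b, l_no)      # same path on both sides
            a = b = None
        elif a['path'] < b['path']:
            yield (a, None, l_no)   # only in the first stream
            a = None
        else:
            yield (None, b, l_no)   # only in the second stream
            b = None

# list(collate_sketch([{'path': './a'}, {'path': './c'}],
#                     [{'path': './a'}, {'path': './b'}]))
# -> [({'path': './a'}, {'path': './a'}, 1),
#     (None, {'path': './b'}, 2),
#     ({'path': './c'}, None, 2)]
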
@@ -855,8 +861,12 @@ class DeltaTar(object):
         if backup_indexes_paths is None and backup_tar_path == []:
             raise Exception("You have to either provide index paths or a tar path")
 
-        tar_mode = (backup_indexes_paths == [])
-        if tar_mode:
+        if len(backup_indexes_paths) == 0:
+            mode = "tar"
+        else:
+            mode = "diff"
+
+        if mode == "tar":
             if not isinstance(backup_tar_path, basestring):
                 raise Exception('Backup tar path must be a string')
 
@@ -891,21 +901,22 @@ class DeltaTar(object):
             os.makedirs(target_path)
 
         cwd = os.getcwd()
-        def new_volume_handler(deltarobj, cwd, backup_path, tarobj, base_name, volume_number):
-            '''
-            Handles the new volumes
-            '''
-            volume_name = deltarobj.volume_name_func(backup_path, True, volume_number)
-            volume_path = os.path.join(backup_path, volume_name)
-
-            # we convert relative paths into absolute because CWD is changed
-            if not os.path.isabs(volume_path):
-                volume_path = os.path.join(cwd, volume_path)
-            tarobj.open_volume(volume_path)
 
         # wraps some args from context into the handler
 
-        if tar_mode:
+        if mode == 'tar':
+            def new_volume_handler(deltarobj, cwd, backup_path, tarobj, base_name, volume_number):
+                '''
+                Handles the new volumes
+                '''
+                volume_name = deltarobj.volume_name_func(backup_path, True, volume_number)
+                volume_path = os.path.join(backup_path, volume_name)
+
+                # we convert relative paths into absolute because CWD is changed
+                if not os.path.isabs(volume_path):
+                    volume_path = os.path.join(cwd, volume_path)
+                tarobj.open_volume(volume_path)
+
             backup_path = os.path.dirname(backup_tar_path)
             new_volume_handler = partial(new_volume_handler, self, cwd, backup_path)
             tarobj = tarfile.TarFile.open(backup_tar_path,
@@ -936,85 +947,46 @@ class DeltaTar(object):
             tarobj.extractall(filter=filter)
             os.chdir(cwd)
             tarobj.close()
-        else:
-            # for now, we only consider one index
-            backup_index_path = backup_indexes_paths[0]
+        elif mode == "diff":
             os.chdir(target_path)
+            helper = RestoreHelper(self, cwd, backup_indexes_paths)
 
-            # make path absolute
-            if not os.path.isabs(backup_index_path):
-                backup_index_path = os.path.join(cwd, backup_index_path)
+            index_it = helper._data[0]['iterator']
+            dir_it = self._recursive_walk_dir('.')
+            dir_path_it = self.jsonize_path_iterator(dir_it)
 
-            # setup some vars
-            backup_path = os.path.dirname(backup_index_path)
-            new_volume_handler = partial(new_volume_handler, self, cwd, backup_path)
-
-            # some initialization:
-
-            # current volume number
-            curr_vol_no = None
-            # current volume file
-            vol_fd = None
-            offset = -1
-            tarobj = None
-
-            # iterate through the items to be restored
-            for j, l_no in self.iterate_index_path(backup_index_path):
-                op_type = j.get('type', '')
-                op_path  = j.get('path', '')
-                upath = self.unprefixed(op_path)
+            # for each file present in the index and/or on disk, do:
+            for ipath, dpath, l_no in self.collate_iterators(index_it, dir_path_it):
+                if not ipath:
+                    upath = dpath['path']
+                    op_type = dpath['type']
+                else:
+                    upath = self.unprefixed(ipath['path'])
+                    op_type = ipath['type']
 
                 # filter paths
+                # TODO: think about changes of type "dir converted to file"
+                # and how that can affect filtering by op_type
                 if self.filter_path(upath, '.', op_type == 'directory') == NO_MATCH:
                     continue
 
-                # check volume number
-                vol_no = j.get('volume', -1)
-                if not isinstance(vol_no, int) or vol_no < 0:
-                    self.logger.warn('unrecognized type to be '
-                                     'restored: %s, line %d' % (op_type, l_no))
-
-                # setup the volume that needs to be read
-                if curr_vol_no != vol_no:
-                    curr_vol_no = vol_no
-                    vol_name = self.volume_name_func(backup_path, True, vol_no)
-                    vol_path = os.path.join(backup_path, vol_name)
-                    if vol_fd:
-                        vol_fd.close()
-                    vol_fd = open(vol_path, 'r')
-
-                    # force reopen of the tarobj because of new volume
-                    if tarobj:
-                        tarobj.close()
-                        tarobj = None
-
-                # seek tarfile if needed
-                offset = j.get('offset', -1)
-                if tarobj:
-                    member = tarobj.next()
-                    if member.path != op_path:
-                        # force a seek and reopen
-                        tarobj.close()
-                        tarobj = None
-
-                # open the tarfile if needed
-                if not tarobj:
-                    vol_fd.seek(offset)
-                    tarobj = tarfile.open(mode="r" + self.mode, fileobj=vol_fd,
-                                format=tarfile.GNU_FORMAT,
-                                concat_compression='#gz' in self.mode,
-                                password=self.password,
-                                new_volume_handler=new_volume_handler)
-                    member = tarobj.next()
+                # if the file is not on disk (no dpath), restore it directly
+                # from the index
+                if not dpath:
+                    helper.restore(ipath, l_no)
+                    continue
+
+                # if both files are equal, we have nothing to restore
+                if self._equal_stat_dicts(ipath, dpath):
+                    continue
 
-                member.path = upath
-                member.name = upath
-                # finally, restore the file
-                tarobj.extract(member)
+                # the file has changed: delete the existing copy first, then
+                # restore it from the backup
+                helper.delete(self.unprefixed(ipath['path']))
+                helper.restore(ipath, l_no)
 
             os.chdir(cwd)
-            if tarobj:
-                tarobj.close()
+            helper.cleanup()
+
 
     def _parse_json_line(self, f, l_no):
         '''
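
Putting the two branches above together, a hedged usage sketch of the restore entry point: the constructor options, directory layout and index file names below are assumptions chosen for illustration; only restore_backup() and its parameters come from this change.

# Hypothetical usage sketch: paths, index names and the DeltaTar configuration
# are assumptions; only restore_backup() and its parameters appear in the diff.
from deltatar.deltatar import DeltaTar

dtar = DeltaTar(mode='#gz')   # constructor options are illustrative

# "tar" mode: no indexes given, the full backup is extracted from its volumes
dtar.restore_backup('restored_full',
                    backup_tar_path='backup_dir/backup.tar.gz')

# "diff" mode: index files, newest first, drive a selective restore that only
# touches files that differ from the current contents of the target directory
dtar.restore_backup('restored_diff',
                    backup_indexes_paths=['backup_dir2/index.json',
                                          'backup_dir1/index.json'])
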
@@ -1028,3 +1000,195 @@ class DeltaTar(object):
             raise Exception("error parsing this json line "
                 "(line number %d): %s" % (l_no, l))
         return j, l_no
+
+class RestoreHelper(object):
+    '''
+    Helper class that restores files from backup indices
+    '''
+
+    # holds the dicts of data
+    _data = []
+
+    _deltatar = None
+
+    _cwd = None
+
+    def __init__(self, deltatar, cwd, index_list):
+        '''
+        Constructor: sets up the per-index data structures. Tar volumes are
+        opened lazily as files get restored.
+
+        The index list must be provided in reverse chronological order
+        (newest first).
+        '''
+        self._data = []
+        self._deltatar = deltatar
+        self._cwd = cwd
+
+        for index in index_list:
+            # make paths absolute to avoid cwd problems
+            if not os.path.isabs(index):
+                index = os.path.join(cwd, index)
+
+            s = dict(
+                curr_vol_no = None,
+                vol_fd = None,
+                offset = -1,
+                tarobj = None,
+                path = index,
+                iterator = deltatar.iterate_index_path(index),
+                new_volume_handler = partial(self.new_volume_handler,
+                                     self._deltatar, self._cwd,
+                                     os.path.dirname(index))
+            )
+            self._data.append(s)
+
+    def cleanup(self):
+        '''
+        Closes all open files
+        '''
+        for data in self._data:
+            if data['tarobj']:
+                data['tarobj'].close()
+                data['tarobj'] = None
+            # TODO: add a way to close the file behind the iterator
+
+    def delete(self, path):
+        '''
+        Delete a file
+        '''
+        if os.path.isdir(path):
+            shutil.rmtree(path)
+        else:
+            os.unlink(path)
+
+    def restore(self, itpath, l_no):
+        '''
+        Restore the path from the appropriate backup. Receives the current
+        entry from the first index iterator; itpath must not be None.
+        '''
+        data = self._data[0]
+        path = itpath['path']
+
+        # if the first index already marks the path as deleted or snapshotted,
+        # deal with it here and finish
+        if path.startswith('delete://'):
+            self.delete(self._deltatar.unprefixed(path))
+            return
+        elif path.startswith('snapshot://'):
+            self.restore_file(itpath, data, path, l_no, self._deltatar.unprefixed(path))
+            return
+
+        # otherwise walk the remaining indexes, newest to oldest, looking for
+        # the most recent snapshot of the file being restored
+        cur_index = 1
+
+        while cur_index < len(self._data):
+            data = self._data[cur_index]
+            it = data['iterator']
+
+            # find the path in the index
+            d = None
+            l_no = None
+            dpath = None
+            while True:
+                try:
+                    d, l_no = it.next()
+                except StopIteration:
+                    # index exhausted without finding the path
+                    d = None
+                    break
+
+                dpath = self._deltatar.unprefixed(d.get('path', ''))
+
+                if path == dpath:
+                    break
+
+            if not d:
+                # path not present in this index: if this is the first index,
+                # the file is no longer tracked and must be removed
+                if cur_index == 0:
+                    self.delete(path)
+                    return
+                # otherwise the path was found in the first index but not in
+                # an older one, so something went wrong.
+                else:
+                    self._deltatar.logger.warn('Error restoring file %s from '
+                        'index, not found in index %s' % (path, data['path']))
+                    return
+
+            if d.get('path', '').startswith('delete://'):
+                self._deltatar.logger.warn(('File %s is listed in the first '
+                    'index but marked as deleted in an older one. The path '
+                    'was left untouched.') % path)
+                return
+            elif d.get('path', '').startswith('snapshot://'):
+                self.restore_file(d, data, path, l_no, dpath)
+                return
+            elif d.get('path', '').startswith('list://'):
+                # only listed here: look for the snapshot in the next
+                # (older) index
+                cur_index += 1
+                continue
+
+        self._deltatar.logger.warn(('Error restoring file %s from index, '
+                                    'snapshot not found in any index') % path)
+
+    @staticmethod
+    def new_volume_handler(deltarobj, cwd, backup_path, tarobj, base_name, volume_number):
+        '''
+        Handles the new volumes
+        '''
+        volume_name = deltarobj.volume_name_func(backup_path, True, volume_number)
+        volume_path = os.path.join(backup_path, volume_name)
+
+        # we convert relative paths into absolute because CWD is changed
+        if not os.path.isabs(volume_path):
+            volume_path = os.path.join(cwd, volume_path)
+        tarobj.open_volume(volume_path)
+
+    def restore_file(self, file_data, index_data, path, l_no, unprefixed):
+        '''
+        Restores a snapshot of a file from a specific backup
+        '''
+        vol_no = file_data.get('volume', -1)
+        op_type = file_data.get('type', -1)
+
+        # sanity check
+        if not isinstance(vol_no, int) or vol_no < 0:
+            self._deltatar.logger.warn('unrecognized type to be restored: '
+                                       '%s, line %d' % (op_type, l_no))
+
+        # setup the volume that needs to be read
+        if index_data['curr_vol_no'] != vol_no:
+            index_data['curr_vol_no'] = vol_no
+            backup_path = os.path.dirname(index_data['path'])
+            vol_name = self._deltatar.volume_name_func(backup_path, True, vol_no)
+            vol_path = os.path.join(backup_path, vol_name)
+            if index_data['vol_fd']:
+                index_data['vol_fd'].close()
+            index_data['vol_fd'] = open(vol_path, 'r')
+
+            # force reopen of the tarobj because of new volume
+            if index_data['tarobj']:
+                index_data['tarobj'].close()
+                index_data['tarobj'] = None
+
+        # seek tarfile if needed
+        offset = file_data.get('offset', -1)
+        if index_data['tarobj']:
+            member = index_data['tarobj'].next()
+            if member.path != file_data.get('path', ''):
+                # force a seek and reopen
+                index_data['tarobj'].close()
+                index_data['tarobj'] = None
+
+        # open the tarfile if needed
+        if not index_data['tarobj']:
+            index_data['vol_fd'].seek(offset)
+            index_data['tarobj'] = tarfile.open(mode="r" + self._deltatar.mode,
+                fileobj=index_data['vol_fd'],
+                format=tarfile.GNU_FORMAT,
+                concat_compression='#gz' in self._deltatar.mode,
+                password=self._deltatar.password,
+                new_volume_handler=index_data['new_volume_handler'])
+
+            member = index_data['tarobj'].next()
+
+        member.path = unprefixed
+        member.name = unprefixed
+        # finally, restore the file
+        index_data['tarobj'].extract(member)
\ No newline at end of file
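
The core of restore_file above is a seek-and-extract: position the volume file descriptor at the offset recorded in the index, open a tar reader on top of it and pull out a single member. Below is a stripped-down sketch using only the standard tarfile module; deltatar's bundled tarfile adds concat_compression, password and new_volume_handler on top of this, and the helper name and arguments here are illustrative.

# Minimal sketch with the standard tarfile module; deltatar's bundled tarfile
# additionally handles concatenated compression, encryption and multivolume
# archives. The helper name and its arguments are illustrative.
import tarfile

def extract_member_at(volume_path, offset, restore_as, target_dir='.'):
    # 'offset' is assumed to point at the tar header of the wanted member,
    # as recorded in the backup index ('offset' field above).
    with open(volume_path, 'rb') as vol_fd:
        vol_fd.seek(offset)
        tarobj = tarfile.open(mode='r:', fileobj=vol_fd,
                              format=tarfile.GNU_FORMAT)
        member = tarobj.next()        # first member after the seek point
        member.path = restore_as      # drop the snapshot:// style prefix
        member.name = restore_as
        tarobj.extract(member, path=target_dir)
        tarobj.close()
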
diff --git a/testing/test_deltatar.py b/testing/test_deltatar.py
index 447eac3..8cc127d 100644
@@ -740,7 +740,7 @@ class DeltaTarTest(BaseTest):
         path_it = deltatar.jsonize_path_iterator(dir_it)
 
         try:
-            for path1, path2 in deltatar.collate_iterators(index_it, path_it):
+            for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
                 assert deltatar._equal_stat_dicts(path1, path2)
         finally:
             os.chdir(cwd)
@@ -773,7 +773,7 @@ class DeltaTarTest(BaseTest):
         path_it = deltatar.jsonize_path_iterator(dir_it)
 
         try:
-            for path1, path2 in deltatar.collate_iterators(index_it, path_it):
+            for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
                 if path2['path'] == './z':
                     assert not path1
                 else:
@@ -816,7 +816,7 @@ class DeltaTarTest(BaseTest):
         visited_pairs = []
 
         try:
-            for path1, path2 in deltatar.collate_iterators(index_it, path_it):
+            for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
                 visited_pairs.append(
                     (deltatar.unprefixed(path1['path']) if path1 else None,
                      path2['path'] if path2 else None)
@@ -862,7 +862,7 @@ class DeltaTarTest(BaseTest):
             n += 1
             assert i[0]['path'].startswith("list://")
 
-        assert n == 7
+        assert n == 6
 
         # check the tar file
         assert os.path.exists("backup_dir2")
@@ -958,25 +958,25 @@ class DeltaTarGzipTest(DeltaTarTest):
     MODE = ':gz'
 
 
-class DeltaTarBz2Test(DeltaTarTest):
-    '''
-    Same as DeltaTar but with specific bz2 mode
-    '''
-    MODE = ':bz2'
-
-
 class DeltaTarGzipStreamTest(DeltaTarTest):
     '''
     Same as DeltaTar but with specific gzip stream mode
     '''
     MODE = '|gz'
 
+# The Bz2 tests are commented out because they are too slow.
+#class DeltaTarBz2Test(DeltaTarTest):
+    #'''
+    #Same as DeltaTar but with specific bz2 mode
+    #'''
+    #MODE = ':bz2'
 
-class DeltaTarBz2StreamTest(DeltaTarTest):
-    '''
-    Same as DeltaTar but with specific bz2 stream mode
-    '''
-    MODE = '|bz2'
+
+#class DeltaTarBz2StreamTest(DeltaTarTest):
+    #'''
+    #Same as DeltaTar but with specific bz2 stream mode
+    #'''
+    #MODE = '|bz2'
 
 
 class DeltaTarGzipConcatTest(DeltaTarTest):