def apply_delayed_links(self):
data = self._data[0]
- for member, path, set_attrs in self._delayed_symlinks:
+ # only restore those links whose placeholder file hasn’t been removed
+ # during subsequent extraction
+ for member, path, set_attrs, st_dev, st_ino in self._delayed_symlinks:
+ fullpath = os.path.join(path, member.name)
+ try:
+ st = os.stat(fullpath)
+ except OSError:
+ self._deltatar.logger.warning \
+ ("Not restoring symlink %s from tarball: placeholder "
+ "file was deleted during extraction")
+ continue
+ if st.st_dev != st_dev or st.st_ino != st_ino:
+ self._deltatar.logger.warning \
+ ("Not restoring symlink %s from tarball: placeholder "
+ "file was modified during extraction")
+ continue
+ # at this point we’re certain we’re dealing with the placeholder we
+ # created so we can remove it and create the actual symlink
+ os.unlink(fullpath)
data["tarobj"].extract(member, path, set_attrs=set_attrs)
def delete(self, path):
# file might fail when trying to extract a multivolume member
index_data['tarobj'].volume_number = index_data['curr_vol_no']
- def create_placeholder_file (tarinfo, path, set_attrs, recover=RECOVER_OK):
+ def create_placeholder_file (member, path, set_attrs, recover=RECOVER_OK):
try:
- fullpath = os.path.join(path, tarinfo.name)
+ fullpath = os.path.join(path, member.name)
fd = os.open(fullpath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0)
except FileExistsError as exn: # == EEXIST
if recover != RECOVER_NO: # remove existing file and retry
os.unlink(fullpath)
- return create_placeholder_file(tarinfo, path, set_attrs,
+ return create_placeholder_file(member, path, set_attrs,
recover=RECOVER_NO)
raise exn # propagate error otherwise
except FileNotFoundError as exn: # == ENOENT
if recover == RECOVER_OK: # create interdir only once
os.makedirs(path)
- return create_placeholder_file(tarinfo, path, set_attrs,
+ return create_placeholder_file(member, path, set_attrs,
recover=RECOVER_INTERDIR_MADE)
+ st = os.fstat(fd)
os.close(fd)
- return self._delayed_symlinks.append((member, path, set_attrs))
+ return self._delayed_symlinks.append((member, path, set_attrs,
+ # GNU tar also stores
+ # st_birthtim[e] (via gnulib)
+ # which is not available on
+ # Linux
+ st.st_dev, st.st_ino))
# finally, restore the file
index_data['tarobj'].extract(member, symlink_cb=create_placeholder_file)
if symlink_cb is not None and tarinfo.issym() \
and (os.path.isabs(tarinfo.linkname)
or contains_dot_dot(tarinfo.linkname)):
- return symlink_cb(tarinfo, path, set_attrs)
+ return symlink_cb(member, path, set_attrs)
try:
self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
from functools import partial
from unittest import skip, SkipTest
-from deltatar.tarfile import TarFile, GNU_FORMAT
+import deltatar.tarfile as tarfile
+from tarfile import TarFile
from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
import filesplit
with open("source_dir/symlinks/whatever") as b:
assert a.read() == b.read()
+ def test_restore_with_symlinks(self):
+ '''
+ Creates a full backup containing different varieties of symlinks. The
+ malicious ones must be filtered out.
+ '''
+
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+
+ # create first backup
+ deltatar.create_full_backup(source_path="source_dir",
+ backup_path="backup_dir")
+
+ assert os.path.exists("backup_dir")
+ shutil.rmtree("source_dir")
+
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+
+ # add symlinks to existing archive
+
+ def add_symlink (a, kind, name, dst):
+ l = tarfile.TarInfo("snapshot://%s" % name)
+ l.type = tarfile.SYMTYPE
+ l.linkname = dst
+ a.addfile(l)
+ return (kind, name, dst)
+
+ with tarfile.open(tar_path,mode="w") as a:
+ checkme = \
+ [ add_symlink(a, SYMLINK_GOOD,
+ "symlinks/foo", "internal-file")
+ , add_symlink(a, SYMLINK_BAD,
+ "symlinks/bar", "/absolute/path")
+ , add_symlink(a, SYMLINK_BAD,
+ "symlinks/baz", "../parent/../../paths") ]
+
+ deltatar.restore_backup(target_path="source_dir",
+ backup_tar_path=tar_path)
+
+ # check what happened to our symlinks
+ for kind, source, dest in checkme:
+ resolve = kind == SYMLINK_GOOD
+ fullpath = os.path.join("source_dir", source)
+ assert os.path.islink(fullpath)
+ if resolve is True:
+ try:
+ linkname = os.readlink(fullpath)
+ assert dest == linkname
+ except OSError as exn:
+ if exn.errno == errno.EINVAL:
+ raise Exception("Extracted file “%s” [%s] is not"
+ " a valid symlink!"
+ % (str(source), str(os.stat(fullpath))))
class DeltaTar2Test(DeltaTarTest):
'''