helper.delete(upath)
helper.restore_directories_permissions()
- helper.apply_delayed_links()
index_it.release()
os.chdir(cwd)
helper.cleanup()
return j, l_no
-RECOVER_OK = 0
-RECOVER_NO = 1
-RECOVER_INTERDIR_MADE = 2
-
class RestoreHelper(object):
'''
Class used to help to restore files from indices
# tarfile.extractall for details.
_directories = []
- # collected symlinks to be restored at a later instant
- _delayed_symlinks= []
-
def __init__(self, deltatar, cwd, index_list=[], backup_path=False,
tarobj=None):
'''
data['tarobj'].close()
data['tarobj'] = None
- def apply_delayed_links(self):
- data = self._data[0]
- # only restore those links whose placeholder file hasn’t been removed
- # during subsequent extraction
- for member, path, set_attrs, st_dev, st_ino in self._delayed_symlinks:
- fullpath = os.path.join(path, member.name)
- try:
- st = os.stat(fullpath)
- except OSError:
- self._deltatar.logger.warning \
- ("Not restoring symlink %s from tarball: placeholder "
- "file was deleted during extraction" % fullpath)
- continue
- if st.st_dev != st_dev or st.st_ino != st_ino:
- self._deltatar.logger.warning \
- ("Not restoring symlink %s from tarball: placeholder "
- "file was modified during extraction" % fullpath)
- continue
- # at this point we’re certain we’re dealing with the placeholder we
- # created so we can remove it and create the actual symlink
- os.unlink(fullpath)
- data["tarobj"].extract(member, path, set_attrs=set_attrs)
-
def delete(self, path):
'''
Delete a file
# file might fail when trying to extract a multivolume member
index_data['tarobj'].volume_number = index_data['curr_vol_no']
- def create_placeholder_file (member, path, set_attrs, recover=RECOVER_OK):
- try:
- fullpath = os.path.join(path, member.name)
- fd = os.open(fullpath, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0)
- except FileExistsError as exn: # == EEXIST
- if recover != RECOVER_NO: # remove existing file and retry
- os.unlink(fullpath)
- return create_placeholder_file(member, path, set_attrs,
- recover=RECOVER_NO)
- raise exn # propagate error otherwise
- except FileNotFoundError as exn: # == ENOENT
- if recover == RECOVER_OK: # create interdir only once
- os.makedirs(path)
- return create_placeholder_file(member, path, set_attrs,
- recover=RECOVER_INTERDIR_MADE)
- st = os.fstat(fd)
- os.close(fd)
- return self._delayed_symlinks.append((member, path, set_attrs,
- # GNU tar also stores
- # st_birthtim[e] (via gnulib)
- # which is not available on
- # Linux
- st.st_dev, st.st_ino))
+ def ignore_symlink (member, *_args):
+ self._deltatar.logger.warning("Ignoring symlink %s" % member.name)
# finally, restore the file
- index_data['tarobj'].extract(member, symlink_cb=create_placeholder_file)
+ index_data['tarobj'].extract(member, symlink_cb=ignore_symlink)
def add_member_dir(self, member):
'''
DeprecationWarning, 2)
return stat.filemode(mode)
-def contains_dot_dot(name):
- """Check whether a path contains double dots thus referring to the parent
- directory.
- """
- l = len(name)
- if l < 2:
- return False
- p = 0
- while True:
- if name[p] == '.' and name[p + 1] == '.':
- ppp = p + 2
- if ppp == l or name[ppp] == '/':
- return True
- while name[p] != '/':
- if p == l - 2:
- return False
- p += 1
- if p == l - 2:
- break
- p += 1
- return False
-
class TarError(Exception):
"""Base exception."""
pass
``symlink_cb`` is a hook accepting a function that is passed the
``member``, ``path``, and ``set_attrs`` arguments if the tarinfo for
``member`` indicates a symlink in which case only the callback
- passed will be applied, skipping the actual extraction.
+ passed will be applied, skipping the actual extraction. In case the
+ callback is invoked, its return value is passed on to the caller.
"""
self._check("r")
if tarinfo.islnk():
tarinfo._link_target = os.path.join(path, tarinfo.linkname)
- if symlink_cb is not None and tarinfo.issym() \
- and (os.path.isabs(tarinfo.linkname)
- or contains_dot_dot(tarinfo.linkname)):
+ if symlink_cb is not None and tarinfo.issym():
return symlink_cb(member, path, set_attrs)
try:
from . import BaseTest
from . import new_volume_handler
-SYMLINK_GOOD = 0
-SYMLINK_BAD = 1
-
class DeltaTarTest(BaseTest):
"""
Test backups
def test_restore_with_symlinks(self):
'''
- Creates a full backup containing different varieties of symlinks. The
- malicious ones must be filtered out.
+ Creates a full backup containing different varieties of symlinks. All
+ of them must be filtered out.
'''
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
# add symlinks to existing archive
- def add_symlink (a, kind, name, dst):
+ def add_symlink (a, name, dst):
l = tarfile.TarInfo("snapshot://%s" % name)
l.type = tarfile.SYMTYPE
l.linkname = dst
a.addfile(l)
- return (kind, name, dst)
+ return name
with tarfile.open(tar_path,mode="w") as a:
checkme = \
- [ add_symlink(a, SYMLINK_GOOD,
- "symlinks/foo", "internal-file")
- , add_symlink(a, SYMLINK_BAD,
- "symlinks/bar", "/absolute/path")
- , add_symlink(a, SYMLINK_BAD,
- "symlinks/baz", "../parent/../../paths") ]
+ [ add_symlink(a, "symlinks/foo", "internal-file")
+ , add_symlink(a, "symlinks/bar", "/absolute/path")
+ , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
# check what happened to our symlinks
- for kind, source, dest in checkme:
- resolve = kind == SYMLINK_GOOD
- fullpath = os.path.join("source_dir", source)
- assert os.path.islink(fullpath)
- if resolve is True:
- try:
- linkname = os.readlink(fullpath)
- assert dest == linkname
- except OSError as exn:
- if exn.errno == errno.EINVAL:
- raise Exception("Extracted file “%s” [%s] is not"
- " a valid symlink!"
- % (str(source), str(os.stat(fullpath))))
+ for name in checkme:
+ fullpath = os.path.join("source_dir", name)
+ assert not os.path.exists(fullpath)
def test_restore_malicious_symlinks(self):
'''
f = tarfile.TarInfo("snapshot://%s" % name)
f.type = tarfile.REGTYPE
a.addfile(f)
- return a
testpath = "symlinks/pernicious-link"
- testdst = "symlinks/pernicious-link"
+ testdst = "/tmp/does/not/exist"
+
with tarfile.open(tar_path,mode="w") as a:
add_symlink(a, testpath, testdst)
+ add_symlink(a, testpath, testdst+"X")
+ add_symlink(a, testpath, testdst+"XXX")
add_file(a, testpath)
- dstinfo = \
- { "exists": os.path.exists(testdst)
- , "chksum": os.path.isfile(testdst) and self.md5sum(testdst) or None }
-
deltatar.restore_backup(target_path="source_dir",
backup_tar_path=tar_path)
- # check whether we got the link or the file
+ # check whether the link was extracted; deltatar seems to only ever
+ # retrieve the first item it finds for a given path which in the case
+ # at hand is a symlink to some non-existent path
fullpath = os.path.join("source_dir", testpath)
- # since tarfile unconditionally opens files with the flag combo
- # “O_WRONLY | O_CREAT | O_TRUNC” (like GNU tar without the -k flag) we
- # expect the link to be present, not the file from the archive; also,
- #the link target may not contain the contents of the dummy file
- assert os.path.islink(fullpath)
- dst = os.readlink(fullpath)
- assert os.path.exists(dst) == dstinfo["exists"]
- chksum = dstinfo.get("chksum")
- if chksum is not None:
- assert self.md5sum(dst) == chksum
+ assert not os.path.exists(fullpath)
class DeltaTar2Test(DeltaTarTest):
'''