return False
filter = partial(filter, self, list_func)
- tarobj.extractall(filter=filter)
+ tarobj.extractall(filter=filter, unlink=True)
tarobj.close()
def restore_backup(self, target_path, backup_indexes_paths=[],
self._deltatar.logger.warning("Ignoring symlink %s" % member.name)
# finally, restore the file
- index_data['tarobj'].extract(member, symlink_cb=ignore_symlink)
+ index_data['tarobj'].extract(member, symlink_cb=ignore_symlink,
+ unlink=True)
def add_member_dir(self, member):
'''
self.closed = True
raise
- def extractall(self, path=".", members=None, filter=None):
+ def extractall(self, path=".", members=None, filter=None, unlink=False):
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. `path' specifies a different directory
tarinfo = copy.copy(tarinfo)
tarinfo.mode = 0o0700
# Do not set_attrs directories, as we will do that further down
- self.extract(tarinfo, path, set_attrs=not tarinfo.isdir())
+ self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(), unlink=unlink)
# Reverse sort directories.
directories.sort(key=lambda a: a.name)
else:
self._dbg(1, "tarfile: %s" % e)
- def extract(self, member, path="", set_attrs=True, symlink_cb=None):
+ def extract(self, member, path="", set_attrs=True, symlink_cb=None,
+ unlink=False):
"""Extract a member from the archive to the current working directory,
using its full name. Its file information is extracted as accurately
as possible. `member' may be a filename or a TarInfo object. You can
try:
self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
- set_attrs=set_attrs)
+ set_attrs=set_attrs, unlink=unlink)
except EnvironmentError as e:
if self.errorlevel > 0:
raise
# blkdev, etc.), return None instead of a file object.
return None
- def _extract_member(self, tarinfo, targetpath, set_attrs=True):
+ def _extract_member(self, tarinfo, targetpath, set_attrs=True, unlink=False):
"""Extract the TarInfo object tarinfo to a physical
file called targetpath.
"""
else:
self._dbg(1, tarinfo.name)
+ if unlink is True:
+ _unlinkfirst(targetpath)
+
if tarinfo.isreg():
self.makefile(tarinfo, targetpath)
elif tarinfo.isdir():
if not self._extfileobj:
self.fileobj.close()
self.closed = True
+
+def _unlinkfirst(targetpath):
+ try:
+ os.unlink(targetpath)
+ except OSError as e:
+ if e.errno == errno.ENOENT or e.errno == errno.EISDIR:
+ pass
+
+
# class TarFile
class TarIter:
fullpath = os.path.join("source_dir", name)
assert not os.path.exists(fullpath)
+
def test_restore_malicious_symlinks(self):
'''
Creates a full backup containing a symlink and a file of the same name.
# retrieve the first item it finds for a given path which in the case
# at hand is a symlink to some non-existent path
fullpath = os.path.join("source_dir", testpath)
- assert not os.path.exists(fullpath)
+ assert not os.path.lexists(fullpath)
+
+
+class TarfileTest(BaseTest):
+ pwd = None
+
+ def setUp(self):
+ self.pwd = os.getcwd()
+ os.makedirs("backup_dir", exist_ok=True)
+
+ def tearDown(self):
+ '''
+ Remove temporary files created by unit tests and restore the API
+ functions in *os*.
+ '''
+ os.chdir(self.pwd)
+ shutil.rmtree("backup_dir")
+
+ def test_extract_malicious_symlinks_unlink(self):
+ '''
+ Test symlink mitigation: The destination must be deleted prior to
+ extraction.
+ '''
+ tar_path = os.path.join("backup_dir", "malicious-archive")
+
+ # add symlinks to existing archive
+
+ def add_symlink (a, name, dst):
+ l = tarfile.TarInfo(name)
+ l.type = tarfile.SYMTYPE
+ l.linkname = dst
+ a.addfile(l)
+
+ def add_file (a, name):
+ f = tarfile.TarInfo(name)
+ f.type = tarfile.REGTYPE
+ a.addfile(f)
+
+ # Add a symlink pointing to must-not-exist, then append a file
+ # object at the same path. The file must not end up at
+ # “must-not-exist” (the pointee) but at “not-as-symlink” (the
+ # pointer) that was unlinked prior to extraction.
+ testpath = "test/not-a-symlink"
+ testdst = "must-not-exist"
+
+ try:
+ with tarfile.open(tar_path, mode="w") as a:
+ add_symlink(a, testpath, testdst)
+ add_file(a, testpath)
+ except tarfile.ReadError as e:
+ if self.MODE == '#' or self.MODE.endswith ("gz"):
+ pass
+ else:
+ raise
+ except ValueError as e:
+ if self.MODE.startswith ('#'):
+ pass # O_APPEND of concat archives not feasible
+ else:
+ raise
+
+ def test_extract(dst, unlink):
+ with tarfile.open(tar_path, mode="r") as a:
+ os.makedirs(dst, exist_ok=True)
+ olddir = os.getcwd()
+ try:
+ os.chdir(dst)
+ a.extractall(unlink=unlink)
+ finally:
+ os.chdir(olddir)
+
+ fullpath = os.path.join(dst, testpath)
+ fulldst = os.path.join(dst, "test/%s" % testdst)
+
+ if unlink is True:
+ # Check whether the file was extracted. The object at the
+ # symlink location (source) must be the file. The must not
+ # be an object at the symlink destination.
+ assert not os.path.islink(fullpath)
+ assert not os.path.exists(fulldst)
+ else:
+ # Without unlink protection, the file must be found at the
+ # symlink destination with the symlink intact.
+ assert os.path.islink(fullpath)
+ assert os.path.exists(fulldst)
+
+
+ test_extract("test_dst_unlinked" , True)
+ test_extract("test_dst_symlinked", False)
def fsapi_access_true (self):