Merge branch 'tarfile-unlink'
authorThomas Jarosch <thomas.jarosch@intra2net.com>
Tue, 4 Feb 2020 13:15:33 +0000 (14:15 +0100)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Tue, 4 Feb 2020 13:15:33 +0000 (14:15 +0100)
deltatar/deltatar.py
deltatar/tarfile.py
testing/test_deltatar.py

index 460d274..71bc904 100644 (file)
@@ -1348,7 +1348,7 @@ class DeltaTar(object):
             return False
         filter = partial(filter, self, list_func)
 
-        tarobj.extractall(filter=filter)
+        tarobj.extractall(filter=filter, unlink=True)
         tarobj.close()
 
     def restore_backup(self, target_path, backup_indexes_paths=[],
@@ -2014,7 +2014,8 @@ class RestoreHelper(object):
             self._deltatar.logger.warning("Ignoring symlink %s" % member.name)
 
         # finally, restore the file
-        index_data['tarobj'].extract(member, symlink_cb=ignore_symlink)
+        index_data['tarobj'].extract(member, symlink_cb=ignore_symlink,
+                                     unlink=True)
 
     def add_member_dir(self, member):
         '''
index b4b784a..325feb8 100644 (file)
@@ -2841,7 +2841,7 @@ class TarFile(object):
             self.closed = True
             raise
 
-    def extractall(self, path=".", members=None, filter=None):
+    def extractall(self, path=".", members=None, filter=None, unlink=False):
         """Extract all members from the archive to the current working
            directory and set owner, modification time and permissions on
            directories afterwards. `path' specifies a different directory
@@ -2866,7 +2866,7 @@ class TarFile(object):
                 tarinfo = copy.copy(tarinfo)
                 tarinfo.mode = 0o0700
             # Do not set_attrs directories, as we will do that further down
-            self.extract(tarinfo, path, set_attrs=not tarinfo.isdir())
+            self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(), unlink=unlink)
 
         # Reverse sort directories.
         directories.sort(key=lambda a: a.name)
@@ -2885,7 +2885,8 @@ class TarFile(object):
                 else:
                     self._dbg(1, "tarfile: %s" % e)
 
-    def extract(self, member, path="", set_attrs=True, symlink_cb=None):
+    def extract(self, member, path="", set_attrs=True, symlink_cb=None,
+                unlink=False):
         """Extract a member from the archive to the current working directory,
            using its full name. Its file information is extracted as accurately
            as possible. `member' may be a filename or a TarInfo object. You can
@@ -2913,7 +2914,7 @@ class TarFile(object):
 
         try:
             self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
-                                 set_attrs=set_attrs)
+                                 set_attrs=set_attrs, unlink=unlink)
         except EnvironmentError as e:
             if self.errorlevel > 0:
                 raise
@@ -2961,7 +2962,7 @@ class TarFile(object):
             # blkdev, etc.), return None instead of a file object.
             return None
 
-    def _extract_member(self, tarinfo, targetpath, set_attrs=True):
+    def _extract_member(self, tarinfo, targetpath, set_attrs=True, unlink=False):
         """Extract the TarInfo object tarinfo to a physical
            file called targetpath.
         """
@@ -2983,6 +2984,9 @@ class TarFile(object):
         else:
             self._dbg(1, tarinfo.name)
 
+        if unlink is True:
+            _unlinkfirst(targetpath)
+
         if tarinfo.isreg():
             self.makefile(tarinfo, targetpath)
         elif tarinfo.isdir():
@@ -3295,6 +3299,15 @@ class TarFile(object):
             if not self._extfileobj:
                 self.fileobj.close()
             self.closed = True
+
+def _unlinkfirst(targetpath):
+    try:
+        os.unlink(targetpath)
+    except OSError as e:
+        if e.errno == errno.ENOENT or e.errno == errno.EISDIR:
+            pass
+
+
 # class TarFile
 
 class TarIter:
index adfcfbb..4fac62f 100644 (file)
@@ -1822,6 +1822,7 @@ class DeltaTarTest(BaseTest):
             fullpath = os.path.join("source_dir", name)
             assert not os.path.exists(fullpath)
 
+
     def test_restore_malicious_symlinks(self):
         '''
         Creates a full backup containing a symlink and a file of the same name.
@@ -1883,7 +1884,94 @@ class DeltaTarTest(BaseTest):
         # retrieve the first item it finds for a given path which in the case
         # at hand is a symlink to some non-existent path
         fullpath = os.path.join("source_dir", testpath)
-        assert not os.path.exists(fullpath)
+        assert not os.path.lexists(fullpath)
+
+
+class TarfileTest(BaseTest):
+    pwd = None
+
+    def setUp(self):
+        self.pwd = os.getcwd()
+        os.makedirs("backup_dir", exist_ok=True)
+
+    def tearDown(self):
+        '''
+        Remove temporary files created by unit tests and restore the API
+        functions in *os*.
+        '''
+        os.chdir(self.pwd)
+        shutil.rmtree("backup_dir")
+
+    def test_extract_malicious_symlinks_unlink(self):
+        '''
+        Test symlink mitigation: The destination must be deleted prior to
+        extraction.
+        '''
+        tar_path = os.path.join("backup_dir", "malicious-archive")
+
+        # add symlinks to existing archive
+
+        def add_symlink (a, name, dst):
+            l = tarfile.TarInfo(name)
+            l.type = tarfile.SYMTYPE
+            l.linkname = dst
+            a.addfile(l)
+
+        def add_file (a, name):
+            f = tarfile.TarInfo(name)
+            f.type = tarfile.REGTYPE
+            a.addfile(f)
+
+        # Add a symlink pointing to must-not-exist, then append a file
+        # object at the same path. The file must not end up at
+        # “must-not-exist” (the pointee) but at “not-as-symlink” (the
+        # pointer) that was unlinked prior to extraction.
+        testpath = "test/not-a-symlink"
+        testdst  = "must-not-exist"
+
+        try:
+            with tarfile.open(tar_path, mode="w") as a:
+                add_symlink(a, testpath, testdst)
+                add_file(a, testpath)
+        except tarfile.ReadError as e:
+            if self.MODE == '#' or self.MODE.endswith ("gz"):
+                pass
+            else:
+                raise
+        except ValueError as e:
+            if self.MODE.startswith ('#'):
+                pass # O_APPEND of concat archives not feasible
+            else:
+                raise
+
+        def test_extract(dst, unlink):
+            with tarfile.open(tar_path, mode="r") as a:
+                os.makedirs(dst, exist_ok=True)
+                olddir = os.getcwd()
+                try:
+                    os.chdir(dst)
+                    a.extractall(unlink=unlink)
+                finally:
+                    os.chdir(olddir)
+
+            fullpath = os.path.join(dst, testpath)
+            fulldst  = os.path.join(dst, "test/%s" % testdst)
+
+            if unlink is True:
+                # Check whether the file was extracted. The object at the
+                # symlink location (source) must be the file. The must not
+                # be an object at the symlink destination.
+                assert not os.path.islink(fullpath)
+                assert not os.path.exists(fulldst)
+            else:
+                # Without unlink protection, the file must be found at the
+                # symlink destination with the symlink intact.
+                assert os.path.islink(fullpath)
+                assert os.path.exists(fulldst)
+
+
+        test_extract("test_dst_unlinked" , True)
+        test_extract("test_dst_symlinked", False)
 
 
 def fsapi_access_true (self):