test that seeking backwards is disallowed by _Stream
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Mon, 8 May 2017 09:26:54 +0000 (11:26 +0200)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Mon, 2 Apr 2018 11:34:08 +0000 (13:34 +0200)
Re-extracting an already decrypted file will fail on account of
IV reuse. Currently, tarfile._Stream is not capable of performing
backward seeks, so we’re good. Should this limitation be removed
in a future version, this unit test will fail.

testing/test_deltatar.py

index 557de07..c9dbd9c 100644 (file)
@@ -446,6 +446,96 @@ class DeltaTarTest(BaseTest):
         os.unlink("huge")
 
 
+    def test_restore_manual_from_index_twice (self):
+        """
+        Creates a full backup and restore the same file twice. This *must* fail
+        when encryption is active.
+
+        Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
+        backwards within the same file. This prevents the encryption layer from
+        exploding due to a reused IV in an overall valid archive.
+
+        This test anticipates possible future mistakes since it’s entirely
+        feasible to implement backward seeks for *_Stream* with concat mode.
+        """
+        # this test only works for uncompressed or concat compressed modes
+        if self.MODE.startswith("|") or self.MODE_COMPRESSES:
+            raise SkipTest("this test only works for uncompressed "
+                           "or concat compressed modes")
+
+        password, paramversion = self.ENCRYPTION or (None, None)
+        deltatar = DeltaTar(mode=self.MODE, password=password,
+                            crypto_paramversion=paramversion,
+                            logger=self.consoleLogger)
+
+        self.hash = dict()
+        os.makedirs("source_dir2")
+        self.hash["source_dir2/samefile"] = \
+            self.create_file("source_dir2/samefile", 1 * 1024)
+
+        # create first backup
+        deltatar.create_full_backup(
+            source_path="source_dir2",
+            backup_path="backup_dir")
+
+        assert os.path.exists("backup_dir")
+        assert os.path.exists(os.path.join("backup_dir",
+            deltatar.volume_name_func("backup_dir", True, 0)))
+
+        shutil.rmtree("source_dir2")
+
+        tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
+        tar_path = os.path.join("backup_dir", tar_filename)
+
+        index_filename = deltatar.index_name_func(True)
+        index_path = os.path.join("backup_dir", index_filename)
+
+        f = deltatar.open_auxiliary_file(index_path, "r")
+        offset = None
+        while True:
+            l = f.readline()
+            if not len(l):
+                break
+            data = json.loads(l.decode("UTF-8"))
+            if data.get("type", "") == "file" and\
+                    deltatar.unprefixed(data["path"]) == "samefile":
+                offset = data["offset"]
+                break
+
+        assert offset is not None
+
+        fo = open(tar_path, "rb")
+        fo.seek(offset)
+
+        crypto_ctx = None
+        if self.ENCRYPTION is not None:
+            crypto_ctx = crypto.Decrypt (password)
+
+        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
+                              encryption=crypto_ctx)
+        member = tarobj.next()
+        member.path = deltatar.unprefixed(member.path)
+        member.name = deltatar.unprefixed(member.name)
+
+        # extract once …
+        tarobj.extract(member)
+        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
+
+        # … and twice
+        try:
+            tarobj.extract(member)
+        except tarfile.StreamError:
+            if crypto_ctx is not None:
+                pass # good: seeking backwards not allowed
+            else:
+                raise
+        tarobj.close()
+        fo.close()
+        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
+
+        os.unlink("samefile")
+
+
     def test_restore_from_index(self):
         '''
         Restores a full backup using an index file.