From 9e092947cd04e853ce43b23ba82cd59f0e1f20c4 Mon Sep 17 00:00:00 2001 From: Philipp Gesang Date: Mon, 8 May 2017 11:26:54 +0200 Subject: [PATCH] test that seeking backwards is disallowed by _Stream MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Re-extracting an already decrypted file will fail on account of IV reuse. Currently, tarfile._Stream is not capable of performing backward seeks, so we’re good. Should this limitation be removed in a future version, this unit test will fail. --- testing/test_deltatar.py | 90 ++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 90 insertions(+), 0 deletions(-) diff --git a/testing/test_deltatar.py b/testing/test_deltatar.py index 557de07..c9dbd9c 100644 --- a/testing/test_deltatar.py +++ b/testing/test_deltatar.py @@ -446,6 +446,96 @@ class DeltaTarTest(BaseTest): os.unlink("huge") + def test_restore_manual_from_index_twice (self): + """ + Creates a full backup and restore the same file twice. This *must* fail + when encryption is active. + + Currently, tarfile.py’s *_Stream* class conveniently disallows seeking + backwards within the same file. This prevents the encryption layer from + exploding due to a reused IV in an overall valid archive. + + This test anticipates possible future mistakes since it’s entirely + feasible to implement backward seeks for *_Stream* with concat mode. + """ + # this test only works for uncompressed or concat compressed modes + if self.MODE.startswith("|") or self.MODE_COMPRESSES: + raise SkipTest("this test only works for uncompressed " + "or concat compressed modes") + + password, paramversion = self.ENCRYPTION or (None, None) + deltatar = DeltaTar(mode=self.MODE, password=password, + crypto_paramversion=paramversion, + logger=self.consoleLogger) + + self.hash = dict() + os.makedirs("source_dir2") + self.hash["source_dir2/samefile"] = \ + self.create_file("source_dir2/samefile", 1 * 1024) + + # create first backup + deltatar.create_full_backup( + source_path="source_dir2", + backup_path="backup_dir") + + assert os.path.exists("backup_dir") + assert os.path.exists(os.path.join("backup_dir", + deltatar.volume_name_func("backup_dir", True, 0))) + + shutil.rmtree("source_dir2") + + tar_filename = deltatar.volume_name_func("backup_dir", True, 0) + tar_path = os.path.join("backup_dir", tar_filename) + + index_filename = deltatar.index_name_func(True) + index_path = os.path.join("backup_dir", index_filename) + + f = deltatar.open_auxiliary_file(index_path, "r") + offset = None + while True: + l = f.readline() + if not len(l): + break + data = json.loads(l.decode("UTF-8")) + if data.get("type", "") == "file" and\ + deltatar.unprefixed(data["path"]) == "samefile": + offset = data["offset"] + break + + assert offset is not None + + fo = open(tar_path, "rb") + fo.seek(offset) + + crypto_ctx = None + if self.ENCRYPTION is not None: + crypto_ctx = crypto.Decrypt (password) + + tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo, + encryption=crypto_ctx) + member = tarobj.next() + member.path = deltatar.unprefixed(member.path) + member.name = deltatar.unprefixed(member.name) + + # extract once … + tarobj.extract(member) + assert self.hash["source_dir2/samefile"] == self.md5sum("samefile") + + # … and twice + try: + tarobj.extract(member) + except tarfile.StreamError: + if crypto_ctx is not None: + pass # good: seeking backwards not allowed + else: + raise + tarobj.close() + fo.close() + assert self.hash["source_dir2/samefile"] == self.md5sum("samefile") + + os.unlink("samefile") + + def test_restore_from_index(self): ''' Restores a full backup using an index file. -- 1.7.1