add test for corruption of encrypted files
authorPhilipp Gesang <philipp.gesang@intra2net.com>
Thu, 10 Aug 2017 13:32:16 +0000 (15:32 +0200)
committerThomas Jarosch <thomas.jarosch@intra2net.com>
Mon, 2 Apr 2018 11:34:09 +0000 (13:34 +0200)
runtests.py
testing/test_recover.py

index 3ea1e6e..3605964 100755 (executable)
@@ -22,7 +22,7 @@ import unittest
 from testing.test_crypto import HeaderTest, AESGCMTest
 from testing.test_multivol import MultivolGnuFormatTest, MultivolPaxFormatTest
 from testing.test_concat_compress import ConcatCompressTest
-from testing.test_recover import RecoverTest
+from testing.test_recover import RecoverTest, RecoverGZTest, RecoverGZAESTest
 from testing.test_rescue_tar import RescueTarTest
 from testing.test_encryption import EncryptionTest
 from testing.test_deltatar import (DeltaTarTest, DeltaTar2Test,
@@ -56,7 +56,8 @@ if __name__ == "__main__":
                          , DeltaTarGzipAes128ConcatTest
                          , DeltaTarAes128ConcatTest
                          , HeaderTest, AESGCMTest
-                         , RecoverTest
+                         # testing.test_recover
+                         , RecoverTest, RecoverGZTest, RecoverGZAESTest
                          ]:
                 try:
                     t = group (n)
index 5c84783..52dcbee 100644 (file)
@@ -8,6 +8,12 @@ import deltatar.tarfile  as tarfile
 
 from . import BaseTest
 
+TEST_PASSWORD = "test1234"
+
+###############################################################################
+## helpers                                                                   ##
+###############################################################################
+
 def flip_bits (fname, off, b=0x01, n=1):
     """
     Open file *fname* at offset *off*, replacing the next *n* bytes with
@@ -48,37 +54,59 @@ def gz_header_size (fname, off=0):
 
     return off
 
+def is_pdt_encrypted (fname):
+    """
+    Returns true if the file contains at least one PDT header plus enough
+    space for the object.
+    """
+    try:
+        with open (fname, "rb") as st:
+            hdr = crypto.hdr_read_stream (st)
+            siz = hdr ["ctsize"]
+            assert (len (st.read (siz)) == siz)
+    except Exception as exn:
+        return False
+    return True
+
+
+###############################################################################
+## tests                                                                     ##
+###############################################################################
 
 class RecoverTest (BaseTest):
     """
     Disaster recovery: restore corrupt backups.
     """
 
-    GIT_DIR = '.git'
+    COMPRESSION = None
+    PASSWORD    = None
+    FAILURES    = 0
+
 
     def setUp(self):
         '''
         Create base test data
         '''
-        self.pwd = os.getcwd()
+        self.pwd      = os.getcwd()
+        self.dst_path = "source_dir"
+        self.src_path = "%s2" % self.dst_path
+        self.hash     = dict()
+
         os.system('rm -rf target_dir source_dir* backup_dir* huge')
-        os.makedirs('source_dir/test/test2')
-        self.hash = dict()
-        self.hash["source_dir/test/test2"] = ''
-        self.hash["source_dir/big"]  = self.create_file("source_dir/big", 50000)
-        self.hash["source_dir/small"]  = self.create_file("source_dir/small", 100)
-        self.hash["source_dir/test/huge"]  = self.create_file("source_dir/test/huge", 700000)
-        self.hash["source_dir/test/huge2"]  = self.create_file("source_dir/test/huge2", 800000)
+        os.makedirs (self.src_path)
 
-        self.consoleLogger = logging.StreamHandler()
-        self.consoleLogger.setLevel(logging.DEBUG)
+        for i in range (5):
+            f = "dummy_%rd" % i
+            self.hash [f] = self.create_file ("%s/%s"
+                                              % (self.src_path, f), 5 + i)
 
-        if not os.path.isdir(self.GIT_DIR):
-            # Not running inside git tree, take our
-            # own testing directory as source.
-            self.GIT_DIR = 'testing'
-            if not os.path.isdir(self.GIT_DIR):
-                raise Exception('No input directory found: ' + self.GIT_DIR)
+
+    def tearDown(self):
+        '''
+        Remove temporal files created by unit tests and reset globals.
+        '''
+        os.chdir(self.pwd)
+        os.system("rm -rf source_dir source_dir2 backup_dir*")
 
 
     def test_recover_corrupt_byte (self):
@@ -88,68 +116,97 @@ class RecoverTest (BaseTest):
         Expects the extraction to fail in normal mode. With disaster recovery,
         extraction must succeed, and exactly one file must be missing.
         """
-        mode           = "#gz"
-        dst_path       = "source_dir"
-        src_path       = "%s2" % dst_path
+        mode           = self.COMPRESSION or "#"
         bak_path       = "backup_dir"
-        backup_file    = "the_full_backup.tar.gz"
+        backup_file    = "the_full_backup.tar"
         backup_full    = "%s/%s" % (bak_path, backup_file)
-        index_file     = "the_full_index.gz"
+        index_file     = "the_full_index"
+
+        if self.COMPRESSION is not None:
+            backup_file += ".gz"
+            backup_full += ".gz"
+            index_file  += ".gz"
+
+        if self.PASSWORD is not None:
+            backup_file += deltatar.PDTCRYPT_EXTENSION
+            backup_full += deltatar.PDTCRYPT_EXTENSION
+            index_file  += deltatar.PDTCRYPT_EXTENSION
 
         def vname (*a, **kwa):
             return backup_file
 
-        dtar = deltatar.DeltaTar (mode="#gz",
-                                  logger=self.consoleLogger,
+        dtar = deltatar.DeltaTar (mode=mode,
+                                  logger=None,
+                                  password=self.PASSWORD,
                                   index_name_func=lambda _: index_file,
                                   volume_name_func=vname)
-        self.hash = dict ()
-        os.makedirs (src_path)
-        for i in range (5):
-            f = "dummy_%rd" % i
-            self.hash [f] = self.create_file ("%s/%s" % (src_path, f), 5 + i)
 
         dtar.create_full_backup \
-            (source_path=src_path, backup_path=bak_path)
+            (source_path=self.src_path, backup_path=bak_path)
+
+        if self.PASSWORD is not None:
+            # ensure all files are at least superficially in PDT format
+            for f in os.listdir (bak_path):
+                assert is_pdt_encrypted ("%s/%s" % (bak_path, f))
 
         # first restore must succeed
-        dtar.restore_backup(target_path=dst_path,
+        dtar.restore_backup(target_path=self.dst_path,
                             backup_tar_path=backup_full)
         for key, value in self.hash.items ():
-            f = "%s/%s" % (dst_path, key)
+            f = "%s/%s" % (self.dst_path, key)
             assert os.path.exists (f)
             assert value == self.md5sum (f)
-        shutil.rmtree (dst_path)
-        shutil.rmtree (src_path)
+        shutil.rmtree (self.dst_path)
+        shutil.rmtree (self.src_path)
 
-        flip_bits (backup_full, gz_header_size (backup_full) + 1)
+        if self.PASSWORD is not None:
+            flip_bits (backup_full, crypto.PDTCRYPT_HDR_SIZE + 1)
+        elif self.COMPRESSION is not None:
+            flip_bits (backup_full, gz_header_size (backup_full) + 1)
+        else:
+            flip_bits (backup_full, tarfile.BLOCKSIZE + 1)
 
         # normal restore must fail
-        curdir = os.getcwd () # not restored after below failure
-        with self.assertRaises (Exception) as cm:
-            dtar.restore_backup(target_path=dst_path,
+        try:
+            dtar.restore_backup(target_path=self.dst_path,
                                 backup_tar_path=backup_full)
-        assert type (cm.exception) in [ tarfile.CompressionError
-                                      , tarfile.ReadError
-                                      ]
-
-        os.chdir (curdir)
+        except tarfile.CompressionError:
+            if self.PASSWORD is not None or self.COMPRESSION is not None:
+                pass
+        except tarfile.ReadError:
+            if self.PASSWORD is not None or self.COMPRESSION is not None:
+                pass
+
+        os.chdir (self.pwd) # not restored due to the error above
         # but recover will succeed
-        failed = dtar.recover_backup(target_path=dst_path,
+        failed = dtar.recover_backup(target_path=self.dst_path,
                                      backup_indexes_paths=[
                                          "%s/%s" % (bak_path, index_file)
                                      ])
-        assert len (failed) == 1
+
+        assert len (failed) == self.FAILURES
 
         # with one file missing
         missing = []
         for key, value in self.hash.items ():
-            kkey = "%s/%s" % (dst_path, key)
+            kkey = "%s/%s" % (self.dst_path, key)
             if os.path.exists (kkey):
                 assert value == self.md5sum (kkey)
             else:
                 missing.append (key)
-        assert len (missing) == 1
+        assert len (missing) == self.FAILURES
+
+        shutil.rmtree (self.dst_path)
+
+
+class RecoverGZTest (RecoverTest):
+    COMPRESSION = "#gz"
+    PASSWORD    = None
+    FAILURES    = 1
+
 
-        shutil.rmtree (dst_path)
+class RecoverGZAESTest (RecoverTest):
+    COMPRESSION = "#gz"
+    PASSWORD    = TEST_PASSWORD
+    FAILURES    = 1