also extract temp files and check size and contents
author Christian Herdtweck <christian.herdtweck@intra2net.com>
Tue, 19 Jul 2016 08:58:27 +0000 (10:58 +0200)
committer Christian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 12 Nov 2020 14:04:34 +0000 (15:04 +0100)
testing/volume_size_accuracy.py

index 820499b..67adade 100755 (executable)
@@ -11,6 +11,8 @@ By doing the following:
 - add a file that nearly fills the volume
 - add a small file that should just fit in or not
 - check expected number and size of volumes
+- extract
+- check integrity of extracted data
 - repeat with max_volume_size +1, +2, +10, ...
 - repeat with file sizes -1, -2, -10, ...
 
@@ -65,7 +67,8 @@ vol2: |  ...  |       |       |     |       |       |       |       |       |
 
 
 import os
-from os.path import dirname, abspath
+from os.path import dirname, abspath, join as pjoin
+from stat import S_ISREG
 import sys
 from math import ceil
 from glob import glob
@@ -91,8 +94,12 @@ MAX_VOLUME_BLOCKS = N_BLOCKS_PER_RECORD + 1
 #: ~48000 tests --> not recommended
 ALWAYS_PRINT_EVERYTHING = False
 
+
 def fill_file(file_handle, data_size):
-    """ fill given file handle with nonsense data of given size """
+    """ fill given file handle with nonsense data of given size
+
+    .. seealso:: :py:func:`check_file_fill`
+    """
     temp_data = bytes(range(2**8))
     temp_size = len(temp_data)
     n_written = 0
@@ -102,6 +109,31 @@ def fill_file(file_handle, data_size):
     file_handle.write(temp_data[:data_size-n_written])
 
 
+def check_file_fill(file_name, file_size):
+    """ check file created by :py:func:`fill_file`; returns (ok, message) """
+    # read as bytes: the fill pattern is compared byte-for-byte
+    with open(file_name, 'rb') as file_handle:
+
+        if file_size < 2**8:
+            if file_handle.read(2**8) != bytes(range(file_size)):
+                return False, 'complete contents is wrong'
+            else:
+                return True, 'short file fill checks out'
+
+        # check first 256 bytes: expect byte values 0..255 in order
+        if file_handle.read(2**8) != bytes(range(2**8)):
+            return False, 'first bytes were wrong'
+
+        # check last 256 bytes: same pattern, rotated to start at size_mod
+        size_mod = file_size % 2**8
+        expect = bytes(range(size_mod, 2**8)) + bytes(range(size_mod))
+        file_handle.seek(file_size-2**8)
+        if file_handle.read(2**8) != expect:
+            return False, 'last bytes were wrong'
+
+    return True, 'file fill checks out'
+
+
 def new_volume_handler(tarobj, base_name, volume_number):
     """ called from tarobj when creating a new volume """
     tarobj.fileobj.close()
@@ -146,6 +178,7 @@ def test(temp_dir, file_blocks, volume_blocks_arg, offset_blocks,
     actual_offsets = []
     volume_files = []
     tar_handle = None
+    temp_file_names = []
     with NamedTemporaryFile(dir=temp_dir,
                             suffix='.tar',
                             mode='wb',
@@ -171,6 +204,7 @@ def test(temp_dir, file_blocks, volume_blocks_arg, offset_blocks,
             with NamedTemporaryFile(dir=temp_dir, delete=False) as add_handle:
                 fill_file(add_handle, file_size)
                 add_handle.close()
+                temp_file_names.append(add_handle.name)
             dprnt('adding file of size {} at offset {}'
                   .format(size_str(file_size), size_str(tarobj.offset)))
 
@@ -189,18 +223,27 @@ def test(temp_dir, file_blocks, volume_blocks_arg, offset_blocks,
         tarobj.close()
         tar_handle.close()
         dprnt('after close: offset is ' + size_str(tarobj.offset))
+    # done creating tar
 
-        # get volume file sizes
-        volume_files = sorted(glob(tar_handle.name + "*"))
-        for volume_file in volume_files:
-            actual_size = os.stat(volume_file).st_size
-            dprnt('found volume {} of size {}'
-                  .format(volume_file, size_str(actual_size)))
-            actual_sizes.append(actual_size)
-
+    # get volume file sizes
+    volume_files = sorted(glob(tar_handle.name + "*"))
+    for volume_file in volume_files:
+        actual_size = os.stat(volume_file).st_size
+        dprnt('found volume {} of size {}'
+              .format(volume_file, size_str(actual_size)))
+        actual_sizes.append(actual_size)
+
+    # extract
+    dprnt('extracting tar')
+    tarobj = TarFile.open(tar_handle.name, mode='r:',
+                          max_volume_size=max_volume_size,
+                          new_volume_handler=new_volume_handler)
+    tarobj.extractall(path=temp_dir)
+    tarobj.close()
+
+    # remove tar volumes
     for volume_file in volume_files:
         os.unlink(volume_file)
-    # now all temp files should be deleted again
 
     # check expectation
     everything_ok = True
@@ -239,6 +282,25 @@ def test(temp_dir, file_blocks, volume_blocks_arg, offset_blocks,
                   .format(idx, size_str(actual_size),
                           size_str(expected_blocks * BLOCKSIZE)))
 
+    # check extracted files, compare size and contents
+    for idx, (size_blocks, size_offset) \
+            in enumerate(zip(file_blocks, file_size_offsets)):
+        file_name = pjoin(temp_dir, 'file{}'.format(idx))
+        stat_result = os.stat(file_name)
+        if not S_ISREG(stat_result.st_mode):
+            everything_ok = False
+            dprnt('Missing {} after extraction!'.format(file_name))
+        if stat_result.st_size != size_blocks * BLOCKSIZE - size_offset:
+            everything_ok = False
+            dprnt('extracted {} has wrong size: {} != {} !'
+                  .format(file_name, size_str(stat_result.st_size),
+                          size_str(size_blocks * BLOCKSIZE - size_offset)))
+        fill_ok, message = check_file_fill(file_name, stat_result.st_size)
+        output.append(message)
+        if not fill_ok:
+            everything_ok = False
+            dprnt('extracted {} has wrong contents!'.format(file_name))
+
     # print output only if something went wrong
     if (not everything_ok) or ALWAYS_PRINT_EVERYTHING:
         for line in output: