From d6a20f0ce5f0ff83d4e1e417a5b728647236053e Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Tue, 19 Jul 2016 11:54:01 +0200 Subject: [PATCH] rename volume_size_accuracy to test_volume_size_accuracy to fit pattern of unittests --- runtests.py | 1 + testing/test_volume_size_accuracy.py | 476 ++++++++++++++++++++++++++++++++++ testing/volume_size_accuracy.py | 476 ---------------------------------- 3 files changed, 477 insertions(+), 476 deletions(-) create mode 100755 testing/test_volume_size_accuracy.py delete mode 100755 testing/volume_size_accuracy.py diff --git a/runtests.py b/runtests.py index eb2c7a4..263d28b 100755 --- a/runtests.py +++ b/runtests.py @@ -92,6 +92,7 @@ from testing.test_deltatar import (DeltaTarTest, DeltaTar2Test, ) from testing.test_compression_level import suite +from testing.test_volume_size_accuracy import VolumeSizeAccuracyTest if __name__ == "__main__": import sys diff --git a/testing/test_volume_size_accuracy.py b/testing/test_volume_size_accuracy.py new file mode 100755 index 0000000..22ed178 --- /dev/null +++ b/testing/test_volume_size_accuracy.py @@ -0,0 +1,476 @@ +#!/usr/bin/env python3 + +""" Check very accurately the splitting of files into volumes + +Check: +- behaviour for max_volume_sizes % BLOCKSIZE != 0 +- file sizes very close to size remaining in volume + +By doing the following: +- create a multi-volume archive with max_volume_size % BLOCKSIZE == 0 +- add a file that nearly fills the volume +- add a small file that should just fit in or not +- check expected number and size of volumes +- extract +- check integrity of extracted data +- repeat with max_volume_size +1, +2, +10, ... +- repeat with file sizes -1, -2, -10, ... + +There are 2 ways to run these tests: +- run this file as a script will run run_all_tests, which tests lots of + parameter combinations +- if run as unittest, only a few parameter combinations are run + +Tests use max_volume_size = RECORDSIZE + 1 * BLOCKSIZE: +File 0 has Info0 and blocks Dat00, Dat01, ... Dat0K, (Dat0L, Dat0M, Dat0N) +File 1 has Info1 and blocks Dat10, (Dat11) + end of end of + RECORD volume +block | 0 | 1 | 2 | ... | -5 | -4 | -3 | -2 | -1 | + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ +0: all fit into first record: blocks = [N_BLOCKS_PER_RECORD-5, 1] +vol0: | Info0 | Dat00 | Dat01 | ... | Info1 | Dat10 | 0 | 0 | | + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +1: all fit into first volume: blocks = [MAX_VOLUME_BLOCKS-5, 1] +vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Info1 | Dat10 | 0 | 0 | + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +2: file0 needs next block: blocks = [MAX_VOLUME_BLOCKS-4, 1] + --> will add one block of zeros after end of volume +vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Info1 | Dat10 | 0 | 0 + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +3: file0 needs 2 more blocks: blocks = [MAX_VOLUME_BLOCKS-3, 1] + --> will add two blocks of zeros after end of volume +vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Dat0M | Info1 | Dat10 |00 + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +4: file0 needs 3 more blocks: blocks = [MAX_VOLUME_BLOCKS-2, 1] +vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Dat0M | wasted space | +vol1: | VOL | Dat0N | Info1 | Dat10 | 00 + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +5: file0 regular, file1 needs next block: blocks = [MAX_VOLUME_BLOCKS-5, 2] +vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Info1 | Dat10 | | | +vol1: | VOL | Dat11 | 0 | 0 | | | | | | + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +6: both need next block: blocks = [MAX_VOLUME_BLOCKS-4, 2] +vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Info1 | Dat10 | | +vol1: | VOL | Dat11 | 0 |0|...| | | | | | + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +(not tested: single huge file) +vol0: | Info0 | Dat00 | Dat01 | ... | Dat02 | Dat03 | Dat04 | wasted space | +vol1: | VOL | Dat05 | Dat06 | ... | Dat07 | Dat08 | Dat09 | wasted space | +vol2: | ... | | | | | | | | | + +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ + +.. codeauthor:: Intra2net AG +""" + +import os +from os.path import dirname, abspath, join as pjoin +from stat import S_ISREG +import sys +from math import ceil +from glob import glob +from tempfile import NamedTemporaryFile, TemporaryDirectory +from unittest import TestCase + +# try to import the tarfile from source, not the globally installed one +source_base = dirname(dirname(abspath(__file__))) +print('adding {} to python path'.format(source_base)) +if os.path.isdir(source_base): + sys.path.insert(0, source_base) +import inspect +from deltatar.tarfile import TarFile, BLOCKSIZE, RECORDSIZE +print('using TarFile from ' + dirname(inspect.getsourcefile(TarFile))) + + +#: number of blocks in a record +N_BLOCKS_PER_RECORD = RECORDSIZE // BLOCKSIZE + +#: number of blocks per tar volume file +MAX_VOLUME_BLOCKS = N_BLOCKS_PER_RECORD + 1 + + +def fill_file(file_handle, data_size): + """ fill given file handle with nonsense data of given size + + .. seealso:: :py:func:`check_file_fill` + """ + temp_data = bytes(range(2**8)) + temp_size = len(temp_data) + n_written = 0 + while n_written + temp_size <= data_size: + file_handle.write(temp_data) + n_written += temp_size + file_handle.write(temp_data[:data_size-n_written]) + + +def check_file_fill(file_name, file_size): + """ check contents of file that was created by :py:func:`fill_file` """ + # open file + with open(file_name, 'rb') as file_handle: + + if file_size < 2**8: + if file_handle.read(2**8) != bytes(range(file_size)): + return False, 'complete contents is wrong' + else: + return True, 'short file fill checks out' + + # check first 256 bytes + if file_handle.read(2**8) != bytes(range(2**8)): + return False, 'first bytes were wrong' + + # check last 256 bytes + size_mod = file_size % 2**8 + expect = bytes(range(size_mod, 2**8)) + bytes(range(size_mod)) + file_handle.seek(file_size-2**8) + if file_handle.read(2**8) != expect: + return False, 'last bytes were wrong' + + return True, 'file fill checks out' + + +def new_volume_handler(tarobj, base_name, volume_number): + """ called from tarobj when creating a new volume """ + tarobj.fileobj.close() + volume_path = "%s.%d" % (base_name, volume_number) + tarobj.open_volume(volume_path) + + +def size_str(size): + """ return string 'N (= b BLKS + m)' """ + return '{} (= {} BLKs + {})'.format(size, *divmod(size, BLOCKSIZE)) + + +def do_test(temp_dir, file_blocks, volume_blocks_arg, offset_blocks, + file_size_offsets=(0, 0), volume_size_offset=0, + print_everything=False): + """ create TarFile with given configuration """ + + # use delayed + conditional print dprnt, that only prints if necessary + # (i.e. if something went wrong) + output = [] + dprnt = output.append + + dprnt('-' * 72) + dprnt('testing with {} file blocks, expect {} volume blocks' + .format(file_blocks, volume_blocks_arg)) + dprnt('expect offsets to be {} blocks'.format(offset_blocks)) + dprnt('using offsets v:{}, files:{}' + .format(volume_size_offset, file_size_offsets)) + + if not (0 <= volume_size_offset < BLOCKSIZE): + raise ValueError('volume size offset outside allowed interval ' + '[0, BLOCKSIZE-1]: {}'.format(volume_size_offset)) + for idx, size_offset in enumerate(file_size_offsets): + if not (0 <= size_offset < BLOCKSIZE): + raise ValueError('size offset for file {} outside allowed interval ' + '[0, BLOCKSIZE-1]: {}' + .format(idx, size_offset)) + if len(file_blocks) != len(file_size_offsets): + raise ValueError('need same number of file block sizes and offsets!') + + max_volume_size = MAX_VOLUME_BLOCKS * BLOCKSIZE + volume_size_offset + actual_sizes = [] + actual_offsets = [] + volume_files = [] + tar_handle = None + temp_file_names = [] + with NamedTemporaryFile(dir=temp_dir, + suffix='.tar', + mode='wb', + delete=False) as tar_handle: + # create TarFile + dprnt('creating tar {} with max volume size {}' + .format(tar_handle.name, size_str(max_volume_size))) + tarobj = TarFile.open(mode='w:', + fileobj=tar_handle, + max_volume_size=max_volume_size, + new_volume_handler=new_volume_handler) + + # add files, remember offsets + for idx, (size_blocks, size_offset) \ + in enumerate(zip(file_blocks, file_size_offsets)): + + # remember offset + actual_offsets.append(tarobj.offset) + + # create file + add_handle = None + file_size = size_blocks * BLOCKSIZE - size_offset + with NamedTemporaryFile(dir=temp_dir, delete=False) as add_handle: + fill_file(add_handle, file_size) + add_handle.close() + temp_file_names.append(add_handle.name) + dprnt('adding file of size {} at offset {}' + .format(size_str(file_size), size_str(tarobj.offset))) + + # add file + tarobj.add(add_handle.name, arcname='file{}'.format(idx)) + + # remove file + if add_handle: + os.unlink(add_handle.name) + + # remember offset where 0s should be added + actual_offsets.append(tarobj.offset) + + # close tar file + dprnt('before close: offset is ' + size_str(actual_offsets[-1])) + tarobj.close() + tar_handle.close() + dprnt('after close: offset is ' + size_str(tarobj.offset)) + # done creating tar + + # get volume file sizes + volume_files = sorted(glob(tar_handle.name + "*")) + for volume_file in volume_files: + actual_size = os.stat(volume_file).st_size + dprnt('found volume {} of size {}' + .format(volume_file, size_str(actual_size))) + actual_sizes.append(actual_size) + + # extract + dprnt('extracting tar') + tarobj = TarFile.open(tar_handle.name, mode='r:', + max_volume_size=max_volume_size, + new_volume_handler=new_volume_handler) + tarobj.extractall(path=temp_dir) + tarobj.close() + + # remove tar volumes + for volume_file in volume_files: + os.unlink(volume_file) + + # check expectation + everything_ok = True + if len(actual_offsets) != len(offset_blocks): + everything_ok = False + dprnt('have {} actual offsets but expected {}!' + .format(len(actual_offsets), len(offset_blocks))) + for idx, (actual_offset, expected_block) \ + in enumerate(zip(actual_offsets, offset_blocks)): + if actual_offset != expected_block * BLOCKSIZE: + everything_ok = False + dprnt('wrong offset for file {}: {} != {}' + .format(idx, size_str(actual_offset), + size_str(expected_block*BLOCKSIZE))) + + # last volume is filled up to RECORDSIZE + volume_blocks = list(volume_blocks_arg) + if (len(actual_sizes) == len(volume_blocks)-1) \ + and (volume_blocks[-1] == 0): + actual_sizes.append(0) + volume_blocks[-2] = ceil(volume_blocks[-2] / N_BLOCKS_PER_RECORD) \ + * N_BLOCKS_PER_RECORD + elif len(actual_sizes) == len(volume_blocks): + volume_blocks[-1] = ceil(volume_blocks[-1] / N_BLOCKS_PER_RECORD) \ + * N_BLOCKS_PER_RECORD + else: + everything_ok = False + dprnt('wrong number of volumes: {} != {}' + .format(len(actual_sizes), len(volume_blocks))) + + for idx, (actual_size, expected_blocks) in \ + enumerate(zip(actual_sizes, volume_blocks)): + if actual_size != expected_blocks * BLOCKSIZE: + everything_ok = False + dprnt('wrong size for volume {}: {} != {}' + .format(idx, size_str(actual_size), + size_str(expected_blocks * BLOCKSIZE))) + + # check extracted files, compare size and contents + for idx, (size_blocks, size_offset) \ + in enumerate(zip(file_blocks, file_size_offsets)): + file_name = pjoin(temp_dir, 'file{}'.format(idx)) + stat_result = os.stat(file_name) + if not S_ISREG(stat_result.st_mode): + everything_ok = False + dprnt('Missing {} after extraction!'.format(file_name)) + if stat_result.st_size != size_blocks * BLOCKSIZE - size_offset: + everything_ok = False + dprnt('extracted {} has wrong size: {} != {} !' + .format(file_name, size_str(stat_result.st_size), + size_str(size_blocks * BLOCKSIZE - size_offset))) + fill_ok, message = check_file_fill(file_name, stat_result.st_size) + output.append(message) + if not fill_ok: + everything_ok = False + dprnt('extracted {} has wrong contents!'.format(file_name)) + + # print output only if something went wrong + if (not everything_ok) or print_everything: + for line in output: + print(line) + + return everything_ok + + +def run_all_tests(fast_fail=True, print_everything=False): + """ run test with lots of parameter combinations, will take quite a while + + see module doc for more info + """ + + # abbreviations for shorter lists + N = N_BLOCKS_PER_RECORD + M = MAX_VOLUME_BLOCKS + B = BLOCKSIZE + + # define tests by numbers of blocks: + # n_blocks file 0, 1; n_blocks vol0, 1, offset Info0, Info1, 0-blocks + tests = (((N-5, 1), (N, 0), (0, N-4, N-2)), # test case 0 + ((M-5, 1), (M, 0), (0, M-4, M-2)), # test case 1 + ((M-4, 1), (M+1, 0), (0, M-3, M-1)), # test case 2 + ((M-3, 1), (M+2, 0), (0, M-2, M)), # test case 3 + ((M-2, 1), (M-2, 6), (0, 2, 4)), # test case 4 + ((M-5, 2), (M-2, 4), (0, M-4, 2)), # test case 5 + ((M-4, 2), (M-1, 4), (0, M-3, 2))) # test case 6 + + # offsets for file and volume sizes in tests: + size_offsets = (0, 1, 2, 5, 10, 22, 46, 100) + \ + (B-1, B-2, B-5, B-10, B-22, B-46, B-100)[::-1] + + n_errs = 0 + n_tests = 0 + n_tests_overall = len(tests) * len(size_offsets)**3 + with TemporaryDirectory(prefix='deltatar_test_') as temp_dir: + for size_comb_idx, (file_sizes, vol_sizes, offsets) \ + in enumerate(tests): + if fast_fail and n_errs > 0: + break + print('=' * 72) + print('size combination {}: ({}, {})' + .format(size_comb_idx, *file_sizes)) + for volume_size_offset in size_offsets: + if fast_fail and n_errs > 0: + break + print('test {:5d} / {:5d}, volume offset = {:3d}' + .format(n_tests, n_tests_overall, + volume_size_offset)) + for file_size_offset0 in size_offsets: + if fast_fail and n_errs > 0: + break + for file_size_offset1 in size_offsets: + if fast_fail and n_errs > 0: + break + n_tests += 1 + test_succeeded = \ + do_test(temp_dir, file_sizes, vol_sizes, offsets, + file_size_offsets=(file_size_offset0, + file_size_offset1), + volume_size_offset=volume_size_offset, + print_everything=print_everything) + if not test_succeeded: + n_errs += 1 + if fast_fail: + print('stopping after test {} (fast-fail set)' + .format(n_tests)) + return n_errs + + +class VolumeSizeAccuracyTest(TestCase): + """ unittest that runs only a few test parameter combinations """ + + # base variables (test case 5) + file_sizes = MAX_VOLUME_BLOCKS-5, 2 + vol_sizes = MAX_VOLUME_BLOCKS-2, 4 + expect_offsets = 0, MAX_VOLUME_BLOCKS-4, 2 + file_size_offsets = 0, 0 + volume_size_offset = 0 + + def do_test(self, file_sizes, vol_sizes, expect_offsets, file_size_offsets, + volume_size_offset): + """ create temp dir, run :py:func:`do_test` with given params """ + with TemporaryDirectory(prefix='deltatar_test_') as temp_dir: + return do_test(temp_dir, file_sizes, vol_sizes, expect_offsets, + file_size_offsets, volume_size_offset) + + def test_base(self): + """ no offsets, easy-peasy """ + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + self.file_size_offsets, self.volume_size_offset)) + + def test_volume_offset(self): + """ only a volume size offset """ + volume_size_offset = 1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + self.file_size_offsets, volume_size_offset)) + + volume_size_offset = BLOCKSIZE-1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + self.file_size_offsets, volume_size_offset)) + + def test_file0_offset(self): + """ only a offset for size of file0 """ + file0_offset = 1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + (file0_offset, self.file_size_offsets[1]), + self.volume_size_offset)) + + file0_offset = BLOCKSIZE-1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + (file0_offset, self.file_size_offsets[1]), + self.volume_size_offset)) + + def test_file1_offset(self): + """ only a offset for size of file1 """ + file1_offset = 1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + (self.file_size_offsets[0], file1_offset), + self.volume_size_offset)) + + file1_offset = BLOCKSIZE-1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + (self.file_size_offsets[0], file1_offset), + self.volume_size_offset)) + + def test_multi_offsets(self): + """ test 2 combinations of all offsets """ + file_size_offsets = (1, 1) + volume_size_offset = BLOCKSIZE-1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + file_size_offsets, volume_size_offset)) + + file_size_offsets = (BLOCKSIZE-1, BLOCKSIZE-1) + volume_size_offset = 1 + self.assertTrue(self.do_test(\ + self.file_sizes, self.vol_sizes, self.expect_offsets, + file_size_offsets, volume_size_offset)) + + def test_single_volume(self): + """ smaller files --> only a single volume (test case 1) """ + file_sizes = MAX_VOLUME_BLOCKS-5, 1 + vol_sizes = MAX_VOLUME_BLOCKS, 0 + expect_offsets = 0, MAX_VOLUME_BLOCKS-4, MAX_VOLUME_BLOCKS-2 + self.assertTrue(self.do_test(\ + file_sizes, vol_sizes, expect_offsets, + self.file_size_offsets, self.volume_size_offset)) + + def test_single_volume_offset(self): + """ single volume + offsets """ + file_sizes = MAX_VOLUME_BLOCKS-5, 1 + vol_sizes = MAX_VOLUME_BLOCKS, 0 + expect_offsets = 0, MAX_VOLUME_BLOCKS-4, MAX_VOLUME_BLOCKS-2 + file_size_offsets = (BLOCKSIZE-1, BLOCKSIZE-1) + volume_size_offset = 1 + self.assertTrue(self.do_test(\ + file_sizes, vol_sizes, expect_offsets, + file_size_offsets, volume_size_offset)) + + +if __name__ == '__main__': + sys.exit(run_all_tests()) diff --git a/testing/volume_size_accuracy.py b/testing/volume_size_accuracy.py deleted file mode 100755 index 22ed178..0000000 --- a/testing/volume_size_accuracy.py +++ /dev/null @@ -1,476 +0,0 @@ -#!/usr/bin/env python3 - -""" Check very accurately the splitting of files into volumes - -Check: -- behaviour for max_volume_sizes % BLOCKSIZE != 0 -- file sizes very close to size remaining in volume - -By doing the following: -- create a multi-volume archive with max_volume_size % BLOCKSIZE == 0 -- add a file that nearly fills the volume -- add a small file that should just fit in or not -- check expected number and size of volumes -- extract -- check integrity of extracted data -- repeat with max_volume_size +1, +2, +10, ... -- repeat with file sizes -1, -2, -10, ... - -There are 2 ways to run these tests: -- run this file as a script will run run_all_tests, which tests lots of - parameter combinations -- if run as unittest, only a few parameter combinations are run - -Tests use max_volume_size = RECORDSIZE + 1 * BLOCKSIZE: -File 0 has Info0 and blocks Dat00, Dat01, ... Dat0K, (Dat0L, Dat0M, Dat0N) -File 1 has Info1 and blocks Dat10, (Dat11) - end of end of - RECORD volume -block | 0 | 1 | 2 | ... | -5 | -4 | -3 | -2 | -1 | - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ -0: all fit into first record: blocks = [N_BLOCKS_PER_RECORD-5, 1] -vol0: | Info0 | Dat00 | Dat01 | ... | Info1 | Dat10 | 0 | 0 | | - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -1: all fit into first volume: blocks = [MAX_VOLUME_BLOCKS-5, 1] -vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Info1 | Dat10 | 0 | 0 | - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -2: file0 needs next block: blocks = [MAX_VOLUME_BLOCKS-4, 1] - --> will add one block of zeros after end of volume -vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Info1 | Dat10 | 0 | 0 - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -3: file0 needs 2 more blocks: blocks = [MAX_VOLUME_BLOCKS-3, 1] - --> will add two blocks of zeros after end of volume -vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Dat0M | Info1 | Dat10 |00 - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -4: file0 needs 3 more blocks: blocks = [MAX_VOLUME_BLOCKS-2, 1] -vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Dat0M | wasted space | -vol1: | VOL | Dat0N | Info1 | Dat10 | 00 - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -5: file0 regular, file1 needs next block: blocks = [MAX_VOLUME_BLOCKS-5, 2] -vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Info1 | Dat10 | | | -vol1: | VOL | Dat11 | 0 | 0 | | | | | | - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -6: both need next block: blocks = [MAX_VOLUME_BLOCKS-4, 2] -vol0: | Info0 | Dat00 | Dat01 | ... | Dat0K | Dat0L | Info1 | Dat10 | | -vol1: | VOL | Dat11 | 0 |0|...| | | | | | - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -(not tested: single huge file) -vol0: | Info0 | Dat00 | Dat01 | ... | Dat02 | Dat03 | Dat04 | wasted space | -vol1: | VOL | Dat05 | Dat06 | ... | Dat07 | Dat08 | Dat09 | wasted space | -vol2: | ... | | | | | | | | | - +-------+-------+-------+ ... +-------+-------+-------+-------+-------+ - -.. codeauthor:: Intra2net AG -""" - -import os -from os.path import dirname, abspath, join as pjoin -from stat import S_ISREG -import sys -from math import ceil -from glob import glob -from tempfile import NamedTemporaryFile, TemporaryDirectory -from unittest import TestCase - -# try to import the tarfile from source, not the globally installed one -source_base = dirname(dirname(abspath(__file__))) -print('adding {} to python path'.format(source_base)) -if os.path.isdir(source_base): - sys.path.insert(0, source_base) -import inspect -from deltatar.tarfile import TarFile, BLOCKSIZE, RECORDSIZE -print('using TarFile from ' + dirname(inspect.getsourcefile(TarFile))) - - -#: number of blocks in a record -N_BLOCKS_PER_RECORD = RECORDSIZE // BLOCKSIZE - -#: number of blocks per tar volume file -MAX_VOLUME_BLOCKS = N_BLOCKS_PER_RECORD + 1 - - -def fill_file(file_handle, data_size): - """ fill given file handle with nonsense data of given size - - .. seealso:: :py:func:`check_file_fill` - """ - temp_data = bytes(range(2**8)) - temp_size = len(temp_data) - n_written = 0 - while n_written + temp_size <= data_size: - file_handle.write(temp_data) - n_written += temp_size - file_handle.write(temp_data[:data_size-n_written]) - - -def check_file_fill(file_name, file_size): - """ check contents of file that was created by :py:func:`fill_file` """ - # open file - with open(file_name, 'rb') as file_handle: - - if file_size < 2**8: - if file_handle.read(2**8) != bytes(range(file_size)): - return False, 'complete contents is wrong' - else: - return True, 'short file fill checks out' - - # check first 256 bytes - if file_handle.read(2**8) != bytes(range(2**8)): - return False, 'first bytes were wrong' - - # check last 256 bytes - size_mod = file_size % 2**8 - expect = bytes(range(size_mod, 2**8)) + bytes(range(size_mod)) - file_handle.seek(file_size-2**8) - if file_handle.read(2**8) != expect: - return False, 'last bytes were wrong' - - return True, 'file fill checks out' - - -def new_volume_handler(tarobj, base_name, volume_number): - """ called from tarobj when creating a new volume """ - tarobj.fileobj.close() - volume_path = "%s.%d" % (base_name, volume_number) - tarobj.open_volume(volume_path) - - -def size_str(size): - """ return string 'N (= b BLKS + m)' """ - return '{} (= {} BLKs + {})'.format(size, *divmod(size, BLOCKSIZE)) - - -def do_test(temp_dir, file_blocks, volume_blocks_arg, offset_blocks, - file_size_offsets=(0, 0), volume_size_offset=0, - print_everything=False): - """ create TarFile with given configuration """ - - # use delayed + conditional print dprnt, that only prints if necessary - # (i.e. if something went wrong) - output = [] - dprnt = output.append - - dprnt('-' * 72) - dprnt('testing with {} file blocks, expect {} volume blocks' - .format(file_blocks, volume_blocks_arg)) - dprnt('expect offsets to be {} blocks'.format(offset_blocks)) - dprnt('using offsets v:{}, files:{}' - .format(volume_size_offset, file_size_offsets)) - - if not (0 <= volume_size_offset < BLOCKSIZE): - raise ValueError('volume size offset outside allowed interval ' - '[0, BLOCKSIZE-1]: {}'.format(volume_size_offset)) - for idx, size_offset in enumerate(file_size_offsets): - if not (0 <= size_offset < BLOCKSIZE): - raise ValueError('size offset for file {} outside allowed interval ' - '[0, BLOCKSIZE-1]: {}' - .format(idx, size_offset)) - if len(file_blocks) != len(file_size_offsets): - raise ValueError('need same number of file block sizes and offsets!') - - max_volume_size = MAX_VOLUME_BLOCKS * BLOCKSIZE + volume_size_offset - actual_sizes = [] - actual_offsets = [] - volume_files = [] - tar_handle = None - temp_file_names = [] - with NamedTemporaryFile(dir=temp_dir, - suffix='.tar', - mode='wb', - delete=False) as tar_handle: - # create TarFile - dprnt('creating tar {} with max volume size {}' - .format(tar_handle.name, size_str(max_volume_size))) - tarobj = TarFile.open(mode='w:', - fileobj=tar_handle, - max_volume_size=max_volume_size, - new_volume_handler=new_volume_handler) - - # add files, remember offsets - for idx, (size_blocks, size_offset) \ - in enumerate(zip(file_blocks, file_size_offsets)): - - # remember offset - actual_offsets.append(tarobj.offset) - - # create file - add_handle = None - file_size = size_blocks * BLOCKSIZE - size_offset - with NamedTemporaryFile(dir=temp_dir, delete=False) as add_handle: - fill_file(add_handle, file_size) - add_handle.close() - temp_file_names.append(add_handle.name) - dprnt('adding file of size {} at offset {}' - .format(size_str(file_size), size_str(tarobj.offset))) - - # add file - tarobj.add(add_handle.name, arcname='file{}'.format(idx)) - - # remove file - if add_handle: - os.unlink(add_handle.name) - - # remember offset where 0s should be added - actual_offsets.append(tarobj.offset) - - # close tar file - dprnt('before close: offset is ' + size_str(actual_offsets[-1])) - tarobj.close() - tar_handle.close() - dprnt('after close: offset is ' + size_str(tarobj.offset)) - # done creating tar - - # get volume file sizes - volume_files = sorted(glob(tar_handle.name + "*")) - for volume_file in volume_files: - actual_size = os.stat(volume_file).st_size - dprnt('found volume {} of size {}' - .format(volume_file, size_str(actual_size))) - actual_sizes.append(actual_size) - - # extract - dprnt('extracting tar') - tarobj = TarFile.open(tar_handle.name, mode='r:', - max_volume_size=max_volume_size, - new_volume_handler=new_volume_handler) - tarobj.extractall(path=temp_dir) - tarobj.close() - - # remove tar volumes - for volume_file in volume_files: - os.unlink(volume_file) - - # check expectation - everything_ok = True - if len(actual_offsets) != len(offset_blocks): - everything_ok = False - dprnt('have {} actual offsets but expected {}!' - .format(len(actual_offsets), len(offset_blocks))) - for idx, (actual_offset, expected_block) \ - in enumerate(zip(actual_offsets, offset_blocks)): - if actual_offset != expected_block * BLOCKSIZE: - everything_ok = False - dprnt('wrong offset for file {}: {} != {}' - .format(idx, size_str(actual_offset), - size_str(expected_block*BLOCKSIZE))) - - # last volume is filled up to RECORDSIZE - volume_blocks = list(volume_blocks_arg) - if (len(actual_sizes) == len(volume_blocks)-1) \ - and (volume_blocks[-1] == 0): - actual_sizes.append(0) - volume_blocks[-2] = ceil(volume_blocks[-2] / N_BLOCKS_PER_RECORD) \ - * N_BLOCKS_PER_RECORD - elif len(actual_sizes) == len(volume_blocks): - volume_blocks[-1] = ceil(volume_blocks[-1] / N_BLOCKS_PER_RECORD) \ - * N_BLOCKS_PER_RECORD - else: - everything_ok = False - dprnt('wrong number of volumes: {} != {}' - .format(len(actual_sizes), len(volume_blocks))) - - for idx, (actual_size, expected_blocks) in \ - enumerate(zip(actual_sizes, volume_blocks)): - if actual_size != expected_blocks * BLOCKSIZE: - everything_ok = False - dprnt('wrong size for volume {}: {} != {}' - .format(idx, size_str(actual_size), - size_str(expected_blocks * BLOCKSIZE))) - - # check extracted files, compare size and contents - for idx, (size_blocks, size_offset) \ - in enumerate(zip(file_blocks, file_size_offsets)): - file_name = pjoin(temp_dir, 'file{}'.format(idx)) - stat_result = os.stat(file_name) - if not S_ISREG(stat_result.st_mode): - everything_ok = False - dprnt('Missing {} after extraction!'.format(file_name)) - if stat_result.st_size != size_blocks * BLOCKSIZE - size_offset: - everything_ok = False - dprnt('extracted {} has wrong size: {} != {} !' - .format(file_name, size_str(stat_result.st_size), - size_str(size_blocks * BLOCKSIZE - size_offset))) - fill_ok, message = check_file_fill(file_name, stat_result.st_size) - output.append(message) - if not fill_ok: - everything_ok = False - dprnt('extracted {} has wrong contents!'.format(file_name)) - - # print output only if something went wrong - if (not everything_ok) or print_everything: - for line in output: - print(line) - - return everything_ok - - -def run_all_tests(fast_fail=True, print_everything=False): - """ run test with lots of parameter combinations, will take quite a while - - see module doc for more info - """ - - # abbreviations for shorter lists - N = N_BLOCKS_PER_RECORD - M = MAX_VOLUME_BLOCKS - B = BLOCKSIZE - - # define tests by numbers of blocks: - # n_blocks file 0, 1; n_blocks vol0, 1, offset Info0, Info1, 0-blocks - tests = (((N-5, 1), (N, 0), (0, N-4, N-2)), # test case 0 - ((M-5, 1), (M, 0), (0, M-4, M-2)), # test case 1 - ((M-4, 1), (M+1, 0), (0, M-3, M-1)), # test case 2 - ((M-3, 1), (M+2, 0), (0, M-2, M)), # test case 3 - ((M-2, 1), (M-2, 6), (0, 2, 4)), # test case 4 - ((M-5, 2), (M-2, 4), (0, M-4, 2)), # test case 5 - ((M-4, 2), (M-1, 4), (0, M-3, 2))) # test case 6 - - # offsets for file and volume sizes in tests: - size_offsets = (0, 1, 2, 5, 10, 22, 46, 100) + \ - (B-1, B-2, B-5, B-10, B-22, B-46, B-100)[::-1] - - n_errs = 0 - n_tests = 0 - n_tests_overall = len(tests) * len(size_offsets)**3 - with TemporaryDirectory(prefix='deltatar_test_') as temp_dir: - for size_comb_idx, (file_sizes, vol_sizes, offsets) \ - in enumerate(tests): - if fast_fail and n_errs > 0: - break - print('=' * 72) - print('size combination {}: ({}, {})' - .format(size_comb_idx, *file_sizes)) - for volume_size_offset in size_offsets: - if fast_fail and n_errs > 0: - break - print('test {:5d} / {:5d}, volume offset = {:3d}' - .format(n_tests, n_tests_overall, - volume_size_offset)) - for file_size_offset0 in size_offsets: - if fast_fail and n_errs > 0: - break - for file_size_offset1 in size_offsets: - if fast_fail and n_errs > 0: - break - n_tests += 1 - test_succeeded = \ - do_test(temp_dir, file_sizes, vol_sizes, offsets, - file_size_offsets=(file_size_offset0, - file_size_offset1), - volume_size_offset=volume_size_offset, - print_everything=print_everything) - if not test_succeeded: - n_errs += 1 - if fast_fail: - print('stopping after test {} (fast-fail set)' - .format(n_tests)) - return n_errs - - -class VolumeSizeAccuracyTest(TestCase): - """ unittest that runs only a few test parameter combinations """ - - # base variables (test case 5) - file_sizes = MAX_VOLUME_BLOCKS-5, 2 - vol_sizes = MAX_VOLUME_BLOCKS-2, 4 - expect_offsets = 0, MAX_VOLUME_BLOCKS-4, 2 - file_size_offsets = 0, 0 - volume_size_offset = 0 - - def do_test(self, file_sizes, vol_sizes, expect_offsets, file_size_offsets, - volume_size_offset): - """ create temp dir, run :py:func:`do_test` with given params """ - with TemporaryDirectory(prefix='deltatar_test_') as temp_dir: - return do_test(temp_dir, file_sizes, vol_sizes, expect_offsets, - file_size_offsets, volume_size_offset) - - def test_base(self): - """ no offsets, easy-peasy """ - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - self.file_size_offsets, self.volume_size_offset)) - - def test_volume_offset(self): - """ only a volume size offset """ - volume_size_offset = 1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - self.file_size_offsets, volume_size_offset)) - - volume_size_offset = BLOCKSIZE-1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - self.file_size_offsets, volume_size_offset)) - - def test_file0_offset(self): - """ only a offset for size of file0 """ - file0_offset = 1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - (file0_offset, self.file_size_offsets[1]), - self.volume_size_offset)) - - file0_offset = BLOCKSIZE-1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - (file0_offset, self.file_size_offsets[1]), - self.volume_size_offset)) - - def test_file1_offset(self): - """ only a offset for size of file1 """ - file1_offset = 1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - (self.file_size_offsets[0], file1_offset), - self.volume_size_offset)) - - file1_offset = BLOCKSIZE-1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - (self.file_size_offsets[0], file1_offset), - self.volume_size_offset)) - - def test_multi_offsets(self): - """ test 2 combinations of all offsets """ - file_size_offsets = (1, 1) - volume_size_offset = BLOCKSIZE-1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - file_size_offsets, volume_size_offset)) - - file_size_offsets = (BLOCKSIZE-1, BLOCKSIZE-1) - volume_size_offset = 1 - self.assertTrue(self.do_test(\ - self.file_sizes, self.vol_sizes, self.expect_offsets, - file_size_offsets, volume_size_offset)) - - def test_single_volume(self): - """ smaller files --> only a single volume (test case 1) """ - file_sizes = MAX_VOLUME_BLOCKS-5, 1 - vol_sizes = MAX_VOLUME_BLOCKS, 0 - expect_offsets = 0, MAX_VOLUME_BLOCKS-4, MAX_VOLUME_BLOCKS-2 - self.assertTrue(self.do_test(\ - file_sizes, vol_sizes, expect_offsets, - self.file_size_offsets, self.volume_size_offset)) - - def test_single_volume_offset(self): - """ single volume + offsets """ - file_sizes = MAX_VOLUME_BLOCKS-5, 1 - vol_sizes = MAX_VOLUME_BLOCKS, 0 - expect_offsets = 0, MAX_VOLUME_BLOCKS-4, MAX_VOLUME_BLOCKS-2 - file_size_offsets = (BLOCKSIZE-1, BLOCKSIZE-1) - volume_size_offset = 1 - self.assertTrue(self.do_test(\ - file_sizes, vol_sizes, expect_offsets, - file_size_offsets, volume_size_offset)) - - -if __name__ == '__main__': - sys.exit(run_all_tests()) -- 1.7.1