#!/usr/bin/env python3

""" Test size of volumes when using multiple volumes and compression is on

Uses random files from disc as input. That is not very time-efficient but
provides a realistic setting for the nature of input data (file sizes,
randomness of data, ...)

Not a unittest, will probably take too long
"""

from tempfile import mkstemp, mkdtemp
from shutil import rmtree
import random
import os
from os.path import isdir, dirname, abspath, join as pjoin
from glob import iglob
import stat
import sys
from traceback import print_exc

if __name__ == '__main__':
    # ensure we are importing the "right" deltatar
    parent_dir = dirname(dirname(abspath(__file__)))
    sys.path.insert(0, parent_dir)
    print('prepended {} to sys path'.format(parent_dir))

import deltatar
from deltatar.tarfile import TarFile, BLOCKSIZE
import deltatar.crypto as crypto

#: tolerances of volume sizes
KiB = 1024
MiB = KiB * KiB
SIZE_TOLERANCE_GZ = 32*KiB          # gzip compression object buffer
SIZE_TOLERANCE_BZ2 = MiB
SIZE_TOLERANCE_XZ = 72*KiB
SIZE_TOLERANCE_NONE = 3*BLOCKSIZE   # should be small

#: variables for find_random_files
DIR_RETURN_MIN_REC = 5
DIR_RETURN_PROB = 0.0    # disabled
DIR_MAX_REC = 20
START_DIR = '/'

#: subdirs of START_DIR that might contain volatile or network-mounted data
EXCLUDE_DIRS = 'var', 'proc', 'dev', 'tmp', 'media', 'mnt', 'sys'

OK_MODES = stat.S_ISREG, stat.S_ISDIR, stat.S_ISFIFO, stat.S_ISLNK, \
           stat.S_ISCHR, stat.S_ISBLK


def _get_random_file(dir_name, rec_level):
    """ recursive helper for find_random_files """
    if rec_level > DIR_MAX_REC:
        return None

    #print('_get_random_file in {}, level {}'.format(dir_name, rec_level))
    try:
        contents = os.listdir(dir_name)
    except PermissionError:
        return None
    if not contents:
        return None

    entry = pjoin(dir_name, random.choice(contents))
    if isdir(entry):
        if rec_level > DIR_RETURN_MIN_REC and \
                random.random() < DIR_RETURN_PROB:
            return entry    # with a small probability return a dir
        else:
            return _get_random_file(entry, rec_level + 1)
    else:
        return entry


def find_random_files(min_file_size=100):
    """ generator over random file names

    Checks if files are readable by user (os.access) and excludes dirs with
    most volatile files and mounts; will still yield links or -- with a small
    probability -- names of dirs with many parents (no '/usr' but maybe
    /usr/local/lib/python/site-packages/deltatar)

    :param int min_file_size: minimum size (in bytes) of yielded files
    """
    # prepare list of dirs in START_DIR that are not EXCLUDE_DIRS
    start_contents = [pjoin(START_DIR, dirn)
                      for dirn in os.listdir(START_DIR)]
    for excl in EXCLUDE_DIRS:
        try:
            start_contents.remove(pjoin(START_DIR, excl))
        except ValueError:
            pass

    # infinite main loop
    while True:
        #print('_get_random_file in {}, level {}'.format(START_DIR, 0))
        entry = random.choice(start_contents)
        if isdir(entry):
            next_result = _get_random_file(entry, 1)
        else:
            next_result = entry
            #print('found non-dir in START_DIR: {}'.format(next_result))
        if next_result is None:
            #print('received None, try next')
            continue
        if not os.access(next_result, os.R_OK):
            #print('cannot access {}, try next'.format(next_result))
            continue
        statres = os.lstat(next_result)
        if statres.st_size < min_file_size:
            #print('file {} too small'.format(next_result))
            continue
        mode = statres.st_mode
        if not any(mode_test(mode) for mode_test in OK_MODES):
            #print('mode not accepted for {}, try next'.format(next_result))
            continue
        yield next_result


def new_volume_handler(tarobj, base_name, volume_number, prefix='',
                       debug_level=0):
    """ called when creating a new volume from TarFile.addfile """
    if debug_level:
        print(prefix + 'new volume handler called with {} and new vol {}'
              .format(base_name, volume_number))

    # close current volume file object
    tarobj.fileobj.close()

    # create name for next volume file
    idx = base_name.rindex('.0.')
    new_vol_path = '{}.{}.{}'.format(base_name[:idx], volume_number,
                                     base_name[idx+3:])
    tarobj.open_volume(new_vol_path)
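
# A rough illustration (hypothetical path, not executed by the test) of the
# string manipulation in new_volume_handler above: volume 0 is created by
# mkstemp and later volumes replace the '.0.' part with the running volume
# number.
#
#     >>> base_name = '/tmp/deltatar_cmprs_tst_x/tmpab12cd.0.tgz'
#     >>> idx = base_name.rindex('.0.')
#     >>> '{}.{}.{}'.format(base_name[:idx], 2, base_name[idx+3:])
#     '/tmp/deltatar_cmprs_tst_x/tmpab12cd.2.tgz'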


def test(volume_size, input_size_factor, mode, password, temp_dir,
         prefix='', clean_up_if_error=False, debug_level=0):
    """ create a TarFile with given volume size, add
    volume_size*input_size_factor MiB of data

    :param volume_size: in MiB
    :param int input_size_factor: amount of data to add, as multiple of
                                  volume_size
    :param str mode: compression mode for TarFile's mode argument
    :param password: password for encrypting modes, or None
    :param str temp_dir: directory that receives volumes and extracted data
    :param str prefix: optional output prefix
    :param bool clean_up_if_error: True will ensure there are no files left;
                                   False (default): leave volumes if error
    :param int debug_level: 0-3 where 0=no debug output, 3=lots of debug
                            output (forwarded to TarFile constructor)
    :returns: True if test failed (some size wrong, file missing, ...)
    """
    input_size = volume_size * input_size_factor * MiB
    something_strange = False

    if 'gz' in mode:
        suffix = 'tgz'
        size_tolerance = SIZE_TOLERANCE_GZ
    elif 'bz' in mode:
        suffix = 'tbz'
        size_tolerance = SIZE_TOLERANCE_BZ2
    elif 'xz' in mode:
        suffix = 'txz'
        size_tolerance = SIZE_TOLERANCE_XZ
    else:
        suffix = 'tar'
        size_tolerance = SIZE_TOLERANCE_NONE

    temp_name = None
    file_handle = None
    base_name = None
    try:
        # create temp file
        file_handle, temp_name = mkstemp(dir=temp_dir, suffix='.0.' + suffix)
        os.close(file_handle)
        file_handle = None

        # preparations
        base_name = temp_name.replace('.0.' + suffix, '')
        if debug_level:
            print(prefix + 'tarfile: ' + temp_name)
        volume_prefix = prefix + 'vol={}MiB, in=*{}, mode={}: ' \
                        .format(volume_size, input_size_factor, mode)

        def vol_handler(a, b, c):
            return new_volume_handler(a, b, c, volume_prefix, debug_level)

        # create tar object
        encryptor = None
        if password is not None:
            encryptor = crypto.Encrypt (1, 1, password=password)
        tarobj = TarFile.open(temp_name, mode=mode,
                              max_volume_size=volume_size*MiB,
                              new_volume_handler=vol_handler,
                              encryption=encryptor,
                              debug=debug_level)

        # add data
        added_size = 0
        new_size = 0
        files_added = []
        for count, file_name in enumerate(find_random_files()):
            if file_name.startswith(base_name):
                continue    # do not accidentally add self
            new_size = os.lstat(file_name).st_size
            if new_size > max(volume_size*MiB, input_size-added_size):
                continue    # add at most one volume_size too much
            new_name = '{}_{:04d}_{}_{:09d}' \
                       .format(base_name, count,
                               file_name.replace('/', '_')[:200], new_size)
            tarobj.add(file_name, arcname=new_name)
            files_added.append(new_name)
            added_size += new_size
            if debug_level > 2:
                print('{}vol={}MiB, in=*{}, mode={}: added {:.1f}MiB/{:.1f}MiB'
                      .format(prefix, volume_size, input_size_factor, mode,
                              added_size/MiB, input_size/MiB))
            if added_size > input_size:
                break
        tarobj.close()

        # check volume files
        n_wrong_size = 0
        n_volumes = 0
        volume_size_sum = 0
        for file_name in iglob(pjoin(temp_dir, base_name + '*')):
            n_volumes += 1
            vol_size = os.lstat(file_name).st_size
            volume_size_sum += vol_size
            if debug_level:
                print('{} - {}: {:.3f}'.format(prefix, file_name,
                                               vol_size/MiB))
            if abs(vol_size - volume_size*MiB) > size_tolerance:
                n_wrong_size += 1
        if debug_level:
            print(prefix + 'compression ratio (input/compressed size): {:.2f}'
                  .format(added_size/volume_size_sum))
        if n_wrong_size > 1:
            print(prefix + 'wrong size!')
            something_strange = True
        if n_volumes == 0:
            print(prefix + 'no volumes!')
            something_strange = True

        # extract data
        if debug_level:
            print(prefix + 'extracting:')
        decryptor = None
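
# A minimal usage sketch (values are only an example, not part of the suite):
# run a single, quick combination instead of the full matrix covered by
# test_lots() below.  A return value of True means some volume size or
# extracted file was wrong.
#
#     failed = test(volume_size=10, input_size_factor=3, mode='w|gz',
#                   password=None, temp_dir=mkdtemp(), debug_level=1)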
        if password is not None:
            decryptor = crypto.Decrypt (password=password)
        tarobj = TarFile.open(temp_name, mode=mode.replace('w', 'r'),
                              new_volume_handler=new_volume_handler,
                              encryption=decryptor,
                              debug=debug_level)
        tarobj.extractall(path='/')
        tarobj.close()

        # check whether all original files are accounted for
        n_files_found = 0
        files_found = [False for _ in files_added]
        for file_name in iglob(pjoin(temp_dir, base_name + '_*')):
            n_files_found += 1
            orig_size = int(file_name[-9:])
            if os.lstat(file_name).st_size != orig_size:
                print(prefix + 'wrong size: {} instead of {} for {}!'
                      .format(os.lstat(file_name).st_size, orig_size,
                              file_name))
                something_strange = True
            try:
                idx = files_added.index(file_name)
            except ValueError:
                print(prefix + 'extracted file that was not added: '
                      + file_name)
                something_strange = True
            else:
                files_found[idx] = True
        not_found = [file_name for file_name, found
                     in zip(files_added, files_found) if not found]
        for file_name in not_found:
            print(prefix + 'original file not found: ' + file_name)
            something_strange = True
        if n_files_found != len(files_added):
            print(prefix + 'added {} files but extracted {}!'
                  .format(len(files_added), n_files_found))
            something_strange = True
    except Exception as exc:
        print('caught exception {}'.format(exc))
        print_exc()
        something_strange = True
    finally:
        if file_handle:
            os.close(file_handle)

        # clean up
        if base_name:
            for file_name in iglob(base_name + '*'):
                if clean_up_if_error:
                    os.unlink(file_name)
                elif something_strange and file_name.endswith('.' + suffix):
                    continue    # skip
                else:
                    os.unlink(file_name)    # remove
            if debug_level and something_strange and not clean_up_if_error:
                print(prefix + 'leaving volume files '
                      + base_name + '.*.' + suffix)

    # summarize
    if something_strange:
        print('{}test with volume_size={}, input_factor={}, mode={} failed!'
              .format(prefix, volume_size, input_size_factor, mode))
    elif debug_level:
        print(prefix + 'test succeeded')

    return something_strange


def test_lots(fast_fail=False, debug_level=0, clean_up_if_error=False):
    """ Tests a lot of combinations of volume_size, input_size_factor and mode

    :param bool fast_fail: set to True to stop after first error
    :param int debug_level: forwarded to :func:`test`
    :param bool clean_up_if_error: forwarded to :func:`test`
    :returns: number of failed tests
    """
    # volume sizes in MiB
    volume_sizes = 10, 100

    # input size factor (multiplied with volume size)
    input_size_factors = 3, 10, 30

    # compression modes (including uncompressed as comparison)
    modes = (('w|gz' , None),
             ('w|bz2', None),
             ('w|xz' , None),
             ('w#gz' , None),
             ('w#gz' , "test1234"),
             ('w#'   , "test1234"))

    # create a temp dir for all input and output data
    temp_dir = mkdtemp(prefix='deltatar_cmprs_tst_')

    n_errs = 0
    n_tests = len(volume_sizes) * len(input_size_factors) * len(modes)
    test_idx = 0
    stop_now = False
    for volume_size in volume_sizes:
        if stop_now:
            break
        for input_size_factor in input_size_factors:
            if stop_now:
                break
            for mode, password in modes:
                test_idx += 1
                prefix = 'test{:d}: '.format(test_idx)
                something_strange = test(volume_size, input_size_factor,
                                         mode, password, temp_dir, prefix,
                                         clean_up_if_error=clean_up_if_error,
                                         debug_level=debug_level)
                if something_strange:
                    n_errs += 1
                    if fast_fail:
                        stop_now = True
                        break
                print('after running test {:3d}/{} have {} errs'
                      .format(test_idx, n_tests, n_errs))

    if n_errs == 0:
        print('removing temp dir {}'.format(temp_dir))
        rmtree(temp_dir)
    else:
        print('leaving temp dir {}'.format(temp_dir))

    return n_errs


if __name__ == '__main__':
    # run test
    n_errs = test_lots()

    # forward number of errors to shell
    sys.exit(n_errs)
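
# The exit status equals the number of failed combinations, so any non-zero
# status signals at least one failed size or extraction check.  For a quicker,
# more verbose run one could call, for example,
# test_lots(fast_fail=True, debug_level=1) instead of the default above.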