#!/usr/bin/env python3 # Copyright (C) 2014 Intra2net AG # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see # # Authors: Victor Ramirez de la Corte # Eduardo Robles Elvira import argparse import os import zlib import fnmatch import cProfile import io import binascii import pstats def main(args = None): ''' Main function, parses the command line arguments and launches the appropiate benchmark test. ''' parser = argparse.ArgumentParser(description='Profiling test options. ') parser.add_argument('-t', '--test', choices=['delta-tarfile', 'tarfile', 'zlib', 'zlib-blocks'], help='Select option for testing. ') parser.add_argument('-l', '--compression-level', type=int, default=9, choices=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], help='Select level of compression for zlib. ') parser.add_argument('-p', '--path', help='path to the file/dir to compress.') parser.add_argument('-P', '--profile', action='store_true', help='Enable profile') parser.add_argument('-m', '--tarmode', default="w:gz", help='Specify tarfile mode [w:gz]') parser.add_argument('-st', '--strategy', default='default', choices=['default', 'filtered', 'huffman_only'], help='Select the deflate strategy in zlib related ' 'tests.') parser.add_argument( '-s', '--sort', help='Sort output profile', choices=[ 'calls', 'cumulative', 'cumtime', 'file', 'path', 'module', 'ncalls', 'pcalls', 'line', 'name', 'nfl', 'stdname', 'time', 'tottime'], default="cumulative") if not args: pargs = parser.parse_args() else: pargs = parser.parse_args(args) compression_level = pargs.compression_level path = pargs.path # start profiling if pargs.profile: pr = cProfile.Profile() pr.enable() if pargs.test in ['delta-tarfile', 'tarfile']: test_tarfile(pargs.test, compression_level, path, pargs.tarmode) elif pargs.test == 'zlib': test_zlib(compression_level, path, pargs.strategy) elif pargs.test == 'zlib-blocks': test_zlib_blocks(compression_level, path, pargs.strategy) else: parser.print_help() # end profiling if pargs.profile: pr.disable() print_profile(pr, pargs.sort) def test_tarfile(res, compression_level=9, path='source_dir', tarmode="w:gz"): ''' Test that creates a tarfile called test_tarfile.tar.gz from a directory. ''' encryptor = None dstfile = "test_tarfile.tar.gz" if res == 'delta-tarfile': import sys # need to fix up the path before we can load a module explicitly dtarpath = os.getenv ("DELTATAR") if dtarpath is not None: sys.path.append (dtarpath) try: from deltatar import tarfile as TarFile from deltatar import crypto except ImportError: raise Exception ("location of deltatar not found; " "set $DELTATAR to provide an explicit path") if tarmode.endswith (".pdtcrypt"): dstfile = dstfile + ".pdtcrypt" tarmode = tarmode.rstrip (".pdtcrypt") encryptor = crypto.Encrypt (password="test1234", nacl=binascii.unhexlify(b"0011223344556677" b"8899aabbccddeeff"), version=1, paramversion=1) elif res == 'tarfile': import tarfile as TarFile tar = TarFile.open (dstfile, mode=tarmode, concat='#' in tarmode, encryption=encryptor, compresslevel=compression_level) tar.add(path) def test_zlib(compression_level=9, path='source_dir', strategy='default', BUFSIZE = 16 * 1024): ''' Test that compresses a directory with zlib ''' if strategy == 'default': strategy = zlib.Z_DEFAULT_STRATEGY elif strategy == 'filtered': strategy = zlib.Z_FILTERED else: strategy = zlib.Z_HUFFMAN_ONLY c = zlib.compressobj(compression_level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, strategy) def add(path): l = 0 l_orig = 0 if os.path.isdir(path): for f in os.listdir(path): l2, l_orig2 = add(os.path.join(path, f)) l += l2 l_orig += l_orig2 elif os.path.isfile(path): with open(path, 'rb') as fo: while True: buf = fo.read(BUFSIZE) l_orig += len(buf) if len(buf) == 0: return l, l_orig buf = c.compress(buf) if buf is not None: l += len(buf) return l, l_orig l, l_orig = add(path) l += len(c.flush()) print("total size: %d MiB compressed, %d MiB uncompressed" % ( (l / (1024 ** 2)), (l_orig / (1024 ** 2)))) def test_zlib_blocks(compression_level=9, path='source_dir', strategy='default', BUFSIZE = 16 * 1024): ''' Test that compresses a directory with zlib ''' if strategy == 'default': strategy = zlib.Z_DEFAULT_STRATEGY elif strategy == 'filtered': strategy = zlib.Z_FILTERED else: strategy = zlib.Z_HUFFMAN_ONLY def add(path): l = 0 l_orig = 0 if os.path.isdir(path): for f in os.listdir(path): l2, l_orig2 = add(os.path.join(path, f)) l += l2 l_orig += l_orig2 elif os.path.isfile(path): c = zlib.compressobj(compression_level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, strategy) with open(path, 'rb') as fo: while True: buf = fo.read(BUFSIZE) l_orig += len(buf) if len(buf) == 0: l += len(c.flush()) return l, l_orig buf = c.compress(buf) if buf is not None: l += len(buf) return l, l_orig l, l_orig = add(path) print("total size: %d MiB compressed, %d MiB uncompressed" % ( (l / (1024 ** 2)), (l_orig / (1024 ** 2)))) def print_profile(pr, sort): s = io.StringIO() if not sort: sort = 'cumulative' ps = pstats.Stats(pr, stream=s).sort_stats(sort) ps.print_stats(20) print(s.getvalue()) if __name__ == '__main__': main()