# Copyright (C) 2013 Intra2net AG
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
# <http://www.gnu.org/licenses/>

import glob
import hashlib
import os
import random
import shutil
import string
import sys
import unittest

from deltatar import crypto

# Chunk size (in bytes) used when filling files with random data.
BLOCK_SIZE = 8096


def new_volume_handler(tarobj, base_name, volume_number, encryption=None):
    '''
    Handle a request for a new volume.

    Opens ``<base_name>.<volume_number>`` through ``tarobj.open_volume``,
    forwarding ``encryption`` unchanged.
    '''
    volume_path = "%s.%d" % (base_name, volume_number)
    tarobj.open_volume(volume_path, encryption=encryption)


def make_new_encryption_volume_handler(encryption):
    '''
    Build a new-volume handler that uses the given crypto context.

    The returned callable takes ``(tarobj, base_name, volume_number)`` and
    delegates to ``new_volume_handler`` with ``encryption`` bound.
    '''
    def handler(tarobj, base_name, volume_number):
        return new_volume_handler(tarobj, base_name, volume_number,
                                  encryption=encryption)

    return handler


def closing_new_volume_handler(tarobj, base_name, volume_number):
    '''
    Handle a request for a new volume, closing the current one first.

    Closes ``tarobj.fileobj`` and then opens ``<base_name>.<volume_number>``
    through ``tarobj.open_volume``.
    '''
    volume_path = "%s.%d" % (base_name, volume_number)
    tarobj.fileobj.close()
    tarobj.open_volume(volume_path)


class BaseTest(unittest.TestCase):
    """
    Common base for test cases: helpers to create files and symlinks with
    known checksums, plus cleanup of the files the tests leave behind.
    """

    def tearDown(self):
        '''
        Remove temporary files created by unit tests.

        Cleanup is best effort: paths that cannot be removed are skipped
        rather than turning a finished test into an error.
        '''
        patterns = ("big", "big2", "small", "small2", "sample.*",
                    "pdtcrypt-object-*.bin")
        for pattern in patterns:
            for path in glob.glob(pattern):
                # Never follow a symlink into a directory; just unlink it.
                if os.path.isdir(path) and not os.path.islink(path):
                    shutil.rmtree(path, ignore_errors=True)
                    continue
                try:
                    os.remove(path)
                except OSError:
                    # Deliberately ignored, see docstring.
                    pass

    def create_file_low_entropy(self, path, length):
        '''
        Create a file of ``length`` bytes filled with a repeating pattern of
        lowercase letters, digits and a newline, and return its md5sum.

        The pattern is written as bytes so that the size on disk does not
        depend on the platform's newline translation or default encoding.
        '''
        data = (string.ascii_lowercase + string.digits + "\n").encode("ascii")

        # How many full copies of the pattern fit, and how much of one more
        # copy is needed to reach exactly ``length`` bytes.
        n_blocks, remainder = divmod(length, len(data))

        with open(path, 'wb') as write_handle:
            for _ in range(n_blocks):
                write_handle.write(data)
            write_handle.write(data[:remainder])

        return self.md5sum(path)

    def create_file_high_entropy(self, path, length):
        """
        Create a file of ``length`` bytes of random content (from
        ``os.urandom``) to counteract compression, and return its md5sum.
        """
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
        try:
            p = 0
            while p < length:
                todo = min(length - p, BLOCK_SIZE)
                # os.write may write fewer bytes than requested; advance only
                # by what actually reached the file. The content is random,
                # so the unwritten tail of the buffer can simply be dropped.
                p += os.write(fd, os.urandom(todo))
        finally:
            os.close(fd)

        assert p == length
        return self.md5sum(path)

    def create_file(self, path, length, random=False):
        '''
        Create a file of ``length`` bytes at ``path`` and return its md5sum.

        If ``random`` is exactly ``True`` the content is random data,
        otherwise it is a repeating low-entropy pattern. Note that the
        parameter shadows the ``random`` module inside this method.
        '''
        if random is True:
            return self.create_file_high_entropy(path, length)

        return self.create_file_low_entropy(path, length)

    def create_symlink(self, linkname, path):
        '''
        Create a symlink at ``path`` pointing to ``linkname`` and return the
        hash that ``md5sum`` computes for that pair of paths.
        '''
        os.symlink(linkname, path)
        return self.md5sum(path, linkname=linkname)

    def md5sum(self, filename, linkname=None):
        '''
        Returns the md5sum of a file specified by its filename/path or, if
        ``linkname`` is specified, the hash of both paths (for symlinks).
        '''
        md5 = hashlib.md5()

        if linkname is None:
            with open(filename, 'rb') as f:
                # Read in multiples of the digest block size until EOF.
                for chunk in iter(lambda: f.read(128 * md5.block_size), b''):
                    md5.update(chunk)
        else:
            # symlink; hash paths
            md5.update(filename.encode("UTF-8"))
            md5.update(linkname.encode("UTF-8"))

        return md5.hexdigest()