# Copyright (C) 2013 Intra2net AG
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
#

import os, unittest, hashlib, string

from deltatar.tarfile import TarFile, GNU_FORMAT, GZ_MAGIC_BYTES

import filesplit

from . import BaseTest, new_volume_handler


class ConcatCompressTest(BaseTest):
    """
    Test concatenated compression in tarfiles.

    Each test creates one or more sample files via the BaseTest helpers
    (``create_file`` returns an md5 of the generated content), packs them
    with the "#gz" concat-gzip mode of deltatar's TarFile, and verifies
    extraction through several paths: this module, gnu tar/zcat, seekable
    file objects, multi-volume archives, and the filesplit rescue tool.
    """

    def test_zcat_extract_concat(self):
        """
        Create a tar file with only one file inside, using concat
        compression mode. Then decompress it with zcat and untar it
        with gnu tar.
        """
        # create the content of the file to compress and hash it
        md5 = self.create_file("big", 50000)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz",
                              mode="w#gz",
                              format=GNU_FORMAT)
        tarobj.add("big")
        tarobj.close()
        os.unlink("big")

        # extract with normal tar and check output
        os.system("zcat sample.tar.gz > sample.tar")
        os.system("tar xf sample.tar")
        assert os.path.exists("big")
        assert md5 == self.md5sum("big")

    def test_concat_extract(self):
        '''
        Create a tar file with only one file inside, using concat
        compression mode, then decompress it with tarlib module too.
        '''
        # create the content of the file to compress and hash it
        md5 = self.create_file("big", 50000)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.close()
        os.unlink("big")

        # extract with this module and check output
        tarobj = TarFile.open("sample.tar.gz", mode="r#gz")
        tarobj.extractall()
        tarobj.close()
        assert os.path.exists("big")
        assert md5 == self.md5sum("big")

    def test_concat_extract_fileobj(self):
        '''
        Create a tar file with only one file inside, using concat
        compression mode, then decompress it with tarlib module using
        the fileobj parameter.
        '''
        # create the content of the file to compress and hash it
        md5 = self.create_file("big", 50000)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        # remember where the member starts so we can seek straight to it
        pos = tarobj.get_last_member_offset()
        tarobj.close()
        os.unlink("big")

        # tarfile.close() does not release a caller-supplied fileobj,
        # so manage its lifetime here with a context manager
        with open("sample.tar.gz", 'rb') as fo:
            fo.seek(pos)
            tarobj = TarFile.open(mode="r#gz", fileobj=fo)
            tarobj.extract(tarobj.next())
            tarobj.close()
        assert os.path.exists("big")
        assert md5 == self.md5sum("big")

    def test_concat_extract_one_fileobj(self):
        '''
        Create a tar file with multiple files inside, using concat
        compression mode, then decompress it with tarlib module using
        the fileobj parameter.
        '''
        # create the content of the files to compress and hash them
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.add("small")
        # offset of the "small" member, used below to extract only it
        pos = tarobj.get_last_member_offset()
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("small")
        os.unlink("small2")

        # extract only the "small" file; tarfile.close() does not release
        # a caller-supplied fileobj, so use a context manager
        with open("sample.tar.gz", 'rb') as fo:
            fo.seek(pos)
            tarobj = TarFile.open(mode="r#gz", fileobj=fo)
            tarobj.extract(tarobj.next())
            tarobj.close()
        assert os.path.exists("small")
        assert md5s['small'] == self.md5sum("small")

        # we didn't extract the other files
        assert not os.path.exists("big")
        assert not os.path.exists("small2")

    def test_concat_extract_one_fileobj_multivol(self):
        '''
        Create a tar file with multiple files inside and multiple volume,
        using concat compression mode, then decompress a file spanning two
        volumess with tarlib module using the fileobj parameter.
        '''
        # create the content of the files to compress and hash them
        md5s = dict()
        md5s["small"] = self.create_file("small", 100000)
        md5s["big"] = self.create_file("big", 1200000)

        # create the tar file with volumes; "big" exceeds
        # max_volume_size, so it spans into a second volume
        tarobj = TarFile.open("sample.tar.gz",
                              mode="w#gz",
                              max_volume_size=1000000,
                              new_volume_handler=new_volume_handler)
        tarobj.add("small")
        tarobj.add("big")
        # offset of the "big" member, used below to extract only it
        pos = tarobj.get_last_member_offset()
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("small")

        def new_volume_handler_fo(tarobj, base_name, volume_number):
            '''
            Handles the new volumes, ignoring base_name as it'll be None
            because we'll be using a seek fileobj.
            '''
            volume_path = "sample.tar.gz.%d" % volume_number
            tarobj.open_volume(volume_path)

        # extract only the "big" file; tarfile.close() does not release
        # a caller-supplied fileobj, so use a context manager
        with open("sample.tar.gz", 'rb') as fo:
            fo.seek(pos)
            tarobj = TarFile.open(mode="r#gz", fileobj=fo,
                                  new_volume_handler=new_volume_handler_fo)
            tarobj.extract(tarobj.next())
            tarobj.close()
        assert os.path.exists("big")
        assert md5s['big'] == self.md5sum("big")

        # we didn't extract the other files
        assert not os.path.exists("small")

    def test_multiple_files_zcat_extract(self):
        '''
        Create a tar file with only multiple files inside, using concat
        compression mode, then decompress the tarfile.
        '''
        # create sample data
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.add("small")
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("small")
        os.unlink("small2")

        # extract and check output
        os.system("zcat sample.tar.gz > sample.tar")
        tarobj = TarFile.open("sample.tar", mode="r")
        tarobj.extractall()
        tarobj.close()

        for key, value in md5s.items():
            assert os.path.exists(key)
            assert value == self.md5sum(key)

    def test_multiple_files_concat_extract(self):
        '''
        Create a tar file with only multiple files inside, using concat
        compression mode, then decompress the tarfile.
        '''
        # create sample data
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.add("small")
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("small")
        os.unlink("small2")

        # extract and check output
        tarobj = TarFile.open("sample.tar.gz", mode="r#gz")
        tarobj.extractall()
        tarobj.close()

        for key, value in md5s.items():
            assert os.path.exists(key)
            assert value == self.md5sum(key)

    def test_multivol_gzip_concat_extract(self):
        '''
        Test multivol tarball with concat compression.
        '''
        # create sample data
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["big2"] = self.create_file("big2", 10200)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz",
                              mode="w#gz",
                              max_volume_size=20000,
                              new_volume_handler=new_volume_handler)
        tarobj.add("big")
        tarobj.add("big2")
        tarobj.add("small")
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("big2")
        os.unlink("small")
        os.unlink("small2")

        # extract
        tarobj = TarFile.open("sample.tar.gz",
                              mode="r#gz",
                              new_volume_handler=new_volume_handler)
        tarobj.extractall()
        tarobj.close()

        # check output
        for key, value in md5s.items():
            assert os.path.exists(key)
            assert value == self.md5sum(key)

    def test_multiple_files_rescue_extract(self):
        '''
        Use filesplit utility to split the file in compressed tar blocks
        that individually decompressed and "untarred", thanks to be using
        the concat gzip tar format.
        '''
        # create sample data
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.add("small")
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("small")
        os.unlink("small2")

        # split at every gzip magic-byte boundary: one piece per member
        filesplit.split_file(GZ_MAGIC_BYTES,
                             "sample.tar.gz.", "sample.tar.gz")

        assert os.path.exists("sample.tar.gz.0")     # first file
        assert os.path.exists("sample.tar.gz.1")     # second file
        assert os.path.exists("sample.tar.gz.2")     # third file
        assert not os.path.exists("sample.tar.gz.3") # nothing else

        # extract and check output
        for i in range(0, 3):
            tarobj = TarFile.open("sample.tar.gz.%d" % i, mode="r|gz")
            tarobj.extractall()
            tarobj.close()

        for key, value in md5s.items():
            assert os.path.exists(key)
            assert value == self.md5sum(key)

    def test_multiple_files_rescue_extract_gnu(self):
        '''
        Use filesplit utility to split the file in compressed tar blocks
        that individually decompressed and "untarred", thanks to be using
        the concat gzip tar format. We do the extraction with standard
        gnu tar and gzip command line commands.
        '''
        # create sample data
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.add("small")
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        os.unlink("big")
        os.unlink("small")
        os.unlink("small2")

        # extract using the command line this time
        os.system("python3 filesplit.py -s $'\\x1f\\x8b' -p sample.tar.gz. "
                  "sample.tar.gz")

        assert os.path.exists("sample.tar.gz.0")     # first file
        assert os.path.exists("sample.tar.gz.1")     # second file
        assert os.path.exists("sample.tar.gz.2")     # third file
        assert not os.path.exists("sample.tar.gz.3") # nothing else

        # extract and check output
        for i in range(0, 3):
            os.system("gzip -cd sample.tar.gz.%d > sample.%d.tar" % (i, i))
            os.system("tar xf sample.%d.tar" % i)

        for key, value in md5s.items():
            assert os.path.exists(key)
            assert value == self.md5sum(key)

    def test_multiple_files_rescue_extract_broken(self):
        '''
        Use filesplit utility to split the file in compressed tar blocks
        that individually decompressed and "untarred", thanks to be using
        the concat gzip tar format. In this case, we simulate that one of
        the files is corrupted. The rest will decompress just fine.
        '''
        # create sample data
        md5s = dict()
        md5s["big"] = self.create_file("big", 50000)
        md5s["small"] = self.create_file("small", 100)
        md5s["small2"] = self.create_file("small2", 354)

        # create the tar file with volumes
        tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
        tarobj.add("big")
        tarobj.add("small")
        tarobj.add("small2")
        tarobj.close()
        assert os.path.exists("sample.tar.gz")

        # overwrite stuff in the middle of the big file
        with open('sample.tar.gz', 'r+b') as f:
            f.seek(100)
            f.write(bytes("breaking things", 'UTF-8'))

        os.unlink("big")
        os.unlink("small")
        os.unlink("small2")

        # equivalent to $ python filesplit.py -s $'\x1f\x8b' -p sample.tar.gz. sample.tar.gz
        filesplit.split_file(GZ_MAGIC_BYTES,
                             "sample.tar.gz.", "sample.tar.gz")

        assert os.path.exists("sample.tar.gz.0")     # first file
        assert os.path.exists("sample.tar.gz.1")     # second file
        assert os.path.exists("sample.tar.gz.2")     # third file
        assert not os.path.exists("sample.tar.gz.3") # nothing else

        # extract and check output
        for i in range(0, 3):
            try:
                tarobj = TarFile.open("sample.tar.gz.%d" % i, mode="r|gz")
                tarobj.extractall()
                tarobj.close()
            except Exception as e:
                if i == 0:
                    # big file doesn't extract well because it's corrupted
                    pass
                else:
                    # chain the original error so the failure is debuggable
                    raise Exception("Error extracting a tar.gz not related "
                                    "to the broken 'big' file") from e

        for key, value in md5s.items():
            if key != "big":
                assert os.path.exists(key)
                assert value == self.md5sum(key)