# Copyright (C) 2013 Intra2net AG
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see
# <http://www.gnu.org/licenses/>

import os
import unittest
import hashlib
import string

from deltatar.tarfile import TarFile, PAX_FORMAT, GNU_FORMAT, BLOCKSIZE
from . import BaseTest
import rescue_tar


def new_volume_handler(tarobj, base_name, volume_number):
    '''
    Open the next volume of a multivolume archive.

    The volume path is built as "<base_name>.<volume_number>" and handed to
    tarobj.open_volume().
    '''
    volume_path = "%s.%d" % (base_name, volume_number)
    tarobj.open_volume(volume_path)


class RescueTarTest(BaseTest):
    '''
    Tests for rescue_tar.rescue() on a single (non-multivolume) archive.
    '''

    # Archive written and then rescued by every test in this class.
    _RESCUE_ARCHIVE = "sample.tar.gz"

    # (file name, size argument passed to create_file) for each sample file,
    # in the order in which they are added to the archive.
    _RESCUE_SAMPLES = (
        ("big", 50000),
        ("big2", 10200),
        ("small", 100),
        ("small2", 354),
    )

    def _create_sample_archive(self):
        '''
        Create the sample files and pack them into the test archive.

        Returns a dict mapping each file name to the value returned by
        self.create_file() for it; the tests later compare that value
        against self.md5sum() of the rescued file.
        '''
        checksums = {}
        for name, size in self._RESCUE_SAMPLES:
            checksums[name] = self.create_file(name, size)

        # Single archive, no volume handler: this is not a multivolume tar.
        tarobj = TarFile.open(self._RESCUE_ARCHIVE, mode="w#gz")
        for name, _size in self._RESCUE_SAMPLES:
            tarobj.add(name)
        tarobj.close()

        assert os.path.exists(self._RESCUE_ARCHIVE)
        return checksums

    def _remove_sample_files(self):
        '''
        Delete the sample files so that only the rescue can bring them back.
        '''
        for name, _size in self._RESCUE_SAMPLES:
            os.unlink(name)

    def _check_rescued(self, checksums, skip=()):
        '''
        Assert that every file in checksums exists again with its original
        checksum, ignoring the names listed in skip.
        '''
        for name, expected in checksums.items():
            if name in skip:
                continue
            assert os.path.exists(name)
            assert expected == self.md5sum(name)

    def test_rescue_ok(self):
        '''
        Test rescue_tar when no file is broken, without using multivol tars.
        '''
        checksums = self._create_sample_archive()
        self._remove_sample_files()

        # extract
        rescue_tar.rescue(self._RESCUE_ARCHIVE)

        # check output
        self._check_rescued(checksums)

    def test_rescue_broken(self):
        '''
        Use the rescue_tar utility to split the file into compressed tar
        blocks that can be individually decompressed and "untarred", thanks
        to using the concat gzip tar format.

        In this case, we simulate that one of the files is corrupted. The
        rest will decompress just fine.
        '''
        checksums = self._create_sample_archive()

        # Overwrite a few bytes at offset 100 of the archive. "big" is the
        # first member added and is the only file excluded from the check
        # below. The context manager closes the handle even if the write
        # fails.
        with open(self._RESCUE_ARCHIVE, 'r+b') as archive:
            archive.seek(100)
            archive.write(bytes("breaking things", "UTF-8"))

        self._remove_sample_files()

        # extract
        rescue_tar.rescue(self._RESCUE_ARCHIVE)

        # check output: everything but the corrupted file must be intact
        self._check_rescued(checksums, skip=("big",))