| 1 | # Copyright (C) 2013 Intra2net AG |
| 2 | # |
| 3 | # This program is free software; you can redistribute it and/or modify |
| 4 | # it under the terms of the GNU Lesser General Public License as published |
| 5 | # by the Free Software Foundation; either version 3 of the License, or |
| 6 | # (at your option) any later version. |
| 7 | # |
| 8 | # This program is distributed in the hope that it will be useful, |
| 9 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | # GNU Lesser General Public License for more details. |
| 12 | # |
| 13 | # You should have received a copy of the GNU General Public License |
| 14 | # along with this program. If not, see |
| 15 | # <http://www.gnu.org/licenses/lgpl-3.0.html> |
| 16 | |
| 17 | |
| 18 | import os, unittest, hashlib, string |
| 19 | |
| 20 | from deltatar.tarfile import TarFile, PAX_FORMAT, GNU_FORMAT, BLOCKSIZE |
| 21 | from . import BaseTest |
| 22 | import rescue_tar |
| 23 | |
def new_volume_handler(tarobj, base_name, volume_number):
    '''
    Callback used when a multivolume archive needs its next volume.

    Opens "<base_name>.<volume_number>" on the given tar object through
    its open_volume() method.
    '''
    tarobj.open_volume("%s.%d" % (base_name, volume_number))
| 30 | |
class RescueTarTest(BaseTest):
    '''
    Tests for rescue_tar.rescue() on single (non-multivolume) archives
    written with the "w#gz" mode.
    '''

    # Archive written and rescued by every test in this class.
    ARCHIVE_PATH = "sample.tar.gz"

    # (file name, size passed to create_file) of each sample file, in the
    # order in which they are created and added to the archive.
    SAMPLE_FILES = (
        ("big", 50000),
        ("big2", 10200),
        ("small", 100),
        ("small2", 354),
    )

    def _create_sample_archive(self):
        '''
        Create the sample files and pack them into ARCHIVE_PATH.

        Returns a dict mapping each file name to the value returned by
        self.create_file() for it; that value is later compared against
        self.md5sum() of the rescued file.
        '''
        checksums = dict()
        for name, size in self.SAMPLE_FILES:
            checksums[name] = self.create_file(name, size)

        # single archive, no volume handler involved
        tarobj = TarFile.open(self.ARCHIVE_PATH,
                              mode="w#gz")
        for name, _size in self.SAMPLE_FILES:
            tarobj.add(name)
        tarobj.close()

        assert os.path.exists(self.ARCHIVE_PATH)
        return checksums

    def _remove_sample_files(self):
        '''
        Delete the original sample files so that the later existence checks
        can only succeed if rescue_tar recreated them.
        '''
        for name, _size in self.SAMPLE_FILES:
            os.unlink(name)

    def _assert_rescued(self, checksums, skip=()):
        '''
        Check that every file in checksums exists again and matches its
        recorded checksum. Names listed in skip are not checked at all.
        '''
        for name, expected in checksums.items():
            if name in skip:
                continue
            assert os.path.exists(name), name
            assert expected == self.md5sum(name), name

    def test_rescue_ok(self):
        '''
        Test rescue_tar when no file is broken, without using multivol tars.
        '''
        checksums = self._create_sample_archive()
        self._remove_sample_files()

        # extract
        rescue_tar.rescue(self.ARCHIVE_PATH)

        # check output
        self._assert_rescued(checksums)

    def test_rescue_broken(self):
        '''
        Use the rescue_tar utility to split the file into compressed tar
        blocks that can be individually decompressed and "untarred", thanks
        to the concat gzip tar format. In this case, we simulate that one of
        the files is corrupted. The rest will decompress just fine.
        '''
        checksums = self._create_sample_archive()

        # Overwrite some bytes near the start of the archive. "big" is the
        # first member added, so it is the one expected to be damaged.
        # The context manager closes the handle even if seek/write fails.
        with open(self.ARCHIVE_PATH, 'r+b') as archive:
            archive.seek(100)
            archive.write(bytes("breaking things", "UTF-8"))

        self._remove_sample_files()

        # extract
        rescue_tar.rescue(self.ARCHIVE_PATH)

        # check output; the corrupted member is not verified
        self._assert_rescued(checksums, skip=("big",))