b2539b92493284f06384758a39276a4c6ae365a6
[python-delta-tar] / testing / test_concat_compress.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17
18 import os, unittest, hashlib, string
19
20 from deltatar.tarfile import TarFile, GNU_FORMAT, GZ_MAGIC_BYTES
21
22 import filesplit
23 from . import BaseTest, new_volume_handler
24
25 class ConcatCompressTest(BaseTest):
26     """
27     Test concatenated compression in tarfiles
28     """
29
30     def test_zcat_extract_concat(self):
31         """
32         Create a tar file with only one file inside, using concat compression
33         mode. Then decompress it with zcat and untar it with gnu tar.
34         """
35
36         # create the content of the file to compress and hash it
37         hash = self.create_file("big", 50000)
38
39         # create the tar file with volumes
40         tarobj = TarFile.open("sample.tar.gz",
41                               mode="w#gz",
42                               format=GNU_FORMAT)
43         tarobj.add("big")
44         tarobj.close()
45         os.unlink("big")
46
47         # extract with normal tar and check output
48         os.system("zcat sample.tar.gz > sample.tar")
49         os.system("tar xf sample.tar")
50         assert os.path.exists("big")
51         assert hash == self.md5sum("big")
52
53     def test_concat_extract(self):
54         '''
55         Create a tar file with only one file inside, using concat compression
56         mode, then decompress it with tarlib module too.
57         '''
58
59         # create the content of the file to compress and hash it
60         hash = self.create_file("big", 50000)
61
62         # create the tar file with volumes
63         tarobj = TarFile.open("sample.tar.gz",
64                               mode="w#gz")
65         tarobj.add("big")
66         tarobj.close()
67         os.unlink("big")
68
69         tarobj = TarFile.open("sample.tar.gz",
70                               mode="r#gz")
71         tarobj.extractall()
72         tarobj.close()
73         assert os.path.exists("big")
74         assert hash == self.md5sum("big")
75
76     def test_concat_extract_fileobj(self):
77         '''
78         Create a tar file with only one file inside, using concat compression
79         mode, then decompress it with tarlib module using the fileobj parameter.
80         '''
81
82         # create the content of the file to compress and hash it
83         hash = self.create_file("big", 50000)
84
85         # create the tar file with volumes
86         tarobj = TarFile.open("sample.tar.gz",
87                               mode="w#gz")
88         tarobj.add("big")
89         pos = tarobj.get_last_member_offset()
90         tarobj.close()
91         os.unlink("big")
92
93         fo = open("sample.tar.gz", 'rb') # will not be released on tarfile.close()
94         fo.seek(pos)
95         tarobj = TarFile.open(mode="r#gz", fileobj=fo)
96         tarobj.extract(tarobj.next())
97         tarobj.close()
98         fo.close()
99         assert os.path.exists("big")
100         assert hash == self.md5sum("big")
101
102     def test_concat_extract_one_fileobj(self):
103         '''
104         Create a tar file with multiple files inside, using concat compression
105         mode, then decompress it with tarlib module using the fileobj parameter.
106         '''
107
108         # create the content of the file to compress and hash it
109         hash = dict()
110         hash["big"] = self.create_file("big", 50000)
111         hash["small"] = self.create_file("small", 100)
112         hash["small2"] = self.create_file("small2", 354)
113
114         # create the tar file with volumes
115         tarobj = TarFile.open("sample.tar.gz",
116                               mode="w#gz")
117         tarobj.add("big")
118         tarobj.add("small")
119         pos = tarobj.get_last_member_offset()
120         tarobj.add("small2")
121         tarobj.close()
122
123         assert os.path.exists("sample.tar.gz")
124
125         os.unlink("big")
126         os.unlink("small")
127         os.unlink("small2")
128
129         # extract only the "small" file
130         fo = open("sample.tar.gz", 'rb') # will not be released on tarfile.close()
131         fo.seek(pos)
132         tarobj = TarFile.open(mode="r#gz", fileobj=fo)
133         tarobj.extract(tarobj.next())
134         tarobj.close()
135         fo.close()
136         assert os.path.exists("small")
137         assert hash['small'] == self.md5sum("small")
138
139         # we didn't extract the other files
140         assert not os.path.exists("big")
141         assert not os.path.exists("small2")
142
143     def test_concat_extract_one_fileobj_multivol(self):
144         '''
145         Create a tar file with multiple files inside and multiple volume,
146         using concat compression mode, then decompress a file spanning two
147         volumess with tarlib module using the fileobj parameter.
148         '''
149
150         # create the content of the file to compress and hash it
151         hash = dict()
152         hash["small"] = self.create_file("small", 100000)
153         hash["big"] = self.create_file("big", 1200000)
154
155         # create the tar file with volumes
156         tarobj = TarFile.open("sample.tar.gz",
157                               mode="w#gz",
158                               max_volume_size=1000000,
159                               new_volume_handler=new_volume_handler)
160         tarobj.add("small")
161         tarobj.add("big")
162         pos = tarobj.get_last_member_offset()
163         tarobj.close()
164
165         assert os.path.exists("sample.tar.gz")
166
167         os.unlink("big")
168         os.unlink("small")
169
170         def new_volume_handler_fo(tarobj, base_name, volume_number):
171             '''
172             Handles the new volumes, ignoring base_name as it'll be None because
173             we'll be using a seek fileobj.
174             '''
175             volume_path = "sample.tar.gz.%d" % volume_number
176             tarobj.open_volume(volume_path)
177
178         # extract only the "small" file
179         fo = open("sample.tar.gz", 'rb') # will not be released on tarfile.close()
180         fo.seek(pos)
181         tarobj = TarFile.open(mode="r#gz", fileobj=fo,
182                               new_volume_handler=new_volume_handler_fo)
183         tarobj.extract(tarobj.next())
184         tarobj.close()
185         fo.close()
186         assert os.path.exists("big")
187         assert hash['big'] == self.md5sum("big")
188
189         # we didn't extract the other files
190         assert not os.path.exists("small")
191
192     def test_multiple_files_zcat_extract(self):
193         '''
194         Create a tar file with only multiple files inside, using concat
195         compression mode, then decompress the tarfile.
196         '''
197
198         # create sample data
199         hash = dict()
200         hash["big"] = self.create_file("big", 50000)
201         hash["small"] = self.create_file("small", 100)
202         hash["small2"] = self.create_file("small2", 354)
203
204         # create the tar file with volumes
205         tarobj = TarFile.open("sample.tar.gz",
206                               mode="w#gz")
207         tarobj.add("big")
208         tarobj.add("small")
209         tarobj.add("small2")
210         tarobj.close()
211
212         assert os.path.exists("sample.tar.gz")
213
214         os.unlink("big")
215         os.unlink("small")
216         os.unlink("small2")
217
218         # extract and check output
219         os.system("zcat sample.tar.gz > sample.tar")
220         tarobj = TarFile.open("sample.tar",
221                               mode="r")
222         tarobj.extractall()
223         tarobj.close()
224
225         for key, value in hash.items():
226             assert os.path.exists(key)
227             assert value == self.md5sum(key)
228
229     def test_multiple_files_concat_extract(self):
230         '''
231         Create a tar file with only multiple files inside, using concat
232         compression mode, then decompress the tarfile.
233         '''
234
235         # create sample data
236         hash = dict()
237         hash["big"] = self.create_file("big", 50000)
238         hash["small"] = self.create_file("small", 100)
239         hash["small2"] = self.create_file("small2", 354)
240
241         # create the tar file with volumes
242         tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
243         tarobj.add("big")
244         tarobj.add("small")
245         tarobj.add("small2")
246         tarobj.close()
247
248         assert os.path.exists("sample.tar.gz")
249
250         os.unlink("big")
251         os.unlink("small")
252         os.unlink("small2")
253
254         # extract and check output
255         tarobj = TarFile.open("sample.tar.gz",
256                               mode="r#gz")
257         tarobj.extractall()
258         tarobj.close()
259
260         for key, value in hash.items():
261             assert os.path.exists(key)
262             assert value == self.md5sum(key)
263
264     def test_multivol_gzip_concat_extract(self):
265         '''
266         Test multivol tarball with concat compression.
267         '''
268
269         # create sample data
270         hash = dict()
271         hash["big"] = self.create_file("big", 50000)
272         hash["big2"] = self.create_file("big2", 10200)
273         hash["small"] = self.create_file("small", 100)
274         hash["small2"] = self.create_file("small2", 354)
275
276         # create the tar file with volumes
277         tarobj = TarFile.open("sample.tar.gz",
278                               mode="w#gz",
279                               max_volume_size=20000,
280                               new_volume_handler=new_volume_handler)
281         tarobj.add("big")
282         tarobj.add("big2")
283         tarobj.add("small")
284         tarobj.add("small2")
285         tarobj.close()
286
287         assert os.path.exists("sample.tar.gz")
288         os.unlink("big")
289         os.unlink("big2")
290         os.unlink("small")
291         os.unlink("small2")
292
293         # extract
294         tarobj = TarFile.open("sample.tar.gz",
295                               mode="r#gz",
296                               new_volume_handler=new_volume_handler)
297         tarobj.extractall()
298         tarobj.close()
299
300         # check output
301         for key, value in hash.items():
302             assert os.path.exists(key)
303             assert value == self.md5sum(key)
304
305     def test_multiple_files_rescue_extract(self):
306         '''
307         Use filesplit utility to split the file in compressed tar blocks that
308         individually decompressed and "untarred", thanks to be using the
309         concat gzip tar format.
310         '''
311         # create sample data
312         hash = dict()
313         hash["big"] = self.create_file("big", 50000)
314         hash["small"] = self.create_file("small", 100)
315         hash["small2"] = self.create_file("small2", 354)
316
317         # create the tar file with volumes
318         tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
319         tarobj.add("big")
320         tarobj.add("small")
321         tarobj.add("small2")
322         tarobj.close()
323
324         assert os.path.exists("sample.tar.gz")
325
326         os.unlink("big")
327         os.unlink("small")
328         os.unlink("small2")
329
330         filesplit.split_file(GZ_MAGIC_BYTES, "sample.tar.gz.", "sample.tar.gz")
331
332         assert os.path.exists("sample.tar.gz.0") # first file
333         assert os.path.exists("sample.tar.gz.1") # second file
334         assert os.path.exists("sample.tar.gz.2") # third file
335         assert not os.path.exists("sample.tar.gz.3") # nothing else
336
337         # extract and check output
338         for i in range(0, 3):
339             tarobj = TarFile.open("sample.tar.gz.%d" % i,
340                                 mode="r|gz")
341             tarobj.extractall()
342             tarobj.close()
343
344         for key, value in hash.items():
345             assert os.path.exists(key)
346             assert value == self.md5sum(key)
347
348     def test_multiple_files_rescue_extract_gnu(self):
349         '''
350         Use filesplit utility to split the file in compressed tar blocks that
351         individually decompressed and "untarred", thanks to be using the
352         concat gzip tar format. We do the extraction with standard gnu tar and
353         gzip command line commands.
354         '''
355
356         # create sample data
357         hash = dict()
358         hash["big"] = self.create_file("big", 50000)
359         hash["small"] = self.create_file("small", 100)
360         hash["small2"] = self.create_file("small2", 354)
361
362         # create the tar file with volumes
363         tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
364         tarobj.add("big")
365         tarobj.add("small")
366         tarobj.add("small2")
367         tarobj.close()
368
369         assert os.path.exists("sample.tar.gz")
370
371         os.unlink("big")
372         os.unlink("small")
373         os.unlink("small2")
374
375         # extract using the command line this time
376         os.system("python3 filesplit.py -s $'\\x1f\\x8b' -p sample.tar.gz. sample.tar.gz")
377
378         assert os.path.exists("sample.tar.gz.0") # first file
379         assert os.path.exists("sample.tar.gz.1") # second file
380         assert os.path.exists("sample.tar.gz.2") # third file
381         assert not os.path.exists("sample.tar.gz.3") # nothing else
382
383         # extract and check output
384         for i in range(0, 3):
385             os.system("gzip -cd sample.tar.gz.%d > sample.%d.tar" % (i, i))
386             os.system("tar xf sample.%d.tar" % i)
387
388         for key, value in hash.items():
389             assert os.path.exists(key)
390             assert value == self.md5sum(key)
391
392     def test_multiple_files_rescue_extract_broken(self):
393         '''
394         Use filesplit utility to split the file in compressed tar blocks that
395         individually decompressed and "untarred", thanks to be using the
396         concat gzip tar format. In this case, we simulate that one of the files
397         is corrupted. The rest will decompress just fine.
398         '''
399
400         # create sample data
401         hash = dict()
402         hash["big"] = self.create_file("big", 50000)
403         hash["small"] = self.create_file("small", 100)
404         hash["small2"] = self.create_file("small2", 354)
405
406         # create the tar file with volumes
407         tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
408         tarobj.add("big")
409         tarobj.add("small")
410         tarobj.add("small2")
411         tarobj.close()
412
413         assert os.path.exists("sample.tar.gz")
414
415         # overwrite stuff in the middle of the big file
416         f = open('sample.tar.gz', 'r+b')
417         f.seek(100)
418         f.write(bytes("breaking things", 'UTF-8'))
419         f.close()
420
421         os.unlink("big")
422         os.unlink("small")
423         os.unlink("small2")
424
425         # equivalent to $ python filesplit.py -s $'\x1f\x8b' -p sample.tar.gz. sample.tar.gz
426         filesplit.split_file(GZ_MAGIC_BYTES, "sample.tar.gz.", "sample.tar.gz")
427
428         assert os.path.exists("sample.tar.gz.0") # first file
429         assert os.path.exists("sample.tar.gz.1") # second file
430         assert os.path.exists("sample.tar.gz.2") # third file
431         assert not os.path.exists("sample.tar.gz.3") # nothing else
432
433         # extract and check output
434         for i in range(0, 3):
435             try:
436                 tarobj = TarFile.open("sample.tar.gz.%d" % i,
437                                     mode="r|gz")
438                 tarobj.extractall()
439                 tarobj.close()
440             except Exception as e:
441                 if i == 0: # big file doesn't extract well because it's corrupted
442                     pass
443                 else:
444                     raise Exception("Error extracting a tar.gz not related to the broken 'big' file")
445
446         for key, value in hash.items():
447             if key != "big":
448                 assert os.path.exists(key)
449                 assert value == self.md5sum(key)