extend fs access test to compress and crypto variant
[python-delta-tar] / testing / test_deltatar.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
19 import errno
20 import os
21 import re
22 import random
23 import shutil
24 import logging
25 import binascii
26 import json
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
30
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
36
37 from . import BaseTest
38 from . import new_volume_handler
39
40 # Enable warning messages from deltatar. This minimizes the SNR of
41 # test runs, but none of the messages are meaningful in any way.
42 VERBOSE_TEST_OUTPUT = False
43
44 class DeltaTarTest(BaseTest):
45     """
46     Test backups
47     """
48     MODE = ''
49     MODE_COMPRESSES = False
50
51     ENCRYPTION = None  # (password : str, paramversion : int) option
52
53     GIT_DIR = '.git'
54
55     FSTEST = None
56     FSAPI_SAVED = []
57
58     def setUp(self):
59         '''
60         Create base test data
61         '''
62         self.pwd = os.getcwd()
63         os.system('rm -rf target_dir source_dir* backup_dir* huge')
64         os.makedirs('source_dir/test/test2')
65         self.hash = dict()
66         self.hash["source_dir/test/test2"] = ''
67         self.hash["source_dir/big"]  = self.create_file("source_dir/big", 50000)
68         self.hash["source_dir/small"]  = self.create_file("source_dir/small", 100)
69         self.hash["source_dir/test/huge"]  = self.create_file("source_dir/test/huge", 700000)
70         self.hash["source_dir/test/huge2"]  = self.create_file("source_dir/test/huge2", 800000)
71
72         self.consoleLogger = None
73         if VERBOSE_TEST_OUTPUT is True:
74             self.consoleLogger = logging.StreamHandler()
75             self.consoleLogger.setLevel(logging.DEBUG)
76
77         if not os.path.isdir(self.GIT_DIR):
78             # Not running inside git tree, take our
79             # own testing directory as source.
80             self.GIT_DIR = 'testing'
81
82             if not os.path.isdir(self.GIT_DIR):
83                 raise Exception('No input directory found: ' + self.GIT_DIR)
84
85         if self.FSTEST is not None:
86             self.FSTEST ()
87
88     def tearDown(self):
89         '''
90         Remove temporary files created by unit tests and restore the API
91         functions in *os*.
92         '''
93         for att, val in self.FSAPI_SAVED:
94             setattr (os, att, val)
95         os.chdir(self.pwd)
96         os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
97         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
98                   ("I am fully aware that this will void my warranty.")
99
100     def test_restore_simple_full_backup(self):
101         '''
102         Creates a full backup without any filtering and restores it.
103         '''
104         password, paramversion = self.ENCRYPTION or (None, None)
105         deltatar = DeltaTar(mode=self.MODE, password=password,
106                             crypto_paramversion=paramversion,
107                             logger=self.consoleLogger)
108
109         # create first backup
110         deltatar.create_full_backup(
111             source_path="source_dir",
112             backup_path="backup_dir")
113
114         assert os.path.exists("backup_dir")
115         shutil.rmtree("source_dir")
116
117         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
118         tar_path = os.path.join("backup_dir", tar_filename)
119
120         deltatar.restore_backup(target_path="source_dir",
121                                 backup_tar_path=tar_path)
122
123         for key, value in self.hash.items():
124             assert os.path.exists(key)
125             if value:
126                 assert value == self.md5sum(key)
127
128
129     def test_create_backup_max_file_length (self):
130         """
131         Creates a full backup including one file that exceeds the (purposely
132         lowered) upper bound on GCM encrypted objects. This will yield multiple
133         encrypted objects for one plaintext file.
134
135         Success is verified by splitting the archive at object boundaries and
136         counting the parts.
137         """
138         if self.MODE_COMPRESSES is True:
139             raise SkipTest ("GCM file length test not meaningful with compression.")
140         if self.ENCRYPTION is None:
141             raise SkipTest ("GCM file length applies only to encrypted backups.")
142
143         new_max = 20000 # cannot be less than tar block size
144         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
145                 ("I am fully aware that this will void my warranty.",
146                  new_max)
147
148         password, paramversion = self.ENCRYPTION
149         deltatar = DeltaTar (mode=self.MODE, password=password,
150                              crypto_paramversion=paramversion,
151                              logger=self.consoleLogger)
152
153         self.hash = dict ()
154         os.makedirs ("source_dir2")
155         for f, s in [("empty"          , 0)             # 1 tar objects
156                     ,("slightly_larger", new_max + 1)   # 2
157                     ,("twice"          , 2 * new_max)   # 3
158                     ]:
159             f = "source_dir2/%s" % f
160             self.hash [f] = self.create_file (f, s)
161
162         deltatar.create_full_backup \
163                 (source_path="source_dir2", backup_path="backup_dir")
164
165         assert os.path.exists ("backup_dir")
166         shutil.rmtree ("source_dir2")
167
168         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
169         backup_path     = os.path.join("backup_dir", backup_filename)
170
171         # split the resulting archive into its constituents without
172         # decrypting
173         ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
174                         "-o backup_dir/split <\'%s\'" % backup_path)
175
176         assert os.path.exists ("backup_dir/split")
177
178         dents = os.listdir ("backup_dir/split")
179         assert len (dents) == 6
180
181
182     def test_restore_backup_max_file_length (self):
183         """
184         Creates a full backup including one file that exceeds the (purposely
185         lowered) upper bound on GCM encrypted objects. This will yield two
186         encrypted objects for one plaintext file.
187
188         Success is verified by splitting the archive at object boundaries and
189         counting the parts.
190         """
191         if self.MODE_COMPRESSES is True:
192             raise SkipTest ("GCM file length test not meaningful with compression.")
193         if self.ENCRYPTION is None:
194             raise SkipTest ("GCM file length applies only to encrypted backups.")
195
196         new_max = 20000 # cannot be less than tar block size
197         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
198                     ("I am fully aware that this will void my warranty.",
199                      new_max)
200
201         password, paramversion = self.ENCRYPTION
202         deltatar = DeltaTar (mode=self.MODE, password=password,
203                              crypto_paramversion=paramversion,
204                              logger=self.consoleLogger)
205
206         self.hash = dict ()
207         os.makedirs ("source_dir2")
208         for f, s in [("empty"          , 0)             # 1 tar objects
209                     ,("almost_large"   , new_max - 1)   # 2
210                     ,("large"          , new_max)       # 3
211                     ,("slightly_larger", new_max + 1)   # 4
212                     ,("twice"          , 2 * new_max)   # 5
213                     ,("twice_plus_one" , (2 * new_max) + 1)   # 6
214                     ]:
215             f = "source_dir2/%s" % f
216             self.hash [f] = self.create_file (f, s)
217
218         deltatar.create_full_backup \
219                 (source_path="source_dir2", backup_path="backup_dir")
220
221         assert os.path.exists ("backup_dir")
222         shutil.rmtree ("source_dir2")
223
224         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
225         backup_path     = os.path.join("backup_dir", backup_filename)
226
227         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
228         tar_path = os.path.join("backup_dir", tar_filename)
229
230         deltatar.restore_backup(target_path="source_dir2",
231                                 backup_tar_path=tar_path)
232
233         for key, value in self.hash.items():
234             assert os.path.exists(key)
235             if value:
236                 assert value == self.md5sum(key)
237
238
239     def test_create_backup_index_max_file_length (self):
240         """
241         Creates a full backup with a too large index file for the upper bound
242         of the GCM encryption. Since the index file has a fixed IV file counter
243         of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
244
245         60+ GB of (potentially compressed) index file should last for a while...
246         """
247         if self.MODE_COMPRESSES is True:
248             raise SkipTest ("GCM file length test not meaningful with compression.")
249         if self.ENCRYPTION is None:
250             raise SkipTest ("GCM file length applies only to encrypted backups.")
251
252         new_max = 5000
253         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
254                     ("I am fully aware that this will void my warranty.",
255                      new_max)
256
257         password, paramversion = self.ENCRYPTION
258         deltatar = DeltaTar (mode=self.MODE, password=password,
259                              crypto_paramversion=paramversion,
260                              logger=self.consoleLogger)
261
262         self.hash = dict ()
263         os.makedirs ("source_dir2")
264         for i in range (42):
265             f = "source_dir2/dummy_%rd" % i
266             self.hash [f] = self.create_file (f, i)
267
268         with self.assertRaises (crypto.InvalidFileCounter):
269             deltatar.create_full_backup \
270                     (source_path="source_dir2", backup_path="backup_dir")
271         shutil.rmtree ("source_dir2")
272
273
274     def test_check_index_checksum(self):
275         '''
276         Creates a full backup and checks the index' checksum of files
277         '''
278         password, paramversion = self.ENCRYPTION or (None, None)
279         deltatar = DeltaTar(mode=self.MODE, password=password,
280                             crypto_paramversion=paramversion,
281                             logger=self.consoleLogger)
282
283         # create first backup
284         deltatar.create_full_backup(
285             source_path="source_dir",
286             backup_path="backup_dir")
287
288
289         index_filename = deltatar.index_name_func(True)
290         index_path = os.path.join("backup_dir", index_filename)
291
292         f = open(index_path, 'rb')
293         crc = None
294         checked = False
295         began_list = False
296         while True:
297             l = f.readline()
298             if l == b'':
299                 break
300             if b'BEGIN-FILE-LIST' in l:
301                 crc = binascii.crc32(l) & 0xFFFFffff
302                 began_list = True
303             elif b'END-FILE-LIST' in l:
304                 crc = binascii.crc32(l, crc) & 0xffffffff
305
306                 # next line contains the crc
307                 data = json.loads(f.readline().decode("UTF-8"))
308                 assert data['type'] == 'file-list-checksum'
309                 assert data['checksum'] == crc
310                 checked = True
311                 break
312             elif began_list:
313                 crc = binascii.crc32(l, crc) & 0xffffffff
314         f.close()
315
316
317     def test_restore_multivol(self):
318         '''
319         Creates a full backup without any filtering with multiple volumes and
320         restore it.
321         '''
322         if ':gz' in self.MODE:
323             raise SkipTest('compression information is lost when creating '
324                            'multiple volumes with no Stream')
325
326         password, paramversion = self.ENCRYPTION or (None, None)
327         deltatar = DeltaTar(mode=self.MODE, password=password,
328                             crypto_paramversion=paramversion,
329                             logger=self.consoleLogger)
330
331         self.hash = dict()
332         os.makedirs('source_dir2')
333         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
334         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
335
336         # create first backup
337         deltatar.create_full_backup(
338             source_path="source_dir2",
339             backup_path="backup_dir",
340             max_volume_size=1)
341
342         assert os.path.exists("backup_dir")
343         assert os.path.exists(os.path.join("backup_dir",
344             deltatar.volume_name_func("backup_dir", True, 0)))
345         if self.MODE_COMPRESSES:
346             n_vols = 1
347         else:
348             n_vols = 2
349         for i_vol in range(n_vols):
350             assert os.path.exists(os.path.join("backup_dir",
351                 deltatar.volume_name_func("backup_dir", True, i_vol)))
352         assert not os.path.exists(os.path.join("backup_dir",
353             deltatar.volume_name_func("backup_dir", True, n_vols)))
354
355         shutil.rmtree("source_dir2")
356
357         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
358         tar_path = os.path.join("backup_dir", tar_filename)
359
360         # this should automatically restore all volumes
361         deltatar.restore_backup(target_path="source_dir2",
362                                 backup_tar_path=tar_path)
363
364         for key, value in self.hash.items():
365             assert os.path.exists(key)
366             if value:
367                 assert value == self.md5sum(key)
368
369     def test_restore_multivol_split(self):
370         '''
371         Creates a full backup without any filtering with multiple volumes
372         with big files bigger than the max volume size and
373         restore it.
374         '''
375         if self.MODE.startswith(':') or self.MODE.startswith('|'):
376             raise SkipTest('this test only works for uncompressed '
377                            'or concat compressed modes')
378
379         password, paramversion = self.ENCRYPTION or (None, None)
380         deltatar = DeltaTar(mode=self.MODE, password=password,
381                             crypto_paramversion=paramversion,
382                             logger=self.consoleLogger)
383
384         self.hash = dict()
385         os.makedirs('source_dir2')
386         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 3*1024*1024)
387         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 4*1024*1024)
388         self.hash["source_dir2/huge2"]  = self.create_file("source_dir2/huge2", 4*1024*1024)
389
390         # create first backup
391         deltatar.create_full_backup(
392             source_path="source_dir2",
393             backup_path="backup_dir",
394             max_volume_size=2)
395
396         assert os.path.exists("backup_dir")
397         assert os.path.exists(os.path.join("backup_dir",
398             deltatar.volume_name_func("backup_dir", True, 0)))
399         if self.MODE_COMPRESSES:
400             n_vols = 1
401         else:
402             n_vols = 6
403         for i_vol in range(n_vols):
404             assert os.path.exists(os.path.join("backup_dir",
405                 deltatar.volume_name_func("backup_dir", True, i_vol)))
406         assert not os.path.exists(os.path.join("backup_dir",
407             deltatar.volume_name_func("backup_dir", True, n_vols)))
408
409         shutil.rmtree("source_dir2")
410
411         index_filename = deltatar.index_name_func(True)
412         index_path = os.path.join("backup_dir", index_filename)
413
414         deltatar.restore_backup(target_path="source_dir2",
415             backup_indexes_paths=[index_path])
416
417         for key, value in self.hash.items():
418             assert os.path.exists(key)
419             if value:
420                 assert value == self.md5sum(key)
421
422
423     def test_full_backup_index_extra_data(self):
424         '''
425         Tests that the index file for a full backup can store extra_data and
426         that this data can be retrieved.
427         '''
428         password, paramversion = self.ENCRYPTION or (None, None)
429         deltatar = DeltaTar(mode=self.MODE, password=password,
430                             crypto_paramversion=paramversion,
431                             logger=self.consoleLogger)
432
433         extra_data = dict(
434             hola="caracola",
435             otra_cosa=[1, "lista"],
436             y_otra=dict(bola=1.1)
437         )
438
439         deltatar.create_full_backup(
440             source_path="source_dir",
441             backup_path="backup_dir",
442             extra_data=extra_data)
443
444         index_filename = deltatar.index_name_func(is_full=True)
445         index_path = os.path.join("backup_dir", index_filename)
446
447         # iterate_index_path retrieves extra_data, and thus we can then compare
448         index_it = deltatar.iterate_index_path(index_path)
449         self.assertEqual(index_it.extra_data, extra_data)
450
451
452     def test_diff_backup_index_extra_data(self):
453         '''
454         Tests that the index file for a diff backup can store extra_data and
455         that this data can be retrieved.
456         '''
457         password, paramversion = self.ENCRYPTION or (None, None)
458         deltatar = DeltaTar(mode=self.MODE, password=password,
459                             crypto_paramversion=paramversion,
460                             logger=self.consoleLogger)
461
462         extra_data = dict(
463             hola="caracola",
464             otra_cosa=[1, "lista"],
465             y_otra=dict(bola=1.1)
466         )
467         # do first backup
468         deltatar.create_full_backup(
469             source_path="source_dir",
470             backup_path="backup_dir")
471
472
473         prev_index_filename = deltatar.index_name_func(is_full=True)
474         prev_index_path = os.path.join("backup_dir", prev_index_filename)
475
476         # create empty diff backup
477         deltatar.create_diff_backup("source_dir", "backup_dir2",
478                                     prev_index_path, extra_data=extra_data)
479
480         index_filename = deltatar.index_name_func(is_full=False)
481         index_path = os.path.join("backup_dir2", index_filename)
482
483         # iterate_index_path retrieves extra_data, and thus we can then compare
484         index_it = deltatar.iterate_index_path(index_path)
485         self.assertEqual(index_it.extra_data, extra_data)
486
487     def test_restore_multivol2(self):
488         '''
489         Creates a full backup without any filtering with multiple volumes and
490         restore it.
491         '''
492         password, paramversion = self.ENCRYPTION or (None, None)
493         deltatar = DeltaTar(mode=self.MODE, password=password,
494                             crypto_paramversion=paramversion,
495                             logger=self.consoleLogger)
496
497         shutil.copytree(self.GIT_DIR, "source_dir2")
498
499         # create first backup
500         deltatar.create_full_backup(
501             source_path="source_dir2",
502             backup_path="backup_dir",
503             max_volume_size=1)
504
505         assert os.path.exists("backup_dir")
506         assert os.path.exists(os.path.join("backup_dir",
507             deltatar.volume_name_func("backup_dir", True, 0)))
508
509         shutil.rmtree("source_dir2")
510
511         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
512         tar_path = os.path.join("backup_dir", tar_filename)
513
514         # this should automatically restore all volumes
515         deltatar.restore_backup(target_path="source_dir2",
516                                 backup_tar_path=tar_path)
517
518         self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
519
    def test_restore_multivol_manual_from_index(self):
        '''
        Creates a full multi-volume backup and manually extracts a single
        file from it, bypassing ``restore_backup``.

        The offset of the file named "huge" is looked up in the index, the
        first volume is opened at that offset with ``TarFile.open`` and the
        member is extracted into the current directory. A custom volume
        handler supplies the follow-up volume. The extracted file must match
        the checksum recorded when it was created.

        Skipped for modes starting with ``:`` or ``|``.
        '''
        # this test only works for uncompressed or concat compressed modes
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                           'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # replace the fixtures from setUp with two files of this test's own
        self.hash = dict()
        os.makedirs('source_dir2')
        self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
        self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir",
            max_volume_size=1)

        # exactly n_vols volumes must exist: numbers 0 .. n_vols - 1
        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))
        if self.MODE_COMPRESSES:
            n_vols = 1
        else:
            n_vols = 2
        for i_vol in range(n_vols):
            assert os.path.exists(os.path.join("backup_dir",
                deltatar.volume_name_func("backup_dir", True, i_vol)))
        assert not os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, n_vols)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # Scan the index line by line (one JSON document per line) for the
        # entry of the file "huge" and remember its offset.
        # NOTE(review): the handle returned by open_auxiliary_file is never
        # closed in this test.
        f = deltatar.open_auxiliary_file(index_path, 'r')
        offset = None
        while True:
            l = f.readline()
            if not len(l):
                break
            data = json.loads(l.decode('UTF-8'))
            if data.get('type', '') == 'file' and\
                    deltatar.unprefixed(data['path']) == "huge":
                offset = data['offset']
                break

        assert offset is not None

        # position the raw first volume at the start of the wanted member
        fo = open(tar_path, 'rb')
        fo.seek(offset)
        # Local handler (shadows the module-level import of the same name)
        # that opens the follow-up volume when the tar layer asks for it.
        # NOTE(review): the volume name is rebuilt from the current time
        # with a fixed "-002" suffix instead of being derived from the
        # backup; presumably this matches what the backup wrote as long as
        # the minute has not changed in between -- confirm.
        def new_volume_handler(mode, tarobj, base_name, volume_number):
            # file extension for the mode, taken from DeltaTar's
            # name-mangled private table
            suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
            if self.ENCRYPTION is not None:
                # the local variable *deltatar* hides the package of the
                # same name, hence the aliased import of the extension
                suf += "." + deltatar_PDTCRYPT_EXTENSION
            tarobj.open_volume(datetime.now().strftime(
                "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
        # bind the mode so the handler has the signature the tar layer calls
        new_volume_handler = partial(new_volume_handler, self.MODE)

        # decryption context only for encrypted variants
        crypto_ctx = None
        if self.ENCRYPTION is not None:
            crypto_ctx = crypto.Decrypt (password)

        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              new_volume_handler=new_volume_handler,
                              encryption=crypto_ctx)

        # The first member at the offset is the wanted file; strip the
        # prefix from its names so it is extracted as plain "huge" into the
        # current directory.
        member = tarobj.next()
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)
        tarobj.extract(member)
        tarobj.close()
        fo.close()
        assert self.hash['source_dir2/huge'] == self.md5sum('huge')

        # the extracted file is not covered by the patterns removed in
        # source_dir*/backup_dir*, so delete it explicitly
        os.unlink("huge")
610
611
    def test_restore_manual_from_index_twice (self):
        """
        Creates a full backup and restore the same file twice. This *must* fail
        when encryption is active.

        Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
        backwards within the same file. This prevents the encryption layer from
        exploding due to a reused IV in an overall valid archive.

        This test anticipates possible future mistakes since it’s entirely
        feasible to implement backward seeks for *_Stream* with concat mode.

        As written, a ``tarfile.StreamError`` on the second extraction is
        tolerated only when a decryption context is in use and re-raised
        otherwise; a second extraction that raises nothing is accepted in
        either case.
        """
        # this test only works for uncompressed or concat compressed modes
        # NOTE(review): the condition below skips every compressing mode,
        # concat ones included, which is stricter than the message says.
        if self.MODE.startswith("|") or self.MODE_COMPRESSES:
            raise SkipTest("this test only works for uncompressed "
                           "or concat compressed modes")

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        # replace the fixtures from setUp with a single 1 KiB file
        self.hash = dict()
        os.makedirs("source_dir2")
        self.hash["source_dir2/samefile"] = \
            self.create_file("source_dir2/samefile", 1 * 1024)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # Scan the index line by line (one JSON document per line) for the
        # entry of "samefile" and remember its offset.
        # NOTE(review): the handle returned by open_auxiliary_file is never
        # closed in this test.
        f = deltatar.open_auxiliary_file(index_path, "r")
        offset = None
        while True:
            l = f.readline()
            if not len(l):
                break
            data = json.loads(l.decode("UTF-8"))
            if data.get("type", "") == "file" and\
                    deltatar.unprefixed(data["path"]) == "samefile":
                offset = data["offset"]
                break

        assert offset is not None

        # position the raw volume at the start of the wanted member
        fo = open(tar_path, "rb")
        fo.seek(offset)

        # decryption context only for encrypted variants
        crypto_ctx = None
        if self.ENCRYPTION is not None:
            crypto_ctx = crypto.Decrypt (password)

        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              encryption=crypto_ctx)
        # strip the prefix so the member is extracted as plain "samefile"
        # into the current directory
        member = tarobj.next()
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)

        # extract once …
        tarobj.extract(member)
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        # … and twice
        try:
            tarobj.extract(member)
        except tarfile.StreamError:
            if crypto_ctx is not None:
                pass # good: seeking backwards not allowed
            else:
                raise
        tarobj.close()
        fo.close()
        # whatever happened on the second attempt, the file on disk must
        # still carry the original content
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        os.unlink("samefile")
700
701
702     def test_restore_from_index(self):
703         '''
704         Restores a full backup using an index file.
705         '''
706         if self.MODE.startswith(':') or self.MODE.startswith('|'):
707             raise SkipTest('this test only works for uncompressed '
708                            'or concat compressed modes')
709
710         password, paramversion = self.ENCRYPTION or (None, None)
711         deltatar = DeltaTar(mode=self.MODE, password=password,
712                             crypto_paramversion=paramversion,
713                             logger=self.consoleLogger)
714
715         # create first backup
716         deltatar.create_full_backup(
717             source_path="source_dir",
718             backup_path="backup_dir")
719
720         shutil.rmtree("source_dir")
721
722         # this should automatically restore all volumes
723         index_filename = deltatar.index_name_func(True)
724         index_path = os.path.join("backup_dir", index_filename)
725
726         deltatar.restore_backup(target_path="source_dir",
727             backup_indexes_paths=[index_path])
728
729         for key, value in self.hash.items():
730             assert os.path.exists(key)
731             if value:
732                 assert value == self.md5sum(key)
733
734     def test_restore_multivol_from_index(self):
735         '''
736         Restores a full multivolume backup using an index file.
737         '''
738         if self.MODE.startswith(':') or self.MODE.startswith('|'):
739             raise SkipTest('this test only works for uncompressed '
740                            'or concat compressed modes')
741
742         password, paramversion = self.ENCRYPTION or (None, None)
743         deltatar = DeltaTar(mode=self.MODE, password=password,
744                             crypto_paramversion=paramversion,
745                             logger=self.consoleLogger)
746
747         # create first backup
748         deltatar.create_full_backup(
749             source_path="source_dir",
750             backup_path="backup_dir",
751             max_volume_size=2)
752
753         shutil.rmtree("source_dir")
754
755         # this should automatically restore all volumes
756         index_filename = deltatar.index_name_func(True)
757         index_path = os.path.join("backup_dir", index_filename)
758
759         deltatar.restore_backup(target_path="source_dir",
760             backup_indexes_paths=[index_path])
761
762         for key, value in self.hash.items():
763             assert os.path.exists(key)
764             if value:
765                 assert value == self.md5sum(key)
766
767     def test_create_basic_filtering(self):
768         '''
769         Tests create backup basic filtering.
770         '''
771         password, paramversion = self.ENCRYPTION or (None, None)
772         deltatar = DeltaTar(mode=self.MODE, password=password,
773                             crypto_paramversion=paramversion,
774                             logger=self.consoleLogger,
775                             included_files=["test", "small"],
776                             excluded_files=["test/huge"])
777
778         # create first backup
779         deltatar.create_full_backup(
780             source_path="source_dir",
781             backup_path="backup_dir")
782
783         assert os.path.exists("backup_dir")
784         shutil.rmtree("source_dir")
785
786         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
787         tar_path = os.path.join("backup_dir", tar_filename)
788
789         deltatar.restore_backup(target_path="source_dir",
790                                 backup_tar_path=tar_path)
791
792         assert os.path.exists("source_dir/small")
793         assert os.path.exists("source_dir/test")
794         assert os.path.exists("source_dir/test/huge2")
795         assert os.path.exists("source_dir/test/test2")
796
797         assert not os.path.exists("source_dir/test/huge")
798         assert not os.path.exists("source_dir/big")
799
800     def test_create_filter_func(self):
801         '''
802         Tests create backup basic filtering.
803         '''
804         visited_paths = []
805         def filter_func(visited_paths, path):
806             if path not in visited_paths:
807                 visited_paths.append(path)
808             return True
809
810         filter_func = partial(filter_func, visited_paths)
811
812         password, paramversion = self.ENCRYPTION or (None, None)
813         deltatar = DeltaTar(mode=self.MODE, password=password,
814                             crypto_paramversion=paramversion,
815                             logger=self.consoleLogger,
816                             included_files=["test", "small"],
817                             excluded_files=["test/huge"],
818                             filter_func=filter_func)
819
820         # create first backup
821         deltatar.create_full_backup(
822             source_path="source_dir",
823             backup_path="backup_dir")
824
825         assert os.path.exists("backup_dir")
826         shutil.rmtree("source_dir")
827
828         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
829         tar_path = os.path.join("backup_dir", tar_filename)
830
831         deltatar.restore_backup(target_path="source_dir",
832                                 backup_tar_path=tar_path)
833         assert set(visited_paths) == set([
834                 'small',
835                 'test',
836                 'test/huge2',
837                 'test/test2'
838             ])
839
840     def test_create_filter_out_func(self):
841         '''
842         Tests create backup basic filtering.
843         '''
844         visited_paths = []
845         def filter_func(visited_paths, path):
846             '''
847             Filter out everything
848             '''
849             if path not in visited_paths:
850                 visited_paths.append(path)
851             return False
852
853         filter_func = partial(filter_func, visited_paths)
854
855         password, paramversion = self.ENCRYPTION or (None, None)
856         deltatar = DeltaTar(mode=self.MODE, password=password,
857                             crypto_paramversion=paramversion,
858                             logger=self.consoleLogger,
859                             included_files=["test", "small"],
860                             excluded_files=["test/huge"],
861                             filter_func=filter_func)
862
863         # create first backup
864         deltatar.create_full_backup(
865             source_path="source_dir",
866             backup_path="backup_dir")
867
868         assert os.path.exists("backup_dir")
869         shutil.rmtree("source_dir")
870
871         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
872         tar_path = os.path.join("backup_dir", tar_filename)
873
874         deltatar.restore_backup(target_path="source_dir",
875                                 backup_tar_path=tar_path)
876         assert set(visited_paths) == set([
877                 'small',
878                 'test'
879             ])
880
881         # check that effectively no file was backed up
882         assert not os.path.exists("source_dir/small")
883         assert not os.path.exists("source_dir/big")
884         assert not os.path.exists("source_dir/test")
885
886     def test_restore_index_basic_filtering(self):
887         '''
888         Creates a backup, and then filter when doing the index based restore.
889         '''
890         if self.MODE.startswith(':') or self.MODE.startswith('|'):
891             raise SkipTest('this test only works for uncompressed '
892                            'or concat compressed modes')
893
894         password, paramversion = self.ENCRYPTION or (None, None)
895         deltatar = DeltaTar(mode=self.MODE, password=password,
896                             crypto_paramversion=paramversion,
897                             logger=self.consoleLogger)
898
899         # create first backup
900         deltatar.create_full_backup(
901             source_path="source_dir",
902             backup_path="backup_dir")
903
904         assert os.path.exists("backup_dir")
905         shutil.rmtree("source_dir")
906
907         index_filename = deltatar.index_name_func(True)
908         index_path = os.path.join("backup_dir", index_filename)
909
910         deltatar.included_files = ["test", "small"]
911         deltatar.excluded_files = ["test/huge"]
912         deltatar.restore_backup(target_path="source_dir",
913             backup_indexes_paths=[index_path])
914
915         assert os.path.exists("source_dir/small")
916         assert os.path.exists("source_dir/test")
917         assert os.path.exists("source_dir/test/huge2")
918         assert os.path.exists("source_dir/test/test2")
919
920         assert not os.path.exists("source_dir/test/huge")
921         assert not os.path.exists("source_dir/big")
922
923     def test_restore_index_filter_func(self):
924         '''
925         Creates a backup, and then filter when doing the index based restore,
926         using the filter function.
927         '''
928         if self.MODE.startswith(':') or self.MODE.startswith('|'):
929             raise SkipTest('this test only works for uncompressed '
930                            'or concat compressed modes')
931
932         visited_paths = []
933         def filter_func(visited_paths, path):
934             if path not in visited_paths:
935                 visited_paths.append(path)
936             return True
937
938         filter_func = partial(filter_func, visited_paths)
939
940         password, paramversion = self.ENCRYPTION or (None, None)
941         deltatar = DeltaTar(mode=self.MODE, password=password,
942                             crypto_paramversion=paramversion,
943                             logger=self.consoleLogger)
944
945         # create first backup
946         deltatar.create_full_backup(
947             source_path="source_dir",
948             backup_path="backup_dir")
949
950         assert os.path.exists("backup_dir")
951         shutil.rmtree("source_dir")
952
953         index_filename = deltatar.index_name_func(True)
954         index_path = os.path.join("backup_dir", index_filename)
955
956         deltatar.included_files = ["test", "small"]
957         deltatar.excluded_files = ["test/huge"]
958         deltatar.filter_func = filter_func
959         deltatar.restore_backup(target_path="source_dir",
960             backup_indexes_paths=[index_path])
961
962         assert set(visited_paths) == set([
963                 'small',
964                 'test',
965                 'test/huge2',
966                 'test/test2'
967             ])
968
969     def test_restore_tar_basic_filtering(self):
970         '''
971         Creates a backup, and then filter when doing the tar based restore.
972         '''
973         password, paramversion = self.ENCRYPTION or (None, None)
974         deltatar = DeltaTar(mode=self.MODE, password=password,
975                             crypto_paramversion=paramversion,
976                             logger=self.consoleLogger)
977
978         # create first backup
979         deltatar.create_full_backup(
980             source_path="source_dir",
981             backup_path="backup_dir")
982
983         assert os.path.exists("backup_dir")
984         shutil.rmtree("source_dir")
985
986         deltatar.included_files = ["test", "small"]
987         deltatar.excluded_files = ["test/huge"]
988
989         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
990         tar_path = os.path.join("backup_dir", tar_filename)
991
992         deltatar.restore_backup(target_path="source_dir",
993                                 backup_tar_path=tar_path)
994
995         assert os.path.exists("source_dir/small")
996         assert os.path.exists("source_dir/test")
997         assert os.path.exists("source_dir/test/huge2")
998         assert os.path.exists("source_dir/test/test2")
999
1000         assert not os.path.exists("source_dir/test/huge")
1001         assert not os.path.exists("source_dir/big")
1002
1003     def test_restore_tar_filter_func(self):
1004         '''
1005         Creates a backup, and then filter when doing the tar based restore,
1006         using the filter function.
1007         '''
1008         visited_paths = []
1009         def filter_func(visited_paths, path):
1010             if path not in visited_paths:
1011                 visited_paths.append(path)
1012             return True
1013
1014         filter_func = partial(filter_func, visited_paths)
1015
1016         password, paramversion = self.ENCRYPTION or (None, None)
1017         deltatar = DeltaTar(mode=self.MODE, password=password,
1018                             crypto_paramversion=paramversion,
1019                             logger=self.consoleLogger)
1020
1021         # create first backup
1022         deltatar.create_full_backup(
1023             source_path="source_dir",
1024             backup_path="backup_dir")
1025
1026         assert os.path.exists("backup_dir")
1027         shutil.rmtree("source_dir")
1028
1029         index_filename = deltatar.index_name_func(True)
1030         index_path = os.path.join("backup_dir", index_filename)
1031
1032         deltatar.included_files = ["test", "small"]
1033         deltatar.excluded_files = ["test/huge"]
1034         deltatar.filter_func = filter_func
1035
1036         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1037         tar_path = os.path.join("backup_dir", tar_filename)
1038
1039         deltatar.restore_backup(target_path="source_dir",
1040                                 backup_tar_path=tar_path)
1041         assert set(visited_paths) == set([
1042                 'small',
1043                 'test',
1044                 'test/huge2',
1045                 'test/test2'
1046             ])
1047
1048     def test_filter_path_regexp(self):
1049         '''
1050         Test specifically the deltatar.filter_path function with regular
1051         expressions
1052         '''
1053         included_files = [
1054             re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1055             re.compile('^yes$'),
1056             'testing'
1057         ]
1058         excluded_files = [
1059             re.compile('^testing/in_the'),
1060         ]
1061         deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1062                             excluded_files=excluded_files)
1063
1064         # assert valid and invalid paths
1065         assert deltatar.filter_path('test/hola')
1066         assert deltatar.filter_path('test/hola/any/thing')
1067         assert deltatar.filter_path('test/caracola/caracolero')
1068         assert deltatar.filter_path('test/caracola/caracolero/yeah')
1069         assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1070         assert deltatar.filter_path('yes')
1071         assert deltatar.filter_path('testing')
1072         assert deltatar.filter_path('testing/yes')
1073         assert deltatar.filter_path('testing/in_th')
1074
1075         assert not deltatar.filter_path('something')
1076         assert not deltatar.filter_path('other/thing')
1077         assert not deltatar.filter_path('test_ing')
1078         assert not deltatar.filter_path('test/hola_lala')
1079         assert not deltatar.filter_path('test/agur')
1080         assert not deltatar.filter_path('testing_something')
1081         assert not deltatar.filter_path('yeso')
1082         assert not deltatar.filter_path('yes/o')
1083         assert not deltatar.filter_path('yes_o')
1084         assert not deltatar.filter_path('testing/in_the')
1085         assert not deltatar.filter_path('testing/in_the_field')
1086         assert not deltatar.filter_path('testing/in_the/field')
1087
1088     def test_filter_path_parent(self):
1089         '''
1090         Test specifically the deltatar.filter_path function for parent matching
1091         '''
1092         included_files = [
1093             'testing/path/to/some/thing'
1094         ]
1095         deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1096
1097         # assert valid and invalid paths
1098         assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1099         assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1100         assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1101         assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1102         assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1103         assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1104         assert deltatar.filter_path('testing/something/else') == NO_MATCH
1105
1106     def test_parent_matching_simple_full_backup(self):
1107         '''
1108         Create a full backup using parent matching
1109         '''
1110         included_files = [
1111             'test/huge2'
1112         ]
1113
1114         password, paramversion = self.ENCRYPTION or (None, None)
1115         deltatar = DeltaTar(mode=self.MODE, password=password,
1116                             crypto_paramversion=paramversion,
1117                             logger=self.consoleLogger,
1118                             included_files=included_files)
1119
1120         # create first backup
1121         deltatar.create_full_backup(
1122             source_path="source_dir",
1123             backup_path="backup_dir")
1124
1125         assert os.path.exists("backup_dir")
1126         shutil.rmtree("source_dir")
1127
1128         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1129         tar_path = os.path.join("backup_dir", tar_filename)
1130
1131         deltatar = DeltaTar(mode=self.MODE, password=password,
1132                             logger=self.consoleLogger)
1133         deltatar.restore_backup(target_path="source_dir",
1134                                 backup_tar_path=tar_path)
1135
1136         assert os.path.exists('source_dir/test/huge2')
1137         assert os.path.exists('source_dir/test/')
1138         assert not os.path.exists('source_dir/test/huge')
1139         assert not os.path.exists('source_dir/big')
1140         assert not os.path.exists('source_dir/small')
1141
1142     def test_parent_matching_simple_full_backup_restore(self):
1143         '''
1144         Create a full backup and restores it using parent matching
1145         '''
1146         included_files = [
1147             'test/huge2'
1148         ]
1149
1150         password, paramversion = self.ENCRYPTION or (None, None)
1151         deltatar = DeltaTar(mode=self.MODE, password=password,
1152                             crypto_paramversion=paramversion,
1153                             logger=self.consoleLogger)
1154
1155         # create first backup
1156         deltatar.create_full_backup(
1157             source_path="source_dir",
1158             backup_path="backup_dir")
1159
1160         assert os.path.exists("backup_dir")
1161         shutil.rmtree("source_dir")
1162
1163         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1164         tar_path = os.path.join("backup_dir", tar_filename)
1165
1166         deltatar = DeltaTar(mode=self.MODE, password=password,
1167                             logger=self.consoleLogger,
1168                             included_files=included_files)
1169         deltatar.restore_backup(target_path="source_dir",
1170                                 backup_tar_path=tar_path)
1171
1172         assert os.path.exists('source_dir/test/huge2')
1173         assert os.path.exists('source_dir/test/')
1174         assert not os.path.exists('source_dir/test/huge')
1175         assert not os.path.exists('source_dir/big')
1176         assert not os.path.exists('source_dir/small')
1177
1178     def test_parent_matching_index_full_backup_restore(self):
1179         '''
1180         Create a full backup and restores it using parent matching
1181         '''
1182         included_files = [
1183             'test/huge2'
1184         ]
1185
1186         password, paramversion = self.ENCRYPTION or (None, None)
1187         deltatar = DeltaTar(mode=self.MODE, password=password,
1188                             crypto_paramversion=paramversion,
1189                             logger=self.consoleLogger)
1190
1191         # create first backup
1192         deltatar.create_full_backup(
1193             source_path="source_dir",
1194             backup_path="backup_dir")
1195
1196         assert os.path.exists("backup_dir")
1197         shutil.rmtree("source_dir")
1198
1199         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1200         tar_path = os.path.join("backup_dir", tar_filename)
1201
1202         deltatar = DeltaTar(mode=self.MODE, password=password,
1203                             logger=self.consoleLogger,
1204                             included_files=included_files)
1205         deltatar.restore_backup(target_path="source_dir",
1206                                 backup_tar_path=tar_path)
1207
1208         assert os.path.exists('source_dir/test/huge2')
1209         assert os.path.exists('source_dir/test/')
1210         assert not os.path.exists('source_dir/test/huge')
1211         assert not os.path.exists('source_dir/big')
1212         assert not os.path.exists('source_dir/small')
1213
1214     def test_collate_iterators(self):
1215         '''
1216         Tests the collate iterators functionality with two exact directories,
1217         using an index iterator from a backup and the exact same source dir.
1218         '''
1219         password, paramversion = self.ENCRYPTION or (None, None)
1220         deltatar = DeltaTar(mode=self.MODE, password=password,
1221                             crypto_paramversion=paramversion,
1222                             logger=self.consoleLogger)
1223
1224         # create first backup
1225         deltatar.create_full_backup(
1226             source_path="source_dir",
1227             backup_path="backup_dir")
1228
1229         assert os.path.exists("backup_dir")
1230
1231         cwd = os.getcwd()
1232         index_filename = deltatar.index_name_func(is_full=True)
1233         index_path = os.path.join(cwd, "backup_dir", index_filename)
1234         index_it = deltatar.iterate_index_path(index_path)
1235
1236         os.chdir('source_dir')
1237         dir_it = deltatar._recursive_walk_dir('.')
1238         path_it = deltatar.jsonize_path_iterator(dir_it)
1239
1240         try:
1241             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1242                 assert deltatar._equal_stat_dicts(path1, path2)
1243         finally:
1244             os.chdir(cwd)
1245
1246     def test_collate_iterators_diffdirs(self):
1247         '''
1248         Use the collate iterators functionality with two different directories.
1249         It must behave in an expected way.
1250         '''
1251         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1252
1253         password, paramversion = self.ENCRYPTION or (None, None)
1254         deltatar = DeltaTar(mode=self.MODE, password=password,
1255                             crypto_paramversion=paramversion,
1256                             logger=self.consoleLogger)
1257
1258         # create first backup
1259         deltatar.create_full_backup(
1260             source_path="source_dir",
1261             backup_path="backup_dir")
1262
1263         assert os.path.exists("backup_dir")
1264         self.hash["source_dir/z"]  = self.create_file("source_dir/z", 100)
1265
1266         cwd = os.getcwd()
1267         index_filename = deltatar.index_name_func(is_full=True)
1268         index_path = os.path.join(cwd, "backup_dir", index_filename)
1269         index_it = deltatar.iterate_index_path(index_path)
1270
1271         os.chdir('source_dir')
1272         dir_it = deltatar._recursive_walk_dir('.')
1273         path_it = deltatar.jsonize_path_iterator(dir_it)
1274
1275         try:
1276             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1277                 if path2['path'] == 'z':
1278                     assert not path1
1279                 else:
1280                     assert deltatar._equal_stat_dicts(path1, path2)
1281         finally:
1282             os.chdir(cwd)
1283
1284     def test_collate_iterators_diffdirs2(self):
1285         '''
1286         Use the collate iterators functionality with two different directories.
1287         It must behave in an expected way.
1288         '''
1289         password, paramversion = self.ENCRYPTION or (None, None)
1290         deltatar = DeltaTar(mode=self.MODE, password=password,
1291                             crypto_paramversion=paramversion,
1292                             logger=self.consoleLogger)
1293
1294         # create first backup
1295         deltatar.create_full_backup(
1296             source_path="source_dir",
1297             backup_path="backup_dir")
1298
1299         assert os.path.exists("backup_dir")
1300
1301         # add some new files and directories
1302         os.makedirs('source_dir/bigdir')
1303         self.hash["source_dir/bigdir"] = ""
1304         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1305         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1306         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1307
1308         cwd = os.getcwd()
1309         index_filename = deltatar.index_name_func(is_full=True)
1310         index_path = os.path.join(cwd, "backup_dir", index_filename)
1311         index_it = deltatar.iterate_index_path(index_path)
1312
1313         os.chdir('source_dir')
1314         dir_it = deltatar._recursive_walk_dir('.')
1315         path_it = deltatar.jsonize_path_iterator(dir_it)
1316
1317         visited_pairs = []
1318
1319         try:
1320             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1321                 visited_pairs.append(
1322                     (deltatar.unprefixed(path1['path']) if path1 else None,
1323                      path2['path'] if path2 else None)
1324                 )
1325         finally:
1326             assert visited_pairs == [
1327                 (u'big', u'big'),
1328                 (None, u'bigdir'),
1329                 (u'small', u'small'),
1330                 (u'test', u'test'),
1331                 (None, u'zzzz'),
1332                 (None, u'bigdir/a'),
1333                 (None, u'bigdir/b'),
1334                 (u'test/huge', u'test/huge'),
1335                 (u'test/huge2', u'test/huge2'),
1336                 (u'test/test2', u'test/test2'),
1337             ]
1338             os.chdir(cwd)
1339
1340     def test_create_empty_diff_backup(self):
1341         '''
1342         Creates an empty (no changes) backup diff
1343         '''
1344         password, paramversion = self.ENCRYPTION or (None, None)
1345         deltatar = DeltaTar(mode=self.MODE, password=password,
1346                             crypto_paramversion=paramversion,
1347                             logger=self.consoleLogger)
1348
1349         # create first backup
1350         deltatar.create_full_backup(
1351             source_path="source_dir",
1352             backup_path="backup_dir")
1353
1354         prev_index_filename = deltatar.index_name_func(is_full=True)
1355         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1356
1357         deltatar.create_diff_backup("source_dir", "backup_dir2",
1358                                     prev_index_path)
1359
1360         # check index items
1361         index_path = os.path.join("backup_dir2",
1362             deltatar.index_name_func(is_full=False))
1363         index_it = deltatar.iterate_index_path(index_path)
1364         n = 0
1365         for i in index_it:
1366             n += 1
1367             assert i[0]['path'].startswith("list://")
1368
1369         assert n == 6
1370
1371         # check the tar file
1372         assert os.path.exists("backup_dir2")
1373         shutil.rmtree("source_dir")
1374
1375         tar_filename = deltatar.volume_name_func('backup_dir2',
1376             is_full=False, volume_number=0)
1377         tar_path = os.path.join("backup_dir2", tar_filename)
1378
1379         # no file restored, because the diff was empty
1380         deltatar.restore_backup(target_path="source_dir",
1381                                 backup_tar_path=tar_path)
1382         assert len(os.listdir("source_dir")) == 0
1383
1384
1385     def test_create_diff_backup1(self):
1386         '''
1387         Creates a diff backup when there are new files
1388         '''
1389         password, paramversion = self.ENCRYPTION or (None, None)
1390         deltatar = DeltaTar(mode=self.MODE, password=password,
1391                             crypto_paramversion=paramversion,
1392                             logger=self.consoleLogger)
1393
1394         # create first backup
1395         deltatar.create_full_backup(
1396             source_path="source_dir",
1397             backup_path="backup_dir")
1398
1399         prev_index_filename = deltatar.index_name_func(is_full=True)
1400         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1401
1402         # add some new files and directories
1403         os.makedirs('source_dir/bigdir')
1404         self.hash["source_dir/bigdir"] = ""
1405         os.unlink("source_dir/small")
1406         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1407         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1408         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1409
1410         deltatar.create_diff_backup("source_dir", "backup_dir2",
1411                                     prev_index_path)
1412
1413         # check index items
1414         index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1415         index_it = deltatar.iterate_index_path(index_path)
1416         l = [i[0]['path'] for i in index_it]
1417
1418         assert l == [
1419             'list://big',
1420             'snapshot://bigdir',
1421             'delete://small',
1422             'list://test',
1423             'snapshot://zzzz',
1424             'snapshot://bigdir/a',
1425             'snapshot://bigdir/b',
1426             'list://test/huge',
1427             'list://test/huge2',
1428             'list://test/test2',
1429         ]
1430
1431         # check the tar file
1432         assert os.path.exists("backup_dir2")
1433         shutil.rmtree("source_dir")
1434
1435         # create source_dir with the small file, that will be then deleted by
1436         # the restore_backup
1437         os.mkdir("source_dir")
1438         open("source_dir/small", 'wb').close()
1439
1440         tar_filename = deltatar.volume_name_func('backup_dir2',
1441             is_full=False, volume_number=0)
1442         tar_path = os.path.join("backup_dir2", tar_filename)
1443
1444         # restore the backup, this will create only the new files
1445         deltatar.restore_backup(target_path="source_dir",
1446                                 backup_tar_path=tar_path)
1447         # the order doesn't matter
1448         assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1449
1450     def test_restore_from_index_diff_backup(self):
1451         '''
1452         Creates a full backup, modifies some files, creates a diff backup,
1453         then restores the diff backup from zero.
1454         '''
1455         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1456             raise SkipTest('this test only works for uncompressed '
1457                            'or concat compressed modes')
1458
1459         password, paramversion = self.ENCRYPTION or (None, None)
1460         deltatar = DeltaTar(mode=self.MODE, password=password,
1461                             crypto_paramversion=paramversion,
1462                             logger=self.consoleLogger)
1463
1464         # create first backup
1465         deltatar.create_full_backup(
1466             source_path="source_dir",
1467             backup_path="backup_dir")
1468
1469         prev_index_filename = deltatar.index_name_func(is_full=True)
1470         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1471
1472         # add some new files and directories
1473         os.makedirs('source_dir/bigdir')
1474         self.hash["source_dir/bigdir"] = ""
1475         os.unlink("source_dir/small")
1476         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1477         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1478         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1479
1480         deltatar.create_diff_backup("source_dir", "backup_dir2",
1481                                     prev_index_path)
1482
1483         # apply diff backup in target_dir
1484         index_filename = deltatar.index_name_func(is_full=False)
1485         index_path = os.path.join("backup_dir2", index_filename)
1486         deltatar.restore_backup("target_dir",
1487             backup_indexes_paths=[index_path, prev_index_path])
1488
1489         # then compare the two directories source_dir and target_dir and check
1490         # they are the same
1491         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1492
1493     def test_restore_from_index_diff_backup2(self):
1494         '''
1495         Creates a full backup, modifies some files, creates a diff backup,
1496         then restores the diff backup with the full backup as a starting point.
1497         '''
1498         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1499             raise SkipTest('this test only works for uncompressed '
1500                            'or concat compressed modes')
1501
1502         password, paramversion = self.ENCRYPTION or (None, None)
1503         deltatar = DeltaTar(mode=self.MODE, password=password,
1504                             crypto_paramversion=paramversion,
1505                             logger=self.consoleLogger)
1506
1507         # create first backup
1508         deltatar.create_full_backup(
1509             source_path="source_dir",
1510             backup_path="backup_dir")
1511
1512         prev_index_filename = deltatar.index_name_func(is_full=True)
1513         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1514
1515         # add some new files and directories
1516         os.makedirs('source_dir/bigdir')
1517         self.hash["source_dir/bigdir"] = ""
1518         os.unlink("source_dir/small")
1519         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1520         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1521         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1522         shutil.rmtree("source_dir/test")
1523
1524         deltatar.create_diff_backup("source_dir", "backup_dir2",
1525                                     prev_index_path)
1526
1527         # first restore initial backup in target_dir
1528         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1529         tar_path = os.path.join("backup_dir", tar_filename)
1530         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1531
1532         # then apply diff backup in target_dir
1533         index_filename = deltatar.index_name_func(is_full=False)
1534         index_path = os.path.join("backup_dir2", index_filename)
1535
1536         try:
1537             deltatar.restore_backup("target_dir",
1538                 backup_indexes_paths=[index_path, prev_index_path])
1539
1540             # then compare the two directories source_dir and target_dir and check
1541             # they are the same
1542             self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1543         except FileNotFoundError as exn:
1544             if self.FSTEST is None:
1545                 # fs traversal may fail here
1546                 raise exn
1547
1548     def test_restore_from_index_diff_backup3(self):
1549         '''
1550         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1551         diff backup, then restores the diff backup with the full backup as a
1552         starting point.
1553         '''
1554         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1555             raise SkipTest('this test only works for uncompressed '
1556                            'or concat compressed modes')
1557
1558         password, paramversion = self.ENCRYPTION or (None, None)
1559         deltatar = DeltaTar(mode=self.MODE, password=password,
1560                             crypto_paramversion=paramversion,
1561                             logger=self.consoleLogger)
1562
1563         shutil.rmtree("source_dir")
1564         shutil.copytree(self.GIT_DIR, "source_dir")
1565         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1566
1567         # create first backup
1568         deltatar.create_full_backup(
1569             source_path="source_dir",
1570             backup_path="backup_dir")
1571
1572         prev_index_filename = deltatar.index_name_func(is_full=True)
1573         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1574
1575         # alter the source_dir randomly
1576         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1577
1578         for path in source_it:
1579             # if path doesn't exist (might have previously removed) ignore it.
1580             # also ignore it (i.e. do not change it) 70% of the time
1581             if not os.path.exists(path) or random.random() < 0.7:
1582                 continue
1583
1584             # remove the file
1585             if os.path.isdir(path):
1586                 shutil.rmtree(path)
1587             else:
1588                 os.unlink(path)
1589
1590         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1591                                     prev_index_path)
1592
1593         # first restore initial backup in target_dir
1594         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1595         tar_path = os.path.join("backup_dir", tar_filename)
1596         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1597
1598         # and check that target_dir equals to source_dir (which is the same as
1599         # self.GIT_DIR initially)
1600         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1601
1602         # then apply diff backup in target_dir
1603         index_filename = deltatar.index_name_func(is_full=False)
1604         index_path = os.path.join("backup_dir2", index_filename)
1605         deltatar.restore_backup("target_dir",
1606             backup_indexes_paths=[index_path, prev_index_path])
1607
1608         # and check that target_dir equals to source_dir_diff (the randomly
1609         # altered self.GIT_DIR directory)
1610         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1611
1612         # then delete target_dir and apply diff backup from zero and check again
1613         shutil.rmtree("target_dir")
1614         deltatar.restore_backup("target_dir",
1615             backup_indexes_paths=[index_path, prev_index_path])
1616
1617         # and check that target_dir equals to source_dir_diff (the randomly
1618         # altered self.GIT_DIR directory)
1619         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1620
1621     def test_restore_from_index_diff_backup3_multivol(self):
1622         '''
1623         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1624         diff backup, then restores the diff backup with the full backup as a
1625         starting point.
1626         '''
1627         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1628             raise SkipTest('this test only works for uncompressed '
1629                            'or concat compressed modes')
1630
1631         password, paramversion = self.ENCRYPTION or (None, None)
1632         deltatar = DeltaTar(mode=self.MODE, password=password,
1633                             crypto_paramversion=paramversion,
1634                             logger=self.consoleLogger)
1635
1636         shutil.rmtree("source_dir")
1637         shutil.copytree(self.GIT_DIR, "source_dir")
1638         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1639
1640         # create first backup
1641         deltatar.create_full_backup(
1642             source_path="source_dir",
1643             backup_path="backup_dir",
1644             max_volume_size=1)
1645
1646         prev_index_filename = deltatar.index_name_func(is_full=True)
1647         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1648
1649         # alter the source_dir randomly
1650         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1651
1652         for path in source_it:
1653             # if path doesn't exist (might have previously removed) ignore it.
1654             # also ignore it (i.e. do not change it) 70% of the time
1655             if not os.path.exists(path) or random.random() < 0.7:
1656                 continue
1657
1658             # remove the file
1659             if os.path.isdir(path):
1660                 shutil.rmtree(path)
1661             else:
1662                 os.unlink(path)
1663
1664         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1665                                     prev_index_path, max_volume_size=1)
1666
1667         # first restore initial backup in target_dir
1668         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1669         tar_path = os.path.join("backup_dir", tar_filename)
1670         if self.FSTEST is not None:
1671             return  # the below will fail in stat checks, but that is expected
1672         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1673
1674         # and check that target_dir equals to source_dir (which is the same as
1675         # self.GIT_DIR initially)
1676         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1677
1678         # then apply diff backup in target_dir
1679         index_filename = deltatar.index_name_func(is_full=False)
1680         index_path = os.path.join("backup_dir2", index_filename)
1681         deltatar.restore_backup("target_dir",
1682             backup_indexes_paths=[index_path, prev_index_path])
1683
1684         # and check that target_dir equals to source_dir_diff (the randomly
1685         # altered self.GIT_DIR directory)
1686         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1687
1688         # then delete target_dir and apply diff backup from zero and check again
1689         shutil.rmtree("target_dir")
1690         deltatar.restore_backup("target_dir",
1691             backup_indexes_paths=[index_path, prev_index_path])
1692
1693         # and check that target_dir equals to source_dir_diff (the randomly
1694         # altered self.GIT_DIR directory)
1695         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1696
1697     def check_equal_dirs(self, path1, path2, deltatar):
1698         '''
1699         compare the two directories source_dir and target_dir and check
1700         # they are the same
1701         '''
1702         source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1703         source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1704         target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1705         target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1706         while True:
1707             try:
1708                 sitem = next(source_it)
1709                 titem = next(target_it)
1710             except StopIteration:
1711                 try:
1712                     titem = next(target_it)
1713                     raise Exception("iterators do not stop at the same time")
1714                 except StopIteration:
1715                     break
1716             try:
1717                 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1718             except Exception as e:
1719                 print("SITEM: " + str(sitem))
1720                 print("TITEM: " + str(titem))
1721                 raise e
1722
1723     def test_create_no_symlinks(self):
1724         '''
1725         Creates a full backup from different varieties of symlinks. The
1726         extracted archive may not contain any symlinks but the file contents
1727         '''
1728
1729         os.system("rm -rf source_dir")
1730         os.makedirs("source_dir/symlinks")
1731         fd = os.open("source_dir/symlinks/valid_linkname",
1732                      os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1733         os.write(fd, b"valid link target for symlink tests; please ignore\n")
1734         os.close(fd)
1735         # first one is good, the rest points nowhere
1736         self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1737         self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1738         self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1739         self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1740         password, paramversion = self.ENCRYPTION or (None, None)
1741         deltatar = DeltaTar(mode=self.MODE, password=password,
1742                             crypto_paramversion=paramversion,
1743                             logger=self.consoleLogger)
1744
1745         # create first backup
1746         deltatar.create_full_backup(source_path="source_dir",
1747                                     backup_path="backup_dir")
1748
1749         assert os.path.exists("backup_dir")
1750         shutil.rmtree("source_dir")
1751         assert not os.path.exists("source_dir")
1752
1753         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1754         tar_path = os.path.join("backup_dir", tar_filename)
1755
1756         deltatar.restore_backup(target_path="source_dir",
1757                                 backup_tar_path=tar_path)
1758
1759         for _r, _ds, fs in os.walk("source_dir/symlinks"):
1760         # only the valid link plus the linked file may be found in the
1761         # extracted archive
1762             assert len(fs) == 2
1763             for f in fs:
1764                 # the link must have been resolved and file contents must match
1765                 # the linked file
1766                 assert not os.path.islink(f)
1767                 with open("source_dir/symlinks/valid_linkname") as a:
1768                     with open("source_dir/symlinks/whatever") as b:
1769                         assert a.read() == b.read()
1770
1771     def test_restore_with_symlinks(self):
1772         '''
1773         Creates a full backup containing different varieties of symlinks. All
1774         of them must be filtered out.
1775         '''
1776         password, paramversion = self.ENCRYPTION or (None, None)
1777         deltatar = DeltaTar(mode=self.MODE, password=password,
1778                             crypto_paramversion=paramversion,
1779                             logger=self.consoleLogger)
1780
1781         # create first backup
1782         deltatar.create_full_backup(source_path="source_dir",
1783                                     backup_path="backup_dir")
1784
1785         assert os.path.exists("backup_dir")
1786         shutil.rmtree("source_dir")
1787
1788         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1789         tar_path = os.path.join("backup_dir", tar_filename)
1790
1791         # add symlinks to existing archive
1792
1793         def add_symlink (a, name, dst):
1794             l = tarfile.TarInfo("snapshot://%s" % name)
1795             l.type = tarfile.SYMTYPE
1796             l.linkname = dst
1797             a.addfile(l)
1798             return name
1799
1800         try:
1801             with tarfile.open(tar_path,mode="a") as a:
1802                 checkme = \
1803                     [ add_symlink(a, "symlinks/foo", "internal-file")
1804                     , add_symlink(a, "symlinks/bar", "/absolute/path")
1805                     , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1806         except tarfile.ReadError as e:
1807             if self.MODE == '#' or self.MODE.endswith ("gz"):
1808                 checkme = []
1809             else:
1810                 raise
1811         except ValueError as e:
1812             if self.MODE.startswith ('#'):
1813                 checkme = []
1814             else:
1815                 raise
1816
1817         deltatar.restore_backup(target_path="source_dir",
1818                                 backup_tar_path=tar_path)
1819
1820         # check what happened to our symlinks
1821         for name in checkme:
1822             fullpath = os.path.join("source_dir", name)
1823             assert not os.path.exists(fullpath)
1824
1825     def test_restore_malicious_symlinks(self):
1826         '''
1827         Creates a full backup containing a symlink and a file of the same name.
1828         This simulates a symlink attack with a link pointing to some external
1829         path that is abused to write outside the extraction prefix.
1830         '''
1831         password, paramversion = self.ENCRYPTION or (None, None)
1832         deltatar = DeltaTar(mode=self.MODE, password=password,
1833                             crypto_paramversion=paramversion,
1834                             logger=self.consoleLogger)
1835
1836         # create first backup
1837         deltatar.create_full_backup(source_path="source_dir",
1838                                     backup_path="backup_dir")
1839
1840         assert os.path.exists("backup_dir")
1841         shutil.rmtree("source_dir")
1842
1843         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1844         tar_path = os.path.join("backup_dir", tar_filename)
1845
1846         # add symlinks to existing archive
1847
1848         def add_symlink (a, name, dst):
1849             l = tarfile.TarInfo("snapshot://%s" % name)
1850             l.type = tarfile.SYMTYPE
1851             l.linkname = dst
1852             a.addfile(l)
1853
1854         def add_file (a, name):
1855             f = tarfile.TarInfo("snapshot://%s" % name)
1856             f.type = tarfile.REGTYPE
1857             a.addfile(f)
1858
1859         testpath = "symlinks/pernicious-link"
1860         testdst = "/tmp/does/not/exist"
1861
1862         try:
1863             with tarfile.open(tar_path, mode="a") as a:
1864                 add_symlink(a, testpath, testdst)
1865                 add_symlink(a, testpath, testdst+"X")
1866                 add_symlink(a, testpath, testdst+"XXX")
1867                 add_file(a, testpath)
1868         except tarfile.ReadError as e:
1869             if self.MODE == '#' or self.MODE.endswith ("gz"):
1870                 pass
1871             else:
1872                 raise
1873         except ValueError as e:
1874             if self.MODE.startswith ('#'):
1875                 pass # O_APPEND of concat archives not feasible
1876             else:
1877                 raise
1878
1879         deltatar.restore_backup(target_path="source_dir",
1880                                 backup_tar_path=tar_path)
1881
1882         # check whether the link was extracted; deltatar seems to only ever
1883         # retrieve the first item it finds for a given path which in the case
1884         # at hand is a symlink to some non-existent path
1885         fullpath = os.path.join("source_dir", testpath)
1886         assert not os.path.exists(fullpath)
1887
1888
def fsapi_access_true (self):
    """
    Replace *os.access* with a function that answers True to every call,
    for testing improper use of the *os* module.

    The function in place before the swap is recorded as an
    ("access", function) pair on self.FSAPI_SAVED.
    """
    previous = os.access
    self.FSAPI_SAVED.append (("access", previous))
    os.access = lambda *_a, **_ka: True
1896
1897
class DeltaTar2Test(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to ':'.
    """
    MODE = ":"
1903
1904
class DeltaTarStreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to '|', the uncompressed
    stream mode.
    """
    MODE = "|"
1910
1911
class DeltaTarGzipTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to ':gz' (gzip) and the mode
    flagged as compressing.
    """
    MODE_COMPRESSES = True
    MODE = ":gz"
1918
1919
class DeltaTarGzipStreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to '|gz' (gzip stream) and
    the mode flagged as compressing.
    """
    MODE_COMPRESSES = True
    MODE = "|gz"
1926
1927
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    """
    DeltaTarTest cases with MODE set to ':bz2' (bz2) and the mode flagged
    as compressing. The whole class is skipped.
    """
    MODE_COMPRESSES = True
    MODE = ":bz2"
1935
1936
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    """
    DeltaTarTest cases with MODE set to '|bz2' (bz2 stream) and the mode
    flagged as compressing. The whole class is skipped.
    """
    MODE_COMPRESSES = True
    MODE = "|bz2"
1944
1945
class DeltaTarGzipConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to '#gz' (gzip concat) and
    the mode flagged as compressing.
    """
    MODE_COMPRESSES = True
    MODE = "#gz"
1952
1953
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to '#gz' (gzip concat), the
    mode flagged as compressing, and encryption configured through a
    (password, parameter version) pair.
    """
    MODE = "#gz"
    MODE_COMPRESSES = True
    ENCRYPTION = ("some magic key", 1)
1961
1962
class DeltaTarAes128ConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with MODE set to '#' (concat) and
    encryption configured through a (password, parameter version) pair.
    """
    ENCRYPTION = ("some magic key", 1)
    MODE = "#"
1969
1970
class DeltaTarFilesystemHandlingTestBase(BaseTest):
    """
    Base for test classes that mess with filesystem APIs: FSTEST is set to
    fsapi_access_true, which swaps out os.access.
    """
    FSTEST = fsapi_access_true
1976
class DeltaTarFSGzipTest(
        DeltaTarFilesystemHandlingTestBase,
        DeltaTarGzipTest):
    """
    DeltaTarGzipTest combined with the FSTEST setting of
    DeltaTarFilesystemHandlingTestBase, which is listed first among the
    bases.
    """
1980
class DeltaTarFSGzipConcatTest(
        DeltaTarFilesystemHandlingTestBase,
        DeltaTarGzipConcatTest):
    """
    DeltaTarGzipConcatTest combined with the FSTEST setting of
    DeltaTarFilesystemHandlingTestBase, which is listed first among the
    bases.
    """
1984
class DeltaTarFSAes128ConcatTest(
        DeltaTarFilesystemHandlingTestBase,
        DeltaTarAes128ConcatTest):
    """
    DeltaTarAes128ConcatTest combined with the FSTEST setting of
    DeltaTarFilesystemHandlingTestBase, which is listed first among the
    bases.
    """
1988
class DeltaTarFSGzipAes128ConcatTest(
        DeltaTarFilesystemHandlingTestBase,
        DeltaTarGzipAes128ConcatTest):
    """
    DeltaTarGzipAes128ConcatTest combined with the FSTEST setting of
    DeltaTarFilesystemHandlingTestBase, which is listed first among the
    bases.
    """
1992