improve unit test SNR
[python-delta-tar] / testing / test_deltatar.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
19 import errno
20 import os
21 import re
22 import random
23 import shutil
24 import logging
25 import binascii
26 import json
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
30
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
36
37 from . import BaseTest
38 from . import new_volume_handler
39
40 # Set to True to enable warning messages from deltatar. Doing so lowers the
41 # SNR of test runs, since none of the messages are meaningful in any way.
42 VERBOSE_TEST_OUTPUT = False
43
44 class DeltaTarTest(BaseTest):
45     """
46     Test backups
47     """
48     MODE = ''
49     MODE_COMPRESSES = False
50
51     ENCRYPTION = None  # (password : str, paramversion : int) option
52
53     GIT_DIR = '.git'
54
55     def setUp(self):
56         '''
57         Create base test data
58         '''
59         self.pwd = os.getcwd()
60         os.system('rm -rf target_dir source_dir* backup_dir* huge')
61         os.makedirs('source_dir/test/test2')
62         self.hash = dict()
63         self.hash["source_dir/test/test2"] = ''
64         self.hash["source_dir/big"]  = self.create_file("source_dir/big", 50000)
65         self.hash["source_dir/small"]  = self.create_file("source_dir/small", 100)
66         self.hash["source_dir/test/huge"]  = self.create_file("source_dir/test/huge", 700000)
67         self.hash["source_dir/test/huge2"]  = self.create_file("source_dir/test/huge2", 800000)
68
69         self.consoleLogger = None
70         if VERBOSE_TEST_OUTPUT is True:
71             self.consoleLogger = logging.StreamHandler()
72             self.consoleLogger.setLevel(logging.DEBUG)
73
74         if not os.path.isdir(self.GIT_DIR):
75             # Not running inside git tree, take our
76             # own testing directory as source.
77             self.GIT_DIR = 'testing'
78
79             if not os.path.isdir(self.GIT_DIR):
80                 raise Exception('No input directory found: ' + self.GIT_DIR)
81
82     def tearDown(self):
83         '''
84         Remove temporal files created by unit tests and reset globals.
85         '''
86         os.chdir(self.pwd)
87         os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
88         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
89                   ("I am fully aware that this will void my warranty.")
90
91     def test_restore_simple_full_backup(self):
92         '''
93         Creates a full backup without any filtering and restores it.
94         '''
95         password, paramversion = self.ENCRYPTION or (None, None)
96         deltatar = DeltaTar(mode=self.MODE, password=password,
97                             crypto_paramversion=paramversion,
98                             logger=self.consoleLogger)
99
100         # create first backup
101         deltatar.create_full_backup(
102             source_path="source_dir",
103             backup_path="backup_dir")
104
105         assert os.path.exists("backup_dir")
106         shutil.rmtree("source_dir")
107
108         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
109         tar_path = os.path.join("backup_dir", tar_filename)
110
111         deltatar.restore_backup(target_path="source_dir",
112                                 backup_tar_path=tar_path)
113
114         for key, value in self.hash.items():
115             assert os.path.exists(key)
116             if value:
117                 assert value == self.md5sum(key)
118
119
120     def test_create_backup_max_file_length (self):
121         """
122         Creates a full backup including one file that exceeds the (purposely
123         lowered) upper bound on GCM encrypted objects. This will yield multiple
124         encrypted objects for one plaintext file.
125
126         Success is verified by splitting the archive at object boundaries and
127         counting the parts.
128         """
129         if self.MODE_COMPRESSES is True:
130             raise SkipTest ("GCM file length test not meaningful with compression.")
131         if self.ENCRYPTION is None:
132             raise SkipTest ("GCM file length applies only to encrypted backups.")
133
134         new_max = 20000 # cannot be less than tar block size
135         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
136                 ("I am fully aware that this will void my warranty.",
137                  new_max)
138
139         password, paramversion = self.ENCRYPTION
140         deltatar = DeltaTar (mode=self.MODE, password=password,
141                              crypto_paramversion=paramversion,
142                              logger=self.consoleLogger)
143
144         self.hash = dict ()
145         os.makedirs ("source_dir2")
146         for f, s in [("empty"          , 0)             # 1 tar objects
147                     ,("slightly_larger", new_max + 1)   # 2
148                     ,("twice"          , 2 * new_max)   # 3
149                     ]:
150             f = "source_dir2/%s" % f
151             self.hash [f] = self.create_file (f, s)
152
153         deltatar.create_full_backup \
154                 (source_path="source_dir2", backup_path="backup_dir")
155
156         assert os.path.exists ("backup_dir")
157         shutil.rmtree ("source_dir2")
158
159         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
160         backup_path     = os.path.join("backup_dir", backup_filename)
161
162         # split the resulting archive into its constituents without
163         # decrypting
164         ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
165                         "-o backup_dir/split <\'%s\'" % backup_path)
166
167         assert os.path.exists ("backup_dir/split")
168
169         dents = os.listdir ("backup_dir/split")
170         assert len (dents) == 6
171
172
173     def test_restore_backup_max_file_length (self):
174         """
175         Creates a full backup including one file that exceeds the (purposely
176         lowered) upper bound on GCM encrypted objects. This will yield two
177         encrypted objects for one plaintext file.
178
179         Success is verified by splitting the archive at object boundaries and
180         counting the parts.
181         """
182         if self.MODE_COMPRESSES is True:
183             raise SkipTest ("GCM file length test not meaningful with compression.")
184         if self.ENCRYPTION is None:
185             raise SkipTest ("GCM file length applies only to encrypted backups.")
186
187         new_max = 20000 # cannot be less than tar block size
188         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
189                     ("I am fully aware that this will void my warranty.",
190                      new_max)
191
192         password, paramversion = self.ENCRYPTION
193         deltatar = DeltaTar (mode=self.MODE, password=password,
194                              crypto_paramversion=paramversion,
195                              logger=self.consoleLogger)
196
197         self.hash = dict ()
198         os.makedirs ("source_dir2")
199         for f, s in [("empty"          , 0)             # 1 tar objects
200                     ,("almost_large"   , new_max - 1)   # 2
201                     ,("large"          , new_max)       # 3
202                     ,("slightly_larger", new_max + 1)   # 4
203                     ,("twice"          , 2 * new_max)   # 5
204                     ,("twice_plus_one" , (2 * new_max) + 1)   # 6
205                     ]:
206             f = "source_dir2/%s" % f
207             self.hash [f] = self.create_file (f, s)
208
209         deltatar.create_full_backup \
210                 (source_path="source_dir2", backup_path="backup_dir")
211
212         assert os.path.exists ("backup_dir")
213         shutil.rmtree ("source_dir2")
214
215         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
216         backup_path     = os.path.join("backup_dir", backup_filename)
217
218         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
219         tar_path = os.path.join("backup_dir", tar_filename)
220
221         deltatar.restore_backup(target_path="source_dir2",
222                                 backup_tar_path=tar_path)
223
224         for key, value in self.hash.items():
225             assert os.path.exists(key)
226             if value:
227                 assert value == self.md5sum(key)
228
229
230     def test_create_backup_index_max_file_length (self):
231         """
232         Creates a full backup with a too large index file for the upper bound
233         of the GCM encryption. Since the index file has a fixed IV file counter
234         of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
235
236         60+ GB of (potentially compressed) index file should last for a while...
237         """
238         if self.MODE_COMPRESSES is True:
239             raise SkipTest ("GCM file length test not meaningful with compression.")
240         if self.ENCRYPTION is None:
241             raise SkipTest ("GCM file length applies only to encrypted backups.")
242
243         new_max = 5000
244         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
245                     ("I am fully aware that this will void my warranty.",
246                      new_max)
247
248         password, paramversion = self.ENCRYPTION
249         deltatar = DeltaTar (mode=self.MODE, password=password,
250                              crypto_paramversion=paramversion,
251                              logger=self.consoleLogger)
252
253         self.hash = dict ()
254         os.makedirs ("source_dir2")
255         for i in range (42):
256             f = "source_dir2/dummy_%rd" % i
257             self.hash [f] = self.create_file (f, i)
258
259         with self.assertRaises (crypto.InvalidFileCounter):
260             deltatar.create_full_backup \
261                     (source_path="source_dir2", backup_path="backup_dir")
262         shutil.rmtree ("source_dir2")
263
264
265     def test_check_index_checksum(self):
266         '''
267         Creates a full backup and checks the index' checksum of files
268         '''
269         password, paramversion = self.ENCRYPTION or (None, None)
270         deltatar = DeltaTar(mode=self.MODE, password=password,
271                             crypto_paramversion=paramversion,
272                             logger=self.consoleLogger)
273
274         # create first backup
275         deltatar.create_full_backup(
276             source_path="source_dir",
277             backup_path="backup_dir")
278
279
280         index_filename = deltatar.index_name_func(True)
281         index_path = os.path.join("backup_dir", index_filename)
282
283         f = open(index_path, 'rb')
284         crc = None
285         checked = False
286         began_list = False
287         while True:
288             l = f.readline()
289             if l == b'':
290                 break
291             if b'BEGIN-FILE-LIST' in l:
292                 crc = binascii.crc32(l) & 0xFFFFffff
293                 began_list = True
294             elif b'END-FILE-LIST' in l:
295                 crc = binascii.crc32(l, crc) & 0xffffffff
296
297                 # next line contains the crc
298                 data = json.loads(f.readline().decode("UTF-8"))
299                 assert data['type'] == 'file-list-checksum'
300                 assert data['checksum'] == crc
301                 checked = True
302                 break
303             elif began_list:
304                 crc = binascii.crc32(l, crc) & 0xffffffff
305         f.close()
306
307
308     def test_restore_multivol(self):
309         '''
310         Creates a full backup without any filtering with multiple volumes and
311         restore it.
312         '''
313         if ':gz' in self.MODE:
314             raise SkipTest('compression information is lost when creating '
315                            'multiple volumes with no Stream')
316
317         password, paramversion = self.ENCRYPTION or (None, None)
318         deltatar = DeltaTar(mode=self.MODE, password=password,
319                             crypto_paramversion=paramversion,
320                             logger=self.consoleLogger)
321
322         self.hash = dict()
323         os.makedirs('source_dir2')
324         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
325         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
326
327         # create first backup
328         deltatar.create_full_backup(
329             source_path="source_dir2",
330             backup_path="backup_dir",
331             max_volume_size=1)
332
333         assert os.path.exists("backup_dir")
334         assert os.path.exists(os.path.join("backup_dir",
335             deltatar.volume_name_func("backup_dir", True, 0)))
336         if self.MODE_COMPRESSES:
337             n_vols = 1
338         else:
339             n_vols = 2
340         for i_vol in range(n_vols):
341             assert os.path.exists(os.path.join("backup_dir",
342                 deltatar.volume_name_func("backup_dir", True, i_vol)))
343         assert not os.path.exists(os.path.join("backup_dir",
344             deltatar.volume_name_func("backup_dir", True, n_vols)))
345
346         shutil.rmtree("source_dir2")
347
348         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
349         tar_path = os.path.join("backup_dir", tar_filename)
350
351         # this should automatically restore all volumes
352         deltatar.restore_backup(target_path="source_dir2",
353                                 backup_tar_path=tar_path)
354
355         for key, value in self.hash.items():
356             assert os.path.exists(key)
357             if value:
358                 assert value == self.md5sum(key)
359
360     def test_restore_multivol_split(self):
361         '''
362         Creates a full backup without any filtering with multiple volumes
363         with big files bigger than the max volume size and
364         restore it.
365         '''
366         if self.MODE.startswith(':') or self.MODE.startswith('|'):
367             raise SkipTest('this test only works for uncompressed '
368                            'or concat compressed modes')
369
370         password, paramversion = self.ENCRYPTION or (None, None)
371         deltatar = DeltaTar(mode=self.MODE, password=password,
372                             crypto_paramversion=paramversion,
373                             logger=self.consoleLogger)
374
375         self.hash = dict()
376         os.makedirs('source_dir2')
377         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 3*1024*1024)
378         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 4*1024*1024)
379         self.hash["source_dir2/huge2"]  = self.create_file("source_dir2/huge2", 4*1024*1024)
380
381         # create first backup
382         deltatar.create_full_backup(
383             source_path="source_dir2",
384             backup_path="backup_dir",
385             max_volume_size=2)
386
387         assert os.path.exists("backup_dir")
388         assert os.path.exists(os.path.join("backup_dir",
389             deltatar.volume_name_func("backup_dir", True, 0)))
390         if self.MODE_COMPRESSES:
391             n_vols = 1
392         else:
393             n_vols = 6
394         for i_vol in range(n_vols):
395             assert os.path.exists(os.path.join("backup_dir",
396                 deltatar.volume_name_func("backup_dir", True, i_vol)))
397         assert not os.path.exists(os.path.join("backup_dir",
398             deltatar.volume_name_func("backup_dir", True, n_vols)))
399
400         shutil.rmtree("source_dir2")
401
402         index_filename = deltatar.index_name_func(True)
403         index_path = os.path.join("backup_dir", index_filename)
404
405         deltatar.restore_backup(target_path="source_dir2",
406             backup_indexes_paths=[index_path])
407
408         for key, value in self.hash.items():
409             assert os.path.exists(key)
410             if value:
411                 assert value == self.md5sum(key)
412
413
414     def test_full_backup_index_extra_data(self):
415         '''
416         Tests that the index file for a full backup can store extra_data and
417         that this data can be retrieved.
418         '''
419         password, paramversion = self.ENCRYPTION or (None, None)
420         deltatar = DeltaTar(mode=self.MODE, password=password,
421                             crypto_paramversion=paramversion,
422                             logger=self.consoleLogger)
423
424         extra_data = dict(
425             hola="caracola",
426             otra_cosa=[1, "lista"],
427             y_otra=dict(bola=1.1)
428         )
429
430         deltatar.create_full_backup(
431             source_path="source_dir",
432             backup_path="backup_dir",
433             extra_data=extra_data)
434
435         index_filename = deltatar.index_name_func(is_full=True)
436         index_path = os.path.join("backup_dir", index_filename)
437
438         # iterate_index_path retrieves extra_data, and thus we can then compare
439         index_it = deltatar.iterate_index_path(index_path)
440         self.assertEqual(index_it.extra_data, extra_data)
441
442
443     def test_diff_backup_index_extra_data(self):
444         '''
445         Tests that the index file for a diff backup can store extra_data and
446         that this data can be retrieved.
447         '''
448         password, paramversion = self.ENCRYPTION or (None, None)
449         deltatar = DeltaTar(mode=self.MODE, password=password,
450                             crypto_paramversion=paramversion,
451                             logger=self.consoleLogger)
452
453         extra_data = dict(
454             hola="caracola",
455             otra_cosa=[1, "lista"],
456             y_otra=dict(bola=1.1)
457         )
458         # do first backup
459         deltatar.create_full_backup(
460             source_path="source_dir",
461             backup_path="backup_dir")
462
463
464         prev_index_filename = deltatar.index_name_func(is_full=True)
465         prev_index_path = os.path.join("backup_dir", prev_index_filename)
466
467         # create empty diff backup
468         deltatar.create_diff_backup("source_dir", "backup_dir2",
469                                     prev_index_path, extra_data=extra_data)
470
471         index_filename = deltatar.index_name_func(is_full=False)
472         index_path = os.path.join("backup_dir2", index_filename)
473
474         # iterate_index_path retrieves extra_data, and thus we can then compare
475         index_it = deltatar.iterate_index_path(index_path)
476         self.assertEqual(index_it.extra_data, extra_data)
477
478     def test_restore_multivol2(self):
479         '''
480         Creates a full backup without any filtering with multiple volumes and
481         restore it.
482         '''
483         password, paramversion = self.ENCRYPTION or (None, None)
484         deltatar = DeltaTar(mode=self.MODE, password=password,
485                             crypto_paramversion=paramversion,
486                             logger=self.consoleLogger)
487
488         shutil.copytree(self.GIT_DIR, "source_dir2")
489
490         # create first backup
491         deltatar.create_full_backup(
492             source_path="source_dir2",
493             backup_path="backup_dir",
494             max_volume_size=1)
495
496         assert os.path.exists("backup_dir")
497         assert os.path.exists(os.path.join("backup_dir",
498             deltatar.volume_name_func("backup_dir", True, 0)))
499
500         shutil.rmtree("source_dir2")
501
502         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
503         tar_path = os.path.join("backup_dir", tar_filename)
504
505         # this should automatically restore all volumes
506         deltatar.restore_backup(target_path="source_dir2",
507                                 backup_tar_path=tar_path)
508
509         self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
510
511     def test_restore_multivol_manual_from_index(self):
512         '''
513         Creates a full backup without any filtering with multiple volumes and
514         restore it.
515         '''
516         # this test only works for uncompressed or concat compressed modes
517         if self.MODE.startswith(':') or self.MODE.startswith('|'):
518             raise SkipTest('this test only works for uncompressed '
519                            'or concat compressed modes')
520
521         password, paramversion = self.ENCRYPTION or (None, None)
522         deltatar = DeltaTar(mode=self.MODE, password=password,
523                             crypto_paramversion=paramversion,
524                             logger=self.consoleLogger)
525
526         self.hash = dict()
527         os.makedirs('source_dir2')
528         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
529         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
530
531         # create first backup
532         deltatar.create_full_backup(
533             source_path="source_dir2",
534             backup_path="backup_dir",
535             max_volume_size=1)
536
537         assert os.path.exists("backup_dir")
538         assert os.path.exists(os.path.join("backup_dir",
539             deltatar.volume_name_func("backup_dir", True, 0)))
540         if self.MODE_COMPRESSES:
541             n_vols = 1
542         else:
543             n_vols = 2
544         for i_vol in range(n_vols):
545             assert os.path.exists(os.path.join("backup_dir",
546                 deltatar.volume_name_func("backup_dir", True, i_vol)))
547         assert not os.path.exists(os.path.join("backup_dir",
548             deltatar.volume_name_func("backup_dir", True, n_vols)))
549
550         shutil.rmtree("source_dir2")
551
552         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
553         tar_path = os.path.join("backup_dir", tar_filename)
554
555         index_filename = deltatar.index_name_func(True)
556         index_path = os.path.join("backup_dir", index_filename)
557
558         # this should automatically restore the huge file
559         f = deltatar.open_auxiliary_file(index_path, 'r')
560         offset = None
561         while True:
562             l = f.readline()
563             if not len(l):
564                 break
565             data = json.loads(l.decode('UTF-8'))
566             if data.get('type', '') == 'file' and\
567                     deltatar.unprefixed(data['path']) == "huge":
568                 offset = data['offset']
569                 break
570
571         assert offset is not None
572
573         fo = open(tar_path, 'rb')
574         fo.seek(offset)
575         def new_volume_handler(mode, tarobj, base_name, volume_number):
576             suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
577             if self.ENCRYPTION is not None:
578                 # deltatar module is shadowed here
579                 suf += "." + deltatar_PDTCRYPT_EXTENSION
580             tarobj.open_volume(datetime.now().strftime(
581                 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
582         new_volume_handler = partial(new_volume_handler, self.MODE)
583
584         crypto_ctx = None
585         if self.ENCRYPTION is not None:
586             crypto_ctx = crypto.Decrypt (password)
587
588         tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
589                               new_volume_handler=new_volume_handler,
590                               encryption=crypto_ctx)
591
592         member = tarobj.next()
593         member.path = deltatar.unprefixed(member.path)
594         member.name = deltatar.unprefixed(member.name)
595         tarobj.extract(member)
596         tarobj.close()
597         fo.close()
598         assert self.hash['source_dir2/huge'] == self.md5sum('huge')
599
600         os.unlink("huge")
601
602
603     def test_restore_manual_from_index_twice (self):
604         """
605         Creates a full backup and restore the same file twice. This *must* fail
606         when encryption is active.
607
608         Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
609         backwards within the same file. This prevents the encryption layer from
610         exploding due to a reused IV in an overall valid archive.
611
612         This test anticipates possible future mistakes since it’s entirely
613         feasible to implement backward seeks for *_Stream* with concat mode.
614         """
615         # this test only works for uncompressed or concat compressed modes
616         if self.MODE.startswith("|") or self.MODE_COMPRESSES:
617             raise SkipTest("this test only works for uncompressed "
618                            "or concat compressed modes")
619
620         password, paramversion = self.ENCRYPTION or (None, None)
621         deltatar = DeltaTar(mode=self.MODE, password=password,
622                             crypto_paramversion=paramversion,
623                             logger=self.consoleLogger)
624
625         self.hash = dict()
626         os.makedirs("source_dir2")
627         self.hash["source_dir2/samefile"] = \
628             self.create_file("source_dir2/samefile", 1 * 1024)
629
630         # create first backup
631         deltatar.create_full_backup(
632             source_path="source_dir2",
633             backup_path="backup_dir")
634
635         assert os.path.exists("backup_dir")
636         assert os.path.exists(os.path.join("backup_dir",
637             deltatar.volume_name_func("backup_dir", True, 0)))
638
639         shutil.rmtree("source_dir2")
640
641         tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
642         tar_path = os.path.join("backup_dir", tar_filename)
643
644         index_filename = deltatar.index_name_func(True)
645         index_path = os.path.join("backup_dir", index_filename)
646
647         f = deltatar.open_auxiliary_file(index_path, "r")
648         offset = None
649         while True:
650             l = f.readline()
651             if not len(l):
652                 break
653             data = json.loads(l.decode("UTF-8"))
654             if data.get("type", "") == "file" and\
655                     deltatar.unprefixed(data["path"]) == "samefile":
656                 offset = data["offset"]
657                 break
658
659         assert offset is not None
660
661         fo = open(tar_path, "rb")
662         fo.seek(offset)
663
664         crypto_ctx = None
665         if self.ENCRYPTION is not None:
666             crypto_ctx = crypto.Decrypt (password)
667
668         tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
669                               encryption=crypto_ctx)
670         member = tarobj.next()
671         member.path = deltatar.unprefixed(member.path)
672         member.name = deltatar.unprefixed(member.name)
673
674         # extract once â€¦
675         tarobj.extract(member)
676         assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
677
678         # â€¦ and twice
679         try:
680             tarobj.extract(member)
681         except tarfile.StreamError:
682             if crypto_ctx is not None:
683                 pass # good: seeking backwards not allowed
684             else:
685                 raise
686         tarobj.close()
687         fo.close()
688         assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
689
690         os.unlink("samefile")
691
692
693     def test_restore_from_index(self):
694         '''
695         Restores a full backup using an index file.
696         '''
697         if self.MODE.startswith(':') or self.MODE.startswith('|'):
698             raise SkipTest('this test only works for uncompressed '
699                            'or concat compressed modes')
700
701         password, paramversion = self.ENCRYPTION or (None, None)
702         deltatar = DeltaTar(mode=self.MODE, password=password,
703                             crypto_paramversion=paramversion,
704                             logger=self.consoleLogger)
705
706         # create first backup
707         deltatar.create_full_backup(
708             source_path="source_dir",
709             backup_path="backup_dir")
710
711         shutil.rmtree("source_dir")
712
713         # this should automatically restore all volumes
714         index_filename = deltatar.index_name_func(True)
715         index_path = os.path.join("backup_dir", index_filename)
716
717         deltatar.restore_backup(target_path="source_dir",
718             backup_indexes_paths=[index_path])
719
720         for key, value in self.hash.items():
721             assert os.path.exists(key)
722             if value:
723                 assert value == self.md5sum(key)
724
725     def test_restore_multivol_from_index(self):
726         '''
727         Restores a full multivolume backup using an index file.
728         '''
729         if self.MODE.startswith(':') or self.MODE.startswith('|'):
730             raise SkipTest('this test only works for uncompressed '
731                            'or concat compressed modes')
732
733         password, paramversion = self.ENCRYPTION or (None, None)
734         deltatar = DeltaTar(mode=self.MODE, password=password,
735                             crypto_paramversion=paramversion,
736                             logger=self.consoleLogger)
737
738         # create first backup
739         deltatar.create_full_backup(
740             source_path="source_dir",
741             backup_path="backup_dir",
742             max_volume_size=2)
743
744         shutil.rmtree("source_dir")
745
746         # this should automatically restore all volumes
747         index_filename = deltatar.index_name_func(True)
748         index_path = os.path.join("backup_dir", index_filename)
749
750         deltatar.restore_backup(target_path="source_dir",
751             backup_indexes_paths=[index_path])
752
753         for key, value in self.hash.items():
754             assert os.path.exists(key)
755             if value:
756                 assert value == self.md5sum(key)
757
758     def test_create_basic_filtering(self):
759         '''
760         Tests create backup basic filtering.
761         '''
762         password, paramversion = self.ENCRYPTION or (None, None)
763         deltatar = DeltaTar(mode=self.MODE, password=password,
764                             crypto_paramversion=paramversion,
765                             logger=self.consoleLogger,
766                             included_files=["test", "small"],
767                             excluded_files=["test/huge"])
768
769         # create first backup
770         deltatar.create_full_backup(
771             source_path="source_dir",
772             backup_path="backup_dir")
773
774         assert os.path.exists("backup_dir")
775         shutil.rmtree("source_dir")
776
777         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
778         tar_path = os.path.join("backup_dir", tar_filename)
779
780         deltatar.restore_backup(target_path="source_dir",
781                                 backup_tar_path=tar_path)
782
783         assert os.path.exists("source_dir/small")
784         assert os.path.exists("source_dir/test")
785         assert os.path.exists("source_dir/test/huge2")
786         assert os.path.exists("source_dir/test/test2")
787
788         assert not os.path.exists("source_dir/test/huge")
789         assert not os.path.exists("source_dir/big")
790
791     def test_create_filter_func(self):
792         '''
793         Tests create backup basic filtering.
794         '''
795         visited_paths = []
796         def filter_func(visited_paths, path):
797             if path not in visited_paths:
798                 visited_paths.append(path)
799             return True
800
801         filter_func = partial(filter_func, visited_paths)
802
803         password, paramversion = self.ENCRYPTION or (None, None)
804         deltatar = DeltaTar(mode=self.MODE, password=password,
805                             crypto_paramversion=paramversion,
806                             logger=self.consoleLogger,
807                             included_files=["test", "small"],
808                             excluded_files=["test/huge"],
809                             filter_func=filter_func)
810
811         # create first backup
812         deltatar.create_full_backup(
813             source_path="source_dir",
814             backup_path="backup_dir")
815
816         assert os.path.exists("backup_dir")
817         shutil.rmtree("source_dir")
818
819         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
820         tar_path = os.path.join("backup_dir", tar_filename)
821
822         deltatar.restore_backup(target_path="source_dir",
823                                 backup_tar_path=tar_path)
824         assert set(visited_paths) == set([
825                 'small',
826                 'test',
827                 'test/huge2',
828                 'test/test2'
829             ])
830
831     def test_create_filter_out_func(self):
832         '''
833         Tests create backup basic filtering.
834         '''
835         visited_paths = []
836         def filter_func(visited_paths, path):
837             '''
838             Filter out everything
839             '''
840             if path not in visited_paths:
841                 visited_paths.append(path)
842             return False
843
844         filter_func = partial(filter_func, visited_paths)
845
846         password, paramversion = self.ENCRYPTION or (None, None)
847         deltatar = DeltaTar(mode=self.MODE, password=password,
848                             crypto_paramversion=paramversion,
849                             logger=self.consoleLogger,
850                             included_files=["test", "small"],
851                             excluded_files=["test/huge"],
852                             filter_func=filter_func)
853
854         # create first backup
855         deltatar.create_full_backup(
856             source_path="source_dir",
857             backup_path="backup_dir")
858
859         assert os.path.exists("backup_dir")
860         shutil.rmtree("source_dir")
861
862         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
863         tar_path = os.path.join("backup_dir", tar_filename)
864
865         deltatar.restore_backup(target_path="source_dir",
866                                 backup_tar_path=tar_path)
867         assert set(visited_paths) == set([
868                 'small',
869                 'test'
870             ])
871
872         # check that effectively no file was backed up
873         assert not os.path.exists("source_dir/small")
874         assert not os.path.exists("source_dir/big")
875         assert not os.path.exists("source_dir/test")
876
877     def test_restore_index_basic_filtering(self):
878         '''
879         Creates a backup, and then filter when doing the index based restore.
880         '''
881         if self.MODE.startswith(':') or self.MODE.startswith('|'):
882             raise SkipTest('this test only works for uncompressed '
883                            'or concat compressed modes')
884
885         password, paramversion = self.ENCRYPTION or (None, None)
886         deltatar = DeltaTar(mode=self.MODE, password=password,
887                             crypto_paramversion=paramversion,
888                             logger=self.consoleLogger)
889
890         # create first backup
891         deltatar.create_full_backup(
892             source_path="source_dir",
893             backup_path="backup_dir")
894
895         assert os.path.exists("backup_dir")
896         shutil.rmtree("source_dir")
897
898         index_filename = deltatar.index_name_func(True)
899         index_path = os.path.join("backup_dir", index_filename)
900
901         deltatar.included_files = ["test", "small"]
902         deltatar.excluded_files = ["test/huge"]
903         deltatar.restore_backup(target_path="source_dir",
904             backup_indexes_paths=[index_path])
905
906         assert os.path.exists("source_dir/small")
907         assert os.path.exists("source_dir/test")
908         assert os.path.exists("source_dir/test/huge2")
909         assert os.path.exists("source_dir/test/test2")
910
911         assert not os.path.exists("source_dir/test/huge")
912         assert not os.path.exists("source_dir/big")
913
914     def test_restore_index_filter_func(self):
915         '''
916         Creates a backup, and then filter when doing the index based restore,
917         using the filter function.
918         '''
919         if self.MODE.startswith(':') or self.MODE.startswith('|'):
920             raise SkipTest('this test only works for uncompressed '
921                            'or concat compressed modes')
922
923         visited_paths = []
924         def filter_func(visited_paths, path):
925             if path not in visited_paths:
926                 visited_paths.append(path)
927             return True
928
929         filter_func = partial(filter_func, visited_paths)
930
931         password, paramversion = self.ENCRYPTION or (None, None)
932         deltatar = DeltaTar(mode=self.MODE, password=password,
933                             crypto_paramversion=paramversion,
934                             logger=self.consoleLogger)
935
936         # create first backup
937         deltatar.create_full_backup(
938             source_path="source_dir",
939             backup_path="backup_dir")
940
941         assert os.path.exists("backup_dir")
942         shutil.rmtree("source_dir")
943
944         index_filename = deltatar.index_name_func(True)
945         index_path = os.path.join("backup_dir", index_filename)
946
947         deltatar.included_files = ["test", "small"]
948         deltatar.excluded_files = ["test/huge"]
949         deltatar.filter_func = filter_func
950         deltatar.restore_backup(target_path="source_dir",
951             backup_indexes_paths=[index_path])
952
953         assert set(visited_paths) == set([
954                 'small',
955                 'test',
956                 'test/huge2',
957                 'test/test2'
958             ])
959
960     def test_restore_tar_basic_filtering(self):
961         '''
962         Creates a backup, and then filter when doing the tar based restore.
963         '''
964         password, paramversion = self.ENCRYPTION or (None, None)
965         deltatar = DeltaTar(mode=self.MODE, password=password,
966                             crypto_paramversion=paramversion,
967                             logger=self.consoleLogger)
968
969         # create first backup
970         deltatar.create_full_backup(
971             source_path="source_dir",
972             backup_path="backup_dir")
973
974         assert os.path.exists("backup_dir")
975         shutil.rmtree("source_dir")
976
977         deltatar.included_files = ["test", "small"]
978         deltatar.excluded_files = ["test/huge"]
979
980         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
981         tar_path = os.path.join("backup_dir", tar_filename)
982
983         deltatar.restore_backup(target_path="source_dir",
984                                 backup_tar_path=tar_path)
985
986         assert os.path.exists("source_dir/small")
987         assert os.path.exists("source_dir/test")
988         assert os.path.exists("source_dir/test/huge2")
989         assert os.path.exists("source_dir/test/test2")
990
991         assert not os.path.exists("source_dir/test/huge")
992         assert not os.path.exists("source_dir/big")
993
994     def test_restore_tar_filter_func(self):
995         '''
996         Creates a backup, and then filter when doing the tar based restore,
997         using the filter function.
998         '''
999         visited_paths = []
1000         def filter_func(visited_paths, path):
1001             if path not in visited_paths:
1002                 visited_paths.append(path)
1003             return True
1004
1005         filter_func = partial(filter_func, visited_paths)
1006
1007         password, paramversion = self.ENCRYPTION or (None, None)
1008         deltatar = DeltaTar(mode=self.MODE, password=password,
1009                             crypto_paramversion=paramversion,
1010                             logger=self.consoleLogger)
1011
1012         # create first backup
1013         deltatar.create_full_backup(
1014             source_path="source_dir",
1015             backup_path="backup_dir")
1016
1017         assert os.path.exists("backup_dir")
1018         shutil.rmtree("source_dir")
1019
1020         index_filename = deltatar.index_name_func(True)
1021         index_path = os.path.join("backup_dir", index_filename)
1022
1023         deltatar.included_files = ["test", "small"]
1024         deltatar.excluded_files = ["test/huge"]
1025         deltatar.filter_func = filter_func
1026
1027         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1028         tar_path = os.path.join("backup_dir", tar_filename)
1029
1030         deltatar.restore_backup(target_path="source_dir",
1031                                 backup_tar_path=tar_path)
1032         assert set(visited_paths) == set([
1033                 'small',
1034                 'test',
1035                 'test/huge2',
1036                 'test/test2'
1037             ])
1038
1039     def test_filter_path_regexp(self):
1040         '''
1041         Test specifically the deltatar.filter_path function with regular
1042         expressions
1043         '''
1044         included_files = [
1045             re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1046             re.compile('^yes$'),
1047             'testing'
1048         ]
1049         excluded_files = [
1050             re.compile('^testing/in_the'),
1051         ]
1052         deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1053                             excluded_files=excluded_files)
1054
1055         # assert valid and invalid paths
1056         assert deltatar.filter_path('test/hola')
1057         assert deltatar.filter_path('test/hola/any/thing')
1058         assert deltatar.filter_path('test/caracola/caracolero')
1059         assert deltatar.filter_path('test/caracola/caracolero/yeah')
1060         assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1061         assert deltatar.filter_path('yes')
1062         assert deltatar.filter_path('testing')
1063         assert deltatar.filter_path('testing/yes')
1064         assert deltatar.filter_path('testing/in_th')
1065
1066         assert not deltatar.filter_path('something')
1067         assert not deltatar.filter_path('other/thing')
1068         assert not deltatar.filter_path('test_ing')
1069         assert not deltatar.filter_path('test/hola_lala')
1070         assert not deltatar.filter_path('test/agur')
1071         assert not deltatar.filter_path('testing_something')
1072         assert not deltatar.filter_path('yeso')
1073         assert not deltatar.filter_path('yes/o')
1074         assert not deltatar.filter_path('yes_o')
1075         assert not deltatar.filter_path('testing/in_the')
1076         assert not deltatar.filter_path('testing/in_the_field')
1077         assert not deltatar.filter_path('testing/in_the/field')
1078
1079     def test_filter_path_parent(self):
1080         '''
1081         Test specifically the deltatar.filter_path function for parent matching
1082         '''
1083         included_files = [
1084             'testing/path/to/some/thing'
1085         ]
1086         deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1087
1088         # assert valid and invalid paths
1089         assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1090         assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1091         assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1092         assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1093         assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1094         assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1095         assert deltatar.filter_path('testing/something/else') == NO_MATCH
1096
1097     def test_parent_matching_simple_full_backup(self):
1098         '''
1099         Create a full backup using parent matching
1100         '''
1101         included_files = [
1102             'test/huge2'
1103         ]
1104
1105         password, paramversion = self.ENCRYPTION or (None, None)
1106         deltatar = DeltaTar(mode=self.MODE, password=password,
1107                             crypto_paramversion=paramversion,
1108                             logger=self.consoleLogger,
1109                             included_files=included_files)
1110
1111         # create first backup
1112         deltatar.create_full_backup(
1113             source_path="source_dir",
1114             backup_path="backup_dir")
1115
1116         assert os.path.exists("backup_dir")
1117         shutil.rmtree("source_dir")
1118
1119         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1120         tar_path = os.path.join("backup_dir", tar_filename)
1121
1122         deltatar = DeltaTar(mode=self.MODE, password=password,
1123                             logger=self.consoleLogger)
1124         deltatar.restore_backup(target_path="source_dir",
1125                                 backup_tar_path=tar_path)
1126
1127         assert os.path.exists('source_dir/test/huge2')
1128         assert os.path.exists('source_dir/test/')
1129         assert not os.path.exists('source_dir/test/huge')
1130         assert not os.path.exists('source_dir/big')
1131         assert not os.path.exists('source_dir/small')
1132
1133     def test_parent_matching_simple_full_backup_restore(self):
1134         '''
1135         Create a full backup and restores it using parent matching
1136         '''
1137         included_files = [
1138             'test/huge2'
1139         ]
1140
1141         password, paramversion = self.ENCRYPTION or (None, None)
1142         deltatar = DeltaTar(mode=self.MODE, password=password,
1143                             crypto_paramversion=paramversion,
1144                             logger=self.consoleLogger)
1145
1146         # create first backup
1147         deltatar.create_full_backup(
1148             source_path="source_dir",
1149             backup_path="backup_dir")
1150
1151         assert os.path.exists("backup_dir")
1152         shutil.rmtree("source_dir")
1153
1154         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1155         tar_path = os.path.join("backup_dir", tar_filename)
1156
1157         deltatar = DeltaTar(mode=self.MODE, password=password,
1158                             logger=self.consoleLogger,
1159                             included_files=included_files)
1160         deltatar.restore_backup(target_path="source_dir",
1161                                 backup_tar_path=tar_path)
1162
1163         assert os.path.exists('source_dir/test/huge2')
1164         assert os.path.exists('source_dir/test/')
1165         assert not os.path.exists('source_dir/test/huge')
1166         assert not os.path.exists('source_dir/big')
1167         assert not os.path.exists('source_dir/small')
1168
1169     def test_parent_matching_index_full_backup_restore(self):
1170         '''
1171         Create a full backup and restores it using parent matching
1172         '''
1173         included_files = [
1174             'test/huge2'
1175         ]
1176
1177         password, paramversion = self.ENCRYPTION or (None, None)
1178         deltatar = DeltaTar(mode=self.MODE, password=password,
1179                             crypto_paramversion=paramversion,
1180                             logger=self.consoleLogger)
1181
1182         # create first backup
1183         deltatar.create_full_backup(
1184             source_path="source_dir",
1185             backup_path="backup_dir")
1186
1187         assert os.path.exists("backup_dir")
1188         shutil.rmtree("source_dir")
1189
1190         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1191         tar_path = os.path.join("backup_dir", tar_filename)
1192
1193         deltatar = DeltaTar(mode=self.MODE, password=password,
1194                             logger=self.consoleLogger,
1195                             included_files=included_files)
1196         deltatar.restore_backup(target_path="source_dir",
1197                                 backup_tar_path=tar_path)
1198
1199         assert os.path.exists('source_dir/test/huge2')
1200         assert os.path.exists('source_dir/test/')
1201         assert not os.path.exists('source_dir/test/huge')
1202         assert not os.path.exists('source_dir/big')
1203         assert not os.path.exists('source_dir/small')
1204
1205     def test_collate_iterators(self):
1206         '''
1207         Tests the collate iterators functionality with two exact directories,
1208         using an index iterator from a backup and the exact same source dir.
1209         '''
1210         password, paramversion = self.ENCRYPTION or (None, None)
1211         deltatar = DeltaTar(mode=self.MODE, password=password,
1212                             crypto_paramversion=paramversion,
1213                             logger=self.consoleLogger)
1214
1215         # create first backup
1216         deltatar.create_full_backup(
1217             source_path="source_dir",
1218             backup_path="backup_dir")
1219
1220         assert os.path.exists("backup_dir")
1221
1222         cwd = os.getcwd()
1223         index_filename = deltatar.index_name_func(is_full=True)
1224         index_path = os.path.join(cwd, "backup_dir", index_filename)
1225         index_it = deltatar.iterate_index_path(index_path)
1226
1227         os.chdir('source_dir')
1228         dir_it = deltatar._recursive_walk_dir('.')
1229         path_it = deltatar.jsonize_path_iterator(dir_it)
1230
1231         try:
1232             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1233                 assert deltatar._equal_stat_dicts(path1, path2)
1234         finally:
1235             os.chdir(cwd)
1236
1237     def test_collate_iterators_diffdirs(self):
1238         '''
1239         Use the collate iterators functionality with two different directories.
1240         It must behave in an expected way.
1241         '''
1242         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1243
1244         password, paramversion = self.ENCRYPTION or (None, None)
1245         deltatar = DeltaTar(mode=self.MODE, password=password,
1246                             crypto_paramversion=paramversion,
1247                             logger=self.consoleLogger)
1248
1249         # create first backup
1250         deltatar.create_full_backup(
1251             source_path="source_dir",
1252             backup_path="backup_dir")
1253
1254         assert os.path.exists("backup_dir")
1255         self.hash["source_dir/z"]  = self.create_file("source_dir/z", 100)
1256
1257         cwd = os.getcwd()
1258         index_filename = deltatar.index_name_func(is_full=True)
1259         index_path = os.path.join(cwd, "backup_dir", index_filename)
1260         index_it = deltatar.iterate_index_path(index_path)
1261
1262         os.chdir('source_dir')
1263         dir_it = deltatar._recursive_walk_dir('.')
1264         path_it = deltatar.jsonize_path_iterator(dir_it)
1265
1266         try:
1267             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1268                 if path2['path'] == 'z':
1269                     assert not path1
1270                 else:
1271                     assert deltatar._equal_stat_dicts(path1, path2)
1272         finally:
1273             os.chdir(cwd)
1274
1275     def test_collate_iterators_diffdirs2(self):
1276         '''
1277         Use the collate iterators functionality with two different directories.
1278         It must behave in an expected way.
1279         '''
1280         password, paramversion = self.ENCRYPTION or (None, None)
1281         deltatar = DeltaTar(mode=self.MODE, password=password,
1282                             crypto_paramversion=paramversion,
1283                             logger=self.consoleLogger)
1284
1285         # create first backup
1286         deltatar.create_full_backup(
1287             source_path="source_dir",
1288             backup_path="backup_dir")
1289
1290         assert os.path.exists("backup_dir")
1291
1292         # add some new files and directories
1293         os.makedirs('source_dir/bigdir')
1294         self.hash["source_dir/bigdir"] = ""
1295         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1296         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1297         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1298
1299         cwd = os.getcwd()
1300         index_filename = deltatar.index_name_func(is_full=True)
1301         index_path = os.path.join(cwd, "backup_dir", index_filename)
1302         index_it = deltatar.iterate_index_path(index_path)
1303
1304         os.chdir('source_dir')
1305         dir_it = deltatar._recursive_walk_dir('.')
1306         path_it = deltatar.jsonize_path_iterator(dir_it)
1307
1308         visited_pairs = []
1309
1310         try:
1311             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1312                 visited_pairs.append(
1313                     (deltatar.unprefixed(path1['path']) if path1 else None,
1314                      path2['path'] if path2 else None)
1315                 )
1316         finally:
1317             assert visited_pairs == [
1318                 (u'big', u'big'),
1319                 (None, u'bigdir'),
1320                 (u'small', u'small'),
1321                 (u'test', u'test'),
1322                 (None, u'zzzz'),
1323                 (None, u'bigdir/a'),
1324                 (None, u'bigdir/b'),
1325                 (u'test/huge', u'test/huge'),
1326                 (u'test/huge2', u'test/huge2'),
1327                 (u'test/test2', u'test/test2'),
1328             ]
1329             os.chdir(cwd)
1330
1331     def test_create_empty_diff_backup(self):
1332         '''
1333         Creates an empty (no changes) backup diff
1334         '''
1335         password, paramversion = self.ENCRYPTION or (None, None)
1336         deltatar = DeltaTar(mode=self.MODE, password=password,
1337                             crypto_paramversion=paramversion,
1338                             logger=self.consoleLogger)
1339
1340         # create first backup
1341         deltatar.create_full_backup(
1342             source_path="source_dir",
1343             backup_path="backup_dir")
1344
1345         prev_index_filename = deltatar.index_name_func(is_full=True)
1346         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1347
1348         deltatar.create_diff_backup("source_dir", "backup_dir2",
1349                                     prev_index_path)
1350
1351         # check index items
1352         index_path = os.path.join("backup_dir2",
1353             deltatar.index_name_func(is_full=False))
1354         index_it = deltatar.iterate_index_path(index_path)
1355         n = 0
1356         for i in index_it:
1357             n += 1
1358             assert i[0]['path'].startswith("list://")
1359
1360         assert n == 6
1361
1362         # check the tar file
1363         assert os.path.exists("backup_dir2")
1364         shutil.rmtree("source_dir")
1365
1366         tar_filename = deltatar.volume_name_func('backup_dir2',
1367             is_full=False, volume_number=0)
1368         tar_path = os.path.join("backup_dir2", tar_filename)
1369
1370         # no file restored, because the diff was empty
1371         deltatar.restore_backup(target_path="source_dir",
1372                                 backup_tar_path=tar_path)
1373         assert len(os.listdir("source_dir")) == 0
1374
1375
1376     def test_create_diff_backup1(self):
1377         '''
1378         Creates a diff backup when there are new files
1379         '''
1380         password, paramversion = self.ENCRYPTION or (None, None)
1381         deltatar = DeltaTar(mode=self.MODE, password=password,
1382                             crypto_paramversion=paramversion,
1383                             logger=self.consoleLogger)
1384
1385         # create first backup
1386         deltatar.create_full_backup(
1387             source_path="source_dir",
1388             backup_path="backup_dir")
1389
1390         prev_index_filename = deltatar.index_name_func(is_full=True)
1391         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1392
1393         # add some new files and directories
1394         os.makedirs('source_dir/bigdir')
1395         self.hash["source_dir/bigdir"] = ""
1396         os.unlink("source_dir/small")
1397         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1398         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1399         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1400
1401         deltatar.create_diff_backup("source_dir", "backup_dir2",
1402                                     prev_index_path)
1403
1404         # check index items
1405         index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1406         index_it = deltatar.iterate_index_path(index_path)
1407         l = [i[0]['path'] for i in index_it]
1408
1409         assert l == [
1410             'list://big',
1411             'snapshot://bigdir',
1412             'delete://small',
1413             'list://test',
1414             'snapshot://zzzz',
1415             'snapshot://bigdir/a',
1416             'snapshot://bigdir/b',
1417             'list://test/huge',
1418             'list://test/huge2',
1419             'list://test/test2',
1420         ]
1421
1422         # check the tar file
1423         assert os.path.exists("backup_dir2")
1424         shutil.rmtree("source_dir")
1425
1426         # create source_dir with the small file, that will be then deleted by
1427         # the restore_backup
1428         os.mkdir("source_dir")
1429         open("source_dir/small", 'wb').close()
1430
1431         tar_filename = deltatar.volume_name_func('backup_dir2',
1432             is_full=False, volume_number=0)
1433         tar_path = os.path.join("backup_dir2", tar_filename)
1434
1435         # restore the backup, this will create only the new files
1436         deltatar.restore_backup(target_path="source_dir",
1437                                 backup_tar_path=tar_path)
1438         # the order doesn't matter
1439         assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1440
1441     def test_restore_from_index_diff_backup(self):
1442         '''
1443         Creates a full backup, modifies some files, creates a diff backup,
1444         then restores the diff backup from zero.
1445         '''
1446         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1447             raise SkipTest('this test only works for uncompressed '
1448                            'or concat compressed modes')
1449
1450         password, paramversion = self.ENCRYPTION or (None, None)
1451         deltatar = DeltaTar(mode=self.MODE, password=password,
1452                             crypto_paramversion=paramversion,
1453                             logger=self.consoleLogger)
1454
1455         # create first backup
1456         deltatar.create_full_backup(
1457             source_path="source_dir",
1458             backup_path="backup_dir")
1459
1460         prev_index_filename = deltatar.index_name_func(is_full=True)
1461         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1462
1463         # add some new files and directories
1464         os.makedirs('source_dir/bigdir')
1465         self.hash["source_dir/bigdir"] = ""
1466         os.unlink("source_dir/small")
1467         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1468         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1469         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1470
1471         deltatar.create_diff_backup("source_dir", "backup_dir2",
1472                                     prev_index_path)
1473
1474         # apply diff backup in target_dir
1475         index_filename = deltatar.index_name_func(is_full=False)
1476         index_path = os.path.join("backup_dir2", index_filename)
1477         deltatar.restore_backup("target_dir",
1478             backup_indexes_paths=[index_path, prev_index_path])
1479
1480         # then compare the two directories source_dir and target_dir and check
1481         # they are the same
1482         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1483
1484     def test_restore_from_index_diff_backup2(self):
1485         '''
1486         Creates a full backup, modifies some files, creates a diff backup,
1487         then restores the diff backup with the full backup as a starting point.
1488         '''
1489         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1490             raise SkipTest('this test only works for uncompressed '
1491                            'or concat compressed modes')
1492
1493         password, paramversion = self.ENCRYPTION or (None, None)
1494         deltatar = DeltaTar(mode=self.MODE, password=password,
1495                             crypto_paramversion=paramversion,
1496                             logger=self.consoleLogger)
1497
1498         # create first backup
1499         deltatar.create_full_backup(
1500             source_path="source_dir",
1501             backup_path="backup_dir")
1502
1503         prev_index_filename = deltatar.index_name_func(is_full=True)
1504         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1505
1506         # add some new files and directories
1507         os.makedirs('source_dir/bigdir')
1508         self.hash["source_dir/bigdir"] = ""
1509         os.unlink("source_dir/small")
1510         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1511         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1512         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1513         shutil.rmtree("source_dir/test")
1514
1515         deltatar.create_diff_backup("source_dir", "backup_dir2",
1516                                     prev_index_path)
1517
1518         # first restore initial backup in target_dir
1519         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1520         tar_path = os.path.join("backup_dir", tar_filename)
1521         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1522
1523         # then apply diff backup in target_dir
1524         index_filename = deltatar.index_name_func(is_full=False)
1525         index_path = os.path.join("backup_dir2", index_filename)
1526         deltatar.restore_backup("target_dir",
1527             backup_indexes_paths=[index_path, prev_index_path])
1528
1529         # then compare the two directories source_dir and target_dir and check
1530         # they are the same
1531         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1532
1533     def test_restore_from_index_diff_backup3(self):
1534         '''
1535         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1536         diff backup, then restores the diff backup with the full backup as a
1537         starting point.
1538         '''
1539         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1540             raise SkipTest('this test only works for uncompressed '
1541                            'or concat compressed modes')
1542
1543         password, paramversion = self.ENCRYPTION or (None, None)
1544         deltatar = DeltaTar(mode=self.MODE, password=password,
1545                             crypto_paramversion=paramversion,
1546                             logger=self.consoleLogger)
1547
1548         shutil.rmtree("source_dir")
1549         shutil.copytree(self.GIT_DIR, "source_dir")
1550         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1551
1552         # create first backup
1553         deltatar.create_full_backup(
1554             source_path="source_dir",
1555             backup_path="backup_dir")
1556
1557         prev_index_filename = deltatar.index_name_func(is_full=True)
1558         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1559
1560         # alter the source_dir randomly
1561         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1562
1563         for path in source_it:
1564             # if path doesn't exist (might have previously removed) ignore it.
1565             # also ignore it (i.e. do not change it) 70% of the time
1566             if not os.path.exists(path) or random.random() < 0.7:
1567                 continue
1568
1569             # remove the file
1570             if os.path.isdir(path):
1571                 shutil.rmtree(path)
1572             else:
1573                 os.unlink(path)
1574
1575         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1576                                     prev_index_path)
1577
1578         # first restore initial backup in target_dir
1579         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1580         tar_path = os.path.join("backup_dir", tar_filename)
1581         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1582
1583         # and check that target_dir equals to source_dir (which is the same as
1584         # self.GIT_DIR initially)
1585         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1586
1587         # then apply diff backup in target_dir
1588         index_filename = deltatar.index_name_func(is_full=False)
1589         index_path = os.path.join("backup_dir2", index_filename)
1590         deltatar.restore_backup("target_dir",
1591             backup_indexes_paths=[index_path, prev_index_path])
1592
1593         # and check that target_dir equals to source_dir_diff (the randomly
1594         # altered self.GIT_DIR directory)
1595         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1596
1597         # then delete target_dir and apply diff backup from zero and check again
1598         shutil.rmtree("target_dir")
1599         deltatar.restore_backup("target_dir",
1600             backup_indexes_paths=[index_path, prev_index_path])
1601
1602         # and check that target_dir equals to source_dir_diff (the randomly
1603         # altered self.GIT_DIR directory)
1604         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1605
1606     def test_restore_from_index_diff_backup3_multivol(self):
1607         '''
1608         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1609         diff backup, then restores the diff backup with the full backup as a
1610         starting point.
1611         '''
1612         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1613             raise SkipTest('this test only works for uncompressed '
1614                            'or concat compressed modes')
1615
1616         password, paramversion = self.ENCRYPTION or (None, None)
1617         deltatar = DeltaTar(mode=self.MODE, password=password,
1618                             crypto_paramversion=paramversion,
1619                             logger=self.consoleLogger)
1620
1621         shutil.rmtree("source_dir")
1622         shutil.copytree(self.GIT_DIR, "source_dir")
1623         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1624
1625         # create first backup
1626         deltatar.create_full_backup(
1627             source_path="source_dir",
1628             backup_path="backup_dir",
1629             max_volume_size=1)
1630
1631         prev_index_filename = deltatar.index_name_func(is_full=True)
1632         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1633
1634         # alter the source_dir randomly
1635         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1636
1637         for path in source_it:
1638             # if path doesn't exist (might have previously removed) ignore it.
1639             # also ignore it (i.e. do not change it) 70% of the time
1640             if not os.path.exists(path) or random.random() < 0.7:
1641                 continue
1642
1643             # remove the file
1644             if os.path.isdir(path):
1645                 shutil.rmtree(path)
1646             else:
1647                 os.unlink(path)
1648
1649         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1650                                     prev_index_path, max_volume_size=1)
1651
1652         # first restore initial backup in target_dir
1653         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1654         tar_path = os.path.join("backup_dir", tar_filename)
1655         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1656
1657         # and check that target_dir equals to source_dir (which is the same as
1658         # self.GIT_DIR initially)
1659         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1660
1661         # then apply diff backup in target_dir
1662         index_filename = deltatar.index_name_func(is_full=False)
1663         index_path = os.path.join("backup_dir2", index_filename)
1664         deltatar.restore_backup("target_dir",
1665             backup_indexes_paths=[index_path, prev_index_path])
1666
1667         # and check that target_dir equals to source_dir_diff (the randomly
1668         # altered self.GIT_DIR directory)
1669         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1670
1671         # then delete target_dir and apply diff backup from zero and check again
1672         shutil.rmtree("target_dir")
1673         deltatar.restore_backup("target_dir",
1674             backup_indexes_paths=[index_path, prev_index_path])
1675
1676         # and check that target_dir equals to source_dir_diff (the randomly
1677         # altered self.GIT_DIR directory)
1678         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1679
1680     def check_equal_dirs(self, path1, path2, deltatar):
1681         '''
1682         compare the two directories source_dir and target_dir and check
1683         # they are the same
1684         '''
1685         source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1686         source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1687         target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1688         target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1689         while True:
1690             try:
1691                 sitem = next(source_it)
1692                 titem = next(target_it)
1693             except StopIteration:
1694                 try:
1695                     titem = next(target_it)
1696                     raise Exception("iterators do not stop at the same time")
1697                 except StopIteration:
1698                     break
1699             try:
1700                 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1701             except Exception as e:
1702                 print("SITEM: " + str(sitem))
1703                 print("TITEM: " + str(titem))
1704                 raise e
1705
1706     def test_create_no_symlinks(self):
1707         '''
1708         Creates a full backup from different varieties of symlinks. The
1709         extracted archive may not contain any symlinks but the file contents
1710         '''
1711
1712         os.system("rm -rf source_dir")
1713         os.makedirs("source_dir/symlinks")
1714         fd = os.open("source_dir/symlinks/valid_linkname",
1715                      os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1716         os.write(fd, b"valid link target for symlink tests; please ignore\n")
1717         os.close(fd)
1718         # first one is good, the rest points nowhere
1719         self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1720         self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1721         self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1722         self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1723         password, paramversion = self.ENCRYPTION or (None, None)
1724         deltatar = DeltaTar(mode=self.MODE, password=password,
1725                             crypto_paramversion=paramversion,
1726                             logger=self.consoleLogger)
1727
1728         # create first backup
1729         deltatar.create_full_backup(source_path="source_dir",
1730                                     backup_path="backup_dir")
1731
1732         assert os.path.exists("backup_dir")
1733         shutil.rmtree("source_dir")
1734         assert not os.path.exists("source_dir")
1735
1736         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1737         tar_path = os.path.join("backup_dir", tar_filename)
1738
1739         deltatar.restore_backup(target_path="source_dir",
1740                                 backup_tar_path=tar_path)
1741
1742         for _r, _ds, fs in os.walk("source_dir/symlinks"):
1743         # only the valid link plus the linked file may be found in the
1744         # extracted archive
1745             assert len(fs) == 2
1746             for f in fs:
1747                 # the link must have been resolved and file contents must match
1748                 # the linked file
1749                 assert not os.path.islink(f)
1750                 with open("source_dir/symlinks/valid_linkname") as a:
1751                     with open("source_dir/symlinks/whatever") as b:
1752                         assert a.read() == b.read()
1753
1754     def test_restore_with_symlinks(self):
1755         '''
1756         Creates a full backup containing different varieties of symlinks. All
1757         of them must be filtered out.
1758         '''
1759         password, paramversion = self.ENCRYPTION or (None, None)
1760         deltatar = DeltaTar(mode=self.MODE, password=password,
1761                             crypto_paramversion=paramversion,
1762                             logger=self.consoleLogger)
1763
1764         # create first backup
1765         deltatar.create_full_backup(source_path="source_dir",
1766                                     backup_path="backup_dir")
1767
1768         assert os.path.exists("backup_dir")
1769         shutil.rmtree("source_dir")
1770
1771         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1772         tar_path = os.path.join("backup_dir", tar_filename)
1773
1774         # add symlinks to existing archive
1775
1776         def add_symlink (a, name, dst):
1777             l = tarfile.TarInfo("snapshot://%s" % name)
1778             l.type = tarfile.SYMTYPE
1779             l.linkname = dst
1780             a.addfile(l)
1781             return name
1782
1783         try:
1784             with tarfile.open(tar_path,mode="a") as a:
1785                 checkme = \
1786                     [ add_symlink(a, "symlinks/foo", "internal-file")
1787                     , add_symlink(a, "symlinks/bar", "/absolute/path")
1788                     , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1789         except tarfile.ReadError as e:
1790             if self.MODE == '#' or self.MODE.endswith ("gz"):
1791                 checkme = []
1792             else:
1793                 raise
1794         except ValueError as e:
1795             if self.MODE.startswith ('#'):
1796                 checkme = []
1797             else:
1798                 raise
1799
1800         deltatar.restore_backup(target_path="source_dir",
1801                                 backup_tar_path=tar_path)
1802
1803         # check what happened to our symlinks
1804         for name in checkme:
1805             fullpath = os.path.join("source_dir", name)
1806             assert not os.path.exists(fullpath)
1807
1808     def test_restore_malicious_symlinks(self):
1809         '''
1810         Creates a full backup containing a symlink and a file of the same name.
1811         This simulates a symlink attack with a link pointing to some external
1812         path that is abused to write outside the extraction prefix.
1813         '''
1814         password, paramversion = self.ENCRYPTION or (None, None)
1815         deltatar = DeltaTar(mode=self.MODE, password=password,
1816                             crypto_paramversion=paramversion,
1817                             logger=self.consoleLogger)
1818
1819         # create first backup
1820         deltatar.create_full_backup(source_path="source_dir",
1821                                     backup_path="backup_dir")
1822
1823         assert os.path.exists("backup_dir")
1824         shutil.rmtree("source_dir")
1825
1826         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1827         tar_path = os.path.join("backup_dir", tar_filename)
1828
1829         # add symlinks to existing archive
1830
1831         def add_symlink (a, name, dst):
1832             l = tarfile.TarInfo("snapshot://%s" % name)
1833             l.type = tarfile.SYMTYPE
1834             l.linkname = dst
1835             a.addfile(l)
1836
1837         def add_file (a, name):
1838             f = tarfile.TarInfo("snapshot://%s" % name)
1839             f.type = tarfile.REGTYPE
1840             a.addfile(f)
1841
1842         testpath = "symlinks/pernicious-link"
1843         testdst = "/tmp/does/not/exist"
1844
1845         try:
1846             with tarfile.open(tar_path, mode="a") as a:
1847                 add_symlink(a, testpath, testdst)
1848                 add_symlink(a, testpath, testdst+"X")
1849                 add_symlink(a, testpath, testdst+"XXX")
1850                 add_file(a, testpath)
1851         except tarfile.ReadError as e:
1852             if self.MODE == '#' or self.MODE.endswith ("gz"):
1853                 pass
1854             else:
1855                 raise
1856         except ValueError as e:
1857             if self.MODE.startswith ('#'):
1858                 pass # O_APPEND of concat archives not feasible
1859             else:
1860                 raise
1861
1862         deltatar.restore_backup(target_path="source_dir",
1863                                 backup_tar_path=tar_path)
1864
1865         # check whether the link was extracted; deltatar seems to only ever
1866         # retrieve the first item it finds for a given path which in the case
1867         # at hand is a symlink to some non-existent path
1868         fullpath = os.path.join("source_dir", testpath)
1869         assert not os.path.exists(fullpath)
1870
class DeltaTar2Test(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases with MODE set to ":"
    '''
    MODE = ":"
1876
1877
class DeltaTarStreamTest(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases in uncompressed stream mode
    (MODE "|")
    '''
    MODE = "|"
1883
1884
class DeltaTarGzipTest(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases in gzip mode (MODE ":gz")
    '''
    MODE_COMPRESSES = True
    MODE = ":gz"
1891
1892
class DeltaTarGzipStreamTest(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases in gzip stream mode
    (MODE "|gz")
    '''
    MODE_COMPRESSES = True
    MODE = "|gz"
1899
1900
@skip("Bz2 tests are too slow..")
class DeltaTarBz2Test(DeltaTarTest):
    '''
    Inherited DeltaTarTest cases in bz2 mode (MODE ":bz2"); the whole
    class is skipped
    '''
    MODE_COMPRESSES = True
    MODE = ":bz2"
1908
1909
@skip("Bz2 tests are too slow..")
class DeltaTarBz2StreamTest(DeltaTarTest):
    '''
    Inherited DeltaTarTest cases in bz2 stream mode (MODE "|bz2"); the
    whole class is skipped
    '''
    MODE_COMPRESSES = True
    MODE = "|bz2"
1917
1918
class DeltaTarGzipConcatTest(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases in gzip concat stream mode
    (MODE "#gz")
    '''
    MODE_COMPRESSES = True
    MODE = "#gz"
1925
1926
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases in gzip concat stream mode
    (MODE "#gz") with encryption enabled
    '''
    MODE_COMPRESSES = True
    MODE = "#gz"
    # (password, paramversion) pair handed to DeltaTar by the tests
    ENCRYPTION = ("some magic key", 1)
1934
1935
class DeltaTarAes128ConcatTest(DeltaTarTest):
    '''
    Re-runs the inherited DeltaTarTest cases in uncompressed concat stream
    mode (MODE "#") with encryption enabled
    '''
    # (password, paramversion) pair handed to DeltaTar by the tests
    ENCRYPTION = ("some magic key", 1)
    MODE = "#"
1942
1943