2350e7c9e851fd1463667df0135294f75652111d
[python-delta-tar] / testing / test_deltatar.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
19 import errno
20 import os
21 import re
22 import random
23 import shutil
24 import logging
25 import binascii
26 import json
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
30
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
36
37 from . import BaseTest
38 from . import new_volume_handler
39
40 class DeltaTarTest(BaseTest):
41     """
42     Test backups
43     """
    # Archive mode suffix: passed to DeltaTar(mode=...) and appended to "r"
    # when a volume is opened through TarFile.open(). Several tests inspect
    # its prefix (':' or '|') to skip modes they cannot handle.
    MODE = ''
    # When true, the multi-volume tests expect a single volume and the GCM
    # object size tests are skipped.
    MODE_COMPRESSES = False

    # Either None or a (password, paramversion) pair; the pair is unpacked
    # into the ``password`` and ``crypto_paramversion`` arguments of DeltaTar.
    ENCRYPTION = None  # (password : str, paramversion : int) option

    # Directory tree copied as backup input by test_restore_multivol2; setUp
    # replaces the value with 'testing' when the directory does not exist.
    GIT_DIR = '.git'
50
51     def setUp(self):
52         '''
53         Create base test data
54         '''
55         self.pwd = os.getcwd()
56         os.system('rm -rf target_dir source_dir* backup_dir* huge')
57         os.makedirs('source_dir/test/test2')
58         self.hash = dict()
59         self.hash["source_dir/test/test2"] = ''
60         self.hash["source_dir/big"]  = self.create_file("source_dir/big", 50000)
61         self.hash["source_dir/small"]  = self.create_file("source_dir/small", 100)
62         self.hash["source_dir/test/huge"]  = self.create_file("source_dir/test/huge", 700000)
63         self.hash["source_dir/test/huge2"]  = self.create_file("source_dir/test/huge2", 800000)
64
65         self.consoleLogger = logging.StreamHandler()
66         self.consoleLogger.setLevel(logging.DEBUG)
67
68         if not os.path.isdir(self.GIT_DIR):
69             # Not running inside git tree, take our
70             # own testing directory as source.
71             self.GIT_DIR = 'testing'
72
73             if not os.path.isdir(self.GIT_DIR):
74                 raise Exception('No input directory found: ' + self.GIT_DIR)
75
76     def tearDown(self):
77         '''
78         Remove temporal files created by unit tests and reset globals.
79         '''
80         os.chdir(self.pwd)
81         os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
82         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
83                   ("I am fully aware that this will void my warranty.")
84
85     def test_restore_simple_full_backup(self):
86         '''
87         Creates a full backup without any filtering and restores it.
88         '''
89         password, paramversion = self.ENCRYPTION or (None, None)
90         deltatar = DeltaTar(mode=self.MODE, password=password,
91                             crypto_paramversion=paramversion,
92                             logger=self.consoleLogger)
93
94         # create first backup
95         deltatar.create_full_backup(
96             source_path="source_dir",
97             backup_path="backup_dir")
98
99         assert os.path.exists("backup_dir")
100         shutil.rmtree("source_dir")
101
102         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
103         tar_path = os.path.join("backup_dir", tar_filename)
104
105         deltatar.restore_backup(target_path="source_dir",
106                                 backup_tar_path=tar_path)
107
108         for key, value in self.hash.items():
109             assert os.path.exists(key)
110             if value:
111                 assert value == self.md5sum(key)
112
113
114     def test_create_backup_max_file_length (self):
115         """
116         Creates a full backup including one file that exceeds the (purposely
117         lowered) upper bound on GCM encrypted objects. This will yield multiple
118         encrypted objects for one plaintext file.
119
120         Success is verified by splitting the archive at object boundaries and
121         counting the parts.
122         """
123         if self.MODE_COMPRESSES is True:
124             raise SkipTest ("GCM file length test not meaningful with compression.")
125         if self.ENCRYPTION is None:
126             raise SkipTest ("GCM file length applies only to encrypted backups.")
127
128         new_max = 20000 # cannot be less than tar block size
129         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
130                 ("I am fully aware that this will void my warranty.",
131                  new_max)
132
133         password, paramversion = self.ENCRYPTION
134         deltatar = DeltaTar (mode=self.MODE, password=password,
135                              crypto_paramversion=paramversion,
136                              logger=self.consoleLogger)
137
138         self.hash = dict ()
139         os.makedirs ("source_dir2")
140         for f, s in [("empty"          , 0)             # 1 tar objects
141                     ,("slightly_larger", new_max + 1)   # 2
142                     ,("twice"          , 2 * new_max)   # 3
143                     ]:
144             f = "source_dir2/%s" % f
145             self.hash [f] = self.create_file (f, s)
146
147         deltatar.create_full_backup \
148                 (source_path="source_dir2", backup_path="backup_dir")
149
150         assert os.path.exists ("backup_dir")
151         shutil.rmtree ("source_dir2")
152
153         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
154         backup_path     = os.path.join("backup_dir", backup_filename)
155
156         # split the resulting archive into its constituents without
157         # decrypting
158         ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
159                         "-o backup_dir/split <\'%s\'" % backup_path)
160
161         assert os.path.exists ("backup_dir/split")
162
163         dents = os.listdir ("backup_dir/split")
164         assert len (dents) == 6
165
166
167     def test_restore_backup_max_file_length (self):
168         """
169         Creates a full backup including one file that exceeds the (purposely
170         lowered) upper bound on GCM encrypted objects. This will yield two
171         encrypted objects for one plaintext file.
172
173         Success is verified by splitting the archive at object boundaries and
174         counting the parts.
175         """
176         if self.MODE_COMPRESSES is True:
177             raise SkipTest ("GCM file length test not meaningful with compression.")
178         if self.ENCRYPTION is None:
179             raise SkipTest ("GCM file length applies only to encrypted backups.")
180
181         new_max = 20000 # cannot be less than tar block size
182         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
183                     ("I am fully aware that this will void my warranty.",
184                      new_max)
185
186         password, paramversion = self.ENCRYPTION
187         deltatar = DeltaTar (mode=self.MODE, password=password,
188                              crypto_paramversion=paramversion,
189                              logger=self.consoleLogger)
190
191         self.hash = dict ()
192         os.makedirs ("source_dir2")
193         for f, s in [("empty"          , 0)             # 1 tar objects
194                     ,("almost_large"   , new_max - 1)   # 2
195                     ,("large"          , new_max)       # 3
196                     ,("slightly_larger", new_max + 1)   # 4
197                     ,("twice"          , 2 * new_max)   # 5
198                     ,("twice_plus_one" , (2 * new_max) + 1)   # 6
199                     ]:
200             f = "source_dir2/%s" % f
201             self.hash [f] = self.create_file (f, s)
202
203         deltatar.create_full_backup \
204                 (source_path="source_dir2", backup_path="backup_dir")
205
206         assert os.path.exists ("backup_dir")
207         shutil.rmtree ("source_dir2")
208
209         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
210         backup_path     = os.path.join("backup_dir", backup_filename)
211
212         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
213         tar_path = os.path.join("backup_dir", tar_filename)
214
215         deltatar.restore_backup(target_path="source_dir2",
216                                 backup_tar_path=tar_path)
217
218         for key, value in self.hash.items():
219             assert os.path.exists(key)
220             if value:
221                 assert value == self.md5sum(key)
222
223
224     def test_create_backup_index_max_file_length (self):
225         """
226         Creates a full backup with a too large index file for the upper bound
227         of the GCM encryption. Since the index file has a fixed IV file counter
228         of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
229
230         60+ GB of (potentially compressed) index file should last for a while...
231         """
232         if self.MODE_COMPRESSES is True:
233             raise SkipTest ("GCM file length test not meaningful with compression.")
234         if self.ENCRYPTION is None:
235             raise SkipTest ("GCM file length applies only to encrypted backups.")
236
237         new_max = 5000
238         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
239                     ("I am fully aware that this will void my warranty.",
240                      new_max)
241
242         password, paramversion = self.ENCRYPTION
243         deltatar = DeltaTar (mode=self.MODE, password=password,
244                              crypto_paramversion=paramversion,
245                              logger=self.consoleLogger)
246
247         self.hash = dict ()
248         os.makedirs ("source_dir2")
249         for i in range (42):
250             f = "source_dir2/dummy_%rd" % i
251             self.hash [f] = self.create_file (f, i)
252
253         with self.assertRaises (crypto.InvalidFileCounter):
254             deltatar.create_full_backup \
255                     (source_path="source_dir2", backup_path="backup_dir")
256         shutil.rmtree ("source_dir2")
257
258
259     def test_check_index_checksum(self):
260         '''
261         Creates a full backup and checks the index' checksum of files
262         '''
263         password, paramversion = self.ENCRYPTION or (None, None)
264         deltatar = DeltaTar(mode=self.MODE, password=password,
265                             crypto_paramversion=paramversion,
266                             logger=self.consoleLogger)
267
268         # create first backup
269         deltatar.create_full_backup(
270             source_path="source_dir",
271             backup_path="backup_dir")
272
273
274         index_filename = deltatar.index_name_func(True)
275         index_path = os.path.join("backup_dir", index_filename)
276
277         f = open(index_path, 'rb')
278         crc = None
279         checked = False
280         began_list = False
281         while True:
282             l = f.readline()
283             if l == b'':
284                 break
285             if b'BEGIN-FILE-LIST' in l:
286                 crc = binascii.crc32(l) & 0xFFFFffff
287                 began_list = True
288             elif b'END-FILE-LIST' in l:
289                 crc = binascii.crc32(l, crc) & 0xffffffff
290
291                 # next line contains the crc
292                 data = json.loads(f.readline().decode("UTF-8"))
293                 assert data['type'] == 'file-list-checksum'
294                 assert data['checksum'] == crc
295                 checked = True
296                 break
297             elif began_list:
298                 crc = binascii.crc32(l, crc) & 0xffffffff
299         f.close()
300
301
302     def test_restore_multivol(self):
303         '''
304         Creates a full backup without any filtering with multiple volumes and
305         restore it.
306         '''
307         if ':gz' in self.MODE:
308             raise SkipTest('compression information is lost when creating '
309                            'multiple volumes with no Stream')
310
311         password, paramversion = self.ENCRYPTION or (None, None)
312         deltatar = DeltaTar(mode=self.MODE, password=password,
313                             crypto_paramversion=paramversion,
314                             logger=self.consoleLogger)
315
316         self.hash = dict()
317         os.makedirs('source_dir2')
318         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
319         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
320
321         # create first backup
322         deltatar.create_full_backup(
323             source_path="source_dir2",
324             backup_path="backup_dir",
325             max_volume_size=1)
326
327         assert os.path.exists("backup_dir")
328         assert os.path.exists(os.path.join("backup_dir",
329             deltatar.volume_name_func("backup_dir", True, 0)))
330         if self.MODE_COMPRESSES:
331             n_vols = 1
332         else:
333             n_vols = 2
334         for i_vol in range(n_vols):
335             assert os.path.exists(os.path.join("backup_dir",
336                 deltatar.volume_name_func("backup_dir", True, i_vol)))
337         assert not os.path.exists(os.path.join("backup_dir",
338             deltatar.volume_name_func("backup_dir", True, n_vols)))
339
340         shutil.rmtree("source_dir2")
341
342         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
343         tar_path = os.path.join("backup_dir", tar_filename)
344
345         # this should automatically restore all volumes
346         deltatar.restore_backup(target_path="source_dir2",
347                                 backup_tar_path=tar_path)
348
349         for key, value in self.hash.items():
350             assert os.path.exists(key)
351             if value:
352                 assert value == self.md5sum(key)
353
354     def test_restore_multivol_split(self):
355         '''
356         Creates a full backup without any filtering with multiple volumes
357         with big files bigger than the max volume size and
358         restore it.
359         '''
360         if self.MODE.startswith(':') or self.MODE.startswith('|'):
361             raise SkipTest('this test only works for uncompressed '
362                            'or concat compressed modes')
363
364         password, paramversion = self.ENCRYPTION or (None, None)
365         deltatar = DeltaTar(mode=self.MODE, password=password,
366                             crypto_paramversion=paramversion,
367                             logger=self.consoleLogger)
368
369         self.hash = dict()
370         os.makedirs('source_dir2')
371         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 3*1024*1024)
372         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 4*1024*1024)
373         self.hash["source_dir2/huge2"]  = self.create_file("source_dir2/huge2", 4*1024*1024)
374
375         # create first backup
376         deltatar.create_full_backup(
377             source_path="source_dir2",
378             backup_path="backup_dir",
379             max_volume_size=2)
380
381         assert os.path.exists("backup_dir")
382         assert os.path.exists(os.path.join("backup_dir",
383             deltatar.volume_name_func("backup_dir", True, 0)))
384         if self.MODE_COMPRESSES:
385             n_vols = 1
386         else:
387             n_vols = 6
388         for i_vol in range(n_vols):
389             assert os.path.exists(os.path.join("backup_dir",
390                 deltatar.volume_name_func("backup_dir", True, i_vol)))
391         assert not os.path.exists(os.path.join("backup_dir",
392             deltatar.volume_name_func("backup_dir", True, n_vols)))
393
394         shutil.rmtree("source_dir2")
395
396         index_filename = deltatar.index_name_func(True)
397         index_path = os.path.join("backup_dir", index_filename)
398
399         deltatar.restore_backup(target_path="source_dir2",
400             backup_indexes_paths=[index_path])
401
402         for key, value in self.hash.items():
403             assert os.path.exists(key)
404             if value:
405                 assert value == self.md5sum(key)
406
407
408     def test_full_backup_index_extra_data(self):
409         '''
410         Tests that the index file for a full backup can store extra_data and
411         that this data can be retrieved.
412         '''
413         password, paramversion = self.ENCRYPTION or (None, None)
414         deltatar = DeltaTar(mode=self.MODE, password=password,
415                             crypto_paramversion=paramversion,
416                             logger=self.consoleLogger)
417
418         extra_data = dict(
419             hola="caracola",
420             otra_cosa=[1, "lista"],
421             y_otra=dict(bola=1.1)
422         )
423
424         deltatar.create_full_backup(
425             source_path="source_dir",
426             backup_path="backup_dir",
427             extra_data=extra_data)
428
429         index_filename = deltatar.index_name_func(is_full=True)
430         index_path = os.path.join("backup_dir", index_filename)
431
432         # iterate_index_path retrieves extra_data, and thus we can then compare
433         index_it = deltatar.iterate_index_path(index_path)
434         self.assertEqual(index_it.extra_data, extra_data)
435
436
437     def test_diff_backup_index_extra_data(self):
438         '''
439         Tests that the index file for a diff backup can store extra_data and
440         that this data can be retrieved.
441         '''
442         password, paramversion = self.ENCRYPTION or (None, None)
443         deltatar = DeltaTar(mode=self.MODE, password=password,
444                             crypto_paramversion=paramversion,
445                             logger=self.consoleLogger)
446
447         extra_data = dict(
448             hola="caracola",
449             otra_cosa=[1, "lista"],
450             y_otra=dict(bola=1.1)
451         )
452         # do first backup
453         deltatar.create_full_backup(
454             source_path="source_dir",
455             backup_path="backup_dir")
456
457
458         prev_index_filename = deltatar.index_name_func(is_full=True)
459         prev_index_path = os.path.join("backup_dir", prev_index_filename)
460
461         # create empty diff backup
462         deltatar.create_diff_backup("source_dir", "backup_dir2",
463                                     prev_index_path, extra_data=extra_data)
464
465         index_filename = deltatar.index_name_func(is_full=False)
466         index_path = os.path.join("backup_dir2", index_filename)
467
468         # iterate_index_path retrieves extra_data, and thus we can then compare
469         index_it = deltatar.iterate_index_path(index_path)
470         self.assertEqual(index_it.extra_data, extra_data)
471
472     def test_restore_multivol2(self):
473         '''
474         Creates a full backup without any filtering with multiple volumes and
475         restore it.
476         '''
477         password, paramversion = self.ENCRYPTION or (None, None)
478         deltatar = DeltaTar(mode=self.MODE, password=password,
479                             crypto_paramversion=paramversion,
480                             logger=self.consoleLogger)
481
482         shutil.copytree(self.GIT_DIR, "source_dir2")
483
484         # create first backup
485         deltatar.create_full_backup(
486             source_path="source_dir2",
487             backup_path="backup_dir",
488             max_volume_size=1)
489
490         assert os.path.exists("backup_dir")
491         assert os.path.exists(os.path.join("backup_dir",
492             deltatar.volume_name_func("backup_dir", True, 0)))
493
494         shutil.rmtree("source_dir2")
495
496         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
497         tar_path = os.path.join("backup_dir", tar_filename)
498
499         # this should automatically restore all volumes
500         deltatar.restore_backup(target_path="source_dir2",
501                                 backup_tar_path=tar_path)
502
503         self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
504
    def test_restore_multivol_manual_from_index(self):
        '''
        Creates a full backup without any filtering with multiple volumes and
        manually extracts a single file from it.

        The offset of the file ``huge`` is looked up in the index file, the
        first volume is opened at that offset and the member found there is
        extracted through TarFile into the current working directory. The
        extracted file is compared against the recorded value and removed.

        Skipped for modes starting with ':' or '|'.
        '''
        # this test only works for uncompressed or concat compressed modes
        if self.MODE.startswith(':') or self.MODE.startswith('|'):
            raise SkipTest('this test only works for uncompressed '
                           'or concat compressed modes')

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        self.hash = dict()
        os.makedirs('source_dir2')
        self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
        self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir",
            max_volume_size=1)

        # exactly n_vols volumes must exist: one for compressing modes,
        # two otherwise
        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))
        if self.MODE_COMPRESSES:
            n_vols = 1
        else:
            n_vols = 2
        for i_vol in range(n_vols):
            assert os.path.exists(os.path.join("backup_dir",
                deltatar.volume_name_func("backup_dir", True, i_vol)))
        assert not os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, n_vols)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # scan the index (one JSON document per line) for the entry of the
        # file "huge" and remember its offset
        # NOTE(review): the handle returned by open_auxiliary_file is never
        # closed here.
        f = deltatar.open_auxiliary_file(index_path, 'r')
        offset = None
        while True:
            l = f.readline()
            if not len(l):
                break
            data = json.loads(l.decode('UTF-8'))
            if data.get('type', '') == 'file' and\
                    deltatar.unprefixed(data['path']) == "huge":
                offset = data['offset']
                break

        assert offset is not None

        # position the raw volume at the member before handing it to TarFile
        fo = open(tar_path, 'rb')
        fo.seek(offset)
        # Local handler that shadows the module level import of the same
        # name. It ignores base_name and volume_number and always opens the
        # volume numbered 002 named after the current time -- presumably
        # invoked by TarFile when the member continues in the next volume.
        def new_volume_handler(mode, tarobj, base_name, volume_number):
            # private class attribute of DeltaTar, reached through its
            # mangled name
            suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
            if self.ENCRYPTION is not None:
                # deltatar module is shadowed here
                suf += "." + deltatar_PDTCRYPT_EXTENSION
            tarobj.open_volume(datetime.now().strftime(
                "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
        # bind the mode so the handler takes the remaining three arguments
        new_volume_handler = partial(new_volume_handler, self.MODE)

        # a decryption context is only needed for encrypted backups
        crypto_ctx = None
        if self.ENCRYPTION is not None:
            crypto_ctx = crypto.Decrypt (password)

        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              new_volume_handler=new_volume_handler,
                              encryption=crypto_ctx)

        # the first member read at the offset is the wanted file; its names
        # are unprefixed so that it is extracted as plain "huge"
        member = tarobj.next()
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)
        tarobj.extract(member)
        tarobj.close()
        fo.close()
        assert self.hash['source_dir2/huge'] == self.md5sum('huge')

        os.unlink("huge")
595
596
    def test_restore_manual_from_index_twice (self):
        """
        Creates a full backup and restores the same file twice. This *must*
        fail when encryption is active.

        Currently, tarfile.py's *_Stream* class conveniently disallows seeking
        backwards within the same file. This prevents the encryption layer from
        exploding due to a reused IV in an overall valid archive.

        This test anticipates possible future mistakes since it's entirely
        feasible to implement backward seeks for *_Stream* with concat mode.

        Skipped for modes starting with "|" and for compressing modes.
        """
        # NOTE(review): the condition skips every compressing mode although
        # the message mentions concat compressed modes as supported -- confirm
        # which one is intended.
        if self.MODE.startswith("|") or self.MODE_COMPRESSES:
            raise SkipTest("this test only works for uncompressed "
                           "or concat compressed modes")

        password, paramversion = self.ENCRYPTION or (None, None)
        deltatar = DeltaTar(mode=self.MODE, password=password,
                            crypto_paramversion=paramversion,
                            logger=self.consoleLogger)

        self.hash = dict()
        os.makedirs("source_dir2")
        self.hash["source_dir2/samefile"] = \
            self.create_file("source_dir2/samefile", 1 * 1024)

        # create first backup
        deltatar.create_full_backup(
            source_path="source_dir2",
            backup_path="backup_dir")

        assert os.path.exists("backup_dir")
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, 0)))

        shutil.rmtree("source_dir2")

        tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
        tar_path = os.path.join("backup_dir", tar_filename)

        index_filename = deltatar.index_name_func(True)
        index_path = os.path.join("backup_dir", index_filename)

        # scan the index (one JSON document per line) for the entry of
        # "samefile" and remember its offset
        # NOTE(review): the handle returned by open_auxiliary_file is never
        # closed here.
        f = deltatar.open_auxiliary_file(index_path, "r")
        offset = None
        while True:
            l = f.readline()
            if not len(l):
                break
            data = json.loads(l.decode("UTF-8"))
            if data.get("type", "") == "file" and\
                    deltatar.unprefixed(data["path"]) == "samefile":
                offset = data["offset"]
                break

        assert offset is not None

        # position the raw volume at the member before handing it to TarFile
        fo = open(tar_path, "rb")
        fo.seek(offset)

        # a decryption context is only needed for encrypted backups
        crypto_ctx = None
        if self.ENCRYPTION is not None:
            crypto_ctx = crypto.Decrypt (password)

        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              encryption=crypto_ctx)
        # names are unprefixed so that the member is extracted as plain
        # "samefile" in the current working directory
        member = tarobj.next()
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)

        # extract once ...
        tarobj.extract(member)
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        # ... and twice
        # A StreamError is tolerated only when decrypting; without encryption
        # it is re-raised and fails the test.
        # NOTE(review): nothing fails the test when the second extraction
        # succeeds on an encrypted archive, although the docstring demands a
        # failure in that case.
        try:
            tarobj.extract(member)
        except tarfile.StreamError:
            if crypto_ctx is not None:
                pass # good: seeking backwards not allowed
            else:
                raise
        tarobj.close()
        fo.close()
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        os.unlink("samefile")
685
686
687     def test_restore_from_index(self):
688         '''
689         Restores a full backup using an index file.
690         '''
691         if self.MODE.startswith(':') or self.MODE.startswith('|'):
692             raise SkipTest('this test only works for uncompressed '
693                            'or concat compressed modes')
694
695         password, paramversion = self.ENCRYPTION or (None, None)
696         deltatar = DeltaTar(mode=self.MODE, password=password,
697                             crypto_paramversion=paramversion,
698                             logger=self.consoleLogger)
699
700         # create first backup
701         deltatar.create_full_backup(
702             source_path="source_dir",
703             backup_path="backup_dir")
704
705         shutil.rmtree("source_dir")
706
707         # this should automatically restore all volumes
708         index_filename = deltatar.index_name_func(True)
709         index_path = os.path.join("backup_dir", index_filename)
710
711         deltatar.restore_backup(target_path="source_dir",
712             backup_indexes_paths=[index_path])
713
714         for key, value in self.hash.items():
715             assert os.path.exists(key)
716             if value:
717                 assert value == self.md5sum(key)
718
719     def test_restore_multivol_from_index(self):
720         '''
721         Restores a full multivolume backup using an index file.
722         '''
723         if self.MODE.startswith(':') or self.MODE.startswith('|'):
724             raise SkipTest('this test only works for uncompressed '
725                            'or concat compressed modes')
726
727         password, paramversion = self.ENCRYPTION or (None, None)
728         deltatar = DeltaTar(mode=self.MODE, password=password,
729                             crypto_paramversion=paramversion,
730                             logger=self.consoleLogger)
731
732         # create first backup
733         deltatar.create_full_backup(
734             source_path="source_dir",
735             backup_path="backup_dir",
736             max_volume_size=2)
737
738         shutil.rmtree("source_dir")
739
740         # this should automatically restore all volumes
741         index_filename = deltatar.index_name_func(True)
742         index_path = os.path.join("backup_dir", index_filename)
743
744         deltatar.restore_backup(target_path="source_dir",
745             backup_indexes_paths=[index_path])
746
747         for key, value in self.hash.items():
748             assert os.path.exists(key)
749             if value:
750                 assert value == self.md5sum(key)
751
752     def test_create_basic_filtering(self):
753         '''
754         Tests create backup basic filtering.
755         '''
756         password, paramversion = self.ENCRYPTION or (None, None)
757         deltatar = DeltaTar(mode=self.MODE, password=password,
758                             crypto_paramversion=paramversion,
759                             logger=self.consoleLogger,
760                             included_files=["test", "small"],
761                             excluded_files=["test/huge"])
762
763         # create first backup
764         deltatar.create_full_backup(
765             source_path="source_dir",
766             backup_path="backup_dir")
767
768         assert os.path.exists("backup_dir")
769         shutil.rmtree("source_dir")
770
771         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
772         tar_path = os.path.join("backup_dir", tar_filename)
773
774         deltatar.restore_backup(target_path="source_dir",
775                                 backup_tar_path=tar_path)
776
777         assert os.path.exists("source_dir/small")
778         assert os.path.exists("source_dir/test")
779         assert os.path.exists("source_dir/test/huge2")
780         assert os.path.exists("source_dir/test/test2")
781
782         assert not os.path.exists("source_dir/test/huge")
783         assert not os.path.exists("source_dir/big")
784
785     def test_create_filter_func(self):
786         '''
787         Tests create backup basic filtering.
788         '''
789         visited_paths = []
790         def filter_func(visited_paths, path):
791             if path not in visited_paths:
792                 visited_paths.append(path)
793             return True
794
795         filter_func = partial(filter_func, visited_paths)
796
797         password, paramversion = self.ENCRYPTION or (None, None)
798         deltatar = DeltaTar(mode=self.MODE, password=password,
799                             crypto_paramversion=paramversion,
800                             logger=self.consoleLogger,
801                             included_files=["test", "small"],
802                             excluded_files=["test/huge"],
803                             filter_func=filter_func)
804
805         # create first backup
806         deltatar.create_full_backup(
807             source_path="source_dir",
808             backup_path="backup_dir")
809
810         assert os.path.exists("backup_dir")
811         shutil.rmtree("source_dir")
812
813         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
814         tar_path = os.path.join("backup_dir", tar_filename)
815
816         deltatar.restore_backup(target_path="source_dir",
817                                 backup_tar_path=tar_path)
818         assert set(visited_paths) == set([
819                 'small',
820                 'test',
821                 'test/huge2',
822                 'test/test2'
823             ])
824
825     def test_create_filter_out_func(self):
826         '''
827         Tests create backup basic filtering.
828         '''
829         visited_paths = []
830         def filter_func(visited_paths, path):
831             '''
832             Filter out everything
833             '''
834             if path not in visited_paths:
835                 visited_paths.append(path)
836             return False
837
838         filter_func = partial(filter_func, visited_paths)
839
840         password, paramversion = self.ENCRYPTION or (None, None)
841         deltatar = DeltaTar(mode=self.MODE, password=password,
842                             crypto_paramversion=paramversion,
843                             logger=self.consoleLogger,
844                             included_files=["test", "small"],
845                             excluded_files=["test/huge"],
846                             filter_func=filter_func)
847
848         # create first backup
849         deltatar.create_full_backup(
850             source_path="source_dir",
851             backup_path="backup_dir")
852
853         assert os.path.exists("backup_dir")
854         shutil.rmtree("source_dir")
855
856         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
857         tar_path = os.path.join("backup_dir", tar_filename)
858
859         deltatar.restore_backup(target_path="source_dir",
860                                 backup_tar_path=tar_path)
861         assert set(visited_paths) == set([
862                 'small',
863                 'test'
864             ])
865
866         # check that effectively no file was backed up
867         assert not os.path.exists("source_dir/small")
868         assert not os.path.exists("source_dir/big")
869         assert not os.path.exists("source_dir/test")
870
871     def test_restore_index_basic_filtering(self):
872         '''
873         Creates a backup, and then filter when doing the index based restore.
874         '''
875         if self.MODE.startswith(':') or self.MODE.startswith('|'):
876             raise SkipTest('this test only works for uncompressed '
877                            'or concat compressed modes')
878
879         password, paramversion = self.ENCRYPTION or (None, None)
880         deltatar = DeltaTar(mode=self.MODE, password=password,
881                             crypto_paramversion=paramversion,
882                             logger=self.consoleLogger)
883
884         # create first backup
885         deltatar.create_full_backup(
886             source_path="source_dir",
887             backup_path="backup_dir")
888
889         assert os.path.exists("backup_dir")
890         shutil.rmtree("source_dir")
891
892         index_filename = deltatar.index_name_func(True)
893         index_path = os.path.join("backup_dir", index_filename)
894
895         deltatar.included_files = ["test", "small"]
896         deltatar.excluded_files = ["test/huge"]
897         deltatar.restore_backup(target_path="source_dir",
898             backup_indexes_paths=[index_path])
899
900         assert os.path.exists("source_dir/small")
901         assert os.path.exists("source_dir/test")
902         assert os.path.exists("source_dir/test/huge2")
903         assert os.path.exists("source_dir/test/test2")
904
905         assert not os.path.exists("source_dir/test/huge")
906         assert not os.path.exists("source_dir/big")
907
908     def test_restore_index_filter_func(self):
909         '''
910         Creates a backup, and then filter when doing the index based restore,
911         using the filter function.
912         '''
913         if self.MODE.startswith(':') or self.MODE.startswith('|'):
914             raise SkipTest('this test only works for uncompressed '
915                            'or concat compressed modes')
916
917         visited_paths = []
918         def filter_func(visited_paths, path):
919             if path not in visited_paths:
920                 visited_paths.append(path)
921             return True
922
923         filter_func = partial(filter_func, visited_paths)
924
925         password, paramversion = self.ENCRYPTION or (None, None)
926         deltatar = DeltaTar(mode=self.MODE, password=password,
927                             crypto_paramversion=paramversion,
928                             logger=self.consoleLogger)
929
930         # create first backup
931         deltatar.create_full_backup(
932             source_path="source_dir",
933             backup_path="backup_dir")
934
935         assert os.path.exists("backup_dir")
936         shutil.rmtree("source_dir")
937
938         index_filename = deltatar.index_name_func(True)
939         index_path = os.path.join("backup_dir", index_filename)
940
941         deltatar.included_files = ["test", "small"]
942         deltatar.excluded_files = ["test/huge"]
943         deltatar.filter_func = filter_func
944         deltatar.restore_backup(target_path="source_dir",
945             backup_indexes_paths=[index_path])
946
947         assert set(visited_paths) == set([
948                 'small',
949                 'test',
950                 'test/huge2',
951                 'test/test2'
952             ])
953
954     def test_restore_tar_basic_filtering(self):
955         '''
956         Creates a backup, and then filter when doing the tar based restore.
957         '''
958         password, paramversion = self.ENCRYPTION or (None, None)
959         deltatar = DeltaTar(mode=self.MODE, password=password,
960                             crypto_paramversion=paramversion,
961                             logger=self.consoleLogger)
962
963         # create first backup
964         deltatar.create_full_backup(
965             source_path="source_dir",
966             backup_path="backup_dir")
967
968         assert os.path.exists("backup_dir")
969         shutil.rmtree("source_dir")
970
971         deltatar.included_files = ["test", "small"]
972         deltatar.excluded_files = ["test/huge"]
973
974         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
975         tar_path = os.path.join("backup_dir", tar_filename)
976
977         deltatar.restore_backup(target_path="source_dir",
978                                 backup_tar_path=tar_path)
979
980         assert os.path.exists("source_dir/small")
981         assert os.path.exists("source_dir/test")
982         assert os.path.exists("source_dir/test/huge2")
983         assert os.path.exists("source_dir/test/test2")
984
985         assert not os.path.exists("source_dir/test/huge")
986         assert not os.path.exists("source_dir/big")
987
988     def test_restore_tar_filter_func(self):
989         '''
990         Creates a backup, and then filter when doing the tar based restore,
991         using the filter function.
992         '''
993         visited_paths = []
994         def filter_func(visited_paths, path):
995             if path not in visited_paths:
996                 visited_paths.append(path)
997             return True
998
999         filter_func = partial(filter_func, visited_paths)
1000
1001         password, paramversion = self.ENCRYPTION or (None, None)
1002         deltatar = DeltaTar(mode=self.MODE, password=password,
1003                             crypto_paramversion=paramversion,
1004                             logger=self.consoleLogger)
1005
1006         # create first backup
1007         deltatar.create_full_backup(
1008             source_path="source_dir",
1009             backup_path="backup_dir")
1010
1011         assert os.path.exists("backup_dir")
1012         shutil.rmtree("source_dir")
1013
1014         index_filename = deltatar.index_name_func(True)
1015         index_path = os.path.join("backup_dir", index_filename)
1016
1017         deltatar.included_files = ["test", "small"]
1018         deltatar.excluded_files = ["test/huge"]
1019         deltatar.filter_func = filter_func
1020
1021         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1022         tar_path = os.path.join("backup_dir", tar_filename)
1023
1024         deltatar.restore_backup(target_path="source_dir",
1025                                 backup_tar_path=tar_path)
1026         assert set(visited_paths) == set([
1027                 'small',
1028                 'test',
1029                 'test/huge2',
1030                 'test/test2'
1031             ])
1032
1033     def test_filter_path_regexp(self):
1034         '''
1035         Test specifically the deltatar.filter_path function with regular
1036         expressions
1037         '''
1038         included_files = [
1039             re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1040             re.compile('^yes$'),
1041             'testing'
1042         ]
1043         excluded_files = [
1044             re.compile('^testing/in_the'),
1045         ]
1046         deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1047                             excluded_files=excluded_files)
1048
1049         # assert valid and invalid paths
1050         assert deltatar.filter_path('test/hola')
1051         assert deltatar.filter_path('test/hola/any/thing')
1052         assert deltatar.filter_path('test/caracola/caracolero')
1053         assert deltatar.filter_path('test/caracola/caracolero/yeah')
1054         assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1055         assert deltatar.filter_path('yes')
1056         assert deltatar.filter_path('testing')
1057         assert deltatar.filter_path('testing/yes')
1058         assert deltatar.filter_path('testing/in_th')
1059
1060         assert not deltatar.filter_path('something')
1061         assert not deltatar.filter_path('other/thing')
1062         assert not deltatar.filter_path('test_ing')
1063         assert not deltatar.filter_path('test/hola_lala')
1064         assert not deltatar.filter_path('test/agur')
1065         assert not deltatar.filter_path('testing_something')
1066         assert not deltatar.filter_path('yeso')
1067         assert not deltatar.filter_path('yes/o')
1068         assert not deltatar.filter_path('yes_o')
1069         assert not deltatar.filter_path('testing/in_the')
1070         assert not deltatar.filter_path('testing/in_the_field')
1071         assert not deltatar.filter_path('testing/in_the/field')
1072
1073     def test_filter_path_parent(self):
1074         '''
1075         Test specifically the deltatar.filter_path function for parent matching
1076         '''
1077         included_files = [
1078             'testing/path/to/some/thing'
1079         ]
1080         deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1081
1082         # assert valid and invalid paths
1083         assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1084         assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1085         assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1086         assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1087         assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1088         assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1089         assert deltatar.filter_path('testing/something/else') == NO_MATCH
1090
1091     def test_parent_matching_simple_full_backup(self):
1092         '''
1093         Create a full backup using parent matching
1094         '''
1095         included_files = [
1096             'test/huge2'
1097         ]
1098
1099         password, paramversion = self.ENCRYPTION or (None, None)
1100         deltatar = DeltaTar(mode=self.MODE, password=password,
1101                             crypto_paramversion=paramversion,
1102                             logger=self.consoleLogger,
1103                             included_files=included_files)
1104
1105         # create first backup
1106         deltatar.create_full_backup(
1107             source_path="source_dir",
1108             backup_path="backup_dir")
1109
1110         assert os.path.exists("backup_dir")
1111         shutil.rmtree("source_dir")
1112
1113         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1114         tar_path = os.path.join("backup_dir", tar_filename)
1115
1116         deltatar = DeltaTar(mode=self.MODE, password=password,
1117                             logger=self.consoleLogger)
1118         deltatar.restore_backup(target_path="source_dir",
1119                                 backup_tar_path=tar_path)
1120
1121         assert os.path.exists('source_dir/test/huge2')
1122         assert os.path.exists('source_dir/test/')
1123         assert not os.path.exists('source_dir/test/huge')
1124         assert not os.path.exists('source_dir/big')
1125         assert not os.path.exists('source_dir/small')
1126
1127     def test_parent_matching_simple_full_backup_restore(self):
1128         '''
1129         Create a full backup and restores it using parent matching
1130         '''
1131         included_files = [
1132             'test/huge2'
1133         ]
1134
1135         password, paramversion = self.ENCRYPTION or (None, None)
1136         deltatar = DeltaTar(mode=self.MODE, password=password,
1137                             crypto_paramversion=paramversion,
1138                             logger=self.consoleLogger)
1139
1140         # create first backup
1141         deltatar.create_full_backup(
1142             source_path="source_dir",
1143             backup_path="backup_dir")
1144
1145         assert os.path.exists("backup_dir")
1146         shutil.rmtree("source_dir")
1147
1148         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1149         tar_path = os.path.join("backup_dir", tar_filename)
1150
1151         deltatar = DeltaTar(mode=self.MODE, password=password,
1152                             logger=self.consoleLogger,
1153                             included_files=included_files)
1154         deltatar.restore_backup(target_path="source_dir",
1155                                 backup_tar_path=tar_path)
1156
1157         assert os.path.exists('source_dir/test/huge2')
1158         assert os.path.exists('source_dir/test/')
1159         assert not os.path.exists('source_dir/test/huge')
1160         assert not os.path.exists('source_dir/big')
1161         assert not os.path.exists('source_dir/small')
1162
1163     def test_parent_matching_index_full_backup_restore(self):
1164         '''
1165         Create a full backup and restores it using parent matching
1166         '''
1167         included_files = [
1168             'test/huge2'
1169         ]
1170
1171         password, paramversion = self.ENCRYPTION or (None, None)
1172         deltatar = DeltaTar(mode=self.MODE, password=password,
1173                             crypto_paramversion=paramversion,
1174                             logger=self.consoleLogger)
1175
1176         # create first backup
1177         deltatar.create_full_backup(
1178             source_path="source_dir",
1179             backup_path="backup_dir")
1180
1181         assert os.path.exists("backup_dir")
1182         shutil.rmtree("source_dir")
1183
1184         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1185         tar_path = os.path.join("backup_dir", tar_filename)
1186
1187         deltatar = DeltaTar(mode=self.MODE, password=password,
1188                             logger=self.consoleLogger,
1189                             included_files=included_files)
1190         deltatar.restore_backup(target_path="source_dir",
1191                                 backup_tar_path=tar_path)
1192
1193         assert os.path.exists('source_dir/test/huge2')
1194         assert os.path.exists('source_dir/test/')
1195         assert not os.path.exists('source_dir/test/huge')
1196         assert not os.path.exists('source_dir/big')
1197         assert not os.path.exists('source_dir/small')
1198
1199     def test_collate_iterators(self):
1200         '''
1201         Tests the collate iterators functionality with two exact directories,
1202         using an index iterator from a backup and the exact same source dir.
1203         '''
1204         password, paramversion = self.ENCRYPTION or (None, None)
1205         deltatar = DeltaTar(mode=self.MODE, password=password,
1206                             crypto_paramversion=paramversion,
1207                             logger=self.consoleLogger)
1208
1209         # create first backup
1210         deltatar.create_full_backup(
1211             source_path="source_dir",
1212             backup_path="backup_dir")
1213
1214         assert os.path.exists("backup_dir")
1215
1216         cwd = os.getcwd()
1217         index_filename = deltatar.index_name_func(is_full=True)
1218         index_path = os.path.join(cwd, "backup_dir", index_filename)
1219         index_it = deltatar.iterate_index_path(index_path)
1220
1221         os.chdir('source_dir')
1222         dir_it = deltatar._recursive_walk_dir('.')
1223         path_it = deltatar.jsonize_path_iterator(dir_it)
1224
1225         try:
1226             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1227                 assert deltatar._equal_stat_dicts(path1, path2)
1228         finally:
1229             os.chdir(cwd)
1230
1231     def test_collate_iterators_diffdirs(self):
1232         '''
1233         Use the collate iterators functionality with two different directories.
1234         It must behave in an expected way.
1235         '''
1236         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1237
1238         password, paramversion = self.ENCRYPTION or (None, None)
1239         deltatar = DeltaTar(mode=self.MODE, password=password,
1240                             crypto_paramversion=paramversion,
1241                             logger=self.consoleLogger)
1242
1243         # create first backup
1244         deltatar.create_full_backup(
1245             source_path="source_dir",
1246             backup_path="backup_dir")
1247
1248         assert os.path.exists("backup_dir")
1249         self.hash["source_dir/z"]  = self.create_file("source_dir/z", 100)
1250
1251         cwd = os.getcwd()
1252         index_filename = deltatar.index_name_func(is_full=True)
1253         index_path = os.path.join(cwd, "backup_dir", index_filename)
1254         index_it = deltatar.iterate_index_path(index_path)
1255
1256         os.chdir('source_dir')
1257         dir_it = deltatar._recursive_walk_dir('.')
1258         path_it = deltatar.jsonize_path_iterator(dir_it)
1259
1260         try:
1261             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1262                 if path2['path'] == 'z':
1263                     assert not path1
1264                 else:
1265                     assert deltatar._equal_stat_dicts(path1, path2)
1266         finally:
1267             os.chdir(cwd)
1268
1269     def test_collate_iterators_diffdirs2(self):
1270         '''
1271         Use the collate iterators functionality with two different directories.
1272         It must behave in an expected way.
1273         '''
1274         password, paramversion = self.ENCRYPTION or (None, None)
1275         deltatar = DeltaTar(mode=self.MODE, password=password,
1276                             crypto_paramversion=paramversion,
1277                             logger=self.consoleLogger)
1278
1279         # create first backup
1280         deltatar.create_full_backup(
1281             source_path="source_dir",
1282             backup_path="backup_dir")
1283
1284         assert os.path.exists("backup_dir")
1285
1286         # add some new files and directories
1287         os.makedirs('source_dir/bigdir')
1288         self.hash["source_dir/bigdir"] = ""
1289         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1290         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1291         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1292
1293         cwd = os.getcwd()
1294         index_filename = deltatar.index_name_func(is_full=True)
1295         index_path = os.path.join(cwd, "backup_dir", index_filename)
1296         index_it = deltatar.iterate_index_path(index_path)
1297
1298         os.chdir('source_dir')
1299         dir_it = deltatar._recursive_walk_dir('.')
1300         path_it = deltatar.jsonize_path_iterator(dir_it)
1301
1302         visited_pairs = []
1303
1304         try:
1305             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1306                 visited_pairs.append(
1307                     (deltatar.unprefixed(path1['path']) if path1 else None,
1308                      path2['path'] if path2 else None)
1309                 )
1310         finally:
1311             assert visited_pairs == [
1312                 (u'big', u'big'),
1313                 (None, u'bigdir'),
1314                 (u'small', u'small'),
1315                 (u'test', u'test'),
1316                 (None, u'zzzz'),
1317                 (None, u'bigdir/a'),
1318                 (None, u'bigdir/b'),
1319                 (u'test/huge', u'test/huge'),
1320                 (u'test/huge2', u'test/huge2'),
1321                 (u'test/test2', u'test/test2'),
1322             ]
1323             os.chdir(cwd)
1324
1325     def test_create_empty_diff_backup(self):
1326         '''
1327         Creates an empty (no changes) backup diff
1328         '''
1329         password, paramversion = self.ENCRYPTION or (None, None)
1330         deltatar = DeltaTar(mode=self.MODE, password=password,
1331                             crypto_paramversion=paramversion,
1332                             logger=self.consoleLogger)
1333
1334         # create first backup
1335         deltatar.create_full_backup(
1336             source_path="source_dir",
1337             backup_path="backup_dir")
1338
1339         prev_index_filename = deltatar.index_name_func(is_full=True)
1340         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1341
1342         deltatar.create_diff_backup("source_dir", "backup_dir2",
1343                                     prev_index_path)
1344
1345         # check index items
1346         index_path = os.path.join("backup_dir2",
1347             deltatar.index_name_func(is_full=False))
1348         index_it = deltatar.iterate_index_path(index_path)
1349         n = 0
1350         for i in index_it:
1351             n += 1
1352             assert i[0]['path'].startswith("list://")
1353
1354         assert n == 6
1355
1356         # check the tar file
1357         assert os.path.exists("backup_dir2")
1358         shutil.rmtree("source_dir")
1359
1360         tar_filename = deltatar.volume_name_func('backup_dir2',
1361             is_full=False, volume_number=0)
1362         tar_path = os.path.join("backup_dir2", tar_filename)
1363
1364         # no file restored, because the diff was empty
1365         deltatar.restore_backup(target_path="source_dir",
1366                                 backup_tar_path=tar_path)
1367         assert len(os.listdir("source_dir")) == 0
1368
1369
1370     def test_create_diff_backup1(self):
1371         '''
1372         Creates a diff backup when there are new files
1373         '''
1374         password, paramversion = self.ENCRYPTION or (None, None)
1375         deltatar = DeltaTar(mode=self.MODE, password=password,
1376                             crypto_paramversion=paramversion,
1377                             logger=self.consoleLogger)
1378
1379         # create first backup
1380         deltatar.create_full_backup(
1381             source_path="source_dir",
1382             backup_path="backup_dir")
1383
1384         prev_index_filename = deltatar.index_name_func(is_full=True)
1385         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1386
1387         # add some new files and directories
1388         os.makedirs('source_dir/bigdir')
1389         self.hash["source_dir/bigdir"] = ""
1390         os.unlink("source_dir/small")
1391         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1392         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1393         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1394
1395         deltatar.create_diff_backup("source_dir", "backup_dir2",
1396                                     prev_index_path)
1397
1398         # check index items
1399         index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1400         index_it = deltatar.iterate_index_path(index_path)
1401         l = [i[0]['path'] for i in index_it]
1402
1403         assert l == [
1404             'list://big',
1405             'snapshot://bigdir',
1406             'delete://small',
1407             'list://test',
1408             'snapshot://zzzz',
1409             'snapshot://bigdir/a',
1410             'snapshot://bigdir/b',
1411             'list://test/huge',
1412             'list://test/huge2',
1413             'list://test/test2',
1414         ]
1415
1416         # check the tar file
1417         assert os.path.exists("backup_dir2")
1418         shutil.rmtree("source_dir")
1419
1420         # create source_dir with the small file, that will be then deleted by
1421         # the restore_backup
1422         os.mkdir("source_dir")
1423         open("source_dir/small", 'wb').close()
1424
1425         tar_filename = deltatar.volume_name_func('backup_dir2',
1426             is_full=False, volume_number=0)
1427         tar_path = os.path.join("backup_dir2", tar_filename)
1428
1429         # restore the backup, this will create only the new files
1430         deltatar.restore_backup(target_path="source_dir",
1431                                 backup_tar_path=tar_path)
1432         # the order doesn't matter
1433         assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1434
1435     def test_restore_from_index_diff_backup(self):
1436         '''
1437         Creates a full backup, modifies some files, creates a diff backup,
1438         then restores the diff backup from zero.
1439         '''
1440         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1441             raise SkipTest('this test only works for uncompressed '
1442                            'or concat compressed modes')
1443
1444         password, paramversion = self.ENCRYPTION or (None, None)
1445         deltatar = DeltaTar(mode=self.MODE, password=password,
1446                             crypto_paramversion=paramversion,
1447                             logger=self.consoleLogger)
1448
1449         # create first backup
1450         deltatar.create_full_backup(
1451             source_path="source_dir",
1452             backup_path="backup_dir")
1453
1454         prev_index_filename = deltatar.index_name_func(is_full=True)
1455         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1456
1457         # add some new files and directories
1458         os.makedirs('source_dir/bigdir')
1459         self.hash["source_dir/bigdir"] = ""
1460         os.unlink("source_dir/small")
1461         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1462         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1463         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1464
1465         deltatar.create_diff_backup("source_dir", "backup_dir2",
1466                                     prev_index_path)
1467
1468         # apply diff backup in target_dir
1469         index_filename = deltatar.index_name_func(is_full=False)
1470         index_path = os.path.join("backup_dir2", index_filename)
1471         deltatar.restore_backup("target_dir",
1472             backup_indexes_paths=[index_path, prev_index_path])
1473
1474         # then compare the two directories source_dir and target_dir and check
1475         # they are the same
1476         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1477
1478     def test_restore_from_index_diff_backup2(self):
1479         '''
1480         Creates a full backup, modifies some files, creates a diff backup,
1481         then restores the diff backup with the full backup as a starting point.
1482         '''
1483         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1484             raise SkipTest('this test only works for uncompressed '
1485                            'or concat compressed modes')
1486
1487         password, paramversion = self.ENCRYPTION or (None, None)
1488         deltatar = DeltaTar(mode=self.MODE, password=password,
1489                             crypto_paramversion=paramversion,
1490                             logger=self.consoleLogger)
1491
1492         # create first backup
1493         deltatar.create_full_backup(
1494             source_path="source_dir",
1495             backup_path="backup_dir")
1496
1497         prev_index_filename = deltatar.index_name_func(is_full=True)
1498         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1499
1500         # add some new files and directories
1501         os.makedirs('source_dir/bigdir')
1502         self.hash["source_dir/bigdir"] = ""
1503         os.unlink("source_dir/small")
1504         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1505         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1506         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1507         shutil.rmtree("source_dir/test")
1508
1509         deltatar.create_diff_backup("source_dir", "backup_dir2",
1510                                     prev_index_path)
1511
1512         # first restore initial backup in target_dir
1513         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1514         tar_path = os.path.join("backup_dir", tar_filename)
1515         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1516
1517         # then apply diff backup in target_dir
1518         index_filename = deltatar.index_name_func(is_full=False)
1519         index_path = os.path.join("backup_dir2", index_filename)
1520         deltatar.restore_backup("target_dir",
1521             backup_indexes_paths=[index_path, prev_index_path])
1522
1523         # then compare the two directories source_dir and target_dir and check
1524         # they are the same
1525         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1526
1527     def test_restore_from_index_diff_backup3(self):
1528         '''
1529         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1530         diff backup, then restores the diff backup with the full backup as a
1531         starting point.
1532         '''
1533         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1534             raise SkipTest('this test only works for uncompressed '
1535                            'or concat compressed modes')
1536
1537         password, paramversion = self.ENCRYPTION or (None, None)
1538         deltatar = DeltaTar(mode=self.MODE, password=password,
1539                             crypto_paramversion=paramversion,
1540                             logger=self.consoleLogger)
1541
1542         shutil.rmtree("source_dir")
1543         shutil.copytree(self.GIT_DIR, "source_dir")
1544         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1545
1546         # create first backup
1547         deltatar.create_full_backup(
1548             source_path="source_dir",
1549             backup_path="backup_dir")
1550
1551         prev_index_filename = deltatar.index_name_func(is_full=True)
1552         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1553
1554         # alter the source_dir randomly
1555         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1556
1557         for path in source_it:
1558             # if path doesn't exist (might have previously removed) ignore it.
1559             # also ignore it (i.e. do not change it) 70% of the time
1560             if not os.path.exists(path) or random.random() < 0.7:
1561                 continue
1562
1563             # remove the file
1564             if os.path.isdir(path):
1565                 shutil.rmtree(path)
1566             else:
1567                 os.unlink(path)
1568
1569         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1570                                     prev_index_path)
1571
1572         # first restore initial backup in target_dir
1573         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1574         tar_path = os.path.join("backup_dir", tar_filename)
1575         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1576
1577         # and check that target_dir equals to source_dir (which is the same as
1578         # self.GIT_DIR initially)
1579         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1580
1581         # then apply diff backup in target_dir
1582         index_filename = deltatar.index_name_func(is_full=False)
1583         index_path = os.path.join("backup_dir2", index_filename)
1584         deltatar.restore_backup("target_dir",
1585             backup_indexes_paths=[index_path, prev_index_path])
1586
1587         # and check that target_dir equals to source_dir_diff (the randomly
1588         # altered self.GIT_DIR directory)
1589         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1590
1591         # then delete target_dir and apply diff backup from zero and check again
1592         shutil.rmtree("target_dir")
1593         deltatar.restore_backup("target_dir",
1594             backup_indexes_paths=[index_path, prev_index_path])
1595
1596         # and check that target_dir equals to source_dir_diff (the randomly
1597         # altered self.GIT_DIR directory)
1598         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1599
1600     def test_restore_from_index_diff_backup3_multivol(self):
1601         '''
1602         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1603         diff backup, then restores the diff backup with the full backup as a
1604         starting point.
1605         '''
1606         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1607             raise SkipTest('this test only works for uncompressed '
1608                            'or concat compressed modes')
1609
1610         password, paramversion = self.ENCRYPTION or (None, None)
1611         deltatar = DeltaTar(mode=self.MODE, password=password,
1612                             crypto_paramversion=paramversion,
1613                             logger=self.consoleLogger)
1614
1615         shutil.rmtree("source_dir")
1616         shutil.copytree(self.GIT_DIR, "source_dir")
1617         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1618
1619         # create first backup
1620         deltatar.create_full_backup(
1621             source_path="source_dir",
1622             backup_path="backup_dir",
1623             max_volume_size=1)
1624
1625         prev_index_filename = deltatar.index_name_func(is_full=True)
1626         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1627
1628         # alter the source_dir randomly
1629         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1630
1631         for path in source_it:
1632             # if path doesn't exist (might have previously removed) ignore it.
1633             # also ignore it (i.e. do not change it) 70% of the time
1634             if not os.path.exists(path) or random.random() < 0.7:
1635                 continue
1636
1637             # remove the file
1638             if os.path.isdir(path):
1639                 shutil.rmtree(path)
1640             else:
1641                 os.unlink(path)
1642
1643         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1644                                     prev_index_path, max_volume_size=1)
1645
1646         # first restore initial backup in target_dir
1647         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1648         tar_path = os.path.join("backup_dir", tar_filename)
1649         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1650
1651         # and check that target_dir equals to source_dir (which is the same as
1652         # self.GIT_DIR initially)
1653         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1654
1655         # then apply diff backup in target_dir
1656         index_filename = deltatar.index_name_func(is_full=False)
1657         index_path = os.path.join("backup_dir2", index_filename)
1658         deltatar.restore_backup("target_dir",
1659             backup_indexes_paths=[index_path, prev_index_path])
1660
1661         # and check that target_dir equals to source_dir_diff (the randomly
1662         # altered self.GIT_DIR directory)
1663         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1664
1665         # then delete target_dir and apply diff backup from zero and check again
1666         shutil.rmtree("target_dir")
1667         deltatar.restore_backup("target_dir",
1668             backup_indexes_paths=[index_path, prev_index_path])
1669
1670         # and check that target_dir equals to source_dir_diff (the randomly
1671         # altered self.GIT_DIR directory)
1672         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1673
1674     def check_equal_dirs(self, path1, path2, deltatar):
1675         '''
1676         compare the two directories source_dir and target_dir and check
1677         # they are the same
1678         '''
1679         source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1680         source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1681         target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1682         target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1683         while True:
1684             try:
1685                 sitem = next(source_it)
1686                 titem = next(target_it)
1687             except StopIteration:
1688                 try:
1689                     titem = next(target_it)
1690                     raise Exception("iterators do not stop at the same time")
1691                 except StopIteration:
1692                     break
1693             try:
1694                 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1695             except Exception as e:
1696                 print("SITEM: " + str(sitem))
1697                 print("TITEM: " + str(titem))
1698                 raise e
1699
1700     def test_create_no_symlinks(self):
1701         '''
1702         Creates a full backup from different varieties of symlinks. The
1703         extracted archive may not contain any symlinks but the file contents
1704         '''
1705
1706         os.system("rm -rf source_dir")
1707         os.makedirs("source_dir/symlinks")
1708         fd = os.open("source_dir/symlinks/valid_linkname",
1709                      os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1710         os.write(fd, b"valid link target for symlink tests; please ignore\n")
1711         os.close(fd)
1712         # first one is good, the rest points nowhere
1713         self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1714         self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1715         self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1716         self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1717         password, paramversion = self.ENCRYPTION or (None, None)
1718         deltatar = DeltaTar(mode=self.MODE, password=password,
1719                             crypto_paramversion=paramversion,
1720                             logger=self.consoleLogger)
1721
1722         # create first backup
1723         deltatar.create_full_backup(source_path="source_dir",
1724                                     backup_path="backup_dir")
1725
1726         assert os.path.exists("backup_dir")
1727         shutil.rmtree("source_dir")
1728         assert not os.path.exists("source_dir")
1729
1730         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1731         tar_path = os.path.join("backup_dir", tar_filename)
1732
1733         deltatar.restore_backup(target_path="source_dir",
1734                                 backup_tar_path=tar_path)
1735
1736         for _r, _ds, fs in os.walk("source_dir/symlinks"):
1737         # only the valid link plus the linked file may be found in the
1738         # extracted archive
1739             assert len(fs) == 2
1740             for f in fs:
1741                 # the link must have been resolved and file contents must match
1742                 # the linked file
1743                 assert not os.path.islink(f)
1744                 with open("source_dir/symlinks/valid_linkname") as a:
1745                     with open("source_dir/symlinks/whatever") as b:
1746                         assert a.read() == b.read()
1747
1748     def test_restore_with_symlinks(self):
1749         '''
1750         Creates a full backup containing different varieties of symlinks. All
1751         of them must be filtered out.
1752         '''
1753         password, paramversion = self.ENCRYPTION or (None, None)
1754         deltatar = DeltaTar(mode=self.MODE, password=password,
1755                             crypto_paramversion=paramversion,
1756                             logger=self.consoleLogger)
1757
1758         # create first backup
1759         deltatar.create_full_backup(source_path="source_dir",
1760                                     backup_path="backup_dir")
1761
1762         assert os.path.exists("backup_dir")
1763         shutil.rmtree("source_dir")
1764
1765         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1766         tar_path = os.path.join("backup_dir", tar_filename)
1767
1768         # add symlinks to existing archive
1769
1770         def add_symlink (a, name, dst):
1771             l = tarfile.TarInfo("snapshot://%s" % name)
1772             l.type = tarfile.SYMTYPE
1773             l.linkname = dst
1774             a.addfile(l)
1775             return name
1776
1777         try:
1778             with tarfile.open(tar_path,mode="a") as a:
1779                 checkme = \
1780                     [ add_symlink(a, "symlinks/foo", "internal-file")
1781                     , add_symlink(a, "symlinks/bar", "/absolute/path")
1782                     , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1783         except tarfile.ReadError as e:
1784             if self.MODE == '#' or self.MODE.endswith ("gz"):
1785                 checkme = []
1786             else:
1787                 raise
1788         except ValueError as e:
1789             if self.MODE.startswith ('#'):
1790                 checkme = []
1791             else:
1792                 raise
1793
1794         deltatar.restore_backup(target_path="source_dir",
1795                                 backup_tar_path=tar_path)
1796
1797         # check what happened to our symlinks
1798         for name in checkme:
1799             fullpath = os.path.join("source_dir", name)
1800             assert not os.path.exists(fullpath)
1801
1802     def test_restore_malicious_symlinks(self):
1803         '''
1804         Creates a full backup containing a symlink and a file of the same name.
1805         This simulates a symlink attack with a link pointing to some external
1806         path that is abused to write outside the extraction prefix.
1807         '''
1808         password, paramversion = self.ENCRYPTION or (None, None)
1809         deltatar = DeltaTar(mode=self.MODE, password=password,
1810                             crypto_paramversion=paramversion,
1811                             logger=self.consoleLogger)
1812
1813         # create first backup
1814         deltatar.create_full_backup(source_path="source_dir",
1815                                     backup_path="backup_dir")
1816
1817         assert os.path.exists("backup_dir")
1818         shutil.rmtree("source_dir")
1819
1820         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1821         tar_path = os.path.join("backup_dir", tar_filename)
1822
1823         # add symlinks to existing archive
1824
1825         def add_symlink (a, name, dst):
1826             l = tarfile.TarInfo("snapshot://%s" % name)
1827             l.type = tarfile.SYMTYPE
1828             l.linkname = dst
1829             a.addfile(l)
1830
1831         def add_file (a, name):
1832             f = tarfile.TarInfo("snapshot://%s" % name)
1833             f.type = tarfile.REGTYPE
1834             a.addfile(f)
1835
1836         testpath = "symlinks/pernicious-link"
1837         testdst = "/tmp/does/not/exist"
1838
1839         try:
1840             with tarfile.open(tar_path, mode="a") as a:
1841                 add_symlink(a, testpath, testdst)
1842                 add_symlink(a, testpath, testdst+"X")
1843                 add_symlink(a, testpath, testdst+"XXX")
1844                 add_file(a, testpath)
1845         except tarfile.ReadError as e:
1846             if self.MODE == '#' or self.MODE.endswith ("gz"):
1847                 pass
1848             else:
1849                 raise
1850         except ValueError as e:
1851             if self.MODE.startswith ('#'):
1852                 pass # O_APPEND of concat archives not feasible
1853             else:
1854                 raise
1855
1856         deltatar.restore_backup(target_path="source_dir",
1857                                 backup_tar_path=tar_path)
1858
1859         # check whether the link was extracted; deltatar seems to only ever
1860         # retrieve the first item it finds for a given path which in the case
1861         # at hand is a symlink to some non-existent path
1862         fullpath = os.path.join("source_dir", testpath)
1863         assert not os.path.exists(fullpath)
1864
class DeltaTar2Test(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest with MODE set to ":"
    '''
    MODE = ':'
1870
1871
class DeltaTarStreamTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in uncompressed stream
    mode ("|")
    '''
    MODE = '|'
1877
1878
class DeltaTarGzipTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in gzip mode (":gz")
    '''
    MODE_COMPRESSES = True
    MODE = ':gz'
1885
1886
class DeltaTarGzipStreamTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in gzip stream mode ("|gz")
    '''
    MODE_COMPRESSES = True
    MODE = '|gz'
1893
1894
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in bz2 mode (":bz2");
    skipped as a whole via the decorator
    '''
    MODE_COMPRESSES = True
    MODE = ':bz2'
1902
1903
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in bz2 stream mode
    ("|bz2"); skipped as a whole via the decorator
    '''
    MODE_COMPRESSES = True
    MODE = '|bz2'
1911
1912
class DeltaTarGzipConcatTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in gzip concat mode ("#gz")
    '''
    MODE_COMPRESSES = True
    MODE = '#gz'
1919
1920
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in gzip concat mode ("#gz")
    with encryption enabled (password plus crypto parameter version 1)
    '''
    MODE_COMPRESSES = True
    MODE = '#gz'
    ENCRYPTION = ('some magic key', 1)
1928
1929
class DeltaTarAes128ConcatTest(DeltaTarTest):
    '''
    Runs the tests inherited from DeltaTarTest in uncompressed concat mode
    ("#") with encryption enabled (password plus crypto parameter version 1)
    '''
    ENCRYPTION = ('some magic key', 1)
    MODE = '#'
1936
1937