graciously handle GCM data length limit
[python-delta-tar] / testing / test_deltatar.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
19 import errno
20 import os
21 import re
22 import random
23 import shutil
24 import logging
25 import binascii
26 import json
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
30
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
36
37 from . import BaseTest
38 from . import new_volume_handler
39
40 class DeltaTarTest(BaseTest):
41     """
42     Test backups
43     """
44     MODE = ''
45     MODE_COMPRESSES = False
46
47     ENCRYPTION = None  # (password : str, paramversion : int) option
48
49     GIT_DIR = '.git'
50
51     def setUp(self):
52         '''
53         Create base test data
54         '''
55         self.pwd = os.getcwd()
56         os.system('rm -rf target_dir source_dir* backup_dir* huge')
57         os.makedirs('source_dir/test/test2')
58         self.hash = dict()
59         self.hash["source_dir/test/test2"] = ''
60         self.hash["source_dir/big"]  = self.create_file("source_dir/big", 50000)
61         self.hash["source_dir/small"]  = self.create_file("source_dir/small", 100)
62         self.hash["source_dir/test/huge"]  = self.create_file("source_dir/test/huge", 700000)
63         self.hash["source_dir/test/huge2"]  = self.create_file("source_dir/test/huge2", 800000)
64
65         self.consoleLogger = logging.StreamHandler()
66         self.consoleLogger.setLevel(logging.DEBUG)
67
68         if not os.path.isdir(self.GIT_DIR):
69             # Not running inside git tree, take our
70             # own testing directory as source.
71             self.GIT_DIR = 'testing'
72
73             if not os.path.isdir(self.GIT_DIR):
74                 raise Exception('No input directory found: ' + self.GIT_DIR)
75
76     def tearDown(self):
77         '''
78         Remove temporal files created by unit tests and reset globals.
79         '''
80         os.chdir(self.pwd)
81         os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
82         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
83                   ("I am fully aware that this will void my warranty.")
84
85     def test_restore_simple_full_backup(self):
86         '''
87         Creates a full backup without any filtering and restores it.
88         '''
89         password, paramversion = self.ENCRYPTION or (None, None)
90         deltatar = DeltaTar(mode=self.MODE, password=password,
91                             crypto_paramversion=paramversion,
92                             logger=self.consoleLogger)
93
94         # create first backup
95         deltatar.create_full_backup(
96             source_path="source_dir",
97             backup_path="backup_dir")
98
99         assert os.path.exists("backup_dir")
100         shutil.rmtree("source_dir")
101
102         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
103         tar_path = os.path.join("backup_dir", tar_filename)
104
105         deltatar.restore_backup(target_path="source_dir",
106                                 backup_tar_path=tar_path)
107
108         for key, value in self.hash.items():
109             assert os.path.exists(key)
110             if value:
111                 assert value == self.md5sum(key)
112
113
114     def test_create_backup_max_file_length (self):
115         """
116         Creates a full backup including one file that exceeds the (purposely
117         lowered) upper bound on GCM encrypted objects. This will yield multiple
118         encrypted objects for one plaintext file.
119
120         Success is verified by splitting the archive at object boundaries and
121         counting the parts.
122         """
123         if self.MODE_COMPRESSES is True:
124             raise SkipTest ("GCM file length test not meaningful with compression.")
125         if self.ENCRYPTION is None:
126             raise SkipTest ("GCM file length applies only to encrypted backups.")
127
128         new_max = 20000 # cannot be less than tar block size
129         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
130                 ("I am fully aware that this will void my warranty.",
131                  new_max)
132
133         password, paramversion = self.ENCRYPTION
134         deltatar = DeltaTar (mode=self.MODE, password=password,
135                              crypto_paramversion=paramversion,
136                              logger=self.consoleLogger)
137
138         self.hash = dict ()
139         os.makedirs ("source_dir2")
140         for f, s in [("empty"          , 0)             # 1 tar objects
141                     ,("slightly_larger", new_max + 1)   # 2
142                     ,("twice"          , 2 * new_max)   # 3
143                     ]:
144             f = "source_dir2/%s" % f
145             self.hash [f] = self.create_file (f, s)
146
147         deltatar.create_full_backup \
148                 (source_path="source_dir2", backup_path="backup_dir")
149
150         assert os.path.exists ("backup_dir")
151         shutil.rmtree ("source_dir2")
152
153         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
154         backup_path     = os.path.join("backup_dir", backup_filename)
155
156         # split the resulting archive into its constituents without
157         # decrypting
158         ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
159                         "-o backup_dir/split <\'%s\'" % backup_path)
160
161         assert os.path.exists ("backup_dir/split")
162
163         dents = os.listdir ("backup_dir/split")
164         assert len (dents) == 6
165
166
167     def test_restore_backup_max_file_length (self):
168         """
169         Creates a full backup including one file that exceeds the (purposely
170         lowered) upper bound on GCM encrypted objects. This will yield two
171         encrypted objects for one plaintext file.
172
173         Success is verified by splitting the archive at object boundaries and
174         counting the parts.
175         """
176         if self.MODE_COMPRESSES is True:
177             raise SkipTest ("GCM file length test not meaningful with compression.")
178         if self.ENCRYPTION is None:
179             raise SkipTest ("GCM file length applies only to encrypted backups.")
180
181         new_max = 20000 # cannot be less than tar block size
182         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
183                     ("I am fully aware that this will void my warranty.",
184                      new_max)
185
186         password, paramversion = self.ENCRYPTION
187         deltatar = DeltaTar (mode=self.MODE, password=password,
188                              crypto_paramversion=paramversion,
189                              logger=self.consoleLogger)
190
191         self.hash = dict ()
192         os.makedirs ("source_dir2")
193         for f, s in [("empty"          , 0)             # 1 tar objects
194                     ,("slightly_larger", new_max + 1)   # 2
195                     ,("twice"          , 2 * new_max)   # 3
196                     ]:
197             f = "source_dir2/%s" % f
198             self.hash [f] = self.create_file (f, s)
199
200         deltatar.create_full_backup \
201                 (source_path="source_dir2", backup_path="backup_dir")
202
203         assert os.path.exists ("backup_dir")
204         shutil.rmtree ("source_dir2")
205
206         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
207         backup_path     = os.path.join("backup_dir", backup_filename)
208
209         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
210         tar_path = os.path.join("backup_dir", tar_filename)
211
212         deltatar.restore_backup(target_path="source_dir2",
213                                 backup_tar_path=tar_path)
214
215         for key, value in self.hash.items():
216             assert os.path.exists(key)
217             if value:
218                 assert value == self.md5sum(key)
219
220
221     def test_check_index_checksum(self):
222         '''
223         Creates a full backup and checks the index' checksum of files
224         '''
225         password, paramversion = self.ENCRYPTION or (None, None)
226         deltatar = DeltaTar(mode=self.MODE, password=password,
227                             crypto_paramversion=paramversion,
228                             logger=self.consoleLogger)
229
230         # create first backup
231         deltatar.create_full_backup(
232             source_path="source_dir",
233             backup_path="backup_dir")
234
235
236         index_filename = deltatar.index_name_func(True)
237         index_path = os.path.join("backup_dir", index_filename)
238
239         f = open(index_path, 'rb')
240         crc = None
241         checked = False
242         began_list = False
243         while True:
244             l = f.readline()
245             if l == b'':
246                 break
247             if b'BEGIN-FILE-LIST' in l:
248                 crc = binascii.crc32(l) & 0xFFFFffff
249                 began_list = True
250             elif b'END-FILE-LIST' in l:
251                 crc = binascii.crc32(l, crc) & 0xffffffff
252
253                 # next line contains the crc
254                 data = json.loads(f.readline().decode("UTF-8"))
255                 assert data['type'] == 'file-list-checksum'
256                 assert data['checksum'] == crc
257                 checked = True
258                 break
259             elif began_list:
260                 crc = binascii.crc32(l, crc) & 0xffffffff
261         f.close()
262
263
264     def test_restore_multivol(self):
265         '''
266         Creates a full backup without any filtering with multiple volumes and
267         restore it.
268         '''
269         if ':gz' in self.MODE:
270             raise SkipTest('compression information is lost when creating '
271                            'multiple volumes with no Stream')
272
273         password, paramversion = self.ENCRYPTION or (None, None)
274         deltatar = DeltaTar(mode=self.MODE, password=password,
275                             crypto_paramversion=paramversion,
276                             logger=self.consoleLogger)
277
278         self.hash = dict()
279         os.makedirs('source_dir2')
280         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
281         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
282
283         # create first backup
284         deltatar.create_full_backup(
285             source_path="source_dir2",
286             backup_path="backup_dir",
287             max_volume_size=1)
288
289         assert os.path.exists("backup_dir")
290         assert os.path.exists(os.path.join("backup_dir",
291             deltatar.volume_name_func("backup_dir", True, 0)))
292         if self.MODE_COMPRESSES:
293             n_vols = 1
294         else:
295             n_vols = 2
296         for i_vol in range(n_vols):
297             assert os.path.exists(os.path.join("backup_dir",
298                 deltatar.volume_name_func("backup_dir", True, i_vol)))
299         assert not os.path.exists(os.path.join("backup_dir",
300             deltatar.volume_name_func("backup_dir", True, n_vols)))
301
302         shutil.rmtree("source_dir2")
303
304         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
305         tar_path = os.path.join("backup_dir", tar_filename)
306
307         # this should automatically restore all volumes
308         deltatar.restore_backup(target_path="source_dir2",
309                                 backup_tar_path=tar_path)
310
311         for key, value in self.hash.items():
312             assert os.path.exists(key)
313             if value:
314                 assert value == self.md5sum(key)
315
316     def test_restore_multivol_split(self):
317         '''
318         Creates a full backup without any filtering with multiple volumes
319         with big files bigger than the max volume size and
320         restore it.
321         '''
322         if self.MODE.startswith(':') or self.MODE.startswith('|'):
323             raise SkipTest('this test only works for uncompressed '
324                            'or concat compressed modes')
325
326         password, paramversion = self.ENCRYPTION or (None, None)
327         deltatar = DeltaTar(mode=self.MODE, password=password,
328                             crypto_paramversion=paramversion,
329                             logger=self.consoleLogger)
330
331         self.hash = dict()
332         os.makedirs('source_dir2')
333         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 3*1024*1024)
334         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 4*1024*1024)
335         self.hash["source_dir2/huge2"]  = self.create_file("source_dir2/huge2", 4*1024*1024)
336
337         # create first backup
338         deltatar.create_full_backup(
339             source_path="source_dir2",
340             backup_path="backup_dir",
341             max_volume_size=2)
342
343         assert os.path.exists("backup_dir")
344         assert os.path.exists(os.path.join("backup_dir",
345             deltatar.volume_name_func("backup_dir", True, 0)))
346         if self.MODE_COMPRESSES:
347             n_vols = 1
348         else:
349             n_vols = 6
350         for i_vol in range(n_vols):
351             assert os.path.exists(os.path.join("backup_dir",
352                 deltatar.volume_name_func("backup_dir", True, i_vol)))
353         assert not os.path.exists(os.path.join("backup_dir",
354             deltatar.volume_name_func("backup_dir", True, n_vols)))
355
356         shutil.rmtree("source_dir2")
357
358         index_filename = deltatar.index_name_func(True)
359         index_path = os.path.join("backup_dir", index_filename)
360
361         deltatar.restore_backup(target_path="source_dir2",
362             backup_indexes_paths=[index_path])
363
364         for key, value in self.hash.items():
365             assert os.path.exists(key)
366             if value:
367                 assert value == self.md5sum(key)
368
369
370     def test_full_backup_index_extra_data(self):
371         '''
372         Tests that the index file for a full backup can store extra_data and
373         that this data can be retrieved.
374         '''
375         password, paramversion = self.ENCRYPTION or (None, None)
376         deltatar = DeltaTar(mode=self.MODE, password=password,
377                             crypto_paramversion=paramversion,
378                             logger=self.consoleLogger)
379
380         extra_data = dict(
381             hola="caracola",
382             otra_cosa=[1, "lista"],
383             y_otra=dict(bola=1.1)
384         )
385
386         deltatar.create_full_backup(
387             source_path="source_dir",
388             backup_path="backup_dir",
389             extra_data=extra_data)
390
391         index_filename = deltatar.index_name_func(is_full=True)
392         index_path = os.path.join("backup_dir", index_filename)
393
394         # iterate_index_path retrieves extra_data, and thus we can then compare
395         index_it = deltatar.iterate_index_path(index_path)
396         self.assertEqual(index_it.extra_data, extra_data)
397
398
399     def test_diff_backup_index_extra_data(self):
400         '''
401         Tests that the index file for a diff backup can store extra_data and
402         that this data can be retrieved.
403         '''
404         password, paramversion = self.ENCRYPTION or (None, None)
405         deltatar = DeltaTar(mode=self.MODE, password=password,
406                             crypto_paramversion=paramversion,
407                             logger=self.consoleLogger)
408
409         extra_data = dict(
410             hola="caracola",
411             otra_cosa=[1, "lista"],
412             y_otra=dict(bola=1.1)
413         )
414         # do first backup
415         deltatar.create_full_backup(
416             source_path="source_dir",
417             backup_path="backup_dir")
418
419
420         prev_index_filename = deltatar.index_name_func(is_full=True)
421         prev_index_path = os.path.join("backup_dir", prev_index_filename)
422
423         # create empty diff backup
424         deltatar.create_diff_backup("source_dir", "backup_dir2",
425                                     prev_index_path, extra_data=extra_data)
426
427         index_filename = deltatar.index_name_func(is_full=False)
428         index_path = os.path.join("backup_dir2", index_filename)
429
430         # iterate_index_path retrieves extra_data, and thus we can then compare
431         index_it = deltatar.iterate_index_path(index_path)
432         self.assertEqual(index_it.extra_data, extra_data)
433
434     def test_restore_multivol2(self):
435         '''
436         Creates a full backup without any filtering with multiple volumes and
437         restore it.
438         '''
439         password, paramversion = self.ENCRYPTION or (None, None)
440         deltatar = DeltaTar(mode=self.MODE, password=password,
441                             crypto_paramversion=paramversion,
442                             logger=self.consoleLogger)
443
444         shutil.copytree(self.GIT_DIR, "source_dir2")
445
446         # create first backup
447         deltatar.create_full_backup(
448             source_path="source_dir2",
449             backup_path="backup_dir",
450             max_volume_size=1)
451
452         assert os.path.exists("backup_dir")
453         assert os.path.exists(os.path.join("backup_dir",
454             deltatar.volume_name_func("backup_dir", True, 0)))
455
456         shutil.rmtree("source_dir2")
457
458         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
459         tar_path = os.path.join("backup_dir", tar_filename)
460
461         # this should automatically restore all volumes
462         deltatar.restore_backup(target_path="source_dir2",
463                                 backup_tar_path=tar_path)
464
465         self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
466
467     def test_restore_multivol_manual_from_index(self):
468         '''
469         Creates a full backup without any filtering with multiple volumes and
470         restore it.
471         '''
472         # this test only works for uncompressed or concat compressed modes
473         if self.MODE.startswith(':') or self.MODE.startswith('|'):
474             raise SkipTest('this test only works for uncompressed '
475                            'or concat compressed modes')
476
477         password, paramversion = self.ENCRYPTION or (None, None)
478         deltatar = DeltaTar(mode=self.MODE, password=password,
479                             crypto_paramversion=paramversion,
480                             logger=self.consoleLogger)
481
482         self.hash = dict()
483         os.makedirs('source_dir2')
484         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
485         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
486
487         # create first backup
488         deltatar.create_full_backup(
489             source_path="source_dir2",
490             backup_path="backup_dir",
491             max_volume_size=1)
492
493         assert os.path.exists("backup_dir")
494         assert os.path.exists(os.path.join("backup_dir",
495             deltatar.volume_name_func("backup_dir", True, 0)))
496         if self.MODE_COMPRESSES:
497             n_vols = 1
498         else:
499             n_vols = 2
500         for i_vol in range(n_vols):
501             assert os.path.exists(os.path.join("backup_dir",
502                 deltatar.volume_name_func("backup_dir", True, i_vol)))
503         assert not os.path.exists(os.path.join("backup_dir",
504             deltatar.volume_name_func("backup_dir", True, n_vols)))
505
506         shutil.rmtree("source_dir2")
507
508         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
509         tar_path = os.path.join("backup_dir", tar_filename)
510
511         index_filename = deltatar.index_name_func(True)
512         index_path = os.path.join("backup_dir", index_filename)
513
514         # this should automatically restore the huge file
515         f = deltatar.open_auxiliary_file(index_path, 'r')
516         offset = None
517         while True:
518             l = f.readline()
519             if not len(l):
520                 break
521             data = json.loads(l.decode('UTF-8'))
522             if data.get('type', '') == 'file' and\
523                     deltatar.unprefixed(data['path']) == "huge":
524                 offset = data['offset']
525                 break
526
527         assert offset is not None
528
529         fo = open(tar_path, 'rb')
530         fo.seek(offset)
531         def new_volume_handler(mode, tarobj, base_name, volume_number):
532             suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
533             if self.ENCRYPTION is not None:
534                 # deltatar module is shadowed here
535                 suf += "." + deltatar_PDTCRYPT_EXTENSION
536             tarobj.open_volume(datetime.now().strftime(
537                 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
538         new_volume_handler = partial(new_volume_handler, self.MODE)
539
540         crypto_ctx = None
541         if self.ENCRYPTION is not None:
542             crypto_ctx = crypto.Decrypt (password)
543
544         tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
545                               new_volume_handler=new_volume_handler,
546                               encryption=crypto_ctx)
547
548         member = tarobj.next()
549         member.path = deltatar.unprefixed(member.path)
550         member.name = deltatar.unprefixed(member.name)
551         tarobj.extract(member)
552         tarobj.close()
553         fo.close()
554         assert self.hash['source_dir2/huge'] == self.md5sum('huge')
555
556         os.unlink("huge")
557
558
559     def test_restore_manual_from_index_twice (self):
560         """
561         Creates a full backup and restore the same file twice. This *must* fail
562         when encryption is active.
563
564         Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
565         backwards within the same file. This prevents the encryption layer from
566         exploding due to a reused IV in an overall valid archive.
567
568         This test anticipates possible future mistakes since it’s entirely
569         feasible to implement backward seeks for *_Stream* with concat mode.
570         """
571         # this test only works for uncompressed or concat compressed modes
572         if self.MODE.startswith("|") or self.MODE_COMPRESSES:
573             raise SkipTest("this test only works for uncompressed "
574                            "or concat compressed modes")
575
576         password, paramversion = self.ENCRYPTION or (None, None)
577         deltatar = DeltaTar(mode=self.MODE, password=password,
578                             crypto_paramversion=paramversion,
579                             logger=self.consoleLogger)
580
581         self.hash = dict()
582         os.makedirs("source_dir2")
583         self.hash["source_dir2/samefile"] = \
584             self.create_file("source_dir2/samefile", 1 * 1024)
585
586         # create first backup
587         deltatar.create_full_backup(
588             source_path="source_dir2",
589             backup_path="backup_dir")
590
591         assert os.path.exists("backup_dir")
592         assert os.path.exists(os.path.join("backup_dir",
593             deltatar.volume_name_func("backup_dir", True, 0)))
594
595         shutil.rmtree("source_dir2")
596
597         tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
598         tar_path = os.path.join("backup_dir", tar_filename)
599
600         index_filename = deltatar.index_name_func(True)
601         index_path = os.path.join("backup_dir", index_filename)
602
603         f = deltatar.open_auxiliary_file(index_path, "r")
604         offset = None
605         while True:
606             l = f.readline()
607             if not len(l):
608                 break
609             data = json.loads(l.decode("UTF-8"))
610             if data.get("type", "") == "file" and\
611                     deltatar.unprefixed(data["path"]) == "samefile":
612                 offset = data["offset"]
613                 break
614
615         assert offset is not None
616
617         fo = open(tar_path, "rb")
618         fo.seek(offset)
619
620         crypto_ctx = None
621         if self.ENCRYPTION is not None:
622             crypto_ctx = crypto.Decrypt (password)
623
624         tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
625                               encryption=crypto_ctx)
626         member = tarobj.next()
627         member.path = deltatar.unprefixed(member.path)
628         member.name = deltatar.unprefixed(member.name)
629
630         # extract once â€¦
631         tarobj.extract(member)
632         assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
633
634         # â€¦ and twice
635         try:
636             tarobj.extract(member)
637         except tarfile.StreamError:
638             if crypto_ctx is not None:
639                 pass # good: seeking backwards not allowed
640             else:
641                 raise
642         tarobj.close()
643         fo.close()
644         assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
645
646         os.unlink("samefile")
647
648
649     def test_restore_from_index(self):
650         '''
651         Restores a full backup using an index file.
652         '''
653         if self.MODE.startswith(':') or self.MODE.startswith('|'):
654             raise SkipTest('this test only works for uncompressed '
655                            'or concat compressed modes')
656
657         password, paramversion = self.ENCRYPTION or (None, None)
658         deltatar = DeltaTar(mode=self.MODE, password=password,
659                             crypto_paramversion=paramversion,
660                             logger=self.consoleLogger)
661
662         # create first backup
663         deltatar.create_full_backup(
664             source_path="source_dir",
665             backup_path="backup_dir")
666
667         shutil.rmtree("source_dir")
668
669         # this should automatically restore all volumes
670         index_filename = deltatar.index_name_func(True)
671         index_path = os.path.join("backup_dir", index_filename)
672
673         deltatar.restore_backup(target_path="source_dir",
674             backup_indexes_paths=[index_path])
675
676         for key, value in self.hash.items():
677             assert os.path.exists(key)
678             if value:
679                 assert value == self.md5sum(key)
680
681     def test_restore_multivol_from_index(self):
682         '''
683         Restores a full multivolume backup using an index file.
684         '''
685         if self.MODE.startswith(':') or self.MODE.startswith('|'):
686             raise SkipTest('this test only works for uncompressed '
687                            'or concat compressed modes')
688
689         password, paramversion = self.ENCRYPTION or (None, None)
690         deltatar = DeltaTar(mode=self.MODE, password=password,
691                             crypto_paramversion=paramversion,
692                             logger=self.consoleLogger)
693
694         # create first backup
695         deltatar.create_full_backup(
696             source_path="source_dir",
697             backup_path="backup_dir",
698             max_volume_size=2)
699
700         shutil.rmtree("source_dir")
701
702         # this should automatically restore all volumes
703         index_filename = deltatar.index_name_func(True)
704         index_path = os.path.join("backup_dir", index_filename)
705
706         deltatar.restore_backup(target_path="source_dir",
707             backup_indexes_paths=[index_path])
708
709         for key, value in self.hash.items():
710             assert os.path.exists(key)
711             if value:
712                 assert value == self.md5sum(key)
713
714     def test_create_basic_filtering(self):
715         '''
716         Tests create backup basic filtering.
717         '''
718         password, paramversion = self.ENCRYPTION or (None, None)
719         deltatar = DeltaTar(mode=self.MODE, password=password,
720                             crypto_paramversion=paramversion,
721                             logger=self.consoleLogger,
722                             included_files=["test", "small"],
723                             excluded_files=["test/huge"])
724
725         # create first backup
726         deltatar.create_full_backup(
727             source_path="source_dir",
728             backup_path="backup_dir")
729
730         assert os.path.exists("backup_dir")
731         shutil.rmtree("source_dir")
732
733         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
734         tar_path = os.path.join("backup_dir", tar_filename)
735
736         deltatar.restore_backup(target_path="source_dir",
737                                 backup_tar_path=tar_path)
738
739         assert os.path.exists("source_dir/small")
740         assert os.path.exists("source_dir/test")
741         assert os.path.exists("source_dir/test/huge2")
742         assert os.path.exists("source_dir/test/test2")
743
744         assert not os.path.exists("source_dir/test/huge")
745         assert not os.path.exists("source_dir/big")
746
747     def test_create_filter_func(self):
748         '''
749         Tests create backup basic filtering.
750         '''
751         visited_paths = []
752         def filter_func(visited_paths, path):
753             if path not in visited_paths:
754                 visited_paths.append(path)
755             return True
756
757         filter_func = partial(filter_func, visited_paths)
758
759         password, paramversion = self.ENCRYPTION or (None, None)
760         deltatar = DeltaTar(mode=self.MODE, password=password,
761                             crypto_paramversion=paramversion,
762                             logger=self.consoleLogger,
763                             included_files=["test", "small"],
764                             excluded_files=["test/huge"],
765                             filter_func=filter_func)
766
767         # create first backup
768         deltatar.create_full_backup(
769             source_path="source_dir",
770             backup_path="backup_dir")
771
772         assert os.path.exists("backup_dir")
773         shutil.rmtree("source_dir")
774
775         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
776         tar_path = os.path.join("backup_dir", tar_filename)
777
778         deltatar.restore_backup(target_path="source_dir",
779                                 backup_tar_path=tar_path)
780         assert set(visited_paths) == set([
781                 'small',
782                 'test',
783                 'test/huge2',
784                 'test/test2'
785             ])
786
787     def test_create_filter_out_func(self):
788         '''
789         Tests create backup basic filtering.
790         '''
791         visited_paths = []
792         def filter_func(visited_paths, path):
793             '''
794             Filter out everything
795             '''
796             if path not in visited_paths:
797                 visited_paths.append(path)
798             return False
799
800         filter_func = partial(filter_func, visited_paths)
801
802         password, paramversion = self.ENCRYPTION or (None, None)
803         deltatar = DeltaTar(mode=self.MODE, password=password,
804                             crypto_paramversion=paramversion,
805                             logger=self.consoleLogger,
806                             included_files=["test", "small"],
807                             excluded_files=["test/huge"],
808                             filter_func=filter_func)
809
810         # create first backup
811         deltatar.create_full_backup(
812             source_path="source_dir",
813             backup_path="backup_dir")
814
815         assert os.path.exists("backup_dir")
816         shutil.rmtree("source_dir")
817
818         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
819         tar_path = os.path.join("backup_dir", tar_filename)
820
821         deltatar.restore_backup(target_path="source_dir",
822                                 backup_tar_path=tar_path)
823         assert set(visited_paths) == set([
824                 'small',
825                 'test'
826             ])
827
828         # check that effectively no file was backed up
829         assert not os.path.exists("source_dir/small")
830         assert not os.path.exists("source_dir/big")
831         assert not os.path.exists("source_dir/test")
832
833     def test_restore_index_basic_filtering(self):
834         '''
835         Creates a backup, and then filter when doing the index based restore.
836         '''
837         if self.MODE.startswith(':') or self.MODE.startswith('|'):
838             raise SkipTest('this test only works for uncompressed '
839                            'or concat compressed modes')
840
841         password, paramversion = self.ENCRYPTION or (None, None)
842         deltatar = DeltaTar(mode=self.MODE, password=password,
843                             crypto_paramversion=paramversion,
844                             logger=self.consoleLogger)
845
846         # create first backup
847         deltatar.create_full_backup(
848             source_path="source_dir",
849             backup_path="backup_dir")
850
851         assert os.path.exists("backup_dir")
852         shutil.rmtree("source_dir")
853
854         index_filename = deltatar.index_name_func(True)
855         index_path = os.path.join("backup_dir", index_filename)
856
857         deltatar.included_files = ["test", "small"]
858         deltatar.excluded_files = ["test/huge"]
859         deltatar.restore_backup(target_path="source_dir",
860             backup_indexes_paths=[index_path])
861
862         assert os.path.exists("source_dir/small")
863         assert os.path.exists("source_dir/test")
864         assert os.path.exists("source_dir/test/huge2")
865         assert os.path.exists("source_dir/test/test2")
866
867         assert not os.path.exists("source_dir/test/huge")
868         assert not os.path.exists("source_dir/big")
869
870     def test_restore_index_filter_func(self):
871         '''
872         Creates a backup, and then filter when doing the index based restore,
873         using the filter function.
874         '''
875         if self.MODE.startswith(':') or self.MODE.startswith('|'):
876             raise SkipTest('this test only works for uncompressed '
877                            'or concat compressed modes')
878
879         visited_paths = []
880         def filter_func(visited_paths, path):
881             if path not in visited_paths:
882                 visited_paths.append(path)
883             return True
884
885         filter_func = partial(filter_func, visited_paths)
886
887         password, paramversion = self.ENCRYPTION or (None, None)
888         deltatar = DeltaTar(mode=self.MODE, password=password,
889                             crypto_paramversion=paramversion,
890                             logger=self.consoleLogger)
891
892         # create first backup
893         deltatar.create_full_backup(
894             source_path="source_dir",
895             backup_path="backup_dir")
896
897         assert os.path.exists("backup_dir")
898         shutil.rmtree("source_dir")
899
900         index_filename = deltatar.index_name_func(True)
901         index_path = os.path.join("backup_dir", index_filename)
902
903         deltatar.included_files = ["test", "small"]
904         deltatar.excluded_files = ["test/huge"]
905         deltatar.filter_func = filter_func
906         deltatar.restore_backup(target_path="source_dir",
907             backup_indexes_paths=[index_path])
908
909         assert set(visited_paths) == set([
910                 'small',
911                 'test',
912                 'test/huge2',
913                 'test/test2'
914             ])
915
916     def test_restore_tar_basic_filtering(self):
917         '''
918         Creates a backup, and then filter when doing the tar based restore.
919         '''
920         password, paramversion = self.ENCRYPTION or (None, None)
921         deltatar = DeltaTar(mode=self.MODE, password=password,
922                             crypto_paramversion=paramversion,
923                             logger=self.consoleLogger)
924
925         # create first backup
926         deltatar.create_full_backup(
927             source_path="source_dir",
928             backup_path="backup_dir")
929
930         assert os.path.exists("backup_dir")
931         shutil.rmtree("source_dir")
932
933         deltatar.included_files = ["test", "small"]
934         deltatar.excluded_files = ["test/huge"]
935
936         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
937         tar_path = os.path.join("backup_dir", tar_filename)
938
939         deltatar.restore_backup(target_path="source_dir",
940                                 backup_tar_path=tar_path)
941
942         assert os.path.exists("source_dir/small")
943         assert os.path.exists("source_dir/test")
944         assert os.path.exists("source_dir/test/huge2")
945         assert os.path.exists("source_dir/test/test2")
946
947         assert not os.path.exists("source_dir/test/huge")
948         assert not os.path.exists("source_dir/big")
949
950     def test_restore_tar_filter_func(self):
951         '''
952         Creates a backup, and then filter when doing the tar based restore,
953         using the filter function.
954         '''
955         visited_paths = []
956         def filter_func(visited_paths, path):
957             if path not in visited_paths:
958                 visited_paths.append(path)
959             return True
960
961         filter_func = partial(filter_func, visited_paths)
962
963         password, paramversion = self.ENCRYPTION or (None, None)
964         deltatar = DeltaTar(mode=self.MODE, password=password,
965                             crypto_paramversion=paramversion,
966                             logger=self.consoleLogger)
967
968         # create first backup
969         deltatar.create_full_backup(
970             source_path="source_dir",
971             backup_path="backup_dir")
972
973         assert os.path.exists("backup_dir")
974         shutil.rmtree("source_dir")
975
976         index_filename = deltatar.index_name_func(True)
977         index_path = os.path.join("backup_dir", index_filename)
978
979         deltatar.included_files = ["test", "small"]
980         deltatar.excluded_files = ["test/huge"]
981         deltatar.filter_func = filter_func
982
983         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
984         tar_path = os.path.join("backup_dir", tar_filename)
985
986         deltatar.restore_backup(target_path="source_dir",
987                                 backup_tar_path=tar_path)
988         assert set(visited_paths) == set([
989                 'small',
990                 'test',
991                 'test/huge2',
992                 'test/test2'
993             ])
994
995     def test_filter_path_regexp(self):
996         '''
997         Test specifically the deltatar.filter_path function with regular
998         expressions
999         '''
1000         included_files = [
1001             re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1002             re.compile('^yes$'),
1003             'testing'
1004         ]
1005         excluded_files = [
1006             re.compile('^testing/in_the'),
1007         ]
1008         deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1009                             excluded_files=excluded_files)
1010
1011         # assert valid and invalid paths
1012         assert deltatar.filter_path('test/hola')
1013         assert deltatar.filter_path('test/hola/any/thing')
1014         assert deltatar.filter_path('test/caracola/caracolero')
1015         assert deltatar.filter_path('test/caracola/caracolero/yeah')
1016         assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1017         assert deltatar.filter_path('yes')
1018         assert deltatar.filter_path('testing')
1019         assert deltatar.filter_path('testing/yes')
1020         assert deltatar.filter_path('testing/in_th')
1021
1022         assert not deltatar.filter_path('something')
1023         assert not deltatar.filter_path('other/thing')
1024         assert not deltatar.filter_path('test_ing')
1025         assert not deltatar.filter_path('test/hola_lala')
1026         assert not deltatar.filter_path('test/agur')
1027         assert not deltatar.filter_path('testing_something')
1028         assert not deltatar.filter_path('yeso')
1029         assert not deltatar.filter_path('yes/o')
1030         assert not deltatar.filter_path('yes_o')
1031         assert not deltatar.filter_path('testing/in_the')
1032         assert not deltatar.filter_path('testing/in_the_field')
1033         assert not deltatar.filter_path('testing/in_the/field')
1034
1035     def test_filter_path_parent(self):
1036         '''
1037         Test specifically the deltatar.filter_path function for parent matching
1038         '''
1039         included_files = [
1040             'testing/path/to/some/thing'
1041         ]
1042         deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1043
1044         # assert valid and invalid paths
1045         assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1046         assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1047         assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1048         assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1049         assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1050         assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1051         assert deltatar.filter_path('testing/something/else') == NO_MATCH
1052
1053     def test_parent_matching_simple_full_backup(self):
1054         '''
1055         Create a full backup using parent matching
1056         '''
1057         included_files = [
1058             'test/huge2'
1059         ]
1060
1061         password, paramversion = self.ENCRYPTION or (None, None)
1062         deltatar = DeltaTar(mode=self.MODE, password=password,
1063                             crypto_paramversion=paramversion,
1064                             logger=self.consoleLogger,
1065                             included_files=included_files)
1066
1067         # create first backup
1068         deltatar.create_full_backup(
1069             source_path="source_dir",
1070             backup_path="backup_dir")
1071
1072         assert os.path.exists("backup_dir")
1073         shutil.rmtree("source_dir")
1074
1075         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1076         tar_path = os.path.join("backup_dir", tar_filename)
1077
1078         deltatar = DeltaTar(mode=self.MODE, password=password,
1079                             logger=self.consoleLogger)
1080         deltatar.restore_backup(target_path="source_dir",
1081                                 backup_tar_path=tar_path)
1082
1083         assert os.path.exists('source_dir/test/huge2')
1084         assert os.path.exists('source_dir/test/')
1085         assert not os.path.exists('source_dir/test/huge')
1086         assert not os.path.exists('source_dir/big')
1087         assert not os.path.exists('source_dir/small')
1088
1089     def test_parent_matching_simple_full_backup_restore(self):
1090         '''
1091         Create a full backup and restores it using parent matching
1092         '''
1093         included_files = [
1094             'test/huge2'
1095         ]
1096
1097         password, paramversion = self.ENCRYPTION or (None, None)
1098         deltatar = DeltaTar(mode=self.MODE, password=password,
1099                             crypto_paramversion=paramversion,
1100                             logger=self.consoleLogger)
1101
1102         # create first backup
1103         deltatar.create_full_backup(
1104             source_path="source_dir",
1105             backup_path="backup_dir")
1106
1107         assert os.path.exists("backup_dir")
1108         shutil.rmtree("source_dir")
1109
1110         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1111         tar_path = os.path.join("backup_dir", tar_filename)
1112
1113         deltatar = DeltaTar(mode=self.MODE, password=password,
1114                             logger=self.consoleLogger,
1115                             included_files=included_files)
1116         deltatar.restore_backup(target_path="source_dir",
1117                                 backup_tar_path=tar_path)
1118
1119         assert os.path.exists('source_dir/test/huge2')
1120         assert os.path.exists('source_dir/test/')
1121         assert not os.path.exists('source_dir/test/huge')
1122         assert not os.path.exists('source_dir/big')
1123         assert not os.path.exists('source_dir/small')
1124
1125     def test_parent_matching_index_full_backup_restore(self):
1126         '''
1127         Create a full backup and restores it using parent matching
1128         '''
1129         included_files = [
1130             'test/huge2'
1131         ]
1132
1133         password, paramversion = self.ENCRYPTION or (None, None)
1134         deltatar = DeltaTar(mode=self.MODE, password=password,
1135                             crypto_paramversion=paramversion,
1136                             logger=self.consoleLogger)
1137
1138         # create first backup
1139         deltatar.create_full_backup(
1140             source_path="source_dir",
1141             backup_path="backup_dir")
1142
1143         assert os.path.exists("backup_dir")
1144         shutil.rmtree("source_dir")
1145
1146         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1147         tar_path = os.path.join("backup_dir", tar_filename)
1148
1149         deltatar = DeltaTar(mode=self.MODE, password=password,
1150                             logger=self.consoleLogger,
1151                             included_files=included_files)
1152         deltatar.restore_backup(target_path="source_dir",
1153                                 backup_tar_path=tar_path)
1154
1155         assert os.path.exists('source_dir/test/huge2')
1156         assert os.path.exists('source_dir/test/')
1157         assert not os.path.exists('source_dir/test/huge')
1158         assert not os.path.exists('source_dir/big')
1159         assert not os.path.exists('source_dir/small')
1160
1161     def test_collate_iterators(self):
1162         '''
1163         Tests the collate iterators functionality with two exact directories,
1164         using an index iterator from a backup and the exact same source dir.
1165         '''
1166         password, paramversion = self.ENCRYPTION or (None, None)
1167         deltatar = DeltaTar(mode=self.MODE, password=password,
1168                             crypto_paramversion=paramversion,
1169                             logger=self.consoleLogger)
1170
1171         # create first backup
1172         deltatar.create_full_backup(
1173             source_path="source_dir",
1174             backup_path="backup_dir")
1175
1176         assert os.path.exists("backup_dir")
1177
1178         cwd = os.getcwd()
1179         index_filename = deltatar.index_name_func(is_full=True)
1180         index_path = os.path.join(cwd, "backup_dir", index_filename)
1181         index_it = deltatar.iterate_index_path(index_path)
1182
1183         os.chdir('source_dir')
1184         dir_it = deltatar._recursive_walk_dir('.')
1185         path_it = deltatar.jsonize_path_iterator(dir_it)
1186
1187         try:
1188             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1189                 assert deltatar._equal_stat_dicts(path1, path2)
1190         finally:
1191             os.chdir(cwd)
1192
1193     def test_collate_iterators_diffdirs(self):
1194         '''
1195         Use the collate iterators functionality with two different directories.
1196         It must behave in an expected way.
1197         '''
1198         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1199
1200         password, paramversion = self.ENCRYPTION or (None, None)
1201         deltatar = DeltaTar(mode=self.MODE, password=password,
1202                             crypto_paramversion=paramversion,
1203                             logger=self.consoleLogger)
1204
1205         # create first backup
1206         deltatar.create_full_backup(
1207             source_path="source_dir",
1208             backup_path="backup_dir")
1209
1210         assert os.path.exists("backup_dir")
1211         self.hash["source_dir/z"]  = self.create_file("source_dir/z", 100)
1212
1213         cwd = os.getcwd()
1214         index_filename = deltatar.index_name_func(is_full=True)
1215         index_path = os.path.join(cwd, "backup_dir", index_filename)
1216         index_it = deltatar.iterate_index_path(index_path)
1217
1218         os.chdir('source_dir')
1219         dir_it = deltatar._recursive_walk_dir('.')
1220         path_it = deltatar.jsonize_path_iterator(dir_it)
1221
1222         try:
1223             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1224                 if path2['path'] == 'z':
1225                     assert not path1
1226                 else:
1227                     assert deltatar._equal_stat_dicts(path1, path2)
1228         finally:
1229             os.chdir(cwd)
1230
1231     def test_collate_iterators_diffdirs2(self):
1232         '''
1233         Use the collate iterators functionality with two different directories.
1234         It must behave in an expected way.
1235         '''
1236         password, paramversion = self.ENCRYPTION or (None, None)
1237         deltatar = DeltaTar(mode=self.MODE, password=password,
1238                             crypto_paramversion=paramversion,
1239                             logger=self.consoleLogger)
1240
1241         # create first backup
1242         deltatar.create_full_backup(
1243             source_path="source_dir",
1244             backup_path="backup_dir")
1245
1246         assert os.path.exists("backup_dir")
1247
1248         # add some new files and directories
1249         os.makedirs('source_dir/bigdir')
1250         self.hash["source_dir/bigdir"] = ""
1251         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1252         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1253         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1254
1255         cwd = os.getcwd()
1256         index_filename = deltatar.index_name_func(is_full=True)
1257         index_path = os.path.join(cwd, "backup_dir", index_filename)
1258         index_it = deltatar.iterate_index_path(index_path)
1259
1260         os.chdir('source_dir')
1261         dir_it = deltatar._recursive_walk_dir('.')
1262         path_it = deltatar.jsonize_path_iterator(dir_it)
1263
1264         visited_pairs = []
1265
1266         try:
1267             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1268                 visited_pairs.append(
1269                     (deltatar.unprefixed(path1['path']) if path1 else None,
1270                      path2['path'] if path2 else None)
1271                 )
1272         finally:
1273             assert visited_pairs == [
1274                 (u'big', u'big'),
1275                 (None, u'bigdir'),
1276                 (u'small', u'small'),
1277                 (u'test', u'test'),
1278                 (None, u'zzzz'),
1279                 (None, u'bigdir/a'),
1280                 (None, u'bigdir/b'),
1281                 (u'test/huge', u'test/huge'),
1282                 (u'test/huge2', u'test/huge2'),
1283                 (u'test/test2', u'test/test2'),
1284             ]
1285             os.chdir(cwd)
1286
1287     def test_create_empty_diff_backup(self):
1288         '''
1289         Creates an empty (no changes) backup diff
1290         '''
1291         password, paramversion = self.ENCRYPTION or (None, None)
1292         deltatar = DeltaTar(mode=self.MODE, password=password,
1293                             crypto_paramversion=paramversion,
1294                             logger=self.consoleLogger)
1295
1296         # create first backup
1297         deltatar.create_full_backup(
1298             source_path="source_dir",
1299             backup_path="backup_dir")
1300
1301         prev_index_filename = deltatar.index_name_func(is_full=True)
1302         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1303
1304         deltatar.create_diff_backup("source_dir", "backup_dir2",
1305                                     prev_index_path)
1306
1307         # check index items
1308         index_path = os.path.join("backup_dir2",
1309             deltatar.index_name_func(is_full=False))
1310         index_it = deltatar.iterate_index_path(index_path)
1311         n = 0
1312         for i in index_it:
1313             n += 1
1314             assert i[0]['path'].startswith("list://")
1315
1316         assert n == 6
1317
1318         # check the tar file
1319         assert os.path.exists("backup_dir2")
1320         shutil.rmtree("source_dir")
1321
1322         tar_filename = deltatar.volume_name_func('backup_dir2',
1323             is_full=False, volume_number=0)
1324         tar_path = os.path.join("backup_dir2", tar_filename)
1325
1326         # no file restored, because the diff was empty
1327         deltatar.restore_backup(target_path="source_dir",
1328                                 backup_tar_path=tar_path)
1329         assert len(os.listdir("source_dir")) == 0
1330
1331
1332     def test_create_diff_backup1(self):
1333         '''
1334         Creates a diff backup when there are new files
1335         '''
1336         password, paramversion = self.ENCRYPTION or (None, None)
1337         deltatar = DeltaTar(mode=self.MODE, password=password,
1338                             crypto_paramversion=paramversion,
1339                             logger=self.consoleLogger)
1340
1341         # create first backup
1342         deltatar.create_full_backup(
1343             source_path="source_dir",
1344             backup_path="backup_dir")
1345
1346         prev_index_filename = deltatar.index_name_func(is_full=True)
1347         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1348
1349         # add some new files and directories
1350         os.makedirs('source_dir/bigdir')
1351         self.hash["source_dir/bigdir"] = ""
1352         os.unlink("source_dir/small")
1353         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1354         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1355         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1356
1357         deltatar.create_diff_backup("source_dir", "backup_dir2",
1358                                     prev_index_path)
1359
1360         # check index items
1361         index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1362         index_it = deltatar.iterate_index_path(index_path)
1363         l = [i[0]['path'] for i in index_it]
1364
1365         assert l == [
1366             'list://big',
1367             'snapshot://bigdir',
1368             'delete://small',
1369             'list://test',
1370             'snapshot://zzzz',
1371             'snapshot://bigdir/a',
1372             'snapshot://bigdir/b',
1373             'list://test/huge',
1374             'list://test/huge2',
1375             'list://test/test2',
1376         ]
1377
1378         # check the tar file
1379         assert os.path.exists("backup_dir2")
1380         shutil.rmtree("source_dir")
1381
1382         # create source_dir with the small file, that will be then deleted by
1383         # the restore_backup
1384         os.mkdir("source_dir")
1385         open("source_dir/small", 'wb').close()
1386
1387         tar_filename = deltatar.volume_name_func('backup_dir2',
1388             is_full=False, volume_number=0)
1389         tar_path = os.path.join("backup_dir2", tar_filename)
1390
1391         # restore the backup, this will create only the new files
1392         deltatar.restore_backup(target_path="source_dir",
1393                                 backup_tar_path=tar_path)
1394         # the order doesn't matter
1395         assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1396
1397     def test_restore_from_index_diff_backup(self):
1398         '''
1399         Creates a full backup, modifies some files, creates a diff backup,
1400         then restores the diff backup from zero.
1401         '''
1402         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1403             raise SkipTest('this test only works for uncompressed '
1404                            'or concat compressed modes')
1405
1406         password, paramversion = self.ENCRYPTION or (None, None)
1407         deltatar = DeltaTar(mode=self.MODE, password=password,
1408                             crypto_paramversion=paramversion,
1409                             logger=self.consoleLogger)
1410
1411         # create first backup
1412         deltatar.create_full_backup(
1413             source_path="source_dir",
1414             backup_path="backup_dir")
1415
1416         prev_index_filename = deltatar.index_name_func(is_full=True)
1417         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1418
1419         # add some new files and directories
1420         os.makedirs('source_dir/bigdir')
1421         self.hash["source_dir/bigdir"] = ""
1422         os.unlink("source_dir/small")
1423         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1424         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1425         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1426
1427         deltatar.create_diff_backup("source_dir", "backup_dir2",
1428                                     prev_index_path)
1429
1430         # apply diff backup in target_dir
1431         index_filename = deltatar.index_name_func(is_full=False)
1432         index_path = os.path.join("backup_dir2", index_filename)
1433         deltatar.restore_backup("target_dir",
1434             backup_indexes_paths=[index_path, prev_index_path])
1435
1436         # then compare the two directories source_dir and target_dir and check
1437         # they are the same
1438         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1439
1440     def test_restore_from_index_diff_backup2(self):
1441         '''
1442         Creates a full backup, modifies some files, creates a diff backup,
1443         then restores the diff backup with the full backup as a starting point.
1444         '''
1445         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1446             raise SkipTest('this test only works for uncompressed '
1447                            'or concat compressed modes')
1448
1449         password, paramversion = self.ENCRYPTION or (None, None)
1450         deltatar = DeltaTar(mode=self.MODE, password=password,
1451                             crypto_paramversion=paramversion,
1452                             logger=self.consoleLogger)
1453
1454         # create first backup
1455         deltatar.create_full_backup(
1456             source_path="source_dir",
1457             backup_path="backup_dir")
1458
1459         prev_index_filename = deltatar.index_name_func(is_full=True)
1460         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1461
1462         # add some new files and directories
1463         os.makedirs('source_dir/bigdir')
1464         self.hash["source_dir/bigdir"] = ""
1465         os.unlink("source_dir/small")
1466         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1467         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1468         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1469         shutil.rmtree("source_dir/test")
1470
1471         deltatar.create_diff_backup("source_dir", "backup_dir2",
1472                                     prev_index_path)
1473
1474         # first restore initial backup in target_dir
1475         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1476         tar_path = os.path.join("backup_dir", tar_filename)
1477         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1478
1479         # then apply diff backup in target_dir
1480         index_filename = deltatar.index_name_func(is_full=False)
1481         index_path = os.path.join("backup_dir2", index_filename)
1482         deltatar.restore_backup("target_dir",
1483             backup_indexes_paths=[index_path, prev_index_path])
1484
1485         # then compare the two directories source_dir and target_dir and check
1486         # they are the same
1487         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1488
1489     def test_restore_from_index_diff_backup3(self):
1490         '''
1491         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1492         diff backup, then restores the diff backup with the full backup as a
1493         starting point.
1494         '''
1495         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1496             raise SkipTest('this test only works for uncompressed '
1497                            'or concat compressed modes')
1498
1499         password, paramversion = self.ENCRYPTION or (None, None)
1500         deltatar = DeltaTar(mode=self.MODE, password=password,
1501                             crypto_paramversion=paramversion,
1502                             logger=self.consoleLogger)
1503
1504         shutil.rmtree("source_dir")
1505         shutil.copytree(self.GIT_DIR, "source_dir")
1506         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1507
1508         # create first backup
1509         deltatar.create_full_backup(
1510             source_path="source_dir",
1511             backup_path="backup_dir")
1512
1513         prev_index_filename = deltatar.index_name_func(is_full=True)
1514         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1515
1516         # alter the source_dir randomly
1517         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1518
1519         for path in source_it:
1520             # if path doesn't exist (might have previously removed) ignore it.
1521             # also ignore it (i.e. do not change it) 70% of the time
1522             if not os.path.exists(path) or random.random() < 0.7:
1523                 continue
1524
1525             # remove the file
1526             if os.path.isdir(path):
1527                 shutil.rmtree(path)
1528             else:
1529                 os.unlink(path)
1530
1531         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1532                                     prev_index_path)
1533
1534         # first restore initial backup in target_dir
1535         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1536         tar_path = os.path.join("backup_dir", tar_filename)
1537         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1538
1539         # and check that target_dir equals to source_dir (which is the same as
1540         # self.GIT_DIR initially)
1541         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1542
1543         # then apply diff backup in target_dir
1544         index_filename = deltatar.index_name_func(is_full=False)
1545         index_path = os.path.join("backup_dir2", index_filename)
1546         deltatar.restore_backup("target_dir",
1547             backup_indexes_paths=[index_path, prev_index_path])
1548
1549         # and check that target_dir equals to source_dir_diff (the randomly
1550         # altered self.GIT_DIR directory)
1551         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1552
1553         # then delete target_dir and apply diff backup from zero and check again
1554         shutil.rmtree("target_dir")
1555         deltatar.restore_backup("target_dir",
1556             backup_indexes_paths=[index_path, prev_index_path])
1557
1558         # and check that target_dir equals to source_dir_diff (the randomly
1559         # altered self.GIT_DIR directory)
1560         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1561
1562     def test_restore_from_index_diff_backup3_multivol(self):
1563         '''
1564         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1565         diff backup, then restores the diff backup with the full backup as a
1566         starting point.
1567         '''
1568         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1569             raise SkipTest('this test only works for uncompressed '
1570                            'or concat compressed modes')
1571
1572         password, paramversion = self.ENCRYPTION or (None, None)
1573         deltatar = DeltaTar(mode=self.MODE, password=password,
1574                             crypto_paramversion=paramversion,
1575                             logger=self.consoleLogger)
1576
1577         shutil.rmtree("source_dir")
1578         shutil.copytree(self.GIT_DIR, "source_dir")
1579         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1580
1581         # create first backup
1582         deltatar.create_full_backup(
1583             source_path="source_dir",
1584             backup_path="backup_dir",
1585             max_volume_size=1)
1586
1587         prev_index_filename = deltatar.index_name_func(is_full=True)
1588         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1589
1590         # alter the source_dir randomly
1591         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1592
1593         for path in source_it:
1594             # if path doesn't exist (might have previously removed) ignore it.
1595             # also ignore it (i.e. do not change it) 70% of the time
1596             if not os.path.exists(path) or random.random() < 0.7:
1597                 continue
1598
1599             # remove the file
1600             if os.path.isdir(path):
1601                 shutil.rmtree(path)
1602             else:
1603                 os.unlink(path)
1604
1605         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1606                                     prev_index_path, max_volume_size=1)
1607
1608         # first restore initial backup in target_dir
1609         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1610         tar_path = os.path.join("backup_dir", tar_filename)
1611         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1612
1613         # and check that target_dir equals to source_dir (which is the same as
1614         # self.GIT_DIR initially)
1615         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1616
1617         # then apply diff backup in target_dir
1618         index_filename = deltatar.index_name_func(is_full=False)
1619         index_path = os.path.join("backup_dir2", index_filename)
1620         deltatar.restore_backup("target_dir",
1621             backup_indexes_paths=[index_path, prev_index_path])
1622
1623         # and check that target_dir equals to source_dir_diff (the randomly
1624         # altered self.GIT_DIR directory)
1625         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1626
1627         # then delete target_dir and apply diff backup from zero and check again
1628         shutil.rmtree("target_dir")
1629         deltatar.restore_backup("target_dir",
1630             backup_indexes_paths=[index_path, prev_index_path])
1631
1632         # and check that target_dir equals to source_dir_diff (the randomly
1633         # altered self.GIT_DIR directory)
1634         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1635
1636     def check_equal_dirs(self, path1, path2, deltatar):
1637         '''
1638         compare the two directories source_dir and target_dir and check
1639         # they are the same
1640         '''
1641         source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1642         source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1643         target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1644         target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1645         while True:
1646             try:
1647                 sitem = next(source_it)
1648                 titem = next(target_it)
1649             except StopIteration:
1650                 try:
1651                     titem = next(target_it)
1652                     raise Exception("iterators do not stop at the same time")
1653                 except StopIteration:
1654                     break
1655             try:
1656                 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1657             except Exception as e:
1658                 print("SITEM: " + str(sitem))
1659                 print("TITEM: " + str(titem))
1660                 raise e
1661
1662     def test_create_no_symlinks(self):
1663         '''
1664         Creates a full backup from different varieties of symlinks. The
1665         extracted archive may not contain any symlinks but the file contents
1666         '''
1667
1668         os.system("rm -rf source_dir")
1669         os.makedirs("source_dir/symlinks")
1670         fd = os.open("source_dir/symlinks/valid_linkname",
1671                      os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1672         os.write(fd, b"valid link target for symlink tests; please ignore\n")
1673         os.close(fd)
1674         # first one is good, the rest points nowhere
1675         self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1676         self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1677         self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1678         self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1679         password, paramversion = self.ENCRYPTION or (None, None)
1680         deltatar = DeltaTar(mode=self.MODE, password=password,
1681                             crypto_paramversion=paramversion,
1682                             logger=self.consoleLogger)
1683
1684         # create first backup
1685         deltatar.create_full_backup(source_path="source_dir",
1686                                     backup_path="backup_dir")
1687
1688         assert os.path.exists("backup_dir")
1689         shutil.rmtree("source_dir")
1690         assert not os.path.exists("source_dir")
1691
1692         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1693         tar_path = os.path.join("backup_dir", tar_filename)
1694
1695         deltatar.restore_backup(target_path="source_dir",
1696                                 backup_tar_path=tar_path)
1697
1698         for _r, _ds, fs in os.walk("source_dir/symlinks"):
1699         # only the valid link plus the linked file may be found in the
1700         # extracted archive
1701             assert len(fs) == 2
1702             for f in fs:
1703                 # the link must have been resolved and file contents must match
1704                 # the linked file
1705                 assert not os.path.islink(f)
1706                 with open("source_dir/symlinks/valid_linkname") as a:
1707                     with open("source_dir/symlinks/whatever") as b:
1708                         assert a.read() == b.read()
1709
1710     def test_restore_with_symlinks(self):
1711         '''
1712         Creates a full backup containing different varieties of symlinks. All
1713         of them must be filtered out.
1714         '''
1715         password, paramversion = self.ENCRYPTION or (None, None)
1716         deltatar = DeltaTar(mode=self.MODE, password=password,
1717                             crypto_paramversion=paramversion,
1718                             logger=self.consoleLogger)
1719
1720         # create first backup
1721         deltatar.create_full_backup(source_path="source_dir",
1722                                     backup_path="backup_dir")
1723
1724         assert os.path.exists("backup_dir")
1725         shutil.rmtree("source_dir")
1726
1727         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1728         tar_path = os.path.join("backup_dir", tar_filename)
1729
1730         # add symlinks to existing archive
1731
1732         def add_symlink (a, name, dst):
1733             l = tarfile.TarInfo("snapshot://%s" % name)
1734             l.type = tarfile.SYMTYPE
1735             l.linkname = dst
1736             a.addfile(l)
1737             return name
1738
1739         try:
1740             with tarfile.open(tar_path,mode="a") as a:
1741                 checkme = \
1742                     [ add_symlink(a, "symlinks/foo", "internal-file")
1743                     , add_symlink(a, "symlinks/bar", "/absolute/path")
1744                     , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1745         except tarfile.ReadError as e:
1746             if self.MODE == '#' or self.MODE.endswith ("gz"):
1747                 checkme = []
1748             else:
1749                 raise
1750         except ValueError as e:
1751             if self.MODE.startswith ('#'):
1752                 checkme = []
1753             else:
1754                 raise
1755
1756         deltatar.restore_backup(target_path="source_dir",
1757                                 backup_tar_path=tar_path)
1758
1759         # check what happened to our symlinks
1760         for name in checkme:
1761             fullpath = os.path.join("source_dir", name)
1762             assert not os.path.exists(fullpath)
1763
1764     def test_restore_malicious_symlinks(self):
1765         '''
1766         Creates a full backup containing a symlink and a file of the same name.
1767         This simulates a symlink attack with a link pointing to some external
1768         path that is abused to write outside the extraction prefix.
1769         '''
1770         password, paramversion = self.ENCRYPTION or (None, None)
1771         deltatar = DeltaTar(mode=self.MODE, password=password,
1772                             crypto_paramversion=paramversion,
1773                             logger=self.consoleLogger)
1774
1775         # create first backup
1776         deltatar.create_full_backup(source_path="source_dir",
1777                                     backup_path="backup_dir")
1778
1779         assert os.path.exists("backup_dir")
1780         shutil.rmtree("source_dir")
1781
1782         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1783         tar_path = os.path.join("backup_dir", tar_filename)
1784
1785         # add symlinks to existing archive
1786
1787         def add_symlink (a, name, dst):
1788             l = tarfile.TarInfo("snapshot://%s" % name)
1789             l.type = tarfile.SYMTYPE
1790             l.linkname = dst
1791             a.addfile(l)
1792
1793         def add_file (a, name):
1794             f = tarfile.TarInfo("snapshot://%s" % name)
1795             f.type = tarfile.REGTYPE
1796             a.addfile(f)
1797
1798         testpath = "symlinks/pernicious-link"
1799         testdst = "/tmp/does/not/exist"
1800
1801         try:
1802             with tarfile.open(tar_path, mode="a") as a:
1803                 add_symlink(a, testpath, testdst)
1804                 add_symlink(a, testpath, testdst+"X")
1805                 add_symlink(a, testpath, testdst+"XXX")
1806                 add_file(a, testpath)
1807         except tarfile.ReadError as e:
1808             if self.MODE == '#' or self.MODE.endswith ("gz"):
1809                 pass
1810             else:
1811                 raise
1812         except ValueError as e:
1813             if self.MODE.startswith ('#'):
1814                 pass # O_APPEND of concat archives not feasible
1815             else:
1816                 raise
1817
1818         deltatar.restore_backup(target_path="source_dir",
1819                                 backup_tar_path=tar_path)
1820
1821         # check whether the link was extracted; deltatar seems to only ever
1822         # retrieve the first item it finds for a given path which in the case
1823         # at hand is a symlink to some non-existent path
1824         fullpath = os.path.join("source_dir", testpath)
1825         assert not os.path.exists(fullpath)
1826
class DeltaTar2Test(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the mode string set to ":".
    """
    MODE = ':'
1832
1833
class DeltaTarStreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in uncompressed stream mode ("|").
    """
    MODE = '|'
1839
1840
class DeltaTarGzipTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in gzip mode (":gz").
    """
    MODE_COMPRESSES = True
    MODE = ':gz'
1847
1848
class DeltaTarGzipStreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in gzip stream mode ("|gz").
    """
    MODE_COMPRESSES = True
    MODE = '|gz'
1855
1856
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in bz2 mode (":bz2"); skipped as a whole
    because of its run time.
    """
    MODE_COMPRESSES = True
    MODE = ':bz2'
1864
1865
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in bz2 stream mode ("|bz2"); skipped as a
    whole because of its run time.
    """
    MODE_COMPRESSES = True
    MODE = '|bz2'
1873
1874
class DeltaTarGzipConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in gzip concat stream mode ("#gz").
    """
    MODE_COMPRESSES = True
    MODE = '#gz'
1881
1882
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in gzip aes128 concat stream mode: mode
    "#gz" plus an ENCRYPTION setting of (password, paramversion).
    """
    MODE_COMPRESSES = True
    MODE = '#gz'
    ENCRYPTION = ('some magic key', 1)
1890
1891
class DeltaTarAes128ConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases in aes128 concat stream mode: mode "#" plus
    an ENCRYPTION setting of (password, paramversion).
    """
    ENCRYPTION = ('some magic key', 1)
    MODE = '#'
1898
1899