add unit test for mishandling access(3)
[python-delta-tar] / testing / test_deltatar.py
1 # Copyright (C) 2013 Intra2net AG
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU Lesser General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program.  If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
19 import errno
20 import os
21 import re
22 import random
23 import shutil
24 import logging
25 import binascii
26 import json
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
30
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
36
37 from . import BaseTest
38 from . import new_volume_handler
39
# Set to True to enable warning messages from deltatar. Doing so lowers the
# signal-to-noise ratio of test runs, since none of the messages are meaningful.
42 VERBOSE_TEST_OUTPUT = False
43
44 class DeltaTarTest(BaseTest):
45     """
46     Test backups
47     """
48     MODE = ''
49     MODE_COMPRESSES = False
50
51     ENCRYPTION = None  # (password : str, paramversion : int) option
52
53     GIT_DIR = '.git'
54
55     FSTEST = None
56     FSAPI_SAVED = []
57
58     def setUp(self):
59         '''
60         Create base test data
61         '''
62         self.pwd = os.getcwd()
63         os.system('rm -rf target_dir source_dir* backup_dir* huge')
64         os.makedirs('source_dir/test/test2')
65         self.hash = dict()
66         self.hash["source_dir/test/test2"] = ''
67         self.hash["source_dir/big"]  = self.create_file("source_dir/big", 50000)
68         self.hash["source_dir/small"]  = self.create_file("source_dir/small", 100)
69         self.hash["source_dir/test/huge"]  = self.create_file("source_dir/test/huge", 700000)
70         self.hash["source_dir/test/huge2"]  = self.create_file("source_dir/test/huge2", 800000)
71
72         self.consoleLogger = None
73         if VERBOSE_TEST_OUTPUT is True:
74             self.consoleLogger = logging.StreamHandler()
75             self.consoleLogger.setLevel(logging.DEBUG)
76
77         if not os.path.isdir(self.GIT_DIR):
78             # Not running inside git tree, take our
79             # own testing directory as source.
80             self.GIT_DIR = 'testing'
81
82             if not os.path.isdir(self.GIT_DIR):
83                 raise Exception('No input directory found: ' + self.GIT_DIR)
84
85         if self.FSTEST is not None:
86             self.FSTEST ()
87
88     def tearDown(self):
89         '''
90         Remove temporary files created by unit tests and restore the API
91         functions in *os*.
92         '''
93         for att, val in self.FSAPI_SAVED:
94             setattr (os, att, val)
95         os.chdir(self.pwd)
96         os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
97         _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
98                   ("I am fully aware that this will void my warranty.")
99
100     def test_restore_simple_full_backup(self):
101         '''
102         Creates a full backup without any filtering and restores it.
103         '''
104         password, paramversion = self.ENCRYPTION or (None, None)
105         deltatar = DeltaTar(mode=self.MODE, password=password,
106                             crypto_paramversion=paramversion,
107                             logger=self.consoleLogger)
108
109         # create first backup
110         deltatar.create_full_backup(
111             source_path="source_dir",
112             backup_path="backup_dir")
113
114         assert os.path.exists("backup_dir")
115         shutil.rmtree("source_dir")
116
117         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
118         tar_path = os.path.join("backup_dir", tar_filename)
119
120         deltatar.restore_backup(target_path="source_dir",
121                                 backup_tar_path=tar_path)
122
123         for key, value in self.hash.items():
124             assert os.path.exists(key)
125             if value:
126                 assert value == self.md5sum(key)
127
128
129     def test_create_backup_max_file_length (self):
130         """
131         Creates a full backup including one file that exceeds the (purposely
132         lowered) upper bound on GCM encrypted objects. This will yield multiple
133         encrypted objects for one plaintext file.
134
135         Success is verified by splitting the archive at object boundaries and
136         counting the parts.
137         """
138         if self.MODE_COMPRESSES is True:
139             raise SkipTest ("GCM file length test not meaningful with compression.")
140         if self.ENCRYPTION is None:
141             raise SkipTest ("GCM file length applies only to encrypted backups.")
142
143         new_max = 20000 # cannot be less than tar block size
144         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
145                 ("I am fully aware that this will void my warranty.",
146                  new_max)
147
148         password, paramversion = self.ENCRYPTION
149         deltatar = DeltaTar (mode=self.MODE, password=password,
150                              crypto_paramversion=paramversion,
151                              logger=self.consoleLogger)
152
153         self.hash = dict ()
154         os.makedirs ("source_dir2")
155         for f, s in [("empty"          , 0)             # 1 tar objects
156                     ,("slightly_larger", new_max + 1)   # 2
157                     ,("twice"          , 2 * new_max)   # 3
158                     ]:
159             f = "source_dir2/%s" % f
160             self.hash [f] = self.create_file (f, s)
161
162         deltatar.create_full_backup \
163                 (source_path="source_dir2", backup_path="backup_dir")
164
165         assert os.path.exists ("backup_dir")
166         shutil.rmtree ("source_dir2")
167
168         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
169         backup_path     = os.path.join("backup_dir", backup_filename)
170
171         # split the resulting archive into its constituents without
172         # decrypting
173         ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
174                         "-o backup_dir/split <\'%s\'" % backup_path)
175
176         assert os.path.exists ("backup_dir/split")
177
178         dents = os.listdir ("backup_dir/split")
179         assert len (dents) == 6
180
181
182     def test_restore_backup_max_file_length (self):
183         """
184         Creates a full backup including one file that exceeds the (purposely
185         lowered) upper bound on GCM encrypted objects. This will yield two
186         encrypted objects for one plaintext file.
187
188         Success is verified by splitting the archive at object boundaries and
189         counting the parts.
190         """
191         if self.MODE_COMPRESSES is True:
192             raise SkipTest ("GCM file length test not meaningful with compression.")
193         if self.ENCRYPTION is None:
194             raise SkipTest ("GCM file length applies only to encrypted backups.")
195
196         new_max = 20000 # cannot be less than tar block size
197         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
198                     ("I am fully aware that this will void my warranty.",
199                      new_max)
200
201         password, paramversion = self.ENCRYPTION
202         deltatar = DeltaTar (mode=self.MODE, password=password,
203                              crypto_paramversion=paramversion,
204                              logger=self.consoleLogger)
205
206         self.hash = dict ()
207         os.makedirs ("source_dir2")
208         for f, s in [("empty"          , 0)             # 1 tar objects
209                     ,("almost_large"   , new_max - 1)   # 2
210                     ,("large"          , new_max)       # 3
211                     ,("slightly_larger", new_max + 1)   # 4
212                     ,("twice"          , 2 * new_max)   # 5
213                     ,("twice_plus_one" , (2 * new_max) + 1)   # 6
214                     ]:
215             f = "source_dir2/%s" % f
216             self.hash [f] = self.create_file (f, s)
217
218         deltatar.create_full_backup \
219                 (source_path="source_dir2", backup_path="backup_dir")
220
221         assert os.path.exists ("backup_dir")
222         shutil.rmtree ("source_dir2")
223
224         backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
225         backup_path     = os.path.join("backup_dir", backup_filename)
226
227         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
228         tar_path = os.path.join("backup_dir", tar_filename)
229
230         deltatar.restore_backup(target_path="source_dir2",
231                                 backup_tar_path=tar_path)
232
233         for key, value in self.hash.items():
234             assert os.path.exists(key)
235             if value:
236                 assert value == self.md5sum(key)
237
238
239     def test_create_backup_index_max_file_length (self):
240         """
241         Creates a full backup with a too large index file for the upper bound
242         of the GCM encryption. Since the index file has a fixed IV file counter
243         of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
244
245         60+ GB of (potentially compressed) index file should last for a while...
246         """
247         if self.MODE_COMPRESSES is True:
248             raise SkipTest ("GCM file length test not meaningful with compression.")
249         if self.ENCRYPTION is None:
250             raise SkipTest ("GCM file length applies only to encrypted backups.")
251
252         new_max = 5000
253         crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
254                     ("I am fully aware that this will void my warranty.",
255                      new_max)
256
257         password, paramversion = self.ENCRYPTION
258         deltatar = DeltaTar (mode=self.MODE, password=password,
259                              crypto_paramversion=paramversion,
260                              logger=self.consoleLogger)
261
262         self.hash = dict ()
263         os.makedirs ("source_dir2")
264         for i in range (42):
265             f = "source_dir2/dummy_%rd" % i
266             self.hash [f] = self.create_file (f, i)
267
268         with self.assertRaises (crypto.InvalidFileCounter):
269             deltatar.create_full_backup \
270                     (source_path="source_dir2", backup_path="backup_dir")
271         shutil.rmtree ("source_dir2")
272
273
274     def test_check_index_checksum(self):
275         '''
276         Creates a full backup and checks the index' checksum of files
277         '''
278         password, paramversion = self.ENCRYPTION or (None, None)
279         deltatar = DeltaTar(mode=self.MODE, password=password,
280                             crypto_paramversion=paramversion,
281                             logger=self.consoleLogger)
282
283         # create first backup
284         deltatar.create_full_backup(
285             source_path="source_dir",
286             backup_path="backup_dir")
287
288
289         index_filename = deltatar.index_name_func(True)
290         index_path = os.path.join("backup_dir", index_filename)
291
292         f = open(index_path, 'rb')
293         crc = None
294         checked = False
295         began_list = False
296         while True:
297             l = f.readline()
298             if l == b'':
299                 break
300             if b'BEGIN-FILE-LIST' in l:
301                 crc = binascii.crc32(l) & 0xFFFFffff
302                 began_list = True
303             elif b'END-FILE-LIST' in l:
304                 crc = binascii.crc32(l, crc) & 0xffffffff
305
306                 # next line contains the crc
307                 data = json.loads(f.readline().decode("UTF-8"))
308                 assert data['type'] == 'file-list-checksum'
309                 assert data['checksum'] == crc
310                 checked = True
311                 break
312             elif began_list:
313                 crc = binascii.crc32(l, crc) & 0xffffffff
314         f.close()
315
316
317     def test_restore_multivol(self):
318         '''
319         Creates a full backup without any filtering with multiple volumes and
320         restore it.
321         '''
322         if ':gz' in self.MODE:
323             raise SkipTest('compression information is lost when creating '
324                            'multiple volumes with no Stream')
325
326         password, paramversion = self.ENCRYPTION or (None, None)
327         deltatar = DeltaTar(mode=self.MODE, password=password,
328                             crypto_paramversion=paramversion,
329                             logger=self.consoleLogger)
330
331         self.hash = dict()
332         os.makedirs('source_dir2')
333         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
334         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
335
336         # create first backup
337         deltatar.create_full_backup(
338             source_path="source_dir2",
339             backup_path="backup_dir",
340             max_volume_size=1)
341
342         assert os.path.exists("backup_dir")
343         assert os.path.exists(os.path.join("backup_dir",
344             deltatar.volume_name_func("backup_dir", True, 0)))
345         if self.MODE_COMPRESSES:
346             n_vols = 1
347         else:
348             n_vols = 2
349         for i_vol in range(n_vols):
350             assert os.path.exists(os.path.join("backup_dir",
351                 deltatar.volume_name_func("backup_dir", True, i_vol)))
352         assert not os.path.exists(os.path.join("backup_dir",
353             deltatar.volume_name_func("backup_dir", True, n_vols)))
354
355         shutil.rmtree("source_dir2")
356
357         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
358         tar_path = os.path.join("backup_dir", tar_filename)
359
360         # this should automatically restore all volumes
361         deltatar.restore_backup(target_path="source_dir2",
362                                 backup_tar_path=tar_path)
363
364         for key, value in self.hash.items():
365             assert os.path.exists(key)
366             if value:
367                 assert value == self.md5sum(key)
368
369     def test_restore_multivol_split(self):
370         '''
371         Creates a full backup without any filtering with multiple volumes
372         with big files bigger than the max volume size and
373         restore it.
374         '''
375         if self.MODE.startswith(':') or self.MODE.startswith('|'):
376             raise SkipTest('this test only works for uncompressed '
377                            'or concat compressed modes')
378
379         password, paramversion = self.ENCRYPTION or (None, None)
380         deltatar = DeltaTar(mode=self.MODE, password=password,
381                             crypto_paramversion=paramversion,
382                             logger=self.consoleLogger)
383
384         self.hash = dict()
385         os.makedirs('source_dir2')
386         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 3*1024*1024)
387         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 4*1024*1024)
388         self.hash["source_dir2/huge2"]  = self.create_file("source_dir2/huge2", 4*1024*1024)
389
390         # create first backup
391         deltatar.create_full_backup(
392             source_path="source_dir2",
393             backup_path="backup_dir",
394             max_volume_size=2)
395
396         assert os.path.exists("backup_dir")
397         assert os.path.exists(os.path.join("backup_dir",
398             deltatar.volume_name_func("backup_dir", True, 0)))
399         if self.MODE_COMPRESSES:
400             n_vols = 1
401         else:
402             n_vols = 6
403         for i_vol in range(n_vols):
404             assert os.path.exists(os.path.join("backup_dir",
405                 deltatar.volume_name_func("backup_dir", True, i_vol)))
406         assert not os.path.exists(os.path.join("backup_dir",
407             deltatar.volume_name_func("backup_dir", True, n_vols)))
408
409         shutil.rmtree("source_dir2")
410
411         index_filename = deltatar.index_name_func(True)
412         index_path = os.path.join("backup_dir", index_filename)
413
414         deltatar.restore_backup(target_path="source_dir2",
415             backup_indexes_paths=[index_path])
416
417         for key, value in self.hash.items():
418             assert os.path.exists(key)
419             if value:
420                 assert value == self.md5sum(key)
421
422
423     def test_full_backup_index_extra_data(self):
424         '''
425         Tests that the index file for a full backup can store extra_data and
426         that this data can be retrieved.
427         '''
428         password, paramversion = self.ENCRYPTION or (None, None)
429         deltatar = DeltaTar(mode=self.MODE, password=password,
430                             crypto_paramversion=paramversion,
431                             logger=self.consoleLogger)
432
433         extra_data = dict(
434             hola="caracola",
435             otra_cosa=[1, "lista"],
436             y_otra=dict(bola=1.1)
437         )
438
439         deltatar.create_full_backup(
440             source_path="source_dir",
441             backup_path="backup_dir",
442             extra_data=extra_data)
443
444         index_filename = deltatar.index_name_func(is_full=True)
445         index_path = os.path.join("backup_dir", index_filename)
446
447         # iterate_index_path retrieves extra_data, and thus we can then compare
448         index_it = deltatar.iterate_index_path(index_path)
449         self.assertEqual(index_it.extra_data, extra_data)
450
451
452     def test_diff_backup_index_extra_data(self):
453         '''
454         Tests that the index file for a diff backup can store extra_data and
455         that this data can be retrieved.
456         '''
457         password, paramversion = self.ENCRYPTION or (None, None)
458         deltatar = DeltaTar(mode=self.MODE, password=password,
459                             crypto_paramversion=paramversion,
460                             logger=self.consoleLogger)
461
462         extra_data = dict(
463             hola="caracola",
464             otra_cosa=[1, "lista"],
465             y_otra=dict(bola=1.1)
466         )
467         # do first backup
468         deltatar.create_full_backup(
469             source_path="source_dir",
470             backup_path="backup_dir")
471
472
473         prev_index_filename = deltatar.index_name_func(is_full=True)
474         prev_index_path = os.path.join("backup_dir", prev_index_filename)
475
476         # create empty diff backup
477         deltatar.create_diff_backup("source_dir", "backup_dir2",
478                                     prev_index_path, extra_data=extra_data)
479
480         index_filename = deltatar.index_name_func(is_full=False)
481         index_path = os.path.join("backup_dir2", index_filename)
482
483         # iterate_index_path retrieves extra_data, and thus we can then compare
484         index_it = deltatar.iterate_index_path(index_path)
485         self.assertEqual(index_it.extra_data, extra_data)
486
487     def test_restore_multivol2(self):
488         '''
489         Creates a full backup without any filtering with multiple volumes and
490         restore it.
491         '''
492         password, paramversion = self.ENCRYPTION or (None, None)
493         deltatar = DeltaTar(mode=self.MODE, password=password,
494                             crypto_paramversion=paramversion,
495                             logger=self.consoleLogger)
496
497         shutil.copytree(self.GIT_DIR, "source_dir2")
498
499         # create first backup
500         deltatar.create_full_backup(
501             source_path="source_dir2",
502             backup_path="backup_dir",
503             max_volume_size=1)
504
505         assert os.path.exists("backup_dir")
506         assert os.path.exists(os.path.join("backup_dir",
507             deltatar.volume_name_func("backup_dir", True, 0)))
508
509         shutil.rmtree("source_dir2")
510
511         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
512         tar_path = os.path.join("backup_dir", tar_filename)
513
514         # this should automatically restore all volumes
515         deltatar.restore_backup(target_path="source_dir2",
516                                 backup_tar_path=tar_path)
517
518         self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
519
520     def test_restore_multivol_manual_from_index(self):
521         '''
522         Creates a full backup without any filtering with multiple volumes and
523         restore it.
524         '''
525         # this test only works for uncompressed or concat compressed modes
526         if self.MODE.startswith(':') or self.MODE.startswith('|'):
527             raise SkipTest('this test only works for uncompressed '
528                            'or concat compressed modes')
529
530         password, paramversion = self.ENCRYPTION or (None, None)
531         deltatar = DeltaTar(mode=self.MODE, password=password,
532                             crypto_paramversion=paramversion,
533                             logger=self.consoleLogger)
534
535         self.hash = dict()
536         os.makedirs('source_dir2')
537         self.hash["source_dir2/big"]  = self.create_file("source_dir2/big", 100000)
538         self.hash["source_dir2/huge"]  = self.create_file("source_dir2/huge", 1200000)
539
540         # create first backup
541         deltatar.create_full_backup(
542             source_path="source_dir2",
543             backup_path="backup_dir",
544             max_volume_size=1)
545
546         assert os.path.exists("backup_dir")
547         assert os.path.exists(os.path.join("backup_dir",
548             deltatar.volume_name_func("backup_dir", True, 0)))
549         if self.MODE_COMPRESSES:
550             n_vols = 1
551         else:
552             n_vols = 2
553         for i_vol in range(n_vols):
554             assert os.path.exists(os.path.join("backup_dir",
555                 deltatar.volume_name_func("backup_dir", True, i_vol)))
556         assert not os.path.exists(os.path.join("backup_dir",
557             deltatar.volume_name_func("backup_dir", True, n_vols)))
558
559         shutil.rmtree("source_dir2")
560
561         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
562         tar_path = os.path.join("backup_dir", tar_filename)
563
564         index_filename = deltatar.index_name_func(True)
565         index_path = os.path.join("backup_dir", index_filename)
566
567         # this should automatically restore the huge file
568         f = deltatar.open_auxiliary_file(index_path, 'r')
569         offset = None
570         while True:
571             l = f.readline()
572             if not len(l):
573                 break
574             data = json.loads(l.decode('UTF-8'))
575             if data.get('type', '') == 'file' and\
576                     deltatar.unprefixed(data['path']) == "huge":
577                 offset = data['offset']
578                 break
579
580         assert offset is not None
581
582         fo = open(tar_path, 'rb')
583         fo.seek(offset)
584         def new_volume_handler(mode, tarobj, base_name, volume_number):
585             suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
586             if self.ENCRYPTION is not None:
587                 # deltatar module is shadowed here
588                 suf += "." + deltatar_PDTCRYPT_EXTENSION
589             tarobj.open_volume(datetime.now().strftime(
590                 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
591         new_volume_handler = partial(new_volume_handler, self.MODE)
592
593         crypto_ctx = None
594         if self.ENCRYPTION is not None:
595             crypto_ctx = crypto.Decrypt (password)
596
597         tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
598                               new_volume_handler=new_volume_handler,
599                               encryption=crypto_ctx)
600
601         member = tarobj.next()
602         member.path = deltatar.unprefixed(member.path)
603         member.name = deltatar.unprefixed(member.name)
604         tarobj.extract(member)
605         tarobj.close()
606         fo.close()
607         assert self.hash['source_dir2/huge'] == self.md5sum('huge')
608
609         os.unlink("huge")
610
611
612     def test_restore_manual_from_index_twice (self):
613         """
614         Creates a full backup and restore the same file twice. This *must* fail
615         when encryption is active.
616
617         Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
618         backwards within the same file. This prevents the encryption layer from
619         exploding due to a reused IV in an overall valid archive.
620
621         This test anticipates possible future mistakes since it’s entirely
622         feasible to implement backward seeks for *_Stream* with concat mode.
623         """
624         # this test only works for uncompressed or concat compressed modes
625         if self.MODE.startswith("|") or self.MODE_COMPRESSES:
626             raise SkipTest("this test only works for uncompressed "
627                            "or concat compressed modes")
628
629         password, paramversion = self.ENCRYPTION or (None, None)
630         deltatar = DeltaTar(mode=self.MODE, password=password,
631                             crypto_paramversion=paramversion,
632                             logger=self.consoleLogger)
633
634         self.hash = dict()
635         os.makedirs("source_dir2")
636         self.hash["source_dir2/samefile"] = \
637             self.create_file("source_dir2/samefile", 1 * 1024)
638
639         # create first backup
640         deltatar.create_full_backup(
641             source_path="source_dir2",
642             backup_path="backup_dir")
643
644         assert os.path.exists("backup_dir")
645         assert os.path.exists(os.path.join("backup_dir",
646             deltatar.volume_name_func("backup_dir", True, 0)))
647
648         shutil.rmtree("source_dir2")
649
650         tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
651         tar_path = os.path.join("backup_dir", tar_filename)
652
653         index_filename = deltatar.index_name_func(True)
654         index_path = os.path.join("backup_dir", index_filename)
655
656         f = deltatar.open_auxiliary_file(index_path, "r")
657         offset = None
658         while True:
659             l = f.readline()
660             if not len(l):
661                 break
662             data = json.loads(l.decode("UTF-8"))
663             if data.get("type", "") == "file" and\
664                     deltatar.unprefixed(data["path"]) == "samefile":
665                 offset = data["offset"]
666                 break
667
668         assert offset is not None
669
670         fo = open(tar_path, "rb")
671         fo.seek(offset)
672
673         crypto_ctx = None
674         if self.ENCRYPTION is not None:
675             crypto_ctx = crypto.Decrypt (password)
676
677         tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
678                               encryption=crypto_ctx)
679         member = tarobj.next()
680         member.path = deltatar.unprefixed(member.path)
681         member.name = deltatar.unprefixed(member.name)
682
683         # extract once â€¦
684         tarobj.extract(member)
685         assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
686
687         # â€¦ and twice
688         try:
689             tarobj.extract(member)
690         except tarfile.StreamError:
691             if crypto_ctx is not None:
692                 pass # good: seeking backwards not allowed
693             else:
694                 raise
695         tarobj.close()
696         fo.close()
697         assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
698
699         os.unlink("samefile")
700
701
702     def test_restore_from_index(self):
703         '''
704         Restores a full backup using an index file.
705         '''
706         if self.MODE.startswith(':') or self.MODE.startswith('|'):
707             raise SkipTest('this test only works for uncompressed '
708                            'or concat compressed modes')
709
710         password, paramversion = self.ENCRYPTION or (None, None)
711         deltatar = DeltaTar(mode=self.MODE, password=password,
712                             crypto_paramversion=paramversion,
713                             logger=self.consoleLogger)
714
715         # create first backup
716         deltatar.create_full_backup(
717             source_path="source_dir",
718             backup_path="backup_dir")
719
720         shutil.rmtree("source_dir")
721
722         # this should automatically restore all volumes
723         index_filename = deltatar.index_name_func(True)
724         index_path = os.path.join("backup_dir", index_filename)
725
726         deltatar.restore_backup(target_path="source_dir",
727             backup_indexes_paths=[index_path])
728
729         for key, value in self.hash.items():
730             assert os.path.exists(key)
731             if value:
732                 assert value == self.md5sum(key)
733
734     def test_restore_multivol_from_index(self):
735         '''
736         Restores a full multivolume backup using an index file.
737         '''
738         if self.MODE.startswith(':') or self.MODE.startswith('|'):
739             raise SkipTest('this test only works for uncompressed '
740                            'or concat compressed modes')
741
742         password, paramversion = self.ENCRYPTION or (None, None)
743         deltatar = DeltaTar(mode=self.MODE, password=password,
744                             crypto_paramversion=paramversion,
745                             logger=self.consoleLogger)
746
747         # create first backup
748         deltatar.create_full_backup(
749             source_path="source_dir",
750             backup_path="backup_dir",
751             max_volume_size=2)
752
753         shutil.rmtree("source_dir")
754
755         # this should automatically restore all volumes
756         index_filename = deltatar.index_name_func(True)
757         index_path = os.path.join("backup_dir", index_filename)
758
759         deltatar.restore_backup(target_path="source_dir",
760             backup_indexes_paths=[index_path])
761
762         for key, value in self.hash.items():
763             assert os.path.exists(key)
764             if value:
765                 assert value == self.md5sum(key)
766
767     def test_create_basic_filtering(self):
768         '''
769         Tests create backup basic filtering.
770         '''
771         password, paramversion = self.ENCRYPTION or (None, None)
772         deltatar = DeltaTar(mode=self.MODE, password=password,
773                             crypto_paramversion=paramversion,
774                             logger=self.consoleLogger,
775                             included_files=["test", "small"],
776                             excluded_files=["test/huge"])
777
778         # create first backup
779         deltatar.create_full_backup(
780             source_path="source_dir",
781             backup_path="backup_dir")
782
783         assert os.path.exists("backup_dir")
784         shutil.rmtree("source_dir")
785
786         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
787         tar_path = os.path.join("backup_dir", tar_filename)
788
789         deltatar.restore_backup(target_path="source_dir",
790                                 backup_tar_path=tar_path)
791
792         assert os.path.exists("source_dir/small")
793         assert os.path.exists("source_dir/test")
794         assert os.path.exists("source_dir/test/huge2")
795         assert os.path.exists("source_dir/test/test2")
796
797         assert not os.path.exists("source_dir/test/huge")
798         assert not os.path.exists("source_dir/big")
799
800     def test_create_filter_func(self):
801         '''
802         Tests create backup basic filtering.
803         '''
804         visited_paths = []
805         def filter_func(visited_paths, path):
806             if path not in visited_paths:
807                 visited_paths.append(path)
808             return True
809
810         filter_func = partial(filter_func, visited_paths)
811
812         password, paramversion = self.ENCRYPTION or (None, None)
813         deltatar = DeltaTar(mode=self.MODE, password=password,
814                             crypto_paramversion=paramversion,
815                             logger=self.consoleLogger,
816                             included_files=["test", "small"],
817                             excluded_files=["test/huge"],
818                             filter_func=filter_func)
819
820         # create first backup
821         deltatar.create_full_backup(
822             source_path="source_dir",
823             backup_path="backup_dir")
824
825         assert os.path.exists("backup_dir")
826         shutil.rmtree("source_dir")
827
828         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
829         tar_path = os.path.join("backup_dir", tar_filename)
830
831         deltatar.restore_backup(target_path="source_dir",
832                                 backup_tar_path=tar_path)
833         assert set(visited_paths) == set([
834                 'small',
835                 'test',
836                 'test/huge2',
837                 'test/test2'
838             ])
839
840     def test_create_filter_out_func(self):
841         '''
842         Tests create backup basic filtering.
843         '''
844         visited_paths = []
845         def filter_func(visited_paths, path):
846             '''
847             Filter out everything
848             '''
849             if path not in visited_paths:
850                 visited_paths.append(path)
851             return False
852
853         filter_func = partial(filter_func, visited_paths)
854
855         password, paramversion = self.ENCRYPTION or (None, None)
856         deltatar = DeltaTar(mode=self.MODE, password=password,
857                             crypto_paramversion=paramversion,
858                             logger=self.consoleLogger,
859                             included_files=["test", "small"],
860                             excluded_files=["test/huge"],
861                             filter_func=filter_func)
862
863         # create first backup
864         deltatar.create_full_backup(
865             source_path="source_dir",
866             backup_path="backup_dir")
867
868         assert os.path.exists("backup_dir")
869         shutil.rmtree("source_dir")
870
871         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
872         tar_path = os.path.join("backup_dir", tar_filename)
873
874         deltatar.restore_backup(target_path="source_dir",
875                                 backup_tar_path=tar_path)
876         assert set(visited_paths) == set([
877                 'small',
878                 'test'
879             ])
880
881         # check that effectively no file was backed up
882         assert not os.path.exists("source_dir/small")
883         assert not os.path.exists("source_dir/big")
884         assert not os.path.exists("source_dir/test")
885
886     def test_restore_index_basic_filtering(self):
887         '''
888         Creates a backup, and then filter when doing the index based restore.
889         '''
890         if self.MODE.startswith(':') or self.MODE.startswith('|'):
891             raise SkipTest('this test only works for uncompressed '
892                            'or concat compressed modes')
893
894         password, paramversion = self.ENCRYPTION or (None, None)
895         deltatar = DeltaTar(mode=self.MODE, password=password,
896                             crypto_paramversion=paramversion,
897                             logger=self.consoleLogger)
898
899         # create first backup
900         deltatar.create_full_backup(
901             source_path="source_dir",
902             backup_path="backup_dir")
903
904         assert os.path.exists("backup_dir")
905         shutil.rmtree("source_dir")
906
907         index_filename = deltatar.index_name_func(True)
908         index_path = os.path.join("backup_dir", index_filename)
909
910         deltatar.included_files = ["test", "small"]
911         deltatar.excluded_files = ["test/huge"]
912         deltatar.restore_backup(target_path="source_dir",
913             backup_indexes_paths=[index_path])
914
915         assert os.path.exists("source_dir/small")
916         assert os.path.exists("source_dir/test")
917         assert os.path.exists("source_dir/test/huge2")
918         assert os.path.exists("source_dir/test/test2")
919
920         assert not os.path.exists("source_dir/test/huge")
921         assert not os.path.exists("source_dir/big")
922
923     def test_restore_index_filter_func(self):
924         '''
925         Creates a backup, and then filter when doing the index based restore,
926         using the filter function.
927         '''
928         if self.MODE.startswith(':') or self.MODE.startswith('|'):
929             raise SkipTest('this test only works for uncompressed '
930                            'or concat compressed modes')
931
932         visited_paths = []
933         def filter_func(visited_paths, path):
934             if path not in visited_paths:
935                 visited_paths.append(path)
936             return True
937
938         filter_func = partial(filter_func, visited_paths)
939
940         password, paramversion = self.ENCRYPTION or (None, None)
941         deltatar = DeltaTar(mode=self.MODE, password=password,
942                             crypto_paramversion=paramversion,
943                             logger=self.consoleLogger)
944
945         # create first backup
946         deltatar.create_full_backup(
947             source_path="source_dir",
948             backup_path="backup_dir")
949
950         assert os.path.exists("backup_dir")
951         shutil.rmtree("source_dir")
952
953         index_filename = deltatar.index_name_func(True)
954         index_path = os.path.join("backup_dir", index_filename)
955
956         deltatar.included_files = ["test", "small"]
957         deltatar.excluded_files = ["test/huge"]
958         deltatar.filter_func = filter_func
959         deltatar.restore_backup(target_path="source_dir",
960             backup_indexes_paths=[index_path])
961
962         assert set(visited_paths) == set([
963                 'small',
964                 'test',
965                 'test/huge2',
966                 'test/test2'
967             ])
968
969     def test_restore_tar_basic_filtering(self):
970         '''
971         Creates a backup, and then filter when doing the tar based restore.
972         '''
973         password, paramversion = self.ENCRYPTION or (None, None)
974         deltatar = DeltaTar(mode=self.MODE, password=password,
975                             crypto_paramversion=paramversion,
976                             logger=self.consoleLogger)
977
978         # create first backup
979         deltatar.create_full_backup(
980             source_path="source_dir",
981             backup_path="backup_dir")
982
983         assert os.path.exists("backup_dir")
984         shutil.rmtree("source_dir")
985
986         deltatar.included_files = ["test", "small"]
987         deltatar.excluded_files = ["test/huge"]
988
989         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
990         tar_path = os.path.join("backup_dir", tar_filename)
991
992         deltatar.restore_backup(target_path="source_dir",
993                                 backup_tar_path=tar_path)
994
995         assert os.path.exists("source_dir/small")
996         assert os.path.exists("source_dir/test")
997         assert os.path.exists("source_dir/test/huge2")
998         assert os.path.exists("source_dir/test/test2")
999
1000         assert not os.path.exists("source_dir/test/huge")
1001         assert not os.path.exists("source_dir/big")
1002
1003     def test_restore_tar_filter_func(self):
1004         '''
1005         Creates a backup, and then filter when doing the tar based restore,
1006         using the filter function.
1007         '''
1008         visited_paths = []
1009         def filter_func(visited_paths, path):
1010             if path not in visited_paths:
1011                 visited_paths.append(path)
1012             return True
1013
1014         filter_func = partial(filter_func, visited_paths)
1015
1016         password, paramversion = self.ENCRYPTION or (None, None)
1017         deltatar = DeltaTar(mode=self.MODE, password=password,
1018                             crypto_paramversion=paramversion,
1019                             logger=self.consoleLogger)
1020
1021         # create first backup
1022         deltatar.create_full_backup(
1023             source_path="source_dir",
1024             backup_path="backup_dir")
1025
1026         assert os.path.exists("backup_dir")
1027         shutil.rmtree("source_dir")
1028
1029         index_filename = deltatar.index_name_func(True)
1030         index_path = os.path.join("backup_dir", index_filename)
1031
1032         deltatar.included_files = ["test", "small"]
1033         deltatar.excluded_files = ["test/huge"]
1034         deltatar.filter_func = filter_func
1035
1036         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1037         tar_path = os.path.join("backup_dir", tar_filename)
1038
1039         deltatar.restore_backup(target_path="source_dir",
1040                                 backup_tar_path=tar_path)
1041         assert set(visited_paths) == set([
1042                 'small',
1043                 'test',
1044                 'test/huge2',
1045                 'test/test2'
1046             ])
1047
1048     def test_filter_path_regexp(self):
1049         '''
1050         Test specifically the deltatar.filter_path function with regular
1051         expressions
1052         '''
1053         included_files = [
1054             re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1055             re.compile('^yes$'),
1056             'testing'
1057         ]
1058         excluded_files = [
1059             re.compile('^testing/in_the'),
1060         ]
1061         deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1062                             excluded_files=excluded_files)
1063
1064         # assert valid and invalid paths
1065         assert deltatar.filter_path('test/hola')
1066         assert deltatar.filter_path('test/hola/any/thing')
1067         assert deltatar.filter_path('test/caracola/caracolero')
1068         assert deltatar.filter_path('test/caracola/caracolero/yeah')
1069         assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1070         assert deltatar.filter_path('yes')
1071         assert deltatar.filter_path('testing')
1072         assert deltatar.filter_path('testing/yes')
1073         assert deltatar.filter_path('testing/in_th')
1074
1075         assert not deltatar.filter_path('something')
1076         assert not deltatar.filter_path('other/thing')
1077         assert not deltatar.filter_path('test_ing')
1078         assert not deltatar.filter_path('test/hola_lala')
1079         assert not deltatar.filter_path('test/agur')
1080         assert not deltatar.filter_path('testing_something')
1081         assert not deltatar.filter_path('yeso')
1082         assert not deltatar.filter_path('yes/o')
1083         assert not deltatar.filter_path('yes_o')
1084         assert not deltatar.filter_path('testing/in_the')
1085         assert not deltatar.filter_path('testing/in_the_field')
1086         assert not deltatar.filter_path('testing/in_the/field')
1087
1088     def test_filter_path_parent(self):
1089         '''
1090         Test specifically the deltatar.filter_path function for parent matching
1091         '''
1092         included_files = [
1093             'testing/path/to/some/thing'
1094         ]
1095         deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1096
1097         # assert valid and invalid paths
1098         assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1099         assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1100         assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1101         assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1102         assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1103         assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1104         assert deltatar.filter_path('testing/something/else') == NO_MATCH
1105
1106     def test_parent_matching_simple_full_backup(self):
1107         '''
1108         Create a full backup using parent matching
1109         '''
1110         included_files = [
1111             'test/huge2'
1112         ]
1113
1114         password, paramversion = self.ENCRYPTION or (None, None)
1115         deltatar = DeltaTar(mode=self.MODE, password=password,
1116                             crypto_paramversion=paramversion,
1117                             logger=self.consoleLogger,
1118                             included_files=included_files)
1119
1120         # create first backup
1121         deltatar.create_full_backup(
1122             source_path="source_dir",
1123             backup_path="backup_dir")
1124
1125         assert os.path.exists("backup_dir")
1126         shutil.rmtree("source_dir")
1127
1128         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1129         tar_path = os.path.join("backup_dir", tar_filename)
1130
1131         deltatar = DeltaTar(mode=self.MODE, password=password,
1132                             logger=self.consoleLogger)
1133         deltatar.restore_backup(target_path="source_dir",
1134                                 backup_tar_path=tar_path)
1135
1136         assert os.path.exists('source_dir/test/huge2')
1137         assert os.path.exists('source_dir/test/')
1138         assert not os.path.exists('source_dir/test/huge')
1139         assert not os.path.exists('source_dir/big')
1140         assert not os.path.exists('source_dir/small')
1141
1142     def test_parent_matching_simple_full_backup_restore(self):
1143         '''
1144         Create a full backup and restores it using parent matching
1145         '''
1146         included_files = [
1147             'test/huge2'
1148         ]
1149
1150         password, paramversion = self.ENCRYPTION or (None, None)
1151         deltatar = DeltaTar(mode=self.MODE, password=password,
1152                             crypto_paramversion=paramversion,
1153                             logger=self.consoleLogger)
1154
1155         # create first backup
1156         deltatar.create_full_backup(
1157             source_path="source_dir",
1158             backup_path="backup_dir")
1159
1160         assert os.path.exists("backup_dir")
1161         shutil.rmtree("source_dir")
1162
1163         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1164         tar_path = os.path.join("backup_dir", tar_filename)
1165
1166         deltatar = DeltaTar(mode=self.MODE, password=password,
1167                             logger=self.consoleLogger,
1168                             included_files=included_files)
1169         deltatar.restore_backup(target_path="source_dir",
1170                                 backup_tar_path=tar_path)
1171
1172         assert os.path.exists('source_dir/test/huge2')
1173         assert os.path.exists('source_dir/test/')
1174         assert not os.path.exists('source_dir/test/huge')
1175         assert not os.path.exists('source_dir/big')
1176         assert not os.path.exists('source_dir/small')
1177
1178     def test_parent_matching_index_full_backup_restore(self):
1179         '''
1180         Create a full backup and restores it using parent matching
1181         '''
1182         included_files = [
1183             'test/huge2'
1184         ]
1185
1186         password, paramversion = self.ENCRYPTION or (None, None)
1187         deltatar = DeltaTar(mode=self.MODE, password=password,
1188                             crypto_paramversion=paramversion,
1189                             logger=self.consoleLogger)
1190
1191         # create first backup
1192         deltatar.create_full_backup(
1193             source_path="source_dir",
1194             backup_path="backup_dir")
1195
1196         assert os.path.exists("backup_dir")
1197         shutil.rmtree("source_dir")
1198
1199         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1200         tar_path = os.path.join("backup_dir", tar_filename)
1201
1202         deltatar = DeltaTar(mode=self.MODE, password=password,
1203                             logger=self.consoleLogger,
1204                             included_files=included_files)
1205         deltatar.restore_backup(target_path="source_dir",
1206                                 backup_tar_path=tar_path)
1207
1208         assert os.path.exists('source_dir/test/huge2')
1209         assert os.path.exists('source_dir/test/')
1210         assert not os.path.exists('source_dir/test/huge')
1211         assert not os.path.exists('source_dir/big')
1212         assert not os.path.exists('source_dir/small')
1213
1214     def test_collate_iterators(self):
1215         '''
1216         Tests the collate iterators functionality with two exact directories,
1217         using an index iterator from a backup and the exact same source dir.
1218         '''
1219         password, paramversion = self.ENCRYPTION or (None, None)
1220         deltatar = DeltaTar(mode=self.MODE, password=password,
1221                             crypto_paramversion=paramversion,
1222                             logger=self.consoleLogger)
1223
1224         # create first backup
1225         deltatar.create_full_backup(
1226             source_path="source_dir",
1227             backup_path="backup_dir")
1228
1229         assert os.path.exists("backup_dir")
1230
1231         cwd = os.getcwd()
1232         index_filename = deltatar.index_name_func(is_full=True)
1233         index_path = os.path.join(cwd, "backup_dir", index_filename)
1234         index_it = deltatar.iterate_index_path(index_path)
1235
1236         os.chdir('source_dir')
1237         dir_it = deltatar._recursive_walk_dir('.')
1238         path_it = deltatar.jsonize_path_iterator(dir_it)
1239
1240         try:
1241             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1242                 assert deltatar._equal_stat_dicts(path1, path2)
1243         finally:
1244             os.chdir(cwd)
1245
1246     def test_collate_iterators_diffdirs(self):
1247         '''
1248         Use the collate iterators functionality with two different directories.
1249         It must behave in an expected way.
1250         '''
1251         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1252
1253         password, paramversion = self.ENCRYPTION or (None, None)
1254         deltatar = DeltaTar(mode=self.MODE, password=password,
1255                             crypto_paramversion=paramversion,
1256                             logger=self.consoleLogger)
1257
1258         # create first backup
1259         deltatar.create_full_backup(
1260             source_path="source_dir",
1261             backup_path="backup_dir")
1262
1263         assert os.path.exists("backup_dir")
1264         self.hash["source_dir/z"]  = self.create_file("source_dir/z", 100)
1265
1266         cwd = os.getcwd()
1267         index_filename = deltatar.index_name_func(is_full=True)
1268         index_path = os.path.join(cwd, "backup_dir", index_filename)
1269         index_it = deltatar.iterate_index_path(index_path)
1270
1271         os.chdir('source_dir')
1272         dir_it = deltatar._recursive_walk_dir('.')
1273         path_it = deltatar.jsonize_path_iterator(dir_it)
1274
1275         try:
1276             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1277                 if path2['path'] == 'z':
1278                     assert not path1
1279                 else:
1280                     assert deltatar._equal_stat_dicts(path1, path2)
1281         finally:
1282             os.chdir(cwd)
1283
1284     def test_collate_iterators_diffdirs2(self):
1285         '''
1286         Use the collate iterators functionality with two different directories.
1287         It must behave in an expected way.
1288         '''
1289         password, paramversion = self.ENCRYPTION or (None, None)
1290         deltatar = DeltaTar(mode=self.MODE, password=password,
1291                             crypto_paramversion=paramversion,
1292                             logger=self.consoleLogger)
1293
1294         # create first backup
1295         deltatar.create_full_backup(
1296             source_path="source_dir",
1297             backup_path="backup_dir")
1298
1299         assert os.path.exists("backup_dir")
1300
1301         # add some new files and directories
1302         os.makedirs('source_dir/bigdir')
1303         self.hash["source_dir/bigdir"] = ""
1304         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1305         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1306         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1307
1308         cwd = os.getcwd()
1309         index_filename = deltatar.index_name_func(is_full=True)
1310         index_path = os.path.join(cwd, "backup_dir", index_filename)
1311         index_it = deltatar.iterate_index_path(index_path)
1312
1313         os.chdir('source_dir')
1314         dir_it = deltatar._recursive_walk_dir('.')
1315         path_it = deltatar.jsonize_path_iterator(dir_it)
1316
1317         visited_pairs = []
1318
1319         try:
1320             for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1321                 visited_pairs.append(
1322                     (deltatar.unprefixed(path1['path']) if path1 else None,
1323                      path2['path'] if path2 else None)
1324                 )
1325         finally:
1326             assert visited_pairs == [
1327                 (u'big', u'big'),
1328                 (None, u'bigdir'),
1329                 (u'small', u'small'),
1330                 (u'test', u'test'),
1331                 (None, u'zzzz'),
1332                 (None, u'bigdir/a'),
1333                 (None, u'bigdir/b'),
1334                 (u'test/huge', u'test/huge'),
1335                 (u'test/huge2', u'test/huge2'),
1336                 (u'test/test2', u'test/test2'),
1337             ]
1338             os.chdir(cwd)
1339
1340     def test_create_empty_diff_backup(self):
1341         '''
1342         Creates an empty (no changes) backup diff
1343         '''
1344         password, paramversion = self.ENCRYPTION or (None, None)
1345         deltatar = DeltaTar(mode=self.MODE, password=password,
1346                             crypto_paramversion=paramversion,
1347                             logger=self.consoleLogger)
1348
1349         # create first backup
1350         deltatar.create_full_backup(
1351             source_path="source_dir",
1352             backup_path="backup_dir")
1353
1354         prev_index_filename = deltatar.index_name_func(is_full=True)
1355         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1356
1357         deltatar.create_diff_backup("source_dir", "backup_dir2",
1358                                     prev_index_path)
1359
1360         # check index items
1361         index_path = os.path.join("backup_dir2",
1362             deltatar.index_name_func(is_full=False))
1363         index_it = deltatar.iterate_index_path(index_path)
1364         n = 0
1365         for i in index_it:
1366             n += 1
1367             assert i[0]['path'].startswith("list://")
1368
1369         assert n == 6
1370
1371         # check the tar file
1372         assert os.path.exists("backup_dir2")
1373         shutil.rmtree("source_dir")
1374
1375         tar_filename = deltatar.volume_name_func('backup_dir2',
1376             is_full=False, volume_number=0)
1377         tar_path = os.path.join("backup_dir2", tar_filename)
1378
1379         # no file restored, because the diff was empty
1380         deltatar.restore_backup(target_path="source_dir",
1381                                 backup_tar_path=tar_path)
1382         assert len(os.listdir("source_dir")) == 0
1383
1384
1385     def test_create_diff_backup1(self):
1386         '''
1387         Creates a diff backup when there are new files
1388         '''
1389         password, paramversion = self.ENCRYPTION or (None, None)
1390         deltatar = DeltaTar(mode=self.MODE, password=password,
1391                             crypto_paramversion=paramversion,
1392                             logger=self.consoleLogger)
1393
1394         # create first backup
1395         deltatar.create_full_backup(
1396             source_path="source_dir",
1397             backup_path="backup_dir")
1398
1399         prev_index_filename = deltatar.index_name_func(is_full=True)
1400         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1401
1402         # add some new files and directories
1403         os.makedirs('source_dir/bigdir')
1404         self.hash["source_dir/bigdir"] = ""
1405         os.unlink("source_dir/small")
1406         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1407         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1408         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1409
1410         deltatar.create_diff_backup("source_dir", "backup_dir2",
1411                                     prev_index_path)
1412
1413         # check index items
1414         index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1415         index_it = deltatar.iterate_index_path(index_path)
1416         l = [i[0]['path'] for i in index_it]
1417
1418         assert l == [
1419             'list://big',
1420             'snapshot://bigdir',
1421             'delete://small',
1422             'list://test',
1423             'snapshot://zzzz',
1424             'snapshot://bigdir/a',
1425             'snapshot://bigdir/b',
1426             'list://test/huge',
1427             'list://test/huge2',
1428             'list://test/test2',
1429         ]
1430
1431         # check the tar file
1432         assert os.path.exists("backup_dir2")
1433         shutil.rmtree("source_dir")
1434
1435         # create source_dir with the small file, that will be then deleted by
1436         # the restore_backup
1437         os.mkdir("source_dir")
1438         open("source_dir/small", 'wb').close()
1439
1440         tar_filename = deltatar.volume_name_func('backup_dir2',
1441             is_full=False, volume_number=0)
1442         tar_path = os.path.join("backup_dir2", tar_filename)
1443
1444         # restore the backup, this will create only the new files
1445         deltatar.restore_backup(target_path="source_dir",
1446                                 backup_tar_path=tar_path)
1447         # the order doesn't matter
1448         assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1449
1450     def test_restore_from_index_diff_backup(self):
1451         '''
1452         Creates a full backup, modifies some files, creates a diff backup,
1453         then restores the diff backup from zero.
1454         '''
1455         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1456             raise SkipTest('this test only works for uncompressed '
1457                            'or concat compressed modes')
1458
1459         password, paramversion = self.ENCRYPTION or (None, None)
1460         deltatar = DeltaTar(mode=self.MODE, password=password,
1461                             crypto_paramversion=paramversion,
1462                             logger=self.consoleLogger)
1463
1464         # create first backup
1465         deltatar.create_full_backup(
1466             source_path="source_dir",
1467             backup_path="backup_dir")
1468
1469         prev_index_filename = deltatar.index_name_func(is_full=True)
1470         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1471
1472         # add some new files and directories
1473         os.makedirs('source_dir/bigdir')
1474         self.hash["source_dir/bigdir"] = ""
1475         os.unlink("source_dir/small")
1476         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1477         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1478         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1479
1480         deltatar.create_diff_backup("source_dir", "backup_dir2",
1481                                     prev_index_path)
1482
1483         # apply diff backup in target_dir
1484         index_filename = deltatar.index_name_func(is_full=False)
1485         index_path = os.path.join("backup_dir2", index_filename)
1486         deltatar.restore_backup("target_dir",
1487             backup_indexes_paths=[index_path, prev_index_path])
1488
1489         # then compare the two directories source_dir and target_dir and check
1490         # they are the same
1491         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1492
1493     def test_restore_from_index_diff_backup2(self):
1494         '''
1495         Creates a full backup, modifies some files, creates a diff backup,
1496         then restores the diff backup with the full backup as a starting point.
1497         '''
1498         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1499             raise SkipTest('this test only works for uncompressed '
1500                            'or concat compressed modes')
1501
1502         password, paramversion = self.ENCRYPTION or (None, None)
1503         deltatar = DeltaTar(mode=self.MODE, password=password,
1504                             crypto_paramversion=paramversion,
1505                             logger=self.consoleLogger)
1506
1507         # create first backup
1508         deltatar.create_full_backup(
1509             source_path="source_dir",
1510             backup_path="backup_dir")
1511
1512         prev_index_filename = deltatar.index_name_func(is_full=True)
1513         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1514
1515         # add some new files and directories
1516         os.makedirs('source_dir/bigdir')
1517         self.hash["source_dir/bigdir"] = ""
1518         os.unlink("source_dir/small")
1519         self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1520         self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1521         self.hash["source_dir/zzzz"]  = self.create_file("source_dir/zzzz", 100)
1522         shutil.rmtree("source_dir/test")
1523
1524         deltatar.create_diff_backup("source_dir", "backup_dir2",
1525                                     prev_index_path)
1526
1527         # first restore initial backup in target_dir
1528         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1529         tar_path = os.path.join("backup_dir", tar_filename)
1530         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1531
1532         # then apply diff backup in target_dir
1533         index_filename = deltatar.index_name_func(is_full=False)
1534         index_path = os.path.join("backup_dir2", index_filename)
1535         deltatar.restore_backup("target_dir",
1536             backup_indexes_paths=[index_path, prev_index_path])
1537
1538         # then compare the two directories source_dir and target_dir and check
1539         # they are the same
1540         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1541
1542     def test_restore_from_index_diff_backup3(self):
1543         '''
1544         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1545         diff backup, then restores the diff backup with the full backup as a
1546         starting point.
1547         '''
1548         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1549             raise SkipTest('this test only works for uncompressed '
1550                            'or concat compressed modes')
1551
1552         password, paramversion = self.ENCRYPTION or (None, None)
1553         deltatar = DeltaTar(mode=self.MODE, password=password,
1554                             crypto_paramversion=paramversion,
1555                             logger=self.consoleLogger)
1556
1557         shutil.rmtree("source_dir")
1558         shutil.copytree(self.GIT_DIR, "source_dir")
1559         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1560
1561         # create first backup
1562         deltatar.create_full_backup(
1563             source_path="source_dir",
1564             backup_path="backup_dir")
1565
1566         prev_index_filename = deltatar.index_name_func(is_full=True)
1567         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1568
1569         # alter the source_dir randomly
1570         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1571
1572         for path in source_it:
1573             # if path doesn't exist (might have previously removed) ignore it.
1574             # also ignore it (i.e. do not change it) 70% of the time
1575             if not os.path.exists(path) or random.random() < 0.7:
1576                 continue
1577
1578             # remove the file
1579             if os.path.isdir(path):
1580                 shutil.rmtree(path)
1581             else:
1582                 os.unlink(path)
1583
1584         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1585                                     prev_index_path)
1586
1587         # first restore initial backup in target_dir
1588         tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1589         tar_path = os.path.join("backup_dir", tar_filename)
1590         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1591
1592         # and check that target_dir equals to source_dir (which is the same as
1593         # self.GIT_DIR initially)
1594         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1595
1596         # then apply diff backup in target_dir
1597         index_filename = deltatar.index_name_func(is_full=False)
1598         index_path = os.path.join("backup_dir2", index_filename)
1599         deltatar.restore_backup("target_dir",
1600             backup_indexes_paths=[index_path, prev_index_path])
1601
1602         # and check that target_dir equals to source_dir_diff (the randomly
1603         # altered self.GIT_DIR directory)
1604         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1605
1606         # then delete target_dir and apply diff backup from zero and check again
1607         shutil.rmtree("target_dir")
1608         deltatar.restore_backup("target_dir",
1609             backup_indexes_paths=[index_path, prev_index_path])
1610
1611         # and check that target_dir equals to source_dir_diff (the randomly
1612         # altered self.GIT_DIR directory)
1613         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1614
1615     def test_restore_from_index_diff_backup3_multivol(self):
1616         '''
1617         Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1618         diff backup, then restores the diff backup with the full backup as a
1619         starting point.
1620         '''
1621         if self.MODE.startswith(':') or self.MODE.startswith('|'):
1622             raise SkipTest('this test only works for uncompressed '
1623                            'or concat compressed modes')
1624
1625         password, paramversion = self.ENCRYPTION or (None, None)
1626         deltatar = DeltaTar(mode=self.MODE, password=password,
1627                             crypto_paramversion=paramversion,
1628                             logger=self.consoleLogger)
1629
1630         shutil.rmtree("source_dir")
1631         shutil.copytree(self.GIT_DIR, "source_dir")
1632         shutil.copytree(self.GIT_DIR, "source_dir_diff")
1633
1634         # create first backup
1635         deltatar.create_full_backup(
1636             source_path="source_dir",
1637             backup_path="backup_dir",
1638             max_volume_size=1)
1639
1640         prev_index_filename = deltatar.index_name_func(is_full=True)
1641         prev_index_path = os.path.join("backup_dir", prev_index_filename)
1642
1643         # alter the source_dir randomly
1644         source_it = deltatar._recursive_walk_dir('source_dir_diff')
1645
1646         for path in source_it:
1647             # if path doesn't exist (might have previously removed) ignore it.
1648             # also ignore it (i.e. do not change it) 70% of the time
1649             if not os.path.exists(path) or random.random() < 0.7:
1650                 continue
1651
1652             # remove the file
1653             if os.path.isdir(path):
1654                 shutil.rmtree(path)
1655             else:
1656                 os.unlink(path)
1657
1658         deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1659                                     prev_index_path, max_volume_size=1)
1660
1661         # first restore initial backup in target_dir
1662         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1663         tar_path = os.path.join("backup_dir", tar_filename)
1664         deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1665
1666         # and check that target_dir equals to source_dir (which is the same as
1667         # self.GIT_DIR initially)
1668         self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1669
1670         # then apply diff backup in target_dir
1671         index_filename = deltatar.index_name_func(is_full=False)
1672         index_path = os.path.join("backup_dir2", index_filename)
1673         deltatar.restore_backup("target_dir",
1674             backup_indexes_paths=[index_path, prev_index_path])
1675
1676         # and check that target_dir equals to source_dir_diff (the randomly
1677         # altered self.GIT_DIR directory)
1678         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1679
1680         # then delete target_dir and apply diff backup from zero and check again
1681         shutil.rmtree("target_dir")
1682         deltatar.restore_backup("target_dir",
1683             backup_indexes_paths=[index_path, prev_index_path])
1684
1685         # and check that target_dir equals to source_dir_diff (the randomly
1686         # altered self.GIT_DIR directory)
1687         self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1688
1689     def check_equal_dirs(self, path1, path2, deltatar):
1690         '''
1691         compare the two directories source_dir and target_dir and check
1692         # they are the same
1693         '''
1694         source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1695         source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1696         target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1697         target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1698         while True:
1699             try:
1700                 sitem = next(source_it)
1701                 titem = next(target_it)
1702             except StopIteration:
1703                 try:
1704                     titem = next(target_it)
1705                     raise Exception("iterators do not stop at the same time")
1706                 except StopIteration:
1707                     break
1708             try:
1709                 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1710             except Exception as e:
1711                 print("SITEM: " + str(sitem))
1712                 print("TITEM: " + str(titem))
1713                 raise e
1714
1715     def test_create_no_symlinks(self):
1716         '''
1717         Creates a full backup from different varieties of symlinks. The
1718         extracted archive may not contain any symlinks but the file contents
1719         '''
1720
1721         os.system("rm -rf source_dir")
1722         os.makedirs("source_dir/symlinks")
1723         fd = os.open("source_dir/symlinks/valid_linkname",
1724                      os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1725         os.write(fd, b"valid link target for symlink tests; please ignore\n")
1726         os.close(fd)
1727         # first one is good, the rest points nowhere
1728         self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1729         self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1730         self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1731         self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1732         password, paramversion = self.ENCRYPTION or (None, None)
1733         deltatar = DeltaTar(mode=self.MODE, password=password,
1734                             crypto_paramversion=paramversion,
1735                             logger=self.consoleLogger)
1736
1737         # create first backup
1738         deltatar.create_full_backup(source_path="source_dir",
1739                                     backup_path="backup_dir")
1740
1741         assert os.path.exists("backup_dir")
1742         shutil.rmtree("source_dir")
1743         assert not os.path.exists("source_dir")
1744
1745         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1746         tar_path = os.path.join("backup_dir", tar_filename)
1747
1748         deltatar.restore_backup(target_path="source_dir",
1749                                 backup_tar_path=tar_path)
1750
1751         for _r, _ds, fs in os.walk("source_dir/symlinks"):
1752         # only the valid link plus the linked file may be found in the
1753         # extracted archive
1754             assert len(fs) == 2
1755             for f in fs:
1756                 # the link must have been resolved and file contents must match
1757                 # the linked file
1758                 assert not os.path.islink(f)
1759                 with open("source_dir/symlinks/valid_linkname") as a:
1760                     with open("source_dir/symlinks/whatever") as b:
1761                         assert a.read() == b.read()
1762
1763     def test_restore_with_symlinks(self):
1764         '''
1765         Creates a full backup containing different varieties of symlinks. All
1766         of them must be filtered out.
1767         '''
1768         password, paramversion = self.ENCRYPTION or (None, None)
1769         deltatar = DeltaTar(mode=self.MODE, password=password,
1770                             crypto_paramversion=paramversion,
1771                             logger=self.consoleLogger)
1772
1773         # create first backup
1774         deltatar.create_full_backup(source_path="source_dir",
1775                                     backup_path="backup_dir")
1776
1777         assert os.path.exists("backup_dir")
1778         shutil.rmtree("source_dir")
1779
1780         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1781         tar_path = os.path.join("backup_dir", tar_filename)
1782
1783         # add symlinks to existing archive
1784
1785         def add_symlink (a, name, dst):
1786             l = tarfile.TarInfo("snapshot://%s" % name)
1787             l.type = tarfile.SYMTYPE
1788             l.linkname = dst
1789             a.addfile(l)
1790             return name
1791
1792         try:
1793             with tarfile.open(tar_path,mode="a") as a:
1794                 checkme = \
1795                     [ add_symlink(a, "symlinks/foo", "internal-file")
1796                     , add_symlink(a, "symlinks/bar", "/absolute/path")
1797                     , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1798         except tarfile.ReadError as e:
1799             if self.MODE == '#' or self.MODE.endswith ("gz"):
1800                 checkme = []
1801             else:
1802                 raise
1803         except ValueError as e:
1804             if self.MODE.startswith ('#'):
1805                 checkme = []
1806             else:
1807                 raise
1808
1809         deltatar.restore_backup(target_path="source_dir",
1810                                 backup_tar_path=tar_path)
1811
1812         # check what happened to our symlinks
1813         for name in checkme:
1814             fullpath = os.path.join("source_dir", name)
1815             assert not os.path.exists(fullpath)
1816
1817     def test_restore_malicious_symlinks(self):
1818         '''
1819         Creates a full backup containing a symlink and a file of the same name.
1820         This simulates a symlink attack with a link pointing to some external
1821         path that is abused to write outside the extraction prefix.
1822         '''
1823         password, paramversion = self.ENCRYPTION or (None, None)
1824         deltatar = DeltaTar(mode=self.MODE, password=password,
1825                             crypto_paramversion=paramversion,
1826                             logger=self.consoleLogger)
1827
1828         # create first backup
1829         deltatar.create_full_backup(source_path="source_dir",
1830                                     backup_path="backup_dir")
1831
1832         assert os.path.exists("backup_dir")
1833         shutil.rmtree("source_dir")
1834
1835         tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1836         tar_path = os.path.join("backup_dir", tar_filename)
1837
1838         # add symlinks to existing archive
1839
1840         def add_symlink (a, name, dst):
1841             l = tarfile.TarInfo("snapshot://%s" % name)
1842             l.type = tarfile.SYMTYPE
1843             l.linkname = dst
1844             a.addfile(l)
1845
1846         def add_file (a, name):
1847             f = tarfile.TarInfo("snapshot://%s" % name)
1848             f.type = tarfile.REGTYPE
1849             a.addfile(f)
1850
1851         testpath = "symlinks/pernicious-link"
1852         testdst = "/tmp/does/not/exist"
1853
1854         try:
1855             with tarfile.open(tar_path, mode="a") as a:
1856                 add_symlink(a, testpath, testdst)
1857                 add_symlink(a, testpath, testdst+"X")
1858                 add_symlink(a, testpath, testdst+"XXX")
1859                 add_file(a, testpath)
1860         except tarfile.ReadError as e:
1861             if self.MODE == '#' or self.MODE.endswith ("gz"):
1862                 pass
1863             else:
1864                 raise
1865         except ValueError as e:
1866             if self.MODE.startswith ('#'):
1867                 pass # O_APPEND of concat archives not feasible
1868             else:
1869                 raise
1870
1871         deltatar.restore_backup(target_path="source_dir",
1872                                 backup_tar_path=tar_path)
1873
1874         # check whether the link was extracted; deltatar seems to only ever
1875         # retrieve the first item it finds for a given path which in the case
1876         # at hand is a symlink to some non-existent path
1877         fullpath = os.path.join("source_dir", testpath)
1878         assert not os.path.exists(fullpath)
1879
1880
def fsapi_access_true(self):
    """
    Replace os.access() by a function that returns True for any arguments.

    The function that was installed before is appended to self.FSAPI_SAVED
    as an ("access", function) tuple. Used for testing improper use of the
    *os* module.
    """
    previous_access = os.access
    self.FSAPI_SAVED.append(("access", previous_access))

    def always_granted(*_args, **_kwargs):
        return True

    os.access = always_granted
1888
1889
class DeltaTar2Test(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the specific ":" mode.
    """
    MODE = ":"
1895
1896
class DeltaTarStreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the uncompressed stream mode "|".
    """
    MODE = "|"
1902
1903
class DeltaTarGzipTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the gzip mode ":gz".
    """
    MODE_COMPRESSES = True
    MODE = ":gz"
1910
1911
class DeltaTarGzipStreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the gzip stream mode "|gz".
    """
    MODE_COMPRESSES = True
    MODE = "|gz"
1918
1919
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the bz2 mode ":bz2".

    The whole class is skipped because the bz2 tests take too long.
    """
    MODE_COMPRESSES = True
    MODE = ":bz2"
1927
1928
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the bz2 stream mode "|bz2".

    The whole class is skipped because the bz2 tests take too long.
    """
    MODE_COMPRESSES = True
    MODE = "|bz2"
1936
1937
class DeltaTarGzipConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the gzip concat stream mode "#gz".
    """
    MODE_COMPRESSES = True
    MODE = "#gz"
1944
1945
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the gzip concat stream mode "#gz" and
    encryption enabled.
    """
    MODE_COMPRESSES = True
    MODE = "#gz"
    # (password, paramversion) pair handed to DeltaTar by the test cases
    ENCRYPTION = ("some magic key", 1)
1953
1954
class DeltaTarAes128ConcatTest(DeltaTarTest):
    """
    Runs the DeltaTarTest cases with the concat stream mode "#" and
    encryption enabled.
    """
    MODE = "#"
    # (password, paramversion) pair handed to DeltaTar by the test cases
    ENCRYPTION = ("some magic key", 1)
1961
1962
class DeltaTarFilesystemHandlingTest(DeltaTarGzipTest):
    """
    Runs the gzip test cases while messing with filesystem APIs.

    FSTEST is set to fsapi_access_true, which swaps os.access() for a
    function that always returns True.
    """
    # NOTE(review): presumably invoked by the test fixture set-up; confirm
    # where FSTEST is consumed.
    FSTEST = fsapi_access_true
1968