1 # Copyright (C) 2013 Intra2net AG
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Lesser General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
37 from . import BaseTest
38 from . import new_volume_handler
40 # Enable warning messages from deltatar. This minimizes the SNR of
41 # test runs, but none of the messages are meaningful in any way.
42 VERBOSE_TEST_OUTPUT = False
44 class DeltaTarTest(BaseTest):
49 MODE_COMPRESSES = False
51 ENCRYPTION = None # (password : str, paramversion : int) option
59 self.pwd = os.getcwd()
60 os.system('rm -rf target_dir source_dir* backup_dir* huge')
61 os.makedirs('source_dir/test/test2')
63 self.hash["source_dir/test/test2"] = ''
64 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
65 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
66 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
67 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
69 self.consoleLogger = None
70 if VERBOSE_TEST_OUTPUT is True:
71 self.consoleLogger = logging.StreamHandler()
72 self.consoleLogger.setLevel(logging.DEBUG)
74 if not os.path.isdir(self.GIT_DIR):
75 # Not running inside git tree, take our
76 # own testing directory as source.
77 self.GIT_DIR = 'testing'
79 if not os.path.isdir(self.GIT_DIR):
80 raise Exception('No input directory found: ' + self.GIT_DIR)
84 Remove temporal files created by unit tests and reset globals.
87 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
88 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
89 ("I am fully aware that this will void my warranty.")
91 def test_restore_simple_full_backup(self):
93 Creates a full backup without any filtering and restores it.
95 password, paramversion = self.ENCRYPTION or (None, None)
96 deltatar = DeltaTar(mode=self.MODE, password=password,
97 crypto_paramversion=paramversion,
98 logger=self.consoleLogger)
100 # create first backup
101 deltatar.create_full_backup(
102 source_path="source_dir",
103 backup_path="backup_dir")
105 assert os.path.exists("backup_dir")
106 shutil.rmtree("source_dir")
108 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
109 tar_path = os.path.join("backup_dir", tar_filename)
111 deltatar.restore_backup(target_path="source_dir",
112 backup_tar_path=tar_path)
114 for key, value in self.hash.items():
115 assert os.path.exists(key)
117 assert value == self.md5sum(key)
120 def test_create_backup_max_file_length (self):
122 Creates a full backup including one file that exceeds the (purposely
123 lowered) upper bound on GCM encrypted objects. This will yield multiple
124 encrypted objects for one plaintext file.
126 Success is verified by splitting the archive at object boundaries and
129 if self.MODE_COMPRESSES is True:
130 raise SkipTest ("GCM file length test not meaningful with compression.")
131 if self.ENCRYPTION is None:
132 raise SkipTest ("GCM file length applies only to encrypted backups.")
134 new_max = 20000 # cannot be less than tar block size
135 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
136 ("I am fully aware that this will void my warranty.",
139 password, paramversion = self.ENCRYPTION
140 deltatar = DeltaTar (mode=self.MODE, password=password,
141 crypto_paramversion=paramversion,
142 logger=self.consoleLogger)
145 os.makedirs ("source_dir2")
146 for f, s in [("empty" , 0) # 1 tar objects
147 ,("slightly_larger", new_max + 1) # 2
148 ,("twice" , 2 * new_max) # 3
150 f = "source_dir2/%s" % f
151 self.hash [f] = self.create_file (f, s)
153 deltatar.create_full_backup \
154 (source_path="source_dir2", backup_path="backup_dir")
156 assert os.path.exists ("backup_dir")
157 shutil.rmtree ("source_dir2")
159 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
160 backup_path = os.path.join("backup_dir", backup_filename)
162 # split the resulting archive into its constituents without
164 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
165 "-o backup_dir/split <\'%s\'" % backup_path)
167 assert os.path.exists ("backup_dir/split")
169 dents = os.listdir ("backup_dir/split")
170 assert len (dents) == 6
173 def test_restore_backup_max_file_length (self):
175 Creates a full backup including one file that exceeds the (purposely
176 lowered) upper bound on GCM encrypted objects. This will yield two
177 encrypted objects for one plaintext file.
179 Success is verified by splitting the archive at object boundaries and
182 if self.MODE_COMPRESSES is True:
183 raise SkipTest ("GCM file length test not meaningful with compression.")
184 if self.ENCRYPTION is None:
185 raise SkipTest ("GCM file length applies only to encrypted backups.")
187 new_max = 20000 # cannot be less than tar block size
188 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
189 ("I am fully aware that this will void my warranty.",
192 password, paramversion = self.ENCRYPTION
193 deltatar = DeltaTar (mode=self.MODE, password=password,
194 crypto_paramversion=paramversion,
195 logger=self.consoleLogger)
198 os.makedirs ("source_dir2")
199 for f, s in [("empty" , 0) # 1 tar objects
200 ,("almost_large" , new_max - 1) # 2
201 ,("large" , new_max) # 3
202 ,("slightly_larger", new_max + 1) # 4
203 ,("twice" , 2 * new_max) # 5
204 ,("twice_plus_one" , (2 * new_max) + 1) # 6
206 f = "source_dir2/%s" % f
207 self.hash [f] = self.create_file (f, s)
209 deltatar.create_full_backup \
210 (source_path="source_dir2", backup_path="backup_dir")
212 assert os.path.exists ("backup_dir")
213 shutil.rmtree ("source_dir2")
215 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
216 backup_path = os.path.join("backup_dir", backup_filename)
218 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
219 tar_path = os.path.join("backup_dir", tar_filename)
221 deltatar.restore_backup(target_path="source_dir2",
222 backup_tar_path=tar_path)
224 for key, value in self.hash.items():
225 assert os.path.exists(key)
227 assert value == self.md5sum(key)
230 def test_create_backup_index_max_file_length (self):
232 Creates a full backup with a too large index file for the upper bound
233 of the GCM encryption. Since the index file has a fixed IV file counter
234 of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
236 60+ GB of (potentially compressed) index file should last for a while...
238 if self.MODE_COMPRESSES is True:
239 raise SkipTest ("GCM file length test not meaningful with compression.")
240 if self.ENCRYPTION is None:
241 raise SkipTest ("GCM file length applies only to encrypted backups.")
244 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
245 ("I am fully aware that this will void my warranty.",
248 password, paramversion = self.ENCRYPTION
249 deltatar = DeltaTar (mode=self.MODE, password=password,
250 crypto_paramversion=paramversion,
251 logger=self.consoleLogger)
254 os.makedirs ("source_dir2")
256 f = "source_dir2/dummy_%rd" % i
257 self.hash [f] = self.create_file (f, i)
259 with self.assertRaises (crypto.InvalidFileCounter):
260 deltatar.create_full_backup \
261 (source_path="source_dir2", backup_path="backup_dir")
262 shutil.rmtree ("source_dir2")
265 def test_check_index_checksum(self):
267 Creates a full backup and checks the index' checksum of files
269 password, paramversion = self.ENCRYPTION or (None, None)
270 deltatar = DeltaTar(mode=self.MODE, password=password,
271 crypto_paramversion=paramversion,
272 logger=self.consoleLogger)
274 # create first backup
275 deltatar.create_full_backup(
276 source_path="source_dir",
277 backup_path="backup_dir")
280 index_filename = deltatar.index_name_func(True)
281 index_path = os.path.join("backup_dir", index_filename)
283 f = open(index_path, 'rb')
291 if b'BEGIN-FILE-LIST' in l:
292 crc = binascii.crc32(l) & 0xFFFFffff
294 elif b'END-FILE-LIST' in l:
295 crc = binascii.crc32(l, crc) & 0xffffffff
297 # next line contains the crc
298 data = json.loads(f.readline().decode("UTF-8"))
299 assert data['type'] == 'file-list-checksum'
300 assert data['checksum'] == crc
304 crc = binascii.crc32(l, crc) & 0xffffffff
308 def test_restore_multivol(self):
310 Creates a full backup without any filtering with multiple volumes and
313 if ':gz' in self.MODE:
314 raise SkipTest('compression information is lost when creating '
315 'multiple volumes with no Stream')
317 password, paramversion = self.ENCRYPTION or (None, None)
318 deltatar = DeltaTar(mode=self.MODE, password=password,
319 crypto_paramversion=paramversion,
320 logger=self.consoleLogger)
323 os.makedirs('source_dir2')
324 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
325 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
327 # create first backup
328 deltatar.create_full_backup(
329 source_path="source_dir2",
330 backup_path="backup_dir",
333 assert os.path.exists("backup_dir")
334 assert os.path.exists(os.path.join("backup_dir",
335 deltatar.volume_name_func("backup_dir", True, 0)))
336 if self.MODE_COMPRESSES:
340 for i_vol in range(n_vols):
341 assert os.path.exists(os.path.join("backup_dir",
342 deltatar.volume_name_func("backup_dir", True, i_vol)))
343 assert not os.path.exists(os.path.join("backup_dir",
344 deltatar.volume_name_func("backup_dir", True, n_vols)))
346 shutil.rmtree("source_dir2")
348 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
349 tar_path = os.path.join("backup_dir", tar_filename)
351 # this should automatically restore all volumes
352 deltatar.restore_backup(target_path="source_dir2",
353 backup_tar_path=tar_path)
355 for key, value in self.hash.items():
356 assert os.path.exists(key)
358 assert value == self.md5sum(key)
360 def test_restore_multivol_split(self):
362 Creates a full backup without any filtering with multiple volumes
363 with big files bigger than the max volume size and
366 if self.MODE.startswith(':') or self.MODE.startswith('|'):
367 raise SkipTest('this test only works for uncompressed '
368 'or concat compressed modes')
370 password, paramversion = self.ENCRYPTION or (None, None)
371 deltatar = DeltaTar(mode=self.MODE, password=password,
372 crypto_paramversion=paramversion,
373 logger=self.consoleLogger)
376 os.makedirs('source_dir2')
377 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
378 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
379 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
381 # create first backup
382 deltatar.create_full_backup(
383 source_path="source_dir2",
384 backup_path="backup_dir",
387 assert os.path.exists("backup_dir")
388 assert os.path.exists(os.path.join("backup_dir",
389 deltatar.volume_name_func("backup_dir", True, 0)))
390 if self.MODE_COMPRESSES:
394 for i_vol in range(n_vols):
395 assert os.path.exists(os.path.join("backup_dir",
396 deltatar.volume_name_func("backup_dir", True, i_vol)))
397 assert not os.path.exists(os.path.join("backup_dir",
398 deltatar.volume_name_func("backup_dir", True, n_vols)))
400 shutil.rmtree("source_dir2")
402 index_filename = deltatar.index_name_func(True)
403 index_path = os.path.join("backup_dir", index_filename)
405 deltatar.restore_backup(target_path="source_dir2",
406 backup_indexes_paths=[index_path])
408 for key, value in self.hash.items():
409 assert os.path.exists(key)
411 assert value == self.md5sum(key)
414 def test_full_backup_index_extra_data(self):
416 Tests that the index file for a full backup can store extra_data and
417 that this data can be retrieved.
419 password, paramversion = self.ENCRYPTION or (None, None)
420 deltatar = DeltaTar(mode=self.MODE, password=password,
421 crypto_paramversion=paramversion,
422 logger=self.consoleLogger)
426 otra_cosa=[1, "lista"],
427 y_otra=dict(bola=1.1)
430 deltatar.create_full_backup(
431 source_path="source_dir",
432 backup_path="backup_dir",
433 extra_data=extra_data)
435 index_filename = deltatar.index_name_func(is_full=True)
436 index_path = os.path.join("backup_dir", index_filename)
438 # iterate_index_path retrieves extra_data, and thus we can then compare
439 index_it = deltatar.iterate_index_path(index_path)
440 self.assertEqual(index_it.extra_data, extra_data)
443 def test_diff_backup_index_extra_data(self):
445 Tests that the index file for a diff backup can store extra_data and
446 that this data can be retrieved.
448 password, paramversion = self.ENCRYPTION or (None, None)
449 deltatar = DeltaTar(mode=self.MODE, password=password,
450 crypto_paramversion=paramversion,
451 logger=self.consoleLogger)
455 otra_cosa=[1, "lista"],
456 y_otra=dict(bola=1.1)
459 deltatar.create_full_backup(
460 source_path="source_dir",
461 backup_path="backup_dir")
464 prev_index_filename = deltatar.index_name_func(is_full=True)
465 prev_index_path = os.path.join("backup_dir", prev_index_filename)
467 # create empty diff backup
468 deltatar.create_diff_backup("source_dir", "backup_dir2",
469 prev_index_path, extra_data=extra_data)
471 index_filename = deltatar.index_name_func(is_full=False)
472 index_path = os.path.join("backup_dir2", index_filename)
474 # iterate_index_path retrieves extra_data, and thus we can then compare
475 index_it = deltatar.iterate_index_path(index_path)
476 self.assertEqual(index_it.extra_data, extra_data)
478 def test_restore_multivol2(self):
480 Creates a full backup without any filtering with multiple volumes and
483 password, paramversion = self.ENCRYPTION or (None, None)
484 deltatar = DeltaTar(mode=self.MODE, password=password,
485 crypto_paramversion=paramversion,
486 logger=self.consoleLogger)
488 shutil.copytree(self.GIT_DIR, "source_dir2")
490 # create first backup
491 deltatar.create_full_backup(
492 source_path="source_dir2",
493 backup_path="backup_dir",
496 assert os.path.exists("backup_dir")
497 assert os.path.exists(os.path.join("backup_dir",
498 deltatar.volume_name_func("backup_dir", True, 0)))
500 shutil.rmtree("source_dir2")
502 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
503 tar_path = os.path.join("backup_dir", tar_filename)
505 # this should automatically restore all volumes
506 deltatar.restore_backup(target_path="source_dir2",
507 backup_tar_path=tar_path)
509 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
511 def test_restore_multivol_manual_from_index(self):
513 Creates a full backup without any filtering with multiple volumes and
516 # this test only works for uncompressed or concat compressed modes
517 if self.MODE.startswith(':') or self.MODE.startswith('|'):
518 raise SkipTest('this test only works for uncompressed '
519 'or concat compressed modes')
521 password, paramversion = self.ENCRYPTION or (None, None)
522 deltatar = DeltaTar(mode=self.MODE, password=password,
523 crypto_paramversion=paramversion,
524 logger=self.consoleLogger)
527 os.makedirs('source_dir2')
528 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
529 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
531 # create first backup
532 deltatar.create_full_backup(
533 source_path="source_dir2",
534 backup_path="backup_dir",
537 assert os.path.exists("backup_dir")
538 assert os.path.exists(os.path.join("backup_dir",
539 deltatar.volume_name_func("backup_dir", True, 0)))
540 if self.MODE_COMPRESSES:
544 for i_vol in range(n_vols):
545 assert os.path.exists(os.path.join("backup_dir",
546 deltatar.volume_name_func("backup_dir", True, i_vol)))
547 assert not os.path.exists(os.path.join("backup_dir",
548 deltatar.volume_name_func("backup_dir", True, n_vols)))
550 shutil.rmtree("source_dir2")
552 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
553 tar_path = os.path.join("backup_dir", tar_filename)
555 index_filename = deltatar.index_name_func(True)
556 index_path = os.path.join("backup_dir", index_filename)
558 # this should automatically restore the huge file
559 f = deltatar.open_auxiliary_file(index_path, 'r')
565 data = json.loads(l.decode('UTF-8'))
566 if data.get('type', '') == 'file' and\
567 deltatar.unprefixed(data['path']) == "huge":
568 offset = data['offset']
571 assert offset is not None
573 fo = open(tar_path, 'rb')
575 def new_volume_handler(mode, tarobj, base_name, volume_number):
576 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
577 if self.ENCRYPTION is not None:
578 # deltatar module is shadowed here
579 suf += "." + deltatar_PDTCRYPT_EXTENSION
580 tarobj.open_volume(datetime.now().strftime(
581 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
582 new_volume_handler = partial(new_volume_handler, self.MODE)
585 if self.ENCRYPTION is not None:
586 crypto_ctx = crypto.Decrypt (password)
588 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
589 new_volume_handler=new_volume_handler,
590 encryption=crypto_ctx)
592 member = tarobj.next()
593 member.path = deltatar.unprefixed(member.path)
594 member.name = deltatar.unprefixed(member.name)
595 tarobj.extract(member)
598 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
603 def test_restore_manual_from_index_twice (self):
605 Creates a full backup and restore the same file twice. This *must* fail
606 when encryption is active.
608 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
609 backwards within the same file. This prevents the encryption layer from
610 exploding due to a reused IV in an overall valid archive.
612 This test anticipates possible future mistakes since it’s entirely
613 feasible to implement backward seeks for *_Stream* with concat mode.
615 # this test only works for uncompressed or concat compressed modes
616 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
617 raise SkipTest("this test only works for uncompressed "
618 "or concat compressed modes")
620 password, paramversion = self.ENCRYPTION or (None, None)
621 deltatar = DeltaTar(mode=self.MODE, password=password,
622 crypto_paramversion=paramversion,
623 logger=self.consoleLogger)
626 os.makedirs("source_dir2")
627 self.hash["source_dir2/samefile"] = \
628 self.create_file("source_dir2/samefile", 1 * 1024)
630 # create first backup
631 deltatar.create_full_backup(
632 source_path="source_dir2",
633 backup_path="backup_dir")
635 assert os.path.exists("backup_dir")
636 assert os.path.exists(os.path.join("backup_dir",
637 deltatar.volume_name_func("backup_dir", True, 0)))
639 shutil.rmtree("source_dir2")
641 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
642 tar_path = os.path.join("backup_dir", tar_filename)
644 index_filename = deltatar.index_name_func(True)
645 index_path = os.path.join("backup_dir", index_filename)
647 f = deltatar.open_auxiliary_file(index_path, "r")
653 data = json.loads(l.decode("UTF-8"))
654 if data.get("type", "") == "file" and\
655 deltatar.unprefixed(data["path"]) == "samefile":
656 offset = data["offset"]
659 assert offset is not None
661 fo = open(tar_path, "rb")
665 if self.ENCRYPTION is not None:
666 crypto_ctx = crypto.Decrypt (password)
668 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
669 encryption=crypto_ctx)
670 member = tarobj.next()
671 member.path = deltatar.unprefixed(member.path)
672 member.name = deltatar.unprefixed(member.name)
675 tarobj.extract(member)
676 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
680 tarobj.extract(member)
681 except tarfile.StreamError:
682 if crypto_ctx is not None:
683 pass # good: seeking backwards not allowed
688 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
690 os.unlink("samefile")
693 def test_restore_from_index(self):
695 Restores a full backup using an index file.
697 if self.MODE.startswith(':') or self.MODE.startswith('|'):
698 raise SkipTest('this test only works for uncompressed '
699 'or concat compressed modes')
701 password, paramversion = self.ENCRYPTION or (None, None)
702 deltatar = DeltaTar(mode=self.MODE, password=password,
703 crypto_paramversion=paramversion,
704 logger=self.consoleLogger)
706 # create first backup
707 deltatar.create_full_backup(
708 source_path="source_dir",
709 backup_path="backup_dir")
711 shutil.rmtree("source_dir")
713 # this should automatically restore all volumes
714 index_filename = deltatar.index_name_func(True)
715 index_path = os.path.join("backup_dir", index_filename)
717 deltatar.restore_backup(target_path="source_dir",
718 backup_indexes_paths=[index_path])
720 for key, value in self.hash.items():
721 assert os.path.exists(key)
723 assert value == self.md5sum(key)
725 def test_restore_multivol_from_index(self):
727 Restores a full multivolume backup using an index file.
729 if self.MODE.startswith(':') or self.MODE.startswith('|'):
730 raise SkipTest('this test only works for uncompressed '
731 'or concat compressed modes')
733 password, paramversion = self.ENCRYPTION or (None, None)
734 deltatar = DeltaTar(mode=self.MODE, password=password,
735 crypto_paramversion=paramversion,
736 logger=self.consoleLogger)
738 # create first backup
739 deltatar.create_full_backup(
740 source_path="source_dir",
741 backup_path="backup_dir",
744 shutil.rmtree("source_dir")
746 # this should automatically restore all volumes
747 index_filename = deltatar.index_name_func(True)
748 index_path = os.path.join("backup_dir", index_filename)
750 deltatar.restore_backup(target_path="source_dir",
751 backup_indexes_paths=[index_path])
753 for key, value in self.hash.items():
754 assert os.path.exists(key)
756 assert value == self.md5sum(key)
758 def test_create_basic_filtering(self):
760 Tests create backup basic filtering.
762 password, paramversion = self.ENCRYPTION or (None, None)
763 deltatar = DeltaTar(mode=self.MODE, password=password,
764 crypto_paramversion=paramversion,
765 logger=self.consoleLogger,
766 included_files=["test", "small"],
767 excluded_files=["test/huge"])
769 # create first backup
770 deltatar.create_full_backup(
771 source_path="source_dir",
772 backup_path="backup_dir")
774 assert os.path.exists("backup_dir")
775 shutil.rmtree("source_dir")
777 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
778 tar_path = os.path.join("backup_dir", tar_filename)
780 deltatar.restore_backup(target_path="source_dir",
781 backup_tar_path=tar_path)
783 assert os.path.exists("source_dir/small")
784 assert os.path.exists("source_dir/test")
785 assert os.path.exists("source_dir/test/huge2")
786 assert os.path.exists("source_dir/test/test2")
788 assert not os.path.exists("source_dir/test/huge")
789 assert not os.path.exists("source_dir/big")
791 def test_create_filter_func(self):
793 Tests create backup basic filtering.
796 def filter_func(visited_paths, path):
797 if path not in visited_paths:
798 visited_paths.append(path)
801 filter_func = partial(filter_func, visited_paths)
803 password, paramversion = self.ENCRYPTION or (None, None)
804 deltatar = DeltaTar(mode=self.MODE, password=password,
805 crypto_paramversion=paramversion,
806 logger=self.consoleLogger,
807 included_files=["test", "small"],
808 excluded_files=["test/huge"],
809 filter_func=filter_func)
811 # create first backup
812 deltatar.create_full_backup(
813 source_path="source_dir",
814 backup_path="backup_dir")
816 assert os.path.exists("backup_dir")
817 shutil.rmtree("source_dir")
819 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
820 tar_path = os.path.join("backup_dir", tar_filename)
822 deltatar.restore_backup(target_path="source_dir",
823 backup_tar_path=tar_path)
824 assert set(visited_paths) == set([
831 def test_create_filter_out_func(self):
833 Tests create backup basic filtering.
836 def filter_func(visited_paths, path):
838 Filter out everything
840 if path not in visited_paths:
841 visited_paths.append(path)
844 filter_func = partial(filter_func, visited_paths)
846 password, paramversion = self.ENCRYPTION or (None, None)
847 deltatar = DeltaTar(mode=self.MODE, password=password,
848 crypto_paramversion=paramversion,
849 logger=self.consoleLogger,
850 included_files=["test", "small"],
851 excluded_files=["test/huge"],
852 filter_func=filter_func)
854 # create first backup
855 deltatar.create_full_backup(
856 source_path="source_dir",
857 backup_path="backup_dir")
859 assert os.path.exists("backup_dir")
860 shutil.rmtree("source_dir")
862 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
863 tar_path = os.path.join("backup_dir", tar_filename)
865 deltatar.restore_backup(target_path="source_dir",
866 backup_tar_path=tar_path)
867 assert set(visited_paths) == set([
872 # check that effectively no file was backed up
873 assert not os.path.exists("source_dir/small")
874 assert not os.path.exists("source_dir/big")
875 assert not os.path.exists("source_dir/test")
877 def test_restore_index_basic_filtering(self):
879 Creates a backup, and then filter when doing the index based restore.
881 if self.MODE.startswith(':') or self.MODE.startswith('|'):
882 raise SkipTest('this test only works for uncompressed '
883 'or concat compressed modes')
885 password, paramversion = self.ENCRYPTION or (None, None)
886 deltatar = DeltaTar(mode=self.MODE, password=password,
887 crypto_paramversion=paramversion,
888 logger=self.consoleLogger)
890 # create first backup
891 deltatar.create_full_backup(
892 source_path="source_dir",
893 backup_path="backup_dir")
895 assert os.path.exists("backup_dir")
896 shutil.rmtree("source_dir")
898 index_filename = deltatar.index_name_func(True)
899 index_path = os.path.join("backup_dir", index_filename)
901 deltatar.included_files = ["test", "small"]
902 deltatar.excluded_files = ["test/huge"]
903 deltatar.restore_backup(target_path="source_dir",
904 backup_indexes_paths=[index_path])
906 assert os.path.exists("source_dir/small")
907 assert os.path.exists("source_dir/test")
908 assert os.path.exists("source_dir/test/huge2")
909 assert os.path.exists("source_dir/test/test2")
911 assert not os.path.exists("source_dir/test/huge")
912 assert not os.path.exists("source_dir/big")
914 def test_restore_index_filter_func(self):
916 Creates a backup, and then filter when doing the index based restore,
917 using the filter function.
919 if self.MODE.startswith(':') or self.MODE.startswith('|'):
920 raise SkipTest('this test only works for uncompressed '
921 'or concat compressed modes')
924 def filter_func(visited_paths, path):
925 if path not in visited_paths:
926 visited_paths.append(path)
929 filter_func = partial(filter_func, visited_paths)
931 password, paramversion = self.ENCRYPTION or (None, None)
932 deltatar = DeltaTar(mode=self.MODE, password=password,
933 crypto_paramversion=paramversion,
934 logger=self.consoleLogger)
936 # create first backup
937 deltatar.create_full_backup(
938 source_path="source_dir",
939 backup_path="backup_dir")
941 assert os.path.exists("backup_dir")
942 shutil.rmtree("source_dir")
944 index_filename = deltatar.index_name_func(True)
945 index_path = os.path.join("backup_dir", index_filename)
947 deltatar.included_files = ["test", "small"]
948 deltatar.excluded_files = ["test/huge"]
949 deltatar.filter_func = filter_func
950 deltatar.restore_backup(target_path="source_dir",
951 backup_indexes_paths=[index_path])
953 assert set(visited_paths) == set([
960 def test_restore_tar_basic_filtering(self):
962 Creates a backup, and then filter when doing the tar based restore.
964 password, paramversion = self.ENCRYPTION or (None, None)
965 deltatar = DeltaTar(mode=self.MODE, password=password,
966 crypto_paramversion=paramversion,
967 logger=self.consoleLogger)
969 # create first backup
970 deltatar.create_full_backup(
971 source_path="source_dir",
972 backup_path="backup_dir")
974 assert os.path.exists("backup_dir")
975 shutil.rmtree("source_dir")
977 deltatar.included_files = ["test", "small"]
978 deltatar.excluded_files = ["test/huge"]
980 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
981 tar_path = os.path.join("backup_dir", tar_filename)
983 deltatar.restore_backup(target_path="source_dir",
984 backup_tar_path=tar_path)
986 assert os.path.exists("source_dir/small")
987 assert os.path.exists("source_dir/test")
988 assert os.path.exists("source_dir/test/huge2")
989 assert os.path.exists("source_dir/test/test2")
991 assert not os.path.exists("source_dir/test/huge")
992 assert not os.path.exists("source_dir/big")
994 def test_restore_tar_filter_func(self):
996 Creates a backup, and then filter when doing the tar based restore,
997 using the filter function.
1000 def filter_func(visited_paths, path):
1001 if path not in visited_paths:
1002 visited_paths.append(path)
1005 filter_func = partial(filter_func, visited_paths)
1007 password, paramversion = self.ENCRYPTION or (None, None)
1008 deltatar = DeltaTar(mode=self.MODE, password=password,
1009 crypto_paramversion=paramversion,
1010 logger=self.consoleLogger)
1012 # create first backup
1013 deltatar.create_full_backup(
1014 source_path="source_dir",
1015 backup_path="backup_dir")
1017 assert os.path.exists("backup_dir")
1018 shutil.rmtree("source_dir")
1020 index_filename = deltatar.index_name_func(True)
1021 index_path = os.path.join("backup_dir", index_filename)
1023 deltatar.included_files = ["test", "small"]
1024 deltatar.excluded_files = ["test/huge"]
1025 deltatar.filter_func = filter_func
1027 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1028 tar_path = os.path.join("backup_dir", tar_filename)
1030 deltatar.restore_backup(target_path="source_dir",
1031 backup_tar_path=tar_path)
1032 assert set(visited_paths) == set([
1039 def test_filter_path_regexp(self):
1041 Test specifically the deltatar.filter_path function with regular
1045 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1046 re.compile('^yes$'),
1050 re.compile('^testing/in_the'),
1052 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1053 excluded_files=excluded_files)
1055 # assert valid and invalid paths
1056 assert deltatar.filter_path('test/hola')
1057 assert deltatar.filter_path('test/hola/any/thing')
1058 assert deltatar.filter_path('test/caracola/caracolero')
1059 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1060 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1061 assert deltatar.filter_path('yes')
1062 assert deltatar.filter_path('testing')
1063 assert deltatar.filter_path('testing/yes')
1064 assert deltatar.filter_path('testing/in_th')
1066 assert not deltatar.filter_path('something')
1067 assert not deltatar.filter_path('other/thing')
1068 assert not deltatar.filter_path('test_ing')
1069 assert not deltatar.filter_path('test/hola_lala')
1070 assert not deltatar.filter_path('test/agur')
1071 assert not deltatar.filter_path('testing_something')
1072 assert not deltatar.filter_path('yeso')
1073 assert not deltatar.filter_path('yes/o')
1074 assert not deltatar.filter_path('yes_o')
1075 assert not deltatar.filter_path('testing/in_the')
1076 assert not deltatar.filter_path('testing/in_the_field')
1077 assert not deltatar.filter_path('testing/in_the/field')
1079 def test_filter_path_parent(self):
1081 Test specifically the deltatar.filter_path function for parent matching
1084 'testing/path/to/some/thing'
1086 deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1088 # assert valid and invalid paths
1089 assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1090 assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1091 assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1092 assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1093 assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1094 assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1095 assert deltatar.filter_path('testing/something/else') == NO_MATCH
1097 def test_parent_matching_simple_full_backup(self):
1099 Create a full backup using parent matching
1105 password, paramversion = self.ENCRYPTION or (None, None)
1106 deltatar = DeltaTar(mode=self.MODE, password=password,
1107 crypto_paramversion=paramversion,
1108 logger=self.consoleLogger,
1109 included_files=included_files)
1111 # create first backup
1112 deltatar.create_full_backup(
1113 source_path="source_dir",
1114 backup_path="backup_dir")
1116 assert os.path.exists("backup_dir")
1117 shutil.rmtree("source_dir")
1119 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1120 tar_path = os.path.join("backup_dir", tar_filename)
1122 deltatar = DeltaTar(mode=self.MODE, password=password,
1123 logger=self.consoleLogger)
1124 deltatar.restore_backup(target_path="source_dir",
1125 backup_tar_path=tar_path)
1127 assert os.path.exists('source_dir/test/huge2')
1128 assert os.path.exists('source_dir/test/')
1129 assert not os.path.exists('source_dir/test/huge')
1130 assert not os.path.exists('source_dir/big')
1131 assert not os.path.exists('source_dir/small')
1133 def test_parent_matching_simple_full_backup_restore(self):
1135 Create a full backup and restores it using parent matching
1141 password, paramversion = self.ENCRYPTION or (None, None)
1142 deltatar = DeltaTar(mode=self.MODE, password=password,
1143 crypto_paramversion=paramversion,
1144 logger=self.consoleLogger)
1146 # create first backup
1147 deltatar.create_full_backup(
1148 source_path="source_dir",
1149 backup_path="backup_dir")
1151 assert os.path.exists("backup_dir")
1152 shutil.rmtree("source_dir")
1154 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1155 tar_path = os.path.join("backup_dir", tar_filename)
1157 deltatar = DeltaTar(mode=self.MODE, password=password,
1158 logger=self.consoleLogger,
1159 included_files=included_files)
1160 deltatar.restore_backup(target_path="source_dir",
1161 backup_tar_path=tar_path)
1163 assert os.path.exists('source_dir/test/huge2')
1164 assert os.path.exists('source_dir/test/')
1165 assert not os.path.exists('source_dir/test/huge')
1166 assert not os.path.exists('source_dir/big')
1167 assert not os.path.exists('source_dir/small')
1169 def test_parent_matching_index_full_backup_restore(self):
1171 Create a full backup and restores it using parent matching
1177 password, paramversion = self.ENCRYPTION or (None, None)
1178 deltatar = DeltaTar(mode=self.MODE, password=password,
1179 crypto_paramversion=paramversion,
1180 logger=self.consoleLogger)
1182 # create first backup
1183 deltatar.create_full_backup(
1184 source_path="source_dir",
1185 backup_path="backup_dir")
1187 assert os.path.exists("backup_dir")
1188 shutil.rmtree("source_dir")
1190 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1191 tar_path = os.path.join("backup_dir", tar_filename)
1193 deltatar = DeltaTar(mode=self.MODE, password=password,
1194 logger=self.consoleLogger,
1195 included_files=included_files)
1196 deltatar.restore_backup(target_path="source_dir",
1197 backup_tar_path=tar_path)
1199 assert os.path.exists('source_dir/test/huge2')
1200 assert os.path.exists('source_dir/test/')
1201 assert not os.path.exists('source_dir/test/huge')
1202 assert not os.path.exists('source_dir/big')
1203 assert not os.path.exists('source_dir/small')
1205 def test_collate_iterators(self):
1207 Tests the collate iterators functionality with two exact directories,
1208 using an index iterator from a backup and the exact same source dir.
1210 password, paramversion = self.ENCRYPTION or (None, None)
1211 deltatar = DeltaTar(mode=self.MODE, password=password,
1212 crypto_paramversion=paramversion,
1213 logger=self.consoleLogger)
1215 # create first backup
1216 deltatar.create_full_backup(
1217 source_path="source_dir",
1218 backup_path="backup_dir")
1220 assert os.path.exists("backup_dir")
1223 index_filename = deltatar.index_name_func(is_full=True)
1224 index_path = os.path.join(cwd, "backup_dir", index_filename)
1225 index_it = deltatar.iterate_index_path(index_path)
1227 os.chdir('source_dir')
1228 dir_it = deltatar._recursive_walk_dir('.')
1229 path_it = deltatar.jsonize_path_iterator(dir_it)
1232 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1233 assert deltatar._equal_stat_dicts(path1, path2)
1237 def test_collate_iterators_diffdirs(self):
1239 Use the collate iterators functionality with two different directories.
1240 It must behave in an expected way.
1242 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1244 password, paramversion = self.ENCRYPTION or (None, None)
1245 deltatar = DeltaTar(mode=self.MODE, password=password,
1246 crypto_paramversion=paramversion,
1247 logger=self.consoleLogger)
1249 # create first backup
1250 deltatar.create_full_backup(
1251 source_path="source_dir",
1252 backup_path="backup_dir")
1254 assert os.path.exists("backup_dir")
1255 self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)
1258 index_filename = deltatar.index_name_func(is_full=True)
1259 index_path = os.path.join(cwd, "backup_dir", index_filename)
1260 index_it = deltatar.iterate_index_path(index_path)
1262 os.chdir('source_dir')
1263 dir_it = deltatar._recursive_walk_dir('.')
1264 path_it = deltatar.jsonize_path_iterator(dir_it)
1267 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1268 if path2['path'] == 'z':
1271 assert deltatar._equal_stat_dicts(path1, path2)
1275 def test_collate_iterators_diffdirs2(self):
1277 Use the collate iterators functionality with two different directories.
1278 It must behave in an expected way.
1280 password, paramversion = self.ENCRYPTION or (None, None)
1281 deltatar = DeltaTar(mode=self.MODE, password=password,
1282 crypto_paramversion=paramversion,
1283 logger=self.consoleLogger)
1285 # create first backup
1286 deltatar.create_full_backup(
1287 source_path="source_dir",
1288 backup_path="backup_dir")
1290 assert os.path.exists("backup_dir")
1292 # add some new files and directories
1293 os.makedirs('source_dir/bigdir')
1294 self.hash["source_dir/bigdir"] = ""
1295 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1296 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1297 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1300 index_filename = deltatar.index_name_func(is_full=True)
1301 index_path = os.path.join(cwd, "backup_dir", index_filename)
1302 index_it = deltatar.iterate_index_path(index_path)
1304 os.chdir('source_dir')
1305 dir_it = deltatar._recursive_walk_dir('.')
1306 path_it = deltatar.jsonize_path_iterator(dir_it)
1311 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1312 visited_pairs.append(
1313 (deltatar.unprefixed(path1['path']) if path1 else None,
1314 path2['path'] if path2 else None)
1317 assert visited_pairs == [
1320 (u'small', u'small'),
1323 (None, u'bigdir/a'),
1324 (None, u'bigdir/b'),
1325 (u'test/huge', u'test/huge'),
1326 (u'test/huge2', u'test/huge2'),
1327 (u'test/test2', u'test/test2'),
1331 def test_create_empty_diff_backup(self):
1333 Creates an empty (no changes) backup diff
1335 password, paramversion = self.ENCRYPTION or (None, None)
1336 deltatar = DeltaTar(mode=self.MODE, password=password,
1337 crypto_paramversion=paramversion,
1338 logger=self.consoleLogger)
1340 # create first backup
1341 deltatar.create_full_backup(
1342 source_path="source_dir",
1343 backup_path="backup_dir")
1345 prev_index_filename = deltatar.index_name_func(is_full=True)
1346 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1348 deltatar.create_diff_backup("source_dir", "backup_dir2",
1352 index_path = os.path.join("backup_dir2",
1353 deltatar.index_name_func(is_full=False))
1354 index_it = deltatar.iterate_index_path(index_path)
1358 assert i[0]['path'].startswith("list://")
1362 # check the tar file
1363 assert os.path.exists("backup_dir2")
1364 shutil.rmtree("source_dir")
1366 tar_filename = deltatar.volume_name_func('backup_dir2',
1367 is_full=False, volume_number=0)
1368 tar_path = os.path.join("backup_dir2", tar_filename)
1370 # no file restored, because the diff was empty
1371 deltatar.restore_backup(target_path="source_dir",
1372 backup_tar_path=tar_path)
1373 assert len(os.listdir("source_dir")) == 0
1376 def test_create_diff_backup1(self):
1378 Creates a diff backup when there are new files
1380 password, paramversion = self.ENCRYPTION or (None, None)
1381 deltatar = DeltaTar(mode=self.MODE, password=password,
1382 crypto_paramversion=paramversion,
1383 logger=self.consoleLogger)
1385 # create first backup
1386 deltatar.create_full_backup(
1387 source_path="source_dir",
1388 backup_path="backup_dir")
1390 prev_index_filename = deltatar.index_name_func(is_full=True)
1391 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1393 # add some new files and directories
1394 os.makedirs('source_dir/bigdir')
1395 self.hash["source_dir/bigdir"] = ""
1396 os.unlink("source_dir/small")
1397 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1398 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1399 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1401 deltatar.create_diff_backup("source_dir", "backup_dir2",
1405 index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1406 index_it = deltatar.iterate_index_path(index_path)
1407 l = [i[0]['path'] for i in index_it]
1411 'snapshot://bigdir',
1415 'snapshot://bigdir/a',
1416 'snapshot://bigdir/b',
1418 'list://test/huge2',
1419 'list://test/test2',
1422 # check the tar file
1423 assert os.path.exists("backup_dir2")
1424 shutil.rmtree("source_dir")
1426 # create source_dir with the small file, that will be then deleted by
1427 # the restore_backup
1428 os.mkdir("source_dir")
1429 open("source_dir/small", 'wb').close()
1431 tar_filename = deltatar.volume_name_func('backup_dir2',
1432 is_full=False, volume_number=0)
1433 tar_path = os.path.join("backup_dir2", tar_filename)
1435 # restore the backup, this will create only the new files
1436 deltatar.restore_backup(target_path="source_dir",
1437 backup_tar_path=tar_path)
1438 # the order doesn't matter
1439 assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1441 def test_restore_from_index_diff_backup(self):
1443 Creates a full backup, modifies some files, creates a diff backup,
1444 then restores the diff backup from zero.
1446 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1447 raise SkipTest('this test only works for uncompressed '
1448 'or concat compressed modes')
1450 password, paramversion = self.ENCRYPTION or (None, None)
1451 deltatar = DeltaTar(mode=self.MODE, password=password,
1452 crypto_paramversion=paramversion,
1453 logger=self.consoleLogger)
1455 # create first backup
1456 deltatar.create_full_backup(
1457 source_path="source_dir",
1458 backup_path="backup_dir")
1460 prev_index_filename = deltatar.index_name_func(is_full=True)
1461 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1463 # add some new files and directories
1464 os.makedirs('source_dir/bigdir')
1465 self.hash["source_dir/bigdir"] = ""
1466 os.unlink("source_dir/small")
1467 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1468 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1469 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1471 deltatar.create_diff_backup("source_dir", "backup_dir2",
1474 # apply diff backup in target_dir
1475 index_filename = deltatar.index_name_func(is_full=False)
1476 index_path = os.path.join("backup_dir2", index_filename)
1477 deltatar.restore_backup("target_dir",
1478 backup_indexes_paths=[index_path, prev_index_path])
1480 # then compare the two directories source_dir and target_dir and check
1482 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1484 def test_restore_from_index_diff_backup2(self):
1486 Creates a full backup, modifies some files, creates a diff backup,
1487 then restores the diff backup with the full backup as a starting point.
1489 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1490 raise SkipTest('this test only works for uncompressed '
1491 'or concat compressed modes')
1493 password, paramversion = self.ENCRYPTION or (None, None)
1494 deltatar = DeltaTar(mode=self.MODE, password=password,
1495 crypto_paramversion=paramversion,
1496 logger=self.consoleLogger)
1498 # create first backup
1499 deltatar.create_full_backup(
1500 source_path="source_dir",
1501 backup_path="backup_dir")
1503 prev_index_filename = deltatar.index_name_func(is_full=True)
1504 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1506 # add some new files and directories
1507 os.makedirs('source_dir/bigdir')
1508 self.hash["source_dir/bigdir"] = ""
1509 os.unlink("source_dir/small")
1510 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1511 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1512 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1513 shutil.rmtree("source_dir/test")
1515 deltatar.create_diff_backup("source_dir", "backup_dir2",
1518 # first restore initial backup in target_dir
1519 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1520 tar_path = os.path.join("backup_dir", tar_filename)
1521 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1523 # then apply diff backup in target_dir
1524 index_filename = deltatar.index_name_func(is_full=False)
1525 index_path = os.path.join("backup_dir2", index_filename)
1526 deltatar.restore_backup("target_dir",
1527 backup_indexes_paths=[index_path, prev_index_path])
1529 # then compare the two directories source_dir and target_dir and check
1531 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1533 def test_restore_from_index_diff_backup3(self):
1535 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1536 diff backup, then restores the diff backup with the full backup as a
1539 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1540 raise SkipTest('this test only works for uncompressed '
1541 'or concat compressed modes')
1543 password, paramversion = self.ENCRYPTION or (None, None)
1544 deltatar = DeltaTar(mode=self.MODE, password=password,
1545 crypto_paramversion=paramversion,
1546 logger=self.consoleLogger)
1548 shutil.rmtree("source_dir")
1549 shutil.copytree(self.GIT_DIR, "source_dir")
1550 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1552 # create first backup
1553 deltatar.create_full_backup(
1554 source_path="source_dir",
1555 backup_path="backup_dir")
1557 prev_index_filename = deltatar.index_name_func(is_full=True)
1558 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1560 # alter the source_dir randomly
1561 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1563 for path in source_it:
1564 # if path doesn't exist (might have previously removed) ignore it.
1565 # also ignore it (i.e. do not change it) 70% of the time
1566 if not os.path.exists(path) or random.random() < 0.7:
1570 if os.path.isdir(path):
1575 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1578 # first restore initial backup in target_dir
1579 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1580 tar_path = os.path.join("backup_dir", tar_filename)
1581 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1583 # and check that target_dir equals to source_dir (which is the same as
1584 # self.GIT_DIR initially)
1585 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1587 # then apply diff backup in target_dir
1588 index_filename = deltatar.index_name_func(is_full=False)
1589 index_path = os.path.join("backup_dir2", index_filename)
1590 deltatar.restore_backup("target_dir",
1591 backup_indexes_paths=[index_path, prev_index_path])
1593 # and check that target_dir equals to source_dir_diff (the randomly
1594 # altered self.GIT_DIR directory)
1595 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1597 # then delete target_dir and apply diff backup from zero and check again
1598 shutil.rmtree("target_dir")
1599 deltatar.restore_backup("target_dir",
1600 backup_indexes_paths=[index_path, prev_index_path])
1602 # and check that target_dir equals to source_dir_diff (the randomly
1603 # altered self.GIT_DIR directory)
1604 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1606 def test_restore_from_index_diff_backup3_multivol(self):
1608 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1609 diff backup, then restores the diff backup with the full backup as a
1612 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1613 raise SkipTest('this test only works for uncompressed '
1614 'or concat compressed modes')
1616 password, paramversion = self.ENCRYPTION or (None, None)
1617 deltatar = DeltaTar(mode=self.MODE, password=password,
1618 crypto_paramversion=paramversion,
1619 logger=self.consoleLogger)
1621 shutil.rmtree("source_dir")
1622 shutil.copytree(self.GIT_DIR, "source_dir")
1623 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1625 # create first backup
1626 deltatar.create_full_backup(
1627 source_path="source_dir",
1628 backup_path="backup_dir",
1631 prev_index_filename = deltatar.index_name_func(is_full=True)
1632 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1634 # alter the source_dir randomly
1635 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1637 for path in source_it:
1638 # if path doesn't exist (might have previously removed) ignore it.
1639 # also ignore it (i.e. do not change it) 70% of the time
1640 if not os.path.exists(path) or random.random() < 0.7:
1644 if os.path.isdir(path):
1649 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1650 prev_index_path, max_volume_size=1)
1652 # first restore initial backup in target_dir
1653 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1654 tar_path = os.path.join("backup_dir", tar_filename)
1655 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1657 # and check that target_dir equals to source_dir (which is the same as
1658 # self.GIT_DIR initially)
1659 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1661 # then apply diff backup in target_dir
1662 index_filename = deltatar.index_name_func(is_full=False)
1663 index_path = os.path.join("backup_dir2", index_filename)
1664 deltatar.restore_backup("target_dir",
1665 backup_indexes_paths=[index_path, prev_index_path])
1667 # and check that target_dir equals to source_dir_diff (the randomly
1668 # altered self.GIT_DIR directory)
1669 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1671 # then delete target_dir and apply diff backup from zero and check again
1672 shutil.rmtree("target_dir")
1673 deltatar.restore_backup("target_dir",
1674 backup_indexes_paths=[index_path, prev_index_path])
1676 # and check that target_dir equals to source_dir_diff (the randomly
1677 # altered self.GIT_DIR directory)
1678 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1680 def check_equal_dirs(self, path1, path2, deltatar):
1682 compare the two directories source_dir and target_dir and check
1685 source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1686 source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1687 target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1688 target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1691 sitem = next(source_it)
1692 titem = next(target_it)
1693 except StopIteration:
1695 titem = next(target_it)
1696 raise Exception("iterators do not stop at the same time")
1697 except StopIteration:
1700 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1701 except Exception as e:
1702 print("SITEM: " + str(sitem))
1703 print("TITEM: " + str(titem))
1706 def test_create_no_symlinks(self):
1708 Creates a full backup from different varieties of symlinks. The
1709 extracted archive may not contain any symlinks but the file contents
1712 os.system("rm -rf source_dir")
1713 os.makedirs("source_dir/symlinks")
1714 fd = os.open("source_dir/symlinks/valid_linkname",
1715 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1716 os.write(fd, b"valid link target for symlink tests; please ignore\n")
1718 # first one is good, the rest points nowhere
1719 self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1720 self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1721 self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1722 self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1723 password, paramversion = self.ENCRYPTION or (None, None)
1724 deltatar = DeltaTar(mode=self.MODE, password=password,
1725 crypto_paramversion=paramversion,
1726 logger=self.consoleLogger)
1728 # create first backup
1729 deltatar.create_full_backup(source_path="source_dir",
1730 backup_path="backup_dir")
1732 assert os.path.exists("backup_dir")
1733 shutil.rmtree("source_dir")
1734 assert not os.path.exists("source_dir")
1736 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1737 tar_path = os.path.join("backup_dir", tar_filename)
1739 deltatar.restore_backup(target_path="source_dir",
1740 backup_tar_path=tar_path)
1742 for _r, _ds, fs in os.walk("source_dir/symlinks"):
1743 # only the valid link plus the linked file may be found in the
1747 # the link must have been resolved and file contents must match
1749 assert not os.path.islink(f)
1750 with open("source_dir/symlinks/valid_linkname") as a:
1751 with open("source_dir/symlinks/whatever") as b:
1752 assert a.read() == b.read()
1754 def test_restore_with_symlinks(self):
1756 Creates a full backup containing different varieties of symlinks. All
1757 of them must be filtered out.
1759 password, paramversion = self.ENCRYPTION or (None, None)
1760 deltatar = DeltaTar(mode=self.MODE, password=password,
1761 crypto_paramversion=paramversion,
1762 logger=self.consoleLogger)
1764 # create first backup
1765 deltatar.create_full_backup(source_path="source_dir",
1766 backup_path="backup_dir")
1768 assert os.path.exists("backup_dir")
1769 shutil.rmtree("source_dir")
1771 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1772 tar_path = os.path.join("backup_dir", tar_filename)
1774 # add symlinks to existing archive
1776 def add_symlink (a, name, dst):
1777 l = tarfile.TarInfo("snapshot://%s" % name)
1778 l.type = tarfile.SYMTYPE
1784 with tarfile.open(tar_path,mode="a") as a:
1786 [ add_symlink(a, "symlinks/foo", "internal-file")
1787 , add_symlink(a, "symlinks/bar", "/absolute/path")
1788 , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1789 except tarfile.ReadError as e:
1790 if self.MODE == '#' or self.MODE.endswith ("gz"):
1794 except ValueError as e:
1795 if self.MODE.startswith ('#'):
1800 deltatar.restore_backup(target_path="source_dir",
1801 backup_tar_path=tar_path)
1803 # check what happened to our symlinks
1804 for name in checkme:
1805 fullpath = os.path.join("source_dir", name)
1806 assert not os.path.exists(fullpath)
1808 def test_restore_malicious_symlinks(self):
1810 Creates a full backup containing a symlink and a file of the same name.
1811 This simulates a symlink attack with a link pointing to some external
1812 path that is abused to write outside the extraction prefix.
1814 password, paramversion = self.ENCRYPTION or (None, None)
1815 deltatar = DeltaTar(mode=self.MODE, password=password,
1816 crypto_paramversion=paramversion,
1817 logger=self.consoleLogger)
1819 # create first backup
1820 deltatar.create_full_backup(source_path="source_dir",
1821 backup_path="backup_dir")
1823 assert os.path.exists("backup_dir")
1824 shutil.rmtree("source_dir")
1826 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1827 tar_path = os.path.join("backup_dir", tar_filename)
1829 # add symlinks to existing archive
1831 def add_symlink (a, name, dst):
1832 l = tarfile.TarInfo("snapshot://%s" % name)
1833 l.type = tarfile.SYMTYPE
1837 def add_file (a, name):
1838 f = tarfile.TarInfo("snapshot://%s" % name)
1839 f.type = tarfile.REGTYPE
1842 testpath = "symlinks/pernicious-link"
1843 testdst = "/tmp/does/not/exist"
1846 with tarfile.open(tar_path, mode="a") as a:
1847 add_symlink(a, testpath, testdst)
1848 add_symlink(a, testpath, testdst+"X")
1849 add_symlink(a, testpath, testdst+"XXX")
1850 add_file(a, testpath)
1851 except tarfile.ReadError as e:
1852 if self.MODE == '#' or self.MODE.endswith ("gz"):
1856 except ValueError as e:
1857 if self.MODE.startswith ('#'):
1858 pass # O_APPEND of concat archives not feasible
1862 deltatar.restore_backup(target_path="source_dir",
1863 backup_tar_path=tar_path)
1865 # check whether the link was extracted; deltatar seems to only ever
1866 # retrieve the first item it finds for a given path which in the case
1867 # at hand is a symlink to some non-existent path
1868 fullpath = os.path.join("source_dir", testpath)
1869 assert not os.path.exists(fullpath)
1871 class DeltaTar2Test(DeltaTarTest):
1873 Same as DeltaTar but with specific ":" mode
1878 class DeltaTarStreamTest(DeltaTarTest):
1880 Same as DeltaTar but with specific uncompressed stream mode
1885 class DeltaTarGzipTest(DeltaTarTest):
1887 Same as DeltaTar but with specific gzip mode
1890 MODE_COMPRESSES = True
1893 class DeltaTarGzipStreamTest(DeltaTarTest):
1895 Same as DeltaTar but with specific gzip stream mode
1898 MODE_COMPRESSES = True
1901 @skip('Bz2 tests are too slow..')
1902 class DeltaTarBz2Test(DeltaTarTest):
1904 Same as DeltaTar but with specific bz2 mode
1907 MODE_COMPRESSES = True
1910 @skip('Bz2 tests are too slow..')
1911 class DeltaTarBz2StreamTest(DeltaTarTest):
1913 Same as DeltaTar but with specific bz2 stream mode
1916 MODE_COMPRESSES = True
1919 class DeltaTarGzipConcatTest(DeltaTarTest):
1921 Same as DeltaTar but with specific gzip concat stream mode
1924 MODE_COMPRESSES = True
1927 class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
1929 Same as DeltaTar but with specific gzip aes128 concat stream mode
1932 ENCRYPTION = ('some magic key', 1)
1933 MODE_COMPRESSES = True
1936 class DeltaTarAes128ConcatTest(DeltaTarTest):
1938 Same as DeltaTar but with specific aes128 concat stream mode
1941 ENCRYPTION = ('some magic key', 1)