1 # Copyright (C) 2013 Intra2net AG
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Lesser General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
37 from . import BaseTest
38 from . import new_volume_handler
40 class DeltaTarTest(BaseTest):
45 MODE_COMPRESSES = False
47 ENCRYPTION = None # (password : str, paramversion : int) option
55 self.pwd = os.getcwd()
56 os.system('rm -rf target_dir source_dir* backup_dir* huge')
57 os.makedirs('source_dir/test/test2')
59 self.hash["source_dir/test/test2"] = ''
60 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
61 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
62 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
63 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
65 self.consoleLogger = logging.StreamHandler()
66 self.consoleLogger.setLevel(logging.DEBUG)
68 if not os.path.isdir(self.GIT_DIR):
69 # Not running inside git tree, take our
70 # own testing directory as source.
71 self.GIT_DIR = 'testing'
73 if not os.path.isdir(self.GIT_DIR):
74 raise Exception('No input directory found: ' + self.GIT_DIR)
78 Remove temporal files created by unit tests and reset globals.
81 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
82 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
83 ("I am fully aware that this will void my warranty.")
85 def test_restore_simple_full_backup(self):
87 Creates a full backup without any filtering and restores it.
89 password, paramversion = self.ENCRYPTION or (None, None)
90 deltatar = DeltaTar(mode=self.MODE, password=password,
91 crypto_paramversion=paramversion,
92 logger=self.consoleLogger)
95 deltatar.create_full_backup(
96 source_path="source_dir",
97 backup_path="backup_dir")
99 assert os.path.exists("backup_dir")
100 shutil.rmtree("source_dir")
102 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
103 tar_path = os.path.join("backup_dir", tar_filename)
105 deltatar.restore_backup(target_path="source_dir",
106 backup_tar_path=tar_path)
108 for key, value in self.hash.items():
109 assert os.path.exists(key)
111 assert value == self.md5sum(key)
114 def test_create_backup_max_file_length (self):
116 Creates a full backup including one file that exceeds the (purposely
117 lowered) upper bound on GCM encrypted objects. This will yield multiple
118 encrypted objects for one plaintext file.
120 Success is verified by splitting the archive at object boundaries and
123 if self.MODE_COMPRESSES is True:
124 raise SkipTest ("GCM file length test not meaningful with compression.")
125 if self.ENCRYPTION is None:
126 raise SkipTest ("GCM file length applies only to encrypted backups.")
128 new_max = 20000 # cannot be less than tar block size
129 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
130 ("I am fully aware that this will void my warranty.",
133 password, paramversion = self.ENCRYPTION
134 deltatar = DeltaTar (mode=self.MODE, password=password,
135 crypto_paramversion=paramversion,
136 logger=self.consoleLogger)
139 os.makedirs ("source_dir2")
140 for f, s in [("empty" , 0) # 1 tar objects
141 ,("slightly_larger", new_max + 1) # 2
142 ,("twice" , 2 * new_max) # 3
144 f = "source_dir2/%s" % f
145 self.hash [f] = self.create_file (f, s)
147 deltatar.create_full_backup \
148 (source_path="source_dir2", backup_path="backup_dir")
150 assert os.path.exists ("backup_dir")
151 shutil.rmtree ("source_dir2")
153 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
154 backup_path = os.path.join("backup_dir", backup_filename)
156 # split the resulting archive into its constituents without
158 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
159 "-o backup_dir/split <\'%s\'" % backup_path)
161 assert os.path.exists ("backup_dir/split")
163 dents = os.listdir ("backup_dir/split")
164 assert len (dents) == 6
167 def test_restore_backup_max_file_length (self):
169 Creates a full backup including one file that exceeds the (purposely
170 lowered) upper bound on GCM encrypted objects. This will yield two
171 encrypted objects for one plaintext file.
173 Success is verified by splitting the archive at object boundaries and
176 if self.MODE_COMPRESSES is True:
177 raise SkipTest ("GCM file length test not meaningful with compression.")
178 if self.ENCRYPTION is None:
179 raise SkipTest ("GCM file length applies only to encrypted backups.")
181 new_max = 20000 # cannot be less than tar block size
182 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
183 ("I am fully aware that this will void my warranty.",
186 password, paramversion = self.ENCRYPTION
187 deltatar = DeltaTar (mode=self.MODE, password=password,
188 crypto_paramversion=paramversion,
189 logger=self.consoleLogger)
192 os.makedirs ("source_dir2")
193 for f, s in [("empty" , 0) # 1 tar objects
194 ,("slightly_larger", new_max + 1) # 2
195 ,("twice" , 2 * new_max) # 3
197 f = "source_dir2/%s" % f
198 self.hash [f] = self.create_file (f, s)
200 deltatar.create_full_backup \
201 (source_path="source_dir2", backup_path="backup_dir")
203 assert os.path.exists ("backup_dir")
204 shutil.rmtree ("source_dir2")
206 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
207 backup_path = os.path.join("backup_dir", backup_filename)
209 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
210 tar_path = os.path.join("backup_dir", tar_filename)
212 deltatar.restore_backup(target_path="source_dir2",
213 backup_tar_path=tar_path)
215 for key, value in self.hash.items():
216 assert os.path.exists(key)
218 assert value == self.md5sum(key)
221 def test_check_index_checksum(self):
223 Creates a full backup and checks the index' checksum of files
225 password, paramversion = self.ENCRYPTION or (None, None)
226 deltatar = DeltaTar(mode=self.MODE, password=password,
227 crypto_paramversion=paramversion,
228 logger=self.consoleLogger)
230 # create first backup
231 deltatar.create_full_backup(
232 source_path="source_dir",
233 backup_path="backup_dir")
236 index_filename = deltatar.index_name_func(True)
237 index_path = os.path.join("backup_dir", index_filename)
239 f = open(index_path, 'rb')
247 if b'BEGIN-FILE-LIST' in l:
248 crc = binascii.crc32(l) & 0xFFFFffff
250 elif b'END-FILE-LIST' in l:
251 crc = binascii.crc32(l, crc) & 0xffffffff
253 # next line contains the crc
254 data = json.loads(f.readline().decode("UTF-8"))
255 assert data['type'] == 'file-list-checksum'
256 assert data['checksum'] == crc
260 crc = binascii.crc32(l, crc) & 0xffffffff
264 def test_restore_multivol(self):
266 Creates a full backup without any filtering with multiple volumes and
269 if ':gz' in self.MODE:
270 raise SkipTest('compression information is lost when creating '
271 'multiple volumes with no Stream')
273 password, paramversion = self.ENCRYPTION or (None, None)
274 deltatar = DeltaTar(mode=self.MODE, password=password,
275 crypto_paramversion=paramversion,
276 logger=self.consoleLogger)
279 os.makedirs('source_dir2')
280 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
281 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
283 # create first backup
284 deltatar.create_full_backup(
285 source_path="source_dir2",
286 backup_path="backup_dir",
289 assert os.path.exists("backup_dir")
290 assert os.path.exists(os.path.join("backup_dir",
291 deltatar.volume_name_func("backup_dir", True, 0)))
292 if self.MODE_COMPRESSES:
296 for i_vol in range(n_vols):
297 assert os.path.exists(os.path.join("backup_dir",
298 deltatar.volume_name_func("backup_dir", True, i_vol)))
299 assert not os.path.exists(os.path.join("backup_dir",
300 deltatar.volume_name_func("backup_dir", True, n_vols)))
302 shutil.rmtree("source_dir2")
304 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
305 tar_path = os.path.join("backup_dir", tar_filename)
307 # this should automatically restore all volumes
308 deltatar.restore_backup(target_path="source_dir2",
309 backup_tar_path=tar_path)
311 for key, value in self.hash.items():
312 assert os.path.exists(key)
314 assert value == self.md5sum(key)
316 def test_restore_multivol_split(self):
318 Creates a full backup without any filtering with multiple volumes
319 with big files bigger than the max volume size and
322 if self.MODE.startswith(':') or self.MODE.startswith('|'):
323 raise SkipTest('this test only works for uncompressed '
324 'or concat compressed modes')
326 password, paramversion = self.ENCRYPTION or (None, None)
327 deltatar = DeltaTar(mode=self.MODE, password=password,
328 crypto_paramversion=paramversion,
329 logger=self.consoleLogger)
332 os.makedirs('source_dir2')
333 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
334 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
335 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
337 # create first backup
338 deltatar.create_full_backup(
339 source_path="source_dir2",
340 backup_path="backup_dir",
343 assert os.path.exists("backup_dir")
344 assert os.path.exists(os.path.join("backup_dir",
345 deltatar.volume_name_func("backup_dir", True, 0)))
346 if self.MODE_COMPRESSES:
350 for i_vol in range(n_vols):
351 assert os.path.exists(os.path.join("backup_dir",
352 deltatar.volume_name_func("backup_dir", True, i_vol)))
353 assert not os.path.exists(os.path.join("backup_dir",
354 deltatar.volume_name_func("backup_dir", True, n_vols)))
356 shutil.rmtree("source_dir2")
358 index_filename = deltatar.index_name_func(True)
359 index_path = os.path.join("backup_dir", index_filename)
361 deltatar.restore_backup(target_path="source_dir2",
362 backup_indexes_paths=[index_path])
364 for key, value in self.hash.items():
365 assert os.path.exists(key)
367 assert value == self.md5sum(key)
370 def test_full_backup_index_extra_data(self):
372 Tests that the index file for a full backup can store extra_data and
373 that this data can be retrieved.
375 password, paramversion = self.ENCRYPTION or (None, None)
376 deltatar = DeltaTar(mode=self.MODE, password=password,
377 crypto_paramversion=paramversion,
378 logger=self.consoleLogger)
382 otra_cosa=[1, "lista"],
383 y_otra=dict(bola=1.1)
386 deltatar.create_full_backup(
387 source_path="source_dir",
388 backup_path="backup_dir",
389 extra_data=extra_data)
391 index_filename = deltatar.index_name_func(is_full=True)
392 index_path = os.path.join("backup_dir", index_filename)
394 # iterate_index_path retrieves extra_data, and thus we can then compare
395 index_it = deltatar.iterate_index_path(index_path)
396 self.assertEqual(index_it.extra_data, extra_data)
399 def test_diff_backup_index_extra_data(self):
401 Tests that the index file for a diff backup can store extra_data and
402 that this data can be retrieved.
404 password, paramversion = self.ENCRYPTION or (None, None)
405 deltatar = DeltaTar(mode=self.MODE, password=password,
406 crypto_paramversion=paramversion,
407 logger=self.consoleLogger)
411 otra_cosa=[1, "lista"],
412 y_otra=dict(bola=1.1)
415 deltatar.create_full_backup(
416 source_path="source_dir",
417 backup_path="backup_dir")
420 prev_index_filename = deltatar.index_name_func(is_full=True)
421 prev_index_path = os.path.join("backup_dir", prev_index_filename)
423 # create empty diff backup
424 deltatar.create_diff_backup("source_dir", "backup_dir2",
425 prev_index_path, extra_data=extra_data)
427 index_filename = deltatar.index_name_func(is_full=False)
428 index_path = os.path.join("backup_dir2", index_filename)
430 # iterate_index_path retrieves extra_data, and thus we can then compare
431 index_it = deltatar.iterate_index_path(index_path)
432 self.assertEqual(index_it.extra_data, extra_data)
434 def test_restore_multivol2(self):
436 Creates a full backup without any filtering with multiple volumes and
439 password, paramversion = self.ENCRYPTION or (None, None)
440 deltatar = DeltaTar(mode=self.MODE, password=password,
441 crypto_paramversion=paramversion,
442 logger=self.consoleLogger)
444 shutil.copytree(self.GIT_DIR, "source_dir2")
446 # create first backup
447 deltatar.create_full_backup(
448 source_path="source_dir2",
449 backup_path="backup_dir",
452 assert os.path.exists("backup_dir")
453 assert os.path.exists(os.path.join("backup_dir",
454 deltatar.volume_name_func("backup_dir", True, 0)))
456 shutil.rmtree("source_dir2")
458 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
459 tar_path = os.path.join("backup_dir", tar_filename)
461 # this should automatically restore all volumes
462 deltatar.restore_backup(target_path="source_dir2",
463 backup_tar_path=tar_path)
465 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
467 def test_restore_multivol_manual_from_index(self):
469 Creates a full backup without any filtering with multiple volumes and
472 # this test only works for uncompressed or concat compressed modes
473 if self.MODE.startswith(':') or self.MODE.startswith('|'):
474 raise SkipTest('this test only works for uncompressed '
475 'or concat compressed modes')
477 password, paramversion = self.ENCRYPTION or (None, None)
478 deltatar = DeltaTar(mode=self.MODE, password=password,
479 crypto_paramversion=paramversion,
480 logger=self.consoleLogger)
483 os.makedirs('source_dir2')
484 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
485 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
487 # create first backup
488 deltatar.create_full_backup(
489 source_path="source_dir2",
490 backup_path="backup_dir",
493 assert os.path.exists("backup_dir")
494 assert os.path.exists(os.path.join("backup_dir",
495 deltatar.volume_name_func("backup_dir", True, 0)))
496 if self.MODE_COMPRESSES:
500 for i_vol in range(n_vols):
501 assert os.path.exists(os.path.join("backup_dir",
502 deltatar.volume_name_func("backup_dir", True, i_vol)))
503 assert not os.path.exists(os.path.join("backup_dir",
504 deltatar.volume_name_func("backup_dir", True, n_vols)))
506 shutil.rmtree("source_dir2")
508 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
509 tar_path = os.path.join("backup_dir", tar_filename)
511 index_filename = deltatar.index_name_func(True)
512 index_path = os.path.join("backup_dir", index_filename)
514 # this should automatically restore the huge file
515 f = deltatar.open_auxiliary_file(index_path, 'r')
521 data = json.loads(l.decode('UTF-8'))
522 if data.get('type', '') == 'file' and\
523 deltatar.unprefixed(data['path']) == "huge":
524 offset = data['offset']
527 assert offset is not None
529 fo = open(tar_path, 'rb')
531 def new_volume_handler(mode, tarobj, base_name, volume_number):
532 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
533 if self.ENCRYPTION is not None:
534 # deltatar module is shadowed here
535 suf += "." + deltatar_PDTCRYPT_EXTENSION
536 tarobj.open_volume(datetime.now().strftime(
537 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
538 new_volume_handler = partial(new_volume_handler, self.MODE)
541 if self.ENCRYPTION is not None:
542 crypto_ctx = crypto.Decrypt (password)
544 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
545 new_volume_handler=new_volume_handler,
546 encryption=crypto_ctx)
548 member = tarobj.next()
549 member.path = deltatar.unprefixed(member.path)
550 member.name = deltatar.unprefixed(member.name)
551 tarobj.extract(member)
554 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
559 def test_restore_manual_from_index_twice (self):
561 Creates a full backup and restore the same file twice. This *must* fail
562 when encryption is active.
564 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
565 backwards within the same file. This prevents the encryption layer from
566 exploding due to a reused IV in an overall valid archive.
568 This test anticipates possible future mistakes since it’s entirely
569 feasible to implement backward seeks for *_Stream* with concat mode.
571 # this test only works for uncompressed or concat compressed modes
572 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
573 raise SkipTest("this test only works for uncompressed "
574 "or concat compressed modes")
576 password, paramversion = self.ENCRYPTION or (None, None)
577 deltatar = DeltaTar(mode=self.MODE, password=password,
578 crypto_paramversion=paramversion,
579 logger=self.consoleLogger)
582 os.makedirs("source_dir2")
583 self.hash["source_dir2/samefile"] = \
584 self.create_file("source_dir2/samefile", 1 * 1024)
586 # create first backup
587 deltatar.create_full_backup(
588 source_path="source_dir2",
589 backup_path="backup_dir")
591 assert os.path.exists("backup_dir")
592 assert os.path.exists(os.path.join("backup_dir",
593 deltatar.volume_name_func("backup_dir", True, 0)))
595 shutil.rmtree("source_dir2")
597 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
598 tar_path = os.path.join("backup_dir", tar_filename)
600 index_filename = deltatar.index_name_func(True)
601 index_path = os.path.join("backup_dir", index_filename)
603 f = deltatar.open_auxiliary_file(index_path, "r")
609 data = json.loads(l.decode("UTF-8"))
610 if data.get("type", "") == "file" and\
611 deltatar.unprefixed(data["path"]) == "samefile":
612 offset = data["offset"]
615 assert offset is not None
617 fo = open(tar_path, "rb")
621 if self.ENCRYPTION is not None:
622 crypto_ctx = crypto.Decrypt (password)
624 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
625 encryption=crypto_ctx)
626 member = tarobj.next()
627 member.path = deltatar.unprefixed(member.path)
628 member.name = deltatar.unprefixed(member.name)
631 tarobj.extract(member)
632 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
636 tarobj.extract(member)
637 except tarfile.StreamError:
638 if crypto_ctx is not None:
639 pass # good: seeking backwards not allowed
644 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
646 os.unlink("samefile")
649 def test_restore_from_index(self):
651 Restores a full backup using an index file.
653 if self.MODE.startswith(':') or self.MODE.startswith('|'):
654 raise SkipTest('this test only works for uncompressed '
655 'or concat compressed modes')
657 password, paramversion = self.ENCRYPTION or (None, None)
658 deltatar = DeltaTar(mode=self.MODE, password=password,
659 crypto_paramversion=paramversion,
660 logger=self.consoleLogger)
662 # create first backup
663 deltatar.create_full_backup(
664 source_path="source_dir",
665 backup_path="backup_dir")
667 shutil.rmtree("source_dir")
669 # this should automatically restore all volumes
670 index_filename = deltatar.index_name_func(True)
671 index_path = os.path.join("backup_dir", index_filename)
673 deltatar.restore_backup(target_path="source_dir",
674 backup_indexes_paths=[index_path])
676 for key, value in self.hash.items():
677 assert os.path.exists(key)
679 assert value == self.md5sum(key)
681 def test_restore_multivol_from_index(self):
683 Restores a full multivolume backup using an index file.
685 if self.MODE.startswith(':') or self.MODE.startswith('|'):
686 raise SkipTest('this test only works for uncompressed '
687 'or concat compressed modes')
689 password, paramversion = self.ENCRYPTION or (None, None)
690 deltatar = DeltaTar(mode=self.MODE, password=password,
691 crypto_paramversion=paramversion,
692 logger=self.consoleLogger)
694 # create first backup
695 deltatar.create_full_backup(
696 source_path="source_dir",
697 backup_path="backup_dir",
700 shutil.rmtree("source_dir")
702 # this should automatically restore all volumes
703 index_filename = deltatar.index_name_func(True)
704 index_path = os.path.join("backup_dir", index_filename)
706 deltatar.restore_backup(target_path="source_dir",
707 backup_indexes_paths=[index_path])
709 for key, value in self.hash.items():
710 assert os.path.exists(key)
712 assert value == self.md5sum(key)
714 def test_create_basic_filtering(self):
716 Tests create backup basic filtering.
718 password, paramversion = self.ENCRYPTION or (None, None)
719 deltatar = DeltaTar(mode=self.MODE, password=password,
720 crypto_paramversion=paramversion,
721 logger=self.consoleLogger,
722 included_files=["test", "small"],
723 excluded_files=["test/huge"])
725 # create first backup
726 deltatar.create_full_backup(
727 source_path="source_dir",
728 backup_path="backup_dir")
730 assert os.path.exists("backup_dir")
731 shutil.rmtree("source_dir")
733 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
734 tar_path = os.path.join("backup_dir", tar_filename)
736 deltatar.restore_backup(target_path="source_dir",
737 backup_tar_path=tar_path)
739 assert os.path.exists("source_dir/small")
740 assert os.path.exists("source_dir/test")
741 assert os.path.exists("source_dir/test/huge2")
742 assert os.path.exists("source_dir/test/test2")
744 assert not os.path.exists("source_dir/test/huge")
745 assert not os.path.exists("source_dir/big")
747 def test_create_filter_func(self):
749 Tests create backup basic filtering.
752 def filter_func(visited_paths, path):
753 if path not in visited_paths:
754 visited_paths.append(path)
757 filter_func = partial(filter_func, visited_paths)
759 password, paramversion = self.ENCRYPTION or (None, None)
760 deltatar = DeltaTar(mode=self.MODE, password=password,
761 crypto_paramversion=paramversion,
762 logger=self.consoleLogger,
763 included_files=["test", "small"],
764 excluded_files=["test/huge"],
765 filter_func=filter_func)
767 # create first backup
768 deltatar.create_full_backup(
769 source_path="source_dir",
770 backup_path="backup_dir")
772 assert os.path.exists("backup_dir")
773 shutil.rmtree("source_dir")
775 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
776 tar_path = os.path.join("backup_dir", tar_filename)
778 deltatar.restore_backup(target_path="source_dir",
779 backup_tar_path=tar_path)
780 assert set(visited_paths) == set([
787 def test_create_filter_out_func(self):
789 Tests create backup basic filtering.
792 def filter_func(visited_paths, path):
794 Filter out everything
796 if path not in visited_paths:
797 visited_paths.append(path)
800 filter_func = partial(filter_func, visited_paths)
802 password, paramversion = self.ENCRYPTION or (None, None)
803 deltatar = DeltaTar(mode=self.MODE, password=password,
804 crypto_paramversion=paramversion,
805 logger=self.consoleLogger,
806 included_files=["test", "small"],
807 excluded_files=["test/huge"],
808 filter_func=filter_func)
810 # create first backup
811 deltatar.create_full_backup(
812 source_path="source_dir",
813 backup_path="backup_dir")
815 assert os.path.exists("backup_dir")
816 shutil.rmtree("source_dir")
818 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
819 tar_path = os.path.join("backup_dir", tar_filename)
821 deltatar.restore_backup(target_path="source_dir",
822 backup_tar_path=tar_path)
823 assert set(visited_paths) == set([
828 # check that effectively no file was backed up
829 assert not os.path.exists("source_dir/small")
830 assert not os.path.exists("source_dir/big")
831 assert not os.path.exists("source_dir/test")
833 def test_restore_index_basic_filtering(self):
835 Creates a backup, and then filter when doing the index based restore.
837 if self.MODE.startswith(':') or self.MODE.startswith('|'):
838 raise SkipTest('this test only works for uncompressed '
839 'or concat compressed modes')
841 password, paramversion = self.ENCRYPTION or (None, None)
842 deltatar = DeltaTar(mode=self.MODE, password=password,
843 crypto_paramversion=paramversion,
844 logger=self.consoleLogger)
846 # create first backup
847 deltatar.create_full_backup(
848 source_path="source_dir",
849 backup_path="backup_dir")
851 assert os.path.exists("backup_dir")
852 shutil.rmtree("source_dir")
854 index_filename = deltatar.index_name_func(True)
855 index_path = os.path.join("backup_dir", index_filename)
857 deltatar.included_files = ["test", "small"]
858 deltatar.excluded_files = ["test/huge"]
859 deltatar.restore_backup(target_path="source_dir",
860 backup_indexes_paths=[index_path])
862 assert os.path.exists("source_dir/small")
863 assert os.path.exists("source_dir/test")
864 assert os.path.exists("source_dir/test/huge2")
865 assert os.path.exists("source_dir/test/test2")
867 assert not os.path.exists("source_dir/test/huge")
868 assert not os.path.exists("source_dir/big")
870 def test_restore_index_filter_func(self):
872 Creates a backup, and then filter when doing the index based restore,
873 using the filter function.
875 if self.MODE.startswith(':') or self.MODE.startswith('|'):
876 raise SkipTest('this test only works for uncompressed '
877 'or concat compressed modes')
880 def filter_func(visited_paths, path):
881 if path not in visited_paths:
882 visited_paths.append(path)
885 filter_func = partial(filter_func, visited_paths)
887 password, paramversion = self.ENCRYPTION or (None, None)
888 deltatar = DeltaTar(mode=self.MODE, password=password,
889 crypto_paramversion=paramversion,
890 logger=self.consoleLogger)
892 # create first backup
893 deltatar.create_full_backup(
894 source_path="source_dir",
895 backup_path="backup_dir")
897 assert os.path.exists("backup_dir")
898 shutil.rmtree("source_dir")
900 index_filename = deltatar.index_name_func(True)
901 index_path = os.path.join("backup_dir", index_filename)
903 deltatar.included_files = ["test", "small"]
904 deltatar.excluded_files = ["test/huge"]
905 deltatar.filter_func = filter_func
906 deltatar.restore_backup(target_path="source_dir",
907 backup_indexes_paths=[index_path])
909 assert set(visited_paths) == set([
916 def test_restore_tar_basic_filtering(self):
918 Creates a backup, and then filter when doing the tar based restore.
920 password, paramversion = self.ENCRYPTION or (None, None)
921 deltatar = DeltaTar(mode=self.MODE, password=password,
922 crypto_paramversion=paramversion,
923 logger=self.consoleLogger)
925 # create first backup
926 deltatar.create_full_backup(
927 source_path="source_dir",
928 backup_path="backup_dir")
930 assert os.path.exists("backup_dir")
931 shutil.rmtree("source_dir")
933 deltatar.included_files = ["test", "small"]
934 deltatar.excluded_files = ["test/huge"]
936 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
937 tar_path = os.path.join("backup_dir", tar_filename)
939 deltatar.restore_backup(target_path="source_dir",
940 backup_tar_path=tar_path)
942 assert os.path.exists("source_dir/small")
943 assert os.path.exists("source_dir/test")
944 assert os.path.exists("source_dir/test/huge2")
945 assert os.path.exists("source_dir/test/test2")
947 assert not os.path.exists("source_dir/test/huge")
948 assert not os.path.exists("source_dir/big")
950 def test_restore_tar_filter_func(self):
952 Creates a backup, and then filter when doing the tar based restore,
953 using the filter function.
956 def filter_func(visited_paths, path):
957 if path not in visited_paths:
958 visited_paths.append(path)
961 filter_func = partial(filter_func, visited_paths)
963 password, paramversion = self.ENCRYPTION or (None, None)
964 deltatar = DeltaTar(mode=self.MODE, password=password,
965 crypto_paramversion=paramversion,
966 logger=self.consoleLogger)
968 # create first backup
969 deltatar.create_full_backup(
970 source_path="source_dir",
971 backup_path="backup_dir")
973 assert os.path.exists("backup_dir")
974 shutil.rmtree("source_dir")
976 index_filename = deltatar.index_name_func(True)
977 index_path = os.path.join("backup_dir", index_filename)
979 deltatar.included_files = ["test", "small"]
980 deltatar.excluded_files = ["test/huge"]
981 deltatar.filter_func = filter_func
983 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
984 tar_path = os.path.join("backup_dir", tar_filename)
986 deltatar.restore_backup(target_path="source_dir",
987 backup_tar_path=tar_path)
988 assert set(visited_paths) == set([
995 def test_filter_path_regexp(self):
997 Test specifically the deltatar.filter_path function with regular
1001 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1002 re.compile('^yes$'),
1006 re.compile('^testing/in_the'),
1008 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1009 excluded_files=excluded_files)
1011 # assert valid and invalid paths
1012 assert deltatar.filter_path('test/hola')
1013 assert deltatar.filter_path('test/hola/any/thing')
1014 assert deltatar.filter_path('test/caracola/caracolero')
1015 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1016 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1017 assert deltatar.filter_path('yes')
1018 assert deltatar.filter_path('testing')
1019 assert deltatar.filter_path('testing/yes')
1020 assert deltatar.filter_path('testing/in_th')
1022 assert not deltatar.filter_path('something')
1023 assert not deltatar.filter_path('other/thing')
1024 assert not deltatar.filter_path('test_ing')
1025 assert not deltatar.filter_path('test/hola_lala')
1026 assert not deltatar.filter_path('test/agur')
1027 assert not deltatar.filter_path('testing_something')
1028 assert not deltatar.filter_path('yeso')
1029 assert not deltatar.filter_path('yes/o')
1030 assert not deltatar.filter_path('yes_o')
1031 assert not deltatar.filter_path('testing/in_the')
1032 assert not deltatar.filter_path('testing/in_the_field')
1033 assert not deltatar.filter_path('testing/in_the/field')
1035 def test_filter_path_parent(self):
1037 Test specifically the deltatar.filter_path function for parent matching
1040 'testing/path/to/some/thing'
1042 deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1044 # assert valid and invalid paths
1045 assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1046 assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1047 assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1048 assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1049 assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1050 assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1051 assert deltatar.filter_path('testing/something/else') == NO_MATCH
1053 def test_parent_matching_simple_full_backup(self):
1055 Create a full backup using parent matching
1061 password, paramversion = self.ENCRYPTION or (None, None)
1062 deltatar = DeltaTar(mode=self.MODE, password=password,
1063 crypto_paramversion=paramversion,
1064 logger=self.consoleLogger,
1065 included_files=included_files)
1067 # create first backup
1068 deltatar.create_full_backup(
1069 source_path="source_dir",
1070 backup_path="backup_dir")
1072 assert os.path.exists("backup_dir")
1073 shutil.rmtree("source_dir")
1075 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1076 tar_path = os.path.join("backup_dir", tar_filename)
1078 deltatar = DeltaTar(mode=self.MODE, password=password,
1079 logger=self.consoleLogger)
1080 deltatar.restore_backup(target_path="source_dir",
1081 backup_tar_path=tar_path)
1083 assert os.path.exists('source_dir/test/huge2')
1084 assert os.path.exists('source_dir/test/')
1085 assert not os.path.exists('source_dir/test/huge')
1086 assert not os.path.exists('source_dir/big')
1087 assert not os.path.exists('source_dir/small')
1089 def test_parent_matching_simple_full_backup_restore(self):
1091 Create a full backup and restores it using parent matching
1097 password, paramversion = self.ENCRYPTION or (None, None)
1098 deltatar = DeltaTar(mode=self.MODE, password=password,
1099 crypto_paramversion=paramversion,
1100 logger=self.consoleLogger)
1102 # create first backup
1103 deltatar.create_full_backup(
1104 source_path="source_dir",
1105 backup_path="backup_dir")
1107 assert os.path.exists("backup_dir")
1108 shutil.rmtree("source_dir")
1110 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1111 tar_path = os.path.join("backup_dir", tar_filename)
1113 deltatar = DeltaTar(mode=self.MODE, password=password,
1114 logger=self.consoleLogger,
1115 included_files=included_files)
1116 deltatar.restore_backup(target_path="source_dir",
1117 backup_tar_path=tar_path)
1119 assert os.path.exists('source_dir/test/huge2')
1120 assert os.path.exists('source_dir/test/')
1121 assert not os.path.exists('source_dir/test/huge')
1122 assert not os.path.exists('source_dir/big')
1123 assert not os.path.exists('source_dir/small')
1125 def test_parent_matching_index_full_backup_restore(self):
1127 Create a full backup and restores it using parent matching
1133 password, paramversion = self.ENCRYPTION or (None, None)
1134 deltatar = DeltaTar(mode=self.MODE, password=password,
1135 crypto_paramversion=paramversion,
1136 logger=self.consoleLogger)
1138 # create first backup
1139 deltatar.create_full_backup(
1140 source_path="source_dir",
1141 backup_path="backup_dir")
1143 assert os.path.exists("backup_dir")
1144 shutil.rmtree("source_dir")
1146 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1147 tar_path = os.path.join("backup_dir", tar_filename)
1149 deltatar = DeltaTar(mode=self.MODE, password=password,
1150 logger=self.consoleLogger,
1151 included_files=included_files)
1152 deltatar.restore_backup(target_path="source_dir",
1153 backup_tar_path=tar_path)
1155 assert os.path.exists('source_dir/test/huge2')
1156 assert os.path.exists('source_dir/test/')
1157 assert not os.path.exists('source_dir/test/huge')
1158 assert not os.path.exists('source_dir/big')
1159 assert not os.path.exists('source_dir/small')
1161 def test_collate_iterators(self):
1163 Tests the collate iterators functionality with two exact directories,
1164 using an index iterator from a backup and the exact same source dir.
1166 password, paramversion = self.ENCRYPTION or (None, None)
1167 deltatar = DeltaTar(mode=self.MODE, password=password,
1168 crypto_paramversion=paramversion,
1169 logger=self.consoleLogger)
1171 # create first backup
1172 deltatar.create_full_backup(
1173 source_path="source_dir",
1174 backup_path="backup_dir")
1176 assert os.path.exists("backup_dir")
1179 index_filename = deltatar.index_name_func(is_full=True)
1180 index_path = os.path.join(cwd, "backup_dir", index_filename)
1181 index_it = deltatar.iterate_index_path(index_path)
1183 os.chdir('source_dir')
1184 dir_it = deltatar._recursive_walk_dir('.')
1185 path_it = deltatar.jsonize_path_iterator(dir_it)
1188 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1189 assert deltatar._equal_stat_dicts(path1, path2)
1193 def test_collate_iterators_diffdirs(self):
1195 Use the collate iterators functionality with two different directories.
1196 It must behave in an expected way.
1198 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1200 password, paramversion = self.ENCRYPTION or (None, None)
1201 deltatar = DeltaTar(mode=self.MODE, password=password,
1202 crypto_paramversion=paramversion,
1203 logger=self.consoleLogger)
1205 # create first backup
1206 deltatar.create_full_backup(
1207 source_path="source_dir",
1208 backup_path="backup_dir")
1210 assert os.path.exists("backup_dir")
1211 self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)
1214 index_filename = deltatar.index_name_func(is_full=True)
1215 index_path = os.path.join(cwd, "backup_dir", index_filename)
1216 index_it = deltatar.iterate_index_path(index_path)
1218 os.chdir('source_dir')
1219 dir_it = deltatar._recursive_walk_dir('.')
1220 path_it = deltatar.jsonize_path_iterator(dir_it)
1223 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1224 if path2['path'] == 'z':
1227 assert deltatar._equal_stat_dicts(path1, path2)
1231 def test_collate_iterators_diffdirs2(self):
1233 Use the collate iterators functionality with two different directories.
1234 It must behave in an expected way.
1236 password, paramversion = self.ENCRYPTION or (None, None)
1237 deltatar = DeltaTar(mode=self.MODE, password=password,
1238 crypto_paramversion=paramversion,
1239 logger=self.consoleLogger)
1241 # create first backup
1242 deltatar.create_full_backup(
1243 source_path="source_dir",
1244 backup_path="backup_dir")
1246 assert os.path.exists("backup_dir")
1248 # add some new files and directories
1249 os.makedirs('source_dir/bigdir')
1250 self.hash["source_dir/bigdir"] = ""
1251 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1252 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1253 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1256 index_filename = deltatar.index_name_func(is_full=True)
1257 index_path = os.path.join(cwd, "backup_dir", index_filename)
1258 index_it = deltatar.iterate_index_path(index_path)
1260 os.chdir('source_dir')
1261 dir_it = deltatar._recursive_walk_dir('.')
1262 path_it = deltatar.jsonize_path_iterator(dir_it)
1267 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1268 visited_pairs.append(
1269 (deltatar.unprefixed(path1['path']) if path1 else None,
1270 path2['path'] if path2 else None)
1273 assert visited_pairs == [
1276 (u'small', u'small'),
1279 (None, u'bigdir/a'),
1280 (None, u'bigdir/b'),
1281 (u'test/huge', u'test/huge'),
1282 (u'test/huge2', u'test/huge2'),
1283 (u'test/test2', u'test/test2'),
1287 def test_create_empty_diff_backup(self):
1289 Creates an empty (no changes) backup diff
1291 password, paramversion = self.ENCRYPTION or (None, None)
1292 deltatar = DeltaTar(mode=self.MODE, password=password,
1293 crypto_paramversion=paramversion,
1294 logger=self.consoleLogger)
1296 # create first backup
1297 deltatar.create_full_backup(
1298 source_path="source_dir",
1299 backup_path="backup_dir")
1301 prev_index_filename = deltatar.index_name_func(is_full=True)
1302 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1304 deltatar.create_diff_backup("source_dir", "backup_dir2",
1308 index_path = os.path.join("backup_dir2",
1309 deltatar.index_name_func(is_full=False))
1310 index_it = deltatar.iterate_index_path(index_path)
1314 assert i[0]['path'].startswith("list://")
1318 # check the tar file
1319 assert os.path.exists("backup_dir2")
1320 shutil.rmtree("source_dir")
1322 tar_filename = deltatar.volume_name_func('backup_dir2',
1323 is_full=False, volume_number=0)
1324 tar_path = os.path.join("backup_dir2", tar_filename)
1326 # no file restored, because the diff was empty
1327 deltatar.restore_backup(target_path="source_dir",
1328 backup_tar_path=tar_path)
1329 assert len(os.listdir("source_dir")) == 0
1332 def test_create_diff_backup1(self):
1334 Creates a diff backup when there are new files
1336 password, paramversion = self.ENCRYPTION or (None, None)
1337 deltatar = DeltaTar(mode=self.MODE, password=password,
1338 crypto_paramversion=paramversion,
1339 logger=self.consoleLogger)
1341 # create first backup
1342 deltatar.create_full_backup(
1343 source_path="source_dir",
1344 backup_path="backup_dir")
1346 prev_index_filename = deltatar.index_name_func(is_full=True)
1347 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1349 # add some new files and directories
1350 os.makedirs('source_dir/bigdir')
1351 self.hash["source_dir/bigdir"] = ""
1352 os.unlink("source_dir/small")
1353 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1354 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1355 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1357 deltatar.create_diff_backup("source_dir", "backup_dir2",
1361 index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1362 index_it = deltatar.iterate_index_path(index_path)
1363 l = [i[0]['path'] for i in index_it]
1367 'snapshot://bigdir',
1371 'snapshot://bigdir/a',
1372 'snapshot://bigdir/b',
1374 'list://test/huge2',
1375 'list://test/test2',
1378 # check the tar file
1379 assert os.path.exists("backup_dir2")
1380 shutil.rmtree("source_dir")
1382 # create source_dir with the small file, that will be then deleted by
1383 # the restore_backup
1384 os.mkdir("source_dir")
1385 open("source_dir/small", 'wb').close()
1387 tar_filename = deltatar.volume_name_func('backup_dir2',
1388 is_full=False, volume_number=0)
1389 tar_path = os.path.join("backup_dir2", tar_filename)
1391 # restore the backup, this will create only the new files
1392 deltatar.restore_backup(target_path="source_dir",
1393 backup_tar_path=tar_path)
1394 # the order doesn't matter
1395 assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1397 def test_restore_from_index_diff_backup(self):
1399 Creates a full backup, modifies some files, creates a diff backup,
1400 then restores the diff backup from zero.
1402 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1403 raise SkipTest('this test only works for uncompressed '
1404 'or concat compressed modes')
1406 password, paramversion = self.ENCRYPTION or (None, None)
1407 deltatar = DeltaTar(mode=self.MODE, password=password,
1408 crypto_paramversion=paramversion,
1409 logger=self.consoleLogger)
1411 # create first backup
1412 deltatar.create_full_backup(
1413 source_path="source_dir",
1414 backup_path="backup_dir")
1416 prev_index_filename = deltatar.index_name_func(is_full=True)
1417 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1419 # add some new files and directories
1420 os.makedirs('source_dir/bigdir')
1421 self.hash["source_dir/bigdir"] = ""
1422 os.unlink("source_dir/small")
1423 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1424 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1425 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1427 deltatar.create_diff_backup("source_dir", "backup_dir2",
1430 # apply diff backup in target_dir
1431 index_filename = deltatar.index_name_func(is_full=False)
1432 index_path = os.path.join("backup_dir2", index_filename)
1433 deltatar.restore_backup("target_dir",
1434 backup_indexes_paths=[index_path, prev_index_path])
1436 # then compare the two directories source_dir and target_dir and check
1438 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1440 def test_restore_from_index_diff_backup2(self):
1442 Creates a full backup, modifies some files, creates a diff backup,
1443 then restores the diff backup with the full backup as a starting point.
1445 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1446 raise SkipTest('this test only works for uncompressed '
1447 'or concat compressed modes')
1449 password, paramversion = self.ENCRYPTION or (None, None)
1450 deltatar = DeltaTar(mode=self.MODE, password=password,
1451 crypto_paramversion=paramversion,
1452 logger=self.consoleLogger)
1454 # create first backup
1455 deltatar.create_full_backup(
1456 source_path="source_dir",
1457 backup_path="backup_dir")
1459 prev_index_filename = deltatar.index_name_func(is_full=True)
1460 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1462 # add some new files and directories
1463 os.makedirs('source_dir/bigdir')
1464 self.hash["source_dir/bigdir"] = ""
1465 os.unlink("source_dir/small")
1466 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1467 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1468 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1469 shutil.rmtree("source_dir/test")
1471 deltatar.create_diff_backup("source_dir", "backup_dir2",
1474 # first restore initial backup in target_dir
1475 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1476 tar_path = os.path.join("backup_dir", tar_filename)
1477 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1479 # then apply diff backup in target_dir
1480 index_filename = deltatar.index_name_func(is_full=False)
1481 index_path = os.path.join("backup_dir2", index_filename)
1482 deltatar.restore_backup("target_dir",
1483 backup_indexes_paths=[index_path, prev_index_path])
1485 # then compare the two directories source_dir and target_dir and check
1487 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1489 def test_restore_from_index_diff_backup3(self):
1491 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1492 diff backup, then restores the diff backup with the full backup as a
1495 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1496 raise SkipTest('this test only works for uncompressed '
1497 'or concat compressed modes')
1499 password, paramversion = self.ENCRYPTION or (None, None)
1500 deltatar = DeltaTar(mode=self.MODE, password=password,
1501 crypto_paramversion=paramversion,
1502 logger=self.consoleLogger)
1504 shutil.rmtree("source_dir")
1505 shutil.copytree(self.GIT_DIR, "source_dir")
1506 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1508 # create first backup
1509 deltatar.create_full_backup(
1510 source_path="source_dir",
1511 backup_path="backup_dir")
1513 prev_index_filename = deltatar.index_name_func(is_full=True)
1514 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1516 # alter the source_dir randomly
1517 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1519 for path in source_it:
1520 # if path doesn't exist (might have previously removed) ignore it.
1521 # also ignore it (i.e. do not change it) 70% of the time
1522 if not os.path.exists(path) or random.random() < 0.7:
1526 if os.path.isdir(path):
1531 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1534 # first restore initial backup in target_dir
1535 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1536 tar_path = os.path.join("backup_dir", tar_filename)
1537 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1539 # and check that target_dir equals to source_dir (which is the same as
1540 # self.GIT_DIR initially)
1541 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1543 # then apply diff backup in target_dir
1544 index_filename = deltatar.index_name_func(is_full=False)
1545 index_path = os.path.join("backup_dir2", index_filename)
1546 deltatar.restore_backup("target_dir",
1547 backup_indexes_paths=[index_path, prev_index_path])
1549 # and check that target_dir equals to source_dir_diff (the randomly
1550 # altered self.GIT_DIR directory)
1551 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1553 # then delete target_dir and apply diff backup from zero and check again
1554 shutil.rmtree("target_dir")
1555 deltatar.restore_backup("target_dir",
1556 backup_indexes_paths=[index_path, prev_index_path])
1558 # and check that target_dir equals to source_dir_diff (the randomly
1559 # altered self.GIT_DIR directory)
1560 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1562 def test_restore_from_index_diff_backup3_multivol(self):
1564 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1565 diff backup, then restores the diff backup with the full backup as a
1568 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1569 raise SkipTest('this test only works for uncompressed '
1570 'or concat compressed modes')
1572 password, paramversion = self.ENCRYPTION or (None, None)
1573 deltatar = DeltaTar(mode=self.MODE, password=password,
1574 crypto_paramversion=paramversion,
1575 logger=self.consoleLogger)
1577 shutil.rmtree("source_dir")
1578 shutil.copytree(self.GIT_DIR, "source_dir")
1579 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1581 # create first backup
1582 deltatar.create_full_backup(
1583 source_path="source_dir",
1584 backup_path="backup_dir",
1587 prev_index_filename = deltatar.index_name_func(is_full=True)
1588 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1590 # alter the source_dir randomly
1591 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1593 for path in source_it:
1594 # if path doesn't exist (might have previously removed) ignore it.
1595 # also ignore it (i.e. do not change it) 70% of the time
1596 if not os.path.exists(path) or random.random() < 0.7:
1600 if os.path.isdir(path):
1605 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1606 prev_index_path, max_volume_size=1)
1608 # first restore initial backup in target_dir
1609 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1610 tar_path = os.path.join("backup_dir", tar_filename)
1611 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1613 # and check that target_dir equals to source_dir (which is the same as
1614 # self.GIT_DIR initially)
1615 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1617 # then apply diff backup in target_dir
1618 index_filename = deltatar.index_name_func(is_full=False)
1619 index_path = os.path.join("backup_dir2", index_filename)
1620 deltatar.restore_backup("target_dir",
1621 backup_indexes_paths=[index_path, prev_index_path])
1623 # and check that target_dir equals to source_dir_diff (the randomly
1624 # altered self.GIT_DIR directory)
1625 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1627 # then delete target_dir and apply diff backup from zero and check again
1628 shutil.rmtree("target_dir")
1629 deltatar.restore_backup("target_dir",
1630 backup_indexes_paths=[index_path, prev_index_path])
1632 # and check that target_dir equals to source_dir_diff (the randomly
1633 # altered self.GIT_DIR directory)
1634 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1636 def check_equal_dirs(self, path1, path2, deltatar):
1638 compare the two directories source_dir and target_dir and check
1641 source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1642 source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1643 target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1644 target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1647 sitem = next(source_it)
1648 titem = next(target_it)
1649 except StopIteration:
1651 titem = next(target_it)
1652 raise Exception("iterators do not stop at the same time")
1653 except StopIteration:
1656 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1657 except Exception as e:
1658 print("SITEM: " + str(sitem))
1659 print("TITEM: " + str(titem))
1662 def test_create_no_symlinks(self):
1664 Creates a full backup from different varieties of symlinks. The
1665 extracted archive may not contain any symlinks but the file contents
1668 os.system("rm -rf source_dir")
1669 os.makedirs("source_dir/symlinks")
1670 fd = os.open("source_dir/symlinks/valid_linkname",
1671 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1672 os.write(fd, b"valid link target for symlink tests; please ignore\n")
1674 # first one is good, the rest points nowhere
1675 self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1676 self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1677 self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1678 self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1679 password, paramversion = self.ENCRYPTION or (None, None)
1680 deltatar = DeltaTar(mode=self.MODE, password=password,
1681 crypto_paramversion=paramversion,
1682 logger=self.consoleLogger)
1684 # create first backup
1685 deltatar.create_full_backup(source_path="source_dir",
1686 backup_path="backup_dir")
1688 assert os.path.exists("backup_dir")
1689 shutil.rmtree("source_dir")
1690 assert not os.path.exists("source_dir")
1692 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1693 tar_path = os.path.join("backup_dir", tar_filename)
1695 deltatar.restore_backup(target_path="source_dir",
1696 backup_tar_path=tar_path)
1698 for _r, _ds, fs in os.walk("source_dir/symlinks"):
1699 # only the valid link plus the linked file may be found in the
1703 # the link must have been resolved and file contents must match
1705 assert not os.path.islink(f)
1706 with open("source_dir/symlinks/valid_linkname") as a:
1707 with open("source_dir/symlinks/whatever") as b:
1708 assert a.read() == b.read()
1710 def test_restore_with_symlinks(self):
1712 Creates a full backup containing different varieties of symlinks. All
1713 of them must be filtered out.
1715 password, paramversion = self.ENCRYPTION or (None, None)
1716 deltatar = DeltaTar(mode=self.MODE, password=password,
1717 crypto_paramversion=paramversion,
1718 logger=self.consoleLogger)
1720 # create first backup
1721 deltatar.create_full_backup(source_path="source_dir",
1722 backup_path="backup_dir")
1724 assert os.path.exists("backup_dir")
1725 shutil.rmtree("source_dir")
1727 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1728 tar_path = os.path.join("backup_dir", tar_filename)
1730 # add symlinks to existing archive
1732 def add_symlink (a, name, dst):
1733 l = tarfile.TarInfo("snapshot://%s" % name)
1734 l.type = tarfile.SYMTYPE
1740 with tarfile.open(tar_path,mode="a") as a:
1742 [ add_symlink(a, "symlinks/foo", "internal-file")
1743 , add_symlink(a, "symlinks/bar", "/absolute/path")
1744 , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1745 except tarfile.ReadError as e:
1746 if self.MODE == '#' or self.MODE.endswith ("gz"):
1750 except ValueError as e:
1751 if self.MODE.startswith ('#'):
1756 deltatar.restore_backup(target_path="source_dir",
1757 backup_tar_path=tar_path)
1759 # check what happened to our symlinks
1760 for name in checkme:
1761 fullpath = os.path.join("source_dir", name)
1762 assert not os.path.exists(fullpath)
1764 def test_restore_malicious_symlinks(self):
1766 Creates a full backup containing a symlink and a file of the same name.
1767 This simulates a symlink attack with a link pointing to some external
1768 path that is abused to write outside the extraction prefix.
1770 password, paramversion = self.ENCRYPTION or (None, None)
1771 deltatar = DeltaTar(mode=self.MODE, password=password,
1772 crypto_paramversion=paramversion,
1773 logger=self.consoleLogger)
1775 # create first backup
1776 deltatar.create_full_backup(source_path="source_dir",
1777 backup_path="backup_dir")
1779 assert os.path.exists("backup_dir")
1780 shutil.rmtree("source_dir")
1782 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1783 tar_path = os.path.join("backup_dir", tar_filename)
1785 # add symlinks to existing archive
1787 def add_symlink (a, name, dst):
1788 l = tarfile.TarInfo("snapshot://%s" % name)
1789 l.type = tarfile.SYMTYPE
1793 def add_file (a, name):
1794 f = tarfile.TarInfo("snapshot://%s" % name)
1795 f.type = tarfile.REGTYPE
1798 testpath = "symlinks/pernicious-link"
1799 testdst = "/tmp/does/not/exist"
1802 with tarfile.open(tar_path, mode="a") as a:
1803 add_symlink(a, testpath, testdst)
1804 add_symlink(a, testpath, testdst+"X")
1805 add_symlink(a, testpath, testdst+"XXX")
1806 add_file(a, testpath)
1807 except tarfile.ReadError as e:
1808 if self.MODE == '#' or self.MODE.endswith ("gz"):
1812 except ValueError as e:
1813 if self.MODE.startswith ('#'):
1814 pass # O_APPEND of concat archives not feasible
1818 deltatar.restore_backup(target_path="source_dir",
1819 backup_tar_path=tar_path)
1821 # check whether the link was extracted; deltatar seems to only ever
1822 # retrieve the first item it finds for a given path which in the case
1823 # at hand is a symlink to some non-existent path
1824 fullpath = os.path.join("source_dir", testpath)
1825 assert not os.path.exists(fullpath)
1827 class DeltaTar2Test(DeltaTarTest):
1829 Same as DeltaTar but with specific ":" mode
1834 class DeltaTarStreamTest(DeltaTarTest):
1836 Same as DeltaTar but with specific uncompressed stream mode
1841 class DeltaTarGzipTest(DeltaTarTest):
1843 Same as DeltaTar but with specific gzip mode
1846 MODE_COMPRESSES = True
1849 class DeltaTarGzipStreamTest(DeltaTarTest):
1851 Same as DeltaTar but with specific gzip stream mode
1854 MODE_COMPRESSES = True
1857 @skip('Bz2 tests are too slow..')
1858 class DeltaTarBz2Test(DeltaTarTest):
1860 Same as DeltaTar but with specific bz2 mode
1863 MODE_COMPRESSES = True
1866 @skip('Bz2 tests are too slow..')
1867 class DeltaTarBz2StreamTest(DeltaTarTest):
1869 Same as DeltaTar but with specific bz2 stream mode
1872 MODE_COMPRESSES = True
1875 class DeltaTarGzipConcatTest(DeltaTarTest):
1877 Same as DeltaTar but with specific gzip concat stream mode
1880 MODE_COMPRESSES = True
1883 class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
1885 Same as DeltaTar but with specific gzip aes128 concat stream mode
1888 ENCRYPTION = ('some magic key', 1)
1889 MODE_COMPRESSES = True
1892 class DeltaTarAes128ConcatTest(DeltaTarTest):
1894 Same as DeltaTar but with specific aes128 concat stream mode
1897 ENCRYPTION = ('some magic key', 1)