1 # Copyright (C) 2013 Intra2net AG
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Lesser General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
37 from . import BaseTest
38 from . import new_volume_handler
# Enable warning messages from deltatar. Doing so lowers the signal-to-noise
# ratio of test runs, since none of the messages are meaningful in any way.
42 VERBOSE_TEST_OUTPUT = False
44 class DeltaTarTest(BaseTest):
49 MODE_COMPRESSES = False
51 ENCRYPTION = None # (password : str, paramversion : int) option
62 self.pwd = os.getcwd()
63 os.system('rm -rf target_dir source_dir* backup_dir* huge')
64 os.makedirs('source_dir/test/test2')
66 self.hash["source_dir/test/test2"] = ''
67 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
68 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
69 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
70 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
72 self.consoleLogger = None
73 if VERBOSE_TEST_OUTPUT is True:
74 self.consoleLogger = logging.StreamHandler()
75 self.consoleLogger.setLevel(logging.DEBUG)
77 if not os.path.isdir(self.GIT_DIR):
78 # Not running inside git tree, take our
79 # own testing directory as source.
80 self.GIT_DIR = 'testing'
82 if not os.path.isdir(self.GIT_DIR):
83 raise Exception('No input directory found: ' + self.GIT_DIR)
85 if self.FSTEST is not None:
90 Remove temporary files created by unit tests and restore the API
93 for att, val in self.FSAPI_SAVED:
94 setattr (os, att, val)
96 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
97 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
98 ("I am fully aware that this will void my warranty.")
100 def test_restore_simple_full_backup(self):
102 Creates a full backup without any filtering and restores it.
104 password, paramversion = self.ENCRYPTION or (None, None)
105 deltatar = DeltaTar(mode=self.MODE, password=password,
106 crypto_paramversion=paramversion,
107 logger=self.consoleLogger)
109 # create first backup
110 deltatar.create_full_backup(
111 source_path="source_dir",
112 backup_path="backup_dir")
114 assert os.path.exists("backup_dir")
115 shutil.rmtree("source_dir")
117 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
118 tar_path = os.path.join("backup_dir", tar_filename)
120 deltatar.restore_backup(target_path="source_dir",
121 backup_tar_path=tar_path)
123 for key, value in self.hash.items():
124 assert os.path.exists(key)
126 assert value == self.md5sum(key)
129 def test_create_backup_max_file_length (self):
131 Creates a full backup including one file that exceeds the (purposely
132 lowered) upper bound on GCM encrypted objects. This will yield multiple
133 encrypted objects for one plaintext file.
135 Success is verified by splitting the archive at object boundaries and
138 if self.MODE_COMPRESSES is True:
139 raise SkipTest ("GCM file length test not meaningful with compression.")
140 if self.ENCRYPTION is None:
141 raise SkipTest ("GCM file length applies only to encrypted backups.")
143 new_max = 20000 # cannot be less than tar block size
144 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
145 ("I am fully aware that this will void my warranty.",
148 password, paramversion = self.ENCRYPTION
149 deltatar = DeltaTar (mode=self.MODE, password=password,
150 crypto_paramversion=paramversion,
151 logger=self.consoleLogger)
154 os.makedirs ("source_dir2")
155 for f, s in [("empty" , 0) # 1 tar objects
156 ,("slightly_larger", new_max + 1) # 2
157 ,("twice" , 2 * new_max) # 3
159 f = "source_dir2/%s" % f
160 self.hash [f] = self.create_file (f, s)
162 deltatar.create_full_backup \
163 (source_path="source_dir2", backup_path="backup_dir")
165 assert os.path.exists ("backup_dir")
166 shutil.rmtree ("source_dir2")
168 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
169 backup_path = os.path.join("backup_dir", backup_filename)
171 # split the resulting archive into its constituents without
173 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
174 "-o backup_dir/split <\'%s\'" % backup_path)
176 assert os.path.exists ("backup_dir/split")
178 dents = os.listdir ("backup_dir/split")
179 assert len (dents) == 6
182 def test_restore_backup_max_file_length (self):
184 Creates a full backup including one file that exceeds the (purposely
185 lowered) upper bound on GCM encrypted objects. This will yield two
186 encrypted objects for one plaintext file.
188 Success is verified by splitting the archive at object boundaries and
191 if self.MODE_COMPRESSES is True:
192 raise SkipTest ("GCM file length test not meaningful with compression.")
193 if self.ENCRYPTION is None:
194 raise SkipTest ("GCM file length applies only to encrypted backups.")
196 new_max = 20000 # cannot be less than tar block size
197 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
198 ("I am fully aware that this will void my warranty.",
201 password, paramversion = self.ENCRYPTION
202 deltatar = DeltaTar (mode=self.MODE, password=password,
203 crypto_paramversion=paramversion,
204 logger=self.consoleLogger)
207 os.makedirs ("source_dir2")
208 for f, s in [("empty" , 0) # 1 tar objects
209 ,("almost_large" , new_max - 1) # 2
210 ,("large" , new_max) # 3
211 ,("slightly_larger", new_max + 1) # 4
212 ,("twice" , 2 * new_max) # 5
213 ,("twice_plus_one" , (2 * new_max) + 1) # 6
215 f = "source_dir2/%s" % f
216 self.hash [f] = self.create_file (f, s)
218 deltatar.create_full_backup \
219 (source_path="source_dir2", backup_path="backup_dir")
221 assert os.path.exists ("backup_dir")
222 shutil.rmtree ("source_dir2")
224 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
225 backup_path = os.path.join("backup_dir", backup_filename)
227 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
228 tar_path = os.path.join("backup_dir", tar_filename)
230 deltatar.restore_backup(target_path="source_dir2",
231 backup_tar_path=tar_path)
233 for key, value in self.hash.items():
234 assert os.path.exists(key)
236 assert value == self.md5sum(key)
239 def test_create_backup_index_max_file_length (self):
241 Creates a full backup with a too large index file for the upper bound
242 of the GCM encryption. Since the index file has a fixed IV file counter
243 of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
245 60+ GB of (potentially compressed) index file should last for a while...
247 if self.MODE_COMPRESSES is True:
248 raise SkipTest ("GCM file length test not meaningful with compression.")
249 if self.ENCRYPTION is None:
250 raise SkipTest ("GCM file length applies only to encrypted backups.")
253 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
254 ("I am fully aware that this will void my warranty.",
257 password, paramversion = self.ENCRYPTION
258 deltatar = DeltaTar (mode=self.MODE, password=password,
259 crypto_paramversion=paramversion,
260 logger=self.consoleLogger)
263 os.makedirs ("source_dir2")
265 f = "source_dir2/dummy_%rd" % i
266 self.hash [f] = self.create_file (f, i)
268 with self.assertRaises (crypto.InvalidFileCounter):
269 deltatar.create_full_backup \
270 (source_path="source_dir2", backup_path="backup_dir")
271 shutil.rmtree ("source_dir2")
274 def test_check_index_checksum(self):
276 Creates a full backup and checks the index' checksum of files
278 password, paramversion = self.ENCRYPTION or (None, None)
279 deltatar = DeltaTar(mode=self.MODE, password=password,
280 crypto_paramversion=paramversion,
281 logger=self.consoleLogger)
283 # create first backup
284 deltatar.create_full_backup(
285 source_path="source_dir",
286 backup_path="backup_dir")
289 index_filename = deltatar.index_name_func(True)
290 index_path = os.path.join("backup_dir", index_filename)
292 f = open(index_path, 'rb')
300 if b'BEGIN-FILE-LIST' in l:
301 crc = binascii.crc32(l) & 0xFFFFffff
303 elif b'END-FILE-LIST' in l:
304 crc = binascii.crc32(l, crc) & 0xffffffff
306 # next line contains the crc
307 data = json.loads(f.readline().decode("UTF-8"))
308 assert data['type'] == 'file-list-checksum'
309 assert data['checksum'] == crc
313 crc = binascii.crc32(l, crc) & 0xffffffff
317 def test_restore_multivol(self):
319 Creates a full backup without any filtering with multiple volumes and
322 if ':gz' in self.MODE:
323 raise SkipTest('compression information is lost when creating '
324 'multiple volumes with no Stream')
326 password, paramversion = self.ENCRYPTION or (None, None)
327 deltatar = DeltaTar(mode=self.MODE, password=password,
328 crypto_paramversion=paramversion,
329 logger=self.consoleLogger)
332 os.makedirs('source_dir2')
333 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
334 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
336 # create first backup
337 deltatar.create_full_backup(
338 source_path="source_dir2",
339 backup_path="backup_dir",
342 assert os.path.exists("backup_dir")
343 assert os.path.exists(os.path.join("backup_dir",
344 deltatar.volume_name_func("backup_dir", True, 0)))
345 if self.MODE_COMPRESSES:
349 for i_vol in range(n_vols):
350 assert os.path.exists(os.path.join("backup_dir",
351 deltatar.volume_name_func("backup_dir", True, i_vol)))
352 assert not os.path.exists(os.path.join("backup_dir",
353 deltatar.volume_name_func("backup_dir", True, n_vols)))
355 shutil.rmtree("source_dir2")
357 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
358 tar_path = os.path.join("backup_dir", tar_filename)
360 # this should automatically restore all volumes
361 deltatar.restore_backup(target_path="source_dir2",
362 backup_tar_path=tar_path)
364 for key, value in self.hash.items():
365 assert os.path.exists(key)
367 assert value == self.md5sum(key)
369 def test_restore_multivol_split(self):
371 Creates a full backup without any filtering with multiple volumes
372 with big files bigger than the max volume size and
375 if self.MODE.startswith(':') or self.MODE.startswith('|'):
376 raise SkipTest('this test only works for uncompressed '
377 'or concat compressed modes')
379 password, paramversion = self.ENCRYPTION or (None, None)
380 deltatar = DeltaTar(mode=self.MODE, password=password,
381 crypto_paramversion=paramversion,
382 logger=self.consoleLogger)
385 os.makedirs('source_dir2')
386 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
387 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
388 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
390 # create first backup
391 deltatar.create_full_backup(
392 source_path="source_dir2",
393 backup_path="backup_dir",
396 assert os.path.exists("backup_dir")
397 assert os.path.exists(os.path.join("backup_dir",
398 deltatar.volume_name_func("backup_dir", True, 0)))
399 if self.MODE_COMPRESSES:
403 for i_vol in range(n_vols):
404 assert os.path.exists(os.path.join("backup_dir",
405 deltatar.volume_name_func("backup_dir", True, i_vol)))
406 assert not os.path.exists(os.path.join("backup_dir",
407 deltatar.volume_name_func("backup_dir", True, n_vols)))
409 shutil.rmtree("source_dir2")
411 index_filename = deltatar.index_name_func(True)
412 index_path = os.path.join("backup_dir", index_filename)
414 deltatar.restore_backup(target_path="source_dir2",
415 backup_indexes_paths=[index_path])
417 for key, value in self.hash.items():
418 assert os.path.exists(key)
420 assert value == self.md5sum(key)
423 def test_full_backup_index_extra_data(self):
425 Tests that the index file for a full backup can store extra_data and
426 that this data can be retrieved.
428 password, paramversion = self.ENCRYPTION or (None, None)
429 deltatar = DeltaTar(mode=self.MODE, password=password,
430 crypto_paramversion=paramversion,
431 logger=self.consoleLogger)
435 otra_cosa=[1, "lista"],
436 y_otra=dict(bola=1.1)
439 deltatar.create_full_backup(
440 source_path="source_dir",
441 backup_path="backup_dir",
442 extra_data=extra_data)
444 index_filename = deltatar.index_name_func(is_full=True)
445 index_path = os.path.join("backup_dir", index_filename)
447 # iterate_index_path retrieves extra_data, and thus we can then compare
448 index_it = deltatar.iterate_index_path(index_path)
449 self.assertEqual(index_it.extra_data, extra_data)
452 def test_diff_backup_index_extra_data(self):
454 Tests that the index file for a diff backup can store extra_data and
455 that this data can be retrieved.
457 password, paramversion = self.ENCRYPTION or (None, None)
458 deltatar = DeltaTar(mode=self.MODE, password=password,
459 crypto_paramversion=paramversion,
460 logger=self.consoleLogger)
464 otra_cosa=[1, "lista"],
465 y_otra=dict(bola=1.1)
468 deltatar.create_full_backup(
469 source_path="source_dir",
470 backup_path="backup_dir")
473 prev_index_filename = deltatar.index_name_func(is_full=True)
474 prev_index_path = os.path.join("backup_dir", prev_index_filename)
476 # create empty diff backup
477 deltatar.create_diff_backup("source_dir", "backup_dir2",
478 prev_index_path, extra_data=extra_data)
480 index_filename = deltatar.index_name_func(is_full=False)
481 index_path = os.path.join("backup_dir2", index_filename)
483 # iterate_index_path retrieves extra_data, and thus we can then compare
484 index_it = deltatar.iterate_index_path(index_path)
485 self.assertEqual(index_it.extra_data, extra_data)
487 def test_restore_multivol2(self):
489 Creates a full backup without any filtering with multiple volumes and
492 password, paramversion = self.ENCRYPTION or (None, None)
493 deltatar = DeltaTar(mode=self.MODE, password=password,
494 crypto_paramversion=paramversion,
495 logger=self.consoleLogger)
497 shutil.copytree(self.GIT_DIR, "source_dir2")
499 # create first backup
500 deltatar.create_full_backup(
501 source_path="source_dir2",
502 backup_path="backup_dir",
505 assert os.path.exists("backup_dir")
506 assert os.path.exists(os.path.join("backup_dir",
507 deltatar.volume_name_func("backup_dir", True, 0)))
509 shutil.rmtree("source_dir2")
511 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
512 tar_path = os.path.join("backup_dir", tar_filename)
514 # this should automatically restore all volumes
515 deltatar.restore_backup(target_path="source_dir2",
516 backup_tar_path=tar_path)
518 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
520 def test_restore_multivol_manual_from_index(self):
522 Creates a full backup without any filtering with multiple volumes and
525 # this test only works for uncompressed or concat compressed modes
526 if self.MODE.startswith(':') or self.MODE.startswith('|'):
527 raise SkipTest('this test only works for uncompressed '
528 'or concat compressed modes')
530 password, paramversion = self.ENCRYPTION or (None, None)
531 deltatar = DeltaTar(mode=self.MODE, password=password,
532 crypto_paramversion=paramversion,
533 logger=self.consoleLogger)
536 os.makedirs('source_dir2')
537 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
538 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
540 # create first backup
541 deltatar.create_full_backup(
542 source_path="source_dir2",
543 backup_path="backup_dir",
546 assert os.path.exists("backup_dir")
547 assert os.path.exists(os.path.join("backup_dir",
548 deltatar.volume_name_func("backup_dir", True, 0)))
549 if self.MODE_COMPRESSES:
553 for i_vol in range(n_vols):
554 assert os.path.exists(os.path.join("backup_dir",
555 deltatar.volume_name_func("backup_dir", True, i_vol)))
556 assert not os.path.exists(os.path.join("backup_dir",
557 deltatar.volume_name_func("backup_dir", True, n_vols)))
559 shutil.rmtree("source_dir2")
561 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
562 tar_path = os.path.join("backup_dir", tar_filename)
564 index_filename = deltatar.index_name_func(True)
565 index_path = os.path.join("backup_dir", index_filename)
567 # this should automatically restore the huge file
568 f = deltatar.open_auxiliary_file(index_path, 'r')
574 data = json.loads(l.decode('UTF-8'))
575 if data.get('type', '') == 'file' and\
576 deltatar.unprefixed(data['path']) == "huge":
577 offset = data['offset']
580 assert offset is not None
582 fo = open(tar_path, 'rb')
584 def new_volume_handler(mode, tarobj, base_name, volume_number):
585 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
586 if self.ENCRYPTION is not None:
587 # deltatar module is shadowed here
588 suf += "." + deltatar_PDTCRYPT_EXTENSION
589 tarobj.open_volume(datetime.now().strftime(
590 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
591 new_volume_handler = partial(new_volume_handler, self.MODE)
594 if self.ENCRYPTION is not None:
595 crypto_ctx = crypto.Decrypt (password)
597 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
598 new_volume_handler=new_volume_handler,
599 encryption=crypto_ctx)
601 member = tarobj.next()
602 member.path = deltatar.unprefixed(member.path)
603 member.name = deltatar.unprefixed(member.name)
604 tarobj.extract(member)
607 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
612 def test_restore_manual_from_index_twice (self):
614 Creates a full backup and restore the same file twice. This *must* fail
615 when encryption is active.
617 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
618 backwards within the same file. This prevents the encryption layer from
619 exploding due to a reused IV in an overall valid archive.
621 This test anticipates possible future mistakes since it’s entirely
622 feasible to implement backward seeks for *_Stream* with concat mode.
624 # this test only works for uncompressed or concat compressed modes
625 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
626 raise SkipTest("this test only works for uncompressed "
627 "or concat compressed modes")
629 password, paramversion = self.ENCRYPTION or (None, None)
630 deltatar = DeltaTar(mode=self.MODE, password=password,
631 crypto_paramversion=paramversion,
632 logger=self.consoleLogger)
635 os.makedirs("source_dir2")
636 self.hash["source_dir2/samefile"] = \
637 self.create_file("source_dir2/samefile", 1 * 1024)
639 # create first backup
640 deltatar.create_full_backup(
641 source_path="source_dir2",
642 backup_path="backup_dir")
644 assert os.path.exists("backup_dir")
645 assert os.path.exists(os.path.join("backup_dir",
646 deltatar.volume_name_func("backup_dir", True, 0)))
648 shutil.rmtree("source_dir2")
650 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
651 tar_path = os.path.join("backup_dir", tar_filename)
653 index_filename = deltatar.index_name_func(True)
654 index_path = os.path.join("backup_dir", index_filename)
656 f = deltatar.open_auxiliary_file(index_path, "r")
662 data = json.loads(l.decode("UTF-8"))
663 if data.get("type", "") == "file" and\
664 deltatar.unprefixed(data["path"]) == "samefile":
665 offset = data["offset"]
668 assert offset is not None
670 fo = open(tar_path, "rb")
674 if self.ENCRYPTION is not None:
675 crypto_ctx = crypto.Decrypt (password)
677 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
678 encryption=crypto_ctx)
679 member = tarobj.next()
680 member.path = deltatar.unprefixed(member.path)
681 member.name = deltatar.unprefixed(member.name)
684 tarobj.extract(member)
685 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
689 tarobj.extract(member)
690 except tarfile.StreamError:
691 if crypto_ctx is not None:
692 pass # good: seeking backwards not allowed
697 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
699 os.unlink("samefile")
702 def test_restore_from_index(self):
704 Restores a full backup using an index file.
706 if self.MODE.startswith(':') or self.MODE.startswith('|'):
707 raise SkipTest('this test only works for uncompressed '
708 'or concat compressed modes')
710 password, paramversion = self.ENCRYPTION or (None, None)
711 deltatar = DeltaTar(mode=self.MODE, password=password,
712 crypto_paramversion=paramversion,
713 logger=self.consoleLogger)
715 # create first backup
716 deltatar.create_full_backup(
717 source_path="source_dir",
718 backup_path="backup_dir")
720 shutil.rmtree("source_dir")
722 # this should automatically restore all volumes
723 index_filename = deltatar.index_name_func(True)
724 index_path = os.path.join("backup_dir", index_filename)
726 deltatar.restore_backup(target_path="source_dir",
727 backup_indexes_paths=[index_path])
729 for key, value in self.hash.items():
730 assert os.path.exists(key)
732 assert value == self.md5sum(key)
734 def test_restore_multivol_from_index(self):
736 Restores a full multivolume backup using an index file.
738 if self.MODE.startswith(':') or self.MODE.startswith('|'):
739 raise SkipTest('this test only works for uncompressed '
740 'or concat compressed modes')
742 password, paramversion = self.ENCRYPTION or (None, None)
743 deltatar = DeltaTar(mode=self.MODE, password=password,
744 crypto_paramversion=paramversion,
745 logger=self.consoleLogger)
747 # create first backup
748 deltatar.create_full_backup(
749 source_path="source_dir",
750 backup_path="backup_dir",
753 shutil.rmtree("source_dir")
755 # this should automatically restore all volumes
756 index_filename = deltatar.index_name_func(True)
757 index_path = os.path.join("backup_dir", index_filename)
759 deltatar.restore_backup(target_path="source_dir",
760 backup_indexes_paths=[index_path])
762 for key, value in self.hash.items():
763 assert os.path.exists(key)
765 assert value == self.md5sum(key)
def test_create_basic_filtering(self):
    """
    Create a full backup with include/exclude lists set on the DeltaTar
    instance, restore it from the first tar volume with that same
    instance, and check which paths exist afterwards.
    """
    # unencrypted test variants leave ENCRYPTION unset
    if self.ENCRYPTION:
        secret, crypto_version = self.ENCRYPTION
    else:
        secret, crypto_version = None, None

    dtar = DeltaTar(
        mode=self.MODE,
        password=secret,
        crypto_paramversion=crypto_version,
        logger=self.consoleLogger,
        included_files=["test", "small"],
        excluded_files=["test/huge"],
    )

    # write the backup, then wipe the source so the restore starts clean
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func("backup_dir", True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    # paths selected by the include list
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # paths that were excluded or never included
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
800 def test_create_filter_func(self):
802 Tests create backup basic filtering.
805 def filter_func(visited_paths, path):
806 if path not in visited_paths:
807 visited_paths.append(path)
810 filter_func = partial(filter_func, visited_paths)
812 password, paramversion = self.ENCRYPTION or (None, None)
813 deltatar = DeltaTar(mode=self.MODE, password=password,
814 crypto_paramversion=paramversion,
815 logger=self.consoleLogger,
816 included_files=["test", "small"],
817 excluded_files=["test/huge"],
818 filter_func=filter_func)
820 # create first backup
821 deltatar.create_full_backup(
822 source_path="source_dir",
823 backup_path="backup_dir")
825 assert os.path.exists("backup_dir")
826 shutil.rmtree("source_dir")
828 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
829 tar_path = os.path.join("backup_dir", tar_filename)
831 deltatar.restore_backup(target_path="source_dir",
832 backup_tar_path=tar_path)
833 assert set(visited_paths) == set([
840 def test_create_filter_out_func(self):
842 Tests create backup basic filtering.
845 def filter_func(visited_paths, path):
847 Filter out everything
849 if path not in visited_paths:
850 visited_paths.append(path)
853 filter_func = partial(filter_func, visited_paths)
855 password, paramversion = self.ENCRYPTION or (None, None)
856 deltatar = DeltaTar(mode=self.MODE, password=password,
857 crypto_paramversion=paramversion,
858 logger=self.consoleLogger,
859 included_files=["test", "small"],
860 excluded_files=["test/huge"],
861 filter_func=filter_func)
863 # create first backup
864 deltatar.create_full_backup(
865 source_path="source_dir",
866 backup_path="backup_dir")
868 assert os.path.exists("backup_dir")
869 shutil.rmtree("source_dir")
871 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
872 tar_path = os.path.join("backup_dir", tar_filename)
874 deltatar.restore_backup(target_path="source_dir",
875 backup_tar_path=tar_path)
876 assert set(visited_paths) == set([
881 # check that effectively no file was backed up
882 assert not os.path.exists("source_dir/small")
883 assert not os.path.exists("source_dir/big")
884 assert not os.path.exists("source_dir/test")
def test_restore_index_basic_filtering(self):
    """
    Create a full backup without filters, then set include/exclude lists
    on the DeltaTar instance before an index based restore and check
    which paths exist afterwards.
    """
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    # unencrypted test variants leave ENCRYPTION unset
    if self.ENCRYPTION:
        secret, crypto_version = self.ENCRYPTION
    else:
        secret, crypto_version = None, None

    dtar = DeltaTar(
        mode=self.MODE,
        password=secret,
        crypto_paramversion=crypto_version,
        logger=self.consoleLogger,
    )

    # write the backup, then wipe the source so the restore starts clean
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    full_index = os.path.join("backup_dir", dtar.index_name_func(True))

    # the filters are only put in place for the restore step
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[full_index])

    # paths selected by the include list
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # paths that were excluded or never included
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
923 def test_restore_index_filter_func(self):
925 Creates a backup, and then filter when doing the index based restore,
926 using the filter function.
928 if self.MODE.startswith(':') or self.MODE.startswith('|'):
929 raise SkipTest('this test only works for uncompressed '
930 'or concat compressed modes')
933 def filter_func(visited_paths, path):
934 if path not in visited_paths:
935 visited_paths.append(path)
938 filter_func = partial(filter_func, visited_paths)
940 password, paramversion = self.ENCRYPTION or (None, None)
941 deltatar = DeltaTar(mode=self.MODE, password=password,
942 crypto_paramversion=paramversion,
943 logger=self.consoleLogger)
945 # create first backup
946 deltatar.create_full_backup(
947 source_path="source_dir",
948 backup_path="backup_dir")
950 assert os.path.exists("backup_dir")
951 shutil.rmtree("source_dir")
953 index_filename = deltatar.index_name_func(True)
954 index_path = os.path.join("backup_dir", index_filename)
956 deltatar.included_files = ["test", "small"]
957 deltatar.excluded_files = ["test/huge"]
958 deltatar.filter_func = filter_func
959 deltatar.restore_backup(target_path="source_dir",
960 backup_indexes_paths=[index_path])
962 assert set(visited_paths) == set([
def test_restore_tar_basic_filtering(self):
    """
    Create a full backup without filters, then set include/exclude lists
    on the DeltaTar instance before a tar based restore and check which
    paths exist afterwards.
    """
    # unencrypted test variants leave ENCRYPTION unset
    if self.ENCRYPTION:
        secret, crypto_version = self.ENCRYPTION
    else:
        secret, crypto_version = None, None

    dtar = DeltaTar(
        mode=self.MODE,
        password=secret,
        crypto_paramversion=crypto_version,
        logger=self.consoleLogger,
    )

    # write the backup, then wipe the source so the restore starts clean
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # the filters are only put in place for the restore step
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func("backup_dir", True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    # paths selected by the include list
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # paths that were excluded or never included
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
1003 def test_restore_tar_filter_func(self):
1005 Creates a backup, and then filter when doing the tar based restore,
1006 using the filter function.
1009 def filter_func(visited_paths, path):
1010 if path not in visited_paths:
1011 visited_paths.append(path)
1014 filter_func = partial(filter_func, visited_paths)
1016 password, paramversion = self.ENCRYPTION or (None, None)
1017 deltatar = DeltaTar(mode=self.MODE, password=password,
1018 crypto_paramversion=paramversion,
1019 logger=self.consoleLogger)
1021 # create first backup
1022 deltatar.create_full_backup(
1023 source_path="source_dir",
1024 backup_path="backup_dir")
1026 assert os.path.exists("backup_dir")
1027 shutil.rmtree("source_dir")
1029 index_filename = deltatar.index_name_func(True)
1030 index_path = os.path.join("backup_dir", index_filename)
1032 deltatar.included_files = ["test", "small"]
1033 deltatar.excluded_files = ["test/huge"]
1034 deltatar.filter_func = filter_func
1036 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1037 tar_path = os.path.join("backup_dir", tar_filename)
1039 deltatar.restore_backup(target_path="source_dir",
1040 backup_tar_path=tar_path)
1041 assert set(visited_paths) == set([
1048 def test_filter_path_regexp(self):
1050 Test specifically the deltatar.filter_path function with regular
1054 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1055 re.compile('^yes$'),
1059 re.compile('^testing/in_the'),
1061 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1062 excluded_files=excluded_files)
1064 # assert valid and invalid paths
1065 assert deltatar.filter_path('test/hola')
1066 assert deltatar.filter_path('test/hola/any/thing')
1067 assert deltatar.filter_path('test/caracola/caracolero')
1068 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1069 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1070 assert deltatar.filter_path('yes')
1071 assert deltatar.filter_path('testing')
1072 assert deltatar.filter_path('testing/yes')
1073 assert deltatar.filter_path('testing/in_th')
1075 assert not deltatar.filter_path('something')
1076 assert not deltatar.filter_path('other/thing')
1077 assert not deltatar.filter_path('test_ing')
1078 assert not deltatar.filter_path('test/hola_lala')
1079 assert not deltatar.filter_path('test/agur')
1080 assert not deltatar.filter_path('testing_something')
1081 assert not deltatar.filter_path('yeso')
1082 assert not deltatar.filter_path('yes/o')
1083 assert not deltatar.filter_path('yes_o')
1084 assert not deltatar.filter_path('testing/in_the')
1085 assert not deltatar.filter_path('testing/in_the_field')
1086 assert not deltatar.filter_path('testing/in_the/field')
def test_filter_path_parent(self):
    '''
    Exercise the parent-matching results of deltatar.filter_path: every
    directory leading to an included path reports PARENT_MATCH, the
    included path and anything below it report MATCH.
    '''
    included_files = [
        'testing/path/to/some/thing'
    ]
    deltatar = DeltaTar(mode=self.MODE, included_files=included_files)

    # (path, extra keyword arguments, expected verdict)
    expectations = (
        ('testing', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some/thing', {}, MATCH),
        ('testing/path/to/some/thing/what&/ever', {}, MATCH),
        ('testing/something/else', {}, NO_MATCH),
    )

    for path, kwargs, verdict in expectations:
        assert deltatar.filter_path(path, **kwargs) == verdict
def test_parent_matching_simple_full_backup(self):
    '''
    Back up with an include filter that names a nested file, restore the
    archive without any filter and check that only that file and its
    parent directory come back.
    '''
    included_files = ['test/huge2']

    password, paramversion = self.ENCRYPTION or (None, None)
    creator = DeltaTar(mode=self.MODE,
                       password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger,
                       included_files=included_files)

    # filtered full backup
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir",
        creator.volume_name_func('backup_dir', True, 0))

    # unfiltered restore of the filtered archive
    restorer = DeltaTar(mode=self.MODE,
                        password=password,
                        logger=self.consoleLogger)
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for restored in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(restored)

    for left_out in ('source_dir/test/huge',
                     'source_dir/big',
                     'source_dir/small'):
        assert not os.path.exists(left_out)
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Back up everything, then restore the archive with an include filter
    that names a nested file; only that file and its parent directory
    may be restored.
    '''
    included_files = ['test/huge2']

    password, paramversion = self.ENCRYPTION or (None, None)
    creator = DeltaTar(mode=self.MODE,
                       password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger)

    # unfiltered full backup
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir",
        creator.volume_name_func('backup_dir', True, 0))

    # filtered restore from the archive
    restorer = DeltaTar(mode=self.MODE,
                        password=password,
                        logger=self.consoleLogger,
                        included_files=included_files)
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for restored in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(restored)

    for left_out in ('source_dir/test/huge',
                     'source_dir/big',
                     'source_dir/small'):
        assert not os.path.exists(left_out)
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create a full backup and restore it from its index file using parent
    matching.

    Raises SkipTest for the ':' and '|' modes, like the other tests in
    this class that restore through backup_indexes_paths.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    included_files = [
        'test/huge2'
    ]

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # restore through the index rather than the tar volume; restoring via
    # backup_tar_path would merely repeat
    # test_parent_matching_simple_full_backup_restore
    index_filename = deltatar.index_name_func(is_full=True)
    index_path = os.path.join("backup_dir", index_filename)

    deltatar = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger,
                        included_files=included_files)
    deltatar.restore_backup(target_path="source_dir",
                            backup_indexes_paths=[index_path])

    assert os.path.exists('source_dir/test/huge2')
    assert os.path.exists('source_dir/test/')
    assert not os.path.exists('source_dir/test/huge')
    assert not os.path.exists('source_dir/big')
    assert not os.path.exists('source_dir/small')
def test_collate_iterators(self):
    '''
    Collate the index of a fresh full backup with a walk over the very
    same source directory; every pair produced must have equal stat
    dictionaries.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup whose index gets compared against the live tree
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # the index path is made absolute because the working directory is
    # changed below
    cwd = os.getcwd()
    index_path = os.path.join(cwd, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir('.'))

    try:
        collated = deltatar.collate_iterators(index_it, path_it)
        for index_entry, dir_entry, _line_no in collated:
            assert deltatar._equal_stat_dicts(index_entry, dir_entry)
    finally:
        # go back to where the test started, even on failure
        os.chdir(cwd)
1246 def test_collate_iterators_diffdirs(self):
1248 Use the collate iterators functionality with two different directories.
1249 It must behave in an expected way.
1251 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1253 password, paramversion = self.ENCRYPTION or (None, None)
1254 deltatar = DeltaTar(mode=self.MODE, password=password,
1255 crypto_paramversion=paramversion,
1256 logger=self.consoleLogger)
1258 # create first backup
1259 deltatar.create_full_backup(
1260 source_path="source_dir",
1261 backup_path="backup_dir")
1263 assert os.path.exists("backup_dir")
1264 self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)
1267 index_filename = deltatar.index_name_func(is_full=True)
1268 index_path = os.path.join(cwd, "backup_dir", index_filename)
1269 index_it = deltatar.iterate_index_path(index_path)
1271 os.chdir('source_dir')
1272 dir_it = deltatar._recursive_walk_dir('.')
1273 path_it = deltatar.jsonize_path_iterator(dir_it)
1276 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1277 if path2['path'] == 'z':
1280 assert deltatar._equal_stat_dicts(path1, path2)
1284 def test_collate_iterators_diffdirs2(self):
1286 Use the collate iterators functionality with two different directories.
1287 It must behave in an expected way.
1289 password, paramversion = self.ENCRYPTION or (None, None)
1290 deltatar = DeltaTar(mode=self.MODE, password=password,
1291 crypto_paramversion=paramversion,
1292 logger=self.consoleLogger)
1294 # create first backup
1295 deltatar.create_full_backup(
1296 source_path="source_dir",
1297 backup_path="backup_dir")
1299 assert os.path.exists("backup_dir")
1301 # add some new files and directories
1302 os.makedirs('source_dir/bigdir')
1303 self.hash["source_dir/bigdir"] = ""
1304 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1305 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1306 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1309 index_filename = deltatar.index_name_func(is_full=True)
1310 index_path = os.path.join(cwd, "backup_dir", index_filename)
1311 index_it = deltatar.iterate_index_path(index_path)
1313 os.chdir('source_dir')
1314 dir_it = deltatar._recursive_walk_dir('.')
1315 path_it = deltatar.jsonize_path_iterator(dir_it)
1320 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1321 visited_pairs.append(
1322 (deltatar.unprefixed(path1['path']) if path1 else None,
1323 path2['path'] if path2 else None)
1326 assert visited_pairs == [
1329 (u'small', u'small'),
1332 (None, u'bigdir/a'),
1333 (None, u'bigdir/b'),
1334 (u'test/huge', u'test/huge'),
1335 (u'test/huge2', u'test/huge2'),
1336 (u'test/test2', u'test/test2'),
1340 def test_create_empty_diff_backup(self):
1342 Creates an empty (no changes) backup diff
1344 password, paramversion = self.ENCRYPTION or (None, None)
1345 deltatar = DeltaTar(mode=self.MODE, password=password,
1346 crypto_paramversion=paramversion,
1347 logger=self.consoleLogger)
1349 # create first backup
1350 deltatar.create_full_backup(
1351 source_path="source_dir",
1352 backup_path="backup_dir")
1354 prev_index_filename = deltatar.index_name_func(is_full=True)
1355 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1357 deltatar.create_diff_backup("source_dir", "backup_dir2",
1361 index_path = os.path.join("backup_dir2",
1362 deltatar.index_name_func(is_full=False))
1363 index_it = deltatar.iterate_index_path(index_path)
1367 assert i[0]['path'].startswith("list://")
1371 # check the tar file
1372 assert os.path.exists("backup_dir2")
1373 shutil.rmtree("source_dir")
1375 tar_filename = deltatar.volume_name_func('backup_dir2',
1376 is_full=False, volume_number=0)
1377 tar_path = os.path.join("backup_dir2", tar_filename)
1379 # no file restored, because the diff was empty
1380 deltatar.restore_backup(target_path="source_dir",
1381 backup_tar_path=tar_path)
1382 assert len(os.listdir("source_dir")) == 0
1385 def test_create_diff_backup1(self):
1387 Creates a diff backup when there are new files
1389 password, paramversion = self.ENCRYPTION or (None, None)
1390 deltatar = DeltaTar(mode=self.MODE, password=password,
1391 crypto_paramversion=paramversion,
1392 logger=self.consoleLogger)
1394 # create first backup
1395 deltatar.create_full_backup(
1396 source_path="source_dir",
1397 backup_path="backup_dir")
1399 prev_index_filename = deltatar.index_name_func(is_full=True)
1400 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1402 # add some new files and directories
1403 os.makedirs('source_dir/bigdir')
1404 self.hash["source_dir/bigdir"] = ""
1405 os.unlink("source_dir/small")
1406 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1407 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1408 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1410 deltatar.create_diff_backup("source_dir", "backup_dir2",
1414 index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1415 index_it = deltatar.iterate_index_path(index_path)
1416 l = [i[0]['path'] for i in index_it]
1420 'snapshot://bigdir',
1424 'snapshot://bigdir/a',
1425 'snapshot://bigdir/b',
1427 'list://test/huge2',
1428 'list://test/test2',
1431 # check the tar file
1432 assert os.path.exists("backup_dir2")
1433 shutil.rmtree("source_dir")
1435 # create source_dir with the small file, that will be then deleted by
1436 # the restore_backup
1437 os.mkdir("source_dir")
1438 open("source_dir/small", 'wb').close()
1440 tar_filename = deltatar.volume_name_func('backup_dir2',
1441 is_full=False, volume_number=0)
1442 tar_path = os.path.join("backup_dir2", tar_filename)
1444 # restore the backup, this will create only the new files
1445 deltatar.restore_backup(target_path="source_dir",
1446 backup_tar_path=tar_path)
1447 # the order doesn't matter
1448 assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
def test_restore_from_index_diff_backup(self):
    '''
    Take a full backup, change the source tree, take a diff backup and
    restore into target_dir from the diff and full indexes; the result
    must equal the changed source tree.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup that serves as the base of the diff
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join(
        "backup_dir", deltatar.index_name_func(is_full=True))

    # change the source tree: one new directory, one removed file and
    # three new files
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for new_file, size in (("source_dir/bigdir/a", 100),
                           ("source_dir/bigdir/b", 500),
                           ("source_dir/zzzz", 100)):
        self.hash[new_file] = self.create_file(new_file, size)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # restore into target_dir, newest index first
    index_path = os.path.join(
        "backup_dir2", deltatar.index_name_func(is_full=False))
    deltatar.restore_backup(
        "target_dir",
        backup_indexes_paths=[index_path, prev_index_path])

    # source_dir and target_dir have to be the same now
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1493 def test_restore_from_index_diff_backup2(self):
1495 Creates a full backup, modifies some files, creates a diff backup,
1496 then restores the diff backup with the full backup as a starting point.
1498 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1499 raise SkipTest('this test only works for uncompressed '
1500 'or concat compressed modes')
1502 password, paramversion = self.ENCRYPTION or (None, None)
1503 deltatar = DeltaTar(mode=self.MODE, password=password,
1504 crypto_paramversion=paramversion,
1505 logger=self.consoleLogger)
1507 # create first backup
1508 deltatar.create_full_backup(
1509 source_path="source_dir",
1510 backup_path="backup_dir")
1512 prev_index_filename = deltatar.index_name_func(is_full=True)
1513 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1515 # add some new files and directories
1516 os.makedirs('source_dir/bigdir')
1517 self.hash["source_dir/bigdir"] = ""
1518 os.unlink("source_dir/small")
1519 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1520 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1521 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1522 shutil.rmtree("source_dir/test")
1524 deltatar.create_diff_backup("source_dir", "backup_dir2",
1527 # first restore initial backup in target_dir
1528 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1529 tar_path = os.path.join("backup_dir", tar_filename)
1530 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1532 # then apply diff backup in target_dir
1533 index_filename = deltatar.index_name_func(is_full=False)
1534 index_path = os.path.join("backup_dir2", index_filename)
1537 deltatar.restore_backup("target_dir",
1538 backup_indexes_paths=[index_path, prev_index_path])
1540 # then compare the two directories source_dir and target_dir and check
1542 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1543 except FileNotFoundError as exn:
1544 if self.FSTEST is None:
1545 # fs traversal may fail here
1548 def test_restore_from_index_diff_backup3(self):
1550 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1551 diff backup, then restores the diff backup with the full backup as a
1554 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1555 raise SkipTest('this test only works for uncompressed '
1556 'or concat compressed modes')
1558 password, paramversion = self.ENCRYPTION or (None, None)
1559 deltatar = DeltaTar(mode=self.MODE, password=password,
1560 crypto_paramversion=paramversion,
1561 logger=self.consoleLogger)
1563 shutil.rmtree("source_dir")
1564 shutil.copytree(self.GIT_DIR, "source_dir")
1565 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1567 # create first backup
1568 deltatar.create_full_backup(
1569 source_path="source_dir",
1570 backup_path="backup_dir")
1572 prev_index_filename = deltatar.index_name_func(is_full=True)
1573 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1575 # alter the source_dir randomly
1576 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1578 for path in source_it:
1579 # if path doesn't exist (might have previously removed) ignore it.
1580 # also ignore it (i.e. do not change it) 70% of the time
1581 if not os.path.exists(path) or random.random() < 0.7:
1585 if os.path.isdir(path):
1590 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1593 # first restore initial backup in target_dir
1594 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1595 tar_path = os.path.join("backup_dir", tar_filename)
1596 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1598 # and check that target_dir equals to source_dir (which is the same as
1599 # self.GIT_DIR initially)
1600 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1602 # then apply diff backup in target_dir
1603 index_filename = deltatar.index_name_func(is_full=False)
1604 index_path = os.path.join("backup_dir2", index_filename)
1605 deltatar.restore_backup("target_dir",
1606 backup_indexes_paths=[index_path, prev_index_path])
1608 # and check that target_dir equals to source_dir_diff (the randomly
1609 # altered self.GIT_DIR directory)
1610 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1612 # then delete target_dir and apply diff backup from zero and check again
1613 shutil.rmtree("target_dir")
1614 deltatar.restore_backup("target_dir",
1615 backup_indexes_paths=[index_path, prev_index_path])
1617 # and check that target_dir equals to source_dir_diff (the randomly
1618 # altered self.GIT_DIR directory)
1619 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1621 def test_restore_from_index_diff_backup3_multivol(self):
1623 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1624 diff backup, then restores the diff backup with the full backup as a
1627 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1628 raise SkipTest('this test only works for uncompressed '
1629 'or concat compressed modes')
1631 password, paramversion = self.ENCRYPTION or (None, None)
1632 deltatar = DeltaTar(mode=self.MODE, password=password,
1633 crypto_paramversion=paramversion,
1634 logger=self.consoleLogger)
1636 shutil.rmtree("source_dir")
1637 shutil.copytree(self.GIT_DIR, "source_dir")
1638 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1640 # create first backup
1641 deltatar.create_full_backup(
1642 source_path="source_dir",
1643 backup_path="backup_dir",
1646 prev_index_filename = deltatar.index_name_func(is_full=True)
1647 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1649 # alter the source_dir randomly
1650 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1652 for path in source_it:
1653 # if path doesn't exist (might have previously removed) ignore it.
1654 # also ignore it (i.e. do not change it) 70% of the time
1655 if not os.path.exists(path) or random.random() < 0.7:
1659 if os.path.isdir(path):
1664 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1665 prev_index_path, max_volume_size=1)
1667 # first restore initial backup in target_dir
1668 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1669 tar_path = os.path.join("backup_dir", tar_filename)
1670 if self.FSTEST is not None:
1671 return # the below will fail in stat checks, but that is expected
1672 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1674 # and check that target_dir equals to source_dir (which is the same as
1675 # self.GIT_DIR initially)
1676 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1678 # then apply diff backup in target_dir
1679 index_filename = deltatar.index_name_func(is_full=False)
1680 index_path = os.path.join("backup_dir2", index_filename)
1681 deltatar.restore_backup("target_dir",
1682 backup_indexes_paths=[index_path, prev_index_path])
1684 # and check that target_dir equals to source_dir_diff (the randomly
1685 # altered self.GIT_DIR directory)
1686 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1688 # then delete target_dir and apply diff backup from zero and check again
1689 shutil.rmtree("target_dir")
1690 deltatar.restore_backup("target_dir",
1691 backup_indexes_paths=[index_path, prev_index_path])
1693 # and check that target_dir equals to source_dir_diff (the randomly
1694 # altered self.GIT_DIR directory)
1695 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Compare the two directory trees path1 and path2 and check that they
    are the same.

    Both trees are walked in lockstep with deltatar._recursive_walk_dir
    and deltatar.jsonize_path_iterator; the first element of each pair of
    items is compared with deltatar._equal_stat_dicts.

    Raises Exception if one walk yields more entries than the other and
    re-raises whatever the comparison raises (including the
    AssertionError for unequal entries) after printing both items.
    '''
    source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
    source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
    target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
    target_it = deltatar.jsonize_path_iterator(target_it, strip=1)

    # A private sentinel marks exhaustion. Catching StopIteration around
    # both next() calls cannot tell which iterator ended, which let a
    # target tree with fewer entries than the source pass unnoticed.
    exhausted = object()

    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            break
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            # show the offending pair before the failure propagates
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            raise
1723 def test_create_no_symlinks(self):
1725 Creates a full backup from different varieties of symlinks. The
1726 extracted archive may not contain any symlinks but the file contents
1729 os.system("rm -rf source_dir")
1730 os.makedirs("source_dir/symlinks")
1731 fd = os.open("source_dir/symlinks/valid_linkname",
1732 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1733 os.write(fd, b"valid link target for symlink tests; please ignore\n")
1735 # first one is good, the rest points nowhere
1736 self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1737 self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1738 self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1739 self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1740 password, paramversion = self.ENCRYPTION or (None, None)
1741 deltatar = DeltaTar(mode=self.MODE, password=password,
1742 crypto_paramversion=paramversion,
1743 logger=self.consoleLogger)
1745 # create first backup
1746 deltatar.create_full_backup(source_path="source_dir",
1747 backup_path="backup_dir")
1749 assert os.path.exists("backup_dir")
1750 shutil.rmtree("source_dir")
1751 assert not os.path.exists("source_dir")
1753 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1754 tar_path = os.path.join("backup_dir", tar_filename)
1756 deltatar.restore_backup(target_path="source_dir",
1757 backup_tar_path=tar_path)
1759 for _r, _ds, fs in os.walk("source_dir/symlinks"):
1760 # only the valid link plus the linked file may be found in the
1764 # the link must have been resolved and file contents must match
1766 assert not os.path.islink(f)
1767 with open("source_dir/symlinks/valid_linkname") as a:
1768 with open("source_dir/symlinks/whatever") as b:
1769 assert a.read() == b.read()
1771 def test_restore_with_symlinks(self):
1773 Creates a full backup containing different varieties of symlinks. All
1774 of them must be filtered out.
1776 password, paramversion = self.ENCRYPTION or (None, None)
1777 deltatar = DeltaTar(mode=self.MODE, password=password,
1778 crypto_paramversion=paramversion,
1779 logger=self.consoleLogger)
1781 # create first backup
1782 deltatar.create_full_backup(source_path="source_dir",
1783 backup_path="backup_dir")
1785 assert os.path.exists("backup_dir")
1786 shutil.rmtree("source_dir")
1788 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1789 tar_path = os.path.join("backup_dir", tar_filename)
1791 # add symlinks to existing archive
1793 def add_symlink (a, name, dst):
1794 l = tarfile.TarInfo("snapshot://%s" % name)
1795 l.type = tarfile.SYMTYPE
1801 with tarfile.open(tar_path,mode="a") as a:
1803 [ add_symlink(a, "symlinks/foo", "internal-file")
1804 , add_symlink(a, "symlinks/bar", "/absolute/path")
1805 , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1806 except tarfile.ReadError as e:
1807 if self.MODE == '#' or self.MODE.endswith ("gz"):
1811 except ValueError as e:
1812 if self.MODE.startswith ('#'):
1817 deltatar.restore_backup(target_path="source_dir",
1818 backup_tar_path=tar_path)
1820 # check what happened to our symlinks
1821 for name in checkme:
1822 fullpath = os.path.join("source_dir", name)
1823 assert not os.path.exists(fullpath)
1825 def test_restore_malicious_symlinks(self):
1827 Creates a full backup containing a symlink and a file of the same name.
1828 This simulates a symlink attack with a link pointing to some external
1829 path that is abused to write outside the extraction prefix.
1831 password, paramversion = self.ENCRYPTION or (None, None)
1832 deltatar = DeltaTar(mode=self.MODE, password=password,
1833 crypto_paramversion=paramversion,
1834 logger=self.consoleLogger)
1836 # create first backup
1837 deltatar.create_full_backup(source_path="source_dir",
1838 backup_path="backup_dir")
1840 assert os.path.exists("backup_dir")
1841 shutil.rmtree("source_dir")
1843 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1844 tar_path = os.path.join("backup_dir", tar_filename)
1846 # add symlinks to existing archive
1848 def add_symlink (a, name, dst):
1849 l = tarfile.TarInfo("snapshot://%s" % name)
1850 l.type = tarfile.SYMTYPE
1854 def add_file (a, name):
1855 f = tarfile.TarInfo("snapshot://%s" % name)
1856 f.type = tarfile.REGTYPE
1859 testpath = "symlinks/pernicious-link"
1860 testdst = "/tmp/does/not/exist"
1863 with tarfile.open(tar_path, mode="a") as a:
1864 add_symlink(a, testpath, testdst)
1865 add_symlink(a, testpath, testdst+"X")
1866 add_symlink(a, testpath, testdst+"XXX")
1867 add_file(a, testpath)
1868 except tarfile.ReadError as e:
1869 if self.MODE == '#' or self.MODE.endswith ("gz"):
1873 except ValueError as e:
1874 if self.MODE.startswith ('#'):
1875 pass # O_APPEND of concat archives not feasible
1879 deltatar.restore_backup(target_path="source_dir",
1880 backup_tar_path=tar_path)
1882 # check whether the link was extracted; deltatar seems to only ever
1883 # retrieve the first item it finds for a given path which in the case
1884 # at hand is a symlink to some non-existent path
1885 fullpath = os.path.join("source_dir", testpath)
1886 assert not os.path.exists(fullpath)
def fsapi_access_true (self):
    """
    Make *os.access* claim that every access is permitted.

    The function that was installed before is recorded on
    *self.FSAPI_SAVED* as an ``("access", function)`` pair.
    """
    previous = os.access
    self.FSAPI_SAVED.append (("access", previous))
    os.access = lambda *_a, **_ka: True
1898 class DeltaTar2Test(DeltaTarTest):
1900 Same as DeltaTar but with specific ":" mode
1905 class DeltaTarStreamTest(DeltaTarTest):
1907 Same as DeltaTar but with specific uncompressed stream mode
1912 class DeltaTarGzipTest(DeltaTarTest):
1914 Same as DeltaTar but with specific gzip mode
1917 MODE_COMPRESSES = True
1920 class DeltaTarGzipStreamTest(DeltaTarTest):
1922 Same as DeltaTar but with specific gzip stream mode
1925 MODE_COMPRESSES = True
1928 @skip('Bz2 tests are too slow..')
1929 class DeltaTarBz2Test(DeltaTarTest):
1931 Same as DeltaTar but with specific bz2 mode
1934 MODE_COMPRESSES = True
1937 @skip('Bz2 tests are too slow..')
1938 class DeltaTarBz2StreamTest(DeltaTarTest):
1940 Same as DeltaTar but with specific bz2 stream mode
1943 MODE_COMPRESSES = True
1946 class DeltaTarGzipConcatTest(DeltaTarTest):
1948 Same as DeltaTar but with specific gzip concat stream mode
1951 MODE_COMPRESSES = True
1954 class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
1956 Same as DeltaTar but with specific gzip aes128 concat stream mode
1959 ENCRYPTION = ('some magic key', 1)
1960 MODE_COMPRESSES = True
1963 class DeltaTarAes128ConcatTest(DeltaTarTest):
1965 Same as DeltaTar but with specific aes128 concat stream mode
1968 ENCRYPTION = ('some magic key', 1)
1971 class DeltaTarFilesystemHandlingTestBase(BaseTest):
1973 Mess with filesystem APIs.
1975 FSTEST = fsapi_access_true
1977 class DeltaTarFSGzipTest(DeltaTarFilesystemHandlingTestBase,
1981 class DeltaTarFSGzipConcatTest(DeltaTarFilesystemHandlingTestBase,
1982 DeltaTarGzipConcatTest):
1985 class DeltaTarFSAes128ConcatTest(DeltaTarFilesystemHandlingTestBase,
1986 DeltaTarAes128ConcatTest):
1989 class DeltaTarFSGzipAes128ConcatTest(DeltaTarFilesystemHandlingTestBase,
1990 DeltaTarGzipAes128ConcatTest):