1 # Copyright (C) 2013 Intra2net AG
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Lesser General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
37 from . import BaseTest
38 from . import new_volume_handler
# Set to True to enable warning messages from deltatar. Doing so lowers the
# signal-to-noise ratio of test runs: none of the messages are meaningful in
# any way.
42 VERBOSE_TEST_OUTPUT = False
44 class DeltaTarTest(BaseTest):
49 MODE_COMPRESSES = False
51 ENCRYPTION = None # (password : str, paramversion : int) option
62 self.pwd = os.getcwd()
63 os.system('rm -rf target_dir source_dir* backup_dir* huge')
64 os.makedirs('source_dir/test/test2')
66 self.hash["source_dir/test/test2"] = ''
67 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
68 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
69 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
70 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
72 self.consoleLogger = None
73 if VERBOSE_TEST_OUTPUT is True:
74 self.consoleLogger = logging.StreamHandler()
75 self.consoleLogger.setLevel(logging.DEBUG)
77 if not os.path.isdir(self.GIT_DIR):
78 # Not running inside git tree, take our
79 # own testing directory as source.
80 self.GIT_DIR = 'testing'
82 if not os.path.isdir(self.GIT_DIR):
83 raise Exception('No input directory found: ' + self.GIT_DIR)
85 if self.FSTEST is not None:
90 Remove temporary files created by unit tests and restore the API
93 for att, val in self.FSAPI_SAVED:
94 setattr (os, att, val)
96 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
97 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
98 ("I am fully aware that this will void my warranty.")
def test_restore_simple_full_backup(self):
    '''
    Take an unfiltered full backup of source_dir, delete the originals,
    restore from the first volume and verify every tracked path.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    backup = DeltaTar(mode=self.MODE,
                      password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger)

    # write the full backup, then throw away the source tree
    backup.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # restore starting at volume 0 of the full backup
    first_volume = os.path.join(
        "backup_dir", backup.volume_name_func('backup_dir', True, 0))
    backup.restore_backup(target_path="source_dir",
                          backup_tar_path=first_volume)

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        # NOTE(review): this guard is not visible in the extracted listing;
        # it is inferred because directory entries are recorded with an
        # empty checksum -- confirm against the original file.
        if checksum:
            assert checksum == self.md5sum(path)
def test_create_backup_max_file_length (self):
    '''
    Write a full backup whose input includes files exceeding the (purposely
    lowered) upper bound on GCM encrypted objects, so that one plaintext
    file is spread over multiple encrypted objects.

    The archive is then split at object boundaries with the crypto.py
    command line tool and the resulting pieces are counted.

    Skipped for compressing modes and for unencrypted backups.
    '''
    if self.MODE_COMPRESSES is True:
        raise SkipTest ("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest ("GCM file length applies only to encrypted backups.")

    new_max = 20000 # cannot be less than tar block size
    # NOTE(review): the second argument is elided in the extracted listing;
    # ``new_max`` is inferred from the assignment above -- confirm.
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE (
        "I am fully aware that this will void my warranty.", new_max)

    password, paramversion = self.ENCRYPTION
    backup = DeltaTar (mode=self.MODE,
                       password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger)

    # (file name, size) pairs; the trailing number is the count of tar
    # objects each one is expected to contribute
    fixtures = (("empty"          , 0)            # 1
               ,("slightly_larger", new_max + 1)  # 2
               ,("twice"          , 2 * new_max)  # 3
               )
    os.makedirs ("source_dir2")
    for name, size in fixtures:
        path = "source_dir2/%s" % name
        self.hash [path] = self.create_file (path, size)

    backup.create_full_backup (source_path="source_dir2",
                               backup_path="backup_dir")

    assert os.path.exists ("backup_dir")
    shutil.rmtree ("source_dir2")

    archive = os.path.join ("backup_dir",
                            backup.volume_name_func ("backup_dir", True, 0))

    # split the resulting archive into its constituents
    split_cmd = ("python3 ./deltatar/crypto.py process -D -S -i - "
                 "-o backup_dir/split <\'%s\'" % archive)
    ret = os.system (split_cmd)

    assert os.path.exists ("backup_dir/split")

    # 1 + 2 + 3 objects, per the fixture table above
    pieces = os.listdir ("backup_dir/split")
    assert len (pieces) == 6
def test_restore_backup_max_file_length (self):
    '''
    Restore a full backup containing files whose sizes straddle the
    (purposely lowered) upper bound on GCM encrypted objects, so that some
    plaintext files are stored as more than one encrypted object.

    Success is verified by restoring the archive and comparing the md5 of
    every tracked path with the checksum recorded when it was created.

    Skipped for compressing modes and for unencrypted backups.
    '''
    if self.MODE_COMPRESSES is True:
        raise SkipTest ("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest ("GCM file length applies only to encrypted backups.")

    new_max = 20000 # cannot be less than tar block size
    # NOTE(review): the second argument is elided in the extracted listing;
    # ``new_max`` is inferred from the assignment above -- confirm.
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
            ("I am fully aware that this will void my warranty.",
             new_max)

    password, paramversion = self.ENCRYPTION
    deltatar = DeltaTar (mode=self.MODE, password=password,
                         crypto_paramversion=paramversion,
                         logger=self.consoleLogger)

    # file sizes just below, at, and above multiples of the object limit
    os.makedirs ("source_dir2")
    for f, s in [("empty"          , 0)
                ,("almost_large"   , new_max - 1)
                ,("large"          , new_max)
                ,("slightly_larger", new_max + 1)
                ,("twice"          , 2 * new_max)
                ,("twice_plus_one" , (2 * new_max) + 1)
                ]:
        f = "source_dir2/%s" % f
        self.hash [f] = self.create_file (f, s)

    deltatar.create_full_backup \
            (source_path="source_dir2", backup_path="backup_dir")

    assert os.path.exists ("backup_dir")
    shutil.rmtree ("source_dir2")

    # the first volume is the entry point for the restore
    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir2",
                            backup_tar_path=tar_path)

    for key, value in self.hash.items():
        assert os.path.exists(key)
        # NOTE(review): this guard is not visible in the extracted listing;
        # it is inferred because directory entries are recorded with an
        # empty checksum -- confirm against the original file.
        if value:
            assert value == self.md5sum(key)
239 def test_create_backup_index_max_file_length (self):
241 Creates a full backup with a too large index file for the upper bound
242 of the GCM encryption. Since the index file has a fixed IV file counter
243 of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
245 60+ GB of (potentially compressed) index file should last for a while...
247 if self.MODE_COMPRESSES is True:
248 raise SkipTest ("GCM file length test not meaningful with compression.")
249 if self.ENCRYPTION is None:
250 raise SkipTest ("GCM file length applies only to encrypted backups.")
253 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
254 ("I am fully aware that this will void my warranty.",
257 password, paramversion = self.ENCRYPTION
258 deltatar = DeltaTar (mode=self.MODE, password=password,
259 crypto_paramversion=paramversion,
260 logger=self.consoleLogger)
263 os.makedirs ("source_dir2")
265 f = "source_dir2/dummy_%rd" % i
266 self.hash [f] = self.create_file (f, i)
268 with self.assertRaises (crypto.InvalidFileCounter):
269 deltatar.create_full_backup \
270 (source_path="source_dir2", backup_path="backup_dir")
271 shutil.rmtree ("source_dir2")
274 def test_check_index_checksum(self):
276 Creates a full backup and checks the index' checksum of files
278 password, paramversion = self.ENCRYPTION or (None, None)
279 deltatar = DeltaTar(mode=self.MODE, password=password,
280 crypto_paramversion=paramversion,
281 logger=self.consoleLogger)
283 # create first backup
284 deltatar.create_full_backup(
285 source_path="source_dir",
286 backup_path="backup_dir")
289 index_filename = deltatar.index_name_func(True)
290 index_path = os.path.join("backup_dir", index_filename)
292 f = open(index_path, 'rb')
300 if b'BEGIN-FILE-LIST' in l:
301 crc = binascii.crc32(l) & 0xFFFFffff
303 elif b'END-FILE-LIST' in l:
304 crc = binascii.crc32(l, crc) & 0xffffffff
306 # next line contains the crc
307 data = json.loads(f.readline().decode("UTF-8"))
308 assert data['type'] == 'file-list-checksum'
309 assert data['checksum'] == crc
313 crc = binascii.crc32(l, crc) & 0xffffffff
317 def test_restore_multivol(self):
319 Creates a full backup without any filtering with multiple volumes and
322 if ':gz' in self.MODE:
323 raise SkipTest('compression information is lost when creating '
324 'multiple volumes with no Stream')
326 password, paramversion = self.ENCRYPTION or (None, None)
327 deltatar = DeltaTar(mode=self.MODE, password=password,
328 crypto_paramversion=paramversion,
329 logger=self.consoleLogger)
332 os.makedirs('source_dir2')
333 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
334 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
336 # create first backup
337 deltatar.create_full_backup(
338 source_path="source_dir2",
339 backup_path="backup_dir",
342 assert os.path.exists("backup_dir")
343 assert os.path.exists(os.path.join("backup_dir",
344 deltatar.volume_name_func("backup_dir", True, 0)))
345 if self.MODE_COMPRESSES:
349 for i_vol in range(n_vols):
350 assert os.path.exists(os.path.join("backup_dir",
351 deltatar.volume_name_func("backup_dir", True, i_vol)))
352 assert not os.path.exists(os.path.join("backup_dir",
353 deltatar.volume_name_func("backup_dir", True, n_vols)))
355 shutil.rmtree("source_dir2")
357 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
358 tar_path = os.path.join("backup_dir", tar_filename)
360 # this should automatically restore all volumes
361 deltatar.restore_backup(target_path="source_dir2",
362 backup_tar_path=tar_path)
364 for key, value in self.hash.items():
365 assert os.path.exists(key)
367 assert value == self.md5sum(key)
369 def test_restore_multivol_split(self):
371 Creates a full backup without any filtering with multiple volumes
372 with big files bigger than the max volume size and
375 if self.MODE.startswith(':') or self.MODE.startswith('|'):
376 raise SkipTest('this test only works for uncompressed '
377 'or concat compressed modes')
379 password, paramversion = self.ENCRYPTION or (None, None)
380 deltatar = DeltaTar(mode=self.MODE, password=password,
381 crypto_paramversion=paramversion,
382 logger=self.consoleLogger)
385 os.makedirs('source_dir2')
386 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
387 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
388 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
390 # create first backup
391 deltatar.create_full_backup(
392 source_path="source_dir2",
393 backup_path="backup_dir",
396 assert os.path.exists("backup_dir")
397 assert os.path.exists(os.path.join("backup_dir",
398 deltatar.volume_name_func("backup_dir", True, 0)))
399 if self.MODE_COMPRESSES:
403 for i_vol in range(n_vols):
404 assert os.path.exists(os.path.join("backup_dir",
405 deltatar.volume_name_func("backup_dir", True, i_vol)))
406 assert not os.path.exists(os.path.join("backup_dir",
407 deltatar.volume_name_func("backup_dir", True, n_vols)))
409 shutil.rmtree("source_dir2")
411 index_filename = deltatar.index_name_func(True)
412 index_path = os.path.join("backup_dir", index_filename)
414 deltatar.restore_backup(target_path="source_dir2",
415 backup_indexes_paths=[index_path])
417 for key, value in self.hash.items():
418 assert os.path.exists(key)
420 assert value == self.md5sum(key)
423 def test_full_backup_index_extra_data(self):
425 Tests that the index file for a full backup can store extra_data and
426 that this data can be retrieved.
428 password, paramversion = self.ENCRYPTION or (None, None)
429 deltatar = DeltaTar(mode=self.MODE, password=password,
430 crypto_paramversion=paramversion,
431 logger=self.consoleLogger)
435 otra_cosa=[1, "lista"],
436 y_otra=dict(bola=1.1)
439 deltatar.create_full_backup(
440 source_path="source_dir",
441 backup_path="backup_dir",
442 extra_data=extra_data)
444 index_filename = deltatar.index_name_func(is_full=True)
445 index_path = os.path.join("backup_dir", index_filename)
447 # iterate_index_path retrieves extra_data, and thus we can then compare
448 index_it = deltatar.iterate_index_path(index_path)
449 self.assertEqual(index_it.extra_data, extra_data)
452 def test_diff_backup_index_extra_data(self):
454 Tests that the index file for a diff backup can store extra_data and
455 that this data can be retrieved.
457 password, paramversion = self.ENCRYPTION or (None, None)
458 deltatar = DeltaTar(mode=self.MODE, password=password,
459 crypto_paramversion=paramversion,
460 logger=self.consoleLogger)
464 otra_cosa=[1, "lista"],
465 y_otra=dict(bola=1.1)
468 deltatar.create_full_backup(
469 source_path="source_dir",
470 backup_path="backup_dir")
473 prev_index_filename = deltatar.index_name_func(is_full=True)
474 prev_index_path = os.path.join("backup_dir", prev_index_filename)
476 # create empty diff backup
477 deltatar.create_diff_backup("source_dir", "backup_dir2",
478 prev_index_path, extra_data=extra_data)
480 index_filename = deltatar.index_name_func(is_full=False)
481 index_path = os.path.join("backup_dir2", index_filename)
483 # iterate_index_path retrieves extra_data, and thus we can then compare
484 index_it = deltatar.iterate_index_path(index_path)
485 self.assertEqual(index_it.extra_data, extra_data)
487 def test_restore_multivol2(self):
489 Creates a full backup without any filtering with multiple volumes and
492 password, paramversion = self.ENCRYPTION or (None, None)
493 deltatar = DeltaTar(mode=self.MODE, password=password,
494 crypto_paramversion=paramversion,
495 logger=self.consoleLogger)
497 shutil.copytree(self.GIT_DIR, "source_dir2")
499 # create first backup
500 deltatar.create_full_backup(
501 source_path="source_dir2",
502 backup_path="backup_dir",
505 assert os.path.exists("backup_dir")
506 assert os.path.exists(os.path.join("backup_dir",
507 deltatar.volume_name_func("backup_dir", True, 0)))
509 shutil.rmtree("source_dir2")
511 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
512 tar_path = os.path.join("backup_dir", tar_filename)
514 # this should automatically restore all volumes
515 deltatar.restore_backup(target_path="source_dir2",
516 backup_tar_path=tar_path)
518 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
520 def test_restore_multivol_manual_from_index(self):
522 Creates a full backup without any filtering with multiple volumes and
525 # this test only works for uncompressed or concat compressed modes
526 if self.MODE.startswith(':') or self.MODE.startswith('|'):
527 raise SkipTest('this test only works for uncompressed '
528 'or concat compressed modes')
530 password, paramversion = self.ENCRYPTION or (None, None)
531 deltatar = DeltaTar(mode=self.MODE, password=password,
532 crypto_paramversion=paramversion,
533 logger=self.consoleLogger)
536 os.makedirs('source_dir2')
537 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
538 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
540 # create first backup
541 deltatar.create_full_backup(
542 source_path="source_dir2",
543 backup_path="backup_dir",
546 assert os.path.exists("backup_dir")
547 assert os.path.exists(os.path.join("backup_dir",
548 deltatar.volume_name_func("backup_dir", True, 0)))
549 if self.MODE_COMPRESSES:
553 for i_vol in range(n_vols):
554 assert os.path.exists(os.path.join("backup_dir",
555 deltatar.volume_name_func("backup_dir", True, i_vol)))
556 assert not os.path.exists(os.path.join("backup_dir",
557 deltatar.volume_name_func("backup_dir", True, n_vols)))
559 shutil.rmtree("source_dir2")
561 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
562 tar_path = os.path.join("backup_dir", tar_filename)
564 index_filename = deltatar.index_name_func(True)
565 index_path = os.path.join("backup_dir", index_filename)
567 # this should automatically restore the huge file
568 f = deltatar.open_auxiliary_file(index_path, 'r')
574 data = json.loads(l.decode('UTF-8'))
575 if data.get('type', '') == 'file' and\
576 deltatar.unprefixed(data['path']) == "huge":
577 offset = data['offset']
580 assert offset is not None
582 fo = open(tar_path, 'rb')
584 def new_volume_handler(mode, tarobj, base_name, volume_number):
585 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
586 if self.ENCRYPTION is not None:
587 # deltatar module is shadowed here
588 suf += "." + deltatar_PDTCRYPT_EXTENSION
589 tarobj.open_volume(datetime.now().strftime(
590 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
591 new_volume_handler = partial(new_volume_handler, self.MODE)
594 if self.ENCRYPTION is not None:
595 crypto_ctx = crypto.Decrypt (password)
597 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
598 new_volume_handler=new_volume_handler,
599 encryption=crypto_ctx)
601 member = tarobj.next()
602 member.path = deltatar.unprefixed(member.path)
603 member.name = deltatar.unprefixed(member.name)
604 tarobj.extract(member)
607 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
612 def test_restore_manual_from_index_twice (self):
614 Creates a full backup and restore the same file twice. This *must* fail
615 when encryption is active.
617 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
618 backwards within the same file. This prevents the encryption layer from
619 exploding due to a reused IV in an overall valid archive.
621 This test anticipates possible future mistakes since it’s entirely
622 feasible to implement backward seeks for *_Stream* with concat mode.
624 # this test only works for uncompressed or concat compressed modes
625 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
626 raise SkipTest("this test only works for uncompressed "
627 "or concat compressed modes")
629 password, paramversion = self.ENCRYPTION or (None, None)
630 deltatar = DeltaTar(mode=self.MODE, password=password,
631 crypto_paramversion=paramversion,
632 logger=self.consoleLogger)
635 os.makedirs("source_dir2")
636 self.hash["source_dir2/samefile"] = \
637 self.create_file("source_dir2/samefile", 1 * 1024)
639 # create first backup
640 deltatar.create_full_backup(
641 source_path="source_dir2",
642 backup_path="backup_dir")
644 assert os.path.exists("backup_dir")
645 assert os.path.exists(os.path.join("backup_dir",
646 deltatar.volume_name_func("backup_dir", True, 0)))
648 shutil.rmtree("source_dir2")
650 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
651 tar_path = os.path.join("backup_dir", tar_filename)
653 index_filename = deltatar.index_name_func(True)
654 index_path = os.path.join("backup_dir", index_filename)
656 f = deltatar.open_auxiliary_file(index_path, "r")
662 data = json.loads(l.decode("UTF-8"))
663 if data.get("type", "") == "file" and\
664 deltatar.unprefixed(data["path"]) == "samefile":
665 offset = data["offset"]
668 assert offset is not None
670 fo = open(tar_path, "rb")
674 if self.ENCRYPTION is not None:
675 crypto_ctx = crypto.Decrypt (password)
677 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
678 encryption=crypto_ctx)
679 member = tarobj.next()
680 member.path = deltatar.unprefixed(member.path)
681 member.name = deltatar.unprefixed(member.name)
684 tarobj.extract(member)
685 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
689 tarobj.extract(member)
690 except tarfile.StreamError:
691 if crypto_ctx is not None:
692 pass # good: seeking backwards not allowed
697 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
699 os.unlink("samefile")
def test_restore_from_index(self):
    '''
    Take a full backup, delete the source tree and bring it back using
    only the backup's index file.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    backup = DeltaTar(mode=self.MODE,
                      password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger)

    # write the full backup, then throw away the source tree
    backup.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")
    shutil.rmtree("source_dir")

    # restore is driven by the index of the full backup
    full_index = os.path.join("backup_dir", backup.index_name_func(True))
    backup.restore_backup(target_path="source_dir",
                          backup_indexes_paths=[full_index])

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        # NOTE(review): this guard is not visible in the extracted listing;
        # it is inferred because directory entries are recorded with an
        # empty checksum -- confirm against the original file.
        if checksum:
            assert checksum == self.md5sum(path)
734 def test_restore_multivol_from_index(self):
736 Restores a full multivolume backup using an index file.
738 if self.MODE.startswith(':') or self.MODE.startswith('|'):
739 raise SkipTest('this test only works for uncompressed '
740 'or concat compressed modes')
742 password, paramversion = self.ENCRYPTION or (None, None)
743 deltatar = DeltaTar(mode=self.MODE, password=password,
744 crypto_paramversion=paramversion,
745 logger=self.consoleLogger)
747 # create first backup
748 deltatar.create_full_backup(
749 source_path="source_dir",
750 backup_path="backup_dir",
753 shutil.rmtree("source_dir")
755 # this should automatically restore all volumes
756 index_filename = deltatar.index_name_func(True)
757 index_path = os.path.join("backup_dir", index_filename)
759 deltatar.restore_backup(target_path="source_dir",
760 backup_indexes_paths=[index_path])
762 for key, value in self.hash.items():
763 assert os.path.exists(key)
765 assert value == self.md5sum(key)
def test_create_basic_filtering(self):
    '''
    Create a full backup with include and exclude lists in effect, restore
    it, and check which paths made it into the archive.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    backup = DeltaTar(mode=self.MODE,
                      password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger,
                      included_files=["test", "small"],
                      excluded_files=["test/huge"])

    # the filters apply while the backup is written
    backup.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", backup.volume_name_func('backup_dir', True, 0))
    backup.restore_backup(target_path="source_dir",
                          backup_tar_path=first_volume)

    # included paths must come back ...
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # ... excluded or never-included ones must not
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
800 def test_create_filter_func(self):
802 Tests create backup basic filtering.
805 def filter_func(visited_paths, path):
806 if path not in visited_paths:
807 visited_paths.append(path)
810 filter_func = partial(filter_func, visited_paths)
812 password, paramversion = self.ENCRYPTION or (None, None)
813 deltatar = DeltaTar(mode=self.MODE, password=password,
814 crypto_paramversion=paramversion,
815 logger=self.consoleLogger,
816 included_files=["test", "small"],
817 excluded_files=["test/huge"],
818 filter_func=filter_func)
820 # create first backup
821 deltatar.create_full_backup(
822 source_path="source_dir",
823 backup_path="backup_dir")
825 assert os.path.exists("backup_dir")
826 shutil.rmtree("source_dir")
828 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
829 tar_path = os.path.join("backup_dir", tar_filename)
831 deltatar.restore_backup(target_path="source_dir",
832 backup_tar_path=tar_path)
833 assert set(visited_paths) == set([
840 def test_create_filter_out_func(self):
842 Tests create backup basic filtering.
845 def filter_func(visited_paths, path):
847 Filter out everything
849 if path not in visited_paths:
850 visited_paths.append(path)
853 filter_func = partial(filter_func, visited_paths)
855 password, paramversion = self.ENCRYPTION or (None, None)
856 deltatar = DeltaTar(mode=self.MODE, password=password,
857 crypto_paramversion=paramversion,
858 logger=self.consoleLogger,
859 included_files=["test", "small"],
860 excluded_files=["test/huge"],
861 filter_func=filter_func)
863 # create first backup
864 deltatar.create_full_backup(
865 source_path="source_dir",
866 backup_path="backup_dir")
868 assert os.path.exists("backup_dir")
869 shutil.rmtree("source_dir")
871 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
872 tar_path = os.path.join("backup_dir", tar_filename)
874 deltatar.restore_backup(target_path="source_dir",
875 backup_tar_path=tar_path)
876 assert set(visited_paths) == set([
881 # check that effectively no file was backed up
882 assert not os.path.exists("source_dir/small")
883 assert not os.path.exists("source_dir/big")
884 assert not os.path.exists("source_dir/test")
def test_restore_index_basic_filtering(self):
    '''
    Create an unfiltered full backup, then apply include and exclude lists
    only for the index based restore.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    backup = DeltaTar(mode=self.MODE,
                      password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger)

    # back up everything, then throw away the source tree
    backup.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    full_index = os.path.join("backup_dir", backup.index_name_func(True))

    # the filters are switched on only now, for the restore
    backup.included_files = ["test", "small"]
    backup.excluded_files = ["test/huge"]
    backup.restore_backup(target_path="source_dir",
                          backup_indexes_paths=[full_index])

    # included paths must come back ...
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # ... excluded or never-included ones must not
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
923 def test_restore_index_filter_func(self):
925 Creates a backup, and then filter when doing the index based restore,
926 using the filter function.
928 if self.MODE.startswith(':') or self.MODE.startswith('|'):
929 raise SkipTest('this test only works for uncompressed '
930 'or concat compressed modes')
933 def filter_func(visited_paths, path):
934 if path not in visited_paths:
935 visited_paths.append(path)
938 filter_func = partial(filter_func, visited_paths)
940 password, paramversion = self.ENCRYPTION or (None, None)
941 deltatar = DeltaTar(mode=self.MODE, password=password,
942 crypto_paramversion=paramversion,
943 logger=self.consoleLogger)
945 # create first backup
946 deltatar.create_full_backup(
947 source_path="source_dir",
948 backup_path="backup_dir")
950 assert os.path.exists("backup_dir")
951 shutil.rmtree("source_dir")
953 index_filename = deltatar.index_name_func(True)
954 index_path = os.path.join("backup_dir", index_filename)
956 deltatar.included_files = ["test", "small"]
957 deltatar.excluded_files = ["test/huge"]
958 deltatar.filter_func = filter_func
959 deltatar.restore_backup(target_path="source_dir",
960 backup_indexes_paths=[index_path])
962 assert set(visited_paths) == set([
def test_restore_tar_basic_filtering(self):
    '''
    Create an unfiltered full backup, then apply include and exclude lists
    only for the tar based restore.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    backup = DeltaTar(mode=self.MODE,
                      password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger)

    # back up everything, then throw away the source tree
    backup.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # the filters are switched on only now, for the restore
    backup.included_files = ["test", "small"]
    backup.excluded_files = ["test/huge"]

    first_volume = os.path.join(
        "backup_dir", backup.volume_name_func('backup_dir', True, 0))
    backup.restore_backup(target_path="source_dir",
                          backup_tar_path=first_volume)

    # included paths must come back ...
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # ... excluded or never-included ones must not
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
1003 def test_restore_tar_filter_func(self):
1005 Creates a backup, and then filter when doing the tar based restore,
1006 using the filter function.
1009 def filter_func(visited_paths, path):
1010 if path not in visited_paths:
1011 visited_paths.append(path)
1014 filter_func = partial(filter_func, visited_paths)
1016 password, paramversion = self.ENCRYPTION or (None, None)
1017 deltatar = DeltaTar(mode=self.MODE, password=password,
1018 crypto_paramversion=paramversion,
1019 logger=self.consoleLogger)
1021 # create first backup
1022 deltatar.create_full_backup(
1023 source_path="source_dir",
1024 backup_path="backup_dir")
1026 assert os.path.exists("backup_dir")
1027 shutil.rmtree("source_dir")
1029 index_filename = deltatar.index_name_func(True)
1030 index_path = os.path.join("backup_dir", index_filename)
1032 deltatar.included_files = ["test", "small"]
1033 deltatar.excluded_files = ["test/huge"]
1034 deltatar.filter_func = filter_func
1036 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1037 tar_path = os.path.join("backup_dir", tar_filename)
1039 deltatar.restore_backup(target_path="source_dir",
1040 backup_tar_path=tar_path)
1041 assert set(visited_paths) == set([
1048 def test_filter_path_regexp(self):
1050 Test specifically the deltatar.filter_path function with regular
1054 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1055 re.compile('^yes$'),
1059 re.compile('^testing/in_the'),
1061 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1062 excluded_files=excluded_files)
1064 # assert valid and invalid paths
1065 assert deltatar.filter_path('test/hola')
1066 assert deltatar.filter_path('test/hola/any/thing')
1067 assert deltatar.filter_path('test/caracola/caracolero')
1068 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1069 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1070 assert deltatar.filter_path('yes')
1071 assert deltatar.filter_path('testing')
1072 assert deltatar.filter_path('testing/yes')
1073 assert deltatar.filter_path('testing/in_th')
1075 assert not deltatar.filter_path('something')
1076 assert not deltatar.filter_path('other/thing')
1077 assert not deltatar.filter_path('test_ing')
1078 assert not deltatar.filter_path('test/hola_lala')
1079 assert not deltatar.filter_path('test/agur')
1080 assert not deltatar.filter_path('testing_something')
1081 assert not deltatar.filter_path('yeso')
1082 assert not deltatar.filter_path('yes/o')
1083 assert not deltatar.filter_path('yes_o')
1084 assert not deltatar.filter_path('testing/in_the')
1085 assert not deltatar.filter_path('testing/in_the_field')
1086 assert not deltatar.filter_path('testing/in_the/field')
def test_filter_path_parent(self):
    '''
    Exercise DeltaTar.filter_path with a single deeply nested include path
    and check the verdict for its ancestors, itself, a descendant and an
    unrelated path.
    '''
    # NOTE(review): the list brackets around this entry are elided in the
    # extracted listing; a one-element list is assumed -- confirm.
    included_files = ['testing/path/to/some/thing']
    matcher = DeltaTar(mode=self.MODE, included_files=included_files)

    # (path, keyword arguments, expected verdict)
    cases = (
        ('testing',                               dict(is_dir=True), PARENT_MATCH),
        ('testing/path/',                         dict(is_dir=True), PARENT_MATCH),
        ('testing/path/to',                       dict(is_dir=True), PARENT_MATCH),
        ('testing/path/to/some',                  dict(is_dir=True), PARENT_MATCH),
        ('testing/path/to/some/thing',            dict(),            MATCH),
        ('testing/path/to/some/thing/what&/ever', dict(),            MATCH),
        ('testing/something/else',                dict(),            NO_MATCH),
    )
    for path, kwargs, expected in cases:
        assert matcher.filter_path(path, **kwargs) == expected
def test_parent_matching_simple_full_backup(self):
    '''
    Create a full backup restricted by an include list, restore it without
    any filter and check which paths came back.
    '''
    # NOTE(review): list contents restored from the assertions at the end
    # of this test -- confirm
    wanted = ['test/huge2']

    password, paramversion = self.ENCRYPTION or (None, None)
    writer = DeltaTar(mode=self.MODE, password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger,
                      included_files=wanted)

    # filtered full backup
    writer.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", writer.volume_name_func('backup_dir', True, 0))

    # unfiltered restore
    reader = DeltaTar(mode=self.MODE, password=password,
                      logger=self.consoleLogger)
    reader.restore_backup(target_path="source_dir",
                          backup_tar_path=first_volume)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge', 'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Create an unfiltered full backup, restore it through an include list
    and check which paths came back.
    '''
    # NOTE(review): list contents restored from the assertions at the end
    # of this test -- confirm
    wanted = ['test/huge2']

    password, paramversion = self.ENCRYPTION or (None, None)
    writer = DeltaTar(mode=self.MODE, password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger)

    # unfiltered full backup
    writer.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", writer.volume_name_func('backup_dir', True, 0))

    # filtered restore
    reader = DeltaTar(mode=self.MODE, password=password,
                      logger=self.consoleLogger,
                      included_files=wanted)
    reader.restore_backup(target_path="source_dir",
                          backup_tar_path=first_volume)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge', 'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create an unfiltered full backup, restore it through an include list
    and check which paths came back.

    NOTE(review): despite "index" in the name, the restore below is driven
    by backup_tar_path, i.e. by the tar volume -- confirm whether an
    index-based restore was intended.
    '''
    # NOTE(review): list contents restored from the assertions at the end
    # of this test -- confirm
    wanted = ['test/huge2']

    password, paramversion = self.ENCRYPTION or (None, None)
    writer = DeltaTar(mode=self.MODE, password=password,
                      crypto_paramversion=paramversion,
                      logger=self.consoleLogger)

    # unfiltered full backup
    writer.create_full_backup(source_path="source_dir",
                              backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", writer.volume_name_func('backup_dir', True, 0))

    # filtered restore
    reader = DeltaTar(mode=self.MODE, password=password,
                      logger=self.consoleLogger,
                      included_files=wanted)
    reader.restore_backup(target_path="source_dir",
                          backup_tar_path=first_volume)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge', 'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
def test_collate_iterators(self):
    '''
    Collate the index of a fresh full backup with a walk over the very same
    source directory; every collated pair must carry equal stat data.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup that provides the index side
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # remember where we are: the index path must stay valid after chdir
    cwd = os.getcwd()
    index_path = os.path.join(cwd, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    walker = deltatar._recursive_walk_dir('.')
    path_it = deltatar.jsonize_path_iterator(walker)

    try:
        pairs = deltatar.collate_iterators(index_it, path_it)
        for indexed, on_disk, _l_no in pairs:
            assert deltatar._equal_stat_dicts(indexed, on_disk)
    finally:
        os.chdir(cwd)
def test_collate_iterators_diffdirs(self):
    '''
    Collate the index of a full backup with a walk over a source directory
    that gained one file ("z") after the backup was taken.
    '''
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup that provides the index side
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # this file exists on disk only, not in the index
    self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)

    # remember where we are: the index path must stay valid after chdir
    cwd = os.getcwd()
    index_path = os.path.join(cwd, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    walker = deltatar._recursive_walk_dir('.')
    path_it = deltatar.jsonize_path_iterator(walker)

    try:
        pairs = deltatar.collate_iterators(index_it, path_it)
        for indexed, on_disk, _l_no in pairs:
            if on_disk['path'] == 'z':
                # NOTE(review): check for the unmatched entry restored
                # from context -- confirm
                assert not indexed
            else:
                assert deltatar._equal_stat_dicts(indexed, on_disk)
    finally:
        os.chdir(cwd)
def test_collate_iterators_diffdirs2(self):
    '''
    Collate the index of a full backup with a walk over a source directory
    that gained a directory and several files, and check the exact
    sequence of collated pairs.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup that provides the index side
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # grow the source tree after the backup
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    for name, size in (("source_dir/bigdir/a", 100),
                       ("source_dir/bigdir/b", 500),
                       ("source_dir/zzzz", 100)):
        self.hash[name] = self.create_file(name, size)

    # remember where we are: the index path must stay valid after chdir
    cwd = os.getcwd()
    index_path = os.path.join(cwd, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    walker = deltatar._recursive_walk_dir('.')
    path_it = deltatar.jsonize_path_iterator(walker)

    visited_pairs = []
    try:
        pairs = deltatar.collate_iterators(index_it, path_it)
        for indexed, on_disk, _l_no in pairs:
            index_side = None
            if indexed:
                index_side = deltatar.unprefixed(indexed['path'])
            disk_side = on_disk['path'] if on_disk else None
            visited_pairs.append((index_side, disk_side))

        # NOTE(review): the 'big', 'bigdir', 'test' and 'zzzz' rows were
        # restored from the files this test sets up -- confirm
        assert visited_pairs == [
            (u'big', u'big'),
            (None, u'bigdir'),
            (u'small', u'small'),
            (u'test', u'test'),
            (None, u'zzzz'),
            (None, u'bigdir/a'),
            (None, u'bigdir/b'),
            (u'test/huge', u'test/huge'),
            (u'test/huge2', u'test/huge2'),
            (u'test/test2', u'test/test2'),
        ]
    finally:
        os.chdir(cwd)
def test_create_empty_diff_backup(self):
    '''
    Take a diff backup right after a full backup, with no change in
    between, then inspect its index and restore it.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup the diff is based on
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # every entry of the diff index must be a "list://" entry
    index_path = os.path.join("backup_dir2",
                              deltatar.index_name_func(is_full=False))
    entries = 0
    for entry in deltatar.iterate_index_path(index_path):
        entries += 1
        assert entry[0]['path'].startswith("list://")

    # NOTE(review): expected count restored from the six paths the fixture
    # creates -- confirm
    assert entries == 6

    # check the tar file
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir2",
        deltatar.volume_name_func('backup_dir2',
                                  is_full=False, volume_number=0))

    # no file restored, because the diff was empty
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)
    assert len(os.listdir("source_dir")) == 0
def test_create_diff_backup1(self):
    '''
    Take a diff backup after adding a directory and files and deleting one
    file, then inspect its index and restore it over a prepared tree.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup the diff is based on
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # change the source tree: new directory, one deletion, new files
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for name, size in (("source_dir/bigdir/a", 100),
                       ("source_dir/bigdir/b", 500),
                       ("source_dir/zzzz", 100)):
        self.hash[name] = self.create_file(name, size)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # check index items
    index_path = os.path.join("backup_dir2",
                              deltatar.index_name_func(is_full=False))
    indexed_paths = []
    for entry in deltatar.iterate_index_path(index_path):
        indexed_paths.append(entry[0]['path'])

    # NOTE(review): the 'list://big', 'delete://small', 'list://test',
    # 'snapshot://zzzz' and 'list://test/huge' rows were restored from the
    # changes made above -- confirm
    assert indexed_paths == [
        'list://big',
        'snapshot://bigdir',
        'delete://small',
        'list://test',
        'snapshot://zzzz',
        'snapshot://bigdir/a',
        'snapshot://bigdir/b',
        'list://test/huge',
        'list://test/huge2',
        'list://test/test2',
    ]

    # check the tar file
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    # recreate source_dir with an empty "small" file in it; the restore is
    # expected to delete that file again
    os.mkdir("source_dir")
    open("source_dir/small", 'wb').close()

    tar_path = os.path.join(
        "backup_dir2",
        deltatar.volume_name_func('backup_dir2',
                                  is_full=False, volume_number=0))

    # restore the backup, this will create only the new files
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # compare as sets, listing order is irrelevant
    assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
def test_restore_from_index_diff_backup(self):
    '''
    Full backup, changes to the source tree, diff backup; then restore
    from the two indexes into a target directory that does not exist yet
    and compare it with the source tree.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup the diff is based on
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # change the source tree: new directory, one deletion, new files
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for name, size in (("source_dir/bigdir/a", 100),
                       ("source_dir/bigdir/b", 500),
                       ("source_dir/zzzz", 100)):
        self.hash[name] = self.create_file(name, size)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # apply diff backup in target_dir
    index_path = os.path.join("backup_dir2",
                              deltatar.index_name_func(is_full=False))
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[index_path,
                                                  prev_index_path])

    # source_dir and target_dir must now be the same
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
def test_restore_from_index_diff_backup2(self):
    '''
    Full backup, changes to the source tree, diff backup; then restore the
    full backup into a target directory, apply the diff on top of it and
    compare the result with the source tree.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup the diff is based on
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # change the source tree: new directory, deletions, new files
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for name, size in (("source_dir/bigdir/a", 100),
                       ("source_dir/bigdir/b", 500),
                       ("source_dir/zzzz", 100)):
        self.hash[name] = self.create_file(name, size)
    shutil.rmtree("source_dir/test")

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # first restore initial backup in target_dir
    full_volume = os.path.join(
        "backup_dir",
        deltatar.volume_name_func('backup_dir',
                                  is_full=True, volume_number=0))
    deltatar.restore_backup("target_dir", backup_tar_path=full_volume)

    # then apply diff backup in target_dir
    index_path = os.path.join("backup_dir2",
                              deltatar.index_name_func(is_full=False))
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[index_path,
                                                  prev_index_path])

    # source_dir and target_dir must now be the same
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
def test_restore_from_index_diff_backup3(self):
    '''
    Full backup of a copy of self.GIT_DIR, diff backup of a second copy
    with randomly removed entries; then restore full + diff, and diff from
    zero, comparing the target directory after every step.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # two identical copies: one stays pristine, one gets altered
    shutil.rmtree("source_dir")
    for copy_name in ("source_dir", "source_dir_diff"):
        shutil.copytree(self.GIT_DIR, copy_name)

    # full backup of the pristine copy
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # alter the second copy randomly
    for path in deltatar._recursive_walk_dir('source_dir_diff'):
        # only touch paths that still exist (a parent may be gone by now),
        # and of those only about 30%
        doomed = os.path.exists(path) and random.random() >= 0.7
        if not doomed:
            continue

        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            # NOTE(review): non-directory branch restored from context --
            # confirm
            os.unlink(path)

    deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
                                prev_index_path)

    # first restore initial backup in target_dir
    full_volume = os.path.join(
        "backup_dir",
        deltatar.volume_name_func('backup_dir',
                                  is_full=True, volume_number=0))
    deltatar.restore_backup("target_dir", backup_tar_path=full_volume)

    # target_dir must equal the pristine copy
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)

    # then apply diff backup in target_dir
    index_path = os.path.join("backup_dir2",
                              deltatar.index_name_func(is_full=False))
    index_chain = [index_path, prev_index_path]
    deltatar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir must equal the randomly altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)

    # then delete target_dir and apply diff backup from zero and check again
    shutil.rmtree("target_dir")
    deltatar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir must again equal the randomly altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
def test_restore_from_index_diff_backup3_multivol(self):
    '''
    Full backup of a copy of self.GIT_DIR, diff backup of a second copy
    with randomly removed entries, the diff being written with
    max_volume_size=1; then restore full + diff, and diff from zero,
    comparing the target directory after every step.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # two identical copies: one stays pristine, one gets altered
    shutil.rmtree("source_dir")
    for copy_name in ("source_dir", "source_dir_diff"):
        shutil.copytree(self.GIT_DIR, copy_name)

    # full backup of the pristine copy
    # NOTE(review): the last argument of this call is not visible; the
    # volume size was restored to match the diff backup below -- confirm
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir",
                                max_volume_size=1)

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # alter the second copy randomly
    for path in deltatar._recursive_walk_dir('source_dir_diff'):
        # only touch paths that still exist (a parent may be gone by now),
        # and of those only about 30%
        doomed = os.path.exists(path) and random.random() >= 0.7
        if not doomed:
            continue

        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            # NOTE(review): non-directory branch restored from context --
            # confirm
            os.unlink(path)

    deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
                                prev_index_path, max_volume_size=1)

    # first restore initial backup in target_dir
    full_volume = os.path.join(
        "backup_dir", deltatar.volume_name_func('backup_dir', True, 0))
    deltatar.restore_backup("target_dir", backup_tar_path=full_volume)

    # target_dir must equal the pristine copy
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)

    # then apply diff backup in target_dir
    index_path = os.path.join("backup_dir2",
                              deltatar.index_name_func(is_full=False))
    index_chain = [index_path, prev_index_path]
    deltatar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir must equal the randomly altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)

    # then delete target_dir and apply diff backup from zero and check again
    shutil.rmtree("target_dir")
    deltatar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir must again equal the randomly altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Walk two directory trees in lockstep and assert that they match.

    path1, path2 -- the directories to compare
    deltatar     -- object providing _recursive_walk_dir,
                    jsonize_path_iterator and _equal_stat_dicts

    Raises Exception("iterators do not stop at the same time") when one
    walk yields more entries than the other, whichever side is longer,
    and AssertionError when a pair of entries differs. The offending pair
    is printed before the error propagates.
    '''
    source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
    source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
    target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
    target_it = deltatar.jsonize_path_iterator(target_it, strip=1)

    # A private sentinel lets both iterators be advanced unconditionally.
    # Catching StopIteration around both next() calls cannot tell which
    # side ran dry, so a target that ended early would pass unnoticed.
    exhausted = object()

    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            return
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            # bare raise keeps the original traceback
            raise
def test_create_no_symlinks(self):
    '''
    Create a full backup of a directory holding one regular file, one
    symlink to that file and several symlinks that point nowhere, then
    restore it. The restored tree may not contain any symlink, and the
    restored copy of the valid link must have the contents of its target.
    '''
    os.system("rm -rf source_dir")
    os.makedirs("source_dir/symlinks")
    fd = os.open("source_dir/symlinks/valid_linkname",
                 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    try:
        os.write(fd, b"valid link target for symlink tests; please ignore\n")
    finally:
        # release the descriptor even if the write fails
        os.close(fd)

    # first one is good, the rest points nowhere
    self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
    self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
    self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
    self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")
    assert not os.path.exists("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for root, _dirs, files in os.walk("source_dir/symlinks"):
        # only the valid link plus the linked file may be found in the
        # extracted archive
        # NOTE(review): the count check is restored from the comment
        # above -- confirm
        assert len(files) == 2

        # the link must have been resolved: test the entry where it lives,
        # a bare file name would be looked up in the current directory
        for name in files:
            assert not os.path.islink(os.path.join(root, name))

    # ... and file contents must match
    with open("source_dir/symlinks/valid_linkname") as a, \
         open("source_dir/symlinks/whatever") as b:
        assert a.read() == b.read()
def test_restore_with_symlinks(self):
    '''
    Append several kinds of symlink members to a finished full backup and
    restore it. All of them must be filtered out, i.e. none of the link
    names may exist in the restored tree afterwards.

    For modes whose archives cannot be opened for appending, nothing is
    added and the final check is empty.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(a, name, dst):
        # append a symlink member; returns the name for the check below
        l = tarfile.TarInfo("snapshot://%s" % name)
        l.type = tarfile.SYMTYPE
        l.linkname = dst
        a.addfile(l)
        return name

    try:
        with tarfile.open(tar_path, mode="a") as a:
            checkme = [
                add_symlink(a, "symlinks/foo", "internal-file"),
                add_symlink(a, "symlinks/bar", "/absolute/path"),
                add_symlink(a, "symlinks/baz", "../parent/../../paths"),
            ]
    except tarfile.ReadError:
        if self.MODE == '#' or self.MODE.endswith("gz"):
            checkme = []
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            checkme = []
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check what happened to our symlinks; lexists() does not follow the
    # link, so a restored link with a missing target is still detected
    for name in checkme:
        fullpath = os.path.join("source_dir", name)
        assert not os.path.lexists(fullpath)
def test_restore_malicious_symlinks(self):
    '''
    Append three symlinks and one regular file, all under the same member
    name, to a finished full backup and restore it. This simulates a
    symlink attack in which a link to an external path is abused to write
    outside the extraction prefix. Nothing may exist at the member's path
    afterwards.

    For modes whose archives cannot be opened for appending, nothing is
    added to the archive.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(a, name, dst):
        # append a symlink member pointing at dst
        l = tarfile.TarInfo("snapshot://%s" % name)
        l.type = tarfile.SYMTYPE
        l.linkname = dst
        a.addfile(l)

    def add_file(a, name):
        # append an empty regular-file member
        f = tarfile.TarInfo("snapshot://%s" % name)
        f.type = tarfile.REGTYPE
        a.addfile(f)

    testpath = "symlinks/pernicious-link"
    testdst = "/tmp/does/not/exist"

    try:
        with tarfile.open(tar_path, mode="a") as a:
            add_symlink(a, testpath, testdst)
            add_symlink(a, testpath, testdst+"X")
            add_symlink(a, testpath, testdst+"XXX")
            add_file(a, testpath)
    except tarfile.ReadError:
        if self.MODE == '#' or self.MODE.endswith("gz"):
            pass
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            pass # O_APPEND of concat archives not feasible
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check whether the link was extracted; deltatar seems to only ever
    # retrieve the first item it finds for a given path which in the case
    # at hand is a symlink to some non-existent path. exists() would follow
    # such a link and report False even if it had been written, so test
    # the directory entry itself with lexists().
    fullpath = os.path.join("source_dir", testpath)
    assert not os.path.lexists(fullpath)
def fsapi_access_true(self):
    '''
    Replace os.access with a stub that answers True to every call, for
    testing improper use of the *os* module. The function being replaced
    is recorded as an ("access", function) pair in self.FSAPI_SAVED.
    '''
    original = getattr(os, "access")
    self.FSAPI_SAVED.append(("access", original))
    setattr(os, "access", lambda *_a, **_ka: True)
class DeltaTar2Test(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the ":" mode.
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = ':'
class DeltaTarStreamTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the uncompressed stream mode.
    '''
    # NOTE(review): override restored from the docstring and the '|'
    # prefix the tests check for -- confirm
    MODE = '|'
class DeltaTarGzipTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the gzip mode.
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = ':gz'
    MODE_COMPRESSES = True
class DeltaTarGzipStreamTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the gzip stream mode.
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = '|gz'
    MODE_COMPRESSES = True
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the bz2 mode (currently skipped).
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = ':bz2'
    MODE_COMPRESSES = True
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the bz2 stream mode (currently
    skipped).
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = '|bz2'
    MODE_COMPRESSES = True
class DeltaTarGzipConcatTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the gzip concat stream mode.
    '''
    # NOTE(review): override restored from the docstring and the '#'
    # prefix the tests check for -- confirm
    MODE = '#gz'
    MODE_COMPRESSES = True
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the gzip aes128 concat stream mode.
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = '#gz'
    # (password, paramversion) pair unpacked by the tests
    ENCRYPTION = ('some magic key', 1)
    MODE_COMPRESSES = True
class DeltaTarAes128ConcatTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the aes128 concat stream mode.
    '''
    # NOTE(review): override restored from the docstring -- confirm
    MODE = '#'
    # (password, paramversion) pair unpacked by the tests
    ENCRYPTION = ('some magic key', 1)
class DeltaTarFilesystemHandlingTest(DeltaTarGzipTest):
    '''
    Runs the gzip suite while messing with filesystem APIs.
    '''
    # hook that swaps os.access for an always-True stub
    FSTEST = fsapi_access_true