1 # Copyright (C) 2013 Intra2net AG
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU Lesser General Public License as published
5 # by the Free Software Foundation; either version 3 of the License, or
6 # (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU Lesser General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program. If not, see
15 # <http://www.gnu.org/licenses/lgpl-3.0.html>
17 # Author: Eduardo Robles Elvira <edulix@wadobo.com>
27 from datetime import datetime
28 from functools import partial
29 from unittest import skip, SkipTest
31 import deltatar.tarfile as tarfile
32 from deltatar.tarfile import TarFile
33 from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34 from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35 import deltatar.crypto as crypto
37 from . import BaseTest
38 from . import new_volume_handler
40 class DeltaTarTest(BaseTest):
45 MODE_COMPRESSES = False
47 ENCRYPTION = None # (password : str, paramversion : int) option
55 self.pwd = os.getcwd()
56 os.system('rm -rf target_dir source_dir* backup_dir* huge')
57 os.makedirs('source_dir/test/test2')
59 self.hash["source_dir/test/test2"] = ''
60 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
61 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
62 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
63 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
65 self.consoleLogger = logging.StreamHandler()
66 self.consoleLogger.setLevel(logging.DEBUG)
68 if not os.path.isdir(self.GIT_DIR):
69 # Not running inside git tree, take our
70 # own testing directory as source.
71 self.GIT_DIR = 'testing'
73 if not os.path.isdir(self.GIT_DIR):
74 raise Exception('No input directory found: ' + self.GIT_DIR)
78 Remove temporal files created by unit tests and reset globals.
81 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
82 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
83 ("I am fully aware that this will void my warranty.")
85 def test_restore_simple_full_backup(self):
87 Creates a full backup without any filtering and restores it.
89 password, paramversion = self.ENCRYPTION or (None, None)
90 deltatar = DeltaTar(mode=self.MODE, password=password,
91 crypto_paramversion=paramversion,
92 logger=self.consoleLogger)
95 deltatar.create_full_backup(
96 source_path="source_dir",
97 backup_path="backup_dir")
99 assert os.path.exists("backup_dir")
100 shutil.rmtree("source_dir")
102 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
103 tar_path = os.path.join("backup_dir", tar_filename)
105 deltatar.restore_backup(target_path="source_dir",
106 backup_tar_path=tar_path)
108 for key, value in self.hash.items():
109 assert os.path.exists(key)
111 assert value == self.md5sum(key)
114 def test_create_backup_max_file_length (self):
116 Creates a full backup including one file that exceeds the (purposely
117 lowered) upper bound on GCM encrypted objects. This will yield multiple
118 encrypted objects for one plaintext file.
120 Success is verified by splitting the archive at object boundaries and
123 if self.MODE_COMPRESSES is True:
124 raise SkipTest ("GCM file length test not meaningful with compression.")
125 if self.ENCRYPTION is None:
126 raise SkipTest ("GCM file length applies only to encrypted backups.")
128 new_max = 20000 # cannot be less than tar block size
129 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
130 ("I am fully aware that this will void my warranty.",
133 password, paramversion = self.ENCRYPTION
134 deltatar = DeltaTar (mode=self.MODE, password=password,
135 crypto_paramversion=paramversion,
136 logger=self.consoleLogger)
139 os.makedirs ("source_dir2")
140 for f, s in [("empty" , 0) # 1 tar objects
141 ,("slightly_larger", new_max + 1) # 2
142 ,("twice" , 2 * new_max) # 3
144 f = "source_dir2/%s" % f
145 self.hash [f] = self.create_file (f, s)
147 deltatar.create_full_backup \
148 (source_path="source_dir2", backup_path="backup_dir")
150 assert os.path.exists ("backup_dir")
151 shutil.rmtree ("source_dir2")
153 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
154 backup_path = os.path.join("backup_dir", backup_filename)
156 # split the resulting archive into its constituents without
158 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
159 "-o backup_dir/split <\'%s\'" % backup_path)
161 assert os.path.exists ("backup_dir/split")
163 dents = os.listdir ("backup_dir/split")
164 assert len (dents) == 6
167 def test_restore_backup_max_file_length (self):
169 Creates a full backup including one file that exceeds the (purposely
170 lowered) upper bound on GCM encrypted objects. This will yield two
171 encrypted objects for one plaintext file.
173 Success is verified by splitting the archive at object boundaries and
176 if self.MODE_COMPRESSES is True:
177 raise SkipTest ("GCM file length test not meaningful with compression.")
178 if self.ENCRYPTION is None:
179 raise SkipTest ("GCM file length applies only to encrypted backups.")
181 new_max = 20000 # cannot be less than tar block size
182 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
183 ("I am fully aware that this will void my warranty.",
186 password, paramversion = self.ENCRYPTION
187 deltatar = DeltaTar (mode=self.MODE, password=password,
188 crypto_paramversion=paramversion,
189 logger=self.consoleLogger)
192 os.makedirs ("source_dir2")
193 for f, s in [("empty" , 0) # 1 tar objects
194 ,("almost_large" , new_max - 1) # 2
195 ,("large" , new_max) # 3
196 ,("slightly_larger", new_max + 1) # 4
197 ,("twice" , 2 * new_max) # 5
198 ,("twice_plus_one" , (2 * new_max) + 1) # 6
200 f = "source_dir2/%s" % f
201 self.hash [f] = self.create_file (f, s)
203 deltatar.create_full_backup \
204 (source_path="source_dir2", backup_path="backup_dir")
206 assert os.path.exists ("backup_dir")
207 shutil.rmtree ("source_dir2")
209 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
210 backup_path = os.path.join("backup_dir", backup_filename)
212 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
213 tar_path = os.path.join("backup_dir", tar_filename)
215 deltatar.restore_backup(target_path="source_dir2",
216 backup_tar_path=tar_path)
218 for key, value in self.hash.items():
219 assert os.path.exists(key)
221 assert value == self.md5sum(key)
224 def test_create_backup_index_max_file_length (self):
226 Creates a full backup with a too large index file for the upper bound
227 of the GCM encryption. Since the index file has a fixed IV file counter
228 of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
230 60+ GB of (potentially compressed) index file should last for a while...
232 if self.MODE_COMPRESSES is True:
233 raise SkipTest ("GCM file length test not meaningful with compression.")
234 if self.ENCRYPTION is None:
235 raise SkipTest ("GCM file length applies only to encrypted backups.")
238 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
239 ("I am fully aware that this will void my warranty.",
242 password, paramversion = self.ENCRYPTION
243 deltatar = DeltaTar (mode=self.MODE, password=password,
244 crypto_paramversion=paramversion,
245 logger=self.consoleLogger)
248 os.makedirs ("source_dir2")
250 f = "source_dir2/dummy_%rd" % i
251 self.hash [f] = self.create_file (f, i)
253 with self.assertRaises (crypto.InvalidFileCounter):
254 deltatar.create_full_backup \
255 (source_path="source_dir2", backup_path="backup_dir")
256 shutil.rmtree ("source_dir2")
259 def test_check_index_checksum(self):
261 Creates a full backup and checks the index' checksum of files
263 password, paramversion = self.ENCRYPTION or (None, None)
264 deltatar = DeltaTar(mode=self.MODE, password=password,
265 crypto_paramversion=paramversion,
266 logger=self.consoleLogger)
268 # create first backup
269 deltatar.create_full_backup(
270 source_path="source_dir",
271 backup_path="backup_dir")
274 index_filename = deltatar.index_name_func(True)
275 index_path = os.path.join("backup_dir", index_filename)
277 f = open(index_path, 'rb')
285 if b'BEGIN-FILE-LIST' in l:
286 crc = binascii.crc32(l) & 0xFFFFffff
288 elif b'END-FILE-LIST' in l:
289 crc = binascii.crc32(l, crc) & 0xffffffff
291 # next line contains the crc
292 data = json.loads(f.readline().decode("UTF-8"))
293 assert data['type'] == 'file-list-checksum'
294 assert data['checksum'] == crc
298 crc = binascii.crc32(l, crc) & 0xffffffff
302 def test_restore_multivol(self):
304 Creates a full backup without any filtering with multiple volumes and
307 if ':gz' in self.MODE:
308 raise SkipTest('compression information is lost when creating '
309 'multiple volumes with no Stream')
311 password, paramversion = self.ENCRYPTION or (None, None)
312 deltatar = DeltaTar(mode=self.MODE, password=password,
313 crypto_paramversion=paramversion,
314 logger=self.consoleLogger)
317 os.makedirs('source_dir2')
318 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
319 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
321 # create first backup
322 deltatar.create_full_backup(
323 source_path="source_dir2",
324 backup_path="backup_dir",
327 assert os.path.exists("backup_dir")
328 assert os.path.exists(os.path.join("backup_dir",
329 deltatar.volume_name_func("backup_dir", True, 0)))
330 if self.MODE_COMPRESSES:
334 for i_vol in range(n_vols):
335 assert os.path.exists(os.path.join("backup_dir",
336 deltatar.volume_name_func("backup_dir", True, i_vol)))
337 assert not os.path.exists(os.path.join("backup_dir",
338 deltatar.volume_name_func("backup_dir", True, n_vols)))
340 shutil.rmtree("source_dir2")
342 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
343 tar_path = os.path.join("backup_dir", tar_filename)
345 # this should automatically restore all volumes
346 deltatar.restore_backup(target_path="source_dir2",
347 backup_tar_path=tar_path)
349 for key, value in self.hash.items():
350 assert os.path.exists(key)
352 assert value == self.md5sum(key)
354 def test_restore_multivol_split(self):
356 Creates a full backup without any filtering with multiple volumes
357 with big files bigger than the max volume size and
360 if self.MODE.startswith(':') or self.MODE.startswith('|'):
361 raise SkipTest('this test only works for uncompressed '
362 'or concat compressed modes')
364 password, paramversion = self.ENCRYPTION or (None, None)
365 deltatar = DeltaTar(mode=self.MODE, password=password,
366 crypto_paramversion=paramversion,
367 logger=self.consoleLogger)
370 os.makedirs('source_dir2')
371 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
372 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
373 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
375 # create first backup
376 deltatar.create_full_backup(
377 source_path="source_dir2",
378 backup_path="backup_dir",
381 assert os.path.exists("backup_dir")
382 assert os.path.exists(os.path.join("backup_dir",
383 deltatar.volume_name_func("backup_dir", True, 0)))
384 if self.MODE_COMPRESSES:
388 for i_vol in range(n_vols):
389 assert os.path.exists(os.path.join("backup_dir",
390 deltatar.volume_name_func("backup_dir", True, i_vol)))
391 assert not os.path.exists(os.path.join("backup_dir",
392 deltatar.volume_name_func("backup_dir", True, n_vols)))
394 shutil.rmtree("source_dir2")
396 index_filename = deltatar.index_name_func(True)
397 index_path = os.path.join("backup_dir", index_filename)
399 deltatar.restore_backup(target_path="source_dir2",
400 backup_indexes_paths=[index_path])
402 for key, value in self.hash.items():
403 assert os.path.exists(key)
405 assert value == self.md5sum(key)
408 def test_full_backup_index_extra_data(self):
410 Tests that the index file for a full backup can store extra_data and
411 that this data can be retrieved.
413 password, paramversion = self.ENCRYPTION or (None, None)
414 deltatar = DeltaTar(mode=self.MODE, password=password,
415 crypto_paramversion=paramversion,
416 logger=self.consoleLogger)
420 otra_cosa=[1, "lista"],
421 y_otra=dict(bola=1.1)
424 deltatar.create_full_backup(
425 source_path="source_dir",
426 backup_path="backup_dir",
427 extra_data=extra_data)
429 index_filename = deltatar.index_name_func(is_full=True)
430 index_path = os.path.join("backup_dir", index_filename)
432 # iterate_index_path retrieves extra_data, and thus we can then compare
433 index_it = deltatar.iterate_index_path(index_path)
434 self.assertEqual(index_it.extra_data, extra_data)
437 def test_diff_backup_index_extra_data(self):
439 Tests that the index file for a diff backup can store extra_data and
440 that this data can be retrieved.
442 password, paramversion = self.ENCRYPTION or (None, None)
443 deltatar = DeltaTar(mode=self.MODE, password=password,
444 crypto_paramversion=paramversion,
445 logger=self.consoleLogger)
449 otra_cosa=[1, "lista"],
450 y_otra=dict(bola=1.1)
453 deltatar.create_full_backup(
454 source_path="source_dir",
455 backup_path="backup_dir")
458 prev_index_filename = deltatar.index_name_func(is_full=True)
459 prev_index_path = os.path.join("backup_dir", prev_index_filename)
461 # create empty diff backup
462 deltatar.create_diff_backup("source_dir", "backup_dir2",
463 prev_index_path, extra_data=extra_data)
465 index_filename = deltatar.index_name_func(is_full=False)
466 index_path = os.path.join("backup_dir2", index_filename)
468 # iterate_index_path retrieves extra_data, and thus we can then compare
469 index_it = deltatar.iterate_index_path(index_path)
470 self.assertEqual(index_it.extra_data, extra_data)
472 def test_restore_multivol2(self):
474 Creates a full backup without any filtering with multiple volumes and
477 password, paramversion = self.ENCRYPTION or (None, None)
478 deltatar = DeltaTar(mode=self.MODE, password=password,
479 crypto_paramversion=paramversion,
480 logger=self.consoleLogger)
482 shutil.copytree(self.GIT_DIR, "source_dir2")
484 # create first backup
485 deltatar.create_full_backup(
486 source_path="source_dir2",
487 backup_path="backup_dir",
490 assert os.path.exists("backup_dir")
491 assert os.path.exists(os.path.join("backup_dir",
492 deltatar.volume_name_func("backup_dir", True, 0)))
494 shutil.rmtree("source_dir2")
496 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
497 tar_path = os.path.join("backup_dir", tar_filename)
499 # this should automatically restore all volumes
500 deltatar.restore_backup(target_path="source_dir2",
501 backup_tar_path=tar_path)
503 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
505 def test_restore_multivol_manual_from_index(self):
507 Creates a full backup without any filtering with multiple volumes and
510 # this test only works for uncompressed or concat compressed modes
511 if self.MODE.startswith(':') or self.MODE.startswith('|'):
512 raise SkipTest('this test only works for uncompressed '
513 'or concat compressed modes')
515 password, paramversion = self.ENCRYPTION or (None, None)
516 deltatar = DeltaTar(mode=self.MODE, password=password,
517 crypto_paramversion=paramversion,
518 logger=self.consoleLogger)
521 os.makedirs('source_dir2')
522 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
523 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
525 # create first backup
526 deltatar.create_full_backup(
527 source_path="source_dir2",
528 backup_path="backup_dir",
531 assert os.path.exists("backup_dir")
532 assert os.path.exists(os.path.join("backup_dir",
533 deltatar.volume_name_func("backup_dir", True, 0)))
534 if self.MODE_COMPRESSES:
538 for i_vol in range(n_vols):
539 assert os.path.exists(os.path.join("backup_dir",
540 deltatar.volume_name_func("backup_dir", True, i_vol)))
541 assert not os.path.exists(os.path.join("backup_dir",
542 deltatar.volume_name_func("backup_dir", True, n_vols)))
544 shutil.rmtree("source_dir2")
546 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
547 tar_path = os.path.join("backup_dir", tar_filename)
549 index_filename = deltatar.index_name_func(True)
550 index_path = os.path.join("backup_dir", index_filename)
552 # this should automatically restore the huge file
553 f = deltatar.open_auxiliary_file(index_path, 'r')
559 data = json.loads(l.decode('UTF-8'))
560 if data.get('type', '') == 'file' and\
561 deltatar.unprefixed(data['path']) == "huge":
562 offset = data['offset']
565 assert offset is not None
567 fo = open(tar_path, 'rb')
569 def new_volume_handler(mode, tarobj, base_name, volume_number):
570 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
571 if self.ENCRYPTION is not None:
572 # deltatar module is shadowed here
573 suf += "." + deltatar_PDTCRYPT_EXTENSION
574 tarobj.open_volume(datetime.now().strftime(
575 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
576 new_volume_handler = partial(new_volume_handler, self.MODE)
579 if self.ENCRYPTION is not None:
580 crypto_ctx = crypto.Decrypt (password)
582 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
583 new_volume_handler=new_volume_handler,
584 encryption=crypto_ctx)
586 member = tarobj.next()
587 member.path = deltatar.unprefixed(member.path)
588 member.name = deltatar.unprefixed(member.name)
589 tarobj.extract(member)
592 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
597 def test_restore_manual_from_index_twice (self):
599 Creates a full backup and restore the same file twice. This *must* fail
600 when encryption is active.
602 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
603 backwards within the same file. This prevents the encryption layer from
604 exploding due to a reused IV in an overall valid archive.
606 This test anticipates possible future mistakes since it’s entirely
607 feasible to implement backward seeks for *_Stream* with concat mode.
609 # this test only works for uncompressed or concat compressed modes
610 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
611 raise SkipTest("this test only works for uncompressed "
612 "or concat compressed modes")
614 password, paramversion = self.ENCRYPTION or (None, None)
615 deltatar = DeltaTar(mode=self.MODE, password=password,
616 crypto_paramversion=paramversion,
617 logger=self.consoleLogger)
620 os.makedirs("source_dir2")
621 self.hash["source_dir2/samefile"] = \
622 self.create_file("source_dir2/samefile", 1 * 1024)
624 # create first backup
625 deltatar.create_full_backup(
626 source_path="source_dir2",
627 backup_path="backup_dir")
629 assert os.path.exists("backup_dir")
630 assert os.path.exists(os.path.join("backup_dir",
631 deltatar.volume_name_func("backup_dir", True, 0)))
633 shutil.rmtree("source_dir2")
635 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
636 tar_path = os.path.join("backup_dir", tar_filename)
638 index_filename = deltatar.index_name_func(True)
639 index_path = os.path.join("backup_dir", index_filename)
641 f = deltatar.open_auxiliary_file(index_path, "r")
647 data = json.loads(l.decode("UTF-8"))
648 if data.get("type", "") == "file" and\
649 deltatar.unprefixed(data["path"]) == "samefile":
650 offset = data["offset"]
653 assert offset is not None
655 fo = open(tar_path, "rb")
659 if self.ENCRYPTION is not None:
660 crypto_ctx = crypto.Decrypt (password)
662 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
663 encryption=crypto_ctx)
664 member = tarobj.next()
665 member.path = deltatar.unprefixed(member.path)
666 member.name = deltatar.unprefixed(member.name)
669 tarobj.extract(member)
670 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
674 tarobj.extract(member)
675 except tarfile.StreamError:
676 if crypto_ctx is not None:
677 pass # good: seeking backwards not allowed
682 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
684 os.unlink("samefile")
687 def test_restore_from_index(self):
689 Restores a full backup using an index file.
691 if self.MODE.startswith(':') or self.MODE.startswith('|'):
692 raise SkipTest('this test only works for uncompressed '
693 'or concat compressed modes')
695 password, paramversion = self.ENCRYPTION or (None, None)
696 deltatar = DeltaTar(mode=self.MODE, password=password,
697 crypto_paramversion=paramversion,
698 logger=self.consoleLogger)
700 # create first backup
701 deltatar.create_full_backup(
702 source_path="source_dir",
703 backup_path="backup_dir")
705 shutil.rmtree("source_dir")
707 # this should automatically restore all volumes
708 index_filename = deltatar.index_name_func(True)
709 index_path = os.path.join("backup_dir", index_filename)
711 deltatar.restore_backup(target_path="source_dir",
712 backup_indexes_paths=[index_path])
714 for key, value in self.hash.items():
715 assert os.path.exists(key)
717 assert value == self.md5sum(key)
719 def test_restore_multivol_from_index(self):
721 Restores a full multivolume backup using an index file.
723 if self.MODE.startswith(':') or self.MODE.startswith('|'):
724 raise SkipTest('this test only works for uncompressed '
725 'or concat compressed modes')
727 password, paramversion = self.ENCRYPTION or (None, None)
728 deltatar = DeltaTar(mode=self.MODE, password=password,
729 crypto_paramversion=paramversion,
730 logger=self.consoleLogger)
732 # create first backup
733 deltatar.create_full_backup(
734 source_path="source_dir",
735 backup_path="backup_dir",
738 shutil.rmtree("source_dir")
740 # this should automatically restore all volumes
741 index_filename = deltatar.index_name_func(True)
742 index_path = os.path.join("backup_dir", index_filename)
744 deltatar.restore_backup(target_path="source_dir",
745 backup_indexes_paths=[index_path])
747 for key, value in self.hash.items():
748 assert os.path.exists(key)
750 assert value == self.md5sum(key)
752 def test_create_basic_filtering(self):
754 Tests create backup basic filtering.
756 password, paramversion = self.ENCRYPTION or (None, None)
757 deltatar = DeltaTar(mode=self.MODE, password=password,
758 crypto_paramversion=paramversion,
759 logger=self.consoleLogger,
760 included_files=["test", "small"],
761 excluded_files=["test/huge"])
763 # create first backup
764 deltatar.create_full_backup(
765 source_path="source_dir",
766 backup_path="backup_dir")
768 assert os.path.exists("backup_dir")
769 shutil.rmtree("source_dir")
771 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
772 tar_path = os.path.join("backup_dir", tar_filename)
774 deltatar.restore_backup(target_path="source_dir",
775 backup_tar_path=tar_path)
777 assert os.path.exists("source_dir/small")
778 assert os.path.exists("source_dir/test")
779 assert os.path.exists("source_dir/test/huge2")
780 assert os.path.exists("source_dir/test/test2")
782 assert not os.path.exists("source_dir/test/huge")
783 assert not os.path.exists("source_dir/big")
785 def test_create_filter_func(self):
787 Tests create backup basic filtering.
790 def filter_func(visited_paths, path):
791 if path not in visited_paths:
792 visited_paths.append(path)
795 filter_func = partial(filter_func, visited_paths)
797 password, paramversion = self.ENCRYPTION or (None, None)
798 deltatar = DeltaTar(mode=self.MODE, password=password,
799 crypto_paramversion=paramversion,
800 logger=self.consoleLogger,
801 included_files=["test", "small"],
802 excluded_files=["test/huge"],
803 filter_func=filter_func)
805 # create first backup
806 deltatar.create_full_backup(
807 source_path="source_dir",
808 backup_path="backup_dir")
810 assert os.path.exists("backup_dir")
811 shutil.rmtree("source_dir")
813 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
814 tar_path = os.path.join("backup_dir", tar_filename)
816 deltatar.restore_backup(target_path="source_dir",
817 backup_tar_path=tar_path)
818 assert set(visited_paths) == set([
825 def test_create_filter_out_func(self):
827 Tests create backup basic filtering.
830 def filter_func(visited_paths, path):
832 Filter out everything
834 if path not in visited_paths:
835 visited_paths.append(path)
838 filter_func = partial(filter_func, visited_paths)
840 password, paramversion = self.ENCRYPTION or (None, None)
841 deltatar = DeltaTar(mode=self.MODE, password=password,
842 crypto_paramversion=paramversion,
843 logger=self.consoleLogger,
844 included_files=["test", "small"],
845 excluded_files=["test/huge"],
846 filter_func=filter_func)
848 # create first backup
849 deltatar.create_full_backup(
850 source_path="source_dir",
851 backup_path="backup_dir")
853 assert os.path.exists("backup_dir")
854 shutil.rmtree("source_dir")
856 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
857 tar_path = os.path.join("backup_dir", tar_filename)
859 deltatar.restore_backup(target_path="source_dir",
860 backup_tar_path=tar_path)
861 assert set(visited_paths) == set([
866 # check that effectively no file was backed up
867 assert not os.path.exists("source_dir/small")
868 assert not os.path.exists("source_dir/big")
869 assert not os.path.exists("source_dir/test")
871 def test_restore_index_basic_filtering(self):
873 Creates a backup, and then filter when doing the index based restore.
875 if self.MODE.startswith(':') or self.MODE.startswith('|'):
876 raise SkipTest('this test only works for uncompressed '
877 'or concat compressed modes')
879 password, paramversion = self.ENCRYPTION or (None, None)
880 deltatar = DeltaTar(mode=self.MODE, password=password,
881 crypto_paramversion=paramversion,
882 logger=self.consoleLogger)
884 # create first backup
885 deltatar.create_full_backup(
886 source_path="source_dir",
887 backup_path="backup_dir")
889 assert os.path.exists("backup_dir")
890 shutil.rmtree("source_dir")
892 index_filename = deltatar.index_name_func(True)
893 index_path = os.path.join("backup_dir", index_filename)
895 deltatar.included_files = ["test", "small"]
896 deltatar.excluded_files = ["test/huge"]
897 deltatar.restore_backup(target_path="source_dir",
898 backup_indexes_paths=[index_path])
900 assert os.path.exists("source_dir/small")
901 assert os.path.exists("source_dir/test")
902 assert os.path.exists("source_dir/test/huge2")
903 assert os.path.exists("source_dir/test/test2")
905 assert not os.path.exists("source_dir/test/huge")
906 assert not os.path.exists("source_dir/big")
908 def test_restore_index_filter_func(self):
910 Creates a backup, and then filter when doing the index based restore,
911 using the filter function.
913 if self.MODE.startswith(':') or self.MODE.startswith('|'):
914 raise SkipTest('this test only works for uncompressed '
915 'or concat compressed modes')
918 def filter_func(visited_paths, path):
919 if path not in visited_paths:
920 visited_paths.append(path)
923 filter_func = partial(filter_func, visited_paths)
925 password, paramversion = self.ENCRYPTION or (None, None)
926 deltatar = DeltaTar(mode=self.MODE, password=password,
927 crypto_paramversion=paramversion,
928 logger=self.consoleLogger)
930 # create first backup
931 deltatar.create_full_backup(
932 source_path="source_dir",
933 backup_path="backup_dir")
935 assert os.path.exists("backup_dir")
936 shutil.rmtree("source_dir")
938 index_filename = deltatar.index_name_func(True)
939 index_path = os.path.join("backup_dir", index_filename)
941 deltatar.included_files = ["test", "small"]
942 deltatar.excluded_files = ["test/huge"]
943 deltatar.filter_func = filter_func
944 deltatar.restore_backup(target_path="source_dir",
945 backup_indexes_paths=[index_path])
947 assert set(visited_paths) == set([
954 def test_restore_tar_basic_filtering(self):
956 Creates a backup, and then filter when doing the tar based restore.
958 password, paramversion = self.ENCRYPTION or (None, None)
959 deltatar = DeltaTar(mode=self.MODE, password=password,
960 crypto_paramversion=paramversion,
961 logger=self.consoleLogger)
963 # create first backup
964 deltatar.create_full_backup(
965 source_path="source_dir",
966 backup_path="backup_dir")
968 assert os.path.exists("backup_dir")
969 shutil.rmtree("source_dir")
971 deltatar.included_files = ["test", "small"]
972 deltatar.excluded_files = ["test/huge"]
974 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
975 tar_path = os.path.join("backup_dir", tar_filename)
977 deltatar.restore_backup(target_path="source_dir",
978 backup_tar_path=tar_path)
980 assert os.path.exists("source_dir/small")
981 assert os.path.exists("source_dir/test")
982 assert os.path.exists("source_dir/test/huge2")
983 assert os.path.exists("source_dir/test/test2")
985 assert not os.path.exists("source_dir/test/huge")
986 assert not os.path.exists("source_dir/big")
988 def test_restore_tar_filter_func(self):
990 Creates a backup, and then filter when doing the tar based restore,
991 using the filter function.
994 def filter_func(visited_paths, path):
995 if path not in visited_paths:
996 visited_paths.append(path)
999 filter_func = partial(filter_func, visited_paths)
1001 password, paramversion = self.ENCRYPTION or (None, None)
1002 deltatar = DeltaTar(mode=self.MODE, password=password,
1003 crypto_paramversion=paramversion,
1004 logger=self.consoleLogger)
1006 # create first backup
1007 deltatar.create_full_backup(
1008 source_path="source_dir",
1009 backup_path="backup_dir")
1011 assert os.path.exists("backup_dir")
1012 shutil.rmtree("source_dir")
1014 index_filename = deltatar.index_name_func(True)
1015 index_path = os.path.join("backup_dir", index_filename)
1017 deltatar.included_files = ["test", "small"]
1018 deltatar.excluded_files = ["test/huge"]
1019 deltatar.filter_func = filter_func
1021 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1022 tar_path = os.path.join("backup_dir", tar_filename)
1024 deltatar.restore_backup(target_path="source_dir",
1025 backup_tar_path=tar_path)
1026 assert set(visited_paths) == set([
1033 def test_filter_path_regexp(self):
1035 Test specifically the deltatar.filter_path function with regular
1039 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1040 re.compile('^yes$'),
1044 re.compile('^testing/in_the'),
1046 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1047 excluded_files=excluded_files)
1049 # assert valid and invalid paths
1050 assert deltatar.filter_path('test/hola')
1051 assert deltatar.filter_path('test/hola/any/thing')
1052 assert deltatar.filter_path('test/caracola/caracolero')
1053 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1054 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1055 assert deltatar.filter_path('yes')
1056 assert deltatar.filter_path('testing')
1057 assert deltatar.filter_path('testing/yes')
1058 assert deltatar.filter_path('testing/in_th')
1060 assert not deltatar.filter_path('something')
1061 assert not deltatar.filter_path('other/thing')
1062 assert not deltatar.filter_path('test_ing')
1063 assert not deltatar.filter_path('test/hola_lala')
1064 assert not deltatar.filter_path('test/agur')
1065 assert not deltatar.filter_path('testing_something')
1066 assert not deltatar.filter_path('yeso')
1067 assert not deltatar.filter_path('yes/o')
1068 assert not deltatar.filter_path('yes_o')
1069 assert not deltatar.filter_path('testing/in_the')
1070 assert not deltatar.filter_path('testing/in_the_field')
1071 assert not deltatar.filter_path('testing/in_the/field')
1073 def test_filter_path_parent(self):
1075 Test specifically the deltatar.filter_path function for parent matching
1078 'testing/path/to/some/thing'
1080 deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1082 # assert valid and invalid paths
1083 assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1084 assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1085 assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1086 assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1087 assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1088 assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1089 assert deltatar.filter_path('testing/something/else') == NO_MATCH
1091 def test_parent_matching_simple_full_backup(self):
1093 Create a full backup using parent matching
1099 password, paramversion = self.ENCRYPTION or (None, None)
1100 deltatar = DeltaTar(mode=self.MODE, password=password,
1101 crypto_paramversion=paramversion,
1102 logger=self.consoleLogger,
1103 included_files=included_files)
1105 # create first backup
1106 deltatar.create_full_backup(
1107 source_path="source_dir",
1108 backup_path="backup_dir")
1110 assert os.path.exists("backup_dir")
1111 shutil.rmtree("source_dir")
1113 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1114 tar_path = os.path.join("backup_dir", tar_filename)
1116 deltatar = DeltaTar(mode=self.MODE, password=password,
1117 logger=self.consoleLogger)
1118 deltatar.restore_backup(target_path="source_dir",
1119 backup_tar_path=tar_path)
1121 assert os.path.exists('source_dir/test/huge2')
1122 assert os.path.exists('source_dir/test/')
1123 assert not os.path.exists('source_dir/test/huge')
1124 assert not os.path.exists('source_dir/big')
1125 assert not os.path.exists('source_dir/small')
1127 def test_parent_matching_simple_full_backup_restore(self):
1129 Create a full backup and restores it using parent matching
1135 password, paramversion = self.ENCRYPTION or (None, None)
1136 deltatar = DeltaTar(mode=self.MODE, password=password,
1137 crypto_paramversion=paramversion,
1138 logger=self.consoleLogger)
1140 # create first backup
1141 deltatar.create_full_backup(
1142 source_path="source_dir",
1143 backup_path="backup_dir")
1145 assert os.path.exists("backup_dir")
1146 shutil.rmtree("source_dir")
1148 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1149 tar_path = os.path.join("backup_dir", tar_filename)
1151 deltatar = DeltaTar(mode=self.MODE, password=password,
1152 logger=self.consoleLogger,
1153 included_files=included_files)
1154 deltatar.restore_backup(target_path="source_dir",
1155 backup_tar_path=tar_path)
1157 assert os.path.exists('source_dir/test/huge2')
1158 assert os.path.exists('source_dir/test/')
1159 assert not os.path.exists('source_dir/test/huge')
1160 assert not os.path.exists('source_dir/big')
1161 assert not os.path.exists('source_dir/small')
1163 def test_parent_matching_index_full_backup_restore(self):
1165 Create a full backup and restores it using parent matching
1171 password, paramversion = self.ENCRYPTION or (None, None)
1172 deltatar = DeltaTar(mode=self.MODE, password=password,
1173 crypto_paramversion=paramversion,
1174 logger=self.consoleLogger)
1176 # create first backup
1177 deltatar.create_full_backup(
1178 source_path="source_dir",
1179 backup_path="backup_dir")
1181 assert os.path.exists("backup_dir")
1182 shutil.rmtree("source_dir")
1184 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1185 tar_path = os.path.join("backup_dir", tar_filename)
1187 deltatar = DeltaTar(mode=self.MODE, password=password,
1188 logger=self.consoleLogger,
1189 included_files=included_files)
1190 deltatar.restore_backup(target_path="source_dir",
1191 backup_tar_path=tar_path)
1193 assert os.path.exists('source_dir/test/huge2')
1194 assert os.path.exists('source_dir/test/')
1195 assert not os.path.exists('source_dir/test/huge')
1196 assert not os.path.exists('source_dir/big')
1197 assert not os.path.exists('source_dir/small')
1199 def test_collate_iterators(self):
1201 Tests the collate iterators functionality with two exact directories,
1202 using an index iterator from a backup and the exact same source dir.
1204 password, paramversion = self.ENCRYPTION or (None, None)
1205 deltatar = DeltaTar(mode=self.MODE, password=password,
1206 crypto_paramversion=paramversion,
1207 logger=self.consoleLogger)
1209 # create first backup
1210 deltatar.create_full_backup(
1211 source_path="source_dir",
1212 backup_path="backup_dir")
1214 assert os.path.exists("backup_dir")
1217 index_filename = deltatar.index_name_func(is_full=True)
1218 index_path = os.path.join(cwd, "backup_dir", index_filename)
1219 index_it = deltatar.iterate_index_path(index_path)
1221 os.chdir('source_dir')
1222 dir_it = deltatar._recursive_walk_dir('.')
1223 path_it = deltatar.jsonize_path_iterator(dir_it)
1226 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1227 assert deltatar._equal_stat_dicts(path1, path2)
1231 def test_collate_iterators_diffdirs(self):
1233 Use the collate iterators functionality with two different directories.
1234 It must behave in an expected way.
1236 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1238 password, paramversion = self.ENCRYPTION or (None, None)
1239 deltatar = DeltaTar(mode=self.MODE, password=password,
1240 crypto_paramversion=paramversion,
1241 logger=self.consoleLogger)
1243 # create first backup
1244 deltatar.create_full_backup(
1245 source_path="source_dir",
1246 backup_path="backup_dir")
1248 assert os.path.exists("backup_dir")
1249 self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)
1252 index_filename = deltatar.index_name_func(is_full=True)
1253 index_path = os.path.join(cwd, "backup_dir", index_filename)
1254 index_it = deltatar.iterate_index_path(index_path)
1256 os.chdir('source_dir')
1257 dir_it = deltatar._recursive_walk_dir('.')
1258 path_it = deltatar.jsonize_path_iterator(dir_it)
1261 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1262 if path2['path'] == 'z':
1265 assert deltatar._equal_stat_dicts(path1, path2)
1269 def test_collate_iterators_diffdirs2(self):
1271 Use the collate iterators functionality with two different directories.
1272 It must behave in an expected way.
1274 password, paramversion = self.ENCRYPTION or (None, None)
1275 deltatar = DeltaTar(mode=self.MODE, password=password,
1276 crypto_paramversion=paramversion,
1277 logger=self.consoleLogger)
1279 # create first backup
1280 deltatar.create_full_backup(
1281 source_path="source_dir",
1282 backup_path="backup_dir")
1284 assert os.path.exists("backup_dir")
1286 # add some new files and directories
1287 os.makedirs('source_dir/bigdir')
1288 self.hash["source_dir/bigdir"] = ""
1289 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1290 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1291 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1294 index_filename = deltatar.index_name_func(is_full=True)
1295 index_path = os.path.join(cwd, "backup_dir", index_filename)
1296 index_it = deltatar.iterate_index_path(index_path)
1298 os.chdir('source_dir')
1299 dir_it = deltatar._recursive_walk_dir('.')
1300 path_it = deltatar.jsonize_path_iterator(dir_it)
1305 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1306 visited_pairs.append(
1307 (deltatar.unprefixed(path1['path']) if path1 else None,
1308 path2['path'] if path2 else None)
1311 assert visited_pairs == [
1314 (u'small', u'small'),
1317 (None, u'bigdir/a'),
1318 (None, u'bigdir/b'),
1319 (u'test/huge', u'test/huge'),
1320 (u'test/huge2', u'test/huge2'),
1321 (u'test/test2', u'test/test2'),
1325 def test_create_empty_diff_backup(self):
1327 Creates an empty (no changes) backup diff
1329 password, paramversion = self.ENCRYPTION or (None, None)
1330 deltatar = DeltaTar(mode=self.MODE, password=password,
1331 crypto_paramversion=paramversion,
1332 logger=self.consoleLogger)
1334 # create first backup
1335 deltatar.create_full_backup(
1336 source_path="source_dir",
1337 backup_path="backup_dir")
1339 prev_index_filename = deltatar.index_name_func(is_full=True)
1340 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1342 deltatar.create_diff_backup("source_dir", "backup_dir2",
1346 index_path = os.path.join("backup_dir2",
1347 deltatar.index_name_func(is_full=False))
1348 index_it = deltatar.iterate_index_path(index_path)
1352 assert i[0]['path'].startswith("list://")
1356 # check the tar file
1357 assert os.path.exists("backup_dir2")
1358 shutil.rmtree("source_dir")
1360 tar_filename = deltatar.volume_name_func('backup_dir2',
1361 is_full=False, volume_number=0)
1362 tar_path = os.path.join("backup_dir2", tar_filename)
1364 # no file restored, because the diff was empty
1365 deltatar.restore_backup(target_path="source_dir",
1366 backup_tar_path=tar_path)
1367 assert len(os.listdir("source_dir")) == 0
1370 def test_create_diff_backup1(self):
1372 Creates a diff backup when there are new files
1374 password, paramversion = self.ENCRYPTION or (None, None)
1375 deltatar = DeltaTar(mode=self.MODE, password=password,
1376 crypto_paramversion=paramversion,
1377 logger=self.consoleLogger)
1379 # create first backup
1380 deltatar.create_full_backup(
1381 source_path="source_dir",
1382 backup_path="backup_dir")
1384 prev_index_filename = deltatar.index_name_func(is_full=True)
1385 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1387 # add some new files and directories
1388 os.makedirs('source_dir/bigdir')
1389 self.hash["source_dir/bigdir"] = ""
1390 os.unlink("source_dir/small")
1391 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1392 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1393 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1395 deltatar.create_diff_backup("source_dir", "backup_dir2",
1399 index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1400 index_it = deltatar.iterate_index_path(index_path)
1401 l = [i[0]['path'] for i in index_it]
1405 'snapshot://bigdir',
1409 'snapshot://bigdir/a',
1410 'snapshot://bigdir/b',
1412 'list://test/huge2',
1413 'list://test/test2',
1416 # check the tar file
1417 assert os.path.exists("backup_dir2")
1418 shutil.rmtree("source_dir")
1420 # create source_dir with the small file, that will be then deleted by
1421 # the restore_backup
1422 os.mkdir("source_dir")
1423 open("source_dir/small", 'wb').close()
1425 tar_filename = deltatar.volume_name_func('backup_dir2',
1426 is_full=False, volume_number=0)
1427 tar_path = os.path.join("backup_dir2", tar_filename)
1429 # restore the backup, this will create only the new files
1430 deltatar.restore_backup(target_path="source_dir",
1431 backup_tar_path=tar_path)
1432 # the order doesn't matter
1433 assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1435 def test_restore_from_index_diff_backup(self):
1437 Creates a full backup, modifies some files, creates a diff backup,
1438 then restores the diff backup from zero.
1440 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1441 raise SkipTest('this test only works for uncompressed '
1442 'or concat compressed modes')
1444 password, paramversion = self.ENCRYPTION or (None, None)
1445 deltatar = DeltaTar(mode=self.MODE, password=password,
1446 crypto_paramversion=paramversion,
1447 logger=self.consoleLogger)
1449 # create first backup
1450 deltatar.create_full_backup(
1451 source_path="source_dir",
1452 backup_path="backup_dir")
1454 prev_index_filename = deltatar.index_name_func(is_full=True)
1455 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1457 # add some new files and directories
1458 os.makedirs('source_dir/bigdir')
1459 self.hash["source_dir/bigdir"] = ""
1460 os.unlink("source_dir/small")
1461 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1462 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1463 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1465 deltatar.create_diff_backup("source_dir", "backup_dir2",
1468 # apply diff backup in target_dir
1469 index_filename = deltatar.index_name_func(is_full=False)
1470 index_path = os.path.join("backup_dir2", index_filename)
1471 deltatar.restore_backup("target_dir",
1472 backup_indexes_paths=[index_path, prev_index_path])
1474 # then compare the two directories source_dir and target_dir and check
1476 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1478 def test_restore_from_index_diff_backup2(self):
1480 Creates a full backup, modifies some files, creates a diff backup,
1481 then restores the diff backup with the full backup as a starting point.
1483 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1484 raise SkipTest('this test only works for uncompressed '
1485 'or concat compressed modes')
1487 password, paramversion = self.ENCRYPTION or (None, None)
1488 deltatar = DeltaTar(mode=self.MODE, password=password,
1489 crypto_paramversion=paramversion,
1490 logger=self.consoleLogger)
1492 # create first backup
1493 deltatar.create_full_backup(
1494 source_path="source_dir",
1495 backup_path="backup_dir")
1497 prev_index_filename = deltatar.index_name_func(is_full=True)
1498 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1500 # add some new files and directories
1501 os.makedirs('source_dir/bigdir')
1502 self.hash["source_dir/bigdir"] = ""
1503 os.unlink("source_dir/small")
1504 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1505 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1506 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1507 shutil.rmtree("source_dir/test")
1509 deltatar.create_diff_backup("source_dir", "backup_dir2",
1512 # first restore initial backup in target_dir
1513 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1514 tar_path = os.path.join("backup_dir", tar_filename)
1515 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1517 # then apply diff backup in target_dir
1518 index_filename = deltatar.index_name_func(is_full=False)
1519 index_path = os.path.join("backup_dir2", index_filename)
1520 deltatar.restore_backup("target_dir",
1521 backup_indexes_paths=[index_path, prev_index_path])
1523 # then compare the two directories source_dir and target_dir and check
1525 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1527 def test_restore_from_index_diff_backup3(self):
1529 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1530 diff backup, then restores the diff backup with the full backup as a
1533 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1534 raise SkipTest('this test only works for uncompressed '
1535 'or concat compressed modes')
1537 password, paramversion = self.ENCRYPTION or (None, None)
1538 deltatar = DeltaTar(mode=self.MODE, password=password,
1539 crypto_paramversion=paramversion,
1540 logger=self.consoleLogger)
1542 shutil.rmtree("source_dir")
1543 shutil.copytree(self.GIT_DIR, "source_dir")
1544 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1546 # create first backup
1547 deltatar.create_full_backup(
1548 source_path="source_dir",
1549 backup_path="backup_dir")
1551 prev_index_filename = deltatar.index_name_func(is_full=True)
1552 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1554 # alter the source_dir randomly
1555 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1557 for path in source_it:
1558 # if path doesn't exist (might have previously removed) ignore it.
1559 # also ignore it (i.e. do not change it) 70% of the time
1560 if not os.path.exists(path) or random.random() < 0.7:
1564 if os.path.isdir(path):
1569 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1572 # first restore initial backup in target_dir
1573 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1574 tar_path = os.path.join("backup_dir", tar_filename)
1575 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1577 # and check that target_dir equals to source_dir (which is the same as
1578 # self.GIT_DIR initially)
1579 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1581 # then apply diff backup in target_dir
1582 index_filename = deltatar.index_name_func(is_full=False)
1583 index_path = os.path.join("backup_dir2", index_filename)
1584 deltatar.restore_backup("target_dir",
1585 backup_indexes_paths=[index_path, prev_index_path])
1587 # and check that target_dir equals to source_dir_diff (the randomly
1588 # altered self.GIT_DIR directory)
1589 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1591 # then delete target_dir and apply diff backup from zero and check again
1592 shutil.rmtree("target_dir")
1593 deltatar.restore_backup("target_dir",
1594 backup_indexes_paths=[index_path, prev_index_path])
1596 # and check that target_dir equals to source_dir_diff (the randomly
1597 # altered self.GIT_DIR directory)
1598 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1600 def test_restore_from_index_diff_backup3_multivol(self):
1602 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1603 diff backup, then restores the diff backup with the full backup as a
1606 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1607 raise SkipTest('this test only works for uncompressed '
1608 'or concat compressed modes')
1610 password, paramversion = self.ENCRYPTION or (None, None)
1611 deltatar = DeltaTar(mode=self.MODE, password=password,
1612 crypto_paramversion=paramversion,
1613 logger=self.consoleLogger)
1615 shutil.rmtree("source_dir")
1616 shutil.copytree(self.GIT_DIR, "source_dir")
1617 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1619 # create first backup
1620 deltatar.create_full_backup(
1621 source_path="source_dir",
1622 backup_path="backup_dir",
1625 prev_index_filename = deltatar.index_name_func(is_full=True)
1626 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1628 # alter the source_dir randomly
1629 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1631 for path in source_it:
1632 # if path doesn't exist (might have previously removed) ignore it.
1633 # also ignore it (i.e. do not change it) 70% of the time
1634 if not os.path.exists(path) or random.random() < 0.7:
1638 if os.path.isdir(path):
1643 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1644 prev_index_path, max_volume_size=1)
1646 # first restore initial backup in target_dir
1647 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1648 tar_path = os.path.join("backup_dir", tar_filename)
1649 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1651 # and check that target_dir equals to source_dir (which is the same as
1652 # self.GIT_DIR initially)
1653 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1655 # then apply diff backup in target_dir
1656 index_filename = deltatar.index_name_func(is_full=False)
1657 index_path = os.path.join("backup_dir2", index_filename)
1658 deltatar.restore_backup("target_dir",
1659 backup_indexes_paths=[index_path, prev_index_path])
1661 # and check that target_dir equals to source_dir_diff (the randomly
1662 # altered self.GIT_DIR directory)
1663 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1665 # then delete target_dir and apply diff backup from zero and check again
1666 shutil.rmtree("target_dir")
1667 deltatar.restore_backup("target_dir",
1668 backup_indexes_paths=[index_path, prev_index_path])
1670 # and check that target_dir equals to source_dir_diff (the randomly
1671 # altered self.GIT_DIR directory)
1672 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1674 def check_equal_dirs(self, path1, path2, deltatar):
1676 compare the two directories source_dir and target_dir and check
1679 source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1680 source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1681 target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1682 target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1685 sitem = next(source_it)
1686 titem = next(target_it)
1687 except StopIteration:
1689 titem = next(target_it)
1690 raise Exception("iterators do not stop at the same time")
1691 except StopIteration:
1694 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1695 except Exception as e:
1696 print("SITEM: " + str(sitem))
1697 print("TITEM: " + str(titem))
1700 def test_create_no_symlinks(self):
1702 Creates a full backup from different varieties of symlinks. The
1703 extracted archive may not contain any symlinks but the file contents
1706 os.system("rm -rf source_dir")
1707 os.makedirs("source_dir/symlinks")
1708 fd = os.open("source_dir/symlinks/valid_linkname",
1709 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1710 os.write(fd, b"valid link target for symlink tests; please ignore\n")
1712 # first one is good, the rest points nowhere
1713 self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1714 self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1715 self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1716 self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1717 password, paramversion = self.ENCRYPTION or (None, None)
1718 deltatar = DeltaTar(mode=self.MODE, password=password,
1719 crypto_paramversion=paramversion,
1720 logger=self.consoleLogger)
1722 # create first backup
1723 deltatar.create_full_backup(source_path="source_dir",
1724 backup_path="backup_dir")
1726 assert os.path.exists("backup_dir")
1727 shutil.rmtree("source_dir")
1728 assert not os.path.exists("source_dir")
1730 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1731 tar_path = os.path.join("backup_dir", tar_filename)
1733 deltatar.restore_backup(target_path="source_dir",
1734 backup_tar_path=tar_path)
1736 for _r, _ds, fs in os.walk("source_dir/symlinks"):
1737 # only the valid link plus the linked file may be found in the
1741 # the link must have been resolved and file contents must match
1743 assert not os.path.islink(f)
1744 with open("source_dir/symlinks/valid_linkname") as a:
1745 with open("source_dir/symlinks/whatever") as b:
1746 assert a.read() == b.read()
1748 def test_restore_with_symlinks(self):
1750 Creates a full backup containing different varieties of symlinks. All
1751 of them must be filtered out.
1753 password, paramversion = self.ENCRYPTION or (None, None)
1754 deltatar = DeltaTar(mode=self.MODE, password=password,
1755 crypto_paramversion=paramversion,
1756 logger=self.consoleLogger)
1758 # create first backup
1759 deltatar.create_full_backup(source_path="source_dir",
1760 backup_path="backup_dir")
1762 assert os.path.exists("backup_dir")
1763 shutil.rmtree("source_dir")
1765 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1766 tar_path = os.path.join("backup_dir", tar_filename)
1768 # add symlinks to existing archive
1770 def add_symlink (a, name, dst):
1771 l = tarfile.TarInfo("snapshot://%s" % name)
1772 l.type = tarfile.SYMTYPE
1778 with tarfile.open(tar_path,mode="a") as a:
1780 [ add_symlink(a, "symlinks/foo", "internal-file")
1781 , add_symlink(a, "symlinks/bar", "/absolute/path")
1782 , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1783 except tarfile.ReadError as e:
1784 if self.MODE == '#' or self.MODE.endswith ("gz"):
1788 except ValueError as e:
1789 if self.MODE.startswith ('#'):
1794 deltatar.restore_backup(target_path="source_dir",
1795 backup_tar_path=tar_path)
1797 # check what happened to our symlinks
1798 for name in checkme:
1799 fullpath = os.path.join("source_dir", name)
1800 assert not os.path.exists(fullpath)
1802 def test_restore_malicious_symlinks(self):
1804 Creates a full backup containing a symlink and a file of the same name.
1805 This simulates a symlink attack with a link pointing to some external
1806 path that is abused to write outside the extraction prefix.
1808 password, paramversion = self.ENCRYPTION or (None, None)
1809 deltatar = DeltaTar(mode=self.MODE, password=password,
1810 crypto_paramversion=paramversion,
1811 logger=self.consoleLogger)
1813 # create first backup
1814 deltatar.create_full_backup(source_path="source_dir",
1815 backup_path="backup_dir")
1817 assert os.path.exists("backup_dir")
1818 shutil.rmtree("source_dir")
1820 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1821 tar_path = os.path.join("backup_dir", tar_filename)
1823 # add symlinks to existing archive
1825 def add_symlink (a, name, dst):
1826 l = tarfile.TarInfo("snapshot://%s" % name)
1827 l.type = tarfile.SYMTYPE
1831 def add_file (a, name):
1832 f = tarfile.TarInfo("snapshot://%s" % name)
1833 f.type = tarfile.REGTYPE
1836 testpath = "symlinks/pernicious-link"
1837 testdst = "/tmp/does/not/exist"
1840 with tarfile.open(tar_path, mode="a") as a:
1841 add_symlink(a, testpath, testdst)
1842 add_symlink(a, testpath, testdst+"X")
1843 add_symlink(a, testpath, testdst+"XXX")
1844 add_file(a, testpath)
1845 except tarfile.ReadError as e:
1846 if self.MODE == '#' or self.MODE.endswith ("gz"):
1850 except ValueError as e:
1851 if self.MODE.startswith ('#'):
1852 pass # O_APPEND of concat archives not feasible
1856 deltatar.restore_backup(target_path="source_dir",
1857 backup_tar_path=tar_path)
1859 # check whether the link was extracted; deltatar seems to only ever
1860 # retrieve the first item it finds for a given path which in the case
1861 # at hand is a symlink to some non-existent path
1862 fullpath = os.path.join("source_dir", testpath)
1863 assert not os.path.exists(fullpath)
1865 class DeltaTar2Test(DeltaTarTest):
1867 Same as DeltaTar but with specific ":" mode
1872 class DeltaTarStreamTest(DeltaTarTest):
1874 Same as DeltaTar but with specific uncompressed stream mode
1879 class DeltaTarGzipTest(DeltaTarTest):
1881 Same as DeltaTar but with specific gzip mode
1884 MODE_COMPRESSES = True
1887 class DeltaTarGzipStreamTest(DeltaTarTest):
1889 Same as DeltaTar but with specific gzip stream mode
1892 MODE_COMPRESSES = True
1895 @skip('Bz2 tests are too slow..')
1896 class DeltaTarBz2Test(DeltaTarTest):
1898 Same as DeltaTar but with specific bz2 mode
1901 MODE_COMPRESSES = True
1904 @skip('Bz2 tests are too slow..')
1905 class DeltaTarBz2StreamTest(DeltaTarTest):
1907 Same as DeltaTar but with specific bz2 stream mode
1910 MODE_COMPRESSES = True
1913 class DeltaTarGzipConcatTest(DeltaTarTest):
1915 Same as DeltaTar but with specific gzip concat stream mode
1918 MODE_COMPRESSES = True
1921 class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
1923 Same as DeltaTar but with specific gzip aes128 concat stream mode
1926 ENCRYPTION = ('some magic key', 1)
1927 MODE_COMPRESSES = True
1930 class DeltaTarAes128ConcatTest(DeltaTarTest):
1932 Same as DeltaTar but with specific aes128 concat stream mode
1935 ENCRYPTION = ('some magic key', 1)