bpo-32713: Fix tarfile.itn for large/negative float values. (GH-5434)
[python-delta-tar] / testing / test_deltatar.py
... / ...
CommitLineData
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
19import errno
20import os
21import re
22import random
23import shutil
24import logging
25import binascii
26import json
27from datetime import datetime
28from functools import partial
29from unittest import skip, SkipTest
30
31import deltatar.tarfile as tarfile
32from deltatar.tarfile import TarFile
33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
36
37from . import BaseTest
38from . import new_volume_handler
39
40# Enable warning messages from deltatar. This minimizes the SNR of
41# test runs, but none of the messages are meaningful in any way.
42VERBOSE_TEST_OUTPUT = False
43
44class DeltaTarTest(BaseTest):
45 """
46 Test backups
47 """
48 MODE = ''
49 MODE_COMPRESSES = False
50
51 ENCRYPTION = None # (password : str, paramversion : int) option
52
53 GIT_DIR = '.git'
54
55 FSTEST = None
56 FSAPI_SAVED = []
57
58 def setUp(self):
59 '''
60 Create base test data
61 '''
62 self.pwd = os.getcwd()
63 os.system('rm -rf target_dir source_dir* backup_dir* huge')
64 os.makedirs('source_dir/test/test2')
65 self.hash = dict()
66 self.hash["source_dir/test/test2"] = ''
67 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
68 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
69 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
70 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
71
72 self.consoleLogger = None
73 if VERBOSE_TEST_OUTPUT is True:
74 self.consoleLogger = logging.StreamHandler()
75 self.consoleLogger.setLevel(logging.DEBUG)
76
77 if not os.path.isdir(self.GIT_DIR):
78 # Not running inside git tree, take our
79 # own testing directory as source.
80 self.GIT_DIR = 'testing'
81
82 if not os.path.isdir(self.GIT_DIR):
83 raise Exception('No input directory found: ' + self.GIT_DIR)
84
85 if self.FSTEST is not None:
86 self.FSTEST ()
87
88 def tearDown(self):
89 '''
90 Remove temporary files created by unit tests and restore the API
91 functions in *os*.
92 '''
93 for att, val in self.FSAPI_SAVED:
94 setattr (os, att, val)
95 os.chdir(self.pwd)
96 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
97 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
98 ("I am fully aware that this will void my warranty.")
99
100 def test_restore_simple_full_backup(self):
101 '''
102 Creates a full backup without any filtering and restores it.
103 '''
104 password, paramversion = self.ENCRYPTION or (None, None)
105 deltatar = DeltaTar(mode=self.MODE, password=password,
106 crypto_paramversion=paramversion,
107 logger=self.consoleLogger)
108
109 # create first backup
110 deltatar.create_full_backup(
111 source_path="source_dir",
112 backup_path="backup_dir")
113
114 assert os.path.exists("backup_dir")
115 shutil.rmtree("source_dir")
116
117 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
118 tar_path = os.path.join("backup_dir", tar_filename)
119
120 deltatar.restore_backup(target_path="source_dir",
121 backup_tar_path=tar_path)
122
123 for key, value in self.hash.items():
124 assert os.path.exists(key)
125 if value:
126 assert value == self.md5sum(key)
127
128
129 def test_create_backup_max_file_length (self):
130 """
131 Creates a full backup including one file that exceeds the (purposely
132 lowered) upper bound on GCM encrypted objects. This will yield multiple
133 encrypted objects for one plaintext file.
134
135 Success is verified by splitting the archive at object boundaries and
136 counting the parts.
137 """
138 if self.MODE_COMPRESSES is True:
139 raise SkipTest ("GCM file length test not meaningful with compression.")
140 if self.ENCRYPTION is None:
141 raise SkipTest ("GCM file length applies only to encrypted backups.")
142
143 new_max = 20000 # cannot be less than tar block size
144 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
145 ("I am fully aware that this will void my warranty.",
146 new_max)
147
148 password, paramversion = self.ENCRYPTION
149 deltatar = DeltaTar (mode=self.MODE, password=password,
150 crypto_paramversion=paramversion,
151 logger=self.consoleLogger)
152
153 self.hash = dict ()
154 os.makedirs ("source_dir2")
155 for f, s in [("empty" , 0) # 1 tar objects
156 ,("slightly_larger", new_max + 1) # 2
157 ,("twice" , 2 * new_max) # 3
158 ]:
159 f = "source_dir2/%s" % f
160 self.hash [f] = self.create_file (f, s)
161
162 deltatar.create_full_backup \
163 (source_path="source_dir2", backup_path="backup_dir")
164
165 assert os.path.exists ("backup_dir")
166 shutil.rmtree ("source_dir2")
167
168 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
169 backup_path = os.path.join("backup_dir", backup_filename)
170
171 # split the resulting archive into its constituents without
172 # decrypting
173 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
174 "-o backup_dir/split <\'%s\'" % backup_path)
175
176 assert os.path.exists ("backup_dir/split")
177
178 dents = os.listdir ("backup_dir/split")
179 assert len (dents) == 6
180
181
182 def test_restore_backup_max_file_length (self):
183 """
184 Creates a full backup including one file that exceeds the (purposely
185 lowered) upper bound on GCM encrypted objects. This will yield two
186 encrypted objects for one plaintext file.
187
188 Success is verified by splitting the archive at object boundaries and
189 counting the parts.
190 """
191 if self.MODE_COMPRESSES is True:
192 raise SkipTest ("GCM file length test not meaningful with compression.")
193 if self.ENCRYPTION is None:
194 raise SkipTest ("GCM file length applies only to encrypted backups.")
195
196 new_max = 20000 # cannot be less than tar block size
197 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
198 ("I am fully aware that this will void my warranty.",
199 new_max)
200
201 password, paramversion = self.ENCRYPTION
202 deltatar = DeltaTar (mode=self.MODE, password=password,
203 crypto_paramversion=paramversion,
204 logger=self.consoleLogger)
205
206 self.hash = dict ()
207 os.makedirs ("source_dir2")
208 for f, s in [("empty" , 0) # 1 tar objects
209 ,("almost_large" , new_max - 1) # 2
210 ,("large" , new_max) # 3
211 ,("slightly_larger", new_max + 1) # 4
212 ,("twice" , 2 * new_max) # 5
213 ,("twice_plus_one" , (2 * new_max) + 1) # 6
214 ]:
215 f = "source_dir2/%s" % f
216 self.hash [f] = self.create_file (f, s)
217
218 deltatar.create_full_backup \
219 (source_path="source_dir2", backup_path="backup_dir")
220
221 assert os.path.exists ("backup_dir")
222 shutil.rmtree ("source_dir2")
223
224 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
225 backup_path = os.path.join("backup_dir", backup_filename)
226
227 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
228 tar_path = os.path.join("backup_dir", tar_filename)
229
230 deltatar.restore_backup(target_path="source_dir2",
231 backup_tar_path=tar_path)
232
233 for key, value in self.hash.items():
234 assert os.path.exists(key)
235 if value:
236 assert value == self.md5sum(key)
237
238
239 def test_create_backup_index_max_file_length (self):
240 """
241 Creates a full backup with a too large index file for the upper bound
242 of the GCM encryption. Since the index file has a fixed IV file counter
243 of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
244
245 60+ GB of (potentially compressed) index file should last for a while...
246 """
247 if self.MODE_COMPRESSES is True:
248 raise SkipTest ("GCM file length test not meaningful with compression.")
249 if self.ENCRYPTION is None:
250 raise SkipTest ("GCM file length applies only to encrypted backups.")
251
252 new_max = 5000
253 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
254 ("I am fully aware that this will void my warranty.",
255 new_max)
256
257 password, paramversion = self.ENCRYPTION
258 deltatar = DeltaTar (mode=self.MODE, password=password,
259 crypto_paramversion=paramversion,
260 logger=self.consoleLogger)
261
262 self.hash = dict ()
263 os.makedirs ("source_dir2")
264 for i in range (42):
265 f = "source_dir2/dummy_%rd" % i
266 self.hash [f] = self.create_file (f, i)
267
268 with self.assertRaises (crypto.InvalidFileCounter):
269 deltatar.create_full_backup \
270 (source_path="source_dir2", backup_path="backup_dir")
271 shutil.rmtree ("source_dir2")
272
273
274 def test_check_index_checksum(self):
275 '''
276 Creates a full backup and checks the index' checksum of files
277 '''
278 password, paramversion = self.ENCRYPTION or (None, None)
279 deltatar = DeltaTar(mode=self.MODE, password=password,
280 crypto_paramversion=paramversion,
281 logger=self.consoleLogger)
282
283 # create first backup
284 deltatar.create_full_backup(
285 source_path="source_dir",
286 backup_path="backup_dir")
287
288
289 index_filename = deltatar.index_name_func(True)
290 index_path = os.path.join("backup_dir", index_filename)
291
292 f = open(index_path, 'rb')
293 crc = None
294 checked = False
295 began_list = False
296 while True:
297 l = f.readline()
298 if l == b'':
299 break
300 if b'BEGIN-FILE-LIST' in l:
301 crc = binascii.crc32(l) & 0xFFFFffff
302 began_list = True
303 elif b'END-FILE-LIST' in l:
304 crc = binascii.crc32(l, crc) & 0xffffffff
305
306 # next line contains the crc
307 data = json.loads(f.readline().decode("UTF-8"))
308 assert data['type'] == 'file-list-checksum'
309 assert data['checksum'] == crc
310 checked = True
311 break
312 elif began_list:
313 crc = binascii.crc32(l, crc) & 0xffffffff
314 f.close()
315
316
317 def test_restore_multivol(self):
318 '''
319 Creates a full backup without any filtering with multiple volumes and
320 restore it.
321 '''
322 if ':gz' in self.MODE:
323 raise SkipTest('compression information is lost when creating '
324 'multiple volumes with no Stream')
325
326 password, paramversion = self.ENCRYPTION or (None, None)
327 deltatar = DeltaTar(mode=self.MODE, password=password,
328 crypto_paramversion=paramversion,
329 logger=self.consoleLogger)
330
331 self.hash = dict()
332 os.makedirs('source_dir2')
333 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
334 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
335
336 # create first backup
337 deltatar.create_full_backup(
338 source_path="source_dir2",
339 backup_path="backup_dir",
340 max_volume_size=1)
341
342 assert os.path.exists("backup_dir")
343 assert os.path.exists(os.path.join("backup_dir",
344 deltatar.volume_name_func("backup_dir", True, 0)))
345 if self.MODE_COMPRESSES:
346 n_vols = 1
347 else:
348 n_vols = 2
349 for i_vol in range(n_vols):
350 assert os.path.exists(os.path.join("backup_dir",
351 deltatar.volume_name_func("backup_dir", True, i_vol)))
352 assert not os.path.exists(os.path.join("backup_dir",
353 deltatar.volume_name_func("backup_dir", True, n_vols)))
354
355 shutil.rmtree("source_dir2")
356
357 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
358 tar_path = os.path.join("backup_dir", tar_filename)
359
360 # this should automatically restore all volumes
361 deltatar.restore_backup(target_path="source_dir2",
362 backup_tar_path=tar_path)
363
364 for key, value in self.hash.items():
365 assert os.path.exists(key)
366 if value:
367 assert value == self.md5sum(key)
368
369 def test_restore_multivol_split(self):
370 '''
371 Creates a full backup without any filtering with multiple volumes
372 with big files bigger than the max volume size and
373 restore it.
374 '''
375 if self.MODE.startswith(':') or self.MODE.startswith('|'):
376 raise SkipTest('this test only works for uncompressed '
377 'or concat compressed modes')
378
379 password, paramversion = self.ENCRYPTION or (None, None)
380 deltatar = DeltaTar(mode=self.MODE, password=password,
381 crypto_paramversion=paramversion,
382 logger=self.consoleLogger)
383
384 self.hash = dict()
385 os.makedirs('source_dir2')
386 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
387 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
388 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
389
390 # create first backup
391 deltatar.create_full_backup(
392 source_path="source_dir2",
393 backup_path="backup_dir",
394 max_volume_size=2)
395
396 assert os.path.exists("backup_dir")
397 assert os.path.exists(os.path.join("backup_dir",
398 deltatar.volume_name_func("backup_dir", True, 0)))
399 if self.MODE_COMPRESSES:
400 n_vols = 1
401 else:
402 n_vols = 6
403 for i_vol in range(n_vols):
404 assert os.path.exists(os.path.join("backup_dir",
405 deltatar.volume_name_func("backup_dir", True, i_vol)))
406 assert not os.path.exists(os.path.join("backup_dir",
407 deltatar.volume_name_func("backup_dir", True, n_vols)))
408
409 shutil.rmtree("source_dir2")
410
411 index_filename = deltatar.index_name_func(True)
412 index_path = os.path.join("backup_dir", index_filename)
413
414 deltatar.restore_backup(target_path="source_dir2",
415 backup_indexes_paths=[index_path])
416
417 for key, value in self.hash.items():
418 assert os.path.exists(key)
419 if value:
420 assert value == self.md5sum(key)
421
422
423 def test_full_backup_index_extra_data(self):
424 '''
425 Tests that the index file for a full backup can store extra_data and
426 that this data can be retrieved.
427 '''
428 password, paramversion = self.ENCRYPTION or (None, None)
429 deltatar = DeltaTar(mode=self.MODE, password=password,
430 crypto_paramversion=paramversion,
431 logger=self.consoleLogger)
432
433 extra_data = dict(
434 hola="caracola",
435 otra_cosa=[1, "lista"],
436 y_otra=dict(bola=1.1)
437 )
438
439 deltatar.create_full_backup(
440 source_path="source_dir",
441 backup_path="backup_dir",
442 extra_data=extra_data)
443
444 index_filename = deltatar.index_name_func(is_full=True)
445 index_path = os.path.join("backup_dir", index_filename)
446
447 # iterate_index_path retrieves extra_data, and thus we can then compare
448 index_it = deltatar.iterate_index_path(index_path)
449 self.assertEqual(index_it.extra_data, extra_data)
450
451
452 def test_diff_backup_index_extra_data(self):
453 '''
454 Tests that the index file for a diff backup can store extra_data and
455 that this data can be retrieved.
456 '''
457 password, paramversion = self.ENCRYPTION or (None, None)
458 deltatar = DeltaTar(mode=self.MODE, password=password,
459 crypto_paramversion=paramversion,
460 logger=self.consoleLogger)
461
462 extra_data = dict(
463 hola="caracola",
464 otra_cosa=[1, "lista"],
465 y_otra=dict(bola=1.1)
466 )
467 # do first backup
468 deltatar.create_full_backup(
469 source_path="source_dir",
470 backup_path="backup_dir")
471
472
473 prev_index_filename = deltatar.index_name_func(is_full=True)
474 prev_index_path = os.path.join("backup_dir", prev_index_filename)
475
476 # create empty diff backup
477 deltatar.create_diff_backup("source_dir", "backup_dir2",
478 prev_index_path, extra_data=extra_data)
479
480 index_filename = deltatar.index_name_func(is_full=False)
481 index_path = os.path.join("backup_dir2", index_filename)
482
483 # iterate_index_path retrieves extra_data, and thus we can then compare
484 index_it = deltatar.iterate_index_path(index_path)
485 self.assertEqual(index_it.extra_data, extra_data)
486
487 def test_restore_multivol2(self):
488 '''
489 Creates a full backup without any filtering with multiple volumes and
490 restore it.
491 '''
492 password, paramversion = self.ENCRYPTION or (None, None)
493 deltatar = DeltaTar(mode=self.MODE, password=password,
494 crypto_paramversion=paramversion,
495 logger=self.consoleLogger)
496
497 shutil.copytree(self.GIT_DIR, "source_dir2")
498
499 # create first backup
500 deltatar.create_full_backup(
501 source_path="source_dir2",
502 backup_path="backup_dir",
503 max_volume_size=1)
504
505 assert os.path.exists("backup_dir")
506 assert os.path.exists(os.path.join("backup_dir",
507 deltatar.volume_name_func("backup_dir", True, 0)))
508
509 shutil.rmtree("source_dir2")
510
511 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
512 tar_path = os.path.join("backup_dir", tar_filename)
513
514 # this should automatically restore all volumes
515 deltatar.restore_backup(target_path="source_dir2",
516 backup_tar_path=tar_path)
517
518 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
519
520 def test_restore_multivol_manual_from_index(self):
521 '''
522 Creates a full backup without any filtering with multiple volumes and
523 restore it.
524 '''
525 # this test only works for uncompressed or concat compressed modes
526 if self.MODE.startswith(':') or self.MODE.startswith('|'):
527 raise SkipTest('this test only works for uncompressed '
528 'or concat compressed modes')
529
530 password, paramversion = self.ENCRYPTION or (None, None)
531 deltatar = DeltaTar(mode=self.MODE, password=password,
532 crypto_paramversion=paramversion,
533 logger=self.consoleLogger)
534
535 self.hash = dict()
536 os.makedirs('source_dir2')
537 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
538 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
539
540 # create first backup
541 deltatar.create_full_backup(
542 source_path="source_dir2",
543 backup_path="backup_dir",
544 max_volume_size=1)
545
546 assert os.path.exists("backup_dir")
547 assert os.path.exists(os.path.join("backup_dir",
548 deltatar.volume_name_func("backup_dir", True, 0)))
549 if self.MODE_COMPRESSES:
550 n_vols = 1
551 else:
552 n_vols = 2
553 for i_vol in range(n_vols):
554 assert os.path.exists(os.path.join("backup_dir",
555 deltatar.volume_name_func("backup_dir", True, i_vol)))
556 assert not os.path.exists(os.path.join("backup_dir",
557 deltatar.volume_name_func("backup_dir", True, n_vols)))
558
559 shutil.rmtree("source_dir2")
560
561 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
562 tar_path = os.path.join("backup_dir", tar_filename)
563
564 index_filename = deltatar.index_name_func(True)
565 index_path = os.path.join("backup_dir", index_filename)
566
567 # this should automatically restore the huge file
568 f = deltatar.open_auxiliary_file(index_path, 'r')
569 offset = None
570 while True:
571 l = f.readline()
572 if not len(l):
573 break
574 data = json.loads(l.decode('UTF-8'))
575 if data.get('type', '') == 'file' and\
576 deltatar.unprefixed(data['path']) == "huge":
577 offset = data['offset']
578 break
579
580 assert offset is not None
581
582 fo = open(tar_path, 'rb')
583 fo.seek(offset)
584 def new_volume_handler(mode, tarobj, base_name, volume_number):
585 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
586 if self.ENCRYPTION is not None:
587 # deltatar module is shadowed here
588 suf += "." + deltatar_PDTCRYPT_EXTENSION
589 tarobj.open_volume(datetime.now().strftime(
590 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
591 new_volume_handler = partial(new_volume_handler, self.MODE)
592
593 crypto_ctx = None
594 if self.ENCRYPTION is not None:
595 crypto_ctx = crypto.Decrypt (password)
596
597 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
598 new_volume_handler=new_volume_handler,
599 encryption=crypto_ctx)
600
601 member = tarobj.next()
602 member.path = deltatar.unprefixed(member.path)
603 member.name = deltatar.unprefixed(member.name)
604 tarobj.extract(member)
605 tarobj.close()
606 fo.close()
607 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
608
609 os.unlink("huge")
610
611
612 def test_restore_manual_from_index_twice (self):
613 """
614 Creates a full backup and restore the same file twice. This *must* fail
615 when encryption is active.
616
617 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
618 backwards within the same file. This prevents the encryption layer from
619 exploding due to a reused IV in an overall valid archive.
620
621 This test anticipates possible future mistakes since it’s entirely
622 feasible to implement backward seeks for *_Stream* with concat mode.
623 """
624 # this test only works for uncompressed or concat compressed modes
625 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
626 raise SkipTest("this test only works for uncompressed "
627 "or concat compressed modes")
628
629 password, paramversion = self.ENCRYPTION or (None, None)
630 deltatar = DeltaTar(mode=self.MODE, password=password,
631 crypto_paramversion=paramversion,
632 logger=self.consoleLogger)
633
634 self.hash = dict()
635 os.makedirs("source_dir2")
636 self.hash["source_dir2/samefile"] = \
637 self.create_file("source_dir2/samefile", 1 * 1024)
638
639 # create first backup
640 deltatar.create_full_backup(
641 source_path="source_dir2",
642 backup_path="backup_dir")
643
644 assert os.path.exists("backup_dir")
645 assert os.path.exists(os.path.join("backup_dir",
646 deltatar.volume_name_func("backup_dir", True, 0)))
647
648 shutil.rmtree("source_dir2")
649
650 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
651 tar_path = os.path.join("backup_dir", tar_filename)
652
653 index_filename = deltatar.index_name_func(True)
654 index_path = os.path.join("backup_dir", index_filename)
655
656 f = deltatar.open_auxiliary_file(index_path, "r")
657 offset = None
658 while True:
659 l = f.readline()
660 if not len(l):
661 break
662 data = json.loads(l.decode("UTF-8"))
663 if data.get("type", "") == "file" and\
664 deltatar.unprefixed(data["path"]) == "samefile":
665 offset = data["offset"]
666 break
667
668 assert offset is not None
669
670 fo = open(tar_path, "rb")
671 fo.seek(offset)
672
673 crypto_ctx = None
674 if self.ENCRYPTION is not None:
675 crypto_ctx = crypto.Decrypt (password)
676
677 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
678 encryption=crypto_ctx)
679 member = tarobj.next()
680 member.path = deltatar.unprefixed(member.path)
681 member.name = deltatar.unprefixed(member.name)
682
683 # extract once …
684 tarobj.extract(member)
685 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
686
687 # … and twice
688 try:
689 tarobj.extract(member)
690 except tarfile.StreamError:
691 if crypto_ctx is not None:
692 pass # good: seeking backwards not allowed
693 else:
694 raise
695 tarobj.close()
696 fo.close()
697 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
698
699 os.unlink("samefile")
700
701
702 def test_restore_from_index(self):
703 '''
704 Restores a full backup using an index file.
705 '''
706 if self.MODE.startswith(':') or self.MODE.startswith('|'):
707 raise SkipTest('this test only works for uncompressed '
708 'or concat compressed modes')
709
710 password, paramversion = self.ENCRYPTION or (None, None)
711 deltatar = DeltaTar(mode=self.MODE, password=password,
712 crypto_paramversion=paramversion,
713 logger=self.consoleLogger)
714
715 # create first backup
716 deltatar.create_full_backup(
717 source_path="source_dir",
718 backup_path="backup_dir")
719
720 shutil.rmtree("source_dir")
721
722 # this should automatically restore all volumes
723 index_filename = deltatar.index_name_func(True)
724 index_path = os.path.join("backup_dir", index_filename)
725
726 deltatar.restore_backup(target_path="source_dir",
727 backup_indexes_paths=[index_path])
728
729 for key, value in self.hash.items():
730 assert os.path.exists(key)
731 if value:
732 assert value == self.md5sum(key)
733
734 def test_restore_multivol_from_index(self):
735 '''
736 Restores a full multivolume backup using an index file.
737 '''
738 if self.MODE.startswith(':') or self.MODE.startswith('|'):
739 raise SkipTest('this test only works for uncompressed '
740 'or concat compressed modes')
741
742 password, paramversion = self.ENCRYPTION or (None, None)
743 deltatar = DeltaTar(mode=self.MODE, password=password,
744 crypto_paramversion=paramversion,
745 logger=self.consoleLogger)
746
747 # create first backup
748 deltatar.create_full_backup(
749 source_path="source_dir",
750 backup_path="backup_dir",
751 max_volume_size=2)
752
753 shutil.rmtree("source_dir")
754
755 # this should automatically restore all volumes
756 index_filename = deltatar.index_name_func(True)
757 index_path = os.path.join("backup_dir", index_filename)
758
759 deltatar.restore_backup(target_path="source_dir",
760 backup_indexes_paths=[index_path])
761
762 for key, value in self.hash.items():
763 assert os.path.exists(key)
764 if value:
765 assert value == self.md5sum(key)
766
767 def test_create_basic_filtering(self):
768 '''
769 Tests create backup basic filtering.
770 '''
771 password, paramversion = self.ENCRYPTION or (None, None)
772 deltatar = DeltaTar(mode=self.MODE, password=password,
773 crypto_paramversion=paramversion,
774 logger=self.consoleLogger,
775 included_files=["test", "small"],
776 excluded_files=["test/huge"])
777
778 # create first backup
779 deltatar.create_full_backup(
780 source_path="source_dir",
781 backup_path="backup_dir")
782
783 assert os.path.exists("backup_dir")
784 shutil.rmtree("source_dir")
785
786 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
787 tar_path = os.path.join("backup_dir", tar_filename)
788
789 deltatar.restore_backup(target_path="source_dir",
790 backup_tar_path=tar_path)
791
792 assert os.path.exists("source_dir/small")
793 assert os.path.exists("source_dir/test")
794 assert os.path.exists("source_dir/test/huge2")
795 assert os.path.exists("source_dir/test/test2")
796
797 assert not os.path.exists("source_dir/test/huge")
798 assert not os.path.exists("source_dir/big")
799
800 def test_create_filter_func(self):
801 '''
802 Tests create backup basic filtering.
803 '''
804 visited_paths = []
805 def filter_func(visited_paths, path):
806 if path not in visited_paths:
807 visited_paths.append(path)
808 return True
809
810 filter_func = partial(filter_func, visited_paths)
811
812 password, paramversion = self.ENCRYPTION or (None, None)
813 deltatar = DeltaTar(mode=self.MODE, password=password,
814 crypto_paramversion=paramversion,
815 logger=self.consoleLogger,
816 included_files=["test", "small"],
817 excluded_files=["test/huge"],
818 filter_func=filter_func)
819
820 # create first backup
821 deltatar.create_full_backup(
822 source_path="source_dir",
823 backup_path="backup_dir")
824
825 assert os.path.exists("backup_dir")
826 shutil.rmtree("source_dir")
827
828 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
829 tar_path = os.path.join("backup_dir", tar_filename)
830
831 deltatar.restore_backup(target_path="source_dir",
832 backup_tar_path=tar_path)
833 assert set(visited_paths) == set([
834 'small',
835 'test',
836 'test/huge2',
837 'test/test2'
838 ])
839
840 def test_create_filter_out_func(self):
841 '''
842 Tests create backup basic filtering.
843 '''
844 visited_paths = []
845 def filter_func(visited_paths, path):
846 '''
847 Filter out everything
848 '''
849 if path not in visited_paths:
850 visited_paths.append(path)
851 return False
852
853 filter_func = partial(filter_func, visited_paths)
854
855 password, paramversion = self.ENCRYPTION or (None, None)
856 deltatar = DeltaTar(mode=self.MODE, password=password,
857 crypto_paramversion=paramversion,
858 logger=self.consoleLogger,
859 included_files=["test", "small"],
860 excluded_files=["test/huge"],
861 filter_func=filter_func)
862
863 # create first backup
864 deltatar.create_full_backup(
865 source_path="source_dir",
866 backup_path="backup_dir")
867
868 assert os.path.exists("backup_dir")
869 shutil.rmtree("source_dir")
870
871 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
872 tar_path = os.path.join("backup_dir", tar_filename)
873
874 deltatar.restore_backup(target_path="source_dir",
875 backup_tar_path=tar_path)
876 assert set(visited_paths) == set([
877 'small',
878 'test'
879 ])
880
881 # check that effectively no file was backed up
882 assert not os.path.exists("source_dir/small")
883 assert not os.path.exists("source_dir/big")
884 assert not os.path.exists("source_dir/test")
885
886 def test_restore_index_basic_filtering(self):
887 '''
888 Creates a backup, and then filter when doing the index based restore.
889 '''
890 if self.MODE.startswith(':') or self.MODE.startswith('|'):
891 raise SkipTest('this test only works for uncompressed '
892 'or concat compressed modes')
893
894 password, paramversion = self.ENCRYPTION or (None, None)
895 deltatar = DeltaTar(mode=self.MODE, password=password,
896 crypto_paramversion=paramversion,
897 logger=self.consoleLogger)
898
899 # create first backup
900 deltatar.create_full_backup(
901 source_path="source_dir",
902 backup_path="backup_dir")
903
904 assert os.path.exists("backup_dir")
905 shutil.rmtree("source_dir")
906
907 index_filename = deltatar.index_name_func(True)
908 index_path = os.path.join("backup_dir", index_filename)
909
910 deltatar.included_files = ["test", "small"]
911 deltatar.excluded_files = ["test/huge"]
912 deltatar.restore_backup(target_path="source_dir",
913 backup_indexes_paths=[index_path])
914
915 assert os.path.exists("source_dir/small")
916 assert os.path.exists("source_dir/test")
917 assert os.path.exists("source_dir/test/huge2")
918 assert os.path.exists("source_dir/test/test2")
919
920 assert not os.path.exists("source_dir/test/huge")
921 assert not os.path.exists("source_dir/big")
922
923 def test_restore_index_filter_func(self):
924 '''
925 Creates a backup, and then filter when doing the index based restore,
926 using the filter function.
927 '''
928 if self.MODE.startswith(':') or self.MODE.startswith('|'):
929 raise SkipTest('this test only works for uncompressed '
930 'or concat compressed modes')
931
932 visited_paths = []
933 def filter_func(visited_paths, path):
934 if path not in visited_paths:
935 visited_paths.append(path)
936 return True
937
938 filter_func = partial(filter_func, visited_paths)
939
940 password, paramversion = self.ENCRYPTION or (None, None)
941 deltatar = DeltaTar(mode=self.MODE, password=password,
942 crypto_paramversion=paramversion,
943 logger=self.consoleLogger)
944
945 # create first backup
946 deltatar.create_full_backup(
947 source_path="source_dir",
948 backup_path="backup_dir")
949
950 assert os.path.exists("backup_dir")
951 shutil.rmtree("source_dir")
952
953 index_filename = deltatar.index_name_func(True)
954 index_path = os.path.join("backup_dir", index_filename)
955
956 deltatar.included_files = ["test", "small"]
957 deltatar.excluded_files = ["test/huge"]
958 deltatar.filter_func = filter_func
959 deltatar.restore_backup(target_path="source_dir",
960 backup_indexes_paths=[index_path])
961
962 assert set(visited_paths) == set([
963 'small',
964 'test',
965 'test/huge2',
966 'test/test2'
967 ])
968
969 def test_restore_tar_basic_filtering(self):
970 '''
971 Creates a backup, and then filter when doing the tar based restore.
972 '''
973 password, paramversion = self.ENCRYPTION or (None, None)
974 deltatar = DeltaTar(mode=self.MODE, password=password,
975 crypto_paramversion=paramversion,
976 logger=self.consoleLogger)
977
978 # create first backup
979 deltatar.create_full_backup(
980 source_path="source_dir",
981 backup_path="backup_dir")
982
983 assert os.path.exists("backup_dir")
984 shutil.rmtree("source_dir")
985
986 deltatar.included_files = ["test", "small"]
987 deltatar.excluded_files = ["test/huge"]
988
989 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
990 tar_path = os.path.join("backup_dir", tar_filename)
991
992 deltatar.restore_backup(target_path="source_dir",
993 backup_tar_path=tar_path)
994
995 assert os.path.exists("source_dir/small")
996 assert os.path.exists("source_dir/test")
997 assert os.path.exists("source_dir/test/huge2")
998 assert os.path.exists("source_dir/test/test2")
999
1000 assert not os.path.exists("source_dir/test/huge")
1001 assert not os.path.exists("source_dir/big")
1002
1003 def test_restore_tar_filter_func(self):
1004 '''
1005 Creates a backup, and then filter when doing the tar based restore,
1006 using the filter function.
1007 '''
1008 visited_paths = []
1009 def filter_func(visited_paths, path):
1010 if path not in visited_paths:
1011 visited_paths.append(path)
1012 return True
1013
1014 filter_func = partial(filter_func, visited_paths)
1015
1016 password, paramversion = self.ENCRYPTION or (None, None)
1017 deltatar = DeltaTar(mode=self.MODE, password=password,
1018 crypto_paramversion=paramversion,
1019 logger=self.consoleLogger)
1020
1021 # create first backup
1022 deltatar.create_full_backup(
1023 source_path="source_dir",
1024 backup_path="backup_dir")
1025
1026 assert os.path.exists("backup_dir")
1027 shutil.rmtree("source_dir")
1028
1029 index_filename = deltatar.index_name_func(True)
1030 index_path = os.path.join("backup_dir", index_filename)
1031
1032 deltatar.included_files = ["test", "small"]
1033 deltatar.excluded_files = ["test/huge"]
1034 deltatar.filter_func = filter_func
1035
1036 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1037 tar_path = os.path.join("backup_dir", tar_filename)
1038
1039 deltatar.restore_backup(target_path="source_dir",
1040 backup_tar_path=tar_path)
1041 assert set(visited_paths) == set([
1042 'small',
1043 'test',
1044 'test/huge2',
1045 'test/test2'
1046 ])
1047
1048 def test_filter_path_regexp(self):
1049 '''
1050 Test specifically the deltatar.filter_path function with regular
1051 expressions
1052 '''
1053 included_files = [
1054 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1055 re.compile('^yes$'),
1056 'testing'
1057 ]
1058 excluded_files = [
1059 re.compile('^testing/in_the'),
1060 ]
1061 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1062 excluded_files=excluded_files)
1063
1064 # assert valid and invalid paths
1065 assert deltatar.filter_path('test/hola')
1066 assert deltatar.filter_path('test/hola/any/thing')
1067 assert deltatar.filter_path('test/caracola/caracolero')
1068 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1069 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1070 assert deltatar.filter_path('yes')
1071 assert deltatar.filter_path('testing')
1072 assert deltatar.filter_path('testing/yes')
1073 assert deltatar.filter_path('testing/in_th')
1074
1075 assert not deltatar.filter_path('something')
1076 assert not deltatar.filter_path('other/thing')
1077 assert not deltatar.filter_path('test_ing')
1078 assert not deltatar.filter_path('test/hola_lala')
1079 assert not deltatar.filter_path('test/agur')
1080 assert not deltatar.filter_path('testing_something')
1081 assert not deltatar.filter_path('yeso')
1082 assert not deltatar.filter_path('yes/o')
1083 assert not deltatar.filter_path('yes_o')
1084 assert not deltatar.filter_path('testing/in_the')
1085 assert not deltatar.filter_path('testing/in_the_field')
1086 assert not deltatar.filter_path('testing/in_the/field')
1087
1088 def test_filter_path_parent(self):
1089 '''
1090 Test specifically the deltatar.filter_path function for parent matching
1091 '''
1092 included_files = [
1093 'testing/path/to/some/thing'
1094 ]
1095 deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1096
1097 # assert valid and invalid paths
1098 assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1099 assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1100 assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1101 assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1102 assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1103 assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1104 assert deltatar.filter_path('testing/something/else') == NO_MATCH
1105
1106 def test_parent_matching_simple_full_backup(self):
1107 '''
1108 Create a full backup using parent matching
1109 '''
1110 included_files = [
1111 'test/huge2'
1112 ]
1113
1114 password, paramversion = self.ENCRYPTION or (None, None)
1115 deltatar = DeltaTar(mode=self.MODE, password=password,
1116 crypto_paramversion=paramversion,
1117 logger=self.consoleLogger,
1118 included_files=included_files)
1119
1120 # create first backup
1121 deltatar.create_full_backup(
1122 source_path="source_dir",
1123 backup_path="backup_dir")
1124
1125 assert os.path.exists("backup_dir")
1126 shutil.rmtree("source_dir")
1127
1128 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1129 tar_path = os.path.join("backup_dir", tar_filename)
1130
1131 deltatar = DeltaTar(mode=self.MODE, password=password,
1132 logger=self.consoleLogger)
1133 deltatar.restore_backup(target_path="source_dir",
1134 backup_tar_path=tar_path)
1135
1136 assert os.path.exists('source_dir/test/huge2')
1137 assert os.path.exists('source_dir/test/')
1138 assert not os.path.exists('source_dir/test/huge')
1139 assert not os.path.exists('source_dir/big')
1140 assert not os.path.exists('source_dir/small')
1141
1142 def test_parent_matching_simple_full_backup_restore(self):
1143 '''
1144 Create a full backup and restores it using parent matching
1145 '''
1146 included_files = [
1147 'test/huge2'
1148 ]
1149
1150 password, paramversion = self.ENCRYPTION or (None, None)
1151 deltatar = DeltaTar(mode=self.MODE, password=password,
1152 crypto_paramversion=paramversion,
1153 logger=self.consoleLogger)
1154
1155 # create first backup
1156 deltatar.create_full_backup(
1157 source_path="source_dir",
1158 backup_path="backup_dir")
1159
1160 assert os.path.exists("backup_dir")
1161 shutil.rmtree("source_dir")
1162
1163 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1164 tar_path = os.path.join("backup_dir", tar_filename)
1165
1166 deltatar = DeltaTar(mode=self.MODE, password=password,
1167 logger=self.consoleLogger,
1168 included_files=included_files)
1169 deltatar.restore_backup(target_path="source_dir",
1170 backup_tar_path=tar_path)
1171
1172 assert os.path.exists('source_dir/test/huge2')
1173 assert os.path.exists('source_dir/test/')
1174 assert not os.path.exists('source_dir/test/huge')
1175 assert not os.path.exists('source_dir/big')
1176 assert not os.path.exists('source_dir/small')
1177
1178 def test_parent_matching_index_full_backup_restore(self):
1179 '''
1180 Create a full backup and restores it using parent matching
1181 '''
1182 included_files = [
1183 'test/huge2'
1184 ]
1185
1186 password, paramversion = self.ENCRYPTION or (None, None)
1187 deltatar = DeltaTar(mode=self.MODE, password=password,
1188 crypto_paramversion=paramversion,
1189 logger=self.consoleLogger)
1190
1191 # create first backup
1192 deltatar.create_full_backup(
1193 source_path="source_dir",
1194 backup_path="backup_dir")
1195
1196 assert os.path.exists("backup_dir")
1197 shutil.rmtree("source_dir")
1198
1199 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1200 tar_path = os.path.join("backup_dir", tar_filename)
1201
1202 deltatar = DeltaTar(mode=self.MODE, password=password,
1203 logger=self.consoleLogger,
1204 included_files=included_files)
1205 deltatar.restore_backup(target_path="source_dir",
1206 backup_tar_path=tar_path)
1207
1208 assert os.path.exists('source_dir/test/huge2')
1209 assert os.path.exists('source_dir/test/')
1210 assert not os.path.exists('source_dir/test/huge')
1211 assert not os.path.exists('source_dir/big')
1212 assert not os.path.exists('source_dir/small')
1213
1214 def test_collate_iterators(self):
1215 '''
1216 Tests the collate iterators functionality with two exact directories,
1217 using an index iterator from a backup and the exact same source dir.
1218 '''
1219 password, paramversion = self.ENCRYPTION or (None, None)
1220 deltatar = DeltaTar(mode=self.MODE, password=password,
1221 crypto_paramversion=paramversion,
1222 logger=self.consoleLogger)
1223
1224 # create first backup
1225 deltatar.create_full_backup(
1226 source_path="source_dir",
1227 backup_path="backup_dir")
1228
1229 assert os.path.exists("backup_dir")
1230
1231 cwd = os.getcwd()
1232 index_filename = deltatar.index_name_func(is_full=True)
1233 index_path = os.path.join(cwd, "backup_dir", index_filename)
1234 index_it = deltatar.iterate_index_path(index_path)
1235
1236 os.chdir('source_dir')
1237 dir_it = deltatar._recursive_walk_dir('.')
1238 path_it = deltatar.jsonize_path_iterator(dir_it)
1239
1240 try:
1241 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1242 assert deltatar._equal_stat_dicts(path1, path2)
1243 finally:
1244 os.chdir(cwd)
1245
1246 def test_collate_iterators_diffdirs(self):
1247 '''
1248 Use the collate iterators functionality with two different directories.
1249 It must behave in an expected way.
1250 '''
1251 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1252
1253 password, paramversion = self.ENCRYPTION or (None, None)
1254 deltatar = DeltaTar(mode=self.MODE, password=password,
1255 crypto_paramversion=paramversion,
1256 logger=self.consoleLogger)
1257
1258 # create first backup
1259 deltatar.create_full_backup(
1260 source_path="source_dir",
1261 backup_path="backup_dir")
1262
1263 assert os.path.exists("backup_dir")
1264 self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)
1265
1266 cwd = os.getcwd()
1267 index_filename = deltatar.index_name_func(is_full=True)
1268 index_path = os.path.join(cwd, "backup_dir", index_filename)
1269 index_it = deltatar.iterate_index_path(index_path)
1270
1271 os.chdir('source_dir')
1272 dir_it = deltatar._recursive_walk_dir('.')
1273 path_it = deltatar.jsonize_path_iterator(dir_it)
1274
1275 try:
1276 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1277 if path2['path'] == 'z':
1278 assert not path1
1279 else:
1280 assert deltatar._equal_stat_dicts(path1, path2)
1281 finally:
1282 os.chdir(cwd)
1283
1284 def test_collate_iterators_diffdirs2(self):
1285 '''
1286 Use the collate iterators functionality with two different directories.
1287 It must behave in an expected way.
1288 '''
1289 password, paramversion = self.ENCRYPTION or (None, None)
1290 deltatar = DeltaTar(mode=self.MODE, password=password,
1291 crypto_paramversion=paramversion,
1292 logger=self.consoleLogger)
1293
1294 # create first backup
1295 deltatar.create_full_backup(
1296 source_path="source_dir",
1297 backup_path="backup_dir")
1298
1299 assert os.path.exists("backup_dir")
1300
1301 # add some new files and directories
1302 os.makedirs('source_dir/bigdir')
1303 self.hash["source_dir/bigdir"] = ""
1304 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1305 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1306 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1307
1308 cwd = os.getcwd()
1309 index_filename = deltatar.index_name_func(is_full=True)
1310 index_path = os.path.join(cwd, "backup_dir", index_filename)
1311 index_it = deltatar.iterate_index_path(index_path)
1312
1313 os.chdir('source_dir')
1314 dir_it = deltatar._recursive_walk_dir('.')
1315 path_it = deltatar.jsonize_path_iterator(dir_it)
1316
1317 visited_pairs = []
1318
1319 try:
1320 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
1321 visited_pairs.append(
1322 (deltatar.unprefixed(path1['path']) if path1 else None,
1323 path2['path'] if path2 else None)
1324 )
1325 finally:
1326 assert visited_pairs == [
1327 (u'big', u'big'),
1328 (None, u'bigdir'),
1329 (u'small', u'small'),
1330 (u'test', u'test'),
1331 (None, u'zzzz'),
1332 (None, u'bigdir/a'),
1333 (None, u'bigdir/b'),
1334 (u'test/huge', u'test/huge'),
1335 (u'test/huge2', u'test/huge2'),
1336 (u'test/test2', u'test/test2'),
1337 ]
1338 os.chdir(cwd)
1339
1340 def test_create_empty_diff_backup(self):
1341 '''
1342 Creates an empty (no changes) backup diff
1343 '''
1344 password, paramversion = self.ENCRYPTION or (None, None)
1345 deltatar = DeltaTar(mode=self.MODE, password=password,
1346 crypto_paramversion=paramversion,
1347 logger=self.consoleLogger)
1348
1349 # create first backup
1350 deltatar.create_full_backup(
1351 source_path="source_dir",
1352 backup_path="backup_dir")
1353
1354 prev_index_filename = deltatar.index_name_func(is_full=True)
1355 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1356
1357 deltatar.create_diff_backup("source_dir", "backup_dir2",
1358 prev_index_path)
1359
1360 # check index items
1361 index_path = os.path.join("backup_dir2",
1362 deltatar.index_name_func(is_full=False))
1363 index_it = deltatar.iterate_index_path(index_path)
1364 n = 0
1365 for i in index_it:
1366 n += 1
1367 assert i[0]['path'].startswith("list://")
1368
1369 assert n == 6
1370
1371 # check the tar file
1372 assert os.path.exists("backup_dir2")
1373 shutil.rmtree("source_dir")
1374
1375 tar_filename = deltatar.volume_name_func('backup_dir2',
1376 is_full=False, volume_number=0)
1377 tar_path = os.path.join("backup_dir2", tar_filename)
1378
1379 # no file restored, because the diff was empty
1380 deltatar.restore_backup(target_path="source_dir",
1381 backup_tar_path=tar_path)
1382 assert len(os.listdir("source_dir")) == 0
1383
1384
1385 def test_create_diff_backup1(self):
1386 '''
1387 Creates a diff backup when there are new files
1388 '''
1389 password, paramversion = self.ENCRYPTION or (None, None)
1390 deltatar = DeltaTar(mode=self.MODE, password=password,
1391 crypto_paramversion=paramversion,
1392 logger=self.consoleLogger)
1393
1394 # create first backup
1395 deltatar.create_full_backup(
1396 source_path="source_dir",
1397 backup_path="backup_dir")
1398
1399 prev_index_filename = deltatar.index_name_func(is_full=True)
1400 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1401
1402 # add some new files and directories
1403 os.makedirs('source_dir/bigdir')
1404 self.hash["source_dir/bigdir"] = ""
1405 os.unlink("source_dir/small")
1406 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1407 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1408 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1409
1410 deltatar.create_diff_backup("source_dir", "backup_dir2",
1411 prev_index_path)
1412
1413 # check index items
1414 index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
1415 index_it = deltatar.iterate_index_path(index_path)
1416 l = [i[0]['path'] for i in index_it]
1417
1418 assert l == [
1419 'list://big',
1420 'snapshot://bigdir',
1421 'delete://small',
1422 'list://test',
1423 'snapshot://zzzz',
1424 'snapshot://bigdir/a',
1425 'snapshot://bigdir/b',
1426 'list://test/huge',
1427 'list://test/huge2',
1428 'list://test/test2',
1429 ]
1430
1431 # check the tar file
1432 assert os.path.exists("backup_dir2")
1433 shutil.rmtree("source_dir")
1434
1435 # create source_dir with the small file, that will be then deleted by
1436 # the restore_backup
1437 os.mkdir("source_dir")
1438 open("source_dir/small", 'wb').close()
1439
1440 tar_filename = deltatar.volume_name_func('backup_dir2',
1441 is_full=False, volume_number=0)
1442 tar_path = os.path.join("backup_dir2", tar_filename)
1443
1444 # restore the backup, this will create only the new files
1445 deltatar.restore_backup(target_path="source_dir",
1446 backup_tar_path=tar_path)
1447 # the order doesn't matter
1448 assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
1449
1450 def test_restore_from_index_diff_backup(self):
1451 '''
1452 Creates a full backup, modifies some files, creates a diff backup,
1453 then restores the diff backup from zero.
1454 '''
1455 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1456 raise SkipTest('this test only works for uncompressed '
1457 'or concat compressed modes')
1458
1459 password, paramversion = self.ENCRYPTION or (None, None)
1460 deltatar = DeltaTar(mode=self.MODE, password=password,
1461 crypto_paramversion=paramversion,
1462 logger=self.consoleLogger)
1463
1464 # create first backup
1465 deltatar.create_full_backup(
1466 source_path="source_dir",
1467 backup_path="backup_dir")
1468
1469 prev_index_filename = deltatar.index_name_func(is_full=True)
1470 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1471
1472 # add some new files and directories
1473 os.makedirs('source_dir/bigdir')
1474 self.hash["source_dir/bigdir"] = ""
1475 os.unlink("source_dir/small")
1476 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1477 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1478 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1479
1480 deltatar.create_diff_backup("source_dir", "backup_dir2",
1481 prev_index_path)
1482
1483 # apply diff backup in target_dir
1484 index_filename = deltatar.index_name_func(is_full=False)
1485 index_path = os.path.join("backup_dir2", index_filename)
1486 deltatar.restore_backup("target_dir",
1487 backup_indexes_paths=[index_path, prev_index_path])
1488
1489 # then compare the two directories source_dir and target_dir and check
1490 # they are the same
1491 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1492
1493 def test_restore_from_index_diff_backup2(self):
1494 '''
1495 Creates a full backup, modifies some files, creates a diff backup,
1496 then restores the diff backup with the full backup as a starting point.
1497 '''
1498 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1499 raise SkipTest('this test only works for uncompressed '
1500 'or concat compressed modes')
1501
1502 password, paramversion = self.ENCRYPTION or (None, None)
1503 deltatar = DeltaTar(mode=self.MODE, password=password,
1504 crypto_paramversion=paramversion,
1505 logger=self.consoleLogger)
1506
1507 # create first backup
1508 deltatar.create_full_backup(
1509 source_path="source_dir",
1510 backup_path="backup_dir")
1511
1512 prev_index_filename = deltatar.index_name_func(is_full=True)
1513 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1514
1515 # add some new files and directories
1516 os.makedirs('source_dir/bigdir')
1517 self.hash["source_dir/bigdir"] = ""
1518 os.unlink("source_dir/small")
1519 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1520 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1521 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1522 shutil.rmtree("source_dir/test")
1523
1524 deltatar.create_diff_backup("source_dir", "backup_dir2",
1525 prev_index_path)
1526
1527 # first restore initial backup in target_dir
1528 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1529 tar_path = os.path.join("backup_dir", tar_filename)
1530 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1531
1532 # then apply diff backup in target_dir
1533 index_filename = deltatar.index_name_func(is_full=False)
1534 index_path = os.path.join("backup_dir2", index_filename)
1535
1536 try:
1537 deltatar.restore_backup("target_dir",
1538 backup_indexes_paths=[index_path, prev_index_path])
1539
1540 # then compare the two directories source_dir and target_dir and check
1541 # they are the same
1542 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1543 except FileNotFoundError as exn:
1544 if self.FSTEST is None:
1545 # fs traversal may fail here
1546 raise exn
1547
1548 def test_restore_from_index_diff_backup3(self):
1549 '''
1550 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1551 diff backup, then restores the diff backup with the full backup as a
1552 starting point.
1553 '''
1554 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1555 raise SkipTest('this test only works for uncompressed '
1556 'or concat compressed modes')
1557
1558 password, paramversion = self.ENCRYPTION or (None, None)
1559 deltatar = DeltaTar(mode=self.MODE, password=password,
1560 crypto_paramversion=paramversion,
1561 logger=self.consoleLogger)
1562
1563 shutil.rmtree("source_dir")
1564 shutil.copytree(self.GIT_DIR, "source_dir")
1565 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1566
1567 # create first backup
1568 deltatar.create_full_backup(
1569 source_path="source_dir",
1570 backup_path="backup_dir")
1571
1572 prev_index_filename = deltatar.index_name_func(is_full=True)
1573 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1574
1575 # alter the source_dir randomly
1576 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1577
1578 for path in source_it:
1579 # if path doesn't exist (might have previously removed) ignore it.
1580 # also ignore it (i.e. do not change it) 70% of the time
1581 if not os.path.exists(path) or random.random() < 0.7:
1582 continue
1583
1584 # remove the file
1585 if os.path.isdir(path):
1586 shutil.rmtree(path)
1587 else:
1588 os.unlink(path)
1589
1590 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1591 prev_index_path)
1592
1593 # first restore initial backup in target_dir
1594 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
1595 tar_path = os.path.join("backup_dir", tar_filename)
1596 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1597
1598 # and check that target_dir equals to source_dir (which is the same as
1599 # self.GIT_DIR initially)
1600 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1601
1602 # then apply diff backup in target_dir
1603 index_filename = deltatar.index_name_func(is_full=False)
1604 index_path = os.path.join("backup_dir2", index_filename)
1605 deltatar.restore_backup("target_dir",
1606 backup_indexes_paths=[index_path, prev_index_path])
1607
1608 # and check that target_dir equals to source_dir_diff (the randomly
1609 # altered self.GIT_DIR directory)
1610 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1611
1612 # then delete target_dir and apply diff backup from zero and check again
1613 shutil.rmtree("target_dir")
1614 deltatar.restore_backup("target_dir",
1615 backup_indexes_paths=[index_path, prev_index_path])
1616
1617 # and check that target_dir equals to source_dir_diff (the randomly
1618 # altered self.GIT_DIR directory)
1619 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1620
1621 def test_restore_from_index_diff_backup3_multivol(self):
1622 '''
1623 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
1624 diff backup, then restores the diff backup with the full backup as a
1625 starting point.
1626 '''
1627 if self.MODE.startswith(':') or self.MODE.startswith('|'):
1628 raise SkipTest('this test only works for uncompressed '
1629 'or concat compressed modes')
1630
1631 password, paramversion = self.ENCRYPTION or (None, None)
1632 deltatar = DeltaTar(mode=self.MODE, password=password,
1633 crypto_paramversion=paramversion,
1634 logger=self.consoleLogger)
1635
1636 shutil.rmtree("source_dir")
1637 shutil.copytree(self.GIT_DIR, "source_dir")
1638 shutil.copytree(self.GIT_DIR, "source_dir_diff")
1639
1640 # create first backup
1641 deltatar.create_full_backup(
1642 source_path="source_dir",
1643 backup_path="backup_dir",
1644 max_volume_size=1)
1645
1646 prev_index_filename = deltatar.index_name_func(is_full=True)
1647 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1648
1649 # alter the source_dir randomly
1650 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1651
1652 for path in source_it:
1653 # if path doesn't exist (might have previously removed) ignore it.
1654 # also ignore it (i.e. do not change it) 70% of the time
1655 if not os.path.exists(path) or random.random() < 0.7:
1656 continue
1657
1658 # remove the file
1659 if os.path.isdir(path):
1660 shutil.rmtree(path)
1661 else:
1662 os.unlink(path)
1663
1664 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1665 prev_index_path, max_volume_size=1)
1666
1667 # first restore initial backup in target_dir
1668 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1669 tar_path = os.path.join("backup_dir", tar_filename)
1670 if self.FSTEST is not None:
1671 return # the below will fail in stat checks, but that is expected
1672 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1673
1674 # and check that target_dir equals to source_dir (which is the same as
1675 # self.GIT_DIR initially)
1676 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1677
1678 # then apply diff backup in target_dir
1679 index_filename = deltatar.index_name_func(is_full=False)
1680 index_path = os.path.join("backup_dir2", index_filename)
1681 deltatar.restore_backup("target_dir",
1682 backup_indexes_paths=[index_path, prev_index_path])
1683
1684 # and check that target_dir equals to source_dir_diff (the randomly
1685 # altered self.GIT_DIR directory)
1686 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1687
1688 # then delete target_dir and apply diff backup from zero and check again
1689 shutil.rmtree("target_dir")
1690 deltatar.restore_backup("target_dir",
1691 backup_indexes_paths=[index_path, prev_index_path])
1692
1693 # and check that target_dir equals to source_dir_diff (the randomly
1694 # altered self.GIT_DIR directory)
1695 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1696
1697 def check_equal_dirs(self, path1, path2, deltatar):
1698 '''
1699 compare the two directories source_dir and target_dir and check
1700 # they are the same
1701 '''
1702 source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
1703 source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
1704 target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
1705 target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1706 while True:
1707 try:
1708 sitem = next(source_it)
1709 titem = next(target_it)
1710 except StopIteration:
1711 try:
1712 titem = next(target_it)
1713 raise Exception("iterators do not stop at the same time")
1714 except StopIteration:
1715 break
1716 try:
1717 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
1718 except Exception as e:
1719 print("SITEM: " + str(sitem))
1720 print("TITEM: " + str(titem))
1721 raise e
1722
1723 def test_create_no_symlinks(self):
1724 '''
1725 Creates a full backup from different varieties of symlinks. The
1726 extracted archive may not contain any symlinks but the file contents
1727 '''
1728
1729 os.system("rm -rf source_dir")
1730 os.makedirs("source_dir/symlinks")
1731 fd = os.open("source_dir/symlinks/valid_linkname",
1732 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1733 os.write(fd, b"valid link target for symlink tests; please ignore\n")
1734 os.close(fd)
1735 # first one is good, the rest points nowhere
1736 self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1737 self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1738 self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1739 self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
1740 password, paramversion = self.ENCRYPTION or (None, None)
1741 deltatar = DeltaTar(mode=self.MODE, password=password,
1742 crypto_paramversion=paramversion,
1743 logger=self.consoleLogger)
1744
1745 # create first backup
1746 deltatar.create_full_backup(source_path="source_dir",
1747 backup_path="backup_dir")
1748
1749 assert os.path.exists("backup_dir")
1750 shutil.rmtree("source_dir")
1751 assert not os.path.exists("source_dir")
1752
1753 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1754 tar_path = os.path.join("backup_dir", tar_filename)
1755
1756 deltatar.restore_backup(target_path="source_dir",
1757 backup_tar_path=tar_path)
1758
1759 for _r, _ds, fs in os.walk("source_dir/symlinks"):
1760 # only the valid link plus the linked file may be found in the
1761 # extracted archive
1762 assert len(fs) == 2
1763 for f in fs:
1764 # the link must have been resolved and file contents must match
1765 # the linked file
1766 assert not os.path.islink(f)
1767 with open("source_dir/symlinks/valid_linkname") as a:
1768 with open("source_dir/symlinks/whatever") as b:
1769 assert a.read() == b.read()
1770
1771 def test_restore_with_symlinks(self):
1772 '''
1773 Creates a full backup containing different varieties of symlinks. All
1774 of them must be filtered out.
1775 '''
1776 password, paramversion = self.ENCRYPTION or (None, None)
1777 deltatar = DeltaTar(mode=self.MODE, password=password,
1778 crypto_paramversion=paramversion,
1779 logger=self.consoleLogger)
1780
1781 # create first backup
1782 deltatar.create_full_backup(source_path="source_dir",
1783 backup_path="backup_dir")
1784
1785 assert os.path.exists("backup_dir")
1786 shutil.rmtree("source_dir")
1787
1788 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1789 tar_path = os.path.join("backup_dir", tar_filename)
1790
1791 # add symlinks to existing archive
1792
1793 def add_symlink (a, name, dst):
1794 l = tarfile.TarInfo("snapshot://%s" % name)
1795 l.type = tarfile.SYMTYPE
1796 l.linkname = dst
1797 a.addfile(l)
1798 return name
1799
1800 try:
1801 with tarfile.open(tar_path,mode="a") as a:
1802 checkme = \
1803 [ add_symlink(a, "symlinks/foo", "internal-file")
1804 , add_symlink(a, "symlinks/bar", "/absolute/path")
1805 , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1806 except tarfile.ReadError as e:
1807 if self.MODE == '#' or self.MODE.endswith ("gz"):
1808 checkme = []
1809 else:
1810 raise
1811 except ValueError as e:
1812 if self.MODE.startswith ('#'):
1813 checkme = []
1814 else:
1815 raise
1816
1817 deltatar.restore_backup(target_path="source_dir",
1818 backup_tar_path=tar_path)
1819
1820 # check what happened to our symlinks
1821 for name in checkme:
1822 fullpath = os.path.join("source_dir", name)
1823 assert not os.path.exists(fullpath)
1824
1825
1826 def test_restore_malicious_symlinks(self):
1827 '''
1828 Creates a full backup containing a symlink and a file of the same name.
1829 This simulates a symlink attack with a link pointing to some external
1830 path that is abused to write outside the extraction prefix.
1831 '''
1832 password, paramversion = self.ENCRYPTION or (None, None)
1833 deltatar = DeltaTar(mode=self.MODE, password=password,
1834 crypto_paramversion=paramversion,
1835 logger=self.consoleLogger)
1836
1837 # create first backup
1838 deltatar.create_full_backup(source_path="source_dir",
1839 backup_path="backup_dir")
1840
1841 assert os.path.exists("backup_dir")
1842 shutil.rmtree("source_dir")
1843
1844 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1845 tar_path = os.path.join("backup_dir", tar_filename)
1846
1847 # add symlinks to existing archive
1848
1849 def add_symlink (a, name, dst):
1850 l = tarfile.TarInfo("snapshot://%s" % name)
1851 l.type = tarfile.SYMTYPE
1852 l.linkname = dst
1853 a.addfile(l)
1854
1855 def add_file (a, name):
1856 f = tarfile.TarInfo("snapshot://%s" % name)
1857 f.type = tarfile.REGTYPE
1858 a.addfile(f)
1859
1860 testpath = "symlinks/pernicious-link"
1861 testdst = "/tmp/does/not/exist"
1862
1863 try:
1864 with tarfile.open(tar_path, mode="a") as a:
1865 add_symlink(a, testpath, testdst)
1866 add_symlink(a, testpath, testdst+"X")
1867 add_symlink(a, testpath, testdst+"XXX")
1868 add_file(a, testpath)
1869 except tarfile.ReadError as e:
1870 if self.MODE == '#' or self.MODE.endswith ("gz"):
1871 pass
1872 else:
1873 raise
1874 except ValueError as e:
1875 if self.MODE.startswith ('#'):
1876 pass # O_APPEND of concat archives not feasible
1877 else:
1878 raise
1879
1880 deltatar.restore_backup(target_path="source_dir",
1881 backup_tar_path=tar_path)
1882
1883 # check whether the link was extracted; deltatar seems to only ever
1884 # retrieve the first item it finds for a given path which in the case
1885 # at hand is a symlink to some non-existent path
1886 fullpath = os.path.join("source_dir", testpath)
1887 assert not os.path.lexists(fullpath)
1888
1889
1890class TarfileTest(BaseTest):
1891 pwd = None
1892
1893 def setUp(self):
1894 self.pwd = os.getcwd()
1895 os.makedirs("backup_dir", exist_ok=True)
1896
1897 def tearDown(self):
1898 '''
1899 Remove temporary files created by unit tests and restore the API
1900 functions in *os*.
1901 '''
1902 os.chdir(self.pwd)
1903 shutil.rmtree("backup_dir")
1904
1905 def test_extract_malicious_symlinks_unlink(self):
1906 '''
1907 Test symlink mitigation: The destination must be deleted prior to
1908 extraction.
1909 '''
1910 tar_path = os.path.join("backup_dir", "malicious-archive")
1911
1912 # add symlinks to existing archive
1913
1914 def add_symlink (a, name, dst):
1915 l = tarfile.TarInfo(name)
1916 l.type = tarfile.SYMTYPE
1917 l.linkname = dst
1918 a.addfile(l)
1919
1920 def add_file (a, name):
1921 f = tarfile.TarInfo(name)
1922 f.type = tarfile.REGTYPE
1923 a.addfile(f)
1924
1925 # Add a symlink pointing to must-not-exist, then append a file
1926 # object at the same path. The file must not end up at
1927 # “must-not-exist” (the pointee) but at “not-as-symlink” (the
1928 # pointer) that was unlinked prior to extraction.
1929 testpath = "test/not-a-symlink"
1930 testdst = "must-not-exist"
1931
1932 try:
1933 with tarfile.open(tar_path, mode="w") as a:
1934 add_symlink(a, testpath, testdst)
1935 add_file(a, testpath)
1936 except tarfile.ReadError as e:
1937 if self.MODE == '#' or self.MODE.endswith ("gz"):
1938 pass
1939 else:
1940 raise
1941 except ValueError as e:
1942 if self.MODE.startswith ('#'):
1943 pass # O_APPEND of concat archives not feasible
1944 else:
1945 raise
1946
1947 def test_extract(dst, unlink):
1948 with tarfile.open(tar_path, mode="r") as a:
1949 os.makedirs(dst, exist_ok=True)
1950 olddir = os.getcwd()
1951 try:
1952 os.chdir(dst)
1953 a.extractall(unlink=unlink)
1954 finally:
1955 os.chdir(olddir)
1956
1957 fullpath = os.path.join(dst, testpath)
1958 fulldst = os.path.join(dst, "test/%s" % testdst)
1959
1960 if unlink is True:
1961 # Check whether the file was extracted. The object at the
1962 # symlink location (source) must be the file. The must not
1963 # be an object at the symlink destination.
1964 assert not os.path.islink(fullpath)
1965 assert not os.path.exists(fulldst)
1966 else:
1967 # Without unlink protection, the file must be found at the
1968 # symlink destination with the symlink intact.
1969 assert os.path.islink(fullpath)
1970 assert os.path.exists(fulldst)
1971
1972
1973 test_extract("test_dst_unlinked" , True)
1974 test_extract("test_dst_symlinked", False)
1975
1976
1977def fsapi_access_true (self):
1978 """
1979 Chicanery for testing improper use of the *os* module.
1980 """
1981 def yes (*_a, **_ka): return True
1982 self.FSAPI_SAVED.append (("access", getattr (os, "access")))
1983 setattr (os, "access", yes)
1984
1985
1986class DeltaTar2Test(DeltaTarTest):
1987 '''
1988 Same as DeltaTar but with specific ":" mode
1989 '''
1990 MODE = ':'
1991
1992
1993class DeltaTarStreamTest(DeltaTarTest):
1994 '''
1995 Same as DeltaTar but with specific uncompressed stream mode
1996 '''
1997 MODE = '|'
1998
1999
2000class DeltaTarGzipTest(DeltaTarTest):
2001 '''
2002 Same as DeltaTar but with specific gzip mode
2003 '''
2004 MODE = ':gz'
2005 MODE_COMPRESSES = True
2006
2007
2008class DeltaTarGzipStreamTest(DeltaTarTest):
2009 '''
2010 Same as DeltaTar but with specific gzip stream mode
2011 '''
2012 MODE = '|gz'
2013 MODE_COMPRESSES = True
2014
2015
2016@skip('Bz2 tests are too slow..')
2017class DeltaTarBz2Test(DeltaTarTest):
2018 '''
2019 Same as DeltaTar but with specific bz2 mode
2020 '''
2021 MODE = ':bz2'
2022 MODE_COMPRESSES = True
2023
2024
2025@skip('Bz2 tests are too slow..')
2026class DeltaTarBz2StreamTest(DeltaTarTest):
2027 '''
2028 Same as DeltaTar but with specific bz2 stream mode
2029 '''
2030 MODE = '|bz2'
2031 MODE_COMPRESSES = True
2032
2033
2034class DeltaTarGzipConcatTest(DeltaTarTest):
2035 '''
2036 Same as DeltaTar but with specific gzip concat stream mode
2037 '''
2038 MODE = '#gz'
2039 MODE_COMPRESSES = True
2040
2041
2042class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
2043 '''
2044 Same as DeltaTar but with specific gzip aes128 concat stream mode
2045 '''
2046 MODE = '#gz'
2047 ENCRYPTION = ('some magic key', 1)
2048 MODE_COMPRESSES = True
2049
2050
2051class DeltaTarAes128ConcatTest(DeltaTarTest):
2052 '''
2053 Same as DeltaTar but with specific aes128 concat stream mode
2054 '''
2055 MODE = '#'
2056 ENCRYPTION = ('some magic key', 1)
2057
2058
2059class DeltaTarFilesystemHandlingTestBase(BaseTest):
2060 '''
2061 Mess with filesystem APIs.
2062 '''
2063 FSTEST = fsapi_access_true
2064
2065class DeltaTarFSGzipTest(DeltaTarFilesystemHandlingTestBase,
2066 DeltaTarGzipTest):
2067 pass
2068
2069class DeltaTarFSGzipConcatTest(DeltaTarFilesystemHandlingTestBase,
2070 DeltaTarGzipConcatTest):
2071 pass
2072
2073class DeltaTarFSAes128ConcatTest(DeltaTarFilesystemHandlingTestBase,
2074 DeltaTarAes128ConcatTest):
2075 pass
2076
2077class DeltaTarFSGzipAes128ConcatTest(DeltaTarFilesystemHandlingTestBase,
2078 DeltaTarGzipAes128ConcatTest):
2079 pass
2080