improve unit test SNR
[python-delta-tar] / testing / test_deltatar.py
CommitLineData
0708a374
ERE
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
f5d9144b 19import errno
0708a374 20import os
0d5c1970 21import re
cbac9f0b 22import random
e5c6ca04 23import shutil
0708a374 24import logging
b8fc2f5d
ERE
25import binascii
26import json
27from datetime import datetime
28from functools import partial
bd011242 29from unittest import skip, SkipTest
0708a374 30
83f5fd71 31import deltatar.tarfile as tarfile
f698c99c 32from deltatar.tarfile import TarFile
974408b5 33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
f698c99c
PG
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
0708a374 36
0708a374
ERE
37from . import BaseTest
38from . import new_volume_handler
39
105c8e26
PG
40# Enable warning messages from deltatar. This minimizes the SNR of
41# test runs, but none of the messages are meaningful in any way.
42VERBOSE_TEST_OUTPUT = False
43
0708a374
ERE
44class DeltaTarTest(BaseTest):
45 """
46 Test backups
47 """
da26094a 48 MODE = ''
8ea0be50 49 MODE_COMPRESSES = False
da26094a 50
f698c99c 51 ENCRYPTION = None # (password : str, paramversion : int) option
da26094a 52
188b845d
TJ
53 GIT_DIR = '.git'
54
0708a374
ERE
55 def setUp(self):
56 '''
57 Create base test data
58 '''
a4e8b8af 59 self.pwd = os.getcwd()
cbac9f0b 60 os.system('rm -rf target_dir source_dir* backup_dir* huge')
0708a374
ERE
61 os.makedirs('source_dir/test/test2')
62 self.hash = dict()
e5c6ca04 63 self.hash["source_dir/test/test2"] = ''
0708a374
ERE
64 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
65 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
66 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
d5361dac 67 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
0708a374 68
105c8e26
PG
69 self.consoleLogger = None
70 if VERBOSE_TEST_OUTPUT is True:
71 self.consoleLogger = logging.StreamHandler()
72 self.consoleLogger.setLevel(logging.DEBUG)
0708a374 73
188b845d
TJ
74 if not os.path.isdir(self.GIT_DIR):
75 # Not running inside git tree, take our
76 # own testing directory as source.
77 self.GIT_DIR = 'testing'
78
79 if not os.path.isdir(self.GIT_DIR):
80 raise Exception('No input directory found: ' + self.GIT_DIR)
81
0708a374
ERE
82 def tearDown(self):
83 '''
cb7a3911 84 Remove temporal files created by unit tests and reset globals.
0708a374 85 '''
a4e8b8af 86 os.chdir(self.pwd)
cbac9f0b 87 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
cb7a3911
PG
88 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
89 ("I am fully aware that this will void my warranty.")
0708a374 90
b8fc2f5d 91 def test_restore_simple_full_backup(self):
0708a374
ERE
92 '''
93 Creates a full backup without any filtering and restores it.
94 '''
f698c99c
PG
95 password, paramversion = self.ENCRYPTION or (None, None)
96 deltatar = DeltaTar(mode=self.MODE, password=password,
97 crypto_paramversion=paramversion,
da26094a 98 logger=self.consoleLogger)
0708a374
ERE
99
100 # create first backup
101 deltatar.create_full_backup(
102 source_path="source_dir",
103 backup_path="backup_dir")
104
e5c6ca04
ERE
105 assert os.path.exists("backup_dir")
106 shutil.rmtree("source_dir")
107
108 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
109 tar_path = os.path.join("backup_dir", tar_filename)
110
e5c6ca04
ERE
111 deltatar.restore_backup(target_path="source_dir",
112 backup_tar_path=tar_path)
113
be60ffd0 114 for key, value in self.hash.items():
e5c6ca04
ERE
115 assert os.path.exists(key)
116 if value:
e82f14f5 117 assert value == self.md5sum(key)
da26094a 118
cb7a3911
PG
119
120 def test_create_backup_max_file_length (self):
121 """
122 Creates a full backup including one file that exceeds the (purposely
123 lowered) upper bound on GCM encrypted objects. This will yield multiple
124 encrypted objects for one plaintext file.
125
126 Success is verified by splitting the archive at object boundaries and
127 counting the parts.
128 """
129 if self.MODE_COMPRESSES is True:
130 raise SkipTest ("GCM file length test not meaningful with compression.")
131 if self.ENCRYPTION is None:
132 raise SkipTest ("GCM file length applies only to encrypted backups.")
133
134 new_max = 20000 # cannot be less than tar block size
135 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
136 ("I am fully aware that this will void my warranty.",
137 new_max)
138
139 password, paramversion = self.ENCRYPTION
140 deltatar = DeltaTar (mode=self.MODE, password=password,
141 crypto_paramversion=paramversion,
142 logger=self.consoleLogger)
143
144 self.hash = dict ()
145 os.makedirs ("source_dir2")
146 for f, s in [("empty" , 0) # 1 tar objects
147 ,("slightly_larger", new_max + 1) # 2
148 ,("twice" , 2 * new_max) # 3
149 ]:
150 f = "source_dir2/%s" % f
151 self.hash [f] = self.create_file (f, s)
152
153 deltatar.create_full_backup \
154 (source_path="source_dir2", backup_path="backup_dir")
155
156 assert os.path.exists ("backup_dir")
157 shutil.rmtree ("source_dir2")
158
159 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
160 backup_path = os.path.join("backup_dir", backup_filename)
161
162 # split the resulting archive into its constituents without
163 # decrypting
164 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
165 "-o backup_dir/split <\'%s\'" % backup_path)
166
167 assert os.path.exists ("backup_dir/split")
168
169 dents = os.listdir ("backup_dir/split")
170 assert len (dents) == 6
171
172
173 def test_restore_backup_max_file_length (self):
174 """
175 Creates a full backup including one file that exceeds the (purposely
176 lowered) upper bound on GCM encrypted objects. This will yield two
177 encrypted objects for one plaintext file.
178
179 Success is verified by splitting the archive at object boundaries and
180 counting the parts.
181 """
182 if self.MODE_COMPRESSES is True:
183 raise SkipTest ("GCM file length test not meaningful with compression.")
184 if self.ENCRYPTION is None:
185 raise SkipTest ("GCM file length applies only to encrypted backups.")
186
187 new_max = 20000 # cannot be less than tar block size
188 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
189 ("I am fully aware that this will void my warranty.",
190 new_max)
191
192 password, paramversion = self.ENCRYPTION
193 deltatar = DeltaTar (mode=self.MODE, password=password,
194 crypto_paramversion=paramversion,
195 logger=self.consoleLogger)
196
197 self.hash = dict ()
198 os.makedirs ("source_dir2")
199 for f, s in [("empty" , 0) # 1 tar objects
ca520c21
TJ
200 ,("almost_large" , new_max - 1) # 2
201 ,("large" , new_max) # 3
202 ,("slightly_larger", new_max + 1) # 4
203 ,("twice" , 2 * new_max) # 5
204 ,("twice_plus_one" , (2 * new_max) + 1) # 6
cb7a3911
PG
205 ]:
206 f = "source_dir2/%s" % f
207 self.hash [f] = self.create_file (f, s)
208
209 deltatar.create_full_backup \
210 (source_path="source_dir2", backup_path="backup_dir")
211
212 assert os.path.exists ("backup_dir")
213 shutil.rmtree ("source_dir2")
214
215 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
216 backup_path = os.path.join("backup_dir", backup_filename)
217
218 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
219 tar_path = os.path.join("backup_dir", tar_filename)
220
221 deltatar.restore_backup(target_path="source_dir2",
222 backup_tar_path=tar_path)
223
224 for key, value in self.hash.items():
225 assert os.path.exists(key)
226 if value:
227 assert value == self.md5sum(key)
228
229
fac2cfe1
PG
230 def test_create_backup_index_max_file_length (self):
231 """
17b810c6
TJ
232 Creates a full backup with a too large index file for the upper bound
233 of the GCM encryption. Since the index file has a fixed IV file counter
234 of AES_GCM_IV_CNT_INDEX, we expect the crypto layer to abort.
fac2cfe1 235
17b810c6 236 60+ GB of (potentially compressed) index file should last for a while...
fac2cfe1
PG
237 """
238 if self.MODE_COMPRESSES is True:
239 raise SkipTest ("GCM file length test not meaningful with compression.")
240 if self.ENCRYPTION is None:
241 raise SkipTest ("GCM file length applies only to encrypted backups.")
242
243 new_max = 5000
244 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
245 ("I am fully aware that this will void my warranty.",
246 new_max)
247
248 password, paramversion = self.ENCRYPTION
249 deltatar = DeltaTar (mode=self.MODE, password=password,
250 crypto_paramversion=paramversion,
251 logger=self.consoleLogger)
252
253 self.hash = dict ()
254 os.makedirs ("source_dir2")
255 for i in range (42):
256 f = "source_dir2/dummy_%rd" % i
257 self.hash [f] = self.create_file (f, i)
258
1c2f7f07 259 with self.assertRaises (crypto.InvalidFileCounter):
fac2cfe1
PG
260 deltatar.create_full_backup \
261 (source_path="source_dir2", backup_path="backup_dir")
fac2cfe1
PG
262 shutil.rmtree ("source_dir2")
263
264
6c678f3a
ERE
265 def test_check_index_checksum(self):
266 '''
267 Creates a full backup and checks the index' checksum of files
268 '''
f698c99c
PG
269 password, paramversion = self.ENCRYPTION or (None, None)
270 deltatar = DeltaTar(mode=self.MODE, password=password,
271 crypto_paramversion=paramversion,
6c678f3a
ERE
272 logger=self.consoleLogger)
273
274 # create first backup
275 deltatar.create_full_backup(
276 source_path="source_dir",
277 backup_path="backup_dir")
278
279
280 index_filename = deltatar.index_name_func(True)
281 index_path = os.path.join("backup_dir", index_filename)
282
be60ffd0 283 f = open(index_path, 'rb')
6c678f3a
ERE
284 crc = None
285 checked = False
286 began_list = False
be60ffd0
ERE
287 while True:
288 l = f.readline()
289 if l == b'':
290 break
291 if b'BEGIN-FILE-LIST' in l:
c2ffe2ec 292 crc = binascii.crc32(l) & 0xFFFFffff
6c678f3a 293 began_list = True
be60ffd0 294 elif b'END-FILE-LIST' in l:
6c678f3a
ERE
295 crc = binascii.crc32(l, crc) & 0xffffffff
296
297 # next line contains the crc
be60ffd0 298 data = json.loads(f.readline().decode("UTF-8"))
6c678f3a
ERE
299 assert data['type'] == 'file-list-checksum'
300 assert data['checksum'] == crc
301 checked = True
302 break
303 elif began_list:
304 crc = binascii.crc32(l, crc) & 0xffffffff
c7609167 305 f.close()
6c678f3a 306
b8fc2f5d
ERE
307
308 def test_restore_multivol(self):
d5361dac 309 '''
b8fc2f5d
ERE
310 Creates a full backup without any filtering with multiple volumes and
311 restore it.
d5361dac 312 '''
ba7760a7
CH
313 if ':gz' in self.MODE:
314 raise SkipTest('compression information is lost when creating '
315 'multiple volumes with no Stream')
316
f698c99c
PG
317 password, paramversion = self.ENCRYPTION or (None, None)
318 deltatar = DeltaTar(mode=self.MODE, password=password,
319 crypto_paramversion=paramversion,
d5361dac
ERE
320 logger=self.consoleLogger)
321
b8fc2f5d
ERE
322 self.hash = dict()
323 os.makedirs('source_dir2')
324 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
325 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
326
d5361dac
ERE
327 # create first backup
328 deltatar.create_full_backup(
b8fc2f5d 329 source_path="source_dir2",
d5361dac
ERE
330 backup_path="backup_dir",
331 max_volume_size=1)
332
333 assert os.path.exists("backup_dir")
334 assert os.path.exists(os.path.join("backup_dir",
335 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
336 if self.MODE_COMPRESSES:
337 n_vols = 1
338 else:
339 n_vols = 2
340 for i_vol in range(n_vols):
341 assert os.path.exists(os.path.join("backup_dir",
342 deltatar.volume_name_func("backup_dir", True, i_vol)))
343 assert not os.path.exists(os.path.join("backup_dir",
344 deltatar.volume_name_func("backup_dir", True, n_vols)))
d5361dac 345
b8fc2f5d 346 shutil.rmtree("source_dir2")
d5361dac
ERE
347
348 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
349 tar_path = os.path.join("backup_dir", tar_filename)
350
351 # this should automatically restore all volumes
b8fc2f5d 352 deltatar.restore_backup(target_path="source_dir2",
d5361dac
ERE
353 backup_tar_path=tar_path)
354
be60ffd0 355 for key, value in self.hash.items():
d5361dac
ERE
356 assert os.path.exists(key)
357 if value:
358 assert value == self.md5sum(key)
359
14e2e92d
DGM
360 def test_restore_multivol_split(self):
361 '''
362 Creates a full backup without any filtering with multiple volumes
363 with big files bigger than the max volume size and
364 restore it.
365 '''
f61e1822 366 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
367 raise SkipTest('this test only works for uncompressed '
368 'or concat compressed modes')
f61e1822 369
f698c99c
PG
370 password, paramversion = self.ENCRYPTION or (None, None)
371 deltatar = DeltaTar(mode=self.MODE, password=password,
372 crypto_paramversion=paramversion,
14e2e92d
DGM
373 logger=self.consoleLogger)
374
14e2e92d
DGM
375 self.hash = dict()
376 os.makedirs('source_dir2')
82f75df4 377 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
14e2e92d
DGM
378 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
379 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
380
381 # create first backup
382 deltatar.create_full_backup(
383 source_path="source_dir2",
384 backup_path="backup_dir",
385 max_volume_size=2)
386
387 assert os.path.exists("backup_dir")
388 assert os.path.exists(os.path.join("backup_dir",
389 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
390 if self.MODE_COMPRESSES:
391 n_vols = 1
392 else:
393 n_vols = 6
394 for i_vol in range(n_vols):
395 assert os.path.exists(os.path.join("backup_dir",
396 deltatar.volume_name_func("backup_dir", True, i_vol)))
397 assert not os.path.exists(os.path.join("backup_dir",
398 deltatar.volume_name_func("backup_dir", True, n_vols)))
14e2e92d
DGM
399
400 shutil.rmtree("source_dir2")
401
402 index_filename = deltatar.index_name_func(True)
403 index_path = os.path.join("backup_dir", index_filename)
404
405 deltatar.restore_backup(target_path="source_dir2",
406 backup_indexes_paths=[index_path])
407
408 for key, value in self.hash.items():
409 assert os.path.exists(key)
410 if value:
411 assert value == self.md5sum(key)
412
413
9eae9a1f
ERE
414 def test_full_backup_index_extra_data(self):
415 '''
416 Tests that the index file for a full backup can store extra_data and
417 that this data can be retrieved.
418 '''
f698c99c
PG
419 password, paramversion = self.ENCRYPTION or (None, None)
420 deltatar = DeltaTar(mode=self.MODE, password=password,
421 crypto_paramversion=paramversion,
9eae9a1f
ERE
422 logger=self.consoleLogger)
423
424 extra_data = dict(
425 hola="caracola",
426 otra_cosa=[1, "lista"],
427 y_otra=dict(bola=1.1)
428 )
429
430 deltatar.create_full_backup(
431 source_path="source_dir",
432 backup_path="backup_dir",
433 extra_data=extra_data)
434
435 index_filename = deltatar.index_name_func(is_full=True)
436 index_path = os.path.join("backup_dir", index_filename)
437
438 # iterate_index_path retrieves extra_data, and thus we can then compare
439 index_it = deltatar.iterate_index_path(index_path)
440 self.assertEqual(index_it.extra_data, extra_data)
441
442
443 def test_diff_backup_index_extra_data(self):
444 '''
445 Tests that the index file for a diff backup can store extra_data and
446 that this data can be retrieved.
447 '''
f698c99c
PG
448 password, paramversion = self.ENCRYPTION or (None, None)
449 deltatar = DeltaTar(mode=self.MODE, password=password,
450 crypto_paramversion=paramversion,
9eae9a1f
ERE
451 logger=self.consoleLogger)
452
453 extra_data = dict(
454 hola="caracola",
455 otra_cosa=[1, "lista"],
456 y_otra=dict(bola=1.1)
457 )
458 # do first backup
459 deltatar.create_full_backup(
460 source_path="source_dir",
461 backup_path="backup_dir")
462
463
464 prev_index_filename = deltatar.index_name_func(is_full=True)
465 prev_index_path = os.path.join("backup_dir", prev_index_filename)
466
467 # create empty diff backup
468 deltatar.create_diff_backup("source_dir", "backup_dir2",
469 prev_index_path, extra_data=extra_data)
470
471 index_filename = deltatar.index_name_func(is_full=False)
472 index_path = os.path.join("backup_dir2", index_filename)
473
474 # iterate_index_path retrieves extra_data, and thus we can then compare
475 index_it = deltatar.iterate_index_path(index_path)
476 self.assertEqual(index_it.extra_data, extra_data)
477
8825be52
DGM
478 def test_restore_multivol2(self):
479 '''
480 Creates a full backup without any filtering with multiple volumes and
481 restore it.
482 '''
f698c99c
PG
483 password, paramversion = self.ENCRYPTION or (None, None)
484 deltatar = DeltaTar(mode=self.MODE, password=password,
485 crypto_paramversion=paramversion,
8825be52
DGM
486 logger=self.consoleLogger)
487
188b845d 488 shutil.copytree(self.GIT_DIR, "source_dir2")
8825be52
DGM
489
490 # create first backup
491 deltatar.create_full_backup(
492 source_path="source_dir2",
493 backup_path="backup_dir",
494 max_volume_size=1)
495
496 assert os.path.exists("backup_dir")
497 assert os.path.exists(os.path.join("backup_dir",
498 deltatar.volume_name_func("backup_dir", True, 0)))
8825be52
DGM
499
500 shutil.rmtree("source_dir2")
501
502 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
503 tar_path = os.path.join("backup_dir", tar_filename)
504
505 # this should automatically restore all volumes
506 deltatar.restore_backup(target_path="source_dir2",
507 backup_tar_path=tar_path)
508
188b845d 509 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
8825be52 510
b8fc2f5d
ERE
511 def test_restore_multivol_manual_from_index(self):
512 '''
513 Creates a full backup without any filtering with multiple volumes and
514 restore it.
515 '''
516 # this test only works for uncompressed or concat compressed modes
517 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
518 raise SkipTest('this test only works for uncompressed '
519 'or concat compressed modes')
b8fc2f5d 520
f698c99c
PG
521 password, paramversion = self.ENCRYPTION or (None, None)
522 deltatar = DeltaTar(mode=self.MODE, password=password,
523 crypto_paramversion=paramversion,
b8fc2f5d
ERE
524 logger=self.consoleLogger)
525
b8fc2f5d
ERE
526 self.hash = dict()
527 os.makedirs('source_dir2')
528 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
529 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
530
531 # create first backup
532 deltatar.create_full_backup(
533 source_path="source_dir2",
534 backup_path="backup_dir",
535 max_volume_size=1)
536
537 assert os.path.exists("backup_dir")
538 assert os.path.exists(os.path.join("backup_dir",
539 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
540 if self.MODE_COMPRESSES:
541 n_vols = 1
542 else:
543 n_vols = 2
544 for i_vol in range(n_vols):
545 assert os.path.exists(os.path.join("backup_dir",
546 deltatar.volume_name_func("backup_dir", True, i_vol)))
547 assert not os.path.exists(os.path.join("backup_dir",
548 deltatar.volume_name_func("backup_dir", True, n_vols)))
b8fc2f5d
ERE
549
550 shutil.rmtree("source_dir2")
551
552 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
553 tar_path = os.path.join("backup_dir", tar_filename)
554
555 index_filename = deltatar.index_name_func(True)
556 index_path = os.path.join("backup_dir", index_filename)
557
558 # this should automatically restore the huge file
9eccb1c2 559 f = deltatar.open_auxiliary_file(index_path, 'r')
10798176 560 offset = None
2967b3e1
ERE
561 while True:
562 l = f.readline()
563 if not len(l):
564 break
be60ffd0 565 data = json.loads(l.decode('UTF-8'))
8adbe50d 566 if data.get('type', '') == 'file' and\
eb6d0069 567 deltatar.unprefixed(data['path']) == "huge":
b8fc2f5d
ERE
568 offset = data['offset']
569 break
570
10798176
CH
571 assert offset is not None
572
be60ffd0 573 fo = open(tar_path, 'rb')
b8fc2f5d
ERE
574 fo.seek(offset)
575 def new_volume_handler(mode, tarobj, base_name, volume_number):
f698c99c
PG
576 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
577 if self.ENCRYPTION is not None:
578 # deltatar module is shadowed here
579 suf += "." + deltatar_PDTCRYPT_EXTENSION
b8fc2f5d 580 tarobj.open_volume(datetime.now().strftime(
f698c99c 581 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
b8fc2f5d
ERE
582 new_volume_handler = partial(new_volume_handler, self.MODE)
583
f698c99c
PG
584 crypto_ctx = None
585 if self.ENCRYPTION is not None:
586 crypto_ctx = crypto.Decrypt (password)
587
b8fc2f5d 588 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
b8fc2f5d 589 new_volume_handler=new_volume_handler,
f698c99c
PG
590 encryption=crypto_ctx)
591
8adbe50d
ERE
592 member = tarobj.next()
593 member.path = deltatar.unprefixed(member.path)
594 member.name = deltatar.unprefixed(member.name)
595 tarobj.extract(member)
b8fc2f5d 596 tarobj.close()
c7609167 597 fo.close()
b8fc2f5d
ERE
598 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
599
600 os.unlink("huge")
6c678f3a 601
f698c99c 602
9e092947
PG
603 def test_restore_manual_from_index_twice (self):
604 """
605 Creates a full backup and restore the same file twice. This *must* fail
606 when encryption is active.
607
608 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
609 backwards within the same file. This prevents the encryption layer from
610 exploding due to a reused IV in an overall valid archive.
611
612 This test anticipates possible future mistakes since it’s entirely
613 feasible to implement backward seeks for *_Stream* with concat mode.
614 """
615 # this test only works for uncompressed or concat compressed modes
616 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
617 raise SkipTest("this test only works for uncompressed "
618 "or concat compressed modes")
619
620 password, paramversion = self.ENCRYPTION or (None, None)
621 deltatar = DeltaTar(mode=self.MODE, password=password,
622 crypto_paramversion=paramversion,
623 logger=self.consoleLogger)
624
625 self.hash = dict()
626 os.makedirs("source_dir2")
627 self.hash["source_dir2/samefile"] = \
628 self.create_file("source_dir2/samefile", 1 * 1024)
629
630 # create first backup
631 deltatar.create_full_backup(
632 source_path="source_dir2",
633 backup_path="backup_dir")
634
635 assert os.path.exists("backup_dir")
636 assert os.path.exists(os.path.join("backup_dir",
637 deltatar.volume_name_func("backup_dir", True, 0)))
638
639 shutil.rmtree("source_dir2")
640
641 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
642 tar_path = os.path.join("backup_dir", tar_filename)
643
644 index_filename = deltatar.index_name_func(True)
645 index_path = os.path.join("backup_dir", index_filename)
646
647 f = deltatar.open_auxiliary_file(index_path, "r")
648 offset = None
649 while True:
650 l = f.readline()
651 if not len(l):
652 break
653 data = json.loads(l.decode("UTF-8"))
654 if data.get("type", "") == "file" and\
655 deltatar.unprefixed(data["path"]) == "samefile":
656 offset = data["offset"]
657 break
658
659 assert offset is not None
660
661 fo = open(tar_path, "rb")
662 fo.seek(offset)
663
664 crypto_ctx = None
665 if self.ENCRYPTION is not None:
666 crypto_ctx = crypto.Decrypt (password)
667
668 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
669 encryption=crypto_ctx)
670 member = tarobj.next()
671 member.path = deltatar.unprefixed(member.path)
672 member.name = deltatar.unprefixed(member.name)
673
674 # extract once …
675 tarobj.extract(member)
676 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
677
678 # … and twice
679 try:
680 tarobj.extract(member)
681 except tarfile.StreamError:
682 if crypto_ctx is not None:
683 pass # good: seeking backwards not allowed
684 else:
685 raise
686 tarobj.close()
687 fo.close()
688 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
689
690 os.unlink("samefile")
691
692
11684b1d
ERE
693 def test_restore_from_index(self):
694 '''
55b8686d 695 Restores a full backup using an index file.
11684b1d 696 '''
11684b1d 697 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
698 raise SkipTest('this test only works for uncompressed '
699 'or concat compressed modes')
11684b1d 700
f698c99c
PG
701 password, paramversion = self.ENCRYPTION or (None, None)
702 deltatar = DeltaTar(mode=self.MODE, password=password,
703 crypto_paramversion=paramversion,
11684b1d
ERE
704 logger=self.consoleLogger)
705
706 # create first backup
707 deltatar.create_full_backup(
708 source_path="source_dir",
55b8686d 709 backup_path="backup_dir")
11684b1d
ERE
710
711 shutil.rmtree("source_dir")
712
713 # this should automatically restore all volumes
714 index_filename = deltatar.index_name_func(True)
715 index_path = os.path.join("backup_dir", index_filename)
716
717 deltatar.restore_backup(target_path="source_dir",
718 backup_indexes_paths=[index_path])
719
be60ffd0 720 for key, value in self.hash.items():
11684b1d
ERE
721 assert os.path.exists(key)
722 if value:
723 assert value == self.md5sum(key)
6c678f3a 724
b8fc2f5d
ERE
725 def test_restore_multivol_from_index(self):
726 '''
727 Restores a full multivolume backup using an index file.
728 '''
b8fc2f5d 729 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
730 raise SkipTest('this test only works for uncompressed '
731 'or concat compressed modes')
b8fc2f5d 732
f698c99c
PG
733 password, paramversion = self.ENCRYPTION or (None, None)
734 deltatar = DeltaTar(mode=self.MODE, password=password,
735 crypto_paramversion=paramversion,
b8fc2f5d
ERE
736 logger=self.consoleLogger)
737
738 # create first backup
739 deltatar.create_full_backup(
740 source_path="source_dir",
741 backup_path="backup_dir",
8390c925 742 max_volume_size=2)
b8fc2f5d
ERE
743
744 shutil.rmtree("source_dir")
745
746 # this should automatically restore all volumes
747 index_filename = deltatar.index_name_func(True)
748 index_path = os.path.join("backup_dir", index_filename)
749
750 deltatar.restore_backup(target_path="source_dir",
751 backup_indexes_paths=[index_path])
752
be60ffd0 753 for key, value in self.hash.items():
b8fc2f5d
ERE
754 assert os.path.exists(key)
755 if value:
756 assert value == self.md5sum(key)
da26094a 757
c1af2184
ERE
758 def test_create_basic_filtering(self):
759 '''
760 Tests create backup basic filtering.
761 '''
f698c99c
PG
762 password, paramversion = self.ENCRYPTION or (None, None)
763 deltatar = DeltaTar(mode=self.MODE, password=password,
764 crypto_paramversion=paramversion,
c1af2184 765 logger=self.consoleLogger,
eb6d0069
CH
766 included_files=["test", "small"],
767 excluded_files=["test/huge"])
c1af2184
ERE
768
769 # create first backup
770 deltatar.create_full_backup(
771 source_path="source_dir",
772 backup_path="backup_dir")
773
774 assert os.path.exists("backup_dir")
775 shutil.rmtree("source_dir")
776
777 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
778 tar_path = os.path.join("backup_dir", tar_filename)
779
780 deltatar.restore_backup(target_path="source_dir",
781 backup_tar_path=tar_path)
782
eb6d0069
CH
783 assert os.path.exists("source_dir/small")
784 assert os.path.exists("source_dir/test")
785 assert os.path.exists("source_dir/test/huge2")
786 assert os.path.exists("source_dir/test/test2")
c1af2184 787
eb6d0069
CH
788 assert not os.path.exists("source_dir/test/huge")
789 assert not os.path.exists("source_dir/big")
c1af2184 790
862b3726
ERE
791 def test_create_filter_func(self):
792 '''
793 Tests create backup basic filtering.
794 '''
795 visited_paths = []
796 def filter_func(visited_paths, path):
797 if path not in visited_paths:
798 visited_paths.append(path)
799 return True
800
801 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
802
803 password, paramversion = self.ENCRYPTION or (None, None)
804 deltatar = DeltaTar(mode=self.MODE, password=password,
805 crypto_paramversion=paramversion,
862b3726 806 logger=self.consoleLogger,
eb6d0069
CH
807 included_files=["test", "small"],
808 excluded_files=["test/huge"],
862b3726
ERE
809 filter_func=filter_func)
810
811 # create first backup
812 deltatar.create_full_backup(
813 source_path="source_dir",
814 backup_path="backup_dir")
815
816 assert os.path.exists("backup_dir")
817 shutil.rmtree("source_dir")
818
819 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
820 tar_path = os.path.join("backup_dir", tar_filename)
821
822 deltatar.restore_backup(target_path="source_dir",
823 backup_tar_path=tar_path)
9af328e2 824 assert set(visited_paths) == set([
eb6d0069
CH
825 'small',
826 'test',
827 'test/huge2',
828 'test/test2'
9af328e2 829 ])
862b3726
ERE
830
831 def test_create_filter_out_func(self):
832 '''
833 Tests create backup basic filtering.
834 '''
835 visited_paths = []
836 def filter_func(visited_paths, path):
837 '''
838 Filter out everything
839 '''
840 if path not in visited_paths:
841 visited_paths.append(path)
842 return False
843
844 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
845
846 password, paramversion = self.ENCRYPTION or (None, None)
847 deltatar = DeltaTar(mode=self.MODE, password=password,
848 crypto_paramversion=paramversion,
862b3726 849 logger=self.consoleLogger,
eb6d0069
CH
850 included_files=["test", "small"],
851 excluded_files=["test/huge"],
862b3726
ERE
852 filter_func=filter_func)
853
854 # create first backup
855 deltatar.create_full_backup(
856 source_path="source_dir",
857 backup_path="backup_dir")
858
859 assert os.path.exists("backup_dir")
860 shutil.rmtree("source_dir")
861
862 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
863 tar_path = os.path.join("backup_dir", tar_filename)
864
865 deltatar.restore_backup(target_path="source_dir",
866 backup_tar_path=tar_path)
9af328e2 867 assert set(visited_paths) == set([
eb6d0069
CH
868 'small',
869 'test'
9af328e2 870 ])
862b3726
ERE
871
872 # check that effectively no file was backed up
eb6d0069
CH
873 assert not os.path.exists("source_dir/small")
874 assert not os.path.exists("source_dir/big")
875 assert not os.path.exists("source_dir/test")
862b3726 876
9af328e2
ERE
877 def test_restore_index_basic_filtering(self):
878 '''
879 Creates a backup, and then filter when doing the index based restore.
880 '''
f391d8e9 881 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
882 raise SkipTest('this test only works for uncompressed '
883 'or concat compressed modes')
f391d8e9 884
f698c99c
PG
885 password, paramversion = self.ENCRYPTION or (None, None)
886 deltatar = DeltaTar(mode=self.MODE, password=password,
887 crypto_paramversion=paramversion,
9af328e2
ERE
888 logger=self.consoleLogger)
889
890 # create first backup
891 deltatar.create_full_backup(
892 source_path="source_dir",
893 backup_path="backup_dir")
894
895 assert os.path.exists("backup_dir")
896 shutil.rmtree("source_dir")
897
898 index_filename = deltatar.index_name_func(True)
899 index_path = os.path.join("backup_dir", index_filename)
900
eb6d0069
CH
901 deltatar.included_files = ["test", "small"]
902 deltatar.excluded_files = ["test/huge"]
9af328e2
ERE
903 deltatar.restore_backup(target_path="source_dir",
904 backup_indexes_paths=[index_path])
905
eb6d0069
CH
906 assert os.path.exists("source_dir/small")
907 assert os.path.exists("source_dir/test")
908 assert os.path.exists("source_dir/test/huge2")
909 assert os.path.exists("source_dir/test/test2")
9af328e2 910
eb6d0069
CH
911 assert not os.path.exists("source_dir/test/huge")
912 assert not os.path.exists("source_dir/big")
9af328e2
ERE
913
914 def test_restore_index_filter_func(self):
915 '''
916 Creates a backup, and then filter when doing the index based restore,
917 using the filter function.
918 '''
f391d8e9 919 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
920 raise SkipTest('this test only works for uncompressed '
921 'or concat compressed modes')
f391d8e9 922
9af328e2
ERE
923 visited_paths = []
924 def filter_func(visited_paths, path):
925 if path not in visited_paths:
926 visited_paths.append(path)
927 return True
928
929 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
930
931 password, paramversion = self.ENCRYPTION or (None, None)
932 deltatar = DeltaTar(mode=self.MODE, password=password,
933 crypto_paramversion=paramversion,
9af328e2
ERE
934 logger=self.consoleLogger)
935
936 # create first backup
937 deltatar.create_full_backup(
938 source_path="source_dir",
939 backup_path="backup_dir")
940
941 assert os.path.exists("backup_dir")
942 shutil.rmtree("source_dir")
943
944 index_filename = deltatar.index_name_func(True)
945 index_path = os.path.join("backup_dir", index_filename)
946
eb6d0069
CH
947 deltatar.included_files = ["test", "small"]
948 deltatar.excluded_files = ["test/huge"]
9af328e2
ERE
949 deltatar.filter_func = filter_func
950 deltatar.restore_backup(target_path="source_dir",
951 backup_indexes_paths=[index_path])
952
953 assert set(visited_paths) == set([
eb6d0069
CH
954 'small',
955 'test',
956 'test/huge2',
957 'test/test2'
9af328e2
ERE
958 ])
959
e5f5681b
ERE
960 def test_restore_tar_basic_filtering(self):
961 '''
962 Creates a backup, and then filter when doing the tar based restore.
963 '''
f698c99c
PG
964 password, paramversion = self.ENCRYPTION or (None, None)
965 deltatar = DeltaTar(mode=self.MODE, password=password,
966 crypto_paramversion=paramversion,
e5f5681b
ERE
967 logger=self.consoleLogger)
968
969 # create first backup
970 deltatar.create_full_backup(
971 source_path="source_dir",
972 backup_path="backup_dir")
973
974 assert os.path.exists("backup_dir")
975 shutil.rmtree("source_dir")
976
eb6d0069
CH
977 deltatar.included_files = ["test", "small"]
978 deltatar.excluded_files = ["test/huge"]
e5f5681b
ERE
979
980 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
981 tar_path = os.path.join("backup_dir", tar_filename)
982
983 deltatar.restore_backup(target_path="source_dir",
984 backup_tar_path=tar_path)
985
eb6d0069
CH
986 assert os.path.exists("source_dir/small")
987 assert os.path.exists("source_dir/test")
988 assert os.path.exists("source_dir/test/huge2")
989 assert os.path.exists("source_dir/test/test2")
e5f5681b 990
eb6d0069
CH
991 assert not os.path.exists("source_dir/test/huge")
992 assert not os.path.exists("source_dir/big")
e5f5681b
ERE
993
994 def test_restore_tar_filter_func(self):
995 '''
996 Creates a backup, and then filter when doing the tar based restore,
997 using the filter function.
998 '''
999 visited_paths = []
1000 def filter_func(visited_paths, path):
1001 if path not in visited_paths:
1002 visited_paths.append(path)
1003 return True
1004
1005 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
1006
1007 password, paramversion = self.ENCRYPTION or (None, None)
1008 deltatar = DeltaTar(mode=self.MODE, password=password,
1009 crypto_paramversion=paramversion,
e5f5681b
ERE
1010 logger=self.consoleLogger)
1011
1012 # create first backup
1013 deltatar.create_full_backup(
1014 source_path="source_dir",
1015 backup_path="backup_dir")
1016
1017 assert os.path.exists("backup_dir")
1018 shutil.rmtree("source_dir")
1019
1020 index_filename = deltatar.index_name_func(True)
1021 index_path = os.path.join("backup_dir", index_filename)
1022
eb6d0069
CH
1023 deltatar.included_files = ["test", "small"]
1024 deltatar.excluded_files = ["test/huge"]
e5f5681b
ERE
1025 deltatar.filter_func = filter_func
1026
1027 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1028 tar_path = os.path.join("backup_dir", tar_filename)
1029
1030 deltatar.restore_backup(target_path="source_dir",
1031 backup_tar_path=tar_path)
1032 assert set(visited_paths) == set([
eb6d0069
CH
1033 'small',
1034 'test',
1035 'test/huge2',
1036 'test/test2'
e5f5681b
ERE
1037 ])
1038
974408b5 1039 def test_filter_path_regexp(self):
0d5c1970
ERE
1040 '''
1041 Test specifically the deltatar.filter_path function with regular
1042 expressions
1043 '''
1044 included_files = [
eb6d0069
CH
1045 re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
1046 re.compile('^yes$'),
1047 'testing'
0d5c1970
ERE
1048 ]
1049 excluded_files = [
eb6d0069 1050 re.compile('^testing/in_the'),
0d5c1970
ERE
1051 ]
1052 deltatar = DeltaTar(mode=self.MODE, included_files=included_files,
1053 excluded_files=excluded_files)
1054
1055 # assert valid and invalid paths
eb6d0069
CH
1056 assert deltatar.filter_path('test/hola')
1057 assert deltatar.filter_path('test/hola/any/thing')
1058 assert deltatar.filter_path('test/caracola/caracolero')
1059 assert deltatar.filter_path('test/caracola/caracolero/yeah')
1060 assert deltatar.filter_path('test/caracola/caracolero/whatever/aa')
1061 assert deltatar.filter_path('yes')
1062 assert deltatar.filter_path('testing')
1063 assert deltatar.filter_path('testing/yes')
1064 assert deltatar.filter_path('testing/in_th')
1065
1066 assert not deltatar.filter_path('something')
1067 assert not deltatar.filter_path('other/thing')
1068 assert not deltatar.filter_path('test_ing')
1069 assert not deltatar.filter_path('test/hola_lala')
1070 assert not deltatar.filter_path('test/agur')
1071 assert not deltatar.filter_path('testing_something')
1072 assert not deltatar.filter_path('yeso')
1073 assert not deltatar.filter_path('yes/o')
1074 assert not deltatar.filter_path('yes_o')
1075 assert not deltatar.filter_path('testing/in_the')
1076 assert not deltatar.filter_path('testing/in_the_field')
1077 assert not deltatar.filter_path('testing/in_the/field')
e5f5681b 1078
974408b5
ERE
1079 def test_filter_path_parent(self):
1080 '''
1081 Test specifically the deltatar.filter_path function for parent matching
1082 '''
1083 included_files = [
eb6d0069 1084 'testing/path/to/some/thing'
974408b5
ERE
1085 ]
1086 deltatar = DeltaTar(mode=self.MODE, included_files=included_files)
1087
1088 # assert valid and invalid paths
eb6d0069
CH
1089 assert deltatar.filter_path('testing', is_dir=True) == PARENT_MATCH
1090 assert deltatar.filter_path('testing/path/', is_dir=True) == PARENT_MATCH
1091 assert deltatar.filter_path('testing/path/to', is_dir=True) == PARENT_MATCH
1092 assert deltatar.filter_path('testing/path/to/some', is_dir=True) == PARENT_MATCH
1093 assert deltatar.filter_path('testing/path/to/some/thing') == MATCH
1094 assert deltatar.filter_path('testing/path/to/some/thing/what&/ever') == MATCH
1095 assert deltatar.filter_path('testing/something/else') == NO_MATCH
974408b5
ERE
1096
1097 def test_parent_matching_simple_full_backup(self):
1098 '''
1099 Create a full backup using parent matching
1100 '''
1101 included_files = [
eb6d0069 1102 'test/huge2'
974408b5 1103 ]
f698c99c
PG
1104
1105 password, paramversion = self.ENCRYPTION or (None, None)
1106 deltatar = DeltaTar(mode=self.MODE, password=password,
1107 crypto_paramversion=paramversion,
974408b5
ERE
1108 logger=self.consoleLogger,
1109 included_files=included_files)
1110
1111 # create first backup
1112 deltatar.create_full_backup(
1113 source_path="source_dir",
1114 backup_path="backup_dir")
1115
1116 assert os.path.exists("backup_dir")
1117 shutil.rmtree("source_dir")
1118
1119 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1120 tar_path = os.path.join("backup_dir", tar_filename)
1121
f698c99c 1122 deltatar = DeltaTar(mode=self.MODE, password=password,
974408b5
ERE
1123 logger=self.consoleLogger)
1124 deltatar.restore_backup(target_path="source_dir",
1125 backup_tar_path=tar_path)
1126
1127 assert os.path.exists('source_dir/test/huge2')
1128 assert os.path.exists('source_dir/test/')
1129 assert not os.path.exists('source_dir/test/huge')
1130 assert not os.path.exists('source_dir/big')
1131 assert not os.path.exists('source_dir/small')
1132
1133 def test_parent_matching_simple_full_backup_restore(self):
1134 '''
1135 Create a full backup and restores it using parent matching
1136 '''
1137 included_files = [
eb6d0069 1138 'test/huge2'
974408b5 1139 ]
f698c99c
PG
1140
1141 password, paramversion = self.ENCRYPTION or (None, None)
1142 deltatar = DeltaTar(mode=self.MODE, password=password,
1143 crypto_paramversion=paramversion,
974408b5
ERE
1144 logger=self.consoleLogger)
1145
1146 # create first backup
1147 deltatar.create_full_backup(
1148 source_path="source_dir",
1149 backup_path="backup_dir")
1150
1151 assert os.path.exists("backup_dir")
1152 shutil.rmtree("source_dir")
1153
1154 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1155 tar_path = os.path.join("backup_dir", tar_filename)
1156
f698c99c 1157 deltatar = DeltaTar(mode=self.MODE, password=password,
974408b5
ERE
1158 logger=self.consoleLogger,
1159 included_files=included_files)
1160 deltatar.restore_backup(target_path="source_dir",
1161 backup_tar_path=tar_path)
1162
1163 assert os.path.exists('source_dir/test/huge2')
1164 assert os.path.exists('source_dir/test/')
1165 assert not os.path.exists('source_dir/test/huge')
1166 assert not os.path.exists('source_dir/big')
1167 assert not os.path.exists('source_dir/small')
1168
1169 def test_parent_matching_index_full_backup_restore(self):
1170 '''
1171 Create a full backup and restores it using parent matching
1172 '''
1173 included_files = [
eb6d0069 1174 'test/huge2'
974408b5 1175 ]
f698c99c
PG
1176
1177 password, paramversion = self.ENCRYPTION or (None, None)
1178 deltatar = DeltaTar(mode=self.MODE, password=password,
1179 crypto_paramversion=paramversion,
974408b5
ERE
1180 logger=self.consoleLogger)
1181
1182 # create first backup
1183 deltatar.create_full_backup(
1184 source_path="source_dir",
1185 backup_path="backup_dir")
1186
1187 assert os.path.exists("backup_dir")
1188 shutil.rmtree("source_dir")
1189
1190 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1191 tar_path = os.path.join("backup_dir", tar_filename)
1192
f698c99c 1193 deltatar = DeltaTar(mode=self.MODE, password=password,
974408b5
ERE
1194 logger=self.consoleLogger,
1195 included_files=included_files)
1196 deltatar.restore_backup(target_path="source_dir",
1197 backup_tar_path=tar_path)
1198
1199 assert os.path.exists('source_dir/test/huge2')
1200 assert os.path.exists('source_dir/test/')
1201 assert not os.path.exists('source_dir/test/huge')
1202 assert not os.path.exists('source_dir/big')
1203 assert not os.path.exists('source_dir/small')
1204
d07c8065
ERE
1205 def test_collate_iterators(self):
1206 '''
1207 Tests the collate iterators functionality with two exact directories,
1208 using an index iterator from a backup and the exact same source dir.
1209 '''
f698c99c
PG
1210 password, paramversion = self.ENCRYPTION or (None, None)
1211 deltatar = DeltaTar(mode=self.MODE, password=password,
1212 crypto_paramversion=paramversion,
d07c8065
ERE
1213 logger=self.consoleLogger)
1214
1215 # create first backup
1216 deltatar.create_full_backup(
1217 source_path="source_dir",
1218 backup_path="backup_dir")
1219
1220 assert os.path.exists("backup_dir")
1221
1222 cwd = os.getcwd()
1223 index_filename = deltatar.index_name_func(is_full=True)
1224 index_path = os.path.join(cwd, "backup_dir", index_filename)
1225 index_it = deltatar.iterate_index_path(index_path)
1226
1227 os.chdir('source_dir')
1228 dir_it = deltatar._recursive_walk_dir('.')
1229 path_it = deltatar.jsonize_path_iterator(dir_it)
1230
1231 try:
ea6d3c3e 1232 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
d07c8065
ERE
1233 assert deltatar._equal_stat_dicts(path1, path2)
1234 finally:
1235 os.chdir(cwd)
1236
1237 def test_collate_iterators_diffdirs(self):
1238 '''
1239 Use the collate iterators functionality with two different directories.
1240 It must behave in an expected way.
1241 '''
1242 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1243
f698c99c
PG
1244 password, paramversion = self.ENCRYPTION or (None, None)
1245 deltatar = DeltaTar(mode=self.MODE, password=password,
1246 crypto_paramversion=paramversion,
d07c8065
ERE
1247 logger=self.consoleLogger)
1248
1249 # create first backup
1250 deltatar.create_full_backup(
1251 source_path="source_dir",
1252 backup_path="backup_dir")
1253
1254 assert os.path.exists("backup_dir")
1255 self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)
1256
1257 cwd = os.getcwd()
1258 index_filename = deltatar.index_name_func(is_full=True)
1259 index_path = os.path.join(cwd, "backup_dir", index_filename)
1260 index_it = deltatar.iterate_index_path(index_path)
1261
1262 os.chdir('source_dir')
1263 dir_it = deltatar._recursive_walk_dir('.')
1264 path_it = deltatar.jsonize_path_iterator(dir_it)
1265
1266 try:
ea6d3c3e 1267 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
eb6d0069 1268 if path2['path'] == 'z':
d07c8065
ERE
1269 assert not path1
1270 else:
1271 assert deltatar._equal_stat_dicts(path1, path2)
1272 finally:
1273 os.chdir(cwd)
1274
42d39ca7 1275 def test_collate_iterators_diffdirs2(self):
aae127d0 1276 '''
42d39ca7
ERE
1277 Use the collate iterators functionality with two different directories.
1278 It must behave in an expected way.
aae127d0 1279 '''
f698c99c
PG
1280 password, paramversion = self.ENCRYPTION or (None, None)
1281 deltatar = DeltaTar(mode=self.MODE, password=password,
1282 crypto_paramversion=paramversion,
42d39ca7
ERE
1283 logger=self.consoleLogger)
1284
1285 # create first backup
1286 deltatar.create_full_backup(
1287 source_path="source_dir",
1288 backup_path="backup_dir")
1289
1290 assert os.path.exists("backup_dir")
1291
1292 # add some new files and directories
1293 os.makedirs('source_dir/bigdir')
1294 self.hash["source_dir/bigdir"] = ""
1295 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1296 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
aae127d0
ERE
1297 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1298
42d39ca7
ERE
1299 cwd = os.getcwd()
1300 index_filename = deltatar.index_name_func(is_full=True)
1301 index_path = os.path.join(cwd, "backup_dir", index_filename)
1302 index_it = deltatar.iterate_index_path(index_path)
1303
1304 os.chdir('source_dir')
1305 dir_it = deltatar._recursive_walk_dir('.')
1306 path_it = deltatar.jsonize_path_iterator(dir_it)
1307
1308 visited_pairs = []
1309
1310 try:
ea6d3c3e 1311 for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
42d39ca7
ERE
1312 visited_pairs.append(
1313 (deltatar.unprefixed(path1['path']) if path1 else None,
1314 path2['path'] if path2 else None)
1315 )
1316 finally:
1317 assert visited_pairs == [
eb6d0069
CH
1318 (u'big', u'big'),
1319 (None, u'bigdir'),
1320 (u'small', u'small'),
1321 (u'test', u'test'),
1322 (None, u'zzzz'),
1323 (None, u'bigdir/a'),
1324 (None, u'bigdir/b'),
1325 (u'test/huge', u'test/huge'),
1326 (u'test/huge2', u'test/huge2'),
1327 (u'test/test2', u'test/test2'),
42d39ca7
ERE
1328 ]
1329 os.chdir(cwd)
1330
1331 def test_create_empty_diff_backup(self):
1332 '''
1333 Creates an empty (no changes) backup diff
1334 '''
f698c99c
PG
1335 password, paramversion = self.ENCRYPTION or (None, None)
1336 deltatar = DeltaTar(mode=self.MODE, password=password,
1337 crypto_paramversion=paramversion,
aae127d0
ERE
1338 logger=self.consoleLogger)
1339
1340 # create first backup
1341 deltatar.create_full_backup(
1342 source_path="source_dir",
1343 backup_path="backup_dir")
1344
1345 prev_index_filename = deltatar.index_name_func(is_full=True)
1346 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1347
1348 deltatar.create_diff_backup("source_dir", "backup_dir2",
1349 prev_index_path)
1350
1351 # check index items
df86af81
ERE
1352 index_path = os.path.join("backup_dir2",
1353 deltatar.index_name_func(is_full=False))
aae127d0 1354 index_it = deltatar.iterate_index_path(index_path)
82de3376 1355 n = 0
aae127d0 1356 for i in index_it:
82de3376 1357 n += 1
aae127d0 1358 assert i[0]['path'].startswith("list://")
9af328e2 1359
ea6d3c3e 1360 assert n == 6
8adbe50d
ERE
1361
1362 # check the tar file
82de3376
ERE
1363 assert os.path.exists("backup_dir2")
1364 shutil.rmtree("source_dir")
1365
df86af81
ERE
1366 tar_filename = deltatar.volume_name_func('backup_dir2',
1367 is_full=False, volume_number=0)
82de3376
ERE
1368 tar_path = os.path.join("backup_dir2", tar_filename)
1369
1370 # no file restored, because the diff was empty
1371 deltatar.restore_backup(target_path="source_dir",
1372 backup_tar_path=tar_path)
1373 assert len(os.listdir("source_dir")) == 0
1374
8adbe50d 1375
42d39ca7
ERE
1376 def test_create_diff_backup1(self):
1377 '''
1378 Creates a diff backup when there are new files
1379 '''
f698c99c
PG
1380 password, paramversion = self.ENCRYPTION or (None, None)
1381 deltatar = DeltaTar(mode=self.MODE, password=password,
1382 crypto_paramversion=paramversion,
42d39ca7
ERE
1383 logger=self.consoleLogger)
1384
1385 # create first backup
1386 deltatar.create_full_backup(
1387 source_path="source_dir",
1388 backup_path="backup_dir")
1389
1390 prev_index_filename = deltatar.index_name_func(is_full=True)
1391 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1392
1393 # add some new files and directories
1394 os.makedirs('source_dir/bigdir')
1395 self.hash["source_dir/bigdir"] = ""
0519f161 1396 os.unlink("source_dir/small")
42d39ca7
ERE
1397 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1398 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1399 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1400
1401 deltatar.create_diff_backup("source_dir", "backup_dir2",
1402 prev_index_path)
1403
1404 # check index items
df86af81 1405 index_path = os.path.join("backup_dir2", deltatar.index_name_func(is_full=False))
42d39ca7 1406 index_it = deltatar.iterate_index_path(index_path)
df86af81
ERE
1407 l = [i[0]['path'] for i in index_it]
1408
1409 assert l == [
eb6d0069
CH
1410 'list://big',
1411 'snapshot://bigdir',
1412 'delete://small',
1413 'list://test',
1414 'snapshot://zzzz',
1415 'snapshot://bigdir/a',
1416 'snapshot://bigdir/b',
1417 'list://test/huge',
1418 'list://test/huge2',
1419 'list://test/test2',
df86af81 1420 ]
42d39ca7
ERE
1421
1422 # check the tar file
1423 assert os.path.exists("backup_dir2")
1424 shutil.rmtree("source_dir")
1425
0519f161
ERE
1426 # create source_dir with the small file, that will be then deleted by
1427 # the restore_backup
1428 os.mkdir("source_dir")
be60ffd0 1429 open("source_dir/small", 'wb').close()
0519f161 1430
df86af81
ERE
1431 tar_filename = deltatar.volume_name_func('backup_dir2',
1432 is_full=False, volume_number=0)
42d39ca7
ERE
1433 tar_path = os.path.join("backup_dir2", tar_filename)
1434
1435 # restore the backup, this will create only the new files
1436 deltatar.restore_backup(target_path="source_dir",
1437 backup_tar_path=tar_path)
a345b1c9
DGM
1438 # the order doesn't matter
1439 assert set(os.listdir("source_dir")) == set(['zzzz', 'bigdir'])
42d39ca7 1440
df99a044
ERE
1441 def test_restore_from_index_diff_backup(self):
1442 '''
1443 Creates a full backup, modifies some files, creates a diff backup,
1444 then restores the diff backup from zero.
1445 '''
df99a044 1446 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
1447 raise SkipTest('this test only works for uncompressed '
1448 'or concat compressed modes')
df99a044 1449
f698c99c
PG
1450 password, paramversion = self.ENCRYPTION or (None, None)
1451 deltatar = DeltaTar(mode=self.MODE, password=password,
1452 crypto_paramversion=paramversion,
df99a044
ERE
1453 logger=self.consoleLogger)
1454
1455 # create first backup
1456 deltatar.create_full_backup(
1457 source_path="source_dir",
1458 backup_path="backup_dir")
1459
1460 prev_index_filename = deltatar.index_name_func(is_full=True)
1461 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1462
1463 # add some new files and directories
1464 os.makedirs('source_dir/bigdir')
1465 self.hash["source_dir/bigdir"] = ""
1466 os.unlink("source_dir/small")
1467 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1468 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1469 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
1470
1471 deltatar.create_diff_backup("source_dir", "backup_dir2",
1472 prev_index_path)
1473
1474 # apply diff backup in target_dir
df86af81 1475 index_filename = deltatar.index_name_func(is_full=False)
df99a044
ERE
1476 index_path = os.path.join("backup_dir2", index_filename)
1477 deltatar.restore_backup("target_dir",
1478 backup_indexes_paths=[index_path, prev_index_path])
1479
1480 # then compare the two directories source_dir and target_dir and check
1481 # they are the same
cbac9f0b 1482 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
df99a044
ERE
1483
1484 def test_restore_from_index_diff_backup2(self):
1485 '''
1486 Creates a full backup, modifies some files, creates a diff backup,
1487 then restores the diff backup with the full backup as a starting point.
1488 '''
df99a044 1489 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
1490 raise SkipTest('this test only works for uncompressed '
1491 'or concat compressed modes')
df99a044 1492
f698c99c
PG
1493 password, paramversion = self.ENCRYPTION or (None, None)
1494 deltatar = DeltaTar(mode=self.MODE, password=password,
1495 crypto_paramversion=paramversion,
df99a044
ERE
1496 logger=self.consoleLogger)
1497
1498 # create first backup
1499 deltatar.create_full_backup(
1500 source_path="source_dir",
1501 backup_path="backup_dir")
1502
1503 prev_index_filename = deltatar.index_name_func(is_full=True)
1504 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1505
1506 # add some new files and directories
1507 os.makedirs('source_dir/bigdir')
1508 self.hash["source_dir/bigdir"] = ""
1509 os.unlink("source_dir/small")
1510 self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
1511 self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
1512 self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
d86735e4 1513 shutil.rmtree("source_dir/test")
df99a044
ERE
1514
1515 deltatar.create_diff_backup("source_dir", "backup_dir2",
1516 prev_index_path)
1517
1518 # first restore initial backup in target_dir
df86af81 1519 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
df99a044
ERE
1520 tar_path = os.path.join("backup_dir", tar_filename)
1521 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1522
1523 # then apply diff backup in target_dir
df86af81 1524 index_filename = deltatar.index_name_func(is_full=False)
df99a044
ERE
1525 index_path = os.path.join("backup_dir2", index_filename)
1526 deltatar.restore_backup("target_dir",
1527 backup_indexes_paths=[index_path, prev_index_path])
1528
1529 # then compare the two directories source_dir and target_dir and check
1530 # they are the same
cbac9f0b
ERE
1531 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1532
1533 def test_restore_from_index_diff_backup3(self):
1534 '''
188b845d 1535 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
cbac9f0b
ERE
1536 diff backup, then restores the diff backup with the full backup as a
1537 starting point.
1538 '''
cbac9f0b 1539 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
1540 raise SkipTest('this test only works for uncompressed '
1541 'or concat compressed modes')
cbac9f0b 1542
f698c99c
PG
1543 password, paramversion = self.ENCRYPTION or (None, None)
1544 deltatar = DeltaTar(mode=self.MODE, password=password,
1545 crypto_paramversion=paramversion,
cbac9f0b
ERE
1546 logger=self.consoleLogger)
1547
1548 shutil.rmtree("source_dir")
188b845d
TJ
1549 shutil.copytree(self.GIT_DIR, "source_dir")
1550 shutil.copytree(self.GIT_DIR, "source_dir_diff")
cbac9f0b
ERE
1551
1552 # create first backup
1553 deltatar.create_full_backup(
1554 source_path="source_dir",
1555 backup_path="backup_dir")
1556
1557 prev_index_filename = deltatar.index_name_func(is_full=True)
1558 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1559
1560 # alter the source_dir randomly
1561 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1562
1563 for path in source_it:
1564 # if path doesn't exist (might have previously removed) ignore it.
1565 # also ignore it (i.e. do not change it) 70% of the time
1566 if not os.path.exists(path) or random.random() < 0.7:
1567 continue
1568
1569 # remove the file
1570 if os.path.isdir(path):
1571 shutil.rmtree(path)
1572 else:
1573 os.unlink(path)
1574
1215b602 1575 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
cbac9f0b 1576 prev_index_path)
cbac9f0b
ERE
1577
1578 # first restore initial backup in target_dir
df86af81 1579 tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
cbac9f0b
ERE
1580 tar_path = os.path.join("backup_dir", tar_filename)
1581 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1582
1583 # and check that target_dir equals to source_dir (which is the same as
188b845d 1584 # self.GIT_DIR initially)
cbac9f0b
ERE
1585 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1586
1587 # then apply diff backup in target_dir
df86af81 1588 index_filename = deltatar.index_name_func(is_full=False)
8825be52
DGM
1589 index_path = os.path.join("backup_dir2", index_filename)
1590 deltatar.restore_backup("target_dir",
1591 backup_indexes_paths=[index_path, prev_index_path])
1592
1593 # and check that target_dir equals to source_dir_diff (the randomly
188b845d 1594 # altered self.GIT_DIR directory)
8825be52
DGM
1595 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1596
1597 # then delete target_dir and apply diff backup from zero and check again
1598 shutil.rmtree("target_dir")
1599 deltatar.restore_backup("target_dir",
1600 backup_indexes_paths=[index_path, prev_index_path])
1601
1602 # and check that target_dir equals to source_dir_diff (the randomly
188b845d 1603 # altered self.GIT_DIR directory)
8825be52
DGM
1604 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1605
1606 def test_restore_from_index_diff_backup3_multivol(self):
1607 '''
188b845d 1608 Creates a full backup of self.GIT_DIR, modifies some random files, creates a
8825be52
DGM
1609 diff backup, then restores the diff backup with the full backup as a
1610 starting point.
1611 '''
8825be52 1612 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
1613 raise SkipTest('this test only works for uncompressed '
1614 'or concat compressed modes')
8825be52 1615
f698c99c
PG
1616 password, paramversion = self.ENCRYPTION or (None, None)
1617 deltatar = DeltaTar(mode=self.MODE, password=password,
1618 crypto_paramversion=paramversion,
8825be52
DGM
1619 logger=self.consoleLogger)
1620
1621 shutil.rmtree("source_dir")
188b845d
TJ
1622 shutil.copytree(self.GIT_DIR, "source_dir")
1623 shutil.copytree(self.GIT_DIR, "source_dir_diff")
8825be52
DGM
1624
1625 # create first backup
1626 deltatar.create_full_backup(
1627 source_path="source_dir",
1628 backup_path="backup_dir",
1629 max_volume_size=1)
1630
1631 prev_index_filename = deltatar.index_name_func(is_full=True)
1632 prev_index_path = os.path.join("backup_dir", prev_index_filename)
1633
1634 # alter the source_dir randomly
1635 source_it = deltatar._recursive_walk_dir('source_dir_diff')
1636
1637 for path in source_it:
1638 # if path doesn't exist (might have previously removed) ignore it.
1639 # also ignore it (i.e. do not change it) 70% of the time
1640 if not os.path.exists(path) or random.random() < 0.7:
1641 continue
1642
1643 # remove the file
1644 if os.path.isdir(path):
1645 shutil.rmtree(path)
1646 else:
1647 os.unlink(path)
1648
1649 deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
1650 prev_index_path, max_volume_size=1)
1651
1652 # first restore initial backup in target_dir
1653 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1654 tar_path = os.path.join("backup_dir", tar_filename)
1655 deltatar.restore_backup("target_dir", backup_tar_path=tar_path)
1656
1657 # and check that target_dir equals to source_dir (which is the same as
188b845d 1658 # self.GIT_DIR initially)
8825be52
DGM
1659 self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1660
1661 # then apply diff backup in target_dir
df86af81 1662 index_filename = deltatar.index_name_func(is_full=False)
cbac9f0b
ERE
1663 index_path = os.path.join("backup_dir2", index_filename)
1664 deltatar.restore_backup("target_dir",
1665 backup_indexes_paths=[index_path, prev_index_path])
1666
1667 # and check that target_dir equals to source_dir_diff (the randomly
188b845d 1668 # altered self.GIT_DIR directory)
cbac9f0b
ERE
1669 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1670
1671 # then delete target_dir and apply diff backup from zero and check again
1672 shutil.rmtree("target_dir")
1673 deltatar.restore_backup("target_dir",
1674 backup_indexes_paths=[index_path, prev_index_path])
1675
1676 # and check that target_dir equals to source_dir_diff (the randomly
188b845d 1677 # altered self.GIT_DIR directory)
cbac9f0b
ERE
1678 self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1679
1680 def check_equal_dirs(self, path1, path2, deltatar):
1681 '''
1682 compare the two directories source_dir and target_dir and check
1683 # they are the same
1684 '''
eb6d0069 1685 source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
df99a044 1686 source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
eb6d0069 1687 target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
df99a044
ERE
1688 target_it = deltatar.jsonize_path_iterator(target_it, strip=1)
1689 while True:
1690 try:
be60ffd0
ERE
1691 sitem = next(source_it)
1692 titem = next(target_it)
df99a044
ERE
1693 except StopIteration:
1694 try:
be60ffd0 1695 titem = next(target_it)
df99a044
ERE
1696 raise Exception("iterators do not stop at the same time")
1697 except StopIteration:
1698 break
a638b8d7 1699 try:
5200d2aa 1700 assert deltatar._equal_stat_dicts(sitem[0], titem[0])
be60ffd0 1701 except Exception as e:
5200d2aa
TJ
1702 print("SITEM: " + str(sitem))
1703 print("TITEM: " + str(titem))
a638b8d7 1704 raise e
df99a044 1705
f5d9144b
PG
1706 def test_create_no_symlinks(self):
1707 '''
1708 Creates a full backup from different varieties of symlinks. The
1709 extracted archive may not contain any symlinks but the file contents
1710 '''
1711
1712 os.system("rm -rf source_dir")
1713 os.makedirs("source_dir/symlinks")
1714 fd = os.open("source_dir/symlinks/valid_linkname",
1715 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
1716 os.write(fd, b"valid link target for symlink tests; please ignore\n")
1717 os.close(fd)
1718 # first one is good, the rest points nowhere
1719 self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
1720 self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
1721 self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
1722 self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
f698c99c
PG
1723 password, paramversion = self.ENCRYPTION or (None, None)
1724 deltatar = DeltaTar(mode=self.MODE, password=password,
1725 crypto_paramversion=paramversion,
f5d9144b
PG
1726 logger=self.consoleLogger)
1727
1728 # create first backup
1729 deltatar.create_full_backup(source_path="source_dir",
1730 backup_path="backup_dir")
1731
1732 assert os.path.exists("backup_dir")
1733 shutil.rmtree("source_dir")
1734 assert not os.path.exists("source_dir")
1735
1736 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1737 tar_path = os.path.join("backup_dir", tar_filename)
1738
1739 deltatar.restore_backup(target_path="source_dir",
1740 backup_tar_path=tar_path)
1741
1742 for _r, _ds, fs in os.walk("source_dir/symlinks"):
1743 # only the valid link plus the linked file may be found in the
1744 # extracted archive
1745 assert len(fs) == 2
1746 for f in fs:
1747 # the link must have been resolved and file contents must match
1748 # the linked file
1749 assert not os.path.islink(f)
1750 with open("source_dir/symlinks/valid_linkname") as a:
1751 with open("source_dir/symlinks/whatever") as b:
1752 assert a.read() == b.read()
1753
83f5fd71
PG
1754 def test_restore_with_symlinks(self):
1755 '''
9b13f5c4
PG
1756 Creates a full backup containing different varieties of symlinks. All
1757 of them must be filtered out.
83f5fd71 1758 '''
f698c99c
PG
1759 password, paramversion = self.ENCRYPTION or (None, None)
1760 deltatar = DeltaTar(mode=self.MODE, password=password,
1761 crypto_paramversion=paramversion,
83f5fd71
PG
1762 logger=self.consoleLogger)
1763
1764 # create first backup
1765 deltatar.create_full_backup(source_path="source_dir",
1766 backup_path="backup_dir")
1767
1768 assert os.path.exists("backup_dir")
1769 shutil.rmtree("source_dir")
1770
1771 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1772 tar_path = os.path.join("backup_dir", tar_filename)
1773
1774 # add symlinks to existing archive
1775
9b13f5c4 1776 def add_symlink (a, name, dst):
83f5fd71
PG
1777 l = tarfile.TarInfo("snapshot://%s" % name)
1778 l.type = tarfile.SYMTYPE
1779 l.linkname = dst
1780 a.addfile(l)
9b13f5c4 1781 return name
83f5fd71 1782
5faea0e1
PG
1783 try:
1784 with tarfile.open(tar_path,mode="a") as a:
1785 checkme = \
1786 [ add_symlink(a, "symlinks/foo", "internal-file")
1787 , add_symlink(a, "symlinks/bar", "/absolute/path")
1788 , add_symlink(a, "symlinks/baz", "../parent/../../paths") ]
1789 except tarfile.ReadError as e:
1790 if self.MODE == '#' or self.MODE.endswith ("gz"):
1791 checkme = []
1792 else:
1793 raise
1794 except ValueError as e:
1795 if self.MODE.startswith ('#'):
1796 checkme = []
1797 else:
1798 raise
83f5fd71
PG
1799
1800 deltatar.restore_backup(target_path="source_dir",
1801 backup_tar_path=tar_path)
1802
1803 # check what happened to our symlinks
9b13f5c4
PG
1804 for name in checkme:
1805 fullpath = os.path.join("source_dir", name)
1806 assert not os.path.exists(fullpath)
f5d9144b 1807
43ce978b
PG
1808 def test_restore_malicious_symlinks(self):
1809 '''
1810 Creates a full backup containing a symlink and a file of the same name.
1811 This simulates a symlink attack with a link pointing to some external
1812 path that is abused to write outside the extraction prefix.
1813 '''
f698c99c
PG
1814 password, paramversion = self.ENCRYPTION or (None, None)
1815 deltatar = DeltaTar(mode=self.MODE, password=password,
1816 crypto_paramversion=paramversion,
43ce978b
PG
1817 logger=self.consoleLogger)
1818
1819 # create first backup
1820 deltatar.create_full_backup(source_path="source_dir",
1821 backup_path="backup_dir")
1822
1823 assert os.path.exists("backup_dir")
1824 shutil.rmtree("source_dir")
1825
1826 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
1827 tar_path = os.path.join("backup_dir", tar_filename)
1828
1829 # add symlinks to existing archive
1830
1831 def add_symlink (a, name, dst):
1832 l = tarfile.TarInfo("snapshot://%s" % name)
1833 l.type = tarfile.SYMTYPE
1834 l.linkname = dst
1835 a.addfile(l)
1836
1837 def add_file (a, name):
1838 f = tarfile.TarInfo("snapshot://%s" % name)
1839 f.type = tarfile.REGTYPE
1840 a.addfile(f)
43ce978b
PG
1841
1842 testpath = "symlinks/pernicious-link"
9b13f5c4
PG
1843 testdst = "/tmp/does/not/exist"
1844
5faea0e1
PG
1845 try:
1846 with tarfile.open(tar_path, mode="a") as a:
1847 add_symlink(a, testpath, testdst)
1848 add_symlink(a, testpath, testdst+"X")
1849 add_symlink(a, testpath, testdst+"XXX")
1850 add_file(a, testpath)
1851 except tarfile.ReadError as e:
1852 if self.MODE == '#' or self.MODE.endswith ("gz"):
1853 pass
1854 else:
1855 raise
1856 except ValueError as e:
1857 if self.MODE.startswith ('#'):
1858 pass # O_APPEND of concat archives not feasible
1859 else:
1860 raise
43ce978b 1861
43ce978b
PG
1862 deltatar.restore_backup(target_path="source_dir",
1863 backup_tar_path=tar_path)
1864
9b13f5c4
PG
1865 # check whether the link was extracted; deltatar seems to only ever
1866 # retrieve the first item it finds for a given path which in the case
1867 # at hand is a symlink to some non-existent path
43ce978b 1868 fullpath = os.path.join("source_dir", testpath)
9b13f5c4 1869 assert not os.path.exists(fullpath)
43ce978b 1870
da26094a
ERE
1871class DeltaTar2Test(DeltaTarTest):
1872 '''
1873 Same as DeltaTar but with specific ":" mode
1874 '''
1875 MODE = ':'
1876
1877
1878class DeltaTarStreamTest(DeltaTarTest):
1879 '''
1880 Same as DeltaTar but with specific uncompressed stream mode
1881 '''
1882 MODE = '|'
1883
1884
1885class DeltaTarGzipTest(DeltaTarTest):
1886 '''
1887 Same as DeltaTar but with specific gzip mode
1888 '''
1889 MODE = ':gz'
8ea0be50 1890 MODE_COMPRESSES = True
da26094a
ERE
1891
1892
da26094a
ERE
1893class DeltaTarGzipStreamTest(DeltaTarTest):
1894 '''
1895 Same as DeltaTar but with specific gzip stream mode
1896 '''
1897 MODE = '|gz'
8ea0be50 1898 MODE_COMPRESSES = True
da26094a
ERE
1899
1900
bd011242
CH
1901@skip('Bz2 tests are too slow..')
1902class DeltaTarBz2Test(DeltaTarTest):
1903 '''
1904 Same as DeltaTar but with specific bz2 mode
1905 '''
1906 MODE = ':bz2'
8ea0be50 1907 MODE_COMPRESSES = True
ea6d3c3e 1908
bd011242
CH
1909
1910@skip('Bz2 tests are too slow..')
1911class DeltaTarBz2StreamTest(DeltaTarTest):
1912 '''
1913 Same as DeltaTar but with specific bz2 stream mode
1914 '''
1915 MODE = '|bz2'
8ea0be50 1916 MODE_COMPRESSES = True
da26094a
ERE
1917
1918
1919class DeltaTarGzipConcatTest(DeltaTarTest):
1920 '''
1921 Same as DeltaTar but with specific gzip concat stream mode
1922 '''
1923 MODE = '#gz'
8ea0be50 1924 MODE_COMPRESSES = True
da26094a
ERE
1925
1926
da26094a
ERE
1927class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
1928 '''
1929 Same as DeltaTar but with specific gzip aes128 concat stream mode
1930 '''
f698c99c
PG
1931 MODE = '#gz'
1932 ENCRYPTION = ('some magic key', 1)
8ea0be50 1933 MODE_COMPRESSES = True
da26094a
ERE
1934
1935
ac5e4184
ERE
1936class DeltaTarAes128ConcatTest(DeltaTarTest):
1937 '''
1938 Same as DeltaTar but with specific aes128 concat stream mode
1939 '''
d1c38f40 1940 MODE = '#'
f698c99c 1941 ENCRYPTION = ('some magic key', 1)
ac5e4184
ERE
1942
1943