Merge branch 'tarfile-unlink'
[python-delta-tar] / testing / test_deltatar.py
CommitLineData
0708a374
ERE
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
f5d9144b 19import errno
0708a374 20import os
0d5c1970 21import re
cbac9f0b 22import random
e5c6ca04 23import shutil
0708a374 24import logging
b8fc2f5d
ERE
25import binascii
26import json
27from datetime import datetime
28from functools import partial
bd011242 29from unittest import skip, SkipTest
0708a374 30
83f5fd71 31import deltatar.tarfile as tarfile
f698c99c 32from deltatar.tarfile import TarFile
974408b5 33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
f698c99c
PG
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
0708a374 36
0708a374
ERE
37from . import BaseTest
38from . import new_volume_handler
39
105c8e26
PG
40# Enable warning messages from deltatar. This minimizes the SNR of
41# test runs, but none of the messages are meaningful in any way.
42VERBOSE_TEST_OUTPUT = False
43
0708a374
ERE
44class DeltaTarTest(BaseTest):
45 """
46 Test backups
47 """
da26094a 48 MODE = ''
8ea0be50 49 MODE_COMPRESSES = False
da26094a 50
f698c99c 51 ENCRYPTION = None # (password : str, paramversion : int) option
da26094a 52
188b845d
TJ
53 GIT_DIR = '.git'
54
cc552d6e
PG
55 FSTEST = None
56 FSAPI_SAVED = []
57
0708a374
ERE
def setUp(self):
    """
    Build the fixture tree used by every test.

    Remembers the current working directory, wipes leftovers of earlier
    runs, creates ``source_dir`` with a handful of files of different sizes
    and records the value returned by ``create_file`` for each of them in
    ``self.hash`` (the tests later compare these against ``md5sum``).
    """
    self.pwd = os.getcwd()
    os.system('rm -rf target_dir source_dir* backup_dir* huge')
    os.makedirs('source_dir/test/test2')

    # The directory entry carries an empty value; tests skip the checksum
    # comparison for falsy values.
    self.hash = {"source_dir/test/test2": ''}
    fixture_files = (
        ("source_dir/big", 50000),
        ("source_dir/small", 100),
        ("source_dir/test/huge", 700000),
        ("source_dir/test/huge2", 800000),
    )
    for path, size in fixture_files:
        self.hash[path] = self.create_file(path, size)

    # Only attach a console handler when verbose output was requested.
    if VERBOSE_TEST_OUTPUT is True:
        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
        self.consoleLogger = handler
    else:
        self.consoleLogger = None

    if not os.path.isdir(self.GIT_DIR):
        # Not running inside a git tree: fall back to our own testing
        # directory as input.
        self.GIT_DIR = 'testing'
        if not os.path.isdir(self.GIT_DIR):
            raise Exception('No input directory found: ' + self.GIT_DIR)

    # Optional per-class hook, run once the fixture tree exists.
    if self.FSTEST is not None:
        self.FSTEST()
87
0708a374
ERE
def tearDown(self):
    """
    Undo everything a test may have left behind.

    Puts back the attributes of *os* listed in ``FSAPI_SAVED``, returns to
    the directory recorded by ``setUp``, deletes the temporary trees and
    calls the crypto testing hook without a size argument.
    """
    for name, original in self.FSAPI_SAVED:
        setattr(os, name, original)

    os.chdir(self.pwd)
    os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")

    # Presumably resets the maximum object size to its default, since no
    # explicit value is passed -- confirm against deltatar.crypto.
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.")
0708a374 99
def test_restore_simple_full_backup(self):
    """
    Round-trip an unfiltered full backup.

    Backs up ``source_dir``, deletes it, restores it from the first volume
    and checks that every recorded path exists again with the recorded
    checksum.
    """
    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # Remove the original data so that the restore has to recreate it.
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        if not digest:
            # Entries without a checksum are only checked for existence.
            continue
        assert digest == self.md5sum(path)
da26094a 127
cb7a3911
PG
128
def test_create_backup_max_file_length(self):
    """
    Creates a full backup including files that exceed the (purposely
    lowered) upper bound on GCM encrypted objects. This will yield multiple
    encrypted objects for one plaintext file.

    Success is verified by splitting the archive at object boundaries
    (without decrypting) and counting the parts.

    Skipped for compressing modes and for unencrypted backups.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest("GCM file length applies only to encrypted backups.")

    new_max = 20000  # cannot be less than tar block size
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.",
        new_max)

    password, paramversion = self.ENCRYPTION
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    # The trailing numbers are presumably the encrypted objects expected
    # per file; they add up to the six parts asserted below.
    for f, s in [("empty", 0),                      # 1 tar objects
                 ("slightly_larger", new_max + 1),  # 2
                 ("twice", 2 * new_max),            # 3
                 ]:
        f = "source_dir2/%s" % f
        self.hash[f] = self.create_file(f, s)

    deltatar.create_full_backup(source_path="source_dir2",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir2")

    backup_filename = deltatar.volume_name_func("backup_dir", True, 0)
    backup_path = os.path.join("backup_dir", backup_filename)

    # split the resulting archive into its constituents without
    # decrypting
    ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
                    "-o backup_dir/split <\'%s\'" % backup_path)

    # Fail right here if the helper process did not succeed, instead of
    # relying only on the directory listing below.
    assert ret == 0, "splitting the archive failed with status %r" % ret

    assert os.path.exists("backup_dir/split")

    dents = os.listdir("backup_dir/split")
    assert len(dents) == 6
180
181
def test_restore_backup_max_file_length(self):
    """
    Creates a full backup including files around and above the (purposely
    lowered) upper bound on GCM encrypted objects, then restores it.

    Success is verified by restoring the backup from its first volume and
    comparing the checksum of every restored file with the one recorded
    when the file was created.

    Skipped for compressing modes and for unencrypted backups.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest("GCM file length applies only to encrypted backups.")

    new_max = 20000  # cannot be less than tar block size
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.",
        new_max)

    password, paramversion = self.ENCRYPTION
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    # File sizes at, just below and just above multiples of the limit.
    for f, s in [("empty", 0),
                 ("almost_large", new_max - 1),
                 ("large", new_max),
                 ("slightly_larger", new_max + 1),
                 ("twice", 2 * new_max),
                 ("twice_plus_one", (2 * new_max) + 1),
                 ]:
        f = "source_dir2/%s" % f
        self.hash[f] = self.create_file(f, s)

    deltatar.create_full_backup(source_path="source_dir2",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir2",
                            backup_tar_path=tar_path)

    for key, value in self.hash.items():
        assert os.path.exists(key)
        if value:
            assert value == self.md5sum(key)
237
238
fac2cfe1
PG
def test_create_backup_index_max_file_length(self):
    """
    Provoke an index file that is too large for one GCM object.

    With the object size limit lowered, the backup of a directory holding
    several dozen small files is expected to abort with
    ``crypto.InvalidFileCounter``. The original author's rationale: the
    index uses the fixed IV file counter AES_GCM_IV_CNT_INDEX, so it cannot
    be continued in a further object; with the real limit, 60+ GB of index
    should last for a while.

    Skipped for compressing modes and for unencrypted backups.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest("GCM file length applies only to encrypted backups.")

    lowered_limit = 5000
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.",
        lowered_limit)

    password, paramversion = self.ENCRYPTION
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = {}
    os.makedirs("source_dir2")
    for size in range(42):
        # NOTE(review): "%rd" renders as repr(size) followed by a literal
        # "d" (e.g. "dummy_0d"); "%d" was probably intended -- confirm.
        path = "source_dir2/dummy_%rd" % size
        self.hash[path] = self.create_file(path, size)

    with self.assertRaises(crypto.InvalidFileCounter):
        dtar.create_full_backup(source_path="source_dir2",
                                backup_path="backup_dir")

    shutil.rmtree("source_dir2")
272
273
6c678f3a
ERE
def test_check_index_checksum(self):
    '''
    Creates a full backup and checks the index' checksum of files.

    The index is read line by line as raw bytes. A running CRC32 is started
    on the BEGIN-FILE-LIST line, updated with every following line up to
    and including the END-FILE-LIST line, and compared with the
    ``checksum`` field of the JSON line that follows.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    crc = None
    # NOTE(review): `checked` is recorded but never asserted, so the test
    # passes vacuously when no END-FILE-LIST marker shows up in the raw
    # bytes (presumably the case for compressed or encrypted indexes --
    # confirm before turning this into an assertion).
    checked = False
    began_list = False

    # The context manager closes the index even if an assertion fails.
    with open(index_path, 'rb') as f:
        for l in iter(f.readline, b''):
            if b'BEGIN-FILE-LIST' in l:
                crc = binascii.crc32(l) & 0xFFFFffff
                began_list = True
            elif b'END-FILE-LIST' in l:
                crc = binascii.crc32(l, crc) & 0xffffffff

                # next line contains the crc
                data = json.loads(f.readline().decode("UTF-8"))
                assert data['type'] == 'file-list-checksum'
                assert data['checksum'] == crc
                checked = True
                break
            elif began_list:
                crc = binascii.crc32(l, crc) & 0xffffffff
6c678f3a 315
b8fc2f5d
ERE
316
def test_restore_multivol(self):
    """
    Round-trip an unfiltered full backup that spans several volumes.

    The backup is created with ``max_volume_size=1``; the test checks which
    volume files exist, removes the source and restores everything starting
    from the first volume. Skipped when the mode contains ``:gz``.
    """
    if ':gz' in self.MODE:
        raise SkipTest('compression information is lost when creating '
                       'multiple volumes with no Stream')

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = {}
    os.makedirs('source_dir2')
    for path, size in (("source_dir2/big", 100000),
                       ("source_dir2/huge", 1200000)):
        self.hash[path] = self.create_file(path, size)

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=1)

    def volume_path(number):
        # Location of the full-backup volume with the given number.
        return os.path.join(
            "backup_dir", dtar.volume_name_func("backup_dir", True, number))

    assert os.path.exists("backup_dir")
    assert os.path.exists(volume_path(0))

    # Compressing modes are expected to fit into a single volume.
    expected_volumes = 1 if self.MODE_COMPRESSES else 2
    for number in range(expected_volumes):
        assert os.path.exists(volume_path(number))
    assert not os.path.exists(volume_path(expected_volumes))

    shutil.rmtree("source_dir2")

    # Restoring from the first volume should pull in the others as well.
    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir2",
                        backup_tar_path=first_volume)

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        if not digest:
            continue
        assert digest == self.md5sum(path)
368
14e2e92d
DGM
def test_restore_multivol_split(self):
    """
    Round-trip a multivolume backup whose files exceed the volume size.

    Three files of 3-4 MiB are backed up with ``max_volume_size=2``, the
    set of volume files is checked, and the data is restored through the
    index file. Skipped for modes starting with ``:`` or ``|``.
    """
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = {}
    os.makedirs('source_dir2')
    for path, size in (("source_dir2/big", 3*1024*1024),
                       ("source_dir2/huge", 4*1024*1024),
                       ("source_dir2/huge2", 4*1024*1024)):
        self.hash[path] = self.create_file(path, size)

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=2)

    def volume_path(number):
        # Location of the full-backup volume with the given number.
        return os.path.join(
            "backup_dir", dtar.volume_name_func("backup_dir", True, number))

    assert os.path.exists("backup_dir")
    assert os.path.exists(volume_path(0))

    # Compressing modes are expected to fit into a single volume.
    expected_volumes = 1 if self.MODE_COMPRESSES else 6
    for number in range(expected_volumes):
        assert os.path.exists(volume_path(number))
    assert not os.path.exists(volume_path(expected_volumes))

    shutil.rmtree("source_dir2")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir2",
                        backup_indexes_paths=[index_path])

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        if not digest:
            continue
        assert digest == self.md5sum(path)
421
422
9eae9a1f
ERE
def test_full_backup_index_extra_data(self):
    """
    Check that ``extra_data`` survives a trip through a full backup index.

    A nested structure is handed to ``create_full_backup`` and must be
    reported unchanged by the iterator that ``iterate_index_path`` returns.
    """
    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    payload = {
        "hola": "caracola",
        "otra_cosa": [1, "lista"],
        "y_otra": {"bola": 1.1},
    }

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            extra_data=payload)

    index_path = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # The index iterator exposes the stored extra data for comparison.
    index_iterator = dtar.iterate_index_path(index_path)
    self.assertEqual(index_iterator.extra_data, payload)
450
451
def test_diff_backup_index_extra_data(self):
    """
    Check that ``extra_data`` survives a trip through a diff backup index.

    A plain full backup is taken first; a diff backup of the unchanged
    source then carries the extra data, which must be reported unchanged
    by the iterator that ``iterate_index_path`` returns for the diff index.
    """
    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    payload = {
        "hola": "caracola",
        "otra_cosa": [1, "lista"],
        "y_otra": {"bola": 1.1},
    }

    # Baseline full backup, without any extra data.
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # Nothing changed in between, so the diff backup is empty.
    dtar.create_diff_backup("source_dir", "backup_dir2",
                            full_index_path, extra_data=payload)
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))

    # The index iterator exposes the stored extra data for comparison.
    index_iterator = dtar.iterate_index_path(diff_index_path)
    self.assertEqual(index_iterator.extra_data, payload)
486
8825be52
DGM
def test_restore_multivol2(self):
    """
    Round-trip a copy of ``GIT_DIR`` through a multivolume full backup.

    The directory is copied to ``source_dir2``, backed up with
    ``max_volume_size=1``, removed, restored from the first volume and
    finally compared with the original via ``check_equal_dirs``.
    """
    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    shutil.copytree(self.GIT_DIR, "source_dir2")

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join(
        "backup_dir", dtar.volume_name_func("backup_dir", True, 0)))

    shutil.rmtree("source_dir2")

    # Restoring from the first volume should pull in the others as well.
    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir2",
                        backup_tar_path=first_volume)

    self.check_equal_dirs(self.GIT_DIR, 'source_dir2', dtar)
8825be52 519
b8fc2f5d
ERE
def test_restore_multivol_manual_from_index(self):
    '''
    Creates a full multivolume backup without any filtering and manually
    restores a single file from it.

    The offset of the file "huge" is looked up in the index, the first
    volume is opened at that offset and the member is extracted into the
    current directory, switching to the next volume through a custom
    volume handler. The extracted file is compared against the recorded
    checksum and removed again.

    Skipped for modes starting with ':' or '|'.
    '''
    # this test only works for uncompressed or concat compressed modes
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs('source_dir2')
    self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
    self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir2",
        backup_path="backup_dir",
        max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, 0)))
    if self.MODE_COMPRESSES:
        n_vols = 1
    else:
        n_vols = 2
    for i_vol in range(n_vols):
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, i_vol)))
    assert not os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, n_vols)))

    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # Find the archive offset of the huge file in the index.
    # NOTE(review): the auxiliary file object is never closed; whether it
    # offers close() is not visible from this test -- confirm and close it.
    f = deltatar.open_auxiliary_file(index_path, 'r')
    offset = None
    while True:
        l = f.readline()
        if not len(l):
            break
        data = json.loads(l.decode('UTF-8'))
        if data.get('type', '') == 'file' and\
                deltatar.unprefixed(data['path']) == "huge":
            offset = data['offset']
            break

    assert offset is not None

    # Named differently from the module-level ``new_volume_handler`` import
    # so that it is not shadowed inside this test.
    def open_next_volume(mode, tarobj, base_name, volume_number):
        suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
        if self.ENCRYPTION is not None:
            # The local variable ``deltatar`` holds a DeltaTar instance,
            # hence the constant is imported under an alias.
            suf += "." + deltatar_PDTCRYPT_EXTENSION
        tarobj.open_volume(datetime.now().strftime(
            "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
    open_next_volume = partial(open_next_volume, self.MODE)

    crypto_ctx = None
    if self.ENCRYPTION is not None:
        crypto_ctx = crypto.Decrypt(password)

    # Both the raw file and the tar object are released even if the
    # extraction raises.
    with open(tar_path, 'rb') as fo:
        fo.seek(offset)
        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              new_volume_handler=open_next_volume,
                              encryption=crypto_ctx)
        try:
            member = tarobj.next()
            member.path = deltatar.unprefixed(member.path)
            member.name = deltatar.unprefixed(member.name)
            tarobj.extract(member)
        finally:
            tarobj.close()

    assert self.hash['source_dir2/huge'] == self.md5sum('huge')

    os.unlink("huge")
6c678f3a 610
f698c99c 611
9e092947
PG
def test_restore_manual_from_index_twice(self):
    """
    Creates a full backup and restores the same file twice. This *must* fail
    when encryption is active.

    Currently, tarfile.py's *_Stream* class conveniently disallows seeking
    backwards within the same file. This prevents the encryption layer from
    exploding due to a reused IV in an overall valid archive.

    This test anticipates possible future mistakes since it's entirely
    feasible to implement backward seeks for *_Stream* with concat mode.

    A ``tarfile.StreamError`` on the second extraction is tolerated only
    when a decryption context is in use; otherwise it is re-raised.
    Skipped for modes starting with "|" and for compressing modes.
    """
    # this test only works for uncompressed or concat compressed modes
    if self.MODE.startswith("|") or self.MODE_COMPRESSES:
        raise SkipTest("this test only works for uncompressed "
                       "or concat compressed modes")

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    self.hash["source_dir2/samefile"] = \
        self.create_file("source_dir2/samefile", 1 * 1024)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir2",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, 0)))

    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # Find the archive offset of the file in the index.
    # NOTE(review): the auxiliary file object is never closed; whether it
    # offers close() is not visible from this test -- confirm and close it.
    f = deltatar.open_auxiliary_file(index_path, "r")
    offset = None
    while True:
        l = f.readline()
        if not len(l):
            break
        data = json.loads(l.decode("UTF-8"))
        if data.get("type", "") == "file" and\
                deltatar.unprefixed(data["path"]) == "samefile":
            offset = data["offset"]
            break

    assert offset is not None

    crypto_ctx = None
    if self.ENCRYPTION is not None:
        crypto_ctx = crypto.Decrypt(password)

    # Both the raw file and the tar object are released even if an
    # extraction or an assertion fails.
    with open(tar_path, "rb") as fo:
        fo.seek(offset)
        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              encryption=crypto_ctx)
        try:
            member = tarobj.next()
            member.path = deltatar.unprefixed(member.path)
            member.name = deltatar.unprefixed(member.name)

            # extract once ...
            tarobj.extract(member)
            assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

            # ... and twice
            try:
                tarobj.extract(member)
            except tarfile.StreamError:
                if crypto_ctx is None:
                    raise
                # good: seeking backwards not allowed
        finally:
            tarobj.close()

    assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

    os.unlink("samefile")
700
701
11684b1d
ERE
def test_restore_from_index(self):
    """
    Restore a full backup by way of its index file.

    ``source_dir`` is backed up and removed; the restore is driven by the
    index path instead of a tar volume. Every recorded path must exist
    afterwards with the recorded checksum. Skipped for modes starting with
    ``:`` or ``|``.
    """
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        if not digest:
            continue
        assert digest == self.md5sum(path)
6c678f3a 733
b8fc2f5d
ERE
def test_restore_multivol_from_index(self):
    """
    Restore a full multivolume backup by way of its index file.

    Same as the single-volume index restore, except that the backup is
    created with ``max_volume_size=2``. Skipped for modes starting with
    ``:`` or ``|``.
    """
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            max_volume_size=2)

    shutil.rmtree("source_dir")

    # The index-driven restore should take care of all volumes.
    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        if not digest:
            continue
        assert digest == self.md5sum(path)
da26094a 766
c1af2184
ERE
def test_create_basic_filtering(self):
    """
    Check include/exclude lists applied while creating a backup.

    Only ``test`` and ``small`` are included and ``test/huge`` is excluded.
    After a restore from the first volume the included entries must exist
    while the excluded and the never-included ones must not.
    """
    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"])

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    for present in ("source_dir/small",
                    "source_dir/test",
                    "source_dir/test/huge2",
                    "source_dir/test/test2"):
        assert os.path.exists(present)

    for absent in ("source_dir/test/huge",
                   "source_dir/big"):
        assert not os.path.exists(absent)
c1af2184 799
862b3726
ERE
def test_create_filter_func(self):
    """
    Check which paths reach an accept-all filter function during creation.

    The filter records every path it is asked about and always accepts it.
    Together with the include/exclude lists, the recorded set must be
    exactly ``small``, ``test``, ``test/huge2`` and ``test/test2``.
    """
    visited_paths = []

    def accept_and_record(path):
        # Remember each distinct path once; never filter anything out.
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"],
                    filter_func=accept_and_record)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    expected = {'small', 'test', 'test/huge2', 'test/test2'}
    assert set(visited_paths) == expected
862b3726
ERE
839
def test_create_filter_out_func(self):
    """
    Check a filter function that rejects every path during creation.

    The filter records each path it is asked about and always rejects it.
    Only ``small`` and ``test`` are expected to be recorded, and after the
    restore none of the source entries may exist.
    """
    visited_paths = []

    def reject_and_record(path):
        # Remember each distinct path once; filter out everything.
        if path not in visited_paths:
            visited_paths.append(path)
        return False

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"],
                    filter_func=reject_and_record)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    expected = {'small', 'test'}
    assert set(visited_paths) == expected

    # Nothing was backed up, hence nothing may have been restored.
    for absent in ("source_dir/small",
                   "source_dir/big",
                   "source_dir/test"):
        assert not os.path.exists(absent)
862b3726 885
9af328e2
ERE
def test_restore_index_basic_filtering(self):
    """
    Check include/exclude lists applied during an index based restore.

    The backup itself is unfiltered; the lists are set on the DeltaTar
    object just before restoring through the index. Skipped for modes
    starting with ``:`` or ``|``.
    """
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    credentials = self.ENCRYPTION if self.ENCRYPTION else (None, None)
    password, paramversion = credentials
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))

    # Filtering is switched on only now, for the restore.
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for present in ("source_dir/small",
                    "source_dir/test",
                    "source_dir/test/huge2",
                    "source_dir/test/test2"):
        assert os.path.exists(present)

    for absent in ("source_dir/test/huge",
                   "source_dir/big"):
        assert not os.path.exists(absent)
9af328e2
ERE
922
def test_restore_index_filter_func(self):
    '''
    Take a full backup, then restore it through its index file with
    include/exclude filters and a filter function installed. The filter
    function accepts everything and records the paths it is asked about.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    visited_paths = []

    def filter_func(path):
        # remember each distinct path once; never reject anything
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # full backup first, then wipe the source so the restore starts clean
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))

    # filters are only installed for the restore step
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.filter_func = filter_func
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    # order of the calls does not matter, only the set of paths seen
    assert set(visited_paths) == {
        'small',
        'test',
        'test/huge2',
        'test/test2',
    }
968
e5f5681b
ERE
def test_restore_tar_basic_filtering(self):
    '''
    Take a full backup, then restore it from the first tar volume with
    include/exclude filters set, and check which paths come back.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # full backup first, then wipe the source so the restore starts clean
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # the filters are only installed for the restore step
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]

    tar_path = os.path.join("backup_dir",
                            dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=tar_path)

    for restored in ("source_dir/small",
                     "source_dir/test",
                     "source_dir/test/huge2",
                     "source_dir/test/test2"):
        assert os.path.exists(restored)

    for filtered in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(filtered)
e5f5681b
ERE
1002
def test_restore_tar_filter_func(self):
    '''
    Take a full backup, then restore it from the first tar volume with
    include/exclude filters and a filter function installed. The filter
    function accepts everything and records the paths it is asked about.
    '''
    visited_paths = []

    def filter_func(visited_paths, path):
        # remember each distinct path once; never reject anything
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    filter_func = partial(filter_func, visited_paths)

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # filters are only installed for the restore step; the restore is
    # driven by the tar volume, so no index path is needed here
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = filter_func

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # order of the calls does not matter, only the set of paths seen
    assert set(visited_paths) == set([
        'small',
        'test',
        'test/huge2',
        'test/test2'
    ])
1047
def test_filter_path_regexp(self):
    '''
    Exercise deltatar.filter_path with a mix of compiled regular
    expressions and a plain string in the include list, plus a regular
    expression in the exclude list.
    '''
    dtar = DeltaTar(
        mode=self.MODE,
        included_files=[
            re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
            re.compile('^yes$'),
            'testing',
        ],
        excluded_files=[
            re.compile('^testing/in_the'),
        ])

    accepted = (
        'test/hola',
        'test/hola/any/thing',
        'test/caracola/caracolero',
        'test/caracola/caracolero/yeah',
        'test/caracola/caracolero/whatever/aa',
        'yes',
        'testing',
        'testing/yes',
        'testing/in_th',
    )
    rejected = (
        'something',
        'other/thing',
        'test_ing',
        'test/hola_lala',
        'test/agur',
        'testing_something',
        'yeso',
        'yes/o',
        'yes_o',
        'testing/in_the',
        'testing/in_the_field',
        'testing/in_the/field',
    )

    # paths that must pass the filter
    for path in accepted:
        assert dtar.filter_path(path)

    # paths that must be filtered out
    for path in rejected:
        assert not dtar.filter_path(path)
e5f5681b 1087
974408b5
ERE
def test_filter_path_parent(self):
    '''
    Exercise deltatar.filter_path for parent matching: directories on the
    way to an included path report PARENT_MATCH, the included path and
    anything below it report MATCH, unrelated paths report NO_MATCH.
    '''
    dtar = DeltaTar(mode=self.MODE,
                    included_files=['testing/path/to/some/thing'])

    # every directory leading down to the included path
    for parent_dir in ('testing',
                       'testing/path/',
                       'testing/path/to',
                       'testing/path/to/some'):
        assert dtar.filter_path(parent_dir, is_dir=True) == PARENT_MATCH

    # the included path itself and a path beneath it
    for matching in ('testing/path/to/some/thing',
                     'testing/path/to/some/thing/what&/ever'):
        assert dtar.filter_path(matching) == MATCH

    assert dtar.filter_path('testing/something/else') == NO_MATCH
974408b5
ERE
1105
def test_parent_matching_simple_full_backup(self):
    '''
    Create a full backup with an include list naming a nested file, then
    restore it with an unfiltered DeltaTar and check what was stored.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    creator = DeltaTar(mode=self.MODE,
                       password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger,
                       included_files=['test/huge2'])

    # filtered full backup, then wipe the source
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join("backup_dir",
                            creator.volume_name_func('backup_dir', True, 0))

    # a fresh instance without any filter performs the restore
    restorer = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger)
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1141
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Create an unfiltered full backup, then restore it from the tar volume
    with an include list naming a nested file.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    creator = DeltaTar(mode=self.MODE,
                       password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger)

    # unfiltered full backup, then wipe the source
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join("backup_dir",
                            creator.volume_name_func('backup_dir', True, 0))

    # a fresh instance carrying the include list performs the restore
    restorer = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger,
                        included_files=['test/huge2'])
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1177
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create an unfiltered full backup, then restore it with an include list
    naming a nested file.

    NOTE(review): the restore below is driven by backup_tar_path; no index
    file is passed, although the test name mentions "index" -- confirm
    whether an index based restore was intended.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    creator = DeltaTar(mode=self.MODE,
                       password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger)

    # unfiltered full backup, then wipe the source
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join("backup_dir",
                            creator.volume_name_func('backup_dir', True, 0))

    # a fresh instance carrying the include list performs the restore
    restorer = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger,
                        included_files=['test/huge2'])
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1213
d07c8065
ERE
def test_collate_iterators(self):
    '''
    Collate the index of a fresh full backup with a walk over the very
    same, unmodified source directory; every collated pair must compare
    equal.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # absolute index path, because the cwd changes below
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              dtar.index_name_func(is_full=True))
    from_index = dtar.iterate_index_path(index_path)

    os.chdir('source_dir')
    from_disk = dtar.jsonize_path_iterator(dtar._recursive_walk_dir('.'))

    try:
        for indexed, on_disk, _l_no in dtar.collate_iterators(from_index,
                                                              from_disk):
            assert dtar._equal_stat_dicts(indexed, on_disk)
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1245
def test_collate_iterators_diffdirs(self):
    '''
    Collate the index of a full backup with a walk over the source
    directory after one extra file ("z") was added to it. The new file
    must show up without an index counterpart; all other pairs must
    compare equal.
    '''
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # created after the backup, hence unknown to the index
    self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)

    # absolute index path, because the cwd changes below
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              dtar.index_name_func(is_full=True))
    from_index = dtar.iterate_index_path(index_path)

    os.chdir('source_dir')
    from_disk = dtar.jsonize_path_iterator(dtar._recursive_walk_dir('.'))

    try:
        for indexed, on_disk, _l_no in dtar.collate_iterators(from_index,
                                                              from_disk):
            if on_disk['path'] != 'z':
                assert dtar._equal_stat_dicts(indexed, on_disk)
            else:
                assert not indexed
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1283
def test_collate_iterators_diffdirs2(self):
    '''
    Collate the index of a full backup with a walk over the source
    directory after a new directory and several new files were added, and
    check the exact sequence of (index path, disk path) pairs produced.
    Entries that exist only on disk are paired with None.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # add some new files and directories
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    # absolute index path, because the cwd changes below
    cwd = os.getcwd()
    index_filename = deltatar.index_name_func(is_full=True)
    index_path = os.path.join(cwd, "backup_dir", index_filename)
    index_it = deltatar.iterate_index_path(index_path)

    visited_pairs = []

    os.chdir('source_dir')
    try:
        dir_it = deltatar._recursive_walk_dir('.')
        path_it = deltatar.jsonize_path_iterator(dir_it)

        for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
            visited_pairs.append(
                (deltatar.unprefixed(path1['path']) if path1 else None,
                 path2['path'] if path2 else None)
            )
    finally:
        # only cleanup belongs here: asserting inside "finally" would hide
        # an exception raised by the loop and, on failure, skip the chdir
        os.chdir(cwd)

    assert visited_pairs == [
        (u'big', u'big'),
        (None, u'bigdir'),
        (u'small', u'small'),
        (u'test', u'test'),
        (None, u'zzzz'),
        (None, u'bigdir/a'),
        (None, u'bigdir/b'),
        (u'test/huge', u'test/huge'),
        (u'test/huge2', u'test/huge2'),
        (u'test/test2', u'test/test2'),
    ]
1339
def test_create_empty_diff_backup(self):
    '''
    Create a diff backup right after a full backup, without touching the
    source in between, then check the diff index and restore the diff tar.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # full backup serving as the base of the diff
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index_path)

    # every entry of the diff index must carry a "list://" path, and
    # there must be exactly six of them
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))
    entry_count = 0
    for entry_count, entry in enumerate(
            dtar.iterate_index_path(diff_index_path), start=1):
        assert entry[0]['path'].startswith("list://")
    assert entry_count == 6

    # check the tar file
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    diff_tar_path = os.path.join(
        "backup_dir2",
        dtar.volume_name_func('backup_dir2', is_full=False,
                              volume_number=0))

    # no file restored, because the diff was empty
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=diff_tar_path)
    assert len(os.listdir("source_dir")) == 0
1383
8adbe50d 1384
42d39ca7
ERE
def test_create_diff_backup1(self):
    '''
    Create a diff backup after adding a directory and files and removing
    one file, check the exact diff index contents, then restore the diff
    tar on top of a directory that still holds the removed file.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # full backup serving as the base of the diff
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # add some new files and directories, drop "small"
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index_path)

    # check index items
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))
    indexed_paths = []
    for entry in dtar.iterate_index_path(diff_index_path):
        indexed_paths.append(entry[0]['path'])

    assert indexed_paths == [
        'list://big',
        'snapshot://bigdir',
        'delete://small',
        'list://test',
        'snapshot://zzzz',
        'snapshot://bigdir/a',
        'snapshot://bigdir/b',
        'list://test/huge',
        'list://test/huge2',
        'list://test/test2',
    ]

    # check the tar file
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    # create source_dir with the small file, that will be then deleted by
    # the restore_backup
    os.mkdir("source_dir")
    with open("source_dir/small", 'wb'):
        pass

    diff_tar_path = os.path.join(
        "backup_dir2",
        dtar.volume_name_func('backup_dir2', is_full=False,
                              volume_number=0))

    # restore the backup, this will create only the new files
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=diff_tar_path)

    # the order doesn't matter
    assert set(os.listdir("source_dir")) == {'zzzz', 'bigdir'}
42d39ca7 1449
df99a044
ERE
def test_restore_from_index_diff_backup(self):
    '''
    Create a full backup, modify the source, create a diff backup, then
    restore into an empty target from the diff and full indexes and
    compare the result with the modified source.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # full backup serving as the base of the diff
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # add some new files and directories, drop "small"
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index_path)

    # apply diff backup in target_dir
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))
    dtar.restore_backup(
        "target_dir",
        backup_indexes_paths=[diff_index_path, full_index_path])

    # source_dir and target_dir must now be the same
    self.check_equal_dirs('source_dir', 'target_dir', dtar)
df99a044
ERE
1492
def test_restore_from_index_diff_backup2(self):
    '''
    Create a full backup, modify the source (including removal of a whole
    directory), create a diff backup, restore the full backup into the
    target and then apply the diff on top of it via the indexes.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # full backup serving as the base of the diff
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # add some new files and directories, drop "small" and "test"
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
    shutil.rmtree("source_dir/test")

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index_path)

    # first restore initial backup in target_dir
    full_tar_path = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    dtar.restore_backup("target_dir", backup_tar_path=full_tar_path)

    # then apply diff backup in target_dir
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))

    try:
        dtar.restore_backup(
            "target_dir",
            backup_indexes_paths=[diff_index_path, full_index_path])

        # source_dir and target_dir must now be the same
        self.check_equal_dirs('source_dir', 'target_dir', dtar)
    except FileNotFoundError:
        # fs traversal may fail here; that is only tolerated when FSTEST
        # is set, otherwise the error is passed on
        if self.FSTEST is None:
            raise
cbac9f0b
ERE
1547
def test_restore_from_index_diff_backup3(self):
    '''
    Create a full backup of a copy of self.GIT_DIR, randomly delete
    entries from a second copy, create a diff backup from that second
    copy, then restore the full backup and apply the diff on top of it.
    Finally restore from the indexes into an empty target as well.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # two identical copies: one is backed up as is, one gets altered
    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # alter the second copy randomly
    for path in dtar._recursive_walk_dir('source_dir_diff'):
        # only touch paths that still exist (a parent might have been
        # removed already), and of those only about 30%
        if os.path.exists(path) and random.random() >= 0.7:
            remove = shutil.rmtree if os.path.isdir(path) else os.unlink
            remove(path)

    dtar.create_diff_backup("source_dir_diff", "backup_dir2",
                            full_index_path)

    # first restore initial backup in target_dir
    full_tar_path = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    dtar.restore_backup("target_dir", backup_tar_path=full_tar_path)

    # target_dir must equal source_dir (the unaltered copy of
    # self.GIT_DIR)
    self.check_equal_dirs('source_dir', 'target_dir', dtar)

    # then apply diff backup in target_dir
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))
    index_chain = [diff_index_path, full_index_path]
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir must equal source_dir_diff (the randomly altered copy)
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)

    # then delete target_dir and apply diff backup from zero and check
    # again
    shutil.rmtree("target_dir")
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)
1620
def test_restore_from_index_diff_backup3_multivol(self):
    '''
    Same scenario as the single volume variant, but both the full and the
    diff backup are created with max_volume_size=1: full backup of a copy
    of self.GIT_DIR, random deletions in a second copy, diff backup, then
    restore of the full backup followed by the diff.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # two identical copies: one is backed up as is, one gets altered
    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            max_volume_size=1)
    full_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # alter the second copy randomly
    for path in dtar._recursive_walk_dir('source_dir_diff'):
        # only touch paths that still exist (a parent might have been
        # removed already), and of those only about 30%
        if os.path.exists(path) and random.random() >= 0.7:
            remove = shutil.rmtree if os.path.isdir(path) else os.unlink
            remove(path)

    dtar.create_diff_backup("source_dir_diff", "backup_dir2",
                            full_index_path, max_volume_size=1)

    # first restore initial backup in target_dir
    full_tar_path = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', True, 0))
    if self.FSTEST is not None:
        return # the below will fail in stat checks, but that is expected
    dtar.restore_backup("target_dir", backup_tar_path=full_tar_path)

    # target_dir must equal source_dir (the unaltered copy of
    # self.GIT_DIR)
    self.check_equal_dirs('source_dir', 'target_dir', dtar)

    # then apply diff backup in target_dir
    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))
    index_chain = [diff_index_path, full_index_path]
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir must equal source_dir_diff (the randomly altered copy)
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)

    # then delete target_dir and apply diff backup from zero and check
    # again
    shutil.rmtree("target_dir")
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)
1696
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Compare the directory trees below path1 and path2 entry by entry and
    fail unless they are the same.

    Both trees are walked with deltatar._recursive_walk_dir() and passed
    through deltatar.jsonize_path_iterator(); the first element of each
    yielded item is compared pairwise with deltatar._equal_stat_dicts().

    Raises Exception if one tree yields more entries than the other, and
    AssertionError if a pair of entries differs.
    '''
    source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
    source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
    target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
    target_it = deltatar.jsonize_path_iterator(target_it, strip=1)

    # unique marker telling "iterator exhausted" apart from any real item
    exhausted = object()

    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            break

        # either side running out first means the trees differ in size;
        # this covers a truncated target as well as a truncated source
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            # show the offending pair before passing the failure on
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            raise
df99a044 1722
f5d9144b
PG
def test_create_no_symlinks(self):
    '''
    Creates a full backup from different varieties of symlinks. The
    extracted archive may not contain any symlinks but the file contents.
    '''
    # start from a pristine source tree; a missing directory is fine
    shutil.rmtree("source_dir", ignore_errors=True)
    os.makedirs("source_dir/symlinks")

    # O_EXCL: the link target must be created by this test
    fd = os.open("source_dir/symlinks/valid_linkname",
                 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    with os.fdopen(fd, "wb") as link_target:
        link_target.write(
            b"valid link target for symlink tests; please ignore\n")

    # first one is good, the rest points nowhere
    self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
    self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
    self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
    self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")
    assert not os.path.exists("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for root, _dirs, files in os.walk("source_dir/symlinks"):
        # only the valid link plus the linked file may be found in the
        # extracted archive
        assert len(files) == 2
        for name in files:
            # os.walk yields bare names, so they have to be joined with
            # the walked directory before being inspected
            assert not os.path.islink(os.path.join(root, name))

    # the link must have been resolved: its contents match the linked file
    with open("source_dir/symlinks/valid_linkname") as a:
        with open("source_dir/symlinks/whatever") as b:
            assert a.read() == b.read()
1770
83f5fd71
PG
def test_restore_with_symlinks(self):
    '''
    Creates a full backup, appends different varieties of symlinks to the
    resulting archive and restores it. All of the links must be filtered
    out, i.e. none of them may exist below the restore target afterwards.

    For modes where appending to the archive is not possible no links are
    added and only the restore itself is exercised.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(archive, name, dst):
        # append a symlink member and hand back its name for later checks
        link = tarfile.TarInfo("snapshot://%s" % name)
        link.type = tarfile.SYMTYPE
        link.linkname = dst
        archive.addfile(link)
        return name

    try:
        with tarfile.open(tar_path, mode="a") as archive:
            checkme = [
                add_symlink(archive, "symlinks/foo", "internal-file"),
                add_symlink(archive, "symlinks/bar", "/absolute/path"),
                add_symlink(archive, "symlinks/baz", "../parent/../../paths"),
            ]
    except tarfile.ReadError:
        if self.MODE == '#' or self.MODE.endswith("gz"):
            checkme = []
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            checkme = []
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check what happened to our symlinks
    for name in checkme:
        fullpath = os.path.join("source_dir", name)
        # lexists() inspects the link itself. exists() follows it and
        # reports False for a dangling link, and every link added above
        # dangles, so exists() could never detect an extracted link.
        assert not os.path.lexists(fullpath)
f5d9144b 1824
fcb3615d 1825
43ce978b
PG
def test_restore_malicious_symlinks(self):
    '''
    Simulates a symlink attack: a full backup is created and then several
    symlinks plus a regular file, all sharing one member name, are
    appended to the archive. The links point to an external path that
    could be abused to write outside the extraction prefix. After the
    restore nothing may exist at that member's path.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir", deltatar.volume_name_func('backup_dir', True, 0))

    testpath = "symlinks/pernicious-link"
    testdst = "/tmp/does/not/exist"

    def make_member(kind, linkname=None):
        # build an archive member of the given type at the contested path
        member = tarfile.TarInfo("snapshot://%s" % testpath)
        member.type = kind
        if linkname is not None:
            member.linkname = linkname
        return member

    # add symlinks, followed by a file of the same name, to the existing
    # archive
    try:
        with tarfile.open(tar_path, mode="a") as archive:
            for suffix in ("", "X", "XXX"):
                archive.addfile(
                    make_member(tarfile.SYMTYPE, testdst + suffix))
            archive.addfile(make_member(tarfile.REGTYPE))
    except tarfile.ReadError:
        if not (self.MODE == '#' or self.MODE.endswith("gz")):
            raise
    except ValueError:
        # O_APPEND of concat archives not feasible
        if not self.MODE.startswith('#'):
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check whether the link was extracted; deltatar seems to only ever
    # retrieve the first item it finds for a given path which in the case
    # at hand is a symlink to some non-existent path
    fullpath = os.path.join("source_dir", testpath)
    assert not os.path.lexists(fullpath)
43ce978b 1888
cc552d6e 1889
fcb3615d
PG
class TarfileTest(BaseTest):
    '''
    Tests that exercise the tarfile module directly, without DeltaTar.
    '''
    pwd = None

    # extraction targets of test_extract_malicious_symlinks_unlink; they
    # are created relative to the working directory and must be cleaned up
    UNLINKED_DST = "test_dst_unlinked"
    SYMLINKED_DST = "test_dst_symlinked"

    def _remove_extract_dirs(self):
        '''
        Delete the extraction targets if present so that links left behind
        by one run cannot influence the next.
        '''
        for dst in (self.UNLINKED_DST, self.SYMLINKED_DST):
            shutil.rmtree(dst, ignore_errors=True)

    def setUp(self):
        '''
        Remember the working directory, create the archive directory and
        discard extraction targets left over from an earlier run.
        '''
        self.pwd = os.getcwd()
        os.makedirs("backup_dir", exist_ok=True)
        self._remove_extract_dirs()

    def tearDown(self):
        '''
        Return to the initial working directory and remove temporary files
        created by unit tests.
        '''
        os.chdir(self.pwd)
        shutil.rmtree("backup_dir")
        self._remove_extract_dirs()

    def test_extract_malicious_symlinks_unlink(self):
        '''
        Test symlink mitigation: The destination must be deleted prior to
        extraction.
        '''
        tar_path = os.path.join("backup_dir", "malicious-archive")

        def add_symlink(a, name, dst):
            l = tarfile.TarInfo(name)
            l.type = tarfile.SYMTYPE
            l.linkname = dst
            a.addfile(l)

        def add_file(a, name):
            f = tarfile.TarInfo(name)
            f.type = tarfile.REGTYPE
            a.addfile(f)

        # Add a symlink pointing to "must-not-exist", then append a file
        # object at the same path. The file must not end up at
        # "must-not-exist" (the pointee) but at "not-a-symlink" (the
        # pointer) that was unlinked prior to extraction.
        testpath = "test/not-a-symlink"
        testdst = "must-not-exist"

        # The archive is created from scratch, so a failure here is a
        # genuine error and is left to propagate; swallowing it would only
        # defer the failure to the extraction below.
        with tarfile.open(tar_path, mode="w") as a:
            add_symlink(a, testpath, testdst)
            add_file(a, testpath)

        def extract_and_check(dst, unlink):
            # tar_path is relative, hence the archive is opened before
            # changing into the extraction directory
            with tarfile.open(tar_path, mode="r") as a:
                os.makedirs(dst, exist_ok=True)
                olddir = os.getcwd()
                try:
                    os.chdir(dst)
                    a.extractall(unlink=unlink)
                finally:
                    os.chdir(olddir)

            fullpath = os.path.join(dst, testpath)
            fulldst = os.path.join(dst, "test/%s" % testdst)

            if unlink is True:
                # Check whether the file was extracted. The object at the
                # symlink location (source) must be the file. There must
                # not be an object at the symlink destination.
                assert not os.path.islink(fullpath)
                # positive check: without it the negative assertions also
                # hold when nothing was extracted at all
                assert os.path.isfile(fullpath)
                assert not os.path.exists(fulldst)
            else:
                # Without unlink protection, the file must be found at the
                # symlink destination with the symlink intact.
                assert os.path.islink(fullpath)
                assert os.path.exists(fulldst)

        extract_and_check(self.UNLINKED_DST, True)
        extract_and_check(self.SYMLINKED_DST, False)
1975
1976
cc552d6e
PG
def fsapi_access_true(self):
    """
    Chicanery for testing improper use of the *os* module: replace
    os.access with a stub that returns True for any arguments.

    The function being replaced is first recorded on self.FSAPI_SAVED as
    an ("access", function) pair.
    """
    original_access = os.access
    self.FSAPI_SAVED.append(("access", original_access))

    def always_true(*_args, **_kwargs):
        return True

    os.access = always_true
1984
1985
da26094a
ERE
class DeltaTar2Test(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite with the archive mode set to ":".
    '''
    MODE = ':'
1991
1992
class DeltaTarStreamTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in uncompressed stream mode ("|").
    '''
    MODE = '|'
1998
1999
class DeltaTarGzipTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in gzip mode (":gz").
    '''
    MODE = ':gz'
    MODE_COMPRESSES = True
da26094a
ERE
2006
2007
da26094a
ERE
class DeltaTarGzipStreamTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in gzip stream mode ("|gz").
    '''
    MODE = '|gz'
    MODE_COMPRESSES = True
da26094a
ERE
2014
2015
bd011242
CH
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in bz2 mode (":bz2"); skipped because the
    bz2 tests are too slow.
    '''
    MODE = ':bz2'
    MODE_COMPRESSES = True
ea6d3c3e 2023
bd011242
CH
2024
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in bz2 stream mode ("|bz2"); skipped
    because the bz2 tests are too slow.
    '''
    MODE = '|bz2'
    MODE_COMPRESSES = True
da26094a
ERE
2032
2033
class DeltaTarGzipConcatTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in gzip concat stream mode ("#gz").
    '''
    MODE = '#gz'
    MODE_COMPRESSES = True
da26094a
ERE
2040
2041
da26094a
ERE
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in gzip concat stream mode ("#gz") with
    encryption enabled; ENCRYPTION holds the password and the parameter
    version.
    '''
    MODE = '#gz'
    ENCRYPTION = ('some magic key', 1)
    MODE_COMPRESSES = True
da26094a
ERE
2049
2050
ac5e4184
ERE
class DeltaTarAes128ConcatTest(DeltaTarTest):
    '''
    Runs the DeltaTarTest suite in concat stream mode ("#") with
    encryption enabled; ENCRYPTION holds the password and the parameter
    version.
    '''
    MODE = '#'
    ENCRYPTION = ('some magic key', 1)
ac5e4184
ERE
2057
2058
class DeltaTarFilesystemHandlingTestBase(BaseTest):
    '''
    Mess with filesystem APIs: base for test classes that use
    fsapi_access_true as their FSTEST.
    '''
    FSTEST = fsapi_access_true
2064
a20c6239
PG
class DeltaTarFSGzipTest(DeltaTarFilesystemHandlingTestBase,
                         DeltaTarGzipTest):
    '''
    DeltaTarGzipTest combined with DeltaTarFilesystemHandlingTestBase.
    '''
2068
class DeltaTarFSGzipConcatTest(DeltaTarFilesystemHandlingTestBase,
                               DeltaTarGzipConcatTest):
    '''
    DeltaTarGzipConcatTest combined with
    DeltaTarFilesystemHandlingTestBase.
    '''
2072
class DeltaTarFSAes128ConcatTest(DeltaTarFilesystemHandlingTestBase,
                                 DeltaTarAes128ConcatTest):
    '''
    DeltaTarAes128ConcatTest combined with
    DeltaTarFilesystemHandlingTestBase.
    '''
2076
class DeltaTarFSGzipAes128ConcatTest(DeltaTarFilesystemHandlingTestBase,
                                     DeltaTarGzipAes128ConcatTest):
    '''
    DeltaTarGzipAes128ConcatTest combined with
    DeltaTarFilesystemHandlingTestBase.
    '''
2080