add unit test for mishandling access(3)
[python-delta-tar] / testing / test_deltatar.py
CommitLineData
0708a374
ERE
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
f5d9144b 19import errno
0708a374 20import os
0d5c1970 21import re
cbac9f0b 22import random
e5c6ca04 23import shutil
0708a374 24import logging
b8fc2f5d
ERE
25import binascii
26import json
27from datetime import datetime
28from functools import partial
bd011242 29from unittest import skip, SkipTest
0708a374 30
83f5fd71 31import deltatar.tarfile as tarfile
f698c99c 32from deltatar.tarfile import TarFile
974408b5 33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
f698c99c
PG
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
0708a374 36
0708a374
ERE
37from . import BaseTest
38from . import new_volume_handler
39
105c8e26
PG
40# Enable warning messages from deltatar. This minimizes the SNR of
41# test runs, but none of the messages are meaningful in any way.
42VERBOSE_TEST_OUTPUT = False
43
0708a374
ERE
44class DeltaTarTest(BaseTest):
45 """
46 Test backups
47 """
da26094a 48 MODE = ''
8ea0be50 49 MODE_COMPRESSES = False
da26094a 50
f698c99c 51 ENCRYPTION = None # (password : str, paramversion : int) option
da26094a 52
188b845d
TJ
53 GIT_DIR = '.git'
54
cc552d6e
PG
55 FSTEST = None
56 FSAPI_SAVED = []
57
0708a374
ERE
def setUp(self):
    """
    Build the fixture tree shared by all tests.

    Remembers the current working directory, wipes leftovers of earlier
    runs and creates ``source_dir`` with a few files. For each file the
    value returned by ``create_file`` is recorded in ``self.hash`` (the
    tests later compare it against ``md5sum``); the directory entry maps
    to an empty string. Afterwards the optional console logger is set up,
    the bulk input directory (``GIT_DIR``) is chosen and the ``FSTEST``
    hook is run if one is configured.

    Raises:
        Exception: if neither ``GIT_DIR`` nor ``testing`` is a directory.
    """
    self.pwd = os.getcwd()
    os.system('rm -rf target_dir source_dir* backup_dir* huge')
    os.makedirs('source_dir/test/test2')

    # the directory itself has nothing to checksum, hence the empty string
    self.hash = {"source_dir/test/test2": ''}
    # second element is handed to create_file -- presumably the file size
    fixtures = (
        ("source_dir/big", 50000),
        ("source_dir/small", 100),
        ("source_dir/test/huge", 700000),
        ("source_dir/test/huge2", 800000),
    )
    for path, size in fixtures:
        self.hash[path] = self.create_file(path, size)

    if VERBOSE_TEST_OUTPUT is True:
        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
        self.consoleLogger = handler
    else:
        self.consoleLogger = None

    if not os.path.isdir(self.GIT_DIR):
        # Not running inside a git tree: fall back to our own testing
        # directory as input.
        self.GIT_DIR = 'testing'
        if not os.path.isdir(self.GIT_DIR):
            raise Exception('No input directory found: ' + self.GIT_DIR)

    if self.FSTEST is not None:
        self.FSTEST()
87
0708a374
ERE
def tearDown(self):
    """
    Clean up after a test.

    Puts back every ``os`` attribute listed in ``FSAPI_SAVED``, returns to
    the working directory remembered by ``setUp``, deletes the scratch
    directories and calls the crypto testing hook without a size argument.
    """
    for name, original in self.FSAPI_SAVED:
        setattr(os, name, original)

    os.chdir(self.pwd)
    os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")

    # no size given -- presumably this restores the default object size
    # limit that some tests lower on purpose
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.")
0708a374 99
def test_restore_simple_full_backup(self):
    """
    Take an unfiltered full backup of ``source_dir``, delete the source,
    restore from the first volume and compare every recorded checksum.
    """
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # back up, then throw away the original tree
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        # entries with an empty checksum (directories) only need to exist
        assert not checksum or checksum == self.md5sum(path)
da26094a 127
cb7a3911
PG
128
def test_create_backup_max_file_length(self):
    """
    Back up files that exceed a purposely lowered upper bound on GCM
    encrypted objects, so that one plaintext file yields several encrypted
    objects.

    The archive is then split at object boundaries (without decrypting)
    by ``deltatar/crypto.py`` and the resulting parts are counted.
    Skipped for compressing modes and for unencrypted backups.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest("GCM file length applies only to encrypted backups.")

    new_max = 20000  # cannot be less than tar block size
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.", new_max)

    password, paramversion = self.ENCRYPTION
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = {}
    os.makedirs("source_dir2")
    # The original annotates these with 1, 2 and 3 "tar objects";
    # presumably the per-file object counts that add up to the 6 parts
    # asserted at the end.
    layout = [
        ("empty", 0),
        ("slightly_larger", new_max + 1),
        ("twice", 2 * new_max),
    ]
    for name, size in layout:
        path = "source_dir2/%s" % name
        self.hash[path] = self.create_file(path, size)

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir2")

    backup_path = os.path.join(
        "backup_dir", dtar.volume_name_func("backup_dir", True, 0))

    # split the resulting archive into its constituents without decrypting
    # NOTE(review): the exit status of the command is not checked, only
    # the presence and contents of the output directory.
    split_cmd = ("python3 ./deltatar/crypto.py process -D -S -i - "
                 "-o backup_dir/split <'%s'") % backup_path
    os.system(split_cmd)

    assert os.path.exists("backup_dir/split")
    parts = os.listdir("backup_dir/split")
    assert len(parts) == 6
180
181
def test_restore_backup_max_file_length(self):
    """
    Back up and restore files whose sizes lie around a purposely lowered
    upper bound on GCM encrypted objects.

    File sizes are chosen just below, at and just above the limit and
    around twice the limit. Success is verified by restoring the backup
    from its first volume and comparing the checksum recorded for every
    file with the checksum of the restored copy.
    Skipped for compressing modes and for unencrypted backups.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest("GCM file length applies only to encrypted backups.")

    new_max = 20000  # cannot be less than tar block size
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.", new_max)

    password, paramversion = self.ENCRYPTION
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    sizes = [
        ("empty", 0),
        ("almost_large", new_max - 1),
        ("large", new_max),
        ("slightly_larger", new_max + 1),
        ("twice", 2 * new_max),
        ("twice_plus_one", (2 * new_max) + 1),
    ]
    for name, size in sizes:
        path = "source_dir2/%s" % name
        self.hash[path] = self.create_file(path, size)

    deltatar.create_full_backup(source_path="source_dir2",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir2",
                            backup_tar_path=tar_path)

    for key, value in self.hash.items():
        assert os.path.exists(key)
        if value:
            assert value == self.md5sum(key)
237
238
fac2cfe1
PG
def test_create_backup_index_max_file_length(self):
    """
    Attempt a full backup whose index file is too large for the (lowered)
    upper bound of the GCM encryption.

    Since the index file has a fixed IV file counter of
    AES_GCM_IV_CNT_INDEX, the crypto layer is expected to abort with
    ``crypto.InvalidFileCounter``.

    60+ GB of (potentially compressed) index file should last for a
    while...
    Skipped for compressing modes and for unencrypted backups.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest("GCM file length applies only to encrypted backups.")

    new_max = 5000
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.", new_max)

    password, paramversion = self.ENCRYPTION
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = {}
    os.makedirs("source_dir2")
    for number in range(42):
        # "%rd" is "%r" followed by a literal "d", so the files are
        # named dummy_0d, dummy_1d, ...
        path = "source_dir2/dummy_%rd" % number
        self.hash[path] = self.create_file(path, number)

    with self.assertRaises(crypto.InvalidFileCounter):
        dtar.create_full_backup(source_path="source_dir2",
                                backup_path="backup_dir")
    shutil.rmtree("source_dir2")
272
273
6c678f3a
ERE
def test_check_index_checksum(self):
    """
    Create a full backup and check the file-list checksum in its index.

    The index file is read line by line as raw bytes. A running CRC32 is
    computed over every line from the one containing ``BEGIN-FILE-LIST``
    up to and including the one containing ``END-FILE-LIST``. The JSON
    record on the line after that must be of type ``file-list-checksum``
    and carry exactly this CRC.
    """
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # NOTE(review): if no line of the raw file contains the END marker,
    # the loop ends without asserting anything and the test passes
    # vacuously -- confirm whether that is intended for all modes.
    # The context manager closes the file even when an assertion fails.
    with open(index_path, 'rb') as index:
        crc = None
        in_list = False
        for line in iter(index.readline, b''):
            if b'BEGIN-FILE-LIST' in line:
                # the checksum starts with the BEGIN marker line itself
                crc = binascii.crc32(line) & 0xffffffff
                in_list = True
            elif b'END-FILE-LIST' in line:
                crc = binascii.crc32(line, crc) & 0xffffffff

                # next line contains the crc
                data = json.loads(index.readline().decode("UTF-8"))
                assert data['type'] == 'file-list-checksum'
                assert data['checksum'] == crc
                break
            elif in_list:
                crc = binascii.crc32(line, crc) & 0xffffffff
6c678f3a 315
b8fc2f5d
ERE
316
def test_restore_multivol(self):
    """
    Take an unfiltered full backup with a volume size limit, check how
    many volumes were written and restore everything starting from the
    first volume.

    Skipped for modes containing ``:gz``.
    """
    if ':gz' in self.MODE:
        raise SkipTest('compression information is lost when creating '
                       'multiple volumes with no Stream')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = {}
    os.makedirs('source_dir2')
    for path, size in (("source_dir2/big", 100000),
                       ("source_dir2/huge", 1200000)):
        self.hash[path] = self.create_file(path, size)

    # create first backup
    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=1)

    def volume_path(number):
        # path of the full-backup volume with the given number
        return os.path.join(
            "backup_dir", dtar.volume_name_func("backup_dir", True, number))

    assert os.path.exists("backup_dir")
    assert os.path.exists(volume_path(0))

    # compressing modes are expected to fit into a single volume
    expected_volumes = 1 if self.MODE_COMPRESSES else 2
    for number in range(expected_volumes):
        assert os.path.exists(volume_path(number))
    assert not os.path.exists(volume_path(expected_volumes))

    shutil.rmtree("source_dir2")

    # this should automatically restore all volumes
    dtar.restore_backup(target_path="source_dir2",
                        backup_tar_path=volume_path(0))

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        assert not checksum or checksum == self.md5sum(path)
368
14e2e92d
DGM
def test_restore_multivol_split(self):
    """
    Take an unfiltered multi-volume full backup in which every file is
    bigger than the maximum volume size, then restore it via the index.

    Skipped for modes starting with ``:`` or ``|``.
    """
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    mib = 1024 * 1024
    self.hash = {}
    os.makedirs('source_dir2')
    for path, size in (("source_dir2/big", 3 * mib),
                       ("source_dir2/huge", 4 * mib),
                       ("source_dir2/huge2", 4 * mib)):
        self.hash[path] = self.create_file(path, size)

    # create first backup
    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=2)

    def volume_path(number):
        # path of the full-backup volume with the given number
        return os.path.join(
            "backup_dir", dtar.volume_name_func("backup_dir", True, number))

    assert os.path.exists("backup_dir")
    assert os.path.exists(volume_path(0))

    expected_volumes = 1 if self.MODE_COMPRESSES else 6
    for number in range(expected_volumes):
        assert os.path.exists(volume_path(number))
    assert not os.path.exists(volume_path(expected_volumes))

    shutil.rmtree("source_dir2")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir2",
                        backup_indexes_paths=[index_path])

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        assert not checksum or checksum == self.md5sum(path)
421
422
9eae9a1f
ERE
def test_full_backup_index_extra_data(self):
    """
    Check that ``extra_data`` handed to a full backup is stored in the
    index file and can be read back from it unchanged.
    """
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    payload = {
        "hola": "caracola",
        "otra_cosa": [1, "lista"],
        "y_otra": {"bola": 1.1},
    }

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            extra_data=payload)

    index_path = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # the index iterator exposes the stored extra_data for comparison
    index_it = dtar.iterate_index_path(index_path)
    self.assertEqual(index_it.extra_data, payload)
450
451
def test_diff_backup_index_extra_data(self):
    """
    Check that ``extra_data`` handed to a diff backup is stored in the
    diff's index file and can be read back from it unchanged.
    """
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    payload = {
        "hola": "caracola",
        "otra_cosa": [1, "lista"],
        "y_otra": {"bola": 1.1},
    }

    # the full backup only serves as the base for the diff
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    base_index_path = os.path.join("backup_dir",
                                   dtar.index_name_func(is_full=True))

    # source_dir is untouched in between, so the diff itself is empty
    dtar.create_diff_backup("source_dir", "backup_dir2",
                            base_index_path, extra_data=payload)

    diff_index_path = os.path.join("backup_dir2",
                                   dtar.index_name_func(is_full=False))

    # the index iterator exposes the stored extra_data for comparison
    index_it = dtar.iterate_index_path(diff_index_path)
    self.assertEqual(index_it.extra_data, payload)
486
8825be52
DGM
def test_restore_multivol2(self):
    """
    Back up a copy of ``GIT_DIR`` with a volume size limit, restore it
    starting from the first volume and compare the result against the
    original directory with ``check_equal_dirs``.
    """
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    shutil.copytree(self.GIT_DIR, "source_dir2")

    # create first backup
    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join(
        "backup_dir", dtar.volume_name_func("backup_dir", True, 0)))

    shutil.rmtree("source_dir2")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))

    # this should automatically restore all volumes
    dtar.restore_backup(target_path="source_dir2",
                        backup_tar_path=first_volume)

    self.check_equal_dirs(self.GIT_DIR, 'source_dir2', dtar)
8825be52 519
b8fc2f5d
ERE
def test_restore_multivol_manual_from_index(self):
    """
    Take a multi-volume full backup and extract one file by hand.

    The offset of the file ``huge`` is looked up in the index, the first
    volume is opened at that offset with ``TarFile`` and the first member
    found there is extracted into the current directory. A custom handler
    supplies the follow-up volume; presumably ``TarFile`` invokes it when
    the member continues in the next volume.

    Skipped for modes starting with ``:`` or ``|``.
    """
    # this test only works for uncompressed or concat compressed modes
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs('source_dir2')
    self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
    self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir2",
        backup_path="backup_dir",
        max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, 0)))
    if self.MODE_COMPRESSES:
        n_vols = 1
    else:
        n_vols = 2
    for i_vol in range(n_vols):
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, i_vol)))
    assert not os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, n_vols)))

    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # look up where the huge file starts inside the first volume
    f = deltatar.open_auxiliary_file(index_path, 'r')
    offset = None
    while True:
        l = f.readline()
        if not len(l):
            break
        data = json.loads(l.decode('UTF-8'))
        if data.get('type', '') == 'file' and\
                deltatar.unprefixed(data['path']) == "huge":
            offset = data['offset']
            break

    assert offset is not None

    # Named differently from the module-level ``new_volume_handler``
    # import so that it is not shadowed inside this test.
    def open_next_volume(mode, tarobj, base_name, volume_number):
        suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
        if self.ENCRYPTION is not None:
            suf += "." + deltatar_PDTCRYPT_EXTENSION
        # the volume number is hard-coded in the file name pattern
        tarobj.open_volume(datetime.now().strftime(
            "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
    volume_handler = partial(open_next_volume, self.MODE)

    crypto_ctx = None
    if self.ENCRYPTION is not None:
        crypto_ctx = crypto.Decrypt(password)

    # the context manager closes the archive file even if extraction fails
    with open(tar_path, 'rb') as fo:
        fo.seek(offset)
        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              new_volume_handler=volume_handler,
                              encryption=crypto_ctx)

        member = tarobj.next()
        # strip the prefix so the file lands in the current directory
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)
        tarobj.extract(member)
        tarobj.close()
    assert self.hash['source_dir2/huge'] == self.md5sum('huge')

    os.unlink("huge")
6c678f3a 610
f698c99c 611
9e092947
PG
def test_restore_manual_from_index_twice(self):
    """
    Create a full backup and restore the same file twice. This *must* fail
    when encryption is active.

    Currently, tarfile.py's *_Stream* class conveniently disallows seeking
    backwards within the same file. This prevents the encryption layer
    from exploding due to a reused IV in an overall valid archive.

    This test anticipates possible future mistakes since it's entirely
    feasible to implement backward seeks for *_Stream* with concat mode.

    A ``tarfile.StreamError`` on the second extraction is tolerated only
    when a decryption context is in use; otherwise it is re-raised.
    Skipped for modes starting with ``|`` and for compressing modes.
    """
    # this test only works for uncompressed or concat compressed modes
    if self.MODE.startswith("|") or self.MODE_COMPRESSES:
        raise SkipTest("this test only works for uncompressed "
                       "or concat compressed modes")

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    self.hash["source_dir2/samefile"] = \
        self.create_file("source_dir2/samefile", 1 * 1024)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir2",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, 0)))

    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # look up where the file starts inside the volume
    f = deltatar.open_auxiliary_file(index_path, "r")
    offset = None
    while True:
        l = f.readline()
        if not len(l):
            break
        data = json.loads(l.decode("UTF-8"))
        if data.get("type", "") == "file" and\
                deltatar.unprefixed(data["path"]) == "samefile":
            offset = data["offset"]
            break

    assert offset is not None

    crypto_ctx = None
    if self.ENCRYPTION is not None:
        crypto_ctx = crypto.Decrypt(password)

    # the context manager closes the archive file even when the second
    # extraction re-raises or an assertion fails
    with open(tar_path, "rb") as fo:
        fo.seek(offset)
        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              encryption=crypto_ctx)
        member = tarobj.next()
        # strip the prefix so the file lands in the current directory
        member.path = deltatar.unprefixed(member.path)
        member.name = deltatar.unprefixed(member.name)

        # extract once ...
        tarobj.extract(member)
        assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

        # ... and twice
        try:
            tarobj.extract(member)
        except tarfile.StreamError:
            if crypto_ctx is None:
                raise
            # good: seeking backwards not allowed
        tarobj.close()
    assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

    os.unlink("samefile")
700
701
11684b1d
ERE
def test_restore_from_index(self):
    """
    Take a full backup, delete the source and restore it through the
    index file instead of the tar volume.

    Skipped for modes starting with ``:`` or ``|``.
    """
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    shutil.rmtree("source_dir")

    # restoring via the index should bring back all volumes
    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        assert not checksum or checksum == self.md5sum(path)
6c678f3a 733
b8fc2f5d
ERE
def test_restore_multivol_from_index(self):
    """
    Take a full backup with a volume size limit, delete the source and
    restore it through the index file.

    Skipped for modes starting with ``:`` or ``|``.
    """
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            max_volume_size=2)
    shutil.rmtree("source_dir")

    # restoring via the index should bring back all volumes
    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for path, checksum in self.hash.items():
        assert os.path.exists(path)
        assert not checksum or checksum == self.md5sum(path)
da26094a 766
c1af2184
ERE
def test_create_basic_filtering(self):
    """
    Create a backup with include/exclude lists and check, after a
    restore, that exactly the selected paths came back.
    """
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"])

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    # included and not excluded: must be back
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # excluded explicitly, or never included: must be missing
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
c1af2184 799
862b3726
ERE
def test_create_filter_func(self):
    """
    Create a backup with include/exclude lists plus a filter function
    that accepts everything, and check which paths the function was
    called with.
    """
    visited_paths = []

    def record_and_accept(seen, path):
        # remember each path once and let it through
        if path not in seen:
            seen.append(path)
        return True

    filter_func = partial(record_and_accept, visited_paths)

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"],
                    filter_func=filter_func)

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    expected = {'small', 'test', 'test/huge2', 'test/test2'}
    assert set(visited_paths) == expected
862b3726
ERE
839
def test_create_filter_out_func(self):
    """
    Create a backup with include/exclude lists plus a filter function
    that rejects everything, and check that nothing is restored.
    """
    visited_paths = []

    def record_and_reject(seen, path):
        # remember each path once, then filter it out
        if path not in seen:
            seen.append(path)
        return False

    filter_func = partial(record_and_reject, visited_paths)

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"],
                    filter_func=filter_func)

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    # only the two top-level included entries were offered to the filter
    assert set(visited_paths) == {'small', 'test'}

    # check that effectively no file was backed up
    for missing in ("source_dir/small",
                    "source_dir/big",
                    "source_dir/test"):
        assert not os.path.exists(missing)
862b3726 885
9af328e2
ERE
def test_restore_index_basic_filtering(self):
    """
    Create an unfiltered backup, then apply include/exclude lists only
    during the index based restore.

    Skipped for modes starting with ``:`` or ``|``.
    """
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # create first backup
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))

    # the filters are set only now, i.e. they affect just the restore
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    # included and not excluded: must be back
    for kept in ("source_dir/small",
                 "source_dir/test",
                 "source_dir/test/huge2",
                 "source_dir/test/test2"):
        assert os.path.exists(kept)

    # excluded explicitly, or never included: must be missing
    for dropped in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(dropped)
9af328e2
ERE
922
def test_restore_index_filter_func(self):
    '''
    Take a full backup, then restore it through its index with include
    and exclude lists plus a filter function; the filter function must
    be consulted exactly for the paths that pass the lists.
    '''
    # modes starting with ':' or '|' are not covered by this test
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    visited_paths = []

    def remember(path):
        # accept every path, recording each distinct one once
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # back up the fixture tree, then remove the original
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))

    # filters are installed only for the restore step
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.filter_func = remember
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    assert set(visited_paths) == {
        'small',
        'test',
        'test/huge2',
        'test/test2',
    }
968
e5f5681b
ERE
def test_restore_tar_basic_filtering(self):
    '''
    Take a full backup, then restore it from the tar volume while include
    and exclude lists are active; only the selected entries may reappear.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # back up the fixture tree, then remove the original
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # the filters are installed only for the restore step
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]

    volume_name = dtar.volume_name_func('backup_dir', True, 0)
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=os.path.join("backup_dir",
                                                     volume_name))

    for restored in ("source_dir/small",
                     "source_dir/test",
                     "source_dir/test/huge2",
                     "source_dir/test/test2"):
        assert os.path.exists(restored)

    for filtered_out in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(filtered_out)
e5f5681b
ERE
1002
def test_restore_tar_filter_func(self):
    '''
    Creates a backup, and then filters when doing the tar based restore,
    using the filter function in addition to the include/exclude lists.

    The filter function accepts every path and records it; afterwards the
    recorded set must equal the paths that pass the include/exclude lists.
    '''
    visited_paths = []

    def record_path(path):
        # accept every path, recording each distinct one once
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # filters only take effect for the restore step; the index path is not
    # needed here because the restore reads the tar volume directly
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = record_path

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    assert set(visited_paths) == {
        'small',
        'test',
        'test/huge2',
        'test/test2',
    }
1047
def test_filter_path_regexp(self):
    '''
    Exercise deltatar.filter_path with a mix of compiled regular
    expressions and a plain string in the include list, plus a regular
    expression in the exclude list.
    '''
    dtar = DeltaTar(
        mode=self.MODE,
        included_files=[
            re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
            re.compile('^yes$'),
            'testing',
        ],
        excluded_files=[
            re.compile('^testing/in_the'),
        ])

    # paths that must be accepted
    accepted = (
        'test/hola',
        'test/hola/any/thing',
        'test/caracola/caracolero',
        'test/caracola/caracolero/yeah',
        'test/caracola/caracolero/whatever/aa',
        'yes',
        'testing',
        'testing/yes',
        'testing/in_th',
    )
    for candidate in accepted:
        assert dtar.filter_path(candidate)

    # paths that must be rejected
    rejected = (
        'something',
        'other/thing',
        'test_ing',
        'test/hola_lala',
        'test/agur',
        'testing_something',
        'yeso',
        'yes/o',
        'yes_o',
        'testing/in_the',
        'testing/in_the_field',
        'testing/in_the/field',
    )
    for candidate in rejected:
        assert not dtar.filter_path(candidate)
e5f5681b 1087
974408b5
ERE
def test_filter_path_parent(self):
    '''
    Exercise deltatar.filter_path for parent matching: directories leading
    up to an included path must yield PARENT_MATCH, the path itself and
    anything below it MATCH, and unrelated paths NO_MATCH.
    '''
    dtar = DeltaTar(mode=self.MODE,
                    included_files=['testing/path/to/some/thing'])

    # (path, extra keyword arguments, expected result)
    expectations = [
        ('testing', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some/thing', {}, MATCH),
        ('testing/path/to/some/thing/what&/ever', {}, MATCH),
        ('testing/something/else', {}, NO_MATCH),
    ]
    for candidate, extra, expected in expectations:
        assert dtar.filter_path(candidate, **extra) == expected
974408b5
ERE
1105
def test_parent_matching_simple_full_backup(self):
    '''
    Create a full backup restricted by parent matching (the include list
    names a file inside a subdirectory), then restore it unfiltered.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    # the include list is applied while creating the backup
    creator = DeltaTar(mode=self.MODE, password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger,
                       included_files=['test/huge2'])
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    volume_name = creator.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", volume_name)

    # a fresh instance without any filter performs the restore
    restorer = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger)
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1141
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Create an unfiltered full backup, then restore it from the tar volume
    with an include list that relies on parent matching.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    # the backup itself is created without any filter
    creator = DeltaTar(mode=self.MODE, password=password,
                       crypto_paramversion=paramversion,
                       logger=self.consoleLogger)
    creator.create_full_backup(source_path="source_dir",
                               backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    volume_name = creator.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", volume_name)

    # the include list is applied by the restoring instance only
    restorer = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger,
                        included_files=['test/huge2'])
    restorer.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1177
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create a full backup and restore it *from its index* using parent
    matching.

    The include list names a file inside a subdirectory, so the restore
    has to recreate the parent directory while leaving every other entry
    out.
    '''
    # same restriction as the other index based restore tests
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    included_files = [
        'test/huge2'
    ]

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # restore through the index rather than the tar volume, which is what
    # distinguishes this test from the tar based parent matching test
    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    deltatar = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger,
                        included_files=included_files)
    deltatar.restore_backup(target_path="source_dir",
                            backup_indexes_paths=[index_path])

    assert os.path.exists('source_dir/test/huge2')
    assert os.path.exists('source_dir/test/')
    assert not os.path.exists('source_dir/test/huge')
    assert not os.path.exists('source_dir/big')
    assert not os.path.exists('source_dir/small')
1213
d07c8065
ERE
def test_collate_iterators(self):
    '''
    Collate the index iterator of a fresh full backup with a walk over the
    unchanged source directory; every collated pair must have equal stat
    dicts.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # the index path is made absolute because the cwd changes below
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              dtar.index_name_func(is_full=True))
    from_index = dtar.iterate_index_path(index_path)

    os.chdir('source_dir')
    from_disk = dtar.jsonize_path_iterator(dtar._recursive_walk_dir('.'))

    try:
        for indexed, on_disk, _l_no in dtar.collate_iterators(from_index,
                                                              from_disk):
            assert dtar._equal_stat_dicts(indexed, on_disk)
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1245
def test_collate_iterators_diffdirs(self):
    '''
    Collate the index iterator of a full backup with a walk over a source
    directory that gained one file ("z") after the backup; that file must
    have no index counterpart, all other pairs must have equal stat dicts.
    '''
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # this file is created after the backup, so the index cannot list it
    self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)

    # the index path is made absolute because the cwd changes below
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              dtar.index_name_func(is_full=True))
    from_index = dtar.iterate_index_path(index_path)

    os.chdir('source_dir')
    from_disk = dtar.jsonize_path_iterator(dtar._recursive_walk_dir('.'))

    try:
        for indexed, on_disk, _l_no in dtar.collate_iterators(from_index,
                                                              from_disk):
            if on_disk['path'] != 'z':
                assert dtar._equal_stat_dicts(indexed, on_disk)
            else:
                assert not indexed
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1283
def test_collate_iterators_diffdirs2(self):
    '''
    Use the collate iterators functionality with two different directories.

    After the full backup a directory and several files are added to the
    source tree. Collating the backup index with a walk over the changed
    tree must yield the expected sequence of (index path, disk path)
    pairs, with ``None`` on the index side for every new entry.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # add some new files and directories
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    # the index path is made absolute because the cwd changes below
    cwd = os.getcwd()
    index_filename = deltatar.index_name_func(is_full=True)
    index_path = os.path.join(cwd, "backup_dir", index_filename)
    index_it = deltatar.iterate_index_path(index_path)

    visited_pairs = []

    os.chdir('source_dir')
    try:
        dir_it = deltatar._recursive_walk_dir('.')
        path_it = deltatar.jsonize_path_iterator(dir_it)

        for path1, path2, _l_no in deltatar.collate_iterators(index_it, path_it):
            visited_pairs.append(
                (deltatar.unprefixed(path1['path']) if path1 else None,
                 path2['path'] if path2 else None)
            )
    finally:
        # restore the working directory unconditionally; the assertion is
        # kept out of this clause so that it can neither skip the chdir
        # nor mask an exception raised by the loop
        os.chdir(cwd)

    assert visited_pairs == [
        (u'big', u'big'),
        (None, u'bigdir'),
        (u'small', u'small'),
        (u'test', u'test'),
        (None, u'zzzz'),
        (None, u'bigdir/a'),
        (None, u'bigdir/b'),
        (u'test/huge', u'test/huge'),
        (u'test/huge2', u'test/huge2'),
        (u'test/test2', u'test/test2'),
    ]
1339
def test_create_empty_diff_backup(self):
    '''
    Create a diff backup right after a full backup, without touching the
    source tree; the diff index may only hold "list://" entries and
    restoring the diff volume must not create any file.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))
    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # inspect the index written for the diff
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    seen = 0
    for seen, entry in enumerate(dtar.iterate_index_path(diff_index),
                                 start=1):
        assert entry[0]['path'].startswith("list://")

    assert seen == 6

    # now look at the tar volume
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    volume_name = dtar.volume_name_func('backup_dir2',
                                        is_full=False, volume_number=0)
    volume_path = os.path.join("backup_dir2", volume_name)

    # the diff was empty, so the restore must leave the target empty
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=volume_path)
    assert os.listdir("source_dir") == []
1383
8adbe50d 1384
42d39ca7
ERE
def test_create_diff_backup1(self):
    '''
    Create a diff backup after files were added to and removed from the
    source tree, check the entries of the diff index, and restore the
    diff volume onto a directory holding the file that was deleted.
    '''
    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # change the tree: one new directory, three new files, one removal
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # inspect the index written for the diff
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    indexed_paths = [entry[0]['path']
                     for entry in dtar.iterate_index_path(diff_index)]

    assert indexed_paths == [
        'list://big',
        'snapshot://bigdir',
        'delete://small',
        'list://test',
        'snapshot://zzzz',
        'snapshot://bigdir/a',
        'snapshot://bigdir/b',
        'list://test/huge',
        'list://test/huge2',
        'list://test/test2',
    ]

    # now look at the tar volume
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    # recreate source_dir holding only an empty "small", which the
    # restore is then expected to delete
    os.mkdir("source_dir")
    with open("source_dir/small", 'wb'):
        pass

    volume_name = dtar.volume_name_func('backup_dir2',
                                        is_full=False, volume_number=0)
    volume_path = os.path.join("backup_dir2", volume_name)

    # restoring the diff creates only the new entries
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=volume_path)

    # listing order is irrelevant
    assert set(os.listdir("source_dir")) == {'zzzz', 'bigdir'}
42d39ca7 1449
df99a044
ERE
def test_restore_from_index_diff_backup(self):
    '''
    Take a full backup, change the source tree, take a diff backup, then
    restore from both indexes into an empty target and compare the result
    with the source tree.
    '''
    # modes starting with ':' or '|' are not covered by this test
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # change the tree: one new directory, three new files, one removal
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # restore into target_dir, handing over the diff index first
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    dtar.restore_backup("target_dir",
                        backup_indexes_paths=[diff_index, full_index])

    # both trees must now be equal
    self.check_equal_dirs('source_dir', 'target_dir', dtar)
df99a044
ERE
1492
def test_restore_from_index_diff_backup2(self):
    '''
    Take a full backup, change the source tree, take a diff backup, then
    restore the full backup into the target and apply the diff on top of
    it; the target must end up equal to the source tree.
    '''
    # modes starting with ':' or '|' are not covered by this test
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # change the tree: new directory and files, one file and one whole
    # directory removed
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
    shutil.rmtree("source_dir/test")

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # step one: the full backup goes into target_dir
    full_volume = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    dtar.restore_backup("target_dir", backup_tar_path=full_volume)

    # step two: the diff is applied on top of it
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    dtar.restore_backup("target_dir",
                        backup_indexes_paths=[diff_index, full_index])

    # both trees must now be equal
    self.check_equal_dirs('source_dir', 'target_dir', dtar)
1541
def test_restore_from_index_diff_backup3(self):
    '''
    Take a full backup of a copy of self.GIT_DIR, randomly remove entries
    from a second copy, take a diff backup of that second copy, and check
    the restore results: full restore, diff applied on top of the full
    restore, and diff restored from scratch.
    '''
    # modes starting with ':' or '|' are not covered by this test
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # replace the fixture tree by two identical copies of self.GIT_DIR
    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # randomly thin out the second copy
    # NOTE(review): the walk is started without keep_base_dir=True, so the
    # yielded paths may lack the "source_dir_diff" prefix and the
    # os.path.exists() check below may never succeed -- confirm against
    # DeltaTar._recursive_walk_dir
    for candidate in dtar._recursive_walk_dir('source_dir_diff'):
        # entries that are already gone are skipped; of the remaining
        # ones roughly 70% are left untouched
        if os.path.exists(candidate) and random.random() >= 0.7:
            if os.path.isdir(candidate):
                shutil.rmtree(candidate)
            else:
                os.unlink(candidate)

    dtar.create_diff_backup("source_dir_diff", "backup_dir2", full_index)

    # step one: the full backup goes into target_dir ...
    full_volume = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    dtar.restore_backup("target_dir", backup_tar_path=full_volume)

    # ... which must then equal source_dir (the untouched copy)
    self.check_equal_dirs('source_dir', 'target_dir', dtar)

    # step two: the diff is applied on top of it ...
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    index_chain = [diff_index, full_index]
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # ... which must make target_dir equal the altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)

    # step three: same restore again, this time into a fresh target_dir
    shutil.rmtree("target_dir")
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)
1614
def test_restore_from_index_diff_backup3_multivol(self):
    '''
    Same scenario as the single volume variant, but both the full and the
    diff backup are created with max_volume_size=1: full backup of a copy
    of self.GIT_DIR, random removals in a second copy, diff backup of the
    second copy, then the three restore checks.
    '''
    # modes starting with ':' or '|' are not covered by this test
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    if self.ENCRYPTION:
        password, paramversion = self.ENCRYPTION
    else:
        password, paramversion = None, None

    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # replace the fixture tree by two identical copies of self.GIT_DIR
    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            max_volume_size=1)

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # randomly thin out the second copy
    # NOTE(review): the walk is started without keep_base_dir=True, so the
    # yielded paths may lack the "source_dir_diff" prefix and the
    # os.path.exists() check below may never succeed -- confirm against
    # DeltaTar._recursive_walk_dir
    for candidate in dtar._recursive_walk_dir('source_dir_diff'):
        # entries that are already gone are skipped; of the remaining
        # ones roughly 70% are left untouched
        if os.path.exists(candidate) and random.random() >= 0.7:
            if os.path.isdir(candidate):
                shutil.rmtree(candidate)
            else:
                os.unlink(candidate)

    dtar.create_diff_backup("source_dir_diff", "backup_dir2",
                            full_index, max_volume_size=1)

    # step one: the full backup goes into target_dir, starting at the
    # first volume ...
    full_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup("target_dir", backup_tar_path=full_volume)

    # ... which must then equal source_dir (the untouched copy)
    self.check_equal_dirs('source_dir', 'target_dir', dtar)

    # step two: the diff is applied on top of it ...
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    index_chain = [diff_index, full_index]
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # ... which must make target_dir equal the altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)

    # step three: same restore again, this time into a fresh target_dir
    shutil.rmtree("target_dir")
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)
1688
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Compare the directory trees below ``path1`` and ``path2`` and check
    that they are the same.

    Both trees are walked in lockstep through ``deltatar``; for every pair
    of entries the stat dicts must compare equal according to
    ``deltatar._equal_stat_dicts``.

    Raises ``Exception`` if one tree yields more entries than the other
    (in either direction) and ``AssertionError`` if a pair of entries
    differs; in the latter case both items are printed first.
    '''
    source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
    source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
    target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
    target_it = deltatar.jsonize_path_iterator(target_it, strip=1)

    # unique marker for an exhausted iterator; advancing both sides with a
    # default lets a surplus on either side be detected
    exhausted = object()

    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            break
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            # show the offending pair, then let the error propagate with
            # its original traceback
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            raise
df99a044 1714
f5d9144b
PG
def test_create_no_symlinks(self):
    '''
    Creates a full backup from different varieties of symlinks. The
    extracted archive may not contain any symlinks but the file contents
    of the one valid link target: once under its own name and once under
    the name of the symlink that pointed to it.
    '''
    # start from an empty tree; a missing source_dir is fine
    shutil.rmtree("source_dir", ignore_errors=True)
    os.makedirs("source_dir/symlinks")

    fd = os.open("source_dir/symlinks/valid_linkname",
                 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    # the file object takes ownership of the descriptor and closes it
    with os.fdopen(fd, "wb") as target:
        target.write(b"valid link target for symlink tests; please ignore\n")

    # first one is good, the rest points nowhere
    self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
    self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
    self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
    self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")
    assert not os.path.exists("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for root, _dirs, names in os.walk("source_dir/symlinks"):
        # only the valid link plus the linked file may be found in the
        # extracted archive
        assert len(names) == 2
        for name in names:
            # os.walk yields bare names; join with the directory so the
            # check inspects the restored entry and not the cwd
            assert not os.path.islink(os.path.join(root, name))

    # the link must have been resolved and file contents must match
    # the linked file
    with open("source_dir/symlinks/valid_linkname") as a:
        with open("source_dir/symlinks/whatever") as b:
            assert a.read() == b.read()
1762
83f5fd71
PG
def test_restore_with_symlinks(self):
    '''
    Creates a full backup, then appends different varieties of symlinks to
    the archive. All of them must be filtered out when restoring.

    In modes where the archive cannot be opened for appending, no links
    are added and the final check is skipped.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(archive, name, dst):
        # append a symlink member called *name* pointing at *dst*; hand
        # back the name so the caller can check for it after the restore
        link = tarfile.TarInfo("snapshot://%s" % name)
        link.type = tarfile.SYMTYPE
        link.linkname = dst
        archive.addfile(link)
        return name

    try:
        with tarfile.open(tar_path, mode="a") as a:
            checkme = [
                add_symlink(a, "symlinks/foo", "internal-file"),
                add_symlink(a, "symlinks/bar", "/absolute/path"),
                add_symlink(a, "symlinks/baz", "../parent/../../paths"),
            ]
    except tarfile.ReadError:
        # tolerated for concat and gzip modes only; nothing was appended
        if self.MODE == '#' or self.MODE.endswith("gz"):
            checkme = []
        else:
            raise
    except ValueError:
        # tolerated for concat modes only; nothing was appended
        if self.MODE.startswith('#'):
            checkme = []
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check what happened to our symlinks; lexists() must be used because
    # the link targets do not exist and exists() reports False for a
    # dangling symlink even if the link itself was extracted
    for name in checkme:
        fullpath = os.path.join("source_dir", name)
        assert not os.path.lexists(fullpath)
f5d9144b 1816
43ce978b
PG
def test_restore_malicious_symlinks(self):
    '''
    Creates a full backup containing a symlink and a file of the same name.
    This simulates a symlink attack with a link pointing to some external
    path that is abused to write outside the extraction prefix.

    After restoring, nothing may exist at the path of the link inside the
    extraction prefix.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(archive, name, dst):
        # append a symlink member called *name* pointing at *dst*
        link = tarfile.TarInfo("snapshot://%s" % name)
        link.type = tarfile.SYMTYPE
        link.linkname = dst
        archive.addfile(link)

    def add_file(archive, name):
        # append an empty regular file member called *name*
        member = tarfile.TarInfo("snapshot://%s" % name)
        member.type = tarfile.REGTYPE
        archive.addfile(member)

    testpath = "symlinks/pernicious-link"
    testdst = "/tmp/does/not/exist"

    try:
        with tarfile.open(tar_path, mode="a") as a:
            add_symlink(a, testpath, testdst)
            add_symlink(a, testpath, testdst+"X")
            add_symlink(a, testpath, testdst+"XXX")
            add_file(a, testpath)
    except tarfile.ReadError:
        # tolerated for concat and gzip modes only
        if self.MODE == '#' or self.MODE.endswith("gz"):
            pass
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            pass # O_APPEND of concat archives not feasible
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # neither the link nor the file of the same name may have been
    # extracted; lexists() must be used because the link target does not
    # exist and exists() reports False for a dangling symlink even if the
    # link itself is present
    fullpath = os.path.join("source_dir", testpath)
    assert not os.path.lexists(fullpath)
43ce978b 1879
cc552d6e
PG
1880
def fsapi_access_true (self):
    """
    Chicanery for testing improper use of the *os* module: swap os.access
    for a stub that returns True regardless of its arguments.

    The function being replaced is first recorded on self.FSAPI_SAVED as
    an ("access", function) pair (presumably so it can be reinstated
    later; the restoring code is not part of this function).
    """
    def always_true (*_args, **_kwargs):
        return True

    original = os.access
    self.FSAPI_SAVED.append (("access", original))
    os.access = always_true
1888
1889
da26094a
ERE
class DeltaTar2Test(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the ":" mode.
    '''
    MODE = ':'
1895
1896
class DeltaTarStreamTest(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the uncompressed
    stream mode "|".
    '''
    MODE = '|'
1902
1903
class DeltaTarGzipTest(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the gzip mode ":gz".
    '''
    MODE = ':gz'
    MODE_COMPRESSES = True
da26094a
ERE
1910
1911
da26094a
ERE
class DeltaTarGzipStreamTest(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the gzip stream mode
    "|gz".
    '''
    MODE = '|gz'
    MODE_COMPRESSES = True
da26094a
ERE
1918
1919
bd011242
CH
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    '''
    DeltaTarTest suite configured for the bz2 mode ":bz2". The whole class
    is skipped unconditionally by the decorator above.
    '''
    MODE = ':bz2'
    MODE_COMPRESSES = True
ea6d3c3e 1927
bd011242
CH
1928
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    '''
    DeltaTarTest suite configured for the bz2 stream mode "|bz2". The
    whole class is skipped unconditionally by the decorator above.
    '''
    MODE = '|bz2'
    MODE_COMPRESSES = True
da26094a
ERE
1936
1937
class DeltaTarGzipConcatTest(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the gzip concat
    stream mode "#gz".
    '''
    MODE = '#gz'
    MODE_COMPRESSES = True
da26094a
ERE
1944
1945
da26094a
ERE
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the gzip concat
    stream mode "#gz" and encryption enabled.
    '''
    MODE = '#gz'
    # (password, paramversion) pair handed to DeltaTar by the tests
    ENCRYPTION = ('some magic key', 1)
    MODE_COMPRESSES = True
da26094a
ERE
1953
1954
ac5e4184
ERE
class DeltaTarAes128ConcatTest(DeltaTarTest):
    '''
    Runs every test inherited from DeltaTarTest with the concat stream
    mode "#" and encryption enabled.
    '''
    MODE = '#'
    # (password, paramversion) pair handed to DeltaTar by the tests
    ENCRYPTION = ('some magic key', 1)
ac5e4184
ERE
1961
1962
cc552d6e
PG
class DeltaTarFilesystemHandlingTest(DeltaTarGzipTest):
    '''
    Mess with filesystem APIs: the gzip-mode suite with FSTEST set to
    fsapi_access_true, which makes os.access report True for everything.
    '''
    FSTEST = fsapi_access_true
1968