Change unit tests to check for expected exception
[python-delta-tar] / testing / test_deltatar.py
CommitLineData
0708a374
ERE
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
f5d9144b 19import errno
0708a374 20import os
0d5c1970 21import re
cbac9f0b 22import random
e5c6ca04 23import shutil
0708a374 24import logging
b8fc2f5d
ERE
25import binascii
26import json
27from datetime import datetime
28from functools import partial
bd011242 29from unittest import skip, SkipTest
0708a374 30
83f5fd71 31import deltatar.tarfile as tarfile
f698c99c 32from deltatar.tarfile import TarFile
974408b5 33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
f698c99c
PG
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
0708a374 36
0708a374
ERE
37from . import BaseTest
38from . import new_volume_handler
39
0708a374
ERE
40class DeltaTarTest(BaseTest):
41 """
42 Test backups
43 """
da26094a 44 MODE = ''
8ea0be50 45 MODE_COMPRESSES = False
da26094a 46
f698c99c 47 ENCRYPTION = None # (password : str, paramversion : int) option
da26094a 48
188b845d
TJ
49 GIT_DIR = '.git'
50
0708a374
ERE
51 def setUp(self):
52 '''
53 Create base test data
54 '''
a4e8b8af 55 self.pwd = os.getcwd()
cbac9f0b 56 os.system('rm -rf target_dir source_dir* backup_dir* huge')
0708a374
ERE
57 os.makedirs('source_dir/test/test2')
58 self.hash = dict()
e5c6ca04 59 self.hash["source_dir/test/test2"] = ''
0708a374
ERE
60 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
61 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
62 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
d5361dac 63 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
0708a374
ERE
64
65 self.consoleLogger = logging.StreamHandler()
66 self.consoleLogger.setLevel(logging.DEBUG)
67
188b845d
TJ
68 if not os.path.isdir(self.GIT_DIR):
69 # Not running inside git tree, take our
70 # own testing directory as source.
71 self.GIT_DIR = 'testing'
72
73 if not os.path.isdir(self.GIT_DIR):
74 raise Exception('No input directory found: ' + self.GIT_DIR)
75
0708a374
ERE
76 def tearDown(self):
77 '''
cb7a3911 78 Remove temporal files created by unit tests and reset globals.
0708a374 79 '''
a4e8b8af 80 os.chdir(self.pwd)
cbac9f0b 81 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
cb7a3911
PG
82 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
83 ("I am fully aware that this will void my warranty.")
0708a374 84
b8fc2f5d 85 def test_restore_simple_full_backup(self):
0708a374
ERE
86 '''
87 Creates a full backup without any filtering and restores it.
88 '''
f698c99c
PG
89 password, paramversion = self.ENCRYPTION or (None, None)
90 deltatar = DeltaTar(mode=self.MODE, password=password,
91 crypto_paramversion=paramversion,
da26094a 92 logger=self.consoleLogger)
0708a374
ERE
93
94 # create first backup
95 deltatar.create_full_backup(
96 source_path="source_dir",
97 backup_path="backup_dir")
98
e5c6ca04
ERE
99 assert os.path.exists("backup_dir")
100 shutil.rmtree("source_dir")
101
102 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
103 tar_path = os.path.join("backup_dir", tar_filename)
104
e5c6ca04
ERE
105 deltatar.restore_backup(target_path="source_dir",
106 backup_tar_path=tar_path)
107
be60ffd0 108 for key, value in self.hash.items():
e5c6ca04
ERE
109 assert os.path.exists(key)
110 if value:
e82f14f5 111 assert value == self.md5sum(key)
da26094a 112
cb7a3911
PG
113
114 def test_create_backup_max_file_length (self):
115 """
116 Creates a full backup including one file that exceeds the (purposely
117 lowered) upper bound on GCM encrypted objects. This will yield multiple
118 encrypted objects for one plaintext file.
119
120 Success is verified by splitting the archive at object boundaries and
121 counting the parts.
122 """
123 if self.MODE_COMPRESSES is True:
124 raise SkipTest ("GCM file length test not meaningful with compression.")
125 if self.ENCRYPTION is None:
126 raise SkipTest ("GCM file length applies only to encrypted backups.")
127
128 new_max = 20000 # cannot be less than tar block size
129 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
130 ("I am fully aware that this will void my warranty.",
131 new_max)
132
133 password, paramversion = self.ENCRYPTION
134 deltatar = DeltaTar (mode=self.MODE, password=password,
135 crypto_paramversion=paramversion,
136 logger=self.consoleLogger)
137
138 self.hash = dict ()
139 os.makedirs ("source_dir2")
140 for f, s in [("empty" , 0) # 1 tar objects
141 ,("slightly_larger", new_max + 1) # 2
142 ,("twice" , 2 * new_max) # 3
143 ]:
144 f = "source_dir2/%s" % f
145 self.hash [f] = self.create_file (f, s)
146
147 deltatar.create_full_backup \
148 (source_path="source_dir2", backup_path="backup_dir")
149
150 assert os.path.exists ("backup_dir")
151 shutil.rmtree ("source_dir2")
152
153 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
154 backup_path = os.path.join("backup_dir", backup_filename)
155
156 # split the resulting archive into its constituents without
157 # decrypting
158 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
159 "-o backup_dir/split <\'%s\'" % backup_path)
160
161 assert os.path.exists ("backup_dir/split")
162
163 dents = os.listdir ("backup_dir/split")
164 assert len (dents) == 6
165
166
167 def test_restore_backup_max_file_length (self):
168 """
169 Creates a full backup including one file that exceeds the (purposely
170 lowered) upper bound on GCM encrypted objects. This will yield two
171 encrypted objects for one plaintext file.
172
173 Success is verified by splitting the archive at object boundaries and
174 counting the parts.
175 """
176 if self.MODE_COMPRESSES is True:
177 raise SkipTest ("GCM file length test not meaningful with compression.")
178 if self.ENCRYPTION is None:
179 raise SkipTest ("GCM file length applies only to encrypted backups.")
180
181 new_max = 20000 # cannot be less than tar block size
182 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
183 ("I am fully aware that this will void my warranty.",
184 new_max)
185
186 password, paramversion = self.ENCRYPTION
187 deltatar = DeltaTar (mode=self.MODE, password=password,
188 crypto_paramversion=paramversion,
189 logger=self.consoleLogger)
190
191 self.hash = dict ()
192 os.makedirs ("source_dir2")
193 for f, s in [("empty" , 0) # 1 tar objects
194 ,("slightly_larger", new_max + 1) # 2
195 ,("twice" , 2 * new_max) # 3
196 ]:
197 f = "source_dir2/%s" % f
198 self.hash [f] = self.create_file (f, s)
199
200 deltatar.create_full_backup \
201 (source_path="source_dir2", backup_path="backup_dir")
202
203 assert os.path.exists ("backup_dir")
204 shutil.rmtree ("source_dir2")
205
206 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
207 backup_path = os.path.join("backup_dir", backup_filename)
208
209 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
210 tar_path = os.path.join("backup_dir", tar_filename)
211
212 deltatar.restore_backup(target_path="source_dir2",
213 backup_tar_path=tar_path)
214
215 for key, value in self.hash.items():
216 assert os.path.exists(key)
217 if value:
218 assert value == self.md5sum(key)
219
220
fac2cfe1
PG
    def test_create_backup_index_max_file_length (self):
        """
        Lowers the upper bound on GCM encrypted objects to 5000 and creates a
        full backup of 42 small files (sizes 0..41 bytes).

        The backup is expected to abort with ``crypto.InvalidFileCounter``;
        the test fails if ``create_full_backup`` returns normally.
        Presumably the exception is triggered by the index exceeding the
        lowered object size -- TODO confirm against deltatar.crypto.

        Skipped for compressing modes and for unencrypted backups.
        """
        if self.MODE_COMPRESSES is True:
            raise SkipTest ("GCM file length test not meaningful with compression.")
        if self.ENCRYPTION is None:
            raise SkipTest ("GCM file length applies only to encrypted backups.")

        new_max = 5000
        # the limit is reset again in tearDown()
        crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
                ("I am fully aware that this will void my warranty.",
                 new_max)

        password, paramversion = self.ENCRYPTION
        deltatar = DeltaTar (mode=self.MODE, password=password,
                             crypto_paramversion=paramversion,
                             logger=self.consoleLogger)

        self.hash = dict ()
        os.makedirs ("source_dir2")
        for i in range (42):
            # NOTE(review): "%rd" is "%r" followed by a literal "d", so the
            # files are named dummy_0d, dummy_1d, ...; possibly "%d" was
            # meant. Left untouched since the name length feeds into the
            # size of the index.
            f = "source_dir2/dummy_%rd" % i
            self.hash [f] = self.create_file (f, i)

        # creating the backup itself must raise
        with self.assertRaises (crypto.InvalidFileCounter):
            deltatar.create_full_backup \
                    (source_path="source_dir2", backup_path="backup_dir")
        shutil.rmtree ("source_dir2")
255
256
6c678f3a
ERE
257 def test_check_index_checksum(self):
258 '''
259 Creates a full backup and checks the index' checksum of files
260 '''
f698c99c
PG
261 password, paramversion = self.ENCRYPTION or (None, None)
262 deltatar = DeltaTar(mode=self.MODE, password=password,
263 crypto_paramversion=paramversion,
6c678f3a
ERE
264 logger=self.consoleLogger)
265
266 # create first backup
267 deltatar.create_full_backup(
268 source_path="source_dir",
269 backup_path="backup_dir")
270
271
272 index_filename = deltatar.index_name_func(True)
273 index_path = os.path.join("backup_dir", index_filename)
274
be60ffd0 275 f = open(index_path, 'rb')
6c678f3a
ERE
276 crc = None
277 checked = False
278 began_list = False
be60ffd0
ERE
279 while True:
280 l = f.readline()
281 if l == b'':
282 break
283 if b'BEGIN-FILE-LIST' in l:
c2ffe2ec 284 crc = binascii.crc32(l) & 0xFFFFffff
6c678f3a 285 began_list = True
be60ffd0 286 elif b'END-FILE-LIST' in l:
6c678f3a
ERE
287 crc = binascii.crc32(l, crc) & 0xffffffff
288
289 # next line contains the crc
be60ffd0 290 data = json.loads(f.readline().decode("UTF-8"))
6c678f3a
ERE
291 assert data['type'] == 'file-list-checksum'
292 assert data['checksum'] == crc
293 checked = True
294 break
295 elif began_list:
296 crc = binascii.crc32(l, crc) & 0xffffffff
c7609167 297 f.close()
6c678f3a 298
b8fc2f5d
ERE
299
300 def test_restore_multivol(self):
d5361dac 301 '''
b8fc2f5d
ERE
302 Creates a full backup without any filtering with multiple volumes and
303 restore it.
d5361dac 304 '''
ba7760a7
CH
305 if ':gz' in self.MODE:
306 raise SkipTest('compression information is lost when creating '
307 'multiple volumes with no Stream')
308
f698c99c
PG
309 password, paramversion = self.ENCRYPTION or (None, None)
310 deltatar = DeltaTar(mode=self.MODE, password=password,
311 crypto_paramversion=paramversion,
d5361dac
ERE
312 logger=self.consoleLogger)
313
b8fc2f5d
ERE
314 self.hash = dict()
315 os.makedirs('source_dir2')
316 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
317 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
318
d5361dac
ERE
319 # create first backup
320 deltatar.create_full_backup(
b8fc2f5d 321 source_path="source_dir2",
d5361dac
ERE
322 backup_path="backup_dir",
323 max_volume_size=1)
324
325 assert os.path.exists("backup_dir")
326 assert os.path.exists(os.path.join("backup_dir",
327 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
328 if self.MODE_COMPRESSES:
329 n_vols = 1
330 else:
331 n_vols = 2
332 for i_vol in range(n_vols):
333 assert os.path.exists(os.path.join("backup_dir",
334 deltatar.volume_name_func("backup_dir", True, i_vol)))
335 assert not os.path.exists(os.path.join("backup_dir",
336 deltatar.volume_name_func("backup_dir", True, n_vols)))
d5361dac 337
b8fc2f5d 338 shutil.rmtree("source_dir2")
d5361dac
ERE
339
340 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
341 tar_path = os.path.join("backup_dir", tar_filename)
342
343 # this should automatically restore all volumes
b8fc2f5d 344 deltatar.restore_backup(target_path="source_dir2",
d5361dac
ERE
345 backup_tar_path=tar_path)
346
be60ffd0 347 for key, value in self.hash.items():
d5361dac
ERE
348 assert os.path.exists(key)
349 if value:
350 assert value == self.md5sum(key)
351
14e2e92d
DGM
352 def test_restore_multivol_split(self):
353 '''
354 Creates a full backup without any filtering with multiple volumes
355 with big files bigger than the max volume size and
356 restore it.
357 '''
f61e1822 358 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
359 raise SkipTest('this test only works for uncompressed '
360 'or concat compressed modes')
f61e1822 361
f698c99c
PG
362 password, paramversion = self.ENCRYPTION or (None, None)
363 deltatar = DeltaTar(mode=self.MODE, password=password,
364 crypto_paramversion=paramversion,
14e2e92d
DGM
365 logger=self.consoleLogger)
366
14e2e92d
DGM
367 self.hash = dict()
368 os.makedirs('source_dir2')
82f75df4 369 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
14e2e92d
DGM
370 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
371 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
372
373 # create first backup
374 deltatar.create_full_backup(
375 source_path="source_dir2",
376 backup_path="backup_dir",
377 max_volume_size=2)
378
379 assert os.path.exists("backup_dir")
380 assert os.path.exists(os.path.join("backup_dir",
381 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
382 if self.MODE_COMPRESSES:
383 n_vols = 1
384 else:
385 n_vols = 6
386 for i_vol in range(n_vols):
387 assert os.path.exists(os.path.join("backup_dir",
388 deltatar.volume_name_func("backup_dir", True, i_vol)))
389 assert not os.path.exists(os.path.join("backup_dir",
390 deltatar.volume_name_func("backup_dir", True, n_vols)))
14e2e92d
DGM
391
392 shutil.rmtree("source_dir2")
393
394 index_filename = deltatar.index_name_func(True)
395 index_path = os.path.join("backup_dir", index_filename)
396
397 deltatar.restore_backup(target_path="source_dir2",
398 backup_indexes_paths=[index_path])
399
400 for key, value in self.hash.items():
401 assert os.path.exists(key)
402 if value:
403 assert value == self.md5sum(key)
404
405
9eae9a1f
ERE
406 def test_full_backup_index_extra_data(self):
407 '''
408 Tests that the index file for a full backup can store extra_data and
409 that this data can be retrieved.
410 '''
f698c99c
PG
411 password, paramversion = self.ENCRYPTION or (None, None)
412 deltatar = DeltaTar(mode=self.MODE, password=password,
413 crypto_paramversion=paramversion,
9eae9a1f
ERE
414 logger=self.consoleLogger)
415
416 extra_data = dict(
417 hola="caracola",
418 otra_cosa=[1, "lista"],
419 y_otra=dict(bola=1.1)
420 )
421
422 deltatar.create_full_backup(
423 source_path="source_dir",
424 backup_path="backup_dir",
425 extra_data=extra_data)
426
427 index_filename = deltatar.index_name_func(is_full=True)
428 index_path = os.path.join("backup_dir", index_filename)
429
430 # iterate_index_path retrieves extra_data, and thus we can then compare
431 index_it = deltatar.iterate_index_path(index_path)
432 self.assertEqual(index_it.extra_data, extra_data)
433
434
435 def test_diff_backup_index_extra_data(self):
436 '''
437 Tests that the index file for a diff backup can store extra_data and
438 that this data can be retrieved.
439 '''
f698c99c
PG
440 password, paramversion = self.ENCRYPTION or (None, None)
441 deltatar = DeltaTar(mode=self.MODE, password=password,
442 crypto_paramversion=paramversion,
9eae9a1f
ERE
443 logger=self.consoleLogger)
444
445 extra_data = dict(
446 hola="caracola",
447 otra_cosa=[1, "lista"],
448 y_otra=dict(bola=1.1)
449 )
450 # do first backup
451 deltatar.create_full_backup(
452 source_path="source_dir",
453 backup_path="backup_dir")
454
455
456 prev_index_filename = deltatar.index_name_func(is_full=True)
457 prev_index_path = os.path.join("backup_dir", prev_index_filename)
458
459 # create empty diff backup
460 deltatar.create_diff_backup("source_dir", "backup_dir2",
461 prev_index_path, extra_data=extra_data)
462
463 index_filename = deltatar.index_name_func(is_full=False)
464 index_path = os.path.join("backup_dir2", index_filename)
465
466 # iterate_index_path retrieves extra_data, and thus we can then compare
467 index_it = deltatar.iterate_index_path(index_path)
468 self.assertEqual(index_it.extra_data, extra_data)
469
8825be52
DGM
470 def test_restore_multivol2(self):
471 '''
472 Creates a full backup without any filtering with multiple volumes and
473 restore it.
474 '''
f698c99c
PG
475 password, paramversion = self.ENCRYPTION or (None, None)
476 deltatar = DeltaTar(mode=self.MODE, password=password,
477 crypto_paramversion=paramversion,
8825be52
DGM
478 logger=self.consoleLogger)
479
188b845d 480 shutil.copytree(self.GIT_DIR, "source_dir2")
8825be52
DGM
481
482 # create first backup
483 deltatar.create_full_backup(
484 source_path="source_dir2",
485 backup_path="backup_dir",
486 max_volume_size=1)
487
488 assert os.path.exists("backup_dir")
489 assert os.path.exists(os.path.join("backup_dir",
490 deltatar.volume_name_func("backup_dir", True, 0)))
8825be52
DGM
491
492 shutil.rmtree("source_dir2")
493
494 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
495 tar_path = os.path.join("backup_dir", tar_filename)
496
497 # this should automatically restore all volumes
498 deltatar.restore_backup(target_path="source_dir2",
499 backup_tar_path=tar_path)
500
188b845d 501 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
8825be52 502
b8fc2f5d
ERE
503 def test_restore_multivol_manual_from_index(self):
504 '''
505 Creates a full backup without any filtering with multiple volumes and
506 restore it.
507 '''
508 # this test only works for uncompressed or concat compressed modes
509 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
510 raise SkipTest('this test only works for uncompressed '
511 'or concat compressed modes')
b8fc2f5d 512
f698c99c
PG
513 password, paramversion = self.ENCRYPTION or (None, None)
514 deltatar = DeltaTar(mode=self.MODE, password=password,
515 crypto_paramversion=paramversion,
b8fc2f5d
ERE
516 logger=self.consoleLogger)
517
b8fc2f5d
ERE
518 self.hash = dict()
519 os.makedirs('source_dir2')
520 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
521 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
522
523 # create first backup
524 deltatar.create_full_backup(
525 source_path="source_dir2",
526 backup_path="backup_dir",
527 max_volume_size=1)
528
529 assert os.path.exists("backup_dir")
530 assert os.path.exists(os.path.join("backup_dir",
531 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
532 if self.MODE_COMPRESSES:
533 n_vols = 1
534 else:
535 n_vols = 2
536 for i_vol in range(n_vols):
537 assert os.path.exists(os.path.join("backup_dir",
538 deltatar.volume_name_func("backup_dir", True, i_vol)))
539 assert not os.path.exists(os.path.join("backup_dir",
540 deltatar.volume_name_func("backup_dir", True, n_vols)))
b8fc2f5d
ERE
541
542 shutil.rmtree("source_dir2")
543
544 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
545 tar_path = os.path.join("backup_dir", tar_filename)
546
547 index_filename = deltatar.index_name_func(True)
548 index_path = os.path.join("backup_dir", index_filename)
549
550 # this should automatically restore the huge file
9eccb1c2 551 f = deltatar.open_auxiliary_file(index_path, 'r')
10798176 552 offset = None
2967b3e1
ERE
553 while True:
554 l = f.readline()
555 if not len(l):
556 break
be60ffd0 557 data = json.loads(l.decode('UTF-8'))
8adbe50d 558 if data.get('type', '') == 'file' and\
eb6d0069 559 deltatar.unprefixed(data['path']) == "huge":
b8fc2f5d
ERE
560 offset = data['offset']
561 break
562
10798176
CH
563 assert offset is not None
564
be60ffd0 565 fo = open(tar_path, 'rb')
b8fc2f5d
ERE
566 fo.seek(offset)
567 def new_volume_handler(mode, tarobj, base_name, volume_number):
f698c99c
PG
568 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
569 if self.ENCRYPTION is not None:
570 # deltatar module is shadowed here
571 suf += "." + deltatar_PDTCRYPT_EXTENSION
b8fc2f5d 572 tarobj.open_volume(datetime.now().strftime(
f698c99c 573 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
b8fc2f5d
ERE
574 new_volume_handler = partial(new_volume_handler, self.MODE)
575
f698c99c
PG
576 crypto_ctx = None
577 if self.ENCRYPTION is not None:
578 crypto_ctx = crypto.Decrypt (password)
579
b8fc2f5d 580 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
b8fc2f5d 581 new_volume_handler=new_volume_handler,
f698c99c
PG
582 encryption=crypto_ctx)
583
8adbe50d
ERE
584 member = tarobj.next()
585 member.path = deltatar.unprefixed(member.path)
586 member.name = deltatar.unprefixed(member.name)
587 tarobj.extract(member)
b8fc2f5d 588 tarobj.close()
c7609167 589 fo.close()
b8fc2f5d
ERE
590 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
591
592 os.unlink("huge")
6c678f3a 593
f698c99c 594
9e092947
PG
595 def test_restore_manual_from_index_twice (self):
596 """
597 Creates a full backup and restore the same file twice. This *must* fail
598 when encryption is active.
599
600 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
601 backwards within the same file. This prevents the encryption layer from
602 exploding due to a reused IV in an overall valid archive.
603
604 This test anticipates possible future mistakes since it’s entirely
605 feasible to implement backward seeks for *_Stream* with concat mode.
606 """
607 # this test only works for uncompressed or concat compressed modes
608 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
609 raise SkipTest("this test only works for uncompressed "
610 "or concat compressed modes")
611
612 password, paramversion = self.ENCRYPTION or (None, None)
613 deltatar = DeltaTar(mode=self.MODE, password=password,
614 crypto_paramversion=paramversion,
615 logger=self.consoleLogger)
616
617 self.hash = dict()
618 os.makedirs("source_dir2")
619 self.hash["source_dir2/samefile"] = \
620 self.create_file("source_dir2/samefile", 1 * 1024)
621
622 # create first backup
623 deltatar.create_full_backup(
624 source_path="source_dir2",
625 backup_path="backup_dir")
626
627 assert os.path.exists("backup_dir")
628 assert os.path.exists(os.path.join("backup_dir",
629 deltatar.volume_name_func("backup_dir", True, 0)))
630
631 shutil.rmtree("source_dir2")
632
633 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
634 tar_path = os.path.join("backup_dir", tar_filename)
635
636 index_filename = deltatar.index_name_func(True)
637 index_path = os.path.join("backup_dir", index_filename)
638
639 f = deltatar.open_auxiliary_file(index_path, "r")
640 offset = None
641 while True:
642 l = f.readline()
643 if not len(l):
644 break
645 data = json.loads(l.decode("UTF-8"))
646 if data.get("type", "") == "file" and\
647 deltatar.unprefixed(data["path"]) == "samefile":
648 offset = data["offset"]
649 break
650
651 assert offset is not None
652
653 fo = open(tar_path, "rb")
654 fo.seek(offset)
655
656 crypto_ctx = None
657 if self.ENCRYPTION is not None:
658 crypto_ctx = crypto.Decrypt (password)
659
660 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
661 encryption=crypto_ctx)
662 member = tarobj.next()
663 member.path = deltatar.unprefixed(member.path)
664 member.name = deltatar.unprefixed(member.name)
665
666 # extract once …
667 tarobj.extract(member)
668 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
669
670 # … and twice
671 try:
672 tarobj.extract(member)
673 except tarfile.StreamError:
674 if crypto_ctx is not None:
675 pass # good: seeking backwards not allowed
676 else:
677 raise
678 tarobj.close()
679 fo.close()
680 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
681
682 os.unlink("samefile")
683
684
11684b1d
ERE
685 def test_restore_from_index(self):
686 '''
55b8686d 687 Restores a full backup using an index file.
11684b1d 688 '''
11684b1d 689 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
690 raise SkipTest('this test only works for uncompressed '
691 'or concat compressed modes')
11684b1d 692
f698c99c
PG
693 password, paramversion = self.ENCRYPTION or (None, None)
694 deltatar = DeltaTar(mode=self.MODE, password=password,
695 crypto_paramversion=paramversion,
11684b1d
ERE
696 logger=self.consoleLogger)
697
698 # create first backup
699 deltatar.create_full_backup(
700 source_path="source_dir",
55b8686d 701 backup_path="backup_dir")
11684b1d
ERE
702
703 shutil.rmtree("source_dir")
704
705 # this should automatically restore all volumes
706 index_filename = deltatar.index_name_func(True)
707 index_path = os.path.join("backup_dir", index_filename)
708
709 deltatar.restore_backup(target_path="source_dir",
710 backup_indexes_paths=[index_path])
711
be60ffd0 712 for key, value in self.hash.items():
11684b1d
ERE
713 assert os.path.exists(key)
714 if value:
715 assert value == self.md5sum(key)
6c678f3a 716
b8fc2f5d
ERE
717 def test_restore_multivol_from_index(self):
718 '''
719 Restores a full multivolume backup using an index file.
720 '''
b8fc2f5d 721 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
722 raise SkipTest('this test only works for uncompressed '
723 'or concat compressed modes')
b8fc2f5d 724
f698c99c
PG
725 password, paramversion = self.ENCRYPTION or (None, None)
726 deltatar = DeltaTar(mode=self.MODE, password=password,
727 crypto_paramversion=paramversion,
b8fc2f5d
ERE
728 logger=self.consoleLogger)
729
730 # create first backup
731 deltatar.create_full_backup(
732 source_path="source_dir",
733 backup_path="backup_dir",
8390c925 734 max_volume_size=2)
b8fc2f5d
ERE
735
736 shutil.rmtree("source_dir")
737
738 # this should automatically restore all volumes
739 index_filename = deltatar.index_name_func(True)
740 index_path = os.path.join("backup_dir", index_filename)
741
742 deltatar.restore_backup(target_path="source_dir",
743 backup_indexes_paths=[index_path])
744
be60ffd0 745 for key, value in self.hash.items():
b8fc2f5d
ERE
746 assert os.path.exists(key)
747 if value:
748 assert value == self.md5sum(key)
da26094a 749
c1af2184
ERE
750 def test_create_basic_filtering(self):
751 '''
752 Tests create backup basic filtering.
753 '''
f698c99c
PG
754 password, paramversion = self.ENCRYPTION or (None, None)
755 deltatar = DeltaTar(mode=self.MODE, password=password,
756 crypto_paramversion=paramversion,
c1af2184 757 logger=self.consoleLogger,
eb6d0069
CH
758 included_files=["test", "small"],
759 excluded_files=["test/huge"])
c1af2184
ERE
760
761 # create first backup
762 deltatar.create_full_backup(
763 source_path="source_dir",
764 backup_path="backup_dir")
765
766 assert os.path.exists("backup_dir")
767 shutil.rmtree("source_dir")
768
769 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
770 tar_path = os.path.join("backup_dir", tar_filename)
771
772 deltatar.restore_backup(target_path="source_dir",
773 backup_tar_path=tar_path)
774
eb6d0069
CH
775 assert os.path.exists("source_dir/small")
776 assert os.path.exists("source_dir/test")
777 assert os.path.exists("source_dir/test/huge2")
778 assert os.path.exists("source_dir/test/test2")
c1af2184 779
eb6d0069
CH
780 assert not os.path.exists("source_dir/test/huge")
781 assert not os.path.exists("source_dir/big")
c1af2184 782
862b3726
ERE
783 def test_create_filter_func(self):
784 '''
785 Tests create backup basic filtering.
786 '''
787 visited_paths = []
788 def filter_func(visited_paths, path):
789 if path not in visited_paths:
790 visited_paths.append(path)
791 return True
792
793 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
794
795 password, paramversion = self.ENCRYPTION or (None, None)
796 deltatar = DeltaTar(mode=self.MODE, password=password,
797 crypto_paramversion=paramversion,
862b3726 798 logger=self.consoleLogger,
eb6d0069
CH
799 included_files=["test", "small"],
800 excluded_files=["test/huge"],
862b3726
ERE
801 filter_func=filter_func)
802
803 # create first backup
804 deltatar.create_full_backup(
805 source_path="source_dir",
806 backup_path="backup_dir")
807
808 assert os.path.exists("backup_dir")
809 shutil.rmtree("source_dir")
810
811 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
812 tar_path = os.path.join("backup_dir", tar_filename)
813
814 deltatar.restore_backup(target_path="source_dir",
815 backup_tar_path=tar_path)
9af328e2 816 assert set(visited_paths) == set([
eb6d0069
CH
817 'small',
818 'test',
819 'test/huge2',
820 'test/test2'
9af328e2 821 ])
862b3726
ERE
822
823 def test_create_filter_out_func(self):
824 '''
825 Tests create backup basic filtering.
826 '''
827 visited_paths = []
828 def filter_func(visited_paths, path):
829 '''
830 Filter out everything
831 '''
832 if path not in visited_paths:
833 visited_paths.append(path)
834 return False
835
836 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
837
838 password, paramversion = self.ENCRYPTION or (None, None)
839 deltatar = DeltaTar(mode=self.MODE, password=password,
840 crypto_paramversion=paramversion,
862b3726 841 logger=self.consoleLogger,
eb6d0069
CH
842 included_files=["test", "small"],
843 excluded_files=["test/huge"],
862b3726
ERE
844 filter_func=filter_func)
845
846 # create first backup
847 deltatar.create_full_backup(
848 source_path="source_dir",
849 backup_path="backup_dir")
850
851 assert os.path.exists("backup_dir")
852 shutil.rmtree("source_dir")
853
854 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
855 tar_path = os.path.join("backup_dir", tar_filename)
856
857 deltatar.restore_backup(target_path="source_dir",
858 backup_tar_path=tar_path)
9af328e2 859 assert set(visited_paths) == set([
eb6d0069
CH
860 'small',
861 'test'
9af328e2 862 ])
862b3726
ERE
863
864 # check that effectively no file was backed up
eb6d0069
CH
865 assert not os.path.exists("source_dir/small")
866 assert not os.path.exists("source_dir/big")
867 assert not os.path.exists("source_dir/test")
862b3726 868
9af328e2
ERE
869 def test_restore_index_basic_filtering(self):
870 '''
871 Creates a backup, and then filter when doing the index based restore.
872 '''
f391d8e9 873 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
874 raise SkipTest('this test only works for uncompressed '
875 'or concat compressed modes')
f391d8e9 876
f698c99c
PG
877 password, paramversion = self.ENCRYPTION or (None, None)
878 deltatar = DeltaTar(mode=self.MODE, password=password,
879 crypto_paramversion=paramversion,
9af328e2
ERE
880 logger=self.consoleLogger)
881
882 # create first backup
883 deltatar.create_full_backup(
884 source_path="source_dir",
885 backup_path="backup_dir")
886
887 assert os.path.exists("backup_dir")
888 shutil.rmtree("source_dir")
889
890 index_filename = deltatar.index_name_func(True)
891 index_path = os.path.join("backup_dir", index_filename)
892
eb6d0069
CH
893 deltatar.included_files = ["test", "small"]
894 deltatar.excluded_files = ["test/huge"]
9af328e2
ERE
895 deltatar.restore_backup(target_path="source_dir",
896 backup_indexes_paths=[index_path])
897
eb6d0069
CH
898 assert os.path.exists("source_dir/small")
899 assert os.path.exists("source_dir/test")
900 assert os.path.exists("source_dir/test/huge2")
901 assert os.path.exists("source_dir/test/test2")
9af328e2 902
eb6d0069
CH
903 assert not os.path.exists("source_dir/test/huge")
904 assert not os.path.exists("source_dir/big")
9af328e2
ERE
905
def test_restore_index_filter_func(self):
    '''
    Index based restore combining include/exclude lists with a filter
    function.

    The filter function accepts everything and records each distinct path
    it is asked about; the recorded set is compared with the paths that
    survive the include/exclude lists.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    visited_paths = []

    def record_path(seen, path):
        # remember every distinct path once and never reject anything
        if path not in seen:
            seen.append(path)
        return True

    recorder = partial(record_path, visited_paths)

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # take the full backup, then wipe the tree it was taken from
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    full_index = os.path.join("backup_dir", dtar.index_name_func(True))

    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.filter_func = recorder
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[full_index])

    expected = {'small', 'test', 'test/huge2', 'test/test2'}
    assert set(visited_paths) == expected
951
e5f5681b
ERE
def test_restore_tar_basic_filtering(self):
    '''
    Tar based restore that honours the include/exclude path lists.

    Same scenario as the index based filtering test, but the restore is
    driven by the first volume of the full backup instead of its index.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # take the full backup, then wipe the tree it was taken from
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # restrict what the restore is allowed to bring back
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    for restored in ("source_dir/small",
                     "source_dir/test",
                     "source_dir/test/huge2",
                     "source_dir/test/test2"):
        assert os.path.exists(restored)

    for filtered_out in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(filtered_out)
e5f5681b
ERE
985
def test_restore_tar_filter_func(self):
    '''
    Creates a backup, and then filter when doing the tar based restore,
    using the filter function.

    The filter function accepts every path and records each distinct path
    it is called with; after the restore the recorded paths must be exactly
    those that pass the include/exclude lists.
    '''
    visited_paths = []

    def filter_func(visited_paths, path):
        # record each path once; never reject anything
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    filter_func = partial(filter_func, visited_paths)

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # only the tar volume is needed here; the index path that used to be
    # computed at this point was never used by this tar based restore
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = filter_func

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)
    assert set(visited_paths) == set([
        'small',
        'test',
        'test/huge2',
        'test/test2'
    ])
1030
def test_filter_path_regexp(self):
    '''
    Exercise DeltaTar.filter_path with a mix of compiled regular
    expressions and a plain string in the include list, plus a regular
    expression in the exclude list.
    '''
    include_rules = [
        re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
        re.compile('^yes$'),
        'testing'
    ]
    exclude_rules = [
        re.compile('^testing/in_the'),
    ]
    dtar = DeltaTar(mode=self.MODE, included_files=include_rules,
                    excluded_files=exclude_rules)

    # paths the filter has to let through
    accepted = (
        'test/hola',
        'test/hola/any/thing',
        'test/caracola/caracolero',
        'test/caracola/caracolero/yeah',
        'test/caracola/caracolero/whatever/aa',
        'yes',
        'testing',
        'testing/yes',
        'testing/in_th',
    )
    for candidate in accepted:
        assert dtar.filter_path(candidate), candidate

    # paths the filter has to reject
    rejected = (
        'something',
        'other/thing',
        'test_ing',
        'test/hola_lala',
        'test/agur',
        'testing_something',
        'yeso',
        'yes/o',
        'yes_o',
        'testing/in_the',
        'testing/in_the_field',
        'testing/in_the/field',
    )
    for candidate in rejected:
        assert not dtar.filter_path(candidate), candidate
e5f5681b 1070
974408b5
ERE
def test_filter_path_parent(self):
    '''
    Exercise DeltaTar.filter_path parent matching: directories leading to
    an included path report PARENT_MATCH, the included path and anything
    below it report MATCH, unrelated paths report NO_MATCH.
    '''
    dtar = DeltaTar(mode=self.MODE,
                    included_files=['testing/path/to/some/thing'])

    # (path, extra keyword arguments, expected result)
    cases = (
        ('testing', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some/thing', {}, MATCH),
        ('testing/path/to/some/thing/what&/ever', {}, MATCH),
        ('testing/something/else', {}, NO_MATCH),
    )
    for path, extra, expected in cases:
        assert dtar.filter_path(path, **extra) == expected, path
974408b5
ERE
1088
def test_parent_matching_simple_full_backup(self):
    '''
    Full backup restricted to "test/huge2" via included_files, followed by
    an unfiltered tar based restore.

    Only the included file and its parent directory may show up in the
    restored tree.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_dtar = DeltaTar(mode=self.MODE, password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger,
                           included_files=['test/huge2'])

    # filtered full backup, then wipe the tree it was taken from
    backup_dtar.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", backup_dtar.volume_name_func('backup_dir', True, 0))

    # restore with a fresh instance that has no include list
    restore_dtar = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger)
    restore_dtar.restore_backup(target_path="source_dir",
                                backup_tar_path=first_volume)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1124
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Unfiltered full backup, followed by a tar based restore restricted to
    "test/huge2" via included_files.

    Only the included file and its parent directory may show up in the
    restored tree.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_dtar = DeltaTar(mode=self.MODE, password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger)

    # complete full backup, then wipe the tree it was taken from
    backup_dtar.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", backup_dtar.volume_name_func('backup_dir', True, 0))

    # restore with a fresh instance that carries the include list
    restore_dtar = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger,
                            included_files=['test/huge2'])
    restore_dtar.restore_backup(target_path="source_dir",
                                backup_tar_path=first_volume)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1160
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create a full backup and restore it from its index using parent
    matching.

    The backup is unfiltered; the restore is limited to "test/huge2" via
    included_files, so only that file and its parent directory may show up
    in the restored tree.

    This test previously restored through backup_tar_path, which made it a
    duplicate of the tar based parent matching test; it now goes through
    backup_indexes_paths as its name says.
    '''
    # index based restores are skipped for these modes by the other index
    # tests of this class as well
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    included_files = [
        'test/huge2'
    ]

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # restore with a fresh instance that carries the include list
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        logger=self.consoleLogger,
                        included_files=included_files)
    deltatar.restore_backup(target_path="source_dir",
                            backup_indexes_paths=[index_path])

    assert os.path.exists('source_dir/test/huge2')
    assert os.path.exists('source_dir/test/')
    assert not os.path.exists('source_dir/test/huge')
    assert not os.path.exists('source_dir/big')
    assert not os.path.exists('source_dir/small')
1196
d07c8065
ERE
def test_collate_iterators(self):
    '''
    Collate the index of a fresh full backup with a walk over the very
    same, unmodified source directory; every collated pair has to compare
    equal.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # the index path is made absolute because the walk below runs from
    # inside source_dir
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              dtar.index_name_func(is_full=True))
    index_it = dtar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = dtar.jsonize_path_iterator(dtar._recursive_walk_dir('.'))

    try:
        collated = dtar.collate_iterators(index_it, path_it)
        for from_index, from_disk, _l_no in collated:
            assert dtar._equal_stat_dicts(from_index, from_disk)
    finally:
        os.chdir(start_dir)
1228
def test_collate_iterators_diffdirs(self):
    '''
    Collate the index of a full backup with a walk over a source directory
    that gained one file ("z") after the backup was taken.

    The new file must have no counterpart on the index side; every other
    pair has to compare equal.
    '''
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # this file only exists on disk, not in the backup index
    self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)

    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              dtar.index_name_func(is_full=True))
    index_it = dtar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = dtar.jsonize_path_iterator(dtar._recursive_walk_dir('.'))

    try:
        collated = dtar.collate_iterators(index_it, path_it)
        for from_index, from_disk, _l_no in collated:
            if from_disk['path'] != 'z':
                assert dtar._equal_stat_dicts(from_index, from_disk)
            else:
                assert not from_index
    finally:
        os.chdir(start_dir)
1266
def test_collate_iterators_diffdirs2(self):
    '''
    Use the collate iterators functionality with two different directories.
    It must behave in an expected way.

    After the full backup a directory with two files and one extra file
    are added to the source tree; the sequence of (index path, disk path)
    pairs produced by collate_iterators is compared with the expected
    ordering, where new entries have None on the index side.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # add some new files and directories
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    cwd = os.getcwd()
    index_filename = deltatar.index_name_func(is_full=True)
    index_path = os.path.join(cwd, "backup_dir", index_filename)
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    dir_it = deltatar._recursive_walk_dir('.')
    path_it = deltatar.jsonize_path_iterator(dir_it)

    visited_pairs = []

    try:
        for path1, path2, _l_no in deltatar.collate_iterators(index_it, path_it):
            visited_pairs.append(
                (deltatar.unprefixed(path1['path']) if path1 else None,
                 path2['path'] if path2 else None)
            )
    finally:
        # always go back to the original directory; the assertion is kept
        # out of this clause so that it can neither skip the chdir nor
        # replace an exception raised inside the loop
        os.chdir(cwd)

    assert visited_pairs == [
        (u'big', u'big'),
        (None, u'bigdir'),
        (u'small', u'small'),
        (u'test', u'test'),
        (None, u'zzzz'),
        (None, u'bigdir/a'),
        (None, u'bigdir/b'),
        (u'test/huge', u'test/huge'),
        (u'test/huge2', u'test/huge2'),
        (u'test/test2', u'test/test2'),
    ]
1322
def test_create_empty_diff_backup(self):
    '''
    Diff backup taken right after a full backup, i.e. without any change
    in the source tree.

    The diff index must hold six entries, all of them prefixed with
    "list://", and restoring the diff volume must leave the target
    directory empty.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))
    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # inspect the entries of the diff index
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    entry_count = 0
    for entry_count, entry in enumerate(dtar.iterate_index_path(diff_index),
                                        start=1):
        assert entry[0]['path'].startswith("list://")

    assert entry_count == 6

    # now look at the tar side of the diff backup
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    diff_volume = os.path.join(
        "backup_dir2",
        dtar.volume_name_func('backup_dir2', is_full=False,
                              volume_number=0))

    # nothing gets restored, because the diff was empty
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=diff_volume)
    assert len(os.listdir("source_dir")) == 0
1366
8adbe50d 1367
42d39ca7
ERE
def test_create_diff_backup1(self):
    '''
    Diff backup after adding a directory with two files, adding one more
    file and deleting "small".

    Checks the exact sequence of entries in the diff index and that a tar
    based restore of the diff leaves only the new entries in the target.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # change the source tree: one new directory, one removal, three files
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for new_file, size in (("source_dir/bigdir/a", 100),
                           ("source_dir/bigdir/b", 500),
                           ("source_dir/zzzz", 100)):
        self.hash[new_file] = self.create_file(new_file, size)

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # inspect the entries of the diff index
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    indexed_paths = [entry[0]['path']
                     for entry in dtar.iterate_index_path(diff_index)]

    assert indexed_paths == [
        'list://big',
        'snapshot://bigdir',
        'delete://small',
        'list://test',
        'snapshot://zzzz',
        'snapshot://bigdir/a',
        'snapshot://bigdir/b',
        'list://test/huge',
        'list://test/huge2',
        'list://test/test2',
    ]

    # now look at the tar side of the diff backup
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    # recreate source_dir holding only an empty "small"; the restore is
    # expected to delete it again
    os.mkdir("source_dir")
    open("source_dir/small", 'wb').close()

    diff_volume = os.path.join(
        "backup_dir2",
        dtar.volume_name_func('backup_dir2', is_full=False,
                              volume_number=0))

    # restoring the diff only creates the new files
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=diff_volume)

    # listing order is irrelevant
    assert set(os.listdir("source_dir")) == {'zzzz', 'bigdir'}
42d39ca7 1432
df99a044
ERE
def test_restore_from_index_diff_backup(self):
    '''
    Full backup, changes in the source tree, diff backup, then an index
    based restore of diff plus full backup into an empty "target_dir".

    The restored tree has to be equal to the modified source tree.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # change the source tree: one new directory, one removal, three files
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for new_file, size in (("source_dir/bigdir/a", 100),
                           ("source_dir/bigdir/b", 500),
                           ("source_dir/zzzz", 100)):
        self.hash[new_file] = self.create_file(new_file, size)

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # restore into target_dir from the diff index and the full index
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    dtar.restore_backup("target_dir",
                        backup_indexes_paths=[diff_index, full_index])

    # source_dir and target_dir have to match now
    self.check_equal_dirs('source_dir', 'target_dir', dtar)
df99a044
ERE
1475
def test_restore_from_index_diff_backup2(self):
    '''
    Full backup, changes in the source tree (including removal of the
    whole "test" directory), diff backup, then a restore that first
    unpacks the full backup into "target_dir" and applies the diff on top.

    The resulting tree has to be equal to the modified source tree.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # change the source tree: new directory and files, two removals
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    for new_file, size in (("source_dir/bigdir/a", 100),
                           ("source_dir/bigdir/b", 500),
                           ("source_dir/zzzz", 100)):
        self.hash[new_file] = self.create_file(new_file, size)
    shutil.rmtree("source_dir/test")

    dtar.create_diff_backup("source_dir", "backup_dir2", full_index)

    # step one: unpack the full backup into target_dir
    full_volume = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    dtar.restore_backup("target_dir", backup_tar_path=full_volume)

    # step two: apply the diff on top of it
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    dtar.restore_backup("target_dir",
                        backup_indexes_paths=[diff_index, full_index])

    # source_dir and target_dir have to match now
    self.check_equal_dirs('source_dir', 'target_dir', dtar)
1524
def test_restore_from_index_diff_backup3(self):
    '''
    Full backup of a copy of self.GIT_DIR, diff backup of a second copy
    with randomly removed entries, then restores of the diff both on top
    of the restored full backup and from scratch.

    After each restore "target_dir" is compared with the tree it is
    supposed to reproduce.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # two identical copies of self.GIT_DIR: one stays pristine, the other
    # one gets altered further down
    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # randomly remove entries from source_dir_diff
    for path in dtar._recursive_walk_dir('source_dir_diff'):
        # entries already gone (removed together with a parent) are
        # skipped, and 70% of the existing ones are left untouched
        if os.path.exists(path) and random.random() >= 0.7:
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.unlink(path)

    dtar.create_diff_backup("source_dir_diff", "backup_dir2", full_index)

    # unpack the full backup into target_dir ...
    full_volume = os.path.join(
        "backup_dir",
        dtar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    dtar.restore_backup("target_dir", backup_tar_path=full_volume)

    # ... which has to reproduce source_dir (the unaltered copy of
    # self.GIT_DIR)
    self.check_equal_dirs('source_dir', 'target_dir', dtar)

    # apply the diff on top of the restored full backup
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    index_chain = [diff_index, full_index]
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir has to reproduce the randomly altered copy now
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)

    # same restore once more, this time starting without target_dir
    shutil.rmtree("target_dir")
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # and again target_dir has to reproduce the randomly altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)
1597
def test_restore_from_index_diff_backup3_multivol(self):
    '''
    Same scenario as the random removal diff test on copies of
    self.GIT_DIR, but both the full and the diff backup are created with
    max_volume_size=1.

    After each restore "target_dir" is compared with the tree it is
    supposed to reproduce.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE, password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # two identical copies of self.GIT_DIR: one stays pristine, the other
    # one gets altered further down
    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            max_volume_size=1)

    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # randomly remove entries from source_dir_diff
    for path in dtar._recursive_walk_dir('source_dir_diff'):
        # entries already gone (removed together with a parent) are
        # skipped, and 70% of the existing ones are left untouched
        if os.path.exists(path) and random.random() >= 0.7:
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.unlink(path)

    dtar.create_diff_backup("source_dir_diff", "backup_dir2",
                            full_index, max_volume_size=1)

    # unpack the full backup, starting at its first volume ...
    full_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup("target_dir", backup_tar_path=full_volume)

    # ... which has to reproduce source_dir (the unaltered copy of
    # self.GIT_DIR)
    self.check_equal_dirs('source_dir', 'target_dir', dtar)

    # apply the diff on top of the restored full backup
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))
    index_chain = [diff_index, full_index]
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # target_dir has to reproduce the randomly altered copy now
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)

    # same restore once more, this time starting without target_dir
    shutil.rmtree("target_dir")
    dtar.restore_backup("target_dir", backup_indexes_paths=index_chain)

    # and again target_dir has to reproduce the randomly altered copy
    self.check_equal_dirs('source_dir_diff', 'target_dir', dtar)
1671
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Walk the directory trees path1 and path2 in lockstep and check that
    they are the same.

    Both trees are walked with deltatar._recursive_walk_dir and converted
    with deltatar.jsonize_path_iterator; the first element of each pair of
    items is compared with deltatar._equal_stat_dicts.

    Raises AssertionError when a pair of entries differs and Exception when
    one of the trees yields more entries than the other one.
    '''
    source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
    source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
    target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
    target_it = deltatar.jsonize_path_iterator(target_it, strip=1)

    # A sentinel tells the two iterators apart when one of them ends.  The
    # former try/except StopIteration could not distinguish "source ended"
    # from "target ended" and silently accepted a target tree with fewer
    # entries than the source tree.
    exhausted = object()
    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            break
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            # show the offending pair, then let the original error through
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            raise
df99a044 1697
f5d9144b
PG
def test_create_no_symlinks(self):
    '''
    Creates a full backup from different varieties of symlinks. The
    extracted archive may not contain any symlinks but the file contents.

    One symlink points at a regular file inside the source tree, four more
    point at targets that do not exist. After backup and restore only two
    entries are expected in the symlinks directory, none of them a symlink,
    and the former link must hold the contents of the file it pointed to.
    '''

    shutil.rmtree("source_dir", ignore_errors=True)
    os.makedirs("source_dir/symlinks")
    fd = os.open("source_dir/symlinks/valid_linkname",
                 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    try:
        os.write(fd, b"valid link target for symlink tests; please ignore\n")
    finally:
        os.close(fd)
    # first one is good, the rest points nowhere
    self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
    self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
    self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
    self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")
    assert not os.path.exists("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # os.walk yields nothing for a missing directory, which would turn the
    # checks below into no-ops
    assert os.path.isdir("source_dir/symlinks")

    for root, _dirs, files in os.walk("source_dir/symlinks"):
        # only the valid link plus the linked file may be found in the
        # extracted archive
        assert len(files) == 2
        for name in files:
            # the link must have been resolved; os.walk reports bare file
            # names, so they have to be joined with the directory before
            # they can be inspected
            assert not os.path.islink(os.path.join(root, name))
        # file contents must match the linked file
        with open("source_dir/symlinks/valid_linkname") as a, \
                open("source_dir/symlinks/whatever") as b:
            assert a.read() == b.read()
1745
83f5fd71
PG
def test_restore_with_symlinks(self):
    '''
    Creates a full backup, appends different varieties of symlinks to the
    resulting archive and restores it. All of the symlinks must be
    filtered out, i.e. none of them may be present after the restore.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(a, name, dst):
        # append a symlink member ``name`` -> ``dst`` and hand back the
        # name so the caller can look for it after the restore
        info = tarfile.TarInfo("snapshot://%s" % name)
        info.type = tarfile.SYMTYPE
        info.linkname = dst
        a.addfile(info)
        return name

    try:
        with tarfile.open(tar_path, mode="a") as a:
            checkme = [
                add_symlink(a, "symlinks/foo", "internal-file"),
                add_symlink(a, "symlinks/bar", "/absolute/path"),
                add_symlink(a, "symlinks/baz", "../parent/../../paths"),
            ]
    except tarfile.ReadError:
        # failing to append is tolerated for these modes; there is
        # nothing to check after the restore then
        if self.MODE == '#' or self.MODE.endswith("gz"):
            checkme = []
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            checkme = []
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check what happened to our symlinks; lexists() is required because
    # exists() follows links and thus reports False for a dangling link
    # even if it was restored
    for name in checkme:
        fullpath = os.path.join("source_dir", name)
        assert not os.path.lexists(fullpath)
f5d9144b 1799
43ce978b
PG
def test_restore_malicious_symlinks(self):
    '''
    Creates a full backup containing a symlink and a file of the same name.
    This simulates a symlink attack with a link pointing to some external
    path that is abused to write outside the extraction prefix. After the
    restore nothing may be present at the path of the link.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(a, name, dst):
        # append a symlink member ``name`` -> ``dst``
        info = tarfile.TarInfo("snapshot://%s" % name)
        info.type = tarfile.SYMTYPE
        info.linkname = dst
        a.addfile(info)

    def add_file(a, name):
        # append a regular file member without payload
        info = tarfile.TarInfo("snapshot://%s" % name)
        info.type = tarfile.REGTYPE
        a.addfile(info)

    testpath = "symlinks/pernicious-link"
    testdst = "/tmp/does/not/exist"

    try:
        with tarfile.open(tar_path, mode="a") as a:
            # several links followed by a regular file, all sharing one
            # member name
            add_symlink(a, testpath, testdst)
            add_symlink(a, testpath, testdst+"X")
            add_symlink(a, testpath, testdst+"XXX")
            add_file(a, testpath)
    except tarfile.ReadError:
        if self.MODE == '#' or self.MODE.endswith("gz"):
            pass
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            pass # O_APPEND of concat archives not feasible
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check whether the link was extracted; deltatar seems to only ever
    # retrieve the first item it finds for a given path which in the case
    # at hand is a symlink to some non-existent path. exists() follows
    # links and would report False for such a dangling link even if it was
    # restored, hence lexists()
    fullpath = os.path.join("source_dir", testpath)
    assert not os.path.lexists(fullpath)
43ce978b 1862
da26094a
ERE
class DeltaTar2Test(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the ":" mode.
    """

    MODE = ':'
1868
1869
class DeltaTarStreamTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the uncompressed stream mode ("|").
    """

    MODE = '|'
1875
1876
class DeltaTarGzipTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the gzip mode (":gz").
    """

    MODE_COMPRESSES = True
    MODE = ':gz'
da26094a
ERE
1883
1884
da26094a
ERE
class DeltaTarGzipStreamTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the gzip stream mode ("|gz").
    """

    MODE_COMPRESSES = True
    MODE = '|gz'
da26094a
ERE
1891
1892
bd011242
CH
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the bz2 mode (":bz2").
    Skipped via the class decorator.
    """

    MODE_COMPRESSES = True
    MODE = ':bz2'
ea6d3c3e 1900
bd011242
CH
1901
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the bz2 stream mode ("|bz2").
    Skipped via the class decorator.
    """

    MODE_COMPRESSES = True
    MODE = '|bz2'
da26094a
ERE
1909
1910
class DeltaTarGzipConcatTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the gzip concat stream mode ("#gz").
    """

    MODE_COMPRESSES = True
    MODE = '#gz'
da26094a
ERE
1917
1918
da26094a
ERE
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the gzip concat stream mode ("#gz")
    with encryption enabled.
    """

    MODE_COMPRESSES = True
    MODE = '#gz'
    # (password, paramversion)
    ENCRYPTION = ('some magic key', 1)
da26094a
ERE
1926
1927
ac5e4184
ERE
class DeltaTarAes128ConcatTest(DeltaTarTest):
    """
    Variant of DeltaTarTest that uses the concat stream mode ("#") with
    encryption enabled.
    """

    # (password, paramversion)
    ENCRYPTION = ('some magic key', 1)
    MODE = '#'
ac5e4184
ERE
1934
1935