graciously handle GCM data length limit
[python-delta-tar] / testing / test_deltatar.py
CommitLineData
0708a374
ERE
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
f5d9144b 19import errno
0708a374 20import os
0d5c1970 21import re
cbac9f0b 22import random
e5c6ca04 23import shutil
0708a374 24import logging
b8fc2f5d
ERE
25import binascii
26import json
27from datetime import datetime
28from functools import partial
bd011242 29from unittest import skip, SkipTest
0708a374 30
83f5fd71 31import deltatar.tarfile as tarfile
f698c99c 32from deltatar.tarfile import TarFile
974408b5 33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
f698c99c
PG
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
0708a374 36
0708a374
ERE
37from . import BaseTest
38from . import new_volume_handler
39
0708a374
ERE
40class DeltaTarTest(BaseTest):
41 """
42 Test backups
43 """
da26094a 44 MODE = ''
8ea0be50 45 MODE_COMPRESSES = False
da26094a 46
f698c99c 47 ENCRYPTION = None # (password : str, paramversion : int) option
da26094a 48
188b845d
TJ
49 GIT_DIR = '.git'
50
0708a374
ERE
51 def setUp(self):
52 '''
53 Create base test data
54 '''
a4e8b8af 55 self.pwd = os.getcwd()
cbac9f0b 56 os.system('rm -rf target_dir source_dir* backup_dir* huge')
0708a374
ERE
57 os.makedirs('source_dir/test/test2')
58 self.hash = dict()
e5c6ca04 59 self.hash["source_dir/test/test2"] = ''
0708a374
ERE
60 self.hash["source_dir/big"] = self.create_file("source_dir/big", 50000)
61 self.hash["source_dir/small"] = self.create_file("source_dir/small", 100)
62 self.hash["source_dir/test/huge"] = self.create_file("source_dir/test/huge", 700000)
d5361dac 63 self.hash["source_dir/test/huge2"] = self.create_file("source_dir/test/huge2", 800000)
0708a374
ERE
64
65 self.consoleLogger = logging.StreamHandler()
66 self.consoleLogger.setLevel(logging.DEBUG)
67
188b845d
TJ
68 if not os.path.isdir(self.GIT_DIR):
69 # Not running inside git tree, take our
70 # own testing directory as source.
71 self.GIT_DIR = 'testing'
72
73 if not os.path.isdir(self.GIT_DIR):
74 raise Exception('No input directory found: ' + self.GIT_DIR)
75
0708a374
ERE
76 def tearDown(self):
77 '''
cb7a3911 78 Remove temporal files created by unit tests and reset globals.
0708a374 79 '''
a4e8b8af 80 os.chdir(self.pwd)
cbac9f0b 81 os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")
cb7a3911
PG
82 _ = crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
83 ("I am fully aware that this will void my warranty.")
0708a374 84
b8fc2f5d 85 def test_restore_simple_full_backup(self):
0708a374
ERE
86 '''
87 Creates a full backup without any filtering and restores it.
88 '''
f698c99c
PG
89 password, paramversion = self.ENCRYPTION or (None, None)
90 deltatar = DeltaTar(mode=self.MODE, password=password,
91 crypto_paramversion=paramversion,
da26094a 92 logger=self.consoleLogger)
0708a374
ERE
93
94 # create first backup
95 deltatar.create_full_backup(
96 source_path="source_dir",
97 backup_path="backup_dir")
98
e5c6ca04
ERE
99 assert os.path.exists("backup_dir")
100 shutil.rmtree("source_dir")
101
102 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
103 tar_path = os.path.join("backup_dir", tar_filename)
104
e5c6ca04
ERE
105 deltatar.restore_backup(target_path="source_dir",
106 backup_tar_path=tar_path)
107
be60ffd0 108 for key, value in self.hash.items():
e5c6ca04
ERE
109 assert os.path.exists(key)
110 if value:
e82f14f5 111 assert value == self.md5sum(key)
da26094a 112
cb7a3911
PG
113
114 def test_create_backup_max_file_length (self):
115 """
116 Creates a full backup including one file that exceeds the (purposely
117 lowered) upper bound on GCM encrypted objects. This will yield multiple
118 encrypted objects for one plaintext file.
119
120 Success is verified by splitting the archive at object boundaries and
121 counting the parts.
122 """
123 if self.MODE_COMPRESSES is True:
124 raise SkipTest ("GCM file length test not meaningful with compression.")
125 if self.ENCRYPTION is None:
126 raise SkipTest ("GCM file length applies only to encrypted backups.")
127
128 new_max = 20000 # cannot be less than tar block size
129 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
130 ("I am fully aware that this will void my warranty.",
131 new_max)
132
133 password, paramversion = self.ENCRYPTION
134 deltatar = DeltaTar (mode=self.MODE, password=password,
135 crypto_paramversion=paramversion,
136 logger=self.consoleLogger)
137
138 self.hash = dict ()
139 os.makedirs ("source_dir2")
140 for f, s in [("empty" , 0) # 1 tar objects
141 ,("slightly_larger", new_max + 1) # 2
142 ,("twice" , 2 * new_max) # 3
143 ]:
144 f = "source_dir2/%s" % f
145 self.hash [f] = self.create_file (f, s)
146
147 deltatar.create_full_backup \
148 (source_path="source_dir2", backup_path="backup_dir")
149
150 assert os.path.exists ("backup_dir")
151 shutil.rmtree ("source_dir2")
152
153 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
154 backup_path = os.path.join("backup_dir", backup_filename)
155
156 # split the resulting archive into its constituents without
157 # decrypting
158 ret = os.system("python3 ./deltatar/crypto.py process -D -S -i - "
159 "-o backup_dir/split <\'%s\'" % backup_path)
160
161 assert os.path.exists ("backup_dir/split")
162
163 dents = os.listdir ("backup_dir/split")
164 assert len (dents) == 6
165
166
167 def test_restore_backup_max_file_length (self):
168 """
169 Creates a full backup including one file that exceeds the (purposely
170 lowered) upper bound on GCM encrypted objects. This will yield two
171 encrypted objects for one plaintext file.
172
173 Success is verified by splitting the archive at object boundaries and
174 counting the parts.
175 """
176 if self.MODE_COMPRESSES is True:
177 raise SkipTest ("GCM file length test not meaningful with compression.")
178 if self.ENCRYPTION is None:
179 raise SkipTest ("GCM file length applies only to encrypted backups.")
180
181 new_max = 20000 # cannot be less than tar block size
182 crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE \
183 ("I am fully aware that this will void my warranty.",
184 new_max)
185
186 password, paramversion = self.ENCRYPTION
187 deltatar = DeltaTar (mode=self.MODE, password=password,
188 crypto_paramversion=paramversion,
189 logger=self.consoleLogger)
190
191 self.hash = dict ()
192 os.makedirs ("source_dir2")
193 for f, s in [("empty" , 0) # 1 tar objects
194 ,("slightly_larger", new_max + 1) # 2
195 ,("twice" , 2 * new_max) # 3
196 ]:
197 f = "source_dir2/%s" % f
198 self.hash [f] = self.create_file (f, s)
199
200 deltatar.create_full_backup \
201 (source_path="source_dir2", backup_path="backup_dir")
202
203 assert os.path.exists ("backup_dir")
204 shutil.rmtree ("source_dir2")
205
206 backup_filename = deltatar.volume_name_func ("backup_dir", True, 0)
207 backup_path = os.path.join("backup_dir", backup_filename)
208
209 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
210 tar_path = os.path.join("backup_dir", tar_filename)
211
212 deltatar.restore_backup(target_path="source_dir2",
213 backup_tar_path=tar_path)
214
215 for key, value in self.hash.items():
216 assert os.path.exists(key)
217 if value:
218 assert value == self.md5sum(key)
219
220
6c678f3a
ERE
221 def test_check_index_checksum(self):
222 '''
223 Creates a full backup and checks the index' checksum of files
224 '''
f698c99c
PG
225 password, paramversion = self.ENCRYPTION or (None, None)
226 deltatar = DeltaTar(mode=self.MODE, password=password,
227 crypto_paramversion=paramversion,
6c678f3a
ERE
228 logger=self.consoleLogger)
229
230 # create first backup
231 deltatar.create_full_backup(
232 source_path="source_dir",
233 backup_path="backup_dir")
234
235
236 index_filename = deltatar.index_name_func(True)
237 index_path = os.path.join("backup_dir", index_filename)
238
be60ffd0 239 f = open(index_path, 'rb')
6c678f3a
ERE
240 crc = None
241 checked = False
242 began_list = False
be60ffd0
ERE
243 while True:
244 l = f.readline()
245 if l == b'':
246 break
247 if b'BEGIN-FILE-LIST' in l:
c2ffe2ec 248 crc = binascii.crc32(l) & 0xFFFFffff
6c678f3a 249 began_list = True
be60ffd0 250 elif b'END-FILE-LIST' in l:
6c678f3a
ERE
251 crc = binascii.crc32(l, crc) & 0xffffffff
252
253 # next line contains the crc
be60ffd0 254 data = json.loads(f.readline().decode("UTF-8"))
6c678f3a
ERE
255 assert data['type'] == 'file-list-checksum'
256 assert data['checksum'] == crc
257 checked = True
258 break
259 elif began_list:
260 crc = binascii.crc32(l, crc) & 0xffffffff
c7609167 261 f.close()
6c678f3a 262
b8fc2f5d
ERE
263
264 def test_restore_multivol(self):
d5361dac 265 '''
b8fc2f5d
ERE
266 Creates a full backup without any filtering with multiple volumes and
267 restore it.
d5361dac 268 '''
ba7760a7
CH
269 if ':gz' in self.MODE:
270 raise SkipTest('compression information is lost when creating '
271 'multiple volumes with no Stream')
272
f698c99c
PG
273 password, paramversion = self.ENCRYPTION or (None, None)
274 deltatar = DeltaTar(mode=self.MODE, password=password,
275 crypto_paramversion=paramversion,
d5361dac
ERE
276 logger=self.consoleLogger)
277
b8fc2f5d
ERE
278 self.hash = dict()
279 os.makedirs('source_dir2')
280 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
281 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
282
d5361dac
ERE
283 # create first backup
284 deltatar.create_full_backup(
b8fc2f5d 285 source_path="source_dir2",
d5361dac
ERE
286 backup_path="backup_dir",
287 max_volume_size=1)
288
289 assert os.path.exists("backup_dir")
290 assert os.path.exists(os.path.join("backup_dir",
291 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
292 if self.MODE_COMPRESSES:
293 n_vols = 1
294 else:
295 n_vols = 2
296 for i_vol in range(n_vols):
297 assert os.path.exists(os.path.join("backup_dir",
298 deltatar.volume_name_func("backup_dir", True, i_vol)))
299 assert not os.path.exists(os.path.join("backup_dir",
300 deltatar.volume_name_func("backup_dir", True, n_vols)))
d5361dac 301
b8fc2f5d 302 shutil.rmtree("source_dir2")
d5361dac
ERE
303
304 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
305 tar_path = os.path.join("backup_dir", tar_filename)
306
307 # this should automatically restore all volumes
b8fc2f5d 308 deltatar.restore_backup(target_path="source_dir2",
d5361dac
ERE
309 backup_tar_path=tar_path)
310
be60ffd0 311 for key, value in self.hash.items():
d5361dac
ERE
312 assert os.path.exists(key)
313 if value:
314 assert value == self.md5sum(key)
315
14e2e92d
DGM
316 def test_restore_multivol_split(self):
317 '''
318 Creates a full backup without any filtering with multiple volumes
319 with big files bigger than the max volume size and
320 restore it.
321 '''
f61e1822 322 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
323 raise SkipTest('this test only works for uncompressed '
324 'or concat compressed modes')
f61e1822 325
f698c99c
PG
326 password, paramversion = self.ENCRYPTION or (None, None)
327 deltatar = DeltaTar(mode=self.MODE, password=password,
328 crypto_paramversion=paramversion,
14e2e92d
DGM
329 logger=self.consoleLogger)
330
14e2e92d
DGM
331 self.hash = dict()
332 os.makedirs('source_dir2')
82f75df4 333 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 3*1024*1024)
14e2e92d
DGM
334 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 4*1024*1024)
335 self.hash["source_dir2/huge2"] = self.create_file("source_dir2/huge2", 4*1024*1024)
336
337 # create first backup
338 deltatar.create_full_backup(
339 source_path="source_dir2",
340 backup_path="backup_dir",
341 max_volume_size=2)
342
343 assert os.path.exists("backup_dir")
344 assert os.path.exists(os.path.join("backup_dir",
345 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
346 if self.MODE_COMPRESSES:
347 n_vols = 1
348 else:
349 n_vols = 6
350 for i_vol in range(n_vols):
351 assert os.path.exists(os.path.join("backup_dir",
352 deltatar.volume_name_func("backup_dir", True, i_vol)))
353 assert not os.path.exists(os.path.join("backup_dir",
354 deltatar.volume_name_func("backup_dir", True, n_vols)))
14e2e92d
DGM
355
356 shutil.rmtree("source_dir2")
357
358 index_filename = deltatar.index_name_func(True)
359 index_path = os.path.join("backup_dir", index_filename)
360
361 deltatar.restore_backup(target_path="source_dir2",
362 backup_indexes_paths=[index_path])
363
364 for key, value in self.hash.items():
365 assert os.path.exists(key)
366 if value:
367 assert value == self.md5sum(key)
368
369
9eae9a1f
ERE
370 def test_full_backup_index_extra_data(self):
371 '''
372 Tests that the index file for a full backup can store extra_data and
373 that this data can be retrieved.
374 '''
f698c99c
PG
375 password, paramversion = self.ENCRYPTION or (None, None)
376 deltatar = DeltaTar(mode=self.MODE, password=password,
377 crypto_paramversion=paramversion,
9eae9a1f
ERE
378 logger=self.consoleLogger)
379
380 extra_data = dict(
381 hola="caracola",
382 otra_cosa=[1, "lista"],
383 y_otra=dict(bola=1.1)
384 )
385
386 deltatar.create_full_backup(
387 source_path="source_dir",
388 backup_path="backup_dir",
389 extra_data=extra_data)
390
391 index_filename = deltatar.index_name_func(is_full=True)
392 index_path = os.path.join("backup_dir", index_filename)
393
394 # iterate_index_path retrieves extra_data, and thus we can then compare
395 index_it = deltatar.iterate_index_path(index_path)
396 self.assertEqual(index_it.extra_data, extra_data)
397
398
399 def test_diff_backup_index_extra_data(self):
400 '''
401 Tests that the index file for a diff backup can store extra_data and
402 that this data can be retrieved.
403 '''
f698c99c
PG
404 password, paramversion = self.ENCRYPTION or (None, None)
405 deltatar = DeltaTar(mode=self.MODE, password=password,
406 crypto_paramversion=paramversion,
9eae9a1f
ERE
407 logger=self.consoleLogger)
408
409 extra_data = dict(
410 hola="caracola",
411 otra_cosa=[1, "lista"],
412 y_otra=dict(bola=1.1)
413 )
414 # do first backup
415 deltatar.create_full_backup(
416 source_path="source_dir",
417 backup_path="backup_dir")
418
419
420 prev_index_filename = deltatar.index_name_func(is_full=True)
421 prev_index_path = os.path.join("backup_dir", prev_index_filename)
422
423 # create empty diff backup
424 deltatar.create_diff_backup("source_dir", "backup_dir2",
425 prev_index_path, extra_data=extra_data)
426
427 index_filename = deltatar.index_name_func(is_full=False)
428 index_path = os.path.join("backup_dir2", index_filename)
429
430 # iterate_index_path retrieves extra_data, and thus we can then compare
431 index_it = deltatar.iterate_index_path(index_path)
432 self.assertEqual(index_it.extra_data, extra_data)
433
8825be52
DGM
434 def test_restore_multivol2(self):
435 '''
436 Creates a full backup without any filtering with multiple volumes and
437 restore it.
438 '''
f698c99c
PG
439 password, paramversion = self.ENCRYPTION or (None, None)
440 deltatar = DeltaTar(mode=self.MODE, password=password,
441 crypto_paramversion=paramversion,
8825be52
DGM
442 logger=self.consoleLogger)
443
188b845d 444 shutil.copytree(self.GIT_DIR, "source_dir2")
8825be52
DGM
445
446 # create first backup
447 deltatar.create_full_backup(
448 source_path="source_dir2",
449 backup_path="backup_dir",
450 max_volume_size=1)
451
452 assert os.path.exists("backup_dir")
453 assert os.path.exists(os.path.join("backup_dir",
454 deltatar.volume_name_func("backup_dir", True, 0)))
8825be52
DGM
455
456 shutil.rmtree("source_dir2")
457
458 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
459 tar_path = os.path.join("backup_dir", tar_filename)
460
461 # this should automatically restore all volumes
462 deltatar.restore_backup(target_path="source_dir2",
463 backup_tar_path=tar_path)
464
188b845d 465 self.check_equal_dirs(self.GIT_DIR, 'source_dir2', deltatar)
8825be52 466
b8fc2f5d
ERE
467 def test_restore_multivol_manual_from_index(self):
468 '''
469 Creates a full backup without any filtering with multiple volumes and
470 restore it.
471 '''
472 # this test only works for uncompressed or concat compressed modes
473 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
474 raise SkipTest('this test only works for uncompressed '
475 'or concat compressed modes')
b8fc2f5d 476
f698c99c
PG
477 password, paramversion = self.ENCRYPTION or (None, None)
478 deltatar = DeltaTar(mode=self.MODE, password=password,
479 crypto_paramversion=paramversion,
b8fc2f5d
ERE
480 logger=self.consoleLogger)
481
b8fc2f5d
ERE
482 self.hash = dict()
483 os.makedirs('source_dir2')
484 self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
485 self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
486
487 # create first backup
488 deltatar.create_full_backup(
489 source_path="source_dir2",
490 backup_path="backup_dir",
491 max_volume_size=1)
492
493 assert os.path.exists("backup_dir")
494 assert os.path.exists(os.path.join("backup_dir",
495 deltatar.volume_name_func("backup_dir", True, 0)))
82f75df4
CH
496 if self.MODE_COMPRESSES:
497 n_vols = 1
498 else:
499 n_vols = 2
500 for i_vol in range(n_vols):
501 assert os.path.exists(os.path.join("backup_dir",
502 deltatar.volume_name_func("backup_dir", True, i_vol)))
503 assert not os.path.exists(os.path.join("backup_dir",
504 deltatar.volume_name_func("backup_dir", True, n_vols)))
b8fc2f5d
ERE
505
506 shutil.rmtree("source_dir2")
507
508 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
509 tar_path = os.path.join("backup_dir", tar_filename)
510
511 index_filename = deltatar.index_name_func(True)
512 index_path = os.path.join("backup_dir", index_filename)
513
514 # this should automatically restore the huge file
9eccb1c2 515 f = deltatar.open_auxiliary_file(index_path, 'r')
10798176 516 offset = None
2967b3e1
ERE
517 while True:
518 l = f.readline()
519 if not len(l):
520 break
be60ffd0 521 data = json.loads(l.decode('UTF-8'))
8adbe50d 522 if data.get('type', '') == 'file' and\
eb6d0069 523 deltatar.unprefixed(data['path']) == "huge":
b8fc2f5d
ERE
524 offset = data['offset']
525 break
526
10798176
CH
527 assert offset is not None
528
be60ffd0 529 fo = open(tar_path, 'rb')
b8fc2f5d
ERE
530 fo.seek(offset)
531 def new_volume_handler(mode, tarobj, base_name, volume_number):
f698c99c
PG
532 suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
533 if self.ENCRYPTION is not None:
534 # deltatar module is shadowed here
535 suf += "." + deltatar_PDTCRYPT_EXTENSION
b8fc2f5d 536 tarobj.open_volume(datetime.now().strftime(
f698c99c 537 "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)
b8fc2f5d
ERE
538 new_volume_handler = partial(new_volume_handler, self.MODE)
539
f698c99c
PG
540 crypto_ctx = None
541 if self.ENCRYPTION is not None:
542 crypto_ctx = crypto.Decrypt (password)
543
b8fc2f5d 544 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
b8fc2f5d 545 new_volume_handler=new_volume_handler,
f698c99c
PG
546 encryption=crypto_ctx)
547
8adbe50d
ERE
548 member = tarobj.next()
549 member.path = deltatar.unprefixed(member.path)
550 member.name = deltatar.unprefixed(member.name)
551 tarobj.extract(member)
b8fc2f5d 552 tarobj.close()
c7609167 553 fo.close()
b8fc2f5d
ERE
554 assert self.hash['source_dir2/huge'] == self.md5sum('huge')
555
556 os.unlink("huge")
6c678f3a 557
f698c99c 558
9e092947
PG
559 def test_restore_manual_from_index_twice (self):
560 """
561 Creates a full backup and restore the same file twice. This *must* fail
562 when encryption is active.
563
564 Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
565 backwards within the same file. This prevents the encryption layer from
566 exploding due to a reused IV in an overall valid archive.
567
568 This test anticipates possible future mistakes since it’s entirely
569 feasible to implement backward seeks for *_Stream* with concat mode.
570 """
571 # this test only works for uncompressed or concat compressed modes
572 if self.MODE.startswith("|") or self.MODE_COMPRESSES:
573 raise SkipTest("this test only works for uncompressed "
574 "or concat compressed modes")
575
576 password, paramversion = self.ENCRYPTION or (None, None)
577 deltatar = DeltaTar(mode=self.MODE, password=password,
578 crypto_paramversion=paramversion,
579 logger=self.consoleLogger)
580
581 self.hash = dict()
582 os.makedirs("source_dir2")
583 self.hash["source_dir2/samefile"] = \
584 self.create_file("source_dir2/samefile", 1 * 1024)
585
586 # create first backup
587 deltatar.create_full_backup(
588 source_path="source_dir2",
589 backup_path="backup_dir")
590
591 assert os.path.exists("backup_dir")
592 assert os.path.exists(os.path.join("backup_dir",
593 deltatar.volume_name_func("backup_dir", True, 0)))
594
595 shutil.rmtree("source_dir2")
596
597 tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
598 tar_path = os.path.join("backup_dir", tar_filename)
599
600 index_filename = deltatar.index_name_func(True)
601 index_path = os.path.join("backup_dir", index_filename)
602
603 f = deltatar.open_auxiliary_file(index_path, "r")
604 offset = None
605 while True:
606 l = f.readline()
607 if not len(l):
608 break
609 data = json.loads(l.decode("UTF-8"))
610 if data.get("type", "") == "file" and\
611 deltatar.unprefixed(data["path"]) == "samefile":
612 offset = data["offset"]
613 break
614
615 assert offset is not None
616
617 fo = open(tar_path, "rb")
618 fo.seek(offset)
619
620 crypto_ctx = None
621 if self.ENCRYPTION is not None:
622 crypto_ctx = crypto.Decrypt (password)
623
624 tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
625 encryption=crypto_ctx)
626 member = tarobj.next()
627 member.path = deltatar.unprefixed(member.path)
628 member.name = deltatar.unprefixed(member.name)
629
630 # extract once …
631 tarobj.extract(member)
632 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
633
634 # … and twice
635 try:
636 tarobj.extract(member)
637 except tarfile.StreamError:
638 if crypto_ctx is not None:
639 pass # good: seeking backwards not allowed
640 else:
641 raise
642 tarobj.close()
643 fo.close()
644 assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")
645
646 os.unlink("samefile")
647
648
11684b1d
ERE
649 def test_restore_from_index(self):
650 '''
55b8686d 651 Restores a full backup using an index file.
11684b1d 652 '''
11684b1d 653 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
654 raise SkipTest('this test only works for uncompressed '
655 'or concat compressed modes')
11684b1d 656
f698c99c
PG
657 password, paramversion = self.ENCRYPTION or (None, None)
658 deltatar = DeltaTar(mode=self.MODE, password=password,
659 crypto_paramversion=paramversion,
11684b1d
ERE
660 logger=self.consoleLogger)
661
662 # create first backup
663 deltatar.create_full_backup(
664 source_path="source_dir",
55b8686d 665 backup_path="backup_dir")
11684b1d
ERE
666
667 shutil.rmtree("source_dir")
668
669 # this should automatically restore all volumes
670 index_filename = deltatar.index_name_func(True)
671 index_path = os.path.join("backup_dir", index_filename)
672
673 deltatar.restore_backup(target_path="source_dir",
674 backup_indexes_paths=[index_path])
675
be60ffd0 676 for key, value in self.hash.items():
11684b1d
ERE
677 assert os.path.exists(key)
678 if value:
679 assert value == self.md5sum(key)
6c678f3a 680
b8fc2f5d
ERE
681 def test_restore_multivol_from_index(self):
682 '''
683 Restores a full multivolume backup using an index file.
684 '''
b8fc2f5d 685 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
686 raise SkipTest('this test only works for uncompressed '
687 'or concat compressed modes')
b8fc2f5d 688
f698c99c
PG
689 password, paramversion = self.ENCRYPTION or (None, None)
690 deltatar = DeltaTar(mode=self.MODE, password=password,
691 crypto_paramversion=paramversion,
b8fc2f5d
ERE
692 logger=self.consoleLogger)
693
694 # create first backup
695 deltatar.create_full_backup(
696 source_path="source_dir",
697 backup_path="backup_dir",
8390c925 698 max_volume_size=2)
b8fc2f5d
ERE
699
700 shutil.rmtree("source_dir")
701
702 # this should automatically restore all volumes
703 index_filename = deltatar.index_name_func(True)
704 index_path = os.path.join("backup_dir", index_filename)
705
706 deltatar.restore_backup(target_path="source_dir",
707 backup_indexes_paths=[index_path])
708
be60ffd0 709 for key, value in self.hash.items():
b8fc2f5d
ERE
710 assert os.path.exists(key)
711 if value:
712 assert value == self.md5sum(key)
da26094a 713
c1af2184
ERE
714 def test_create_basic_filtering(self):
715 '''
716 Tests create backup basic filtering.
717 '''
f698c99c
PG
718 password, paramversion = self.ENCRYPTION or (None, None)
719 deltatar = DeltaTar(mode=self.MODE, password=password,
720 crypto_paramversion=paramversion,
c1af2184 721 logger=self.consoleLogger,
eb6d0069
CH
722 included_files=["test", "small"],
723 excluded_files=["test/huge"])
c1af2184
ERE
724
725 # create first backup
726 deltatar.create_full_backup(
727 source_path="source_dir",
728 backup_path="backup_dir")
729
730 assert os.path.exists("backup_dir")
731 shutil.rmtree("source_dir")
732
733 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
734 tar_path = os.path.join("backup_dir", tar_filename)
735
736 deltatar.restore_backup(target_path="source_dir",
737 backup_tar_path=tar_path)
738
eb6d0069
CH
739 assert os.path.exists("source_dir/small")
740 assert os.path.exists("source_dir/test")
741 assert os.path.exists("source_dir/test/huge2")
742 assert os.path.exists("source_dir/test/test2")
c1af2184 743
eb6d0069
CH
744 assert not os.path.exists("source_dir/test/huge")
745 assert not os.path.exists("source_dir/big")
c1af2184 746
862b3726
ERE
747 def test_create_filter_func(self):
748 '''
749 Tests create backup basic filtering.
750 '''
751 visited_paths = []
752 def filter_func(visited_paths, path):
753 if path not in visited_paths:
754 visited_paths.append(path)
755 return True
756
757 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
758
759 password, paramversion = self.ENCRYPTION or (None, None)
760 deltatar = DeltaTar(mode=self.MODE, password=password,
761 crypto_paramversion=paramversion,
862b3726 762 logger=self.consoleLogger,
eb6d0069
CH
763 included_files=["test", "small"],
764 excluded_files=["test/huge"],
862b3726
ERE
765 filter_func=filter_func)
766
767 # create first backup
768 deltatar.create_full_backup(
769 source_path="source_dir",
770 backup_path="backup_dir")
771
772 assert os.path.exists("backup_dir")
773 shutil.rmtree("source_dir")
774
775 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
776 tar_path = os.path.join("backup_dir", tar_filename)
777
778 deltatar.restore_backup(target_path="source_dir",
779 backup_tar_path=tar_path)
9af328e2 780 assert set(visited_paths) == set([
eb6d0069
CH
781 'small',
782 'test',
783 'test/huge2',
784 'test/test2'
9af328e2 785 ])
862b3726
ERE
786
787 def test_create_filter_out_func(self):
788 '''
789 Tests create backup basic filtering.
790 '''
791 visited_paths = []
792 def filter_func(visited_paths, path):
793 '''
794 Filter out everything
795 '''
796 if path not in visited_paths:
797 visited_paths.append(path)
798 return False
799
800 filter_func = partial(filter_func, visited_paths)
f698c99c
PG
801
802 password, paramversion = self.ENCRYPTION or (None, None)
803 deltatar = DeltaTar(mode=self.MODE, password=password,
804 crypto_paramversion=paramversion,
862b3726 805 logger=self.consoleLogger,
eb6d0069
CH
806 included_files=["test", "small"],
807 excluded_files=["test/huge"],
862b3726
ERE
808 filter_func=filter_func)
809
810 # create first backup
811 deltatar.create_full_backup(
812 source_path="source_dir",
813 backup_path="backup_dir")
814
815 assert os.path.exists("backup_dir")
816 shutil.rmtree("source_dir")
817
818 tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
819 tar_path = os.path.join("backup_dir", tar_filename)
820
821 deltatar.restore_backup(target_path="source_dir",
822 backup_tar_path=tar_path)
9af328e2 823 assert set(visited_paths) == set([
eb6d0069
CH
824 'small',
825 'test'
9af328e2 826 ])
862b3726
ERE
827
828 # check that effectively no file was backed up
eb6d0069
CH
829 assert not os.path.exists("source_dir/small")
830 assert not os.path.exists("source_dir/big")
831 assert not os.path.exists("source_dir/test")
862b3726 832
9af328e2
ERE
833 def test_restore_index_basic_filtering(self):
834 '''
835 Creates a backup, and then filter when doing the index based restore.
836 '''
f391d8e9 837 if self.MODE.startswith(':') or self.MODE.startswith('|'):
bd011242
CH
838 raise SkipTest('this test only works for uncompressed '
839 'or concat compressed modes')
f391d8e9 840
f698c99c
PG
841 password, paramversion = self.ENCRYPTION or (None, None)
842 deltatar = DeltaTar(mode=self.MODE, password=password,
843 crypto_paramversion=paramversion,
9af328e2
ERE
844 logger=self.consoleLogger)
845
846 # create first backup
847 deltatar.create_full_backup(
848 source_path="source_dir",
849 backup_path="backup_dir")
850
851 assert os.path.exists("backup_dir")
852 shutil.rmtree("source_dir")
853
854 index_filename = deltatar.index_name_func(True)
855 index_path = os.path.join("backup_dir", index_filename)
856
eb6d0069
CH
857 deltatar.included_files = ["test", "small"]
858 deltatar.excluded_files = ["test/huge"]
9af328e2
ERE
859 deltatar.restore_backup(target_path="source_dir",
860 backup_indexes_paths=[index_path])
861
eb6d0069
CH
862 assert os.path.exists("source_dir/small")
863 assert os.path.exists("source_dir/test")
864 assert os.path.exists("source_dir/test/huge2")
865 assert os.path.exists("source_dir/test/test2")
9af328e2 866
eb6d0069
CH
867 assert not os.path.exists("source_dir/test/huge")
868 assert not os.path.exists("source_dir/big")
9af328e2
ERE
869
def test_restore_index_filter_func(self):
    '''
    Make a full backup, then restore it from its index with include and
    exclude lists plus a custom filter function in place.

    The filter function accepts every path and records each distinct one
    it is asked about; the recorded paths are compared with the expected
    set afterwards.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    seen = []

    def record_and_accept(path):
        # remember every distinct path handed to the filter; never reject
        if path not in seen:
            seen.append(path)
        return True

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup first, then drop the source so the restore starts clean
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", deltatar.index_name_func(True))

    # filtering is configured only for the restore step
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = record_and_accept
    deltatar.restore_backup(target_path="source_dir",
                            backup_indexes_paths=[index_path])

    expected = {'small', 'test', 'test/huge2', 'test/test2'}
    assert set(seen) == expected
915
e5f5681b
ERE
def test_restore_tar_basic_filtering(self):
    '''
    Make a full backup, then restore it from the tar volume with include
    and exclude lists set, and check which paths end up on disk.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup first, then drop the source so the restore starts clean
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # filtering is configured only for the restore step
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]

    tar_path = os.path.join(
        "backup_dir", deltatar.volume_name_func('backup_dir', True, 0))
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # included entries must be there ...
    for restored in ("source_dir/small",
                     "source_dir/test",
                     "source_dir/test/huge2",
                     "source_dir/test/test2"):
        assert os.path.exists(restored)

    # ... excluded or never-included ones must not
    for filtered in ("source_dir/test/huge", "source_dir/big"):
        assert not os.path.exists(filtered)
e5f5681b
ERE
949
def test_restore_tar_filter_func(self):
    '''
    Creates a backup, and then filter when doing the tar based restore,
    using the filter function.

    The filter function accepts every path and records each distinct one
    it is asked about; the recorded paths are compared with the expected
    set afterwards.
    '''
    visited_paths = []

    def filter_func(visited_paths, path):
        if path not in visited_paths:
            visited_paths.append(path)
        return True

    filter_func = partial(filter_func, visited_paths)

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # The restore below goes through the tar volume, so no index path is
    # needed here (the previously computed index locals were never used).
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = filter_func

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)
    assert set(visited_paths) == set([
        'small',
        'test',
        'test/huge2',
        'test/test2'
    ])
994
def test_filter_path_regexp(self):
    '''
    Exercise deltatar.filter_path directly with include and exclude lists
    that mix compiled regular expressions and a plain string.
    '''
    deltatar = DeltaTar(
        mode=self.MODE,
        included_files=[
            re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
            re.compile('^yes$'),
            'testing',
        ],
        excluded_files=[
            re.compile('^testing/in_the'),
        ])

    accepted = (
        'test/hola',
        'test/hola/any/thing',
        'test/caracola/caracolero',
        'test/caracola/caracolero/yeah',
        'test/caracola/caracolero/whatever/aa',
        'yes',
        'testing',
        'testing/yes',
        'testing/in_th',
    )
    rejected = (
        'something',
        'other/thing',
        'test_ing',
        'test/hola_lala',
        'test/agur',
        'testing_something',
        'yeso',
        'yes/o',
        'yes_o',
        'testing/in_the',
        'testing/in_the_field',
        'testing/in_the/field',
    )

    # paths for which the filter result must be truthy
    for path in accepted:
        assert deltatar.filter_path(path)

    # paths for which the filter result must be falsy
    for path in rejected:
        assert not deltatar.filter_path(path)
e5f5681b 1034
974408b5
ERE
def test_filter_path_parent(self):
    '''
    Exercise deltatar.filter_path with a single deeply nested include:
    directories leading to it must report PARENT_MATCH, the entry itself
    and paths below it MATCH, and an unrelated path NO_MATCH.
    '''
    deltatar = DeltaTar(mode=self.MODE,
                        included_files=['testing/path/to/some/thing'])

    # ancestors of the included path, queried as directories
    ancestors = (
        'testing',
        'testing/path/',
        'testing/path/to',
        'testing/path/to/some',
    )
    for directory in ancestors:
        assert deltatar.filter_path(directory, is_dir=True) == PARENT_MATCH

    # the included path itself and something underneath it
    for path in ('testing/path/to/some/thing',
                 'testing/path/to/some/thing/what&/ever'):
        assert deltatar.filter_path(path) == MATCH

    assert deltatar.filter_path('testing/something/else') == NO_MATCH
974408b5
ERE
1052
def test_parent_matching_simple_full_backup(self):
    '''
    Create a full backup restricted to one nested file via included_files,
    restore it without any filter and check that only that file and its
    parent directory come back.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_tool = DeltaTar(mode=self.MODE,
                           password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger,
                           included_files=['test/huge2'])

    # the include list applies while creating the backup
    backup_tool.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir", backup_tool.volume_name_func('backup_dir', True, 0))

    # restore with a fresh, unfiltered instance
    restore_tool = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger)
    restore_tool.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1088
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Create an unfiltered full backup, then restore it from the tar volume
    with included_files naming one nested file; only that file and its
    parent directory may come back.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_tool = DeltaTar(mode=self.MODE,
                           password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger)

    # everything goes into the backup
    backup_tool.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir", backup_tool.volume_name_func('backup_dir', True, 0))

    # the include list applies while restoring
    restore_tool = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger,
                            included_files=['test/huge2'])
    restore_tool.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1124
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create an unfiltered full backup, then restore it with included_files
    naming one nested file; only that file and its parent directory may
    come back.

    NOTE(review): despite "index" in the name, the restore below is driven
    by the tar volume (backup_tar_path), not by an index file -- confirm
    whether an index based restore was intended.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_tool = DeltaTar(mode=self.MODE,
                           password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger)

    # everything goes into the backup
    backup_tool.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir", backup_tool.volume_name_func('backup_dir', True, 0))

    # the include list applies while restoring
    restore_tool = DeltaTar(mode=self.MODE, password=password,
                            logger=self.consoleLogger,
                            included_files=['test/huge2'])
    restore_tool.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

    for present in ('source_dir/test/huge2', 'source_dir/test/'):
        assert os.path.exists(present)
    for absent in ('source_dir/test/huge',
                   'source_dir/big',
                   'source_dir/small'):
        assert not os.path.exists(absent)
1160
d07c8065
ERE
def test_collate_iterators(self):
    '''
    Collate the index of a fresh full backup with a walk over the very
    same, unmodified source directory; every collated pair must compare
    equal according to _equal_stat_dicts.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # absolute index path, because the working directory changes below
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir('.'))

    try:
        collated = deltatar.collate_iterators(index_it, path_it)
        for indexed, walked, _line_no in collated:
            assert deltatar._equal_stat_dicts(indexed, walked)
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1192
def test_collate_iterators_diffdirs(self):
    '''
    Collate a backup index with a walk over a source directory that gained
    one file ("z") after the backup: that file must have no index
    counterpart, every other pair must compare equal.
    '''
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # this file is created after the backup, so the index cannot know it
    self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)

    # absolute index path, because the working directory changes below
    start_dir = os.getcwd()
    index_path = os.path.join(start_dir, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir('.'))

    try:
        collated = deltatar.collate_iterators(index_it, path_it)
        for indexed, walked, _line_no in collated:
            if walked['path'] != 'z':
                assert deltatar._equal_stat_dicts(indexed, walked)
            else:
                assert not indexed
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1230
def test_collate_iterators_diffdirs2(self):
    '''
    Use the collate iterators functionality with two different directories.
    It must behave in an expected way.

    After the full backup a directory with two files and one more top
    level file are added; the sequence of (index path, walked path) pairs
    produced by collate_iterators is compared with the expected list.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # add some new files and directories
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    cwd = os.getcwd()
    index_filename = deltatar.index_name_func(is_full=True)
    index_path = os.path.join(cwd, "backup_dir", index_filename)
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    dir_it = deltatar._recursive_walk_dir('.')
    path_it = deltatar.jsonize_path_iterator(dir_it)

    visited_pairs = []

    try:
        for path1, path2, _l_no in deltatar.collate_iterators(index_it, path_it):
            visited_pairs.append(
                (deltatar.unprefixed(path1['path']) if path1 else None,
                 path2['path'] if path2 else None)
            )
    finally:
        # Only cleanup belongs here: restoring the working directory must
        # happen even if collation fails, and an assertion in this clause
        # would hide the original exception.
        os.chdir(cwd)

    assert visited_pairs == [
        ('big', 'big'),
        (None, 'bigdir'),
        ('small', 'small'),
        ('test', 'test'),
        (None, 'zzzz'),
        (None, 'bigdir/a'),
        (None, 'bigdir/b'),
        ('test/huge', 'test/huge'),
        ('test/huge2', 'test/huge2'),
        ('test/test2', 'test/test2'),
    ]
1286
def test_create_empty_diff_backup(self):
    '''
    Take a diff backup right after a full backup, with nothing changed in
    between, then inspect the diff index and restore the diff volume.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # check index items: each one must be a "list://" entry
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    entry_count = 0
    for entry_count, entry in enumerate(
            deltatar.iterate_index_path(diff_index_path), start=1):
        assert entry[0]['path'].startswith("list://")

    assert entry_count == 6

    # check the tar file
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir2",
        deltatar.volume_name_func('backup_dir2',
                                  is_full=False, volume_number=0))

    # no file restored, because the diff was empty
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)
    assert len(os.listdir("source_dir")) == 0
1330
8adbe50d 1331
42d39ca7
ERE
def test_create_diff_backup1(self):
    '''
    Take a full backup, add a directory and some files and delete one
    file, take a diff backup, then check the diff index entries and what
    restoring the diff volume does to a prepared target directory.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # add some new files and directories, and remove "small"
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # check index items
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    indexed_paths = [entry[0]['path']
                     for entry in deltatar.iterate_index_path(diff_index_path)]

    expected_paths = [
        'list://big',
        'snapshot://bigdir',
        'delete://small',
        'list://test',
        'snapshot://zzzz',
        'snapshot://bigdir/a',
        'snapshot://bigdir/b',
        'list://test/huge',
        'list://test/huge2',
        'list://test/test2',
    ]
    assert indexed_paths == expected_paths

    # check the tar file
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    # create source_dir with the small file, that will be then deleted by
    # the restore_backup
    os.mkdir("source_dir")
    with open("source_dir/small", 'wb'):
        pass

    tar_path = os.path.join(
        "backup_dir2",
        deltatar.volume_name_func('backup_dir2',
                                  is_full=False, volume_number=0))

    # restore the backup, this will create only the new files
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # the order doesn't matter
    assert set(os.listdir("source_dir")) == {'zzzz', 'bigdir'}
42d39ca7 1396
df99a044
ERE
def test_restore_from_index_diff_backup(self):
    '''
    Take a full backup, change the source, take a diff backup, then
    restore into an empty target using both indexes and compare the
    result with the source directory.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # add some new files and directories, and remove "small"
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # apply diff backup in target_dir; the diff index is listed first,
    # followed by the index of the full backup
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    deltatar.restore_backup(
        "target_dir",
        backup_indexes_paths=[diff_index_path, prev_index_path])

    # then compare the two directories source_dir and target_dir and check
    # they are the same
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
df99a044
ERE
1439
def test_restore_from_index_diff_backup2(self):
    '''
    Take a full backup, change the source (including removal of a whole
    directory), take a diff backup, restore the full backup into the
    target and then apply the diff on top of it through the indexes.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # add some new files and directories, remove "small" and all of "test"
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
    shutil.rmtree("source_dir/test")

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # first restore initial backup in target_dir
    full_tar_path = os.path.join(
        "backup_dir",
        deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0))
    deltatar.restore_backup("target_dir", backup_tar_path=full_tar_path)

    # then apply diff backup in target_dir
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    deltatar.restore_backup(
        "target_dir",
        backup_indexes_paths=[diff_index_path, prev_index_path])

    # then compare the two directories source_dir and target_dir and check
    # they are the same
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1488
def test_restore_from_index_diff_backup3(self):
    '''
    Creates a full backup of self.GIT_DIR, modifies some random files, creates a
    diff backup, then restores the diff backup with the full backup as a
    starting point.

    The diff restore is checked twice: once applied on top of the restored
    full backup and once into a freshly removed target directory.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    prev_index_filename = deltatar.index_name_func(is_full=True)
    prev_index_path = os.path.join("backup_dir", prev_index_filename)

    # alter the source_dir randomly
    # NOTE(review): keep_base_dir=True is requested so that the yielded
    # paths include "source_dir_diff" and therefore resolve from the
    # current directory in the os.path / shutil calls below. Without it
    # the paths appear to be relative to the walked directory, which would
    # turn this loop into a no-op -- confirm against _recursive_walk_dir.
    source_it = deltatar._recursive_walk_dir('source_dir_diff',
                                             keep_base_dir=True)

    for path in source_it:
        # if path doesn't exist (might have previously removed) ignore it.
        # also ignore it (i.e. do not change it) 70% of the time
        if not os.path.exists(path) or random.random() < 0.7:
            continue

        # remove the file; rmtree refuses symlinks, so those are unlinked
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)

    deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
                                prev_index_path)

    # first restore initial backup in target_dir
    tar_filename = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
    tar_path = os.path.join("backup_dir", tar_filename)
    deltatar.restore_backup("target_dir", backup_tar_path=tar_path)

    # and check that target_dir equals to source_dir (which is the same as
    # self.GIT_DIR initially)
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)

    # then apply diff backup in target_dir
    index_filename = deltatar.index_name_func(is_full=False)
    index_path = os.path.join("backup_dir2", index_filename)
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[index_path, prev_index_path])

    # and check that target_dir equals to source_dir_diff (the randomly
    # altered self.GIT_DIR directory)
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)

    # then delete target_dir and apply diff backup from zero and check again
    shutil.rmtree("target_dir")
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[index_path, prev_index_path])

    # and check that target_dir equals to source_dir_diff (the randomly
    # altered self.GIT_DIR directory)
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1561
def test_restore_from_index_diff_backup3_multivol(self):
    '''
    Creates a full backup of self.GIT_DIR, modifies some random files, creates a
    diff backup, then restores the diff backup with the full backup as a
    starting point.

    Both backups are created with max_volume_size=1. The diff restore is
    checked twice: once applied on top of the restored full backup and
    once into a freshly removed target directory.
    '''
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    shutil.rmtree("source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir")
    shutil.copytree(self.GIT_DIR, "source_dir_diff")

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir",
        max_volume_size=1)

    prev_index_filename = deltatar.index_name_func(is_full=True)
    prev_index_path = os.path.join("backup_dir", prev_index_filename)

    # alter the source_dir randomly
    # NOTE(review): keep_base_dir=True is requested so that the yielded
    # paths include "source_dir_diff" and therefore resolve from the
    # current directory in the os.path / shutil calls below. Without it
    # the paths appear to be relative to the walked directory, which would
    # turn this loop into a no-op -- confirm against _recursive_walk_dir.
    source_it = deltatar._recursive_walk_dir('source_dir_diff',
                                             keep_base_dir=True)

    for path in source_it:
        # if path doesn't exist (might have previously removed) ignore it.
        # also ignore it (i.e. do not change it) 70% of the time
        if not os.path.exists(path) or random.random() < 0.7:
            continue

        # remove the file; rmtree refuses symlinks, so those are unlinked
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)

    deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
                                prev_index_path, max_volume_size=1)

    # first restore initial backup in target_dir
    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)
    deltatar.restore_backup("target_dir", backup_tar_path=tar_path)

    # and check that target_dir equals to source_dir (which is the same as
    # self.GIT_DIR initially)
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)

    # then apply diff backup in target_dir
    index_filename = deltatar.index_name_func(is_full=False)
    index_path = os.path.join("backup_dir2", index_filename)
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[index_path, prev_index_path])

    # and check that target_dir equals to source_dir_diff (the randomly
    # altered self.GIT_DIR directory)
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)

    # then delete target_dir and apply diff backup from zero and check again
    shutil.rmtree("target_dir")
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[index_path, prev_index_path])

    # and check that target_dir equals to source_dir_diff (the randomly
    # altered self.GIT_DIR directory)
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1635
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Compare the two directories path1 and path2 entry by entry and check
    they are the same.

    Both trees are walked with deltatar._recursive_walk_dir and converted
    with deltatar.jsonize_path_iterator; the first element of each pair of
    items is compared using deltatar._equal_stat_dicts.

    Raises Exception if one tree yields more entries than the other, and
    AssertionError (after printing both items) if a pair differs.
    '''
    source_it = deltatar._recursive_walk_dir(path1, keep_base_dir=True)
    source_it = deltatar.jsonize_path_iterator(source_it, strip=1)
    target_it = deltatar._recursive_walk_dir(path2, keep_base_dir=True)
    target_it = deltatar.jsonize_path_iterator(target_it, strip=1)

    # unique marker telling "iterator exhausted" apart from any real item
    exhausted = object()

    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            break
        # either side running out first means the trees differ in size;
        # this has to be detected for the source AND for the target
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            # show the offending pair, then let the failure propagate
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            raise
df99a044 1661
f5d9144b
PG
def test_create_no_symlinks(self):
    '''
    Creates a full backup from different varieties of symlinks. The
    extracted archive may not contain any symlinks but the file contents.

    One link points at a regular file next to it, the other three point
    nowhere. After backup and restore, the symlinks directory is expected
    to hold exactly two entries, neither of them a symlink, with identical
    contents.
    '''

    os.system("rm -rf source_dir")
    os.makedirs("source_dir/symlinks")
    fd = os.open("source_dir/symlinks/valid_linkname",
                 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    # hand the descriptor to a file object so it is closed even when the
    # write fails
    with os.fdopen(fd, "wb") as target:
        target.write(b"valid link target for symlink tests; please ignore\n")
    # first one is good, the rest points nowhere
    self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
    self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
    self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
    self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")
    assert not os.path.exists("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # os.walk yields nothing for a missing directory, which would let the
    # checks below pass without looking at anything
    assert os.path.isdir("source_dir/symlinks")

    for root, _ds, fs in os.walk("source_dir/symlinks"):
        # only the valid link plus the linked file may be found in the
        # extracted archive
        assert len(fs) == 2
        for f in fs:
            # the link must have been resolved; os.walk reports bare file
            # names, so they have to be joined with the directory before
            # asking the file system about them
            assert not os.path.islink(os.path.join(root, f))

    # file contents must match the linked file
    with open("source_dir/symlinks/valid_linkname") as a:
        with open("source_dir/symlinks/whatever") as b:
            assert a.read() == b.read()
1709
83f5fd71
PG
def test_restore_with_symlinks(self):
    '''
    Make a full backup, append several kinds of symlink members to the
    resulting archive, restore it, and check that none of the appended
    links exists in the restored tree.

    When the archive cannot be opened for appending in the current mode,
    nothing is appended and only the restore itself is exercised.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_path = os.path.join(
        "backup_dir", deltatar.volume_name_func('backup_dir', True, 0))

    def add_symlink(archive, name, dst):
        # append a symlink member called *name* pointing at *dst*
        member = tarfile.TarInfo("snapshot://%s" % name)
        member.type = tarfile.SYMTYPE
        member.linkname = dst
        archive.addfile(member)
        return name

    # (member name, link target) pairs to add to the existing archive
    links = (
        ("symlinks/foo", "internal-file"),
        ("symlinks/bar", "/absolute/path"),
        ("symlinks/baz", "../parent/../../paths"),
    )

    try:
        with tarfile.open(tar_path, mode="a") as archive:
            checkme = [add_symlink(archive, name, dst)
                       for name, dst in links]
    except tarfile.ReadError:
        # tolerated only for these modes; anything else is a failure
        if not (self.MODE == '#' or self.MODE.endswith("gz")):
            raise
        checkme = []
    except ValueError:
        # tolerated only for these modes; anything else is a failure
        if not self.MODE.startswith('#'):
            raise
        checkme = []

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check what happened to our symlinks
    for name in checkme:
        assert not os.path.exists(os.path.join("source_dir", name))
f5d9144b 1763
43ce978b
PG
def test_restore_malicious_symlinks(self):
    '''
    Creates a full backup containing a symlink and a file of the same name.
    This simulates a symlink attack with a link pointing to some external
    path that is abused to write outside the extraction prefix.

    After restoring, nothing -- neither a link nor a file -- may exist at
    the attacked path inside the extraction directory.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(a, name, dst):
        # append a symlink member ``name`` -> ``dst`` to the open archive
        link = tarfile.TarInfo("snapshot://%s" % name)
        link.type = tarfile.SYMTYPE
        link.linkname = dst
        a.addfile(link)

    def add_file(a, name):
        # append an empty regular file member ``name`` to the open archive
        member = tarfile.TarInfo("snapshot://%s" % name)
        member.type = tarfile.REGTYPE
        a.addfile(member)

    testpath = "symlinks/pernicious-link"
    testdst = "/tmp/does/not/exist"

    try:
        with tarfile.open(tar_path, mode="a") as a:
            # several links under the same member name, followed by a
            # regular file that would be written through the link
            add_symlink(a, testpath, testdst)
            add_symlink(a, testpath, testdst + "X")
            add_symlink(a, testpath, testdst + "XXX")
            add_file(a, testpath)
    except tarfile.ReadError:
        # tolerated only for the plain concat mode and gzip modes
        if self.MODE != '#' and not self.MODE.endswith("gz"):
            raise
    except ValueError:
        # O_APPEND of concat archives not feasible
        if not self.MODE.startswith('#'):
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check whether the link was extracted; deltatar seems to only ever
    # retrieve the first item it finds for a given path which in the case
    # at hand is a symlink to some non-existent path. os.path.exists()
    # follows links and thus yields False for such a dangling link even if
    # it was extracted, so test the path itself with lexists().
    fullpath = os.path.join("source_dir", testpath)
    assert not os.path.lexists(fullpath)
43ce978b 1826
da26094a
ERE
class DeltaTar2Test(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the explicit ":" mode.
    '''

    MODE = ':'
1832
1833
class DeltaTarStreamTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the uncompressed stream
    mode "|".
    '''

    MODE = '|'
1839
1840
class DeltaTarGzipTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the gzip mode ":gz".
    '''

    MODE_COMPRESSES = True
    MODE = ':gz'
da26094a
ERE
1847
1848
da26094a
ERE
class DeltaTarGzipStreamTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the gzip stream mode "|gz".
    '''

    MODE_COMPRESSES = True
    MODE = '|gz'
da26094a
ERE
1855
1856
bd011242
CH
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the bz2 mode ":bz2".
    Currently skipped unconditionally.
    '''

    MODE_COMPRESSES = True
    MODE = ':bz2'
ea6d3c3e 1864
bd011242
CH
1865
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the bz2 stream mode "|bz2".
    Currently skipped unconditionally.
    '''

    MODE_COMPRESSES = True
    MODE = '|bz2'
da26094a
ERE
1873
1874
class DeltaTarGzipConcatTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the gzip concat stream
    mode "#gz".
    '''

    MODE_COMPRESSES = True
    MODE = '#gz'
da26094a
ERE
1881
1882
da26094a
ERE
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the gzip concat stream
    mode "#gz" and encryption enabled (password plus crypto parameter
    version 1).
    '''

    MODE_COMPRESSES = True
    ENCRYPTION = ('some magic key', 1)
    MODE = '#gz'
da26094a
ERE
1890
1891
ac5e4184
ERE
class DeltaTarAes128ConcatTest(DeltaTarTest):
    '''
    Runs the complete DeltaTarTest suite with the uncompressed concat
    stream mode "#" and encryption enabled (password plus crypto
    parameter version 1).
    '''

    ENCRYPTION = ('some magic key', 1)
    MODE = '#'
ac5e4184
ERE
1898
1899