Extend unit test for larger than GCM encryption size
[python-delta-tar] / testing / test_deltatar.py
CommitLineData
0708a374
ERE
1# Copyright (C) 2013 Intra2net AG
2#
3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU Lesser General Public License as published
5# by the Free Software Foundation; either version 3 of the License, or
6# (at your option) any later version.
7#
8# This program is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Lesser General Public License for more details.
12#
13# You should have received a copy of the GNU General Public License
14# along with this program. If not, see
15# <http://www.gnu.org/licenses/lgpl-3.0.html>
16
17# Author: Eduardo Robles Elvira <edulix@wadobo.com>
18
f5d9144b 19import errno
0708a374 20import os
0d5c1970 21import re
cbac9f0b 22import random
e5c6ca04 23import shutil
0708a374 24import logging
b8fc2f5d
ERE
25import binascii
26import json
27from datetime import datetime
28from functools import partial
bd011242 29from unittest import skip, SkipTest
0708a374 30
83f5fd71 31import deltatar.tarfile as tarfile
f698c99c 32from deltatar.tarfile import TarFile
974408b5 33from deltatar.deltatar import DeltaTar, NO_MATCH, MATCH, PARENT_MATCH
f698c99c
PG
34from deltatar.deltatar import PDTCRYPT_EXTENSION as deltatar_PDTCRYPT_EXTENSION
35import deltatar.crypto as crypto
0708a374 36
0708a374
ERE
37from . import BaseTest
38from . import new_volume_handler
39
0708a374
ERE
40class DeltaTarTest(BaseTest):
41 """
42 Test backups
43 """
da26094a 44 MODE = ''
8ea0be50 45 MODE_COMPRESSES = False
da26094a 46
f698c99c 47 ENCRYPTION = None # (password : str, paramversion : int) option
da26094a 48
188b845d
TJ
49 GIT_DIR = '.git'
50
0708a374
ERE
def setUp(self):
    '''
    Prepare a clean working directory holding the base test data.

    Remembers the current working directory in ``self.pwd``, removes
    leftovers of earlier runs, creates ``source_dir`` with its sample
    files (the ``create_file`` return values are kept in ``self.hash``),
    creates a DEBUG-level stream handler as ``self.consoleLogger`` and
    makes sure ``self.GIT_DIR`` names an existing directory.
    '''
    self.pwd = os.getcwd()
    os.system('rm -rf target_dir source_dir* backup_dir* huge')
    os.makedirs('source_dir/test/test2')

    # The directory is recorded with an empty value; the files map to
    # whatever create_file() returns for them.
    self.hash = {"source_dir/test/test2": ''}
    for path, size in (("source_dir/big", 50000),
                       ("source_dir/small", 100),
                       ("source_dir/test/huge", 700000),
                       ("source_dir/test/huge2", 800000)):
        self.hash[path] = self.create_file(path, size)

    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    self.consoleLogger = handler

    if not os.path.isdir(self.GIT_DIR):
        # Not running inside a git tree: fall back to our own testing
        # directory as input and give up if that is missing as well.
        self.GIT_DIR = 'testing'
        if not os.path.isdir(self.GIT_DIR):
            raise Exception('No input directory found: ' + self.GIT_DIR)
75
0708a374
ERE
def tearDown(self):
    '''
    Remove temporary files created by unit tests and reset globals.
    '''
    # Tests may have changed directory; go back before cleaning up.
    os.chdir(self.pwd)
    os.system("rm -rf source_dir target_dir source_dir* backup_dir* huge")

    # Called without a size argument, unlike in the tests that lower the
    # limit; presumably this restores the default -- TODO confirm.
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.")
0708a374 84
def test_restore_simple_full_backup(self):
    '''
    Back up ``source_dir`` in full without filtering, delete the source,
    restore it from the first volume and compare the recorded checksums.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # Remove the source so the restore has to recreate everything.
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        # Entries recorded with an empty value are only checked for
        # existence.
        assert not digest or digest == self.md5sum(path)
da26094a 112
cb7a3911
PG
113
def test_create_backup_max_file_length(self):
    """
    Create a full backup containing files larger than the (purposely
    lowered) upper bound on GCM encrypted objects, so that one plaintext
    file is spread over several encrypted objects.

    The archive is then split at object boundaries without decrypting and
    the resulting parts are counted.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest ("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest ("GCM file length applies only to encrypted backups.")

    size_limit = 20000  # cannot be less than tar block size
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.", size_limit)

    password, paramversion = self.ENCRYPTION
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    # The original annotations give 1, 2 and 3 objects for these files,
    # which adds up to the six parts asserted at the end.
    fixtures = (("empty", 0),
                ("slightly_larger", size_limit + 1),
                ("twice", 2 * size_limit))
    for name, size in fixtures:
        path = "source_dir2/%s" % name
        self.hash[path] = self.create_file(path, size)

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir2")

    volume = os.path.join("backup_dir",
                          dtar.volume_name_func("backup_dir", True, 0))

    # split the resulting archive into its constituents without
    # decrypting
    # NOTE(review): the exit status of the command is not checked; only
    # the presence and size of the output directory are.
    os.system("python3 ./deltatar/crypto.py process -D -S -i - "
              "-o backup_dir/split <\'%s\'" % volume)

    assert os.path.exists("backup_dir/split")

    parts = os.listdir("backup_dir/split")
    assert len(parts) == 6
165
166
def test_restore_backup_max_file_length(self):
    """
    Back up and restore files whose sizes lie around the (purposely
    lowered) upper bound on GCM encrypted objects.

    The files range from empty to just over twice the limit, so some
    of them exceed the bound for a single encrypted object.

    Success is verified by deleting the sources, restoring the backup
    from its first volume and comparing every restored file against the
    value recorded by ``create_file``.
    """
    if self.MODE_COMPRESSES is True:
        raise SkipTest ("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest ("GCM file length applies only to encrypted backups.")

    new_max = 20000  # cannot be less than tar block size
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.", new_max)

    password, paramversion = self.ENCRYPTION
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    for name, size in [("empty", 0),
                       ("almost_large", new_max - 1),
                       ("large", new_max),
                       ("slightly_larger", new_max + 1),
                       ("twice", 2 * new_max),
                       ("twice_plus_one", (2 * new_max) + 1)]:
        path = "source_dir2/%s" % name
        self.hash[path] = self.create_file(path, size)

    deltatar.create_full_backup(source_path="source_dir2",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir2")

    # The path of the first volume was previously computed twice under
    # two names; only one of them was ever used.
    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir2",
                            backup_tar_path=tar_path)

    for key, value in self.hash.items():
        assert os.path.exists(key)
        if value:
            assert value == self.md5sum(key)
222
223
fac2cfe1
PG
def test_create_backup_index_max_file_length(self):
    """
    Lower the upper bound on GCM encrypted objects to 5000 and try to
    back up a directory of 42 small files.

    The test passes when ``create_full_backup`` raises
    ``crypto.InvalidFileCounter``.
    """
    # NOTE(review): going by the test name, the index is presumably what
    # outgrows the lowered limit -- confirm against deltatar.crypto.
    if self.MODE_COMPRESSES is True:
        raise SkipTest ("GCM file length test not meaningful with compression.")
    if self.ENCRYPTION is None:
        raise SkipTest ("GCM file length applies only to encrypted backups.")

    new_max = 5000
    crypto._testing_set_PDTCRYPT_MAX_OBJ_SIZE(
        "I am fully aware that this will void my warranty.", new_max)

    password, paramversion = self.ENCRYPTION
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    for i in range(42):
        # "%d", not "%rd": the latter is repr() followed by a literal
        # "d" and produced names such as "dummy_0d".
        path = "source_dir2/dummy_%d" % i
        self.hash[path] = self.create_file(path, i)

    with self.assertRaises(crypto.InvalidFileCounter):
        deltatar.create_full_backup(source_path="source_dir2",
                                    backup_path="backup_dir")
    shutil.rmtree("source_dir2")
258
259
6c678f3a
ERE
def test_check_index_checksum(self):
    '''
    Create a full backup and verify the checksum stored in its index.

    The index file is read as raw bytes. A CRC32 is accumulated over
    every line from the one containing ``BEGIN-FILE-LIST`` up to and
    including the one containing ``END-FILE-LIST``; the JSON line that
    follows must be of type ``file-list-checksum`` and carry that CRC.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    crc = None
    # NOTE(review): ``checked`` is never asserted, so the test passes
    # without verifying anything when the markers do not show up in the
    # raw bytes. Consider asserting it once that is known to be safe
    # for every mode.
    checked = False
    began_list = False

    # The context manager closes the file even when an assertion fails.
    with open(index_path, 'rb') as f:
        for line in iter(f.readline, b''):
            if b'BEGIN-FILE-LIST' in line:
                crc = binascii.crc32(line) & 0xFFFFffff
                began_list = True
            elif b'END-FILE-LIST' in line:
                crc = binascii.crc32(line, crc) & 0xffffffff

                # next line contains the crc
                data = json.loads(f.readline().decode("UTF-8"))
                assert data['type'] == 'file-list-checksum'
                assert data['checksum'] == crc
                checked = True
                break
            elif began_list:
                crc = binascii.crc32(line, crc) & 0xffffffff
6c678f3a 301
b8fc2f5d
ERE
302
def test_restore_multivol(self):
    '''
    Create an unfiltered full backup with ``max_volume_size=1``, check
    which volume files exist, then restore from the first volume and
    compare the recorded checksums.
    '''
    if ':gz' in self.MODE:
        raise SkipTest('compression information is lost when creating '
                       'multiple volumes with no Stream')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    def volume_path(number):
        # Path of the full-backup volume with the given number.
        return os.path.join(
            "backup_dir",
            dtar.volume_name_func("backup_dir", True, number))

    self.hash = dict()
    os.makedirs('source_dir2')
    for path, size in (("source_dir2/big", 100000),
                       ("source_dir2/huge", 1200000)):
        self.hash[path] = self.create_file(path, size)

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(volume_path(0))

    # Compressing modes are expected to need one volume, others two.
    n_vols = 1 if self.MODE_COMPRESSES else 2
    for i_vol in range(n_vols):
        assert os.path.exists(volume_path(i_vol))
    assert not os.path.exists(volume_path(n_vols))

    shutil.rmtree("source_dir2")

    # this should automatically restore all volumes
    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir2",
                        backup_tar_path=first_volume)

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        assert not digest or digest == self.md5sum(path)
354
14e2e92d
DGM
def test_restore_multivol_split(self):
    '''
    Create an unfiltered multivolume full backup whose files are bigger
    than the maximum volume size, then restore it through the index and
    compare the recorded checksums.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    def volume_path(number):
        # Path of the full-backup volume with the given number.
        return os.path.join(
            "backup_dir",
            dtar.volume_name_func("backup_dir", True, number))

    self.hash = dict()
    os.makedirs('source_dir2')
    for path, size in (("source_dir2/big", 3*1024*1024),
                       ("source_dir2/huge", 4*1024*1024),
                       ("source_dir2/huge2", 4*1024*1024)):
        self.hash[path] = self.create_file(path, size)

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=2)

    assert os.path.exists("backup_dir")
    assert os.path.exists(volume_path(0))

    # Compressing modes are expected to need one volume, others six.
    n_vols = 1 if self.MODE_COMPRESSES else 6
    for i_vol in range(n_vols):
        assert os.path.exists(volume_path(i_vol))
    assert not os.path.exists(volume_path(n_vols))

    shutil.rmtree("source_dir2")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir2",
                        backup_indexes_paths=[index_path])

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        assert not digest or digest == self.md5sum(path)
407
408
9eae9a1f
ERE
def test_full_backup_index_extra_data(self):
    '''
    Check that ``extra_data`` handed to a full backup is stored in the
    index file and comes back unchanged when the index is iterated.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    # A string, a mixed list and a nested dict holding a float.
    payload = {
        "hola": "caracola",
        "otra_cosa": [1, "lista"],
        "y_otra": {"bola": 1.1},
    }

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            extra_data=payload)

    index_path = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # iterate_index_path retrieves extra_data, and thus we can then compare
    iterator = dtar.iterate_index_path(index_path)
    self.assertEqual(iterator.extra_data, payload)
436
437
def test_diff_backup_index_extra_data(self):
    '''
    Check that ``extra_data`` handed to a diff backup is stored in the
    diff's index file and comes back unchanged when it is iterated.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    payload = {
        "hola": "caracola",
        "otra_cosa": [1, "lista"],
        "y_otra": {"bola": 1.1},
    }

    # The full backup that the diff refers to; no extra data here.
    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")
    full_index = os.path.join("backup_dir",
                              dtar.index_name_func(is_full=True))

    # create empty diff backup
    dtar.create_diff_backup("source_dir", "backup_dir2",
                            full_index, extra_data=payload)
    diff_index = os.path.join("backup_dir2",
                              dtar.index_name_func(is_full=False))

    # iterate_index_path retrieves extra_data, and thus we can then compare
    iterator = dtar.iterate_index_path(diff_index)
    self.assertEqual(iterator.extra_data, payload)
472
8825be52
DGM
def test_restore_multivol2(self):
    '''
    Back up a copy of ``self.GIT_DIR`` with ``max_volume_size=1``,
    restore it from the first volume and compare the restored tree
    against the original directory.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    shutil.copytree(self.GIT_DIR, "source_dir2")

    dtar.create_full_backup(source_path="source_dir2",
                            backup_path="backup_dir",
                            max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join(
        "backup_dir", dtar.volume_name_func("backup_dir", True, 0)))

    shutil.rmtree("source_dir2")

    # this should automatically restore all volumes
    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir2",
                        backup_tar_path=first_volume)

    self.check_equal_dirs(self.GIT_DIR, 'source_dir2', dtar)
8825be52 505
b8fc2f5d
ERE
def test_restore_multivol_manual_from_index(self):
    '''
    Create an unfiltered multivolume full backup, look up the offset of
    the file ``huge`` in the index and extract just that member by
    seeking into the first volume by hand.

    The extracted file lands in the current directory as ``huge``; it is
    compared against the recorded checksum and removed again.
    '''
    # this test only works for uncompressed or concat compressed modes
    if self.MODE.startswith(':') or self.MODE.startswith('|'):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs('source_dir2')
    self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
    self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir2",
        backup_path="backup_dir",
        max_volume_size=1)

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, 0)))
    if self.MODE_COMPRESSES:
        n_vols = 1
    else:
        n_vols = 2
    for i_vol in range(n_vols):
        assert os.path.exists(os.path.join("backup_dir",
            deltatar.volume_name_func("backup_dir", True, i_vol)))
    assert not os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, n_vols)))

    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # Scan the index for the entry describing "huge" and remember the
    # offset recorded for it.
    # NOTE(review): the object returned by open_auxiliary_file() is never
    # closed; left as is because its close() semantics are not visible
    # from this test.
    f = deltatar.open_auxiliary_file(index_path, 'r')
    offset = None
    while True:
        l = f.readline()
        if not len(l):
            break
        data = json.loads(l.decode('UTF-8'))
        if data.get('type', '') == 'file' and \
                deltatar.unprefixed(data['path']) == "huge":
            offset = data['offset']
            break

    assert offset is not None

    # Named differently from the module-level ``new_volume_handler``
    # import so that it is not shadowed inside this method.
    def open_next_volume(mode, tarobj, base_name, volume_number):
        suf = DeltaTar._DeltaTar__file_extensions_dict[mode]
        if self.ENCRYPTION is not None:
            # deltatar module is shadowed here
            suf += "." + deltatar_PDTCRYPT_EXTENSION
        tarobj.open_volume(datetime.now().strftime(
            "backup_dir/bfull-%Y-%m-%d-%H%M-002.tar") + suf)

    crypto_ctx = None
    if self.ENCRYPTION is not None:
        crypto_ctx = crypto.Decrypt (password)

    # this should automatically restore the huge file
    # Both the raw file and the TarFile are released even when the
    # extraction fails.
    with open(tar_path, 'rb') as fo:
        fo.seek(offset)
        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              new_volume_handler=partial(open_next_volume,
                                                         self.MODE),
                              encryption=crypto_ctx)
        try:
            member = tarobj.next()
            member.path = deltatar.unprefixed(member.path)
            member.name = deltatar.unprefixed(member.name)
            tarobj.extract(member)
        finally:
            tarobj.close()

    assert self.hash['source_dir2/huge'] == self.md5sum('huge')

    os.unlink("huge")
6c678f3a 596
f698c99c 597
9e092947
PG
def test_restore_manual_from_index_twice (self):
    """
    Creates a full backup and restore the same file twice. This *must* fail
    when encryption is active.

    Currently, tarfile.py’s *_Stream* class conveniently disallows seeking
    backwards within the same file. This prevents the encryption layer from
    exploding due to a reused IV in an overall valid archive.

    This test anticipates possible future mistakes since it’s entirely
    feasible to implement backward seeks for *_Stream* with concat mode.

    As written, a ``tarfile.StreamError`` on the second extraction is
    tolerated only when encryption is active; without encryption it is
    re-raised. A second extraction that raises nothing passes either way.
    """
    # this test only works for uncompressed or concat compressed modes
    if self.MODE.startswith("|") or self.MODE_COMPRESSES:
        raise SkipTest("this test only works for uncompressed "
                       "or concat compressed modes")

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    self.hash = dict()
    os.makedirs("source_dir2")
    self.hash["source_dir2/samefile"] = \
        self.create_file("source_dir2/samefile", 1 * 1024)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir2",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    assert os.path.exists(os.path.join("backup_dir",
        deltatar.volume_name_func("backup_dir", True, 0)))

    shutil.rmtree("source_dir2")

    tar_filename = deltatar.volume_name_func("backup_dir", True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    # Find the offset recorded for "samefile" in the index.
    # NOTE(review): the object returned by open_auxiliary_file() is never
    # closed; left as is because its close() semantics are not visible
    # from this test.
    f = deltatar.open_auxiliary_file(index_path, "r")
    offset = None
    while True:
        l = f.readline()
        if not len(l):
            break
        data = json.loads(l.decode("UTF-8"))
        if data.get("type", "") == "file" and \
                deltatar.unprefixed(data["path"]) == "samefile":
            offset = data["offset"]
            break

    assert offset is not None

    crypto_ctx = None
    if self.ENCRYPTION is not None:
        crypto_ctx = crypto.Decrypt (password)

    # Both the raw file and the TarFile are released even when one of the
    # extractions or assertions fails.
    with open(tar_path, "rb") as fo:
        fo.seek(offset)
        tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
                              encryption=crypto_ctx)
        try:
            member = tarobj.next()
            member.path = deltatar.unprefixed(member.path)
            member.name = deltatar.unprefixed(member.name)

            # extract once …
            tarobj.extract(member)
            assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

            # … and twice
            try:
                tarobj.extract(member)
            except tarfile.StreamError:
                # good when encrypting: seeking backwards not allowed
                if crypto_ctx is None:
                    raise
        finally:
            tarobj.close()

    assert self.hash["source_dir2/samefile"] == self.md5sum("samefile")

    os.unlink("samefile")
686
687
11684b1d
ERE
def test_restore_from_index(self):
    '''
    Create a full backup of ``source_dir``, delete the source and restore
    it through the index file, then compare the recorded checksums.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    shutil.rmtree("source_dir")

    # The restore is driven by the index instead of a tar volume path.
    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        assert not digest or digest == self.md5sum(path)
6c678f3a 719
b8fc2f5d
ERE
def test_restore_multivol_from_index(self):
    '''
    Create a full backup of ``source_dir`` with ``max_volume_size=2``,
    delete the source and restore it through the index file, then
    compare the recorded checksums.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir",
                            max_volume_size=2)

    shutil.rmtree("source_dir")

    # this should automatically restore all volumes
    index_path = os.path.join("backup_dir", dtar.index_name_func(True))
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    for path, digest in self.hash.items():
        assert os.path.exists(path)
        assert not digest or digest == self.md5sum(path)
da26094a 752
c1af2184
ERE
def test_create_basic_filtering(self):
    '''
    Create a full backup with include and exclude lists, restore it and
    check that exactly the selected paths come back.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"])

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    # Included and not excluded: must have been restored.
    assert os.path.exists("source_dir/small")
    assert os.path.exists("source_dir/test")
    assert os.path.exists("source_dir/test/huge2")
    assert os.path.exists("source_dir/test/test2")

    # Excluded explicitly, or never included: must be absent.
    assert not os.path.exists("source_dir/test/huge")
    assert not os.path.exists("source_dir/big")
c1af2184 785
862b3726
ERE
def test_create_filter_func(self):
    '''
    Create a full backup with include/exclude lists plus a filter
    function that accepts everything, and check which paths the filter
    function was asked about.
    '''
    seen = []

    def accept_and_record(path):
        # Remember each distinct path once and let it through.
        if path not in seen:
            seen.append(path)
        return True

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"],
                    filter_func=accept_and_record)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    assert set(seen) == {'small', 'test', 'test/huge2', 'test/test2'}
862b3726
ERE
825
def test_create_filter_out_func(self):
    '''
    Create a full backup with a filter function that rejects every path,
    then check which paths it was asked about and that restoring the
    backup brings nothing back.
    '''
    seen = []

    def reject_and_record(path):
        # Remember each distinct path once and filter it out.
        if path not in seen:
            seen.append(path)
        return False

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger,
                    included_files=["test", "small"],
                    excluded_files=["test/huge"],
                    filter_func=reject_and_record)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = os.path.join(
        "backup_dir", dtar.volume_name_func('backup_dir', True, 0))
    dtar.restore_backup(target_path="source_dir",
                        backup_tar_path=first_volume)

    # Only the two included top-level paths are expected to have been
    # recorded.
    assert set(seen) == {'small', 'test'}

    # check that effectively no file was backed up
    for missing in ("source_dir/small",
                    "source_dir/big",
                    "source_dir/test"):
        assert not os.path.exists(missing)
862b3726 871
9af328e2
ERE
def test_restore_index_basic_filtering(self):
    '''
    Create an unfiltered full backup, then apply include and exclude
    lists only when restoring through the index.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    dtar = DeltaTar(mode=self.MODE,
                    password=password,
                    crypto_paramversion=paramversion,
                    logger=self.consoleLogger)

    dtar.create_full_backup(source_path="source_dir",
                            backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", dtar.index_name_func(True))

    # The filters are set after the backup was written, so they apply to
    # the restore only.
    dtar.included_files = ["test", "small"]
    dtar.excluded_files = ["test/huge"]
    dtar.restore_backup(target_path="source_dir",
                        backup_indexes_paths=[index_path])

    assert os.path.exists("source_dir/small")
    assert os.path.exists("source_dir/test")
    assert os.path.exists("source_dir/test/huge2")
    assert os.path.exists("source_dir/test/test2")

    assert not os.path.exists("source_dir/test/huge")
    assert not os.path.exists("source_dir/big")
9af328e2
ERE
908
def test_restore_index_filter_func(self):
    '''
    Take a full backup, then restore it through its index file with
    include/exclude lists and a filter callback installed; the callback
    must have been offered exactly the paths that pass the lists.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    seen_paths = []

    def record_path(path):
        # remember every distinct path offered and accept all of them
        if path not in seen_paths:
            seen_paths.append(path)
        return True

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup of the fixture tree, then remove the tree itself
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    index_path = os.path.join("backup_dir", deltatar.index_name_func(True))

    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = record_path
    deltatar.restore_backup(target_path="source_dir",
                            backup_indexes_paths=[index_path])

    expected = {'small', 'test', 'test/huge2', 'test/test2'}
    assert set(seen_paths) == expected
954
e5f5681b
ERE
def test_restore_tar_basic_filtering(self):
    '''
    Take a full backup, then restore it straight from the first tar volume
    while include/exclude lists are set, and check which paths come back.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup of the fixture tree, then remove the tree itself
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # the filters are only installed for the restore step
    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]

    first_volume = deltatar.volume_name_func('backup_dir', True, 0)
    deltatar.restore_backup(
        target_path="source_dir",
        backup_tar_path=os.path.join("backup_dir", first_volume))

    # included paths must be back ...
    for restored in ("small", "test", "test/huge2", "test/test2"):
        assert os.path.exists("source_dir/" + restored)

    # ... while excluded / not-included ones must stay absent
    for filtered_out in ("test/huge", "big"):
        assert not os.path.exists("source_dir/" + filtered_out)
e5f5681b
ERE
988
def test_restore_tar_filter_func(self):
    '''
    Take a full backup, then restore it from the first tar volume with
    include/exclude lists and a filter callback installed; the callback
    must have been offered exactly the paths that pass the lists.
    '''
    seen_paths = []

    def record_path(path):
        # remember every distinct path offered and accept all of them
        if path not in seen_paths:
            seen_paths.append(path)
        return True

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup of the fixture tree, then remove the tree itself
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    # NOTE(review): the index path is computed here but the restore below
    # only uses the tar volume
    index_filename = deltatar.index_name_func(True)
    index_path = os.path.join("backup_dir", index_filename)

    deltatar.included_files = ["test", "small"]
    deltatar.excluded_files = ["test/huge"]
    deltatar.filter_func = record_path

    first_volume = deltatar.volume_name_func('backup_dir', True, 0)
    deltatar.restore_backup(
        target_path="source_dir",
        backup_tar_path=os.path.join("backup_dir", first_volume))

    expected = {'small', 'test', 'test/huge2', 'test/test2'}
    assert set(seen_paths) == expected
1033
def test_filter_path_regexp(self):
    '''
    Exercise deltatar.filter_path with a mix of compiled regular
    expressions and a plain string in the include list, plus a regular
    expression in the exclude list.
    '''
    deltatar = DeltaTar(
        mode=self.MODE,
        included_files=[
            re.compile('^test/(hola|caracola/caracolero)(|/.*)$'),
            re.compile('^yes$'),
            'testing',
        ],
        excluded_files=[
            re.compile('^testing/in_the'),
        ])

    accepted = (
        'test/hola',
        'test/hola/any/thing',
        'test/caracola/caracolero',
        'test/caracola/caracolero/yeah',
        'test/caracola/caracolero/whatever/aa',
        'yes',
        'testing',
        'testing/yes',
        'testing/in_th',
    )
    rejected = (
        'something',
        'other/thing',
        'test_ing',
        'test/hola_lala',
        'test/agur',
        'testing_something',
        'yeso',
        'yes/o',
        'yes_o',
        'testing/in_the',
        'testing/in_the_field',
        'testing/in_the/field',
    )

    # paths that must pass the filter
    for candidate in accepted:
        assert deltatar.filter_path(candidate)

    # paths that must be filtered out
    for candidate in rejected:
        assert not deltatar.filter_path(candidate)
e5f5681b 1073
974408b5
ERE
def test_filter_path_parent(self):
    '''
    Exercise deltatar.filter_path for parent matching: directories leading
    to an included path report PARENT_MATCH, the path itself and anything
    below it report MATCH, unrelated paths report NO_MATCH.
    '''
    deltatar = DeltaTar(mode=self.MODE,
                        included_files=['testing/path/to/some/thing'])

    # (path, extra keyword arguments, expected result)
    cases = [
        ('testing', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some', {'is_dir': True}, PARENT_MATCH),
        ('testing/path/to/some/thing', {}, MATCH),
        ('testing/path/to/some/thing/what&/ever', {}, MATCH),
        ('testing/something/else', {}, NO_MATCH),
    ]
    for candidate, extra, expected in cases:
        assert deltatar.filter_path(candidate, **extra) == expected
974408b5
ERE
1091
def test_parent_matching_simple_full_backup(self):
    '''
    Create a full backup restricted to a nested path (parent matching at
    backup time) and restore it unfiltered from the first tar volume.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_tool = DeltaTar(mode=self.MODE,
                           password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger,
                           included_files=['test/huge2'])

    # filtered full backup, then remove the source tree
    backup_tool.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = backup_tool.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", first_volume)

    # a fresh instance without any include list performs the restore
    restore_tool = DeltaTar(mode=self.MODE,
                            password=password,
                            logger=self.consoleLogger)
    restore_tool.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

    assert os.path.exists('source_dir/test/huge2')
    assert os.path.exists('source_dir/test/')
    for absent in ('test/huge', 'big', 'small'):
        assert not os.path.exists('source_dir/' + absent)
1127
def test_parent_matching_simple_full_backup_restore(self):
    '''
    Create an unfiltered full backup and restore it from the first tar
    volume with an include list naming a nested path (parent matching at
    restore time).
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_tool = DeltaTar(mode=self.MODE,
                           password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger)

    # complete full backup, then remove the source tree
    backup_tool.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = backup_tool.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", first_volume)

    # a fresh instance carrying the include list performs the restore
    restore_tool = DeltaTar(mode=self.MODE,
                            password=password,
                            logger=self.consoleLogger,
                            included_files=['test/huge2'])
    restore_tool.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

    assert os.path.exists('source_dir/test/huge2')
    assert os.path.exists('source_dir/test/')
    for absent in ('test/huge', 'big', 'small'):
        assert not os.path.exists('source_dir/' + absent)
1163
def test_parent_matching_index_full_backup_restore(self):
    '''
    Create an unfiltered full backup and restore it with an include list
    naming a nested path (parent matching at restore time).

    NOTE(review): despite the "index" in the name, the restore below goes
    through backup_tar_path, just like
    test_parent_matching_simple_full_backup_restore -- confirm whether an
    index based restore was intended here.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    backup_tool = DeltaTar(mode=self.MODE,
                           password=password,
                           crypto_paramversion=paramversion,
                           logger=self.consoleLogger)

    # complete full backup, then remove the source tree
    backup_tool.create_full_backup(source_path="source_dir",
                                   backup_path="backup_dir")
    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    first_volume = backup_tool.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", first_volume)

    # a fresh instance carrying the include list performs the restore
    restore_tool = DeltaTar(mode=self.MODE,
                            password=password,
                            logger=self.consoleLogger,
                            included_files=['test/huge2'])
    restore_tool.restore_backup(target_path="source_dir",
                                backup_tar_path=tar_path)

    assert os.path.exists('source_dir/test/huge2')
    assert os.path.exists('source_dir/test/')
    for absent in ('test/huge', 'big', 'small'):
        assert not os.path.exists('source_dir/' + absent)
1199
d07c8065
ERE
def test_collate_iterators(self):
    '''
    Collate the index of a fresh full backup with a walk over the very
    same source directory; every collated pair must compare equal.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    start_dir = os.getcwd()
    # absolute index path, because the walk below runs from inside source_dir
    index_path = os.path.join(start_dir, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir('.'))

    try:
        collated = deltatar.collate_iterators(index_it, path_it)
        for indexed, on_disk, _line_no in collated:
            assert deltatar._equal_stat_dicts(indexed, on_disk)
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1231
def test_collate_iterators_diffdirs(self):
    '''
    Collate the index of a full backup with a walk over the source
    directory after one extra file ("z") was added to it: the new file
    must have no index counterpart, everything else must compare equal.
    '''
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    assert os.path.exists("backup_dir")

    # this file only exists on disk, not in the backup index
    self.hash["source_dir/z"] = self.create_file("source_dir/z", 100)

    start_dir = os.getcwd()
    # absolute index path, because the walk below runs from inside source_dir
    index_path = os.path.join(start_dir, "backup_dir",
                              deltatar.index_name_func(is_full=True))
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    path_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir('.'))

    try:
        collated = deltatar.collate_iterators(index_it, path_it)
        for indexed, on_disk, _line_no in collated:
            if on_disk['path'] != 'z':
                assert deltatar._equal_stat_dicts(indexed, on_disk)
            else:
                assert not indexed
    finally:
        # always return to the directory the test started in
        os.chdir(start_dir)
1269
def test_collate_iterators_diffdirs2(self):
    '''
    Collate the index of a full backup with a walk over the source
    directory after a new directory and several new files were added,
    and check the exact sequence of (index path, disk path) pairs.

    Entries that only exist on disk are expected to be paired with None
    on the index side.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(
        source_path="source_dir",
        backup_path="backup_dir")

    assert os.path.exists("backup_dir")

    # add some new files and directories
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    cwd = os.getcwd()
    index_filename = deltatar.index_name_func(is_full=True)
    # absolute index path, because the walk below runs from inside source_dir
    index_path = os.path.join(cwd, "backup_dir", index_filename)
    index_it = deltatar.iterate_index_path(index_path)

    os.chdir('source_dir')
    dir_it = deltatar._recursive_walk_dir('.')
    path_it = deltatar.jsonize_path_iterator(dir_it)

    visited_pairs = []

    try:
        for path1, path2, l_no in deltatar.collate_iterators(index_it, path_it):
            visited_pairs.append(
                (deltatar.unprefixed(path1['path']) if path1 else None,
                 path2['path'] if path2 else None)
            )
    finally:
        # only cleanup belongs here: restore the working directory even if
        # the iteration blew up, so later tests are not run from source_dir
        os.chdir(cwd)

    # checked outside of ``finally`` so that an exception raised during
    # the iteration is not replaced by an unrelated AssertionError
    assert visited_pairs == [
        (u'big', u'big'),
        (None, u'bigdir'),
        (u'small', u'small'),
        (u'test', u'test'),
        (None, u'zzzz'),
        (None, u'bigdir/a'),
        (None, u'bigdir/b'),
        (u'test/huge', u'test/huge'),
        (u'test/huge2', u'test/huge2'),
        (u'test/test2', u'test/test2'),
    ]
1325
def test_create_empty_diff_backup(self):
    '''
    Create a diff backup right after a full backup, with no change in
    between: the diff index must hold six "list://" entries and restoring
    the diff volume must produce an empty directory.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup first, it is the reference for the diff
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # every entry of the diff index has to be a plain listing
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    entry_count = 0
    for entry_count, entry in enumerate(
            deltatar.iterate_index_path(diff_index_path), start=1):
        assert entry[0]['path'].startswith("list://")

    assert entry_count == 6

    # now look at the tar volume of the diff
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    diff_volume = deltatar.volume_name_func('backup_dir2',
                                            is_full=False, volume_number=0)
    tar_path = os.path.join("backup_dir2", diff_volume)

    # the diff was empty, so nothing may be restored
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)
    assert len(os.listdir("source_dir")) == 0
1369
8adbe50d 1370
42d39ca7
ERE
def test_create_diff_backup1(self):
    '''
    Create a diff backup after adding a directory and files and deleting
    one file; check the diff index entries and the result of restoring
    the diff volume over a directory that still has the deleted file.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup first, it is the reference for the diff
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    prev_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # change the source: one new dir, three new files, one file removed
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                prev_index_path)

    # the diff index must describe exactly these changes, in this order
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    indexed_paths = [entry[0]['path']
                     for entry in deltatar.iterate_index_path(diff_index_path)]

    assert indexed_paths == [
        'list://big',
        'snapshot://bigdir',
        'delete://small',
        'list://test',
        'snapshot://zzzz',
        'snapshot://bigdir/a',
        'snapshot://bigdir/b',
        'list://test/huge',
        'list://test/huge2',
        'list://test/test2',
    ]

    # now look at the tar volume of the diff
    assert os.path.exists("backup_dir2")
    shutil.rmtree("source_dir")

    # recreate source_dir holding only an empty "small" file, which the
    # restore is expected to delete again
    os.mkdir("source_dir")
    with open("source_dir/small", 'wb'):
        pass

    diff_volume = deltatar.volume_name_func('backup_dir2',
                                            is_full=False, volume_number=0)
    tar_path = os.path.join("backup_dir2", diff_volume)

    # restoring the diff only brings back the new entries
    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # listing order is irrelevant
    assert set(os.listdir("source_dir")) == {'zzzz', 'bigdir'}
42d39ca7 1435
df99a044
ERE
def test_restore_from_index_diff_backup(self):
    '''
    Full backup, source modifications, diff backup; then restore into an
    empty target from the diff and full indexes and compare the target
    with the modified source.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup first, it is the reference for the diff
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # change the source: one new dir, three new files, one file removed
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                full_index_path)

    # restore into target_dir, newest index first
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    deltatar.restore_backup(
        "target_dir",
        backup_indexes_paths=[diff_index_path, full_index_path])

    # target_dir must now mirror the modified source_dir
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
df99a044
ERE
1478
def test_restore_from_index_diff_backup2(self):
    '''
    Full backup, source modifications (including removal of a whole
    directory), diff backup; then restore the full backup into the target
    and apply the diff on top of it, and compare with the modified source.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # full backup first, it is the reference for the diff
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # change the source: new dir and files, one file and one dir removed
    os.makedirs('source_dir/bigdir')
    self.hash["source_dir/bigdir"] = ""
    os.unlink("source_dir/small")
    self.hash["source_dir/bigdir/a"] = self.create_file("source_dir/bigdir/a", 100)
    self.hash["source_dir/bigdir/b"] = self.create_file("source_dir/bigdir/b", 500)
    self.hash["source_dir/zzzz"] = self.create_file("source_dir/zzzz", 100)
    shutil.rmtree("source_dir/test")

    deltatar.create_diff_backup("source_dir", "backup_dir2",
                                full_index_path)

    # step 1: bring target_dir to the state of the full backup
    full_volume = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
    deltatar.restore_backup(
        "target_dir",
        backup_tar_path=os.path.join("backup_dir", full_volume))

    # step 2: apply the diff on top, newest index first
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    deltatar.restore_backup(
        "target_dir",
        backup_indexes_paths=[diff_index_path, full_index_path])

    # target_dir must now mirror the modified source_dir
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)
1527
def test_restore_from_index_diff_backup3(self):
    '''
    Full backup of a copy of self.GIT_DIR, diff backup of a second copy
    from which random entries were removed; the diff is then restored
    both on top of the restored full backup and into an empty target.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # replace the fixture tree by two identical copies of self.GIT_DIR
    shutil.rmtree("source_dir")
    for copy_name in ("source_dir", "source_dir_diff"):
        shutil.copytree(self.GIT_DIR, copy_name)

    # full backup of the untouched copy
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")
    full_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # randomly thin out the second copy
    for path in deltatar._recursive_walk_dir('source_dir_diff'):
        # entries removed together with a parent are gone already
        if not os.path.exists(path):
            continue
        # leave an existing entry alone 70% of the time
        if random.random() < 0.7:
            continue

        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)

    deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
                                full_index_path)

    # restore the full backup and compare with the untouched copy
    full_volume = deltatar.volume_name_func('backup_dir', is_full=True, volume_number=0)
    deltatar.restore_backup(
        "target_dir",
        backup_tar_path=os.path.join("backup_dir", full_volume))
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)

    # apply the diff on top and compare with the thinned-out copy
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    index_chain = [diff_index_path, full_index_path]
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=index_chain)
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)

    # same restore once more, this time starting from nothing
    shutil.rmtree("target_dir")
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[diff_index_path,
                                                  full_index_path])
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1600
def test_restore_from_index_diff_backup3_multivol(self):
    '''
    Same scenario as the single-volume self.GIT_DIR diff test, but both
    the full and the diff backup are created with max_volume_size=1.
    '''
    if self.MODE.startswith((':', '|')):
        raise SkipTest('this test only works for uncompressed '
                       'or concat compressed modes')

    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE,
                        password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # replace the fixture tree by two identical copies of self.GIT_DIR
    shutil.rmtree("source_dir")
    for copy_name in ("source_dir", "source_dir_diff"):
        shutil.copytree(self.GIT_DIR, copy_name)

    # full backup of the untouched copy, with a volume size limit
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir",
                                max_volume_size=1)
    full_index_path = os.path.join("backup_dir",
                                   deltatar.index_name_func(is_full=True))

    # randomly thin out the second copy
    for path in deltatar._recursive_walk_dir('source_dir_diff'):
        # entries removed together with a parent are gone already
        if not os.path.exists(path):
            continue
        # leave an existing entry alone 70% of the time
        if random.random() < 0.7:
            continue

        if os.path.isdir(path):
            shutil.rmtree(path)
        else:
            os.unlink(path)

    deltatar.create_diff_backup("source_dir_diff", "backup_dir2",
                                full_index_path, max_volume_size=1)

    # restore the full backup and compare with the untouched copy
    full_volume = deltatar.volume_name_func('backup_dir', True, 0)
    deltatar.restore_backup(
        "target_dir",
        backup_tar_path=os.path.join("backup_dir", full_volume))
    self.check_equal_dirs('source_dir', 'target_dir', deltatar)

    # apply the diff on top and compare with the thinned-out copy
    diff_index_path = os.path.join("backup_dir2",
                                   deltatar.index_name_func(is_full=False))
    index_chain = [diff_index_path, full_index_path]
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=index_chain)
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)

    # same restore once more, this time starting from nothing
    shutil.rmtree("target_dir")
    deltatar.restore_backup("target_dir",
                            backup_indexes_paths=[diff_index_path,
                                                  full_index_path])
    self.check_equal_dirs('source_dir_diff', 'target_dir', deltatar)
1674
def check_equal_dirs(self, path1, path2, deltatar):
    '''
    Compare the two directory trees path1 and path2 and check that they
    are the same.

    Both trees are walked with deltatar._recursive_walk_dir and converted
    with deltatar.jsonize_path_iterator; the resulting entries are compared
    pairwise, in order, with deltatar._equal_stat_dicts.

    Raises:
        Exception: if one tree yields more entries than the other.
        AssertionError: if a pair of entries does not compare equal; both
            entries are printed before the error propagates.
    '''
    source_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir(path1, keep_base_dir=True), strip=1)
    target_it = deltatar.jsonize_path_iterator(
        deltatar._recursive_walk_dir(path2, keep_base_dir=True), strip=1)

    # unique marker for "this iterator is finished"; asking each iterator
    # separately lets us notice when either side ends early
    exhausted = object()

    while True:
        sitem = next(source_it, exhausted)
        titem = next(target_it, exhausted)

        if sitem is exhausted and titem is exhausted:
            break
        if sitem is exhausted or titem is exhausted:
            raise Exception("iterators do not stop at the same time")

        try:
            assert deltatar._equal_stat_dicts(sitem[0], titem[0])
        except Exception:
            # show the offending pair to make the failure diagnosable
            print("SITEM: " + str(sitem))
            print("TITEM: " + str(titem))
            raise
df99a044 1700
f5d9144b
PG
def test_create_no_symlinks(self):
    '''
    Create a full backup of a directory holding one regular file, one
    symlink pointing at that file and several symlinks pointing nowhere,
    then restore it from the first tar volume.

    The restored directory is expected to hold exactly two entries, none
    of which is a symlink, and the restored copy of the valid link must
    have the same contents as the file it pointed to.
    '''

    os.system("rm -rf source_dir")
    os.makedirs("source_dir/symlinks")
    fd = os.open("source_dir/symlinks/valid_linkname",
                 os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    try:
        os.write(fd, b"valid link target for symlink tests; please ignore\n")
    finally:
        # do not leak the descriptor if the write fails
        os.close(fd)
    # first one is good, the rest points nowhere
    self.create_symlink("valid_linkname", "source_dir/symlinks/whatever")
    self.create_symlink("/foo/bar/baz", "source_dir/symlinks/xyzzy")
    self.create_symlink("burp/../buzz", "source_dir/symlinks/blup")
    self.create_symlink("../../../../biz", "source_dir/symlinks/bleep")
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")
    assert not os.path.exists("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    for root, _dirs, files in os.walk("source_dir/symlinks"):
        # only the valid link plus the linked file may be found in the
        # extracted archive
        assert len(files) == 2
        for name in files:
            # the link must have been resolved and file contents must match
            # the linked file; os.walk yields bare names, so join with the
            # walked directory to inspect the restored entry itself rather
            # than a same-named path in the current working directory
            assert not os.path.islink(os.path.join(root, name))
        with open("source_dir/symlinks/valid_linkname") as a:
            with open("source_dir/symlinks/whatever") as b:
                assert a.read() == b.read()
1748
83f5fd71
PG
def test_restore_with_symlinks(self):
    '''
    Creates a full backup, appends different varieties of symlink members
    to the resulting archive and restores it. All of the appended symlinks
    must be filtered out, i.e. none of them may exist in the restored
    directory.

    If the archive cannot be opened for appending in the current mode, no
    links are added and only the restore itself is exercised.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(archive, name, dst):
        # append a symlink member called ``name`` that points to ``dst``;
        # returns ``name`` so that callers can collect the paths to verify
        link = tarfile.TarInfo("snapshot://%s" % name)
        link.type = tarfile.SYMTYPE
        link.linkname = dst
        archive.addfile(link)
        return name

    links = [("symlinks/foo", "internal-file"),
             ("symlinks/bar", "/absolute/path"),
             ("symlinks/baz", "../parent/../../paths")]

    try:
        with tarfile.open(tar_path, mode="a") as archive:
            checkme = [add_symlink(archive, name, dst)
                       for name, dst in links]
    except tarfile.ReadError:
        # tolerated for these modes: nothing was appended, nothing to check
        if self.MODE == '#' or self.MODE.endswith("gz"):
            checkme = []
        else:
            raise
    except ValueError:
        # tolerated for concat modes: nothing was appended
        if self.MODE.startswith('#'):
            checkme = []
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check what happened to our symlinks; lexists() is required here
    # because exists() follows links and reports False for dangling ones,
    # which would let an extracted link go unnoticed
    for name in checkme:
        fullpath = os.path.join("source_dir", name)
        assert not os.path.lexists(fullpath)
f5d9144b 1802
43ce978b
PG
def test_restore_malicious_symlinks(self):
    '''
    Creates a full backup containing a symlink and a file of the same name.
    This simulates a symlink attack with a link pointing to some external
    path that is abused to write outside the extraction prefix.

    After restoring, nothing may exist at the path of the link inside the
    restored directory, neither a file nor a (dangling) symlink.
    '''
    password, paramversion = self.ENCRYPTION or (None, None)
    deltatar = DeltaTar(mode=self.MODE, password=password,
                        crypto_paramversion=paramversion,
                        logger=self.consoleLogger)

    # create first backup
    deltatar.create_full_backup(source_path="source_dir",
                                backup_path="backup_dir")

    assert os.path.exists("backup_dir")
    shutil.rmtree("source_dir")

    tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
    tar_path = os.path.join("backup_dir", tar_filename)

    # add symlinks to existing archive

    def add_symlink(archive, name, dst):
        # append a symlink member called ``name`` that points to ``dst``
        link = tarfile.TarInfo("snapshot://%s" % name)
        link.type = tarfile.SYMTYPE
        link.linkname = dst
        archive.addfile(link)

    def add_file(archive, name):
        # append an empty regular file member called ``name``
        entry = tarfile.TarInfo("snapshot://%s" % name)
        entry.type = tarfile.REGTYPE
        archive.addfile(entry)

    testpath = "symlinks/pernicious-link"
    testdst = "/tmp/does/not/exist"

    try:
        with tarfile.open(tar_path, mode="a") as archive:
            # several links followed by a regular file, all sharing one name
            add_symlink(archive, testpath, testdst)
            add_symlink(archive, testpath, testdst + "X")
            add_symlink(archive, testpath, testdst + "XXX")
            add_file(archive, testpath)
    except tarfile.ReadError:
        # tolerated for these modes: the archive stays unmodified
        if self.MODE == '#' or self.MODE.endswith("gz"):
            pass
        else:
            raise
    except ValueError:
        if self.MODE.startswith('#'):
            pass  # O_APPEND of concat archives not feasible
        else:
            raise

    deltatar.restore_backup(target_path="source_dir",
                            backup_tar_path=tar_path)

    # check whether the link was extracted; deltatar seems to only ever
    # retrieve the first item it finds for a given path which in the case
    # at hand is a symlink to some non-existent path. exists() follows
    # links and returns False for such a dangling link, so lexists() must
    # be used to detect the link itself
    fullpath = os.path.join("source_dir", testpath)
    assert not os.path.lexists(fullpath)
43ce978b 1865
da26094a
ERE
class DeltaTar2Test(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the mode string set to ":"
    """
    MODE = ':'
1871
1872
class DeltaTarStreamTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the uncompressed stream mode "|"
    """
    MODE = '|'
1878
1879
class DeltaTarGzipTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the gzip mode ":gz"
    """
    # this mode compresses the archive
    MODE_COMPRESSES = True
    MODE = ':gz'
da26094a
ERE
1886
1887
da26094a
ERE
class DeltaTarGzipStreamTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the gzip stream mode "|gz"
    """
    # this mode compresses the archive
    MODE_COMPRESSES = True
    MODE = '|gz'
da26094a
ERE
1894
1895
bd011242
CH
@skip('Bz2 tests are too slow..')
class DeltaTarBz2Test(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the bz2 mode ":bz2"; skipped
    unconditionally by the decorator above
    """
    # this mode compresses the archive
    MODE_COMPRESSES = True
    MODE = ':bz2'
ea6d3c3e 1903
bd011242
CH
1904
@skip('Bz2 tests are too slow..')
class DeltaTarBz2StreamTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the bz2 stream mode "|bz2"; skipped
    unconditionally by the decorator above
    """
    # this mode compresses the archive
    MODE_COMPRESSES = True
    MODE = '|bz2'
da26094a
ERE
1912
1913
class DeltaTarGzipConcatTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the gzip concat stream mode "#gz"
    """
    # this mode compresses the archive
    MODE_COMPRESSES = True
    MODE = '#gz'
da26094a
ERE
1920
1921
da26094a
ERE
class DeltaTarGzipAes128ConcatTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the gzip aes128 concat stream mode:
    mode string "#gz" plus encryption enabled
    """
    # this mode compresses the archive
    MODE_COMPRESSES = True
    MODE = '#gz'
    # (password, paramversion) pair handed to DeltaTar by the test cases
    ENCRYPTION = ('some magic key', 1)
da26094a
ERE
1929
1930
ac5e4184
ERE
class DeltaTarAes128ConcatTest(DeltaTarTest):
    """
    Repeats the DeltaTarTest cases with the aes128 concat stream mode:
    mode string "#" plus encryption enabled
    """
    # (password, paramversion) pair handed to DeltaTar by the test cases
    ENCRYPTION = ('some magic key', 1)
    MODE = '#'
ac5e4184
ERE
1937
1938