assert not os.path.exists("big")
assert not os.path.exists("small2")
+ def test_concat_extract_one_fileobj_multivol(self):
+ '''
+ Create a tar file with multiple files inside and multiple volumes,
+ using concat compression mode, then decompress a file spanning two
+ volumes with the tarfile module using the fileobj parameter.
+ '''
+
+ # create the content of the file to compress and hash it
+ hash = dict()
+ hash["small"] = self.create_file("small", 100000)
+ hash["big"] = self.create_file("big", 1200000)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz",
+ mode="w#gz",
+ concat_compression=True,
+ max_volume_size=1000000,
+ new_volume_handler=new_volume_handler)
+ tarobj.add("small")
+ tarobj.add("big")
+ pos = tarobj.get_last_member_offset()
+ tarobj.close()
+
+ assert os.path.exists("sample.tar.gz")
+
+ os.unlink("big")
+ os.unlink("small")
+
+ def new_volume_handler_fo(tarobj, base_name, volume_number):
+ '''
+ Handles the new volumes, ignoring base_name, as it will be None
+ because we are using a seekable fileobj.
+ '''
+ volume_path = "sample.tar.gz.%d" % volume_number
+ tarobj.open_volume(volume_path)
+
+ # extract only the "big" file, seeking directly to its offset
+ fo = open("sample.tar.gz", 'rb')
+ fo.seek(pos)
+ tarobj = TarFile.open(mode="r#gz", fileobj=fo,
+ concat_compression=True,
+ new_volume_handler=new_volume_handler_fo)
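+ # next() returns the member found at the current offset, i.e. "big"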
+ tarobj.extract(tarobj.next())
+ tarobj.close()
+ assert os.path.exists("big")
+ assert hash['big'] == self.md5sum("big")
+
+ # we didn't extract the other files
+ assert not os.path.exists("small")
+
def test_multiple_files_zcat_extract(self):
'''
Create a tar file with only multiple files inside, using concat
import os
import shutil
import logging
+import binascii
+import json
+from datetime import datetime
+from functools import partial
+from deltatar.tarfile import TarFile, GNU_FORMAT
from deltatar.deltatar import DeltaTar
import filesplit
'''
Create base test data
'''
- os.system('rm -rf source_dir backup_dir')
+ os.system('rm -rf source_dir source_dir2 backup_dir huge')
os.makedirs('source_dir/test/test2')
self.hash = dict()
self.hash["source_dir/test/test2"] = ''
'''
Remove temporal files created by unit tests
'''
- os.system("rm -rf source_dir backup_dir")
+ os.system("rm -rf source_dir source_dir2 backup_dir huge")
- def test_create_simple_full_backup(self):
+ def test_restore_simple_full_backup(self):
'''
Creates a full backup without any filtering and restores it.
'''
'''
Creates a full backup and checks the index file's checksum
'''
- import binascii
- import json
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger)
elif began_list:
crc = binascii.crc32(l, crc) & 0xffffffff
- def test_create_multivol(self):
+
+ def test_restore_multivol(self):
'''
- Creates a full backup without any filtering with multiple volumes.
+ Creates a full backup without any filtering with multiple volumes and
+ restores it.
'''
deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
logger=self.consoleLogger)
+
+ self.hash = dict()
+ os.makedirs('source_dir2')
+ self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
+ self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
+
# create first backup
deltatar.create_full_backup(
- source_path="source_dir",
+ source_path="source_dir2",
backup_path="backup_dir",
max_volume_size=1)
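+ # max_volume_size is expressed in megabytes: 1.3 MB of data against a
+ # 1 MB limit, so exactly two volumes are expected below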
deltatar.volume_name_func("backup_dir", True, 0)))
assert os.path.exists(os.path.join("backup_dir",
deltatar.volume_name_func("backup_dir", True, 1)))
- assert os.path.exists(os.path.join("backup_dir",
- deltatar.volume_name_func("backup_dir", True, 2)))
- shutil.rmtree("source_dir")
+ shutil.rmtree("source_dir2")
tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
tar_path = os.path.join("backup_dir", tar_filename)
# this should automatically restore all volumes
- deltatar.restore_backup(target_path="source_dir",
+ deltatar.restore_backup(target_path="source_dir2",
backup_tar_path=tar_path)
for key, value in self.hash.iteritems():
if value:
assert value == self.md5sum(key)
+ def test_restore_multivol_manual_from_index(self):
+ '''
+ Creates a full multivolume backup and manually restores a file that
+ spans two volumes, seeking directly to its offset taken from the
+ index file.
+ '''
+ # this test only works for uncompressed or concat compressed modes
+ if self.MODE.startswith(':') or self.MODE.startswith('|'):
+ return
+
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+
+ self.hash = dict()
+ os.makedirs('source_dir2')
+ self.hash["source_dir2/big"] = self.create_file("source_dir2/big", 100000)
+ self.hash["source_dir2/huge"] = self.create_file("source_dir2/huge", 1200000)
+
+ # create first backup
+ deltatar.create_full_backup(
+ source_path="source_dir2",
+ backup_path="backup_dir",
+ max_volume_size=1)
+
+ assert os.path.exists("backup_dir")
+ assert os.path.exists(os.path.join("backup_dir",
+ deltatar.volume_name_func("backup_dir", True, 0)))
+ assert os.path.exists(os.path.join("backup_dir",
+ deltatar.volume_name_func("backup_dir", True, 1)))
+
+ shutil.rmtree("source_dir2")
+
+ tar_filename = deltatar.volume_name_func('backup_dir', True, 0)
+ tar_path = os.path.join("backup_dir", tar_filename)
+
+ index_filename = deltatar.index_name_func(True)
+ index_path = os.path.join("backup_dir", index_filename)
+
+ # locate the offset of the "huge" file inside the index
+ offset = None
+ f = open(index_path, 'r')
+ f.readline() # skip the first line of the index
+ for l in f:
+ data = json.loads(l)
+ if data.get('type', '') == 'file' and data['path'] == "./huge":
+ offset = data['offset']
+ break
+ f.close()
+ assert offset is not None
+
+ fo = open(tar_path, 'rb')
+ fo.seek(offset)
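+ # this backup spans exactly two volumes, so the handler below only
+ # ever needs to open the second one ("-002")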
+ def new_volume_handler(mode, tarobj, base_name, volume_number):
+ tarobj.open_volume(datetime.now().strftime(
+ "backup_dir/bfull-%y-%m-%d-%H%M-002.tar")
+ + DeltaTar._DeltaTar__file_extensions_dict[mode])
+ new_volume_handler = partial(new_volume_handler, self.MODE)
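+ # partial() pre-binds mode so the handler exposes the expected
+ # (tarobj, base_name, volume_number) signature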
+
+ tarobj = TarFile.open(mode="r" + self.MODE, fileobj=fo,
+ concat_compression=True,
+ new_volume_handler=new_volume_handler,
+ password=self.PASSWORD)
+ tarobj.extract(tarobj.next())
+ tarobj.close()
+ assert self.hash['source_dir2/huge'] == self.md5sum('huge')
+
+ os.unlink("huge")
def test_restore_from_index(self):
'''
if value:
assert value == self.md5sum(key)
+ def test_restore_multivol_from_index(self):
+ '''
+ Restores a full multivolume backup using an index file.
+ '''
+ # this test only works for uncompressed or concat compressed modes
+ if self.MODE.startswith(':') or self.MODE.startswith('|'):
+ return
+
+ deltatar = DeltaTar(mode=self.MODE, password=self.PASSWORD,
+ logger=self.consoleLogger)
+
+ # create first backup
+ deltatar.create_full_backup(
+ source_path="source_dir",
+ backup_path="backup_dir",
+ max_volume_size=1)
+
+ shutil.rmtree("source_dir")
+
+ # restoring from the index alone should automatically open all volumes
+ index_filename = deltatar.index_name_func(True)
+ index_path = os.path.join("backup_dir", index_filename)
+
+ deltatar.restore_backup(target_path="source_dir",
+ backup_indexes_paths=[index_path])
+
+ for key, value in self.hash.iteritems():
+ assert os.path.exists(key)
+ if value:
+ assert value == self.md5sum(key)
class DeltaTar2Test(DeltaTarTest):
'''