From 36a315a0a728bb79f1c827738e8dc4651bea0b32 Mon Sep 17 00:00:00 2001
From: Eduardo Robles Elvira
Date: Thu, 20 Jun 2013 13:20:21 +0200
Subject: [PATCH] initial not fully functional pax multivol implementation

---
 deltatar/tarfile.py      |   25 +++++++++++--
 testing/test_multivol.py |   91 +++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 111 insertions(+), 5 deletions(-)

diff --git a/deltatar/tarfile.py b/deltatar/tarfile.py
index 72e9d8e..657404e 100644
--- a/deltatar/tarfile.py
+++ b/deltatar/tarfile.py
@@ -975,7 +975,8 @@ class TarInfo(object):
             "uname":    self.uname,
             "gname":    self.gname,
             "devmajor": self.devmajor,
-            "devminor": self.devminor
+            "devminor": self.devminor,
+            "offset_data": self.offset_data
         }
 
         if info["type"] == DIRTYPE and not info["name"].endswith("/"):
@@ -1049,8 +1050,10 @@ class TarInfo(object):
         # Test string fields for values that exceed the field length or cannot
         # be represented in ASCII encoding.
         for name, hname, length in (
-                ("name", "path", LENGTH_NAME), ("linkname", "linkpath", LENGTH_LINK),
-                ("uname", "uname", 32), ("gname", "gname", 32)):
+                ("name", "path", LENGTH_NAME),
+                ("linkname", "linkpath", LENGTH_LINK),
+                ("uname", "uname", 32),
+                ("gname", "gname", 32)):
 
             if hname in pax_headers:
                 # The pax header has priority.
@@ -1068,6 +1071,11 @@ class TarInfo(object):
             if len(info[name]) > length:
                 pax_headers[hname] = val
 
+        if self.ismultivol():
+            pax_headers["GNU.volume.filename"] = unicode(self.name)
+            pax_headers["GNU.volume.size"] = unicode(info['size'] - self.offset_data)
+            pax_headers["GNU.volume.offset"] = unicode(self.offset_data)
+
         # Test number fields for values that exceed the field limit or values
         # that like to be stored as float.
         for name, digits in (("uid", 8), ("gid", 8), ("size", 12), ("mtime", 12)):
@@ -1410,12 +1418,21 @@ class TarInfo(object):
             pax_headers[keyword] = value
             pos += length
 
+
         # Fetch the next header.
         try:
             next = self.fromtarfile(tarfile)
         except HeaderError:
             raise SubsequentHeaderError("missing or bad subsequent header")
 
+        if next and next.type == GNUTYPE_MULTIVOL:
+            if "GNU.volume.filename" in pax_headers:
+                next.name = pax_headers["GNU.volume.filename"]
+            if "GNU.volume.size" in pax_headers:
+                next.size = int(pax_headers["GNU.volume.size"])
+            #if "GNU.volume.offset" in pax_headers:
+                #next.offset_data = int(pax_headers["GNU.volume.offset"])
+
         if self.type in (XHDTYPE, SOLARIS_XHDTYPE):
             # Patch the TarInfo object with the extended header info.
             next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors)
@@ -1485,7 +1502,7 @@ class TarInfo(object):
     def isdev(self):
         return self.type in (CHRTYPE, BLKTYPE, FIFOTYPE)
     def ismultivol(self):
-        return self.type == GNUTYPE_MULTIVOL
+        return self.type == GNUTYPE_MULTIVOL or 'GNU.volume.offset' in self.pax_headers
 # class TarInfo
 
 class TarFile(object):
diff --git a/testing/test_multivol.py b/testing/test_multivol.py
index 3dae89e..00010a8 100644
--- a/testing/test_multivol.py
+++ b/testing/test_multivol.py
@@ -1,6 +1,6 @@
 import sys, os, unittest, hashlib, random, string
 
-from deltatar.tarfile import TarFile
+from deltatar.tarfile import TarFile, PAX_FORMAT
 
 def new_volume_handler(tarobj, base_name, volume_number):
     volume_path = "%s.%d" % (base_name, volume_number)
@@ -307,3 +307,92 @@ class MultivolTest(unittest.TestCase):
         for key, value in hash.iteritems():
             assert os.path.exists(key)
             assert value == self.md5sum(key)
+
+    def test_multivolume_pax_compress(self):
+
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 50000)
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar",
+                              mode="w",
+                              format=PAX_FORMAT,
+                              max_volume_size=30000,
+                              new_volume_handler=new_volume_handler)
+        tarobj.add("big")
+        tarobj.close()
+
+        # check that the tar volumes were correctly created
+        assert os.path.exists("sample.tar")
+        assert os.path.exists("sample.tar.1")
+        assert not os.path.exists("sample.tar.2")
+
+        os.unlink("big")
+        assert not os.path.exists("big")
+
+        # extract with normal tar and check output
+        os.system("tar xfM sample.tar --file=sample.tar.1")
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")
+
+    def test_multivolume_pax_extract(self):
+        '''
+        Create a volume with the gnu tar command and extract it with our tarfile lib
+        '''
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 5*1024*1024)
+        os.system("cp big big1")
+
+        # create the tar file with volumes
+        os.system("tar cM -L 3M big --format=pax --file=sample.tar --file=sample.tar.1")
+
+        # check that the tar volumes were correctly created
+        assert os.path.exists("sample.tar")
+        assert os.path.exists("sample.tar.1")
+        assert not os.path.exists("sample.tar.2")
+
+        os.unlink("big")
+        assert not os.path.exists("big")
+
+        # extract and check output
+        tarobj = TarFile.open("sample.tar",
+                              mode="r",
+                              new_volume_handler=new_volume_handler)
+        tarobj.extractall()
+        tarobj.close()
+        os.system("cp big big2")
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")
+
+    def test_multivolume_pax_compress_extract(self):
+
+        # create the content of the file to compress and hash it
+        hash = self.create_file("big", 50000)
+        os.system("cp big big1")
+
+        # create the tar file with volumes
+        tarobj = TarFile.open("sample.tar",
+                              mode="w",
+                              format=PAX_FORMAT,
+                              max_volume_size=30000,
+                              new_volume_handler=new_volume_handler)
+        tarobj.add("big")
+        tarobj.close()
+
+        # check that the tar volumes were correctly created
+        assert os.path.exists("sample.tar")
+        assert os.path.exists("sample.tar.1")
+        assert not os.path.exists("sample.tar.2")
+
+        os.unlink("big")
+        assert not os.path.exists("big")
+
+        # extract and check output
+        tarobj = TarFile.open("sample.tar",
+                              mode="r",
+                              new_volume_handler=new_volume_handler)
+        tarobj.extractall()
+        tarobj.close()
+        os.system("cp big big2")
+        assert os.path.exists("big")
+        assert hash == self.md5sum("big")
-- 
1.7.1
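
For reference, a minimal usage sketch of the PAX multi-volume path exercised by the new tests; it is not part of the patch. The new_volume_handler body is an assumption (the test module above only shows its first line) and is expected to switch tarobj to the next volume file; the file name "big", the 30000-byte volume limit and the "sample.tar"/"sample.tar.1" naming are taken directly from the tests.

    import os
    from deltatar.tarfile import TarFile, PAX_FORMAT

    def new_volume_handler(tarobj, base_name, volume_number):
        # Assumed body: open the next volume, named "<base_name>.<volume_number>".
        volume_path = "%s.%d" % (base_name, volume_number)
        tarobj.open_volume(volume_path)

    # Write a PAX-format multi-volume archive; once max_volume_size bytes are
    # written, new_volume_handler is called to start "sample.tar.1", and so on.
    tarobj = TarFile.open("sample.tar",
                          mode="w",
                          format=PAX_FORMAT,
                          max_volume_size=30000,
                          new_volume_handler=new_volume_handler)
    tarobj.add("big")
    tarobj.close()
    assert os.path.exists("sample.tar.1")

    # Read the archive back; the same handler supplies the follow-up volumes.
    tarobj = TarFile.open("sample.tar",
                          mode="r",
                          new_volume_handler=new_volume_handler)
    tarobj.extractall()
    tarobj.close()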