"uname": self.uname,
"gname": self.gname,
"devmajor": self.devmajor,
- "devminor": self.devminor
+ "devminor": self.devminor,
+ "offset_data": self.offset_data
}
if info["type"] == DIRTYPE and not info["name"].endswith("/"):
# Test string fields for values that exceed the field length or cannot
# be represented in ASCII encoding.
for name, hname, length in (
- ("name", "path", LENGTH_NAME), ("linkname", "linkpath", LENGTH_LINK),
- ("uname", "uname", 32), ("gname", "gname", 32)):
+ ("name", "path", LENGTH_NAME),
+ ("linkname", "linkpath", LENGTH_LINK),
+ ("uname", "uname", 32),
+ ("gname", "gname", 32)):
if hname in pax_headers:
# The pax header has priority.
if len(info[name]) > length:
pax_headers[hname] = val
+ if self.ismultivol():
+ pax_headers["GNU.volume.filename"] = unicode(self.name)
+ pax_headers["GNU.volume.size"] = unicode(info['size'] - self.offset_data)
+ pax_headers["GNU.volume.offset"] = unicode(self.offset_data)
+
# Test number fields for values that exceed the field limit or values
# that like to be stored as float.
for name, digits in (("uid", 8), ("gid", 8), ("size", 12), ("mtime", 12)):
pax_headers[keyword] = value
pos += length
+
# Fetch the next header.
try:
next = self.fromtarfile(tarfile)
except HeaderError:
raise SubsequentHeaderError("missing or bad subsequent header")
+ if next and next.type == GNUTYPE_MULTIVOL:
+ if "GNU.volume.filename" in pax_headers:
+ next.name = pax_headers["GNU.volume.filename"]
+ if "GNU.volume.size" in pax_headers:
+ next.size = int(pax_headers["GNU.volume.size"])
+ #if "GNU.volume.offset" in pax_headers:
+ #next.offset_data = int(pax_headers["GNU.volume.offset"])
+
if self.type in (XHDTYPE, SOLARIS_XHDTYPE):
# Patch the TarInfo object with the extended header info.
next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors)
def isdev(self):
    """Return True if the member is a device node (character, block or FIFO)."""
    return self.type in {CHRTYPE, BLKTYPE, FIFOTYPE}
def ismultivol(self):
    """Return True if the member continues across more than one volume.

    A member is multivolume either when it carries the GNU multivolume
    type flag or when its pax headers record a volume offset.
    """
    if self.type == GNUTYPE_MULTIVOL:
        return True
    return 'GNU.volume.offset' in self.pax_headers
# class TarInfo
class TarFile(object):
import sys, os, unittest, hashlib, random, string
-from deltatar.tarfile import TarFile
+from deltatar.tarfile import TarFile, PAX_FORMAT
def new_volume_handler(tarobj, base_name, volume_number):
volume_path = "%s.%d" % (base_name, volume_number)
for key, value in hash.iteritems():
assert os.path.exists(key)
assert value == self.md5sum(key)
+
+ def test_multivolume_pax_compress(self):
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar",
+ mode="w",
+ format=PAX_FORMAT,
+ max_volume_size=30000,
+ new_volume_handler=new_volume_handler)
+ tarobj.add("big")
+ tarobj.close()
+
+ # check that the tar volumes were correctly created
+ assert os.path.exists("sample.tar")
+ assert os.path.exists("sample.tar.1")
+ assert not os.path.exists("sample.tar.2")
+
+ os.unlink("big")
+ assert not os.path.exists("big")
+
+ # extract with normal tar and check output
+ os.system("tar xfM sample.tar --file=sample.tar.1")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+ def test_multivolume_pax_extract(self):
+ '''
+ Create a volume with gnu tar command and extract it with our tarfiel lib
+ '''
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 5*1024*1024)
+ os.system("cp big big1")
+
+ # create the tar file with volumes
+ os.system("tar cM -L 3M big --format=pax --file=sample.tar --file=sample.tar.1")
+
+ # check that the tar volumes were correctly created
+ assert os.path.exists("sample.tar")
+ assert os.path.exists("sample.tar.1")
+ assert not os.path.exists("sample.tar.2")
+
+ os.unlink("big")
+ assert not os.path.exists("big")
+
+ # extract and check output
+ tarobj = TarFile.open("sample.tar",
+ mode="r",
+ new_volume_handler=new_volume_handler)
+ tarobj.extractall()
+ tarobj.close()
+ os.system("cp big big2")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+ def test_multivolume_pax_compress_extract(self):
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+ os.system("cp big big1")
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar",
+ mode="w",
+ format=PAX_FORMAT,
+ max_volume_size=30000,
+ new_volume_handler=new_volume_handler)
+ tarobj.add("big")
+ tarobj.close()
+
+ # check that the tar volumes were correctly created
+ assert os.path.exists("sample.tar")
+ assert os.path.exists("sample.tar.1")
+ assert not os.path.exists("sample.tar.2")
+
+ os.unlink("big")
+ assert not os.path.exists("big")
+
+ # extract and check output
+ tarobj = TarFile.open("sample.tar",
+ mode="r",
+ new_volume_handler=new_volume_handler)
+ tarobj.extractall()
+ tarobj.close()
+ os.system("cp big big2")
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")