blocks, remainder = divmod(length, BUFSIZE)
for b in xrange(blocks):
buf = src.read(BUFSIZE)
+ dst.write(buf)
if len(buf) < BUFSIZE:
raise IOError("end of file reached")
- dst.write(buf)
if remainder != 0:
buf = src.read(remainder)
+ dst.write(buf)
if len(buf) < remainder:
raise IOError("end of file reached")
- dst.write(buf)
return
filemode_table = (
# Reconstruct a ustar longname.
if prefix and obj.type not in GNU_TYPES:
obj.name = prefix + "/" + obj.name
+ else:
+ obj.offset_data = nti(buf[369:381])
return obj
@classmethod
members = self
for tarinfo in members:
+ if self.volume_number > 0 and tarinfo.ismultivol():
+ continue
+
if tarinfo.isdir():
# Extract directories with a safe mode.
directories.append(tarinfo)
else:
tarinfo = member
- if tarinfo.isreg():
+ if tarinfo.isreg() or tarinfo.ismultivol():
return self.fileobject(self, tarinfo)
elif tarinfo.type not in SUPPORTED_TYPES:
"""Make a file called targetpath.
"""
source = self.extractfile(tarinfo)
- try:
- with bltn_open(targetpath, "wb") as target:
- copyfileobj(source, target)
- finally:
- source.close()
+ iterate = True
+ target = bltn_open(targetpath, "wb")
+
+ while iterate:
+ iterate = False
+ try:
+ copyfileobj(source, target, tarinfo.size)
+ except IOError:
+ source.close()
+ # only if we are extracting a multivolume this can be treated
+ if not self.new_volume_handler:
+ target.close()
+ raise Exception("We need to read a new volume and you"
+ " didn't supply a new_volume_handler")
+
+ # the new volume handler should do everything needed to
+ # start working in a new volume. usually, the handler calls
+ # to self.open_volume
+ self.volume_number += 1
+ self.new_volume_handler(self, self.base_name, self.volume_number)
+ tarinfo = self.firstmember
+ source = self.extractfile(tarinfo)
+ iterate = True
+
+ target.close()
+
def makeunknown(self, tarinfo, targetpath):
"""Make a file from a TarInfo object with an unknown type
def create_random_file(self, path, length):
f = open(path, 'w')
- s = string.lowercase + string.digits
- data = ''.join(random.sample(s, 1) * length)
+ s = string.lowercase + string.digits + "\n"
+ if len(s) < length:
+ s += s*(length/len(s))
+ data = s[:length]
f.write(data)
f.close()
Create a volume and extract it
'''
# create the content of the file to compress and hash it
- self.create_random_file("big", 50000)
+ self.create_random_file("big", 5*1024*1024)
hash = self.md5sum("big")
# create the tar file with volumes
tarobj = TarFile.open("sample.tar",
mode="w",
- max_volume_size=30000,
+ max_volume_size=3*1024*1024,
new_volume_handler=new_volume_handler)
tarobj.add("big")
tarobj.close()
# extract and check output
tarobj = TarFile.open("sample.tar",
- mode="w",
- max_volume_size=30000,
+ mode="r",
+ new_volume_handler=new_volume_handler)
+ tarobj.extractall()
+ tarobj.close()
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+ def test_volume_extract2(self):
+ '''
+ Create a volume with gnu tar command and extract it with our tarfiel lib
+ '''
+ # create the content of the file to compress and hash it
+ self.create_random_file("big", 5*1024*1024)
+ hash = self.md5sum("big")
+
+ # create the tar file with volumes
+ os.system("tar cM -L 3M big --file=sample.tar --file=sample.tar.1")
+
+ # check that the tar volumes were correctly created
+ assert os.path.exists("sample.tar")
+ assert os.path.exists("sample.tar.1")
+ assert not os.path.exists("sample.tar.2")
+
+ os.unlink("big")
+ assert not os.path.exists("big")
+
+ # extract and check output
+ tarobj = TarFile.open("sample.tar",
+ mode="r",
new_volume_handler=new_volume_handler)
tarobj.extractall()
tarobj.close()
assert os.path.exists("big")
assert hash == self.md5sum("big")
- # TODO: test_volume_extract2
- # TODO: test_volume_extract3
- # TODO: test creating a volume with gnu tar cmd and extract it with our tool
\ No newline at end of file
+
+ def test_volume_extract2(self):
+ '''
+ Create a volume with gnu tar command and extract it with our tarfiel lib
+ '''
+ # create the content of the file to compress and hash it
+ self.create_random_file("big", 5*1024*1024)
+ hash = self.md5sum("big")
+
+ # create the tar file with volumes
+ os.system("tar cM -L 3M big --file=sample.tar --file=sample.tar.1")
+
+ # check that the tar volumes were correctly created
+ assert os.path.exists("sample.tar")
+ assert os.path.exists("sample.tar.1")
+ assert not os.path.exists("sample.tar.2")
+
+ os.unlink("big")
+ assert not os.path.exists("big")
+
+ # extract and check output
+ tarobj = TarFile.open("sample.tar",
+ mode="r",
+ new_volume_handler=new_volume_handler)
+ tarobj.extractall()
+ tarobj.close()
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+
+ def test_multiple_files_volume(self):
+ # create the content of the file to compress and hash it
+
+ # create sample data
+ hash = dict()
+ self.create_random_file("big", 50000)
+ hash["big"] = self.md5sum("big")
+ self.create_random_file("small", 100)
+ hash["small"] = self.md5sum("small")
+ self.create_random_file("small2", 354)
+ hash["small2"] = self.md5sum("small2")
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar",
+ mode="w",
+ max_volume_size=20000,
+ new_volume_handler=new_volume_handler)
+ tarobj.add("big")
+ tarobj.add("small")
+ tarobj.add("small2")
+ tarobj.close()
+
+ # check that the tar volumes were correctly created
+ assert os.path.exists("sample.tar")
+ assert os.path.exists("sample.tar.1")
+ assert os.path.exists("sample.tar.2")
+ assert not os.path.exists("sample.tar.3")
+
+ os.unlink("big")
+ os.unlink("small")
+ os.unlink("small2")
+
+ # extract and check output
+ tarobj = TarFile.open("sample.tar",
+ mode="r",
+ new_volume_handler=new_volume_handler)
+ tarobj.extractall()
+ tarobj.close()
+ for key, value in hash.iteritems():
+ assert os.path.exists(key)
+ assert value == self.md5sum(key)
\ No newline at end of file