"""
def __init__(self, name, mode):
- mode = {
+ _mode = {
"r": os.O_RDONLY,
"w": os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
}[mode]
if hasattr(os, "O_BINARY"):
- mode |= os.O_BINARY
- self.fd = os.open(name, mode, 0666)
+ _mode |= os.O_BINARY
+ self.fd = os.open(name, _mode, 0666)
+ self.offset = 0
def close(self):
os.close(self.fd)
def read(self, size):
- return os.read(self.fd, size)
+ ret = os.read(self.fd, size)
+ self.offset += len(ret)
+ return ret
def write(self, s):
+ self.offset += len(s)
os.write(self.fd, s)
+ def tell(self):
+ return self.offset
+
class _Stream:
"""Class that serves as an adapter between TarFile and
a stream-like object. The stream-like object only
self.name = self.name[:-3]
self.__write(self.name + NUL)
- def new_compression_block(self):
+ def new_compression_block(self, set_last_block_offset=False):
'''
Used to notify a new tar block is coming to create a new zip block
'''
if self.mode != "w":
raise CompressionError("new compression blocks can only be added in mode 'w'")
if self.comptype == "gz":
- self._new_gz_block()
+ self._new_gz_block(set_last_block_offset)
else:
raise CompressionError("Concat compression only available for comptype 'gz'")
- def _new_gz_block(self):
+ def _new_gz_block(self, set_last_block_offset=False):
'''
Add a new gzip block, closing last one
'''
# if aes, we encrypt after compression
if self.enctype == 'aes':
self.__write_to_file(self.encryption.close_enc())
+ if set_last_block_offset:
+ self.last_block_offset = self.fileobj.tell()
self.encryption = aescrypto.AESCrypt(self.password,
key_length=self.key_length)
self.encryption.init()
self.__write_to_file(self.encryption.salt_str)
+ elif set_last_block_offset:
+ self.last_block_offset = self.fileobj.tell()
+ self.fileobj
timestamp = struct.pack("<L", long(time.time()))
self.__write("\037\213\010\000%s\002\377" % timestamp)
# scan the whole archive.
return self.members
+ def get_last_member_offset(self):
+ """Return the last member offset. Usually this is self.fileobj.tell(),
+ but when there's encryption or concat compression going on it's more
+ complicated than that.
+ """
+ if isinstance(self.fileobj, _Stream):
+ return self.fileobj.last_block_offset
+ else:
+ return self.fileobj.tell()
+
+
def getnames(self):
"""Return the members of the archive as a list of their names. It has
the same order as the list returned by getmembers().
tarinfo = copy.copy(tarinfo)
if self.concat_compression:
- self.fileobj.new_compression_block()
+ self.fileobj.new_compression_block(set_last_block_offset=True)
buf = tarinfo.tobuf(self.format, self.encoding, self.errors)
self.fileobj.write(buf)
assert os.path.exists("big")
assert hash == self.md5sum("big")
+ def test_concat_extract_fileobj(self):
+ '''
+ Create a tar file with only one file inside, using concat compression
+ mode, then decompress it with tarlib module using the fileobj parameter.
+ '''
+
+ # create the content of the file to compress and hash it
+ hash = self.create_file("big", 50000)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz",
+ mode="w#gz",
+ concat_compression=True)
+ tarobj.add("big")
+ pos = tarobj.get_last_member_offset()
+ tarobj.close()
+ os.unlink("big")
+
+ fo = open("sample.tar.gz", 'r')
+ fo.seek(pos)
+ tarobj = TarFile.open(mode="r#gz", fileobj=fo)
+ tarobj.extract(tarobj.next())
+ tarobj.close()
+ assert os.path.exists("big")
+ assert hash == self.md5sum("big")
+
+ def test_concat_extract_one_fileobj(self):
+ '''
+ Create a tar file with multiple files inside, using concat compression
+ mode, then decompress it with tarlib module using the fileobj parameter.
+ '''
+
+ # create the content of the file to compress and hash it
+ hash = dict()
+ hash["big"] = self.create_file("big", 50000)
+ hash["small"] = self.create_file("small", 100)
+ hash["small2"] = self.create_file("small2", 354)
+
+ # create the tar file with volumes
+ tarobj = TarFile.open("sample.tar.gz",
+ mode="w#gz",
+ concat_compression=True)
+ tarobj.add("big")
+ tarobj.add("small")
+ pos = tarobj.get_last_member_offset()
+ tarobj.add("small2")
+ tarobj.close()
+
+ assert os.path.exists("sample.tar.gz")
+
+ os.unlink("big")
+ os.unlink("small")
+ os.unlink("small2")
+
+ # extract only the "small" file
+ fo = open("sample.tar.gz", 'r')
+ fo.seek(pos)
+ tarobj = TarFile.open(mode="r#gz", fileobj=fo)
+ tarobj.extract(tarobj.next())
+ tarobj.close()
+ assert os.path.exists("small")
+ assert hash['small'] == self.md5sum("small")
+
+ # we didn't extract the other files
+ assert not os.path.exists("big")
+ assert not os.path.exists("small2")
+
def test_multiple_files_zcat_extract(self):
'''
Create a tar file with only multiple files inside, using concat