GZ_METHOD_DEFLATE)
#---------------------------------------------------------
+# archive handling mode
+#---------------------------------------------------------
+
+ARCMODE_PLAIN = 0
+ARCMODE_ENCRYPT = 1 << 0
+ARCMODE_COMPRESS = 1 << 1
+ARCMODE_CONCAT = 1 << 2
+
+def arcmode_fmt (m):
+ if m == ARCMODE_PLAIN:
+ return "PLAIN"
+ first = True
+ ret = "["
+ def chkappend (b, s):
+ nonlocal m
+ nonlocal ret
+ nonlocal first
+ if m & b:
+ if first is True: first = False
+ else: ret += " |"
+ ret += " " + s
+ chkappend (ARCMODE_ENCRYPT, "ENCRYPT")
+ chkappend (ARCMODE_COMPRESS, "COMPRESS")
+ chkappend (ARCMODE_CONCAT, "CONCAT")
+ return ret + " ]"
+
+
+def arcmode_set (concat=False, encryption=None, comptype=None, init=ARCMODE_PLAIN):
+ ret = init
+ if bool (concat) is True:
+ ret |= ARCMODE_CONCAT
+ if encryption is not None:
+ ret |= ARCMODE_ENCRYPT
+ if comptype == "gz":
+ ret |= ARCMODE_COMPRESS
+ return ret
+
+#---------------------------------------------------------
# tarfile constants
#---------------------------------------------------------
# File types that tarfile supports:
remainder = -1 # track size in encrypted entries
def __init__(self, name, mode, comptype, fileobj, bufsize,
- concat_stream=False, encryption=None, enccounter=None,
+ concat=False, encryption=None, enccounter=None,
compresslevel=9):
"""Construct a _Stream object.
"""
+ self.arcmode = arcmode_set (concat, encryption, comptype)
+
self._extfileobj = True
if fileobj is None:
fileobj = _LowLevelFile(name, mode)
# stream interface
fileobj = _StreamProxy(fileobj)
comptype = fileobj.getcomptype()
+ if comptype == '':
+ comptype = "tar"
self.name = name or ""
self.mode = mode
self.concat_pos = 0
self.closed = False
self.flags = 0
- self.concat_stream = concat_stream
self.last_block_offset = 0
self.dbuf = b"" # ???
self.exception = None # communicate decompression failure
self.exception = zlib.error
self._init_read_gz()
elif mode == "w":
- if concat_stream is False:
- if self.encryption is not None:
+ if not (self.arcmode & ARCMODE_CONCAT):
+ if self.arcmode & ARCMODE_ENCRYPT:
self._init_write_encrypt (name, enccounter)
self._init_write_gz ()
self.crc = zlib.crc32(b"") & 0xFFFFffff
elif comptype == "bz2":
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
raise InvalidEncryptionError("encryption not available for "
- "compression %s" % comptype)
+ "compression “%s”" % comptype)
try:
import bz2
except ImportError:
self.cmp = bz2.BZ2Compressor()
elif comptype == 'xz':
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
raise InvalidEncryptionError("encryption not available for "
- "compression %s" % comptype)
+ "compression “%s”" % comptype)
try:
import lzma
except ImportError:
self.cmp = lzma.LZMACompressor()
elif comptype == "tar":
- if concat_stream is False \
+ if not (self.arcmode & ARCMODE_CONCAT) \
and mode == "w" \
- and self.encryption is not None:
+ and self.arcmode & ARCMODE_ENCRYPT:
self._init_write_encrypt (name, enccounter)
else:
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
raise InvalidEncryptionError("encryption not available for "
- "compression %s" % comptype)
+ "compression “%s”" % comptype)
raise CompressionError("unknown compression type %r" % comptype)
except:
self.close()
- def _init_write_encrypt (self, entry=None, counter=None,
- set_last_block_offset=False):
- """Save position for delayed write of header; fill the header location
- with dummy bytes."""
- if self.encryption is not None:
- # first thing, proclaim new object to the encryption context
- # secondly, assemble the header with the updated parameters
- # and commit it directly to the underlying stream, bypassing the
- # encryption layer in .__write().
- dummyhdr = self.encryption.next (entry, counter=counter)
- if dummyhdr is None:
- raise EncryptionError ("Crypto.next(): bad dummy header") # XXX
+ def next (self, name):
+ if self.arcmode & ARCMODE_COMPRESS:
+ if getattr (self, "cmp", None) is not None:
+ self._finalize_write_gz ()
+ if self.arcmode & ARCMODE_ENCRYPT:
+ self._finalize_write_encrypt ()
+ self._init_write_encrypt (name, set_last_block_offset=True)
+ if self.arcmode & ARCMODE_COMPRESS:
+ self._init_write_gz (set_last_block_offset =
+ not (self.arcmode & ARCMODE_ENCRYPT))
+ return self.last_block_offset
+
- self.lasthdr = self.fileobj.tell()
- self.__write_to_file(dummyhdr)
- if set_last_block_offset is True:
- self.last_block_offset = self.lasthdr
+ def next_volume (self, name):
+ # with non-concat modes, this is taken care by the _Stream
+ # ctor as invoked by the newvol handler
+ if self.arcmode & ARCMODE_COMPRESS:
+ if getattr (self, "cmp", None) is not None:
+ # e. g. compressed PAX header written
+ self._finalize_write_gz ()
+ if self.arcmode & ARCMODE_ENCRYPT:
+ self._init_write_encrypt (name)
+ if self.arcmode & ARCMODE_COMPRESS:
+ self._init_write_gz ()
+
+
+ def _init_write_encrypt (self, entry=None, set_last_block_offset=False):
+ """
+ Save position for delayed write of header; fill the header location
+ with dummy bytes.
+ """
+ # first thing, proclaim new object to the encryption context
+ # secondly, assemble the header with the updated parameters
+ # and commit it directly to the underlying stream, bypassing the
+ # encryption layer in .__write().
+ dummyhdr = self.encryption.next (entry, counter=self.enccounter)
+ if dummyhdr is None:
+ raise EncryptionError ("Crypto.next(): bad dummy header") # XXX
+ self.lasthdr = self.fileobj.tell()
+ self.__write_to_file(dummyhdr)
+ if set_last_block_offset is True:
+ self.last_block_offset = self.lasthdr
def _finalize_write_encrypt (self):
Returns the list of IV fixed parts as used during encryption.
"""
- if self.encryption is not None \
- and self.lasthdr is not None :
+ if self.lasthdr is not None:
self.__sync ()
pos0 = self.fileobj.tell ()
self.fileobj.seek_set (self.lasthdr)
the file
'''
tow = s
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
tow = self.encryption.process(s)
self.__write_to_file(tow)
if close_fileobj is True:
if self.mode == "w":
- if self.cmp is not None:
+ if self.arcmode & ARCMODE_COMPRESS:
self._finalize_write_gz ()
# end of Tar archive marker (two empty blocks) was written
self.__sync()
# finalize encryption last; no writes may be performed after
# this point
- self._finalize_write_encrypt ()
+ if self.arcmode & ARCMODE_ENCRYPT:
+ self._finalize_write_encrypt ()
if not self._extfileobj:
self.fileobj.close()
def _init_read_encrypt (self):
"""Initialize encryption for next entry in archive. Read a header and
notify the crypto context."""
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
lasthdr = self.fileobj.tell ()
try:
hdr = crypto.hdr_read_stream (self.fileobj)
"""
Finalize decryption.
"""
- if self.encryption is not None \
- and self.lasthdr is not None :
+ if self.arcmode & ARCMODE_ENCRYPT \
+ and self.lasthdr is not None :
assert self.remainder >= 0
if self.remainder > 0:
self.remainder = 0
# happens at the end of the file
# _init_read_gz failed in the previous iteration so
# self.cmp.decompress fails here
- if self.concat_stream:
+ if self.arcmode & ARCMODE_CONCAT:
pass
else:
raise ReadError("invalid compressed data")
- if self.comptype == "gz" and hasattr(self, "crc"):
+ if self.arcmode & ARCMODE_COMPRESS and hasattr(self, "crc"):
self.crc = self.zlib.crc32(buf, self.crc) & 0xFFFFffff
- if self.concat_stream and len(self.cmp.unused_data) != 0:
+ if self.arcmode & ARCMODE_CONCAT \
+ and len(self.cmp.unused_data) != 0:
self.buf = self.cmp.unused_data + self.buf
self.close(close_fileobj=False)
try:
t = [self.buf]
while c < size:
todo = size
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
if self.remainder <= 0:
# prepare next object
if self._init_read_encrypt () is False: # EOF
# only read up to the end of the encrypted object
todo = min (size, self.remainder)
buf = self.fileobj.read(todo)
- if self.encryption is not None:
+ if self.arcmode & ARCMODE_ENCRYPT:
# decrypt the thing
buf = self.encryption.process (buf)
if todo == self.remainder:
fileobject = ExFileObject # The file-object for extractfile().
- concat_compression = False # Used to separate in different zip members each
- # file, used for robustness.
+ arcmode = ARCMODE_PLAIN # Object processing mode (“concat”, encryption,
+ # compression)
save_to_members = True # If new members are saved. This can be disabled
# if you manage lots of files and don't want
tarinfo=None, dereference=None, ignore_zeros=None, encoding=None,
errors="surrogateescape", pax_headers=None, debug=None,
errorlevel=None, max_volume_size=None, new_volume_handler=None,
- concat_compression=False, nacl=None,
+ concat=False, nacl=None,
save_to_members=True):
"""Open an (uncompressed) tar archive `name'. `mode' is either 'r' to
read from an existing archive, 'a' to append data to an existing
if len(mode) > 1 or mode not in "raw":
raise ValueError("mode must be 'r', 'a' or 'w'")
self.mode = mode
- self.concat_compression = concat_compression
+ self.arcmode = arcmode_set (concat)
self.nacl = nacl
self._mode = {"r": "rb", "a": "r+b", "w": "wb"}[mode]
raise ValueError("mode must be 'r' or 'w'")
stream = _Stream(name, filemode, comptype, fileobj, bufsize,
- concat_stream=True, encryption=encryption,
+ concat=True, encryption=encryption,
compresslevel=compresslevel)
- if comptype != "tar":
- kwargs ["concat_compression"] = True
+ kwargs ["concat"] = True
try:
t = cls(name, filemode, stream, **kwargs)
except: # XXX except what?
tarinfo = copy.copy(tarinfo)
- if self.concat_compression is True:
- if getattr (self.fileobj, "cmp", None) is not None:
- self.fileobj._finalize_write_gz ()
- encrypt = getattr (self.fileobj, "encryption", None) is not None
- if encrypt is True:
- self.fileobj._finalize_write_encrypt ()
- self.fileobj._init_write_encrypt (tarinfo.name,
- set_last_block_offset=True)
- self.fileobj._init_write_gz (set_last_block_offset=not encrypt)
- self.last_block_offset = self.fileobj.last_block_offset
+ if self.arcmode & ARCMODE_CONCAT:
+ self.last_block_offset = self.fileobj.next (tarinfo.name)
else:
self.last_block_offset = self.fileobj.tell()
self.volume_tarinfo = None
- if self.concat_compression is True:
- # with non-concat modes, this is taken care by the _Stream
- # ctor as invoked by the newvol handler
- if getattr (self.fileobj, "cmp", None) is not None:
- # e. g. compressed PAX header written
- self.fileobj._finalize_write_gz ()
- if getattr (self.fileobj, "encryption", None) is not None:
- self.fileobj._init_write_encrypt (tarinfo.name)
- self.fileobj._init_write_gz ()
+ if self.arcmode & ARCMODE_CONCAT:
+ self.fileobj.next_volume (tarinfo.name)
# write new volume header
buf = tarinfo.tobuf(self.format, self.encoding, self.errors)
fileobj=None,
bufsize=self.fileobj.bufsize,
encryption=encryption,
- concat_stream=self.fileobj.concat_stream)
+ concat=self.fileobj.arcmode & ARCMODE_CONCAT)
else:
# here, we lose information about compression/encryption!
self._dbg(3, 'open_volume: builtin open')
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
mode="w#gz",
- format=GNU_FORMAT,
- concat_compression=True)
+ format=GNU_FORMAT)
tarobj.add("big")
tarobj.close()
os.unlink("big")
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ mode="w#gz")
tarobj.add("big")
tarobj.close()
os.unlink("big")
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ mode="w#gz")
tarobj.add("big")
pos = tarobj.get_last_member_offset()
tarobj.close()
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ mode="w#gz")
tarobj.add("big")
tarobj.add("small")
pos = tarobj.get_last_member_offset()
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
mode="w#gz",
- concat_compression=True,
max_volume_size=1000000,
new_volume_handler=new_volume_handler)
tarobj.add("small")
fo = open("sample.tar.gz", 'rb')
fo.seek(pos)
tarobj = TarFile.open(mode="r#gz", fileobj=fo,
- concat_compression=True,
new_volume_handler=new_volume_handler_fo)
tarobj.extract(tarobj.next())
tarobj.close()
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ mode="w#gz")
tarobj.add("big")
tarobj.add("small")
tarobj.add("small2")
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
tarobj.add("big")
tarobj.add("small")
tarobj.add("small2")
# create the tar file with volumes
tarobj = TarFile.open("sample.tar.gz",
mode="w#gz",
- concat_compression=True,
max_volume_size=20000,
new_volume_handler=new_volume_handler)
tarobj.add("big")
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
tarobj.add("big")
tarobj.add("small")
tarobj.add("small2")
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
tarobj.add("big")
tarobj.add("small")
tarobj.add("small2")
hash["small2"] = self.create_file("small2", 354)
# create the tar file with volumes
- tarobj = TarFile.open("sample.tar.gz",
- mode="w#gz",
- concat_compression=True)
+ tarobj = TarFile.open("sample.tar.gz", mode="w#gz")
tarobj.add("big")
tarobj.add("small")
tarobj.add("small2")