#!/usr/bin/env python3
#-------------------------------------------------------------------
# tarfile.py
#-------------------------------------------------------------------
# Copyright (C) 2002 Lars Gustäbel <lars@gustaebel.de>
# All rights reserved.
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
"""Read from and write to tar format archives.
"""

__version__ = "$Revision: 85213 $"
# $Source$

version     = "0.9.0"
__author__  = "Lars Gustäbel (lars@gustaebel.de)"
__date__    = "$Date$"
__cvsid__   = "$Id$"
__credits__ = "Gustavo Niemeyer, Niels Gustäbel, Richard Townsend, Eduardo Robles."

#---------
# Imports
#---------
import binascii
import copy
import errno
import functools
import io
import mmap
import operator
import os
import re
import shutil
import stat
import struct
import sys
import time
import traceback    # XXX

from .
import crypto try: import grp, pwd except ImportError: grp = pwd = None # os.symlink on Windows prior to 6.0 raises NotImplementedError symlink_exception = (AttributeError, NotImplementedError) try: # OSError (winerror=1314) will be raised if the caller does not hold the # SeCreateSymbolicLinkPrivilege privilege symlink_exception += (OSError,) except NameError: pass # from tarfile import * __all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError"] from builtins import open as _open # Since 'open' is TarFile.open #--------------------------------------------------------- # tar constants #--------------------------------------------------------- NUL = b"\0" # the null character BLOCKSIZE = 512 # length of processing blocks RECORDSIZE = BLOCKSIZE * 20 # length of records GNU_MAGIC = b"ustar \0" # magic gnu tar string POSIX_MAGIC = b"ustar\x0000" # magic posix tar string LENGTH_NAME = 100 # maximum length of a filename LENGTH_LINK = 100 # maximum length of a linkname LENGTH_PREFIX = 155 # maximum length of the prefix field REGTYPE = b"0" # regular file AREGTYPE = b"\0" # regular file LNKTYPE = b"1" # link (inside tarfile) SYMTYPE = b"2" # symbolic link CHRTYPE = b"3" # character special device BLKTYPE = b"4" # block special device DIRTYPE = b"5" # directory FIFOTYPE = b"6" # fifo special device CONTTYPE = b"7" # contiguous file GNUTYPE_LONGNAME = b"L" # GNU tar longname GNUTYPE_LONGLINK = b"K" # GNU tar longlink GNUTYPE_SPARSE = b"S" # GNU tar sparse file GNUTYPE_MULTIVOL = b"M" # GNU tar continuation of a file that began on # another volume XHDTYPE = b"x" # POSIX.1-2001 extended header XGLTYPE = b"g" # POSIX.1-2001 global header SOLARIS_XHDTYPE = b"X" # Solaris extended header USTAR_FORMAT = 0 # POSIX.1-1988 (ustar) format GNU_FORMAT = 1 # GNU tar format PAX_FORMAT = 2 # POSIX.1-2001 (pax) format DEFAULT_FORMAT = GNU_FORMAT GZ_FMT_HEADER = b"= 0: s = bytearray([0o200]) else: s = bytearray([0o377]) n = 256 ** digits + n for i in range(digits - 1): s.insert(1, n & 0o377) n >>= 8 else: raise ValueError("overflow in number field") return s def calc_chksums(buf): """Calculate the checksum for a member's header by summing up all characters except for the chksum field which is treated as if it was filled with spaces. According to the GNU tar sources, some tars (Sun and NeXT) calculate chksum with signed char, which will be different if there are chars in the buffer with the high bit set. So we calculate two checksums, unsigned and signed. """ unsigned_chksum = 256 + sum(struct.unpack_from("148B8x356B", buf)) signed_chksum = 256 + sum(struct.unpack_from("148b8x356b", buf)) return unsigned_chksum, signed_chksum def copyfileobj(src, dst, length=None): """Copy length bytes from fileobj src to fileobj dst. If length is None, copy the entire content. 
""" if length == 0: return if length is None: shutil.copyfileobj(src, dst) return blocks, remainder = divmod(length, BUFSIZE) for b in range(blocks): buf = src.read(BUFSIZE) dst.write(buf) if len(buf) < BUFSIZE: raise OSError("end of file reached") if remainder != 0: buf = src.read(remainder) dst.write(buf) if len(buf) < remainder: raise OSError("end of file reached") def filemode(mode): """Deprecated in this location; use stat.filemode.""" import warnings warnings.warn("deprecated in favor of stat.filemode", DeprecationWarning, 2) return stat.filemode(mode) class TarError(Exception): """Base exception.""" pass class ExtractError(TarError): """General exception for extract errors.""" pass class ReadError(TarError): """Exception for unreadable tar archives.""" pass class CompressionError(TarError): """Exception for unavailable compression methods.""" pass class StreamError(TarError): """Exception for unsupported operations on stream-like TarFiles.""" pass class HeaderError(TarError): """Base exception for header errors.""" pass class EmptyHeaderError(HeaderError): """Exception for empty headers.""" pass class TruncatedHeaderError(HeaderError): """Exception for truncated headers.""" pass class EOFHeaderError(HeaderError): """Exception for end of file headers.""" pass class InvalidHeaderError(HeaderError): """Exception for invalid headers.""" pass class SubsequentHeaderError(HeaderError): """Exception for missing and invalid extended headers.""" pass class InvalidEncryptionError(TarError): """Exception for undefined crypto modes and combinations.""" pass class DecryptionError(TarError): """Exception for error during decryption.""" pass class EncryptionError(TarError): """Exception for error during encryption.""" pass class EndOfFile(Exception): """Signal end of file condition when they’re not an error.""" pass #--------------------------- # internal stream interface #--------------------------- class _LowLevelFile: """Low-level file object. Supports reading and writing. It is used instead of a regular file object for streaming access. """ def __init__(self, name, mode): _mode = { "r": os.O_RDONLY, "w": os.O_RDWR | os.O_CREAT | os.O_TRUNC, }[mode] if hasattr(os, "O_BINARY"): _mode |= os.O_BINARY # pylint: disable=no-member self.fd = os.open(name, _mode, 0o666) self.offset = 0 def close(self): os.close(self.fd) def read(self, size): ret = os.read(self.fd, size) self.offset += len(ret) return ret def write(self, s, pos=None): if pos is not None: p0 = self.offset os.lseek (self.fd, pos, os.SEEK_SET) n = os.write(self.fd, s) if pos is None: self.offset += len(s) else: append = pos + n - p0 if append > 0: self.offset += append os.lseek (self.fd, p0, os.SEEK_SET) def tell(self): return self.offset def seek_set (self, pos): os.lseek (self.fd, pos, os.SEEK_SET) self.offset = pos def gz_header (name=None): timestamp = int(time.time()) flags = 0x0 if name is None: name = b"" else: flags |= GZ_FLAG_FNAME if type(name) is str: name = name.encode("iso-8859-1", "replace") if name.endswith(b".pdtcrypt"): name = name[:-9] if name.endswith(b".gz"): name = name[:-3] # RFC1952 says we must use ISO-8859-1 for the FNAME field. name += NUL hdr = struct.pack (GZ_FMT_HEADER, GZ_MAGIC [0], GZ_MAGIC [1], GZ_METHOD_DEFLATE, flags, timestamp, GZ_DEFLATE_FLAGS, GZ_OS_CODE) return hdr + name class _Stream: """Class that serves as an adapter between TarFile and a stream-like object. The stream-like object only needs to have a read() or write() method and is accessed blockwise. Use of gzip or bzip2 compression is possible. 
       A stream-like object could be for example: sys.stdin,
       sys.stdout, a socket, a tape device etc.

       _Stream is intended to be used only internally but is
       nevertheless used externally by Deltatar.

       When encrypting, the ``enccounter`` will be used for
       initializing the first cryptographic context. When
       decrypting, its value will be compared to the decrypted
       object. Decryption fails if the value does not match. In
       effect, this means that a ``_Stream`` whose ctor was passed
       ``enccounter`` can only be used to encrypt or decrypt a
       single object.
    """

    remainder = -1 # track size in encrypted entries
    tolerance = TOLERANCE_STRICT

    def __init__(self, name, mode, comptype, fileobj, bufsize,
                 concat=False, encryption=None, enccounter=None,
                 compresslevel=9, tolerance=TOLERANCE_STRICT):
        """Construct a _Stream object.
        """
        self.arcmode = arcmode_set (concat, encryption, comptype)
        self.tolerance = tolerance

        self._extfileobj = True
        if fileobj is None:
            fileobj = _LowLevelFile(name, mode)
            self._extfileobj = False

        if comptype == '*':
            # Enable transparent compression detection for the
            # stream interface
            fileobj = _StreamProxy(fileobj)
            comptype = fileobj.getcomptype()
        if comptype == '':
            comptype = "tar"

        self.enccounter = None
        if self.arcmode & ARCMODE_ENCRYPT:
            self.enccounter = enccounter

        self.name     = name or ""
        self.mode     = mode
        self.comptype = comptype
        self.cmp      = None
        self.fileobj  = fileobj
        self.bufsize  = bufsize
        self.buf      = b""
        self.pos      = 0
        self.concat_pos = 0
        self.closed   = False
        self.flags    = 0
        self.last_block_offset = 0
        self.dbuf     = b"" # ???
        self.exception = None # communicate decompression failure
        self.compresslevel = compresslevel
        self.bytes_written = 0
        # crypto parameters
        self.encryption = encryption
        self.lasthdr    = None

        if encryption is not None:
            encryption.reset_last_iv ()

        try:
            if comptype == "gz":
                try:
                    import zlib
                except ImportError:
                    raise CompressionError("zlib module is not available")
                self.zlib = zlib
                if mode == "r":
                    self.exception = zlib.error
                    self._init_read_gz()
                elif mode == "w":
                    if not (self.arcmode & ARCMODE_CONCAT):
                        if self.arcmode & ARCMODE_ENCRYPT:
                            self._init_write_encrypt (name)
                        self._init_write_gz ()
                self.crc = zlib.crc32(b"") & 0xFFFFffff

            elif comptype == "bz2":
                if self.arcmode & ARCMODE_ENCRYPT:
                    raise InvalidEncryptionError("encryption not available for "
                                                 "compression “%s”" % comptype)
                try:
                    import bz2
                except ImportError:
                    raise CompressionError("bz2 module is not available")
                if mode == "r":
                    self.dbuf = b""
                    self.cmp = bz2.BZ2Decompressor()
                    self.exception = OSError
                else:
                    self.cmp = bz2.BZ2Compressor()

            elif comptype == 'xz':
                if self.arcmode & ARCMODE_ENCRYPT:
                    raise InvalidEncryptionError("encryption not available for "
                                                 "compression “%s”" % comptype)
                try:
                    import lzma
                except ImportError:
                    raise CompressionError("lzma module is not available")
                if mode == "r":
                    self.dbuf = b""
                    self.cmp = lzma.LZMADecompressor()
                    self.exception = lzma.LZMAError
                else:
                    self.cmp = lzma.LZMACompressor()

            elif comptype == "tar":
                if not (self.arcmode & ARCMODE_CONCAT) \
                        and mode == "w" \
                        and self.arcmode & ARCMODE_ENCRYPT:
                    self._init_write_encrypt (name)

            else:
                if self.arcmode & ARCMODE_ENCRYPT:
                    raise InvalidEncryptionError("encryption not available for "
                                                 "compression “%s”" % comptype)
                raise CompressionError("unknown compression type %r" % comptype)

        except:
            if not self._extfileobj:
                self.fileobj.close()
            self.closed = True
            raise

    def __del__(self):
        if hasattr(self, "closed") and not self.closed:
            try:
                self.close()
            except crypto.InternalError:
                # context already finalized due to abort but close() tried
                # to use it
                pass

    def next (self, name):
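        # Finalize the compression and/or encryption object that is
        # currently open, remember in `last_block_offset` where the next
        # object will start, then set up fresh contexts for the member
        # called `name`.  When only concatenation is active (neither
        # compression nor encryption), this reduces to recording the
        # current file offset.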
if self.arcmode & ARCMODE_COMPRESS: if getattr (self, "cmp", None) is not None: self._finalize_write_gz () self.__sync() if self.arcmode & ~(ARCMODE_ENCRYPT | ARCMODE_COMPRESS): self.last_block_offset = self.fileobj.tell() if self.arcmode & ARCMODE_ENCRYPT: self._finalize_write_encrypt () self._init_write_encrypt (name, set_last_block_offset=True) if self.arcmode & ARCMODE_COMPRESS: self._init_write_gz (set_last_block_offset = not (self.arcmode & ARCMODE_ENCRYPT)) return self.last_block_offset def next_volume (self, name): # with non-concat modes, this is taken care by the _Stream # ctor as invoked by the newvol handler if self.arcmode & ARCMODE_COMPRESS: if getattr (self, "cmp", None) is not None: # e. g. compressed PAX header written self._finalize_write_gz () if self.arcmode & ARCMODE_ENCRYPT: self._init_write_encrypt (name) if self.arcmode & ARCMODE_COMPRESS: self._init_write_gz () def _init_write_encrypt (self, entry=None, set_last_block_offset=False): """ Save position for delayed write of header; fill the header location with dummy bytes. """ # first thing, proclaim new object to the encryption context # secondly, assemble the header with the updated parameters # and commit it directly to the underlying stream, bypassing the # encryption layer in .__write(). dummyhdr = self.encryption.next (entry, counter=self.enccounter) if dummyhdr is None: raise EncryptionError ("Crypto.next(): bad dummy header") # XXX self.lasthdr = self.fileobj.tell() self.__write_to_file(dummyhdr) if set_last_block_offset is True: self.last_block_offset = self.lasthdr def _finalize_write_encrypt (self): """ Seek back to header position, read dummy bytes, finalize crypto obtaining the actual header, write header, seek back to current position. Returns the list of IV fixed parts as used during encryption. """ if self.lasthdr is not None: pos0 = self.fileobj.tell () self.fileobj.seek_set (self.lasthdr) dummy = self.fileobj.read (crypto.PDTCRYPT_HDR_SIZE) pos1 = self.fileobj.tell () dpos = pos1 - self.lasthdr assert dpos == crypto.PDTCRYPT_HDR_SIZE self.fileobj.seek_set (pos0) data, hdr, _ = self.encryption.done (dummy) self.__write_to_file(hdr, pos=self.lasthdr) self.__write_to_file(data) # append remainder of data self.lasthdr = -1 def _finalize_write_gz (self): if self.cmp is not None: chunk = self.buf + self.cmp.flush() if chunk: if self.comptype == "gz": # The native zlib crc is an unsigned 32-bit integer, but # the Python wrapper implicitly casts that to a signed C # long. So, on a 32-bit box self.crc may "look negative", # while the same crc on a 64-bit box may "look positive". # To avoid irksome warnings from the `struct` module, force # it to look positive on all boxes. chunk += struct.pack(" self.bufsize: self.__enc_write(self.buf[:self.bufsize]) self.buf = self.buf[self.bufsize:] def __write_to_file(self, s, pos=None): ''' Writes directly to the fileobj; updates self.bytes_written. If “pos” is given, the stream will seek to that position first and back afterwards, and the total of bytes written is not updated. ''' self.fileobj.write(s, pos) if pos is None: self.bytes_written += len(s) def __enc_write(self, s): """ If encryption is active, the string s is encrypted before being written to the file. """ if len (s) == 0: return if self.arcmode & ARCMODE_ENCRYPT: buf = s while len (buf) > 0: n, ct = self.encryption.process(buf) self.__write_to_file(ct) buf = buf [n:] if len (buf) > 0: # The entire plaintext was not consumed: The size limit # for encrypted objects was reached. 
Transparently create # a new encrypted object and continue processing the input. self._finalize_write_encrypt () self._init_write_encrypt () else: self.__write_to_file(s) def estim_file_size(self): """ estimates size of file if closing it now The result may differ greatly from the amount of data sent to write() due to compression, encryption and buffering. In tests the result (before calling close()) was up to 12k smaller than the final file size if compression is being used because zlib/bz2 compressors do not allow inspection of their buffered data :-( Still, we add what close() would add: 8 bytes for gz checksum, one encryption block size if encryption is used and the size of our own buffer """ if self.closed: return self.bytes_written result = self.bytes_written if self.buf: result += len(self.buf) if self.comptype == 'gz': result += 8 # 2 longs = 8 byte (no extra info written for bzip2) return result def close(self, close_fileobj=True): """Close the _Stream object. No operation should be done on it afterwards. """ if self.closed: return if close_fileobj is True: if self.mode == "w": if self.arcmode & ARCMODE_COMPRESS: self._finalize_write_gz () # end of Tar archive marker (two empty blocks) was written # finalize encryption last; no writes may be performed after # this point self.__sync () if self.arcmode & ARCMODE_ENCRYPT: self._finalize_write_encrypt () if not self._extfileobj: self.fileobj.close() else: # read the zlib crc and length and check them if self.mode == "r" and self.comptype == "gz": read_crc = self.__read(4) read_length = self.__read(4) calculated_crc = self.crc if struct.unpack("= 0 if self.remainder > 0: self.remainder = 0 try: data = self.encryption.done () except crypto.InvalidGCMTag as exn: raise DecryptionError ("decryption failed: %s" % exn) return data def tell(self): """Return the stream's file pointer position. """ return self.pos def seek(self, pos=0): """Set the stream's file pointer to pos. Negative seeking is forbidden. """ if pos == self.pos: pass # nothing to do elif pos - self.pos >= 0: blocks, remainder = divmod(pos - self.pos, self.bufsize) if self.encryption is not None: # IV succession is only preserved between successive objects. self.encryption.reset_last_iv () for i in range(blocks): self.read(self.bufsize) self.read(remainder) else: raise StreamError("seeking backwards is not allowed") return self.pos def read(self, size=None): """Return the next size number of bytes from the stream. If size is not defined, return all bytes of the stream up to EOF. """ if size is None: t = [] while True: buf = self._read(self.bufsize) if not buf: break t.append(buf) buf = b"".join(t) else: buf = self._read(size) self.pos += len(buf) return buf def readline(self): """Reads just one line, new line character included """ # if \n in dbuf, no read neads to be done if b'\n' in self.dbuf: pos = self.dbuf.index(b'\n') + 1 ret = self.dbuf[:pos] self.dbuf = self.dbuf[pos:] return ret buf = [] while True: chunk = self._read(self.bufsize) # nothing more to read, so return the buffer if not chunk: return b''.join(buf) buf.append(chunk) # if \n found, return the new line if b'\n' in chunk: dbuf = b''.join(buf) pos = dbuf.index(b'\n') + 1 self.dbuf = dbuf[pos:] + self.dbuf return dbuf[:pos] def _read(self, size): """Return size bytes from the stream. 
""" c = len(self.dbuf) t = [self.dbuf] while c < size: buf = self.__read(self.bufsize) if not buf: break if self.cmp is not None: try: buf = self.cmp.decompress(buf) except self.exception as exn: raise ReadError("invalid compressed data (%r)" % exn) except Exception as e: # happens at the end of the file # _init_read_gz failed in the previous iteration so # self.cmp.decompress fails here if self.arcmode & ARCMODE_CONCAT: pass else: raise ReadError("invalid compressed data") if self.arcmode & ARCMODE_COMPRESS and hasattr(self, "crc"): self.crc = self.zlib.crc32(buf, self.crc) & 0xFFFFffff if self.arcmode & ARCMODE_CONCAT \ and len(self.cmp.unused_data) != 0: self.buf = self.cmp.unused_data + self.buf self.close(close_fileobj=False) try: self._init_read_gz() except DecryptionError: if self.tolerance != TOLERANCE_STRICT: # return whatever data was processed successfully if len (buf) > 0: t.append (buf) if len (t) > 0: break raise except ReadError: # gzip troubles if self.tolerance == TOLERANCE_RESCUE: if len (buf) > 0: t.append (buf) if len (t) > 0: break raise except EndOfFile: # happens at the end of the file pass self.crc = self.zlib.crc32(b"") & 0xFFFFffff self.closed = False t.append(buf) c += len(buf) t = b"".join(t) self.dbuf = t[size:] return t[:size] def __read(self, size): """ Return size bytes from stream. If internal buffer is empty, read another block from the stream. The function returns up to size bytes of data. When an error occurs during decryption, everything until the end of the last successfully finalized object is returned. """ c = len(self.buf) t = [self.buf] if c > 0 else [] good_crypto = len (t) while c < size: todo = size try: if self.arcmode & ARCMODE_ENCRYPT: if self.remainder <= 0: # prepare next object if self._init_read_encrypt () is False: # EOF buf = None break # while # only read up to the end of the encrypted object todo = min (size, self.remainder) buf = self.fileobj.read(todo) if self.arcmode & ARCMODE_ENCRYPT: # decrypt the thing buf = self._read_encrypt (buf) if todo == self.remainder: # at the end of a crypto object; finalization will fail if # the GCM tag does not match trailing = self._finalize_read_encrypt () good_crypto = len (t) + 1 if len (trailing) > 0: buf += trailing self.remainder = 0 else: self.remainder -= todo except DecryptionError: if self.tolerance == TOLERANCE_STRICT: raise self.encryption.drop () if self.tolerance == TOLERANCE_RECOVER: if good_crypto == 0: raise # this may occur at any of the three crypto operations above. # some objects did validate; discard all data after it; next # call will start with the bad object and error out immediately self.buf = b"".join (t [good_crypto:]) return b"".join (t [:good_crypto]) elif self.tolerance == TOLERANCE_RESCUE: # keep what we have so far despite the finalization issue t.append (buf) c += len (buf) break else: raise RuntimeError("internal error: bad tolerance level") if not buf: ## XXX stream terminated prematurely; this should be an error break t.append(buf) c += len(buf) t = b"".join(t) self.buf = t[size:] return t[:size] class _StreamProxy(object): """Small proxy class that enables transparent compression detection for the Stream interface (mode 'r|*'). 
""" def __init__(self, fileobj): self.fileobj = fileobj self.buf = self.fileobj.read(BLOCKSIZE) def read(self, size): # pylint: disable=method-hidden self.read = self.fileobj.read return self.buf def getcomptype(self): if self.buf.startswith(GZ_MAGIC_DEFLATE): return "gz" elif self.buf[0:3] == b"BZh" and self.buf[4:10] == b"1AY&SY": return "bz2" elif self.buf.startswith((b"\x5d\x00\x00\x80", b"\xfd7zXZ")): return "xz" else: return "tar" def close(self): self.fileobj.close() # class StreamProxy #------------------------ # Extraction file object #------------------------ class _FileInFile(object): """A thin wrapper around an existing file object that provides a part of its data as an individual file object. """ def __init__(self, fileobj, offset, size, blockinfo=None): self.fileobj = fileobj self.offset = offset self.size = size self.position = 0 self.name = getattr(fileobj, "name", None) self.closed = False if blockinfo is None: blockinfo = [(0, size)] # Construct a map with data and zero blocks. self.map_index = 0 self.map = [] lastpos = 0 realpos = self.offset for offset, size in blockinfo: if offset > lastpos: self.map.append((False, lastpos, offset, None)) self.map.append((True, offset, offset + size, realpos)) realpos += size lastpos = offset + size if lastpos < self.size: self.map.append((False, lastpos, self.size, None)) def flush(self): pass def readable(self): return True def writable(self): return False def seekable(self): return self.fileobj.seekable() def tell(self): """Return the current file position. """ return self.position def seek(self, position, whence=io.SEEK_SET): """Seek to a position in the file. """ if whence == io.SEEK_SET: self.position = min(max(position, 0), self.size) elif whence == io.SEEK_CUR: if position < 0: self.position = max(self.position + position, 0) else: self.position = min(self.position + position, self.size) elif whence == io.SEEK_END: self.position = max(min(self.size + position, self.size), 0) else: raise ValueError("Invalid argument") return self.position def read(self, size=None): """Read data from the file. """ if size is None: size = self.size - self.position else: size = min(size, self.size - self.position) buf = b"" while size > 0: while True: data, start, stop, offset = self.map[self.map_index] if start <= self.position < stop: break else: self.map_index += 1 if self.map_index == len(self.map): self.map_index = 0 length = min(size, stop - self.position) if data: self.fileobj.seek(offset + (self.position - start)) buf += self.fileobj.read(length) else: buf += NUL * length size -= length self.position += length return buf def readinto(self, b): buf = self.read(len(b)) b[:len(buf)] = buf return len(buf) def close(self): self.closed = True #class _FileInFile class ExFileObject(io.BufferedReader): def __init__(self, tarfile, tarinfo): fileobj = _FileInFile(tarfile.fileobj, tarinfo.offset_data, tarinfo.size, tarinfo.sparse) super().__init__(fileobj) #class ExFileObject #------------------ # Exported Classes #------------------ class TarInfo(object): """Informational class which holds the details about an archive member given by a tar header block. TarInfo objects are returned by TarFile.getmember(), TarFile.getmembers() and TarFile.gettarinfo() and are usually created internally. 
""" __slots__ = ("name", "mode", "uid", "gid", "size", "mtime", "chksum", "type", "linkname", "uname", "gname", "devmajor", "devminor", "volume_offset", "offset", "offset_data", "pax_headers", "sparse", "tarfile", "_sparse_structs", "_link_target") def __init__(self, name=""): """Construct a TarInfo object. name is the optional name of the member. """ self.name = name # member name self.mode = 0o644 # file permissions self.uid = 0 # user id self.gid = 0 # group id self.size = 0 # file size self.mtime = 0 # modification time self.chksum = 0 # header checksum self.type = REGTYPE # member type self.linkname = "" # link name self.uname = "" # user name self.gname = "" # group name self.devmajor = 0 # device major number self.devminor = 0 # device minor number self.offset = 0 # the tar header starts here self.offset_data = 0 # the file's data starts here self.volume_offset = 0 # the file's data corresponds with the data # starting at this position self.sparse = None # sparse member information self.pax_headers = {} # pax header information # In pax headers the "name" and "linkname" field are called # "path" and "linkpath". def _getpath(self): return self.name def _setpath(self, name): self.name = name path = property(_getpath, _setpath) def _getlinkpath(self): return self.linkname def _setlinkpath(self, linkname): self.linkname = linkname linkpath = property(_getlinkpath, _setlinkpath) def __repr__(self): return "<%s %r at %#x>" % (self.__class__.__name__,self.name,id(self)) def get_info(self, encoding=None, errors=None): """Return the TarInfo's attributes as a dictionary. """ info = { "name": self.name, "mode": self.mode & 0o7777, "uid": self.uid, "gid": self.gid, "size": self.size, "mtime": self.mtime, "chksum": self.chksum, "type": self.type, "linkname": self.linkname, "uname": self.uname, "gname": self.gname, "devmajor": self.devmajor, "devminor": self.devminor, "offset_data": self.offset_data, "volume_offset": self.volume_offset } if info["type"] == DIRTYPE and not info["name"].endswith("/"): info["name"] += "/" return info def tobuf(self, format=DEFAULT_FORMAT, encoding=ENCODING, errors="surrogateescape"): """Return a tar header as a string of 512 byte blocks. """ info = self.get_info(encoding, errors) if format == USTAR_FORMAT: return self.create_ustar_header(info, encoding, errors) elif format == GNU_FORMAT: return self.create_gnu_header(info, encoding, errors) elif format == PAX_FORMAT: return self.create_pax_header(info, encoding, errors) else: raise ValueError("invalid format") def create_ustar_header(self, info, encoding, errors): """Return the object as a ustar header block. """ info["magic"] = POSIX_MAGIC if len(info["linkname"]) > LENGTH_LINK: raise ValueError("linkname is too long") if len(info["name"]) > LENGTH_NAME: info["prefix"], info["name"] = self._posix_split_name(info["name"]) return self._create_header(info, USTAR_FORMAT, encoding, errors) def create_gnu_header(self, info, encoding, errors): """Return the object as a GNU header block sequence. 
""" info["magic"] = GNU_MAGIC if self.ismultivol(): prefix = [ itn(info.get("atime", 0), 12, GNU_FORMAT), itn(info.get("ctime", 0), 12, GNU_FORMAT), itn(self.volume_offset, 12, GNU_FORMAT), itn(0, 119, GNU_FORMAT), # stuff unused in this tar implementation, set to zero ] info['prefix'] = b"".join(prefix) info['size'] = info['size'] - self.volume_offset buf = b"" if len(info["linkname"]) > LENGTH_LINK: buf += self._create_gnu_long_header(info["linkname"], GNUTYPE_LONGLINK, encoding, errors) if len(info["name"]) > LENGTH_NAME: buf += self._create_gnu_long_header(info["name"], GNUTYPE_LONGNAME, encoding, errors) return buf + self._create_header(info, GNU_FORMAT, encoding, errors) def create_pax_header(self, info, encoding, errors): """Return the object as a ustar header block. If it cannot be represented this way, prepend a pax extended header sequence with supplement information. """ info["magic"] = POSIX_MAGIC pax_headers = self.pax_headers.copy() if self.ismultivol(): info['size'] = info['size'] - self.volume_offset # Test string fields for values that exceed the field length or cannot # be represented in ASCII encoding. for name, hname, length in ( ("name", "path", LENGTH_NAME), ("linkname", "linkpath", LENGTH_LINK), ("uname", "uname", 32), ("gname", "gname", 32)): if hname in pax_headers: # The pax header has priority. continue # Try to encode the string as ASCII. try: info[name].encode("ascii", "strict") except UnicodeEncodeError: pax_headers[hname] = info[name] continue if len(info[name]) > length: pax_headers[hname] = info[name] # Test number fields for values that exceed the field limit or values # that like to be stored as float. for name, digits in (("uid", 8), ("gid", 8), ("size", 12), ("mtime", 12)): if name in pax_headers: # The pax header has priority. Avoid overflow. info[name] = 0 continue val = info[name] if not 0 <= val < 8 ** (digits - 1) or isinstance(val, float): pax_headers[name] = str(val) info[name] = 0 # Create a pax extended header if necessary. if pax_headers: buf = self._create_pax_generic_header(pax_headers, XHDTYPE, encoding) else: buf = b"" return buf + self._create_header(info, USTAR_FORMAT, "ascii", "replace") @classmethod def create_pax_global_header(cls, pax_headers): """Return the object as a pax global header block sequence. """ return cls._create_pax_generic_header(pax_headers, XGLTYPE, "utf-8") def _posix_split_name(self, name): """Split a name longer than 100 chars into a prefix and a name part. """ prefix = name[:LENGTH_PREFIX + 1] while prefix and prefix[-1] != "/": prefix = prefix[:-1] name = name[len(prefix):] prefix = prefix[:-1] if not prefix or len(name) > LENGTH_NAME: raise ValueError("name is too long") return prefix, name @staticmethod def _create_header(info, format, encoding, errors): """Return a header block. info is a dictionary with file information, format must be one of the *_FORMAT constants. 
""" parts = [ stn(info.get("name", ""), 100, encoding, errors), itn(info.get("mode", 0) & 0o7777, 8, format), itn(info.get("uid", 0), 8, format), itn(info.get("gid", 0), 8, format), itn(info.get("size", 0), 12, format), itn(info.get("mtime", 0), 12, format), b" ", # checksum field info.get("type", REGTYPE), stn(info.get("linkname", ""), 100, encoding, errors), info.get("magic", POSIX_MAGIC), stn(info.get("uname", ""), 32, encoding, errors), stn(info.get("gname", ""), 32, encoding, errors), itn(info.get("devmajor", 0), 8, format), itn(info.get("devminor", 0), 8, format), sbtn(info.get("prefix", ""), 155, encoding, errors) ] buf = struct.pack("%ds" % BLOCKSIZE, b"".join(parts)) chksum = calc_chksums(buf[-BLOCKSIZE:])[0] buf = buf[:-364] + bytes("%06o\0" % chksum, "ascii") + buf[-357:] return buf @staticmethod def _create_payload(payload): """Return the string payload filled with zero bytes up to the next 512 byte border. """ blocks, remainder = divmod(len(payload), BLOCKSIZE) if remainder > 0: payload += (BLOCKSIZE - remainder) * NUL return payload @classmethod def _create_gnu_long_header(cls, name, type, encoding, errors): """Return a GNUTYPE_LONGNAME or GNUTYPE_LONGLINK sequence for name. """ name = name.encode(encoding, errors) + NUL info = {} info["name"] = "././@LongLink" info["type"] = type info["size"] = len(name) info["magic"] = GNU_MAGIC # create extended header + name blocks. return cls._create_header(info, USTAR_FORMAT, encoding, errors) + \ cls._create_payload(name) @classmethod def _create_pax_generic_header(cls, pax_headers, type, encoding): """Return a POSIX.1-2008 extended or global header sequence that contains a list of keyword, value pairs. The values must be strings. """ # Check if one of the fields contains surrogate characters and thereby # forces hdrcharset=BINARY, see _proc_pax() for more information. binary = False for keyword, value in pax_headers.items(): try: value.encode("utf-8", "strict") except UnicodeEncodeError: binary = True break records = b"" if binary: # Put the hdrcharset field at the beginning of the header. records += b"21 hdrcharset=BINARY\n" for keyword, value in pax_headers.items(): keyword = keyword.encode("utf-8") if binary: # Try to restore the original byte representation of `value'. # Needless to say, that the encoding must match the string. value = value.encode(encoding, "surrogateescape") else: value = value.encode("utf-8") l = len(keyword) + len(value) + 3 # ' ' + '=' + '\n' n = p = 0 while True: n = l + len(str(p)) if n == p: break p = n records += bytes(str(p), "ascii") + b" " + keyword + b"=" + value + b"\n" # We use a hardcoded "././@PaxHeader" name like star does # instead of the one that POSIX recommends. info = {} info["name"] = "././@PaxHeader" info["type"] = type info["size"] = len(records) info["magic"] = POSIX_MAGIC # Create pax header + record blocks. return cls._create_header(info, USTAR_FORMAT, "ascii", "replace") + \ cls._create_payload(records) @classmethod def frombuf(cls, buf, encoding, errors): """Construct a TarInfo object from a 512 byte bytes object. 
""" if len(buf) == 0: raise EmptyHeaderError("empty header") if len(buf) != BLOCKSIZE: raise TruncatedHeaderError("truncated header") if buf.count(NUL) == BLOCKSIZE: raise EOFHeaderError("end of file header") chksum = nti(buf[148:156]) if chksum not in calc_chksums(buf): raise InvalidHeaderError("bad checksum") obj = cls() obj.name = nts(buf[0:100], encoding, errors) obj.mode = nti(buf[100:108]) obj.uid = nti(buf[108:116]) obj.gid = nti(buf[116:124]) obj.size = nti(buf[124:136]) obj.mtime = nti(buf[136:148]) obj.chksum = chksum obj.type = buf[156:157] obj.linkname = nts(buf[157:257], encoding, errors) obj.uname = nts(buf[265:297], encoding, errors) obj.gname = nts(buf[297:329], encoding, errors) obj.devmajor = nti(buf[329:337]) obj.devminor = nti(buf[337:345]) prefix = nts(buf[345:500], encoding, errors) # The old GNU sparse format occupies some of the unused # space in the buffer for up to 4 sparse structures. # Save the them for later processing in _proc_sparse(). if obj.type == GNUTYPE_SPARSE: pos = 386 structs = [] for i in range(4): try: offset = nti(buf[pos:pos + 12]) numbytes = nti(buf[pos + 12:pos + 24]) except ValueError: break structs.append((offset, numbytes)) pos += 24 isextended = bool(buf[482]) origsize = nti(buf[483:495]) obj._sparse_structs = (structs, isextended, origsize) # Old V7 tar format represents a directory as a regular # file with a trailing slash. if obj.type == AREGTYPE and obj.name.endswith("/"): obj.type = DIRTYPE # Remove redundant slashes from directories. if obj.isdir(): obj.name = obj.name.rstrip("/") # Reconstruct a ustar longname. if prefix and obj.type not in GNU_TYPES: obj.name = prefix + "/" + obj.name else: obj.offset_data = nti(buf[369:381]) return obj @classmethod def fromtarfile(cls, tarfile): """Return the next TarInfo object from TarFile object tarfile. """ buf = tarfile.fileobj.read(BLOCKSIZE) obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors) obj.offset = tarfile.fileobj.tell() - BLOCKSIZE return obj._proc_member(tarfile) #-------------------------------------------------------------------------- # The following are methods that are called depending on the type of a # member. The entry point is _proc_member() which can be overridden in a # subclass to add custom _proc_*() methods. A _proc_*() method MUST # implement the following # operations: # 1. Set self.offset_data to the position where the data blocks begin, # if there is data that follows. # 2. Set tarfile.offset to the position where the next member's header will # begin. # 3. Return self or another valid TarInfo object. def _proc_member(self, tarfile): """Choose the right processing method depending on the type and call it. """ if self.type in (GNUTYPE_LONGNAME, GNUTYPE_LONGLINK): return self._proc_gnulong(tarfile) elif self.type == GNUTYPE_SPARSE: return self._proc_sparse(tarfile) elif self.type in (XHDTYPE, XGLTYPE, SOLARIS_XHDTYPE): return self._proc_pax(tarfile) else: return self._proc_builtin(tarfile) def _proc_builtin(self, tarfile): """Process a builtin type or an unknown type which will be treated as a regular file. """ self.offset_data = tarfile.fileobj.tell() offset = self.offset_data if self.isreg() or self.ismultivol() or self.type not in SUPPORTED_TYPES: # Skip the following data blocks. offset += self._block(self.size) tarfile.offset = offset # Patch the TarInfo object with saved global # header information. 
self._apply_pax_info(tarfile.pax_headers, tarfile.encoding, tarfile.errors) return self def _proc_gnulong(self, tarfile): """Process the blocks that hold a GNU longname or longlink member. """ buf = tarfile.fileobj.read(self._block(self.size)) # Fetch the next header and process it. try: next = self.fromtarfile(tarfile) except HeaderError: raise SubsequentHeaderError("missing or bad subsequent header") # Patch the TarInfo object from the next header with # the longname information. next.offset = self.offset if self.type == GNUTYPE_LONGNAME: next.name = nts(buf, tarfile.encoding, tarfile.errors) elif self.type == GNUTYPE_LONGLINK: next.linkname = nts(buf, tarfile.encoding, tarfile.errors) return next def _proc_sparse(self, tarfile): """Process a GNU sparse header plus extra headers. """ # We already collected some sparse structures in frombuf(). structs, isextended, origsize = self._sparse_structs del self._sparse_structs # Collect sparse structures from extended header blocks. while isextended: buf = tarfile.fileobj.read(BLOCKSIZE) pos = 0 for i in range(21): try: offset = nti(buf[pos:pos + 12]) numbytes = nti(buf[pos + 12:pos + 24]) except ValueError: break if offset and numbytes: structs.append((offset, numbytes)) pos += 24 isextended = bool(buf[504]) self.sparse = structs self.offset_data = tarfile.fileobj.tell() tarfile.offset = self.offset_data + self._block(self.size) self.size = origsize return self def _proc_pax(self, tarfile): """Process an extended or global header as described in POSIX.1-2008. """ # Read the header information. buf = tarfile.fileobj.read(self._block(self.size)) # A pax header stores supplemental information for either # the following file (extended) or all following files # (global). if self.type == XGLTYPE: pax_headers = tarfile.pax_headers else: pax_headers = tarfile.pax_headers.copy() # Check if the pax header contains a hdrcharset field. This tells us # the encoding of the path, linkpath, uname and gname fields. Normally, # these fields are UTF-8 encoded but since POSIX.1-2008 tar # implementations are allowed to store them as raw binary strings if # the translation to UTF-8 fails. match = re.search(br"\d+ hdrcharset=([^\n]+)\n", buf) if match is not None: pax_headers["hdrcharset"] = match.group(1).decode("utf-8") # For the time being, we don't care about anything other than "BINARY". # The only other value that is currently allowed by the standard is # "ISO-IR 10646 2000 UTF-8" in other words UTF-8. hdrcharset = pax_headers.get("hdrcharset") if hdrcharset == "BINARY": encoding = tarfile.encoding else: encoding = "utf-8" # Parse pax header information. A record looks like that: # "%d %s=%s\n" % (length, keyword, value). length is the size # of the complete record including the length field itself and # the newline. keyword and value are both UTF-8 encoded strings. regex = re.compile(br"(\d+) ([^=]+)=") pos = 0 while True: match = regex.match(buf, pos) if not match: break length, keyword = match.groups() length = int(length) value = buf[match.end(2) + 1:match.start(1) + length - 1] # Normally, we could just use "utf-8" as the encoding and "strict" # as the error handler, but we better not take the risk. For # example, GNU tar <= 1.23 is known to store filenames it cannot # translate to UTF-8 as raw strings (unfortunately without a # hdrcharset=BINARY header). # We first try the strict standard encoding, and if that fails we # fall back on the user's encoding and error handler. 
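        # For example, the record b"16 path=foo/bar\n" declares a total
        # length of 16 bytes, which counts the "16" digits themselves,
        # the separating space, "path=foo/bar" and the final newline.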
keyword = self._decode_pax_field(keyword, "utf-8", "utf-8", tarfile.errors) if keyword in PAX_NAME_FIELDS: value = self._decode_pax_field(value, encoding, tarfile.encoding, tarfile.errors) else: value = self._decode_pax_field(value, "utf-8", "utf-8", tarfile.errors) pax_headers[keyword] = value pos += length # Fetch the next header. try: next = self.fromtarfile(tarfile) except HeaderError: raise SubsequentHeaderError("missing or bad subsequent header") # Process GNU sparse information. if "GNU.sparse.map" in pax_headers: # GNU extended sparse format version 0.1. self._proc_gnusparse_01(next, pax_headers) elif "GNU.sparse.size" in pax_headers: # GNU extended sparse format version 0.0. self._proc_gnusparse_00(next, pax_headers, buf) elif pax_headers.get("GNU.sparse.major") == "1" and pax_headers.get("GNU.sparse.minor") == "0": # GNU extended sparse format version 1.0. self._proc_gnusparse_10(next, pax_headers, tarfile) if self.type in (XHDTYPE, SOLARIS_XHDTYPE): # Patch the TarInfo object with the extended header info. next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors) next.offset = self.offset if "size" in pax_headers: # If the extended header replaces the size field, # we need to recalculate the offset where the next # header starts. offset = next.offset_data if next.isreg() or next.type not in SUPPORTED_TYPES: offset += next._block(next.size) tarfile.offset = offset if next is not None: if "GNU.volume.filename" in pax_headers: if pax_headers["GNU.volume.filename"] == next.name: if "GNU.volume.size" in pax_headers: next.size = int(pax_headers["GNU.volume.size"]) if "GNU.volume.offset" in pax_headers: next.volume_offset = int(pax_headers["GNU.volume.offset"]) for key in pax_headers.keys(): if key.startswith("GNU.volume"): del tarfile.pax_headers[key] return next def _proc_gnusparse_00(self, next, pax_headers, buf): """Process a GNU tar extended sparse header, version 0.0. """ offsets = [] for match in re.finditer(br"\d+ GNU.sparse.offset=(\d+)\n", buf): offsets.append(int(match.group(1))) numbytes = [] for match in re.finditer(br"\d+ GNU.sparse.numbytes=(\d+)\n", buf): numbytes.append(int(match.group(1))) next.sparse = list(zip(offsets, numbytes)) def _proc_gnusparse_01(self, next, pax_headers): """Process a GNU tar extended sparse header, version 0.1. """ sparse = [int(x) for x in pax_headers["GNU.sparse.map"].split(",")] next.sparse = list(zip(sparse[::2], sparse[1::2])) def _proc_gnusparse_10(self, next, pax_headers, tarfile): """Process a GNU tar extended sparse header, version 1.0. """ fields = None sparse = [] buf = tarfile.fileobj.read(BLOCKSIZE) fields, buf = buf.split(b"\n", 1) fields = int(fields) while len(sparse) < fields * 2: if b"\n" not in buf: buf += tarfile.fileobj.read(BLOCKSIZE) number, buf = buf.split(b"\n", 1) sparse.append(int(number)) next.offset_data = tarfile.fileobj.tell() next.sparse = list(zip(sparse[::2], sparse[1::2])) def _apply_pax_info(self, pax_headers, encoding, errors): """Replace fields with supplemental information from a previous pax extended or global header. 
""" for keyword, value in pax_headers.items(): if keyword == "GNU.sparse.name": setattr(self, "path", value) elif keyword == "GNU.sparse.size": setattr(self, "size", int(value)) elif keyword == "GNU.sparse.realsize": setattr(self, "size", int(value)) elif keyword in PAX_FIELDS: if keyword in PAX_NUMBER_FIELDS: try: value = PAX_NUMBER_FIELDS[keyword](value) except ValueError: value = 0 if keyword == "path": value = value.rstrip("/") # pylint: disable=no-member setattr(self, keyword, value) self.pax_headers = pax_headers.copy() def _decode_pax_field(self, value, encoding, fallback_encoding, fallback_errors): """Decode a single field from a pax record. """ try: return value.decode(encoding, "strict") except UnicodeDecodeError: return value.decode(fallback_encoding, fallback_errors) def _block(self, count): """Round up a byte count by BLOCKSIZE and return it, e.g. _block(834) => 1024. """ blocks, remainder = divmod(count, BLOCKSIZE) if remainder: blocks += 1 return blocks * BLOCKSIZE def isreg(self): return self.type in REGULAR_TYPES def isfile(self): return self.isreg() def isdir(self): return self.type == DIRTYPE def issym(self): return self.type == SYMTYPE def islnk(self): return self.type == LNKTYPE def ischr(self): return self.type == CHRTYPE def isblk(self): return self.type == BLKTYPE def isfifo(self): return self.type == FIFOTYPE def issparse(self): return self.sparse is not None def isdev(self): return self.type in (CHRTYPE, BLKTYPE, FIFOTYPE) def ismultivol(self): return self.type == GNUTYPE_MULTIVOL or self.volume_offset > 0 or\ "GNU.volume.offset" in self.pax_headers # class TarInfo class TarFile(object): """The TarFile Class provides an interface to tar archives. """ debug = 0 # May be set from 0 (no msgs) to 3 (all msgs) dereference = False # If true, add content of linked file to the # tar file, else the link. ignore_zeros = False # If true, skips empty or invalid blocks and # continues processing. max_volume_size = None # If different from None, establishes maximum # size of tar volumes new_volume_handler = None # function handler to be executed before when # a new volume is needed volume_number = 0 # current volume number, used for multi volume # support errorlevel = 1 # If 0, fatal errors only appear in debug # messages (if debug >= 0). If > 0, errors # are passed to the caller as exceptions. format = DEFAULT_FORMAT # The format to use when creating an archive. encoding = ENCODING # Encoding for 8-bit character strings. errors = None # Error handler for unicode conversion. tarinfo = TarInfo # The default TarInfo class to use. fileobject = ExFileObject # The file-object for extractfile(). arcmode = ARCMODE_PLAIN # Object processing mode (“concat”, encryption, # compression) save_to_members = True # If new members are saved. This can be disabled # if you manage lots of files and don't want # to have high memory usage cache_uid2user = {} # cache to avoid getpwuid calls. It always parses /etc/passwd. cache_gid2group = {} # same cache for groups def __init__(self, name=None, mode="r", fileobj=None, format=None, tarinfo=None, dereference=None, ignore_zeros=None, encoding=None, errors="surrogateescape", pax_headers=None, debug=None, errorlevel=None, max_volume_size=None, new_volume_handler=None, concat=False, nacl=None, save_to_members=True): """Open an (uncompressed) tar archive `name'. `mode' is either 'r' to read from an existing archive, 'a' to append data to an existing file or 'w' to create a new file overwriting an existing one. `mode' defaults to 'r'. 
If `fileobj' is given, it is used for reading or writing data. If it can be determined, `mode' is overridden by `fileobj's mode. `fileobj' is not closed, when TarFile is closed. """ if len(mode) > 1 or mode not in "raw": raise ValueError("mode must be 'r', 'a' or 'w'") self.mode = mode self.arcmode = arcmode_set (concat) self.nacl = nacl self._mode = {"r": "rb", "a": "r+b", "w": "wb"}[mode] if not fileobj: if self.mode == "a" and not os.path.exists(name): # Create nonexistent files in append mode. self.mode = "w" self._mode = "wb" fileobj = bltn_open(name, self._mode) self._extfileobj = False else: if name is None and hasattr(fileobj, "name"): name = fileobj.name # when fileobj is a gzip.GzipFile, fileobj.mode is an int (not valid for us) if hasattr(fileobj, "mode") and isinstance(fileobj.mode, str): self._mode = fileobj.mode self._extfileobj = True self.name = os.path.abspath(name) if name else None self.base_name = self.name = os.path.abspath(name) if name else None self.fileobj = fileobj # Init attributes. if format is not None: self.format = format if tarinfo is not None: self.tarinfo = tarinfo if dereference is not None: self.dereference = dereference if ignore_zeros is not None: self.ignore_zeros = ignore_zeros if encoding is not None: self.encoding = encoding self.errors = errors if pax_headers is not None and self.format == PAX_FORMAT: self.pax_headers = pax_headers else: self.pax_headers = {} if debug is not None: self.debug = debug if errorlevel is not None: self.errorlevel = errorlevel # Init datastructures. if max_volume_size and max_volume_size < 3*BLOCKSIZE: raise ValueError("max_volume_size needs to be at least %d" % (3*BLOCKSIZE)) if max_volume_size and not callable(new_volume_handler): raise ValueError("new_volume_handler needs to be set and be callable for multivolume support") if max_volume_size: self.max_volume_size = int(max_volume_size) else: self.max_volume_size = None self.save_to_members = save_to_members self.new_volume_handler = new_volume_handler self.closed = False self.members = [] # list of members as TarInfo objects self._loaded = False # flag if all members have been read self.offset = self.fileobj.tell() # current position in the archive file self.inodes = {} # dictionary caching the inodes of # archive members already added try: if self.mode == "r": self.firstmember = None self.firstmember = self.next() if self.mode == "a": # Move to the end of the archive, # before the first empty block. while True: self.fileobj.seek(self.offset) try: tarinfo = self.tarinfo.fromtarfile(self) self.members.append(tarinfo) except EOFHeaderError: self.fileobj.seek(self.offset) break except HeaderError as e: raise ReadError(str(e)) if self.mode in "aw": self._loaded = True if self.pax_headers: buf = self.tarinfo.create_pax_global_header(self.pax_headers.copy()) self.fileobj.write(buf) self.offset += len(buf) except: if not self._extfileobj: self.fileobj.close() self.closed = True raise #-------------------------------------------------------------------------- # Below are the classmethods which act as alternate constructors to the # TarFile class. The open() method is the only one that is needed for # public use; it is the "super"-constructor and is able to select an # adequate "sub"-constructor for a particular compression using the mapping # from OPEN_METH. # # This concept allows one to subclass TarFile without losing the comfort of # the super-constructor. A sub-constructor is registered and made available # by adding it to the mapping in OPEN_METH. 
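    # A minimal multivolume usage sketch (illustrative only: the file
    # names are made up, and the handler assumes the open_volume()
    # helper of this class; the exact callback signature is an
    # assumption, not confirmed by this excerpt):
    #
    #     >>> def new_volume (tarobj, base_name, volume_number):
    #     ...     tarobj.open_volume ("%s.%d" % (base_name, volume_number))
    #     >>> tar = TarFile.open ("backup.tar", mode="w",
    #     ...                     max_volume_size=10 * 1024 * 1024,
    #     ...                     new_volume_handler=new_volume)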
@classmethod def open(cls, name=None, mode="r", fileobj=None, bufsize=RECORDSIZE, encryption=None, compresslevel=9, tolerance=TOLERANCE_STRICT, **kwargs): """Open a tar archive for reading, writing or appending. Return an appropriate TarFile class. mode: 'r' or 'r:*' open for reading with transparent compression 'r:' open for reading exclusively uncompressed 'r:gz' open for reading with gzip compression 'r:bz2' open for reading with bzip2 compression 'r:xz' open for reading with lzma compression 'a' or 'a:' open for appending, creating the file if necessary 'w' or 'w:' open for writing without compression 'w:gz' open for writing with gzip compression 'w:bz2' open for writing with bzip2 compression 'w:xz' open for writing with lzma compression 'r|*' open a stream of tar blocks with transparent compression 'r|' open an uncompressed stream of tar blocks for reading 'r|gz' open a gzip compressed stream of tar blocks 'r|bz2' open a bzip2 compressed stream of tar blocks 'r|xz' open an lzma compressed stream of tar blocks 'w|' open an uncompressed stream for writing 'w|gz' open a gzip compressed stream for writing 'w|bz2' open a bzip2 compressed stream for writing 'w|xz' open an lzma compressed stream for writing 'r#gz' open a stream of gzip compressed tar blocks for reading 'w#gz' open a stream of gzip compressed tar blocks for writing """ if not name and not fileobj: raise ValueError("nothing to open") if mode in ("r", "r:*"): # Find out which *open() is appropriate for opening the file. for comptype in cls.OPEN_METH: func = getattr(cls, cls.OPEN_METH[comptype]) if fileobj is not None: saved_pos = fileobj.tell() try: return func(name, "r", fileobj, **kwargs) except (ReadError, CompressionError) as e: # usually nothing exceptional but sometimes is if fileobj is not None: fileobj.seek(saved_pos) continue raise ReadError("file could not be opened successfully") elif ":" in mode: filemode, comptype = mode.split(":", 1) filemode = filemode or "r" comptype = comptype or "tar" # Select the *open() function according to # given compression. if comptype in cls.OPEN_METH: func = getattr(cls, cls.OPEN_METH[comptype]) else: raise CompressionError("unknown compression type %r" % comptype) # Pass on compression level for gzip / bzip2. if comptype == 'gz' or comptype == 'bz2': kwargs['compresslevel'] = compresslevel if 'max_volume_size' in kwargs: if comptype != 'tar' and filemode in 'wa' \ and kwargs['max_volume_size']: import warnings warnings.warn('Only the first volume will be compressed ' 'for modes with "w:"!') return func(name, filemode, fileobj, **kwargs) elif "|" in mode: filemode, comptype = mode.split("|", 1) filemode = filemode or "r" comptype = comptype or "tar" if filemode not in "rw": raise ValueError("mode must be 'r' or 'w'") t = cls(name, filemode, _Stream(name, filemode, comptype, fileobj, bufsize, compresslevel=compresslevel), **kwargs) t._extfileobj = False return t elif "#" in mode: filemode, comptype = mode.split("#", 1) filemode = filemode or "r" if filemode not in "rw": raise ValueError ("mode %s not compatible with concat " "archive; must be 'r' or 'w'" % mode) stream = _Stream(name, filemode, comptype, fileobj, bufsize, concat=True, encryption=encryption, compresslevel=compresslevel, tolerance=tolerance) kwargs ["concat"] = True try: t = cls(name, filemode, stream, **kwargs) except: # XXX except what? stream.close() raise # XXX raise what? 
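            # The stream was created with concat=True: every member gets
            # its own gzip and/or encryption object, so a single member
            # can later be read back directly via open_at_offset() below.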
t._extfileobj = False return t elif mode in "aw": return cls.taropen(name, mode, fileobj, **kwargs) raise ValueError("undiscernible mode %r" % mode) @classmethod def open_at_offset(cls, offset, *a, **kwa): """ Same as ``.open()``, but start reading at the given offset. Assumes a seekable file object. Returns *None* if opening failed due to a read problem. """ fileobj = kwa.get ("fileobj") if fileobj is not None: fileobj.seek (offset) return cls.open (*a, **kwa) @classmethod def taropen(cls, name, mode="r", fileobj=None, **kwargs): """Open uncompressed tar archive name for reading or writing. """ if len(mode) > 1 or mode not in "raw": raise ValueError("mode must be 'r', 'a' or 'w'") return cls(name, mode, fileobj, **kwargs) @classmethod def gzopen(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs): """Open gzip compressed tar archive name for reading or writing. Appending is not allowed. """ if len(mode) > 1 or mode not in "rw": raise ValueError("mode must be 'r' or 'w'") try: import gzip gzip.GzipFile except (ImportError, AttributeError): raise CompressionError("gzip module is not available") extfileobj = fileobj is not None try: fileobj = gzip.GzipFile(name, mode + "b", compresslevel, fileobj) t = cls.taropen(name, mode, fileobj, **kwargs) except OSError: if not extfileobj and fileobj is not None: fileobj.close() if fileobj is None: raise raise ReadError("not a gzip file") except: if not extfileobj and fileobj is not None: fileobj.close() raise t._extfileobj = extfileobj return t @classmethod def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs): """Open bzip2 compressed tar archive name for reading or writing. Appending is not allowed. """ if len(mode) > 1 or mode not in "rw": raise ValueError("mode must be 'r' or 'w'.") try: import bz2 except ImportError: raise CompressionError("bz2 module is not available") fileobj = bz2.BZ2File(fileobj or name, mode, compresslevel=compresslevel) try: t = cls.taropen(name, mode, fileobj, **kwargs) except (OSError, EOFError): fileobj.close() raise ReadError("not a bzip2 file") t._extfileobj = False return t @classmethod def xzopen(cls, name, mode="r", fileobj=None, preset=None, **kwargs): """Open lzma compressed tar archive name for reading or writing. Appending is not allowed. """ if mode not in ("r", "w"): raise ValueError("mode must be 'r' or 'w'") try: import lzma except ImportError: raise CompressionError("lzma module is not available") fileobj = lzma.LZMAFile(fileobj or name, mode, preset=preset) try: t = cls.taropen(name, mode, fileobj, **kwargs) except (lzma.LZMAError, EOFError): fileobj.close() raise ReadError("not an lzma file") t._extfileobj = False return t # All *open() methods are registered here. OPEN_METH = { "tar": "taropen", # uncompressed tar "gz": "gzopen", # gzip compressed tar "bz2": "bz2open", # bzip2 compressed tar "xz": "xzopen" # lzma compressed tar } #-------------------------------------------------------------------------- # The public methods which TarFile provides: def close(self): """Close the TarFile. In write-mode, two finishing zero blocks are appended to the archive. A special case are empty archives which are initialized accordingly so the two mandatory blocks of zeros are written abiding by the requested encryption and compression settings. 
""" if self.closed: return if self.mode in "aw": if self.arcmode & ARCMODE_CONCAT and self.fileobj.tell () == 0: self.fileobj.next ("") self.fileobj.write(NUL * (BLOCKSIZE * 2)) self.offset += (BLOCKSIZE * 2) # fill up the end with zero-blocks # (like option -b20 for tar does) blocks, remainder = divmod(self.offset, RECORDSIZE) if remainder > 0: self.fileobj.write(NUL * (RECORDSIZE - remainder)) if not self._extfileobj: self.fileobj.close() self.closed = True def getmember(self, name): """Return a TarInfo object for member `name'. If `name' can not be found in the archive, KeyError is raised. If a member occurs more than once in the archive, its last occurrence is assumed to be the most up-to-date version. """ tarinfo = self._getmember(name) if tarinfo is None: raise KeyError("filename %r not found" % name) return tarinfo def getmembers(self): """Return the members of the archive as a list of TarInfo objects. The list has the same order as the members in the archive. """ self._check() if not self._loaded: # if we want to obtain a list of self._load() # all members, we first have to # scan the whole archive. return self.members def get_last_member_offset(self): """Return the last member offset. Usually this is self.fileobj.tell(), but when there's encryption or concat compression going on it's more complicated than that. """ return self.last_block_offset def getnames(self): """Return the members of the archive as a list of their names. It has the same order as the list returned by getmembers(). """ return [tarinfo.name for tarinfo in self.getmembers()] def gettarinfo(self, name=None, arcname=None, fileobj=None): """Create a TarInfo object for either the file `name' or the file object `fileobj' (using os.fstat on its file descriptor). You can modify some of the TarInfo's attributes before you add it using addfile(). If given, `arcname' specifies an alternative name for the file in the archive. """ self._check("aw") # When fileobj is given, replace name by # fileobj's real name. if fileobj is not None: name = fileobj.name # Building the name of the member in the archive. # Backward slashes are converted to forward slashes, # Absolute paths are turned to relative paths. if arcname is None: arcname = name drv, arcname = os.path.splitdrive(arcname) arcname = arcname.replace(os.sep, "/") arcname = arcname.lstrip("/") # Now, fill the TarInfo object with # information specific for the file. tarinfo = self.tarinfo() tarinfo.tarfile = self # Use os.stat or os.lstat, depending on platform # and if symlinks shall be resolved. if fileobj is None: if hasattr(os, "lstat") and not self.dereference: statres = os.lstat(name) else: statres = os.stat(name) else: statres = os.fstat(fileobj.fileno()) linkname = "" stmd = statres.st_mode if stat.S_ISREG(stmd): inode = (statres.st_ino, statres.st_dev) if not self.dereference and statres.st_nlink > 1 and \ inode in self.inodes and arcname != self.inodes[inode]: # Is it a hardlink to an already # archived file? type = LNKTYPE linkname = self.inodes[inode] else: # The inode is added only if its valid. # For win32 it is always 0. type = REGTYPE if inode[0] and self.save_to_members: self.inodes[inode] = arcname elif stat.S_ISDIR(stmd): type = DIRTYPE elif stat.S_ISFIFO(stmd): type = FIFOTYPE elif stat.S_ISLNK(stmd): type = SYMTYPE linkname = os.readlink(name) elif stat.S_ISCHR(stmd): type = CHRTYPE elif stat.S_ISBLK(stmd): type = BLKTYPE else: return None # Fill the TarInfo object with all # information we can get. 
tarinfo.name = arcname tarinfo.mode = stmd tarinfo.uid = statres.st_uid tarinfo.gid = statres.st_gid if type == REGTYPE: tarinfo.size = statres.st_size else: tarinfo.size = 0 tarinfo.mtime = statres.st_mtime tarinfo.type = type tarinfo.linkname = linkname if pwd: if tarinfo.uid in self.cache_uid2user: tarinfo.uname = self.cache_uid2user[tarinfo.uid] else: try: tarinfo.uname = pwd.getpwuid(tarinfo.uid)[0] self.cache_uid2user[tarinfo.uid] = tarinfo.uname except KeyError: # remember user does not exist: # same default value as in tarinfo class self.cache_uid2user[tarinfo.uid] = "" if grp: if tarinfo.gid in self.cache_gid2group: tarinfo.gname = self.cache_gid2group[tarinfo.gid] else: try: tarinfo.gname = grp.getgrgid(tarinfo.gid)[0] self.cache_gid2group[tarinfo.gid] = tarinfo.gname except KeyError: # remember group does not exist: # same default value as in tarinfo class self.cache_gid2group[tarinfo.gid] = "" if type in (CHRTYPE, BLKTYPE): if hasattr(os, "major") and hasattr(os, "minor"): tarinfo.devmajor = os.major(statres.st_rdev) tarinfo.devminor = os.minor(statres.st_rdev) return tarinfo def list(self, verbose=True): """Print a table of contents to sys.stdout. If `verbose' is False, only the names of the members are printed. If it is True, an `ls -l'-like output is produced. """ self._check() for tarinfo in self: if verbose: print(stat.filemode(tarinfo.mode), end=' ') print("%s/%s" % (tarinfo.uname or tarinfo.uid, tarinfo.gname or tarinfo.gid), end=' ') if tarinfo.ischr() or tarinfo.isblk(): print("%10s" % ("%d,%d" \ % (tarinfo.devmajor, tarinfo.devminor)), end=' ') else: print("%10d" % tarinfo.size, end=' ') print("%d-%02d-%02d %02d:%02d:%02d" \ % time.localtime(tarinfo.mtime)[:6], end=' ') print(tarinfo.name + ("/" if tarinfo.isdir() else ""), end=' ') if verbose: if tarinfo.issym(): print("->", tarinfo.linkname, end=' ') if tarinfo.islnk(): print("link to", tarinfo.linkname, end=' ') print() def add(self, name, arcname=None, recursive=True, exclude=None, *, filter=None): """Add the file `name' to the archive. `name' may be any type of file (directory, fifo, symbolic link, etc.). If given, `arcname' specifies an alternative name for the file in the archive. Directories are added recursively by default. This can be avoided by setting `recursive' to False. `exclude' is a function that should return True for each filename to be excluded. `filter' is a function that expects a TarInfo object argument and returns the changed TarInfo object, if it returns None the TarInfo object will be excluded from the archive. """ self._check("aw") if arcname is None: arcname = name # Exclude pathnames. if exclude is not None: import warnings warnings.warn("use the filter argument instead", DeprecationWarning, 2) if exclude(name): self._dbg(2, "tarfile: Excluded %r" % name) return # Skip if somebody tries to archive the archive... if self.name is not None and os.path.abspath(name) == self.name: self._dbg(2, "tarfile: Skipped %r" % name) return self._dbg(1, name) # Create a TarInfo object from the file. tarinfo = self.gettarinfo(name, arcname) if tarinfo is None: self._dbg(1, "tarfile: Unsupported type %r" % name) return # Change or exclude the TarInfo object. if filter is not None: tarinfo = filter(tarinfo) if tarinfo is None: self._dbg(2, "tarfile: Excluded %r" % name) return # Append the tar header and data to the archive. 
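        # The filter applied above can rewrite or drop members; a sketch
        # (the function name is illustrative, not part of this module):
        #
        #     def anonymize (ti):
        #         ti.uid = ti.gid = 0
        #         ti.uname = ti.gname = "root"
        #         return ti            # return None to exclude the member
        #
        #     tar.add ("project", filter=anonymize)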
        if tarinfo.isreg():
            with bltn_open(name, "rb") as f:
                self.addfile(tarinfo, f)

        elif tarinfo.isdir():
            self.addfile(tarinfo)
            if recursive:
                for f in os.listdir(name):
                    self.add(os.path.join(name, f), os.path.join(arcname, f),
                             recursive, exclude, filter=filter)

        else:
            self.addfile(tarinfo)

    def _size_left_file(self):
        """Calculates size left in a volume with a maximum volume size.

           Assumes self.max_volume_size is set. If using compression through a
           _Stream, use _size_left_stream instead.
        """
        # left-over size = max_size - offset - 2 zero-blocks written in close
        size_left = self.max_volume_size - 2*BLOCKSIZE - self.offset
        # limit size left to a discrete number of blocks, because we won't
        # write only half a block when writing the end of a volume
        # and filling with zeros
        return BLOCKSIZE * (size_left // BLOCKSIZE)

    def _size_left_stream(self):
        """Calculates size left in a volume if using compression/encryption.

           Assumes self.max_volume_size is set and self.fileobj is a _Stream
           (otherwise use _size_left_file).
        """
        # left-over size = max_size - bytes written - 2 zero-blocks (close)
        size_left = self.max_volume_size - self.fileobj.estim_file_size() \
            - 2*BLOCKSIZE
        return BLOCKSIZE * (size_left // BLOCKSIZE)

    def addfile(self, tarinfo, fileobj=None):
        """Add the TarInfo object `tarinfo' to the archive. If `fileobj' is
           given, tarinfo.size bytes are read from it and added to the
           archive. You can create TarInfo objects using gettarinfo(). On
           Windows platforms, `fileobj' should always be opened with mode 'rb'
           to avoid irritation about the file size.
        """
        self._check("aw")

        tarinfo = copy.copy(tarinfo)

        if self.arcmode & ARCMODE_CONCAT:
            self.last_block_offset = self.fileobj.next (tarinfo.name)
        else:
            self.last_block_offset = self.fileobj.tell()

        buf = tarinfo.tobuf(self.format, self.encoding, self.errors)
        self.fileobj.write(buf)
        self.offset += len(buf)

        if self.max_volume_size:
            if isinstance(self.fileobj, _Stream):
                _size_left = self._size_left_stream
            else:
                _size_left = self._size_left_file
        else:
            _size_left = lambda: tarinfo.size

        # If there's no data to follow, finish
        if not fileobj:
            if self.save_to_members:
                self.members.append(tarinfo)
            return

        target_size_left = _size_left()
        source_size_left = tarinfo.size
        assert tarinfo.volume_offset == 0

        # we only split volumes in the middle of a file, that means we have
        # to write at least one block
        if target_size_left < BLOCKSIZE:
            target_size_left = BLOCKSIZE

        # loop over multiple volumes
        while source_size_left > 0:

            # Write as much data as possible from source into target.
            # When compressing data, we cannot easily predict how much data we
            # can write until target_size_left == 0 --> need to iterate
            size_can_write = min(target_size_left, source_size_left)

            while size_can_write > 0:
                copyfileobj(fileobj, self.fileobj, size_can_write)
                self.offset += size_can_write
                source_size_left -= size_can_write
                target_size_left = _size_left()
                size_can_write = min(target_size_left, source_size_left)

            # now target_size_left == 0 or source_size_left == 0

            # if there is data left to write, we need to create a new volume
            if source_size_left > 0:
                # Only finalize the crypto entry here if we’re continuing with
                # another one; otherwise, the encryption must include the block
                # padding below.
                tarinfo.type = GNUTYPE_MULTIVOL

                if not self.new_volume_handler or \
                        not callable(self.new_volume_handler):
                    raise Exception("We need to create a new volume and you "
                                    "didn't supply a new_volume_handler")

                # the new volume handler should do everything needed to
                # start working in a new volume.
usually, the handler calls # to self.open_volume self.volume_number += 1 # set to be used by open_volume, because in the case of a PAX # tar it needs to write information about the volume and offset # in the global header tarinfo.volume_offset = tarinfo.size - source_size_left self.volume_tarinfo = tarinfo # the “new_volume_handler” is supposed to call .close() on the # “fileobj” _Stream self.new_volume_handler(self, self.base_name, self.volume_number) self.volume_tarinfo = None if self.arcmode & ARCMODE_CONCAT: self.fileobj.next_volume (tarinfo.name) # write new volume header buf = tarinfo.tobuf(self.format, self.encoding, self.errors) self.fileobj.write(buf) self.offset += len(buf) # adjust variables; open_volume should have reset self.offset # --> _size_left should be big again target_size_left = _size_left() size_can_write = min(target_size_left, source_size_left) self._dbg(3, 'new volume') # now, all data has been written. We may have to fill up the rest of # the block in target with 0s remainder = (tarinfo.size - tarinfo.volume_offset) % BLOCKSIZE if remainder > 0: self.fileobj.write(NUL * (BLOCKSIZE - remainder)) self.offset += BLOCKSIZE - remainder if self.save_to_members: self.members.append(tarinfo) def open_volume(self, name="", fileobj=None, encryption=None): ''' Called by the user to change this tar file to point to a new volume. ''' # open the file using either fileobj or name if not fileobj: if self.mode == "a" and not os.path.exists(name): # Create nonexistent files in append mode. self.mode = "w" self._mode = "wb" self._extfileobj = False if isinstance(self.fileobj, _Stream): self._dbg(3, 'open_volume: create a _Stream') fileobj = _Stream(name=name, mode=self.fileobj.mode, comptype=self.fileobj.comptype, fileobj=None, bufsize=self.fileobj.bufsize, encryption=encryption or self.fileobj.encryption, concat=self.fileobj.arcmode & ARCMODE_CONCAT, tolerance=self.fileobj.tolerance) else: # here, we lose information about compression/encryption! self._dbg(3, 'open_volume: builtin open') fileobj = bltn_open(name, self._mode) else: if name is None and hasattr(fileobj, "name"): name = fileobj.name if hasattr(fileobj, "mode"): self._mode = fileobj.mode self._extfileobj = True self._dbg(3, 'open_volume: using external fileobj {}', fileobj) self.name = os.path.abspath(name) if name else None self.fileobj.close() self.fileobj = fileobj # init data structures self.closed = False self.members = [] # list of members as TarInfo objects self._loaded = False # flag if all members have been read self.offset = self.fileobj.tell() # current position in the archive file self.inodes = {} # dictionary caching the inodes of # archive members already added try: if self.mode == "r": self.firstmember = None self.firstmember = self.next() if self.mode == "a": # Move to the end of the archive, # before the first empty block. 
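        # Volume-switch sketch (the naming scheme is illustrative, not part
        # of the API): addfile()/makefile() invoke the handler when a member
        # crosses max_volume_size; the handler is expected to close the old
        # file object and call open_volume() on the next one:
        #
        #     def new_volume_handler (tarobj, base_name, volume_number):
        #         tarobj.fileobj.close ()
        #         tarobj.open_volume ("%s.%d" % (base_name, volume_number))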
while True: self.fileobj.seek(self.offset) try: tarinfo = self.tarinfo.fromtarfile(self) self.members.append(tarinfo) except EOFHeaderError: self.fileobj.seek(self.offset) break except HeaderError as e: raise ReadError(str(e)) if self.mode in "aw": self._loaded = True if self.format == PAX_FORMAT: volume_info = { "GNU.volume.filename": str(self.volume_tarinfo.name), "GNU.volume.size": str(self.volume_tarinfo.size - self.volume_tarinfo.volume_offset), "GNU.volume.offset": str(self.volume_tarinfo.volume_offset), } self.pax_headers.update(volume_info) if isinstance(self.fileobj, _Stream): self.fileobj._init_write_gz () buf = self.tarinfo.create_pax_global_header(volume_info.copy()) self.fileobj.write(buf) self.offset += len(buf) except Exception as exn: if not self._extfileobj: self.fileobj.close() self.closed = True raise def extractall(self, path=".", members=None, filter=None, unlink=False): """Extract all members from the archive to the current working directory and set owner, modification time and permissions on directories afterwards. `path' specifies a different directory to extract to. `members' is optional and must be a subset of the list returned by getmembers(). """ directories = [] if members is None: members = self for tarinfo in members: if self.volume_number > 0 and tarinfo.ismultivol(): continue if filter and not filter(tarinfo): continue if tarinfo.isdir(): # Extract directories with a safe mode. directories.append(tarinfo) tarinfo = copy.copy(tarinfo) tarinfo.mode = 0o0700 # Do not set_attrs directories, as we will do that further down self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(), unlink=unlink) # Reverse sort directories. directories.sort(key=lambda a: a.name) directories.reverse() # Set correct owner, mtime and filemode on directories. for tarinfo in directories: dirpath = os.path.join(path, tarinfo.name) try: self.chown(tarinfo, dirpath) self.utime(tarinfo, dirpath) self.chmod(tarinfo, dirpath) except ExtractError as e: if self.errorlevel > 1: raise else: self._dbg(1, "tarfile: %s" % e) def extract(self, member, path="", set_attrs=True, symlink_cb=None, unlink=False): """Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. `member' may be a filename or a TarInfo object. You can specify a different directory using `path'. File attributes (owner, mtime, mode) are set unless `set_attrs' is False. ``symlink_cb`` is a hook accepting a function that is passed the ``member``, ``path``, and ``set_attrs`` arguments if the tarinfo for ``member`` indicates a symlink in which case only the callback passed will be applied, skipping the actual extraction. In case the callback is invoked, its return value is passed on to the caller. """ self._check("r") if isinstance(member, str): tarinfo = self.getmember(member) else: tarinfo = member # Prepare the link target for makelink(). 
if tarinfo.islnk(): tarinfo._link_target = os.path.join(path, tarinfo.linkname) if symlink_cb is not None and tarinfo.issym(): return symlink_cb(member, path, set_attrs) try: self._extract_member(tarinfo, os.path.join(path, tarinfo.name), set_attrs=set_attrs, unlink=unlink) except EnvironmentError as e: if self.errorlevel > 0: raise else: if e.filename is None: self._dbg(1, "tarfile: %s" % e.strerror) else: self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename)) except ExtractError as e: if self.errorlevel > 1: raise else: self._dbg(1, "tarfile: %s" % e) def extractfile(self, member): """Extract a member from the archive as a file object. `member' may be a filename or a TarInfo object. If `member' is a regular file or a link, an io.BufferedReader object is returned. Otherwise, None is returned. """ self._check("r") if isinstance(member, str): tarinfo = self.getmember(member) else: tarinfo = member if tarinfo.isreg() or tarinfo.ismultivol() or\ tarinfo.type not in SUPPORTED_TYPES: # If a member's type is unknown, it is treated as a # regular file. return self.fileobject(self, tarinfo) elif tarinfo.islnk() or tarinfo.issym(): if isinstance(self.fileobj, _Stream): # A small but ugly workaround for the case that someone tries # to extract a (sym)link as a file-object from a non-seekable # stream of tar blocks. raise StreamError("cannot extract (sym)link as file object") else: # A (sym)link's file object is its target's file object. return self.extractfile(self._find_link_target(tarinfo)) else: # If there's no data associated with the member (directory, chrdev, # blkdev, etc.), return None instead of a file object. return None def _extract_member(self, tarinfo, targetpath, set_attrs=True, unlink=False): """Extract the TarInfo object tarinfo to a physical file called targetpath. """ # Fetch the TarInfo object for the given name # and build the destination pathname, replacing # forward slashes to platform specific separators. targetpath = targetpath.rstrip("/") targetpath = targetpath.replace("/", os.sep) # Create all upper directories. upperdirs = os.path.dirname(targetpath) if upperdirs and not os.path.exists(upperdirs): # Create directories that are not part of the archive with # default permissions. os.makedirs(upperdirs) if tarinfo.islnk() or tarinfo.issym(): self._dbg(1, "%s -> %s" % (tarinfo.name, tarinfo.linkname)) else: self._dbg(1, tarinfo.name) if unlink is True: _unlinkfirst(targetpath) if tarinfo.isreg(): self.makefile(tarinfo, targetpath) elif tarinfo.isdir(): self.makedir(tarinfo, targetpath) elif tarinfo.isfifo(): self.makefifo(tarinfo, targetpath) elif tarinfo.ischr() or tarinfo.isblk(): self.makedev(tarinfo, targetpath) elif tarinfo.islnk() or tarinfo.issym(): self.makelink(tarinfo, targetpath) elif tarinfo.type not in SUPPORTED_TYPES: self.makeunknown(tarinfo, targetpath) else: self.makefile(tarinfo, targetpath) if set_attrs: self.chown(tarinfo, targetpath) if not tarinfo.issym(): self.chmod(tarinfo, targetpath) self.utime(tarinfo, targetpath) #-------------------------------------------------------------------------- # Below are the different file methods. They are called via # _extract_member() when extract() is called. They can be replaced in a # subclass to implement other functionality. def makedir(self, tarinfo, targetpath): """Make a directory called targetpath. """ try: # Use a safe mode for the directory, the real mode is set # later in _extract_member(). 
os.mkdir(targetpath, 0o0700) except FileExistsError: pass def makefile(self, tarinfo, targetpath): """Make a file called targetpath. """ source = self.fileobj source.seek(tarinfo.offset_data) decrypt = False iterate = True target = bltn_open(targetpath, "wb") if tarinfo.sparse is not None: try: for offset, size in tarinfo.sparse: target.seek(offset) copyfileobj(source, target, size) target.seek(tarinfo.size) target.truncate() finally: target.close() return while iterate: iterate = False try: copyfileobj(source, target, tarinfo.size) except OSError: source.close() # only if we are extracting a multivolume this can be treated if not self.new_volume_handler: raise Exception("We need to read a new volume and you" " didn't supply a new_volume_handler") # the new volume handler should do everything needed to # start working in a new volume. usually, the handler calls # to self.open_volume self.volume_number += 1 self.new_volume_handler(self, self.base_name, self.volume_number) tarinfo = self.firstmember source = self.fileobj iterate = True finally: if iterate is False: target.close() def makeunknown(self, tarinfo, targetpath): """Make a file from a TarInfo object with an unknown type at targetpath. """ self.makefile(tarinfo, targetpath) self._dbg(1, "tarfile: Unknown file type %r, " \ "extracted as regular file." % tarinfo.type) def makefifo(self, tarinfo, targetpath): """Make a fifo called targetpath. """ if hasattr(os, "mkfifo"): os.mkfifo(targetpath) else: raise ExtractError("fifo not supported by system") def makedev(self, tarinfo, targetpath): """Make a character or block device called targetpath. """ if not hasattr(os, "mknod") or not hasattr(os, "makedev"): raise ExtractError("special devices not supported by system") mode = tarinfo.mode if tarinfo.isblk(): mode |= stat.S_IFBLK else: mode |= stat.S_IFCHR os.mknod(targetpath, mode, os.makedev(tarinfo.devmajor, tarinfo.devminor)) def makelink(self, tarinfo, targetpath): """Make a (symbolic) link called targetpath. If it cannot be created (platform limitation), we try to make a copy of the referenced file instead of a link. """ try: # For systems that support symbolic and hard links. if tarinfo.issym(): os.symlink(tarinfo.linkname, targetpath) else: # See extract(). if os.path.exists(tarinfo._link_target): os.link(tarinfo._link_target, targetpath) else: self._extract_member(self._find_link_target(tarinfo), targetpath) except symlink_exception: try: self._extract_member(self._find_link_target(tarinfo), targetpath) except KeyError: raise ExtractError("unable to resolve link inside archive") def chown(self, tarinfo, targetpath): """Set owner of targetpath according to tarinfo. """ if pwd and hasattr(os, "geteuid") and os.geteuid() == 0: # We have to be root to do so. try: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: g = tarinfo.gid try: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: u = tarinfo.uid try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) else: os.chown(targetpath, u, g) except OSError as e: raise ExtractError("could not change owner") def chmod(self, tarinfo, targetpath): """Set file permissions of targetpath according to tarinfo. """ if hasattr(os, 'chmod'): try: os.chmod(targetpath, tarinfo.mode) except OSError as e: raise ExtractError("could not change mode") def utime(self, tarinfo, targetpath): """Set modification time of targetpath according to tarinfo. 
""" if not hasattr(os, 'utime'): return try: os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime)) except OSError as e: raise ExtractError("could not change modification time") #-------------------------------------------------------------------------- def next(self): """Return the next member of the archive as a TarInfo object, when TarFile is opened for reading. Return None if there is no more available. """ self._check("ra") if self.firstmember is not None: m = self.firstmember self.firstmember = None return m # Read the next block. self.fileobj.seek(self.offset) tarinfo = None while True: try: tarinfo = self.tarinfo.fromtarfile(self) except EOFHeaderError as e: if self.ignore_zeros: self._dbg(2, "0x%X: %s" % (self.offset, e)) self.offset += BLOCKSIZE continue except InvalidHeaderError as e: if self.ignore_zeros: self._dbg(2, "0x%X: %s" % (self.offset, e)) self.offset += BLOCKSIZE continue elif self.offset == 0: raise ReadError(str(e)) except EmptyHeaderError: if self.offset == 0: raise ReadError("empty file") except TruncatedHeaderError as e: if self.offset == 0: raise ReadError(str(e)) except SubsequentHeaderError as e: raise ReadError(str(e)) break if tarinfo is not None: if self.save_to_members: self.members.append(tarinfo) else: self._loaded = True return tarinfo #-------------------------------------------------------------------------- # Little helper methods: def _getmember(self, name, tarinfo=None, normalize=False): """Find an archive member by name from bottom to top. If tarinfo is given, it is used as the starting point. """ # Ensure that all members have been loaded. members = self.getmembers() # Limit the member search list up to tarinfo. if tarinfo is not None: members = members[:members.index(tarinfo)] if normalize: name = os.path.normpath(name) for member in reversed(members): if normalize: member_name = os.path.normpath(member.name) else: member_name = member.name if name == member_name: return member def _load(self): """Read through the entire archive file and look for readable members. """ while True: tarinfo = self.next() if tarinfo is None: break self._loaded = True def _check(self, mode=None): """Check if TarFile is still open, and if the operation's mode corresponds to TarFile's mode. """ if self.closed: raise OSError("%s is closed" % self.__class__.__name__) if mode is not None and self.mode not in mode: raise OSError("bad operation for mode %r" % self.mode) def _find_link_target(self, tarinfo): """Find the target member of a symlink or hardlink member in the archive. """ if tarinfo.issym(): # Always search the entire archive. linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname))) limit = None else: # Search the archive before the link, because a hard link is # just a reference to an already archived file. linkname = tarinfo.linkname limit = tarinfo member = self._getmember(linkname, tarinfo=limit, normalize=True) if member is None: raise KeyError("linkname %r not found" % linkname) return member def __iter__(self): """Provide an iterator object. """ if self._loaded: return iter(self.members) else: return TarIter(self) def _dbg(self, level, msg, *args): """Write debugging output to sys.stderr. """ if level <= self.debug: print(msg.format(*args), file=sys.stderr) def __enter__(self): self._check() return self def __exit__(self, type, value, traceback): if type is None: self.close() else: # An exception occurred. We must not call close() because # it would try to write end-of-archive blocks and padding. 
        if not self._extfileobj:
            self.fileobj.close()
        self.closed = True


def _unlinkfirst(targetpath):
    try:
        os.unlink(targetpath)
    except OSError as e:
        # It is fine if the target is already gone or is a directory (that
        # case is handled by the extraction code); anything else is a real
        # error and must propagate.
        if e.errno not in (errno.ENOENT, errno.EISDIR):
            raise

# class TarFile

class TarIter:
    """Iterator Class.

       for tarinfo in TarFile(...):
           suite...
    """

    def __init__(self, tarfile):
        """Construct a TarIter object.
        """
        self.tarfile = tarfile
        self.index = 0

    def __iter__(self):
        """Return iterator object.
        """
        return self

    def __next__(self):
        """Return the next item using TarFile's next() method. When all
           members have been read, set TarFile as _loaded.
        """
        # Fix for SF #1100429: Under rare circumstances it can
        # happen that getmembers() is called during iteration,
        # which will cause TarIter to stop prematurely.
        if self.index == 0 and self.tarfile.firstmember is not None:
            tarinfo = self.tarfile.next()
        elif self.index < len(self.tarfile.members):
            tarinfo = self.tarfile.members[self.index]
        elif not self.tarfile._loaded:
            tarinfo = self.tarfile.next()
            if not tarinfo:
                self.tarfile._loaded = True
                raise StopIteration
        else:
            raise StopIteration
        self.index += 1

        return tarinfo

#---------------------------------------------------------
# support functionality for rescue mode
#---------------------------------------------------------

TAR_FMT_HDR = (# See tar(5):
    "<"
    "100s" # ← char name[100];          /* 100 */
    "8s"   # ← char mode[8];            /* 108 */
    "8s"   # ← char uid[8];             /* 116 */
    "8s"   # ← char gid[8];             /* 124 */
    "12s"  # ← char size[12];           /* 136 */
    "12s"  # ← char mtime[12];          /* 148 */
    "8s"   # ← char checksum[8];        /* 156 */
    "B"    # ← char typeflag[1];        /* 157 */
    "100s" # ← char linkname[100];      /* 257 */
    "6s"   # ← char magic[6];           /* 263 */
    "2s"   # ← char version[2];         /* 265 */
    "32s"  # ← char uname[32];          /* 297 */
    "32s"  # ← char gname[32];          /* 329 */
    "8s"   # ← char devmajor[8];        /* 337 */
    "8s"   # ← char devminor[8];        /* 345 */
    "12s"  # ← char atime[12];          /* 357 */
    "12s"  # ← char ctime[12];          /* 369 */
    "12s"  # ← char offset[12];         /* 381 */
    "4s"   # ← char longnames[4];       /* 385 */
    "B"    # ← char unused[1];          /* 386 */
    ""     #   struct {
    "12s"  # ←     char offset[12];
    "12s"  # ←     char numbytes[12];
    "12s"  # ←     char offset[12];
    "12s"  # ←     char numbytes[12];
    "12s"  # ←     char offset[12];
    "12s"  # ←     char numbytes[12];
    "12s"  # ←     char offset[12];
    "12s"  # ←     char numbytes[12];
    ""     #   } sparse[4];             /* 482 */
    "B"    # ← char isextended[1];      /* 483 */
    "12s"  # ← char realsize[12];       /* 495 */
    "17s"  # ← char pad[17];            /* 512 */
)
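# A decoding sketch (the variable blk is illustrative): one complete
# 512-byte block unpacks into the fields listed above; the format covers
# exactly one block:
#
#     assert struct.calcsize (TAR_FMT_HDR) == BLOCKSIZE
#     fields = struct.unpack (TAR_FMT_HDR, blk)   # blk: bytes, len 512
#
# read_gnu_tar_hdr() below performs this unpacking and labels the fields
# by name.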
# The “magic” and “version” fields are special:
#
# tar(5)
#    magic   The magic field holds the five characters “ustar” followed by a
#            space. Note that POSIX ustar archives have a trailing null.
#
# however, “tar.h”:
#
#    /* OLDGNU_MAGIC uses both magic and version fields, which are contiguous.
#       Found in an archive, it indicates an old GNU header format, which will
#       be hopefully become obsolescent. With OLDGNU_MAGIC, uname and gname
#       are valid, though the header is not truly POSIX conforming. */
#
TAR_HDR_OFF_MAGIC    = 257
TAR_FMT_OLDGNU_MAGIC = b"ustar "

def read_gnu_tar_hdr (data):
    if len (data) != BLOCKSIZE: # header requires one complete block
        return None

    try:
        name, mode, \
            uid, gid, \
            size, mtime, \
            checksum, \
            typeflag, \
            linkname, \
            magic, \
            version, \
            uname, \
            gname, \
            devmajor, \
            devminor, \
            atime, \
            ctime, \
            offset, \
            longnames, \
            unused, \
            offset1, numbytes1, \
            offset2, numbytes2, \
            offset3, numbytes3, \
            offset4, numbytes4, \
            isextended, \
            realsize, \
            pad = struct.unpack (TAR_FMT_HDR, data)
    except struct.error:
        return None

    if magic != TAR_FMT_OLDGNU_MAGIC:
        return None

    # return all except “unused” and “pad”
    return \
        { "name"       : name,     "mode"      : mode
        , "uid"        : uid,      "gid"       : gid
        , "size"       : size,     "mtime"     : mtime
        , "checksum"   : checksum
        , "typeflag"   : typeflag
        , "linkname"   : linkname
        , "magic"      : magic
        , "version"    : version
        , "uname"      : uname,    "gname"     : gname
        , "devmajor"   : devmajor, "devminor"  : devminor
        , "atime"      : atime,    "ctime"     : ctime
        , "offset"     : offset
        , "longnames"  : longnames
        , "offset1"    : offset1,  "numbytes1" : numbytes1
        , "offset2"    : offset2,  "numbytes2" : numbytes2
        , "offset3"    : offset3,  "numbytes3" : numbytes3
        , "offset4"    : offset4,  "numbytes4" : numbytes4
        , "isextended" : isextended
        , "realsize"   : realsize
        }

def tar_hdr_check_chksum (data):
    hdr = read_gnu_tar_hdr (data)
    if hdr is None:
        return False
    s = calc_chksums (data)
    return nti (hdr ["checksum"]) in s

def readable_tar_objects_offsets (ifd):
    """
    Traverse blocks in file, trying to extract tar headers.
    """
    pos     = 0
    offsets = []

    mm = mmap.mmap(ifd, 0, mmap.MAP_SHARED, mmap.PROT_READ)
    pos = TAR_HDR_OFF_MAGIC

    while True:
        pos = mm.find (TAR_FMT_OLDGNU_MAGIC, pos)
        if pos == -1:
            break
        off = pos - TAR_HDR_OFF_MAGIC
        mm.seek (off)
        blk = mm.read (BLOCKSIZE)
        if tar_hdr_check_chksum (blk) is True:
            offsets.append (off)
        pos += 1

    return offsets

def locate_gz_hdr_candidates (fd):
    """
    Walk over instances of the GZ magic in the payload, collecting their
    positions. If the offset of the first found instance is not zero, the
    file begins with leading garbage.

    Note that since the GZ magic consists of only two bytes, we expect a lot
    of false positives inside binary data.

    :return: The list of offsets in the file.
    """
    pos   = 0
    cands = []

    mm = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_READ)

    while True:
        pos = mm.find (GZ_MAGIC_BYTES, pos)
        if pos == -1:
            break
        cands.append (pos)
        pos += len (GZ_MAGIC_BYTES)

    return cands

HDR_CAND_GOOD  = 0 # header marks begin of valid object
HDR_CAND_FISHY = 1 # inconclusive
HDR_CAND_JUNK  = 2 # not a header / object unreadable

def read_cstring (fd, max=-1, encoding=None):
    """
    Read one NUL-terminated string from *fd* into a Python string. If *max*
    is non-negative, reading will terminate after the specified number of
    bytes.

    Optionally, an *encoding* may be specified to interpret the data as.

    :returns: *None* if parsing failed, EOF was hit before a terminating NUL
              was found, or the maximum number of bytes has been exceeded; a
              Python string with the data otherwise.
    """
    buf = b""
    l = 0

    while True:
        c = os.read (fd, 1)
        if len (c) == 0: # EOF before the terminating NUL: parse failure
            return None
        if c == NUL:
            break
        if max >= 0 and l > max:
            return None
        buf += c
        l += 1

    if encoding is not None:
        buf = buf.decode (encoding)

    return buf
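# Usage sketch (the descriptor and limits are illustrative): reading a
# NUL-terminated, Latin-1 encoded name of at most 255 bytes:
#
#     name = read_cstring (fd, max=255, encoding="iso-8859-1")
#     if name is None:
#         ...  # overlong, or EOF before the terminating NUL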
def inspect_gz_hdr (fd, off):
    """
    Attempt to parse a Gzip header in *fd* at position *off*. The format is
    documented as RFC1952.

    Returns a verdict about the quality of that header plus the parsed header
    when readable. Problematic sizes such as fields running past the EOF are
    treated as garbage. Properties in which the header merely doesn’t conform
    to the spec (garbage flag bits, bogus timestamp) are considered “fishy”.
    No validation is possible on embedded strings because they are
    single-byte encoded.
    """
    fname   = None
    flags   = 0x00
    dflags  = 0x00
    mtime   = 0x00000000
    oscode  = 0x00
    verdict = HDR_CAND_GOOD

    os.lseek (fd, off, os.SEEK_SET)
    if os.lseek (fd, 0, os.SEEK_CUR) != off:
        return HDR_CAND_JUNK, None

    raw = os.read (fd, GZ_HEADER_SIZE)
    if len (raw) != GZ_HEADER_SIZE:
        return HDR_CAND_JUNK, None

    flags = 0x0
    try:
        _m1, _m2, meth, flags, mtime, dflags, oscode = \
            struct.unpack (GZ_FMT_HEADER, raw)
        if meth != GZ_METHOD_DEFLATE: # only deflate is supported
            return HDR_CAND_JUNK, None
    except struct.error:
        return HDR_CAND_JUNK, None

    if mtime > int (time.time ()):
        verdict = HDR_CAND_FISHY

    if dflags != GZ_DEFLATE_FLAGS:
        verdict = HDR_CAND_FISHY

    if oscode != GZ_OS_CODE:
        verdict = HDR_CAND_FISHY

    if flags & GZ_FLAG_FTEXT: # created by some contrarian
        verdict = HDR_CAND_FISHY

    if flags & GZ_FLAG_FEXTRA:
        xlen = struct.unpack ("<H", os.read (fd, 2))[0]
        xtra = os.read (fd, xlen)
        if len (xtra) != xlen: # eof inside header
            return HDR_CAND_JUNK, None

    if flags & GZ_FLAG_FNAME:
        # read the original file name, bounded by the maximum path length
        # representable in a tar header
        fname = read_cstring (fd, max=(LENGTH_NAME + LENGTH_PREFIX),
                              encoding="iso-8859-1")
        if fname is None:
            return HDR_CAND_JUNK, None

    if flags & GZ_FLAG_FCOMMENT:
        fcomment = read_cstring (fd)
        if fcomment is None:
            return HDR_CAND_JUNK, None

    if flags & GZ_FLAG_FHCRC: # half a CRC32; discarded
        crc16 = os.read (fd, 2)
        if len (crc16) != 2: # eof inside header
            return HDR_CAND_JUNK, None

    if flags & GZ_FLAG_RESERVED:
        # according to the RFC, these must not be set
        verdict = HDR_CAND_FISHY

    hlen = os.lseek (fd, 0, os.SEEK_CUR) - off

    return verdict, \
        { "fname"   : fname
        , "flags"   : flags
        , "dflags"  : dflags
        , "mtime"   : mtime
        , "oscode"  : oscode
        , "verdict" : verdict
        , "hlen"    : hlen
        }

def try_decompress (ifd, off, hdr):
    """
    Check if the payload that follows the header at *off* can be
    decompressed.

    :returns: A pair of the compressed and decompressed byte counts; (0, 0)
              marks an unreadable payload.
    """
    import zlib
    decmp = zlib.decompressobj (-zlib.MAX_WBITS) # raw deflate stream
    os.lseek (ifd, off, os.SEEK_SET)
    clen = 0 # compressed length
    dlen = 0 # decompressed length

    while True:
        chunk = os.read (ifd, BLOCKSIZE)
        if len (chunk) == 0: # eof before end of stream
            break
        try:
            data = decmp.decompress (chunk)
        except zlib.error: # invalid payload
            return 0, 0
        dlen += len (data)
        clen += len (chunk) - len (decmp.unused_data)
        if decmp.eof is True: # end of deflate stream reached
            break

    return clen, dlen

def readable_gz_objects_offsets (ifd, cands):
    """
    Check which of the GZ header candidates mark the beginning of an object
    whose payload can actually be decompressed.
    """
    good = []

    for cand in cands:
        vdt, hdr = inspect_gz_hdr (ifd, cand)
        if vdt == HDR_CAND_JUNK:
            continue # immediately unreadable; skip
        clen, dlen = try_decompress (ifd, cand + hdr ["hlen"], hdr)
        if dlen > 0 and clen > 0:
            good.append (cand)

    return good

def reconstruct_offsets_gz (fname):
    """
    From the given file, retrieve all GZ header-like offsets (“candidates”).
    Then check each of those locations whether they can be processed as
    compressed data.
    """
    ifd = os.open (fname, os.O_RDONLY)
    try:
        cands = locate_gz_hdr_candidates (ifd)
        return readable_gz_objects_offsets (ifd, cands)
    finally:
        os.close (ifd)

def reconstruct_offsets_tar (fname):
    """
    From the given file, retrieve all tar header-like offsets (“candidates”).
    Then check each of those locations whether they can be processed as tar
    data.
    """
    ifd = os.open (fname, os.O_RDONLY)
    try:
        return readable_tar_objects_offsets (ifd)
    finally:
        os.close (ifd)

def read_tarobj_at_offset (fileobj, offset, mode, secret=None,
                           strict_validation=True):
    """
    :type  strict_validation: bool
    :param strict_validation: Enable strict IV checking in the crypto layer.
                              Should be disabled when dealing with
                              potentially corrupted data.
    """
    decr = None
    if secret is not None:
        ks = secret [0]

        if ks == crypto.PDTCRYPT_SECRET_PW:
            decr = crypto.Decrypt (password=secret [1],
                                   strict_ivs=strict_validation)
        elif ks == crypto.PDTCRYPT_SECRET_KEY:
            key = binascii.unhexlify (secret [1])
            decr = crypto.Decrypt (key=key, strict_ivs=strict_validation)
        else:
            raise RuntimeError

    try:
        tarobj = \
            TarFile.open_at_offset (offset,
                                    mode=mode,
                                    fileobj=fileobj,
                                    format=GNU_FORMAT,
                                    concat='#' in mode,
                                    encryption=decr,
                                    save_to_members=False,
                                    tolerance=TOLERANCE_RESCUE)
    except (ReadError, EndOfFile):
        return None

    return tarobj.next ()
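# Rescue-scan sketch (path and mode are illustrative): recover member
# headers from a damaged concat-gzip volume by scanning for object offsets,
# then reading one TarInfo at each offset (bltn_open is this module's alias
# for the builtin open):
#
#     offsets = reconstruct_offsets_gz ("backup.tar.gz.0")
#     with bltn_open ("backup.tar.gz.0", "rb") as f:
#         members = [ read_tarobj_at_offset (f, off, "#gz")
#                     for off in offsets ]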
def idxent_of_tarinfo (tarinfo):
    """
    Scrape the information relevant for the index from a *TarInfo* object.
    Keys like the inode number that lack a corresponding field in a TarInfo
    will be set to some neutral value.

    Example output:

        { "inode"  : 0
        , "uid"    : 0
        , "path"   : "snapshot://annotations.db"
        , "offset" : 0
        , "volume" : 0
        , "mode"   : 33152
        , "ctime"  : 1502798115
        , "mtime"  : 1502196423
        , "size"   : 144
        , "type"   : "file"
        , "gid"    : 0
        }
    """
    return \
        { "inode"  : 0            # ignored when reading the index
        , "uid"    : tarinfo.uid
        , "gid"    : tarinfo.gid
        , "path"   : tarinfo.name # keeping URI scheme
        , "offset" : 0            # to be added by the caller
        , "volume" : tarinfo.volume_offset
        , "mode"   : tarinfo.mode
        , "ctime"  : tarinfo.mtime
        , "mtime"  : tarinfo.mtime
        , "size"   : tarinfo.size
        , "type"   : tarinfo.type
        }

def gen_rescue_index (gen_volume_name, mode, maxvol=None, password=None,
                      key=None):
    infos   = []
    psidx   = [] # pseudo index, return value
    offsets = None
    secret  = crypto.make_secret (password=password, key=key)

    nvol = 0

    while True:
        vpath = gen_volume_name (nvol)
        try:
            if secret is not None:
                offsets = crypto.reconstruct_offsets (vpath, secret)
            elif mode == "#gz":
                offsets = reconstruct_offsets_gz (vpath)
            elif mode == "#":
                offsets = reconstruct_offsets_tar (vpath)
            else:
                raise TarError ("no rescue handling for mode “%s”" % mode)
        except FileNotFoundError:
            # volume does not exist
            if maxvol is not None and nvol < maxvol:
                # explicit volume number specified, ignore missing ones
                # and advance to the next candidate
                nvol += 1
                continue
            else:
                break

        fileobj = bltn_open (vpath, "rb")

        def aux (acc, off):
            obj = read_tarobj_at_offset (fileobj, off, mode, secret=secret,
                                         strict_validation=False)
            if obj is not None:
                acc.append ((off, nvol, obj))
            return acc
        infos += functools.reduce (aux, offsets, [])

        fileobj.close()

        nvol += 1

    def aux (o, nvol, ti):
        ie = idxent_of_tarinfo (ti)
        ie ["offset"] = o
        ie ["volume"] = nvol
        return ie

    psidx = [ aux (o, nvol, ti) for o, nvol, ti in infos ]

    return psidx

#--------------------
# exported functions
#--------------------

def is_tarfile(name):
    """Return True if name points to a tar archive that we are able to handle,
       else return False.
    """
    try:
        t = open(name)
        t.close()
        return True
    except TarError:
        return False

bltn_open = open
open      = TarFile.open
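# Index-rescue sketch (the volume naming scheme is illustrative): rebuild a
# pseudo index from a multi-volume concat-gzip backup whose index file was
# lost:
#
#     def volname (n):
#         return "backup.tar.gz.%d" % n
#
#     psidx = gen_rescue_index (volname, "#gz")
#     # each entry mirrors idxent_of_tarinfo(): path, offset, volume, ...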