# The software in this package is distributed under the GNU General
# Public License version 2 (with a special exception described below).
#
# A copy of GNU General Public License (GPL) is included in this distribution,
# in the file COPYING.GPL.
#
# As a special exception, if other files instantiate templates or use macros
# or inline functions from this file, or you compile this file and link it
# with other works to produce a work based on this file, this file
# does not by itself cause the resulting work to be covered
# by the GNU General Public License.
#
# However the source code for this file must still be made available
# in accordance with section (3) of the GNU General Public License.
#
# This exception does not invalidate any other reasons why a work based
# on this file might be covered by the GNU General Public License.
#
# Copyright (c) 2016-2018 Intra2net AG

"""
Iterative reading of log files, similar to shell command `tail -f`.

Copyright: Intra2net AG

Basic functionality (class :py:class:`IterativeReader`):
Runs stat in a loop to find out whether file size has changed. Then reads
the new data and forwards it.

.. todo:: Want to also use lsof to find out whether file/pipe/socket was
          closed, so we can automatically return from the read loop.

:py:class:`LineReader` takes output of :py:class:`IterativeReader` and
returns it line-wise, as is normal for log files.

:py:class:`LogParser` takes those lines and tries to parse them into fields
like date, time, module name, urgency and message.

.. todo:: auto-detect log line layout
"""

import os
import os.path
import re
from warnings import warn
import logging
from contextlib import contextmanager

from .iter_helpers import zip_longest
from .type_helpers import is_str_or_byte, is_file_obj


class LogReadWarning(UserWarning):
    """Warnings issued by classes in this module."""
    pass


def true_func(_):
    """Replacement for :py:func:`check_is_used`. Returns `True` always."""
    return True


def false_func(_):
    """Replacement for :py:func:`check_is_used`. Returns `False` always."""
    return False


def check_is_used(file_handle):
    """
    Check whether file is being written to.

    To be implemented, e.g. using lsof. If beneficial, could also easily
    accept a python file object as arg.

    :param int file_handle: OS-level file descriptor
    """
    raise NotImplementedError(file_handle)


#: counter for unknown sources in :py:func:`create_description`
_create_description_unknown_counter = 0


def create_description(file_obj, file_handle):
    """
    Create some description for given file-like object / file descriptor.

    :param file_obj: file-like object
    :param int file_handle: os-level file descriptor
    :returns: short description for file-like object
    :rtype: str
    """
    global _create_description_unknown_counter

    try:
        desc = file_obj.name
        if desc:
            return desc
    except AttributeError:
        pass

    if file_handle is not None:
        return 'file{0}'.format(file_handle)
    else:
        _create_description_unknown_counter += 1
        return 'unknown{0}'.format(_create_description_unknown_counter)


#: error message for IterativeReader constructor
_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
           'files --> use with open(file_name)!'


class IterativeReader(object):
    """
    Read continuously from a given file.

    Uses `os.stat(file_obj.fileno()).st_size` as measure of whether the file
    has changed or not; always reads as much data as possible.

    Does not care about closing files, so does not accept file names.

    This is the base for class :py:class:`LineReader` that just has to
    implement a different :py:meth:`prepare_result` method.
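
    Example (a sketch; assumes ``some.log`` exists and is appended to by
    another process; with ``keep_watching=True`` the loop below runs until
    interrupted)::

        with open('some.log', 'rt') as log_file:
            for description, new_data, idx in IterativeReader(
                    log_file, keep_watching=True):
                print('{0}: read {1} new characters'
                      .format(description, len(new_data)))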
""" def __init__(self, sources, descs=None, keep_watching=False): """ Create a reader; do some basic checks on args. :param sources: iterable over sources. Sources can be opened file objects or read-opened os-level file descriptors. Calling code has to ensure they are closed properly, so best use this within a "with open(file_name) as file_handle:"-context. If sources is a single file obj/descriptor, both source and desc will be converted to lists of length 1 :param descs: can be anything of same length as sources. If sources is a single source, then descs is also converted to a list of length 1. If not given (i.e. None), will use :py:func:`create_description` to guess descriptions :param bool keep_watching: keep watching file that is not changing in size. Need to manually tell whether file is being written to or not since auto-detect is not implemented yet. :raises: OSError when testing fstat on source """ if not sources: raise ValueError('need at least some source!') elif is_str_or_byte(sources): raise ValueError(_STR_ERR.format(sources)) elif is_file_obj(sources) or isinstance(sources, int): source_input = [sources, ] desc_input = [descs, ] else: source_input = sources # assume some iterable desc_input = descs # now divide sources into os-level file descriptors for os.fstat, # and file objects for read() self.file_objs = [] self.file_handles = [] # file descriptOR, not descriptION for source in source_input: if is_file_obj(source): self.file_objs.append(source) self.file_handles.append(source.fileno()) elif isinstance(source, int): self.file_objs.append(os.fdopen(source)) self.file_handles.append(source) elif is_str_or_byte(source): raise ValueError(_STR_ERR.format(source)) else: raise ValueError('source {0} is neither file obj nor file ' 'descriptor!') # try to fstat the new file descriptor just for testing os.fstat(self.file_handles[-1]) # guess descriptions if not given if not desc_input: self.descriptions = [create_description(obj, file_handle) for obj, file_handle in zip(self.file_objs, self.file_handles)] else: try: if len(desc_input) != len(self.file_objs): raise ValueError('need same number of sources and ' 'descriptions!') except TypeError: pass # desc_input is generator or so self.descriptions = [] for obj, file_handle, description in \ zip_longest(self.file_objs, self.file_handles, desc_input): if obj is None: raise ValueError('more descriptions than sources!') elif description is None: self.descriptions.append(create_description(obj, file_handle)) else: self.descriptions.append(description) self.last_sizes = [0 for _ in self.file_objs] self.ignore = [False for _ in self.file_objs] if keep_watching: self.is_used_func = true_func else: self.is_used_func = false_func # use some day: self.is_used_func = check_is_used for obj, file_handle, description in \ zip(self.file_objs, self.file_handles, self.descriptions): logging.debug('log_read initialized with file descriptor {0}, ' 'file obj {1}, description "{2}"' .format(file_handle, obj, description)) def n_sources(self): """Return number of sources given to constructor.""" return len(self.file_objs) def n_active_sources(self): """Return number of sources we are actually watching.""" return len(self.ignore) - sum(self.ignore) def __iter__(self): """ Continue reading from sources, yield results. yields result of :py:meth:`prepare_result`, which depends on what subclass you called this function from. 
""" while True: if all(self.ignore): break for idx, (obj, file_handle, description, last_size, do_ignore) in \ enumerate(zip(self.file_objs, self.file_handles, self.descriptions, self.last_sizes, self.ignore)): if do_ignore: continue # get new file size new_size = os.fstat(file_handle).st_size # compare to old size if new_size == last_size: if not self.is_used_func(file_handle): self.ignore[idx] = True else: if new_size < last_size: # happened at start of some tests warn('{0} / {1} has become smaller ({2} --> {3})! ' .format(obj, description, last_size, new_size) + 'Maybe you are reading from a half-initialized ' + 'file?', category=LogReadWarning) try: new_data = obj.read() except OSError as ose: # includes IOErrors warn('io error reading from {0} / {1}: {2})' .format(obj, description, ose), category=LogReadWarning) new_data = str(ose) except UnicodeDecodeError as ude: warn('unicode error reading from {0} / {1}: {2}' .format(obj, description, ude), category=LogReadWarning) new_data = str(ude) # post-processing for result in self.prepare_result(description, new_data, idx): yield result # prepare next iteration self.last_sizes[idx] = new_size def prepare_result(self, description, data, idx): """ From raw new data create some yield-able results. Intended for overwriting in subclasses. This function is called from `__iter__` for each new data that becomes available. It has to provide results which are forwarded to caller. This base implementation just yields its input, so new data is yielded from `__iter__` as-is. :param str description: Description of source of lines, one of :py:data:`self.descriptions` :param str data: Text data read from source :param idx: Index of data source :returns: nothing but yields [(description, data, idx)], same as input """ yield description, data, idx #: characters to `rstrip()` from end of complete lines LINE_SPLITTERS = '\n\r' class LineReader(IterativeReader): """ An :py:class:`IterativeReader` that returns new data line-wise. This means buffering partial line data. """ def __init__(self, *args, **kwargs): """Create an :py:class:`IterativeReader and buffers for sources.""" super(LineReader, self).__init__(*args, **kwargs) self.line_buffers = ['' for _ in range(self.n_sources())] def prepare_result(self, description, new_data, idx): """ Take raw new data and split it into lines. If line is not complete, then buffer it. Args: see super class method :py:meth:`IterativeReader.prepare_result` :returns: list of 3-tuples `(description, line, idx)` where `description` and `idx` are same as args, and `line` is without trailing newline characters :rtype: [(str, str, int)] """ all_data = self.line_buffers[idx] + new_data self.line_buffers[idx] = '' should_be_no_new_lines = False for line in all_data.splitlines(True): if line[-1] in LINE_SPLITTERS: yield description, line.rstrip(LINE_SPLITTERS), idx elif should_be_no_new_lines: # self-check raise ValueError('Programming error: something went wrong with ' 'line splitting/buffering.') else: self.line_buffers[idx] = line should_be_no_new_lines = True # (this should be the last) class LogParser(LineReader): """ Takes lines from :py:class:`LineReader` and parses their contents. Requires a pattern for log lines, auto-detection is not implemented yet. Iteration returns :py:class:`re.match` result or -- if matching failed -- None. The latest unparsed line is available as `self.last_unparsed_line`. 
    Usage recommendation::

        with open(log_file_name, 'rt') as file_handle:
            parser = log_read.LogParser(file_handle, pattern=my_pattern)
            for _, data, _ in parser:
                if data is None:
                    print(f'Failed to parse line {parser.last_unparsed_line}')
                    continue
                line_parts = data.groupdict()
                ...do stuff with line_parts...
    """

    def __init__(self, log_file, pattern=None):
        """
        Create a LogParser.

        :param log_file: source of log lines to parse
        :type log_file: see arg `sources` of constructor of
                        :py:class:`IterativeReader`
        :param pattern: regexp to parse log lines; None (default) to return
                        lines as they are
        :type pattern: str or None (default)
        """
        super(LogParser, self).__init__(log_file)

        self.pattern = pattern
        self.last_unparsed_line = ''

    def prepare_result(self, *args):
        """
        Try parsing lines.

        Args: see super class method
        :py:meth:`IterativeReader.prepare_result`

        :returns: 3-tuples `(description, line, idx)` where `description`
                  and `idx` are same as input args and `line` is either a
                  :py:class:`re.Match` if line matched
                  :py:data:`self.pattern` or None if line did not match (the
                  unparsed line is then available as
                  `self.last_unparsed_line`)
        :rtype: [(str, :py:class:`re.Match` or None, int)]
        """
        # let super class split data into lines
        for description, raw_line, idx in \
                super(LogParser, self).prepare_result(*args):
            if self.pattern is None:
                # no pattern given: return lines as they are (see __init__)
                yield description, raw_line, idx
                continue
            matches = re.match(self.pattern, raw_line)
            if matches:
                yield description, matches, idx
            else:
                self.last_unparsed_line = raw_line
                yield description, None, idx

    @classmethod
    @contextmanager
    def create_for(cls, filename, *args, **kwargs):
        """
        Open single file, yield LogParser.

        Ensures file is closed afterwards.

        This allows opening a file and creating a LogParser for it in one
        line::

            with LogParser.create_for('/var/log/messages',
                                      SYS_LOG_PATTERN) as parser:
                for _, matches, _ in parser:
                    try:
                        print(matches.groupdict())
                    except Exception:
                        print(f'UNPARSED: {parser.last_unparsed_line}')

        :param str filename: something that :py:meth:`open` accepts
        :param args: forwarded to constructor
        :param kwargs: forwarded to constructor
        """
        with open(filename) as file_handle:
            yield cls(file_handle, *args, **kwargs)


###############################################################################
# PATTERNS FOR FREQUENT LOG FILES
###############################################################################

# pattern of squid proxy logs. group names are best guesses
PROXY_LOG_PATTERN = \
    r'\s*(?P<timestamp>\d+\.\d+\.\d+\s+\d+:\d+:\d+|\d+\.\d+)\s+' \
    + r'(?P<duration>\d+)\s+' \
    + r'(?P<client_ip>\d+\.\d+\.\d+\.\d+)\s+' \
    + r'(?P<cache_status>[A-Z_]+)/(?P<status_code>\d+)\s+' \
    + r'(?P<size>\d+)\s+(?P<method>\S+)\s+(?P<url>\S+)\s+(?P<user>\S+)\s+' \
    + r'(?P<hierarchy_status>[A-Z_]+)/(?P<peer>\S+)\s+(?P<mime_type>\S+)\s+' \
    + r'(?P<extra>.*)\s*'

# pattern for linux system logs (usually "messages" or "syslog", also
# "maillog"); group names are likewise best guesses
SYS_LOG_PATTERN = \
    r'\s*(?P<timestamp>\w{3} +\d{1,2} \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) ' \
    + r'(?P<proc_name>[^\[\]:]+)(?:\[(?P<pid>\d+)\])?: (?P<message>.*)'
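

# Minimal demo sketch: parse an entire system log with the pattern above.
# Assumes '/var/log/syslog' exists, is readable and follows the classic
# syslog layout; adjust path and pattern for your system.
if __name__ == '__main__':
    with LogParser.create_for('/var/log/syslog', SYS_LOG_PATTERN) as parser:
        for _, match, _ in parser:
            if match is None:
                print('UNPARSED: {0}'.format(parser.last_unparsed_line))
            else:
                print(match.groupdict())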