Merge branch 'log-read-improve'
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Mon, 8 Nov 2021 15:12:46 +0000 (16:12 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Mon, 8 Nov 2021 15:12:46 +0000 (16:12 +0100)
src/log_read.py
test/test_log_read.py

index ad5d405..ce3e71b 100644 (file)
 #
 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
 
-""" Iterative reading of log files
+"""
+
+SUMMARY
+------------------------------------------------------
+Iterative reading of log files, similar to shell command `tail -f`.
+
+Copyright: Intra2net AG
+
+
+CONTENTS
+------------------------------------------------------
 
 Basic Functionality (class :py:class:`IterativeReader`):
 Runs stat in a loop to find out whether file size has changed. Then reads the
 new data and forwards that
 
-.. todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
-         so can return from read loop
+.. todo:: Want to also use lsof to find out whether file/pipe/socket was
+          closed, so can return from read loop
 
 :py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
 it line-wise as is normal for log files
@@ -34,46 +44,58 @@ it line-wise as is normal for log files
 like date, time, module name, urgency and message.
 
 .. todo:: auto-detect log line layout
+
+
+INTERFACE
+------------------------------------------------------
+
 """
 
 import os
+import os.path
 import re
 from warnings import warn
-import os.path
 import logging
 from .iter_helpers import zip_longest
 from .type_helpers import is_str_or_byte, is_file_obj
 
 
 class LogReadWarning(UserWarning):
-    """ warnings issued by classes in this module """
+    """Warnings issued by classes in this module."""
     pass
 
 
-def true_func(unused_argument_but_that_is_ok):
-    """ does nothing, always returns True """
+def true_func(_):
+    """Replacement for :py:func:`check_is_used`. Returns `True` always."""
     return True
 
 
-def check_is_used(some_file_or_handle):
-    """ check whether file is being written to
+def check_is_used(file_handle):
+    """
+    Check whether file is being written to.
+
+    To be implemented, e.g. using lsof.
+
+    If beneficial could also easily supply python file object as arg.
 
-    to be implemented, e.g. using lsof
+    :param int file_handle: OS-level file descriptor
     """
-    raise NotImplementedError()
+    raise NotImplementedError(file_handle)
 
 
+#: counter for unknown sources in :py:func:`create_description`
 _create_description_unknown_counter = 0
 
 
-def create_description(file_obj, file_desc):
-    """ create some description for given file-like object / file descriptor
+def create_description(file_obj, file_handle):
+    """
+    Create some description for given file-like object / file descriptor.
 
     :param file_obj: file-like object
-    :param int file_desc: os-level file descriptor
-    :returns: string
+    :param int file_handle: os-level file descriptor
+    :returns: Short description for file-like object
+    :rtype: string
     """
-
     global _create_description_unknown_counter
 
     try:
@@ -83,8 +105,8 @@ def create_description(file_obj, file_desc):
     except AttributeError:
         pass
 
-    if file_desc is not None:
-        return 'file{0}'.format(file_desc)
+    if file_handle is not None:
+        return 'file{0}'.format(file_handle)
     else:
         _create_description_unknown_counter += 1
         return 'unknown{0}'.format(_create_description_unknown_counter)
@@ -96,21 +118,21 @@ _STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
 
 
 class IterativeReader(object):
-    """ reads from a given file
-
-    Uses os.stat(file_obj.fileno()).st_size as measure whether file has changed
-    or not; Always reads as much data as possible
+    """
+    Read continuously from a given file.
 
-    Catches most common exceptions in iteration (not constructor)
+    Use `os.stat(file_obj.fileno()).st_size` as measure whether file has
+    changed or not; Always reads as much data as possible.
 
-    Does not care about closing files, so does not accept file names
+    Does not care about closing files, so does not accept file names.
 
     This is the base for class :py:class:`LineReader` that just has to
-    implement a different :py:meth:`prepare_result` method
+    implement a different :py:meth:`prepare_result` method.
     """
 
     def __init__(self, sources, descs=None, return_when_done=False):
-        """ creates a reader; does some basic checks on args
+        """
+        Create a reader; do some basic checks on args.
 
         :param sources: iterable over sources. Sources can be opened file
                         objects or read-opened os-level file descriptors.
@@ -143,14 +165,14 @@ class IterativeReader(object):
         # now divide sources into os-level file descriptors for os.fstat,
         # and file objects for read()
         self.file_objs = []
-        self.file_descs = []          # file descriptOR, not descriptION
+        self.file_handles = []          # file descriptOR, not descriptION
         for source in source_input:
             if is_file_obj(source):
                 self.file_objs.append(source)
-                self.file_descs.append(source.fileno())
+                self.file_handles.append(source.fileno())
             elif isinstance(source, int):
                 self.file_objs.append(os.fdopen(source))
-                self.file_descs.append(source)
+                self.file_handles.append(source)
             elif is_str_or_byte(source):
                 raise ValueError(_STR_ERR.format(source))
             else:
@@ -158,13 +180,13 @@ class IterativeReader(object):
                                  'descriptor!')
 
             # try to fstat the new file descriptor just for testing
-            os.fstat(self.file_descs[-1])
+            os.fstat(self.file_handles[-1])
 
         # guess descriptions if not given
         if not desc_input:
-            self.descriptions = [create_description(obj, file_desc)
-                                 for obj, file_desc
-                                 in zip(self.file_objs, self.file_descs)]
+            self.descriptions = [create_description(obj, file_handle)
+                                 for obj, file_handle
+                                 in zip(self.file_objs, self.file_handles)]
         else:
             try:
                 if len(desc_input) != len(self.file_objs):
@@ -174,13 +196,13 @@ class IterativeReader(object):
                 pass  # desc_input is generator or so
 
             self.descriptions = []
-            for obj, file_desc, description in \
-                    zip_longest(self.file_objs, self.file_descs, desc_input):
+            for obj, file_handle, description in \
+                    zip_longest(self.file_objs, self.file_handles, desc_input):
                 if obj is None:
                     raise ValueError('more descriptions than sources!')
                 elif description is None:
                     self.descriptions.append(create_description(obj,
-                                                                file_desc))
+                                                                file_handle))
                 else:
                     self.descriptions.append(description)
 
@@ -188,48 +210,57 @@ class IterativeReader(object):
         self.ignore = [False for _ in self.file_objs]
 
         if return_when_done:
-            self.is_used = check_is_used
+            self.is_used_func = check_is_used
         else:
-            self.is_used = true_func
+            self.is_used_func = true_func
 
-        for obj, file_desc, description in \
-                zip(self.file_objs, self.file_descs, self.descriptions):
+        for obj, file_handle, description in \
+                zip(self.file_objs, self.file_handles, self.descriptions):
             logging.debug('log_read initialized with file descriptor {0}, '
                           'file obj {1}, description "{2}"'
-                          .format(file_desc, obj, description))
+                          .format(file_handle, obj, description))
 
     def n_sources(self):
+        """Return number of sources given to constructor."""
         return len(self.file_objs)
 
     def n_active_sources(self):
+        """Return number of sources we are actually watching."""
         return len(self.ignore) - sum(self.ignore)
 
     def __iter__(self):
+        """
+        Continue reading from sources, yield results.
+
+        yields result of :py:meth:`prepare_result`, which depends on what sub
+        class you called this function from.
+        """
         while True:
-            for idx, (obj, file_desc, description, last_size, do_ignore) in \
-                    enumerate(zip(self.file_objs, self.file_descs,
+            for idx, (obj, file_handle, description, last_size, do_ignore) in \
+                    enumerate(zip(self.file_objs, self.file_handles,
                                   self.descriptions, self.last_sizes,
                                   self.ignore)):
-
                 if do_ignore:
                     continue
 
                 # get new file size
-                new_size = os.fstat(file_desc).st_size
+                new_size = os.fstat(file_handle).st_size
 
                 # compare to old size
                 if new_size == last_size:
-                    if not self.is_used(file_desc):
+                    if not self.is_used_func(file_handle):
                         warn('no one is writing to {0} / {1} -- '
                              'stop watching it!'
-                             .format(file_desc, description),
+                             .format(file_handle, description),
                              category=LogReadWarning)
                         self.ignore[idx] = True
-                elif new_size < last_size:
-                    warn('{0} / {1} has become smaller ({2} --> {3})!'
-                         .format(obj, description, last_size, new_size),
-                         category=LogReadWarning)
-                else:   # (new_size > last_size)
+                else:
+                    if new_size < last_size:  # happened at start of some tests
+                        warn('{0} / {1} has become smaller ({2} --> {3})! '
+                             .format(obj, description, last_size, new_size)
+                             + 'Maybe you are reading from a half-initialized '
+                             + 'file?',
+                             category=LogReadWarning)
                     try:
                         new_data = obj.read()
                     except OSError as ose:    # includes IOErrors
@@ -252,47 +283,60 @@ class IterativeReader(object):
                     self.last_sizes[idx] = new_size
 
     def prepare_result(self, description, data, idx):
-        """ from raw new data create some yield-able results
+        """
+        From raw new data create some yield-able results.
 
-        to be intended for overwriting in sub-classes
+        Intended for overwriting in sub-classes.
 
-        this function is called from __iter__ for each new data that becomes
+        This function is called from __iter__ for each new data that becomes
         available. It has to return some iterable whose entries are yielded
         from iteration over objects of this class.
 
-        It receives the following args:
-        - the description of the source
-        - the data itself
-        - the index of the source
         The result must be an iterable of objects, which are yielded as-is, so
-        can have any form
+        can have any form.
 
         This base implementation just returns its input in a list, so new data
-        is yielded from __iter__ as-is
+        is yielded from __iter__ as-is.
+
+        Subclass implementations can also yield tuples.
+
+        :param str description: Description of source of lines, one of
+                                :py:data:`self.descriptions`
+        :param str new_data: Text data read from source
+        :param idx: Index of data source
+        :returns: [(description, data, idx], same as input
+        :rtype [(str, str, int)]
         """
-        return [(description, data), ]
+        return [(description, data, idx), ]
 
 
+#: characters to `rstrip()` from end of complete lines
 LINE_SPLITTERS = '\n\r'
 
 
 class LineReader(IterativeReader):
-    """ an IterativeReader that returns new data line-wise
+    """
+    An :py:class:`IterativeReader` that returns new data line-wise.
 
-    this means buffering partial line data
+    This means buffering partial line data.
     """
 
     def __init__(self, *args, **kwargs):
-        """ forwards all args and kwargs to :py:class:`IterativeReader` """
+        """Create an :py:class:`IterativeReader and buffers for sources."""
         super(LineReader, self).__init__(*args, **kwargs)
         self.line_buffers = ['' for _ in range(self.n_sources())]
 
     def prepare_result(self, description, new_data, idx):
-        """ take raw new data and split it into lines
+        """
+        Take raw new data and split it into lines.
 
-        if line is not complete, then buffer it
+        If line is not complete, then buffer it.
 
-        returns lines without their newline characters
+        Args: see super class method :py:meth:`IterativeReader.prepare_result`
+        :returns: list of 3-tuples `(description, line, idx)` where
+                  `description` and `idx` are same as args, and `line` is
+                  without trailing newline characters
+        :rtype: [(str, str, int)]
         """
         all_data = self.line_buffers[idx] + new_data
         self.line_buffers[idx] = ''
@@ -300,10 +344,11 @@ class LineReader(IterativeReader):
         should_be_no_new_lines = False
         for line in all_data.splitlines(True):
             if line[-1] in LINE_SPLITTERS:
-                result.append((description, line.rstrip(LINE_SPLITTERS)))
+                result.append((description, line.rstrip(LINE_SPLITTERS), idx))
             elif should_be_no_new_lines:
-                raise ValueError('line splitters are not compatible with'
-                                 'str.splitlines!')
+                # self-check
+                raise ValueError('Programming error: something went wrong with '
+                                 'line splitting/buffering.')
             else:
                 self.line_buffers[idx] = line
                 should_be_no_new_lines = True  # (this should be the last)
@@ -312,30 +357,44 @@ class LineReader(IterativeReader):
 
 
 class LogParser(LineReader):
-    """ takes lines from LineReader and parses their contents
+    """
+    Takes lines from :py:class:`LineReader` and parses their contents.
 
-    requires a pattern for log lines, auto-detection is not implemented yet
+    Requires a pattern for log lines, auto-detection is not implemented yet.
 
     Iteration returns re.match result or -- if matching failed -- the original
-    raw line
+    raw line.
     """
 
     def __init__(self, log_file, pattern=None):
-        """ create a LogParser
+        """
+        Create a LogParser.
 
         :param str log_file: name of log file to parse (required!)
         :param pattern: regexp to split log lines; None (default) to return
                         line as they are
+        :type pattern: str or None (default)
         """
         super(LogParser, self).__init__(log_file)
 
         self.pattern = pattern
 
     def prepare_result(self, *args):
+        """
+        Try parsing lines.
+
+        Args: see super class method :py:meth:`IterativeReader.prepare_result`
+        :returns: 3-tuples `(description, line, idx)` where `description` and
+                  `idx` are same as input args and `line` is either a
+                  :py:class:`re.Match` if line matched :py:data:`self.pattern`
+                  or just str if line did not match.
+        :rtype: [(str, :py:class:`re.Match` OR str, int)]
+        """
         # let super class split data into lines
-        for _, raw_line in super(LogParser, self).prepare_result(*args):
+        for description, raw_line, idx in \
+                super(LogParser, self).prepare_result(*args):
             result = re.match(self.pattern, raw_line)
             if result:
-                yield result
+                return (description, result, idx)
             else:
-                yield raw_line
+                return (description, raw_line, idx)
index daf8ea6..f5ebe61 100644 (file)
@@ -29,8 +29,9 @@ from tempfile import mkstemp
 import os
 import time
 import logging
+from warnings import warn
 
-from src.log_read import IterativeReader, LineReader
+from src.log_read import IterativeReader, LineReader, LogReadWarning
 
 # get best clock
 from sys import version_info
@@ -134,7 +135,7 @@ class LogReadTester(unittest.TestCase):
         """ helper function that tests length of vars in reader """
         self.assertEqual(reader.n_sources(), n_expected)
         self.assertEqual(len(reader.file_objs), n_expected)
-        self.assertEqual(len(reader.file_descs), n_expected)
+        self.assertEqual(len(reader.file_handles), n_expected)
         self.assertEqual(len(reader.descriptions), n_expected)
         self.assertEqual(len(reader.ignore), n_expected)
         self.assertEqual(len(reader.last_sizes), n_expected)
@@ -198,12 +199,24 @@ class LogReadTester(unittest.TestCase):
             with open(self.temp_file, 'rt') as file_handle:
                 reader = IterativeReader(file_handle)
                 self.helper_test_len(reader, 1)
-                for counter, (desc, text) in enumerate(reader):
+                counter = -1        # we may have to adapt this manually
+                for desc, text, source_idx in reader:
                     receive_time = perf_counter()
+                    self.assertEqual(desc, self.temp_file)
+                    self.assertEqual(source_idx, 0)
+                    counter += 1
                     text = text.strip()
                     if DEBUG:
                         print('{1}: received text "{0}" at {2}'
                               .format(text, counter, receive_time))
+                    if counter == 0 and not text:
+                        # if reader runs stat() before we write, we might get
+                        # a warning and one empty read here
+                        counter -= 1
+                        warn('Got an empty read, you should have seen another '
+                             'warning about file shrinking',
+                             category=LogReadWarning)
+                        continue
                     index = text.index(':')
                     count_text = int(text[:index].strip())
                     self.assertEqual(count_text, counter)
@@ -243,7 +256,7 @@ class LogReadTester(unittest.TestCase):
             reader = LineReader(file_handle)
             self.helper_test_len(reader, 1)
 
-            for line_expected, (_, line_read) in zip(lines_expected, reader):
+            for line_expected, (_, line_read, _) in zip(lines_expected, reader):
                 if 'end' in line_read:
                     break
                 else: