Merge branch 'log-parser'
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 19 May 2022 11:23:42 +0000 (13:23 +0200)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 19 May 2022 11:23:42 +0000 (13:23 +0200)
src/log_read.py
test/test_log_read.py

index 1d8bef4..724f4e1 100644 (file)
@@ -35,7 +35,7 @@ Runs stat in a loop to find out whether file size has changed. Then reads the
 new data and forwards that
 
 .. todo:: Want to also use lsof to find out whether file/pipe/socket was
-          closed, so can return from read loop
+          closed, so can automatically return from read loop.
 
 :py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
 it line-wise as is normal for log files
@@ -50,12 +50,12 @@ INTERFACE
 ------------------------------------------------------
 
 """
-
 import os
 import os.path
 import re
 from warnings import warn
 import logging
+from contextlib import contextmanager
 from .iter_helpers import zip_longest
 from .type_helpers import is_str_or_byte, is_file_obj
 
@@ -70,6 +70,11 @@ def true_func(_):
     return True
 
 
+def false_func(_):
+    """Replacement for :py:func:`check_is_used`. Returns `False` always."""
+    return False
+
+
 def check_is_used(file_handle):
     """
     Check whether file is being written to.
@@ -130,7 +135,7 @@ class IterativeReader(object):
     implement a different :py:meth:`prepare_result` method.
     """
 
-    def __init__(self, sources, descs=None, return_when_done=False):
+    def __init__(self, sources, descs=None, keep_watching=False):
         """
         Create a reader; do some basic checks on args.
 
@@ -145,10 +150,10 @@ class IterativeReader(object):
                       a single source, then descs is also converted to a list
                       of length 1. If not given (i.e. None), will use
                       :py:func:`create_description` to guess descriptions
-        :param bool return_when_done: ignore file_handle if no-one is writing
-                                      to it any more. Return from iterator when
-                                      all watched files are done (not
-                                      implemented yet)
+        :param bool keep_watching: keep watching file that is not changing in
+                                   size. Need to manually tell whether file
+                                   is being written to or not since auto-detect
+                                   is not implemented yet.
         :raises: OSError when testing fstat on source
         """
         if not sources:
@@ -209,10 +214,11 @@ class IterativeReader(object):
         self.last_sizes = [0 for _ in self.file_objs]
         self.ignore = [False for _ in self.file_objs]
 
-        if return_when_done:
-            self.is_used_func = check_is_used
-        else:
+        if keep_watching:
             self.is_used_func = true_func
+        else:
+            self.is_used_func = false_func
+        # use some day: self.is_used_func = check_is_used
 
         for obj, file_handle, description in \
                 zip(self.file_objs, self.file_handles, self.descriptions):
@@ -232,10 +238,13 @@ class IterativeReader(object):
         """
         Continue reading from sources, yield results.
 
-        yields result of :py:meth:`prepare_result`, which depends on what sub
-        class you called this function from.
+        yields result of :py:meth:`prepare_result`, which depends on what
+        subclass you called this function from.
         """
         while True:
+            if all(self.ignore):
+                break
+
             for idx, (obj, file_handle, description, last_size, do_ignore) in \
                     enumerate(zip(self.file_objs, self.file_handles,
                                   self.descriptions, self.last_sizes,
@@ -249,10 +258,6 @@ class IterativeReader(object):
                 # compare to old size
                 if new_size == last_size:
                     if not self.is_used_func(file_handle):
-                        warn('no one is writing to {0} / {1} -- '
-                             'stop watching it!'
-                             .format(file_handle, description),
-                             category=LogReadWarning)
                         self.ignore[idx] = True
                 else:
                     if new_size < last_size:  # happened at start of some tests
@@ -275,8 +280,7 @@ class IterativeReader(object):
                         new_data = str(ude)
 
                     # post-processing
-                    to_yield = self.prepare_result(description, new_data, idx)
-                    for result in to_yield:
+                    for result in self.prepare_result(description, new_data, idx):
                         yield result
 
                     # prepare next iteration
@@ -289,21 +293,18 @@ class IterativeReader(object):
         Intended for overwriting in subclasses.
 
         This function is called from __iter__ for each new data that becomes
-        available. It has to return some iterable whose entries are yielded
-        from iteration over objects of this class.
+        available. It has to provide results which are forwarded to caller.
 
-        This base implementation just returns its input in a list, so new data
-        is yielded from __iter__ as-is. Subclass implementations can also yield
-        tuples.
+        This base implementation just yields its input, so new data is yielded
+        from `__iter__` as-is.
 
         :param str description: Description of source of lines, one of
                                 :py:data:`self.descriptions`
         :param str data: Text data read from source
         :param idx: Index of data source
-        :returns: [(description, data, idx], same as input
-        :rtype [(str, str, int)]
+        :returns: nothing but yields [(description, data, idx], same as input
         """
-        return [(description, data, idx), ]
+        yield description, data, idx
 
 
 #: characters to `rstrip()` from end of complete lines
@@ -336,11 +337,10 @@ class LineReader(IterativeReader):
         """
         all_data = self.line_buffers[idx] + new_data
         self.line_buffers[idx] = ''
-        result = []
         should_be_no_new_lines = False
         for line in all_data.splitlines(True):
             if line[-1] in LINE_SPLITTERS:
-                result.append((description, line.rstrip(LINE_SPLITTERS), idx))
+                yield description, line.rstrip(LINE_SPLITTERS), idx
             elif should_be_no_new_lines:
                 # self-check
                 raise ValueError('Programming error: something went wrong with '
@@ -349,8 +349,6 @@ class LineReader(IterativeReader):
                 self.line_buffers[idx] = line
                 should_be_no_new_lines = True  # (this should be the last)
 
-        return result
-
 
 class LogParser(LineReader):
     """
@@ -358,8 +356,18 @@ class LogParser(LineReader):
 
     Requires a pattern for log lines, auto-detection is not implemented yet.
 
-    Iteration returns re.match result or -- if matching failed -- the original
-    raw line.
+    Iteration returns :py:class:`re.match` result or -- if matching failed --
+    None. The latest unparsed line is available as `self.last_unparsed_line`.
+    Usage recommendation:
+
+        with open(log_file_name, 'rt') as file_handle:
+            parser = log_read.LogParser(file_handle, pattern=my_pattern):
+            for _, data, _ in parser:
+                if data is None:
+                    print(f'Failed to parse line {parser.last_unparsed_line}')
+                    continue
+                line_parts = data.groupdict()
+                ...do stuff with line_parts...
     """
 
     def __init__(self, log_file, pattern=None):
@@ -374,6 +382,7 @@ class LogParser(LineReader):
         super(LogParser, self).__init__(log_file)
 
         self.pattern = pattern
+        self.last_unparsed_line = ''
 
     def prepare_result(self, *args):
         """
@@ -389,8 +398,48 @@ class LogParser(LineReader):
         # let super class split data into lines
         for description, raw_line, idx in \
                 super(LogParser, self).prepare_result(*args):
-            result = re.match(self.pattern, raw_line)
-            if result:
-                return description, result, idx
+            matches = re.match(self.pattern, raw_line)
+            if matches:
+                yield description, matches, idx
             else:
-                return description, raw_line, idx
+                self.last_unparsed_line = raw_line
+                yield description, None, idx
+
+    @classmethod
+    @contextmanager
+    def create_for(cls, filename, *args, **kwargs):
+        """
+        Open single file, yield LogParser. Ensures file is closed afterwards.
+
+        This allows opening file and creation LogParser for it to one line:
+
+            with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser:
+                for _, matches, _ in parser:
+                    try:
+                        print(matches.groupdict())
+                    except Exception:
+                        print(f'UNPARSED: {parser.last_unparsed_line}')
+
+        :param str filename: something that :py:meth:`open` accepts
+        :param args: Forwarded to constructor
+        :param kwargs: Forwarded to constructor
+        """
+        with open(filename) as file_handle:
+            yield cls(file_handle, *args, **kwargs)
+
+
+################################################################################
+# PATTERNS FOR FREQUENT LOG FILES
+################################################################################
+
+# pattern of squid proxy logs. group names are best guesses
+PROXY_LOG_PATTERN = \
+    r'\s*(?P<timestamp>\d+\.\d+\.\d+\s+\d+:\d+:\d+|\d+\.\d+)\s+(?P<size1>\d+)\s+' \
+    + r'(?P<ip>\d+\.\d+\.\d+\.\d+)\s+(?P<status_text>[A-Z_]+)/(?P<status_code>\d+)\s+' \
+    + r'(?P<size2>\d+)\s+(?P<command>\S+)\s+(?P<url>\S+)\s+(?P<user>\S+)\s+' \
+    + r'(?P<action>[A-Z_]+)/(?P<origin>\S+)\s+(?P<mimetype>\S+)\s+(?P<unknown>.*)\s*'
+
+# pattern for linux system logs (usually "messages" or "syslog" also "maillog"
+SYS_LOG_PATTERN = \
+    r'\s*(?P<timestamp>\w{3} +\d{2} \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) ' \
+    + r'(?P<procname>[^\[\]:]+)(?:\[(?P<pid>\d+)\])?: (?P<message>.*)'
index 28576a4..68c9160 100644 (file)
@@ -31,7 +31,7 @@ import time
 import logging
 from warnings import warn
 
-from src.log_read import IterativeReader, LineReader, LogReadWarning
+from src.log_read import *
 
 # get best clock
 perf_counter = time.perf_counter
@@ -44,7 +44,7 @@ class LogFileWriter(Thread):
 
     def __init__(self, file_name, text_pattern, n_writes=None,
                  pause_time=0.1, do_encode=None, use_logging=True):
-        """ creates thread, deamon is True
+        """ creates thread, daemon is True
 
         if n_writes is None, will write indefinitely; else writes text_pattern
         n_writes times, formatted with (counter, perf_counter)
@@ -156,7 +156,7 @@ class LogReadTester(unittest.TestCase):
             reader = IterativeReader([file_handle, file_handle],
                                      ['desc1', 'desc2'])
             self.helper_test_len(reader, 2)
-            reader = IterativeReader((file_handle for idx in range(5)))
+            reader = IterativeReader((file_handle for _ in range(5)))
             self.helper_test_len(reader, 5)
             self.assertRaises(ValueError, IterativeReader,
                               (file_handle for idx in range(5)),
@@ -191,7 +191,7 @@ class LogReadTester(unittest.TestCase):
             time_diffs = []
 
             with open(self.temp_file, 'rt') as file_handle:
-                reader = IterativeReader(file_handle)
+                reader = IterativeReader(file_handle, keep_watching=True)
                 self.helper_test_len(reader, 1)
                 counter = -1        # we may have to adapt this manually
                 for desc, text, source_idx in reader:
@@ -236,8 +236,7 @@ class LogReadTester(unittest.TestCase):
                  'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                  '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                  'end\n', '\nend\n', '\n\nend\n\n']
-        lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
-                         + ['', '', '']
+        lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', '']
 
         # create writer
         LogFileWriter(self.temp_file, texts, n_writes=len(texts),
@@ -245,9 +244,8 @@ class LogReadTester(unittest.TestCase):
                       use_logging=use_logging).start()
 
         # read
-        lines_read = []
         with open(self.temp_file, 'rt') as file_handle:
-            reader = LineReader(file_handle)
+            reader = LineReader(file_handle, keep_watching=True)
             self.helper_test_len(reader, 1)
 
             for line_expected, (_, line_read, _) in zip(lines_expected, reader):
@@ -259,6 +257,46 @@ class LogReadTester(unittest.TestCase):
                                                                 line_read))
                     self.assertEqual(line_expected, line_read)
 
+    @unittest.skipIf(not os.access('/var/log/messages', os.R_OK),
+                     "messages not accessible")
+    def test_parse_messages(self):
+        """Try parsing first 100 lines of messages if running on linux"""
+        with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count > 100:
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+    @unittest.skipIf(not os.access('/var/log/syslog', os.R_OK),
+                     "syslog not accessible")
+    def test_parse_syslog(self):
+        """Try parsing first 100 lines of syslog if running on linux"""
+        with LogParser.create_for('/var/log/syslog', SYS_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count > 100:
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+    @unittest.skipIf(not os.access('/var/log/maillog', os.R_OK),
+                     "maillog not accessible")
+    def test_parse_maillog(self):
+        """Try parsing first 100 lines of maillog if running on linux"""
+        with LogParser.create_for('/var/log/maillog', SYS_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count > 100:
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+    @unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK),
+                     "proxy log not accessible")
+    def test_parse_proxy_log(self):
+        """Try parsing first 100 lines of proxy log if running on linux"""
+        with LogParser.create_for('/var/log/squid/access.log', PROXY_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count > 100:
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
 
 if __name__ == '__main__':
     unittest.main()