Add patterns for common linux log lines
author Christian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 19 May 2022 11:16:50 +0000 (13:16 +0200)
committer Christian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 19 May 2022 11:23:09 +0000 (13:23 +0200)
Needed to analyze large proxy logs in support. This helped.

Syslog pattern simplifies testing of log_read.

src/log_read.py
test/test_log_read.py

index 3b973bf..724f4e1 100644 (file)
@@ -426,3 +426,20 @@ class LogParser(LineReader):
         """
         with open(filename) as file_handle:
             yield cls(file_handle, *args, **kwargs)
+
+
+################################################################################
+# PATTERNS FOR FREQUENT LOG FILES
+################################################################################
+
+# pattern of squid proxy logs (extra fields after mime type optional). group names are best guesses
+PROXY_LOG_PATTERN = \
+    r'\s*(?P<timestamp>\d+\.\d+\.\d+\s+\d+:\d+:\d+|\d+\.\d+)\s+(?P<size1>\d+)\s+' \
+    + r'(?P<ip>\d+\.\d+\.\d+\.\d+)\s+(?P<status_text>[A-Z_]+)/(?P<status_code>\d+)\s+' \
+    + r'(?P<size2>\d+)\s+(?P<command>\S+)\s+(?P<url>\S+)\s+(?P<user>\S+)\s+' \
+    + r'(?P<action>[A-Z_]+)/(?P<origin>\S+)\s+(?P<mimetype>\S+)(?:\s+|$)(?P<unknown>.*)\s*'
+
+# linux system logs ("messages", "syslog", also "maillog"); day may be space-padded: "May  9"
+SYS_LOG_PATTERN = \
+    r'\s*(?P<timestamp>\w{3} +\d{1,2} \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) ' \
+    + r'(?P<procname>[^\[\]:]+)(?:\[(?P<pid>\d+)\])?: (?P<message>.*)'
index 1b2f5f5..68c9160 100644 (file)
@@ -31,7 +31,7 @@ import time
 import logging
 from warnings import warn
 
-from src.log_read import IterativeReader, LineReader, LogReadWarning
+from src.log_read import *
 
 # get best clock
 perf_counter = time.perf_counter
@@ -44,7 +44,7 @@ class LogFileWriter(Thread):
 
     def __init__(self, file_name, text_pattern, n_writes=None,
                  pause_time=0.1, do_encode=None, use_logging=True):
-        """ creates thread, deamon is True
+        """ creates thread, daemon is True
 
         if n_writes is None, will write indefinitely; else writes text_pattern
         n_writes times, formatted with (counter, perf_counter)
@@ -156,7 +156,7 @@ class LogReadTester(unittest.TestCase):
             reader = IterativeReader([file_handle, file_handle],
                                      ['desc1', 'desc2'])
             self.helper_test_len(reader, 2)
-            reader = IterativeReader((file_handle for idx in range(5)))
+            reader = IterativeReader((file_handle for _ in range(5)))
             self.helper_test_len(reader, 5)
             self.assertRaises(ValueError, IterativeReader,
                               (file_handle for idx in range(5)),
@@ -236,8 +236,7 @@ class LogReadTester(unittest.TestCase):
                  'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                  '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                  'end\n', '\nend\n', '\n\nend\n\n']
-        lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
-                         + ['', '', '']
+        lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', '']
 
         # create writer
         LogFileWriter(self.temp_file, texts, n_writes=len(texts),
@@ -245,7 +244,6 @@ class LogReadTester(unittest.TestCase):
                       use_logging=use_logging).start()
 
         # read
-        lines_read = []
         with open(self.temp_file, 'rt') as file_handle:
             reader = LineReader(file_handle, keep_watching=True)
             self.helper_test_len(reader, 1)
@@ -259,6 +257,46 @@ class LogReadTester(unittest.TestCase):
                                                                 line_read))
                     self.assertEqual(line_expected, line_read)
 
+    @unittest.skipIf(not os.access('/var/log/messages', os.R_OK),
+                     "messages not accessible")
+    def test_parse_messages(self):
+        """Parse first 100 lines of /var/log/messages; skipped if the file is unreadable"""
+        with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count >= 100:  # 0-based index: 0..99 are the first 100 lines
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+    @unittest.skipIf(not os.access('/var/log/syslog', os.R_OK),
+                     "syslog not accessible")
+    def test_parse_syslog(self):
+        """Parse first 100 lines of /var/log/syslog; skipped if the file is unreadable"""
+        with LogParser.create_for('/var/log/syslog', SYS_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count >= 100:  # 0-based index: 0..99 are the first 100 lines
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+    @unittest.skipIf(not os.access('/var/log/maillog', os.R_OK),
+                     "maillog not accessible")
+    def test_parse_maillog(self):
+        """Parse first 100 lines of /var/log/maillog; skipped if the file is unreadable"""
+        with LogParser.create_for('/var/log/maillog', SYS_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count >= 100:  # 0-based index: 0..99 are the first 100 lines
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+    @unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK),
+                     "proxy log not accessible")
+    def test_parse_proxy_log(self):
+        """Parse first 100 lines of squid access.log; skipped if the file is unreadable"""
+        with LogParser.create_for('/var/log/squid/access.log', PROXY_LOG_PATTERN) as parser:
+            for line_count, (_, data, _) in enumerate(parser):
+                if line_count >= 100:  # 0-based index: 0..99 are the first 100 lines
+                    break
+                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
 
 if __name__ == '__main__':
     unittest.main()