From: Christian Herdtweck Date: Thu, 19 May 2022 11:16:50 +0000 (+0200) Subject: Add patterns for common linux log lines X-Git-Tag: v1.7.1~2^2 X-Git-Url: http://developer.intra2net.com/git/?p=pyi2ncommon;a=commitdiff_plain;h=0dd2fb19e974950f085eed45ea094ccc1bb66837 Add patterns for common linux log lines Needed to analyze large proxy logs in support. This helped. Syslog pattern simplifies testing of log_read. --- diff --git a/src/log_read.py b/src/log_read.py index 3b973bf..724f4e1 100644 --- a/src/log_read.py +++ b/src/log_read.py @@ -426,3 +426,20 @@ class LogParser(LineReader): """ with open(filename) as file_handle: yield cls(file_handle, *args, **kwargs) + + +################################################################################ +# PATTERNS FOR FREQUENT LOG FILES +################################################################################ + +# pattern of squid proxy logs. group names are best guesses +PROXY_LOG_PATTERN = \ + r'\s*(?P\d+\.\d+\.\d+\s+\d+:\d+:\d+|\d+\.\d+)\s+(?P\d+)\s+' \ + + r'(?P\d+\.\d+\.\d+\.\d+)\s+(?P[A-Z_]+)/(?P\d+)\s+' \ + + r'(?P\d+)\s+(?P\S+)\s+(?P\S+)\s+(?P\S+)\s+' \ + + r'(?P[A-Z_]+)/(?P\S+)\s+(?P\S+)\s+(?P.*)\s*' + +# pattern for linux system logs (usually "messages" or "syslog" also "maillog" +SYS_LOG_PATTERN = \ + r'\s*(?P\w{3} +\d{2} \d{2}:\d{2}:\d{2}) (?P\S+) ' \ + + r'(?P[^\[\]:]+)(?:\[(?P\d+)\])?: (?P.*)' diff --git a/test/test_log_read.py b/test/test_log_read.py index 1b2f5f5..68c9160 100644 --- a/test/test_log_read.py +++ b/test/test_log_read.py @@ -31,7 +31,7 @@ import time import logging from warnings import warn -from src.log_read import IterativeReader, LineReader, LogReadWarning +from src.log_read import * # get best clock perf_counter = time.perf_counter @@ -44,7 +44,7 @@ class LogFileWriter(Thread): def __init__(self, file_name, text_pattern, n_writes=None, pause_time=0.1, do_encode=None, use_logging=True): - """ creates thread, deamon is True + """ creates thread, daemon is True if n_writes is None, will write indefinitely; else writes text_pattern n_writes times, formatted with (counter, perf_counter) @@ -156,7 +156,7 @@ class LogReadTester(unittest.TestCase): reader = IterativeReader([file_handle, file_handle], ['desc1', 'desc2']) self.helper_test_len(reader, 2) - reader = IterativeReader((file_handle for idx in range(5))) + reader = IterativeReader((file_handle for _ in range(5))) self.helper_test_len(reader, 5) self.assertRaises(ValueError, IterativeReader, (file_handle for idx in range(5)), @@ -236,8 +236,7 @@ class LogReadTester(unittest.TestCase): 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e', '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n', 'end\n', '\nend\n', '\n\nend\n\n'] - lines_expected = ['line{0}'.format(idx) for idx in range(12)] \ - + ['', '', ''] + lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', ''] # create writer LogFileWriter(self.temp_file, texts, n_writes=len(texts), @@ -245,7 +244,6 @@ class LogReadTester(unittest.TestCase): use_logging=use_logging).start() # read - lines_read = [] with open(self.temp_file, 'rt') as file_handle: reader = LineReader(file_handle, keep_watching=True) self.helper_test_len(reader, 1) @@ -259,6 +257,46 @@ class LogReadTester(unittest.TestCase): line_read)) self.assertEqual(line_expected, line_read) + @unittest.skipIf(not os.access('/var/log/messages', os.R_OK), + "messages not accessible") + def test_parse_messages(self): + """Try parsing first 100 lines of messages if running on linux""" + with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser: + for line_count, (_, data, _) in enumerate(parser): + if line_count > 100: + break + self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}") + + @unittest.skipIf(not os.access('/var/log/syslog', os.R_OK), + "syslog not accessible") + def test_parse_syslog(self): + """Try parsing first 100 lines of syslog if running on linux""" + with LogParser.create_for('/var/log/syslog', SYS_LOG_PATTERN) as parser: + for line_count, (_, data, _) in enumerate(parser): + if line_count > 100: + break + self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}") + + @unittest.skipIf(not os.access('/var/log/maillog', os.R_OK), + "maillog not accessible") + def test_parse_maillog(self): + """Try parsing first 100 lines of maillog if running on linux""" + with LogParser.create_for('/var/log/maillog', SYS_LOG_PATTERN) as parser: + for line_count, (_, data, _) in enumerate(parser): + if line_count > 100: + break + self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}") + + @unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK), + "proxy log not accessible") + def test_parse_proxy_log(self): + """Try parsing first 100 lines of proxy log if running on linux""" + with LogParser.create_for('/var/log/squid/access.log', PROXY_LOG_PATTERN) as parser: + for line_count, (_, data, _) in enumerate(parser): + if line_count > 100: + break + self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}") + if __name__ == '__main__': unittest.main()