import logging
from warnings import warn
-from src.log_read import IterativeReader, LineReader, LogReadWarning
+from src.log_read import *
# get best clock
perf_counter = time.perf_counter
def __init__(self, file_name, text_pattern, n_writes=None,
pause_time=0.1, do_encode=None, use_logging=True):
- """ creates thread, deamon is True
+ """ creates thread, daemon is True
if n_writes is None, will write indefinitely; else writes text_pattern
n_writes times, formatted with (counter, perf_counter)
reader = IterativeReader([file_handle, file_handle],
['desc1', 'desc2'])
self.helper_test_len(reader, 2)
- reader = IterativeReader((file_handle for idx in range(5)))
+ reader = IterativeReader((file_handle for _ in range(5)))
self.helper_test_len(reader, 5)
self.assertRaises(ValueError, IterativeReader,
(file_handle for idx in range(5)),
'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
'8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
'end\n', '\nend\n', '\n\nend\n\n']
- lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
- + ['', '', '']
+ lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', '']
# create writer
LogFileWriter(self.temp_file, texts, n_writes=len(texts),
use_logging=use_logging).start()
# read
- lines_read = []
with open(self.temp_file, 'rt') as file_handle:
reader = LineReader(file_handle, keep_watching=True)
self.helper_test_len(reader, 1)
line_read))
self.assertEqual(line_expected, line_read)
+ @unittest.skipIf(not os.access('/var/log/messages', os.R_OK),
+ "messages not accessible")
+ def test_parse_messages(self):
+ """Try parsing first 100 lines of messages if running on linux"""
+ with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+ @unittest.skipIf(not os.access('/var/log/syslog', os.R_OK),
+ "syslog not accessible")
+ def test_parse_syslog(self):
+ """Try parsing first 100 lines of syslog if running on linux"""
+ with LogParser.create_for('/var/log/syslog', SYS_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+ @unittest.skipIf(not os.access('/var/log/maillog', os.R_OK),
+ "maillog not accessible")
+ def test_parse_maillog(self):
+ """Try parsing first 100 lines of maillog if running on linux"""
+ with LogParser.create_for('/var/log/maillog', SYS_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+ @unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK),
+ "proxy log not accessible")
+ def test_parse_proxy_log(self):
+ """Try parsing first 100 lines of proxy log if running on linux"""
+ with LogParser.create_for('/var/log/squid/access.log', PROXY_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
if __name__ == '__main__':
unittest.main()