new data and forwards that
.. todo:: Want to also use lsof to find out whether file/pipe/socket was
- closed, so can return from read loop
+ closed, so can automatically return from read loop.
:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
it line-wise as is normal for log files
------------------------------------------------------
"""
-
import os
import os.path
import re
from warnings import warn
import logging
+from contextlib import contextmanager
from .iter_helpers import zip_longest
from .type_helpers import is_str_or_byte, is_file_obj
return True
+def false_func(_):
+ """Replacement for :py:func:`check_is_used`. Returns `False` always."""
+ return False
+
+
def check_is_used(file_handle):
"""
Check whether file is being written to.
implement a different :py:meth:`prepare_result` method.
"""
- def __init__(self, sources, descs=None, return_when_done=False):
+ def __init__(self, sources, descs=None, keep_watching=False):
"""
Create a reader; do some basic checks on args.
a single source, then descs is also converted to a list
of length 1. If not given (i.e. None), will use
:py:func:`create_description` to guess descriptions
- :param bool return_when_done: ignore file_handle if no-one is writing
- to it any more. Return from iterator when
- all watched files are done (not
- implemented yet)
+ :param bool keep_watching: keep watching file that is not changing in
+ size. Need to manually tell whether file
+ is being written to or not since auto-detect
+ is not implemented yet.
:raises: OSError when testing fstat on source
"""
if not sources:
self.last_sizes = [0 for _ in self.file_objs]
self.ignore = [False for _ in self.file_objs]
- if return_when_done:
- self.is_used_func = check_is_used
- else:
+ if keep_watching:
self.is_used_func = true_func
+ else:
+ self.is_used_func = false_func
+ # use some day: self.is_used_func = check_is_used
for obj, file_handle, description in \
zip(self.file_objs, self.file_handles, self.descriptions):
"""
Continue reading from sources, yield results.
- yields result of :py:meth:`prepare_result`, which depends on what sub
- class you called this function from.
+ yields result of :py:meth:`prepare_result`, which depends on what
+ subclass you called this function from.
"""
while True:
+ if all(self.ignore):
+ break
+
for idx, (obj, file_handle, description, last_size, do_ignore) in \
enumerate(zip(self.file_objs, self.file_handles,
self.descriptions, self.last_sizes,
# compare to old size
if new_size == last_size:
if not self.is_used_func(file_handle):
- warn('no one is writing to {0} / {1} -- '
- 'stop watching it!'
- .format(file_handle, description),
- category=LogReadWarning)
self.ignore[idx] = True
else:
if new_size < last_size: # happened at start of some tests
new_data = str(ude)
# post-processing
- to_yield = self.prepare_result(description, new_data, idx)
- for result in to_yield:
+ for result in self.prepare_result(description, new_data, idx):
yield result
# prepare next iteration
Intended for overwriting in subclasses.
This function is called from __iter__ for each new data that becomes
- available. It has to return some iterable whose entries are yielded
- from iteration over objects of this class.
+ available. It has to provide results which are forwarded to the caller.
- This base implementation just returns its input in a list, so new data
- is yielded from __iter__ as-is. Subclass implementations can also yield
- tuples.
+ This base implementation just yields its input, so new data is yielded
+ from `__iter__` as-is.
:param str description: Description of source of lines, one of
:py:data:`self.descriptions`
:param str data: Text data read from source
:param idx: Index of data source
- :returns: [(description, data, idx], same as input
- :rtype [(str, str, int)]
+ :returns: nothing but yields (description, data, idx), same as input
"""
- return [(description, data, idx), ]
+ yield description, data, idx
#: characters to `rstrip()` from end of complete lines
"""
all_data = self.line_buffers[idx] + new_data
self.line_buffers[idx] = ''
- result = []
should_be_no_new_lines = False
for line in all_data.splitlines(True):
if line[-1] in LINE_SPLITTERS:
- result.append((description, line.rstrip(LINE_SPLITTERS), idx))
+ yield description, line.rstrip(LINE_SPLITTERS), idx
elif should_be_no_new_lines:
# self-check
raise ValueError('Programming error: something went wrong with '
self.line_buffers[idx] = line
should_be_no_new_lines = True # (this should be the last)
- return result
-
class LogParser(LineReader):
"""
Requires a pattern for log lines, auto-detection is not implemented yet.
- Iteration returns re.match result or -- if matching failed -- the original
- raw line.
+ Iteration returns :py:class:`re.match` result or -- if matching failed --
+ None. The latest unparsed line is available as `self.last_unparsed_line`.
+ Usage recommendation:
+
+ with open(log_file_name, 'rt') as file_handle:
+ parser = log_read.LogParser(file_handle, pattern=my_pattern):
+ for _, data, _ in parser:
+ if data is None:
+ print(f'Failed to parse line {parser.last_unparsed_line}')
+ continue
+ line_parts = data.groupdict()
+ ...do stuff with line_parts...
"""
def __init__(self, log_file, pattern=None):
super(LogParser, self).__init__(log_file)
self.pattern = pattern
+ self.last_unparsed_line = ''
def prepare_result(self, *args):
"""
# let super class split data into lines
for description, raw_line, idx in \
super(LogParser, self).prepare_result(*args):
- result = re.match(self.pattern, raw_line)
- if result:
- return description, result, idx
+ matches = re.match(self.pattern, raw_line)
+ if matches:
+ yield description, matches, idx
else:
- return description, raw_line, idx
+ self.last_unparsed_line = raw_line
+ yield description, None, idx
+
+ @classmethod
+ @contextmanager
+ def create_for(cls, filename, *args, **kwargs):
+ """
+ Open single file, yield LogParser. Ensures file is closed afterwards.
+
+ This allows opening a file and creating a LogParser for it in one line:
+
+ with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser:
+ for _, matches, _ in parser:
+ try:
+ print(matches.groupdict())
+ except Exception:
+ print(f'UNPARSED: {parser.last_unparsed_line}')
+
+ :param str filename: something that :py:meth:`open` accepts
+ :param args: Forwarded to constructor
+ :param kwargs: Forwarded to constructor
+ """
+ with open(filename) as file_handle:
+ yield cls(file_handle, *args, **kwargs)
+
+
+################################################################################
+# PATTERNS FOR FREQUENT LOG FILES
+################################################################################
+
+# Pattern of squid proxy logs. Group names are best guesses.
+PROXY_LOG_PATTERN = \
+ r'\s*(?P<timestamp>\d+\.\d+\.\d+\s+\d+:\d+:\d+|\d+\.\d+)\s+(?P<size1>\d+)\s+' \
+ + r'(?P<ip>\d+\.\d+\.\d+\.\d+)\s+(?P<status_text>[A-Z_]+)/(?P<status_code>\d+)\s+' \
+ + r'(?P<size2>\d+)\s+(?P<command>\S+)\s+(?P<url>\S+)\s+(?P<user>\S+)\s+' \
+ + r'(?P<action>[A-Z_]+)/(?P<origin>\S+)\s+(?P<mimetype>\S+)\s+(?P<unknown>.*)\s*'
+
+# pattern for linux system logs (usually "messages" or "syslog", also "maillog")
+SYS_LOG_PATTERN = \
+ r'\s*(?P<timestamp>\w{3} +\d{2} \d{2}:\d{2}:\d{2}) (?P<hostname>\S+) ' \
+ + r'(?P<procname>[^\[\]:]+)(?:\[(?P<pid>\d+)\])?: (?P<message>.*)'
import logging
from warnings import warn
-from src.log_read import IterativeReader, LineReader, LogReadWarning
+from src.log_read import *
# get best clock
perf_counter = time.perf_counter
def __init__(self, file_name, text_pattern, n_writes=None,
pause_time=0.1, do_encode=None, use_logging=True):
- """ creates thread, deamon is True
+ """ creates thread, daemon is True
if n_writes is None, will write indefinitely; else writes text_pattern
n_writes times, formatted with (counter, perf_counter)
reader = IterativeReader([file_handle, file_handle],
['desc1', 'desc2'])
self.helper_test_len(reader, 2)
- reader = IterativeReader((file_handle for idx in range(5)))
+ reader = IterativeReader((file_handle for _ in range(5)))
self.helper_test_len(reader, 5)
self.assertRaises(ValueError, IterativeReader,
(file_handle for idx in range(5)),
time_diffs = []
with open(self.temp_file, 'rt') as file_handle:
- reader = IterativeReader(file_handle)
+ reader = IterativeReader(file_handle, keep_watching=True)
self.helper_test_len(reader, 1)
counter = -1 # we may have to adapt this manually
for desc, text, source_idx in reader:
'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
'8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
'end\n', '\nend\n', '\n\nend\n\n']
- lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
- + ['', '', '']
+ lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', '']
# create writer
LogFileWriter(self.temp_file, texts, n_writes=len(texts),
use_logging=use_logging).start()
# read
- lines_read = []
with open(self.temp_file, 'rt') as file_handle:
- reader = LineReader(file_handle)
+ reader = LineReader(file_handle, keep_watching=True)
self.helper_test_len(reader, 1)
for line_expected, (_, line_read, _) in zip(lines_expected, reader):
line_read))
self.assertEqual(line_expected, line_read)
+ @unittest.skipIf(not os.access('/var/log/messages', os.R_OK),
+ "messages not accessible")
+ def test_parse_messages(self):
+ """Try parsing first 100 lines of messages if running on linux"""
+ with LogParser.create_for('/var/log/messages', SYS_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+ @unittest.skipIf(not os.access('/var/log/syslog', os.R_OK),
+ "syslog not accessible")
+ def test_parse_syslog(self):
+ """Try parsing first 100 lines of syslog if running on linux"""
+ with LogParser.create_for('/var/log/syslog', SYS_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+ @unittest.skipIf(not os.access('/var/log/maillog', os.R_OK),
+ "maillog not accessible")
+ def test_parse_maillog(self):
+ """Try parsing first 100 lines of maillog if running on linux"""
+ with LogParser.create_for('/var/log/maillog', SYS_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
+ @unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK),
+ "proxy log not accessible")
+ def test_parse_proxy_log(self):
+ """Try parsing first 100 lines of proxy log if running on linux"""
+ with LogParser.create_for('/var/log/squid/access.log', PROXY_LOG_PATTERN) as parser:
+ for line_count, (_, data, _) in enumerate(parser):
+ if line_count > 100:
+ break
+ self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
+
if __name__ == '__main__':
unittest.main()