new data and forwards that
.. todo:: Want to also use lsof to find out whether file/pipe/socket was
- closed, so can return from read loop
+ closed, so can automatically return from read loop.
:py:class:`LineReader` takes the output of :py:class:`IterativeReader` and
returns it line by line, as is usual for log files.
return True
+def false_func(_):
+    """Replacement for :py:func:`check_is_used`; always returns ``False``."""
+ return False
+
+
def check_is_used(file_handle):
"""
Check whether file is being written to.
implement a different :py:meth:`prepare_result` method.
"""
- def __init__(self, sources, descs=None, return_when_done=False):
+ def __init__(self, sources, descs=None, keep_watching=False):
"""
Create a reader; do some basic checks on args.
a single source, then descs is also converted to a list
of length 1. If not given (i.e. None), will use
:py:func:`create_description` to guess descriptions
- :param bool return_when_done: ignore file_handle if no-one is writing
- to it any more. Return from iterator when
- all watched files are done (not
- implemented yet)
+    :param bool keep_watching: keep watching a file that is not changing
+                                   in size. The caller must tell manually
+                                   whether the file is still being written to,
+                                   since auto-detection is not implemented yet.
:raises: OSError when testing fstat on source
"""
if not sources:
self.last_sizes = [0 for _ in self.file_objs]
self.ignore = [False for _ in self.file_objs]
- if return_when_done:
- self.is_used_func = check_is_used
- else:
+ if keep_watching:
self.is_used_func = true_func
+ else:
+ self.is_used_func = false_func
+ # use some day: self.is_used_func = check_is_used
for obj, file_handle, description in \
zip(self.file_objs, self.file_handles, self.descriptions):
class you called this function from.
"""
while True:
+ if all(self.ignore):
+ break
+
for idx, (obj, file_handle, description, last_size, do_ignore) in \
enumerate(zip(self.file_objs, self.file_handles,
self.descriptions, self.last_sizes,
# compare to old size
if new_size == last_size:
if not self.is_used_func(file_handle):
- warn('no one is writing to {0} / {1} -- '
- 'stop watching it!'
- .format(file_handle, description),
- category=LogReadWarning)
self.ignore[idx] = True
else:
if new_size < last_size: # happened at start of some tests
Requires a pattern for log lines, auto-detection is not implemented yet.
- Iteration returns re.match result or -- if matching failed -- the original
- raw line.
+    Iteration returns a :py:class:`re.Match` result or -- if matching failed --
+    the original raw line. Usage recommendation::
+
+ with open(log_file_name, 'rt') as file_handle:
+ for _, data, _ in log_read.LogParser(file_handle, pattern=my_pattern):
+ try:
+ line_parts = data.groupdict()
+ except AttributeError: # no groupdict --> could not parse
+ print(f'Failed to parse line {data}')
+ continue
+ ...do stuff with line_parts...
"""
def __init__(self, log_file, pattern=None):
time_diffs = []
with open(self.temp_file, 'rt') as file_handle:
- reader = IterativeReader(file_handle)
+ reader = IterativeReader(file_handle, keep_watching=True)
self.helper_test_len(reader, 1)
counter = -1 # we may have to adapt this manually
for desc, text, source_idx in reader:
# read
lines_read = []
with open(self.temp_file, 'rt') as file_handle:
- reader = LineReader(file_handle)
+ reader = LineReader(file_handle, keep_watching=True)
self.helper_test_len(reader, 1)
for line_expected, (_, line_read, _) in zip(lines_expected, reader):