From e7d4918020d75a2f2c6fffe8f1733ab70f7e0c2b Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Tue, 27 Oct 2015 13:43:45 +0100 Subject: [PATCH] created log_read.py with functional base class IterativeReader this replaces follow.py because I realized that the approach there (using select.poll) is not guaranteed to work. log_read uses stat.st_size as its c++ exemplar does --- follow.py | 3 + follow_unittest.py | 124 ++++++++++++++++++++++ log_read.py | 287 ++++++++++++++++++++++++++++++++++++++++++++++++++ log_read_unittest.py | 178 +++++++++++++++++++++++++++++++ 4 files changed, 592 insertions(+), 0 deletions(-) create mode 100644 follow_unittest.py create mode 100644 log_read.py create mode 100644 log_read_unittest.py diff --git a/follow.py b/follow.py index bd14a05..3b8e7ce 100644 --- a/follow.py +++ b/follow.py @@ -21,6 +21,9 @@ """ follow process output, log files and pipes using select and poll +DEPRECATED +(at least for files see log_read; may still be useful for pipes/sockets) + Main class is Follower which does the polling and selecting, it is best used in a with-statement as follows:: diff --git a/follow_unittest.py b/follow_unittest.py new file mode 100644 index 0000000..9a0921a --- /dev/null +++ b/follow_unittest.py @@ -0,0 +1,124 @@ +""" Unittest for follow.py + +This is a little more involved since in order to test properly, need a second +thread that writes to file/socket/stdout + +..todo:: as in log_read_unittest: disable buffering in LogFileWriter + +DEPRECATED +(at least for files see log_read; may still be useful for pipes/sockets) + +.. 
""" Unittest for follow.py

This is a little more involved since in order to test properly, need a second
thread that writes to file/socket/stdout

The writer thread flushes after every write, so each text becomes visible to
the reader right away instead of staying in the write buffer.

DEPRECATED
(at least for files see log_read; may still be useful for pipes/sockets)

.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
"""


import unittest
from tempfile import mkstemp
import threading
from follow import *
import time
import os
import os.path


class LogFileWriter(threading.Thread):
    """ thread that creates and writes to given file handle """

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None):
        """ creates thread, daemon is True

        if n_writes is None, will write indefinitely; else writes text_pattern
        n_writes times, formatted with (counter, time.perf_counter)

        :param str file_name: name of file to (over)write
        :param str text_pattern: format string with fields {0} (counter) and
                                 {1} (time of writing)
        :param n_writes: number of texts to write or None for no limit
        :param float pause_time: seconds to sleep after each write
        :param do_encode: name of an encoding; if given, text is encoded with
                          it and the file is opened in binary mode
        """
        super().__init__(daemon=True)
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode

    def run(self):
        """ write texts to file, pausing after each one """
        mode = 'wb' if self.do_encode else 'wt'
        counter = 0

        with open(self.file_name, mode) as file_handle:
            while self.n_writes is None or counter < self.n_writes:
                text = self.text_pattern.format(counter, time.perf_counter())
                if self.do_encode:
                    text = text.encode(self.do_encode)
                file_handle.write(text)

                # without this the text stays in the write buffer until the
                # file is closed, so the reader would see nothing in time
                file_handle.flush()

                print('wrote {0}'.format(counter))
                counter += 1
                time.sleep(self.pause_time)


class FollowTester(unittest.TestCase):
    """ Unittest for follow.py """

    def test_logfile(self):
        """ create logfile, write to it, expect output """

        text_pattern = '{0}:{1}\n'
        n_texts = 10
        pause_time = 1
        encoding = 'ascii'

        LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
                      pause_time=pause_time, do_encode=encoding).start()
        print('testing with log file {0}'.format(self.temp_file))
        time_diffs = []

        with open(self.temp_file) as read_handle:
            follower = Follower(read_handle)
            for source, desc, text, flags in follower:
                receive_time = time.perf_counter()
                print('received text: "{0}"'.format(text))

                # text has the form "<counter>:<time of writing>"
                index = text.index(':')
                counter = int(text[:index].strip())
                write_time = float(text[index+1:].strip())
                time_diffs.append(receive_time - write_time)

                # counters run from 0 to n_texts-1, so the last text carries
                # n_texts-1; waiting for n_texts would never end
                if counter >= n_texts - 1:
                    break

    def setUp(self):
        """ called before each test """
        print('setup test')
        temp_handle, temp_name = mkstemp()
        os.close(temp_handle)
        self.temp_file = temp_name
        print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ called after each test """
        print('tear down test')
        if os.path.isfile(self.temp_file):
            print('delete temp file ' + self.temp_file)
            os.unlink(self.temp_file)

    @classmethod
    def setUpClass(cls):
        """ called once before the first test """
        print('setup test class')
        cls.temp_file = None

    @classmethod
    def tearDownClass(cls):
        """ called once after the last test """
        print('tear down test class')


if __name__ == '__main__':
    unittest.main()


# ----------------------------------------------------------------------------
# Rest of the original text line, kept verbatim as comments: patch header and
# docstring opening of the next file in the patch (log_read.py)
#
# diff --git a/log_read.py b/log_read.py
# new file mode 100644
# index 0000000..5cdd29f
# --- /dev/null
# +++ b/log_read.py
# @@ -0,0 +1,287 @@
# +""" Iterative reading of log files
# +
# +Basic Functionality (class :py:class:`IterativeReader`:)
# +Runs stat in a loop to find out whether file size has changed. Then reads the
# +new data and forwards that
# +
# +..todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
# +         so can return from read loop
# +
# +:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
# +it line-wise as is normal for log files
# +
# +:py:class:`LogParser` takes those lines and tries to parse them into fields
# +like date, time, module name, urgency and message.
# +
# +..todo:: auto-detect log line layout
# +
# +..
""" Iterative reading of log files

Basic Functionality (class :py:class:`IterativeReader`:)
Runs stat in a loop to find out whether file size has changed. Then reads the
new data and forwards that

..todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
         so can return from read loop

:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
it line-wise as is normal for log files

:py:class:`LogParser` takes those lines and tries to parse them into fields
like date, time, module name, urgency and message.

..todo:: auto-detect log line layout

.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
"""

import os
from warnings import warn
import os.path
from itertools import zip_longest

from type_helpers import is_str_or_byte, is_file_obj


class LogReadWarning(UserWarning):
    """ warnings issued by classes in this module """
    pass


def true_func(unused_argument_but_that_is_ok):
    """ does nothing, always returns True """
    return True


def check_is_used(some_file_or_handle):
    """ check whether file is being written to

    to be implemented, e.g. using lsof

    :raises: NotImplementedError, always
    """
    raise NotImplementedError()


#: number of descriptions of the form "unknownN" handed out so far
_create_description_unknown_counter = 0


def create_description(file_obj, file_desc):
    """ create some description for given file-like object / file descriptor

    Prefers the `name` attribute of the file object if that exists and is not
    empty; falls back to "file<descriptor>" and finally to a numbered
    "unknown<N>".

    :param file_obj: file-like object
    :param int file_desc: os-level file descriptor
    :returns: string (or whatever `file_obj.name` is, if that is used)
    """

    global _create_description_unknown_counter

    name = getattr(file_obj, 'name', None)
    if name:
        return name

    if file_desc is not None:
        return 'file{0}'.format(file_desc)

    _create_description_unknown_counter += 1
    return 'unknown{0}'.format(_create_description_unknown_counter)


#: error message for IterativeReader constructor
_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
           'files --> use with open(file_name)!'


class IterativeReader:
    """ reads from a given file

    Uses os.fstat(file_obj.fileno()).st_size as measure whether file has
    changed or not; Always reads as much data as possible

    Catches most common exceptions in iteration (not constructor)

    Does not care about closing files, so does not accept file names

    This is the base for class :py:class:`LineReader` that just has to
    implement a different :py:meth:`prepare_result` method
    """

    def __init__(self, sources, descs=None, return_when_done=False):
        """ creates a reader; does some basic checks on args

        :param sources: iterable over sources. Sources can be opened file
                        objects or read-opened os-level file descriptors.
                        Calling code has to ensure they are closed properly, so
                        best use this within a "with open(file_name) as
                        file_handle:"-context. If sources is a single file
                        obj/descriptor, both source and desc will be converted
                        to lists of length 1
        :param descs: can be anything of same length as sources. If sources is
                      a single source, then descs is also converted to a list
                      of length 1. If not given (i.e. None), will use
                      :py:func:`create_description` to guess descriptions
        :param bool return_when_done: ignore file_handle if no-one is writing
                                      to it any more. Detecting this is not
                                      implemented yet: :py:func:`check_is_used`
                                      just raises NotImplementedError
        :raises: OSError when testing fstat on source
        :raises: ValueError if sources is empty, is or contains a file name or
                 something that is neither file object nor file descriptor, or
                 if the number of descriptions does not match
        """
        # a bare file descriptor 0 is falsy but still a valid single source
        is_single_fd = isinstance(sources, int) \
            and not isinstance(sources, bool)

        if not sources and not is_single_fd:
            raise ValueError('need at least some source!')
        elif is_str_or_byte(sources):
            raise ValueError(_STR_ERR.format(sources))
        elif is_file_obj(sources) or isinstance(sources, int):
            source_input = [sources, ]
            desc_input = [descs, ]
        else:
            source_input = sources     # assume some iterable
            desc_input = descs

        # now divide sources into os-level file descriptors for os.fstat,
        # and file objects for read()
        self.file_objs = []
        self.file_descs = []          # file descriptOR, not descriptION
        for source in source_input:
            if is_file_obj(source):
                self.file_objs.append(source)
                self.file_descs.append(source.fileno())
            elif isinstance(source, int):
                self.file_objs.append(os.fdopen(source))
                self.file_descs.append(source)
            elif is_str_or_byte(source):
                raise ValueError(_STR_ERR.format(source))
            else:
                raise ValueError('source {0} is neither file obj nor file '
                                 'descriptor!'.format(source))

            # try to fstat the new file descriptor
            os.fstat(self.file_descs[-1])

        # guess descriptions if not given
        if not desc_input:
            self.descriptions = [create_description(obj, file_desc)
                                 for obj, file_desc
                                 in zip(self.file_objs, self.file_descs)]
        else:
            try:
                if len(desc_input) != len(self.file_objs):
                    raise ValueError('need same number of sources and '
                                     'descriptions!')
            except TypeError:
                pass      # desc_input is generator or so

            self.descriptions = []
            for obj, file_desc, description in \
                    zip_longest(self.file_objs, self.file_descs, desc_input):
                if obj is None:
                    raise ValueError('more descriptions than sources!')
                elif description is None:
                    self.descriptions.append(create_description(obj,
                                                                file_desc))
                else:
                    self.descriptions.append(description)

        self.last_sizes = [0 for _ in self.file_objs]
        self.ignore = [False for _ in self.file_objs]

        if return_when_done:
            self.is_used = check_is_used
        else:
            self.is_used = true_func

        for obj, file_desc, description in zip(self.file_objs, self.file_descs,
                                               self.descriptions):
            print('file descriptor {0}, file obj {1}, description "{2}"'
                  .format(file_desc, obj, description))

    def n_sources(self):
        """ returns the number of sources given to the constructor """
        return len(self.file_objs)

    def n_active_sources(self):
        """ returns the number of sources that are not being ignored """
        return len(self.ignore) - sum(self.ignore)

    def __iter__(self):
        """ check all sources over and over again, yield new data

        Compares the current size of each source with the size seen last. If
        the source has grown, reads from it and yields whatever
        :py:meth:`prepare_result` makes of the new data.

        There is no sleep between checks. Iteration only ends when all
        sources are being ignored; with the default `return_when_done=False`
        that never happens, so calling code has to break out of the loop.
        """
        # stop instead of spinning forever once no source is left to watch
        while self.n_active_sources() > 0:
            for idx, (obj, file_desc, description, last_size, do_ignore) in \
                    enumerate(zip(self.file_objs, self.file_descs,
                                  self.descriptions, self.last_sizes,
                                  self.ignore)):

                if do_ignore:
                    continue

                # get new file size
                new_size = os.fstat(file_desc).st_size

                # compare to old size
                if new_size == last_size:
                    if not self.is_used(file_desc):
                        warn('no one is writing to {0} / {1} -- '
                             'stop watching it!'
                             .format(file_desc, description),
                             category=LogReadWarning)
                        self.ignore[idx] = True
                    continue

                if new_size < last_size:
                    warn('{0} / {1} has become smaller ({2} --> {3})!'
                         .format(obj, description, last_size, new_size),
                         category=LogReadWarning)

                    # The file was truncated, so our read position is behind
                    # the new end and read() would return nothing. Start over
                    # at the beginning (like "tail -F" does); this may yield
                    # data again that was yielded before.
                    try:
                        obj.seek(0)
                    except OSError as ose:    # e.g. source is not seekable
                        warn('could not rewind {0} / {1}: {2}'
                             .format(obj, description, ose),
                             category=LogReadWarning)
                    last_size = 0
                    self.last_sizes[idx] = 0
                    if new_size == 0:
                        continue     # nothing to read yet

                # if we get here, then there is new data to read
                try:
                    new_data = obj.read()
                except OSError as ose:    # includes IOErrors
                    warn('io error reading from {0} / {1}: {2})'
                         .format(obj, description, ose),
                         category=LogReadWarning)
                    continue     # no data was read; try again next round

                # NOTE: st_size counts bytes but len() counts characters if
                # obj was opened in text mode, so this can also warn for
                # text containing multi-byte characters
                if len(new_data) != new_size - last_size:
                    warn('read unexpected amount from {0} / {1}: '
                         '{2} bytes instead of {3} bytes!'
                         .format(obj, description, len(new_data),
                                 new_size-last_size),
                         category=LogReadWarning)

                # post-processing
                to_yield = self.prepare_result(description, new_data, idx)
                for result in to_yield:
                    yield result

                # prepare next iteration
                self.last_sizes[idx] = new_size

    def prepare_result(self, description, data, idx):
        """ from raw new data create some yield-able results

        to be intended for overwriting in sub-classes

        this function is called from __iter__ for each new data that becomes
        available. It has to return some iterable whose entries are yielded
        from iteration over objects of this class.

        This base implementation just returns its input in a list, so new data
        is yielded from __iter__ as-is

        :param description: description of the source the data was read from
        :param data: the data that was just read
        :param int idx: index of the source (unused here)
        :returns: list with the single entry `(description, data)`
        """
        return [(description, data), ]


#: characters that mark the end of a line for :py:class:`LineReader`
LINE_SPLITTERS = '\n\r'


class LineReader(IterativeReader):
    """ an IterativeReader that returns new data line-wise

    this means buffering partial line data

    Works on text only: buffers and line splitters are of type str
    """

    def __init__(self, *args, **kwargs):
        """ forwards all args and kwargs to :py:class:`IterativeReader` """
        super().__init__(*args, **kwargs)
        self.line_buffers = ['' for _ in range(self.n_sources())]

    def prepare_result(self, description, new_data, idx):
        """ take raw new data and split it into lines

        if line is not complete, then buffer it

        returns lines without their newline characters

        :param description: description of the source the data was read from
        :param str new_data: the text that was just read
        :param int idx: index of the source; selects the line buffer
        :returns: list of tuples `(description, line)`, one per complete line
        :raises: ValueError if str.splitlines splits at a character that is
                 not in LINE_SPLITTERS
        """

        all_data = self.line_buffers[idx] + new_data

        # the buffered text is part of all_data now, so forget it; otherwise
        # it would be prepended again to the next data
        self.line_buffers[idx] = ''

        result = []
        should_be_no_new_lines = False
        for line in all_data.splitlines(keepends=True):
            if should_be_no_new_lines:
                # an incomplete line can only be the last one, unless
                # splitlines has split at something we do not know
                raise ValueError('line splitters are not compatible with '
                                 'str.splitlines!')
            elif line[-1] in LINE_SPLITTERS:
                result.append((description, line.rstrip(LINE_SPLITTERS)))
            else:
                self.line_buffers[idx] = line
                should_be_no_new_lines = True  # (this should be the last)

        return result


class LogParser:
    """ takes lines from LineReader and parses their contents """
    pass


# ----------------------------------------------------------------------------
# Rest of the original text line, kept verbatim as comments: patch header and
# docstring opening of the next file in the patch (log_read_unittest.py)
#
# diff --git a/log_read_unittest.py b/log_read_unittest.py
# new file mode 100644
# index 0000000..4e092ba
# --- /dev/null
# +++ b/log_read_unittest.py
# @@ -0,0 +1,178 @@
# +""" Unittests for log_read
# +
# +Creates own thread to write data to a log file
# +
# +..
""" Unittests for log_read

Creates own thread to write data to a log file

.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
"""

import unittest
from threading import Thread
from tempfile import mkstemp
import os
import time
import logging

from log_read import IterativeReader


class LogFileWriter(Thread):
    """ thread that creates and writes to given file """

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None, use_logging=True):
        """ creates thread, daemon is True

        if n_writes is None, will write indefinitely; else writes text_pattern
        n_writes times, formatted with (counter, time.perf_counter)
        If do_encode is given (name of an encoding), will encode text to bytes
        and open file handle in 'wb' mode; otherwise opens in 'wt' mode and
        writes unicode text.
        If use_logging is False, will open file and run file_handle.write;
        If use_logging is True, will create logger that logs to file and use
        logging.info (no file_handle.write)
        """
        super().__init__(daemon=True)
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode
        self.use_logging = use_logging

    def run(self):
        """ write texts either through logging or directly to the file """
        if self.use_logging:
            logging.basicConfig(filename=self.file_name, level=logging.INFO,
                                format='%(msg)s')
            self._write_all(logging.info)
            return

        if self.do_encode:
            mode = 'wb'
            buffering = 0  # no buffering -- only allowed for byte mode
        else:
            mode = 'wt'
            buffering = 1  # line buffering -- only allowed for text mode

        with open(self.file_name, mode=mode, buffering=buffering) \
                as file_handle:
            self._write_all(file_handle.write)

    def _write_all(self, write_func):
        """ call write_and_sleep n_writes times (or forever if that is None)
        """
        counter = 0
        while self.n_writes is None or counter < self.n_writes:
            self.write_and_sleep(write_func, counter)
            counter += 1

    def write_and_sleep(self, write_func, counter):
        """ format (and maybe encode) one text, write it, then sleep

        :param write_func: function that takes the text as single argument
        :param int counter: number of this text, goes into field {0} of the
                            text pattern; field {1} is time.perf_counter()
        """
        text = self.text_pattern.format(counter, time.perf_counter())
        if self.do_encode:
            text = text.encode(self.do_encode)
        write_func(text)
        time.sleep(self.pause_time)


class LogReadTester(unittest.TestCase):
    """ class with all the tests """

    def setUp(self):
        """ called before each test """
        print('setup test')
        temp_handle, temp_name = mkstemp()
        os.close(temp_handle)
        self.temp_file = temp_name
        print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ called after each test """
        print('tear down test')
        if os.path.isfile(self.temp_file):
            print('delete temp file ' + self.temp_file)
            os.unlink(self.temp_file)

    @classmethod
    def setUpClass(cls):
        """ called once before the first test """
        print('setup test class')
        cls.temp_file = None

    @classmethod
    def tearDownClass(cls):
        """ called once after the last test """
        print('tear down test class')

    def test_args(self):
        """ check which constructor args IterativeReader accepts / rejects """
        self.assertRaises(TypeError, IterativeReader)  # no args
        self.assertRaises(ValueError, IterativeReader, [], 'test')
        self.assertRaises(ValueError, IterativeReader, [], ['test', ])
        self.assertRaises(ValueError, IterativeReader, self.temp_file)
        self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
        with open(self.temp_file, 'rt') as file_handle:
            reader = IterativeReader(file_handle)
            self.assertEqual(reader.n_sources(), 1)
            reader = IterativeReader([file_handle, ])
            self.assertEqual(reader.n_sources(), 1)
            reader = IterativeReader(file_handle, 'desc')
            self.assertEqual(reader.n_sources(), 1)
            reader = IterativeReader([file_handle, ], ['desc', ])
            self.assertEqual(reader.n_sources(), 1)
            reader = IterativeReader(file_handle, ['desc', ])
            self.assertEqual(reader.n_sources(), 1)
            self.assertRaises(ValueError, IterativeReader,
                              [file_handle, ], 'desc', )
            reader = IterativeReader([file_handle, file_handle],
                                     ['desc1', 'desc2'])
            self.assertEqual(reader.n_sources(), 2)
            reader = IterativeReader((file_handle for idx in range(5)))
            self.assertEqual(reader.n_sources(), 5)
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for idx in range(5)),
                              tuple('desc' for idx in range(4)))
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for idx in range(5)),
                              ('desc' for idx in range(6)))

    def test_simple_read(self):
        """ write fixed number of lines, see how fast they are retrieved """

        # (text pattern, encoding, use_logging)
        # need newline only when writing text (because of write buffering)
        # Combinations with use_logging=True are left out since logging seems
        # to buffer writes to files:
        #   ('{0}:{1}\n', None, True), ('{0}:{1}\n', 'ascii', True),
        #   ('{0}:{1} ', None, True), ('{0}:{1} ', 'ascii', True)
        param_combinations = (
            ('{0}:{1}\n', None, False),
            ('{0}:{1}\n', 'ascii', False),
            ('{0}:{1} ', 'ascii', False),
        )

        n_texts = 10
        pause_time = 0.01  # 100 tps (texts per second)

        for text_pattern, encoding, use_logging in param_combinations:
            # Empty the file first: it still holds the texts of the previous
            # combination and the reader might see them before the new writer
            # thread has re-opened (and thereby truncated) the file
            with open(self.temp_file, 'w'):
                pass

            LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
                          pause_time=pause_time, do_encode=encoding,
                          use_logging=use_logging).start()
            print('testing with log file {0}'.format(self.temp_file))
            print('encoding is {0}, use logging = {1}'.format(encoding,
                                                              use_logging))
            time_diffs = []

            with open(self.temp_file, 'rt') as file_handle:
                reader = IterativeReader(file_handle)
                for counter, (desc, text) in enumerate(reader):
                    receive_time = time.perf_counter()
                    text = text.strip()
                    print('{1}: received text "{0}"'.format(text, counter))

                    # text has the form "<counter>:<time of writing>"
                    index = text.index(':')
                    count_text = int(text[:index].strip())
                    self.assertEqual(count_text, counter)
                    write_time = float(text[index+1:].strip())
                    time_diffs.append((receive_time - write_time)*1.e6)
                    if counter == n_texts-1:
                        print('stop since have {0} reads'.format(counter))
                        break
            print('time diffs in us: {0}'.format(time_diffs))
            self.assertLess(max(time_diffs), 1000.,
                            'read took more than 1ms!')


if __name__ == '__main__':
    unittest.main()


# ----------------------------------------------------------------------------
# End of the original text line, kept verbatim as comments: patch footer
#
# --
# 1.7.1