created log_read.py with functional base class IterativeReader
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Tue, 27 Oct 2015 12:43:45 +0000 (13:43 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Tue, 27 Oct 2015 12:43:45 +0000 (13:43 +0100)
this replaces follow.py because I realized that the approach there (using
select.poll) is not guaranteed to work. log_read uses stat.st_size as its
c++ exemplar does

follow.py
follow_unittest.py [new file with mode: 0644]
log_read.py [new file with mode: 0644]
log_read_unittest.py [new file with mode: 0644]

index bd14a05..3b8e7ce 100644 (file)
--- a/follow.py
+++ b/follow.py
@@ -21,6 +21,9 @@
 """
 follow process output, log files and pipes using select and poll
 
+DEPRECATED
+(at least for files see log_read; may still be usefull for pipes/sockets)
+
 Main class is Follower which does the polling and selecting, it is best used
 in a with-statement as follows::
 
diff --git a/follow_unittest.py b/follow_unittest.py
new file mode 100644 (file)
index 0000000..9a0921a
--- /dev/null
@@ -0,0 +1,124 @@
+""" Unittest for follow.py
+
+This is a little more involved since in order to test properly, need a second
+thread that writes to file/socket/stdout
+
+..todo:: as in log_read_unittest: disable buffering in LogFileWriter
+
+DEPRECATED
+(at least for files see log_read; may still be usefull for pipes/sockets)
+
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+
+import unittest
+from tempfile import mkstemp
+import threading
+from follow import *
+import time
+import os
+import os.path
+
+
+class LogFileWriter(threading.Thread):
+    """ thread that creates and writes to given file handle """
+
+    def __init__(self, file_name, text_pattern, n_writes=None,
+                 pause_time=0.1, do_encode=None):
+        """ creates thread, deamon is True
+        
+        if n_writes is None, will write indefinitely; else writes text_pattern
+        n_writes times, formatted with (counter, time.perf_counter)
+        """
+        super().__init__(daemon=True)
+        self.file_name = file_name
+        self.text_pattern = text_pattern
+        self.n_writes = n_writes
+        self.pause_time = pause_time
+        self.do_encode = do_encode
+
+    def run(self):
+        counter = 0
+        if self.do_encode:
+            mode = 'wb'
+        else:
+            mode = 'wt'
+
+        with open(self.file_name, mode) as file_handle:
+            while True:
+                if self.n_writes is not None and counter >= self.n_writes:
+                    break
+
+                if self.do_encode:
+                    file_handle.write(self.text_pattern
+                                           .format(counter, time.perf_counter())
+                                           .encode(self.do_encode))
+                else:
+                    file_handle.write(self.text_pattern
+                                           .format(counter, time.perf_counter()))
+                print('wrote {0}'.format(counter))
+                counter += 1
+                time.sleep(self.pause_time)
+
+
+class FollowTester(unittest.TestCase):
+    """ Unittest for follow.py """
+
+    def test_logfile(self):
+        """ create logfile, write to it, expect output """
+
+        text_pattern = '{0}:{1}\n'
+        n_texts = 10
+        pause_time = 1
+        encoding = 'ascii'
+
+        LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
+                      pause_time=pause_time, do_encode=encoding).start()
+        print('testing with log file {0}'.format(self.temp_file))
+        time_diffs = []
+
+        with open(self.temp_file) as read_handle:
+            follower = Follower(read_handle)
+            for counter, (source, desc, text, flags) in \
+                    enumerate(follower):
+                receive_time = time.perf_counter()
+                print('received text: "{0}"'.format(text))
+                index = text.index(':')
+                counter = int(text[:index].strip())
+                write_time = float(text[index+1:].strip())
+                time_diffs.append(receive_time - write_time)
+                if counter == n_texts:
+                    break
+
+
+    def setUp(self):
+        """ called before each test """
+        print('setup test')
+        temp_handle, temp_name = mkstemp()
+        os.close(temp_handle)
+        self.temp_file = temp_name
+        print('created temp file ' + self.temp_file)
+
+    def tearDown(self):
+        """ called after each test """
+        print('tear down test')
+        if os.path.isfile(self.temp_file):
+            print('delete temp file' + self.temp_file)
+            os.unlink(self.temp_file)
+
+    @classmethod
+    def setUpClass(clz):
+        """ called once before the first test """
+        print('setup test class')
+        clz.temp_file = None
+
+    @classmethod
+    def tearDownClass(clz):
+        """ called once after the last test """
+        print('tear down test class')
+
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/log_read.py b/log_read.py
new file mode 100644 (file)
index 0000000..5cdd29f
--- /dev/null
@@ -0,0 +1,287 @@
+""" Iterative reading of log files
+
+Basic Functionality (class :py:class:`IterativeReader`:)
+Runs stat in a loop to find out whether file size has changed. Then reads the
+new data and forwards that
+
+..todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
+         so can return from read loop
+
+:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
+it line-wise as is normal for log files
+
+:py:class:`LogParser` takes those lines and tries to parse them into fields
+like date, time, module name, urgency and message.
+
+..todo:: auto-detect log line layout
+
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+import os
+from warnings import warn
+import os.path
+from itertools import zip_longest
+
+from type_helpers import is_str_or_byte, is_file_obj
+
+
+class LogReadWarning(UserWarning):
+    """ warnings issued by classes in this module """
+    pass
+
+
+def true_func(unused_argument_but_that_is_ok):
+    """ does nothing, always returns True """
+    return True
+
+
+def check_is_used(some_file_or_handle):
+    """ check whether file is being written to
+
+    to be implemented, e.g. using lsof
+    """
+    raise NotImplementedError()
+
+
+_create_description_unknown_counter = 0
+
+def create_description(file_obj, file_desc):
+    """ create some description for given file-like object / file descriptor
+
+    :param file_obj: file-like object
+    :param int file_desc: os-level file descriptor
+    :returns: string
+    """
+
+    global _create_description_unknown_counter
+
+    try:
+        desc = file_obj.name
+        if desc:
+            return desc
+    except AttributeError:
+        pass
+
+    if file_desc is not None:
+        return 'file{0}'.format(file_desc)
+    else:
+        _create_description_unknown_counter += 1
+        return 'unknown{0}'.format(_create_description_unknown_counter)
+
+
+#: error message for IterativeReader constructor
+_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
+           'files --> use with open(file_name)!'
+
+
+class IterativeReader:
+    """ reads from a given file
+
+    Uses os.stat(file_obj.fileno()).st_size as measure whether file has changed
+    or not; Always reads as much data as possible
+
+    Catches most common exceptions in iteration (not constructor)
+
+    Does not are about closing files, so does not accept file names
+
+    This is the base for class :py:class:`LineReader` that just has to
+    implement a different :py:meth:`prepare_result` method
+    """
+
+    def __init__(self, sources, descs=None, return_when_done=False):
+        """ creates a reader; does some basic checks on args
+
+        :param sources: iterable over sources. Sources can be opened file
+                        objects or read-opened os-level file descriptors.
+                        Calling code has to ensure they are closed properly, so
+                        best use this within a "with open(file_name) as
+                        file_handle:"-context. If sources is a single file
+                        obj/descriptor, both source and desc will be converted
+                        to lists of length 1
+        :param descs: can be anything of same length as sources. If sources is
+                      a single source, then descs is also converted to a list
+                      of length 1. If not given (i.e. None), will use
+                      :py:func:`create_description` to guess descriptions
+        :param bool return_when_done: ignore file_handle if no-one is writing
+                                      to it any more. Return from iterator when
+                                      all watched files are done (not
+                                      implemented yet)
+        :raises: OSError when testing fstat on source
+        """
+        if not sources:
+            raise ValueError('need at least some source!')
+        elif is_str_or_byte(sources):
+            raise ValueError(_STR_ERR.format(sources))
+        elif is_file_obj(sources) or isinstance(sources, int):
+            source_input = [sources, ]
+            desc_input = [descs, ]
+        else:
+            source_input = sources  # assume some iterable
+            desc_input = descs
+
+        # now divide sources into os-level file descriptors for os.fstat,
+        # and file objects for read()
+        self.file_objs = []
+        self.file_descs = []          # file descriptOR, not descriptION
+        for source in source_input:
+            if is_file_obj(source):
+                self.file_objs.append(source)
+                self.file_descs.append(source.fileno())
+            elif isinstance(source, int):
+                self.file_objs.append(os.fdopen(source))
+                self.file_descs.append(source)
+            elif is_str_or_byte(source):
+                raise ValueError(_STR_ERR.format(source))
+            else:
+                raise ValueError('source {0} is neither file obj nor file '
+                                 'descriptor!')
+
+            # try to fstat the new file descriptor
+            os.fstat(self.file_descs[-1])
+
+        # guess descriptions if not given
+        if not desc_input:
+            self.descriptions = [create_description(obj, file_desc)
+                                 for obj, file_desc
+                                 in zip(self.file_objs, self.file_descs)]
+        else:
+            try:
+                if len(desc_input) != len(self.file_objs):
+                    raise ValueError('need same number of sources and '
+                                     'descriptions!')
+            except TypeError:
+                pass  # desc_input is generator or so
+
+            self.descriptions = []
+            for obj, file_desc, description in \
+                    zip_longest(self.file_objs, self.file_descs, desc_input):
+                if obj is None:
+                    raise ValueError('more descriptions than sources!')
+                elif description is None:
+                    self.descriptions.append(create_description(obj,
+                                                                file_desc))
+                else:
+                    self.descriptions.append(description)
+
+        self.last_sizes = [0 for _ in self.file_objs]
+        self.ignore = [False for _ in self.file_objs]
+
+        if return_when_done:
+            self.is_used = check_is_used
+        else:
+            self.is_used = true_func
+
+        for obj, file_desc, description in zip(self.file_objs, self.file_descs,
+                                               self.descriptions):
+            print('file descriptor {0}, file obj {1}, description "{2}"'
+                  .format(file_desc, obj, description))
+
+    def n_sources(self):
+        return len(self.file_objs)
+
+    def n_active_sources(self):
+        return len(self.ignore) - sum(self.ignore)
+
+    def __iter__(self):
+        while True:
+            for idx, (obj, file_desc, description, last_size, do_ignore) in \
+                    enumerate(zip(self.file_objs, self.file_descs,
+                                  self.descriptions, self.last_sizes,
+                                  self.ignore)):
+
+                if do_ignore:
+                    continue
+
+                # get new file size
+                new_size = os.fstat(file_desc).st_size
+
+                # compare to old size
+                if new_size == last_size:
+                    if not self.is_used(file_desc):
+                        warn('no one is writing to {0} / {1} -- '
+                             'stop watching it!'
+                             .format(file_desc, description),
+                             category=LogReadWarning)
+                        self.do_ignore[idx] = True
+                elif new_size < last_size:
+                    warn('{0} / {1} has become smaller ({2} --> {3})!'
+                         .format(obj, description, last_size, new_size),
+                         category=LogReadWarning)
+                else:   # (new_size > last_size)
+                    try:
+                        new_data = obj.read()
+                    except OSError as ose:    # includes IOErrors
+                        warn('io error reading from {0} / {1}: {2})'
+                             .format(obj, description, ose),
+                             category=LogReadWarning)
+                    if len(new_data) != new_size - last_size:
+                        warn('read unexpected amount from {0} / {1}: '
+                             '{2} bytes instead of {3} bytes!'
+                             .format(obj, description, len(new_data),
+                                     new_size-last_size),
+                             category=LogReadWarning)
+
+                    # post-processing
+                    to_yield = self.prepare_result(description, new_data, idx)
+                    for result in to_yield:
+                        yield result
+
+                    # prepare next iteration
+                    self.last_sizes[idx] = new_size
+
+    def prepare_result(self, description, data, idx):
+        """ from raw new data create some yield-able results
+
+        to be intended for overwriting in sub-classes
+
+        this function is called from __iter__ for each new data that becomes
+        available. It has to return some iterable whose entries are yielded
+        from iteration over objects of this class.
+
+        This base implementation just returns its input in a list, so new data
+        is yielded from __iter__ as-is
+        """
+        return [(description, data), ]
+
+
+LINE_SPLITTERS = '\n\r'
+
+class LineReader(IterativeReader):
+    """ an IterativeReader that returns new data line-wise
+    
+    this means buffering partial line data
+    """
+
+    def __init__(self, *args, **kwargs):
+        """ forwards all args and kwargs to :py:class:`IterativeReader` """
+        super().__init__(self, *args, **kwargs)
+        self.line_buffers = ['' for _ in range(self.n_sources)]
+
+    def prepare_result(self, decription, new_data, idx):
+        """ take raw new data and split it into lines
+
+        if line is not complete, then buffer it
+
+        returns lines without their newline characters
+        """
+
+        all_data = self.line_buffers[idx] + new_data
+        result = []
+        should_be_no_new_lines = False
+        for line in all_data.splitlines(keepends=True):
+            if line[-1] in LINE_SPLITTERS:
+                result.append((description, line.rstrip(LINE_SPLITTERS)))
+            elif should_be_no_new_lines:
+                raise ValueError('line splitters are not compatible with'
+                                 'str.splitlines!')
+            else:
+                self.line_buffers[idx] = line
+                should_be_no_new_lines = True  # (this should be the last)
+
+        return result
+
+
+class LogParser:
+    """ takes lines from LineReader and parses their contents """
+    pass
diff --git a/log_read_unittest.py b/log_read_unittest.py
new file mode 100644 (file)
index 0000000..4e092ba
--- /dev/null
@@ -0,0 +1,178 @@
+""" Unittests for log_read
+
+Creates own thread to write data to a log file
+
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+import unittest
+from threading import Thread
+from tempfile import mkstemp
+import os
+import time
+import logging
+
+from log_read import IterativeReader
+
+
+class LogFileWriter(Thread):
+    """ thread that creates and writes to given file """
+
+    def __init__(self, file_name, text_pattern, n_writes=None,
+                 pause_time=0.1, do_encode=None, use_logging=True):
+        """ creates thread, deamon is True
+
+        if n_writes is None, will write indefinitely; else writes text_pattern
+        n_writes times, formatted with (counter, time.perf_counter)
+        If do_encode is True, will encode text to bytes and open file handle
+        in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text.
+        If use_logging is False, will open file and run file_handle.write;
+        If use_logging is True, will create logger that logs to file and use
+        logging.info (no file_handle.write)
+        """
+        super().__init__(daemon=True)
+        self.file_name = file_name
+        self.text_pattern = text_pattern
+        self.n_writes = n_writes
+        self.pause_time = pause_time
+        self.do_encode = do_encode
+        self.use_logging = use_logging
+
+    def run(self):
+        counter = 0
+        if self.do_encode:
+            mode = 'wb'
+            buffering = 0  # no buffering -- only allowed for byte mode
+        else:
+            mode = 'wt'
+            buffering = 1  # line buffering -- only allowed for text mode
+
+        if self.use_logging:
+            logging.basicConfig(filename=self.file_name, level=logging.INFO,
+                                format='%(msg)s')
+            while True:
+                if self.n_writes is not None and counter >= self.n_writes:
+                    break
+                self.write_and_sleep(logging.info, counter)
+                counter += 1
+        else:
+            with open(self.file_name, mode=mode, buffering=buffering) \
+                    as file_handle:
+                while True:
+                    if self.n_writes is not None and counter >= self.n_writes:
+                        break
+                    self.write_and_sleep(file_handle.write, counter)
+                    counter += 1
+
+    def write_and_sleep(self, write_func, counter):
+        text = self.text_pattern.format(counter, time.perf_counter())
+        if self.do_encode:
+            text = text.encode(self.do_encode)
+        write_func(text)
+        #print('wrote {0}'.format(counter))
+        time.sleep(self.pause_time)
+
+
+class LogReadTester(unittest.TestCase):
+    """ class with all the tests """
+
+    def setUp(self):
+        """ called before each test """
+        print('setup test')
+        temp_handle, temp_name = mkstemp()
+        os.close(temp_handle)
+        self.temp_file = temp_name
+        print('created temp file ' + self.temp_file)
+
+    def tearDown(self):
+        """ called after each test """
+        print('tear down test')
+        if os.path.isfile(self.temp_file):
+            print('delete temp file' + self.temp_file)
+            os.unlink(self.temp_file)
+
+    @classmethod
+    def setUpClass(clz):
+        """ called once before the first test """
+        print('setup test class')
+        clz.temp_file = None
+
+    @classmethod
+    def tearDownClass(clz):
+        """ called once after the last test """
+        print('tear down test class')
+
+    def test_args(self):
+        self.assertRaises(TypeError, IterativeReader)  # no args
+        self.assertRaises(ValueError, IterativeReader, [], 'test')
+        self.assertRaises(ValueError, IterativeReader, [], ['test', ])
+        self.assertRaises(ValueError, IterativeReader, self.temp_file)
+        self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
+        with open(self.temp_file, 'rt') as file_handle:
+            reader = IterativeReader(file_handle)
+            self.assertEqual(reader.n_sources(), 1)
+            reader = IterativeReader([file_handle, ])
+            self.assertEqual(reader.n_sources(), 1)
+            reader = IterativeReader(file_handle, 'desc')
+            self.assertEqual(reader.n_sources(), 1)
+            reader = IterativeReader([file_handle, ], ['desc', ])
+            self.assertEqual(reader.n_sources(), 1)
+            reader = IterativeReader(file_handle, ['desc', ])
+            self.assertEqual(reader.n_sources(), 1)
+            self.assertRaises(ValueError, IterativeReader,
+                              [file_handle, ], 'desc', )
+            reader = IterativeReader([file_handle, file_handle],
+                                     ['desc1', 'desc2'])
+            self.assertEqual(reader.n_sources(), 2)
+            reader = IterativeReader((file_handle for idx in range(5)))
+            self.assertEqual(reader.n_sources(), 5)
+            self.assertRaises(ValueError, IterativeReader,
+                              (file_handle for idx in range(5)),
+                              tuple('desc' for idx in range(4)))
+            self.assertRaises(ValueError, IterativeReader,
+                              (file_handle for idx in range(5)),
+                              ('desc' for idx in range(6)))
+
+    def test_simple_read(self):
+        """ write fixed number of lines, see how fast they are retrieved """
+
+        # need newline only when writing text (because of write buffering)
+        param_combinations = ('{0}:{1}\n', None,    False), \
+                             ('{0}:{1}\n', 'ascii', False), \
+                             ('{0}:{1} ' , 'ascii', False)
+                             #('{0}:{1}\n', None   , True), \  logging seems
+                             #('{0}:{1}\n', 'ascii', True), \  to buffer writes
+                             #('{0}:{1} ' , None   , True), \  to files
+                             #('{0}:{1} ' , 'ascii', True)
+
+        n_texts = 10
+        pause_time = 0.01  # 100 tps (texts per second)
+
+        for text_pattern, encoding, use_logging in param_combinations:
+            LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
+                          pause_time=pause_time, do_encode=encoding,
+                          use_logging=use_logging).start()
+            print('testing with log file {0}'.format(self.temp_file))
+            print('encoding is {0}, use logging = {1}'.format(encoding,
+                                                              use_logging))
+            time_diffs = []
+
+            with open(self.temp_file, 'rt') as file_handle:
+                reader = IterativeReader(file_handle)
+                for counter, (desc, text) in enumerate(reader):
+                    receive_time = time.perf_counter()
+                    text = text.strip()
+                    print('{1}: received text "{0}"'.format(text, counter))
+                    index = text.index(':')
+                    count_text = int(text[:index].strip())
+                    self.assertEqual(count_text, counter)
+                    write_time = float(text[index+1:].strip())
+                    time_diffs.append((receive_time - write_time)*1.e6)
+                    if counter == n_texts-1:
+                        print('stop since have {0} reads'.format(counter))
+                        break
+            print('time diffs in us: {0}'.format(time_diffs))
+            self.assertTrue(max(time_diffs) < 1000., 'read took more than 1ms!')
+
+if __name__ == '__main__':
+    unittest.main()