"""
follow process output, log files and pipes using select and poll
+DEPRECATED
+(at least for files, see log_read; may still be useful for pipes/sockets)
+
Main class is Follower which does the polling and selecting, it is best used
in a with-statement as follows::
--- /dev/null
+""" Unittest for follow.py
+
+This is a little more involved since, in order to test properly, we need a
+second thread that writes to the file/socket/stdout
+
+.. todo:: as in log_read_unittest: disable buffering in LogFileWriter
+
+DEPRECATED
+(at least for files, see log_read; may still be useful for pipes/sockets)
+
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+
+import unittest
+from tempfile import mkstemp
+import threading
+from follow import Follower
+import time
+import os
+import os.path
+
+
+class LogFileWriter(threading.Thread):
+ """ thread that creates and writes to given file handle """
+
+ def __init__(self, file_name, text_pattern, n_writes=None,
+ pause_time=0.1, do_encode=None):
+ """ creates thread, deamon is True
+
+ if n_writes is None, will write indefinitely; else writes text_pattern
+ n_writes times, formatted with (counter, time.perf_counter)
+ """
+ super().__init__(daemon=True)
+ self.file_name = file_name
+ self.text_pattern = text_pattern
+ self.n_writes = n_writes
+ self.pause_time = pause_time
+ self.do_encode = do_encode
+
+ def run(self):
+ counter = 0
+ if self.do_encode:
+ mode = 'wb'
+ else:
+ mode = 'wt'
+
+ with open(self.file_name, mode) as file_handle:
+ while True:
+ if self.n_writes is not None and counter >= self.n_writes:
+ break
+
+ if self.do_encode:
+ file_handle.write(self.text_pattern
+ .format(counter, time.perf_counter())
+ .encode(self.do_encode))
+ else:
+ file_handle.write(self.text_pattern
+ .format(counter, time.perf_counter()))
+ print('wrote {0}'.format(counter))
+ counter += 1
+ time.sleep(self.pause_time)
+
+
+class FollowTester(unittest.TestCase):
+ """ Unittest for follow.py """
+
+ def test_logfile(self):
+ """ create logfile, write to it, expect output """
+
+ text_pattern = '{0}:{1}\n'
+ n_texts = 10
+ pause_time = 1
+ encoding = 'ascii'
+
+ LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
+ pause_time=pause_time, do_encode=encoding).start()
+ print('testing with log file {0}'.format(self.temp_file))
+ time_diffs = []
+
+ with open(self.temp_file) as read_handle:
+ follower = Follower(read_handle)
+ for counter, (source, desc, text, flags) in \
+ enumerate(follower):
+ receive_time = time.perf_counter()
+ print('received text: "{0}"'.format(text))
+ index = text.index(':')
+                # parse counter from text; don't shadow the enumerate counter
+                count_text = int(text[:index].strip())
+                write_time = float(text[index+1:].strip())
+                time_diffs.append(receive_time - write_time)
+                if count_text == n_texts - 1:
+                    break
+
+ def setUp(self):
+ """ called before each test """
+ print('setup test')
+ temp_handle, temp_name = mkstemp()
+ os.close(temp_handle)
+ self.temp_file = temp_name
+ print('created temp file ' + self.temp_file)
+
+ def tearDown(self):
+ """ called after each test """
+ print('tear down test')
+ if os.path.isfile(self.temp_file):
+            print('delete temp file ' + self.temp_file)
+ os.unlink(self.temp_file)
+
+ @classmethod
+ def setUpClass(clz):
+ """ called once before the first test """
+ print('setup test class')
+ clz.temp_file = None
+
+ @classmethod
+ def tearDownClass(clz):
+ """ called once after the last test """
+ print('tear down test class')
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+""" Iterative reading of log files
+
+Basic Functionality (class :py:class:`IterativeReader`):
+Runs fstat in a loop to find out whether the file size has changed. Then
+reads the new data and forwards it
+
+.. todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
+ so can return from read loop
+
+:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
+it line-wise as is normal for log files
+
+:py:class:`LogParser` takes those lines and tries to parse them into fields
+like date, time, module name, urgency and message.
+
+.. todo:: auto-detect log line layout
+
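+Example (a sketch; assumes a file ``some.log`` that another process keeps
+writing to -- use a with-statement since these classes do not close files
+themselves)::
+
+    with open('some.log', 'rt') as log_handle:
+        for description, line in LineReader(log_handle):
+            print('{0}: {1}'.format(description, line))
+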
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+import os
+from warnings import warn
+import os.path
+from itertools import zip_longest
+
+from type_helpers import is_str_or_byte, is_file_obj
+
+
+class LogReadWarning(UserWarning):
+ """ warnings issued by classes in this module """
+ pass
+
+
+def true_func(unused_argument_but_that_is_ok):
+ """ does nothing, always returns True """
+ return True
+
+
+def check_is_used(some_file_or_handle):
+ """ check whether file is being written to
+
+ to be implemented, e.g. using lsof
+ """
+ raise NotImplementedError()
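+
+
+# a possible sketch of the above, kept separate since it is untested and
+# makes assumptions: requires the ``lsof`` command, and takes a file name
+# (the iterator actually passes an os-level descriptor, so a real
+# implementation would first map that back to a name, e.g. via /proc)
+def _check_is_used_lsof_sketch(file_name):
+    """ guess whether someone has file_name open for writing, using lsof
+
+    ``lsof -F a`` prints one ``a<mode>`` field line per open file
+    descriptor; mode ``w`` or ``u`` means open for writing
+    """
+    import subprocess
+    result = subprocess.run(['lsof', '-F', 'a', file_name],
+                            stdout=subprocess.PIPE,
+                            stderr=subprocess.DEVNULL)
+    access_fields = [line for line in result.stdout.decode().splitlines()
+                     if line.startswith('a')]
+    return any(field in ('aw', 'au') for field in access_fields)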
+
+
+_create_description_unknown_counter = 0
+
+def create_description(file_obj, file_desc):
+ """ create some description for given file-like object / file descriptor
+
+ :param file_obj: file-like object
+ :param int file_desc: os-level file descriptor
+ :returns: string
+ """
+
+ global _create_description_unknown_counter
+
+ try:
+ desc = file_obj.name
+ if desc:
+ return desc
+ except AttributeError:
+ pass
+
+ if file_desc is not None:
+ return 'file{0}'.format(file_desc)
+ else:
+ _create_description_unknown_counter += 1
+ return 'unknown{0}'.format(_create_description_unknown_counter)
+
+
+#: error message for IterativeReader constructor
+_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
+ 'files --> use with open(file_name)!'
+
+
+class IterativeReader:
+ """ reads from a given file
+
+    Uses os.fstat(file_desc).st_size as a measure of whether the file has
+    changed; always reads as much data as possible
+
+ Catches most common exceptions in iteration (not constructor)
+
+    Does not care about closing files, so does not accept file names
+
+ This is the base for class :py:class:`LineReader` that just has to
+ implement a different :py:meth:`prepare_result` method
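+
+    Example (a sketch; watches two log files at once, assuming both exist
+    and are being appended to)::
+
+        with open('a.log') as f_a, open('b.log') as f_b:
+            for desc, new_data in IterativeReader([f_a, f_b], ['a', 'b']):
+                print('{0} bytes from {1}'.format(len(new_data), desc))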
+ """
+
+ def __init__(self, sources, descs=None, return_when_done=False):
+ """ creates a reader; does some basic checks on args
+
+ :param sources: iterable over sources. Sources can be opened file
+ objects or read-opened os-level file descriptors.
+ Calling code has to ensure they are closed properly, so
+ best use this within a "with open(file_name) as
+ file_handle:"-context. If sources is a single file
+ obj/descriptor, both source and desc will be converted
+ to lists of length 1
+ :param descs: can be anything of same length as sources. If sources is
+ a single source, then descs is also converted to a list
+ of length 1. If not given (i.e. None), will use
+ :py:func:`create_description` to guess descriptions
+ :param bool return_when_done: ignore file_handle if no-one is writing
+ to it any more. Return from iterator when
+ all watched files are done (not
+ implemented yet)
+ :raises: OSError when testing fstat on source
+ """
+ if not sources:
+ raise ValueError('need at least some source!')
+ elif is_str_or_byte(sources):
+ raise ValueError(_STR_ERR.format(sources))
+ elif is_file_obj(sources) or isinstance(sources, int):
+ source_input = [sources, ]
+ desc_input = [descs, ]
+ else:
+ source_input = sources # assume some iterable
+ desc_input = descs
+
+ # now divide sources into os-level file descriptors for os.fstat,
+ # and file objects for read()
+ self.file_objs = []
+ self.file_descs = [] # file descriptOR, not descriptION
+ for source in source_input:
+ if is_file_obj(source):
+ self.file_objs.append(source)
+ self.file_descs.append(source.fileno())
+ elif isinstance(source, int):
+ self.file_objs.append(os.fdopen(source))
+ self.file_descs.append(source)
+ elif is_str_or_byte(source):
+ raise ValueError(_STR_ERR.format(source))
+ else:
+                raise ValueError('source {0} is neither file obj nor file '
+                                 'descriptor!'.format(source))
+
+ # try to fstat the new file descriptor
+ os.fstat(self.file_descs[-1])
+
+ # guess descriptions if not given
+ if not desc_input:
+ self.descriptions = [create_description(obj, file_desc)
+ for obj, file_desc
+ in zip(self.file_objs, self.file_descs)]
+ else:
+ try:
+ if len(desc_input) != len(self.file_objs):
+ raise ValueError('need same number of sources and '
+ 'descriptions!')
+ except TypeError:
+ pass # desc_input is generator or so
+
+ self.descriptions = []
+ for obj, file_desc, description in \
+ zip_longest(self.file_objs, self.file_descs, desc_input):
+ if obj is None:
+ raise ValueError('more descriptions than sources!')
+ elif description is None:
+ self.descriptions.append(create_description(obj,
+ file_desc))
+ else:
+ self.descriptions.append(description)
+
+ self.last_sizes = [0 for _ in self.file_objs]
+ self.ignore = [False for _ in self.file_objs]
+
+ if return_when_done:
+ self.is_used = check_is_used
+ else:
+ self.is_used = true_func
+
+ for obj, file_desc, description in zip(self.file_objs, self.file_descs,
+ self.descriptions):
+ print('file descriptor {0}, file obj {1}, description "{2}"'
+ .format(file_desc, obj, description))
+
+ def n_sources(self):
+ return len(self.file_objs)
+
+ def n_active_sources(self):
+ return len(self.ignore) - sum(self.ignore)
+
+ def __iter__(self):
+ while True:
+ for idx, (obj, file_desc, description, last_size, do_ignore) in \
+ enumerate(zip(self.file_objs, self.file_descs,
+ self.descriptions, self.last_sizes,
+ self.ignore)):
+
+ if do_ignore:
+ continue
+
+ # get new file size
+ new_size = os.fstat(file_desc).st_size
+
+ # compare to old size
+ if new_size == last_size:
+ if not self.is_used(file_desc):
+ warn('no one is writing to {0} / {1} -- '
+ 'stop watching it!'
+ .format(file_desc, description),
+ category=LogReadWarning)
+                        self.ignore[idx] = True
+ elif new_size < last_size:
+ warn('{0} / {1} has become smaller ({2} --> {3})!'
+ .format(obj, description, last_size, new_size),
+ category=LogReadWarning)
+ else: # (new_size > last_size)
+ try:
+ new_data = obj.read()
+                    except OSError as ose:  # includes IOErrors
+                        warn('I/O error reading from {0} / {1}: {2}'
+                             .format(obj, description, ose),
+                             category=LogReadWarning)
+                        continue  # new_data is undefined; retry next round
+ if len(new_data) != new_size - last_size:
+ warn('read unexpected amount from {0} / {1}: '
+ '{2} bytes instead of {3} bytes!'
+ .format(obj, description, len(new_data),
+ new_size-last_size),
+ category=LogReadWarning)
+
+ # post-processing
+ to_yield = self.prepare_result(description, new_data, idx)
+ for result in to_yield:
+ yield result
+
+ # prepare next iteration
+ self.last_sizes[idx] = new_size
+
+ def prepare_result(self, description, data, idx):
+ """ from raw new data create some yield-able results
+
+ to be intended for overwriting in sub-classes
+
+ this function is called from __iter__ for each new data that becomes
+ available. It has to return some iterable whose entries are yielded
+ from iteration over objects of this class.
+
+ This base implementation just returns its input in a list, so new data
+ is yielded from __iter__ as-is
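+
+        Example override (hypothetical subclass that upper-cases all new
+        data before yielding it)::
+
+            class ShoutingReader(IterativeReader):
+                def prepare_result(self, description, data, idx):
+                    return [(description, data.upper())]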
+ """
+ return [(description, data), ]
+
+
+LINE_SPLITTERS = '\n\r'
+
+
+class LineReader(IterativeReader):
+ """ an IterativeReader that returns new data line-wise
+
+ this means buffering partial line data
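+
+    For example: if one read returns a complete line plus the fragment
+    ``ba``, only the complete line is yielded; ``ba`` is buffered and
+    yielded once a later read supplies the rest of that line and its
+    line terminator.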
+ """
+
+ def __init__(self, *args, **kwargs):
+ """ forwards all args and kwargs to :py:class:`IterativeReader` """
+        super().__init__(*args, **kwargs)
+        self.line_buffers = ['' for _ in range(self.n_sources())]
+
+    def prepare_result(self, description, new_data, idx):
+ """ take raw new data and split it into lines
+
+ if line is not complete, then buffer it
+
+ returns lines without their newline characters
+ """
+
+        all_data = self.line_buffers[idx] + new_data
+        self.line_buffers[idx] = ''   # reset; re-set below if line partial
+        result = []
+ should_be_no_new_lines = False
+ for line in all_data.splitlines(keepends=True):
+ if line[-1] in LINE_SPLITTERS:
+ result.append((description, line.rstrip(LINE_SPLITTERS)))
+ elif should_be_no_new_lines:
+                raise ValueError('line splitters are not compatible with '
+                                 'str.splitlines!')
+ else:
+ self.line_buffers[idx] = line
+ should_be_no_new_lines = True # (this should be the last)
+
+ return result
+
+
+class LogParser:
+ """ takes lines from LineReader and parses their contents """
+ pass
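+
+
+# Illustrative sketch only: one conceivable parse step for LogParser,
+# assuming a hypothetical "<date> <time> <module> <urgency>: <message>"
+# layout; real layout auto-detection is still a todo (see module docstring)
+def _parse_line_sketch(line):
+    """ parse one log line into a dict of fields, or None if no match """
+    import re
+    match = re.match(r'(?P<date>\S+)\s+(?P<time>\S+)\s+(?P<module>\S+)\s+'
+                     r'(?P<urgency>\w+):\s*(?P<message>.*)', line)
+    if match is None:
+        return None
+    return match.groupdict()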
--- /dev/null
+""" Unittests for log_read
+
+Creates own thread to write data to a log file
+
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+import unittest
+from threading import Thread
+from tempfile import mkstemp
+import os
+import time
+import logging
+
+from log_read import IterativeReader
+
+
+class LogFileWriter(Thread):
+ """ thread that creates and writes to given file """
+
+ def __init__(self, file_name, text_pattern, n_writes=None,
+ pause_time=0.1, do_encode=None, use_logging=True):
+ """ creates thread, deamon is True
+
+ if n_writes is None, will write indefinitely; else writes text_pattern
+ n_writes times, formatted with (counter, time.perf_counter)
+        If do_encode is given (an encoding name), will encode text to bytes
+        and open the file handle in 'wb' mode; otherwise opens in 'wt' mode
+        and writes unicode text.
+ If use_logging is False, will open file and run file_handle.write;
+ If use_logging is True, will create logger that logs to file and use
+ logging.info (no file_handle.write)
+ """
+ super().__init__(daemon=True)
+ self.file_name = file_name
+ self.text_pattern = text_pattern
+ self.n_writes = n_writes
+ self.pause_time = pause_time
+ self.do_encode = do_encode
+ self.use_logging = use_logging
+
+ def run(self):
+ counter = 0
+ if self.do_encode:
+ mode = 'wb'
+ buffering = 0 # no buffering -- only allowed for byte mode
+ else:
+ mode = 'wt'
+ buffering = 1 # line buffering -- only allowed for text mode
+
+ if self.use_logging:
+ logging.basicConfig(filename=self.file_name, level=logging.INFO,
+ format='%(msg)s')
+ while True:
+ if self.n_writes is not None and counter >= self.n_writes:
+ break
+ self.write_and_sleep(logging.info, counter)
+ counter += 1
+ else:
+ with open(self.file_name, mode=mode, buffering=buffering) \
+ as file_handle:
+ while True:
+ if self.n_writes is not None and counter >= self.n_writes:
+ break
+ self.write_and_sleep(file_handle.write, counter)
+ counter += 1
+
+ def write_and_sleep(self, write_func, counter):
+ text = self.text_pattern.format(counter, time.perf_counter())
+ if self.do_encode:
+ text = text.encode(self.do_encode)
+ write_func(text)
+ #print('wrote {0}'.format(counter))
+ time.sleep(self.pause_time)
+
+
+class LogReadTester(unittest.TestCase):
+ """ class with all the tests """
+
+ def setUp(self):
+ """ called before each test """
+ print('setup test')
+ temp_handle, temp_name = mkstemp()
+ os.close(temp_handle)
+ self.temp_file = temp_name
+ print('created temp file ' + self.temp_file)
+
+ def tearDown(self):
+ """ called after each test """
+ print('tear down test')
+ if os.path.isfile(self.temp_file):
+            print('delete temp file ' + self.temp_file)
+ os.unlink(self.temp_file)
+
+ @classmethod
+ def setUpClass(clz):
+ """ called once before the first test """
+ print('setup test class')
+ clz.temp_file = None
+
+ @classmethod
+ def tearDownClass(clz):
+ """ called once after the last test """
+ print('tear down test class')
+
+ def test_args(self):
+ self.assertRaises(TypeError, IterativeReader) # no args
+ self.assertRaises(ValueError, IterativeReader, [], 'test')
+ self.assertRaises(ValueError, IterativeReader, [], ['test', ])
+ self.assertRaises(ValueError, IterativeReader, self.temp_file)
+ self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
+ with open(self.temp_file, 'rt') as file_handle:
+ reader = IterativeReader(file_handle)
+ self.assertEqual(reader.n_sources(), 1)
+ reader = IterativeReader([file_handle, ])
+ self.assertEqual(reader.n_sources(), 1)
+ reader = IterativeReader(file_handle, 'desc')
+ self.assertEqual(reader.n_sources(), 1)
+ reader = IterativeReader([file_handle, ], ['desc', ])
+ self.assertEqual(reader.n_sources(), 1)
+ reader = IterativeReader(file_handle, ['desc', ])
+ self.assertEqual(reader.n_sources(), 1)
+ self.assertRaises(ValueError, IterativeReader,
+ [file_handle, ], 'desc', )
+ reader = IterativeReader([file_handle, file_handle],
+ ['desc1', 'desc2'])
+ self.assertEqual(reader.n_sources(), 2)
+ reader = IterativeReader((file_handle for idx in range(5)))
+ self.assertEqual(reader.n_sources(), 5)
+ self.assertRaises(ValueError, IterativeReader,
+ (file_handle for idx in range(5)),
+ tuple('desc' for idx in range(4)))
+ self.assertRaises(ValueError, IterativeReader,
+ (file_handle for idx in range(5)),
+ ('desc' for idx in range(6)))
+
+ def test_simple_read(self):
+ """ write fixed number of lines, see how fast they are retrieved """
+
+ # need newline only when writing text (because of write buffering)
+        param_combinations = (('{0}:{1}\n', None, False),
+                              ('{0}:{1}\n', 'ascii', False),
+                              ('{0}:{1} ', 'ascii', False))
+        # use_logging=True combinations are disabled since logging seems
+        # to buffer writes to files:
+        # ('{0}:{1}\n', None, True), ('{0}:{1}\n', 'ascii', True),
+        # ('{0}:{1} ', None, True), ('{0}:{1} ', 'ascii', True)
+
+ n_texts = 10
+ pause_time = 0.01 # 100 tps (texts per second)
+
+ for text_pattern, encoding, use_logging in param_combinations:
+ LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
+ pause_time=pause_time, do_encode=encoding,
+ use_logging=use_logging).start()
+ print('testing with log file {0}'.format(self.temp_file))
+ print('encoding is {0}, use logging = {1}'.format(encoding,
+ use_logging))
+ time_diffs = []
+
+ with open(self.temp_file, 'rt') as file_handle:
+ reader = IterativeReader(file_handle)
+ for counter, (desc, text) in enumerate(reader):
+ receive_time = time.perf_counter()
+ text = text.strip()
+ print('{1}: received text "{0}"'.format(text, counter))
+ index = text.index(':')
+ count_text = int(text[:index].strip())
+ self.assertEqual(count_text, counter)
+ write_time = float(text[index+1:].strip())
+ time_diffs.append((receive_time - write_time)*1.e6)
+ if counter == n_texts-1:
+ print('stop since have {0} reads'.format(counter))
+ break
+ print('time diffs in us: {0}'.format(time_diffs))
+            self.assertTrue(max(time_diffs) < 1000.,
+                            'read took more than 1ms!')
+
+
+if __name__ == '__main__':
+ unittest.main()