--- /dev/null
+#!/usr/bin/env python
+
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+
+"""
+follow.py: follow process output, log files and pipes using select and poll
+
+Main class is Follower which does the polling and selecting, it is best used
+ in a with-statement as follows:
+
+with follow('/var/log/messages') as flwr
+ for line in flwr:
+ do_something_with(line)
+
+This will read the given file and yield its contents line-by-line until the end
+of the file. It will then wait for new additions to the file and provide the
+new lines newly instantaneously
+
+Things to note:
+* all data must be line-based!
+* will only work on Linux (except for sockets maybe)
+* create in py2 but try to stay py3-compatibel
+
+[START: not implemented yet]
+If following a log file, a LogParser can be attached that auto-detects some of
+ the log's structure (like date and time fields, log levels and sources) from
+ its first few lines.. This can be used anlogously. Of course, you can also
+ specify the log's structure (to be implemented before the auto-detect...).
+
+with follow_log('/var/log/messages') as log_flwr:
+ for content in log_flwr:
+ do_something_with(content.datetime, content.log_level, content.text)
+[END: not implemented yet]
+
+A Follower is an iterator, which means you can do lots of cool things with it,
+ including (see also itertools package, itertool recipies, "Functional
+ Programming Howto"):
+
+# ignore source and description:
+for _, _, text_line in my_follower:
+ do_something_with(line)
+
+# enumerate:
+for line_number, line in enumerate(my_follower)
+ do_something_with(line, line_number)
+
+# combine with other iterator:
+for line, other_data in zip(my_follwer, other_iter)
+ do_something_with(line, other_data)
+
+# filter:
+for line in my_follower if test(my_func):
+ do_something_with(line)
+
+# tee:
+iter1, iter2 = itertools.tee(my_follower)
+--> can read from both (but each line is given to only one of them)
+
+# for each element, peek at the next one to help do the right thing
+for line, next_line in pairwise(my_follower):
+ do_something_with(line, peek_next_line=next_line)
+
+# create new iterator or generator
+for line in my_follwer:
+ some_result = do_something_with(line)
+ yield some_result
+
+NOT possible: len(my_follower), Follower(my_file, my_file) # (twice the same)
+
+Christian Herdtweck, Intra2net, July 2015
+(c) Intra2net AG 2015
+"""
+
+from __future__ import print_function
+from warnings import warn
+import select
+
+# #############################################################################
+# CONSTANTS
+# #############################################################################
+DEFAULT_POLL_FLAGS = select.POLLIN | select.POLLPRI | select.POLLHUP
+# | select.POLLERR
+
+DEFAULT_POLL_TIMEOUT = 10
+
+# #############################################################################
+# EXCEPTIONS
+# #############################################################################
+
+
+def flags_to_str(flags):
+ """ return short string representation for poll flags """
+ text_parts = []
+ if flags & select.POLLIN:
+ text_parts.append('IN')
+ if flags & select.POLLOUT:
+ text_parts.append('OUT')
+ if flags & select.POLLPRI:
+ text_parts.append('PRI')
+ if flags & select.POLLERR:
+ text_parts.append('ERR')
+ if flags & select.POLLHUP:
+ text_parts.append('HUP')
+ if flags & select.POLLNVAL:
+ text_parts.append('INVALID')
+ return ','.join(text_parts)
+
+
+class PollFlagsException(Exception):
+ """ exception raised if polling returned unexpected flags """
+ def __init__(self, flags, desc):
+ super(PollFlagsException, self).__init__(
+ 'Unexpected flags from polling {0}: {1}'.format(
+ desc, flags_to_str(flags)))
+
+
+class PollHupException(Exception):
+ """ exception raised when polled source sends a SIGHUP """
+ def __init__(self, desc):
+ super(PollHupException, self).__init__(
+ 'Received HUP from polling, {0} was probably killed!'.format(desc))
+
+
+class PollUnknownSourceException(Exception):
+ """ exception raised when polling returns unknown source """
+ def __init__(self, file_no):
+ super(PollUnknownSourceException, self).__init__(
+ 'Unknown source returned from polling: file_no={0}!'.format(
+ file_no))
+
+
+class Follower(object):
+ """ uses select and poll to follow some set of pipes, files, sockets
+
+ relies on data being line-based!
+
+ will read as much data as possible and then wait for more
+
+ Iterator over lines!
+ """
+
+ # iterator over zip(sources, file_nos, descriptions)
+ _source_iter = None
+
+ # for polling:
+ _poller = None
+ _flags = None
+ _timeout = None
+
+ def __init__(self, *sources_and_descriptions, **other_args):
+ """ create a Follower for given sources and optional descriptions
+
+ Will guess if args are just sources or also descriptions of these.
+ All of these are possible:
+ Follower(src)
+ Follower(src, desc)
+ Follower(src1, src2, src3)
+ Follower(src1, desc1, src2, desc2, src3, desc3)
+ Follower(src1, src2, desc2, src3)
+ Follower(src, desc_ignored, desc_used) # warn but accept
+ Follower(single_list_of_sources_and_descs)
+
+ Not possible:
+ Follower()
+ Follower(desc, src)
+
+ Descriptions must be strings, they identify the sources in generated
+ output and are used in error messages
+ Sources must be file handles, open pipes or open sockets (or anything
+ else that gives a reasonable fileno(), so help of the select module)
+ Sources are not closed!
+
+ other_args can include flags and timeout
+ """
+
+ # take care about other_args
+ if 'flags' in other_args:
+ self._flags = other_args['flags']
+ else:
+ self._flags = DEFAULT_POLL_FLAGS
+ if 'timeout' in other_args:
+ self._timeout = other_args['timeout']
+ else:
+ self._timeout = DEFAULT_POLL_TIMEOUT
+
+ for key in other_args.keys():
+ if key not in ('flags', 'timeout'):
+ raise ValueError('argument not recognized: {0}'.format(key))
+
+ self._poller = select.poll()
+
+ sources = []
+ file_nos = []
+ descriptions = []
+
+ # take care of sources_and_descriptions
+ if not sources_and_descriptions:
+ raise ValueError('need at least one source!')
+ elif len(sources_and_descriptions) == 1 \
+ and isinstance(sources_and_descriptions[0], (list, tuple)):
+ sources_and_descriptions = sources_and_descriptions[0]
+
+ for arg in sources_and_descriptions:
+ if isinstance(arg, str): # is a description
+ if len(descriptions) == 0:
+ raise ValueError(
+ 'first arg must be source, not a description!')
+ if descriptions[-1] is not None:
+ warn('Overwriting description "{0}" with "{1}"'.format(
+ descriptions[-1], arg))
+ descriptions[-1] = arg
+ else: # is a new source
+ sources.append(arg)
+ file_nos.append(arg.fileno())
+ descriptions.append(None)
+ self._poller.register(arg, self._flags)
+ # end: for all args
+
+ # need iterator over these 3 lists all the time
+ self._source_iter = list((src, fno, desc) for src, fno, desc in
+ zip(sources, file_nos, descriptions))
+ # end: Follower constructor
+
+ def next(self):
+ """ main function to poll next line from sources
+
+ returns (source, desc, line_stripped)
+ """
+
+ while True:
+ result = self._poller.poll(self._timeout)
+ for file_no, flags in result:
+
+ # identify source
+ desc = -1
+ source = None
+ for curr_src, curr_no, curr_desc in self._source_iter:
+ if curr_no == file_no:
+ desc = curr_desc
+ source = curr_src
+ break
+ # end: loop over self._source_iter
+ if desc is -1:
+ raise PollUnknownSourceException(file_no)
+
+ if not flags & self._flags:
+ raise PollFlagsException(flags, desc)
+
+ if flags & select.POLLHUP:
+ raise PollHupException(desc)
+
+ # read line from source
+ line = source.readline()
+
+ if not line:
+ continue
+ # if reach here, we have a new line of text
+
+ return source, desc, line.strip()
+ # for poll results
+ # end: inf loop
+ # end: Follower._next_line
+
+ def __iter__(self):
+ """ makes this an iterator, called by iter(my_follower) """
+ return self
+
+ def __next__(self):
+ """ called by next(my_follower) """
+ return self.next()
+# end: class Follower
+
+
+# #############################################################################
+# CONTEXT MANAGEMENT
+# #############################################################################
+
+
+class FollowContextManager(object):
+ """ context for Follower objects, ensures everything is closed properly
+
+ opens and closes files and sockets; communicates and waits for Popen
+ process objects whose stdout and stderr pipes are followed
+ """
+
+ files = None
+ file_descs = None
+ file_handles = None
+
+ sockets = None
+ socket_descs = None
+
+ procs = None
+ proc_descs = None
+
+ def __init__(self, files=None, file_descs=None,
+ sockets=None, socket_descs=None,
+ procs=None, proc_descs=None):
+ """ create a context manager for Follower, only does arg checking """
+ print('__init__')
+
+ if files is None:
+ self.files = []
+ if isinstance(files, str):
+ self.files = [files, ]
+ else:
+ self.files = files
+ if file_descs is None:
+ self.file_descs = [None for _ in self.files]
+ elif len(self.files) == len(self.file_descs):
+ self.file_descs = file_descs
+ else:
+ raise ValueError('if given file descs, need desc for all files!')
+
+ if sockets is None:
+ self.sockets = []
+ else:
+ self.sockets = sockets
+ self.socket_descs = socket_descs
+ if procs is None:
+ procs = []
+ else:
+ self.procs = procs
+ self.proc_descs = proc_descs
+
+ def __enter__(self):
+ """ called when entering run context """
+ print('__enter__')
+
+ args = []
+
+ # open files
+ self.file_handles = []
+ for file_arg, new_desc in zip(self.files, self.file_descs):
+ if isinstance(file_arg, str):
+ new_handle = open(file_arg)
+ if new_desc is None:
+ new_desc = file_arg
+ else: # assume file_arg is a file handle
+ new_handle = file_arg
+ if new_desc is None:
+ new_desc = file_arg.name
+ self.file_handles.append(new_handle)
+ args.append(new_handle)
+ args.append(new_desc)
+ # end: for files and file_descs
+
+ # TODO: repeat for sockets and procs, add them to args
+ return Follower(args)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """ called when leaving run context """
+ print('__exit__ with args {0}, {1}, {2}'.format(exc_type, exc_value,
+ traceback))
+
+ # close files
+ for handle in self.file_handles:
+ try:
+ handle.close()
+ except:
+ warn('ignoring exception exiting follower context manager!')
+
+ # TODO: communicate and wait for procs, close sockets
+# end: class FollowContextManager
+
+
+def follow(*args, **kwargs):
+ """ creates a ContextManager for a Follower to be used in "with" statements
+
+ for example:
+
+ with follow('/var/log/messages') as log_flwr
+ for source, desc, line in log_flwr
+ do_something_with(line)
+
+ for specification of args see FollowContextManager constructor help
+ """
+ return FollowContextManager(*args, **kwargs)
+
+
+# #############################################################################
+# TESTING
+# #############################################################################
+
+from datetime import datetime as dt, timedelta as td
+from subprocess import Popen, PIPE
+
+syslog_file = '/var/log/messages'
+time_diff_seconds = 300
+syslog_time_format = '%b %d %H:%M:%S'
+
+
+def test_syslog_line(n_lines, source, desc, line, start_time):
+ """ called by test functions for each line of syslog """
+
+ if n_lines % 1000 == 0:
+ print(('{0} old lines, showing lines after {1}; '
+ 'abort using Ctrl-C').format(n_lines, start_time))
+ try:
+ log_time = dt.strptime(line[:15], syslog_time_format)
+ log_time = log_time.replace(year=start_time.year)
+ except ValueError:
+ log_time = None
+
+ if False: # log_time is None or log_time > start_time:
+ print('line {0} from "{1}", (orig from {2}): {3}'.format(
+ n_lines, desc, log_time, line))
+
+
+def test_follower_syslog():
+ """ test Follower on syslog file using PIPEs and an open file
+
+ show lines from the last 5 minutes
+ """
+
+ start_time = dt.now() - td(seconds=time_diff_seconds)
+
+ # create process for 'tail -f syslog' --> 2 pipes (stdout + stderr)
+ proc = Popen(['tail', '-f', syslog_file], stdout=PIPE, stderr=PIPE)
+
+ # read from all three sources
+ with open(syslog_file, 'r') as file_handle:
+ flwr = Follower(file_handle, 'syslog file', proc.stdout, 'tail syslog',
+ proc.stderr, 'tail stderr')
+ for n_lines, (source, desc, line) in enumerate(flwr):
+ test_syslog_line(n_lines, source, desc, line, start_time)
+ # until user aborts
+ # end: with open syslog
+# end: function test_follower_syslog
+
+
+def test_follower_context():
+ """ test FollowContextManager and follow() function """
+
+ start_time = dt.now() - td(seconds=time_diff_seconds)
+
+ with follow(syslog_file) as flwr:
+ for n_lines, (source, desc, line) in enumerate(flwr):
+ test_syslog_line(n_lines, source, desc, line, start_time)
+
+
+def test():
+ """ Main function, tests some of class's functionality """
+
+ if False:
+ test_follower_syslog()
+ if True:
+ test_follower_context()
+# end: function main
+
+
+if __name__ == '__main__':
+ test()