+++ /dev/null
-#!/usr/bin/env python
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-follow process output, log files and pipes using select and poll
-
-DEPRECATED
-(at least for files see log_read; may still be usefull for pipes/sockets)
-
-Main class is Follower which does the polling and selecting, it is best used
-in a with-statement as follows::
-
- with follow('/var/log/messages') as flwr
- for line in flwr:
- do_something_with(line)
-
-This will read the given file and yield its contents line-by-line until the end
-of the file. It will then wait for new additions to the file and provide the
-new lines newly instantaneously
-
-Things to note:
-
-* all data must be line-based!
-* will only work on Linux (except for sockets maybe).
-* create in py2 but try to stay py3-compatible.
-
-[START: not implemented yet]
-
-If following a log file, a LogParser can be attached that auto-detects some of
-the log's structure (like date and time fields, log levels and sources) from
-its first few lines.. This can be used anlogously. Of course, you can also
-specify the log's structure (to be implemented before the auto-detect...)::
-
- with follow_log('/var/log/messages') as log_flwr:
- for content in log_flwr:
- do_something_with(content.datetime, content.log_level, content.text)
-
-[END: not implemented yet]
-
-A Follower is an iterator, which means you can do lots of cool things with it,
-including (see also itertools package, itertool recipies, "Functional
-Programming Howto")::
-
- # ignore source and description:
- for _, _, text_line in my_follower:
- do_something_with(line)
-
- # enumerate:
- for line_number, line in enumerate(my_follower)
- do_something_with(line, line_number)
-
- # combine with other iterator:
- for line, other_data in zip(my_follwer, other_iter)
- do_something_with(line, other_data)
-
- # filter:
- for line in my_follower if test(my_func):
- do_something_with(line)
-
- # tee:
- iter1, iter2 = itertools.tee(my_follower)
- --> can read from both (but each line is given to only one of them)
-
- # for each element, peek at the next one to help do the right thing
- for line, next_line in pairwise(my_follower):
- do_something_with(line, peek_next_line=next_line)
-
- # create new iterator or generator
- for line in my_follwer:
- some_result = do_something_with(line)
- yield some_result
-
-NOT possible::
-
- len(my_follower)
- Follower(my_file, my_file) # (twice the same)
-"""
-
-from __future__ import print_function
-from warnings import warn
-from select import poll, POLLIN, POLLOUT, POLLPRI, POLLHUP, POLLERR, POLLNVAL
-from subprocess import Popen, PIPE
-from socket import socket as Socket
-
-from .type_helpers import isstr
-
-# #############################################################################
-# CONSTANTS
-# #############################################################################
-DEFAULT_POLL_FLAGS = POLLIN | POLLPRI | POLLHUP | POLLERR
-
-DEFAULT_POLL_TIMEOUT = 10
-
-# #############################################################################
-# EXCEPTIONS
-# #############################################################################
-
-
-def flags_to_str(flags):
- """ return short string representation for poll flags """
- text_parts = []
- if flags & POLLIN:
- text_parts.append('IN')
- if flags & POLLOUT:
- text_parts.append('OUT')
- if flags & POLLPRI:
- text_parts.append('PRI')
- if flags & POLLERR:
- text_parts.append('ERR')
- if flags & POLLHUP:
- text_parts.append('HUP')
- if flags & POLLNVAL:
- text_parts.append('INVALID')
- return ','.join(text_parts)
-
-
-class PollFlagsException(Exception):
- """ exception raised if polling returned unexpected flags """
- def __init__(self, flags, desc):
- super(PollFlagsException, self).__init__(
- 'Unexpected flags from polling {0}: {1}'.format(
- desc, flags_to_str(flags)))
-
-
-class PollHupException(Exception):
- """ exception raised when polled source sends a SIGHUP """
- def __init__(self, desc):
- super(PollHupException, self).__init__(
- 'Received HUP from polling, {0} was probably killed!'.format(desc))
-
-
-class PollUnknownSourceException(Exception):
- """ exception raised when polling returns unknown source """
- def __init__(self, file_no):
- super(PollUnknownSourceException, self).__init__(
- 'Unknown source returned from polling: file_no={0}!'.format(
- file_no))
-
-
-class Follower(object):
- """ uses select and poll to follow some set of pipes, files, sockets
-
- relies on data being line-based!
-
- will read as much data as possible and then wait for more
-
- Iterator over lines!
- """
-
- # iterator over zip(sources, file_nos, descriptions)
- _source_iter = None
-
- # for polling:
- _poller = None
- _flags = None
- _timeout = None
-
- def __init__(self, *sources_and_descriptions, **other_args):
- """ create a Follower for given sources and optional descriptions
-
- Will guess if args are just sources or also descriptions of these.
- All of these are possible:
-
- * Follower(src).
- * Follower(src, desc).
- * Follower(src1, src2, src3).
- * Follower(src1, desc1, src2, desc2, src3, desc3).
- * Follower(src1, src2, desc2, src3).
- * Follower(src, desc_ignored, desc_used) # warn but accept.
- * Follower(single_list_of_sources_and_descs).
-
- Not possible:
-
- * Follower().
- * Follower(desc, src).
-
- Descriptions must be strings, they identify the sources in generated
- output and are used in error messages
-
- Sources must be file handles, open pipes or open sockets (or anything
- else that gives a reasonable fileno(), so help of the :py:mod:`select`
- module)
-
- Sources are not closed!
-
- other_args can include flags and timeout
- """
-
- # take care about other_args
- if 'flags' in other_args:
- self._flags = other_args['flags']
- else:
- self._flags = DEFAULT_POLL_FLAGS
- if 'timeout' in other_args:
- self._timeout = other_args['timeout']
- else:
- self._timeout = DEFAULT_POLL_TIMEOUT
-
- for key in other_args.keys():
- if key not in ('flags', 'timeout'):
- raise ValueError('argument not recognized: {0}'.format(key))
-
- self._poller = poll()
-
- sources = []
- file_nos = []
- descriptions = []
-
- # take care of sources_and_descriptions
- if not sources_and_descriptions:
- raise ValueError('need at least one source!')
- elif len(sources_and_descriptions) == 1 \
- and isinstance(sources_and_descriptions[0], (list, tuple)):
- sources_and_descriptions = sources_and_descriptions[0]
-
- for arg in sources_and_descriptions:
- if isstr(arg): # is a description
- if len(descriptions) == 0:
- raise ValueError(
- 'first arg must be source, not a description!')
- if descriptions[-1] is not None:
- warn('Overwriting description "{0}" with "{1}"'.format(
- descriptions[-1], arg))
- descriptions[-1] = arg
- else: # is a new source
- sources.append(arg)
- file_nos.append(arg.fileno())
- descriptions.append(None)
- self._poller.register(arg, self._flags)
- # end: for all args
-
- # need iterator over these 3 lists all the time
- self._source_iter = list((src, fno, desc) for src, fno, desc in
- zip(sources, file_nos, descriptions))
- # end: Follower constructor
-
- def next(self):
- """ main function to poll next line from sources
-
- returns (source, desc, line_stripped, flags)
- """
-
- while True:
- result = self._poller.poll(self._timeout)
- for file_no, flags in result:
-
- # identify source
- desc = -1
- source = None
- for curr_src, curr_no, curr_desc in self._source_iter:
- if curr_no == file_no:
- desc = curr_desc
- source = curr_src
- break
- # end: loop over self._source_iter
- if desc is -1:
- raise PollUnknownSourceException(file_no)
-
- if not flags & self._flags:
- raise PollFlagsException(flags, desc)
-
- if flags & POLLHUP:
- #raise PollHupException(desc)
- warn('received a hangup during polling for {0}'
- .format(desc))
-
- if flags & POLLERR:
- warn('received an err during polling for {0}'
- .format(desc))
-
- if flags & POLLNVAL:
- warn('poll replied "invalid request" err during polling '
- 'for {0}'
- .format(desc))
-
- # read line from source
- line = source.read() # was readline(), but try something else
- # removed the rstrip() from return
-
- if not line:
- continue
- # if reach here, we have a new line of text
-
- return source, desc, line, flags
- # for poll results
- # end: inf loop
- # end: Follower._next_line
-
- def __iter__(self):
- """ makes this an iterator, called by iter(my_follower) """
- return self
-
- def __next__(self):
- """ called by next(my_follower) """
- return self.next()
-# end: class Follower
-
-
-# #############################################################################
-# CONTEXT MANAGEMENT
-# #############################################################################
-
-#: defualt args for Popen constructor in case a process is given as cmd
-DEFAULT_SUBPROCESS_ARGS = dict(bufsize=1, stdin=None, stdout=PIPE, stderr=PIPE)
-
-class FollowContextManager(object):
- """ context for Follower objects, ensures everything is closed properly
-
- opens and closes files and sockets; communicates and waits for Popen
- process objects whose stdout and stderr pipes are followed
- """
-
- files = None
- file_descs = None
- file_handles = None
-
- sockets = None
- socket_descs = None
-
- procs = None
- proc_descs = None
- proc_objs = None
-
- def __init__(self, files=None, file_descs=None,
- sockets=None, socket_descs=None,
- procs=None, proc_descs=None,
- subprocess_args=None):
- """ create a context manager for Follower
-
- check args and that they match.
- tries to guess good descs for files, sockets and procs that are not
- given
-
- :param files: list/tuple of, or single file handle or file name
- :param file_descs: None or list/tuple of same length as files
- :param sockets: list/tuple of, or single socket
- :param socket_descs: None or list/tuple of same length as sockets
- :param procs: list/tuple of, or single Popen object or command itself
- as str/list/tuple
- :param proc_descs: None or list/tuple of same length as procs
- :param dict subprocess_args: dict or args that are merged with
- :py:data:`DEFAULT_SUBPROCESS_ARGS`
-
- ..seealso:: could have tried to implement this as a nesting of many
- individual context managers, see :py:mod:`contextlib`
- """
- print('__init__')
-
- # set files and file_descs and ensure that they matching lists
- if files is None:
- self.files = []
- elif isstr(files):
- self.files = [files, ]
- elif isinstance(files, file):
- self.files = [files, ]
- else:
- self.files = files
- if file_descs is None:
- temp_descs = [None for _ in self.files]
- elif len(self.files) == len(file_descs):
- temp_descs = file_descs
- else:
- raise ValueError('if given file descs, need desc for all files!')
-
- # try to guess good file_desc values; ensure they are str
- self.file_descs = []
- for file_nr, (file, file_desc) in \
- enumerate(zip(self.files, temp_descs)):
- if isstr(file_desc):
- self.file_descs.append(file_desc)
- continue
- elif file_desc is None: # need to guess something
- if isstr(file):
- self.file_descs.append(file)
- else:
- self.file_descs.append('file{0}'.format(file_nr))
- else: # desc is neither str nor None
- raise ValueError('file descs must be string or None!')
-
- # set sockets and socket_descs and ensure that they matching lists
- if sockets is None:
- self.sockets = []
- elif isinstance(sockets, Socket):
- self.sockets = [sockets, ]
- else:
- self.sockets = sockets
- if socket_descs is None:
- temp_descs = [None for _ in self.sockets]
- elif len(self.sockets) == len(socket_descs):
- temp_descs = socket_descs
- else:
- raise ValueError('if given socket descs, '
- 'need descs for all sockets!')
-
- # try to guess good socket_desc values; ensure they are str
- self.socket_descs = []
- for file_nr, (socket, socket_desc) in \
- enumerate(zip(self.sockets, temp_descs)):
- if isstr(socket_desc):
- self.socket_descs.append(socket_desc)
- elif socket_desc is None: # need to guess something
- self.socket_descs.append('socket{0}'.format(socket_nr))
- else: # desc is neither str nor None
- raise ValueError('socket descs must be string or None!')
-
- # set procs and proc_descs and ensure they matching lists
- if procs is None:
- self.procs = []
- elif isstr(procs):
- self.procs = [procs, ]
- elif isinstance(procs, Popen):
- self.procs = [procs, ]
- else:
- self.procs = procs
- if proc_descs is None:
- temp_descs = [None for _ in self.procs]
- elif len(proc_descs) == len(self.procs):
- temp_descs = proc_descs
- else:
- raise ValueError('if given proc descs, need descs for all procs!')
-
- # try to guess good proc_desc values; ensure they are str
- self.proc_descs = []
- for proc_nr, (proc, proc_desc) in \
- enumerate(zip(self.procs, temp_descs)):
- if isstr(proc_desc):
- self.proc_descs.append(proc_desc)
- elif proc_desc is None: # need to guess something
- if isstr(proc):
- self.proc_descs.append(proc)
- elif isinstance(proc, (tuple, list)):
- self.proc_descs.append(' '.join(proc))
- elif isinstance(proc, Popen):
- if isstr(proc.args):
- self.proc_descs.append(proc.args)
- else:
- self.proc_descs.append(' '.join(proc.args))
- else:
- self.proc_descs.append('proc{0}'.format(proc_nr))
- else: # desc is neither str nor None
- raise ValueError('proc descs must be string or None!')
-
- self.subprocess_args = DEFAULT_SUBPROCESS_ARGS
- if subprocess_args is not None:
- self.subprocess_args.update(subprocess_args)
-
- def __enter__(self):
- """ called when entering run context
-
- opens files, tries to create descs, and assembles args for Follower
- """
- print('__enter__')
-
- args = []
-
- # open files
- self.file_handles = []
- for file_arg, desc in zip(self.files, self.file_descs):
- if isinstance(file_arg, str):
- new_handle = open(file_arg)
- else: # assume file_arg is a file handle
- new_handle = file_arg
- self.file_handles.append(new_handle)
- args.append(new_handle)
- args.append(desc)
- # end: for files and file_descs
-
- for sock_number, (socket, desc) in \
- enumerate(zip(self.sockets, self.socket_descs)):
- args.append(socket)
- args.append(desc)
-
- self.proc_objs = []
- for proc, desc in zip(self.procs, self.proc_descs):
- if isstr(proc) or isinstance(proc, (list,tuple)):
- proc = Popen(proc, **self.subprocess_args)
- self.proc_objs.append(proc)
- args.append(proc.stdout)
- args.append(desc + '_out')
- args.append(proc.stderr)
- args.append(desc + '_err')
-
- return Follower(args)
-
- def __exit__(self, exc_type, exc_value, traceback):
- """ called when leaving run context """
- print('__exit__ with args {0}, {1}, {2}'.format(exc_type, exc_value,
- traceback))
-
- # close files
- for handle in self.file_handles:
- try:
- handle.close()
- except Exception:
- warn('ignoring exception exiting follower context manager!')
- for proc in self.proc_objs:
- try:
- proc.kill()
- rest_out, rest_err = proc.communicate()
- if rest_out or rest_err:
- warn('Ignoring left-over output in proc')
- except Exception:
- warn('ignoring exception exiting follower context manager!')
-# end: class FollowContextManager
-
-
-def follow(*args, **kwargs):
- """ creates a ContextManager for a Follower to be used in "with" statements
-
- for example:
-
- with follow('/var/log/messages') as log_flwr
- for source, desc, line in log_flwr
- do_something_with(line)
-
- for specification of args see FollowContextManager constructor help
- """
- return FollowContextManager(*args, **kwargs)
-
-
-# #############################################################################
-# TESTING
-# (hard to create unittest -- would need separate threads to write to some
-# file /pipe/socket so can test the interactivity...)
-# #############################################################################
-
-from datetime import date, datetime as dt, timedelta as td
-from subprocess import Popen, PIPE
-
-syslog_file = '/var/log/messages'
-time_diff_seconds = 60
-syslog_time_format = '%b %d %H:%M:%S'
-
-
-def test_syslog_line(n_lines, source, desc, line,
- today_str, start_time):
- """ called by test functions for each line of syslog """
-
- if n_lines % 1000 == 0:
- print(('{0:6d} old lines, showing lines after {1}; '
- 'abort using Ctrl-C').format(n_lines, start_time))
- if line[:6] != today_str:
- return
- try:
- log_time = dt.strptime(line[:15], syslog_time_format)
- log_time = log_time.replace(year=start_time.year)
- except ValueError:
- log_time = None
-
- if log_time is None or log_time > start_time:
- print('line {0} from "{1}", (orig from {2}): {3}'.format(
- n_lines, desc, log_time, line))
-
-
-def test_follower_syslog():
- """ test Follower on syslog file using PIPEs and an open file
-
- show lines from the last 5 minutes
- """
-
- start_time = dt.now() - td(seconds=time_diff_seconds)
- today_str = date.today().strftime(syslog_time_format)[:6]
- if today_str[4] == '0':
- today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1'
-
- # create process for 'tail -f syslog' --> 2 pipes (stdout + stderr)
- proc = Popen(['tail', '-f', syslog_file], stdout=PIPE, stderr=PIPE)
-
- # read from all three sources
- with open(syslog_file, 'r') as file_handle:
- flwr = Follower(file_handle, 'syslog file', proc.stdout, 'tail syslog',
- proc.stderr, 'tail stderr')
- for n_lines, (source, desc, line, flags) in enumerate(flwr):
- if flags:
- print('received flags {0}'.format(flags_to_str(flags)))
- test_syslog_line(n_lines, source, desc, line,
- today_str, start_time)
-
-
-def test_follower_context():
- """ test FollowContextManager and follow() function """
-
- today_str = date.today().strftime(syslog_time_format)[:6]
- if today_str[4] == '0':
- today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1'
- start_time = dt.now() - td(seconds=time_diff_seconds)
- with follow(syslog_file) as flwr:
- for n_lines, (source, desc, line, flags) in enumerate(flwr):
- if flags:
- print('received flags {0}'.format(flags_to_str(flags)))
- test_syslog_line(n_lines, source, desc, line,
- today_str, start_time)
-
-def test_context_proc():
- """ test FollowContextManager's ability to wrap proc args """
-
- today_str = date.today().strftime(syslog_time_format)[:6]
- if today_str[4] == '0':
- today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1'
- start_time = dt.now() - td(seconds=time_diff_seconds)
- with follow(procs=(['tail', '-f', syslog_file], )) as flwr:
- for n_lines, (source, desc, line, flags) in enumerate(flwr):
- if flags:
- print('received flags {0}'.format(flags_to_str(flags)))
- test_syslog_line(n_lines, source, desc, line,
- today_str, start_time)
-
-
-def test():
- """ Main function, tests some of class's functionality """
-
- #test_follower_syslog()
- #test_follower_context()
- test_context_proc()
-# end: function main
-
-
-if __name__ == '__main__':
- test()
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" Unittest for follow.py
-
-This is a little more involved since in order to test properly, need a second
-thread that writes to file/socket/stdout
-
-.. todo:: as in log_read_unittest: disable buffering in LogFileWriter
-
-.. warn:: DEPRECATED (at least for files see log_read; may still be useful for
- pipes/sockets)
-"""
-
-from __future__ import absolute_import
-
-import unittest
-from tempfile import mkstemp
-import threading
-from src.follow import *
-from time import sleep
-import os
-import os.path
-from src.test_helpers import get_perf_counter
-
-# different counter needed on different python versions
-perf_counter = get_perf_counter()
-
-
-class LogFileWriter(threading.Thread):
- """ thread that creates and writes to given file handle """
-
- def __init__(self, file_name, text_pattern, n_writes=None,
- pause_time=0.1, do_encode=None):
- """ creates thread, deamon is True
-
- if n_writes is None, will write indefinitely; else writes text_pattern
- n_writes times, formatted with (counter, perf_counter)
- """
- super(LogFileWriter, self).__init__()
- self.daemon = True
- self.file_name = file_name
- self.text_pattern = text_pattern
- self.n_writes = n_writes
- self.pause_time = pause_time
- self.do_encode = do_encode
-
- def run(self):
- counter = 0
- if self.do_encode:
- mode = 'wb'
- else:
- mode = 'wt'
-
- with open(self.file_name, mode) as file_handle:
- while True:
- if self.n_writes is not None and counter >= self.n_writes:
- break
-
- if self.do_encode:
- file_handle.write(self.text_pattern
- .format(counter, perf_counter())
- .encode(self.do_encode))
- else:
- file_handle.write(self.text_pattern
- .format(counter, perf_counter()))
- print('wrote {0}'.format(counter))
- counter += 1
- sleep(self.pause_time)
-
-
-@unittest.skip("Need to debug but deprecated anyway")
-class FollowTester(unittest.TestCase):
- """ Unittest for follow.py """
-
- def test_logfile(self):
- """ create logfile, write to it, expect output """
-
- text_pattern = '{0}:{1}\n'
- n_texts = 10
- pause_time = 1
- encoding = 'ascii'
-
- LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
- pause_time=pause_time, do_encode=encoding).start()
- print('testing with log file {0}'.format(self.temp_file))
- time_diffs = []
-
- with open(self.temp_file) as read_handle:
- follower = Follower(read_handle)
- for counter, (source, desc, text, flags) in \
- enumerate(follower):
- receive_time = perf_counter()
- print('received text: "{0}"'.format(text))
- index = text.index(':')
- counter = int(text[:index].strip())
- write_time = float(text[index+1:].strip())
- time_diffs.append(receive_time - write_time)
- if counter == n_texts:
- break
-
-
- def setUp(self):
- """ called before each test """
- print('setup test')
- temp_handle, temp_name = mkstemp()
- os.close(temp_handle)
- self.temp_file = temp_name
- print('created temp file ' + self.temp_file)
-
- def tearDown(self):
- """ called after each test """
- print('tear down test')
- if os.path.isfile(self.temp_file):
- print('delete temp file' + self.temp_file)
- os.unlink(self.temp_file)
-
- @classmethod
- def setUpClass(clz):
- """ called once before the first test """
- print('setup test class')
- clz.temp_file = None
-
- @classmethod
- def tearDownClass(clz):
- """ called once after the last test """
- print('tear down test class')
-
-
-
-if __name__ == '__main__':
- unittest.main()