From: Christian Herdtweck Date: Mon, 8 Oct 2018 07:27:14 +0000 (+0200) Subject: Remove deprecated log reader utility X-Git-Tag: v1.3~19^2~8 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=869379a39b73caf64131616506e11cfced8c1aea;p=pyi2ncommon Remove deprecated log reader utility Use log_read instead --- diff --git a/src/follow.py b/src/follow.py deleted file mode 100644 index 314dcf3..0000000 --- a/src/follow.py +++ /dev/null @@ -1,638 +0,0 @@ -#!/usr/bin/env python - -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" -follow process output, log files and pipes using select and poll - -DEPRECATED -(at least for files see log_read; may still be usefull for pipes/sockets) - -Main class is Follower which does the polling and selecting, it is best used -in a with-statement as follows:: - - with follow('/var/log/messages') as flwr - for line in flwr: - do_something_with(line) - -This will read the given file and yield its contents line-by-line until the end -of the file. It will then wait for new additions to the file and provide the -new lines newly instantaneously - -Things to note: - -* all data must be line-based! -* will only work on Linux (except for sockets maybe). -* create in py2 but try to stay py3-compatible. - -[START: not implemented yet] - -If following a log file, a LogParser can be attached that auto-detects some of -the log's structure (like date and time fields, log levels and sources) from -its first few lines.. This can be used anlogously. Of course, you can also -specify the log's structure (to be implemented before the auto-detect...):: - - with follow_log('/var/log/messages') as log_flwr: - for content in log_flwr: - do_something_with(content.datetime, content.log_level, content.text) - -[END: not implemented yet] - -A Follower is an iterator, which means you can do lots of cool things with it, -including (see also itertools package, itertool recipies, "Functional -Programming Howto"):: - - # ignore source and description: - for _, _, text_line in my_follower: - do_something_with(line) - - # enumerate: - for line_number, line in enumerate(my_follower) - do_something_with(line, line_number) - - # combine with other iterator: - for line, other_data in zip(my_follwer, other_iter) - do_something_with(line, other_data) - - # filter: - for line in my_follower if test(my_func): - do_something_with(line) - - # tee: - iter1, iter2 = itertools.tee(my_follower) - --> can read from both (but each line is given to only one of them) - - # for each element, peek at the next one to help do the right thing - for line, next_line in pairwise(my_follower): - do_something_with(line, peek_next_line=next_line) - - # create new iterator or generator - for line in my_follwer: - some_result = do_something_with(line) - yield some_result - -NOT possible:: - - len(my_follower) - Follower(my_file, my_file) # (twice the same) -""" - -from __future__ import print_function -from warnings import warn -from select import poll, POLLIN, POLLOUT, POLLPRI, POLLHUP, POLLERR, POLLNVAL -from subprocess import Popen, PIPE -from socket import socket as Socket - -from .type_helpers import isstr - -# ############################################################################# -# CONSTANTS -# ############################################################################# -DEFAULT_POLL_FLAGS = POLLIN | POLLPRI | POLLHUP | POLLERR - -DEFAULT_POLL_TIMEOUT = 10 - -# ############################################################################# -# EXCEPTIONS -# ############################################################################# - - -def flags_to_str(flags): - """ return short string representation for poll flags """ - text_parts = [] - if flags & POLLIN: - text_parts.append('IN') - if flags & POLLOUT: - text_parts.append('OUT') - if flags & POLLPRI: - text_parts.append('PRI') - if flags & POLLERR: - text_parts.append('ERR') - if flags & POLLHUP: - text_parts.append('HUP') - if flags & POLLNVAL: - text_parts.append('INVALID') - return ','.join(text_parts) - - -class PollFlagsException(Exception): - """ exception raised if polling returned unexpected flags """ - def __init__(self, flags, desc): - super(PollFlagsException, self).__init__( - 'Unexpected flags from polling {0}: {1}'.format( - desc, flags_to_str(flags))) - - -class PollHupException(Exception): - """ exception raised when polled source sends a SIGHUP """ - def __init__(self, desc): - super(PollHupException, self).__init__( - 'Received HUP from polling, {0} was probably killed!'.format(desc)) - - -class PollUnknownSourceException(Exception): - """ exception raised when polling returns unknown source """ - def __init__(self, file_no): - super(PollUnknownSourceException, self).__init__( - 'Unknown source returned from polling: file_no={0}!'.format( - file_no)) - - -class Follower(object): - """ uses select and poll to follow some set of pipes, files, sockets - - relies on data being line-based! - - will read as much data as possible and then wait for more - - Iterator over lines! - """ - - # iterator over zip(sources, file_nos, descriptions) - _source_iter = None - - # for polling: - _poller = None - _flags = None - _timeout = None - - def __init__(self, *sources_and_descriptions, **other_args): - """ create a Follower for given sources and optional descriptions - - Will guess if args are just sources or also descriptions of these. - All of these are possible: - - * Follower(src). - * Follower(src, desc). - * Follower(src1, src2, src3). - * Follower(src1, desc1, src2, desc2, src3, desc3). - * Follower(src1, src2, desc2, src3). - * Follower(src, desc_ignored, desc_used) # warn but accept. - * Follower(single_list_of_sources_and_descs). - - Not possible: - - * Follower(). - * Follower(desc, src). - - Descriptions must be strings, they identify the sources in generated - output and are used in error messages - - Sources must be file handles, open pipes or open sockets (or anything - else that gives a reasonable fileno(), so help of the :py:mod:`select` - module) - - Sources are not closed! - - other_args can include flags and timeout - """ - - # take care about other_args - if 'flags' in other_args: - self._flags = other_args['flags'] - else: - self._flags = DEFAULT_POLL_FLAGS - if 'timeout' in other_args: - self._timeout = other_args['timeout'] - else: - self._timeout = DEFAULT_POLL_TIMEOUT - - for key in other_args.keys(): - if key not in ('flags', 'timeout'): - raise ValueError('argument not recognized: {0}'.format(key)) - - self._poller = poll() - - sources = [] - file_nos = [] - descriptions = [] - - # take care of sources_and_descriptions - if not sources_and_descriptions: - raise ValueError('need at least one source!') - elif len(sources_and_descriptions) == 1 \ - and isinstance(sources_and_descriptions[0], (list, tuple)): - sources_and_descriptions = sources_and_descriptions[0] - - for arg in sources_and_descriptions: - if isstr(arg): # is a description - if len(descriptions) == 0: - raise ValueError( - 'first arg must be source, not a description!') - if descriptions[-1] is not None: - warn('Overwriting description "{0}" with "{1}"'.format( - descriptions[-1], arg)) - descriptions[-1] = arg - else: # is a new source - sources.append(arg) - file_nos.append(arg.fileno()) - descriptions.append(None) - self._poller.register(arg, self._flags) - # end: for all args - - # need iterator over these 3 lists all the time - self._source_iter = list((src, fno, desc) for src, fno, desc in - zip(sources, file_nos, descriptions)) - # end: Follower constructor - - def next(self): - """ main function to poll next line from sources - - returns (source, desc, line_stripped, flags) - """ - - while True: - result = self._poller.poll(self._timeout) - for file_no, flags in result: - - # identify source - desc = -1 - source = None - for curr_src, curr_no, curr_desc in self._source_iter: - if curr_no == file_no: - desc = curr_desc - source = curr_src - break - # end: loop over self._source_iter - if desc is -1: - raise PollUnknownSourceException(file_no) - - if not flags & self._flags: - raise PollFlagsException(flags, desc) - - if flags & POLLHUP: - #raise PollHupException(desc) - warn('received a hangup during polling for {0}' - .format(desc)) - - if flags & POLLERR: - warn('received an err during polling for {0}' - .format(desc)) - - if flags & POLLNVAL: - warn('poll replied "invalid request" err during polling ' - 'for {0}' - .format(desc)) - - # read line from source - line = source.read() # was readline(), but try something else - # removed the rstrip() from return - - if not line: - continue - # if reach here, we have a new line of text - - return source, desc, line, flags - # for poll results - # end: inf loop - # end: Follower._next_line - - def __iter__(self): - """ makes this an iterator, called by iter(my_follower) """ - return self - - def __next__(self): - """ called by next(my_follower) """ - return self.next() -# end: class Follower - - -# ############################################################################# -# CONTEXT MANAGEMENT -# ############################################################################# - -#: defualt args for Popen constructor in case a process is given as cmd -DEFAULT_SUBPROCESS_ARGS = dict(bufsize=1, stdin=None, stdout=PIPE, stderr=PIPE) - -class FollowContextManager(object): - """ context for Follower objects, ensures everything is closed properly - - opens and closes files and sockets; communicates and waits for Popen - process objects whose stdout and stderr pipes are followed - """ - - files = None - file_descs = None - file_handles = None - - sockets = None - socket_descs = None - - procs = None - proc_descs = None - proc_objs = None - - def __init__(self, files=None, file_descs=None, - sockets=None, socket_descs=None, - procs=None, proc_descs=None, - subprocess_args=None): - """ create a context manager for Follower - - check args and that they match. - tries to guess good descs for files, sockets and procs that are not - given - - :param files: list/tuple of, or single file handle or file name - :param file_descs: None or list/tuple of same length as files - :param sockets: list/tuple of, or single socket - :param socket_descs: None or list/tuple of same length as sockets - :param procs: list/tuple of, or single Popen object or command itself - as str/list/tuple - :param proc_descs: None or list/tuple of same length as procs - :param dict subprocess_args: dict or args that are merged with - :py:data:`DEFAULT_SUBPROCESS_ARGS` - - ..seealso:: could have tried to implement this as a nesting of many - individual context managers, see :py:mod:`contextlib` - """ - print('__init__') - - # set files and file_descs and ensure that they matching lists - if files is None: - self.files = [] - elif isstr(files): - self.files = [files, ] - elif isinstance(files, file): - self.files = [files, ] - else: - self.files = files - if file_descs is None: - temp_descs = [None for _ in self.files] - elif len(self.files) == len(file_descs): - temp_descs = file_descs - else: - raise ValueError('if given file descs, need desc for all files!') - - # try to guess good file_desc values; ensure they are str - self.file_descs = [] - for file_nr, (file, file_desc) in \ - enumerate(zip(self.files, temp_descs)): - if isstr(file_desc): - self.file_descs.append(file_desc) - continue - elif file_desc is None: # need to guess something - if isstr(file): - self.file_descs.append(file) - else: - self.file_descs.append('file{0}'.format(file_nr)) - else: # desc is neither str nor None - raise ValueError('file descs must be string or None!') - - # set sockets and socket_descs and ensure that they matching lists - if sockets is None: - self.sockets = [] - elif isinstance(sockets, Socket): - self.sockets = [sockets, ] - else: - self.sockets = sockets - if socket_descs is None: - temp_descs = [None for _ in self.sockets] - elif len(self.sockets) == len(socket_descs): - temp_descs = socket_descs - else: - raise ValueError('if given socket descs, ' - 'need descs for all sockets!') - - # try to guess good socket_desc values; ensure they are str - self.socket_descs = [] - for file_nr, (socket, socket_desc) in \ - enumerate(zip(self.sockets, temp_descs)): - if isstr(socket_desc): - self.socket_descs.append(socket_desc) - elif socket_desc is None: # need to guess something - self.socket_descs.append('socket{0}'.format(socket_nr)) - else: # desc is neither str nor None - raise ValueError('socket descs must be string or None!') - - # set procs and proc_descs and ensure they matching lists - if procs is None: - self.procs = [] - elif isstr(procs): - self.procs = [procs, ] - elif isinstance(procs, Popen): - self.procs = [procs, ] - else: - self.procs = procs - if proc_descs is None: - temp_descs = [None for _ in self.procs] - elif len(proc_descs) == len(self.procs): - temp_descs = proc_descs - else: - raise ValueError('if given proc descs, need descs for all procs!') - - # try to guess good proc_desc values; ensure they are str - self.proc_descs = [] - for proc_nr, (proc, proc_desc) in \ - enumerate(zip(self.procs, temp_descs)): - if isstr(proc_desc): - self.proc_descs.append(proc_desc) - elif proc_desc is None: # need to guess something - if isstr(proc): - self.proc_descs.append(proc) - elif isinstance(proc, (tuple, list)): - self.proc_descs.append(' '.join(proc)) - elif isinstance(proc, Popen): - if isstr(proc.args): - self.proc_descs.append(proc.args) - else: - self.proc_descs.append(' '.join(proc.args)) - else: - self.proc_descs.append('proc{0}'.format(proc_nr)) - else: # desc is neither str nor None - raise ValueError('proc descs must be string or None!') - - self.subprocess_args = DEFAULT_SUBPROCESS_ARGS - if subprocess_args is not None: - self.subprocess_args.update(subprocess_args) - - def __enter__(self): - """ called when entering run context - - opens files, tries to create descs, and assembles args for Follower - """ - print('__enter__') - - args = [] - - # open files - self.file_handles = [] - for file_arg, desc in zip(self.files, self.file_descs): - if isinstance(file_arg, str): - new_handle = open(file_arg) - else: # assume file_arg is a file handle - new_handle = file_arg - self.file_handles.append(new_handle) - args.append(new_handle) - args.append(desc) - # end: for files and file_descs - - for sock_number, (socket, desc) in \ - enumerate(zip(self.sockets, self.socket_descs)): - args.append(socket) - args.append(desc) - - self.proc_objs = [] - for proc, desc in zip(self.procs, self.proc_descs): - if isstr(proc) or isinstance(proc, (list,tuple)): - proc = Popen(proc, **self.subprocess_args) - self.proc_objs.append(proc) - args.append(proc.stdout) - args.append(desc + '_out') - args.append(proc.stderr) - args.append(desc + '_err') - - return Follower(args) - - def __exit__(self, exc_type, exc_value, traceback): - """ called when leaving run context """ - print('__exit__ with args {0}, {1}, {2}'.format(exc_type, exc_value, - traceback)) - - # close files - for handle in self.file_handles: - try: - handle.close() - except Exception: - warn('ignoring exception exiting follower context manager!') - for proc in self.proc_objs: - try: - proc.kill() - rest_out, rest_err = proc.communicate() - if rest_out or rest_err: - warn('Ignoring left-over output in proc') - except Exception: - warn('ignoring exception exiting follower context manager!') -# end: class FollowContextManager - - -def follow(*args, **kwargs): - """ creates a ContextManager for a Follower to be used in "with" statements - - for example: - - with follow('/var/log/messages') as log_flwr - for source, desc, line in log_flwr - do_something_with(line) - - for specification of args see FollowContextManager constructor help - """ - return FollowContextManager(*args, **kwargs) - - -# ############################################################################# -# TESTING -# (hard to create unittest -- would need separate threads to write to some -# file /pipe/socket so can test the interactivity...) -# ############################################################################# - -from datetime import date, datetime as dt, timedelta as td -from subprocess import Popen, PIPE - -syslog_file = '/var/log/messages' -time_diff_seconds = 60 -syslog_time_format = '%b %d %H:%M:%S' - - -def test_syslog_line(n_lines, source, desc, line, - today_str, start_time): - """ called by test functions for each line of syslog """ - - if n_lines % 1000 == 0: - print(('{0:6d} old lines, showing lines after {1}; ' - 'abort using Ctrl-C').format(n_lines, start_time)) - if line[:6] != today_str: - return - try: - log_time = dt.strptime(line[:15], syslog_time_format) - log_time = log_time.replace(year=start_time.year) - except ValueError: - log_time = None - - if log_time is None or log_time > start_time: - print('line {0} from "{1}", (orig from {2}): {3}'.format( - n_lines, desc, log_time, line)) - - -def test_follower_syslog(): - """ test Follower on syslog file using PIPEs and an open file - - show lines from the last 5 minutes - """ - - start_time = dt.now() - td(seconds=time_diff_seconds) - today_str = date.today().strftime(syslog_time_format)[:6] - if today_str[4] == '0': - today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1' - - # create process for 'tail -f syslog' --> 2 pipes (stdout + stderr) - proc = Popen(['tail', '-f', syslog_file], stdout=PIPE, stderr=PIPE) - - # read from all three sources - with open(syslog_file, 'r') as file_handle: - flwr = Follower(file_handle, 'syslog file', proc.stdout, 'tail syslog', - proc.stderr, 'tail stderr') - for n_lines, (source, desc, line, flags) in enumerate(flwr): - if flags: - print('received flags {0}'.format(flags_to_str(flags))) - test_syslog_line(n_lines, source, desc, line, - today_str, start_time) - - -def test_follower_context(): - """ test FollowContextManager and follow() function """ - - today_str = date.today().strftime(syslog_time_format)[:6] - if today_str[4] == '0': - today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1' - start_time = dt.now() - td(seconds=time_diff_seconds) - with follow(syslog_file) as flwr: - for n_lines, (source, desc, line, flags) in enumerate(flwr): - if flags: - print('received flags {0}'.format(flags_to_str(flags))) - test_syslog_line(n_lines, source, desc, line, - today_str, start_time) - -def test_context_proc(): - """ test FollowContextManager's ability to wrap proc args """ - - today_str = date.today().strftime(syslog_time_format)[:6] - if today_str[4] == '0': - today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1' - start_time = dt.now() - td(seconds=time_diff_seconds) - with follow(procs=(['tail', '-f', syslog_file], )) as flwr: - for n_lines, (source, desc, line, flags) in enumerate(flwr): - if flags: - print('received flags {0}'.format(flags_to_str(flags))) - test_syslog_line(n_lines, source, desc, line, - today_str, start_time) - - -def test(): - """ Main function, tests some of class's functionality """ - - #test_follower_syslog() - #test_follower_context() - test_context_proc() -# end: function main - - -if __name__ == '__main__': - test() diff --git a/test/test_follow.py b/test/test_follow.py deleted file mode 100644 index bcbd168..0000000 --- a/test/test_follow.py +++ /dev/null @@ -1,149 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" Unittest for follow.py - -This is a little more involved since in order to test properly, need a second -thread that writes to file/socket/stdout - -.. todo:: as in log_read_unittest: disable buffering in LogFileWriter - -.. warn:: DEPRECATED (at least for files see log_read; may still be useful for - pipes/sockets) -""" - -from __future__ import absolute_import - -import unittest -from tempfile import mkstemp -import threading -from src.follow import * -from time import sleep -import os -import os.path -from src.test_helpers import get_perf_counter - -# different counter needed on different python versions -perf_counter = get_perf_counter() - - -class LogFileWriter(threading.Thread): - """ thread that creates and writes to given file handle """ - - def __init__(self, file_name, text_pattern, n_writes=None, - pause_time=0.1, do_encode=None): - """ creates thread, deamon is True - - if n_writes is None, will write indefinitely; else writes text_pattern - n_writes times, formatted with (counter, perf_counter) - """ - super(LogFileWriter, self).__init__() - self.daemon = True - self.file_name = file_name - self.text_pattern = text_pattern - self.n_writes = n_writes - self.pause_time = pause_time - self.do_encode = do_encode - - def run(self): - counter = 0 - if self.do_encode: - mode = 'wb' - else: - mode = 'wt' - - with open(self.file_name, mode) as file_handle: - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - - if self.do_encode: - file_handle.write(self.text_pattern - .format(counter, perf_counter()) - .encode(self.do_encode)) - else: - file_handle.write(self.text_pattern - .format(counter, perf_counter())) - print('wrote {0}'.format(counter)) - counter += 1 - sleep(self.pause_time) - - -@unittest.skip("Need to debug but deprecated anyway") -class FollowTester(unittest.TestCase): - """ Unittest for follow.py """ - - def test_logfile(self): - """ create logfile, write to it, expect output """ - - text_pattern = '{0}:{1}\n' - n_texts = 10 - pause_time = 1 - encoding = 'ascii' - - LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, - pause_time=pause_time, do_encode=encoding).start() - print('testing with log file {0}'.format(self.temp_file)) - time_diffs = [] - - with open(self.temp_file) as read_handle: - follower = Follower(read_handle) - for counter, (source, desc, text, flags) in \ - enumerate(follower): - receive_time = perf_counter() - print('received text: "{0}"'.format(text)) - index = text.index(':') - counter = int(text[:index].strip()) - write_time = float(text[index+1:].strip()) - time_diffs.append(receive_time - write_time) - if counter == n_texts: - break - - - def setUp(self): - """ called before each test """ - print('setup test') - temp_handle, temp_name = mkstemp() - os.close(temp_handle) - self.temp_file = temp_name - print('created temp file ' + self.temp_file) - - def tearDown(self): - """ called after each test """ - print('tear down test') - if os.path.isfile(self.temp_file): - print('delete temp file' + self.temp_file) - os.unlink(self.temp_file) - - @classmethod - def setUpClass(clz): - """ called once before the first test """ - print('setup test class') - clz.temp_file = None - - @classmethod - def tearDownClass(clz): - """ called once after the last test """ - print('tear down test class') - - - -if __name__ == '__main__': - unittest.main()