From 14e8b60dc5ac98ebc8e1993b46913ef248883809 Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Thu, 14 Jan 2016 17:18:11 +0100 Subject: [PATCH] moved python files from root dir into new src subdir as recommended in distutils docu --- arnied.py | 46 ---- buffers.py | 149 ------------ call_helpers.py | 83 ------- file_helpers.py | 176 -------------- follow.py | 638 --------------------------------------------------- iter_helpers.py | 33 --- log_helpers.py | 462 ------------------------------------- log_read.py | 308 ------------------------- src/arnied.py | 46 ++++ src/buffers.py | 149 ++++++++++++ src/call_helpers.py | 83 +++++++ src/file_helpers.py | 176 ++++++++++++++ src/follow.py | 638 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/iter_helpers.py | 33 +++ src/log_helpers.py | 462 +++++++++++++++++++++++++++++++++++++ src/log_read.py | 308 +++++++++++++++++++++++++ src/template.py | 100 ++++++++ src/test_helpers.py | 367 +++++++++++++++++++++++++++++ src/type_helpers.py | 100 ++++++++ template.py | 100 -------- test_helpers.py | 367 ----------------------------- type_helpers.py | 100 -------- 22 files changed, 2462 insertions(+), 2462 deletions(-) delete mode 100644 arnied.py delete mode 100644 buffers.py delete mode 100644 call_helpers.py delete mode 100644 file_helpers.py delete mode 100644 follow.py delete mode 100644 iter_helpers.py delete mode 100644 log_helpers.py delete mode 100644 log_read.py create mode 100644 src/arnied.py create mode 100644 src/buffers.py create mode 100644 src/call_helpers.py create mode 100644 src/file_helpers.py create mode 100644 src/follow.py create mode 100644 src/iter_helpers.py create mode 100644 src/log_helpers.py create mode 100644 src/log_read.py create mode 100644 src/template.py create mode 100644 src/test_helpers.py create mode 100644 src/type_helpers.py delete mode 100644 template.py delete mode 100644 test_helpers.py delete mode 100644 type_helpers.py diff --git a/arnied.py b/arnied.py deleted file 
mode 100644 index 38132f0..0000000 --- a/arnied.py +++ /dev/null @@ -1,46 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Interface to arnied - -only a stub; should at least contain a set_cnf and get_cnf - -.. codeauthor:: Intra2net -""" - -def set_cnf(var_name, value): - """ not implemented yet """ - raise NotImplementedError() - - # not good enough: implementation in autotest guest/utils/backup_utils - - -def get_cnf(var_name, value): - """ not implemented yet """ - raise NotImplementedError() - - # not good enough: implementation in autotest guest/utils/backup_utils - - -def wait_for_generate(timeout=None): - """ wait for generate to run/finish - - to be copied from autotest arnied_wrapper - """ - - raise NotImplementedError() diff --git a/buffers.py b/buffers.py deleted file mode 100644 index 8e461ca..0000000 --- a/buffers.py +++ /dev/null @@ -1,149 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. 
-# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" -buffers.py: buffers of various shapes, sizes and functionalities - -Featuring:: - -* CircularBuffer -* LogarithmicBuffer: saves only last N items, and after that less and less so -* very few old items are kept - -.. codeauthor:: Intra2net -""" - -class CircularBuffer: - """ circular buffer for data; saves last N sets - - can return or run func on them afterwards in correct order - - public attributes (please read only!): buffer_size, n_items - """ - - buffer_size = None - _buffer = None - _buff_idx = None - n_items = None - - def __init__(self, size, empty_element=None): - """ initialize with N = size """ - if size < 1: - raise ValueError('size must be positive!') - self.buffer_size = size - self._buffer = [empty_element for idx in range(size)] - self._buff_idx = 0 - self.n_items = 0 - - def add(self, new_item): - """ add a new item to buffer -- might replace the oldest item """ - oldest_item = self._buffer[self._buff_idx] - self._buffer[self._buff_idx] = new_item - self._buff_idx = (self._buff_idx + 1) % self.buffer_size - self.n_items += 1 - return oldest_item - - def run_func(self, some_func): - """ run some_func(item) on last saved items in correct order """ - if self.n_items >= self.buffer_size: - for idx in range(self._buff_idx, self.buffer_size): - some_func(self._buffer[idx]) - - for idx in range(0, self._buff_idx): - some_func(self._buffer[idx]) - - 
def get_all(self): - """ return the buffered contents in correct order """ - result = [] - if self.n_items >= self.buffer_size: - for idx in range(self._buff_idx, self.buffer_size): - result.append(self._buffer[idx]) - - for idx in range(0, self._buff_idx): - result.append(self._buffer[idx]) - - return result - - -class LogarithmicBuffer: - """ saves only last N items, and after that less and less old data - - --> grows only logarithmically in size - - has a data_new which is a CircularBuffer for the newest N items - data_old is a list of items with ages approx 2, 4, 8, 16, ... counts - - Could easily extend to other bases than 2 to make even scarcer - """ - - def __init__(self, n_new): - self.n_new = n_new - if n_new < 0: - raise ValueError('n_new must be non-negative!') - if n_new == 0: - self._data_new = None - else: - self._data_new = CircularBuffer(n_new) - self._count = 0 - self._data_old = [] - - def add(self, new_item): - """ add a new item to buffer """ - if self._data_new: - newest_old_item = self._data_new.add(new_item) - age = self._count - self.n_new - else: - newest_old_item = new_item - age = self._count - - if age >= 0: - self._save_old(newest_old_item, age, 0) - self._count += 1 - - def _save_old(self, item, age, index): - """ internal helper for saving (or not) items in data_old """ - - # determine whether we throw it away or actually save it - if age % 2**index != 0: - return - - # we save it. 
But before check if we need to extend data_old or - # save the item we would overwrite - if len(self._data_old) <= index: - self._data_old.append(item) - else: - self._save_old(self._data_old[index], age, index+1) - self._data_old[index] = item - - def get_all(self): - """ get all buffer contents in correct order """ - if self._data_new: - return self._data_old[::-1] + self._data_new.get_all() - else: - return self._data_old[::-1] - - def get_size(self): - """ returns current number of saved elements in buffer - - size is O(log n) where n = number of added items - - more precisely: size is n_new + floor(log_2(m-1))+1 - where: m = n-n_new > 1 - (m=0 --> size=n_new; m=1 --> size=n_new+1) - """ - return self.n_new + len(self._data_old) diff --git a/call_helpers.py b/call_helpers.py deleted file mode 100644 index 1739d5f..0000000 --- a/call_helpers.py +++ /dev/null @@ -1,83 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Helpers for calling commands, capture their output, return result code - -Subprocess library just does not provide all the simplicity we would like - -Stay python2 compatible --> no timeouts - -.. 
codeauthor:: Intra2net -""" - -from subprocess import Popen, PIPE - - -def call_and_capture(command, stdin_data=None, split_lines=True, - *args, **kwargs): - """ call command, captures stdout, stderr and return code, return when done - - Use only for commands with little output since all output has to be - buffered! - Quoting :py:mod:`subprocess`: - - ..note:: The data read is buffered in memory, so do not use this method if - the data size is large or unlimited. - - Forwards all args to Popen constructor, except: - stdout=PIPE (forced, ignored if in kwargs) - stderr=PIPE (forced, ignored if in kwargs) - shell=False (except if set in kwargs) - universal_newlines=True (except if set in kwargs) - - :param command: forwarded as first arg to Popen constructor - :param str stdin_data: forwarded to stdin of process through communicate - :param bool split_lines: True (default) to split output line-wise and - return list of strings; False --> return single - string for out and one for err - :param args: forwarded to Popen constructor - :param kwargs: forwarded to Popen constructor - :returns: (return_code, stdout, stderr); stdout and stderr are lists of - text lines (without terminating newlines); if universal_newlines is - True (default), the lines are of type str, otherwise they are non- - unicode text (type str in py2, bytes in py3). 
If split_lines is False - (not default), then stdout and stderr are single multi-line strings - - :raise: OSError (e.g., if command does not exist), ValueError if args are - invalid; no :py:class:`subprocess.CalledProcessError` nor - :py:class:`subprocess.TimeoutExpired` - """ - - # construct args - enforced_params = ('stdout', 'stderr') - my_kwargs = dict(stdout=PIPE, stderr=PIPE, - shell=False, universal_newlines=True) - for key, value in kwargs.items(): - if key not in enforced_params: - my_kwargs[key] = value - - # run command - proc = Popen(command, *args, **my_kwargs) - stdout_data, stderr_data = proc.communicate(stdin_data) - - # return - if split_lines: - return proc.returncode, stdout_data.splitlines(), \ - stderr_data.splitlines() - else: - return proc.returncode, stdout_data, stderr_data diff --git a/file_helpers.py b/file_helpers.py deleted file mode 100644 index 2035a77..0000000 --- a/file_helpers.py +++ /dev/null @@ -1,176 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. 
- -""" Helper functions and classes to deal with files and dirs and stuff - -Featuring:: - -* the cd context manager pwd(); with cd(other_dir): pwd(); pwd(); - will print current working dir, then other_dir, then first dir again -* a wrapper around "df" to determine size and usage of file systems - -.. todo:: change get_filesystem_fill_states to not fork - -.. codeauthor:: Intra2net -""" - -from __future__ import print_function - -from contextlib import contextmanager -import os -from warnings import warn -from math import floor, ceil - -from call_helpers import call_and_capture -from iter_helpers import pairwise - - -@contextmanager -def cd(path): - """A context manager which changes the working directory to the given - path, and then changes it back to its previous value on exit. - - Taken from comment for python recipe by Greg Warner at - http://code.activestate.com/recipes/576620-changedirectory-context-manager/ - (MIT license) - """ - prev_cwd = os.getcwd() - os.chdir(path) - try: - yield - finally: - os.chdir(prev_cwd) - - -DF_CMD = ['/usr/bin/df', '--no-sync', '--portability'] -DF_SIZE_UNIT = 1024 - -class FilesystemFillState: - """ representation of 1 line of the 'df' command - - has fields filesystem, size, used, available, capacity, mount_point - - Note that only apprixomately capacity == used/size - and that only approximately used + available == size - and that all sizes are in bytes - """ - - def __init__(self): - self.name = None - self.size = None - self.used = None - self.available = None - self.capacity = None - self.mount_point = None - - def __str__(self): - return '[Filesystem {0} mounted at {1}: {2}% used]' \ - .format(self.name, self.mount_point, self.capacity) - -def get_filesystem_fill_states(): - """ get fill state on all filesystems - - parses the output of cmd 'df', returns list of - :py:class:`FilesystemFillState` - - ..todo:: replace call to df (which forks) by call to statvfs combined with - some way to find all mounted partitions 
(parse /proc/mounts - whenever it changes) - """ - - # call - code, out, err = call_and_capture(DF_CMD) - - # warn if unexpected outcome - if code != 0: - warn('df returned non-zero exit code {0}!'.format(code)) - if err: - for line in err: - warn('df produced output to stderr: "{0}"'.format(line)) - - # find columns in output that are just spaces - min_len = min(len(line) for line in out) - separator_cols = [idx for idx in range(min_len) if \ - all(line[idx] == ' ' for line in out)] - checked_cols = [separator_cols[0], ] - for prev_col, col in pairwise(separator_cols): - if col != prev_col+1: - checked_cols.append(col) - separator_cols = checked_cols - - # check columns and their header - if len(separator_cols) != 5: - raise ValueError('unexpected number of separator columns: {0}' - .format(separator_cols)) # must eliminate neighbours? - - title_line = out[0] - title = title_line[ : separator_cols[0]].strip() - if title != 'Filesystem': - warn('Unexpected column title: "{0}" != "Filesystem"!' - .format(title)) - title = title_line[separator_cols[0] : separator_cols[1]].strip() - if title != '1024-blocks': - warn('Unexpected column title: "{0}" != "1024-blocks"!' - .format(title)) - title = title_line[separator_cols[1] : separator_cols[2]].strip() - if title != 'Used': - warn('Unexpected column title: "{0}" != "Used"!' - .format(title)) - title = title_line[separator_cols[2] : separator_cols[3]].strip() - if title != 'Available': - warn('Unexpected column title: "{0}" != "Available"!' - .format(title)) - title = title_line[separator_cols[3] : separator_cols[4]].strip() - if title != 'Capacity': - warn('Unexpected column title: "{0}" != "Capacity"!' - .format(title)) - title = title_line[separator_cols[4] : ].strip() - if title != 'Mounted on': - warn('Unexpected column title: "{0}" != "Mounted on"!' 
- .format(title)) - - # create result - result = [] - for line in out[1:]: - stats = FilesystemFillState() - stats.name = line[ : separator_cols[0]].strip() - stats.size = int(line[separator_cols[0] : separator_cols[1]].strip()) \ - * DF_SIZE_UNIT - stats.used = int(line[separator_cols[1] : separator_cols[2]].strip()) \ - * DF_SIZE_UNIT - stats.available = int(line[separator_cols[2] : separator_cols[3]]\ - .strip()) * DF_SIZE_UNIT - stats.capacity = int(line[separator_cols[3] : separator_cols[4]]\ - .strip()[:-1]) - stats.mount_point = line[separator_cols[4] : ].strip() - - # more checks: does match capacity - capacity = 100. * stats.used / stats.size - if abs(capacity - stats.capacity) > 5.: - warn('capacities for {0} deviate more than 5%: ' - '{1} != {2:.2f}(={3}/{4})'.format( - stats.name, stats.capacity, capacity, stats.used, stats.size)) - - size = stats.used + stats.available - if float(abs(stats.size - size)) / float(max(stats.size, size)) > 0.1: - warn('size for {0} differs by more than 10% from used+available!' - .format(stats.name)) - - result.append(stats) - - return result diff --git a/follow.py b/follow.py deleted file mode 100644 index 3cf8bcb..0000000 --- a/follow.py +++ /dev/null @@ -1,638 +0,0 @@ -#!/usr/bin/env python - -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. 
-# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" -follow process output, log files and pipes using select and poll - -DEPRECATED -(at least for files see log_read; may still be usefull for pipes/sockets) - -Main class is Follower which does the polling and selecting, it is best used -in a with-statement as follows:: - - with follow('/var/log/messages') as flwr - for line in flwr: - do_something_with(line) - -This will read the given file and yield its contents line-by-line until the end -of the file. It will then wait for new additions to the file and provide the -new lines newly instantaneously - -Things to note: - -* all data must be line-based! -* will only work on Linux (except for sockets maybe). -* create in py2 but try to stay py3-compatible. - -[START: not implemented yet] - -If following a log file, a LogParser can be attached that auto-detects some of -the log's structure (like date and time fields, log levels and sources) from -its first few lines.. This can be used anlogously. 
Of course, you can also -specify the log's structure (to be implemented before the auto-detect...):: - - with follow_log('/var/log/messages') as log_flwr: - for content in log_flwr: - do_something_with(content.datetime, content.log_level, content.text) - -[END: not implemented yet] - -A Follower is an iterator, which means you can do lots of cool things with it, -including (see also itertools package, itertool recipies, "Functional -Programming Howto"):: - - # ignore source and description: - for _, _, text_line in my_follower: - do_something_with(line) - - # enumerate: - for line_number, line in enumerate(my_follower) - do_something_with(line, line_number) - - # combine with other iterator: - for line, other_data in zip(my_follwer, other_iter) - do_something_with(line, other_data) - - # filter: - for line in my_follower if test(my_func): - do_something_with(line) - - # tee: - iter1, iter2 = itertools.tee(my_follower) - --> can read from both (but each line is given to only one of them) - - # for each element, peek at the next one to help do the right thing - for line, next_line in pairwise(my_follower): - do_something_with(line, peek_next_line=next_line) - - # create new iterator or generator - for line in my_follwer: - some_result = do_something_with(line) - yield some_result - -NOT possible:: - - len(my_follower) - Follower(my_file, my_file) # (twice the same) - -.. 
codeauthor:: Intra2net -""" - -from __future__ import print_function -from warnings import warn -from select import poll, POLLIN, POLLOUT, POLLPRI, POLLHUP, POLLERR, POLLNVAL -from subprocess import Popen, PIPE -from socket import socket as Socket - -from type_helpers import isstr - -# ############################################################################# -# CONSTANTS -# ############################################################################# -DEFAULT_POLL_FLAGS = POLLIN | POLLPRI | POLLHUP | POLLERR - -DEFAULT_POLL_TIMEOUT = 10 - -# ############################################################################# -# EXCEPTIONS -# ############################################################################# - - -def flags_to_str(flags): - """ return short string representation for poll flags """ - text_parts = [] - if flags & POLLIN: - text_parts.append('IN') - if flags & POLLOUT: - text_parts.append('OUT') - if flags & POLLPRI: - text_parts.append('PRI') - if flags & POLLERR: - text_parts.append('ERR') - if flags & POLLHUP: - text_parts.append('HUP') - if flags & POLLNVAL: - text_parts.append('INVALID') - return ','.join(text_parts) - - -class PollFlagsException(Exception): - """ exception raised if polling returned unexpected flags """ - def __init__(self, flags, desc): - super(PollFlagsException, self).__init__( - 'Unexpected flags from polling {0}: {1}'.format( - desc, flags_to_str(flags))) - - -class PollHupException(Exception): - """ exception raised when polled source sends a SIGHUP """ - def __init__(self, desc): - super(PollHupException, self).__init__( - 'Received HUP from polling, {0} was probably killed!'.format(desc)) - - -class PollUnknownSourceException(Exception): - """ exception raised when polling returns unknown source """ - def __init__(self, file_no): - super(PollUnknownSourceException, self).__init__( - 'Unknown source returned from polling: file_no={0}!'.format( - file_no)) - - -class Follower(object): - """ uses select and poll to 
follow some set of pipes, files, sockets - - relies on data being line-based! - - will read as much data as possible and then wait for more - - Iterator over lines! - """ - - # iterator over zip(sources, file_nos, descriptions) - _source_iter = None - - # for polling: - _poller = None - _flags = None - _timeout = None - - def __init__(self, *sources_and_descriptions, **other_args): - """ create a Follower for given sources and optional descriptions - - Will guess if args are just sources or also descriptions of these. - All of these are possible: - - * Follower(src). - * Follower(src, desc). - * Follower(src1, src2, src3). - * Follower(src1, desc1, src2, desc2, src3, desc3). - * Follower(src1, src2, desc2, src3). - * Follower(src, desc_ignored, desc_used) # warn but accept. - * Follower(single_list_of_sources_and_descs). - - Not possible: - - * Follower(). - * Follower(desc, src). - - Descriptions must be strings, they identify the sources in generated - output and are used in error messages - - Sources must be file handles, open pipes or open sockets (or anything - else that gives a reasonable fileno(), so help of the :py:mod:`select` - module) - - Sources are not closed! 
- - other_args can include flags and timeout - """ - - # take care about other_args - if 'flags' in other_args: - self._flags = other_args['flags'] - else: - self._flags = DEFAULT_POLL_FLAGS - if 'timeout' in other_args: - self._timeout = other_args['timeout'] - else: - self._timeout = DEFAULT_POLL_TIMEOUT - - for key in other_args.keys(): - if key not in ('flags', 'timeout'): - raise ValueError('argument not recognized: {0}'.format(key)) - - self._poller = poll() - - sources = [] - file_nos = [] - descriptions = [] - - # take care of sources_and_descriptions - if not sources_and_descriptions: - raise ValueError('need at least one source!') - elif len(sources_and_descriptions) == 1 \ - and isinstance(sources_and_descriptions[0], (list, tuple)): - sources_and_descriptions = sources_and_descriptions[0] - - for arg in sources_and_descriptions: - if isstr(arg): # is a description - if len(descriptions) == 0: - raise ValueError( - 'first arg must be source, not a description!') - if descriptions[-1] is not None: - warn('Overwriting description "{0}" with "{1}"'.format( - descriptions[-1], arg)) - descriptions[-1] = arg - else: # is a new source - sources.append(arg) - file_nos.append(arg.fileno()) - descriptions.append(None) - self._poller.register(arg, self._flags) - # end: for all args - - # need iterator over these 3 lists all the time - self._source_iter = list((src, fno, desc) for src, fno, desc in - zip(sources, file_nos, descriptions)) - # end: Follower constructor - - def next(self): - """ main function to poll next line from sources - - returns (source, desc, line_stripped, flags) - """ - - while True: - result = self._poller.poll(self._timeout) - for file_no, flags in result: - - # identify source - desc = -1 - source = None - for curr_src, curr_no, curr_desc in self._source_iter: - if curr_no == file_no: - desc = curr_desc - source = curr_src - break - # end: loop over self._source_iter - if desc is -1: - raise PollUnknownSourceException(file_no) - - if not 
flags & self._flags: - raise PollFlagsException(flags, desc) - - if flags & POLLHUP: - #raise PollHupException(desc) - warn('received a hangup during polling for {0}' - .format(desc)) - - if flags & POLLERR: - warn('received an err during polling for {0}' - .format(desc)) - - if flags & POLLNVAL: - warn('poll replied "invalid request" err during polling ' - 'for {0}' - .format(desc)) - - # read line from source - line = source.read() # was readline(), but try something else - # removed the rstrip() from return - - if not line: - continue - # if reach here, we have a new line of text - - return source, desc, line, flags - # for poll results - # end: inf loop - # end: Follower._next_line - - def __iter__(self): - """ makes this an iterator, called by iter(my_follower) """ - return self - - def __next__(self): - """ called by next(my_follower) """ - return self.next() -# end: class Follower - - -# ############################################################################# -# CONTEXT MANAGEMENT -# ############################################################################# - -#: defualt args for Popen constructor in case a process is given as cmd -DEFAULT_SUBPROCESS_ARGS = dict(bufsize=1, stdin=None, stdout=PIPE, stderr=PIPE) - -class FollowContextManager(object): - """ context for Follower objects, ensures everything is closed properly - - opens and closes files and sockets; communicates and waits for Popen - process objects whose stdout and stderr pipes are followed - """ - - files = None - file_descs = None - file_handles = None - - sockets = None - socket_descs = None - - procs = None - proc_descs = None - proc_objs = None - - def __init__(self, files=None, file_descs=None, - sockets=None, socket_descs=None, - procs=None, proc_descs=None, - subprocess_args=None): - """ create a context manager for Follower - - check args and that they match. 
- tries to guess good descs for files, sockets and procs that are not - given - - :param files: list/tuple of, or single file handle or file name - :param file_descs: None or list/tuple of same length as files - :param sockets: list/tuple of, or single socket - :param socket_descs: None or list/tuple of same length as sockets - :param procs: list/tuple of, or single Popen object or command itself - as str/list/tuple - :param proc_descs: None or list/tuple of same length as procs - :param dict subprocess_args: dict or args that are merged with - :py:data:`DEFAULT_SUBPROCESS_ARGS` - - ..seealso:: could have tried to implement this as a nesting of many - individual context managers, see :py:mod:`contextlib` - """ - print('__init__') - - # set files and file_descs and ensure that they matching lists - if files is None: - self.files = [] - elif isstr(files): - self.files = [files, ] - elif isinstance(files, file): - self.files = [files, ] - else: - self.files = files - if file_descs is None: - temp_descs = [None for _ in self.files] - elif len(self.files) == len(file_descs): - temp_descs = file_descs - else: - raise ValueError('if given file descs, need desc for all files!') - - # try to guess good file_desc values; ensure they are str - self.file_descs = [] - for file_nr, (file, file_desc) in \ - enumerate(zip(self.files, temp_descs)): - if isstr(file_desc): - self.file_descs.append(file_desc) - continue - elif file_desc is None: # need to guess something - if isstr(file): - self.file_descs.append(file) - else: - self.file_descs.append('file{0}'.format(file_nr)) - else: # desc is neither str nor None - raise ValueError('file descs must be string or None!') - - # set sockets and socket_descs and ensure that they matching lists - if sockets is None: - self.sockets = [] - elif isinstance(sockets, Socket): - self.sockets = [sockets, ] - else: - self.sockets = sockets - if socket_descs is None: - temp_descs = [None for _ in self.sockets] - elif len(self.sockets) == 
len(socket_descs): - temp_descs = socket_descs - else: - raise ValueError('if given socket descs, ' - 'need descs for all sockets!') - - # try to guess good socket_desc values; ensure they are str - self.socket_descs = [] - for file_nr, (socket, socket_desc) in \ - enumerate(zip(self.sockets, temp_descs)): - if isstr(socket_desc): - self.socket_descs.append(socket_desc) - elif socket_desc is None: # need to guess something - self.socket_descs.append('socket{0}'.format(socket_nr)) - else: # desc is neither str nor None - raise ValueError('socket descs must be string or None!') - - # set procs and proc_descs and ensure they matching lists - if procs is None: - self.procs = [] - elif isstr(procs): - self.procs = [procs, ] - elif isinstance(procs, Popen): - self.procs = [procs, ] - else: - self.procs = procs - if proc_descs is None: - temp_descs = [None for _ in self.procs] - elif len(proc_descs) == len(self.procs): - temp_descs = proc_descs - else: - raise ValueError('if given proc descs, need descs for all procs!') - - # try to guess good proc_desc values; ensure they are str - self.proc_descs = [] - for proc_nr, (proc, proc_desc) in \ - enumerate(zip(self.procs, temp_descs)): - if isstr(proc_desc): - self.proc_descs.append(proc_desc) - elif proc_desc is None: # need to guess something - if isstr(proc): - self.proc_descs.append(proc) - elif isinstance(proc, (tuple, list)): - self.proc_descs.append(' '.join(proc)) - elif isinstance(proc, Popen): - if isstr(proc.args): - self.proc_descs.append(proc.args) - else: - self.proc_descs.append(' '.join(proc.args)) - else: - self.proc_descs.append('proc{0}'.format(proc_nr)) - else: # desc is neither str nor None - raise ValueError('proc descs must be string or None!') - - self.subprocess_args = DEFAULT_SUBPROCESS_ARGS - if subprocess_args is not None: - self.subprocess_args.update(subprocess_args) - - def __enter__(self): - """ called when entering run context - - opens files, tries to create descs, and assembles args for 
Follower - """ - print('__enter__') - - args = [] - - # open files - self.file_handles = [] - for file_arg, desc in zip(self.files, self.file_descs): - if isinstance(file_arg, str): - new_handle = open(file_arg) - else: # assume file_arg is a file handle - new_handle = file_arg - self.file_handles.append(new_handle) - args.append(new_handle) - args.append(desc) - # end: for files and file_descs - - for sock_number, (socket, desc) in \ - enumerate(zip(self.sockets, self.socket_descs)): - args.append(socket) - args.append(desc) - - self.proc_objs = [] - for proc, desc in zip(self.procs, self.proc_descs): - if isstr(proc) or isinstance(proc, (list,tuple)): - proc = Popen(proc, **self.subprocess_args) - self.proc_objs.append(proc) - args.append(proc.stdout) - args.append(desc + '_out') - args.append(proc.stderr) - args.append(desc + '_err') - - return Follower(args) - - def __exit__(self, exc_type, exc_value, traceback): - """ called when leaving run context """ - print('__exit__ with args {0}, {1}, {2}'.format(exc_type, exc_value, - traceback)) - - # close files - for handle in self.file_handles: - try: - handle.close() - except Exception: - warn('ignoring exception exiting follower context manager!') - for proc in self.proc_objs: - try: - proc.kill() - rest_out, rest_err = proc.communicate() - if rest_out or rest_err: - warn('Ignoring left-over output in proc') - except Exception: - warn('ignoring exception exiting follower context manager!') -# end: class FollowContextManager - - -def follow(*args, **kwargs): - """ creates a ContextManager for a Follower to be used in "with" statements - - for example: - - with follow('/var/log/messages') as log_flwr - for source, desc, line in log_flwr - do_something_with(line) - - for specification of args see FollowContextManager constructor help - """ - return FollowContextManager(*args, **kwargs) - - -# ############################################################################# -# TESTING -# (hard to create unittest -- would 
need separate threads to write to some -# file /pipe/socket so can test the interactivity...) -# ############################################################################# - -from datetime import date, datetime as dt, timedelta as td -from subprocess import Popen, PIPE - -syslog_file = '/var/log/messages' -time_diff_seconds = 60 -syslog_time_format = '%b %d %H:%M:%S' - - -def test_syslog_line(n_lines, source, desc, line, - today_str, start_time): - """ called by test functions for each line of syslog """ - - if n_lines % 1000 == 0: - print(('{0:6d} old lines, showing lines after {1}; ' - 'abort using Ctrl-C').format(n_lines, start_time)) - if line[:6] != today_str: - return - try: - log_time = dt.strptime(line[:15], syslog_time_format) - log_time = log_time.replace(year=start_time.year) - except ValueError: - log_time = None - - if log_time is None or log_time > start_time: - print('line {0} from "{1}", (orig from {2}): {3}'.format( - n_lines, desc, log_time, line)) - - -def test_follower_syslog(): - """ test Follower on syslog file using PIPEs and an open file - - show lines from the last 5 minutes - """ - - start_time = dt.now() - td(seconds=time_diff_seconds) - today_str = date.today().strftime(syslog_time_format)[:6] - if today_str[4] == '0': - today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1' - - # create process for 'tail -f syslog' --> 2 pipes (stdout + stderr) - proc = Popen(['tail', '-f', syslog_file], stdout=PIPE, stderr=PIPE) - - # read from all three sources - with open(syslog_file, 'r') as file_handle: - flwr = Follower(file_handle, 'syslog file', proc.stdout, 'tail syslog', - proc.stderr, 'tail stderr') - for n_lines, (source, desc, line, flags) in enumerate(flwr): - if flags: - print('received flags {0}'.format(flags_to_str(flags))) - test_syslog_line(n_lines, source, desc, line, - today_str, start_time) - - -def test_follower_context(): - """ test FollowContextManager and follow() function """ - - today_str = 
date.today().strftime(syslog_time_format)[:6] - if today_str[4] == '0': - today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1' - start_time = dt.now() - td(seconds=time_diff_seconds) - with follow(syslog_file) as flwr: - for n_lines, (source, desc, line, flags) in enumerate(flwr): - if flags: - print('received flags {0}'.format(flags_to_str(flags))) - test_syslog_line(n_lines, source, desc, line, - today_str, start_time) - -def test_context_proc(): - """ test FollowContextManager's ability to wrap proc args """ - - today_str = date.today().strftime(syslog_time_format)[:6] - if today_str[4] == '0': - today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct 1' - start_time = dt.now() - td(seconds=time_diff_seconds) - with follow(procs=(['tail', '-f', syslog_file], )) as flwr: - for n_lines, (source, desc, line, flags) in enumerate(flwr): - if flags: - print('received flags {0}'.format(flags_to_str(flags))) - test_syslog_line(n_lines, source, desc, line, - today_str, start_time) - - -def test(): - """ Main function, tests some of class's functionality """ - - #test_follower_syslog() - #test_follower_context() - test_context_proc() -# end: function main - - -if __name__ == '__main__': - test() diff --git a/iter_helpers.py b/iter_helpers.py deleted file mode 100644 index 624ede5..0000000 --- a/iter_helpers.py +++ /dev/null @@ -1,33 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. 
-# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Helper functions with iterators -- functional program is great! - -Currently, recipes from python itertools docu (:py:mod:`itertools`) that have -been deemed useful - -.. codeauthor:: Intra2net -""" - -from itertools import * - -def pairwise(iterable): - """ s -> (s0,s1), (s1,s2), (s2, s3), ... """ - a, b = tee(iterable) - next(b, None) - return zip(a, b) diff --git a/log_helpers.py b/log_helpers.py deleted file mode 100644 index d51e641..0000000 --- a/log_helpers.py +++ /dev/null @@ -1,462 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. 
- -""" Helpers for logging; featuring: - -ShortLevelFormatter: provide a 4-char-sized field "shortlevel" for message -urgency (dbug/info/warn/err /crit) - -I2nLogger: logger that provides a notice(), allows omission for str.format - and is quite convenient in general - -get_logger: factor for creating I2nLoggers - -Further ideas: :: -* allow milliseconds in dateformat field (see test_short_level_format) -* create own basicConfig-like function that uses our classes as default - --> started using I2nLogger constructor and get_logger -* try to find out module that calls I2nLogger constructor to provide a good - default value for name (finding out the current module is a pain in the a..) - -..todo:: create unittests from test_* functions at bottom -..todo:: think about how to allow different levels per handler -..todo:: do not limit logs by line numbers but by disc size? Warn when at 50%, - 75%, 90%, 99% of limit? - -.. codeauthor:: Intra2net -""" - -import logging -from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL, NOTSET -from math import log10, floor -import sys - -from type_helpers import isstr - -#: log level half-way between INFO and WARNING -NOTICE = (INFO + WARNING)/2 -logging.addLevelName(NOTICE, 'NOTICE') - -#: default formatting string for ShortLevelFormatter -DEFAULT_SHORT_LEVEL_FORMAT = '%(asctime)s:%(msecs)03d %(shortlevel)s| %(msg)s' - -#: default formatting string for date/time in ShortLevelFormatter -DEFAULT_SHORT_LEVEL_DATE_FORMAT = '%H:%M:%S' - -#: mapping from level name to level int for I2nLogger's set_level -LEVEL_DICT = dict(notset=NOTSET, debug=DEBUG, info=INFO, notice=NOTICE, - warning=WARNING, error=ERROR, critical=CRITICAL) - -#: min level allowed in I2nLogger -MIN_LEVEL = NOTSET - -#: max level allowed in I2nLogger -MAX_LEVEL = CRITICAL - -#: constant for I2nLogger streams argument: stream to stdout -STDOUT = sys.stdout - - -class ShortLevelFormatter(logging.Formatter): - """ - Formatter for logging handlers that allows use of format 
field "shortlevel" - - using this formatter, you can specify in the log message format string the - field "shortlevel" which will introduce in your log messages a 4-char field - for the log record urgency: "DEBUG" --> "dbug", "INFO" --> "info", - "WARNING" --> "warn", "ERROR" --> "err ", "CRITICAL" --> "crit" - - All other functionality (like other format fields) is inherited from base - class Formatter - - Most easily used indirectly through :py:class:`I2nLogger` - (uses this by default) - - Explicit usage example:: - - logger = logging.getLogger(__name__) - logger.setLevel(logging.DEBUG) - handler = logging.StreamHandler() - handler.setLevel(logging.DEBUG) - formatter = ShortLevelFormatter('%(shortlevel)s| %(msg)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - - You can even add new levels:: - - notice = (logging.INFO + logging.WARNING)/2 - formatter.add_level(notice, 'note') - logger.log(notice, 'more important than info but no warning nor error') - - .. seealso:: testing funcion :py:func:`test_short_level_format` - """ - - def __init__(self, fmt=DEFAULT_SHORT_LEVEL_FORMAT, - datefmt=DEFAULT_SHORT_LEVEL_DATE_FORMAT, *args, **kwargs): - """ creates a new ShortLevelFormatter - - forwards all args to super class Formatter and initializes dict with - levelno and short string representations - """ - self.parent = super(ShortLevelFormatter, self) - self.parent.__init__(fmt=fmt, datefmt=datefmt, *args, **kwargs) - self._shortlevel_dict = {DEBUG: 'dbug', INFO: 'info', WARNING: 'warn', - ERROR: 'err ', CRITICAL: 'crit', - NOTSET: '----'} - - def format(self, record): - """ create string representation of given log record """ - try: - record.shortlevel = self._shortlevel_dict[record.levelno] - except KeyError: - record.shortlevel = '????' 
- - return self.parent.format(record) - - def add_level(self, levelno, shortlevel_str): - """ add a new message urgency level - - :param int levelno: numeric urgency level - :param str shortlevel_str: string representation of message urgency; - should be of length 4 - :returns: nothing - """ - - self._shortlevel_dict[levelno] = shortlevel_str - - -#: memory for all I2nLogger instances in current session to avoid overwrite -#: used in :py:func:`get_logger` and :py:class:`I2nLogger` constructor -_i2n_loggers = {} - - -def get_logger(name, *args, **kwargs): - """ factor for :py:class:`I2nLogger`: create new or return existing object - - all args as in :py:class:`I2nLogger` constructor; will be ignored if logger - with given name exists already! - """ - - if name in _i2n_loggers: - return _i2n_loggers[name] # ignore all args - else: - return I2nLogger(name, *args, **kwargs) - - -class I2nLogger: - """ a more convenient logger - - Features:: - * can be used without ".format()" as follows:: - - logger.info('the result is {0}', result) # more convenient than... - logger.info('the result is {0}'.format(result)) # ... the original - - * Has a :py:meth:`Logger.notice` function - * Has by default as only handler a :py:class:`logging.StreamHandler` to - stdout with a :py:class:`ShortLevelFormatter` - * Simplifies setting of logger's logging level using - :py:meth:`Logger.set_level`. The level is kept in sync between logger and - handlers and can be queried using :py:meth:`get_level` - (does not work accross logger hierarchies!) - * can specify name and level and [date]fmt all in constructor - * can limit the number of lines this logger will produce to prevent filling - hard drive with log file - (assumes one line per call to log/debug/info/notice/..., - only counts calls with priority above this logger's level) - * provides shorter method names: dbg, note, warn, err - - ..note:: Creating multiple instances with the same name is not allowed and - will result in an error. 
Use :py:func:`get_logger` instead of this - constructor to avoid such situtations - - ..note:: Do not change or use the underlying logger or functionality here - (in particular line counting, get_level) is no longer reliable! - """ - - def __init__(self, name, level=INFO, fmt=DEFAULT_SHORT_LEVEL_FORMAT, - datefmt=DEFAULT_SHORT_LEVEL_DATE_FORMAT, - streams=STDOUT, files=None, max_lines=None): - """ creates a I2nLogger; forwards args to logging.getLogger - - - :param str name: name of this logger, best practice is module name - :param int level: best use one of the constants DEBUG, INFO NOTICE, - WARNING, ERROR - :param str fmt: format of log messages, see - :py:class:`ShortLevelFormatter` for more help - :param str datefmt: format of date added to log messages, see - :py:class:`ShortLevelFormatter` for more help - :param streams: list/tuple of or a single stream to log to, default is - STDOUT (=sys.stdout) - :param files: list/tuple or single file name to log to - :param max_lines: number > 0 to limit number of output calls to that - number; give None (default) to no limit - :raise: ValueError if an I2nLogger with the same name exists already - """ - - # check if an instance with the same name has been created earlier - # to prevent conflicts - global _i2n_loggers - if name in _i2n_loggers: - raise ValueError("An I2nLogger with that exact name ('{0}') exists" - " already -- use get_logger instead!" 
- .format(name)) - - self._log = logging.getLogger(name) - self._level = min(MAX_LEVEL, max(MIN_LEVEL, level)) - self._log.setLevel(self._level) - - # remove handlers (sometimes there are mutliple by default) - for handler in self._log.handlers: - self._log.removeHandler(handler) - - # create new handlers and formatter - if streams is None: - stream = [] - elif not isinstance(streams, (list, tuple)): - streams = (streams, ) - for stream in streams: - formatter = ShortLevelFormatter(fmt=fmt, datefmt=datefmt) - formatter.add_level(NOTICE, 'note') - new_handler = logging.StreamHandler(stream) - new_handler.setFormatter(formatter) - new_handler.setLevel(self._level) - self._log.addHandler(new_handler) - - if files is None: - files = [] - elif not isinstance(files, (list, tuple)): - files = (files, ) - for file_name in files: - formatter = ShortLevelFormatter(fmt=fmt, datefmt=datefmt) - formatter.add_level(NOTICE, 'note') - new_handler = logging.FileHandler(file_name) - new_handler.setFormatter(formatter) - new_handler.setLevel(self._level) - self._log.addHandler(new_handler) - - # remember max_lines - self.set_max_lines(max_lines) - - # remember that this logger is a I2nLogger - _i2n_loggers[name] = self - - def dbg(self, message, *args, **kwargs): - self.log(DEBUG, message, *args, **kwargs) - - def debug(self, message, *args, **kwargs): - self.log(DEBUG, message, *args, **kwargs) - - def info(self, message, *args, **kwargs): - self.log(INFO, message, *args, **kwargs) - - def note(self, message, *args, **kwargs): - self.log(NOTICE, message, *args, **kwargs) - - def notice(self, message, *args, **kwargs): - self.log(NOTICE, message, *args, **kwargs) - - def warn(self, message, *args, **kwargs): - self.log(WARNING, message, *args, **kwargs) - - def warning(self, message, *args, **kwargs): - self.log(WARNING, message, *args, **kwargs) - - def err(self, message, *args, **kwargs): - self.log(ERROR, message, *args, **kwargs) - - def error(self, message, *args, **kwargs): - 
self.log(ERROR, message, *args, **kwargs) - - def critical(self, message, *args, **kwargs): - self.log(CRITICAL, message, *args, **kwargs) - - def log(self, level, message, *args, **kwargs): - if level >= self._level: - if self._line_counter == self._max_lines: - self._log.log(ERROR, - 'reached max number of output lines ({0}) ' - '-- will not log anything any more!' - .format(self._line_counter)) - self._line_counter += 1 - elif self._line_counter > self._max_lines: - return - else: - self._log.log(level, message.format(*args), **kwargs) - self._line_counter += 1 - - def log_count_if_interesting(self, count, level=INFO, counter_name=None): - """ Log value of a counter in gradually coarser intervals - - see :py:func:`is_interesting_count` for definition of "interesting" - """ - if is_interesting_count(count): - if counter_name: - self.log(level, '{0} counter is at {1}', counter_name, count) - else: - self.log(level, 'Counter is at {0}', count) - - def get_level(self): - """ return int level of this logger """ - return self._level - - def get_level_str(self): - """ returns :py:func:`logging.getLevelName` on :py:meth:`get_level` """ - return logging.getLevelName(self._level) - - def set_level(self, new_level): - """ set level given an int or a str - - :arg new_level: int or str (str is converted to lower case) - :raises: KeyError if new_level is a string that is not in - :py:data:`LEVEL_DICT` - """ - if isstr(new_level): - self._level = LEVEL_DICT[new_level.lower()] - else: - self._level = min(MAX_LEVEL, max(MIN_LEVEL, new_level)) - self._log.setLevel(self._level) - for handler in self._log.handlers: - handler.setLevel(self._level) - - def set_max_lines(self, max_lines): - """ limit number of lines this produces; give None to remove limit - - resets the line counter - """ - if max_lines > 0: - self._max_lines = max_lines - elif max_lines < 0 or (not max_lines): - self._max_lines = None - else: - raise ValueError('unexpected value for max_lines: {0}!' 
- .format(max_lines)) - self._line_counter = 0 - - def get_max_lines(self): - """ return current value for line limit """ - return self._max_lines - - def exceeded_max_lines(self): - """ return True if nothing will be logged because max_lines was reached - """ - if self._max_lines: - return self._line_counter >= self._max_lines - else: - return False - - -def n_digits(number): - """ returns the number of digits a number has in decimal format - - :returns: 1 for 1...9, 2 for 10...99, 3 for 100...999, ... - 0 for 0 (and everything else beween -1 and 1) - 1 for -1...-9, 2 for -10...-99, ... - """ - if abs(number) < 1: - return 0 - else: - return floor(log10(abs(number)))+1 - - -def is_interesting_count(counter): - """ return True if counter has reached an "interesting" value - - For the counter to be "interesting" becomes ever harder. At first it is - easy (returns True for 1,2,3,6,10), then becomes harder (True only for - 10,20,30,60,100) and harder (True for 100,200,300,600,1000) and this scheme - continues on a logartihmic scale. - - An example that will print lots in the beginning and then less and less:: - - counter = 0 - while not end_reached(): - do_something() - if is_interesting_count(counter): - log('reached iteration {0}'.format(counter)) - counter += 1 - - Or implicitly using I2nLogger::log_count_if_interesting(counter) - - :returns: True for a few values of counter, False most of the time - """ - - return float(counter) / 10.**(n_digits(counter)-1.) in (1., 2., 3., 6.) 
- - -def test_short_level_format(): - """ quick test of :py:class:`ShortLevelFormatter` """ - - logger = logging.getLogger('logtest') - logger.setLevel(DEBUG) - handler = logging.StreamHandler() - handler.setLevel(DEBUG) - formatter = ShortLevelFormatter( - '%(asctime)s:%(msecs)03d %(shortlevel)s| %(msg)s' - ' [regular levelname=%(levelname)s]', - datefmt='%H:%M:%S') - handler.setFormatter(formatter) - logger.addHandler(handler) - - # 'application' code - logger.debug('debug message') - logger.info('info message') - logger.warn('warn message') - logger.error('error message') - logger.critical('critical message') - logger.log(15, 'unknown level') - logger.log(NOTSET, 'level not set') - - # add notice level - notice = (logging.INFO + logging.WARNING)/2 - formatter.add_level(notice, 'note') - logger.log(notice, 'more important than info but no warning nor error') - - # try if exception formatting still works: - try: - logger.info('this is what an exception looks like:') - impossible_result = 1/0 - logger.critical('just divided 1/0! 
The result is {0}' - .format(impossible_result)) - except ZeroDivisionError: - logger.exception('1/0 still does not work!', exc_info=True) - - # done - logger.info('done testing') - - -def test_get_logger(): - log = get_logger('logger_test') - log2 = get_logger('logger_test') - print(log == log2) - - -def test_line_counter(): - log = get_logger('logger_test', max_lines=10) - for idx in range(20): - for _ in range(20): - log.debug('should not show nor count') - print('calling log for idx {0}'.format(idx)) - log.info('logging with idx {0}', idx) - log.log_count_if_interesting(idx) - -if __name__ == '__main__': - #test_short_level_format() - #test_get_logger() - test_line_counter() diff --git a/log_read.py b/log_read.py deleted file mode 100644 index 0c30e88..0000000 --- a/log_read.py +++ /dev/null @@ -1,308 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Iterative reading of log files - -Basic Functionality (class :py:class:`IterativeReader`:) -Runs stat in a loop to find out whether file size has changed. 
Then reads the -new data and forwards that - -..todo:: Want to also use lsof to find out whether file/pipe/socket was closed, - so can return from read loop - -:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns -it line-wise as is normal for log files - -:py:class:`LogParser` takes those lines and tries to parse them into fields -like date, time, module name, urgency and message. - -..todo:: auto-detect log line layout - -.. codeauthor:: Intra2net -""" - -import os -from warnings import warn -import os.path -from itertools import zip_longest - -from type_helpers import is_str_or_byte, is_file_obj - - -class LogReadWarning(UserWarning): - """ warnings issued by classes in this module """ - pass - - -def true_func(unused_argument_but_that_is_ok): - """ does nothing, always returns True """ - return True - - -def check_is_used(some_file_or_handle): - """ check whether file is being written to - - to be implemented, e.g. using lsof - """ - raise NotImplementedError() - - -_create_description_unknown_counter = 0 - -def create_description(file_obj, file_desc): - """ create some description for given file-like object / file descriptor - - :param file_obj: file-like object - :param int file_desc: os-level file descriptor - :returns: string - """ - - global _create_description_unknown_counter - - try: - desc = file_obj.name - if desc: - return desc - except AttributeError: - pass - - if file_desc is not None: - return 'file{0}'.format(file_desc) - else: - _create_description_unknown_counter += 1 - return 'unknown{0}'.format(_create_description_unknown_counter) - - -#: error message for IterativeReader constructor -_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \ - 'files --> use with open(file_name)!' 
- - -class IterativeReader: - """ reads from a given file - - Uses os.stat(file_obj.fileno()).st_size as measure whether file has changed - or not; Always reads as much data as possible - - Catches most common exceptions in iteration (not constructor) - - Does not are about closing files, so does not accept file names - - This is the base for class :py:class:`LineReader` that just has to - implement a different :py:meth:`prepare_result` method - """ - - def __init__(self, sources, descs=None, return_when_done=False): - """ creates a reader; does some basic checks on args - - :param sources: iterable over sources. Sources can be opened file - objects or read-opened os-level file descriptors. - Calling code has to ensure they are closed properly, so - best use this within a "with open(file_name) as - file_handle:"-context. If sources is a single file - obj/descriptor, both source and desc will be converted - to lists of length 1 - :param descs: can be anything of same length as sources. If sources is - a single source, then descs is also converted to a list - of length 1. If not given (i.e. None), will use - :py:func:`create_description` to guess descriptions - :param bool return_when_done: ignore file_handle if no-one is writing - to it any more. 
Return from iterator when - all watched files are done (not - implemented yet) - :raises: OSError when testing fstat on source - """ - if not sources: - raise ValueError('need at least some source!') - elif is_str_or_byte(sources): - raise ValueError(_STR_ERR.format(sources)) - elif is_file_obj(sources) or isinstance(sources, int): - source_input = [sources, ] - desc_input = [descs, ] - else: - source_input = sources # assume some iterable - desc_input = descs - - # now divide sources into os-level file descriptors for os.fstat, - # and file objects for read() - self.file_objs = [] - self.file_descs = [] # file descriptOR, not descriptION - for source in source_input: - if is_file_obj(source): - self.file_objs.append(source) - self.file_descs.append(source.fileno()) - elif isinstance(source, int): - self.file_objs.append(os.fdopen(source)) - self.file_descs.append(source) - elif is_str_or_byte(source): - raise ValueError(_STR_ERR.format(source)) - else: - raise ValueError('source {0} is neither file obj nor file ' - 'descriptor!') - - # try to fstat the new file descriptor just for testing - os.fstat(self.file_descs[-1]) - - # guess descriptions if not given - if not desc_input: - self.descriptions = [create_description(obj, file_desc) - for obj, file_desc - in zip(self.file_objs, self.file_descs)] - else: - try: - if len(desc_input) != len(self.file_objs): - raise ValueError('need same number of sources and ' - 'descriptions!') - except TypeError: - pass # desc_input is generator or so - - self.descriptions = [] - for obj, file_desc, description in \ - zip_longest(self.file_objs, self.file_descs, desc_input): - if obj is None: - raise ValueError('more descriptions than sources!') - elif description is None: - self.descriptions.append(create_description(obj, - file_desc)) - else: - self.descriptions.append(description) - - self.last_sizes = [0 for _ in self.file_objs] - self.ignore = [False for _ in self.file_objs] - - if return_when_done: - self.is_used = 
check_is_used - else: - self.is_used = true_func - - for obj, file_desc, description in zip(self.file_objs, self.file_descs, - self.descriptions): - print('file descriptor {0}, file obj {1}, description "{2}"' - .format(file_desc, obj, description)) - - def n_sources(self): - return len(self.file_objs) - - def n_active_sources(self): - return len(self.ignore) - sum(self.ignore) - - def __iter__(self): - while True: - for idx, (obj, file_desc, description, last_size, do_ignore) in \ - enumerate(zip(self.file_objs, self.file_descs, - self.descriptions, self.last_sizes, - self.ignore)): - - if do_ignore: - continue - - # get new file size - new_size = os.fstat(file_desc).st_size - - # compare to old size - if new_size == last_size: - if not self.is_used(file_desc): - warn('no one is writing to {0} / {1} -- ' - 'stop watching it!' - .format(file_desc, description), - category=LogReadWarning) - self.do_ignore[idx] = True - elif new_size < last_size: - warn('{0} / {1} has become smaller ({2} --> {3})!' - .format(obj, description, last_size, new_size), - category=LogReadWarning) - else: # (new_size > last_size) - try: - new_data = obj.read() - except OSError as ose: # includes IOErrors - warn('io error reading from {0} / {1}: {2})' - .format(obj, description, ose), - category=LogReadWarning) - if len(new_data) != new_size - last_size: - warn('read unexpected amount from {0} / {1}: ' - '{2} bytes instead of {3} bytes!' - .format(obj, description, len(new_data), - new_size-last_size), - category=LogReadWarning) - - # post-processing - to_yield = self.prepare_result(description, new_data, idx) - for result in to_yield: - yield result - - # prepare next iteration - self.last_sizes[idx] = new_size - - def prepare_result(self, description, data, idx): - """ from raw new data create some yield-able results - - to be intended for overwriting in sub-classes - - this function is called from __iter__ for each new data that becomes - available. 
It has to return some iterable whose entries are yielded - from iteration over objects of this class. - - This base implementation just returns its input in a list, so new data - is yielded from __iter__ as-is - """ - return [(description, data), ] - - -LINE_SPLITTERS = '\n\r' - -class LineReader(IterativeReader): - """ an IterativeReader that returns new data line-wise - - this means buffering partial line data - """ - - def __init__(self, *args, **kwargs): - """ forwards all args and kwargs to :py:class:`IterativeReader` """ - super().__init__(*args, **kwargs) - self.line_buffers = ['' for _ in range(self.n_sources())] - - def prepare_result(self, description, new_data, idx): - """ take raw new data and split it into lines - - if line is not complete, then buffer it - - returns lines without their newline characters - """ - - #print('splitting "{0}" + "{1}"'.format(self.line_buffers[idx], - # new_data.replace('\n', r'\n'))) - all_data = self.line_buffers[idx] + new_data - self.line_buffers[idx] = '' - result = [] - should_be_no_new_lines = False - for line in all_data.splitlines(keepends=True): - if line[-1] in LINE_SPLITTERS: - result.append((description, line.rstrip(LINE_SPLITTERS))) - elif should_be_no_new_lines: - raise ValueError('line splitters are not compatible with' - 'str.splitlines!') - else: - self.line_buffers[idx] = line - should_be_no_new_lines = True # (this should be the last) - - return result - - -class LogParser: - """ takes lines from LineReader and parses their contents """ - pass diff --git a/src/arnied.py b/src/arnied.py new file mode 100644 index 0000000..38132f0 --- /dev/null +++ b/src/arnied.py @@ -0,0 +1,46 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. 
+# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Interface to arnied + +only a stub; should at least contain a set_cnf and get_cnf + +.. codeauthor:: Intra2net +""" + +def set_cnf(var_name, value): + """ not implemented yet """ + raise NotImplementedError() + + # not good enough: implementation in autotest guest/utils/backup_utils + + +def get_cnf(var_name, value): + """ not implemented yet """ + raise NotImplementedError() + + # not good enough: implementation in autotest guest/utils/backup_utils + + +def wait_for_generate(timeout=None): + """ wait for generate to run/finish + + to be copied from autotest arnied_wrapper + """ + + raise NotImplementedError() diff --git a/src/buffers.py b/src/buffers.py new file mode 100644 index 0000000..8e461ca --- /dev/null +++ b/src/buffers.py @@ -0,0 +1,149 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. 
class CircularBuffer:
    """ circular buffer for data; saves last N sets

    can return or run func on them afterwards in correct order

    public attributes (please read only!): buffer_size, n_items

    buffer_size is the fixed capacity N given at construction;
    n_items counts every item ever added (it is not capped at N)
    """

    buffer_size = None
    _buffer = None
    _buff_idx = None
    n_items = None

    def __init__(self, size, empty_element=None):
        """ initialize with N = size

        :param int size: capacity of the buffer, must be positive
        :param empty_element: pre-fill value for unused slots; it is what
                              :py:meth:`add` returns until the buffer has
                              wrapped around for the first time
        :raise ValueError: if size < 1
        """
        if size < 1:
            raise ValueError('size must be positive!')
        self.buffer_size = size
        # idiom: use _ for the unused comprehension variable
        self._buffer = [empty_element for _ in range(size)]
        self._buff_idx = 0
        self.n_items = 0

    def add(self, new_item):
        """ add a new item to buffer -- might replace the oldest item

        :returns: the item that was overwritten (the empty_element while
                  the buffer has not wrapped yet)
        """
        oldest_item = self._buffer[self._buff_idx]
        self._buffer[self._buff_idx] = new_item
        self._buff_idx = (self._buff_idx + 1) % self.buffer_size
        self.n_items += 1
        return oldest_item

    def run_func(self, some_func):
        """ run some_func(item) on last saved items in correct order """
        # re-use get_all so the ordering logic exists only once
        for item in self.get_all():
            some_func(item)

    def get_all(self):
        """ return the buffered contents in correct order (oldest first) """
        if self.n_items >= self.buffer_size:
            # buffer has wrapped: the oldest item sits at _buff_idx
            return self._buffer[self._buff_idx:] \
                + self._buffer[:self._buff_idx]
        # not wrapped yet: the first n_items slots hold all data in order
        # (_buff_idx == n_items in this case)
        return self._buffer[:self._buff_idx]
class LogarithmicBuffer:
    """ saves only last N items, and after that less and less old data

    --> grows only logarithmically in size

    Keeps a CircularBuffer (data_new) for the newest N items; items that
    fall out of it are thinned into data_old, a list of items whose ages
    are approximately 2, 4, 8, 16, ... additions.

    Could easily extend to other bases than 2 to make even scarcer
    """

    def __init__(self, n_new):
        """ :param int n_new: number of newest items to keep exactly;
                              must be non-negative
            :raise ValueError: if n_new is negative
        """
        self.n_new = n_new
        if n_new < 0:
            raise ValueError('n_new must be non-negative!')
        self._data_new = CircularBuffer(n_new) if n_new > 0 else None
        self._count = 0
        self._data_old = []

    def add(self, new_item):
        """ add a new item to buffer """
        if self._data_new:
            # newest item enters the circular buffer; whatever spills out
            # becomes a candidate for the sparse old-data store
            candidate = self._data_new.add(new_item)
            age = self._count - self.n_new
        else:
            candidate = new_item
            age = self._count

        if age >= 0:
            self._save_old(candidate, age, 0)
        self._count += 1

    def _save_old(self, item, age, index):
        """ internal helper for saving (or not) items in data_old

        iterative equivalent of the cascade: an item is kept at slot
        ``index`` only if its age is a multiple of 2**index; the previous
        occupant then competes for the next (older) slot
        """
        while age % 2 ** index == 0:
            if len(self._data_old) <= index:
                # first item ever to reach this slot: grow the list
                self._data_old.append(item)
                return
            # slot occupied: store item here, pass the old occupant on
            item, self._data_old[index] = self._data_old[index], item
            index += 1

    def get_all(self):
        """ get all buffer contents in correct order (oldest first) """
        oldest_first = self._data_old[::-1]
        if self._data_new:
            return oldest_first + self._data_new.get_all()
        return oldest_first

    def get_size(self):
        """ returns current number of saved elements in buffer

        size is O(log n) where n = number of added items

        more precisely: size is n_new + floor(log_2(m-1))+1
        where: m = n-n_new > 1
        (m=0 --> size=n_new; m=1 --> size=n_new+1)
        """
        return self.n_new + len(self._data_old)
from subprocess import Popen, PIPE


def call_and_capture(command, stdin_data=None, split_lines=True,
                     *args, **kwargs):
    """ call command, captures stdout, stderr and return code, return when done

    Only use for commands with little output, since the complete output has
    to be buffered in memory (see the warning in :py:mod:`subprocess` about
    communicate()).

    All args are forwarded to the Popen constructor, except:
    stdout=PIPE (always enforced, ignored if in kwargs)
    stderr=PIPE (always enforced, ignored if in kwargs)
    shell=False (default, can be overridden via kwargs)
    universal_newlines=True (default, can be overridden via kwargs)

    :param command: forwarded as first arg to Popen constructor
    :param str stdin_data: forwarded to stdin of process through communicate
    :param bool split_lines: True (default) to split output line-wise and
                             return lists of strings; False to return one
                             multi-line string each for out and err
    :param args: forwarded to Popen constructor
    :param kwargs: forwarded to Popen constructor
    :returns: (return_code, stdout, stderr); lines are str if
              universal_newlines is left True, bytes otherwise
    :raise: OSError (e.g., if command does not exist), ValueError if args
            are invalid; never :py:class:`subprocess.CalledProcessError`
    """

    # defaults, overlaid with the caller's kwargs except the enforced pipes
    popen_kwargs = dict(stdout=PIPE, stderr=PIPE,
                        shell=False, universal_newlines=True)
    popen_kwargs.update((key, value) for key, value in kwargs.items()
                        if key not in ('stdout', 'stderr'))

    # run command and wait for it to finish
    process = Popen(command, *args, **popen_kwargs)
    out_data, err_data = process.communicate(stdin_data)

    if not split_lines:
        return process.returncode, out_data, err_data
    return process.returncode, out_data.splitlines(), err_data.splitlines()
+ +""" Helper functions and classes to deal with files and dirs and stuff + +Featuring:: + +* the cd context manager pwd(); with cd(other_dir): pwd(); pwd(); + will print current working dir, then other_dir, then first dir again +* a wrapper around "df" to determine size and usage of file systems + +.. todo:: change get_filesystem_fill_states to not fork + +.. codeauthor:: Intra2net +""" + +from __future__ import print_function + +from contextlib import contextmanager +import os +from warnings import warn +from math import floor, ceil + +from call_helpers import call_and_capture +from iter_helpers import pairwise + + +@contextmanager +def cd(path): + """A context manager which changes the working directory to the given + path, and then changes it back to its previous value on exit. + + Taken from comment for python recipe by Greg Warner at + http://code.activestate.com/recipes/576620-changedirectory-context-manager/ + (MIT license) + """ + prev_cwd = os.getcwd() + os.chdir(path) + try: + yield + finally: + os.chdir(prev_cwd) + + +DF_CMD = ['/usr/bin/df', '--no-sync', '--portability'] +DF_SIZE_UNIT = 1024 + +class FilesystemFillState: + """ representation of 1 line of the 'df' command + + has fields filesystem, size, used, available, capacity, mount_point + + Note that only apprixomately capacity == used/size + and that only approximately used + available == size + and that all sizes are in bytes + """ + + def __init__(self): + self.name = None + self.size = None + self.used = None + self.available = None + self.capacity = None + self.mount_point = None + + def __str__(self): + return '[Filesystem {0} mounted at {1}: {2}% used]' \ + .format(self.name, self.mount_point, self.capacity) + +def get_filesystem_fill_states(): + """ get fill state on all filesystems + + parses the output of cmd 'df', returns list of + :py:class:`FilesystemFillState` + + ..todo:: replace call to df (which forks) by call to statvfs combined with + some way to find all mounted partitions 
#: column titles expected in the df header line, in order
_DF_EXPECTED_TITLES = ('Filesystem', '1024-blocks', 'Used', 'Available',
                       'Capacity', 'Mounted on')


def get_filesystem_fill_states():
    """ get fill state on all filesystems

    parses the output of cmd 'df', returns list of
    :py:class:`FilesystemFillState`

    Sizes are converted to bytes; parsing surprises that are non-fatal
    only emit warnings.

    ..todo:: replace call to df (which forks) by call to statvfs combined
             with some way to find all mounted partitions (parse
             /proc/mounts whenever it changes)

    :returns: list of :py:class:`FilesystemFillState`
    :raise ValueError: if the column layout cannot be recognized
    """

    # call df; warn (but continue) on unexpected exit code or stderr output
    code, out, err = call_and_capture(DF_CMD)
    if code != 0:
        warn('df returned non-zero exit code {0}!'.format(code))
    if err:
        for line in err:
            warn('df produced output to stderr: "{0}"'.format(line))

    # find character columns that are spaces in every output line -- these
    # separate the data columns; of each run of such columns keep the first
    min_len = min(len(line) for line in out)
    separator_cols = [idx for idx in range(min_len) if
                      all(line[idx] == ' ' for line in out)]
    checked_cols = [separator_cols[0], ]
    for prev_col, col in pairwise(separator_cols):
        if col != prev_col + 1:
            checked_cols.append(col)
    separator_cols = checked_cols

    if len(separator_cols) != 5:
        raise ValueError('unexpected number of separator columns: {0}'
                         .format(separator_cols))

    # check the header line title by title (was six copy-pasted checks)
    boundaries = list(zip([None] + separator_cols, separator_cols + [None]))
    title_line = out[0]
    for (start, end), expected in zip(boundaries, _DF_EXPECTED_TITLES):
        title = title_line[start:end].strip()
        if title != expected:
            warn('Unexpected column title: "{0}" != "{1}"!'
                 .format(title, expected))

    # create result from the data lines
    result = []
    for line in out[1:]:
        stats = FilesystemFillState()
        stats.name = line[: separator_cols[0]].strip()
        stats.size = int(line[separator_cols[0]: separator_cols[1]].strip()) \
            * DF_SIZE_UNIT
        stats.used = int(line[separator_cols[1]: separator_cols[2]].strip()) \
            * DF_SIZE_UNIT
        stats.available = int(line[separator_cols[2]: separator_cols[3]]
                              .strip()) * DF_SIZE_UNIT
        stats.capacity = int(line[separator_cols[3]: separator_cols[4]]
                             .strip()[:-1])   # strip trailing '%'
        stats.mount_point = line[separator_cols[4]:].strip()

        # sanity check: reported capacity should roughly match used/size;
        # guard against zero-size pseudo file systems (the original code
        # divided unconditionally --> ZeroDivisionError)
        if stats.size > 0:
            capacity = 100. * stats.used / stats.size
            if abs(capacity - stats.capacity) > 5.:
                warn('capacities for {0} deviate more than 5%: '
                     '{1} != {2:.2f}(={3}/{4})'.format(
                         stats.name, stats.capacity, capacity, stats.used,
                         stats.size))

        # sanity check: used + available should roughly equal size
        size = stats.used + stats.available
        biggest = max(stats.size, size)
        if biggest > 0 and \
                float(abs(stats.size - size)) / float(biggest) > 0.1:
            warn('size for {0} differs by more than 10% from used+available!'
                 .format(stats.name))

        result.append(stats)

    return result
+# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" +follow process output, log files and pipes using select and poll + +DEPRECATED +(at least for files see log_read; may still be usefull for pipes/sockets) + +Main class is Follower which does the polling and selecting, it is best used +in a with-statement as follows:: + + with follow('/var/log/messages') as flwr + for line in flwr: + do_something_with(line) + +This will read the given file and yield its contents line-by-line until the end +of the file. It will then wait for new additions to the file and provide the +new lines newly instantaneously + +Things to note: + +* all data must be line-based! +* will only work on Linux (except for sockets maybe). +* create in py2 but try to stay py3-compatible. + +[START: not implemented yet] + +If following a log file, a LogParser can be attached that auto-detects some of +the log's structure (like date and time fields, log levels and sources) from +its first few lines.. This can be used anlogously. 
Of course, you can also +specify the log's structure (to be implemented before the auto-detect...):: + + with follow_log('/var/log/messages') as log_flwr: + for content in log_flwr: + do_something_with(content.datetime, content.log_level, content.text) + +[END: not implemented yet] + +A Follower is an iterator, which means you can do lots of cool things with it, +including (see also itertools package, itertool recipies, "Functional +Programming Howto"):: + + # ignore source and description: + for _, _, text_line in my_follower: + do_something_with(line) + + # enumerate: + for line_number, line in enumerate(my_follower) + do_something_with(line, line_number) + + # combine with other iterator: + for line, other_data in zip(my_follwer, other_iter) + do_something_with(line, other_data) + + # filter: + for line in my_follower if test(my_func): + do_something_with(line) + + # tee: + iter1, iter2 = itertools.tee(my_follower) + --> can read from both (but each line is given to only one of them) + + # for each element, peek at the next one to help do the right thing + for line, next_line in pairwise(my_follower): + do_something_with(line, peek_next_line=next_line) + + # create new iterator or generator + for line in my_follwer: + some_result = do_something_with(line) + yield some_result + +NOT possible:: + + len(my_follower) + Follower(my_file, my_file) # (twice the same) + +.. 
#: poll flags registered by default: readable data, urgent data, hangup, error
DEFAULT_POLL_FLAGS = POLLIN | POLLPRI | POLLHUP | POLLERR

#: default timeout in ms handed to poll()
DEFAULT_POLL_TIMEOUT = 10


def flags_to_str(flags):
    """ return short string representation for poll flags """
    flag_names = ((POLLIN, 'IN'), (POLLOUT, 'OUT'), (POLLPRI, 'PRI'),
                  (POLLERR, 'ERR'), (POLLHUP, 'HUP'), (POLLNVAL, 'INVALID'))
    return ','.join(name for flag, name in flag_names if flags & flag)


class PollFlagsException(Exception):
    """ exception raised if polling returned unexpected flags """
    def __init__(self, flags, desc):
        message = 'Unexpected flags from polling {0}: {1}'.format(
            desc, flags_to_str(flags))
        super(PollFlagsException, self).__init__(message)


class PollHupException(Exception):
    """ exception raised when polled source sends a SIGHUP """
    def __init__(self, desc):
        message = 'Received HUP from polling, {0} was probably killed!' \
            .format(desc)
        super(PollHupException, self).__init__(message)


class PollUnknownSourceException(Exception):
    """ exception raised when polling returns unknown source """
    def __init__(self, file_no):
        message = 'Unknown source returned from polling: file_no={0}!' \
            .format(file_no)
        super(PollUnknownSourceException, self).__init__(message)
class Follower(object):
    """ uses select and poll to follow some set of pipes, files, sockets

    relies on data being line-based!

    will read as much data as possible and then wait for more

    Iterator over lines!
    """

    # list of (source, file_no, description) triples being polled
    _source_iter = None

    # for polling:
    _poller = None
    _flags = None
    _timeout = None

    def __init__(self, *sources_and_descriptions, **other_args):
        """ create a Follower for given sources and optional descriptions

        Will guess if args are just sources or also descriptions of these.
        All of these are possible:

        * Follower(src).
        * Follower(src, desc).
        * Follower(src1, src2, src3).
        * Follower(src1, desc1, src2, desc2, src3, desc3).
        * Follower(src1, src2, desc2, src3).
        * Follower(src, desc_ignored, desc_used) # warn but accept.
        * Follower(single_list_of_sources_and_descs).

        Not possible:

        * Follower().
        * Follower(desc, src).

        Descriptions must be strings, they identify the sources in generated
        output and are used in error messages

        Sources must be file handles, open pipes or open sockets (or anything
        else that gives a reasonable fileno(), so help of the
        :py:mod:`select` module)

        Sources are not closed!

        :param other_args: may contain 'flags' (poll event mask, default
            :py:data:`DEFAULT_POLL_FLAGS`) and 'timeout' (poll timeout,
            default :py:data:`DEFAULT_POLL_TIMEOUT`)
        :raise ValueError: for unrecognized keyword args, no sources, or a
            description appearing before the first source
        """

        # take care about other_args
        self._flags = other_args.get('flags', DEFAULT_POLL_FLAGS)
        self._timeout = other_args.get('timeout', DEFAULT_POLL_TIMEOUT)
        for key in other_args:
            if key not in ('flags', 'timeout'):
                raise ValueError('argument not recognized: {0}'.format(key))

        self._poller = poll()

        sources = []
        file_nos = []
        descriptions = []

        # take care of sources_and_descriptions; a single list/tuple arg
        # is unpacked as the full sources-and-descs sequence
        if not sources_and_descriptions:
            raise ValueError('need at least one source!')
        elif len(sources_and_descriptions) == 1 \
                and isinstance(sources_and_descriptions[0], (list, tuple)):
            sources_and_descriptions = sources_and_descriptions[0]

        for arg in sources_and_descriptions:
            if isstr(arg):   # arg describes the source preceding it
                if len(descriptions) == 0:
                    raise ValueError(
                        'first arg must be source, not a description!')
                if descriptions[-1] is not None:
                    warn('Overwriting description "{0}" with "{1}"'.format(
                        descriptions[-1], arg))
                descriptions[-1] = arg
            else:   # arg is a new source
                sources.append(arg)
                file_nos.append(arg.fileno())
                descriptions.append(None)
                self._poller.register(arg, self._flags)
        # end: for all args

        # need iterator over these 3 lists all the time
        self._source_iter = list((src, fno, desc) for src, fno, desc in
                                 zip(sources, file_nos, descriptions))
    # end: Follower constructor

    def next(self):
        """ main function to poll next line from sources

        :returns: (source, desc, line, flags) for the next source with data
        :raise PollUnknownSourceException: if poll reports a file number
                                           that was never registered
        :raise PollFlagsException: if poll reports only flags outside the
                                   requested mask
        """

        while True:
            result = self._poller.poll(self._timeout)
            for file_no, flags in result:

                # identify source; use the source itself (never None) as
                # the "found" marker. The original tested "desc is -1",
                # an identity comparison with an int literal that only
                # worked through CPython's small-int caching and broke
                # for sources whose desc is legitimately None.
                source = None
                desc = None
                for curr_src, curr_no, curr_desc in self._source_iter:
                    if curr_no == file_no:
                        desc = curr_desc
                        source = curr_src
                        break
                # end: loop over self._source_iter
                if source is None:
                    raise PollUnknownSourceException(file_no)

                if not flags & self._flags:
                    raise PollFlagsException(flags, desc)

                if flags & POLLHUP:
                    warn('received a hangup during polling for {0}'
                         .format(desc))

                if flags & POLLERR:
                    warn('received an err during polling for {0}'
                         .format(desc))

                if flags & POLLNVAL:
                    warn('poll replied "invalid request" err during polling '
                         'for {0}'
                         .format(desc))

                # read from source (read() rather than readline() so all
                # currently available data is consumed at once)
                line = source.read()

                if not line:
                    continue
                # if reach here, we have new data

                return source, desc, line, flags
            # for poll results
        # end: inf loop
    # end: Follower.next

    def __iter__(self):
        """ makes this an iterator, called by iter(my_follower) """
        return self

    def __next__(self):
        """ called by next(my_follower) """
        return self.next()
# end: class Follower
#: default args for Popen constructor in case a process is given as cmd
DEFAULT_SUBPROCESS_ARGS = dict(bufsize=1, stdin=None, stdout=PIPE, stderr=PIPE)


class FollowContextManager(object):
    """ context for Follower objects, ensures everything is closed properly

    opens and closes files and sockets; communicates and waits for Popen
    process objects whose stdout and stderr pipes are followed
    """

    files = None
    file_descs = None
    file_handles = None

    sockets = None
    socket_descs = None

    procs = None
    proc_descs = None
    proc_objs = None

    def __init__(self, files=None, file_descs=None,
                 sockets=None, socket_descs=None,
                 procs=None, proc_descs=None,
                 subprocess_args=None):
        """ create a context manager for Follower

        check args and that they match;
        tries to guess good descs for files, sockets and procs that are not
        given

        :param files: list/tuple of, or single file handle or file name
        :param file_descs: None or list/tuple of same length as files
        :param sockets: list/tuple of, or single socket
        :param socket_descs: None or list/tuple of same length as sockets
        :param procs: list/tuple of, or single Popen object or command itself
                      as str/list/tuple
        :param proc_descs: None or list/tuple of same length as procs
        :param dict subprocess_args: dict or args that are merged with
                                     :py:data:`DEFAULT_SUBPROCESS_ARGS`
        :raise ValueError: if a desc list does not match its source list,
                           or a desc is neither str nor None

        ..seealso:: could have tried to implement this as a nesting of many
            individual context managers, see :py:mod:`contextlib`
        """
        # (leftover debug print removed)

        # set files and file_descs and ensure that they are matching lists.
        # NOTE: the original tested isinstance(files, file); the builtin
        # ``file`` only exists in python 2 -- duck-type on fileno() instead
        if files is None:
            self.files = []
        elif isstr(files) or hasattr(files, 'fileno'):
            self.files = [files, ]
        else:
            self.files = files
        if file_descs is None:
            temp_descs = [None for _ in self.files]
        elif len(self.files) == len(file_descs):
            temp_descs = file_descs
        else:
            raise ValueError('if given file descs, need desc for all files!')

        # try to guess good file_desc values; ensure they are str
        # (renamed loop var: original shadowed the builtin ``file``)
        self.file_descs = []
        for file_nr, (file_arg, file_desc) in \
                enumerate(zip(self.files, temp_descs)):
            if isstr(file_desc):
                self.file_descs.append(file_desc)
            elif file_desc is None:   # need to guess something
                if isstr(file_arg):
                    self.file_descs.append(file_arg)
                else:
                    self.file_descs.append('file{0}'.format(file_nr))
            else:   # desc is neither str nor None
                raise ValueError('file descs must be string or None!')

        # set sockets and socket_descs and ensure that they are matching
        # lists
        if sockets is None:
            self.sockets = []
        elif isinstance(sockets, Socket):
            self.sockets = [sockets, ]
        else:
            self.sockets = sockets
        if socket_descs is None:
            temp_descs = [None for _ in self.sockets]
        elif len(self.sockets) == len(socket_descs):
            temp_descs = socket_descs
        else:
            raise ValueError('if given socket descs, '
                             'need descs for all sockets!')

        # try to guess good socket_desc values; ensure they are str.
        # BUGFIX: original formatted undefined name ``socket_nr`` here
        # (its loop counter was called file_nr) --> NameError
        self.socket_descs = []
        for sock_nr, (sock, sock_desc) in \
                enumerate(zip(self.sockets, temp_descs)):
            if isstr(sock_desc):
                self.socket_descs.append(sock_desc)
            elif sock_desc is None:   # need to guess something
                self.socket_descs.append('socket{0}'.format(sock_nr))
            else:   # desc is neither str nor None
                raise ValueError('socket descs must be string or None!')

        # set procs and proc_descs and ensure they are matching lists
        if procs is None:
            self.procs = []
        elif isstr(procs):
            self.procs = [procs, ]
        elif isinstance(procs, Popen):
            self.procs = [procs, ]
        else:
            self.procs = procs
        if proc_descs is None:
            temp_descs = [None for _ in self.procs]
        elif len(proc_descs) == len(self.procs):
            temp_descs = proc_descs
        else:
            raise ValueError('if given proc descs, need descs for all procs!')

        # try to guess good proc_desc values; ensure they are str
        self.proc_descs = []
        for proc_nr, (proc, proc_desc) in \
                enumerate(zip(self.procs, temp_descs)):
            if isstr(proc_desc):
                self.proc_descs.append(proc_desc)
            elif proc_desc is None:   # need to guess something
                if isstr(proc):
                    self.proc_descs.append(proc)
                elif isinstance(proc, (tuple, list)):
                    self.proc_descs.append(' '.join(proc))
                elif isinstance(proc, Popen):
                    if isstr(proc.args):
                        self.proc_descs.append(proc.args)
                    else:
                        self.proc_descs.append(' '.join(proc.args))
                else:
                    self.proc_descs.append('proc{0}'.format(proc_nr))
            else:   # desc is neither str nor None
                raise ValueError('proc descs must be string or None!')

        # BUGFIX: copy the module-level default dict before updating it;
        # the original assigned DEFAULT_SUBPROCESS_ARGS directly and then
        # mutated it, changing the defaults for every later instance
        self.subprocess_args = dict(DEFAULT_SUBPROCESS_ARGS)
        if subprocess_args is not None:
            self.subprocess_args.update(subprocess_args)

    def __enter__(self):
        """ called when entering run context

        opens files, starts subprocesses, and assembles args for Follower

        :returns: a :py:class:`Follower` over all configured sources
        """
        args = []

        # open files given by name; pass through already-open handles
        self.file_handles = []
        for file_arg, desc in zip(self.files, self.file_descs):
            if isinstance(file_arg, str):
                new_handle = open(file_arg)
            else:   # assume file_arg is a file handle
                new_handle = file_arg
            self.file_handles.append(new_handle)
            args.append(new_handle)
            args.append(desc)
        # end: for files and file_descs

        for sock, desc in zip(self.sockets, self.socket_descs):
            args.append(sock)
            args.append(desc)

        # start subprocesses where only the command was given; follow
        # stdout and stderr of each process as separate sources
        self.proc_objs = []
        for proc, desc in zip(self.procs, self.proc_descs):
            if isstr(proc) or isinstance(proc, (list, tuple)):
                proc = Popen(proc, **self.subprocess_args)
            self.proc_objs.append(proc)
            args.append(proc.stdout)
            args.append(desc + '_out')
            args.append(proc.stderr)
            args.append(desc + '_err')

        return Follower(args)

    def __exit__(self, exc_type, exc_value, traceback):
        """ called when leaving run context; closes files, kills procs """
        # best-effort cleanup: never let a close error mask the original
        # exception, just warn about it
        for handle in self.file_handles:
            try:
                handle.close()
            except Exception:
                warn('ignoring exception exiting follower context manager!')
        for proc in self.proc_objs:
            try:
                proc.kill()
                rest_out, rest_err = proc.communicate()
                if rest_out or rest_err:
                    warn('Ignoring left-over output in proc')
            except Exception:
                warn('ignoring exception exiting follower context manager!')
# end: class FollowContextManager


def follow(*args, **kwargs):
    """ creates a ContextManager for a Follower to be used in "with" statements

    for example:

    with follow('/var/log/messages') as log_flwr
        for source, desc, line in log_flwr
            do_something_with(line)

    for specification of args see FollowContextManager constructor help
    """
    return FollowContextManager(*args, **kwargs)
# ... continued: need separate threads to write to some
# file /pipe/socket so can test the interactivity...)
# #############################################################################
# NOTE(review): everything below is a manual/interactive test harness that
# tails a live syslog; it is not a unit test and requires /var/log/messages

from datetime import date, datetime as dt, timedelta as td
from subprocess import Popen, PIPE

# manual-test configuration: file to watch, how far back to report (seconds)
# and the timestamp layout at the start of each syslog line
syslog_file = '/var/log/messages'
time_diff_seconds = 60
syslog_time_format = '%b %d %H:%M:%S'


def test_syslog_line(n_lines, source, desc, line,
                     today_str, start_time):
    """ called by test functions for each line of syslog

    prints the line if it is from today and newer than start_time (or if
    its timestamp cannot be parsed); shows progress every 1000 lines
    """

    if n_lines % 1000 == 0:
        print(('{0:6d} old lines, showing lines after {1}; '
               'abort using Ctrl-C').format(n_lines, start_time))
    # skip lines not starting with today's date ('Mon DD' prefix)
    if line[:6] != today_str:
        return
    try:
        # syslog timestamps carry no year; assume start_time's year
        log_time = dt.strptime(line[:15], syslog_time_format)
        log_time = log_time.replace(year=start_time.year)
    except ValueError:
        log_time = None   # unparsable timestamp --> show line anyway

    if log_time is None or log_time > start_time:
        print('line {0} from "{1}", (orig from {2}): {3}'.format(
            n_lines, desc, log_time, line))


def test_follower_syslog():
    """ test Follower on syslog file using PIPEs and an open file

    shows lines from the last `time_diff_seconds` seconds
    """

    start_time = dt.now() - td(seconds=time_diff_seconds)
    today_str = date.today().strftime(syslog_time_format)[:6]
    if today_str[4] == '0':
        today_str = today_str[:4] + ' ' + today_str[5:]  # 'Oct 01' --> 'Oct 1'

    # create process for 'tail -f syslog' --> 2 pipes (stdout + stderr)
    proc = Popen(['tail', '-f', syslog_file], stdout=PIPE, stderr=PIPE)

    # read from all three sources
    with open(syslog_file, 'r') as file_handle:
        flwr = Follower(file_handle, 'syslog file', proc.stdout, 'tail syslog',
                        proc.stderr, 'tail stderr')
        for n_lines, (source, desc, line, flags) in enumerate(flwr):
            if flags:
                print('received flags {0}'.format(flags_to_str(flags)))
            test_syslog_line(n_lines, source, desc, line,
                             today_str, start_time)


def test_follower_context():
    """ test FollowContextManager and follow() function """

    today_str = date.today().strftime(syslog_time_format)[:6]
    if today_str[4] == '0':
        today_str = today_str[:4] + ' ' + today_str[5:]  # 'Oct 01' --> 'Oct 1'
    start_time = dt.now() - td(seconds=time_diff_seconds)
    with follow(syslog_file) as flwr:
        for n_lines, (source, desc, line, flags) in enumerate(flwr):
            if flags:
                print('received flags {0}'.format(flags_to_str(flags)))
            test_syslog_line(n_lines, source, desc, line,
                             today_str, start_time)


def test_context_proc():
    """ test FollowContextManager's ability to wrap proc args """

    today_str = date.today().strftime(syslog_time_format)[:6]
    if today_str[4] == '0':
        today_str = today_str[:4] + ' ' + today_str[5:]  # 'Oct 01' --> 'Oct 1'
    start_time = dt.now() - td(seconds=time_diff_seconds)
    with follow(procs=(['tail', '-f', syslog_file], )) as flwr:
        for n_lines, (source, desc, line, flags) in enumerate(flwr):
            if flags:
                print('received flags {0}'.format(flags_to_str(flags)))
            test_syslog_line(n_lines, source, desc, line,
                             today_str, start_time)


def test():
    """ Main function, tests some of class's functionality """

    # enable the variant(s) to try interactively:
    #test_follower_syslog()
    #test_follower_context()
    test_context_proc()
# end: function main


if __name__ == '__main__':
    test()
+# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Helper functions with iterators -- functional program is great! + +Currently, recipes from python itertools docu (:py:mod:`itertools`) that have +been deemed useful + +.. codeauthor:: Intra2net +""" + +from itertools import * + +def pairwise(iterable): + """ s -> (s0,s1), (s1,s2), (s2, s3), ... """ + a, b = tee(iterable) + next(b, None) + return zip(a, b) diff --git a/src/log_helpers.py b/src/log_helpers.py new file mode 100644 index 0000000..d51e641 --- /dev/null +++ b/src/log_helpers.py @@ -0,0 +1,462 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. 
+ +""" Helpers for logging; featuring: + +ShortLevelFormatter: provide a 4-char-sized field "shortlevel" for message +urgency (dbug/info/warn/err /crit) + +I2nLogger: logger that provides a notice(), allows omission for str.format + and is quite convenient in general + +get_logger: factor for creating I2nLoggers + +Further ideas: :: +* allow milliseconds in dateformat field (see test_short_level_format) +* create own basicConfig-like function that uses our classes as default + --> started using I2nLogger constructor and get_logger +* try to find out module that calls I2nLogger constructor to provide a good + default value for name (finding out the current module is a pain in the a..) + +..todo:: create unittests from test_* functions at bottom +..todo:: think about how to allow different levels per handler +..todo:: do not limit logs by line numbers but by disc size? Warn when at 50%, + 75%, 90%, 99% of limit? + +.. codeauthor:: Intra2net +""" + +import logging +from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL, NOTSET +from math import log10, floor +import sys + +from type_helpers import isstr + +#: log level half-way between INFO and WARNING +NOTICE = (INFO + WARNING)/2 +logging.addLevelName(NOTICE, 'NOTICE') + +#: default formatting string for ShortLevelFormatter +DEFAULT_SHORT_LEVEL_FORMAT = '%(asctime)s:%(msecs)03d %(shortlevel)s| %(msg)s' + +#: default formatting string for date/time in ShortLevelFormatter +DEFAULT_SHORT_LEVEL_DATE_FORMAT = '%H:%M:%S' + +#: mapping from level name to level int for I2nLogger's set_level +LEVEL_DICT = dict(notset=NOTSET, debug=DEBUG, info=INFO, notice=NOTICE, + warning=WARNING, error=ERROR, critical=CRITICAL) + +#: min level allowed in I2nLogger +MIN_LEVEL = NOTSET + +#: max level allowed in I2nLogger +MAX_LEVEL = CRITICAL + +#: constant for I2nLogger streams argument: stream to stdout +STDOUT = sys.stdout + + +class ShortLevelFormatter(logging.Formatter): + """ + Formatter for logging handlers that allows use of format 
field "shortlevel" + + using this formatter, you can specify in the log message format string the + field "shortlevel" which will introduce in your log messages a 4-char field + for the log record urgency: "DEBUG" --> "dbug", "INFO" --> "info", + "WARNING" --> "warn", "ERROR" --> "err ", "CRITICAL" --> "crit" + + All other functionality (like other format fields) is inherited from base + class Formatter + + Most easily used indirectly through :py:class:`I2nLogger` + (uses this by default) + + Explicit usage example:: + + logger = logging.getLogger(__name__) + logger.setLevel(logging.DEBUG) + handler = logging.StreamHandler() + handler.setLevel(logging.DEBUG) + formatter = ShortLevelFormatter('%(shortlevel)s| %(msg)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + + You can even add new levels:: + + notice = (logging.INFO + logging.WARNING)/2 + formatter.add_level(notice, 'note') + logger.log(notice, 'more important than info but no warning nor error') + + .. seealso:: testing funcion :py:func:`test_short_level_format` + """ + + def __init__(self, fmt=DEFAULT_SHORT_LEVEL_FORMAT, + datefmt=DEFAULT_SHORT_LEVEL_DATE_FORMAT, *args, **kwargs): + """ creates a new ShortLevelFormatter + + forwards all args to super class Formatter and initializes dict with + levelno and short string representations + """ + self.parent = super(ShortLevelFormatter, self) + self.parent.__init__(fmt=fmt, datefmt=datefmt, *args, **kwargs) + self._shortlevel_dict = {DEBUG: 'dbug', INFO: 'info', WARNING: 'warn', + ERROR: 'err ', CRITICAL: 'crit', + NOTSET: '----'} + + def format(self, record): + """ create string representation of given log record """ + try: + record.shortlevel = self._shortlevel_dict[record.levelno] + except KeyError: + record.shortlevel = '????' 
+ + return self.parent.format(record) + + def add_level(self, levelno, shortlevel_str): + """ add a new message urgency level + + :param int levelno: numeric urgency level + :param str shortlevel_str: string representation of message urgency; + should be of length 4 + :returns: nothing + """ + + self._shortlevel_dict[levelno] = shortlevel_str + + +#: memory for all I2nLogger instances in current session to avoid overwrite +#: used in :py:func:`get_logger` and :py:class:`I2nLogger` constructor +_i2n_loggers = {} + + +def get_logger(name, *args, **kwargs): + """ factor for :py:class:`I2nLogger`: create new or return existing object + + all args as in :py:class:`I2nLogger` constructor; will be ignored if logger + with given name exists already! + """ + + if name in _i2n_loggers: + return _i2n_loggers[name] # ignore all args + else: + return I2nLogger(name, *args, **kwargs) + + +class I2nLogger: + """ a more convenient logger + + Features:: + * can be used without ".format()" as follows:: + + logger.info('the result is {0}', result) # more convenient than... + logger.info('the result is {0}'.format(result)) # ... the original + + * Has a :py:meth:`Logger.notice` function + * Has by default as only handler a :py:class:`logging.StreamHandler` to + stdout with a :py:class:`ShortLevelFormatter` + * Simplifies setting of logger's logging level using + :py:meth:`Logger.set_level`. The level is kept in sync between logger and + handlers and can be queried using :py:meth:`get_level` + (does not work accross logger hierarchies!) + * can specify name and level and [date]fmt all in constructor + * can limit the number of lines this logger will produce to prevent filling + hard drive with log file + (assumes one line per call to log/debug/info/notice/..., + only counts calls with priority above this logger's level) + * provides shorter method names: dbg, note, warn, err + + ..note:: Creating multiple instances with the same name is not allowed and + will result in an error. 
Use :py:func:`get_logger` instead of this + constructor to avoid such situtations + + ..note:: Do not change or use the underlying logger or functionality here + (in particular line counting, get_level) is no longer reliable! + """ + + def __init__(self, name, level=INFO, fmt=DEFAULT_SHORT_LEVEL_FORMAT, + datefmt=DEFAULT_SHORT_LEVEL_DATE_FORMAT, + streams=STDOUT, files=None, max_lines=None): + """ creates a I2nLogger; forwards args to logging.getLogger + + + :param str name: name of this logger, best practice is module name + :param int level: best use one of the constants DEBUG, INFO NOTICE, + WARNING, ERROR + :param str fmt: format of log messages, see + :py:class:`ShortLevelFormatter` for more help + :param str datefmt: format of date added to log messages, see + :py:class:`ShortLevelFormatter` for more help + :param streams: list/tuple of or a single stream to log to, default is + STDOUT (=sys.stdout) + :param files: list/tuple or single file name to log to + :param max_lines: number > 0 to limit number of output calls to that + number; give None (default) to no limit + :raise: ValueError if an I2nLogger with the same name exists already + """ + + # check if an instance with the same name has been created earlier + # to prevent conflicts + global _i2n_loggers + if name in _i2n_loggers: + raise ValueError("An I2nLogger with that exact name ('{0}') exists" + " already -- use get_logger instead!" 
+ .format(name)) + + self._log = logging.getLogger(name) + self._level = min(MAX_LEVEL, max(MIN_LEVEL, level)) + self._log.setLevel(self._level) + + # remove handlers (sometimes there are mutliple by default) + for handler in self._log.handlers: + self._log.removeHandler(handler) + + # create new handlers and formatter + if streams is None: + stream = [] + elif not isinstance(streams, (list, tuple)): + streams = (streams, ) + for stream in streams: + formatter = ShortLevelFormatter(fmt=fmt, datefmt=datefmt) + formatter.add_level(NOTICE, 'note') + new_handler = logging.StreamHandler(stream) + new_handler.setFormatter(formatter) + new_handler.setLevel(self._level) + self._log.addHandler(new_handler) + + if files is None: + files = [] + elif not isinstance(files, (list, tuple)): + files = (files, ) + for file_name in files: + formatter = ShortLevelFormatter(fmt=fmt, datefmt=datefmt) + formatter.add_level(NOTICE, 'note') + new_handler = logging.FileHandler(file_name) + new_handler.setFormatter(formatter) + new_handler.setLevel(self._level) + self._log.addHandler(new_handler) + + # remember max_lines + self.set_max_lines(max_lines) + + # remember that this logger is a I2nLogger + _i2n_loggers[name] = self + + def dbg(self, message, *args, **kwargs): + self.log(DEBUG, message, *args, **kwargs) + + def debug(self, message, *args, **kwargs): + self.log(DEBUG, message, *args, **kwargs) + + def info(self, message, *args, **kwargs): + self.log(INFO, message, *args, **kwargs) + + def note(self, message, *args, **kwargs): + self.log(NOTICE, message, *args, **kwargs) + + def notice(self, message, *args, **kwargs): + self.log(NOTICE, message, *args, **kwargs) + + def warn(self, message, *args, **kwargs): + self.log(WARNING, message, *args, **kwargs) + + def warning(self, message, *args, **kwargs): + self.log(WARNING, message, *args, **kwargs) + + def err(self, message, *args, **kwargs): + self.log(ERROR, message, *args, **kwargs) + + def error(self, message, *args, **kwargs): + 
self.log(ERROR, message, *args, **kwargs) + + def critical(self, message, *args, **kwargs): + self.log(CRITICAL, message, *args, **kwargs) + + def log(self, level, message, *args, **kwargs): + if level >= self._level: + if self._line_counter == self._max_lines: + self._log.log(ERROR, + 'reached max number of output lines ({0}) ' + '-- will not log anything any more!' + .format(self._line_counter)) + self._line_counter += 1 + elif self._line_counter > self._max_lines: + return + else: + self._log.log(level, message.format(*args), **kwargs) + self._line_counter += 1 + + def log_count_if_interesting(self, count, level=INFO, counter_name=None): + """ Log value of a counter in gradually coarser intervals + + see :py:func:`is_interesting_count` for definition of "interesting" + """ + if is_interesting_count(count): + if counter_name: + self.log(level, '{0} counter is at {1}', counter_name, count) + else: + self.log(level, 'Counter is at {0}', count) + + def get_level(self): + """ return int level of this logger """ + return self._level + + def get_level_str(self): + """ returns :py:func:`logging.getLevelName` on :py:meth:`get_level` """ + return logging.getLevelName(self._level) + + def set_level(self, new_level): + """ set level given an int or a str + + :arg new_level: int or str (str is converted to lower case) + :raises: KeyError if new_level is a string that is not in + :py:data:`LEVEL_DICT` + """ + if isstr(new_level): + self._level = LEVEL_DICT[new_level.lower()] + else: + self._level = min(MAX_LEVEL, max(MIN_LEVEL, new_level)) + self._log.setLevel(self._level) + for handler in self._log.handlers: + handler.setLevel(self._level) + + def set_max_lines(self, max_lines): + """ limit number of lines this produces; give None to remove limit + + resets the line counter + """ + if max_lines > 0: + self._max_lines = max_lines + elif max_lines < 0 or (not max_lines): + self._max_lines = None + else: + raise ValueError('unexpected value for max_lines: {0}!' 
+ .format(max_lines)) + self._line_counter = 0 + + def get_max_lines(self): + """ return current value for line limit """ + return self._max_lines + + def exceeded_max_lines(self): + """ return True if nothing will be logged because max_lines was reached + """ + if self._max_lines: + return self._line_counter >= self._max_lines + else: + return False + + +def n_digits(number): + """ returns the number of digits a number has in decimal format + + :returns: 1 for 1...9, 2 for 10...99, 3 for 100...999, ... + 0 for 0 (and everything else beween -1 and 1) + 1 for -1...-9, 2 for -10...-99, ... + """ + if abs(number) < 1: + return 0 + else: + return floor(log10(abs(number)))+1 + + +def is_interesting_count(counter): + """ return True if counter has reached an "interesting" value + + For the counter to be "interesting" becomes ever harder. At first it is + easy (returns True for 1,2,3,6,10), then becomes harder (True only for + 10,20,30,60,100) and harder (True for 100,200,300,600,1000) and this scheme + continues on a logartihmic scale. + + An example that will print lots in the beginning and then less and less:: + + counter = 0 + while not end_reached(): + do_something() + if is_interesting_count(counter): + log('reached iteration {0}'.format(counter)) + counter += 1 + + Or implicitly using I2nLogger::log_count_if_interesting(counter) + + :returns: True for a few values of counter, False most of the time + """ + + return float(counter) / 10.**(n_digits(counter)-1.) in (1., 2., 3., 6.) 
+ + +def test_short_level_format(): + """ quick test of :py:class:`ShortLevelFormatter` """ + + logger = logging.getLogger('logtest') + logger.setLevel(DEBUG) + handler = logging.StreamHandler() + handler.setLevel(DEBUG) + formatter = ShortLevelFormatter( + '%(asctime)s:%(msecs)03d %(shortlevel)s| %(msg)s' + ' [regular levelname=%(levelname)s]', + datefmt='%H:%M:%S') + handler.setFormatter(formatter) + logger.addHandler(handler) + + # 'application' code + logger.debug('debug message') + logger.info('info message') + logger.warn('warn message') + logger.error('error message') + logger.critical('critical message') + logger.log(15, 'unknown level') + logger.log(NOTSET, 'level not set') + + # add notice level + notice = (logging.INFO + logging.WARNING)/2 + formatter.add_level(notice, 'note') + logger.log(notice, 'more important than info but no warning nor error') + + # try if exception formatting still works: + try: + logger.info('this is what an exception looks like:') + impossible_result = 1/0 + logger.critical('just divided 1/0! 
The result is {0}' + .format(impossible_result)) + except ZeroDivisionError: + logger.exception('1/0 still does not work!', exc_info=True) + + # done + logger.info('done testing') + + +def test_get_logger(): + log = get_logger('logger_test') + log2 = get_logger('logger_test') + print(log == log2) + + +def test_line_counter(): + log = get_logger('logger_test', max_lines=10) + for idx in range(20): + for _ in range(20): + log.debug('should not show nor count') + print('calling log for idx {0}'.format(idx)) + log.info('logging with idx {0}', idx) + log.log_count_if_interesting(idx) + +if __name__ == '__main__': + #test_short_level_format() + #test_get_logger() + test_line_counter() diff --git a/src/log_read.py b/src/log_read.py new file mode 100644 index 0000000..0c30e88 --- /dev/null +++ b/src/log_read.py @@ -0,0 +1,308 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Iterative reading of log files + +Basic Functionality (class :py:class:`IterativeReader`:) +Runs stat in a loop to find out whether file size has changed. 
Then reads the +new data and forwards that + +..todo:: Want to also use lsof to find out whether file/pipe/socket was closed, + so can return from read loop + +:py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns +it line-wise as is normal for log files + +:py:class:`LogParser` takes those lines and tries to parse them into fields +like date, time, module name, urgency and message. + +..todo:: auto-detect log line layout + +.. codeauthor:: Intra2net +""" + +import os +from warnings import warn +import os.path +from itertools import zip_longest + +from type_helpers import is_str_or_byte, is_file_obj + + +class LogReadWarning(UserWarning): + """ warnings issued by classes in this module """ + pass + + +def true_func(unused_argument_but_that_is_ok): + """ does nothing, always returns True """ + return True + + +def check_is_used(some_file_or_handle): + """ check whether file is being written to + + to be implemented, e.g. using lsof + """ + raise NotImplementedError() + + +_create_description_unknown_counter = 0 + +def create_description(file_obj, file_desc): + """ create some description for given file-like object / file descriptor + + :param file_obj: file-like object + :param int file_desc: os-level file descriptor + :returns: string + """ + + global _create_description_unknown_counter + + try: + desc = file_obj.name + if desc: + return desc + except AttributeError: + pass + + if file_desc is not None: + return 'file{0}'.format(file_desc) + else: + _create_description_unknown_counter += 1 + return 'unknown{0}'.format(_create_description_unknown_counter) + + +#: error message for IterativeReader constructor +_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \ + 'files --> use with open(file_name)!' 
+ + +class IterativeReader: + """ reads from a given file + + Uses os.stat(file_obj.fileno()).st_size as measure whether file has changed + or not; Always reads as much data as possible + + Catches most common exceptions in iteration (not constructor) + + Does not are about closing files, so does not accept file names + + This is the base for class :py:class:`LineReader` that just has to + implement a different :py:meth:`prepare_result` method + """ + + def __init__(self, sources, descs=None, return_when_done=False): + """ creates a reader; does some basic checks on args + + :param sources: iterable over sources. Sources can be opened file + objects or read-opened os-level file descriptors. + Calling code has to ensure they are closed properly, so + best use this within a "with open(file_name) as + file_handle:"-context. If sources is a single file + obj/descriptor, both source and desc will be converted + to lists of length 1 + :param descs: can be anything of same length as sources. If sources is + a single source, then descs is also converted to a list + of length 1. If not given (i.e. None), will use + :py:func:`create_description` to guess descriptions + :param bool return_when_done: ignore file_handle if no-one is writing + to it any more. 
Return from iterator when + all watched files are done (not + implemented yet) + :raises: OSError when testing fstat on source + """ + if not sources: + raise ValueError('need at least some source!') + elif is_str_or_byte(sources): + raise ValueError(_STR_ERR.format(sources)) + elif is_file_obj(sources) or isinstance(sources, int): + source_input = [sources, ] + desc_input = [descs, ] + else: + source_input = sources # assume some iterable + desc_input = descs + + # now divide sources into os-level file descriptors for os.fstat, + # and file objects for read() + self.file_objs = [] + self.file_descs = [] # file descriptOR, not descriptION + for source in source_input: + if is_file_obj(source): + self.file_objs.append(source) + self.file_descs.append(source.fileno()) + elif isinstance(source, int): + self.file_objs.append(os.fdopen(source)) + self.file_descs.append(source) + elif is_str_or_byte(source): + raise ValueError(_STR_ERR.format(source)) + else: + raise ValueError('source {0} is neither file obj nor file ' + 'descriptor!') + + # try to fstat the new file descriptor just for testing + os.fstat(self.file_descs[-1]) + + # guess descriptions if not given + if not desc_input: + self.descriptions = [create_description(obj, file_desc) + for obj, file_desc + in zip(self.file_objs, self.file_descs)] + else: + try: + if len(desc_input) != len(self.file_objs): + raise ValueError('need same number of sources and ' + 'descriptions!') + except TypeError: + pass # desc_input is generator or so + + self.descriptions = [] + for obj, file_desc, description in \ + zip_longest(self.file_objs, self.file_descs, desc_input): + if obj is None: + raise ValueError('more descriptions than sources!') + elif description is None: + self.descriptions.append(create_description(obj, + file_desc)) + else: + self.descriptions.append(description) + + self.last_sizes = [0 for _ in self.file_objs] + self.ignore = [False for _ in self.file_objs] + + if return_when_done: + self.is_used = 
check_is_used + else: + self.is_used = true_func + + for obj, file_desc, description in zip(self.file_objs, self.file_descs, + self.descriptions): + print('file descriptor {0}, file obj {1}, description "{2}"' + .format(file_desc, obj, description)) + + def n_sources(self): + return len(self.file_objs) + + def n_active_sources(self): + return len(self.ignore) - sum(self.ignore) + + def __iter__(self): + while True: + for idx, (obj, file_desc, description, last_size, do_ignore) in \ + enumerate(zip(self.file_objs, self.file_descs, + self.descriptions, self.last_sizes, + self.ignore)): + + if do_ignore: + continue + + # get new file size + new_size = os.fstat(file_desc).st_size + + # compare to old size + if new_size == last_size: + if not self.is_used(file_desc): + warn('no one is writing to {0} / {1} -- ' + 'stop watching it!' + .format(file_desc, description), + category=LogReadWarning) + self.do_ignore[idx] = True + elif new_size < last_size: + warn('{0} / {1} has become smaller ({2} --> {3})!' + .format(obj, description, last_size, new_size), + category=LogReadWarning) + else: # (new_size > last_size) + try: + new_data = obj.read() + except OSError as ose: # includes IOErrors + warn('io error reading from {0} / {1}: {2})' + .format(obj, description, ose), + category=LogReadWarning) + if len(new_data) != new_size - last_size: + warn('read unexpected amount from {0} / {1}: ' + '{2} bytes instead of {3} bytes!' + .format(obj, description, len(new_data), + new_size-last_size), + category=LogReadWarning) + + # post-processing + to_yield = self.prepare_result(description, new_data, idx) + for result in to_yield: + yield result + + # prepare next iteration + self.last_sizes[idx] = new_size + + def prepare_result(self, description, data, idx): + """ from raw new data create some yield-able results + + to be intended for overwriting in sub-classes + + this function is called from __iter__ for each new data that becomes + available. 
It has to return some iterable whose entries are yielded + from iteration over objects of this class. + + This base implementation just returns its input in a list, so new data + is yielded from __iter__ as-is + """ + return [(description, data), ] + + +LINE_SPLITTERS = '\n\r' + +class LineReader(IterativeReader): + """ an IterativeReader that returns new data line-wise + + this means buffering partial line data + """ + + def __init__(self, *args, **kwargs): + """ forwards all args and kwargs to :py:class:`IterativeReader` """ + super().__init__(*args, **kwargs) + self.line_buffers = ['' for _ in range(self.n_sources())] + + def prepare_result(self, description, new_data, idx): + """ take raw new data and split it into lines + + if line is not complete, then buffer it + + returns lines without their newline characters + """ + + #print('splitting "{0}" + "{1}"'.format(self.line_buffers[idx], + # new_data.replace('\n', r'\n'))) + all_data = self.line_buffers[idx] + new_data + self.line_buffers[idx] = '' + result = [] + should_be_no_new_lines = False + for line in all_data.splitlines(keepends=True): + if line[-1] in LINE_SPLITTERS: + result.append((description, line.rstrip(LINE_SPLITTERS))) + elif should_be_no_new_lines: + raise ValueError('line splitters are not compatible with' + 'str.splitlines!') + else: + self.line_buffers[idx] = line + should_be_no_new_lines = True # (this should be the last) + + return result + + +class LogParser: + """ takes lines from LineReader and parses their contents """ + pass diff --git a/src/template.py b/src/template.py new file mode 100644 index 0000000..0e01357 --- /dev/null +++ b/src/template.py @@ -0,0 +1,100 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. 
+# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" +Module name: Summary of this module + +Some overview over important classes and functions and such should be specified +in this docstring, but please keep it brief! + + +.. seealso:: http://thomas-cokelaer.info/tutorials/sphinx/rest_syntax.html +.. todo:: Do not forget to correct the codeauthor and license text commented by + hashes above. +.. note:: You should note this +.. warning:: This is a warning + +.. codeauthor:: Intra2net +""" + +from __future__ import print_function + +THE_CONSTANT = None +""" some constant with docstring *AFTER* the constant. Use this format! 
""" + + +ANOTHER_CONSTANT = 1 #: short docstring in same line -- note the #: + +#: constant with docstring in line above it -- note the #: +THIRD_CONST = 3.333 + + +class TestClass: + """ + test class + + does nothing, really + """ + + def __init__(self, args): + """ constructor, gets its own paragraph in documentation + + :raises NotImplementedError: always + """ + raise NotImplementedError() + + def test_method(self, arg): + """ test method, does nothing + + for more doc string examples, see function :py:func:`send_message` + + :param arg: some argument + :return: Nothing + :raises NotImplementedError: always + """ + raise NotImplementedError() +# end: class TestClass + + +def send_message(sender, recipient, message_body, priority=1): + """ Send a message to a recipient + + Example for a docstring that uses lots of reST fields, taken from + http://sphinx-doc.org/domains.html + + Does not use :py:class:`TestClass` at all. Also has nothing to do with + the python module :py:mod:`logging.handlers` + + Note that it is completely irrelevant what :py:data:`THE_CONSTANT` or + :py:data:`ANOTHER_CONSTANT` or :py:data:`THIRD_CONSTANT` are set to. 
I just + wanted to insert a link to a constant somewhere and find out how to + document constants + + :param str sender: The person sending the message + :param str recipient: The recipient of the message + :param str message_body: The body of the message + :param priority: The priority of the message, can be a number 1-5 + :type priority: integer or None + :return: the message id + :rtype: int + :raises ValueError: if the message_body exceeds 160 characters + :raises TypeError: if the message_body is not a basestring + :raises NotImplementedError: always + """ + + raise NotImplementedError() diff --git a/src/test_helpers.py b/src/test_helpers.py new file mode 100644 index 0000000..eebebc6 --- /dev/null +++ b/src/test_helpers.py @@ -0,0 +1,367 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Helpers for developping quick test scripts + +Creation motivated by fear of filling disc space during long-running stress +tests + +.. 
codeauthor:: Intra2net +""" + +from __future__ import print_function + +from contextlib import contextmanager +from threading import Thread +from time import sleep +from datetime import datetime as dt +from itertools import tee +from warnings import warn +from sys import stderr, exit as sys_exit +from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used +import signal + +try: + from warnings import ResourceWarning + WARN_BASE_CLASS = ResourceWarning +except ImportError: + # only added in python 3.2 + WARN_BASE_CLASS = UserWarning + +from buffers import LogarithmicBuffer +from file_helpers import get_filesystem_fill_states, FilesystemFillState +from iter_helpers import pairwise + + +#: filesystems shown by df that usually do not correspond to something on disc +#: (except maybe swap) +NOT_REAL_FILESYSTEMS = 'none', 'shmfs', 'procfs', 'tmpfs', 'ramfs' + + +class DiscFullPreventionError(Exception): + """ Exception raised when disc space gets critical + + issued from function called by :py:class:`DiscFillCheckerThread` + usually only happens after issuing a :py:class:`DiscFullPreventionWarning` + """ + def __init__(self, state, time_estim): + super(DiscFullPreventionError, self).__init__( + 'Interrupting test to avoid full disc ({0}, full in {1}s)' + .format(state, "?" if time_estim==None else time_estim)) + self.state = state + self.time_estim = time_estim + + +class DiscFullPreventionWarning(WARN_BASE_CLASS): + """ Exception raised when disc space approaches problematic state + + issued from function called by :py:class:`DiscFillCheckerThread` + If disc fills up further, will be followed by a + :py:class:`DiscFullPreventionError` or sys.exit + """ + def __init__(self, state, time_estim): + super(DiscFullPreventionWarning, self).__init__( + 'Disc nearly full! Might soon abort test ({0}, ' + 'full in {1}s)' + .format(state, "?" 
if time_estim==None else time_estim)) + self.state = state + self.time_estim = time_estim + + +class DiscFillChecker: + """ checker for disc fill status """ + + def __init__(self, interval=10, decision_function=None): + + # set variables + self.interval = interval + if decision_function is None: + self.decision_function = default_disc_full_decision_function + else: + # check decision_function: + GIGABYTE = 2**30 + state = FilesystemFillState() + state.name = 'dummy' + state.size = 10 * GIGABYTE + state.used = 1 * GIGABYTE + state.available = 9 * GIGABYTE # asume this should be enough + state.capacity = 10 + state.mount_point = '/not/mounted' + decision_function(state, None, None, None) + + self.decision_function = decision_function + + # remember relevant disc stats at start + self._bufs = {} + for fs_state in get_filesystem_fill_states(): + self._internal_state_buffer(fs_state) + + def _internal_state_buffer(self, fs_state): + """ update internal stats buffer, returns all estims + + internal helper called from __init__ and run + """ + if fs_state.name in NOT_REAL_FILESYSTEMS: + return [] + + buf = None + try: + buf = self._bufs[fs_state.name] + except KeyError: + # new file system -- need to create a new buffer + buf = LogarithmicBuffer(5) + self._bufs[fs_state.name] = buf + + buf.add((dt.now(), float(fs_state.available))) + return buf.get_all() + + def do_check(self): + """ check disc fill state """ + + # loop over current disc fill state + for fs_state in get_filesystem_fill_states(): + # remember newest value with others for same file system + fill_states = self._internal_state_buffer(fs_state) + if not fill_states: + continue + + # estimate time until full (i.e. 
when available == 0) + times_until_empty = \ + [calc_time_until_zero(old_state, new_state) + for old_state, new_state in pairwise(fill_states)] + + # call user-defined function to decide + min_time = None + if any(time != None and time > 0 for time in times_until_empty): + min_time = min(time for time in times_until_empty + if time != None and time>0) + avg_time = calc_time_until_zero(fill_states[0], + fill_states[-1]) + self.decision_function(fs_state, + times_until_empty[-1], # newest + min_time, # smallest + avg_time) # average + + +class DiscFillCheckerThread(Thread, DiscFillChecker): + """ a thread that checks disc fill in regular intervals """ + + def __init__(self, *args, **kwargs): + """ creates a DiscFillCheckerThread + + :param int interval: time in seconds between checks of disc usage + :param decision_function: function that decides when to issue a warning + or an error. See + :py:func:`default_disc_full_decision_function` + as an example. Function should raise + :py:class:`DiscFullPreventionWarning` and + :py:class:`DiscFullPreventionError` + """ + Thread.__init__(self, name='discFillChck') + DiscFillChecker.__init__(self, *args, **kwargs) + + # make this thread a daemon + self.daemon = True # works since python 2.6 + + # init do_stop + self.do_stop = False + + def run(self): + print('checker thread running') + while not self.do_stop: + sleep(self.interval) + self.do_check() + + def stop(self): + self.do_stop = True + + +MEGABYTE = 2**20 +MINUTE = 60 + +#: default thresholds for :py:func:`default_disc_full_decision_function` +DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE +DEFAULT_DISC_SIZE_ERR = MEGABYTE +DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE +DEFAULT_TIME_TO_FULL_ERR = MINUTE + +#: levels at which to kill running tests: regular exception, easily caught +#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! 
+KILL_EXCEPTION = "KILL_EXCEPTION" + +#: levels at which to kill running tests: exception that is not a subclass of +#: Exception to avoid "catch(Exception)". +#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! +KILL_SYS_EXIT = "KILL_SYS_EXIT" + +#: levels at which to kill running tests: brutal way that does not allow any +#: default cleanup (os._exit). Last resort when all others do not work! +KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT" + +#: levels at which to kill running tests: SIGINT +KILL_SIGINT = "KILL_SIGINT" + + +def default_disc_full_decision_function(curr_state, + estim_empty_newest, + estim_empty_smallest, + estim_empty_mean, + size_warn=DEFAULT_DISC_SIZE_WARN, + size_err=DEFAULT_DISC_SIZE_ERR, + time_warn=DEFAULT_TIME_TO_FULL_WARN, + time_err=DEFAULT_TIME_TO_FULL_ERR, + kill_level=KILL_SIGINT): + """ decide whether to warn or raise error because disc is getting full + + called by DiscFillCheckerThread regularly; params estim_empty_* are result + of :py:func:`calc_time_until_zero`, so None means that disc state did not + change. + + Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic + and raises an error or calls sys.exit when it is really critical. + + :param estim_empty_newest: the newest time estimate + :param estim_empty_smallest: the smallest of all non-negative estimates + :param estim_empty_mean: estim between earliest and newest estimate + :param int size/err_warn/err: thresholds, default to constants like + :py:data:`DEFAULT_DISC_SIZE_WARN` + :param kill_level: how to kill the running application: with sys.exit or + by raising a "regular" Exception subclass (default). + Brutal way or SIGINT (default) are also options. + Values should be one like :py:data:`KILL_EXCEPTION` + """ + + # err? 
+ raise_err = False + if curr_state.available < size_err: + raise_err = True + if (estim_empty_smallest != None) and (estim_empty_smallest < time_err): + raise_err = True + + # what kind of err? + if raise_err: + print('Disc fill below tolerance or disc full soon:', file=stderr) + print('{0}, full in {1} s' + .format(curr_state, estim_empty_smallest), file=stderr) + if kill_level == KILL_EXCEPTION: + raise DiscFullPreventionError(curr_state, estim_empty_smallest) + elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT: + print('Exiting now the brutal way', file=stderr) + brutal_exit_function_that_should_not_be_used(1) + elif kill_level == KILL_SYS_EXIT: + print('Exiting now', file=stderr) + sys_exit("Disc nearly full!") + else: # kill_level == KILL_SIGINT or unknown + print('Sending SIGINT', file=stderr) + kill(getpid(), signal.SIGINT) + + # warn? + if curr_state.available < size_warn: + warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), + stacklevel=3) + if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn): + warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), + stacklevel=3) + + +def calc_time_until_zero(old_state, new_state): + """ calc time until value hits zero given (time1, value1), (time2, value2) + + returns time from now until empty in seconds + + delta_date = new_date - old_date + delta_free = new_free - old_free + free(t) = new_free + (t-new_t) * delta_free / delta_date + = 0 + <==> -new_free = (t-new_t) * delta_free / delta_date + <==> -new_free * delta_date / date_free = t - new_t + <==> diff_t := new_t - t = new_free * delta_date / delta_free + + compare to now: + diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now) + = -diff_t - (now-new_t) = (new_t-now) - diff_t + + To avoid zero-division, returns None if delta_free == 0 + """ + old_date, old_free = old_state + new_date, new_free = new_state + if new_free == old_free: + return None # avoid zero-division + time_diff = new_free * (new_date 
- old_date).total_seconds() \ + / (new_free - old_free) + + # compare to now + return (new_date - dt.now()).total_seconds() - time_diff + + +METHOD_THREAD = 'thread' +METHOD_ALARM = 'alarm' + + +@contextmanager +def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs): + """ run test function while watching disc space + + all args are forwarded to :py:class:`DiscFillChecker` constructor + + depending on method, will create a separate thread (default) to watch disc + or use SIGALRM. First one does not interfere with signals within tested + method, second can use a simple Exception to stop the test + """ + + if method == METHOD_THREAD: + checker = DiscFillCheckerThread(*args, **kwargs) + checker.start() + elif method == METHOD_ALARM: + # check that alarms are not used otherwise + old_handler = signal.getsignal(signal.SIGALRM) + if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None): + raise RuntimeError('Signal handler for SIGALRM set ' + '-- do not want to override!') + + # create checker + checker = DiscFillChecker(*args, **kwargs) + + # register checker as alarm handler + def alarm_signal_handler(signum ,frame): + print('todo: check signum, frame?') + checker.do_check() + if not checker.do_stop: + signal.alarm(checker.interval) # set alarm again + signal.signal(signal.SIGALRM, alarm_signal_handler) + + # set alarm + signal.alarm(checker.interval) + else: + raise ValueError('unknown checking method!') + + try: + # run decorated function + yield checker + finally: + # stop checker + checker.stop() + + # clean up + if method == METHOD_THREAD: + checker.join() + elif method == METHOD_ALARM: + signal.alarm(0) # cancel alarms + signal.signal(signal.SIGALRM, old_handler) + else: + raise NotImplementedError('implement cleanup for new method!') diff --git a/src/type_helpers.py b/src/type_helpers.py new file mode 100644 index 0000000..2972536 --- /dev/null +++ b/src/type_helpers.py @@ -0,0 +1,100 @@ +# The software in this package is distributed under the GNU 
General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" +Helpers for type checking and conversion, like isstr(x), is_file_obj(x) + +Provides abstraction from difference between PY2 and PY3 + +.. codeauthor:: Intra2net +""" + +from __future__ import print_function +import sys +from io import IOBase + +# determine python version +PY3 = sys.version_info.major == 3 +PY2 = sys.version_info.major == 2 + + +def isstr(var): + """ determines if the given var is a (regular/unicode/raw) string or not + + in python2, u'a' is not a subclass of str, so to get a True as result, you + have to test for basestring as superclass. 
+ + In python3 that is no longer the case + + @returns True if the input is a regular string, a unicode or a raw string, + False for everything else (including byte literals like b'abc') + + For more complex py2/py3 compatibility issues, consider using six + (https://pythonhosted.org/six) + """ + + if PY3: + return isinstance(var, str) + else: + return isinstance(var, basestring) + + +def is_str_or_byte(var): + """ returns true for str, unicode and byte objects """ + if PY3: + return isinstance(var, (str, bytes)) + else: + return isinstance(var, basestring) + + +def is_unicode(var): + """ returns true for unicode strings + + py2: return True for type unicode but not type str + py3: return True for type str but not type bytes + """ + + if PY2: + return isinstance(var, unicode) + else: + return isinstance(var, str) + + +def is_file_obj(var): + """ determines whether given input is the result of 'open(file_name)' + + just checks whether given var is subclass of io.IOBase, which is also True + for 'file-like objects' like StringIO + """ + return isinstance(var, IOBase) + + +def main(): + """ Main function, called when running file as script + + just tells you what python version you are running (2 or 3) + """ + if PY3: + print('is python3') + else: + print('is python2') +# end: function main + + +if __name__ == '__main__': + main() diff --git a/template.py b/template.py deleted file mode 100644 index 0e01357..0000000 --- a/template.py +++ /dev/null @@ -1,100 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. 
-# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" -Module name: Summary of this module - -Some overview over important classes and functions and such should be specified -in this docstring, but please keep it brief! - - -.. seealso:: http://thomas-cokelaer.info/tutorials/sphinx/rest_syntax.html -.. todo:: Do not forget to correct the codeauthor and license text commented by - hashes above. -.. note:: You should note this -.. warning:: This is a warning - -.. codeauthor:: Intra2net -""" - -from __future__ import print_function - -THE_CONSTANT = None -""" some constant with docstring *AFTER* the constant. Use this format! 
""" - - -ANOTHER_CONSTANT = 1 #: short docstring in same line -- note the #: - -#: constant with docstring in line above it -- note the #: -THIRD_CONST = 3.333 - - -class TestClass: - """ - test class - - does nothing, really - """ - - def __init__(self, args): - """ constructor, gets its own paragraph in documentation - - :raises NotImplementedError: always - """ - raise NotImplementedError() - - def test_method(self, arg): - """ test method, does nothing - - for more doc string examples, see function :py:func:`send_message` - - :param arg: some argument - :return: Nothing - :raises NotImplementedError: always - """ - raise NotImplementedError() -# end: class TestClass - - -def send_message(sender, recipient, message_body, priority=1): - """ Send a message to a recipient - - Example for a docstring that uses lots of reST fields, taken from - http://sphinx-doc.org/domains.html - - Does not use :py:class:`TestClass` at all. Also has nothing to do with - the python module :py:mod:`logging.handlers` - - Note that it is completely irrelevant what :py:data:`THE_CONSTANT` or - :py:data:`ANOTHER_CONSTANT` or :py:data:`THIRD_CONSTANT` are set to. 
I just - wanted to insert a link to a constant somewhere and find out how to - document constants - - :param str sender: The person sending the message - :param str recipient: The recipient of the message - :param str message_body: The body of the message - :param priority: The priority of the message, can be a number 1-5 - :type priority: integer or None - :return: the message id - :rtype: int - :raises ValueError: if the message_body exceeds 160 characters - :raises TypeError: if the message_body is not a basestring - :raises NotImplementedError: always - """ - - raise NotImplementedError() diff --git a/test_helpers.py b/test_helpers.py deleted file mode 100644 index eebebc6..0000000 --- a/test_helpers.py +++ /dev/null @@ -1,367 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Helpers for developping quick test scripts - -Creation motivated by fear of filling disc space during long-running stress -tests - -.. 
codeauthor:: Intra2net -""" - -from __future__ import print_function - -from contextlib import contextmanager -from threading import Thread -from time import sleep -from datetime import datetime as dt -from itertools import tee -from warnings import warn -from sys import stderr, exit as sys_exit -from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used -import signal - -try: - from warnings import ResourceWarning - WARN_BASE_CLASS = ResourceWarning -except ImportError: - # only added in python 3.2 - WARN_BASE_CLASS = UserWarning - -from buffers import LogarithmicBuffer -from file_helpers import get_filesystem_fill_states, FilesystemFillState -from iter_helpers import pairwise - - -#: filesystems shown by df that usually do not correspond to something on disc -#: (except maybe swap) -NOT_REAL_FILESYSTEMS = 'none', 'shmfs', 'procfs', 'tmpfs', 'ramfs' - - -class DiscFullPreventionError(Exception): - """ Exception raised when disc space gets critical - - issued from function called by :py:class:`DiscFillCheckerThread` - usually only happens after issuing a :py:class:`DiscFullPreventionWarning` - """ - def __init__(self, state, time_estim): - super(DiscFullPreventionError, self).__init__( - 'Interrupting test to avoid full disc ({0}, full in {1}s)' - .format(state, "?" if time_estim==None else time_estim)) - self.state = state - self.time_estim = time_estim - - -class DiscFullPreventionWarning(WARN_BASE_CLASS): - """ Exception raised when disc space approaches problematic state - - issued from function called by :py:class:`DiscFillCheckerThread` - If disc fills up further, will be followed by a - :py:class:`DiscFullPreventionError` or sys.exit - """ - def __init__(self, state, time_estim): - super(DiscFullPreventionWarning, self).__init__( - 'Disc nearly full! Might soon abort test ({0}, ' - 'full in {1}s)' - .format(state, "?" 
if time_estim==None else time_estim)) - self.state = state - self.time_estim = time_estim - - -class DiscFillChecker: - """ checker for disc fill status """ - - def __init__(self, interval=10, decision_function=None): - - # set variables - self.interval = interval - if decision_function is None: - self.decision_function = default_disc_full_decision_function - else: - # check decision_function: - GIGABYTE = 2**30 - state = FilesystemFillState() - state.name = 'dummy' - state.size = 10 * GIGABYTE - state.used = 1 * GIGABYTE - state.available = 9 * GIGABYTE # asume this should be enough - state.capacity = 10 - state.mount_point = '/not/mounted' - decision_function(state, None, None, None) - - self.decision_function = decision_function - - # remember relevant disc stats at start - self._bufs = {} - for fs_state in get_filesystem_fill_states(): - self._internal_state_buffer(fs_state) - - def _internal_state_buffer(self, fs_state): - """ update internal stats buffer, returns all estims - - internal helper called from __init__ and run - """ - if fs_state.name in NOT_REAL_FILESYSTEMS: - return [] - - buf = None - try: - buf = self._bufs[fs_state.name] - except KeyError: - # new file system -- need to create a new buffer - buf = LogarithmicBuffer(5) - self._bufs[fs_state.name] = buf - - buf.add((dt.now(), float(fs_state.available))) - return buf.get_all() - - def do_check(self): - """ check disc fill state """ - - # loop over current disc fill state - for fs_state in get_filesystem_fill_states(): - # remember newest value with others for same file system - fill_states = self._internal_state_buffer(fs_state) - if not fill_states: - continue - - # estimate time until full (i.e. 
when available == 0) - times_until_empty = \ - [calc_time_until_zero(old_state, new_state) - for old_state, new_state in pairwise(fill_states)] - - # call user-defined function to decide - min_time = None - if any(time != None and time > 0 for time in times_until_empty): - min_time = min(time for time in times_until_empty - if time != None and time>0) - avg_time = calc_time_until_zero(fill_states[0], - fill_states[-1]) - self.decision_function(fs_state, - times_until_empty[-1], # newest - min_time, # smallest - avg_time) # average - - -class DiscFillCheckerThread(Thread, DiscFillChecker): - """ a thread that checks disc fill in regular intervals """ - - def __init__(self, *args, **kwargs): - """ creates a DiscFillCheckerThread - - :param int interval: time in seconds between checks of disc usage - :param decision_function: function that decides when to issue a warning - or an error. See - :py:func:`default_disc_full_decision_function` - as an example. Function should raise - :py:class:`DiscFullPreventionWarning` and - :py:class:`DiscFullPreventionError` - """ - Thread.__init__(self, name='discFillChck') - DiscFillChecker.__init__(self, *args, **kwargs) - - # make this thread a daemon - self.daemon = True # works since python 2.6 - - # init do_stop - self.do_stop = False - - def run(self): - print('checker thread running') - while not self.do_stop: - sleep(self.interval) - self.do_check() - - def stop(self): - self.do_stop = True - - -MEGABYTE = 2**20 -MINUTE = 60 - -#: default thresholds for :py:func:`default_disc_full_decision_function` -DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE -DEFAULT_DISC_SIZE_ERR = MEGABYTE -DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE -DEFAULT_TIME_TO_FULL_ERR = MINUTE - -#: levels at which to kill running tests: regular exception, easily caught -#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! 
-KILL_EXCEPTION = "KILL_EXCEPTION" - -#: levels at which to kill running tests: exception that is not a subclass of -#: Exception to avoid "catch(Exception)". -#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! -KILL_SYS_EXIT = "KILL_SYS_EXIT" - -#: levels at which to kill running tests: brutal way that does not allow any -#: default cleanup (os._exit). Last resort when all others do not work! -KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT" - -#: levels at which to kill running tests: SIGINT -KILL_SIGINT = "KILL_SIGINT" - - -def default_disc_full_decision_function(curr_state, - estim_empty_newest, - estim_empty_smallest, - estim_empty_mean, - size_warn=DEFAULT_DISC_SIZE_WARN, - size_err=DEFAULT_DISC_SIZE_ERR, - time_warn=DEFAULT_TIME_TO_FULL_WARN, - time_err=DEFAULT_TIME_TO_FULL_ERR, - kill_level=KILL_SIGINT): - """ decide whether to warn or raise error because disc is getting full - - called by DiscFillCheckerThread regularly; params estim_empty_* are result - of :py:func:`calc_time_until_zero`, so None means that disc state did not - change. - - Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic - and raises an error or calls sys.exit when it is really critical. - - :param estim_empty_newest: the newest time estimate - :param estim_empty_smallest: the smallest of all non-negative estimates - :param estim_empty_mean: estim between earliest and newest estimate - :param int size/err_warn/err: thresholds, default to constants like - :py:data:`DEFAULT_DISC_SIZE_WARN` - :param kill_level: how to kill the running application: with sys.exit or - by raising a "regular" Exception subclass (default). - Brutal way or SIGINT (default) are also options. - Values should be one like :py:data:`KILL_EXCEPTION` - """ - - # err? 
- raise_err = False - if curr_state.available < size_err: - raise_err = True - if (estim_empty_smallest != None) and (estim_empty_smallest < time_err): - raise_err = True - - # what kind of err? - if raise_err: - print('Disc fill below tolerance or disc full soon:', file=stderr) - print('{0}, full in {1} s' - .format(curr_state, estim_empty_smallest), file=stderr) - if kill_level == KILL_EXCEPTION: - raise DiscFullPreventionError(curr_state, estim_empty_smallest) - elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT: - print('Exiting now the brutal way', file=stderr) - brutal_exit_function_that_should_not_be_used(1) - elif kill_level == KILL_SYS_EXIT: - print('Exiting now', file=stderr) - sys_exit("Disc nearly full!") - else: # kill_level == KILL_SIGINT or unknown - print('Sending SIGINT', file=stderr) - kill(getpid(), signal.SIGINT) - - # warn? - if curr_state.available < size_warn: - warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), - stacklevel=3) - if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn): - warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), - stacklevel=3) - - -def calc_time_until_zero(old_state, new_state): - """ calc time until value hits zero given (time1, value1), (time2, value2) - - returns time from now until empty in seconds - - delta_date = new_date - old_date - delta_free = new_free - old_free - free(t) = new_free + (t-new_t) * delta_free / delta_date - = 0 - <==> -new_free = (t-new_t) * delta_free / delta_date - <==> -new_free * delta_date / date_free = t - new_t - <==> diff_t := new_t - t = new_free * delta_date / delta_free - - compare to now: - diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now) - = -diff_t - (now-new_t) = (new_t-now) - diff_t - - To avoid zero-division, returns None if delta_free == 0 - """ - old_date, old_free = old_state - new_date, new_free = new_state - if new_free == old_free: - return None # avoid zero-division - time_diff = new_free * (new_date 
- old_date).total_seconds() \ - / (new_free - old_free) - - # compare to now - return (new_date - dt.now()).total_seconds() - time_diff - - -METHOD_THREAD = 'thread' -METHOD_ALARM = 'alarm' - - -@contextmanager -def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs): - """ run test function while watching disc space - - all args are forwarded to :py:class:`DiscFillChecker` constructor - - depending on method, will create a separate thread (default) to watch disc - or use SIGALRM. First one does not interfere with signals within tested - method, second can use a simple Exception to stop the test - """ - - if method == METHOD_THREAD: - checker = DiscFillCheckerThread(*args, **kwargs) - checker.start() - elif method == METHOD_ALARM: - # check that alarms are not used otherwise - old_handler = signal.getsignal(signal.SIGALRM) - if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None): - raise RuntimeError('Signal handler for SIGALRM set ' - '-- do not want to override!') - - # create checker - checker = DiscFillChecker(*args, **kwargs) - - # register checker as alarm handler - def alarm_signal_handler(signum ,frame): - print('todo: check signum, frame?') - checker.do_check() - if not checker.do_stop: - signal.alarm(checker.interval) # set alarm again - signal.signal(signal.SIGALRM, alarm_signal_handler) - - # set alarm - signal.alarm(checker.interval) - else: - raise ValueError('unknown checking method!') - - try: - # run decorated function - yield checker - finally: - # stop checker - checker.stop() - - # clean up - if method == METHOD_THREAD: - checker.join() - elif method == METHOD_ALARM: - signal.alarm(0) # cancel alarms - signal.signal(signal.SIGALRM, old_handler) - else: - raise NotImplementedError('implement cleanup for new method!') diff --git a/type_helpers.py b/type_helpers.py deleted file mode 100644 index 2972536..0000000 --- a/type_helpers.py +++ /dev/null @@ -1,100 +0,0 @@ -# The software in this package is distributed under the GNU General -# 
Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" -Helpers for type checking and conversion, like isstr(x), is_file_obj(x) - -Provides abstraction from difference between PY2 and PY3 - -.. codeauthor:: Intra2net -""" - -from __future__ import print_function -import sys -from io import IOBase - -# determine python version -PY3 = sys.version_info.major == 3 -PY2 = sys.version_info.major == 2 - - -def isstr(var): - """ determines if the given var is a (regular/unicode/raw) string or not - - in python2, u'a' is not a subclass of str, so to get a True as result, you - have to test for basestring as superclass. 
- - In python3 that is no longer the case - - @returns True if the input is a regular string, a unicode or a raw string, - False for everything else (including byte literals like b'abc') - - For more complex py2/py3 compatibility issues, consider using six - (https://pythonhosted.org/six) - """ - - if PY3: - return isinstance(var, str) - else: - return isinstance(var, basestring) - - -def is_str_or_byte(var): - """ returns true for str, unicode and byte objects """ - if PY3: - return isinstance(var, (str, bytes)) - else: - return isinstance(var, basestring) - - -def is_unicode(var): - """ returns true for unicode strings - - py2: return True for type unicode but not type str - py3: return True for type str but not type bytes - """ - - if PY2: - return isinstance(var, unicode) - else: - return isinstance(var, str) - - -def is_file_obj(var): - """ determines whether given input is the result of 'open(file_name)' - - just checks whether given var is subclass of io.IOBase, which is also True - for 'file-like objects' like StringIO - """ - return isinstance(var, IOBase) - - -def main(): - """ Main function, called when running file as script - - just tells you what python version you are running (2 or 3) - """ - if PY3: - print('is python3') - else: - print('is python2') -# end: function main - - -if __name__ == '__main__': - main() -- 1.7.1