Remove deprecated log reader utility
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Mon, 8 Oct 2018 07:27:14 +0000 (09:27 +0200)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Mon, 8 Oct 2018 07:29:02 +0000 (09:29 +0200)
Use log_read instead

src/follow.py [deleted file]
test/test_follow.py [deleted file]

diff --git a/src/follow.py b/src/follow.py
deleted file mode 100644 (file)
index 314dcf3..0000000
+++ /dev/null
@@ -1,638 +0,0 @@
-#!/usr/bin/env python
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-"""
-follow process output, log files and pipes using select and poll
-
-DEPRECATED
-(at least for files see log_read; may still be usefull for pipes/sockets)
-
-Main class is Follower which does the polling and selecting, it is best used
-in a with-statement as follows::
-
-    with follow('/var/log/messages') as flwr
-        for line in flwr:
-            do_something_with(line)
-
-This will read the given file and yield its contents line-by-line until the end
-of the file. It will then wait for new additions to the file and provide the
-new lines newly instantaneously
-
-Things to note:
-
-* all data must be line-based!
-* will only work on Linux (except for sockets maybe).
-* create in py2 but try to stay py3-compatible.
-
-[START: not implemented yet]
-
-If following a log file, a LogParser can be attached that auto-detects some of
-the log's structure (like date and time fields, log levels and sources) from
-its first few lines.. This can be used anlogously. Of course, you can also
-specify the log's structure (to be implemented before the auto-detect...)::
-
-    with follow_log('/var/log/messages') as log_flwr:
-        for content in log_flwr:
-            do_something_with(content.datetime, content.log_level, content.text)
-
-[END: not implemented yet]
-
-A Follower is an iterator, which means you can do lots of cool things with it,
-including (see also itertools package, itertool recipies, "Functional
-Programming Howto")::
-
-    # ignore source and description:
-    for _, _, text_line in my_follower:
-        do_something_with(line)
-
-    # enumerate:
-    for line_number, line in enumerate(my_follower)
-        do_something_with(line, line_number)
-
-    # combine with other iterator:
-    for line, other_data in zip(my_follwer, other_iter)
-        do_something_with(line, other_data)
-
-    # filter:
-    for line in my_follower if test(my_func):
-        do_something_with(line)
-
-    # tee:
-    iter1, iter2 = itertools.tee(my_follower)
-    --> can read from both (but each line is given to only one of them)
-
-    # for each element, peek at the next one to help do the right thing
-    for line, next_line in pairwise(my_follower):
-        do_something_with(line, peek_next_line=next_line)
-
-    # create new iterator or generator
-    for line in my_follwer:
-        some_result = do_something_with(line)
-        yield some_result
-
-NOT possible::
-
-  len(my_follower)
-  Follower(my_file, my_file) #  (twice the same)
-"""
-
-from __future__ import print_function
-from warnings import warn
-from select import poll, POLLIN, POLLOUT, POLLPRI, POLLHUP, POLLERR, POLLNVAL
-from subprocess import Popen, PIPE
-from socket import socket as Socket
-
-from .type_helpers import isstr
-
-# #############################################################################
-# CONSTANTS
-# #############################################################################
-DEFAULT_POLL_FLAGS = POLLIN | POLLPRI | POLLHUP | POLLERR
-
-DEFAULT_POLL_TIMEOUT = 10
-
-# #############################################################################
-# EXCEPTIONS
-# #############################################################################
-
-
-def flags_to_str(flags):
-    """ return short string representation for poll flags """
-    text_parts = []
-    if flags & POLLIN:
-        text_parts.append('IN')
-    if flags & POLLOUT:
-        text_parts.append('OUT')
-    if flags & POLLPRI:
-        text_parts.append('PRI')
-    if flags & POLLERR:
-        text_parts.append('ERR')
-    if flags & POLLHUP:
-        text_parts.append('HUP')
-    if flags & POLLNVAL:
-        text_parts.append('INVALID')
-    return ','.join(text_parts)
-
-
-class PollFlagsException(Exception):
-    """ exception raised if polling returned unexpected flags """
-    def __init__(self, flags, desc):
-        super(PollFlagsException, self).__init__(
-            'Unexpected flags from polling {0}: {1}'.format(
-                desc, flags_to_str(flags)))
-
-
-class PollHupException(Exception):
-    """ exception raised when polled source sends a SIGHUP """
-    def __init__(self, desc):
-        super(PollHupException, self).__init__(
-            'Received HUP from polling, {0} was probably killed!'.format(desc))
-
-
-class PollUnknownSourceException(Exception):
-    """ exception raised when polling returns unknown source """
-    def __init__(self, file_no):
-        super(PollUnknownSourceException, self).__init__(
-            'Unknown source returned from polling: file_no={0}!'.format(
-                file_no))
-
-
-class Follower(object):
-    """ uses select and poll to follow some set of pipes, files, sockets
-
-    relies on data being line-based!
-
-    will read as much data as possible and then wait for more
-
-    Iterator over lines!
-    """
-
-    # iterator over zip(sources, file_nos, descriptions)
-    _source_iter = None
-
-    # for polling:
-    _poller = None
-    _flags = None
-    _timeout = None
-
-    def __init__(self, *sources_and_descriptions, **other_args):
-        """ create a Follower for given sources and optional descriptions
-
-        Will guess if args are just sources or also descriptions of these.
-        All of these are possible:
-
-        * Follower(src).
-        * Follower(src, desc).
-        * Follower(src1, src2, src3).
-        * Follower(src1, desc1, src2, desc2, src3, desc3).
-        * Follower(src1, src2, desc2, src3).
-        * Follower(src, desc_ignored, desc_used)     # warn but accept.
-        * Follower(single_list_of_sources_and_descs).
-
-        Not possible:
-
-        * Follower().
-        * Follower(desc, src).
-
-        Descriptions must be strings, they identify the sources in generated
-        output and are used in error messages
-
-        Sources must be file handles, open pipes or open sockets (or anything
-        else that gives a reasonable fileno(), so help of the :py:mod:`select`
-        module)
-
-        Sources are not closed!
-
-        other_args can include flags and timeout
-        """
-
-        # take care about other_args
-        if 'flags' in other_args:
-            self._flags = other_args['flags']
-        else:
-            self._flags = DEFAULT_POLL_FLAGS
-        if 'timeout' in other_args:
-            self._timeout = other_args['timeout']
-        else:
-            self._timeout = DEFAULT_POLL_TIMEOUT
-
-        for key in other_args.keys():
-            if key not in ('flags', 'timeout'):
-                raise ValueError('argument not recognized: {0}'.format(key))
-
-        self._poller = poll()
-
-        sources = []
-        file_nos = []
-        descriptions = []
-
-        # take care of sources_and_descriptions
-        if not sources_and_descriptions:
-            raise ValueError('need at least one source!')
-        elif len(sources_and_descriptions) == 1 \
-                and isinstance(sources_and_descriptions[0], (list, tuple)):
-            sources_and_descriptions = sources_and_descriptions[0]
-
-        for arg in sources_and_descriptions:
-            if isstr(arg):  # is a description
-                if len(descriptions) == 0:
-                    raise ValueError(
-                        'first arg must be source, not a description!')
-                if descriptions[-1] is not None:
-                    warn('Overwriting description "{0}" with "{1}"'.format(
-                        descriptions[-1], arg))
-                descriptions[-1] = arg
-            else:                        # is a new source
-                sources.append(arg)
-                file_nos.append(arg.fileno())
-                descriptions.append(None)
-                self._poller.register(arg, self._flags)
-        # end: for all args
-
-        # need iterator over these 3 lists all the time
-        self._source_iter = list((src, fno, desc) for src, fno, desc in
-                                 zip(sources, file_nos, descriptions))
-    # end: Follower constructor
-
-    def next(self):
-        """ main function to poll next line from sources
-
-        returns (source, desc, line_stripped, flags)
-        """
-
-        while True:
-            result = self._poller.poll(self._timeout)
-            for file_no, flags in result:
-
-                # identify source
-                desc = -1
-                source = None
-                for curr_src, curr_no, curr_desc in self._source_iter:
-                    if curr_no == file_no:
-                        desc = curr_desc
-                        source = curr_src
-                        break
-                # end: loop over self._source_iter
-                if desc is -1:
-                    raise PollUnknownSourceException(file_no)
-
-                if not flags & self._flags:
-                    raise PollFlagsException(flags, desc)
-
-                if flags & POLLHUP:
-                    #raise PollHupException(desc)
-                    warn('received a hangup during polling for {0}'
-                         .format(desc))
-
-                if flags & POLLERR:
-                    warn('received an err during polling for {0}'
-                         .format(desc))
-
-                if flags & POLLNVAL:
-                    warn('poll replied "invalid request" err during polling '
-                         'for {0}'
-                         .format(desc))
-
-                # read line from source
-                line = source.read()  # was readline(), but try something else
-                                      # removed the rstrip() from return
-
-                if not line:
-                    continue
-                # if reach here, we have a new line of text
-
-                return source, desc, line, flags
-            # for poll results
-        # end: inf loop
-    # end: Follower._next_line
-
-    def __iter__(self):
-        """ makes this an iterator, called by iter(my_follower) """
-        return self
-
-    def __next__(self):
-        """ called by next(my_follower) """
-        return self.next()
-# end: class Follower
-
-
-# #############################################################################
-# CONTEXT MANAGEMENT
-# #############################################################################
-
-#: defualt args for Popen constructor in case a process is given as cmd
-DEFAULT_SUBPROCESS_ARGS = dict(bufsize=1, stdin=None, stdout=PIPE, stderr=PIPE)
-
-class FollowContextManager(object):
-    """ context for Follower objects, ensures everything is closed properly
-
-    opens and closes files and sockets; communicates and waits for Popen
-      process objects whose stdout and stderr pipes are followed
-    """
-
-    files = None
-    file_descs = None
-    file_handles = None
-
-    sockets = None
-    socket_descs = None
-
-    procs = None
-    proc_descs = None
-    proc_objs = None
-
-    def __init__(self, files=None, file_descs=None,
-                 sockets=None, socket_descs=None,
-                 procs=None, proc_descs=None,
-                 subprocess_args=None):
-        """ create a context manager for Follower
-
-        check args and that they match.
-        tries to guess good descs for files, sockets and procs that are not
-        given
-
-        :param files: list/tuple of, or single file handle or file name
-        :param file_descs: None or list/tuple of same length as files
-        :param sockets: list/tuple of, or single socket
-        :param socket_descs: None or list/tuple of same length as sockets
-        :param procs: list/tuple of, or single Popen object or command itself
-                      as str/list/tuple
-        :param proc_descs: None or list/tuple of same length as procs
-        :param dict subprocess_args: dict or args that are merged with
-                                     :py:data:`DEFAULT_SUBPROCESS_ARGS`
-
-        ..seealso:: could have tried to implement this as a nesting of many
-                    individual context managers, see :py:mod:`contextlib`
-        """
-        print('__init__')
-
-        # set files and file_descs and ensure that they matching lists
-        if files is None:
-            self.files = []
-        elif isstr(files):
-            self.files = [files, ]
-        elif isinstance(files, file):
-            self.files = [files, ]
-        else:
-            self.files = files
-        if file_descs is None:
-            temp_descs = [None for _ in self.files]
-        elif len(self.files) == len(file_descs):
-            temp_descs = file_descs
-        else:
-            raise ValueError('if given file descs, need desc for all files!')
-
-        # try to guess good file_desc values; ensure they are str
-        self.file_descs = []
-        for file_nr, (file, file_desc) in \
-                enumerate(zip(self.files, temp_descs)):
-            if isstr(file_desc):
-                self.file_descs.append(file_desc)
-                continue
-            elif file_desc is None:   # need to guess something
-                if isstr(file):
-                    self.file_descs.append(file)
-                else:
-                    self.file_descs.append('file{0}'.format(file_nr))
-            else:   # desc is neither str nor None
-                raise ValueError('file descs must be string or None!')
-
-        # set sockets and socket_descs and ensure that they matching lists
-        if sockets is None:
-            self.sockets = []
-        elif isinstance(sockets, Socket):
-            self.sockets = [sockets, ]
-        else:
-            self.sockets = sockets
-        if socket_descs is None:
-            temp_descs = [None for _ in self.sockets]
-        elif len(self.sockets) == len(socket_descs):
-            temp_descs = socket_descs
-        else:
-            raise ValueError('if given socket descs, '
-                             'need descs for all sockets!')
-
-        # try to guess good socket_desc values; ensure they are str
-        self.socket_descs = []
-        for file_nr, (socket, socket_desc) in \
-                enumerate(zip(self.sockets, temp_descs)):
-            if isstr(socket_desc):
-                self.socket_descs.append(socket_desc)
-            elif socket_desc is None:   # need to guess something
-                self.socket_descs.append('socket{0}'.format(socket_nr))
-            else:   # desc is neither str nor None
-                raise ValueError('socket descs must be string or None!')
-
-        # set procs and proc_descs and ensure they matching lists
-        if procs is None:
-            self.procs = []
-        elif isstr(procs):
-            self.procs = [procs, ]
-        elif isinstance(procs, Popen):
-            self.procs = [procs, ]
-        else:
-            self.procs = procs
-        if proc_descs is None:
-            temp_descs = [None for _ in self.procs]
-        elif len(proc_descs) == len(self.procs):
-            temp_descs = proc_descs
-        else:
-            raise ValueError('if given proc descs, need descs for all procs!')
-
-        # try to guess good proc_desc values; ensure they are str
-        self.proc_descs = []
-        for proc_nr, (proc, proc_desc) in \
-                enumerate(zip(self.procs, temp_descs)):
-            if isstr(proc_desc):
-                self.proc_descs.append(proc_desc)
-            elif proc_desc is None:   # need to guess something
-                if isstr(proc):
-                    self.proc_descs.append(proc)
-                elif isinstance(proc, (tuple, list)):
-                    self.proc_descs.append(' '.join(proc))
-                elif isinstance(proc, Popen):
-                    if isstr(proc.args):
-                        self.proc_descs.append(proc.args)
-                    else:
-                        self.proc_descs.append(' '.join(proc.args))
-                else:
-                    self.proc_descs.append('proc{0}'.format(proc_nr))
-            else:   # desc is neither str nor None
-                raise ValueError('proc descs must be string or None!')
-
-        self.subprocess_args = DEFAULT_SUBPROCESS_ARGS
-        if subprocess_args is not None:
-            self.subprocess_args.update(subprocess_args)
-
-    def __enter__(self):
-        """ called when entering run context
-
-        opens files, tries to create descs, and assembles args for Follower
-        """
-        print('__enter__')
-
-        args = []
-
-        # open files
-        self.file_handles = []
-        for file_arg, desc in zip(self.files, self.file_descs):
-            if isinstance(file_arg, str):
-                new_handle = open(file_arg)
-            else:       # assume file_arg is a file handle
-                new_handle = file_arg
-            self.file_handles.append(new_handle)
-            args.append(new_handle)
-            args.append(desc)
-        # end: for files and file_descs
-
-        for sock_number, (socket, desc) in \
-                enumerate(zip(self.sockets, self.socket_descs)):
-            args.append(socket)
-            args.append(desc)
-
-        self.proc_objs = []
-        for proc, desc in zip(self.procs, self.proc_descs):
-            if isstr(proc) or isinstance(proc, (list,tuple)):
-                proc = Popen(proc, **self.subprocess_args)
-                self.proc_objs.append(proc)
-            args.append(proc.stdout)
-            args.append(desc + '_out')
-            args.append(proc.stderr)
-            args.append(desc + '_err')
-
-        return Follower(args)
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        """ called when leaving run context """
-        print('__exit__ with args {0}, {1}, {2}'.format(exc_type, exc_value,
-                                                        traceback))
-
-        # close files
-        for handle in self.file_handles:
-            try:
-                handle.close()
-            except Exception:
-                warn('ignoring exception exiting follower context manager!')
-        for proc in self.proc_objs:
-            try:
-                proc.kill()
-                rest_out, rest_err = proc.communicate()
-                if rest_out or rest_err:
-                    warn('Ignoring left-over output in proc')
-            except Exception:
-                warn('ignoring exception exiting follower context manager!')
-# end: class FollowContextManager
-
-
-def follow(*args, **kwargs):
-    """ creates a ContextManager for a Follower to be used in "with" statements
-
-    for example:
-
-    with follow('/var/log/messages') as log_flwr
-        for source, desc, line in log_flwr
-            do_something_with(line)
-
-    for specification of args see FollowContextManager constructor help
-    """
-    return FollowContextManager(*args, **kwargs)
-
-
-# #############################################################################
-# TESTING
-# (hard to create unittest -- would need separate threads to write to some
-#  file /pipe/socket so can test the interactivity...)
-# #############################################################################
-
-from datetime import date, datetime as dt, timedelta as td
-from subprocess import Popen, PIPE
-
-syslog_file = '/var/log/messages'
-time_diff_seconds = 60
-syslog_time_format = '%b %d %H:%M:%S'
-
-
-def test_syslog_line(n_lines, source, desc, line,
-                     today_str, start_time):
-    """ called by test functions for each line of syslog """
-
-    if n_lines % 1000 == 0:
-        print(('{0:6d} old lines, showing lines after {1}; '
-               'abort using Ctrl-C').format(n_lines, start_time))
-    if line[:6] != today_str:
-        return
-    try:
-        log_time = dt.strptime(line[:15], syslog_time_format)
-        log_time = log_time.replace(year=start_time.year)
-    except ValueError:
-        log_time = None
-
-    if log_time is None or log_time > start_time:
-        print('line {0} from "{1}", (orig from {2}): {3}'.format(
-            n_lines, desc, log_time, line))
-
-
-def test_follower_syslog():
-    """ test Follower on syslog file using PIPEs and an open file
-
-    show lines from the last 5 minutes
-    """
-
-    start_time = dt.now() - td(seconds=time_diff_seconds)
-    today_str = date.today().strftime(syslog_time_format)[:6]
-    if today_str[4] == '0':
-        today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct  1'
-
-    # create process for 'tail -f syslog' --> 2 pipes (stdout + stderr)
-    proc = Popen(['tail', '-f', syslog_file], stdout=PIPE, stderr=PIPE)
-
-    # read from all three sources
-    with open(syslog_file, 'r') as file_handle:
-        flwr = Follower(file_handle, 'syslog file', proc.stdout, 'tail syslog',
-                        proc.stderr, 'tail stderr')
-        for n_lines, (source, desc, line, flags) in enumerate(flwr):
-            if flags:
-                print('received flags {0}'.format(flags_to_str(flags)))
-            test_syslog_line(n_lines, source, desc, line,
-                             today_str, start_time)
-
-
-def test_follower_context():
-    """ test FollowContextManager and follow() function """
-
-    today_str = date.today().strftime(syslog_time_format)[:6]
-    if today_str[4] == '0':
-        today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct  1'
-    start_time = dt.now() - td(seconds=time_diff_seconds)
-    with follow(syslog_file) as flwr:
-        for n_lines, (source, desc, line, flags) in enumerate(flwr):
-            if flags:
-                print('received flags {0}'.format(flags_to_str(flags)))
-            test_syslog_line(n_lines, source, desc, line,
-                             today_str, start_time)
-
-def test_context_proc():
-    """ test FollowContextManager's ability to wrap proc args """
-
-    today_str = date.today().strftime(syslog_time_format)[:6]
-    if today_str[4] == '0':
-        today_str = today_str[:4] + ' ' + today_str[5:] # 'Oct 01' --> 'Oct  1'
-    start_time = dt.now() - td(seconds=time_diff_seconds)
-    with follow(procs=(['tail', '-f', syslog_file], )) as flwr:
-        for n_lines, (source, desc, line, flags) in enumerate(flwr):
-            if flags:
-                print('received flags {0}'.format(flags_to_str(flags)))
-            test_syslog_line(n_lines, source, desc, line,
-                             today_str, start_time)
-
-
-def test():
-    """ Main function, tests some of class's functionality """
-
-    #test_follower_syslog()
-    #test_follower_context()
-    test_context_proc()
-# end: function main
-
-
-if __name__ == '__main__':
-    test()
diff --git a/test/test_follow.py b/test/test_follow.py
deleted file mode 100644 (file)
index bcbd168..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" Unittest for follow.py
-
-This is a little more involved since in order to test properly, need a second
-thread that writes to file/socket/stdout
-
-.. todo:: as in log_read_unittest: disable buffering in LogFileWriter
-
-.. warn:: DEPRECATED (at least for files see log_read; may still be useful for
-          pipes/sockets)
-"""
-
-from __future__ import absolute_import
-
-import unittest
-from tempfile import mkstemp
-import threading
-from src.follow import *
-from time import sleep
-import os
-import os.path
-from src.test_helpers import get_perf_counter
-
-# different counter needed on different python versions
-perf_counter = get_perf_counter()
-
-
-class LogFileWriter(threading.Thread):
-    """ thread that creates and writes to given file handle """
-
-    def __init__(self, file_name, text_pattern, n_writes=None,
-                 pause_time=0.1, do_encode=None):
-        """ creates thread, deamon is True
-        
-        if n_writes is None, will write indefinitely; else writes text_pattern
-        n_writes times, formatted with (counter, perf_counter)
-        """
-        super(LogFileWriter, self).__init__()
-        self.daemon = True
-        self.file_name = file_name
-        self.text_pattern = text_pattern
-        self.n_writes = n_writes
-        self.pause_time = pause_time
-        self.do_encode = do_encode
-
-    def run(self):
-        counter = 0
-        if self.do_encode:
-            mode = 'wb'
-        else:
-            mode = 'wt'
-
-        with open(self.file_name, mode) as file_handle:
-            while True:
-                if self.n_writes is not None and counter >= self.n_writes:
-                    break
-
-                if self.do_encode:
-                    file_handle.write(self.text_pattern
-                                           .format(counter, perf_counter())
-                                           .encode(self.do_encode))
-                else:
-                    file_handle.write(self.text_pattern
-                                           .format(counter, perf_counter()))
-                print('wrote {0}'.format(counter))
-                counter += 1
-                sleep(self.pause_time)
-
-
-@unittest.skip("Need to debug but deprecated anyway")
-class FollowTester(unittest.TestCase):
-    """ Unittest for follow.py """
-
-    def test_logfile(self):
-        """ create logfile, write to it, expect output """
-
-        text_pattern = '{0}:{1}\n'
-        n_texts = 10
-        pause_time = 1
-        encoding = 'ascii'
-
-        LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
-                      pause_time=pause_time, do_encode=encoding).start()
-        print('testing with log file {0}'.format(self.temp_file))
-        time_diffs = []
-
-        with open(self.temp_file) as read_handle:
-            follower = Follower(read_handle)
-            for counter, (source, desc, text, flags) in \
-                    enumerate(follower):
-                receive_time = perf_counter()
-                print('received text: "{0}"'.format(text))
-                index = text.index(':')
-                counter = int(text[:index].strip())
-                write_time = float(text[index+1:].strip())
-                time_diffs.append(receive_time - write_time)
-                if counter == n_texts:
-                    break
-
-
-    def setUp(self):
-        """ called before each test """
-        print('setup test')
-        temp_handle, temp_name = mkstemp()
-        os.close(temp_handle)
-        self.temp_file = temp_name
-        print('created temp file ' + self.temp_file)
-
-    def tearDown(self):
-        """ called after each test """
-        print('tear down test')
-        if os.path.isfile(self.temp_file):
-            print('delete temp file' + self.temp_file)
-            os.unlink(self.temp_file)
-
-    @classmethod
-    def setUpClass(clz):
-        """ called once before the first test """
-        print('setup test class')
-        clz.temp_file = None
-
-    @classmethod
-    def tearDownClass(clz):
-        """ called once after the last test """
-        print('tear down test class')
-
-
-
-if __name__ == '__main__':
-    unittest.main()