Remove unused test_helpers
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Tue, 2 Oct 2018 08:02:54 +0000 (10:02 +0200)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Mon, 8 Oct 2018 07:29:02 +0000 (09:29 +0200)
src/test_helpers.py [deleted file]
test/disc_filler_test.py [deleted file]
test/test_test_helper.py [deleted file]

diff --git a/src/test_helpers.py b/src/test_helpers.py
deleted file mode 100644 (file)
index 4bc4b48..0000000
+++ /dev/null
@@ -1,505 +0,0 @@
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" Helpers for developing quick test scripts
-
-Creation motivated by fear of filling disc space during long-running stress
-tests
-
-.. todo:: Find out why fs not in REAL_FILESYSTEMS generate warnings so quickly
-          even if they are still empty
-"""
-
-from __future__ import print_function
-
-from contextlib import contextmanager
-from threading import Thread
-import time
-from datetime import datetime as dt
-from itertools import tee
-from warnings import warn
-from sys import stderr, platform, version_info, exit as sys_exit
-from os import getpid, kill, _exit as brutal_exit_function_DO_NOT_USE
-import signal
-
-try:
-    from warnings import ResourceWarning
-    WARN_BASE_CLASS = ResourceWarning
-except ImportError:
-    # only added in python 3.2
-    WARN_BASE_CLASS = UserWarning
-
-from .buffers import LogarithmicBuffer
-from .file_helpers import get_filesystem_fill_states, FilesystemFillState, \
-                          get_mount_info, get_fill_from_statvfs, \
-                          NOT_REAL_FILESYSTEMS_SPEC, size_str
-from .iter_helpers import pairwise
-
-
-class DiscFullPreventionError(Exception):
-    """ Exception raised when disc space gets critical
-
-    issued from function called by :py:class:`DiscFillCheckerThread`
-    usually only happens after issuing a :py:class:`DiscFullPreventionWarning`
-    """
-    def __init__(self, state, time_estim):
-        super(DiscFullPreventionError, self).__init__(
-            'Interrupting test to avoid full disc ({0}, full in {1}s)'
-            .format(state, "?" if time_estim==None else time_estim))
-        self.state = state
-        self.time_estim = time_estim
-
-
-class DiscFullPreventionWarning(WARN_BASE_CLASS):
-    """ Exception raised when disc space approaches problematic state
-
-    issued from function called by :py:class:`DiscFillCheckerThread`
-    If disc fills up further, will be followed by a
-    :py:class:`DiscFullPreventionError` or sys.exit
-    """
-    def __init__(self, state, time_estim):
-        super(DiscFullPreventionWarning, self).__init__(
-              'Disc nearly full! Might soon abort test ({0}, '
-              'full in {1}s)'
-              .format(state, "?" if time_estim==None else time_estim))
-        self.state = state
-        self.time_estim = time_estim
-
-
-class DiscFillChecker:
-    """ checker for disc fill status """
-
-    def __init__(self, interval=10, decision_function=None):
-
-        # set variables
-        self.interval = interval
-        if decision_function is None:
-            self.decision_function = default_disc_full_decision_function
-        else:
-            # check decision_function:
-            GIGABYTE = 2**30
-            state = FilesystemFillState()
-            state.name = 'dummy'
-            state.size = 10 * GIGABYTE
-            state.used = 1 * GIGABYTE
-            state.available = 9 * GIGABYTE   # asume this should be enough
-            state.capacity = 10
-            state.mount_point = '/not/mounted'
-            decision_function(state, None, None, None)
-
-            self.decision_function = decision_function
-
-        # remember relevant disc stats at start
-        self._bufs = {}
-        for fs_state in get_filesystem_fill_states():
-            self._internal_state_buffer(fs_state)
-
-    def _internal_state_buffer(self, fs_state):
-        """ update internal stats buffer, returns all estims
-
-        internal helper called from __init__ and run
-        """
-        if fs_state.name in NOT_REAL_FILESYSTEMS_SPEC:
-            return []
-        if fs_state.size == 0:
-            return []
-
-        buf = None
-        try:
-            buf = self._bufs[fs_state.name]
-        except KeyError:
-            # new file system -- need to create a new buffer
-            buf = LogarithmicBuffer(5)
-            self._bufs[fs_state.name] =  buf
-
-        buf.add((dt.now(), float(fs_state.available)))
-        return buf.get_all()
-
-    def do_check(self):
-        """ check disc fill state """
-
-        # loop over current disc fill state
-        for fs_state in get_filesystem_fill_states():
-            # remember newest value with others for same file system
-            fill_states = self._internal_state_buffer(fs_state)
-            if not fill_states:
-                continue
-
-            # estimate time until full (i.e. when available == 0)
-            times_until_empty = \
-                [calc_time_until_zero(old_state, new_state)
-                 for old_state, new_state in pairwise(fill_states)]
-
-            # call user-defined function to decide
-            min_time = None
-            if any(time != None and time > 0 for time in times_until_empty):
-                min_time = min(time for time in times_until_empty
-                               if time != None and time>0)
-            avg_time = calc_time_until_zero(fill_states[0],
-                                             fill_states[-1])
-            self.decision_function(fs_state,
-                                   times_until_empty[-1],     # newest
-                                   min_time,                  # smallest
-                                   avg_time)                  # average
-
-
-class DiscFillCheckerThread(Thread, DiscFillChecker):
-    """ a thread that checks disc fill in regular intervals """
-
-    def __init__(self, *args, **kwargs):
-        """ creates a DiscFillCheckerThread
-
-        :param int interval: time in seconds between checks of disc usage
-        :param decision_function: function that decides when to issue a warning
-                                 or an error. See
-                                 :py:func:`default_disc_full_decision_function`
-                                 as an example. Function should raise
-                                 :py:class:`DiscFullPreventionWarning` and
-                                 :py:class:`DiscFullPreventionError`
-        """
-        Thread.__init__(self, name='discFillChck')
-        DiscFillChecker.__init__(self, *args, **kwargs)
-
-        # make this thread a daemon
-        self.daemon = True   # works since python 2.6
-
-        # init do_stop
-        self.do_stop = False
-
-    def run(self):
-        print('checker thread running')
-        while not self.do_stop:
-            time.sleep(self.interval)
-            self.do_check()
-
-    def stop(self):
-        self.do_stop = True
-
-
-MEGABYTE = 2**20
-MINUTE = 60
-
-#: default thresholds for :py:func:`default_disc_full_decision_function`
-DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE
-DEFAULT_DISC_SIZE_ERR = MEGABYTE
-DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE
-DEFAULT_TIME_TO_FULL_ERR = MINUTE
-
-#: levels at which to kill running tests: regular exception, easily caught
-#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
-KILL_EXCEPTION = "KILL_EXCEPTION"
-
-#: levels at which to kill running tests: exception that is not a subclass of
-#: Exception to avoid "catch(Exception)".
-#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
-KILL_SYS_EXIT = "KILL_SYS_EXIT"
-
-#: levels at which to kill running tests: brutal way that does not allow any
-#: default cleanup (os._exit). Last resort when all others do not work!
-KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT"
-
-#: levels at which to kill running tests: SIGINT
-KILL_SIGINT = "KILL_SIGINT"
-
-
-def default_disc_full_decision_function(curr_state,
-                                        estim_empty_newest,
-                                        estim_empty_smallest,
-                                        estim_empty_mean,
-                                        size_warn=DEFAULT_DISC_SIZE_WARN,
-                                        size_err=DEFAULT_DISC_SIZE_ERR,
-                                        time_warn=DEFAULT_TIME_TO_FULL_WARN,
-                                        time_err=DEFAULT_TIME_TO_FULL_ERR,
-                                        kill_level=KILL_SIGINT):
-    """ decide whether to warn or raise error because disc is getting full
-
-    called by DiscFillCheckerThread regularly; params estim_empty_* are result
-    of :py:func:`calc_time_until_zero`, so None means that disc state did not
-    change.
-
-    Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic
-    and raises an error or calls sys.exit when it is really critical.
-
-    :param estim_empty_newest: the newest time estimate
-    :param estim_empty_smallest: the smallest of all non-negative estimates
-    :param estim_empty_mean: estim between earliest and newest estimate
-    :param int size/err_warn/err: thresholds, default to constants like
-                                  :py:data:`DEFAULT_DISC_SIZE_WARN`
-    :param kill_level: how to kill the running application: with sys.exit or
-                       by raising a "regular" Exception subclass (default).
-                       Brutal way or SIGINT (default) are also options.
-                       Values should be one like :py:data:`KILL_EXCEPTION`
-    """
-
-    # err?
-    raise_err = False
-    if curr_state.available < size_err:
-        raise_err = True
-    if (estim_empty_smallest != None) and (estim_empty_smallest < time_err):
-        raise_err = True
-
-    # what kind of err?
-    if raise_err:
-        print('Disc fill below tolerance or disc full soon:', file=stderr)
-        print('{0}, full in {1} s'
-              .format(curr_state, estim_empty_smallest), file=stderr)
-        if kill_level == KILL_EXCEPTION:
-            raise DiscFullPreventionError(curr_state, estim_empty_smallest)
-        elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT:
-            print('Exiting now the brutal way', file=stderr)
-            brutal_exit_function_DO_NOT_USE(1)
-        elif kill_level == KILL_SYS_EXIT:
-            print('Exiting now', file=stderr)
-            sys_exit("Disc nearly full!")
-        else:   # kill_level == KILL_SIGINT or unknown
-            print('Sending SIGINT', file=stderr)
-            kill(getpid(), signal.SIGINT)
-
-    # warn?
-    if curr_state.available < size_warn:
-        warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest),
-             stacklevel=3)
-    if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn):
-        warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest),
-             stacklevel=3)
-
-
-def calc_time_until_zero(old_state, new_state):
-    """ calc time until value hits zero given (time1, value1), (time2, value2)
-
-    returns time from now until empty in seconds
-
-    delta_date = new_date - old_date
-    delta_free = new_free - old_free
-    free(t) = new_free + (t-new_t) * delta_free / delta_date
-            = 0
-    <==>  -new_free = (t-new_t) * delta_free / delta_date
-    <==>  -new_free * delta_date / date_free = t - new_t
-    <==>  diff_t := new_t - t = new_free * delta_date / delta_free
-
-    compare to now:
-    diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now)
-                = -diff_t - (now-new_t) = (new_t-now) - diff_t
-
-    To avoid zero-division, returns None if delta_free == 0
-    """
-    old_date, old_free = old_state
-    new_date, new_free = new_state
-    if new_free == old_free:
-        return None            # avoid zero-division
-    time_diff = new_free * (new_date - old_date).total_seconds() \
-                         / (new_free - old_free)
-
-    # compare to now
-    return (new_date - dt.now()).total_seconds() - time_diff
-
-
-METHOD_THREAD = 'thread'
-METHOD_ALARM = 'alarm'
-
-
-@contextmanager
-def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs):
-    """ run test function while watching disc space
-
-    all args are forwarded to :py:class:`DiscFillChecker` constructor
-
-    depending on method, will create a separate thread (default) to watch disc
-    or use SIGALRM. First one does not interfere with signals within tested
-    method, second can use a simple Exception to stop the test
-    """
-
-    if method == METHOD_THREAD:
-        checker = DiscFillCheckerThread(*args, **kwargs)
-        checker.start()
-    elif method == METHOD_ALARM:
-        # check that alarms are not used otherwise
-        old_handler = signal.getsignal(signal.SIGALRM)
-        if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None):
-            raise RuntimeError('Signal handler for SIGALRM set '
-                               '-- do not want to override!')
-
-        # create checker
-        checker = DiscFillChecker(*args, **kwargs)
-
-        # register checker as alarm handler
-        def alarm_signal_handler(signum ,frame):
-            print('todo: check signum, frame?')
-            checker.do_check()
-            if not checker.do_stop:
-                signal.alarm(checker.interval)   # set alarm again
-        signal.signal(signal.SIGALRM, alarm_signal_handler)
-
-        # set alarm
-        signal.alarm(checker.interval)
-    else:
-        raise ValueError('unknown checking method!')
-
-    try:
-        # run decorated function
-        yield checker
-    finally:
-        # stop checker
-        checker.stop()
-
-        # clean up
-        if method == METHOD_THREAD:
-            checker.join()
-        elif method == METHOD_ALARM:
-            signal.alarm(0)   # cancel alarms
-            signal.signal(signal.SIGALRM, old_handler)
-        else:
-            raise NotImplementedError('implement cleanup for new method!')
-
-
-def watch_disc_fill(paths=None, interval=0.1):
-    """ watch disc fill state: show start, end and maxima
-
-    gets fill state of all filesystems, reports start and maxima until user
-    hits Ctrl-C; then prints final state and repeats max and exits
-
-    :param float interval: time to sleep between checks (seconds)
-    :param paths: iterable over paths (optional); if given just checks
-                  filesytems that contain them; if not given (default) will
-                  check all filesystems
-    """
-
-    if paths is None:
-        mounts = None
-    else:
-        mounts = tuple(set(get_mount_info(path) for path in paths))
-
-    def get_states():
-        if mounts is None:
-            for state in get_filesystem_fill_states():
-                if state.name in NOT_REAL_FILESYSTEMS_SPEC:
-                    continue
-                yield state
-        else:
-            for mount_point in mounts:
-                yield get_fill_from_statvfs(mount_point)
-
-    # start state
-    print('{0}: start states are:'.format(dt.now()))
-    first_fills = dict()
-    max_vals = dict()
-    for state in get_states():
-        print('{0}, {1} used'.format(state, size_str(state.used)))
-        max_vals[state.mount_point] = state.used
-        first_fills[state.mount_point] = state.used
-
-    try:
-        # loop until user presses Ctrl-C
-        print('checking every {0:.1f}s, press Ctrl-C to stop...'
-              .format(interval))
-        while True:
-            time.sleep(interval)
-            try:
-                for state in get_states():
-                    if state.used > max_vals[state.mount_point]:
-                        print('{0}: new max {1:,d}B for {2}'
-                              .format(dt.now(), state.used, state))
-                        max_vals[state.mount_point] = state.used
-            except KeyError:
-                # a new file system was added
-                for state in get_states():
-                    if state.mount_point not in max_vals:
-                        print('{0}: new filesystem {1}, {2:,d}B used'
-                              .format(dt.now(), state, state.used))
-                        max_vals[state.mount_point] = state.used
-                        first_fills[state.mount_point] = state.used
-    except KeyboardInterrupt:
-        # end function
-        print(' --> caught KeyboardInterrupt')
-        print('{0}: end states and maxima:'.format(dt.now()))
-        for state in get_states():
-            firstf = first_fills[state.mount_point]
-            maxf = max_vals[state.mount_point]
-            print('{0}, start/max/end fill: {1} / {2} ({3}) / {4} ({5})'
-                  .format(state, size_str(firstf), size_str(maxf),
-                          size_str(maxf-firstf, True), size_str(state.used),
-                          size_str(state.used-firstf, True)))
-
-
-def get_perf_counter():
-    """ return the best monotonic clock for performance measure
-
-    returned clocks are monotonic and measure "absoulte"/wall-clock time
-    differences (as opposed to time processor spends on the particular python
-    code, so sleep(1) should result in a perf counter diff of 1).
-
-    before py 3.3: return monotonic clock from linux system
-    after that: returns function :py:function:`time.perf_counter`
-
-    .. warn:: absolute values mean nothing, only differences between calls!
-
-    For simple single-command use cases: use ipython's %timeit magic command.
-
-    Usage:
-
-      from test_helpers import get_perf_counter
-      perf_counter = get_perf_counter()
-      start_val = perf_counter()
-      # ... do something
-      end_val = perf_counter
-
-    """
-
-    if version_info.major == 2:
-        if platform.startswith('linux'):
-            return monotonic_clock_py2()
-        else:
-            return time.time
-    elif version_info.major == 3 and version_info.minor < 3:
-        return time.clock
-    else:
-        return time.perf_counter
-
-
-def monotonic_clock_py2():
-    """ get a monotonic clock in py2 using ctypes and clock_gettime
-
-    copied from:
-    https://stackoverflow.com/questions/1205722/
-    how-do-i-get-monotonic-time-durations-in-python
-    """
-
-    import ctypes
-    from os import strerror
-    CLOCK_MONOTONIC_RAW = 4 # see <linux/time.h>
-
-    class timespec(ctypes.Structure):
-        _fields_ = [
-            ('tv_sec', ctypes.c_long),
-            ('tv_nsec', ctypes.c_long)
-        ]
-
-    librt = ctypes.CDLL('librt.so.1', use_errno=True)
-    clock_gettime = librt.clock_gettime
-    clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)]
-
-    def monotonic_time():
-        t = timespec()
-        if clock_gettime(CLOCK_MONOTONIC_RAW , ctypes.pointer(t)) != 0:
-            errno_ = ctypes.get_errno()
-            raise OSError(errno_, strerror(errno_))
-        return t.tv_sec + t.tv_nsec * 1e-9
-
-    return monotonic_time
diff --git a/test/disc_filler_test.py b/test/disc_filler_test.py
deleted file mode 100755 (executable)
index 11783e9..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-#!/usr/bin/env python3
-
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" Test test_helpers.DiscFillChecker by actually writing lots of data to disc
-
-Indulge in the luxury of assuming we have python3 here ;-)
-
-*NOT* a unittest (unittest discover only finds tests in files test_*.py).
-"""
-
-from sys import stderr, argv as cmd_line_args
-from tempfile import NamedTemporaryFile
-from time import monotonic
-from os import fstatvfs
-
-from test_helpers import disc_fill_checked
-
-
-def main(test_dir):
-    """ Main function, called when running file as script """
-
-    with disc_fill_checked():
-        fill_disc(test_dir)
-
-
-def fill_disc(test_dir):
-    """ write data to file in given directory """
-
-    source = '/dev/urandom'
-    chunk_size = 4096
-    temp_file_prefix = 'pyi2n_disc_fill_test_'
-    print_interval = 1
-
-    with open(source, 'rb') as read_handle:
-        with NamedTemporaryFile(dir=test_dir, prefix=temp_file_prefix) \
-                as write_handle:
-            os_handle = write_handle.fileno()
-            print('copying data from {0} to {1}...'.format(source,
-                                                           write_handle.name))
-            print_fs_stats(os_handle)
-            last_print = monotonic()
-            while True:
-                write_handle.write(read_handle.read(chunk_size))
-                if monotonic() - last_print > print_interval:
-                    print_fs_stats(os_handle)
-                    last_print = monotonic()
-
-
-def print_fs_stats(os_handle):
-    print(fstatvfs(os_handle))
-
-
-if __name__ == '__main__':
-    if len(cmd_line_args) != 2:
-        print('Need to known which dir I am allowed to abuse', file=stderr)
-        exit(1)
-    main(cmd_line_args[1])
diff --git a/test/test_test_helper.py b/test/test_test_helper.py
deleted file mode 100644 (file)
index 38fa26b..0000000
+++ /dev/null
@@ -1,286 +0,0 @@
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" test_helper_unittest.py: unit tests for test_helpers
-
-Tests classes and functions in test_helpers
-
-Should be able run from python2 and python3!
-
-For help see :py:mod:`unittest`
-"""
-
-# TODO: remove print, use logging instead
-from __future__ import print_function
-
-from __future__ import absolute_import
-
-import unittest
-from datetime import timedelta, datetime as dt
-from src.file_helpers import FilesystemFillState, get_filesystem_fill_states
-import warnings
-from time import sleep
-import threading
-
-from src import test_helpers
-
-
-class TestHelperTester(unittest.TestCase):
-
-    def assertNoWarn(self, func, *args, **kwargs):
-        """ check that calling function raises no warning
-
-        use warnings.catch_warnings instead of self.assertWarn
-        for compatibility with python 2
-        """
-
-        result = None
-        with warnings.catch_warnings(record=True) as warn_catcher:
-            result = func(*args, **kwargs)
-
-            self.assertEqual(len(warn_catcher), 0)
-
-        return result
-
-
-    def assertWarn(self, warn_type, n_warns, message, func, *args, **kwargs):
-        """ check that calling function raises no warning
-
-        use warnings.catch_warnings instead of self.assertWarn
-        for compatibility with python 2
-        """
-
-        with warnings.catch_warnings(record=True) as warn_catcher:
-            func(*args, **kwargs)
-
-            if n_warns is None:
-                self.assertTrue(len(warn_catcher) > 0)
-            else:
-                self.assertEqual(len(warn_catcher), n_warns)
-
-            if warn_type is not None:
-                self.assertTrue(issubclass(warn_catcher[-1].category,
-                                           warn_type))
-            if message is not None:
-                self.assertEqual(warn_catcher[-1].message, message)
-            #print(warn_catcher[-1].message)
-
-
-    def test_zero_estimator(self):
-        """ test function :py:func:`test_helpers.calc_time_until_zero` """
-
-        # need to allow some tolerance since calc_time_until_zero uses now()
-        TOLERANCE = 0.1
-
-        start_time = dt.now()
-        time1 = start_time - timedelta(seconds=20)    # 20s ago
-        time2 = start_time - timedelta(seconds=10)    # 10s ago
-
-        # test constant disc usage
-        self.assertEqual(test_helpers.calc_time_until_zero((time1, 1),
-                                                           (time2, 1)), None)
-
-        # test gaining disc space (1 byte per second, empty 30s ago)
-        estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20))
-        self.assertTrue(estim < 0)
-        expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds()
-        self.assertLess(abs(estim - expect), TOLERANCE)
-
-        # test slowly losing disc space (1 byte per second, empty now)
-        estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10))
-        expect = (start_time - dt.now()).total_seconds()
-        self.assertLess(abs(estim - expect), TOLERANCE)
-
-
-    def test_decision_function(self):
-        """ tests function default_disc_full_decision_function
-
-        :py:func:`test_helpers.default_disc_full_decision_function`
-        """
-
-        f = test_helpers.default_disc_full_decision_function
-
-        # create dummy state
-        GIGABYTE = 2**30
-        state = FilesystemFillState()
-        state.name = 'dummy'
-        state.size = 10 * GIGABYTE
-        state.used = 9 * GIGABYTE
-        state.available = 1 * GIGABYTE
-        state.capacity = 90
-        state.mount_point = '/not/mounted'
-
-        HOUR = 60*60
-        estim_empty_newest = None    # currently not used
-        estim_empty_mean = None      # currently not used
-        estim_empty_smallest = None  # simulate no proper estimate
-
-        # check no warning if no time estimate (and enough space)
-        self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest,
-                          estim_empty_mean)
-
-        # check no warning if enough time (and enough space)
-        estim_empty_smallest = HOUR      # enough time
-        self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest,
-                          estim_empty_mean)
-
-        # check warn if time getting low
-        estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN +
-                                test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2.
-        self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
-                        f, state, estim_empty_newest, estim_empty_smallest,
-                        estim_empty_mean)
-
-        # check err if time too low
-        estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2.
-        self.assertRaises(KeyboardInterrupt,
-                          f, state, estim_empty_newest, estim_empty_smallest,
-                          estim_empty_mean)
-        # show the error message:
-        #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean)
-
-        # check warning if disc space is getting low
-        estim_empty_smallest = HOUR
-        state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN +
-                               test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.)
-        state.used = state.size - state.available
-        state.capacity = int(100. * state.used / state.size)
-        self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
-                        f, state, estim_empty_newest, estim_empty_smallest,
-                        estim_empty_mean)
-
-        # check error if disc space is too low
-        state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.)
-        state.used = state.size - state.available
-        state.capacity = int(100. * state.used / state.size)
-        self.assertRaises(KeyboardInterrupt,
-                          f, state, estim_empty_newest, estim_empty_smallest,
-                          estim_empty_mean)
-
-        # check other kill levels
-        self.assertRaises(test_helpers.DiscFullPreventionError,
-                          f, state, estim_empty_newest, estim_empty_smallest,
-                          estim_empty_mean,
-                          kill_level=test_helpers.KILL_EXCEPTION)
-
-        self.assertRaises(SystemExit,
-                          f, state, estim_empty_newest, estim_empty_smallest,
-                          estim_empty_mean,
-                          kill_level=test_helpers.KILL_SYS_EXIT)
-
-        # this even kills unittest:
-        #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean,
-        #  kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT)
-        #self.fail('should have exited!')
-
-    @unittest.skip('Works on its own but now when building-and-installing')
-    def test_thread(self):
-        """ test the DiscFillCheckerThread """
-
-        #threading.setprofile(self.threading_profile_func)
-        # somehow damages df output ... need to check!
-
-        # since we do not want to fill up the disc, we incrementally decrease
-        # the tolerance so even our hopefully half-empty filesystems will
-        # trigger warnings / errors
-        min_free = None
-        min_name = None
-        for fill_state in get_filesystem_fill_states():
-            if fill_state.size == 0:
-                continue
-            if (min_free is None) or (fill_state.available < min_free):
-                min_free = fill_state.available
-                min_name = fill_state.name
-        print('min free: {0} with {1}B'.format(min_name, min_free))
-
-        if min_free is None:
-            self.fail('Found no filesystem with size>0 --> cannot test')
-
-        # set space tolerance such that no warn/err should show
-        size_warn = min_free/4;
-        size_err = min_free/2;
-        def decision_function(*args):
-            test_helpers.default_disc_full_decision_function(
-                *args, size_warn=size_warn, size_err=size_err)
-        self._internal_test_thread(decision_function)
-
-        # set space tolerance such that show warn but no err
-        size_warn = min_free*2;
-        size_err = min_free/2;
-        def decision_function(*args):
-            test_helpers.default_disc_full_decision_function(
-                *args, size_warn=size_warn, size_err=size_err)
-        self.assertWarn(test_helpers.DiscFullPreventionWarning, None, None,
-                        self._internal_test_thread, decision_function)
-
-        # set space tolerance such that err should be raised
-        size_warn = min_free*4;
-        size_err = min_free*2;
-        def decision_function(*args):
-            test_helpers.default_disc_full_decision_function(
-                *args, size_warn=size_warn, size_err=size_err)
-        self.assertRaises(KeyboardInterrupt,
-                          self._internal_test_thread, decision_function)
-
-    def _internal_test_thread(self, decision_function):
-
-        sleep_time_checker = 0.1
-        sleep_time_tester = 1
-        time_tolerance = 0.1
-        main_thread_name = 'MainThread'
-        checker_thread_name = 'discFillChck'
-
-        curr_threads = [thread.name for thread in threading.enumerate()]
-        self.assertEqual(curr_threads, [main_thread_name])
-        start_time = dt.now()
-        with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD,
-                                            sleep_time_checker,
-                                            decision_function) as checker:
-            curr_threads = [thread.name for thread in threading.enumerate()]
-            self.nonsense_function(sleep_time_tester)
-        time_taken = (dt.now() - start_time).total_seconds()
-        self.assertEqual(curr_threads, [main_thread_name, checker_thread_name])
-
-        # stop checker thread
-        checker.stop()
-        checker.join()
-        curr_threads = [thread.name for thread in threading.enumerate()]
-        self.assertEqual(curr_threads, [main_thread_name])
-
-        self.assertTrue(time_taken >= sleep_time_tester)
-        self.assertTrue(time_taken < sleep_time_tester + time_tolerance)
-
-
-    def nonsense_function(self, sleep_time):
-        """ function to run while checking disc fill state; just sleeps
-
-        to make this test quicker could try to use profiling to count calls to
-        disc checker function and stop this after 2 or so
-        """
-        sleep(sleep_time)
-
-    def threading_profile_func(self, *args, **kwargs):
-        """ todo: set this as profiling function for checker thread, count
-        calls to disc checker function, trigger event or so """
-        print('profiling with args {0} and kwargs {1}'
-              .format([arg for arg in args if len(args) < 20], kwargs))
-
-if __name__ == '__main__':
-    unittest.main()