From e69ddd9aa654acb3a3e5f4689c7cd9afbc6f00f7 Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Tue, 2 Oct 2018 10:02:54 +0200 Subject: [PATCH] Remove unused test_helpers --- src/test_helpers.py | 505 ---------------------------------------------- test/disc_filler_test.py | 76 ------- test/test_test_helper.py | 286 -------------------------- 3 files changed, 0 insertions(+), 867 deletions(-) delete mode 100644 src/test_helpers.py delete mode 100755 test/disc_filler_test.py delete mode 100644 test/test_test_helper.py diff --git a/src/test_helpers.py b/src/test_helpers.py deleted file mode 100644 index 4bc4b48..0000000 --- a/src/test_helpers.py +++ /dev/null @@ -1,505 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" Helpers for developing quick test scripts - -Creation motivated by fear of filling disc space during long-running stress -tests - -.. todo:: Find out why fs not in REAL_FILESYSTEMS generate warnings so quickly - even if they are still empty -""" - -from __future__ import print_function - -from contextlib import contextmanager -from threading import Thread -import time -from datetime import datetime as dt -from itertools import tee -from warnings import warn -from sys import stderr, platform, version_info, exit as sys_exit -from os import getpid, kill, _exit as brutal_exit_function_DO_NOT_USE -import signal - -try: - from warnings import ResourceWarning - WARN_BASE_CLASS = ResourceWarning -except ImportError: - # only added in python 3.2 - WARN_BASE_CLASS = UserWarning - -from .buffers import LogarithmicBuffer -from .file_helpers import get_filesystem_fill_states, FilesystemFillState, \ - get_mount_info, get_fill_from_statvfs, \ - NOT_REAL_FILESYSTEMS_SPEC, size_str -from .iter_helpers import pairwise - - -class DiscFullPreventionError(Exception): - """ Exception raised when disc space gets critical - - issued from function called by :py:class:`DiscFillCheckerThread` - usually only happens after issuing a :py:class:`DiscFullPreventionWarning` - """ - def __init__(self, state, time_estim): - super(DiscFullPreventionError, self).__init__( - 'Interrupting test to avoid full disc ({0}, full in {1}s)' - .format(state, "?" if time_estim==None else time_estim)) - self.state = state - self.time_estim = time_estim - - -class DiscFullPreventionWarning(WARN_BASE_CLASS): - """ Exception raised when disc space approaches problematic state - - issued from function called by :py:class:`DiscFillCheckerThread` - If disc fills up further, will be followed by a - :py:class:`DiscFullPreventionError` or sys.exit - """ - def __init__(self, state, time_estim): - super(DiscFullPreventionWarning, self).__init__( - 'Disc nearly full! Might soon abort test ({0}, ' - 'full in {1}s)' - .format(state, "?" if time_estim==None else time_estim)) - self.state = state - self.time_estim = time_estim - - -class DiscFillChecker: - """ checker for disc fill status """ - - def __init__(self, interval=10, decision_function=None): - - # set variables - self.interval = interval - if decision_function is None: - self.decision_function = default_disc_full_decision_function - else: - # check decision_function: - GIGABYTE = 2**30 - state = FilesystemFillState() - state.name = 'dummy' - state.size = 10 * GIGABYTE - state.used = 1 * GIGABYTE - state.available = 9 * GIGABYTE # asume this should be enough - state.capacity = 10 - state.mount_point = '/not/mounted' - decision_function(state, None, None, None) - - self.decision_function = decision_function - - # remember relevant disc stats at start - self._bufs = {} - for fs_state in get_filesystem_fill_states(): - self._internal_state_buffer(fs_state) - - def _internal_state_buffer(self, fs_state): - """ update internal stats buffer, returns all estims - - internal helper called from __init__ and run - """ - if fs_state.name in NOT_REAL_FILESYSTEMS_SPEC: - return [] - if fs_state.size == 0: - return [] - - buf = None - try: - buf = self._bufs[fs_state.name] - except KeyError: - # new file system -- need to create a new buffer - buf = LogarithmicBuffer(5) - self._bufs[fs_state.name] = buf - - buf.add((dt.now(), float(fs_state.available))) - return buf.get_all() - - def do_check(self): - """ check disc fill state """ - - # loop over current disc fill state - for fs_state in get_filesystem_fill_states(): - # remember newest value with others for same file system - fill_states = self._internal_state_buffer(fs_state) - if not fill_states: - continue - - # estimate time until full (i.e. when available == 0) - times_until_empty = \ - [calc_time_until_zero(old_state, new_state) - for old_state, new_state in pairwise(fill_states)] - - # call user-defined function to decide - min_time = None - if any(time != None and time > 0 for time in times_until_empty): - min_time = min(time for time in times_until_empty - if time != None and time>0) - avg_time = calc_time_until_zero(fill_states[0], - fill_states[-1]) - self.decision_function(fs_state, - times_until_empty[-1], # newest - min_time, # smallest - avg_time) # average - - -class DiscFillCheckerThread(Thread, DiscFillChecker): - """ a thread that checks disc fill in regular intervals """ - - def __init__(self, *args, **kwargs): - """ creates a DiscFillCheckerThread - - :param int interval: time in seconds between checks of disc usage - :param decision_function: function that decides when to issue a warning - or an error. See - :py:func:`default_disc_full_decision_function` - as an example. Function should raise - :py:class:`DiscFullPreventionWarning` and - :py:class:`DiscFullPreventionError` - """ - Thread.__init__(self, name='discFillChck') - DiscFillChecker.__init__(self, *args, **kwargs) - - # make this thread a daemon - self.daemon = True # works since python 2.6 - - # init do_stop - self.do_stop = False - - def run(self): - print('checker thread running') - while not self.do_stop: - time.sleep(self.interval) - self.do_check() - - def stop(self): - self.do_stop = True - - -MEGABYTE = 2**20 -MINUTE = 60 - -#: default thresholds for :py:func:`default_disc_full_decision_function` -DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE -DEFAULT_DISC_SIZE_ERR = MEGABYTE -DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE -DEFAULT_TIME_TO_FULL_ERR = MINUTE - -#: levels at which to kill running tests: regular exception, easily caught -#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! -KILL_EXCEPTION = "KILL_EXCEPTION" - -#: levels at which to kill running tests: exception that is not a subclass of -#: Exception to avoid "catch(Exception)". -#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! -KILL_SYS_EXIT = "KILL_SYS_EXIT" - -#: levels at which to kill running tests: brutal way that does not allow any -#: default cleanup (os._exit). Last resort when all others do not work! -KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT" - -#: levels at which to kill running tests: SIGINT -KILL_SIGINT = "KILL_SIGINT" - - -def default_disc_full_decision_function(curr_state, - estim_empty_newest, - estim_empty_smallest, - estim_empty_mean, - size_warn=DEFAULT_DISC_SIZE_WARN, - size_err=DEFAULT_DISC_SIZE_ERR, - time_warn=DEFAULT_TIME_TO_FULL_WARN, - time_err=DEFAULT_TIME_TO_FULL_ERR, - kill_level=KILL_SIGINT): - """ decide whether to warn or raise error because disc is getting full - - called by DiscFillCheckerThread regularly; params estim_empty_* are result - of :py:func:`calc_time_until_zero`, so None means that disc state did not - change. - - Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic - and raises an error or calls sys.exit when it is really critical. - - :param estim_empty_newest: the newest time estimate - :param estim_empty_smallest: the smallest of all non-negative estimates - :param estim_empty_mean: estim between earliest and newest estimate - :param int size/err_warn/err: thresholds, default to constants like - :py:data:`DEFAULT_DISC_SIZE_WARN` - :param kill_level: how to kill the running application: with sys.exit or - by raising a "regular" Exception subclass (default). - Brutal way or SIGINT (default) are also options. - Values should be one like :py:data:`KILL_EXCEPTION` - """ - - # err? - raise_err = False - if curr_state.available < size_err: - raise_err = True - if (estim_empty_smallest != None) and (estim_empty_smallest < time_err): - raise_err = True - - # what kind of err? - if raise_err: - print('Disc fill below tolerance or disc full soon:', file=stderr) - print('{0}, full in {1} s' - .format(curr_state, estim_empty_smallest), file=stderr) - if kill_level == KILL_EXCEPTION: - raise DiscFullPreventionError(curr_state, estim_empty_smallest) - elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT: - print('Exiting now the brutal way', file=stderr) - brutal_exit_function_DO_NOT_USE(1) - elif kill_level == KILL_SYS_EXIT: - print('Exiting now', file=stderr) - sys_exit("Disc nearly full!") - else: # kill_level == KILL_SIGINT or unknown - print('Sending SIGINT', file=stderr) - kill(getpid(), signal.SIGINT) - - # warn? - if curr_state.available < size_warn: - warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), - stacklevel=3) - if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn): - warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), - stacklevel=3) - - -def calc_time_until_zero(old_state, new_state): - """ calc time until value hits zero given (time1, value1), (time2, value2) - - returns time from now until empty in seconds - - delta_date = new_date - old_date - delta_free = new_free - old_free - free(t) = new_free + (t-new_t) * delta_free / delta_date - = 0 - <==> -new_free = (t-new_t) * delta_free / delta_date - <==> -new_free * delta_date / date_free = t - new_t - <==> diff_t := new_t - t = new_free * delta_date / delta_free - - compare to now: - diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now) - = -diff_t - (now-new_t) = (new_t-now) - diff_t - - To avoid zero-division, returns None if delta_free == 0 - """ - old_date, old_free = old_state - new_date, new_free = new_state - if new_free == old_free: - return None # avoid zero-division - time_diff = new_free * (new_date - old_date).total_seconds() \ - / (new_free - old_free) - - # compare to now - return (new_date - dt.now()).total_seconds() - time_diff - - -METHOD_THREAD = 'thread' -METHOD_ALARM = 'alarm' - - -@contextmanager -def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs): - """ run test function while watching disc space - - all args are forwarded to :py:class:`DiscFillChecker` constructor - - depending on method, will create a separate thread (default) to watch disc - or use SIGALRM. First one does not interfere with signals within tested - method, second can use a simple Exception to stop the test - """ - - if method == METHOD_THREAD: - checker = DiscFillCheckerThread(*args, **kwargs) - checker.start() - elif method == METHOD_ALARM: - # check that alarms are not used otherwise - old_handler = signal.getsignal(signal.SIGALRM) - if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None): - raise RuntimeError('Signal handler for SIGALRM set ' - '-- do not want to override!') - - # create checker - checker = DiscFillChecker(*args, **kwargs) - - # register checker as alarm handler - def alarm_signal_handler(signum ,frame): - print('todo: check signum, frame?') - checker.do_check() - if not checker.do_stop: - signal.alarm(checker.interval) # set alarm again - signal.signal(signal.SIGALRM, alarm_signal_handler) - - # set alarm - signal.alarm(checker.interval) - else: - raise ValueError('unknown checking method!') - - try: - # run decorated function - yield checker - finally: - # stop checker - checker.stop() - - # clean up - if method == METHOD_THREAD: - checker.join() - elif method == METHOD_ALARM: - signal.alarm(0) # cancel alarms - signal.signal(signal.SIGALRM, old_handler) - else: - raise NotImplementedError('implement cleanup for new method!') - - -def watch_disc_fill(paths=None, interval=0.1): - """ watch disc fill state: show start, end and maxima - - gets fill state of all filesystems, reports start and maxima until user - hits Ctrl-C; then prints final state and repeats max and exits - - :param float interval: time to sleep between checks (seconds) - :param paths: iterable over paths (optional); if given just checks - filesytems that contain them; if not given (default) will - check all filesystems - """ - - if paths is None: - mounts = None - else: - mounts = tuple(set(get_mount_info(path) for path in paths)) - - def get_states(): - if mounts is None: - for state in get_filesystem_fill_states(): - if state.name in NOT_REAL_FILESYSTEMS_SPEC: - continue - yield state - else: - for mount_point in mounts: - yield get_fill_from_statvfs(mount_point) - - # start state - print('{0}: start states are:'.format(dt.now())) - first_fills = dict() - max_vals = dict() - for state in get_states(): - print('{0}, {1} used'.format(state, size_str(state.used))) - max_vals[state.mount_point] = state.used - first_fills[state.mount_point] = state.used - - try: - # loop until user presses Ctrl-C - print('checking every {0:.1f}s, press Ctrl-C to stop...' - .format(interval)) - while True: - time.sleep(interval) - try: - for state in get_states(): - if state.used > max_vals[state.mount_point]: - print('{0}: new max {1:,d}B for {2}' - .format(dt.now(), state.used, state)) - max_vals[state.mount_point] = state.used - except KeyError: - # a new file system was added - for state in get_states(): - if state.mount_point not in max_vals: - print('{0}: new filesystem {1}, {2:,d}B used' - .format(dt.now(), state, state.used)) - max_vals[state.mount_point] = state.used - first_fills[state.mount_point] = state.used - except KeyboardInterrupt: - # end function - print(' --> caught KeyboardInterrupt') - print('{0}: end states and maxima:'.format(dt.now())) - for state in get_states(): - firstf = first_fills[state.mount_point] - maxf = max_vals[state.mount_point] - print('{0}, start/max/end fill: {1} / {2} ({3}) / {4} ({5})' - .format(state, size_str(firstf), size_str(maxf), - size_str(maxf-firstf, True), size_str(state.used), - size_str(state.used-firstf, True))) - - -def get_perf_counter(): - """ return the best monotonic clock for performance measure - - returned clocks are monotonic and measure "absoulte"/wall-clock time - differences (as opposed to time processor spends on the particular python - code, so sleep(1) should result in a perf counter diff of 1). - - before py 3.3: return monotonic clock from linux system - after that: returns function :py:function:`time.perf_counter` - - .. warn:: absolute values mean nothing, only differences between calls! - - For simple single-command use cases: use ipython's %timeit magic command. - - Usage: - - from test_helpers import get_perf_counter - perf_counter = get_perf_counter() - start_val = perf_counter() - # ... do something - end_val = perf_counter - - """ - - if version_info.major == 2: - if platform.startswith('linux'): - return monotonic_clock_py2() - else: - return time.time - elif version_info.major == 3 and version_info.minor < 3: - return time.clock - else: - return time.perf_counter - - -def monotonic_clock_py2(): - """ get a monotonic clock in py2 using ctypes and clock_gettime - - copied from: - https://stackoverflow.com/questions/1205722/ - how-do-i-get-monotonic-time-durations-in-python - """ - - import ctypes - from os import strerror - CLOCK_MONOTONIC_RAW = 4 # see - - class timespec(ctypes.Structure): - _fields_ = [ - ('tv_sec', ctypes.c_long), - ('tv_nsec', ctypes.c_long) - ] - - librt = ctypes.CDLL('librt.so.1', use_errno=True) - clock_gettime = librt.clock_gettime - clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)] - - def monotonic_time(): - t = timespec() - if clock_gettime(CLOCK_MONOTONIC_RAW , ctypes.pointer(t)) != 0: - errno_ = ctypes.get_errno() - raise OSError(errno_, strerror(errno_)) - return t.tv_sec + t.tv_nsec * 1e-9 - - return monotonic_time diff --git a/test/disc_filler_test.py b/test/disc_filler_test.py deleted file mode 100755 index 11783e9..0000000 --- a/test/disc_filler_test.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python3 - -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" Test test_helpers.DiscFillChecker by actually writing lots of data to disc - -Indulge in the luxury of assuming we have python3 here ;-) - -*NOT* a unittest (unittest discover only finds tests in files test_*.py). -""" - -from sys import stderr, argv as cmd_line_args -from tempfile import NamedTemporaryFile -from time import monotonic -from os import fstatvfs - -from test_helpers import disc_fill_checked - - -def main(test_dir): - """ Main function, called when running file as script """ - - with disc_fill_checked(): - fill_disc(test_dir) - - -def fill_disc(test_dir): - """ write data to file in given directory """ - - source = '/dev/urandom' - chunk_size = 4096 - temp_file_prefix = 'pyi2n_disc_fill_test_' - print_interval = 1 - - with open(source, 'rb') as read_handle: - with NamedTemporaryFile(dir=test_dir, prefix=temp_file_prefix) \ - as write_handle: - os_handle = write_handle.fileno() - print('copying data from {0} to {1}...'.format(source, - write_handle.name)) - print_fs_stats(os_handle) - last_print = monotonic() - while True: - write_handle.write(read_handle.read(chunk_size)) - if monotonic() - last_print > print_interval: - print_fs_stats(os_handle) - last_print = monotonic() - - -def print_fs_stats(os_handle): - print(fstatvfs(os_handle)) - - -if __name__ == '__main__': - if len(cmd_line_args) != 2: - print('Need to known which dir I am allowed to abuse', file=stderr) - exit(1) - main(cmd_line_args[1]) diff --git a/test/test_test_helper.py b/test/test_test_helper.py deleted file mode 100644 index 38fa26b..0000000 --- a/test/test_test_helper.py +++ /dev/null @@ -1,286 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. -# -# Copyright (c) 2016-2018 Intra2net AG - -""" test_helper_unittest.py: unit tests for test_helpers - -Tests classes and functions in test_helpers - -Should be able run from python2 and python3! - -For help see :py:mod:`unittest` -""" - -# TODO: remove print, use logging instead -from __future__ import print_function - -from __future__ import absolute_import - -import unittest -from datetime import timedelta, datetime as dt -from src.file_helpers import FilesystemFillState, get_filesystem_fill_states -import warnings -from time import sleep -import threading - -from src import test_helpers - - -class TestHelperTester(unittest.TestCase): - - def assertNoWarn(self, func, *args, **kwargs): - """ check that calling function raises no warning - - use warnings.catch_warnings instead of self.assertWarn - for compatibility with python 2 - """ - - result = None - with warnings.catch_warnings(record=True) as warn_catcher: - result = func(*args, **kwargs) - - self.assertEqual(len(warn_catcher), 0) - - return result - - - def assertWarn(self, warn_type, n_warns, message, func, *args, **kwargs): - """ check that calling function raises no warning - - use warnings.catch_warnings instead of self.assertWarn - for compatibility with python 2 - """ - - with warnings.catch_warnings(record=True) as warn_catcher: - func(*args, **kwargs) - - if n_warns is None: - self.assertTrue(len(warn_catcher) > 0) - else: - self.assertEqual(len(warn_catcher), n_warns) - - if warn_type is not None: - self.assertTrue(issubclass(warn_catcher[-1].category, - warn_type)) - if message is not None: - self.assertEqual(warn_catcher[-1].message, message) - #print(warn_catcher[-1].message) - - - def test_zero_estimator(self): - """ test function :py:func:`test_helpers.calc_time_until_zero` """ - - # need to allow some tolerance since calc_time_until_zero uses now() - TOLERANCE = 0.1 - - start_time = dt.now() - time1 = start_time - timedelta(seconds=20) # 20s ago - time2 = start_time - timedelta(seconds=10) # 10s ago - - # test constant disc usage - self.assertEqual(test_helpers.calc_time_until_zero((time1, 1), - (time2, 1)), None) - - # test gaining disc space (1 byte per second, empty 30s ago) - estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20)) - self.assertTrue(estim < 0) - expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds() - self.assertLess(abs(estim - expect), TOLERANCE) - - # test slowly losing disc space (1 byte per second, empty now) - estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10)) - expect = (start_time - dt.now()).total_seconds() - self.assertLess(abs(estim - expect), TOLERANCE) - - - def test_decision_function(self): - """ tests function default_disc_full_decision_function - - :py:func:`test_helpers.default_disc_full_decision_function` - """ - - f = test_helpers.default_disc_full_decision_function - - # create dummy state - GIGABYTE = 2**30 - state = FilesystemFillState() - state.name = 'dummy' - state.size = 10 * GIGABYTE - state.used = 9 * GIGABYTE - state.available = 1 * GIGABYTE - state.capacity = 90 - state.mount_point = '/not/mounted' - - HOUR = 60*60 - estim_empty_newest = None # currently not used - estim_empty_mean = None # currently not used - estim_empty_smallest = None # simulate no proper estimate - - # check no warning if no time estimate (and enough space) - self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check no warning if enough time (and enough space) - estim_empty_smallest = HOUR # enough time - self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check warn if time getting low - estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN + - test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2. - self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check err if time too low - estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2. - self.assertRaises(KeyboardInterrupt, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - # show the error message: - #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean) - - # check warning if disc space is getting low - estim_empty_smallest = HOUR - state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN + - test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.) - state.used = state.size - state.available - state.capacity = int(100. * state.used / state.size) - self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check error if disc space is too low - state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.) - state.used = state.size - state.available - state.capacity = int(100. * state.used / state.size) - self.assertRaises(KeyboardInterrupt, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check other kill levels - self.assertRaises(test_helpers.DiscFullPreventionError, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean, - kill_level=test_helpers.KILL_EXCEPTION) - - self.assertRaises(SystemExit, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean, - kill_level=test_helpers.KILL_SYS_EXIT) - - # this even kills unittest: - #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean, - # kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT) - #self.fail('should have exited!') - - @unittest.skip('Works on its own but now when building-and-installing') - def test_thread(self): - """ test the DiscFillCheckerThread """ - - #threading.setprofile(self.threading_profile_func) - # somehow damages df output ... need to check! - - # since we do not want to fill up the disc, we incrementally decrease - # the tolerance so even our hopefully half-empty filesystems will - # trigger warnings / errors - min_free = None - min_name = None - for fill_state in get_filesystem_fill_states(): - if fill_state.size == 0: - continue - if (min_free is None) or (fill_state.available < min_free): - min_free = fill_state.available - min_name = fill_state.name - print('min free: {0} with {1}B'.format(min_name, min_free)) - - if min_free is None: - self.fail('Found no filesystem with size>0 --> cannot test') - - # set space tolerance such that no warn/err should show - size_warn = min_free/4; - size_err = min_free/2; - def decision_function(*args): - test_helpers.default_disc_full_decision_function( - *args, size_warn=size_warn, size_err=size_err) - self._internal_test_thread(decision_function) - - # set space tolerance such that show warn but no err - size_warn = min_free*2; - size_err = min_free/2; - def decision_function(*args): - test_helpers.default_disc_full_decision_function( - *args, size_warn=size_warn, size_err=size_err) - self.assertWarn(test_helpers.DiscFullPreventionWarning, None, None, - self._internal_test_thread, decision_function) - - # set space tolerance such that err should be raised - size_warn = min_free*4; - size_err = min_free*2; - def decision_function(*args): - test_helpers.default_disc_full_decision_function( - *args, size_warn=size_warn, size_err=size_err) - self.assertRaises(KeyboardInterrupt, - self._internal_test_thread, decision_function) - - def _internal_test_thread(self, decision_function): - - sleep_time_checker = 0.1 - sleep_time_tester = 1 - time_tolerance = 0.1 - main_thread_name = 'MainThread' - checker_thread_name = 'discFillChck' - - curr_threads = [thread.name for thread in threading.enumerate()] - self.assertEqual(curr_threads, [main_thread_name]) - start_time = dt.now() - with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD, - sleep_time_checker, - decision_function) as checker: - curr_threads = [thread.name for thread in threading.enumerate()] - self.nonsense_function(sleep_time_tester) - time_taken = (dt.now() - start_time).total_seconds() - self.assertEqual(curr_threads, [main_thread_name, checker_thread_name]) - - # stop checker thread - checker.stop() - checker.join() - curr_threads = [thread.name for thread in threading.enumerate()] - self.assertEqual(curr_threads, [main_thread_name]) - - self.assertTrue(time_taken >= sleep_time_tester) - self.assertTrue(time_taken < sleep_time_tester + time_tolerance) - - - def nonsense_function(self, sleep_time): - """ function to run while checking disc fill state; just sleeps - - to make this test quicker could try to use profiling to count calls to - disc checker function and stop this after 2 or so - """ - sleep(sleep_time) - - def threading_profile_func(self, *args, **kwargs): - """ todo: set this as profiling function for checker thread, count - calls to disc checker function, trigger event or so """ - print('profiling with args {0} and kwargs {1}' - .format([arg for arg in args if len(args) < 20], kwargs)) - -if __name__ == '__main__': - unittest.main() -- 1.7.1