From: Christian Herdtweck Date: Thu, 14 Jan 2016 13:05:59 +0000 (+0100) Subject: disc checker thread works now in unittests X-Git-Tag: v1.2~66 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=5783a4cd8f3ceb21b95125169ba519268ce188be;p=pyi2ncommon disc checker thread works now in unittests added alternativ implementation using alarms (untested) --- diff --git a/test/test_helper_unittest.py b/test/test_helper_unittest.py index 0040224..0b0eeb6 100644 --- a/test/test_helper_unittest.py +++ b/test/test_helper_unittest.py @@ -125,7 +125,7 @@ class TestHelperTester(unittest.TestCase): # check err if time too low estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2. - self.assertRaises(test_helpers.DiscFullPreventionError, + self.assertRaises(KeyboardInterrupt, f, state, estim_empty_newest, estim_empty_smallest, estim_empty_mean) # show the error message: @@ -145,11 +145,16 @@ class TestHelperTester(unittest.TestCase): state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.) state.used = state.size - state.available state.capacity = int(100. * state.used / state.size) - self.assertRaises(test_helpers.DiscFullPreventionError, + self.assertRaises(KeyboardInterrupt, f, state, estim_empty_newest, estim_empty_smallest, estim_empty_mean) # check other kill levels + self.assertRaises(test_helpers.DiscFullPreventionError, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean, + kill_level=test_helpers.KILL_EXCEPTION) + self.assertRaises(SystemExit, f, state, estim_empty_newest, estim_empty_smallest, estim_empty_mean, @@ -157,7 +162,7 @@ class TestHelperTester(unittest.TestCase): # this even kills unittest: #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean, - # kill_level=test_helpers.KILL_REALLY_MEAN_EXIT) + # kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT) #self.fail('should have exited!') def test_thread(self): @@ -169,33 +174,70 @@ class TestHelperTester(unittest.TestCase): # since we do not want to fill up the disc, we incrementally decrease # the tolerance so even our hopefully half-empty filesystems will # trigger warnings / errors - actual_states = get_filesystem_fill_states() - min_free = min(fill_state.available for fill_state in actual_states) - print(min_free) + min_free = None + min_name = None + for fill_state in get_filesystem_fill_states(): + if fill_state.name in test_helpers.NOT_REAL_FILESYSTEMS: + continue + if (min_free is None) or (fill_state.available < min_free): + min_free = fill_state.available + min_name = fill_state.name + print('min free: {0} with {1}B'.format(min_name, min_free)) + + # set space tolerance such that no warn/err should show + size_warn = min_free/4; + size_err = min_free/2; + def decision_function(*args): + test_helpers.default_disc_full_decision_function( + *args, size_warn=size_warn, size_err=size_err) + self._internal_test_thread(decision_function) + + # set space tolerance such that show warn but no err + size_warn = min_free*2; + size_err = min_free/2; + def decision_function(*args): + test_helpers.default_disc_full_decision_function( + *args, size_warn=size_warn, size_err=size_err) + self.assertWarn(test_helpers.DiscFullPreventionWarning, None, + self._internal_test_thread, decision_function) + + # set space tolerance such that err should be raised + size_warn = min_free*4; + size_err = min_free*2; + def decision_function(*args): + test_helpers.default_disc_full_decision_function( + *args, size_warn=size_warn, size_err=size_err) + self.assertRaises(KeyboardInterrupt, + self._internal_test_thread, decision_function) + + def _internal_test_thread(self, decision_function): - sleep_time_checker = 0 + sleep_time_checker = 0.1 sleep_time_tester = 1 time_tolerance = 0.1 + main_thread_name = 'MainThread' + checker_thread_name = 'discFillChck' - # run something with default settings. Should give no trouble - decision_function = test_helpers.default_disc_full_decision_function - print('before: threads are {0}' - .format([thread.name for thread in threading.enumerate()])) + curr_threads = [thread.name for thread in threading.enumerate()] + self.assertEqual(curr_threads, [main_thread_name]) start_time = dt.now() - with test_helpers.disc_fill_checked(sleep_time_checker, - decision_function): - print('within: threads are {0}' - .format([thread.name for thread in threading.enumerate()])) + with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD, + sleep_time_checker, + decision_function) as checker: + curr_threads = [thread.name for thread in threading.enumerate()] self.nonsense_function(sleep_time_tester) time_taken = (dt.now() - start_time).total_seconds() - print('after: threads are {0}' - .format([thread.name for thread in threading.enumerate()])) + self.assertEqual(curr_threads, [main_thread_name, checker_thread_name]) + + # stop checker thread + checker.stop() + checker.join() + curr_threads = [thread.name for thread in threading.enumerate()] + self.assertEqual(curr_threads, [main_thread_name]) + self.assertTrue(time_taken >= sleep_time_tester) self.assertTrue(time_taken < sleep_time_tester + time_tolerance) - # run with thresholds set so we get warnings - # run with thresholds set so we get errors - def nonsense_function(self, sleep_time): """ function to run while checking disc fill state; just sleeps """ diff --git a/test_helpers.py b/test_helpers.py index 77c243e..7d7072f 100644 --- a/test_helpers.py +++ b/test_helpers.py @@ -11,13 +11,14 @@ from __future__ import print_function import contextlib from threading import Thread from time import sleep -from file_helpers import get_filesystem_fill_states +from file_helpers import get_filesystem_fill_states, FilesystemFillState from datetime import datetime as dt from buffers import LogarithmicBuffer from itertools import tee from warnings import warn -from sys import exit, stderr -from os import _exit as bad_exit_function_that_should_not_be_used +from sys import stderr, exit as sys_exit +from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used +import signal try: from warnings import ResourceWarning @@ -27,6 +28,11 @@ except ImportError: WARN_BASE_CLASS = UserWarning +#: filesystems shown by df that usually do not correspond to something on disc +#: (except maybe swap) +NOT_REAL_FILESYSTEMS = 'none', 'shmfs', 'procfs', 'tmpfs' + + class DiscFullPreventionError(Exception): """ Exception raised when disc space gets critical @@ -57,30 +63,27 @@ class DiscFullPreventionWarning(WARN_BASE_CLASS): self.time_estim = time_estim -class DiscFillCheckerThread(Thread): - """ a thread that checks disc fill in regular intervals """ +class DiscFillChecker: + """ checker for disc fill status """ def __init__(self, interval=10, decision_function=None): - """ creates a DiscFillCheckerThread - - :param int interval: time in seconds between checks of disc usage - :param decision_function: function that decides when to issue a warning - or an error. See - :py:func:`default_disc_full_decision_function` - as an example. Function should raise - :py:class:`DiscFullPreventionWarning` and - :py:class:`DiscFullPreventionError` - """ - super(DiscFillCheckerThread, self).__init__(name='discFillChck') - - self.daemon = True # works since python 2.6 # set variables self.interval = interval if decision_function is None: self.decision_function = default_disc_full_decision_function else: - # need to check warn_func somehow (using introspection?) + # check decision_function: + GIGABYTE = 2**30 + state = FilesystemFillState() + state.name = 'dummy' + state.size = 10 * GIGABYTE + state.used = 1 * GIGABYTE + state.available = 9 * GIGABYTE # asume this should be enough + state.capacity = 10 + state.mount_point = '/not/mounted' + decision_function(state, None, None, None) + self.decision_function = decision_function # remember relevant disc stats at start @@ -93,8 +96,7 @@ class DiscFillCheckerThread(Thread): internal helper called from __init__ and run """ - if fs_state.name == 'tmpfs': - print('ignoring {0}'.format(fs_state)) + if fs_state.name in NOT_REAL_FILESYSTEMS: return [] buf = None @@ -108,34 +110,65 @@ class DiscFillCheckerThread(Thread): buf.add((dt.now(), float(fs_state.available))) return buf.get_all() + def do_check(self): + """ check disc fill state """ + + # loop over current disc fill state + for fs_state in get_filesystem_fill_states(): + # remember newest value with others for same file system + fill_states = self._internal_state_buffer(fs_state) + if not fill_states: + continue + + # estimate time until full (i.e. when available == 0) + times_until_empty = \ + [calc_time_until_zero(old_state, new_state) + for old_state, new_state in pairwise(fill_states)] + + # call user-defined function to decide + min_time = None + if any(time > 0 for time in times_until_empty): + min_time = min(time for time in times_until_empty + if time>0) + avg_time = calc_time_until_zero(fill_states[0], + fill_states[-1]) + self.decision_function(fs_state, + times_until_empty[-1], # newest + min_time, # smallest + avg_time) # average + + +class DiscFillCheckerThread(Thread, DiscFillChecker): + """ a thread that checks disc fill in regular intervals """ + + def __init__(self, *args, **kwargs): + """ creates a DiscFillCheckerThread + + :param int interval: time in seconds between checks of disc usage + :param decision_function: function that decides when to issue a warning + or an error. See + :py:func:`default_disc_full_decision_function` + as an example. Function should raise + :py:class:`DiscFullPreventionWarning` and + :py:class:`DiscFullPreventionError` + """ + Thread.__init__(self, name='discFillChck') + DiscFillChecker.__init__(self, *args, **kwargs) + + # make this thread a daemon + self.daemon = True # works since python 2.6 + + # init do_stop + self.do_stop = False + def run(self): - while True: + print('checker thread running') + while not self.do_stop: sleep(self.interval) - fs_states = get_filesystem_fill_states() - for fs_state in fs_states: - # ignore tmpfs since is in memory or swap, so fill up is not - # as bad - if fs_state.name == 'tmpfs': - continue - - fill_states = self._internal_state_buffer(fs_state) - - # estimate time until full (i.e. when available == 0) - times_until_empty = \ - [calc_time_until_zero(old_state, new_state) - for old_state, new_state in pairwise(fill_states)] - - # call user-defined function to decide - min_time = None - if any(time > 0 for time in times_until_empty): - min_time = min(time for time in times_until_empty - if time>0) - avg_time = calc_time_until_zero(fill_states[0], - fill_states[-1]) - self.decision_function(fs_state, - times_until_empty[-1], # newest - min_time, # smallest - avg_time) # average + self.do_check() + + def stop(self): + self.do_stop = True MEGABYTE = 2**20 @@ -147,11 +180,22 @@ DEFAULT_DISC_SIZE_ERR = MEGABYTE DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE DEFAULT_TIME_TO_FULL_ERR = MINUTE -#: levels at which to kill running tests: regular exception (subclass of -#: Exception), base exception (to avoid "catch(exception)") or sys.exit +#: levels at which to kill running tests: regular exception, easily caught +#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! KILL_EXCEPTION = "KILL_EXCEPTION" + +#: levels at which to kill running tests: exception that is not a subclass of +#: Exception to avoid "catch(Exception)". +#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked! KILL_SYS_EXIT = "KILL_SYS_EXIT" -KILL_REALLY_MEAN_EXIT = "REALLY_MEAN_EXIT" + +#: levels at which to kill running tests: brutal way that does not allow any +#: default cleanup (os._exit). Last resort when all others do not work! +KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT" + +#: levels at which to kill running tests: SIGINT +KILL_SIGINT = "KILL_SIGINT" + def default_disc_full_decision_function(curr_state, estim_empty_newest, @@ -161,7 +205,7 @@ def default_disc_full_decision_function(curr_state, size_err=DEFAULT_DISC_SIZE_ERR, time_warn=DEFAULT_TIME_TO_FULL_WARN, time_err=DEFAULT_TIME_TO_FULL_ERR, - kill_level=KILL_EXCEPTION): + kill_level=KILL_SIGINT): """ decide whether to warn or raise error because disc is getting full called by DiscFillCheckerThread regularly; params estim_empty_* are result @@ -178,9 +222,16 @@ def default_disc_full_decision_function(curr_state, :py:data:`DEFAULT_DISC_SIZE_WARN` :param kill_level: how to kill the running application: with sys.exit or by raising a "regular" Exception subclass (default). + Brutal way or SIGINT (default) are also options. Values should be one like :py:data:`KILL_EXCEPTION` """ + if curr_state.name == '/dev/mapper/ssd2-local': + print('checking ssd2: free is {0:.1f}MB, warn at {1:.1f}MB, err at {2:.1f}MB' + .format(float(curr_state.available) / MEGABYTE, + float(size_warn) / MEGABYTE, + float(size_err) / MEGABYTE)) + # err? raise_err = False if curr_state.available < size_err: @@ -190,20 +241,20 @@ def default_disc_full_decision_function(curr_state, # what kind of err? if raise_err: + print('Disc fill below tolerance or disc full soon:', file=stderr) + print('{0}, full in {1} s' + .format(curr_state, estim_empty_smallest), file=stderr) if kill_level == KILL_EXCEPTION: raise DiscFullPreventionError(curr_state, estim_empty_smallest) - elif kill_level == KILL_REALLY_MEAN_EXIT: - print('Disc fill below tolerance or disc full soon', file=stderr) - print('{0}, full in {1} s' - .format(curr_state, estim_empty_smallest), file=stderr) + elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT: print('Exiting now the brutal way', file=stderr) - bad_exit_function_that_should_not_be_used(1) - else: # kill_level == SYS_EXIT or unknown - print('Disc fill below tolerance or disc full soon:', file=stderr) - print('{0}, full in {1} s' - .format(curr_state, estim_empty_smallest), file=stderr) + brutal_exit_function_that_should_not_be_used(1) + elif kill_level == KILL_SYS_EXIT: print('Exiting now', file=stderr) - exit("Disc nearly full!") + sys_exit("Disc nearly full!") + else: # kill_level == KILL_SIGINT or unknown + print('Sending SIGINT', file=stderr) + kill(getpid(), signal.SIGINT) # warn? if curr_state.available < size_warn: @@ -254,17 +305,59 @@ def pairwise(iterable): return zip(a, b) +METHOD_THREAD = 'thread' +METHOD_ALARM = 'alarm' + + @contextlib.contextmanager -def disc_fill_checked(*args, **kwargs): - """ run test function while separate thread watches disc space +def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs): + """ run test function while watching disc space - all args are forwarded to :py:class:`DiscFillCheckerThread` constructor + all args are forwarded to :py:class:`DiscFillChecker` constructor + + depending on method, will create a separate thread (default) to watch disc + or use SIGALRM. First one does not interfere with signals within tested + method, second can use a simple Exception to stop the test """ - # todo: add args and forward - DiscFillCheckerThread(*args, **kwargs).start() + if method == METHOD_THREAD: + checker = DiscFillCheckerThread(*args, **kwargs) + checker.start() + elif method == METHOD_ALARM: + # check that alarms are not used otherwise + old_handler = signal.getsignal(signal.SIGALRM) + if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None): + raise RuntimeError('Signal handler for SIGALRM set ' + '-- do not want to override!') + + # create checker + checker = DiscFillChecker(*args, **kwargs) + + # register checker as alarm handler + def alarm_signal_handler(signum ,frame): + print('todo: check signum, frame?') + checker.do_check() + if not checker.do_stop: + signal.alarm(checker.interval) # set alarm again + signal.signal(signal.SIGALRM, alarm_signal_handler) + + # set alarm + signal.alarm(checker.interval) + else: + raise ValueError('unknown checking method!') try: - yield + # run decorated function + yield checker finally: - pass # wait for other to stop or so? or stop it? + # stop checker + checker.stop() + + # clean up + if method == METHOD_THREAD: + checker.join() + elif method == METHOD_ALARM: + signal.alarm(0) # cancel alarms + signal.signal(signal.SIGALRM, old_handler) + else: + raise NotImplementedError('implement cleanup for new method!')