From: Christian Herdtweck Date: Wed, 13 Jan 2016 16:14:13 +0000 (+0100) Subject: continue testing and correcting disc-usage checker; decision function and zero-estima... X-Git-Tag: v1.2~71 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=91aae1f362217204cbbbe26c11a1206744cf104b;p=pyi2ncommon continue testing and correcting disc-usage checker; decision function and zero-estimation correct --- diff --git a/test/test_helper_unittest.py b/test/test_helper_unittest.py new file mode 100644 index 0000000..cf2fc37 --- /dev/null +++ b/test/test_helper_unittest.py @@ -0,0 +1,180 @@ +""" test_helper_unittest.py: unit tests for test_helpers + +Tests classes and functions in test_helpers + +Should be able run from python2 and python3! + +For help see :py:mod:`unittest` + +.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com +""" + +from __future__ import print_function + +import unittest +from datetime import timedelta, datetime as dt +from file_helpers import FilesystemFillState, get_filesystem_fill_states +import warnings + +import test_helpers + + +class TestHelperTester(unittest.TestCase): + + def assertNoWarn(self, func, *args, **kwargs): + """ check that calling function raises no warning + + use warnings.catch_warnings instead of self.assertWarn + for compatibility with python 2 + """ + + result = None + with warnings.catch_warnings(record=True) as warn_catcher: + result = func(*args, **kwargs) + + self.assertEqual(len(warn_catcher), 0) + + return result + + + def assertWarn(self, warn_type, message, func, *args, **kwargs): + """ check that calling function raises no warning + + use warnings.catch_warnings instead of self.assertWarn + for compatibility with python 2 + """ + + with warnings.catch_warnings(record=True) as warn_catcher: + func(*args, **kwargs) + + self.assertEqual(len(warn_catcher), 1) + if warn_type is not None: + self.assertTrue(issubclass(warn_catcher[-1].category, + warn_type)) + if message is not None: + self.assertEqual(warn_catcher[-1].message, message) + #print(warn_catcher[-1].message) + + + def test_zero_estimator(self): + """ test function :py:func:`test_helpers.calc_time_until_zero` """ + + # need to allow some tolerance since calc_time_until_zero uses now() + TOLERANCE = 0.1 + + start_time = dt.now() + time1 = start_time - timedelta(seconds=20) # 20s ago + time2 = start_time - timedelta(seconds=10) # 10s ago + + # test constant disc usage + self.assertEqual(test_helpers.calc_time_until_zero((time1, 1), + (time2, 1)), None) + + # test gaining disc space (1 byte per second, empty 30s ago) + estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20)) + self.assertTrue(estim < 0) + expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds() + self.assertLess(abs(estim - expect), TOLERANCE) + + # test slowly losing disc space (1 byte per second, empty now) + estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10)) + expect = (start_time - dt.now()).total_seconds() + self.assertLess(abs(estim - expect), TOLERANCE) + + + def test_decision_function(self): + """ tests function default_disc_full_decision_function + + :py:func:`test_helpers.default_disc_full_decision_function` + """ + + f = test_helpers.default_disc_full_decision_function + + # create dummy state + GIGABYTE = 2**30 + state = FilesystemFillState() + state.name = 'dummy' + state.size = 10 * GIGABYTE + state.used = 9 * GIGABYTE + state.available = 1 * GIGABYTE + state.capacity = 90 + state.mount_point = '/not/mounted' + + HOUR = 60*60 + estim_empty_newest = None # currently not used + estim_empty_mean = None # currently not used + estim_empty_smallest = None # simulate no proper estimate + + # check no warning if no time estimate (and enough space) + self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check no warning if enough time (and enough space) + estim_empty_smallest = HOUR # enough time + self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check warn if time getting low + estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN + + test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2. + self.assertWarn(test_helpers.DiscFullPreventionWarning, None, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check err if time too low + estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2. + self.assertRaises(test_helpers.DiscFullPreventionError, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + # show the error message: + #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean) + + # check warning if disc space is getting low + estim_empty_smallest = HOUR + state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN + + test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.) + state.used = state.size - state.available + state.capacity = int(100. * state.used / state.size) + self.assertWarn(test_helpers.DiscFullPreventionWarning, None, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check error if disc space is too low + state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.) + state.used = state.size - state.available + state.capacity = int(100. * state.used / state.size) + self.assertRaises(test_helpers.DiscFullPreventionError, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check other kill levels + self.assertRaises(SystemExit, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean, + kill_level=test_helpers.KILL_SYS_EXIT) + + # this even kills unittest: + #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean, + # kill_level=test_helpers.KILL_REALLY_MEAN_EXIT) + #self.fail('should have exited!') + + def test_thread(self): + """ test the DiscFillCheckerThread """ + + # since we do not want to fill up the disc, we incrementally decrease + # the tolerance so even our hopefully half-empty filesystems will + # trigger warnings / errors + actual_states = get_filesystem_fill_states() + min_free = min(fill_state.available for fill_state in actual_states) + print(min_free) + + sleep_time = 0 + + # run something with default settings. Should give no trouble + + # run with thresholds set so we get warnings + # run with thresholds set so we get errors + + +if __name__ == '__main__': + unittest.main() diff --git a/test_helpers.py b/test_helpers.py index fec0164..548071b 100644 --- a/test_helpers.py +++ b/test_helpers.py @@ -6,6 +6,8 @@ tests .. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com """ +from __future__ import print_function + import contextlib from threading import Thread from time import sleep @@ -13,11 +15,61 @@ from file_helpers import get_filesystem_fill_states from datetime import datetime as dt from buffers import LogarithmicBuffer from itertools import tee +from warnings import warn +from sys import exit, stderr +from os import _exit as bad_exit_function_that_should_not_be_used + +try: + from warnings import ResourceWarning + WARN_BASE_CLASS = ResourceWarning +except ImportError: + # only added in python 3.2 + WARN_BASE_CLASS = UserWarning + + +class DiscFullPreventionError(Exception): + """ Exception raised when disc space gets critical + + issued from function called by :py:class:`DiscFillCheckerThread` + usually only happens after issuing a :py:class:`DiscFullPreventionWarning` + """ + def __init__(self, state, time_estim): + super(DiscFullPreventionError, self).__init__( + 'Interrupting test to avoid full disc ({0}, full in {1} s)' + .format(state, time_estim)) + self.state = state + self.time_estim = time_estim + + +class DiscFullPreventionWarning(WARN_BASE_CLASS): + """ Exception raised when disc space approaches problematic state + + issued from function called by :py:class:`DiscFillCheckerThread` + If disc fills up further, will be followed by a + :py:class:`DiscFullPreventionError` or sys.exit + """ + def __init__(self, state, time_estim): + super(DiscFullPreventionWarning, self).__init__( + 'Disc nearly full! Might soon abort test ({0}, ' + 'full in {1} s)'.format(state, time_estim)) + self.state = state + self.time_estim = time_estim + class DiscFillCheckerThread(Thread): """ a thread that checks disc fill in regular intervals """ def __init__(self, interval=10, decision_function = None): + """ creates a DiscFillCheckerThread + + :param int interval: time in seconds between checks of disc usage + :param decision_function: function that decides when to issue a warning + or an error. See + :py:func:`default_disc_full_decision_function` + as an example. Function should raise + :py:class:`DiscFullPreventionWarning` and + :py:class:`DiscFullPreventionError` + """ super(DiscFillCheckerThread, self).__init__(name='discFillChck', daemon=True) # set variables @@ -58,7 +110,7 @@ class DiscFillCheckerThread(Thread): # estimate time until full (i.e. when available == 0) times_until_empty = \ - [calc_times_until_zero(old_state, new_state) + [calc_time_until_zero(old_state, new_state) for old_state, new_state in pairwise(fill_states)] # call user-defined function to decide @@ -66,31 +118,91 @@ class DiscFillCheckerThread(Thread): if any(times_until_empty > 0): min_time = min(time for time in times_until_empty if time>0) - avg_time = calc_times_until_zero(fill_states[0], + avg_time = calc_time_until_zero(fill_states[0], fill_states[-1]) - self.decision_function(stat, + self.decision_function(fs_state, times_until_empty[-1], # newest - min(times_until_empty), # smallest + min_time, # smallest avg_time) # average +MEGABYTE = 2**20 +MINUTE = 60 + +#: default thresholds for :py:func:`default_disc_full_decision_function` +DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE +DEFAULT_DISC_SIZE_ERR = MEGABYTE +DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE +DEFAULT_TIME_TO_FULL_ERR = MINUTE + +#: levels at which to kill running tests: regular exception (subclass of +#: Exception), base exception (to avoid "catch(exception)") or sys.exit +KILL_EXCEPTION = "KILL_EXCEPTION" +KILL_SYS_EXIT = "KILL_SYS_EXIT" +KILL_REALLY_MEAN_EXIT = "REALLY_MEAN_EXIT" + def default_disc_full_decision_function(curr_state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean): + estim_empty_mean, + size_warn=DEFAULT_DISC_SIZE_WARN, + size_err=DEFAULT_DISC_SIZE_ERR, + time_warn=DEFAULT_TIME_TO_FULL_WARN, + time_err=DEFAULT_TIME_TO_FULL_ERR, + kill_level=KILL_EXCEPTION): """ decide whether to warn or raise error because disc is getting full called by DiscFillCheckerThread regularly; params estim_empty_* are result - of :py:func:`calc_times_until_zero`, so None means that disc state did not + of :py:func:`calc_time_until_zero`, so None means that disc state did not change. + Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic + and raises an error or calls sys.exit when it is really critical. + :param estim_empty_newest: the newest time estimate :param estim_empty_smallest: the smallest of all non-negative estimates :param estim_empty_mean: estim between earliest and newest estimate + :param int size/err_warn/err: thresholds, default to constants like + :py:data:`DEFAULT_DISC_SIZE_WARN` + :param kill_level: how to kill the running application: with sys.exit or + by raising a "regular" Exception subclass (default). + Values should be one like :py:data:`KILL_EXCEPTION` """ - pass -def calc_times_until_zero(old_state, new_state): + # err? + raise_err = False + if curr_state.available < size_err: + raise_err = True + if (estim_empty_smallest != None) and (estim_empty_smallest < time_err): + raise_err = True + + # what kind of err? + if raise_err: + if kill_level == KILL_EXCEPTION: + raise DiscFullPreventionError(curr_state, estim_empty_smallest) + elif kill_level == KILL_REALLY_MEAN_EXIT: + print('Disc fill below tolerance or disc full soon', file=stderr) + print('{0}, full in {1:.1f}s' + .format(curr_state, estim_empty_smallest), file=stderr) + print('Exiting now the brutal way', file=stderr) + bad_exit_function_that_should_not_be_used(1) + else: # kill_level == SYS_EXIT or unknown + print('Disc fill below tolerance or disc full soon:', file=stderr) + print('{0}, full in {1} s' + .format(curr_state, estim_empty_smallest), file=stderr) + print('Exiting now', file=stderr) + exit("Disc nearly full!") + + # warn? + if curr_state.available < size_warn: + warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), + stacklevel=3) + if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn): + warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest), + stacklevel=3) + + +def calc_time_until_zero(old_state, new_state): """ calc time until value hits zero given (time1, value1), (time2, value2) returns time from now until empty in seconds @@ -98,12 +210,14 @@ def calc_times_until_zero(old_state, new_state): delta_date = new_date - old_date delta_free = new_free - old_free free(t) = new_free + (t-new_t) * delta_free / delta_date - = 0 <==> - diff_t := new_t - t = new_free * delta_date / delta_free + = 0 + <==> -new_free = (t-new_t) * delta_free / delta_date + <==> -new_free * delta_date / date_free = t - new_t + <==> diff_t := new_t - t = new_free * delta_date / delta_free compare to now: diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now) - = -diff_t - (now-new_t) + = -diff_t - (now-new_t) = (new_t-now) - diff_t To avoid zero-division, returns None if delta_free == 0 """ @@ -112,10 +226,10 @@ def calc_times_until_zero(old_state, new_state): if new_free == old_free: return None # avoid zero-division time_diff = new_free * (new_date - old_date).total_seconds() \ - / (old_free - new_free) + / (new_free - old_free) # compare to now - return (now - new_date).total_seconds() - time_diff + return (new_date - dt.now()).total_seconds() - time_diff def pairwise(iterable):