--- /dev/null
+""" test_helper_unittest.py: unit tests for test_helpers
+
+Tests classes and functions in test_helpers
+
+Should be able run from python2 and python3!
+
+For help see :py:mod:`unittest`
+
+.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
+"""
+
+from __future__ import print_function
+
+import unittest
+from datetime import timedelta, datetime as dt
+from file_helpers import FilesystemFillState, get_filesystem_fill_states
+import warnings
+
+import test_helpers
+
+
+class TestHelperTester(unittest.TestCase):
+
+ def assertNoWarn(self, func, *args, **kwargs):
+ """ check that calling function raises no warning
+
+ use warnings.catch_warnings instead of self.assertWarn
+ for compatibility with python 2
+ """
+
+ result = None
+ with warnings.catch_warnings(record=True) as warn_catcher:
+ result = func(*args, **kwargs)
+
+ self.assertEqual(len(warn_catcher), 0)
+
+ return result
+
+
+ def assertWarn(self, warn_type, message, func, *args, **kwargs):
+ """ check that calling function raises no warning
+
+ use warnings.catch_warnings instead of self.assertWarn
+ for compatibility with python 2
+ """
+
+ with warnings.catch_warnings(record=True) as warn_catcher:
+ func(*args, **kwargs)
+
+ self.assertEqual(len(warn_catcher), 1)
+ if warn_type is not None:
+ self.assertTrue(issubclass(warn_catcher[-1].category,
+ warn_type))
+ if message is not None:
+ self.assertEqual(warn_catcher[-1].message, message)
+ #print(warn_catcher[-1].message)
+
+
+ def test_zero_estimator(self):
+ """ test function :py:func:`test_helpers.calc_time_until_zero` """
+
+ # need to allow some tolerance since calc_time_until_zero uses now()
+ TOLERANCE = 0.1
+
+ start_time = dt.now()
+ time1 = start_time - timedelta(seconds=20) # 20s ago
+ time2 = start_time - timedelta(seconds=10) # 10s ago
+
+ # test constant disc usage
+ self.assertEqual(test_helpers.calc_time_until_zero((time1, 1),
+ (time2, 1)), None)
+
+ # test gaining disc space (1 byte per second, empty 30s ago)
+ estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20))
+ self.assertTrue(estim < 0)
+ expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds()
+ self.assertLess(abs(estim - expect), TOLERANCE)
+
+ # test slowly losing disc space (1 byte per second, empty now)
+ estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10))
+ expect = (start_time - dt.now()).total_seconds()
+ self.assertLess(abs(estim - expect), TOLERANCE)
+
+
+ def test_decision_function(self):
+ """ tests function default_disc_full_decision_function
+
+ :py:func:`test_helpers.default_disc_full_decision_function`
+ """
+
+ f = test_helpers.default_disc_full_decision_function
+
+ # create dummy state
+ GIGABYTE = 2**30
+ state = FilesystemFillState()
+ state.name = 'dummy'
+ state.size = 10 * GIGABYTE
+ state.used = 9 * GIGABYTE
+ state.available = 1 * GIGABYTE
+ state.capacity = 90
+ state.mount_point = '/not/mounted'
+
+ HOUR = 60*60
+ estim_empty_newest = None # currently not used
+ estim_empty_mean = None # currently not used
+ estim_empty_smallest = None # simulate no proper estimate
+
+ # check no warning if no time estimate (and enough space)
+ self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean)
+
+ # check no warning if enough time (and enough space)
+ estim_empty_smallest = HOUR # enough time
+ self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean)
+
+ # check warn if time getting low
+ estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN +
+ test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2.
+ self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+ f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean)
+
+ # check err if time too low
+ estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2.
+ self.assertRaises(test_helpers.DiscFullPreventionError,
+ f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean)
+ # show the error message:
+ #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean)
+
+ # check warning if disc space is getting low
+ estim_empty_smallest = HOUR
+ state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN +
+ test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.)
+ state.used = state.size - state.available
+ state.capacity = int(100. * state.used / state.size)
+ self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+ f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean)
+
+ # check error if disc space is too low
+ state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.)
+ state.used = state.size - state.available
+ state.capacity = int(100. * state.used / state.size)
+ self.assertRaises(test_helpers.DiscFullPreventionError,
+ f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean)
+
+ # check other kill levels
+ self.assertRaises(SystemExit,
+ f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean,
+ kill_level=test_helpers.KILL_SYS_EXIT)
+
+ # this even kills unittest:
+ #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean,
+ # kill_level=test_helpers.KILL_REALLY_MEAN_EXIT)
+ #self.fail('should have exited!')
+
+ def test_thread(self):
+ """ test the DiscFillCheckerThread """
+
+ # since we do not want to fill up the disc, we incrementally decrease
+ # the tolerance so even our hopefully half-empty filesystems will
+ # trigger warnings / errors
+ actual_states = get_filesystem_fill_states()
+ min_free = min(fill_state.available for fill_state in actual_states)
+ print(min_free)
+
+ sleep_time = 0
+
+ # run something with default settings. Should give no trouble
+
+ # run with thresholds set so we get warnings
+ # run with thresholds set so we get errors
+
+
+if __name__ == '__main__':
+ unittest.main()
.. codeauthor:: Christian Herdtweck, christian.herdtweck@intra2net.com
"""
+from __future__ import print_function
+
import contextlib
from threading import Thread
from time import sleep
from datetime import datetime as dt
from buffers import LogarithmicBuffer
from itertools import tee
+from warnings import warn
+from sys import exit, stderr
+from os import _exit as bad_exit_function_that_should_not_be_used
+
+try:
+ from warnings import ResourceWarning
+ WARN_BASE_CLASS = ResourceWarning
+except ImportError:
+ # only added in python 3.2
+ WARN_BASE_CLASS = UserWarning
+
+
+class DiscFullPreventionError(Exception):
+ """ Exception raised when disc space gets critical
+
+ issued from function called by :py:class:`DiscFillCheckerThread`
+ usually only happens after issuing a :py:class:`DiscFullPreventionWarning`
+ """
+ def __init__(self, state, time_estim):
+ super(DiscFullPreventionError, self).__init__(
+ 'Interrupting test to avoid full disc ({0}, full in {1} s)'
+ .format(state, time_estim))
+ self.state = state
+ self.time_estim = time_estim
+
+
+class DiscFullPreventionWarning(WARN_BASE_CLASS):
+ """ Exception raised when disc space approaches problematic state
+
+ issued from function called by :py:class:`DiscFillCheckerThread`
+ If disc fills up further, will be followed by a
+ :py:class:`DiscFullPreventionError` or sys.exit
+ """
+ def __init__(self, state, time_estim):
+ super(DiscFullPreventionWarning, self).__init__(
+ 'Disc nearly full! Might soon abort test ({0}, '
+ 'full in {1} s)'.format(state, time_estim))
+ self.state = state
+ self.time_estim = time_estim
+
class DiscFillCheckerThread(Thread):
""" a thread that checks disc fill in regular intervals """
def __init__(self, interval=10, decision_function = None):
+ """ creates a DiscFillCheckerThread
+
+ :param int interval: time in seconds between checks of disc usage
+ :param decision_function: function that decides when to issue a warning
+ or an error. See
+ :py:func:`default_disc_full_decision_function`
+ as an example. Function should raise
+ :py:class:`DiscFullPreventionWarning` and
+ :py:class:`DiscFullPreventionError`
+ """
super(DiscFillCheckerThread, self).__init__(name='discFillChck',
daemon=True)
# set variables
# estimate time until full (i.e. when available == 0)
times_until_empty = \
- [calc_times_until_zero(old_state, new_state)
+ [calc_time_until_zero(old_state, new_state)
for old_state, new_state in pairwise(fill_states)]
# call user-defined function to decide
if any(times_until_empty > 0):
min_time = min(time for time in times_until_empty
if time>0)
- avg_time = calc_times_until_zero(fill_states[0],
+ avg_time = calc_time_until_zero(fill_states[0],
fill_states[-1])
- self.decision_function(stat,
+ self.decision_function(fs_state,
times_until_empty[-1], # newest
- min(times_until_empty), # smallest
+ min_time, # smallest
avg_time) # average
+MEGABYTE = 2**20
+MINUTE = 60
+
+#: default thresholds for :py:func:`default_disc_full_decision_function`
+DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE
+DEFAULT_DISC_SIZE_ERR = MEGABYTE
+DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE
+DEFAULT_TIME_TO_FULL_ERR = MINUTE
+
+#: levels at which to kill running tests: regular exception (subclass of
+#: Exception), base exception (to avoid "catch(exception)") or sys.exit
+KILL_EXCEPTION = "KILL_EXCEPTION"
+KILL_SYS_EXIT = "KILL_SYS_EXIT"
+KILL_REALLY_MEAN_EXIT = "REALLY_MEAN_EXIT"
+
def default_disc_full_decision_function(curr_state,
estim_empty_newest,
estim_empty_smallest,
- estim_empty_mean):
+ estim_empty_mean,
+ size_warn=DEFAULT_DISC_SIZE_WARN,
+ size_err=DEFAULT_DISC_SIZE_ERR,
+ time_warn=DEFAULT_TIME_TO_FULL_WARN,
+ time_err=DEFAULT_TIME_TO_FULL_ERR,
+ kill_level=KILL_EXCEPTION):
""" decide whether to warn or raise error because disc is getting full
called by DiscFillCheckerThread regularly; params estim_empty_* are result
- of :py:func:`calc_times_until_zero`, so None means that disc state did not
+ of :py:func:`calc_time_until_zero`, so None means that disc state did not
change.
+ Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic
+ and raises an error or calls sys.exit when it is really critical.
+
:param estim_empty_newest: the newest time estimate
:param estim_empty_smallest: the smallest of all non-negative estimates
:param estim_empty_mean: estim between earliest and newest estimate
+ :param int size/err_warn/err: thresholds, default to constants like
+ :py:data:`DEFAULT_DISC_SIZE_WARN`
+ :param kill_level: how to kill the running application: with sys.exit or
+ by raising a "regular" Exception subclass (default).
+ Values should be one like :py:data:`KILL_EXCEPTION`
"""
- pass
-def calc_times_until_zero(old_state, new_state):
+ # err?
+ raise_err = False
+ if curr_state.available < size_err:
+ raise_err = True
+ if (estim_empty_smallest != None) and (estim_empty_smallest < time_err):
+ raise_err = True
+
+ # what kind of err?
+ if raise_err:
+ if kill_level == KILL_EXCEPTION:
+ raise DiscFullPreventionError(curr_state, estim_empty_smallest)
+ elif kill_level == KILL_REALLY_MEAN_EXIT:
+ print('Disc fill below tolerance or disc full soon', file=stderr)
+ print('{0}, full in {1:.1f}s'
+ .format(curr_state, estim_empty_smallest), file=stderr)
+ print('Exiting now the brutal way', file=stderr)
+ bad_exit_function_that_should_not_be_used(1)
+ else: # kill_level == SYS_EXIT or unknown
+ print('Disc fill below tolerance or disc full soon:', file=stderr)
+ print('{0}, full in {1} s'
+ .format(curr_state, estim_empty_smallest), file=stderr)
+ print('Exiting now', file=stderr)
+ exit("Disc nearly full!")
+
+ # warn?
+ if curr_state.available < size_warn:
+ warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest),
+ stacklevel=3)
+ if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn):
+ warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest),
+ stacklevel=3)
+
+
+def calc_time_until_zero(old_state, new_state):
""" calc time until value hits zero given (time1, value1), (time2, value2)
returns time from now until empty in seconds
delta_date = new_date - old_date
delta_free = new_free - old_free
free(t) = new_free + (t-new_t) * delta_free / delta_date
- = 0 <==>
- diff_t := new_t - t = new_free * delta_date / delta_free
+ = 0
+ <==> -new_free = (t-new_t) * delta_free / delta_date
+ <==> -new_free * delta_date / date_free = t - new_t
+ <==> diff_t := new_t - t = new_free * delta_date / delta_free
compare to now:
diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now)
- = -diff_t - (now-new_t)
+ = -diff_t - (now-new_t) = (new_t-now) - diff_t
To avoid zero-division, returns None if delta_free == 0
"""
if new_free == old_free:
return None # avoid zero-division
time_diff = new_free * (new_date - old_date).total_seconds() \
- / (old_free - new_free)
+ / (new_free - old_free)
# compare to now
- return (now - new_date).total_seconds() - time_diff
+ return (new_date - dt.now()).total_seconds() - time_diff
def pairwise(iterable):