# check err if time too low
estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2.
- self.assertRaises(test_helpers.DiscFullPreventionError,
+ self.assertRaises(KeyboardInterrupt,
f, state, estim_empty_newest, estim_empty_smallest,
estim_empty_mean)
# show the error message:
state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.)
state.used = state.size - state.available
state.capacity = int(100. * state.used / state.size)
- self.assertRaises(test_helpers.DiscFullPreventionError,
+ self.assertRaises(KeyboardInterrupt,
f, state, estim_empty_newest, estim_empty_smallest,
estim_empty_mean)
# check other kill levels
+ self.assertRaises(test_helpers.DiscFullPreventionError,
+ f, state, estim_empty_newest, estim_empty_smallest,
+ estim_empty_mean,
+ kill_level=test_helpers.KILL_EXCEPTION)
+
self.assertRaises(SystemExit,
f, state, estim_empty_newest, estim_empty_smallest,
estim_empty_mean,
# this even kills unittest:
#f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean,
- # kill_level=test_helpers.KILL_REALLY_MEAN_EXIT)
+ # kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT)
#self.fail('should have exited!')
def test_thread(self):
# since we do not want to fill up the disc, we incrementally decrease
# the tolerance so even our hopefully half-empty filesystems will
# trigger warnings / errors
- actual_states = get_filesystem_fill_states()
- min_free = min(fill_state.available for fill_state in actual_states)
- print(min_free)
+ min_free = None
+ min_name = None
+ for fill_state in get_filesystem_fill_states():
+ if fill_state.name in test_helpers.NOT_REAL_FILESYSTEMS:
+ continue
+ if (min_free is None) or (fill_state.available < min_free):
+ min_free = fill_state.available
+ min_name = fill_state.name
+ print('min free: {0} with {1}B'.format(min_name, min_free))
+
+ # set space tolerance such that no warn/err should show
+ size_warn = min_free/4;
+ size_err = min_free/2;
+ def decision_function(*args):
+ test_helpers.default_disc_full_decision_function(
+ *args, size_warn=size_warn, size_err=size_err)
+ self._internal_test_thread(decision_function)
+
+ # set space tolerance such that show warn but no err
+ size_warn = min_free*2;
+ size_err = min_free/2;
+ def decision_function(*args):
+ test_helpers.default_disc_full_decision_function(
+ *args, size_warn=size_warn, size_err=size_err)
+ self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+ self._internal_test_thread, decision_function)
+
+ # set space tolerance such that err should be raised
+ size_warn = min_free*4;
+ size_err = min_free*2;
+ def decision_function(*args):
+ test_helpers.default_disc_full_decision_function(
+ *args, size_warn=size_warn, size_err=size_err)
+ self.assertRaises(KeyboardInterrupt,
+ self._internal_test_thread, decision_function)
+
+ def _internal_test_thread(self, decision_function):
- sleep_time_checker = 0
+ sleep_time_checker = 0.1
sleep_time_tester = 1
time_tolerance = 0.1
+ main_thread_name = 'MainThread'
+ checker_thread_name = 'discFillChck'
- # run something with default settings. Should give no trouble
- decision_function = test_helpers.default_disc_full_decision_function
- print('before: threads are {0}'
- .format([thread.name for thread in threading.enumerate()]))
+ curr_threads = [thread.name for thread in threading.enumerate()]
+ self.assertEqual(curr_threads, [main_thread_name])
start_time = dt.now()
- with test_helpers.disc_fill_checked(sleep_time_checker,
- decision_function):
- print('within: threads are {0}'
- .format([thread.name for thread in threading.enumerate()]))
+ with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD,
+ sleep_time_checker,
+ decision_function) as checker:
+ curr_threads = [thread.name for thread in threading.enumerate()]
self.nonsense_function(sleep_time_tester)
time_taken = (dt.now() - start_time).total_seconds()
- print('after: threads are {0}'
- .format([thread.name for thread in threading.enumerate()]))
+ self.assertEqual(curr_threads, [main_thread_name, checker_thread_name])
+
+ # stop checker thread
+ checker.stop()
+ checker.join()
+ curr_threads = [thread.name for thread in threading.enumerate()]
+ self.assertEqual(curr_threads, [main_thread_name])
+
self.assertTrue(time_taken >= sleep_time_tester)
self.assertTrue(time_taken < sleep_time_tester + time_tolerance)
- # run with thresholds set so we get warnings
- # run with thresholds set so we get errors
-
def nonsense_function(self, sleep_time):
""" function to run while checking disc fill state; just sleeps """
import contextlib
from threading import Thread
from time import sleep
-from file_helpers import get_filesystem_fill_states
+from file_helpers import get_filesystem_fill_states, FilesystemFillState
from datetime import datetime as dt
from buffers import LogarithmicBuffer
from itertools import tee
from warnings import warn
-from sys import exit, stderr
-from os import _exit as bad_exit_function_that_should_not_be_used
+from sys import stderr, exit as sys_exit
+from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used
+import signal
try:
from warnings import ResourceWarning
WARN_BASE_CLASS = UserWarning
+#: filesystems shown by df that usually do not correspond to something on disc
+#: (except maybe swap)
+NOT_REAL_FILESYSTEMS = 'none', 'shmfs', 'procfs', 'tmpfs'
+
+
class DiscFullPreventionError(Exception):
""" Exception raised when disc space gets critical
self.time_estim = time_estim
-class DiscFillCheckerThread(Thread):
- """ a thread that checks disc fill in regular intervals """
+class DiscFillChecker:
+ """ checker for disc fill status """
def __init__(self, interval=10, decision_function=None):
- """ creates a DiscFillCheckerThread
-
- :param int interval: time in seconds between checks of disc usage
- :param decision_function: function that decides when to issue a warning
- or an error. See
- :py:func:`default_disc_full_decision_function`
- as an example. Function should raise
- :py:class:`DiscFullPreventionWarning` and
- :py:class:`DiscFullPreventionError`
- """
- super(DiscFillCheckerThread, self).__init__(name='discFillChck')
-
- self.daemon = True # works since python 2.6
# set variables
self.interval = interval
if decision_function is None:
self.decision_function = default_disc_full_decision_function
else:
- # need to check warn_func somehow (using introspection?)
+ # check decision_function:
+ GIGABYTE = 2**30
+ state = FilesystemFillState()
+ state.name = 'dummy'
+ state.size = 10 * GIGABYTE
+ state.used = 1 * GIGABYTE
+ state.available = 9 * GIGABYTE # asume this should be enough
+ state.capacity = 10
+ state.mount_point = '/not/mounted'
+ decision_function(state, None, None, None)
+
self.decision_function = decision_function
# remember relevant disc stats at start
internal helper called from __init__ and run
"""
- if fs_state.name == 'tmpfs':
- print('ignoring {0}'.format(fs_state))
+ if fs_state.name in NOT_REAL_FILESYSTEMS:
return []
buf = None
buf.add((dt.now(), float(fs_state.available)))
return buf.get_all()
+ def do_check(self):
+ """ check disc fill state """
+
+ # loop over current disc fill state
+ for fs_state in get_filesystem_fill_states():
+ # remember newest value with others for same file system
+ fill_states = self._internal_state_buffer(fs_state)
+ if not fill_states:
+ continue
+
+ # estimate time until full (i.e. when available == 0)
+ times_until_empty = \
+ [calc_time_until_zero(old_state, new_state)
+ for old_state, new_state in pairwise(fill_states)]
+
+ # call user-defined function to decide
+ min_time = None
+ if any(time > 0 for time in times_until_empty):
+ min_time = min(time for time in times_until_empty
+ if time>0)
+ avg_time = calc_time_until_zero(fill_states[0],
+ fill_states[-1])
+ self.decision_function(fs_state,
+ times_until_empty[-1], # newest
+ min_time, # smallest
+ avg_time) # average
+
+
+class DiscFillCheckerThread(Thread, DiscFillChecker):
+ """ a thread that checks disc fill in regular intervals """
+
+ def __init__(self, *args, **kwargs):
+ """ creates a DiscFillCheckerThread
+
+ :param int interval: time in seconds between checks of disc usage
+ :param decision_function: function that decides when to issue a warning
+ or an error. See
+ :py:func:`default_disc_full_decision_function`
+ as an example. Function should raise
+ :py:class:`DiscFullPreventionWarning` and
+ :py:class:`DiscFullPreventionError`
+ """
+ Thread.__init__(self, name='discFillChck')
+ DiscFillChecker.__init__(self, *args, **kwargs)
+
+ # make this thread a daemon
+ self.daemon = True # works since python 2.6
+
+ # init do_stop
+ self.do_stop = False
+
def run(self):
- while True:
+ print('checker thread running')
+ while not self.do_stop:
sleep(self.interval)
- fs_states = get_filesystem_fill_states()
- for fs_state in fs_states:
- # ignore tmpfs since is in memory or swap, so fill up is not
- # as bad
- if fs_state.name == 'tmpfs':
- continue
-
- fill_states = self._internal_state_buffer(fs_state)
-
- # estimate time until full (i.e. when available == 0)
- times_until_empty = \
- [calc_time_until_zero(old_state, new_state)
- for old_state, new_state in pairwise(fill_states)]
-
- # call user-defined function to decide
- min_time = None
- if any(time > 0 for time in times_until_empty):
- min_time = min(time for time in times_until_empty
- if time>0)
- avg_time = calc_time_until_zero(fill_states[0],
- fill_states[-1])
- self.decision_function(fs_state,
- times_until_empty[-1], # newest
- min_time, # smallest
- avg_time) # average
+ self.do_check()
+
+ def stop(self):
+ self.do_stop = True
MEGABYTE = 2**20
DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE
DEFAULT_TIME_TO_FULL_ERR = MINUTE
-#: levels at which to kill running tests: regular exception (subclass of
-#: Exception), base exception (to avoid "catch(exception)") or sys.exit
+#: levels at which to kill running tests: regular exception, easily caught
+#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
KILL_EXCEPTION = "KILL_EXCEPTION"
+
+#: levels at which to kill running tests: exception that is not a subclass of
+#: Exception to avoid "catch(Exception)".
+#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
KILL_SYS_EXIT = "KILL_SYS_EXIT"
-KILL_REALLY_MEAN_EXIT = "REALLY_MEAN_EXIT"
+
+#: levels at which to kill running tests: brutal way that does not allow any
+#: default cleanup (os._exit). Last resort when all others do not work!
+KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT"
+
+#: levels at which to kill running tests: SIGINT
+KILL_SIGINT = "KILL_SIGINT"
+
def default_disc_full_decision_function(curr_state,
estim_empty_newest,
size_err=DEFAULT_DISC_SIZE_ERR,
time_warn=DEFAULT_TIME_TO_FULL_WARN,
time_err=DEFAULT_TIME_TO_FULL_ERR,
- kill_level=KILL_EXCEPTION):
+ kill_level=KILL_SIGINT):
""" decide whether to warn or raise error because disc is getting full
called by DiscFillCheckerThread regularly; params estim_empty_* are result
:py:data:`DEFAULT_DISC_SIZE_WARN`
:param kill_level: how to kill the running application: with sys.exit or
by raising a "regular" Exception subclass (default).
+ Brutal way or SIGINT (default) are also options.
Values should be one like :py:data:`KILL_EXCEPTION`
"""
+ if curr_state.name == '/dev/mapper/ssd2-local':
+ print('checking ssd2: free is {0:.1f}MB, warn at {1:.1f}MB, err at {2:.1f}MB'
+ .format(float(curr_state.available) / MEGABYTE,
+ float(size_warn) / MEGABYTE,
+ float(size_err) / MEGABYTE))
+
# err?
raise_err = False
if curr_state.available < size_err:
# what kind of err?
if raise_err:
+ print('Disc fill below tolerance or disc full soon:', file=stderr)
+ print('{0}, full in {1} s'
+ .format(curr_state, estim_empty_smallest), file=stderr)
if kill_level == KILL_EXCEPTION:
raise DiscFullPreventionError(curr_state, estim_empty_smallest)
- elif kill_level == KILL_REALLY_MEAN_EXIT:
- print('Disc fill below tolerance or disc full soon', file=stderr)
- print('{0}, full in {1} s'
- .format(curr_state, estim_empty_smallest), file=stderr)
+ elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT:
print('Exiting now the brutal way', file=stderr)
- bad_exit_function_that_should_not_be_used(1)
- else: # kill_level == SYS_EXIT or unknown
- print('Disc fill below tolerance or disc full soon:', file=stderr)
- print('{0}, full in {1} s'
- .format(curr_state, estim_empty_smallest), file=stderr)
+ brutal_exit_function_that_should_not_be_used(1)
+ elif kill_level == KILL_SYS_EXIT:
print('Exiting now', file=stderr)
- exit("Disc nearly full!")
+ sys_exit("Disc nearly full!")
+ else: # kill_level == KILL_SIGINT or unknown
+ print('Sending SIGINT', file=stderr)
+ kill(getpid(), signal.SIGINT)
# warn?
if curr_state.available < size_warn:
return zip(a, b)
+METHOD_THREAD = 'thread'
+METHOD_ALARM = 'alarm'
+
+
@contextlib.contextmanager
-def disc_fill_checked(*args, **kwargs):
- """ run test function while separate thread watches disc space
+def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs):
+ """ run test function while watching disc space
- all args are forwarded to :py:class:`DiscFillCheckerThread` constructor
+ all args are forwarded to :py:class:`DiscFillChecker` constructor
+
+ depending on method, will create a separate thread (default) to watch disc
+ or use SIGALRM. First one does not interfere with signals within tested
+ method, second can use a simple Exception to stop the test
"""
- # todo: add args and forward
- DiscFillCheckerThread(*args, **kwargs).start()
+ if method == METHOD_THREAD:
+ checker = DiscFillCheckerThread(*args, **kwargs)
+ checker.start()
+ elif method == METHOD_ALARM:
+ # check that alarms are not used otherwise
+ old_handler = signal.getsignal(signal.SIGALRM)
+ if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None):
+ raise RuntimeError('Signal handler for SIGALRM set '
+ '-- do not want to override!')
+
+ # create checker
+ checker = DiscFillChecker(*args, **kwargs)
+
+ # register checker as alarm handler
+ def alarm_signal_handler(signum ,frame):
+ print('todo: check signum, frame?')
+ checker.do_check()
+ if not checker.do_stop:
+ signal.alarm(checker.interval) # set alarm again
+ signal.signal(signal.SIGALRM, alarm_signal_handler)
+
+ # set alarm
+ signal.alarm(checker.interval)
+ else:
+ raise ValueError('unknown checking method!')
try:
- yield
+ # run decorated function
+ yield checker
finally:
- pass # wait for other to stop or so? or stop it?
+ # stop checker
+ checker.stop()
+
+ # clean up
+ if method == METHOD_THREAD:
+ checker.join()
+ elif method == METHOD_ALARM:
+ signal.alarm(0) # cancel alarms
+ signal.signal(signal.SIGALRM, old_handler)
+ else:
+ raise NotImplementedError('implement cleanup for new method!')