disc checker thread works now in unittests
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 14 Jan 2016 13:05:59 +0000 (14:05 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 14 Jan 2016 13:05:59 +0000 (14:05 +0100)
added alternativ implementation using alarms (untested)

test/test_helper_unittest.py
test_helpers.py

index 0040224..0b0eeb6 100644 (file)
@@ -125,7 +125,7 @@ class TestHelperTester(unittest.TestCase):
 
         # check err if time too low
         estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2.
-        self.assertRaises(test_helpers.DiscFullPreventionError,
+        self.assertRaises(KeyboardInterrupt,
                           f, state, estim_empty_newest, estim_empty_smallest,
                           estim_empty_mean)
         # show the error message:
@@ -145,11 +145,16 @@ class TestHelperTester(unittest.TestCase):
         state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.)
         state.used = state.size - state.available
         state.capacity = int(100. * state.used / state.size)
-        self.assertRaises(test_helpers.DiscFullPreventionError,
+        self.assertRaises(KeyboardInterrupt,
                           f, state, estim_empty_newest, estim_empty_smallest,
                           estim_empty_mean)
 
         # check other kill levels
+        self.assertRaises(test_helpers.DiscFullPreventionError,
+                          f, state, estim_empty_newest, estim_empty_smallest,
+                          estim_empty_mean,
+                          kill_level=test_helpers.KILL_EXCEPTION)
+
         self.assertRaises(SystemExit,
                           f, state, estim_empty_newest, estim_empty_smallest,
                           estim_empty_mean,
@@ -157,7 +162,7 @@ class TestHelperTester(unittest.TestCase):
 
         # this even kills unittest:
         #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean,
-        #  kill_level=test_helpers.KILL_REALLY_MEAN_EXIT)
+        #  kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT)
         #self.fail('should have exited!')
 
     def test_thread(self):
@@ -169,33 +174,70 @@ class TestHelperTester(unittest.TestCase):
         # since we do not want to fill up the disc, we incrementally decrease
         # the tolerance so even our hopefully half-empty filesystems will
         # trigger warnings / errors
-        actual_states = get_filesystem_fill_states()
-        min_free = min(fill_state.available for fill_state in actual_states)
-        print(min_free)
+        min_free = None
+        min_name = None
+        for fill_state in get_filesystem_fill_states():
+            if fill_state.name in test_helpers.NOT_REAL_FILESYSTEMS:
+                continue
+            if (min_free is None) or (fill_state.available < min_free):
+                min_free = fill_state.available
+                min_name = fill_state.name
+        print('min free: {0} with {1}B'.format(min_name, min_free))
+
+        # set space tolerance such that no warn/err should show
+        size_warn = min_free/4;
+        size_err = min_free/2;
+        def decision_function(*args):
+            test_helpers.default_disc_full_decision_function(
+                *args, size_warn=size_warn, size_err=size_err)
+        self._internal_test_thread(decision_function)
+
+        # set space tolerance such that show warn but no err
+        size_warn = min_free*2;
+        size_err = min_free/2;
+        def decision_function(*args):
+            test_helpers.default_disc_full_decision_function(
+                *args, size_warn=size_warn, size_err=size_err)
+        self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+                        self._internal_test_thread, decision_function)
+
+        # set space tolerance such that err should be raised
+        size_warn = min_free*4;
+        size_err = min_free*2;
+        def decision_function(*args):
+            test_helpers.default_disc_full_decision_function(
+                *args, size_warn=size_warn, size_err=size_err)
+        self.assertRaises(KeyboardInterrupt,
+                          self._internal_test_thread, decision_function)
+
+    def _internal_test_thread(self, decision_function):
 
-        sleep_time_checker = 0
+        sleep_time_checker = 0.1
         sleep_time_tester = 1
         time_tolerance = 0.1
+        main_thread_name = 'MainThread'
+        checker_thread_name = 'discFillChck'
 
-        # run something with default settings. Should give no trouble
-        decision_function = test_helpers.default_disc_full_decision_function
-        print('before: threads are {0}'
-              .format([thread.name for thread in threading.enumerate()]))
+        curr_threads = [thread.name for thread in threading.enumerate()]
+        self.assertEqual(curr_threads, [main_thread_name])
         start_time = dt.now()
-        with test_helpers.disc_fill_checked(sleep_time_checker,
-                                           decision_function):
-            print('within: threads are {0}'
-                  .format([thread.name for thread in threading.enumerate()]))
+        with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD,
+                                            sleep_time_checker,
+                                            decision_function) as checker:
+            curr_threads = [thread.name for thread in threading.enumerate()]
             self.nonsense_function(sleep_time_tester)
         time_taken = (dt.now() - start_time).total_seconds()
-        print('after: threads are {0}'
-              .format([thread.name for thread in threading.enumerate()]))
+        self.assertEqual(curr_threads, [main_thread_name, checker_thread_name])
+
+        # stop checker thread
+        checker.stop()
+        checker.join()
+        curr_threads = [thread.name for thread in threading.enumerate()]
+        self.assertEqual(curr_threads, [main_thread_name])
+
         self.assertTrue(time_taken >= sleep_time_tester)
         self.assertTrue(time_taken < sleep_time_tester + time_tolerance)
 
-        # run with thresholds set so we get warnings
-        # run with thresholds set so we get errors
-
 
     def nonsense_function(self, sleep_time):
         """ function to run while checking disc fill state; just sleeps """
index 77c243e..7d7072f 100644 (file)
@@ -11,13 +11,14 @@ from __future__ import print_function
 import contextlib
 from threading import Thread
 from time import sleep
-from file_helpers import get_filesystem_fill_states
+from file_helpers import get_filesystem_fill_states, FilesystemFillState
 from datetime import datetime as dt
 from buffers import LogarithmicBuffer
 from itertools import tee
 from warnings import warn
-from sys import exit, stderr
-from os import _exit as bad_exit_function_that_should_not_be_used
+from sys import stderr, exit as sys_exit
+from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used
+import signal
 
 try:
     from warnings import ResourceWarning
@@ -27,6 +28,11 @@ except ImportError:
     WARN_BASE_CLASS = UserWarning
 
 
+#: filesystems shown by df that usually do not correspond to something on disc
+#: (except maybe swap)
+NOT_REAL_FILESYSTEMS = 'none', 'shmfs', 'procfs', 'tmpfs'
+
+
 class DiscFullPreventionError(Exception):
     """ Exception raised when disc space gets critical
 
@@ -57,30 +63,27 @@ class DiscFullPreventionWarning(WARN_BASE_CLASS):
         self.time_estim = time_estim
 
 
-class DiscFillCheckerThread(Thread):
-    """ a thread that checks disc fill in regular intervals """
+class DiscFillChecker:
+    """ checker for disc fill status """
 
     def __init__(self, interval=10, decision_function=None):
-        """ creates a DiscFillCheckerThread
-
-        :param int interval: time in seconds between checks of disc usage
-        :param decision_function: function that decides when to issue a warning
-                                 or an error. See
-                                 :py:func:`default_disc_full_decision_function`
-                                 as an example. Function should raise
-                                 :py:class:`DiscFullPreventionWarning` and
-                                 :py:class:`DiscFullPreventionError`
-        """
-        super(DiscFillCheckerThread, self).__init__(name='discFillChck')
-
-        self.daemon = True   # works since python 2.6
 
         # set variables
         self.interval = interval
         if decision_function is None:
             self.decision_function = default_disc_full_decision_function
         else:
-            # need to check warn_func somehow (using introspection?)
+            # check decision_function:
+            GIGABYTE = 2**30
+            state = FilesystemFillState()
+            state.name = 'dummy'
+            state.size = 10 * GIGABYTE
+            state.used = 1 * GIGABYTE
+            state.available = 9 * GIGABYTE   # asume this should be enough
+            state.capacity = 10
+            state.mount_point = '/not/mounted'
+            decision_function(state, None, None, None)
+
             self.decision_function = decision_function
 
         # remember relevant disc stats at start
@@ -93,8 +96,7 @@ class DiscFillCheckerThread(Thread):
 
         internal helper called from __init__ and run
         """
-        if fs_state.name == 'tmpfs':
-            print('ignoring {0}'.format(fs_state))
+        if fs_state.name in NOT_REAL_FILESYSTEMS:
             return []
 
         buf = None
@@ -108,34 +110,65 @@ class DiscFillCheckerThread(Thread):
         buf.add((dt.now(), float(fs_state.available)))
         return buf.get_all()
 
+    def do_check(self):
+        """ check disc fill state """
+
+        # loop over current disc fill state
+        for fs_state in get_filesystem_fill_states():
+            # remember newest value with others for same file system
+            fill_states = self._internal_state_buffer(fs_state)
+            if not fill_states:
+                continue
+
+            # estimate time until full (i.e. when available == 0)
+            times_until_empty = \
+                [calc_time_until_zero(old_state, new_state)
+                 for old_state, new_state in pairwise(fill_states)]
+
+            # call user-defined function to decide
+            min_time = None
+            if any(time > 0 for time in times_until_empty):
+                min_time = min(time for time in times_until_empty
+                               if time>0)
+            avg_time = calc_time_until_zero(fill_states[0],
+                                             fill_states[-1])
+            self.decision_function(fs_state,
+                                   times_until_empty[-1],     # newest
+                                   min_time,                  # smallest
+                                   avg_time)                  # average
+
+
+class DiscFillCheckerThread(Thread, DiscFillChecker):
+    """ a thread that checks disc fill in regular intervals """
+
+    def __init__(self, *args, **kwargs):
+        """ creates a DiscFillCheckerThread
+
+        :param int interval: time in seconds between checks of disc usage
+        :param decision_function: function that decides when to issue a warning
+                                 or an error. See
+                                 :py:func:`default_disc_full_decision_function`
+                                 as an example. Function should raise
+                                 :py:class:`DiscFullPreventionWarning` and
+                                 :py:class:`DiscFullPreventionError`
+        """
+        Thread.__init__(self, name='discFillChck')
+        DiscFillChecker.__init__(self, *args, **kwargs)
+
+        # make this thread a daemon
+        self.daemon = True   # works since python 2.6
+
+        # init do_stop
+        self.do_stop = False
+
     def run(self):
-        while True:
+        print('checker thread running')
+        while not self.do_stop:
             sleep(self.interval)
-            fs_states = get_filesystem_fill_states()
-            for fs_state in fs_states:
-                # ignore tmpfs since is in memory or swap, so fill up is not
-                # as bad
-                if fs_state.name == 'tmpfs':
-                    continue
-
-                fill_states = self._internal_state_buffer(fs_state)
-
-                # estimate time until full (i.e. when available == 0)
-                times_until_empty = \
-                    [calc_time_until_zero(old_state, new_state)
-                     for old_state, new_state in pairwise(fill_states)]
-
-                # call user-defined function to decide
-                min_time = None
-                if any(time > 0 for time in times_until_empty):
-                    min_time = min(time for time in times_until_empty
-                                   if time>0)
-                avg_time = calc_time_until_zero(fill_states[0],
-                                                 fill_states[-1])
-                self.decision_function(fs_state,
-                                       times_until_empty[-1],     # newest
-                                       min_time,                  # smallest
-                                       avg_time)                  # average
+            self.do_check()
+
+    def stop(self):
+        self.do_stop = True
 
 
 MEGABYTE = 2**20
@@ -147,11 +180,22 @@ DEFAULT_DISC_SIZE_ERR = MEGABYTE
 DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE
 DEFAULT_TIME_TO_FULL_ERR = MINUTE
 
-#: levels at which to kill running tests: regular exception (subclass of
-#: Exception), base exception (to avoid "catch(exception)") or sys.exit
+#: levels at which to kill running tests: regular exception, easily caught
+#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
 KILL_EXCEPTION = "KILL_EXCEPTION"
+
+#: levels at which to kill running tests: exception that is not a subclass of
+#: Exception to avoid "catch(Exception)".
+#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
 KILL_SYS_EXIT = "KILL_SYS_EXIT"
-KILL_REALLY_MEAN_EXIT = "REALLY_MEAN_EXIT"
+
+#: levels at which to kill running tests: brutal way that does not allow any
+#: default cleanup (os._exit). Last resort when all others do not work!
+KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT"
+
+#: levels at which to kill running tests: SIGINT
+KILL_SIGINT = "KILL_SIGINT"
+
 
 def default_disc_full_decision_function(curr_state,
                                         estim_empty_newest,
@@ -161,7 +205,7 @@ def default_disc_full_decision_function(curr_state,
                                         size_err=DEFAULT_DISC_SIZE_ERR,
                                         time_warn=DEFAULT_TIME_TO_FULL_WARN,
                                         time_err=DEFAULT_TIME_TO_FULL_ERR,
-                                        kill_level=KILL_EXCEPTION):
+                                        kill_level=KILL_SIGINT):
     """ decide whether to warn or raise error because disc is getting full
 
     called by DiscFillCheckerThread regularly; params estim_empty_* are result
@@ -178,9 +222,16 @@ def default_disc_full_decision_function(curr_state,
                                   :py:data:`DEFAULT_DISC_SIZE_WARN`
     :param kill_level: how to kill the running application: with sys.exit or
                        by raising a "regular" Exception subclass (default).
+                       Brutal way or SIGINT (default) are also options.
                        Values should be one like :py:data:`KILL_EXCEPTION`
     """
 
+    if curr_state.name == '/dev/mapper/ssd2-local':
+        print('checking ssd2: free is {0:.1f}MB, warn at {1:.1f}MB, err at {2:.1f}MB'
+              .format(float(curr_state.available) / MEGABYTE,
+                      float(size_warn) / MEGABYTE,
+                      float(size_err) / MEGABYTE))
+
     # err?
     raise_err = False
     if curr_state.available < size_err:
@@ -190,20 +241,20 @@ def default_disc_full_decision_function(curr_state,
 
     # what kind of err?
     if raise_err:
+        print('Disc fill below tolerance or disc full soon:', file=stderr)
+        print('{0}, full in {1} s'
+              .format(curr_state, estim_empty_smallest), file=stderr)
         if kill_level == KILL_EXCEPTION:
             raise DiscFullPreventionError(curr_state, estim_empty_smallest)
-        elif kill_level == KILL_REALLY_MEAN_EXIT:
-            print('Disc fill below tolerance or disc full soon', file=stderr)
-            print('{0}, full in {1} s'
-                  .format(curr_state, estim_empty_smallest), file=stderr)
+        elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT:
             print('Exiting now the brutal way', file=stderr)
-            bad_exit_function_that_should_not_be_used(1)
-        else:   # kill_level == SYS_EXIT or unknown
-            print('Disc fill below tolerance or disc full soon:', file=stderr)
-            print('{0}, full in {1} s'
-                  .format(curr_state, estim_empty_smallest), file=stderr)
+            brutal_exit_function_that_should_not_be_used(1)
+        elif kill_level == KILL_SYS_EXIT:
             print('Exiting now', file=stderr)
-            exit("Disc nearly full!")
+            sys_exit("Disc nearly full!")
+        else:   # kill_level == KILL_SIGINT or unknown
+            print('Sending SIGINT', file=stderr)
+            kill(getpid(), signal.SIGINT)
 
     # warn?
     if curr_state.available < size_warn:
@@ -254,17 +305,59 @@ def pairwise(iterable):
     return zip(a, b)
 
 
+METHOD_THREAD = 'thread'
+METHOD_ALARM = 'alarm'
+
+
 @contextlib.contextmanager
-def disc_fill_checked(*args, **kwargs):
-    """ run test function while separate thread watches disc space
+def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs):
+    """ run test function while watching disc space
 
-    all args are forwarded to :py:class:`DiscFillCheckerThread` constructor
+    all args are forwarded to :py:class:`DiscFillChecker` constructor
+
+    depending on method, will create a separate thread (default) to watch disc
+    or use SIGALRM. First one does not interfere with signals within tested
+    method, second can use a simple Exception to stop the test
     """
 
-    # todo: add args and forward
-    DiscFillCheckerThread(*args, **kwargs).start()
+    if method == METHOD_THREAD:
+        checker = DiscFillCheckerThread(*args, **kwargs)
+        checker.start()
+    elif method == METHOD_ALARM:
+        # check that alarms are not used otherwise
+        old_handler = signal.getsignal(signal.SIGALRM)
+        if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None):
+            raise RuntimeError('Signal handler for SIGALRM set '
+                               '-- do not want to override!')
+
+        # create checker
+        checker = DiscFillChecker(*args, **kwargs)
+
+        # register checker as alarm handler
+        def alarm_signal_handler(signum ,frame):
+            print('todo: check signum, frame?')
+            checker.do_check()
+            if not checker.do_stop:
+                signal.alarm(checker.interval)   # set alarm again
+        signal.signal(signal.SIGALRM, alarm_signal_handler)
+
+        # set alarm
+        signal.alarm(checker.interval)
+    else:
+        raise ValueError('unknown checking method!')
 
     try:
-        yield
+        # run decorated function
+        yield checker
     finally:
-        pass # wait for other to stop or so? or stop it?
+        # stop checker
+        checker.stop()
+
+        # clean up
+        if method == METHOD_THREAD:
+            checker.join()
+        elif method == METHOD_ALARM:
+            signal.alarm(0)   # cancel alarms
+            signal.signal(signal.SIGALRM, old_handler)
+        else:
+            raise NotImplementedError('implement cleanup for new method!')