+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" Helpers for developing quick test scripts
-
-Creation motivated by fear of filling disc space during long-running stress
-tests
-
-.. todo:: Find out why fs not in REAL_FILESYSTEMS generate warnings so quickly
- even if they are still empty
-"""
-
-from __future__ import print_function
-
-from contextlib import contextmanager
-from threading import Thread
-import time
-from datetime import datetime as dt
-from itertools import tee
-from warnings import warn
-from sys import stderr, platform, version_info, exit as sys_exit
-from os import getpid, kill, _exit as brutal_exit_function_DO_NOT_USE
-import signal
-
-try:
- from warnings import ResourceWarning
- WARN_BASE_CLASS = ResourceWarning
-except ImportError:
- # only added in python 3.2
- WARN_BASE_CLASS = UserWarning
-
-from .buffers import LogarithmicBuffer
-from .file_helpers import get_filesystem_fill_states, FilesystemFillState, \
- get_mount_info, get_fill_from_statvfs, \
- NOT_REAL_FILESYSTEMS_SPEC, size_str
-from .iter_helpers import pairwise
-
-
-class DiscFullPreventionError(Exception):
- """ Exception raised when disc space gets critical
-
- issued from function called by :py:class:`DiscFillCheckerThread`
- usually only happens after issuing a :py:class:`DiscFullPreventionWarning`
- """
- def __init__(self, state, time_estim):
- super(DiscFullPreventionError, self).__init__(
- 'Interrupting test to avoid full disc ({0}, full in {1}s)'
- .format(state, "?" if time_estim==None else time_estim))
- self.state = state
- self.time_estim = time_estim
-
-
-class DiscFullPreventionWarning(WARN_BASE_CLASS):
- """ Exception raised when disc space approaches problematic state
-
- issued from function called by :py:class:`DiscFillCheckerThread`
- If disc fills up further, will be followed by a
- :py:class:`DiscFullPreventionError` or sys.exit
- """
- def __init__(self, state, time_estim):
- super(DiscFullPreventionWarning, self).__init__(
- 'Disc nearly full! Might soon abort test ({0}, '
- 'full in {1}s)'
- .format(state, "?" if time_estim==None else time_estim))
- self.state = state
- self.time_estim = time_estim
-
-
-class DiscFillChecker:
- """ checker for disc fill status """
-
- def __init__(self, interval=10, decision_function=None):
-
- # set variables
- self.interval = interval
- if decision_function is None:
- self.decision_function = default_disc_full_decision_function
- else:
- # check decision_function:
- GIGABYTE = 2**30
- state = FilesystemFillState()
- state.name = 'dummy'
- state.size = 10 * GIGABYTE
- state.used = 1 * GIGABYTE
- state.available = 9 * GIGABYTE # asume this should be enough
- state.capacity = 10
- state.mount_point = '/not/mounted'
- decision_function(state, None, None, None)
-
- self.decision_function = decision_function
-
- # remember relevant disc stats at start
- self._bufs = {}
- for fs_state in get_filesystem_fill_states():
- self._internal_state_buffer(fs_state)
-
- def _internal_state_buffer(self, fs_state):
- """ update internal stats buffer, returns all estims
-
- internal helper called from __init__ and run
- """
- if fs_state.name in NOT_REAL_FILESYSTEMS_SPEC:
- return []
- if fs_state.size == 0:
- return []
-
- buf = None
- try:
- buf = self._bufs[fs_state.name]
- except KeyError:
- # new file system -- need to create a new buffer
- buf = LogarithmicBuffer(5)
- self._bufs[fs_state.name] = buf
-
- buf.add((dt.now(), float(fs_state.available)))
- return buf.get_all()
-
- def do_check(self):
- """ check disc fill state """
-
- # loop over current disc fill state
- for fs_state in get_filesystem_fill_states():
- # remember newest value with others for same file system
- fill_states = self._internal_state_buffer(fs_state)
- if not fill_states:
- continue
-
- # estimate time until full (i.e. when available == 0)
- times_until_empty = \
- [calc_time_until_zero(old_state, new_state)
- for old_state, new_state in pairwise(fill_states)]
-
- # call user-defined function to decide
- min_time = None
- if any(time != None and time > 0 for time in times_until_empty):
- min_time = min(time for time in times_until_empty
- if time != None and time>0)
- avg_time = calc_time_until_zero(fill_states[0],
- fill_states[-1])
- self.decision_function(fs_state,
- times_until_empty[-1], # newest
- min_time, # smallest
- avg_time) # average
-
-
-class DiscFillCheckerThread(Thread, DiscFillChecker):
- """ a thread that checks disc fill in regular intervals """
-
- def __init__(self, *args, **kwargs):
- """ creates a DiscFillCheckerThread
-
- :param int interval: time in seconds between checks of disc usage
- :param decision_function: function that decides when to issue a warning
- or an error. See
- :py:func:`default_disc_full_decision_function`
- as an example. Function should raise
- :py:class:`DiscFullPreventionWarning` and
- :py:class:`DiscFullPreventionError`
- """
- Thread.__init__(self, name='discFillChck')
- DiscFillChecker.__init__(self, *args, **kwargs)
-
- # make this thread a daemon
- self.daemon = True # works since python 2.6
-
- # init do_stop
- self.do_stop = False
-
- def run(self):
- print('checker thread running')
- while not self.do_stop:
- time.sleep(self.interval)
- self.do_check()
-
- def stop(self):
- self.do_stop = True
-
-
-MEGABYTE = 2**20
-MINUTE = 60
-
-#: default thresholds for :py:func:`default_disc_full_decision_function`
-DEFAULT_DISC_SIZE_WARN = 10 * MEGABYTE
-DEFAULT_DISC_SIZE_ERR = MEGABYTE
-DEFAULT_TIME_TO_FULL_WARN = 10 * MINUTE
-DEFAULT_TIME_TO_FULL_ERR = MINUTE
-
-#: levels at which to kill running tests: regular exception, easily caught
-#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
-KILL_EXCEPTION = "KILL_EXCEPTION"
-
-#: levels at which to kill running tests: exception that is not a subclass of
-#: Exception to avoid "catch(Exception)".
-#: will NOT work with default check_method METHOD_THREAD in disc_fill_checked!
-KILL_SYS_EXIT = "KILL_SYS_EXIT"
-
-#: levels at which to kill running tests: brutal way that does not allow any
-#: default cleanup (os._exit). Last resort when all others do not work!
-KILL_BRUTALLY_ONLY_LAST_RESORT = "KILL_BRUTALLY_ONLY_LAST_RESORT"
-
-#: levels at which to kill running tests: SIGINT
-KILL_SIGINT = "KILL_SIGINT"
-
-
-def default_disc_full_decision_function(curr_state,
- estim_empty_newest,
- estim_empty_smallest,
- estim_empty_mean,
- size_warn=DEFAULT_DISC_SIZE_WARN,
- size_err=DEFAULT_DISC_SIZE_ERR,
- time_warn=DEFAULT_TIME_TO_FULL_WARN,
- time_err=DEFAULT_TIME_TO_FULL_ERR,
- kill_level=KILL_SIGINT):
- """ decide whether to warn or raise error because disc is getting full
-
- called by DiscFillCheckerThread regularly; params estim_empty_* are result
- of :py:func:`calc_time_until_zero`, so None means that disc state did not
- change.
-
- Issues a :py:class:`DiscFullPreventionWarning` when space gets problematic
- and raises an error or calls sys.exit when it is really critical.
-
- :param estim_empty_newest: the newest time estimate
- :param estim_empty_smallest: the smallest of all non-negative estimates
- :param estim_empty_mean: estim between earliest and newest estimate
- :param int size/err_warn/err: thresholds, default to constants like
- :py:data:`DEFAULT_DISC_SIZE_WARN`
- :param kill_level: how to kill the running application: with sys.exit or
- by raising a "regular" Exception subclass (default).
- Brutal way or SIGINT (default) are also options.
- Values should be one like :py:data:`KILL_EXCEPTION`
- """
-
- # err?
- raise_err = False
- if curr_state.available < size_err:
- raise_err = True
- if (estim_empty_smallest != None) and (estim_empty_smallest < time_err):
- raise_err = True
-
- # what kind of err?
- if raise_err:
- print('Disc fill below tolerance or disc full soon:', file=stderr)
- print('{0}, full in {1} s'
- .format(curr_state, estim_empty_smallest), file=stderr)
- if kill_level == KILL_EXCEPTION:
- raise DiscFullPreventionError(curr_state, estim_empty_smallest)
- elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT:
- print('Exiting now the brutal way', file=stderr)
- brutal_exit_function_DO_NOT_USE(1)
- elif kill_level == KILL_SYS_EXIT:
- print('Exiting now', file=stderr)
- sys_exit("Disc nearly full!")
- else: # kill_level == KILL_SIGINT or unknown
- print('Sending SIGINT', file=stderr)
- kill(getpid(), signal.SIGINT)
-
- # warn?
- if curr_state.available < size_warn:
- warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest),
- stacklevel=3)
- if (estim_empty_smallest != None) and (estim_empty_smallest < time_warn):
- warn(DiscFullPreventionWarning(curr_state, estim_empty_smallest),
- stacklevel=3)
-
-
-def calc_time_until_zero(old_state, new_state):
- """ calc time until value hits zero given (time1, value1), (time2, value2)
-
- returns time from now until empty in seconds
-
- delta_date = new_date - old_date
- delta_free = new_free - old_free
- free(t) = new_free + (t-new_t) * delta_free / delta_date
- = 0
- <==> -new_free = (t-new_t) * delta_free / delta_date
- <==> -new_free * delta_date / date_free = t - new_t
- <==> diff_t := new_t - t = new_free * delta_date / delta_free
-
- compare to now:
- diff_to_now = t + (-new_t + new_t) - now = (t - new_t) + (new_t - now)
- = -diff_t - (now-new_t) = (new_t-now) - diff_t
-
- To avoid zero-division, returns None if delta_free == 0
- """
- old_date, old_free = old_state
- new_date, new_free = new_state
- if new_free == old_free:
- return None # avoid zero-division
- time_diff = new_free * (new_date - old_date).total_seconds() \
- / (new_free - old_free)
-
- # compare to now
- return (new_date - dt.now()).total_seconds() - time_diff
-
-
-METHOD_THREAD = 'thread'
-METHOD_ALARM = 'alarm'
-
-
-@contextmanager
-def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs):
- """ run test function while watching disc space
-
- all args are forwarded to :py:class:`DiscFillChecker` constructor
-
- depending on method, will create a separate thread (default) to watch disc
- or use SIGALRM. First one does not interfere with signals within tested
- method, second can use a simple Exception to stop the test
- """
-
- if method == METHOD_THREAD:
- checker = DiscFillCheckerThread(*args, **kwargs)
- checker.start()
- elif method == METHOD_ALARM:
- # check that alarms are not used otherwise
- old_handler = signal.getsignal(signal.SIGALRM)
- if old_handler not in (signal.SIG_IGN, signal.SIG_DFL, None):
- raise RuntimeError('Signal handler for SIGALRM set '
- '-- do not want to override!')
-
- # create checker
- checker = DiscFillChecker(*args, **kwargs)
-
- # register checker as alarm handler
- def alarm_signal_handler(signum ,frame):
- print('todo: check signum, frame?')
- checker.do_check()
- if not checker.do_stop:
- signal.alarm(checker.interval) # set alarm again
- signal.signal(signal.SIGALRM, alarm_signal_handler)
-
- # set alarm
- signal.alarm(checker.interval)
- else:
- raise ValueError('unknown checking method!')
-
- try:
- # run decorated function
- yield checker
- finally:
- # stop checker
- checker.stop()
-
- # clean up
- if method == METHOD_THREAD:
- checker.join()
- elif method == METHOD_ALARM:
- signal.alarm(0) # cancel alarms
- signal.signal(signal.SIGALRM, old_handler)
- else:
- raise NotImplementedError('implement cleanup for new method!')
-
-
-def watch_disc_fill(paths=None, interval=0.1):
- """ watch disc fill state: show start, end and maxima
-
- gets fill state of all filesystems, reports start and maxima until user
- hits Ctrl-C; then prints final state and repeats max and exits
-
- :param float interval: time to sleep between checks (seconds)
- :param paths: iterable over paths (optional); if given just checks
- filesytems that contain them; if not given (default) will
- check all filesystems
- """
-
- if paths is None:
- mounts = None
- else:
- mounts = tuple(set(get_mount_info(path) for path in paths))
-
- def get_states():
- if mounts is None:
- for state in get_filesystem_fill_states():
- if state.name in NOT_REAL_FILESYSTEMS_SPEC:
- continue
- yield state
- else:
- for mount_point in mounts:
- yield get_fill_from_statvfs(mount_point)
-
- # start state
- print('{0}: start states are:'.format(dt.now()))
- first_fills = dict()
- max_vals = dict()
- for state in get_states():
- print('{0}, {1} used'.format(state, size_str(state.used)))
- max_vals[state.mount_point] = state.used
- first_fills[state.mount_point] = state.used
-
- try:
- # loop until user presses Ctrl-C
- print('checking every {0:.1f}s, press Ctrl-C to stop...'
- .format(interval))
- while True:
- time.sleep(interval)
- try:
- for state in get_states():
- if state.used > max_vals[state.mount_point]:
- print('{0}: new max {1:,d}B for {2}'
- .format(dt.now(), state.used, state))
- max_vals[state.mount_point] = state.used
- except KeyError:
- # a new file system was added
- for state in get_states():
- if state.mount_point not in max_vals:
- print('{0}: new filesystem {1}, {2:,d}B used'
- .format(dt.now(), state, state.used))
- max_vals[state.mount_point] = state.used
- first_fills[state.mount_point] = state.used
- except KeyboardInterrupt:
- # end function
- print(' --> caught KeyboardInterrupt')
- print('{0}: end states and maxima:'.format(dt.now()))
- for state in get_states():
- firstf = first_fills[state.mount_point]
- maxf = max_vals[state.mount_point]
- print('{0}, start/max/end fill: {1} / {2} ({3}) / {4} ({5})'
- .format(state, size_str(firstf), size_str(maxf),
- size_str(maxf-firstf, True), size_str(state.used),
- size_str(state.used-firstf, True)))
-
-
-def get_perf_counter():
- """ return the best monotonic clock for performance measure
-
- returned clocks are monotonic and measure "absoulte"/wall-clock time
- differences (as opposed to time processor spends on the particular python
- code, so sleep(1) should result in a perf counter diff of 1).
-
- before py 3.3: return monotonic clock from linux system
- after that: returns function :py:function:`time.perf_counter`
-
- .. warn:: absolute values mean nothing, only differences between calls!
-
- For simple single-command use cases: use ipython's %timeit magic command.
-
- Usage:
-
- from test_helpers import get_perf_counter
- perf_counter = get_perf_counter()
- start_val = perf_counter()
- # ... do something
- end_val = perf_counter
-
- """
-
- if version_info.major == 2:
- if platform.startswith('linux'):
- return monotonic_clock_py2()
- else:
- return time.time
- elif version_info.major == 3 and version_info.minor < 3:
- return time.clock
- else:
- return time.perf_counter
-
-
-def monotonic_clock_py2():
- """ get a monotonic clock in py2 using ctypes and clock_gettime
-
- copied from:
- https://stackoverflow.com/questions/1205722/
- how-do-i-get-monotonic-time-durations-in-python
- """
-
- import ctypes
- from os import strerror
- CLOCK_MONOTONIC_RAW = 4 # see <linux/time.h>
-
- class timespec(ctypes.Structure):
- _fields_ = [
- ('tv_sec', ctypes.c_long),
- ('tv_nsec', ctypes.c_long)
- ]
-
- librt = ctypes.CDLL('librt.so.1', use_errno=True)
- clock_gettime = librt.clock_gettime
- clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)]
-
- def monotonic_time():
- t = timespec()
- if clock_gettime(CLOCK_MONOTONIC_RAW , ctypes.pointer(t)) != 0:
- errno_ = ctypes.get_errno()
- raise OSError(errno_, strerror(errno_))
- return t.tv_sec + t.tv_nsec * 1e-9
-
- return monotonic_time
+++ /dev/null
-# The software in this package is distributed under the GNU General
-# Public License version 2 (with a special exception described below).
-#
-# A copy of GNU General Public License (GPL) is included in this distribution,
-# in the file COPYING.GPL.
-#
-# As a special exception, if other files instantiate templates or use macros
-# or inline functions from this file, or you compile this file and link it
-# with other works to produce a work based on this file, this file
-# does not by itself cause the resulting work to be covered
-# by the GNU General Public License.
-#
-# However the source code for this file must still be made available
-# in accordance with section (3) of the GNU General Public License.
-#
-# This exception does not invalidate any other reasons why a work based
-# on this file might be covered by the GNU General Public License.
-#
-# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
-
-""" test_helper_unittest.py: unit tests for test_helpers
-
-Tests classes and functions in test_helpers
-
-Should be able run from python2 and python3!
-
-For help see :py:mod:`unittest`
-"""
-
-# TODO: remove print, use logging instead
-from __future__ import print_function
-
-from __future__ import absolute_import
-
-import unittest
-from datetime import timedelta, datetime as dt
-from src.file_helpers import FilesystemFillState, get_filesystem_fill_states
-import warnings
-from time import sleep
-import threading
-
-from src import test_helpers
-
-
-class TestHelperTester(unittest.TestCase):
-
- def assertNoWarn(self, func, *args, **kwargs):
- """ check that calling function raises no warning
-
- use warnings.catch_warnings instead of self.assertWarn
- for compatibility with python 2
- """
-
- result = None
- with warnings.catch_warnings(record=True) as warn_catcher:
- result = func(*args, **kwargs)
-
- self.assertEqual(len(warn_catcher), 0)
-
- return result
-
-
- def assertWarn(self, warn_type, n_warns, message, func, *args, **kwargs):
- """ check that calling function raises no warning
-
- use warnings.catch_warnings instead of self.assertWarn
- for compatibility with python 2
- """
-
- with warnings.catch_warnings(record=True) as warn_catcher:
- func(*args, **kwargs)
-
- if n_warns is None:
- self.assertTrue(len(warn_catcher) > 0)
- else:
- self.assertEqual(len(warn_catcher), n_warns)
-
- if warn_type is not None:
- self.assertTrue(issubclass(warn_catcher[-1].category,
- warn_type))
- if message is not None:
- self.assertEqual(warn_catcher[-1].message, message)
- #print(warn_catcher[-1].message)
-
-
- def test_zero_estimator(self):
- """ test function :py:func:`test_helpers.calc_time_until_zero` """
-
- # need to allow some tolerance since calc_time_until_zero uses now()
- TOLERANCE = 0.1
-
- start_time = dt.now()
- time1 = start_time - timedelta(seconds=20) # 20s ago
- time2 = start_time - timedelta(seconds=10) # 10s ago
-
- # test constant disc usage
- self.assertEqual(test_helpers.calc_time_until_zero((time1, 1),
- (time2, 1)), None)
-
- # test gaining disc space (1 byte per second, empty 30s ago)
- estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20))
- self.assertTrue(estim < 0)
- expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds()
- self.assertLess(abs(estim - expect), TOLERANCE)
-
- # test slowly losing disc space (1 byte per second, empty now)
- estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10))
- expect = (start_time - dt.now()).total_seconds()
- self.assertLess(abs(estim - expect), TOLERANCE)
-
-
- def test_decision_function(self):
- """ tests function default_disc_full_decision_function
-
- :py:func:`test_helpers.default_disc_full_decision_function`
- """
-
- f = test_helpers.default_disc_full_decision_function
-
- # create dummy state
- GIGABYTE = 2**30
- state = FilesystemFillState()
- state.name = 'dummy'
- state.size = 10 * GIGABYTE
- state.used = 9 * GIGABYTE
- state.available = 1 * GIGABYTE
- state.capacity = 90
- state.mount_point = '/not/mounted'
-
- HOUR = 60*60
- estim_empty_newest = None # currently not used
- estim_empty_mean = None # currently not used
- estim_empty_smallest = None # simulate no proper estimate
-
- # check no warning if no time estimate (and enough space)
- self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean)
-
- # check no warning if enough time (and enough space)
- estim_empty_smallest = HOUR # enough time
- self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean)
-
- # check warn if time getting low
- estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN +
- test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2.
- self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
- f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean)
-
- # check err if time too low
- estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2.
- self.assertRaises(KeyboardInterrupt,
- f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean)
- # show the error message:
- #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean)
-
- # check warning if disc space is getting low
- estim_empty_smallest = HOUR
- state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN +
- test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.)
- state.used = state.size - state.available
- state.capacity = int(100. * state.used / state.size)
- self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
- f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean)
-
- # check error if disc space is too low
- state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.)
- state.used = state.size - state.available
- state.capacity = int(100. * state.used / state.size)
- self.assertRaises(KeyboardInterrupt,
- f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean)
-
- # check other kill levels
- self.assertRaises(test_helpers.DiscFullPreventionError,
- f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean,
- kill_level=test_helpers.KILL_EXCEPTION)
-
- self.assertRaises(SystemExit,
- f, state, estim_empty_newest, estim_empty_smallest,
- estim_empty_mean,
- kill_level=test_helpers.KILL_SYS_EXIT)
-
- # this even kills unittest:
- #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean,
- # kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT)
- #self.fail('should have exited!')
-
- @unittest.skip('Works on its own but now when building-and-installing')
- def test_thread(self):
- """ test the DiscFillCheckerThread """
-
- #threading.setprofile(self.threading_profile_func)
- # somehow damages df output ... need to check!
-
- # since we do not want to fill up the disc, we incrementally decrease
- # the tolerance so even our hopefully half-empty filesystems will
- # trigger warnings / errors
- min_free = None
- min_name = None
- for fill_state in get_filesystem_fill_states():
- if fill_state.size == 0:
- continue
- if (min_free is None) or (fill_state.available < min_free):
- min_free = fill_state.available
- min_name = fill_state.name
- print('min free: {0} with {1}B'.format(min_name, min_free))
-
- if min_free is None:
- self.fail('Found no filesystem with size>0 --> cannot test')
-
- # set space tolerance such that no warn/err should show
- size_warn = min_free/4;
- size_err = min_free/2;
- def decision_function(*args):
- test_helpers.default_disc_full_decision_function(
- *args, size_warn=size_warn, size_err=size_err)
- self._internal_test_thread(decision_function)
-
- # set space tolerance such that show warn but no err
- size_warn = min_free*2;
- size_err = min_free/2;
- def decision_function(*args):
- test_helpers.default_disc_full_decision_function(
- *args, size_warn=size_warn, size_err=size_err)
- self.assertWarn(test_helpers.DiscFullPreventionWarning, None, None,
- self._internal_test_thread, decision_function)
-
- # set space tolerance such that err should be raised
- size_warn = min_free*4;
- size_err = min_free*2;
- def decision_function(*args):
- test_helpers.default_disc_full_decision_function(
- *args, size_warn=size_warn, size_err=size_err)
- self.assertRaises(KeyboardInterrupt,
- self._internal_test_thread, decision_function)
-
- def _internal_test_thread(self, decision_function):
-
- sleep_time_checker = 0.1
- sleep_time_tester = 1
- time_tolerance = 0.1
- main_thread_name = 'MainThread'
- checker_thread_name = 'discFillChck'
-
- curr_threads = [thread.name for thread in threading.enumerate()]
- self.assertEqual(curr_threads, [main_thread_name])
- start_time = dt.now()
- with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD,
- sleep_time_checker,
- decision_function) as checker:
- curr_threads = [thread.name for thread in threading.enumerate()]
- self.nonsense_function(sleep_time_tester)
- time_taken = (dt.now() - start_time).total_seconds()
- self.assertEqual(curr_threads, [main_thread_name, checker_thread_name])
-
- # stop checker thread
- checker.stop()
- checker.join()
- curr_threads = [thread.name for thread in threading.enumerate()]
- self.assertEqual(curr_threads, [main_thread_name])
-
- self.assertTrue(time_taken >= sleep_time_tester)
- self.assertTrue(time_taken < sleep_time_tester + time_tolerance)
-
-
- def nonsense_function(self, sleep_time):
- """ function to run while checking disc fill state; just sleeps
-
- to make this test quicker could try to use profiling to count calls to
- disc checker function and stop this after 2 or so
- """
- sleep(sleep_time)
-
- def threading_profile_func(self, *args, **kwargs):
- """ todo: set this as profiling function for checker thread, count
- calls to disc checker function, trigger event or so """
- print('profiling with args {0} and kwargs {1}'
- .format([arg for arg in args if len(args) < 20], kwargs))
-
-if __name__ == '__main__':
- unittest.main()