# This exception does not invalidate any other reasons why a work based
# on this file might be covered by the GNU General Public License.
-""" Helpers for developping quick test scripts
+""" Helpers for developing quick test scripts
Creation motivated by fear of filling disc space during long-running stress
tests
from itertools import tee
from warnings import warn
from sys import stderr, version_info, exit as sys_exit
-from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used
+from os import getpid, kill, _exit as brutal_exit_function_DO_NOT_USE
import signal
try:
WARN_BASE_CLASS = UserWarning
from buffers import LogarithmicBuffer
-from file_helpers import get_filesystem_fill_states, FilesystemFillState
+from file_helpers import get_filesystem_fill_states, FilesystemFillState, \
+ get_mount_info, get_fill_from_statvfs
from iter_helpers import pairwise
raise DiscFullPreventionError(curr_state, estim_empty_smallest)
elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT:
print('Exiting now the brutal way', file=stderr)
- brutal_exit_function_that_should_not_be_used(1)
+ brutal_exit_function_DO_NOT_USE(1)
elif kill_level == KILL_SYS_EXIT:
print('Exiting now', file=stderr)
sys_exit("Disc nearly full!")
raise NotImplementedError('implement cleanup for new method!')
+def watch_disc_fill(interval=0.1, paths=None):
+ """ watch disc fill state: show start, end and maxima
+
+ gets fill state of all filesystems, reports start and maxima until user
+ hits Ctrl-C; then prints final state and repeats max and exits
+
+ :param float interval: time to sleep between checks (seconds)
+ :param paths: iterable over paths (optional); if given just checks
+ filesytems that contain them; if not given (default) will
+ check all filesystems
+ """
+
+ if paths is None:
+ mounts = None
+ else:
+ mounts = tuple(set(get_mount_info(path) for path in paths))
+
+ def get_states():
+ if mounts is None:
+ for state in get_filesystem_fill_states():
+ if state.name in NOT_REAL_FILESYSTEMS:
+ continue
+ yield state
+ else:
+ for mount_point in mounts:
+ yield get_fill_from_statvfs(mount_point)
+
+ # start state
+ print('{0}: start states are:'.format(dt.now()))
+ max_vals = dict()
+ for state in get_states():
+ print('{0}, {1}B used'.format(state, state.used))
+ max_vals[state.mount_point] = state.used
+
+ try:
+ # infinite loop
+ while True:
+ time.sleep(interval)
+ try:
+ for state in get_states():
+ if state.used > max_vals[state.mount_point]:
+ print('{0}: new max {1}B for {2}'
+ .format(dt.now(), state.used, state))
+ max_vals[state.mount_point] = state.used
+ except KeyError:
+ # a new file system was added
+ for state in get_states():
+ if state.mount_point not in max_vals:
+ print('{0}: new filesystem {1}, {2}B used'
+ .format(dt.now(), state, state.used))
+ max_vals[state.mount_point] = state.used
+ except KeyboardInterrupt:
+ # end function
+ print(' --> caught KeyboardInterrupt')
+ print('{0}: end states and maxima:'.format(dt.now()))
+ for state in get_states():
+ print('{0}, end fill is {1}B, max fill was {2}B'
+ .format(state, state.used, max_vals[state.mount_point]))
+
+
def get_perf_counter():
""" returns time.perf_counter or time.clock depending on python version
before py 3.3: return function :py:function:`time.clock`
after that: returns function :py:function:`time.perf_counter`
- ..warn:: these will behave differently, so do not make assumption on change
+ .. warn:: these will behave differently, so do not make assumption on change
or time during sleep or what 0 value means
+ For simple single-command use cases: use ipython's %timeit magic command.
+
Usage:
from test_helpers import get_perf_counter