created watch_disc_fill to monitor disc fill of other programs
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 4 Feb 2016 13:54:00 +0000 (14:54 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 4 Feb 2016 13:54:00 +0000 (14:54 +0100)
src/test_helpers.py

index 8ae83d8..eedcd20 100644 (file)
@@ -16,7 +16,7 @@
 # This exception does not invalidate any other reasons why a work based
 # on this file might be covered by the GNU General Public License.
 
-""" Helpers for developping quick test scripts
+""" Helpers for developing quick test scripts
 
 Creation motivated by fear of filling disc space during long-running stress
 tests
@@ -33,7 +33,7 @@ from datetime import datetime as dt
 from itertools import tee
 from warnings import warn
 from sys import stderr, version_info, exit as sys_exit
-from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used
+from os import getpid, kill, _exit as brutal_exit_function_DO_NOT_USE
 import signal
 
 try:
@@ -44,7 +44,8 @@ except ImportError:
     WARN_BASE_CLASS = UserWarning
 
 from buffers import LogarithmicBuffer
-from file_helpers import get_filesystem_fill_states, FilesystemFillState
+from file_helpers import get_filesystem_fill_states, FilesystemFillState, \
+                         get_mount_info, get_fill_from_statvfs
 from iter_helpers import pairwise
 
 
@@ -262,7 +263,7 @@ def default_disc_full_decision_function(curr_state,
             raise DiscFullPreventionError(curr_state, estim_empty_smallest)
         elif kill_level == KILL_BRUTALLY_ONLY_LAST_RESORT:
             print('Exiting now the brutal way', file=stderr)
-            brutal_exit_function_that_should_not_be_used(1)
+            brutal_exit_function_DO_NOT_USE(1)
         elif kill_level == KILL_SYS_EXIT:
             print('Exiting now', file=stderr)
             sys_exit("Disc nearly full!")
@@ -367,15 +368,77 @@ def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs):
             raise NotImplementedError('implement cleanup for new method!')
 
 
+def watch_disc_fill(interval=0.1, paths=None):
+    """ watch disc fill state: show start, end and maxima
+
+    gets fill state of all filesystems, reports start and maxima until user
+    hits Ctrl-C; then prints final state and repeats max and exits
+
+    :param float interval: time to sleep between checks (seconds)
+    :param paths: iterable over paths (optional); if given just checks
+                  filesytems that contain them; if not given (default) will
+                  check all filesystems
+    """
+
+    if paths is None:
+        mounts = None
+    else:
+        mounts = tuple(set(get_mount_info(path) for path in paths))
+
+    def get_states():
+        if mounts is None:
+            for state in get_filesystem_fill_states():
+                if state.name in NOT_REAL_FILESYSTEMS:
+                    continue
+                yield state
+        else:
+            for mount_point in mounts:
+                yield get_fill_from_statvfs(mount_point)
+
+    # start state
+    print('{0}: start states are:'.format(dt.now()))
+    max_vals = dict()
+    for state in get_states():
+        print('{0}, {1}B used'.format(state, state.used))
+        max_vals[state.mount_point] = state.used
+
+    try:
+        # infinite loop
+        while True:
+            time.sleep(interval)
+            try:
+                for state in get_states():
+                    if state.used > max_vals[state.mount_point]:
+                        print('{0}: new max {1}B for {2}'
+                              .format(dt.now(), state.used, state))
+                        max_vals[state.mount_point] = state.used
+            except KeyError:
+                # a new file system was added
+                for state in get_states():
+                    if state.mount_point not in max_vals:
+                        print('{0}: new filesystem {1}, {2}B used'
+                              .format(dt.now(), state, state.used))
+                        max_vals[state.mount_point] = state.used
+    except KeyboardInterrupt:
+        # end function
+        print(' --> caught KeyboardInterrupt')
+        print('{0}: end states and maxima:'.format(dt.now()))
+        for state in get_states():
+            print('{0}, end fill is {1}B, max fill was {2}B'
+                  .format(state, state.used, max_vals[state.mount_point]))
+
+
 def get_perf_counter():
     """ returns time.perf_counter or time.clock depending on python version
 
     before py 3.3: return function :py:function:`time.clock`
     after that: returns function :py:function:`time.perf_counter`
 
-    ..warn:: these will behave differently, so do not make assumption on change
+    .. warn:: these will behave differently, so do not make assumption on change
              or time during sleep or what 0 value means
 
+    For simple single-command use cases: use ipython's %timeit magic command.
+
     Usage:
 
       from test_helpers import get_perf_counter