from datetime import timedelta, datetime as dt
from file_helpers import FilesystemFillState, get_filesystem_fill_states
import warnings
+from time import sleep
+import threading
import test_helpers
def test_thread(self):
""" test the DiscFillCheckerThread """
+ #threading.setprofile(self.threading_profile_func)
+ # somehow damages df output ... need to check!
+
# since we do not want to fill up the disc, we incrementally decrease
# the tolerance so even our hopefully half-empty filesystems will
# trigger warnings / errors
min_free = min(fill_state.available for fill_state in actual_states)
print(min_free)
- sleep_time = 0
+ sleep_time_checker = 0
+ sleep_time_tester = 1
+ time_tolerance = 0.1
# run something with default settings. Should give no trouble
+ decision_function = test_helpers.default_disc_full_decision_function
+ print('before: threads are {0}'
+ .format([thread.name for thread in threading.enumerate()]))
+ start_time = dt.now()
+ with test_helpers.disc_fill_checked(sleep_time_checker,
+ decision_function):
+ print('within: threads are {0}'
+ .format([thread.name for thread in threading.enumerate()]))
+ self.nonsense_function(sleep_time_tester)
+ time_taken = (dt.now() - start_time).total_seconds()
+ print('after: threads are {0}'
+ .format([thread.name for thread in threading.enumerate()]))
+ self.assertTrue(time_taken >= sleep_time_tester)
+ self.assertTrue(time_taken < sleep_time_tester + time_tolerance)
# run with thresholds set so we get warnings
# run with thresholds set so we get errors
+ def nonsense_function(self, sleep_time):
+ """ function to run while checking disc fill state; just sleeps """
+ sleep(sleep_time)
+
+ def threading_profile_func(self, *args, **kwargs):
+ print('profiling with args {0} and kwargs {1}'
+ .format([arg for arg in args if len(args) < 20], kwargs))
+
if __name__ == '__main__':
unittest.main()