.. codeauthor:: Intra2net
"""
-from itertools import *
+import itertools as it
+from type_helpers import PY2, PY3
+
+
+zip_longest = None
+izip_longest = None
+if PY2:
+ zip_longest = it.izip_longest
+ izip_longest = it.izip_longest
+else:
+ zip_longest = it.zip_longest
+ izip_longest = it.zip_longest
+
def pairwise(iterable):
""" s -> (s0,s1), (s1,s2), (s2, s3), ... """
- a, b = tee(iterable)
+ a, b = it.tee(iterable)
next(b, None)
return zip(a, b)
import os
from warnings import warn
import os.path
-from itertools import zip_longest
+from iter_helpers import zip_longest
from type_helpers import is_str_or_byte, is_file_obj
def __init__(self, *args, **kwargs):
""" forwards all args and kwargs to :py:class:`IterativeReader` """
- super().__init__(*args, **kwargs)
+ super(LineReader, self).__init__(*args, **kwargs)
self.line_buffers = ['' for _ in range(self.n_sources())]
def prepare_result(self, description, new_data, idx):
from contextlib import contextmanager
from threading import Thread
-from time import sleep
+import time
from datetime import datetime as dt
from itertools import tee
from warnings import warn
-from sys import stderr, exit as sys_exit
+from sys import stderr, version_info, exit as sys_exit
from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used
import signal
def run(self):
print('checker thread running')
while not self.do_stop:
- sleep(self.interval)
+ time.sleep(self.interval)
self.do_check()
def stop(self):
signal.signal(signal.SIGALRM, old_handler)
else:
raise NotImplementedError('implement cleanup for new method!')
+
+
+def get_perf_counter():
+ """ returns time.perf_counter or time.clock depending on python version
+
+ before py 3.3: return function :py:function:`time.clock`
+ after that: returns function :py:function:`time.perf_counter`
+
+ ..warn:: these will behave differently, so do not make assumption on change
+ or time during sleep or what 0 value means
+
+ Usage:
+
+ from test_helpers import get_perf_counter
+ perf_counter = get_perf_counter()
+ start_val = perf_counter()
+ # ... do something
+ end_val = perf_counter
+
+ """
+
+ if version_info.major == 2:
+ return time.clock
+ elif version_info.major == 3 and version_info.minor < 3:
+ return time.clock
+ else:
+ return time.perf_counter
buf.add(elem)
elems = buf.get_all()
start_elem = max(0, elem-size+1)
- self.assertEqual(elems[:elem+1], range(start_elem, elem+1))
+ self.assertEqual(elems[:elem+1],
+ list(range(start_elem, elem+1)))
self.assertTrue(all(elem == None for elem in elems[elem+1:]))
# test new contents really is the newest
start = max(0, elem-size_new)
- expect_new = range(start, elem+1)
+ expect_new = list(range(start, elem+1))
self.assertEqual(buf.get_all()[-len(expect_new):], expect_new)
from tempfile import mkstemp
import threading
from follow import *
-import time
+from time import sleep
import os
import os.path
+from test_helpers import get_perf_counter
+
+# different counter needed on different python versions
+perf_counter = get_perf_counter()
class LogFileWriter(threading.Thread):
""" creates thread, deamon is True
if n_writes is None, will write indefinitely; else writes text_pattern
- n_writes times, formatted with (counter, time.perf_counter)
+ n_writes times, formatted with (counter, perf_counter)
"""
- super().__init__(daemon=True)
+ super(LogFileWriter, self).__init__()
+ self.daemon = True
self.file_name = file_name
self.text_pattern = text_pattern
self.n_writes = n_writes
if self.do_encode:
file_handle.write(self.text_pattern
- .format(counter, time.perf_counter())
+ .format(counter, perf_counter())
.encode(self.do_encode))
else:
file_handle.write(self.text_pattern
- .format(counter, time.perf_counter()))
+ .format(counter, perf_counter()))
print('wrote {0}'.format(counter))
counter += 1
- time.sleep(self.pause_time)
+ sleep(self.pause_time)
class FollowTester(unittest.TestCase):
follower = Follower(read_handle)
for counter, (source, desc, text, flags) in \
enumerate(follower):
- receive_time = time.perf_counter()
+ receive_time = perf_counter()
print('received text: "{0}"'.format(text))
index = text.index(':')
counter = int(text[:index].strip())
import os
import time
import logging
+from test_helpers import get_perf_counter
+
+perf_counter = get_perf_counter()
from log_read import IterativeReader, LineReader
""" creates thread, deamon is True
if n_writes is None, will write indefinitely; else writes text_pattern
- n_writes times, formatted with (counter, time.perf_counter)
+ n_writes times, formatted with (counter, perf_counter)
If do_encode is True, will encode text to bytes and open file handle
in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text.
If use_logging is False, will open file and run file_handle.write;
If use_logging is True, will create logger that logs to file and use
logging.info (no file_handle.write)
"""
- super().__init__(daemon=True)
+ super(LogFileWriter, self).__init__()
+ self.daemon = True
self.file_name = file_name
self.text_pattern = text_pattern
self.n_writes = n_writes
text = self.text_pattern[counter]
else:
text = self.text_pattern
- text = text.format(counter, time.perf_counter())
+ text = text.format(counter, perf_counter())
if self.do_encode:
text = text.encode(self.do_encode)
reader = IterativeReader(file_handle)
self.helper_test_len(reader, 1)
for counter, (desc, text) in enumerate(reader):
- receive_time = time.perf_counter()
+ receive_time = perf_counter()
text = text.strip()
print('{1}: received text "{0}"'.format(text, counter))
index = text.index(':')
return result
- def assertWarn(self, warn_type, message, func, *args, **kwargs):
+ def assertWarn(self, warn_type, n_warns, message, func, *args, **kwargs):
""" check that calling function raises no warning
use warnings.catch_warnings instead of self.assertWarn
with warnings.catch_warnings(record=True) as warn_catcher:
func(*args, **kwargs)
- self.assertEqual(len(warn_catcher), 1)
+ if n_warns is None:
+ self.assertTrue(len(warn_catcher) > 0)
+ else:
+ self.assertEqual(len(warn_catcher), n_warns)
+
if warn_type is not None:
self.assertTrue(issubclass(warn_catcher[-1].category,
warn_type))
# check warn if time getting low
estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN +
test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2.
- self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+ self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
f, state, estim_empty_newest, estim_empty_smallest,
estim_empty_mean)
test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.)
state.used = state.size - state.available
state.capacity = int(100. * state.used / state.size)
- self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+ self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
f, state, estim_empty_newest, estim_empty_smallest,
estim_empty_mean)
def decision_function(*args):
test_helpers.default_disc_full_decision_function(
*args, size_warn=size_warn, size_err=size_err)
- self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+ self.assertWarn(test_helpers.DiscFullPreventionWarning, None, None,
self._internal_test_thread, decision_function)
# set space tolerance such that err should be raised