fix unittest compatibility issues
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Fri, 15 Jan 2016 14:45:14 +0000 (15:45 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Fri, 15 Jan 2016 14:45:14 +0000 (15:45 +0100)
added iter_helpers.[i]zip_longest
added test_helpers.get_perf_counter

src/iter_helpers.py
src/log_read.py
src/test_helpers.py
test/test_buffer.py
test/test_follow.py
test/test_log_read.py
test/test_test_helper.py

index 624ede5..1ce9c10 100644 (file)
@@ -24,10 +24,22 @@ been deemed useful
 .. codeauthor:: Intra2net
 """
 
-from itertools import *
+import itertools as it
+from type_helpers import PY2, PY3
+
+
+zip_longest = None
+izip_longest = None
+if PY2:
+    zip_longest = it.izip_longest
+    izip_longest = it.izip_longest
+else:
+    zip_longest = it.zip_longest
+    izip_longest = it.zip_longest
+
 
 def pairwise(iterable):
     """ s -> (s0,s1), (s1,s2), (s2, s3), ...  """
-    a, b = tee(iterable)
+    a, b = it.tee(iterable)
     next(b, None)
     return zip(a, b)
index 0c30e88..97bb917 100644 (file)
@@ -39,7 +39,7 @@ like date, time, module name, urgency and message.
 import os
 from warnings import warn
 import os.path
-from itertools import zip_longest
+from iter_helpers import zip_longest
 
 from type_helpers import is_str_or_byte, is_file_obj
 
@@ -273,7 +273,7 @@ class LineReader(IterativeReader):
 
     def __init__(self, *args, **kwargs):
         """ forwards all args and kwargs to :py:class:`IterativeReader` """
-        super().__init__(*args, **kwargs)
+        super(LineReader, self).__init__(*args, **kwargs)
         self.line_buffers = ['' for _ in range(self.n_sources())]
 
     def prepare_result(self, description, new_data, idx):
index eebebc6..8ae83d8 100644 (file)
@@ -28,11 +28,11 @@ from __future__ import print_function
 
 from contextlib import contextmanager
 from threading import Thread
-from time import sleep
+import time
 from datetime import datetime as dt
 from itertools import tee
 from warnings import warn
-from sys import stderr, exit as sys_exit
+from sys import stderr, version_info, exit as sys_exit
 from os import getpid, kill, _exit as brutal_exit_function_that_should_not_be_used
 import signal
 
@@ -184,7 +184,7 @@ class DiscFillCheckerThread(Thread, DiscFillChecker):
     def run(self):
         print('checker thread running')
         while not self.do_stop:
-            sleep(self.interval)
+            time.sleep(self.interval)
             self.do_check()
 
     def stop(self):
@@ -365,3 +365,30 @@ def disc_fill_checked(method=METHOD_THREAD, *args, **kwargs):
             signal.signal(signal.SIGALRM, old_handler)
         else:
             raise NotImplementedError('implement cleanup for new method!')
+
+
+def get_perf_counter():
+    """ returns time.perf_counter or time.clock depending on python version
+
+    before py 3.3: return function :py:function:`time.clock`
+    after that: returns function :py:function:`time.perf_counter`
+
+    ..warn:: these will behave differently, so do not make assumption on change
+             or time during sleep or what 0 value means
+
+    Usage:
+
+      from test_helpers import get_perf_counter
+      perf_counter = get_perf_counter()
+      start_val = perf_counter()
+      # ... do something
+      end_val = perf_counter
+
+    """
+
+    if version_info.major == 2:
+        return time.clock
+    elif version_info.major == 3 and version_info.minor < 3:
+        return time.clock
+    else:
+        return time.perf_counter
index 96d02c7..46ae115 100644 (file)
@@ -51,7 +51,8 @@ class CircBufferTester(unittest.TestCase):
                 buf.add(elem)
                 elems = buf.get_all()
                 start_elem = max(0, elem-size+1)
-                self.assertEqual(elems[:elem+1], range(start_elem, elem+1))
+                self.assertEqual(elems[:elem+1],
+                                 list(range(start_elem, elem+1)))
                 self.assertTrue(all(elem == None for elem in elems[elem+1:]))
 
 
@@ -87,7 +88,7 @@ class LogarithmicBufferTest(unittest.TestCase):
 
                 # test new contents really is the newest
                 start = max(0, elem-size_new)
-                expect_new = range(start, elem+1)
+                expect_new = list(range(start, elem+1))
                 self.assertEqual(buf.get_all()[-len(expect_new):], expect_new)
 
 
index 0febfce..dbb4a35 100644 (file)
@@ -34,9 +34,13 @@ import unittest
 from tempfile import mkstemp
 import threading
 from follow import *
-import time
+from time import sleep
 import os
 import os.path
+from test_helpers import get_perf_counter
+
+# different counter needed on different python versions
+perf_counter = get_perf_counter()
 
 
 class LogFileWriter(threading.Thread):
@@ -47,9 +51,10 @@ class LogFileWriter(threading.Thread):
         """ creates thread, deamon is True
         
         if n_writes is None, will write indefinitely; else writes text_pattern
-        n_writes times, formatted with (counter, time.perf_counter)
+        n_writes times, formatted with (counter, perf_counter)
         """
-        super().__init__(daemon=True)
+        super(LogFileWriter, self).__init__()
+        self.daemon = True
         self.file_name = file_name
         self.text_pattern = text_pattern
         self.n_writes = n_writes
@@ -70,14 +75,14 @@ class LogFileWriter(threading.Thread):
 
                 if self.do_encode:
                     file_handle.write(self.text_pattern
-                                           .format(counter, time.perf_counter())
+                                           .format(counter, perf_counter())
                                            .encode(self.do_encode))
                 else:
                     file_handle.write(self.text_pattern
-                                           .format(counter, time.perf_counter()))
+                                           .format(counter, perf_counter()))
                 print('wrote {0}'.format(counter))
                 counter += 1
-                time.sleep(self.pause_time)
+                sleep(self.pause_time)
 
 
 class FollowTester(unittest.TestCase):
@@ -100,7 +105,7 @@ class FollowTester(unittest.TestCase):
             follower = Follower(read_handle)
             for counter, (source, desc, text, flags) in \
                     enumerate(follower):
-                receive_time = time.perf_counter()
+                receive_time = perf_counter()
                 print('received text: "{0}"'.format(text))
                 index = text.index(':')
                 counter = int(text[:index].strip())
index 7505ad8..642eba7 100644 (file)
@@ -29,6 +29,9 @@ from tempfile import mkstemp
 import os
 import time
 import logging
+from test_helpers import get_perf_counter
+
+perf_counter = get_perf_counter()
 
 from log_read import IterativeReader, LineReader
 
@@ -41,14 +44,15 @@ class LogFileWriter(Thread):
         """ creates thread, deamon is True
 
         if n_writes is None, will write indefinitely; else writes text_pattern
-        n_writes times, formatted with (counter, time.perf_counter)
+        n_writes times, formatted with (counter, perf_counter)
         If do_encode is True, will encode text to bytes and open file handle
         in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text.
         If use_logging is False, will open file and run file_handle.write;
         If use_logging is True, will create logger that logs to file and use
         logging.info (no file_handle.write)
         """
-        super().__init__(daemon=True)
+        super(LogFileWriter, self).__init__()
+        self.daemon = True
         self.file_name = file_name
         self.text_pattern = text_pattern
         self.n_writes = n_writes
@@ -88,7 +92,7 @@ class LogFileWriter(Thread):
             text = self.text_pattern[counter]
         else:
             text = self.text_pattern
-        text = text.format(counter, time.perf_counter())
+        text = text.format(counter, perf_counter())
 
         if self.do_encode:
             text = text.encode(self.do_encode)
@@ -183,7 +187,7 @@ class LogReadTester(unittest.TestCase):
                 reader = IterativeReader(file_handle)
                 self.helper_test_len(reader, 1)
                 for counter, (desc, text) in enumerate(reader):
-                    receive_time = time.perf_counter()
+                    receive_time = perf_counter()
                     text = text.strip()
                     print('{1}: received text "{0}"'.format(text, counter))
                     index = text.index(':')
index 3f37d92..7894521 100644 (file)
@@ -57,7 +57,7 @@ class TestHelperTester(unittest.TestCase):
         return result
 
 
-    def assertWarn(self, warn_type, message, func, *args, **kwargs):
+    def assertWarn(self, warn_type, n_warns, message, func, *args, **kwargs):
         """ check that calling function raises no warning
 
         use warnings.catch_warnings instead of self.assertWarn
@@ -67,7 +67,11 @@ class TestHelperTester(unittest.TestCase):
         with warnings.catch_warnings(record=True) as warn_catcher:
             func(*args, **kwargs)
 
-            self.assertEqual(len(warn_catcher), 1)
+            if n_warns is None:
+                self.assertTrue(len(warn_catcher) > 0)
+            else:
+                self.assertEqual(len(warn_catcher), n_warns)
+
             if warn_type is not None:
                 self.assertTrue(issubclass(warn_catcher[-1].category,
                                            warn_type))
@@ -137,7 +141,7 @@ class TestHelperTester(unittest.TestCase):
         # check warn if time getting low
         estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN +
                                 test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2.
-        self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+        self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
                         f, state, estim_empty_newest, estim_empty_smallest,
                         estim_empty_mean)
 
@@ -155,7 +159,7 @@ class TestHelperTester(unittest.TestCase):
                                test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.)
         state.used = state.size - state.available
         state.capacity = int(100. * state.used / state.size)
-        self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+        self.assertWarn(test_helpers.DiscFullPreventionWarning, 1, None,
                         f, state, estim_empty_newest, estim_empty_smallest,
                         estim_empty_mean)
 
@@ -216,7 +220,7 @@ class TestHelperTester(unittest.TestCase):
         def decision_function(*args):
             test_helpers.default_disc_full_decision_function(
                 *args, size_warn=size_warn, size_err=size_err)
-        self.assertWarn(test_helpers.DiscFullPreventionWarning, None,
+        self.assertWarn(test_helpers.DiscFullPreventionWarning, None, None,
                         self._internal_test_thread, decision_function)
 
         # set space tolerance such that err should be raised