From cd860c27a32f57d37d70fc035312da7f50e4849b Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Thu, 14 Jan 2016 17:54:02 +0100 Subject: [PATCH] renamed unittests from test/module_unittest.py to test/test_module.py --- test/buffer_unittest.py | 95 --------------- test/call_helper_unittest.py | 84 ------------- test/file_helper_unittest.py | 109 ----------------- test/follow_unittest.py | 142 ---------------------- test/log_read_unittest.py | 234 ----------------------------------- test/test_buffer.py | 95 +++++++++++++++ test/test_call_helpers.py | 84 +++++++++++++ test/test_file_helpers.py | 109 +++++++++++++++++ test/test_follow.py | 142 ++++++++++++++++++++++ test/test_helper_unittest.py | 275 ------------------------------------------ test/test_log_read.py | 234 +++++++++++++++++++++++++++++++++++ test/test_test_helper.py | 275 ++++++++++++++++++++++++++++++++++++++++++ test/test_type_helpers.py | 85 +++++++++++++ test/type_helper_unittest.py | 85 ------------- 14 files changed, 1024 insertions(+), 1024 deletions(-) delete mode 100644 test/buffer_unittest.py delete mode 100644 test/call_helper_unittest.py delete mode 100644 test/file_helper_unittest.py delete mode 100644 test/follow_unittest.py delete mode 100644 test/log_read_unittest.py create mode 100644 test/test_buffer.py create mode 100644 test/test_call_helpers.py create mode 100644 test/test_file_helpers.py create mode 100644 test/test_follow.py delete mode 100644 test/test_helper_unittest.py create mode 100644 test/test_log_read.py create mode 100644 test/test_test_helper.py create mode 100644 test/test_type_helpers.py delete mode 100644 test/type_helper_unittest.py diff --git a/test/buffer_unittest.py b/test/buffer_unittest.py deleted file mode 100644 index 96d02c7..0000000 --- a/test/buffer_unittest.py +++ /dev/null @@ -1,95 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" buffer_unittest.py: unit tests for buffers.py - -Tests classes and functions in buffers.py - -Should be able run from python2 and python3! - -For help see :py:mod:`unittest` - -.. codeauthor:: Intra2net -""" - -import unittest -from math import log, floor - -import buffers - - -class CircBufferTester(unittest.TestCase): - """ test class CircularBuffer """ - - def test_illegal_size(self): - for size in (-10, -1, 0): - self.assertRaises(ValueError, buffers.CircularBuffer, size) - - def test_basic(self): - """ tests basic functionality """ - - for size in (1, 2, 4, 8, 16, 32): - buf = buffers.CircularBuffer(size) - self.assertTrue(all(item == None for item in buf.get_all())) - - for elem in range(size*2): - buf.add(elem) - elems = buf.get_all() - start_elem = max(0, elem-size+1) - self.assertEqual(elems[:elem+1], range(start_elem, elem+1)) - self.assertTrue(all(elem == None for elem in elems[elem+1:])) - - -def log2(x): - """ Log2 does not exist before python3.3 """ - return log(x, 2) - - -class LogarithmicBufferTest(unittest.TestCase): - """ test class LogarithmicBuffer """ - - def test_illegal_size(self): - for size in (-10, -1): - self.assertRaises(ValueError, buffers.LogarithmicBuffer, size) - - def test_basic(self): - for size_new in (0, 1, 2, 10): - buf = buffers.LogarithmicBuffer(size_new) - - for elem in range(1000): - buf.add(elem) - - # test size - n_added = elem+1 - n_in_old = n_added - size_new - if n_in_old <= 0: - self.assertEqual(buf.get_size(), size_new) - elif n_in_old == 1: - self.assertEqual(buf.get_size(), size_new + 1) - else: - self.assertEqual(buf.get_size(), - size_new + floor(log2(n_in_old-1))+1) - - # test new contents really is the newest - start = max(0, elem-size_new) - expect_new = range(start, elem+1) - self.assertEqual(buf.get_all()[-len(expect_new):], expect_new) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/call_helper_unittest.py b/test/call_helper_unittest.py deleted file mode 100644 index e21b011..0000000 --- a/test/call_helper_unittest.py +++ /dev/null @@ -1,84 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" call_helper_unittest.py: unit tests for call_helpers - -Should be run from python2 and python3! - -.. codeauthor:: Intra2net -""" - -import unittest -import os - -import call_helpers -from type_helpers import is_unicode - - -class CallHelperTester(unittest.TestCase): - - def test_call_and_capture(self): - """ tests call_and_capture with command ls -a / """ - - cmd = ['ls', '-a', '/'] - return_code, out_data, err_data = call_helpers.call_and_capture(cmd) - self.assertEqual(return_code, 0) - self.assertEqual(err_data, []) - - # get contents of root dir ourselves - true_contents = os.listdir('/') + ['.', '..'] - - # compare to out_data - self.assertEqual(sorted(out_data), sorted(true_contents)) - - # check type - self.assertTrue(all(isinstance(entry, str) for entry in out_data), - "Output is not of type str!") - - def test_call_and_capture_bytes(self): - """ tests call_and_capture with command ls -a / """ - - cmd = ['ls', '-a', '/'] - return_code, out_data, err_data = \ - call_helpers.call_and_capture(cmd, universal_newlines=False) - self.assertEqual(return_code, 0) - self.assertEqual(err_data, []) - - # get contents of root dir ourselves - true_contents = os.listdir(b'/') + [b'.', b'..'] - - # compare to out_data (ignoring order of entries) - self.assertEqual(sorted(out_data), sorted(true_contents)) - - # check type - self.assertFalse(is_unicode(out_data[0]), "Output is unicode!") - - def test_call_and_capture_err(self): - """ tests call_and_capture with command ls -a / """ - - cmd = ['ls', '-e'] - return_code, out_data, err_data = call_helpers.call_and_capture(cmd) - self.assertEqual(return_code, 2) - self.assertEqual(out_data, []) - self.assertEqual(len(err_data), 2) - self.assertEqual(err_data[0], "ls: invalid option -- 'e'") - self.assertEqual(err_data[1], "Try 'ls --help' for more information.") - - -if __name__ == '__main__': - unittest.main() diff --git a/test/file_helper_unittest.py b/test/file_helper_unittest.py deleted file mode 100644 index 7277db4..0000000 --- a/test/file_helper_unittest.py +++ /dev/null @@ -1,109 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" file_helper_unittest.py: unit tests for file_helpers - -Tests classes and functions in file_helpers - -Should be able to run from python2 and python3! - -For help see :py:mod:`unittest` - -.. codeauthor:: Intra2net -""" - -import unittest - -import file_helpers - -from call_helpers import call_and_capture -import os - - -class FileHelperTester(unittest.TestCase): - - def test_cd(self): - """ tests the cd context manager """ - - test_dir = '/' - - start_cwd = os.getcwd() - - with file_helpers.cd(test_dir): - self.assertEqual(os.getcwd(), test_dir) - self.assertEqual(os.getcwd(), start_cwd) - - - def test_obj_str(self): - """ test FilesystemFillState.__str__ """ - - # create dummy state - GIGABYTE = 2**30 - state = file_helpers.FilesystemFillState() - state.name = 'dummy' - state.size = 10 * GIGABYTE - state.used = 9 * GIGABYTE - state.available = 1 * GIGABYTE - state.capacity = 90 - state.mount_point = '/not/mounted' - - expect = '[Filesystem {0} mounted at {1}: {2}% used]' \ - .format(state.name, state.mount_point, state.capacity) - - self.assertEqual(str(state), expect) - - def test_disc_stats(self): - """ tests get_filesystem_fill_states """ - - stats = file_helpers.get_filesystem_fill_states() - - # check number - code, out, err = call_and_capture(file_helpers.DF_CMD) - self.assertEqual(code, 0) - self.assertEqual(len(err), 0) - self.assertEqual(len(out)-1, len(stats)) - - for stat in stats: - # do numbers make sense? - self.assertGreaterEqual(stat.size, 0) - self.assertGreaterEqual(stat.used, 0) - self.assertLessEqual(stat.used, stat.size) - self.assertGreaterEqual(stat.available, 0) - self.assertLessEqual(stat.available, stat.size) - self.assertGreaterEqual(stat.capacity, 0) - self.assertLessEqual(stat.capacity, 100) - - # are strings non-empty - self.assertGreater(len(stat.name), 0) - self.assertGreater(len(stat.mount_point), 0) - - # does match capacity? - capacity = 100. * stat.used / stat.size - self.assertLess(abs(capacity - stat.capacity), 5., - 'capacity deviates from used/size by >5%!') - - # is size approx equal to used + available? - size = stat.used + stat.available - self.assertLess(float(abs(stat.size - size)) - / float(max(stat.size, size)), - 0.10, - 'size deviates from used+free by more than 10%!') - - -if __name__ == '__main__': - unittest.main() diff --git a/test/follow_unittest.py b/test/follow_unittest.py deleted file mode 100644 index 0febfce..0000000 --- a/test/follow_unittest.py +++ /dev/null @@ -1,142 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Unittest for follow.py - -This is a little more involved since in order to test properly, need a second -thread that writes to file/socket/stdout - -..todo:: as in log_read_unittest: disable buffering in LogFileWriter - -DEPRECATED -(at least for files see log_read; may still be usefull for pipes/sockets) - -.. codeauthor:: Intra2net -""" - - -import unittest -from tempfile import mkstemp -import threading -from follow import * -import time -import os -import os.path - - -class LogFileWriter(threading.Thread): - """ thread that creates and writes to given file handle """ - - def __init__(self, file_name, text_pattern, n_writes=None, - pause_time=0.1, do_encode=None): - """ creates thread, deamon is True - - if n_writes is None, will write indefinitely; else writes text_pattern - n_writes times, formatted with (counter, time.perf_counter) - """ - super().__init__(daemon=True) - self.file_name = file_name - self.text_pattern = text_pattern - self.n_writes = n_writes - self.pause_time = pause_time - self.do_encode = do_encode - - def run(self): - counter = 0 - if self.do_encode: - mode = 'wb' - else: - mode = 'wt' - - with open(self.file_name, mode) as file_handle: - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - - if self.do_encode: - file_handle.write(self.text_pattern - .format(counter, time.perf_counter()) - .encode(self.do_encode)) - else: - file_handle.write(self.text_pattern - .format(counter, time.perf_counter())) - print('wrote {0}'.format(counter)) - counter += 1 - time.sleep(self.pause_time) - - -class FollowTester(unittest.TestCase): - """ Unittest for follow.py """ - - def test_logfile(self): - """ create logfile, write to it, expect output """ - - text_pattern = '{0}:{1}\n' - n_texts = 10 - pause_time = 1 - encoding = 'ascii' - - LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, - pause_time=pause_time, do_encode=encoding).start() - print('testing with log file {0}'.format(self.temp_file)) - time_diffs = [] - - with open(self.temp_file) as read_handle: - follower = Follower(read_handle) - for counter, (source, desc, text, flags) in \ - enumerate(follower): - receive_time = time.perf_counter() - print('received text: "{0}"'.format(text)) - index = text.index(':') - counter = int(text[:index].strip()) - write_time = float(text[index+1:].strip()) - time_diffs.append(receive_time - write_time) - if counter == n_texts: - break - - - def setUp(self): - """ called before each test """ - print('setup test') - temp_handle, temp_name = mkstemp() - os.close(temp_handle) - self.temp_file = temp_name - print('created temp file ' + self.temp_file) - - def tearDown(self): - """ called after each test """ - print('tear down test') - if os.path.isfile(self.temp_file): - print('delete temp file' + self.temp_file) - os.unlink(self.temp_file) - - @classmethod - def setUpClass(clz): - """ called once before the first test """ - print('setup test class') - clz.temp_file = None - - @classmethod - def tearDownClass(clz): - """ called once after the last test """ - print('tear down test class') - - - -if __name__ == '__main__': - unittest.main() diff --git a/test/log_read_unittest.py b/test/log_read_unittest.py deleted file mode 100644 index 7505ad8..0000000 --- a/test/log_read_unittest.py +++ /dev/null @@ -1,234 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Unittests for log_read - -Creates own thread to write data to a log file - -.. codeauthor:: Intra2net -""" - -import unittest -from threading import Thread -from tempfile import mkstemp -import os -import time -import logging - -from log_read import IterativeReader, LineReader - - -class LogFileWriter(Thread): - """ thread that creates and writes to given file """ - - def __init__(self, file_name, text_pattern, n_writes=None, - pause_time=0.1, do_encode=None, use_logging=True): - """ creates thread, deamon is True - - if n_writes is None, will write indefinitely; else writes text_pattern - n_writes times, formatted with (counter, time.perf_counter) - If do_encode is True, will encode text to bytes and open file handle - in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text. - If use_logging is False, will open file and run file_handle.write; - If use_logging is True, will create logger that logs to file and use - logging.info (no file_handle.write) - """ - super().__init__(daemon=True) - self.file_name = file_name - self.text_pattern = text_pattern - self.n_writes = n_writes - self.pause_time = pause_time - self.do_encode = do_encode - self.use_logging = use_logging - - def run(self): - counter = 0 - if self.do_encode: - mode = 'wb' - buffering = 0 # no buffering -- only allowed for byte mode - else: - mode = 'wt' - buffering = 1 # line buffering -- only allowed for text mode - - if self.use_logging: - logging.basicConfig(filename=self.file_name, level=logging.INFO, - format='%(msg)s') - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - self.write_and_sleep(logging.info, counter) - counter += 1 - else: - with open(self.file_name, mode=mode, buffering=buffering) \ - as file_handle: - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - self.write_and_sleep(file_handle.write, counter) - counter += 1 - - def write_and_sleep(self, write_func, counter): - """ format text, write it using given function and sleep """ - if isinstance(self.text_pattern, (list, tuple)): - text = self.text_pattern[counter] - else: - text = self.text_pattern - text = text.format(counter, time.perf_counter()) - - if self.do_encode: - text = text.encode(self.do_encode) - write_func(text) - #print('wrote {0}'.format(counter)) - time.sleep(self.pause_time) - - -class LogReadTester(unittest.TestCase): - """ class with all the tests """ - - def setUp(self): - """ called before each test """ - print('setup test') - temp_handle, temp_name = mkstemp() - os.close(temp_handle) - self.temp_file = temp_name - print('created temp file ' + self.temp_file) - - def tearDown(self): - """ called after each test """ - print('tear down test') - if os.path.isfile(self.temp_file): - print('delete temp file' + self.temp_file) - os.unlink(self.temp_file) - - def helper_test_len(self, reader, n_expected): - """ helper function that tests length of vars in reader """ - self.assertEqual(reader.n_sources(), n_expected) - self.assertEqual(len(reader.file_objs), n_expected) - self.assertEqual(len(reader.file_descs), n_expected) - self.assertEqual(len(reader.descriptions), n_expected) - self.assertEqual(len(reader.ignore), n_expected) - self.assertEqual(len(reader.last_sizes), n_expected) - - def test_args(self): - self.assertRaises(TypeError, IterativeReader) # no args - self.assertRaises(ValueError, IterativeReader, [], 'test') - self.assertRaises(ValueError, IterativeReader, [], ['test', ]) - self.assertRaises(ValueError, IterativeReader, self.temp_file) - self.assertRaises(ValueError, IterativeReader, [self.temp_file, ]) - with open(self.temp_file, 'rt') as file_handle: - reader = IterativeReader(file_handle) - self.helper_test_len(reader, 1) - reader = IterativeReader([file_handle, ]) - self.helper_test_len(reader, 1) - reader = IterativeReader(file_handle, 'desc') - self.helper_test_len(reader, 1) - reader = IterativeReader([file_handle, ], ['desc', ]) - self.helper_test_len(reader, 1) - reader = IterativeReader(file_handle, ['desc', ]) - self.helper_test_len(reader, 1) - self.assertRaises(ValueError, IterativeReader, - [file_handle, ], 'desc', ) - reader = IterativeReader([file_handle, file_handle], - ['desc1', 'desc2']) - self.helper_test_len(reader, 2) - reader = IterativeReader((file_handle for idx in range(5))) - self.helper_test_len(reader, 5) - self.assertRaises(ValueError, IterativeReader, - (file_handle for idx in range(5)), - tuple('desc' for idx in range(4))) - self.assertRaises(ValueError, IterativeReader, - (file_handle for idx in range(5)), - ('desc' for idx in range(6))) - - def test_simple_read(self): - """ write fixed number of lines, see how fast they are retrieved """ - - # need newline only when writing text (because of write buffering) - param_combinations = ('{0}:{1}\n', None, False), \ - ('{0}:{1}\n', 'ascii', False), \ - ('{0}:{1} ' , 'ascii', False) - #('{0}:{1}\n', None , True), \ logging seems - #('{0}:{1}\n', 'ascii', True), \ to buffer writes - #('{0}:{1} ' , None , True), \ to files - #('{0}:{1} ' , 'ascii', True) - - n_texts = 10 - pause_time = 0.01 # 100 tps (texts per second) - - for text_pattern, encoding, use_logging in param_combinations: - LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, - pause_time=pause_time, do_encode=encoding, - use_logging=use_logging).start() - print('testing with log file {0}'.format(self.temp_file)) - print('encoding is {0}, use logging = {1}'.format(encoding, - use_logging)) - time_diffs = [] - - with open(self.temp_file, 'rt') as file_handle: - reader = IterativeReader(file_handle) - self.helper_test_len(reader, 1) - for counter, (desc, text) in enumerate(reader): - receive_time = time.perf_counter() - text = text.strip() - print('{1}: received text "{0}"'.format(text, counter)) - index = text.index(':') - count_text = int(text[:index].strip()) - self.assertEqual(count_text, counter) - write_time = float(text[index+1:].strip()) - time_diffs.append((receive_time - write_time)*1.e6) - if counter == n_texts-1: - print('stop since have {0} reads'.format(counter)) - break - print('time diffs in us: {0}'.format(time_diffs)) - self.assertTrue(max(time_diffs) < 1000., 'read took more than 1ms!') - - def test_line_read(self): - """ write partial lines, full lines and multiple lines """ - - pause_time = 0.01 # 100 tps (texts per second) - encoding = None - use_logging = False - texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n', - 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e', - '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n', - 'end\n', '\nend\n', '\n\nend\n\n'] - lines_expected = ['line{0}'.format(idx) for idx in range(12)] \ - + ['', '', ''] - - # create writer - LogFileWriter(self.temp_file, texts, n_writes=len(texts), - pause_time=pause_time, do_encode=encoding, - use_logging=use_logging).start() - - # read - lines_read = [] - with open(self.temp_file, 'rt') as file_handle: - reader = LineReader(file_handle) - self.helper_test_len(reader, 1) - - for line_expected, (_, line_read) in zip(lines_expected, reader): - if 'end' in line_read: - break - else: - print('expect "{0}", read "{1}"'.format(line_expected, - line_read)) - self.assertEqual(line_expected, line_read) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/test_buffer.py b/test/test_buffer.py new file mode 100644 index 0000000..96d02c7 --- /dev/null +++ b/test/test_buffer.py @@ -0,0 +1,95 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" buffer_unittest.py: unit tests for buffers.py + +Tests classes and functions in buffers.py + +Should be able run from python2 and python3! + +For help see :py:mod:`unittest` + +.. codeauthor:: Intra2net +""" + +import unittest +from math import log, floor + +import buffers + + +class CircBufferTester(unittest.TestCase): + """ test class CircularBuffer """ + + def test_illegal_size(self): + for size in (-10, -1, 0): + self.assertRaises(ValueError, buffers.CircularBuffer, size) + + def test_basic(self): + """ tests basic functionality """ + + for size in (1, 2, 4, 8, 16, 32): + buf = buffers.CircularBuffer(size) + self.assertTrue(all(item == None for item in buf.get_all())) + + for elem in range(size*2): + buf.add(elem) + elems = buf.get_all() + start_elem = max(0, elem-size+1) + self.assertEqual(elems[:elem+1], range(start_elem, elem+1)) + self.assertTrue(all(elem == None for elem in elems[elem+1:])) + + +def log2(x): + """ Log2 does not exist before python3.3 """ + return log(x, 2) + + +class LogarithmicBufferTest(unittest.TestCase): + """ test class LogarithmicBuffer """ + + def test_illegal_size(self): + for size in (-10, -1): + self.assertRaises(ValueError, buffers.LogarithmicBuffer, size) + + def test_basic(self): + for size_new in (0, 1, 2, 10): + buf = buffers.LogarithmicBuffer(size_new) + + for elem in range(1000): + buf.add(elem) + + # test size + n_added = elem+1 + n_in_old = n_added - size_new + if n_in_old <= 0: + self.assertEqual(buf.get_size(), size_new) + elif n_in_old == 1: + self.assertEqual(buf.get_size(), size_new + 1) + else: + self.assertEqual(buf.get_size(), + size_new + floor(log2(n_in_old-1))+1) + + # test new contents really is the newest + start = max(0, elem-size_new) + expect_new = range(start, elem+1) + self.assertEqual(buf.get_all()[-len(expect_new):], expect_new) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_call_helpers.py b/test/test_call_helpers.py new file mode 100644 index 0000000..e21b011 --- /dev/null +++ b/test/test_call_helpers.py @@ -0,0 +1,84 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" call_helper_unittest.py: unit tests for call_helpers + +Should be run from python2 and python3! + +.. codeauthor:: Intra2net +""" + +import unittest +import os + +import call_helpers +from type_helpers import is_unicode + + +class CallHelperTester(unittest.TestCase): + + def test_call_and_capture(self): + """ tests call_and_capture with command ls -a / """ + + cmd = ['ls', '-a', '/'] + return_code, out_data, err_data = call_helpers.call_and_capture(cmd) + self.assertEqual(return_code, 0) + self.assertEqual(err_data, []) + + # get contents of root dir ourselves + true_contents = os.listdir('/') + ['.', '..'] + + # compare to out_data + self.assertEqual(sorted(out_data), sorted(true_contents)) + + # check type + self.assertTrue(all(isinstance(entry, str) for entry in out_data), + "Output is not of type str!") + + def test_call_and_capture_bytes(self): + """ tests call_and_capture with command ls -a / """ + + cmd = ['ls', '-a', '/'] + return_code, out_data, err_data = \ + call_helpers.call_and_capture(cmd, universal_newlines=False) + self.assertEqual(return_code, 0) + self.assertEqual(err_data, []) + + # get contents of root dir ourselves + true_contents = os.listdir(b'/') + [b'.', b'..'] + + # compare to out_data (ignoring order of entries) + self.assertEqual(sorted(out_data), sorted(true_contents)) + + # check type + self.assertFalse(is_unicode(out_data[0]), "Output is unicode!") + + def test_call_and_capture_err(self): + """ tests call_and_capture with command ls -a / """ + + cmd = ['ls', '-e'] + return_code, out_data, err_data = call_helpers.call_and_capture(cmd) + self.assertEqual(return_code, 2) + self.assertEqual(out_data, []) + self.assertEqual(len(err_data), 2) + self.assertEqual(err_data[0], "ls: invalid option -- 'e'") + self.assertEqual(err_data[1], "Try 'ls --help' for more information.") + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_file_helpers.py b/test/test_file_helpers.py new file mode 100644 index 0000000..7277db4 --- /dev/null +++ b/test/test_file_helpers.py @@ -0,0 +1,109 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" file_helper_unittest.py: unit tests for file_helpers + +Tests classes and functions in file_helpers + +Should be able to run from python2 and python3! + +For help see :py:mod:`unittest` + +.. codeauthor:: Intra2net +""" + +import unittest + +import file_helpers + +from call_helpers import call_and_capture +import os + + +class FileHelperTester(unittest.TestCase): + + def test_cd(self): + """ tests the cd context manager """ + + test_dir = '/' + + start_cwd = os.getcwd() + + with file_helpers.cd(test_dir): + self.assertEqual(os.getcwd(), test_dir) + self.assertEqual(os.getcwd(), start_cwd) + + + def test_obj_str(self): + """ test FilesystemFillState.__str__ """ + + # create dummy state + GIGABYTE = 2**30 + state = file_helpers.FilesystemFillState() + state.name = 'dummy' + state.size = 10 * GIGABYTE + state.used = 9 * GIGABYTE + state.available = 1 * GIGABYTE + state.capacity = 90 + state.mount_point = '/not/mounted' + + expect = '[Filesystem {0} mounted at {1}: {2}% used]' \ + .format(state.name, state.mount_point, state.capacity) + + self.assertEqual(str(state), expect) + + def test_disc_stats(self): + """ tests get_filesystem_fill_states """ + + stats = file_helpers.get_filesystem_fill_states() + + # check number + code, out, err = call_and_capture(file_helpers.DF_CMD) + self.assertEqual(code, 0) + self.assertEqual(len(err), 0) + self.assertEqual(len(out)-1, len(stats)) + + for stat in stats: + # do numbers make sense? + self.assertGreaterEqual(stat.size, 0) + self.assertGreaterEqual(stat.used, 0) + self.assertLessEqual(stat.used, stat.size) + self.assertGreaterEqual(stat.available, 0) + self.assertLessEqual(stat.available, stat.size) + self.assertGreaterEqual(stat.capacity, 0) + self.assertLessEqual(stat.capacity, 100) + + # are strings non-empty + self.assertGreater(len(stat.name), 0) + self.assertGreater(len(stat.mount_point), 0) + + # does match capacity? + capacity = 100. * stat.used / stat.size + self.assertLess(abs(capacity - stat.capacity), 5., + 'capacity deviates from used/size by >5%!') + + # is size approx equal to used + available? + size = stat.used + stat.available + self.assertLess(float(abs(stat.size - size)) + / float(max(stat.size, size)), + 0.10, + 'size deviates from used+free by more than 10%!') + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_follow.py b/test/test_follow.py new file mode 100644 index 0000000..0febfce --- /dev/null +++ b/test/test_follow.py @@ -0,0 +1,142 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Unittest for follow.py + +This is a little more involved since in order to test properly, need a second +thread that writes to file/socket/stdout + +..todo:: as in log_read_unittest: disable buffering in LogFileWriter + +DEPRECATED +(at least for files see log_read; may still be usefull for pipes/sockets) + +.. codeauthor:: Intra2net +""" + + +import unittest +from tempfile import mkstemp +import threading +from follow import * +import time +import os +import os.path + + +class LogFileWriter(threading.Thread): + """ thread that creates and writes to given file handle """ + + def __init__(self, file_name, text_pattern, n_writes=None, + pause_time=0.1, do_encode=None): + """ creates thread, deamon is True + + if n_writes is None, will write indefinitely; else writes text_pattern + n_writes times, formatted with (counter, time.perf_counter) + """ + super().__init__(daemon=True) + self.file_name = file_name + self.text_pattern = text_pattern + self.n_writes = n_writes + self.pause_time = pause_time + self.do_encode = do_encode + + def run(self): + counter = 0 + if self.do_encode: + mode = 'wb' + else: + mode = 'wt' + + with open(self.file_name, mode) as file_handle: + while True: + if self.n_writes is not None and counter >= self.n_writes: + break + + if self.do_encode: + file_handle.write(self.text_pattern + .format(counter, time.perf_counter()) + .encode(self.do_encode)) + else: + file_handle.write(self.text_pattern + .format(counter, time.perf_counter())) + print('wrote {0}'.format(counter)) + counter += 1 + time.sleep(self.pause_time) + + +class FollowTester(unittest.TestCase): + """ Unittest for follow.py """ + + def test_logfile(self): + """ create logfile, write to it, expect output """ + + text_pattern = '{0}:{1}\n' + n_texts = 10 + pause_time = 1 + encoding = 'ascii' + + LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, + pause_time=pause_time, do_encode=encoding).start() + print('testing with log file {0}'.format(self.temp_file)) + time_diffs = [] + + with open(self.temp_file) as read_handle: + follower = Follower(read_handle) + for counter, (source, desc, text, flags) in \ + enumerate(follower): + receive_time = time.perf_counter() + print('received text: "{0}"'.format(text)) + index = text.index(':') + counter = int(text[:index].strip()) + write_time = float(text[index+1:].strip()) + time_diffs.append(receive_time - write_time) + if counter == n_texts: + break + + + def setUp(self): + """ called before each test """ + print('setup test') + temp_handle, temp_name = mkstemp() + os.close(temp_handle) + self.temp_file = temp_name + print('created temp file ' + self.temp_file) + + def tearDown(self): + """ called after each test """ + print('tear down test') + if os.path.isfile(self.temp_file): + print('delete temp file' + self.temp_file) + os.unlink(self.temp_file) + + @classmethod + def setUpClass(clz): + """ called once before the first test """ + print('setup test class') + clz.temp_file = None + + @classmethod + def tearDownClass(clz): + """ called once after the last test """ + print('tear down test class') + + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_helper_unittest.py b/test/test_helper_unittest.py deleted file mode 100644 index 3f37d92..0000000 --- a/test/test_helper_unittest.py +++ /dev/null @@ -1,275 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" test_helper_unittest.py: unit tests for test_helpers - -Tests classes and functions in test_helpers - -Should be able run from python2 and python3! - -For help see :py:mod:`unittest` - -.. codeauthor:: Intra2net -""" - -from __future__ import print_function - -import unittest -from datetime import timedelta, datetime as dt -from file_helpers import FilesystemFillState, get_filesystem_fill_states -import warnings -from time import sleep -import threading - -import test_helpers - - -class TestHelperTester(unittest.TestCase): - - def assertNoWarn(self, func, *args, **kwargs): - """ check that calling function raises no warning - - use warnings.catch_warnings instead of self.assertWarn - for compatibility with python 2 - """ - - result = None - with warnings.catch_warnings(record=True) as warn_catcher: - result = func(*args, **kwargs) - - self.assertEqual(len(warn_catcher), 0) - - return result - - - def assertWarn(self, warn_type, message, func, *args, **kwargs): - """ check that calling function raises no warning - - use warnings.catch_warnings instead of self.assertWarn - for compatibility with python 2 - """ - - with warnings.catch_warnings(record=True) as warn_catcher: - func(*args, **kwargs) - - self.assertEqual(len(warn_catcher), 1) - if warn_type is not None: - self.assertTrue(issubclass(warn_catcher[-1].category, - warn_type)) - if message is not None: - self.assertEqual(warn_catcher[-1].message, message) - #print(warn_catcher[-1].message) - - - def test_zero_estimator(self): - """ test function :py:func:`test_helpers.calc_time_until_zero` """ - - # need to allow some tolerance since calc_time_until_zero uses now() - TOLERANCE = 0.1 - - start_time = dt.now() - time1 = start_time - timedelta(seconds=20) # 20s ago - time2 = start_time - timedelta(seconds=10) # 10s ago - - # test constant disc usage - self.assertEqual(test_helpers.calc_time_until_zero((time1, 1), - (time2, 1)), None) - - # test gaining disc space (1 byte per second, empty 30s ago) - estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20)) - self.assertTrue(estim < 0) - expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds() - self.assertLess(abs(estim - expect), TOLERANCE) - - # test slowly losing disc space (1 byte per second, empty now) - estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10)) - expect = (start_time - dt.now()).total_seconds() - self.assertLess(abs(estim - expect), TOLERANCE) - - - def test_decision_function(self): - """ tests function default_disc_full_decision_function - - :py:func:`test_helpers.default_disc_full_decision_function` - """ - - f = test_helpers.default_disc_full_decision_function - - # create dummy state - GIGABYTE = 2**30 - state = FilesystemFillState() - state.name = 'dummy' - state.size = 10 * GIGABYTE - state.used = 9 * GIGABYTE - state.available = 1 * GIGABYTE - state.capacity = 90 - state.mount_point = '/not/mounted' - - HOUR = 60*60 - estim_empty_newest = None # currently not used - estim_empty_mean = None # currently not used - estim_empty_smallest = None # simulate no proper estimate - - # check no warning if no time estimate (and enough space) - self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check no warning if enough time (and enough space) - estim_empty_smallest = HOUR # enough time - self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check warn if time getting low - estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN + - test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2. - self.assertWarn(test_helpers.DiscFullPreventionWarning, None, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check err if time too low - estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2. - self.assertRaises(KeyboardInterrupt, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - # show the error message: - #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean) - - # check warning if disc space is getting low - estim_empty_smallest = HOUR - state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN + - test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.) - state.used = state.size - state.available - state.capacity = int(100. * state.used / state.size) - self.assertWarn(test_helpers.DiscFullPreventionWarning, None, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check error if disc space is too low - state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.) - state.used = state.size - state.available - state.capacity = int(100. * state.used / state.size) - self.assertRaises(KeyboardInterrupt, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean) - - # check other kill levels - self.assertRaises(test_helpers.DiscFullPreventionError, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean, - kill_level=test_helpers.KILL_EXCEPTION) - - self.assertRaises(SystemExit, - f, state, estim_empty_newest, estim_empty_smallest, - estim_empty_mean, - kill_level=test_helpers.KILL_SYS_EXIT) - - # this even kills unittest: - #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean, - # kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT) - #self.fail('should have exited!') - - def test_thread(self): - """ test the DiscFillCheckerThread """ - - #threading.setprofile(self.threading_profile_func) - # somehow damages df output ... need to check! - - # since we do not want to fill up the disc, we incrementally decrease - # the tolerance so even our hopefully half-empty filesystems will - # trigger warnings / errors - min_free = None - min_name = None - for fill_state in get_filesystem_fill_states(): - if fill_state.name in test_helpers.NOT_REAL_FILESYSTEMS: - continue - if (min_free is None) or (fill_state.available < min_free): - min_free = fill_state.available - min_name = fill_state.name - print('min free: {0} with {1}B'.format(min_name, min_free)) - - # set space tolerance such that no warn/err should show - size_warn = min_free/4; - size_err = min_free/2; - def decision_function(*args): - test_helpers.default_disc_full_decision_function( - *args, size_warn=size_warn, size_err=size_err) - self._internal_test_thread(decision_function) - - # set space tolerance such that show warn but no err - size_warn = min_free*2; - size_err = min_free/2; - def decision_function(*args): - test_helpers.default_disc_full_decision_function( - *args, size_warn=size_warn, size_err=size_err) - self.assertWarn(test_helpers.DiscFullPreventionWarning, None, - self._internal_test_thread, decision_function) - - # set space tolerance such that err should be raised - size_warn = min_free*4; - size_err = min_free*2; - def decision_function(*args): - test_helpers.default_disc_full_decision_function( - *args, size_warn=size_warn, size_err=size_err) - self.assertRaises(KeyboardInterrupt, - self._internal_test_thread, decision_function) - - def _internal_test_thread(self, decision_function): - - sleep_time_checker = 0.1 - sleep_time_tester = 1 - time_tolerance = 0.1 - main_thread_name = 'MainThread' - checker_thread_name = 'discFillChck' - - curr_threads = [thread.name for thread in threading.enumerate()] - self.assertEqual(curr_threads, [main_thread_name]) - start_time = dt.now() - with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD, - sleep_time_checker, - decision_function) as checker: - curr_threads = [thread.name for thread in threading.enumerate()] - self.nonsense_function(sleep_time_tester) - time_taken = (dt.now() - start_time).total_seconds() - self.assertEqual(curr_threads, [main_thread_name, checker_thread_name]) - - # stop checker thread - checker.stop() - checker.join() - curr_threads = [thread.name for thread in threading.enumerate()] - self.assertEqual(curr_threads, [main_thread_name]) - - self.assertTrue(time_taken >= sleep_time_tester) - self.assertTrue(time_taken < sleep_time_tester + time_tolerance) - - - def nonsense_function(self, sleep_time): - """ function to run while checking disc fill state; just sleeps - - to make this test quicker could try to use profiling to count calls to - disc checker function and stop this after 2 or so - """ - sleep(sleep_time) - - def threading_profile_func(self, *args, **kwargs): - """ todo: set this as profiling function for checker thread, count - calls to disc checker function, trigger event or so """ - print('profiling with args {0} and kwargs {1}' - .format([arg for arg in args if len(args) < 20], kwargs)) - -if __name__ == '__main__': - unittest.main() diff --git a/test/test_log_read.py b/test/test_log_read.py new file mode 100644 index 0000000..7505ad8 --- /dev/null +++ b/test/test_log_read.py @@ -0,0 +1,234 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Unittests for log_read + +Creates own thread to write data to a log file + +.. codeauthor:: Intra2net +""" + +import unittest +from threading import Thread +from tempfile import mkstemp +import os +import time +import logging + +from log_read import IterativeReader, LineReader + + +class LogFileWriter(Thread): + """ thread that creates and writes to given file """ + + def __init__(self, file_name, text_pattern, n_writes=None, + pause_time=0.1, do_encode=None, use_logging=True): + """ creates thread, deamon is True + + if n_writes is None, will write indefinitely; else writes text_pattern + n_writes times, formatted with (counter, time.perf_counter) + If do_encode is True, will encode text to bytes and open file handle + in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text. + If use_logging is False, will open file and run file_handle.write; + If use_logging is True, will create logger that logs to file and use + logging.info (no file_handle.write) + """ + super().__init__(daemon=True) + self.file_name = file_name + self.text_pattern = text_pattern + self.n_writes = n_writes + self.pause_time = pause_time + self.do_encode = do_encode + self.use_logging = use_logging + + def run(self): + counter = 0 + if self.do_encode: + mode = 'wb' + buffering = 0 # no buffering -- only allowed for byte mode + else: + mode = 'wt' + buffering = 1 # line buffering -- only allowed for text mode + + if self.use_logging: + logging.basicConfig(filename=self.file_name, level=logging.INFO, + format='%(msg)s') + while True: + if self.n_writes is not None and counter >= self.n_writes: + break + self.write_and_sleep(logging.info, counter) + counter += 1 + else: + with open(self.file_name, mode=mode, buffering=buffering) \ + as file_handle: + while True: + if self.n_writes is not None and counter >= self.n_writes: + break + self.write_and_sleep(file_handle.write, counter) + counter += 1 + + def write_and_sleep(self, write_func, counter): + """ format text, write it using given function and sleep """ + if isinstance(self.text_pattern, (list, tuple)): + text = self.text_pattern[counter] + else: + text = self.text_pattern + text = text.format(counter, time.perf_counter()) + + if self.do_encode: + text = text.encode(self.do_encode) + write_func(text) + #print('wrote {0}'.format(counter)) + time.sleep(self.pause_time) + + +class LogReadTester(unittest.TestCase): + """ class with all the tests """ + + def setUp(self): + """ called before each test """ + print('setup test') + temp_handle, temp_name = mkstemp() + os.close(temp_handle) + self.temp_file = temp_name + print('created temp file ' + self.temp_file) + + def tearDown(self): + """ called after each test """ + print('tear down test') + if os.path.isfile(self.temp_file): + print('delete temp file' + self.temp_file) + os.unlink(self.temp_file) + + def helper_test_len(self, reader, n_expected): + """ helper function that tests length of vars in reader """ + self.assertEqual(reader.n_sources(), n_expected) + self.assertEqual(len(reader.file_objs), n_expected) + self.assertEqual(len(reader.file_descs), n_expected) + self.assertEqual(len(reader.descriptions), n_expected) + self.assertEqual(len(reader.ignore), n_expected) + self.assertEqual(len(reader.last_sizes), n_expected) + + def test_args(self): + self.assertRaises(TypeError, IterativeReader) # no args + self.assertRaises(ValueError, IterativeReader, [], 'test') + self.assertRaises(ValueError, IterativeReader, [], ['test', ]) + self.assertRaises(ValueError, IterativeReader, self.temp_file) + self.assertRaises(ValueError, IterativeReader, [self.temp_file, ]) + with open(self.temp_file, 'rt') as file_handle: + reader = IterativeReader(file_handle) + self.helper_test_len(reader, 1) + reader = IterativeReader([file_handle, ]) + self.helper_test_len(reader, 1) + reader = IterativeReader(file_handle, 'desc') + self.helper_test_len(reader, 1) + reader = IterativeReader([file_handle, ], ['desc', ]) + self.helper_test_len(reader, 1) + reader = IterativeReader(file_handle, ['desc', ]) + self.helper_test_len(reader, 1) + self.assertRaises(ValueError, IterativeReader, + [file_handle, ], 'desc', ) + reader = IterativeReader([file_handle, file_handle], + ['desc1', 'desc2']) + self.helper_test_len(reader, 2) + reader = IterativeReader((file_handle for idx in range(5))) + self.helper_test_len(reader, 5) + self.assertRaises(ValueError, IterativeReader, + (file_handle for idx in range(5)), + tuple('desc' for idx in range(4))) + self.assertRaises(ValueError, IterativeReader, + (file_handle for idx in range(5)), + ('desc' for idx in range(6))) + + def test_simple_read(self): + """ write fixed number of lines, see how fast they are retrieved """ + + # need newline only when writing text (because of write buffering) + param_combinations = ('{0}:{1}\n', None, False), \ + ('{0}:{1}\n', 'ascii', False), \ + ('{0}:{1} ' , 'ascii', False) + #('{0}:{1}\n', None , True), \ logging seems + #('{0}:{1}\n', 'ascii', True), \ to buffer writes + #('{0}:{1} ' , None , True), \ to files + #('{0}:{1} ' , 'ascii', True) + + n_texts = 10 + pause_time = 0.01 # 100 tps (texts per second) + + for text_pattern, encoding, use_logging in param_combinations: + LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, + pause_time=pause_time, do_encode=encoding, + use_logging=use_logging).start() + print('testing with log file {0}'.format(self.temp_file)) + print('encoding is {0}, use logging = {1}'.format(encoding, + use_logging)) + time_diffs = [] + + with open(self.temp_file, 'rt') as file_handle: + reader = IterativeReader(file_handle) + self.helper_test_len(reader, 1) + for counter, (desc, text) in enumerate(reader): + receive_time = time.perf_counter() + text = text.strip() + print('{1}: received text "{0}"'.format(text, counter)) + index = text.index(':') + count_text = int(text[:index].strip()) + self.assertEqual(count_text, counter) + write_time = float(text[index+1:].strip()) + time_diffs.append((receive_time - write_time)*1.e6) + if counter == n_texts-1: + print('stop since have {0} reads'.format(counter)) + break + print('time diffs in us: {0}'.format(time_diffs)) + self.assertTrue(max(time_diffs) < 1000., 'read took more than 1ms!') + + def test_line_read(self): + """ write partial lines, full lines and multiple lines """ + + pause_time = 0.01 # 100 tps (texts per second) + encoding = None + use_logging = False + texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n', + 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e', + '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n', + 'end\n', '\nend\n', '\n\nend\n\n'] + lines_expected = ['line{0}'.format(idx) for idx in range(12)] \ + + ['', '', ''] + + # create writer + LogFileWriter(self.temp_file, texts, n_writes=len(texts), + pause_time=pause_time, do_encode=encoding, + use_logging=use_logging).start() + + # read + lines_read = [] + with open(self.temp_file, 'rt') as file_handle: + reader = LineReader(file_handle) + self.helper_test_len(reader, 1) + + for line_expected, (_, line_read) in zip(lines_expected, reader): + if 'end' in line_read: + break + else: + print('expect "{0}", read "{1}"'.format(line_expected, + line_read)) + self.assertEqual(line_expected, line_read) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_test_helper.py b/test/test_test_helper.py new file mode 100644 index 0000000..3f37d92 --- /dev/null +++ b/test/test_test_helper.py @@ -0,0 +1,275 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" test_helper_unittest.py: unit tests for test_helpers + +Tests classes and functions in test_helpers + +Should be able run from python2 and python3! + +For help see :py:mod:`unittest` + +.. codeauthor:: Intra2net +""" + +from __future__ import print_function + +import unittest +from datetime import timedelta, datetime as dt +from file_helpers import FilesystemFillState, get_filesystem_fill_states +import warnings +from time import sleep +import threading + +import test_helpers + + +class TestHelperTester(unittest.TestCase): + + def assertNoWarn(self, func, *args, **kwargs): + """ check that calling function raises no warning + + use warnings.catch_warnings instead of self.assertWarn + for compatibility with python 2 + """ + + result = None + with warnings.catch_warnings(record=True) as warn_catcher: + result = func(*args, **kwargs) + + self.assertEqual(len(warn_catcher), 0) + + return result + + + def assertWarn(self, warn_type, message, func, *args, **kwargs): + """ check that calling function raises no warning + + use warnings.catch_warnings instead of self.assertWarn + for compatibility with python 2 + """ + + with warnings.catch_warnings(record=True) as warn_catcher: + func(*args, **kwargs) + + self.assertEqual(len(warn_catcher), 1) + if warn_type is not None: + self.assertTrue(issubclass(warn_catcher[-1].category, + warn_type)) + if message is not None: + self.assertEqual(warn_catcher[-1].message, message) + #print(warn_catcher[-1].message) + + + def test_zero_estimator(self): + """ test function :py:func:`test_helpers.calc_time_until_zero` """ + + # need to allow some tolerance since calc_time_until_zero uses now() + TOLERANCE = 0.1 + + start_time = dt.now() + time1 = start_time - timedelta(seconds=20) # 20s ago + time2 = start_time - timedelta(seconds=10) # 10s ago + + # test constant disc usage + self.assertEqual(test_helpers.calc_time_until_zero((time1, 1), + (time2, 1)), None) + + # test gaining disc space (1 byte per second, empty 30s ago) + estim = test_helpers.calc_time_until_zero((time1, 10), (time2, 20)) + self.assertTrue(estim < 0) + expect = ((time1-timedelta(seconds=10)) - dt.now()).total_seconds() + self.assertLess(abs(estim - expect), TOLERANCE) + + # test slowly losing disc space (1 byte per second, empty now) + estim = test_helpers.calc_time_until_zero((time1, 20), (time2, 10)) + expect = (start_time - dt.now()).total_seconds() + self.assertLess(abs(estim - expect), TOLERANCE) + + + def test_decision_function(self): + """ tests function default_disc_full_decision_function + + :py:func:`test_helpers.default_disc_full_decision_function` + """ + + f = test_helpers.default_disc_full_decision_function + + # create dummy state + GIGABYTE = 2**30 + state = FilesystemFillState() + state.name = 'dummy' + state.size = 10 * GIGABYTE + state.used = 9 * GIGABYTE + state.available = 1 * GIGABYTE + state.capacity = 90 + state.mount_point = '/not/mounted' + + HOUR = 60*60 + estim_empty_newest = None # currently not used + estim_empty_mean = None # currently not used + estim_empty_smallest = None # simulate no proper estimate + + # check no warning if no time estimate (and enough space) + self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check no warning if enough time (and enough space) + estim_empty_smallest = HOUR # enough time + self.assertNoWarn(f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check warn if time getting low + estim_empty_smallest = (test_helpers.DEFAULT_TIME_TO_FULL_WARN + + test_helpers.DEFAULT_TIME_TO_FULL_ERR) / 2. + self.assertWarn(test_helpers.DiscFullPreventionWarning, None, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check err if time too low + estim_empty_smallest = test_helpers.DEFAULT_TIME_TO_FULL_ERR / 2. + self.assertRaises(KeyboardInterrupt, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + # show the error message: + #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean) + + # check warning if disc space is getting low + estim_empty_smallest = HOUR + state.available = int((test_helpers.DEFAULT_DISC_SIZE_WARN + + test_helpers.DEFAULT_DISC_SIZE_ERR ) / 2.) + state.used = state.size - state.available + state.capacity = int(100. * state.used / state.size) + self.assertWarn(test_helpers.DiscFullPreventionWarning, None, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check error if disc space is too low + state.available = int(test_helpers.DEFAULT_DISC_SIZE_ERR / 2.) + state.used = state.size - state.available + state.capacity = int(100. * state.used / state.size) + self.assertRaises(KeyboardInterrupt, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean) + + # check other kill levels + self.assertRaises(test_helpers.DiscFullPreventionError, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean, + kill_level=test_helpers.KILL_EXCEPTION) + + self.assertRaises(SystemExit, + f, state, estim_empty_newest, estim_empty_smallest, + estim_empty_mean, + kill_level=test_helpers.KILL_SYS_EXIT) + + # this even kills unittest: + #f(state, estim_empty_newest, estim_empty_smallest, estim_empty_mean, + # kill_level=test_helpers.KILL_BRUTALLY_ONLY_LAST_RESORT) + #self.fail('should have exited!') + + def test_thread(self): + """ test the DiscFillCheckerThread """ + + #threading.setprofile(self.threading_profile_func) + # somehow damages df output ... need to check! + + # since we do not want to fill up the disc, we incrementally decrease + # the tolerance so even our hopefully half-empty filesystems will + # trigger warnings / errors + min_free = None + min_name = None + for fill_state in get_filesystem_fill_states(): + if fill_state.name in test_helpers.NOT_REAL_FILESYSTEMS: + continue + if (min_free is None) or (fill_state.available < min_free): + min_free = fill_state.available + min_name = fill_state.name + print('min free: {0} with {1}B'.format(min_name, min_free)) + + # set space tolerance such that no warn/err should show + size_warn = min_free/4; + size_err = min_free/2; + def decision_function(*args): + test_helpers.default_disc_full_decision_function( + *args, size_warn=size_warn, size_err=size_err) + self._internal_test_thread(decision_function) + + # set space tolerance such that show warn but no err + size_warn = min_free*2; + size_err = min_free/2; + def decision_function(*args): + test_helpers.default_disc_full_decision_function( + *args, size_warn=size_warn, size_err=size_err) + self.assertWarn(test_helpers.DiscFullPreventionWarning, None, + self._internal_test_thread, decision_function) + + # set space tolerance such that err should be raised + size_warn = min_free*4; + size_err = min_free*2; + def decision_function(*args): + test_helpers.default_disc_full_decision_function( + *args, size_warn=size_warn, size_err=size_err) + self.assertRaises(KeyboardInterrupt, + self._internal_test_thread, decision_function) + + def _internal_test_thread(self, decision_function): + + sleep_time_checker = 0.1 + sleep_time_tester = 1 + time_tolerance = 0.1 + main_thread_name = 'MainThread' + checker_thread_name = 'discFillChck' + + curr_threads = [thread.name for thread in threading.enumerate()] + self.assertEqual(curr_threads, [main_thread_name]) + start_time = dt.now() + with test_helpers.disc_fill_checked(test_helpers.METHOD_THREAD, + sleep_time_checker, + decision_function) as checker: + curr_threads = [thread.name for thread in threading.enumerate()] + self.nonsense_function(sleep_time_tester) + time_taken = (dt.now() - start_time).total_seconds() + self.assertEqual(curr_threads, [main_thread_name, checker_thread_name]) + + # stop checker thread + checker.stop() + checker.join() + curr_threads = [thread.name for thread in threading.enumerate()] + self.assertEqual(curr_threads, [main_thread_name]) + + self.assertTrue(time_taken >= sleep_time_tester) + self.assertTrue(time_taken < sleep_time_tester + time_tolerance) + + + def nonsense_function(self, sleep_time): + """ function to run while checking disc fill state; just sleeps + + to make this test quicker could try to use profiling to count calls to + disc checker function and stop this after 2 or so + """ + sleep(sleep_time) + + def threading_profile_func(self, *args, **kwargs): + """ todo: set this as profiling function for checker thread, count + calls to disc checker function, trigger event or so """ + print('profiling with args {0} and kwargs {1}' + .format([arg for arg in args if len(args) < 20], kwargs)) + +if __name__ == '__main__': + unittest.main() diff --git a/test/test_type_helpers.py b/test/test_type_helpers.py new file mode 100644 index 0000000..82ccfed --- /dev/null +++ b/test/test_type_helpers.py @@ -0,0 +1,85 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" type_helper_unittest.py: unit tests for type_helpers + +Tests classes and functions in type_helpers + +Should be run from python2 and python3! + +For help see :py:mod:`unittest` + +.. codeauthor:: Intra2net +""" + +import unittest + +from type_helpers import is_unicode, isstr +from sys import version_info + + +is_py2 = version_info.major == 2 + +class TypeHelperTester(unittest.TestCase): + + def test_is_py2_or_py3(self): + """ test that python version is 2 or 3 + + when implementing type_helpers, there was no py4 and no idea what it + might be like. Also will probably fail for python v1 + """ + self.assertIn(version_info.major, (2, 3)) + + def test_is_unicode(self): + """ tests function is_unicode """ + + self.assertFalse(is_unicode(1)) + self.assertFalse(is_unicode([])) + self.assertFalse(is_unicode(unittest.TestCase)) + self.assertFalse(is_unicode(type(unittest.TestCase))) + + if is_py2: + self.assertTrue(is_unicode(u'bla')) + self.assertTrue(eval("is_unicode(ur'bla')")) # not allowed in py3! + self.assertFalse(is_unicode('bla')) + self.assertFalse(is_unicode(r'bla')) + else: + self.assertTrue(is_unicode('bla')) + self.assertTrue(is_unicode(r'bla')) + self.assertFalse(is_unicode(b'bla')) + self.assertFalse(is_unicode(br'bla')) + + def test_isstr(self): + """ test function isstr """ + + tests = [ + ('abc', True), (u'abc', True), (r'abc', True), + (1, False), (['a', 'b', 'c'], False), (('a', 'b', 'c'), False)] + if not is_py2: + tests.append((b'abc', False)) # b'' does not exist in py2 + + for test_var, expected_result in tests: + self.assertEqual(isstr(test_var), expected_result, + 'isstr of var {0} (which is of type {1}) returned' + '{2} but expected {3}!' + .format(test_var, type(test_var), isstr(test_var), + expected_result)) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/type_helper_unittest.py b/test/type_helper_unittest.py deleted file mode 100644 index 82ccfed..0000000 --- a/test/type_helper_unittest.py +++ /dev/null @@ -1,85 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" type_helper_unittest.py: unit tests for type_helpers - -Tests classes and functions in type_helpers - -Should be run from python2 and python3! - -For help see :py:mod:`unittest` - -.. codeauthor:: Intra2net -""" - -import unittest - -from type_helpers import is_unicode, isstr -from sys import version_info - - -is_py2 = version_info.major == 2 - -class TypeHelperTester(unittest.TestCase): - - def test_is_py2_or_py3(self): - """ test that python version is 2 or 3 - - when implementing type_helpers, there was no py4 and no idea what it - might be like. Also will probably fail for python v1 - """ - self.assertIn(version_info.major, (2, 3)) - - def test_is_unicode(self): - """ tests function is_unicode """ - - self.assertFalse(is_unicode(1)) - self.assertFalse(is_unicode([])) - self.assertFalse(is_unicode(unittest.TestCase)) - self.assertFalse(is_unicode(type(unittest.TestCase))) - - if is_py2: - self.assertTrue(is_unicode(u'bla')) - self.assertTrue(eval("is_unicode(ur'bla')")) # not allowed in py3! - self.assertFalse(is_unicode('bla')) - self.assertFalse(is_unicode(r'bla')) - else: - self.assertTrue(is_unicode('bla')) - self.assertTrue(is_unicode(r'bla')) - self.assertFalse(is_unicode(b'bla')) - self.assertFalse(is_unicode(br'bla')) - - def test_isstr(self): - """ test function isstr """ - - tests = [ - ('abc', True), (u'abc', True), (r'abc', True), - (1, False), (['a', 'b', 'c'], False), (('a', 'b', 'c'), False)] - if not is_py2: - tests.append((b'abc', False)) # b'' does not exist in py2 - - for test_var, expected_result in tests: - self.assertEqual(isstr(test_var), expected_result, - 'isstr of var {0} (which is of type {1}) returned' - '{2} but expected {3}!' - .format(test_var, type(test_var), isstr(test_var), - expected_result)) - - -if __name__ == '__main__': - unittest.main() -- 1.7.1