From 7e758ca3755e17300c5db106dbec3967cebd4ceb Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Thu, 14 Jan 2016 16:54:46 +0100 Subject: [PATCH] moved old unittests into test subdir --- follow_unittest.py | 142 --------------------------- log_read_unittest.py | 234 --------------------------------------------- test/follow_unittest.py | 142 +++++++++++++++++++++++++++ test/log_read_unittest.py | 234 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 376 insertions(+), 376 deletions(-) delete mode 100644 follow_unittest.py delete mode 100644 log_read_unittest.py create mode 100644 test/follow_unittest.py create mode 100644 test/log_read_unittest.py diff --git a/follow_unittest.py b/follow_unittest.py deleted file mode 100644 index 0febfce..0000000 --- a/follow_unittest.py +++ /dev/null @@ -1,142 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Unittest for follow.py - -This is a little more involved since in order to test properly, need a second -thread that writes to file/socket/stdout - -..todo:: as in log_read_unittest: disable buffering in LogFileWriter - -DEPRECATED -(at least for files see log_read; may still be usefull for pipes/sockets) - -.. codeauthor:: Intra2net -""" - - -import unittest -from tempfile import mkstemp -import threading -from follow import * -import time -import os -import os.path - - -class LogFileWriter(threading.Thread): - """ thread that creates and writes to given file handle """ - - def __init__(self, file_name, text_pattern, n_writes=None, - pause_time=0.1, do_encode=None): - """ creates thread, deamon is True - - if n_writes is None, will write indefinitely; else writes text_pattern - n_writes times, formatted with (counter, time.perf_counter) - """ - super().__init__(daemon=True) - self.file_name = file_name - self.text_pattern = text_pattern - self.n_writes = n_writes - self.pause_time = pause_time - self.do_encode = do_encode - - def run(self): - counter = 0 - if self.do_encode: - mode = 'wb' - else: - mode = 'wt' - - with open(self.file_name, mode) as file_handle: - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - - if self.do_encode: - file_handle.write(self.text_pattern - .format(counter, time.perf_counter()) - .encode(self.do_encode)) - else: - file_handle.write(self.text_pattern - .format(counter, time.perf_counter())) - print('wrote {0}'.format(counter)) - counter += 1 - time.sleep(self.pause_time) - - -class FollowTester(unittest.TestCase): - """ Unittest for follow.py """ - - def test_logfile(self): - """ create logfile, write to it, expect output """ - - text_pattern = '{0}:{1}\n' - n_texts = 10 - pause_time = 1 - encoding = 'ascii' - - LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, - pause_time=pause_time, do_encode=encoding).start() - print('testing with log file {0}'.format(self.temp_file)) - time_diffs = [] - - with open(self.temp_file) as read_handle: - follower = Follower(read_handle) - for counter, (source, desc, text, flags) in \ - enumerate(follower): - receive_time = time.perf_counter() - print('received text: "{0}"'.format(text)) - index = text.index(':') - counter = int(text[:index].strip()) - write_time = float(text[index+1:].strip()) - time_diffs.append(receive_time - write_time) - if counter == n_texts: - break - - - def setUp(self): - """ called before each test """ - print('setup test') - temp_handle, temp_name = mkstemp() - os.close(temp_handle) - self.temp_file = temp_name - print('created temp file ' + self.temp_file) - - def tearDown(self): - """ called after each test """ - print('tear down test') - if os.path.isfile(self.temp_file): - print('delete temp file' + self.temp_file) - os.unlink(self.temp_file) - - @classmethod - def setUpClass(clz): - """ called once before the first test """ - print('setup test class') - clz.temp_file = None - - @classmethod - def tearDownClass(clz): - """ called once after the last test """ - print('tear down test class') - - - -if __name__ == '__main__': - unittest.main() diff --git a/log_read_unittest.py b/log_read_unittest.py deleted file mode 100644 index 7505ad8..0000000 --- a/log_read_unittest.py +++ /dev/null @@ -1,234 +0,0 @@ -# The software in this package is distributed under the GNU General -# Public License version 2 (with a special exception described below). -# -# A copy of GNU General Public License (GPL) is included in this distribution, -# in the file COPYING.GPL. -# -# As a special exception, if other files instantiate templates or use macros -# or inline functions from this file, or you compile this file and link it -# with other works to produce a work based on this file, this file -# does not by itself cause the resulting work to be covered -# by the GNU General Public License. -# -# However the source code for this file must still be made available -# in accordance with section (3) of the GNU General Public License. -# -# This exception does not invalidate any other reasons why a work based -# on this file might be covered by the GNU General Public License. - -""" Unittests for log_read - -Creates own thread to write data to a log file - -.. codeauthor:: Intra2net -""" - -import unittest -from threading import Thread -from tempfile import mkstemp -import os -import time -import logging - -from log_read import IterativeReader, LineReader - - -class LogFileWriter(Thread): - """ thread that creates and writes to given file """ - - def __init__(self, file_name, text_pattern, n_writes=None, - pause_time=0.1, do_encode=None, use_logging=True): - """ creates thread, deamon is True - - if n_writes is None, will write indefinitely; else writes text_pattern - n_writes times, formatted with (counter, time.perf_counter) - If do_encode is True, will encode text to bytes and open file handle - in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text. - If use_logging is False, will open file and run file_handle.write; - If use_logging is True, will create logger that logs to file and use - logging.info (no file_handle.write) - """ - super().__init__(daemon=True) - self.file_name = file_name - self.text_pattern = text_pattern - self.n_writes = n_writes - self.pause_time = pause_time - self.do_encode = do_encode - self.use_logging = use_logging - - def run(self): - counter = 0 - if self.do_encode: - mode = 'wb' - buffering = 0 # no buffering -- only allowed for byte mode - else: - mode = 'wt' - buffering = 1 # line buffering -- only allowed for text mode - - if self.use_logging: - logging.basicConfig(filename=self.file_name, level=logging.INFO, - format='%(msg)s') - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - self.write_and_sleep(logging.info, counter) - counter += 1 - else: - with open(self.file_name, mode=mode, buffering=buffering) \ - as file_handle: - while True: - if self.n_writes is not None and counter >= self.n_writes: - break - self.write_and_sleep(file_handle.write, counter) - counter += 1 - - def write_and_sleep(self, write_func, counter): - """ format text, write it using given function and sleep """ - if isinstance(self.text_pattern, (list, tuple)): - text = self.text_pattern[counter] - else: - text = self.text_pattern - text = text.format(counter, time.perf_counter()) - - if self.do_encode: - text = text.encode(self.do_encode) - write_func(text) - #print('wrote {0}'.format(counter)) - time.sleep(self.pause_time) - - -class LogReadTester(unittest.TestCase): - """ class with all the tests """ - - def setUp(self): - """ called before each test """ - print('setup test') - temp_handle, temp_name = mkstemp() - os.close(temp_handle) - self.temp_file = temp_name - print('created temp file ' + self.temp_file) - - def tearDown(self): - """ called after each test """ - print('tear down test') - if os.path.isfile(self.temp_file): - print('delete temp file' + self.temp_file) - os.unlink(self.temp_file) - - def helper_test_len(self, reader, n_expected): - """ helper function that tests length of vars in reader """ - self.assertEqual(reader.n_sources(), n_expected) - self.assertEqual(len(reader.file_objs), n_expected) - self.assertEqual(len(reader.file_descs), n_expected) - self.assertEqual(len(reader.descriptions), n_expected) - self.assertEqual(len(reader.ignore), n_expected) - self.assertEqual(len(reader.last_sizes), n_expected) - - def test_args(self): - self.assertRaises(TypeError, IterativeReader) # no args - self.assertRaises(ValueError, IterativeReader, [], 'test') - self.assertRaises(ValueError, IterativeReader, [], ['test', ]) - self.assertRaises(ValueError, IterativeReader, self.temp_file) - self.assertRaises(ValueError, IterativeReader, [self.temp_file, ]) - with open(self.temp_file, 'rt') as file_handle: - reader = IterativeReader(file_handle) - self.helper_test_len(reader, 1) - reader = IterativeReader([file_handle, ]) - self.helper_test_len(reader, 1) - reader = IterativeReader(file_handle, 'desc') - self.helper_test_len(reader, 1) - reader = IterativeReader([file_handle, ], ['desc', ]) - self.helper_test_len(reader, 1) - reader = IterativeReader(file_handle, ['desc', ]) - self.helper_test_len(reader, 1) - self.assertRaises(ValueError, IterativeReader, - [file_handle, ], 'desc', ) - reader = IterativeReader([file_handle, file_handle], - ['desc1', 'desc2']) - self.helper_test_len(reader, 2) - reader = IterativeReader((file_handle for idx in range(5))) - self.helper_test_len(reader, 5) - self.assertRaises(ValueError, IterativeReader, - (file_handle for idx in range(5)), - tuple('desc' for idx in range(4))) - self.assertRaises(ValueError, IterativeReader, - (file_handle for idx in range(5)), - ('desc' for idx in range(6))) - - def test_simple_read(self): - """ write fixed number of lines, see how fast they are retrieved """ - - # need newline only when writing text (because of write buffering) - param_combinations = ('{0}:{1}\n', None, False), \ - ('{0}:{1}\n', 'ascii', False), \ - ('{0}:{1} ' , 'ascii', False) - #('{0}:{1}\n', None , True), \ logging seems - #('{0}:{1}\n', 'ascii', True), \ to buffer writes - #('{0}:{1} ' , None , True), \ to files - #('{0}:{1} ' , 'ascii', True) - - n_texts = 10 - pause_time = 0.01 # 100 tps (texts per second) - - for text_pattern, encoding, use_logging in param_combinations: - LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, - pause_time=pause_time, do_encode=encoding, - use_logging=use_logging).start() - print('testing with log file {0}'.format(self.temp_file)) - print('encoding is {0}, use logging = {1}'.format(encoding, - use_logging)) - time_diffs = [] - - with open(self.temp_file, 'rt') as file_handle: - reader = IterativeReader(file_handle) - self.helper_test_len(reader, 1) - for counter, (desc, text) in enumerate(reader): - receive_time = time.perf_counter() - text = text.strip() - print('{1}: received text "{0}"'.format(text, counter)) - index = text.index(':') - count_text = int(text[:index].strip()) - self.assertEqual(count_text, counter) - write_time = float(text[index+1:].strip()) - time_diffs.append((receive_time - write_time)*1.e6) - if counter == n_texts-1: - print('stop since have {0} reads'.format(counter)) - break - print('time diffs in us: {0}'.format(time_diffs)) - self.assertTrue(max(time_diffs) < 1000., 'read took more than 1ms!') - - def test_line_read(self): - """ write partial lines, full lines and multiple lines """ - - pause_time = 0.01 # 100 tps (texts per second) - encoding = None - use_logging = False - texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n', - 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e', - '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n', - 'end\n', '\nend\n', '\n\nend\n\n'] - lines_expected = ['line{0}'.format(idx) for idx in range(12)] \ - + ['', '', ''] - - # create writer - LogFileWriter(self.temp_file, texts, n_writes=len(texts), - pause_time=pause_time, do_encode=encoding, - use_logging=use_logging).start() - - # read - lines_read = [] - with open(self.temp_file, 'rt') as file_handle: - reader = LineReader(file_handle) - self.helper_test_len(reader, 1) - - for line_expected, (_, line_read) in zip(lines_expected, reader): - if 'end' in line_read: - break - else: - print('expect "{0}", read "{1}"'.format(line_expected, - line_read)) - self.assertEqual(line_expected, line_read) - - -if __name__ == '__main__': - unittest.main() diff --git a/test/follow_unittest.py b/test/follow_unittest.py new file mode 100644 index 0000000..0febfce --- /dev/null +++ b/test/follow_unittest.py @@ -0,0 +1,142 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Unittest for follow.py + +This is a little more involved since in order to test properly, need a second +thread that writes to file/socket/stdout + +..todo:: as in log_read_unittest: disable buffering in LogFileWriter + +DEPRECATED +(at least for files see log_read; may still be usefull for pipes/sockets) + +.. codeauthor:: Intra2net +""" + + +import unittest +from tempfile import mkstemp +import threading +from follow import * +import time +import os +import os.path + + +class LogFileWriter(threading.Thread): + """ thread that creates and writes to given file handle """ + + def __init__(self, file_name, text_pattern, n_writes=None, + pause_time=0.1, do_encode=None): + """ creates thread, deamon is True + + if n_writes is None, will write indefinitely; else writes text_pattern + n_writes times, formatted with (counter, time.perf_counter) + """ + super().__init__(daemon=True) + self.file_name = file_name + self.text_pattern = text_pattern + self.n_writes = n_writes + self.pause_time = pause_time + self.do_encode = do_encode + + def run(self): + counter = 0 + if self.do_encode: + mode = 'wb' + else: + mode = 'wt' + + with open(self.file_name, mode) as file_handle: + while True: + if self.n_writes is not None and counter >= self.n_writes: + break + + if self.do_encode: + file_handle.write(self.text_pattern + .format(counter, time.perf_counter()) + .encode(self.do_encode)) + else: + file_handle.write(self.text_pattern + .format(counter, time.perf_counter())) + print('wrote {0}'.format(counter)) + counter += 1 + time.sleep(self.pause_time) + + +class FollowTester(unittest.TestCase): + """ Unittest for follow.py """ + + def test_logfile(self): + """ create logfile, write to it, expect output """ + + text_pattern = '{0}:{1}\n' + n_texts = 10 + pause_time = 1 + encoding = 'ascii' + + LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, + pause_time=pause_time, do_encode=encoding).start() + print('testing with log file {0}'.format(self.temp_file)) + time_diffs = [] + + with open(self.temp_file) as read_handle: + follower = Follower(read_handle) + for counter, (source, desc, text, flags) in \ + enumerate(follower): + receive_time = time.perf_counter() + print('received text: "{0}"'.format(text)) + index = text.index(':') + counter = int(text[:index].strip()) + write_time = float(text[index+1:].strip()) + time_diffs.append(receive_time - write_time) + if counter == n_texts: + break + + + def setUp(self): + """ called before each test """ + print('setup test') + temp_handle, temp_name = mkstemp() + os.close(temp_handle) + self.temp_file = temp_name + print('created temp file ' + self.temp_file) + + def tearDown(self): + """ called after each test """ + print('tear down test') + if os.path.isfile(self.temp_file): + print('delete temp file' + self.temp_file) + os.unlink(self.temp_file) + + @classmethod + def setUpClass(clz): + """ called once before the first test """ + print('setup test class') + clz.temp_file = None + + @classmethod + def tearDownClass(clz): + """ called once after the last test """ + print('tear down test class') + + + +if __name__ == '__main__': + unittest.main() diff --git a/test/log_read_unittest.py b/test/log_read_unittest.py new file mode 100644 index 0000000..7505ad8 --- /dev/null +++ b/test/log_read_unittest.py @@ -0,0 +1,234 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. + +""" Unittests for log_read + +Creates own thread to write data to a log file + +.. codeauthor:: Intra2net +""" + +import unittest +from threading import Thread +from tempfile import mkstemp +import os +import time +import logging + +from log_read import IterativeReader, LineReader + + +class LogFileWriter(Thread): + """ thread that creates and writes to given file """ + + def __init__(self, file_name, text_pattern, n_writes=None, + pause_time=0.1, do_encode=None, use_logging=True): + """ creates thread, deamon is True + + if n_writes is None, will write indefinitely; else writes text_pattern + n_writes times, formatted with (counter, time.perf_counter) + If do_encode is True, will encode text to bytes and open file handle + in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text. + If use_logging is False, will open file and run file_handle.write; + If use_logging is True, will create logger that logs to file and use + logging.info (no file_handle.write) + """ + super().__init__(daemon=True) + self.file_name = file_name + self.text_pattern = text_pattern + self.n_writes = n_writes + self.pause_time = pause_time + self.do_encode = do_encode + self.use_logging = use_logging + + def run(self): + counter = 0 + if self.do_encode: + mode = 'wb' + buffering = 0 # no buffering -- only allowed for byte mode + else: + mode = 'wt' + buffering = 1 # line buffering -- only allowed for text mode + + if self.use_logging: + logging.basicConfig(filename=self.file_name, level=logging.INFO, + format='%(msg)s') + while True: + if self.n_writes is not None and counter >= self.n_writes: + break + self.write_and_sleep(logging.info, counter) + counter += 1 + else: + with open(self.file_name, mode=mode, buffering=buffering) \ + as file_handle: + while True: + if self.n_writes is not None and counter >= self.n_writes: + break + self.write_and_sleep(file_handle.write, counter) + counter += 1 + + def write_and_sleep(self, write_func, counter): + """ format text, write it using given function and sleep """ + if isinstance(self.text_pattern, (list, tuple)): + text = self.text_pattern[counter] + else: + text = self.text_pattern + text = text.format(counter, time.perf_counter()) + + if self.do_encode: + text = text.encode(self.do_encode) + write_func(text) + #print('wrote {0}'.format(counter)) + time.sleep(self.pause_time) + + +class LogReadTester(unittest.TestCase): + """ class with all the tests """ + + def setUp(self): + """ called before each test """ + print('setup test') + temp_handle, temp_name = mkstemp() + os.close(temp_handle) + self.temp_file = temp_name + print('created temp file ' + self.temp_file) + + def tearDown(self): + """ called after each test """ + print('tear down test') + if os.path.isfile(self.temp_file): + print('delete temp file' + self.temp_file) + os.unlink(self.temp_file) + + def helper_test_len(self, reader, n_expected): + """ helper function that tests length of vars in reader """ + self.assertEqual(reader.n_sources(), n_expected) + self.assertEqual(len(reader.file_objs), n_expected) + self.assertEqual(len(reader.file_descs), n_expected) + self.assertEqual(len(reader.descriptions), n_expected) + self.assertEqual(len(reader.ignore), n_expected) + self.assertEqual(len(reader.last_sizes), n_expected) + + def test_args(self): + self.assertRaises(TypeError, IterativeReader) # no args + self.assertRaises(ValueError, IterativeReader, [], 'test') + self.assertRaises(ValueError, IterativeReader, [], ['test', ]) + self.assertRaises(ValueError, IterativeReader, self.temp_file) + self.assertRaises(ValueError, IterativeReader, [self.temp_file, ]) + with open(self.temp_file, 'rt') as file_handle: + reader = IterativeReader(file_handle) + self.helper_test_len(reader, 1) + reader = IterativeReader([file_handle, ]) + self.helper_test_len(reader, 1) + reader = IterativeReader(file_handle, 'desc') + self.helper_test_len(reader, 1) + reader = IterativeReader([file_handle, ], ['desc', ]) + self.helper_test_len(reader, 1) + reader = IterativeReader(file_handle, ['desc', ]) + self.helper_test_len(reader, 1) + self.assertRaises(ValueError, IterativeReader, + [file_handle, ], 'desc', ) + reader = IterativeReader([file_handle, file_handle], + ['desc1', 'desc2']) + self.helper_test_len(reader, 2) + reader = IterativeReader((file_handle for idx in range(5))) + self.helper_test_len(reader, 5) + self.assertRaises(ValueError, IterativeReader, + (file_handle for idx in range(5)), + tuple('desc' for idx in range(4))) + self.assertRaises(ValueError, IterativeReader, + (file_handle for idx in range(5)), + ('desc' for idx in range(6))) + + def test_simple_read(self): + """ write fixed number of lines, see how fast they are retrieved """ + + # need newline only when writing text (because of write buffering) + param_combinations = ('{0}:{1}\n', None, False), \ + ('{0}:{1}\n', 'ascii', False), \ + ('{0}:{1} ' , 'ascii', False) + #('{0}:{1}\n', None , True), \ logging seems + #('{0}:{1}\n', 'ascii', True), \ to buffer writes + #('{0}:{1} ' , None , True), \ to files + #('{0}:{1} ' , 'ascii', True) + + n_texts = 10 + pause_time = 0.01 # 100 tps (texts per second) + + for text_pattern, encoding, use_logging in param_combinations: + LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts, + pause_time=pause_time, do_encode=encoding, + use_logging=use_logging).start() + print('testing with log file {0}'.format(self.temp_file)) + print('encoding is {0}, use logging = {1}'.format(encoding, + use_logging)) + time_diffs = [] + + with open(self.temp_file, 'rt') as file_handle: + reader = IterativeReader(file_handle) + self.helper_test_len(reader, 1) + for counter, (desc, text) in enumerate(reader): + receive_time = time.perf_counter() + text = text.strip() + print('{1}: received text "{0}"'.format(text, counter)) + index = text.index(':') + count_text = int(text[:index].strip()) + self.assertEqual(count_text, counter) + write_time = float(text[index+1:].strip()) + time_diffs.append((receive_time - write_time)*1.e6) + if counter == n_texts-1: + print('stop since have {0} reads'.format(counter)) + break + print('time diffs in us: {0}'.format(time_diffs)) + self.assertTrue(max(time_diffs) < 1000., 'read took more than 1ms!') + + def test_line_read(self): + """ write partial lines, full lines and multiple lines """ + + pause_time = 0.01 # 100 tps (texts per second) + encoding = None + use_logging = False + texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n', + 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e', + '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n', + 'end\n', '\nend\n', '\n\nend\n\n'] + lines_expected = ['line{0}'.format(idx) for idx in range(12)] \ + + ['', '', ''] + + # create writer + LogFileWriter(self.temp_file, texts, n_writes=len(texts), + pause_time=pause_time, do_encode=encoding, + use_logging=use_logging).start() + + # read + lines_read = [] + with open(self.temp_file, 'rt') as file_handle: + reader = LineReader(file_handle) + self.helper_test_len(reader, 1) + + for line_expected, (_, line_read) in zip(lines_expected, reader): + if 'end' in line_read: + break + else: + print('expect "{0}", read "{1}"'.format(line_expected, + line_read)) + self.assertEqual(line_expected, line_read) + + +if __name__ == '__main__': + unittest.main() -- 1.7.1