raise ValueError('source {0} is neither file obj nor file '
'descriptor!')
- # try to fstat the new file descriptor
+ # try to fstat the new file descriptor just for testing
os.fstat(self.file_descs[-1])
# guess descriptions if not given
def __init__(self, *args, **kwargs):
""" forwards all args and kwargs to :py:class:`IterativeReader` """
- super().__init__(self, *args, **kwargs)
- self.line_buffers = ['' for _ in range(self.n_sources)]
+ super().__init__(*args, **kwargs)
+ self.line_buffers = ['' for _ in range(self.n_sources())]
- def prepare_result(self, decription, new_data, idx):
+ def prepare_result(self, description, new_data, idx):
""" take raw new data and split it into lines
if line is not complete, then buffer it
returns lines without their newline characters
"""
+ #print('splitting "{0}" + "{1}"'.format(self.line_buffers[idx],
+ # new_data.replace('\n', r'\n')))
all_data = self.line_buffers[idx] + new_data
+ self.line_buffers[idx] = ''
result = []
should_be_no_new_lines = False
for line in all_data.splitlines(keepends=True):
import time
import logging
-from log_read import IterativeReader
+from log_read import IterativeReader, LineReader
class LogFileWriter(Thread):
counter += 1
def write_and_sleep(self, write_func, counter):
- text = self.text_pattern.format(counter, time.perf_counter())
+ """ format text, write it using given function and sleep """
+ if isinstance(self.text_pattern, (list, tuple)):
+ text = self.text_pattern[counter]
+ else:
+ text = self.text_pattern
+ text = text.format(counter, time.perf_counter())
+
if self.do_encode:
text = text.encode(self.do_encode)
write_func(text)
print('delete temp file' + self.temp_file)
os.unlink(self.temp_file)
- @classmethod
- def setUpClass(clz):
- """ called once before the first test """
- print('setup test class')
- clz.temp_file = None
-
- @classmethod
- def tearDownClass(clz):
- """ called once after the last test """
- print('tear down test class')
+ def helper_test_len(self, reader, n_expected):
+ """ helper function that tests length of vars in reader """
+ self.assertEqual(reader.n_sources(), n_expected)
+ self.assertEqual(len(reader.file_objs), n_expected)
+ self.assertEqual(len(reader.file_descs), n_expected)
+ self.assertEqual(len(reader.descriptions), n_expected)
+ self.assertEqual(len(reader.ignore), n_expected)
+ self.assertEqual(len(reader.last_sizes), n_expected)
def test_args(self):
self.assertRaises(TypeError, IterativeReader) # no args
self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
with open(self.temp_file, 'rt') as file_handle:
reader = IterativeReader(file_handle)
- self.assertEqual(reader.n_sources(), 1)
+ self.helper_test_len(reader, 1)
reader = IterativeReader([file_handle, ])
- self.assertEqual(reader.n_sources(), 1)
+ self.helper_test_len(reader, 1)
reader = IterativeReader(file_handle, 'desc')
- self.assertEqual(reader.n_sources(), 1)
+ self.helper_test_len(reader, 1)
reader = IterativeReader([file_handle, ], ['desc', ])
- self.assertEqual(reader.n_sources(), 1)
+ self.helper_test_len(reader, 1)
reader = IterativeReader(file_handle, ['desc', ])
- self.assertEqual(reader.n_sources(), 1)
+ self.helper_test_len(reader, 1)
self.assertRaises(ValueError, IterativeReader,
[file_handle, ], 'desc', )
reader = IterativeReader([file_handle, file_handle],
['desc1', 'desc2'])
- self.assertEqual(reader.n_sources(), 2)
+ self.helper_test_len(reader, 2)
reader = IterativeReader((file_handle for idx in range(5)))
- self.assertEqual(reader.n_sources(), 5)
+ self.helper_test_len(reader, 5)
self.assertRaises(ValueError, IterativeReader,
(file_handle for idx in range(5)),
tuple('desc' for idx in range(4)))
with open(self.temp_file, 'rt') as file_handle:
reader = IterativeReader(file_handle)
+ self.helper_test_len(reader, 1)
for counter, (desc, text) in enumerate(reader):
receive_time = time.perf_counter()
text = text.strip()
print('time diffs in us: {0}'.format(time_diffs))
self.assertTrue(max(time_diffs) < 1000., 'read took more than 1ms!')
+ def test_line_read(self):
+ """ write partial lines, full lines and multiple lines """
+
+ pause_time = 0.01 # 100 tps (texts per second)
+ encoding = None
+ use_logging = False
+ texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
+ 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
+ '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
+ 'end\n', '\nend\n', '\n\nend\n\n']
+ lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
+ + ['', '', '']
+
+ # create writer
+ LogFileWriter(self.temp_file, texts, n_writes=len(texts),
+ pause_time=pause_time, do_encode=encoding,
+ use_logging=use_logging).start()
+
+ # read
+ lines_read = []
+ with open(self.temp_file, 'rt') as file_handle:
+ reader = LineReader(file_handle)
+ self.helper_test_len(reader, 1)
+
+ for line_expected, (_, line_read) in zip(lines_expected, reader):
+ if 'end' in line_read:
+ break
+ else:
+ print('expect "{0}", read "{1}"'.format(line_expected,
+ line_read))
+ self.assertEqual(line_expected, line_read)
+
+
if __name__ == '__main__':
unittest.main()