# The software in this package is distributed under the GNU General
# Public License version 2 (with a special exception described below).
#
# A copy of GNU General Public License (GPL) is included in this distribution,
# in the file COPYING.GPL.
#
# As a special exception, if other files instantiate templates or use macros
# or inline functions from this file, or you compile this file and link it
# with other works to produce a work based on this file, this file
# does not by itself cause the resulting work to be covered
# by the GNU General Public License.
#
# However the source code for this file must still be made available
# in accordance with section (3) of the GNU General Public License.
#
# This exception does not invalidate any other reasons why a work based
# on this file might be covered by the GNU General Public License.
#
# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
""" Unit tests for log_read

Creates its own thread to write data to a log file
"""
import logging
import os
import time
import unittest
from tempfile import mkstemp
from threading import Thread
from warnings import warn

from src.log_read import IterativeReader, LineReader, LogReadWarning
# Pick the best available clock for measuring short time spans.
from sys import version_info
if version_info.major == 2:
    raise NotImplementedError('pyi2ncommon is no longer compatible with py2')
elif version_info < (3, 4):
    # Compare the whole version tuple, not just the minor number, so that a
    # future major version does not fall back to the long-removed time.clock.
    perf_counter = time.clock
else:
    perf_counter = time.perf_counter
class LogFileWriter(Thread):
    """ thread that creates and writes to given file """

    # NOTE(review): the reviewed text of this class had lost its indentation
    # and several short lines (e.g. the header of run(), the call of
    # write_func); they were restored from context -- confirm against
    # version control.

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None, use_logging=True):
        """ creates thread, daemon is True

        if n_writes is None, will write indefinitely; else writes text_pattern
        n_writes times, formatted with (counter, perf_counter).
        text_pattern may also be a list or tuple of patterns; write number
        `counter` then uses entry number `counter` of that sequence.
        If do_encode is set, it is used as encoding name: text is encoded to
        bytes and the file handle is opened in 'wb' mode; otherwise opens in
        'wt' mode and writes unicode text.
        If use_logging is False, will open file and run file_handle.write;
        If use_logging is True, will create logger that logs to file and use
        logging.info (no file_handle.write)
        """
        super().__init__()
        self.daemon = True
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode
        self.use_logging = use_logging

    def run(self):
        """ thread body: write texts through logging or a file handle """
        if self.use_logging:
            # NOTE(review): the format argument was not visible in the
            # reviewed text; a bare message format is assumed -- confirm.
            logging.basicConfig(filename=self.file_name, level=logging.INFO,
                                format='%(message)s')
            self._write_all(logging.info)
            return

        if self.do_encode:
            mode = 'wb'
            buffering = 0  # no buffering -- only allowed for byte mode
        else:
            mode = 'wt'
            buffering = 1  # line buffering -- only allowed for text mode
        with open(self.file_name, mode=mode, buffering=buffering) \
                as file_handle:
            self._write_all(file_handle.write)

    def _write_all(self, write_func):
        """ call write_and_sleep n_writes times (forever if it is None) """
        counter = 0
        while self.n_writes is None or counter < self.n_writes:
            self.write_and_sleep(write_func, counter)
            counter += 1

    def write_and_sleep(self, write_func, counter):
        """ format text, write it using given function and sleep """
        if isinstance(self.text_pattern, (list, tuple)):
            pattern = self.text_pattern[counter]
        else:
            pattern = self.text_pattern
        text = pattern.format(counter, perf_counter())

        if self.do_encode:
            text = text.encode(self.do_encode)
        write_func(text)
        time.sleep(self.pause_time)
class LogReadTester(unittest.TestCase):
    """ class with all the tests """

    # NOTE(review): the reviewed text of this class had lost its indentation
    # and several short lines (method headers, `break`, `continue`, counter
    # updates, some continuation lines); they were restored from context --
    # confirm against version control.

    # set to True to print progress information while the tests run
    DEBUG = False

    def setUp(self):
        """ called before each test: creates an empty temp file """
        temp_handle, temp_name = mkstemp()
        os.close(temp_handle)
        self.temp_file = temp_name
        if self.DEBUG:
            print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ called after each test: removes the temp file if it exists """
        if self.DEBUG:
            print('tear down test')
        if os.path.isfile(self.temp_file):
            if self.DEBUG:
                print('delete temp file ' + self.temp_file)
            os.unlink(self.temp_file)

    def helper_test_len(self, reader, n_expected):
        """ helper function that tests length of vars in reader """
        self.assertEqual(reader.n_sources(), n_expected)
        self.assertEqual(len(reader.file_objs), n_expected)
        self.assertEqual(len(reader.file_descs), n_expected)
        self.assertEqual(len(reader.descriptions), n_expected)
        self.assertEqual(len(reader.ignore), n_expected)
        self.assertEqual(len(reader.last_sizes), n_expected)

    def test_args(self):
        """ check which constructor argument combinations are accepted """
        self.assertRaises(TypeError, IterativeReader)  # no args
        self.assertRaises(ValueError, IterativeReader, [], 'test')
        self.assertRaises(ValueError, IterativeReader, [], ['test', ])
        self.assertRaises(ValueError, IterativeReader, self.temp_file)
        self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
        with open(self.temp_file, 'rt') as file_handle:
            reader = IterativeReader(file_handle)
            self.helper_test_len(reader, 1)
            reader = IterativeReader([file_handle, ])
            self.helper_test_len(reader, 1)
            reader = IterativeReader(file_handle, 'desc')
            self.helper_test_len(reader, 1)
            reader = IterativeReader([file_handle, ], ['desc', ])
            self.helper_test_len(reader, 1)
            reader = IterativeReader(file_handle, ['desc', ])
            self.helper_test_len(reader, 1)
            self.assertRaises(ValueError, IterativeReader,
                              [file_handle, ], 'desc', )
            # NOTE(review): the description list was not visible in the
            # reviewed text; two descriptions for two sources are assumed.
            reader = IterativeReader([file_handle, file_handle],
                                     ['desc1', 'desc2'])
            self.helper_test_len(reader, 2)
            reader = IterativeReader((file_handle for idx in range(5)))
            self.helper_test_len(reader, 5)
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for idx in range(5)),
                              tuple('desc' for idx in range(4)))
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for idx in range(5)),
                              ('desc' for idx in range(6)))

    def test_simple_read(self):
        """ write fixed number of lines, see how fast they are retrieved """

        # need newline only when writing text (because of write buffering)
        # Combinations with use_logging=True are left out on purpose:
        # logging seems to buffer writes to files
        param_combinations = (('{0}:{1}\n', None, False),
                              ('{0}:{1}\n', 'ascii', False),
                              ('{0}:{1} ', 'ascii', False))

        n_texts = 10  # NOTE(review): value not visible in review -- confirm
        pause_time = 0.01  # 100 tps (texts per second)

        for text_pattern, encoding, use_logging in param_combinations:
            writer = LogFileWriter(self.temp_file, text_pattern,
                                   n_writes=n_texts, pause_time=pause_time,
                                   do_encode=encoding,
                                   use_logging=use_logging)
            writer.start()
            if self.DEBUG:
                print('testing with log file {0}'.format(self.temp_file))
                print('encoding is {0}, use logging = {1}'.format(encoding,
                                                                  use_logging))
            time_diffs = []

            with open(self.temp_file, 'rt') as file_handle:
                reader = IterativeReader(file_handle)
                self.helper_test_len(reader, 1)
                counter = -1        # we may have to adapt this manually
                for desc, text in reader:
                    receive_time = perf_counter()
                    self.assertEqual(desc, self.temp_file)
                    counter += 1
                    text = text.strip()
                    if self.DEBUG:
                        print('{1}: received text "{0}" at {2}'
                              .format(text, counter, receive_time))
                    if counter == 0 and not text:
                        # if reader runs stat() before we write, we might get
                        # a warning and one empty read here
                        counter -= 1
                        warn('Got an empty read, you should have seen another '
                             'warning about file shrinking',
                             category=LogReadWarning)
                        continue
                    index = text.index(':')
                    count_text = int(text[:index].strip())
                    self.assertEqual(count_text, counter)
                    write_time = float(text[index+1:].strip())
                    time_diffs.append((receive_time - write_time)*1000.)
                    if counter == n_texts-1:
                        if self.DEBUG:
                            print('stop since have {0} reads'.format(counter))
                        break

            # wait for the writer to close the file before the next
            # combination re-opens (and truncates) the same temp file
            writer.join()

            if self.DEBUG:
                print('time diffs in ms: {0}'.format(time_diffs))
            self.assertLess(max(time_diffs), 100.,
                            'read took more than 100ms (max was {0:.3f}ms)!'
                            .format(max(time_diffs)))

    def test_line_read(self):
        """ write partial lines, full lines and multiple lines """

        pause_time = 0.01  # 100 tps (texts per second)
        # NOTE(review): these two settings were not visible in the reviewed
        # text; plain text written through a file handle is assumed.
        encoding = None
        use_logging = False
        texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
                 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                 '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                 'end\n', '\nend\n', '\n\nend\n\n']
        # The texts contain two empty lines between 'line11' and the first
        # 'end'; the third empty entry only lets zip() fetch that 'end' line.
        # NOTE(review): this tail of the list was reconstructed -- confirm.
        lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
            + ['', '', '']

        # create writer
        writer = LogFileWriter(self.temp_file, texts, n_writes=len(texts),
                               pause_time=pause_time, do_encode=encoding,
                               use_logging=use_logging)
        writer.start()

        # read
        with open(self.temp_file, 'rt') as file_handle:
            reader = LineReader(file_handle)
            self.helper_test_len(reader, 1)

            for line_expected, (_, line_read) in zip(lines_expected, reader):
                if 'end' in line_read:
                    break
                if self.DEBUG:
                    print('expect "{0}", read "{1}"'.format(line_expected,
                                                            line_read))
                self.assertEqual(line_expected, line_read)

        # let the writer finish before tearDown removes the temp file
        writer.join()
# Run all tests of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main()