log_read: One more check for test
[pyi2ncommon] / test / test_log_read.py
CommitLineData
3237d2a6
CH
1# The software in this package is distributed under the GNU General
2# Public License version 2 (with a special exception described below).
3#
4# A copy of GNU General Public License (GPL) is included in this distribution,
5# in the file COPYING.GPL.
6#
7# As a special exception, if other files instantiate templates or use macros
8# or inline functions from this file, or you compile this file and link it
9# with other works to produce a work based on this file, this file
10# does not by itself cause the resulting work to be covered
11# by the GNU General Public License.
12#
13# However the source code for this file must still be made available
14# in accordance with section (3) of the GNU General Public License.
15#
16# This exception does not invalidate any other reasons why a work based
17# on this file might be covered by the GNU General Public License.
f365f614
CH
18#
19# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
3237d2a6 20
e7d49180
CH
21""" Unittests for log_read
22
23Creates own thread to write data to a log file
e7d49180
CH
24"""
25
26import unittest
27from threading import Thread
28from tempfile import mkstemp
29import os
30import time
31import logging
ea8b01a3 32from warnings import warn
e7d49180 33
ea8b01a3 34from src.log_read import IterativeReader, LineReader, LogReadWarning
e7d49180 35
59281ef9
CH
# Pick the best available clock for measuring short time spans.
# The whole version tuple is compared (not just the minor number), so that
# e.g. a future 4.0 release does not end up in the legacy time.clock branch.
from sys import version_info
if version_info < (3, ):
    raise NotImplementedError('pyi2ncommon is no longer compatible with py2')
elif version_info < (3, 4):
    perf_counter = time.clock
else:
    perf_counter = time.perf_counter

# Set to True to make the tests print progress information
DEBUG = False
46
e7d49180
CH
47
class LogFileWriter(Thread):
    """ thread that creates and writes to given file """

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None, use_logging=True):
        """ creates thread (without starting it), daemon is True

        :param file_name: name of the file to write to
        :param text_pattern: a format string or a list/tuple of format
                             strings (one per write); each text is formatted
                             with (counter, perf_counter()) before writing
        :param n_writes: number of writes to perform; if None, will write
                         indefinitely (a list/tuple of patterns is written
                         at most once, then the thread ends)
        :param pause_time: seconds to sleep after each write
        :param do_encode: name of an encoding (e.g. 'ascii') or None.
                          If given, text is encoded to bytes and the file
                          handle is opened in 'wb' mode; otherwise the file is
                          opened in 'wt' mode and unicode text is written.
        :param use_logging: If False, will open file and run
                            file_handle.write; if True, will create a logger
                            that logs to the file and use its info() method
                            (no file_handle.write)
        """
        super(LogFileWriter, self).__init__()
        self.daemon = True
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode
        self.use_logging = use_logging

    def run(self):
        """ write texts to file until done; called in the new thread """
        if self.use_logging:
            self._run_with_logging()
        else:
            self._run_with_file_handle()

    def _run_with_logging(self):
        """ write texts through a logger that is private to this writer

        logging.basicConfig must not be used here: it does nothing if the
        root logger already has handlers, so only the very first writer in a
        process would end up writing to its own file.
        """
        logger = logging.getLogger('{0}.{1}'.format(type(self).__name__,
                                                    id(self)))
        logger.setLevel(logging.INFO)
        logger.propagate = False    # keep texts out of root logger handlers
        handler = logging.FileHandler(self.file_name)
        handler.setFormatter(logging.Formatter('%(msg)s'))
        logger.addHandler(handler)
        try:
            self._write_all(logger.info)
        finally:
            logger.removeHandler(handler)
            handler.close()

    def _run_with_file_handle(self):
        """ write texts using write() of a freshly opened file handle """
        if self.do_encode:
            mode = 'wb'
            buffering = 0  # no buffering -- only allowed for byte mode
        else:
            mode = 'wt'
            buffering = 1  # line buffering -- only allowed for text mode

        with open(self.file_name, mode=mode, buffering=buffering) \
                as file_handle:
            self._write_all(file_handle.write)

    def _write_all(self, write_func):
        """ call write_and_sleep until there is nothing left to write """
        counter = 0
        while self._has_more(counter):
            self.write_and_sleep(write_func, counter)
            counter += 1

    def _has_more(self, counter):
        """ return True if write number `counter` should still be done """
        if self.n_writes is not None and counter >= self.n_writes:
            return False
        if isinstance(self.text_pattern, (list, tuple)):
            # stop cleanly instead of raising IndexError in the thread
            return counter < len(self.text_pattern)
        return True

    def write_and_sleep(self, write_func, counter):
        """ format text, write it using given function and sleep

        :param write_func: callable that takes the text as only argument
        :param counter: index of this write; selects the pattern if
                        text_pattern is a list/tuple and is given to format
        """
        if isinstance(self.text_pattern, (list, tuple)):
            text = self.text_pattern[counter]
        else:
            text = self.text_pattern
        text = text.format(counter, perf_counter())

        if self.do_encode:
            text = text.encode(self.do_encode)
        write_func(text)
        time.sleep(self.pause_time)
110
111
class LogReadTester(unittest.TestCase):
    """ all tests for the readers imported from log_read """

    def setUp(self):
        """ create an empty temp file; its name is kept in self.temp_file """
        if DEBUG:
            print('setup test')
        descriptor, self.temp_file = mkstemp()
        os.close(descriptor)    # only the name is needed, not the handle
        if DEBUG:
            print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ remove the temp file of the finished test if it still exists """
        if DEBUG:
            print('tear down test')
        if not os.path.isfile(self.temp_file):
            return
        if DEBUG:
            print('delete temp file' + self.temp_file)
        os.unlink(self.temp_file)

    def helper_test_len(self, reader, n_expected):
        """ assert that reader and its per-source lists have n_expected items
        """
        self.assertEqual(reader.n_sources(), n_expected)
        for attribute in ('file_objs', 'file_descs', 'descriptions',
                          'ignore', 'last_sizes'):
            self.assertEqual(len(getattr(reader, attribute)), n_expected)

    def test_args(self):
        """ check which argument combinations IterativeReader accepts """
        with self.assertRaises(TypeError):
            IterativeReader()    # no args

        # neither empty source lists nor file names are accepted
        refused = (([], 'test'),
                   ([], ['test', ]),
                   (self.temp_file, ),
                   ([self.temp_file, ], ))
        for args in refused:
            with self.assertRaises(ValueError):
                IterativeReader(*args)

        with open(self.temp_file, 'rt') as file_handle:
            # all of these describe exactly one source
            single_source = ((file_handle, ),
                             ([file_handle, ], ),
                             (file_handle, 'desc'),
                             ([file_handle, ], ['desc', ]),
                             (file_handle, ['desc', ]))
            for args in single_source:
                self.helper_test_len(IterativeReader(*args), 1)

            with self.assertRaises(ValueError):
                IterativeReader([file_handle, ], 'desc', )

            two_sources = IterativeReader([file_handle, file_handle],
                                          ['desc1', 'desc2'])
            self.helper_test_len(two_sources, 2)

            from_generator = IterativeReader((file_handle for _ in range(5)))
            self.helper_test_len(from_generator, 5)

            # number of descriptions does not match number of sources
            with self.assertRaises(ValueError):
                IterativeReader((file_handle for _ in range(5)),
                                tuple('desc' for _ in range(4)))
            with self.assertRaises(ValueError):
                IterativeReader((file_handle for _ in range(5)),
                                ('desc' for _ in range(6)))

    def test_simple_read(self):
        """ let a thread write a few texts and measure delay until read """

        # (text pattern, encoding, use logging) for each run.
        # need newline only when writing text (because of write buffering).
        # Runs with use_logging=True are left out: logging seems to buffer
        # writes to files
        param_combinations = (
            ('{0}:{1}\n', None, False),
            ('{0}:{1}\n', 'ascii', False),
            ('{0}:{1} ', 'ascii', False),
        )

        n_texts = 10
        pause_time = 0.01   # 100 tps (texts per second)

        for text_pattern, encoding, use_logging in param_combinations:
            writer = LogFileWriter(self.temp_file, text_pattern,
                                   n_writes=n_texts, pause_time=pause_time,
                                   do_encode=encoding,
                                   use_logging=use_logging)
            writer.start()
            if DEBUG:
                print('testing with log file {0}'.format(self.temp_file))
                print('encoding is {0}, use logging = {1}'.format(encoding,
                                                                  use_logging))
            time_diffs = []     # delay between write and read in ms

            with open(self.temp_file, 'rt') as file_handle:
                reader = IterativeReader(file_handle)
                self.helper_test_len(reader, 1)
                counter = -1    # we may have to adapt this manually
                for desc, text in reader:
                    receive_time = perf_counter()
                    self.assertEqual(desc, self.temp_file)
                    counter += 1
                    text = text.strip()
                    if DEBUG:
                        print('{1}: received text "{0}" at {2}'
                              .format(text, counter, receive_time))
                    if counter == 0 and not text:
                        # if reader runs stat() before we write, we might
                        # get a warning and one empty read here, which must
                        # not be counted
                        counter -= 1
                        warn('Got an empty read, you should have seen '
                             'another warning about file shrinking',
                             category=LogReadWarning)
                        continue

                    # text has the form "<counter>:<time of write>"
                    colon_pos = text.index(':')
                    count_read = int(text[:colon_pos].strip())
                    self.assertEqual(count_read, counter)
                    write_time = float(text[colon_pos+1:].strip())
                    time_diffs.append((receive_time - write_time)*1000.)

                    if counter == n_texts-1:
                        if DEBUG:
                            print('stop since have {0} reads'
                                  .format(counter))
                        break

            if DEBUG:
                print('time diffs in ms: {0}'.format(time_diffs))
            self.assertTrue(max(time_diffs) < 100.,
                            'read took more than 100ms (max was {0:.3f}ms)!'
                            .format(max(time_diffs)))

    def test_line_read(self):
        """ write line fragments, single and multiple lines; read by line """

        # the writer thread writes one of these per write
        texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
                 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                 '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                 'end\n', '\nend\n', '\n\nend\n\n']
        lines_expected = ['line{0}'.format(idx) for idx in range(12)]
        lines_expected += ['', '', '']

        # create writer
        LogFileWriter(self.temp_file, texts, n_writes=len(texts),
                      pause_time=0.01,   # 100 tps (texts per second)
                      do_encode=None, use_logging=False).start()

        # read
        with open(self.temp_file, 'rt') as file_handle:
            reader = LineReader(file_handle)
            self.helper_test_len(reader, 1)

            for line_expected, (_, line_read) in zip(lines_expected, reader):
                if 'end' in line_read:
                    break
                if DEBUG:
                    print('expect "{0}", read "{1}"'.format(line_expected,
                                                            line_read))
                self.assertEqual(line_expected, line_read)
266
267
e7d49180
CH
# run all tests of this module if it is executed as a script
if __name__ == '__main__':
    unittest.main()