28576a4535e24b068956b45cbe530c9ccd4db3b3
[pyi2ncommon] / test / test_log_read.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """ Unittests for log_read
22
23 Creates own thread to write data to a log file
24 """
25
26 import unittest
27 from threading import Thread
28 from tempfile import mkstemp
29 import os
30 import time
31 import logging
32 from warnings import warn
33
34 from src.log_read import IterativeReader, LineReader, LogReadWarning
35
36 # get best clock
37 perf_counter = time.perf_counter
38
39 DEBUG = False
40
41
42 class LogFileWriter(Thread):
43     """ thread that creates and writes to given file """
44
45     def __init__(self, file_name, text_pattern, n_writes=None,
46                  pause_time=0.1, do_encode=None, use_logging=True):
47         """ creates thread, deamon is True
48
49         if n_writes is None, will write indefinitely; else writes text_pattern
50         n_writes times, formatted with (counter, perf_counter)
51         If do_encode is True, will encode text to bytes and open file handle
52         in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text.
53         If use_logging is False, will open file and run file_handle.write;
54         If use_logging is True, will create logger that logs to file and use
55         logging.info (no file_handle.write)
56         """
57         super(LogFileWriter, self).__init__()
58         self.daemon = True
59         self.file_name = file_name
60         self.text_pattern = text_pattern
61         self.n_writes = n_writes
62         self.pause_time = pause_time
63         self.do_encode = do_encode
64         self.use_logging = use_logging
65
66     def run(self):
67         counter = 0
68         if self.do_encode:
69             mode = 'wb'
70             buffering = 0  # no buffering -- only allowed for byte mode
71         else:
72             mode = 'wt'
73             buffering = 1  # line buffering -- only allowed for text mode
74
75         if self.use_logging:
76             logging.basicConfig(filename=self.file_name, level=logging.INFO,
77                                 format='%(msg)s')
78             while True:
79                 if self.n_writes is not None and counter >= self.n_writes:
80                     break
81                 self.write_and_sleep(logging.info, counter)
82                 counter += 1
83         else:
84             with open(self.file_name, mode=mode, buffering=buffering) \
85                     as file_handle:
86                 while True:
87                     if self.n_writes is not None and counter >= self.n_writes:
88                         break
89                     self.write_and_sleep(file_handle.write, counter)
90                     counter += 1
91
92     def write_and_sleep(self, write_func, counter):
93         """ format text, write it using given function and sleep """
94         if isinstance(self.text_pattern, (list, tuple)):
95             text = self.text_pattern[counter]
96         else:
97             text = self.text_pattern
98         text = text.format(counter, perf_counter())
99
100         if self.do_encode:
101             text = text.encode(self.do_encode)
102         write_func(text)
103         time.sleep(self.pause_time)
104
105
106 class LogReadTester(unittest.TestCase):
107     """ class with all the tests """
108
109     def setUp(self):
110         """ called before each test """
111         if DEBUG:
112             print('setup test')
113         temp_handle, temp_name = mkstemp()
114         os.close(temp_handle)
115         self.temp_file = temp_name
116         if DEBUG:
117             print('created temp file ' + self.temp_file)
118
119     def tearDown(self):
120         """ called after each test """
121         if DEBUG:
122             print('tear down test')
123         if os.path.isfile(self.temp_file):
124             if DEBUG:
125                 print('delete temp file' + self.temp_file)
126             os.unlink(self.temp_file)
127
128     def helper_test_len(self, reader, n_expected):
129         """ helper function that tests length of vars in reader """
130         self.assertEqual(reader.n_sources(), n_expected)
131         self.assertEqual(len(reader.file_objs), n_expected)
132         self.assertEqual(len(reader.file_handles), n_expected)
133         self.assertEqual(len(reader.descriptions), n_expected)
134         self.assertEqual(len(reader.ignore), n_expected)
135         self.assertEqual(len(reader.last_sizes), n_expected)
136
137     def test_args(self):
138         self.assertRaises(TypeError, IterativeReader)  # no args
139         self.assertRaises(ValueError, IterativeReader, [], 'test')
140         self.assertRaises(ValueError, IterativeReader, [], ['test', ])
141         self.assertRaises(ValueError, IterativeReader, self.temp_file)
142         self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
143         with open(self.temp_file, 'rt') as file_handle:
144             reader = IterativeReader(file_handle)
145             self.helper_test_len(reader, 1)
146             reader = IterativeReader([file_handle, ])
147             self.helper_test_len(reader, 1)
148             reader = IterativeReader(file_handle, 'desc')
149             self.helper_test_len(reader, 1)
150             reader = IterativeReader([file_handle, ], ['desc', ])
151             self.helper_test_len(reader, 1)
152             reader = IterativeReader(file_handle, ['desc', ])
153             self.helper_test_len(reader, 1)
154             self.assertRaises(ValueError, IterativeReader,
155                               [file_handle, ], 'desc', )
156             reader = IterativeReader([file_handle, file_handle],
157                                      ['desc1', 'desc2'])
158             self.helper_test_len(reader, 2)
159             reader = IterativeReader((file_handle for idx in range(5)))
160             self.helper_test_len(reader, 5)
161             self.assertRaises(ValueError, IterativeReader,
162                               (file_handle for idx in range(5)),
163                               tuple('desc' for idx in range(4)))
164             self.assertRaises(ValueError, IterativeReader,
165                               (file_handle for idx in range(5)),
166                               ('desc' for idx in range(6)))
167
168     def test_simple_read(self):
169         """ write fixed number of lines, see how fast they are retrieved """
170
171         # need newline only when writing text (because of write buffering)
172         param_combinations = ('{0}:{1}\n', None,    False), \
173                              ('{0}:{1}\n', 'ascii', False), \
174                              ('{0}:{1} ' , 'ascii', False)
175                              #('{0}:{1}\n', None   , True), \  logging seems
176                              #('{0}:{1}\n', 'ascii', True), \  to buffer writes
177                              #('{0}:{1} ' , None   , True), \  to files
178                              #('{0}:{1} ' , 'ascii', True)
179
180         n_texts = 10
181         pause_time = 0.01  # 100 tps (texts per second)
182
183         for text_pattern, encoding, use_logging in param_combinations:
184             LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
185                           pause_time=pause_time, do_encode=encoding,
186                           use_logging=use_logging).start()
187             if DEBUG:
188                 print('testing with log file {0}'.format(self.temp_file))
189                 print('encoding is {0}, use logging = {1}'.format(encoding,
190                                                                   use_logging))
191             time_diffs = []
192
193             with open(self.temp_file, 'rt') as file_handle:
194                 reader = IterativeReader(file_handle)
195                 self.helper_test_len(reader, 1)
196                 counter = -1        # we may have to adapt this manually
197                 for desc, text, source_idx in reader:
198                     receive_time = perf_counter()
199                     self.assertEqual(desc, self.temp_file)
200                     self.assertEqual(source_idx, 0)
201                     counter += 1
202                     text = text.strip()
203                     if DEBUG:
204                         print('{1}: received text "{0}" at {2}'
205                               .format(text, counter, receive_time))
206                     if counter == 0 and not text:
207                         # if reader runs stat() before we write, we might get
208                         # a warning and one empty read here
209                         counter -= 1
210                         warn('Got an empty read, you should have seen another '
211                              'warning about file shrinking',
212                              category=LogReadWarning)
213                         continue
214                     index = text.index(':')
215                     count_text = int(text[:index].strip())
216                     self.assertEqual(count_text, counter)
217                     write_time = float(text[index+1:].strip())
218                     time_diffs.append((receive_time - write_time)*1000.)
219                     if counter == n_texts-1:
220                         if DEBUG:
221                             print('stop since have {0} reads'.format(counter))
222                         break
223             if DEBUG:
224                 print('time diffs in ms: {0}'.format(time_diffs))
225             self.assertTrue(max(time_diffs) < 100.,
226                             'read took more than 100ms (max was {0:.3f}ms)!'
227                             .format(max(time_diffs)))
228
229     def test_line_read(self):
230         """ write partial lines, full lines and multiple lines """
231
232         pause_time = 0.01  # 100 tps (texts per second)
233         encoding = None
234         use_logging = False
235         texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
236                  'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
237                  '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
238                  'end\n', '\nend\n', '\n\nend\n\n']
239         lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
240                          + ['', '', '']
241
242         # create writer
243         LogFileWriter(self.temp_file, texts, n_writes=len(texts),
244                       pause_time=pause_time, do_encode=encoding,
245                       use_logging=use_logging).start()
246
247         # read
248         lines_read = []
249         with open(self.temp_file, 'rt') as file_handle:
250             reader = LineReader(file_handle)
251             self.helper_test_len(reader, 1)
252
253             for line_expected, (_, line_read, _) in zip(lines_expected, reader):
254                 if 'end' in line_read:
255                     break
256                 else:
257                     if DEBUG:
258                         print('expect "{0}", read "{1}"'.format(line_expected,
259                                                                 line_read))
260                     self.assertEqual(line_expected, line_read)
261
262
263 if __name__ == '__main__':
264     unittest.main()