f5ebe61782142c869fb265fc5e1adea40dd17997
[pyi2ncommon] / test / test_log_read.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """ Unittests for log_read
22
23 Creates own thread to write data to a log file
24 """
25
26 import unittest
27 from threading import Thread
28 from tempfile import mkstemp
29 import os
30 import time
31 import logging
32 from warnings import warn
33
34 from src.log_read import IterativeReader, LineReader, LogReadWarning
35
36 # get best clock
37 from sys import version_info
# Compare the complete version instead of the minor number alone: a check of
# "minor < 4" would also match a future major release such as 4.0 and select
# time.clock, which no longer exists in current python versions.
if version_info.major == 2:
    raise NotImplementedError('pyi2ncommon is no longer compatible with py2')
elif version_info < (3, 4):
    perf_counter = time.clock
else:
    perf_counter = time.perf_counter

# set to True to make the tests print what they are doing
DEBUG = False
46
47
class LogFileWriter(Thread):
    """ thread that creates and writes to given file """

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None, use_logging=True):
        """ creates thread, daemon is True

        text_pattern is either a single format string or a list/tuple of
        format strings; every text is formatted with (counter, perf_counter)
        before it is written. For a list/tuple, write number i uses entry i
        and writing stops at the end of the sequence at the latest.
        If n_writes is None, a single pattern is written indefinitely; else
        at most n_writes texts are written.
        If do_encode is set, it is used as name of the encoding (e.g. 'ascii')
        to convert text to bytes and the file handle is opened in 'wb' mode;
        otherwise opens in 'wt' mode and writes unicode text.
        If use_logging is False, will open file and run file_handle.write;
        If use_logging is True, will create logger that logs to file and use
        logging.info (no file_handle.write)
        After each write the thread sleeps for pause_time seconds.
        """
        super(LogFileWriter, self).__init__()
        self.daemon = True
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode
        self.use_logging = use_logging

    def run(self):
        """ write all texts, either through logging or straight to file """
        if self.use_logging:
            logging.basicConfig(filename=self.file_name, level=logging.INFO,
                                format='%(msg)s')
            self._write_all(logging.info)
            return

        if self.do_encode:
            mode = 'wb'
            buffering = 0  # no buffering -- only allowed for byte mode
        else:
            mode = 'wt'
            buffering = 1  # line buffering -- only allowed for text mode

        with open(self.file_name, mode=mode, buffering=buffering) \
                as file_handle:
            self._write_all(file_handle.write)

    def _has_more(self, counter):
        """ return True if text number counter still has to be written """
        if self.n_writes is not None and counter >= self.n_writes:
            return False
        if isinstance(self.text_pattern, (list, tuple)):
            # do not index past the end of a pattern sequence
            return counter < len(self.text_pattern)
        return True

    def _write_all(self, write_func):
        """ call write_and_sleep with rising counter until nothing is left """
        counter = 0
        while self._has_more(counter):
            self.write_and_sleep(write_func, counter)
            counter += 1

    def write_and_sleep(self, write_func, counter):
        """ format text, write it using given function and sleep """
        if isinstance(self.text_pattern, (list, tuple)):
            text = self.text_pattern[counter]
        else:
            text = self.text_pattern
        text = text.format(counter, perf_counter())

        if self.do_encode:
            text = text.encode(self.do_encode)
        write_func(text)
        time.sleep(self.pause_time)
110
111
class LogReadTester(unittest.TestCase):
    """ tests for IterativeReader and LineReader

    Every test gets its own empty temp file. The reading tests start a
    LogFileWriter thread that fills this file while the test reads from it.
    """

    def setUp(self):
        """ called before each test; creates temp file self.temp_file """
        if DEBUG:
            print('setup test')
        temp_handle, temp_name = mkstemp()
        # only the name is needed, writer and reader open the file themselves
        os.close(temp_handle)
        self.temp_file = temp_name
        if DEBUG:
            print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ called after each test; removes temp file if it still exists """
        if DEBUG:
            print('tear down test')
        if os.path.isfile(self.temp_file):
            if DEBUG:
                print('delete temp file' + self.temp_file)
            os.unlink(self.temp_file)

    def helper_test_len(self, reader, n_expected):
        """ helper function that tests length of vars in reader

        Checks that reader.n_sources() and the lengths of all per-source
        lists of the reader are equal to n_expected.
        """
        self.assertEqual(reader.n_sources(), n_expected)
        self.assertEqual(len(reader.file_objs), n_expected)
        self.assertEqual(len(reader.file_handles), n_expected)
        self.assertEqual(len(reader.descriptions), n_expected)
        self.assertEqual(len(reader.ignore), n_expected)
        self.assertEqual(len(reader.last_sizes), n_expected)

    def test_args(self):
        """ test which constructor args IterativeReader accepts """
        self.assertRaises(TypeError, IterativeReader)  # no args
        self.assertRaises(ValueError, IterativeReader, [], 'test')
        self.assertRaises(ValueError, IterativeReader, [], ['test', ])
        # file names are not accepted, only open file handles
        self.assertRaises(ValueError, IterativeReader, self.temp_file)
        self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
        with open(self.temp_file, 'rt') as file_handle:
            reader = IterativeReader(file_handle)
            self.helper_test_len(reader, 1)
            reader = IterativeReader([file_handle, ])
            self.helper_test_len(reader, 1)
            reader = IterativeReader(file_handle, 'desc')
            self.helper_test_len(reader, 1)
            reader = IterativeReader([file_handle, ], ['desc', ])
            self.helper_test_len(reader, 1)
            reader = IterativeReader(file_handle, ['desc', ])
            self.helper_test_len(reader, 1)
            self.assertRaises(ValueError, IterativeReader,
                              [file_handle, ], 'desc', )
            reader = IterativeReader([file_handle, file_handle],
                                     ['desc1', 'desc2'])
            self.helper_test_len(reader, 2)
            reader = IterativeReader((file_handle for _ in range(5)))
            self.helper_test_len(reader, 5)
            # number of descriptions must match number of sources
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for _ in range(5)),
                              tuple('desc' for _ in range(4)))
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for _ in range(5)),
                              ('desc' for _ in range(6)))

    def test_simple_read(self):
        """ write fixed number of lines, see how fast they are retrieved """

        # (text_pattern, encoding, use_logging)
        # need newline only when writing text (because of write buffering);
        # combinations with use_logging=True are left out since logging
        # seems to buffer writes to files
        param_combinations = (
            ('{0}:{1}\n', None, False),
            ('{0}:{1}\n', 'ascii', False),
            ('{0}:{1} ', 'ascii', False),
        )

        n_texts = 10
        pause_time = 0.01  # 100 tps (texts per second)

        for text_pattern, encoding, use_logging in param_combinations:
            writer = LogFileWriter(self.temp_file, text_pattern,
                                   n_writes=n_texts, pause_time=pause_time,
                                   do_encode=encoding,
                                   use_logging=use_logging)
            writer.start()
            if DEBUG:
                print('testing with log file {0}'.format(self.temp_file))
                print('encoding is {0}, use logging = {1}'.format(encoding,
                                                                  use_logging))
            time_diffs = []

            with open(self.temp_file, 'rt') as file_handle:
                reader = IterativeReader(file_handle)
                self.helper_test_len(reader, 1)
                counter = -1        # we may have to adapt this manually
                for desc, text, source_idx in reader:
                    receive_time = perf_counter()
                    self.assertEqual(desc, self.temp_file)
                    self.assertEqual(source_idx, 0)
                    counter += 1
                    text = text.strip()
                    if DEBUG:
                        print('{1}: received text "{0}" at {2}'
                              .format(text, counter, receive_time))
                    if counter == 0 and not text:
                        # if reader runs stat() before we write, we might get
                        # a warning and one empty read here
                        counter -= 1
                        warn('Got an empty read, you should have seen another '
                             'warning about file shrinking',
                             category=LogReadWarning)
                        continue
                    # text has the form "counter:write_time"
                    index = text.index(':')
                    count_text = int(text[:index].strip())
                    self.assertEqual(count_text, counter)
                    write_time = float(text[index+1:].strip())
                    time_diffs.append((receive_time - write_time)*1000.)
                    if counter == n_texts-1:
                        if DEBUG:
                            print('stop since have {0} reads'.format(counter))
                        break

            # let the writer close its file handle before the file is
            # truncated by the next writer or removed in tearDown
            writer.join()

            if DEBUG:
                print('time diffs in ms: {0}'.format(time_diffs))
            self.assertTrue(max(time_diffs) < 100.,
                            'read took more than 100ms (max was {0:.3f}ms)!'
                            .format(max(time_diffs)))

    def test_line_read(self):
        """ write partial lines, full lines and multiple lines """

        pause_time = 0.01  # 100 tps (texts per second)
        encoding = None
        use_logging = False
        texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
                 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                 '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                 'end\n', '\nend\n', '\n\nend\n\n']
        lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
                         + ['', '', '']

        # create writer
        writer = LogFileWriter(self.temp_file, texts, n_writes=len(texts),
                               pause_time=pause_time, do_encode=encoding,
                               use_logging=use_logging)
        writer.start()

        # read
        with open(self.temp_file, 'rt') as file_handle:
            reader = LineReader(file_handle)
            self.helper_test_len(reader, 1)

            for line_expected, (_, line_read, _) in zip(lines_expected, reader):
                if 'end' in line_read:
                    break
                if DEBUG:
                    print('expect "{0}", read "{1}"'.format(line_expected,
                                                            line_read))
                self.assertEqual(line_expected, line_read)

        # writer still has texts left; let it finish and close its file
        # handle before tearDown removes the file
        writer.join()
266                     self.assertEqual(line_expected, line_read)
267
268
# run all tests of this module when it is executed as a script
if __name__ == "__main__":
    unittest.main()