log_read: Deal with rare case of wrong fstat at test start
[pyi2ncommon] / test / test_log_read.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """ Unittests for log_read
22
23 Creates own thread to write data to a log file
24 """
25
26 import unittest
27 from threading import Thread
28 from tempfile import mkstemp
29 import os
30 import time
31 import logging
32 from warnings import warn
33
34 from src.log_read import IterativeReader, LineReader, LogReadWarning
35
# select the best clock the running interpreter offers for timing reads
from sys import version_info
if version_info.major == 2:
    raise NotImplementedError('pyi2ncommon is no longer compatible with py2')
# interpreters with a minor version below 4 fall back to time.clock; only the
# selected attribute is looked up, so the other one does not need to exist
perf_counter = time.clock if version_info.minor < 4 else time.perf_counter

# set to True to make the tests print what they are doing
DEBUG = False
46
47
class LogFileWriter(Thread):
    """ Thread that repeatedly writes formatted text to a given file. """

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None, use_logging=True):
        """ Create the writer thread; its daemon flag is set to True.

        :param file_name: name of the file to write to
        :param text_pattern: format string or list/tuple of format strings
                             (one per write, indexed by the write counter);
                             each is formatted with (counter, perf_counter)
        :param n_writes: number of writes to perform; None means the thread
                         writes indefinitely
        :param pause_time: seconds to sleep after every write
        :param do_encode: name of an encoding or None. If set, text is
                          encoded to bytes and the file is opened in 'wb'
                          mode; otherwise the file is opened in 'wt' mode and
                          unicode text is written
        :param use_logging: if True, a logger that logs to the file is
                            configured and logging.info is used for writing
                            (no file_handle.write); if False, the file is
                            opened and file_handle.write is used
        """
        super(LogFileWriter, self).__init__()
        self.daemon = True
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode
        self.use_logging = use_logging

    def run(self):
        """ Write to the file, either through logging or a file handle. """
        if self.use_logging:
            logging.basicConfig(filename=self.file_name, level=logging.INFO,
                                format='%(msg)s')
            self._write_all(logging.info)
            return

        if self.do_encode:
            # no buffering -- only allowed for byte mode
            open_args = dict(mode='wb', buffering=0)
        else:
            # line buffering -- only allowed for text mode
            open_args = dict(mode='wt', buffering=1)
        with open(self.file_name, **open_args) as file_handle:
            self._write_all(file_handle.write)

    def _write_all(self, write_func):
        """ Call write_and_sleep until n_writes is reached (or forever). """
        counter = 0
        while self.n_writes is None or counter < self.n_writes:
            self.write_and_sleep(write_func, counter)
            counter += 1

    def write_and_sleep(self, write_func, counter):
        """ Format the text for this counter, hand it to write_func, sleep.

        :param write_func: callable that receives the formatted (and
                           possibly encoded) text
        :param counter: index of the current write; selects the pattern if
                        text_pattern is a list or tuple
        """
        pattern = self.text_pattern
        if isinstance(pattern, (list, tuple)):
            pattern = pattern[counter]
        text = pattern.format(counter, perf_counter())

        if self.do_encode:
            text = text.encode(self.do_encode)
        write_func(text)
        time.sleep(self.pause_time)
110
111
class LogReadTester(unittest.TestCase):
    """ Test case holding all tests for log_read.

    Each test runs against its own temporary file which is created in setUp
    and deleted again in tearDown.
    """

    def setUp(self):
        """ Run before each test: create an empty temp file """
        if DEBUG:
            print('setup test')
        descriptor, self.temp_file = mkstemp()
        os.close(descriptor)   # only the name is needed, tests re-open it
        if DEBUG:
            print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ Run after each test: remove the temp file if it still exists """
        if DEBUG:
            print('tear down test')
        if not os.path.isfile(self.temp_file):
            return
        if DEBUG:
            print('delete temp file' + self.temp_file)
        os.unlink(self.temp_file)

    def helper_test_len(self, reader, n_expected):
        """ Assert that reader reports n_expected sources in all its lists """
        self.assertEqual(reader.n_sources(), n_expected)
        per_source_attributes = ('file_objs', 'file_descs', 'descriptions',
                                 'ignore', 'last_sizes')
        for attribute in per_source_attributes:
            self.assertEqual(len(getattr(reader, attribute)), n_expected)

    def test_args(self):
        """ Check which constructor arguments IterativeReader accepts """
        self.assertRaises(TypeError, IterativeReader)  # no args

        # argument combinations that have to be refused with a ValueError
        refused = (([], 'test'),
                   ([], ['test', ]),
                   (self.temp_file, ),
                   ([self.temp_file, ], ))
        for args in refused:
            self.assertRaises(ValueError, IterativeReader, *args)

        with open(self.temp_file, 'rt') as file_handle:
            # different ways to specify exactly one source
            one_source = ((file_handle, ),
                          ([file_handle, ], ),
                          (file_handle, 'desc'),
                          ([file_handle, ], ['desc', ]),
                          (file_handle, ['desc', ]))
            for args in one_source:
                reader = IterativeReader(*args)
                self.helper_test_len(reader, 1)

            self.assertRaises(ValueError, IterativeReader,
                              [file_handle, ], 'desc')

            reader = IterativeReader([file_handle, file_handle],
                                     ['desc1', 'desc2'])
            self.helper_test_len(reader, 2)

            def five_handles():
                """ fresh generator that yields the file handle 5 times """
                return (file_handle for _ in range(5))

            reader = IterativeReader(five_handles())
            self.helper_test_len(reader, 5)

            # number of descriptions must match number of sources
            self.assertRaises(ValueError, IterativeReader,
                              five_handles(), ('desc', ) * 4)
            self.assertRaises(ValueError, IterativeReader,
                              five_handles(), ('desc' for _ in range(6)))

    def test_simple_read(self):
        """ write fixed number of lines, see how fast they are retrieved """

        # (text pattern, encoding, use_logging) per run.
        # need newline only when writing text (because of write buffering).
        # Combinations with use_logging=True are left out: logging seems to
        # buffer writes to files
        param_combinations = (
            ('{0}:{1}\n', None, False),
            ('{0}:{1}\n', 'ascii', False),
            ('{0}:{1} ', 'ascii', False),
        )

        n_texts = 10
        pause_time = 0.01  # 100 tps (texts per second)

        for text_pattern, encoding, use_logging in param_combinations:
            writer = LogFileWriter(self.temp_file, text_pattern,
                                   n_writes=n_texts, pause_time=pause_time,
                                   do_encode=encoding,
                                   use_logging=use_logging)
            writer.start()
            if DEBUG:
                print('testing with log file {0}'.format(self.temp_file))
                print('encoding is {0}, use logging = {1}'.format(encoding,
                                                                  use_logging))
            time_diffs = []

            with open(self.temp_file, 'rt') as file_handle:
                reader = IterativeReader(file_handle)
                self.helper_test_len(reader, 1)
                expected_count = 0    # counter value the next text must carry
                for _, text in reader:
                    receive_time = perf_counter()
                    text = text.strip()
                    if DEBUG:
                        print('{1}: received text "{0}" at {2}'
                              .format(text, expected_count, receive_time))
                    if expected_count == 0 and not text:
                        # if reader runs stat() before we write, we might get
                        # a warning and one empty read here; it does not count
                        warn('Got an empty read, you should have seen another '
                             'warning about file shrinking',
                             category=LogReadWarning)
                        continue
                    separator = text.index(':')
                    self.assertEqual(int(text[:separator].strip()),
                                     expected_count)
                    write_time = float(text[separator+1:].strip())
                    time_diffs.append((receive_time - write_time)*1000.)
                    if expected_count == n_texts-1:
                        if DEBUG:
                            print('stop since have {0} reads'
                                  .format(expected_count))
                        break
                    expected_count += 1
            if DEBUG:
                print('time diffs in ms: {0}'.format(time_diffs))
            slowest = max(time_diffs)
            self.assertTrue(slowest < 100.,
                            'read took more than 100ms (max was {0:.3f}ms)!'
                            .format(slowest))

    def test_line_read(self):
        """ write partial lines, full lines and multiple lines """

        texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
                 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                 '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                 'end\n', '\nend\n', '\n\nend\n\n']
        lines_expected = ['line{0}'.format(idx) for idx in range(12)]
        lines_expected.extend([''] * 3)

        # create writer: one entry of texts per write, 100 tps (texts per
        # second), unicode text written directly to the file
        writer = LogFileWriter(self.temp_file, texts, n_writes=len(texts),
                               pause_time=0.01, do_encode=None,
                               use_logging=False)
        writer.start()

        # read
        lines_read = []   # currently unused
        with open(self.temp_file, 'rt') as file_handle:
            reader = LineReader(file_handle)
            self.helper_test_len(reader, 1)

            # lines_expected comes first in zip, so the loop ends without
            # requesting another line from reader once all are compared
            for line_expected, (_, line_read) in zip(lines_expected, reader):
                if 'end' in line_read:
                    break
                if DEBUG:
                    print('expect "{0}", read "{1}"'.format(line_expected,
                                                            line_read))
                self.assertEqual(line_expected, line_read)
264                     self.assertEqual(line_expected, line_read)
265
266
# run the tests of this module when the file is executed as a script
if __name__ == '__main__':
    unittest.main()