daf8ea685c43316c271f92b505ef1a9396e038a5
[pyi2ncommon] / test / test_log_read.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """ Unittests for log_read
22
23 Creates own thread to write data to a log file
24 """
25
26 import unittest
27 from threading import Thread
28 from tempfile import mkstemp
29 import os
30 import time
31 import logging
32
33 from src.log_read import IterativeReader, LineReader
34
35 # get best clock
36 from sys import version_info
37 if version_info.major == 2:
38     raise NotImplementedError('pyi2ncommon is no longer compatible with py2')
39 elif version_info.minor < 4:
40     perf_counter = time.clock
41 else:
42     perf_counter = time.perf_counter
43
44 DEBUG = False
45
46
47 class LogFileWriter(Thread):
48     """ thread that creates and writes to given file """
49
50     def __init__(self, file_name, text_pattern, n_writes=None,
51                  pause_time=0.1, do_encode=None, use_logging=True):
52         """ creates thread, deamon is True
53
54         if n_writes is None, will write indefinitely; else writes text_pattern
55         n_writes times, formatted with (counter, perf_counter)
56         If do_encode is True, will encode text to bytes and open file handle
57         in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text.
58         If use_logging is False, will open file and run file_handle.write;
59         If use_logging is True, will create logger that logs to file and use
60         logging.info (no file_handle.write)
61         """
62         super(LogFileWriter, self).__init__()
63         self.daemon = True
64         self.file_name = file_name
65         self.text_pattern = text_pattern
66         self.n_writes = n_writes
67         self.pause_time = pause_time
68         self.do_encode = do_encode
69         self.use_logging = use_logging
70
71     def run(self):
72         counter = 0
73         if self.do_encode:
74             mode = 'wb'
75             buffering = 0  # no buffering -- only allowed for byte mode
76         else:
77             mode = 'wt'
78             buffering = 1  # line buffering -- only allowed for text mode
79
80         if self.use_logging:
81             logging.basicConfig(filename=self.file_name, level=logging.INFO,
82                                 format='%(msg)s')
83             while True:
84                 if self.n_writes is not None and counter >= self.n_writes:
85                     break
86                 self.write_and_sleep(logging.info, counter)
87                 counter += 1
88         else:
89             with open(self.file_name, mode=mode, buffering=buffering) \
90                     as file_handle:
91                 while True:
92                     if self.n_writes is not None and counter >= self.n_writes:
93                         break
94                     self.write_and_sleep(file_handle.write, counter)
95                     counter += 1
96
97     def write_and_sleep(self, write_func, counter):
98         """ format text, write it using given function and sleep """
99         if isinstance(self.text_pattern, (list, tuple)):
100             text = self.text_pattern[counter]
101         else:
102             text = self.text_pattern
103         text = text.format(counter, perf_counter())
104
105         if self.do_encode:
106             text = text.encode(self.do_encode)
107         write_func(text)
108         time.sleep(self.pause_time)
109
110
111 class LogReadTester(unittest.TestCase):
112     """ class with all the tests """
113
114     def setUp(self):
115         """ called before each test """
116         if DEBUG:
117             print('setup test')
118         temp_handle, temp_name = mkstemp()
119         os.close(temp_handle)
120         self.temp_file = temp_name
121         if DEBUG:
122             print('created temp file ' + self.temp_file)
123
124     def tearDown(self):
125         """ called after each test """
126         if DEBUG:
127             print('tear down test')
128         if os.path.isfile(self.temp_file):
129             if DEBUG:
130                 print('delete temp file' + self.temp_file)
131             os.unlink(self.temp_file)
132
133     def helper_test_len(self, reader, n_expected):
134         """ helper function that tests length of vars in reader """
135         self.assertEqual(reader.n_sources(), n_expected)
136         self.assertEqual(len(reader.file_objs), n_expected)
137         self.assertEqual(len(reader.file_descs), n_expected)
138         self.assertEqual(len(reader.descriptions), n_expected)
139         self.assertEqual(len(reader.ignore), n_expected)
140         self.assertEqual(len(reader.last_sizes), n_expected)
141
142     def test_args(self):
143         self.assertRaises(TypeError, IterativeReader)  # no args
144         self.assertRaises(ValueError, IterativeReader, [], 'test')
145         self.assertRaises(ValueError, IterativeReader, [], ['test', ])
146         self.assertRaises(ValueError, IterativeReader, self.temp_file)
147         self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
148         with open(self.temp_file, 'rt') as file_handle:
149             reader = IterativeReader(file_handle)
150             self.helper_test_len(reader, 1)
151             reader = IterativeReader([file_handle, ])
152             self.helper_test_len(reader, 1)
153             reader = IterativeReader(file_handle, 'desc')
154             self.helper_test_len(reader, 1)
155             reader = IterativeReader([file_handle, ], ['desc', ])
156             self.helper_test_len(reader, 1)
157             reader = IterativeReader(file_handle, ['desc', ])
158             self.helper_test_len(reader, 1)
159             self.assertRaises(ValueError, IterativeReader,
160                               [file_handle, ], 'desc', )
161             reader = IterativeReader([file_handle, file_handle],
162                                      ['desc1', 'desc2'])
163             self.helper_test_len(reader, 2)
164             reader = IterativeReader((file_handle for idx in range(5)))
165             self.helper_test_len(reader, 5)
166             self.assertRaises(ValueError, IterativeReader,
167                               (file_handle for idx in range(5)),
168                               tuple('desc' for idx in range(4)))
169             self.assertRaises(ValueError, IterativeReader,
170                               (file_handle for idx in range(5)),
171                               ('desc' for idx in range(6)))
172
173     def test_simple_read(self):
174         """ write fixed number of lines, see how fast they are retrieved """
175
176         # need newline only when writing text (because of write buffering)
177         param_combinations = ('{0}:{1}\n', None,    False), \
178                              ('{0}:{1}\n', 'ascii', False), \
179                              ('{0}:{1} ' , 'ascii', False)
180                              #('{0}:{1}\n', None   , True), \  logging seems
181                              #('{0}:{1}\n', 'ascii', True), \  to buffer writes
182                              #('{0}:{1} ' , None   , True), \  to files
183                              #('{0}:{1} ' , 'ascii', True)
184
185         n_texts = 10
186         pause_time = 0.01  # 100 tps (texts per second)
187
188         for text_pattern, encoding, use_logging in param_combinations:
189             LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
190                           pause_time=pause_time, do_encode=encoding,
191                           use_logging=use_logging).start()
192             if DEBUG:
193                 print('testing with log file {0}'.format(self.temp_file))
194                 print('encoding is {0}, use logging = {1}'.format(encoding,
195                                                                   use_logging))
196             time_diffs = []
197
198             with open(self.temp_file, 'rt') as file_handle:
199                 reader = IterativeReader(file_handle)
200                 self.helper_test_len(reader, 1)
201                 for counter, (desc, text) in enumerate(reader):
202                     receive_time = perf_counter()
203                     text = text.strip()
204                     if DEBUG:
205                         print('{1}: received text "{0}" at {2}'
206                               .format(text, counter, receive_time))
207                     index = text.index(':')
208                     count_text = int(text[:index].strip())
209                     self.assertEqual(count_text, counter)
210                     write_time = float(text[index+1:].strip())
211                     time_diffs.append((receive_time - write_time)*1000.)
212                     if counter == n_texts-1:
213                         if DEBUG:
214                             print('stop since have {0} reads'.format(counter))
215                         break
216             if DEBUG:
217                 print('time diffs in ms: {0}'.format(time_diffs))
218             self.assertTrue(max(time_diffs) < 100.,
219                             'read took more than 100ms (max was {0:.3f}ms)!'
220                             .format(max(time_diffs)))
221
222     def test_line_read(self):
223         """ write partial lines, full lines and multiple lines """
224
225         pause_time = 0.01  # 100 tps (texts per second)
226         encoding = None
227         use_logging = False
228         texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
229                  'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
230                  '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
231                  'end\n', '\nend\n', '\n\nend\n\n']
232         lines_expected = ['line{0}'.format(idx) for idx in range(12)] \
233                          + ['', '', '']
234
235         # create writer
236         LogFileWriter(self.temp_file, texts, n_writes=len(texts),
237                       pause_time=pause_time, do_encode=encoding,
238                       use_logging=use_logging).start()
239
240         # read
241         lines_read = []
242         with open(self.temp_file, 'rt') as file_handle:
243             reader = LineReader(file_handle)
244             self.helper_test_len(reader, 1)
245
246             for line_expected, (_, line_read) in zip(lines_expected, reader):
247                 if 'end' in line_read:
248                     break
249                 else:
250                     if DEBUG:
251                         print('expect "{0}", read "{1}"'.format(line_expected,
252                                                                 line_read))
253                     self.assertEqual(line_expected, line_read)
254
255
256 if __name__ == '__main__':
257     unittest.main()