68c916055b4531a1d660590205380cf8f0ec170f
[pyi2ncommon] / test / test_log_read.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """ Unittests for log_read
22
23 Creates own thread to write data to a log file
24 """
25
26 import unittest
27 from threading import Thread
28 from tempfile import mkstemp
29 import os
30 import time
31 import logging
32 from warnings import warn
33
34 from src.log_read import *
35
# get best clock: time.perf_counter is used both by the writer thread (which
# embeds the time of writing into the written text) and by the tests (which
# compare it to the time of reading)
perf_counter = time.perf_counter

# set to True to make the tests print progress information
DEBUG = False
40
41
class LogFileWriter(Thread):
    """ thread that creates and writes to given file

    Texts are written one at a time with a pause after each write, either
    through a plain file handle or through a logger that logs to the file.
    """

    def __init__(self, file_name, text_pattern, n_writes=None,
                 pause_time=0.1, do_encode=None, use_logging=True):
        """ creates thread, daemon is True

        :param file_name: name of file to write to
        :param text_pattern: format string or list/tuple of format strings;
                             in the latter case the text for write number
                             `counter` is `text_pattern[counter]`. Every text
                             is formatted with (counter, perf_counter)
        :param n_writes: number of texts to write; if None, will write
                         indefinitely
        :param pause_time: seconds to sleep after each write
        :param do_encode: name of an encoding (e.g. 'ascii') or None. If
                          given, text is encoded to bytes using that encoding
                          and the file handle is opened in 'wb' mode;
                          otherwise file is opened in 'wt' mode and unicode
                          text is written
        :param use_logging: If False, will open file and run
                            file_handle.write; if True, will create a logger
                            that logs to file and use its info() method (no
                            file_handle.write)
        """
        super(LogFileWriter, self).__init__()
        self.daemon = True
        self.file_name = file_name
        self.text_pattern = text_pattern
        self.n_writes = n_writes
        self.pause_time = pause_time
        self.do_encode = do_encode
        self.use_logging = use_logging

    def run(self):
        """ write texts to file until n_writes texts have been written """
        if self.use_logging:
            self._run_with_logger()
        else:
            self._run_with_file_handle()

    def _run_with_logger(self):
        """ write all texts through a logger that only logs to our file """
        # Do not use logging.basicConfig here: it configures the root logger
        # only once per process (and not at all if the root logger already
        # has a handler), so a second writer with a different file name would
        # not write to its own file. Use a private logger with an own handler.
        logger = logging.getLogger('{0}.{1}'.format(type(self).__name__,
                                                    id(self)))
        logger.setLevel(logging.INFO)
        logger.propagate = False    # write to our file only
        handler = logging.FileHandler(self.file_name)
        handler.setFormatter(logging.Formatter('%(message)s'))
        logger.addHandler(handler)
        try:
            self._write_all(logger.info)
        finally:
            # release the file so it can be removed / re-used by next writer
            logger.removeHandler(handler)
            handler.close()

    def _run_with_file_handle(self):
        """ write all texts using file_handle.write """
        if self.do_encode:
            mode = 'wb'
            buffering = 0  # no buffering -- only allowed for byte mode
        else:
            mode = 'wt'
            buffering = 1  # line buffering -- only allowed for text mode

        with open(self.file_name, mode=mode, buffering=buffering) \
                as file_handle:
            self._write_all(file_handle.write)

    def _write_all(self, write_func):
        """ call write_and_sleep n_writes times (forever if n_writes is None)
        """
        counter = 0
        while self.n_writes is None or counter < self.n_writes:
            self.write_and_sleep(write_func, counter)
            counter += 1

    def write_and_sleep(self, write_func, counter):
        """ format text, write it using given function and sleep

        If text_pattern is a list or tuple, this raises an IndexError if
        counter is not a valid index for it.
        """
        if isinstance(self.text_pattern, (list, tuple)):
            text = self.text_pattern[counter]
        else:
            text = self.text_pattern
        text = text.format(counter, perf_counter())

        if self.do_encode:
            text = text.encode(self.do_encode)
        write_func(text)
        time.sleep(self.pause_time)
104
105
class LogReadTester(unittest.TestCase):
    """ class with all the tests

    Each test gets its own (empty) temp file; tests that need log data start
    a LogFileWriter thread that writes to that file while the test reads it.
    """

    # max time in seconds to wait for a writer thread to finish
    WRITER_JOIN_TIMEOUT = 5.

    def setUp(self):
        """ called before each test; creates an empty temp file """
        if DEBUG:
            print('setup test')
        temp_handle, temp_name = mkstemp()
        os.close(temp_handle)
        self.temp_file = temp_name
        if DEBUG:
            print('created temp file ' + self.temp_file)

    def tearDown(self):
        """ called after each test; removes the temp file """
        if DEBUG:
            print('tear down test')
        if os.path.isfile(self.temp_file):
            if DEBUG:
                print('delete temp file' + self.temp_file)
            os.unlink(self.temp_file)

    def helper_test_len(self, reader, n_expected):
        """ helper function that tests length of vars in reader """
        self.assertEqual(reader.n_sources(), n_expected)
        self.assertEqual(len(reader.file_objs), n_expected)
        self.assertEqual(len(reader.file_handles), n_expected)
        self.assertEqual(len(reader.descriptions), n_expected)
        self.assertEqual(len(reader.ignore), n_expected)
        self.assertEqual(len(reader.last_sizes), n_expected)

    def helper_test_parse(self, file_name, pattern, max_lines=100):
        """ helper function that parses the first max_lines lines of a file

        Fails if the parser yields no data (None) for any of these lines.
        """
        with LogParser.create_for(file_name, pattern) as parser:
            for line_count, (_, data, _) in enumerate(parser):
                if line_count >= max_lines:
                    break
                self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")

    def test_args(self):
        """ test which constructor arguments IterativeReader accepts """
        self.assertRaises(TypeError, IterativeReader)  # no args
        self.assertRaises(ValueError, IterativeReader, [], 'test')
        self.assertRaises(ValueError, IterativeReader, [], ['test', ])
        self.assertRaises(ValueError, IterativeReader, self.temp_file)
        self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
        with open(self.temp_file, 'rt') as file_handle:
            reader = IterativeReader(file_handle)
            self.helper_test_len(reader, 1)
            reader = IterativeReader([file_handle, ])
            self.helper_test_len(reader, 1)
            reader = IterativeReader(file_handle, 'desc')
            self.helper_test_len(reader, 1)
            reader = IterativeReader([file_handle, ], ['desc', ])
            self.helper_test_len(reader, 1)
            reader = IterativeReader(file_handle, ['desc', ])
            self.helper_test_len(reader, 1)
            self.assertRaises(ValueError, IterativeReader,
                              [file_handle, ], 'desc', )
            reader = IterativeReader([file_handle, file_handle],
                                     ['desc1', 'desc2'])
            self.helper_test_len(reader, 2)
            reader = IterativeReader((file_handle for _ in range(5)))
            self.helper_test_len(reader, 5)
            # number of descriptions must match number of sources
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for idx in range(5)),
                              tuple('desc' for idx in range(4)))
            self.assertRaises(ValueError, IterativeReader,
                              (file_handle for idx in range(5)),
                              ('desc' for idx in range(6)))

    def helper_read_texts(self, n_texts):
        """ helper function that reads n_texts texts from the temp file

        Expects texts of the form "counter:write_time" and checks the
        counter. Returns list of differences between time of reading and
        time of writing, in milliseconds.
        """
        time_diffs = []
        with open(self.temp_file, 'rt') as file_handle:
            reader = IterativeReader(file_handle, keep_watching=True)
            self.helper_test_len(reader, 1)
            counter = -1        # we may have to adapt this manually
            for desc, text, source_idx in reader:
                receive_time = perf_counter()
                self.assertEqual(desc, self.temp_file)
                self.assertEqual(source_idx, 0)
                counter += 1
                text = text.strip()
                if DEBUG:
                    print('{1}: received text "{0}" at {2}'
                          .format(text, counter, receive_time))
                if counter == 0 and not text:
                    # if reader runs stat() before we write, we might get
                    # a warning and one empty read here
                    counter -= 1
                    warn('Got an empty read, you should have seen another '
                         'warning about file shrinking',
                         category=LogReadWarning)
                    continue
                index = text.index(':')
                count_text = int(text[:index].strip())
                self.assertEqual(count_text, counter)
                write_time = float(text[index+1:].strip())
                time_diffs.append((receive_time - write_time)*1000.)
                if counter == n_texts-1:
                    if DEBUG:
                        print('stop since have {0} reads'.format(counter))
                    break
        return time_diffs

    def test_simple_read(self):
        """ write fixed number of lines, see how fast they are retrieved """

        # need newline only when writing text (because of write buffering)
        param_combinations = (
            ('{0}:{1}\n', None, False),
            ('{0}:{1}\n', 'ascii', False),
            ('{0}:{1} ', 'ascii', False),
        )
        # not tested since logging seems to buffer writes to files:
        #     ('{0}:{1}\n', None   , True),
        #     ('{0}:{1}\n', 'ascii', True),
        #     ('{0}:{1} ' , None   , True),
        #     ('{0}:{1} ' , 'ascii', True)

        n_texts = 10
        pause_time = 0.01  # 100 tps (texts per second)

        for text_pattern, encoding, use_logging in param_combinations:
            writer = LogFileWriter(self.temp_file, text_pattern,
                                   n_writes=n_texts, pause_time=pause_time,
                                   do_encode=encoding,
                                   use_logging=use_logging)
            writer.start()
            try:
                if DEBUG:
                    print('testing with log file {0}'.format(self.temp_file))
                    print('encoding is {0}, use logging = {1}'
                          .format(encoding, use_logging))
                time_diffs = self.helper_read_texts(n_texts)
            finally:
                # make sure writer has released the file before the next
                # writer re-creates it or tearDown removes it
                writer.join(self.WRITER_JOIN_TIMEOUT)
            if DEBUG:
                print('time diffs in ms: {0}'.format(time_diffs))
            self.assertTrue(time_diffs, 'did not read any text')
            self.assertTrue(max(time_diffs) < 100.,
                            'read took more than 100ms (max was {0:.3f}ms)!'
                            .format(max(time_diffs)))

    def test_line_read(self):
        """ write partial lines, full lines and multiple lines """

        pause_time = 0.01  # 100 tps (texts per second)
        encoding = None
        use_logging = False
        texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
                 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
                 '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
                 'end\n', '\nend\n', '\n\nend\n\n']
        lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', '']

        # create writer
        writer = LogFileWriter(self.temp_file, texts, n_writes=len(texts),
                               pause_time=pause_time, do_encode=encoding,
                               use_logging=use_logging)
        writer.start()

        # read
        try:
            with open(self.temp_file, 'rt') as file_handle:
                reader = LineReader(file_handle, keep_watching=True)
                self.helper_test_len(reader, 1)

                for line_expected, (_, line_read, _) in zip(lines_expected, reader):
                    if 'end' in line_read:
                        break
                    if DEBUG:
                        print('expect "{0}", read "{1}"'.format(line_expected,
                                                                line_read))
                    self.assertEqual(line_expected, line_read)
        finally:
            # writer still has a few texts to write after the first 'end'
            writer.join(self.WRITER_JOIN_TIMEOUT)

    @unittest.skipIf(not os.access('/var/log/messages', os.R_OK),
                     "messages not accessible")
    def test_parse_messages(self):
        """Try parsing first 100 lines of messages if running on linux"""
        self.helper_test_parse('/var/log/messages', SYS_LOG_PATTERN)

    @unittest.skipIf(not os.access('/var/log/syslog', os.R_OK),
                     "syslog not accessible")
    def test_parse_syslog(self):
        """Try parsing first 100 lines of syslog if running on linux"""
        self.helper_test_parse('/var/log/syslog', SYS_LOG_PATTERN)

    @unittest.skipIf(not os.access('/var/log/maillog', os.R_OK),
                     "maillog not accessible")
    def test_parse_maillog(self):
        """Try parsing first 100 lines of maillog if running on linux"""
        self.helper_test_parse('/var/log/maillog', SYS_LOG_PATTERN)

    @unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK),
                     "proxy log not accessible")
    def test_parse_proxy_log(self):
        """Try parsing first 100 lines of proxy log if running on linux"""
        self.helper_test_parse('/var/log/squid/access.log', PROXY_LOG_PATTERN)
299
300
# run all tests of this module when it is executed as a script
if __name__ == '__main__':
    unittest.main()