1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
21 """ Unittests for log_read
23 Creates own thread to write data to a log file
27 from threading import Thread
28 from tempfile import mkstemp
32 from warnings import warn
34 from src.log_read import *
37 perf_counter = time.perf_counter
42 class LogFileWriter(Thread):
43 """ thread that creates and writes to given file """
45 def __init__(self, file_name, text_pattern, n_writes=None,
46 pause_time=0.1, do_encode=None, use_logging=True):
47 """ creates thread, daemon is True
49 if n_writes is None, will write indefinitely; else writes text_pattern
50 n_writes times, formatted with (counter, perf_counter)
51 If do_encode is True, will encode text to bytes and open file handle
52 in 'wb' mode; otherwise opens in 'wt' mode and writes unicode text.
53 If use_logging is False, will open file and run file_handle.write;
54 If use_logging is True, will create logger that logs to file and use
55 logging.info (no file_handle.write)
57 super(LogFileWriter, self).__init__()
59 self.file_name = file_name
60 self.text_pattern = text_pattern
61 self.n_writes = n_writes
62 self.pause_time = pause_time
63 self.do_encode = do_encode
64 self.use_logging = use_logging
70 buffering = 0 # no buffering -- only allowed for byte mode
73 buffering = 1 # line buffering -- only allowed for text mode
76 logging.basicConfig(filename=self.file_name, level=logging.INFO,
79 if self.n_writes is not None and counter >= self.n_writes:
81 self.write_and_sleep(logging.info, counter)
84 with open(self.file_name, mode=mode, buffering=buffering) \
87 if self.n_writes is not None and counter >= self.n_writes:
89 self.write_and_sleep(file_handle.write, counter)
def write_and_sleep(self, write_func, counter):
    """ build the text for this counter, pass it to write_func, then pause

    A list/tuple text_pattern is indexed with counter, anything else is
    used as it is. The text is formatted with (counter, perf_counter())
    and, if do_encode is set, encoded with that encoding before it is
    handed to write_func. Afterwards sleeps for pause_time seconds.
    """
    pattern = self.text_pattern
    if isinstance(pattern, (list, tuple)):
        pattern = pattern[counter]
    message = pattern.format(counter, perf_counter())

    if self.do_encode:
        message = message.encode(self.do_encode)
    write_func(message)
    time.sleep(self.pause_time)
106 class LogReadTester(unittest.TestCase):
107 """ class with all the tests """
110 """ called before each test """
113 temp_handle, temp_name = mkstemp()
114 os.close(temp_handle)
115 self.temp_file = temp_name
117 print('created temp file ' + self.temp_file)
120 """ called after each test """
122 print('tear down test')
123 if os.path.isfile(self.temp_file):
125 print('delete temp file' + self.temp_file)
126 os.unlink(self.temp_file)
def helper_test_len(self, reader, n_expected):
    """ helper function that tests length of vars in reader

    Asserts that reader.n_sources() equals n_expected and that each of
    reader.file_objs, file_handles, descriptions, ignore and last_sizes
    has length n_expected. Every assertion carries a message naming what
    was checked, so a failure shows which attribute is off.
    """
    self.assertEqual(reader.n_sources(), n_expected,
                     'reader.n_sources() does not match')
    for attr_name in ('file_objs', 'file_handles', 'descriptions',
                      'ignore', 'last_sizes'):
        self.assertEqual(len(getattr(reader, attr_name)), n_expected,
                         'wrong length of reader.{0}'.format(attr_name))
138 self.assertRaises(TypeError, IterativeReader) # no args
139 self.assertRaises(ValueError, IterativeReader, [], 'test')
140 self.assertRaises(ValueError, IterativeReader, [], ['test', ])
141 self.assertRaises(ValueError, IterativeReader, self.temp_file)
142 self.assertRaises(ValueError, IterativeReader, [self.temp_file, ])
143 with open(self.temp_file, 'rt') as file_handle:
144 reader = IterativeReader(file_handle)
145 self.helper_test_len(reader, 1)
146 reader = IterativeReader([file_handle, ])
147 self.helper_test_len(reader, 1)
148 reader = IterativeReader(file_handle, 'desc')
149 self.helper_test_len(reader, 1)
150 reader = IterativeReader([file_handle, ], ['desc', ])
151 self.helper_test_len(reader, 1)
152 reader = IterativeReader(file_handle, ['desc', ])
153 self.helper_test_len(reader, 1)
154 self.assertRaises(ValueError, IterativeReader,
155 [file_handle, ], 'desc', )
156 reader = IterativeReader([file_handle, file_handle],
158 self.helper_test_len(reader, 2)
159 reader = IterativeReader((file_handle for _ in range(5)))
160 self.helper_test_len(reader, 5)
161 self.assertRaises(ValueError, IterativeReader,
162 (file_handle for idx in range(5)),
163 tuple('desc' for idx in range(4)))
164 self.assertRaises(ValueError, IterativeReader,
165 (file_handle for idx in range(5)),
166 ('desc' for idx in range(6)))
168 def test_simple_read(self):
169 """ write fixed number of lines, see how fast they are retrieved """
171 # need newline only when writing text (because of write buffering)
172 param_combinations = ('{0}:{1}\n', None, False), \
173 ('{0}:{1}\n', 'ascii', False), \
174 ('{0}:{1} ' , 'ascii', False)
175 #('{0}:{1}\n', None , True), \ logging seems
176 #('{0}:{1}\n', 'ascii', True), \ to buffer writes
177 #('{0}:{1} ' , None , True), \ to files
178 #('{0}:{1} ' , 'ascii', True)
181 pause_time = 0.01 # 100 tps (texts per second)
183 for text_pattern, encoding, use_logging in param_combinations:
184 LogFileWriter(self.temp_file, text_pattern, n_writes=n_texts,
185 pause_time=pause_time, do_encode=encoding,
186 use_logging=use_logging).start()
188 print('testing with log file {0}'.format(self.temp_file))
189 print('encoding is {0}, use logging = {1}'.format(encoding,
193 with open(self.temp_file, 'rt') as file_handle:
194 reader = IterativeReader(file_handle, keep_watching=True)
195 self.helper_test_len(reader, 1)
196 counter = -1 # we may have to adapt this manually
197 for desc, text, source_idx in reader:
198 receive_time = perf_counter()
199 self.assertEqual(desc, self.temp_file)
200 self.assertEqual(source_idx, 0)
204 print('{1}: received text "{0}" at {2}'
205 .format(text, counter, receive_time))
206 if counter == 0 and not text:
207 # if reader runs stat() before we write, we might get
208 # a warning and one empty read here
210 warn('Got an empty read, you should have seen another '
211 'warning about file shrinking',
212 category=LogReadWarning)
214 index = text.index(':')
215 count_text = int(text[:index].strip())
216 self.assertEqual(count_text, counter)
217 write_time = float(text[index+1:].strip())
218 time_diffs.append((receive_time - write_time)*1000.)
219 if counter == n_texts-1:
221 print('stop since have {0} reads'.format(counter))
224 print('time diffs in ms: {0}'.format(time_diffs))
225 self.assertTrue(max(time_diffs) < 100.,
226 'read took more than 100ms (max was {0:.3f}ms)!'
227 .format(max(time_diffs)))
229 def test_line_read(self):
230 """ write partial lines, full lines and multiple lines """
232 pause_time = 0.01 # 100 tps (texts per second)
235 texts = ['line0\n', 'line1\n', 'li', 'ne2\n', 'line3\n',
236 'line4\nline5\n', 'li', 'ne6\nli', 'ne7\nl', 'i', 'n', 'e',
237 '8', '\n', 'l', 'ine9\nline10\nline1', '1', '\n', '\n', '\n',
238 'end\n', '\nend\n', '\n\nend\n\n']
239 lines_expected = ['line{0}'.format(idx) for idx in range(12)] + ['', '', '']
242 LogFileWriter(self.temp_file, texts, n_writes=len(texts),
243 pause_time=pause_time, do_encode=encoding,
244 use_logging=use_logging).start()
247 with open(self.temp_file, 'rt') as file_handle:
248 reader = LineReader(file_handle, keep_watching=True)
249 self.helper_test_len(reader, 1)
251 for line_expected, (_, line_read, _) in zip(lines_expected, reader):
252 if 'end' in line_read:
256 print('expect "{0}", read "{1}"'.format(line_expected,
258 self.assertEqual(line_expected, line_read)
def test_parse_syslog_lines(self):
    """Test parsing a few fixed lines of syslog

    The lines are written to the temp file by a LogFileWriter that is
    joined before the file is opened for reading, so the parser sees the
    complete file instead of racing the writer thread.
    """
    syslog_lines = """
        Nov 8 08:10:01 system-name rsyslogd: [origin software="rsyslogd" swVersion="8.39.0" x-pid="1968" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
        Nov 8 08:10:12 system-name squid[2762]: Starting new redirector helpers...#012 current master transaction: master58
        Nov 8 08:10:28 system-name pop30[3922]: counts: retr=<0> top=<0> dele=<0>
        Nov 8 08:10:43 system-name pop3loc[3923]: counts: retr=<0> top=<0> dele=<0>
        Nov 8 08:10:44 system-name pingcheck[3004]: Status (0 down, limit=4, notify up now): report link up?
        Nov 8 08:10:44 system-name connd[2917]: [subsys_pingcheck] received notification that connection for id 1 is online
        Nov 8 08:10:44 system-name connd[2917]: [subsys_pingcheck] Connection [ConnInfo P1 (connection name): online] was confirmed online
        Nov 8 08:10:44 system-name connd[2917]: [connection_manager] Confirm that provider "P1" is online
        Nov 8 08:11:01 system-name mon[3563]: failure for intranator dirwatchd 1667891461 (NO SUMMARY)
        Nov 8 08:11:04 system-name lmtpunix[3962]: Delivered: <cmu-lmtpd-3962-1667891464-0@system-name.m.i2n> to mailbox: user.cyrus as uid: 49792
        Nov 8 08:11:04 system-name lmtpunix[3962]: USAGE cyrus user: 0.000000 sys: 0.006803
        Nov 8 08:11:20 system-name pop30[3982]: counts: retr=<0> top=<0> dele=<0>
        Oct 21 07:52:39 system-name avupdate[3671]: successfully installed new antivirus database
        Oct 21 07:52:39 system-name savapi_version_info[3973]: AVE=8.3.64.214, VDF=8.19.26.110, Released=21.10.2022 05:39
        Oct 21 07:52:39 system-name connd[3180]: [connection_manager] online mode set to always online
        Oct 21 07:52:47 system-name kernel: REJECT local IN=eth0 OUT= MACSRC=68:4f:64:75:7b:82 MACDST=01:00:5e:00:00:01 MACPROTO=0800 SRC=123.45.67.89 DST=224.0.0.1 LEN=36 TOS=0x00 PREC=0x00 TTL=1 ID=34431 PROTO=ICMP TYPE=9 CODE=0
        Oct 21 07:52:54 system-name lmtpunix[2715]: Delivered: <cmu-lmtpd-2715-1666331574-0@system-name.m.i2n> to mailbox: user.cyrus as uid: 46558
        Oct 21 07:52:54 system-name lmtpunix[2715]: USAGE cyrus user: 0.010564 sys: 0.000000
        Oct 21 07:53:08 system-name pop30[2703]: counts: retr=<0> top=<0> dele=<0>
        Oct 21 07:53:08 system-name pop3loc[2701]: counts: retr=<0> top=<0> dele=<0>
        Oct 21 07:53:30 system-name ntpd[3701]: adjusting local clock by -0.943974s
        Oct 21 07:54:15 system-name keymakerd[3206]: [acmeautorenew] Starting auto renew check. @acmeautorenew#285
        Oct 21 07:54:15 system-name keymakerd[3206]: [acmeautorenew] Finishing auto renew thread. @acmeautorenew#206
        Oct 21 07:54:23 system-name pop3loc[2701]: counts: retr=<0> top=<0> dele=<0>
        Oct 21 07:54:41 system-name pop30[2703]: counts: retr=<0> top=<0> dele=<0>
        Oct 21 07:54:51 system-name squid[3025]: Starting new redirector helpers...#012 current master transaction: master54
        Oct 21 07:54:55 system-name lmtpunix[3997]: Delivered: <cmu-lmtpd-3997-1666331695-0@system-name.m.i2n> to mailbox: user.cyrus as uid: 46559
        Oct 21 07:54:55 system-name lmtpunix[3997]: USAGE cyrus user: 0.002022 sys: 0.005926
    """
    # one text per write; each needs its own newline since it is written
    # with plain file_handle.write (see use_logging=False below)
    lines = [line.lstrip() + '\n'
             for line in syslog_lines.splitlines() if line.lstrip()]

    # use_logging=False: the logging variant relies on logging.basicConfig,
    # which does nothing once the root logger has handlers, so a second
    # writer in the same process would not log to this test's temp file
    writer = LogFileWriter(self.temp_file, lines, n_writes=len(lines),
                           pause_time=0, use_logging=False)
    writer.start()
    writer.join()    # all lines must be on disk before we start parsing

    n_parsed = 0
    with open(self.temp_file, 'rt') as file_handle:
        parser = LogParser(file_handle, SYS_LOG_PATTERN)
        for _, data, _ in parser:
            self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
            n_parsed += 1

    # guard against passing without having checked a single line
    self.assertGreater(n_parsed, 0, 'no line was read back from temp file')
def test_parse_proxy_lines(self):
    """Test parsing a few fixed lines of proxy log

    The lines are written to the temp file by a LogFileWriter that is
    joined before the file is opened for reading, so the parser sees the
    complete file instead of racing the writer thread.
    """
    proxy_lines = """
        1667901672.645 0 127.0.0.1 NONE_NONE/000 0 - error:transaction-end-before-headers - HIER_NONE/- - -
        1667901792.812 0 127.0.0.1 NONE_NONE/000 0 - error:transaction-end-before-headers - HIER_NONE/- - -
        1667901912.997 0 127.0.0.1 NONE_NONE/000 0 - error:transaction-end-before-headers - HIER_NONE/- - -
        1667902032.168 0 127.0.0.1 NONE_NONE/000 0 - error:transaction-end-before-headers - HIER_NONE/- - -
        1667902055.130 571 192.168.1.1 TCP_TUNNEL/200 10403 CONNECT apod.nasa.gov:443 - HIER_DIRECT/apod.nasa.gov - -
        1667902063.333 969 192.168.1.1 TCP_TUNNEL/200 174425 CONNECT www.startpage.com:443 - HIER_DIRECT/www.startpage.com - -
        1667902063.352 17 192.168.1.1 TCP_TUNNEL/200 39 CONNECT www.startpage.com:443 - HIER_DIRECT/www.startpage.com - -
        1667902063.539 24 192.168.1.1 TCP_TUNNEL/200 39 CONNECT www.startpage.com:443 - HIER_DIRECT/www.startpage.com - -
        1667902063.545 207 192.168.1.1 TCP_TUNNEL/200 1720 CONNECT www.startpage.com:443 - HIER_DIRECT/www.startpage.com - -
        1667902064.141 611 192.168.1.1 TCP_TUNNEL/200 10100 CONNECT www.startpage.com:443 - HIER_DIRECT/www.startpage.com - -
        1667902072.816 974 192.168.1.1 TCP_MISS/200 981 POST http://r3.o.lencr.org/ - FIRSTUP_PARENT/127.0.0.1 application/ocsp-response -
        1667902089.338 16331 192.168.1.1 TCP_TUNNEL/200 103739 CONNECT www.roma-pizza-tuebingen.de:443 - HIER_DIRECT/www.roma-pizza-tuebingen.de - -
        1667902089.339 16332 192.168.1.1 TCP_TUNNEL/200 49795 CONNECT www.roma-pizza-tuebingen.de:443 - HIER_DIRECT/www.roma-pizza-tuebingen.de - -
        1667902089.339 16330 192.168.1.1 TCP_TUNNEL/200 24641 CONNECT www.roma-pizza-tuebingen.de:443 - HIER_DIRECT/www.roma-pizza-tuebingen.de - -
        1667902105.718 0 192.168.1.1 NONE_NONE/000 0 - error:transaction-end-before-headers - HIER_NONE/- - -
        1667902152.347 0 127.0.0.1 NONE_NONE/000 0 - error:transaction-end-before-headers - HIER_NONE/- - -
    """
    # one text per write; each needs its own newline since it is written
    # with plain file_handle.write (see use_logging=False below)
    lines = [line.lstrip() + '\n'
             for line in proxy_lines.splitlines() if line.lstrip()]

    # use_logging=False: the logging variant relies on logging.basicConfig,
    # which does nothing once the root logger has handlers, so a second
    # writer in the same process would not log to this test's temp file
    writer = LogFileWriter(self.temp_file, lines, n_writes=len(lines),
                           pause_time=0, use_logging=False)
    writer.start()
    writer.join()    # all lines must be on disk before we start parsing

    n_parsed = 0
    with open(self.temp_file, 'rt') as file_handle:
        parser = LogParser(file_handle, PROXY_LOG_PATTERN)
        for _, data, _ in parser:
            self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
            n_parsed += 1

    # guard against passing without having checked a single line
    self.assertGreater(n_parsed, 0, 'no line was read back from temp file')
def test_parse_maillog_lines(self):
    """Test parsing a few fixed lines of maillog

    The lines are written to the temp file by a LogFileWriter that is
    joined before the file is opened for reading, so the parser sees the
    complete file instead of racing the writer thread.
    """
    mail_lines = """
        Nov 7 10:07:33 system-name amavis[1]: SA info: rules: meta test I2N_RAZOR_ADJUST_4 has dependency 'RAZOR2_CF_RANGE_E8_51_100' with a zero score
        Nov 7 10:07:33 system-name amavis[1]: SpamControl: init_pre_fork on SpamAssassin done
        Nov 7 10:07:35 system-name quarantined[3574]: quarantined version 0.15 (build Jun 28 2022 17:26:39) up and running
        Nov 7 10:07:52 system-name mailreconfig[3713]: called by sh[3712]|generate[3663], mode update, changed files: aliases,recipients
        Nov 7 10:07:53 system-name postfix/postfix-script[3724]: refreshing the Postfix mail system
        Nov 7 10:07:53 system-name postfix/master[2548]: reload -- version 3.3.22, configuration /etc/postfix
        Nov 7 10:08:07 system-name mailreconfig[3851]: called by sh[3850]|generate[3804], mode update, changed files: aliases,recipients
        Nov 7 10:08:07 system-name postfix/postfix-script[3862]: refreshing the Postfix mail system
        Nov 7 10:08:07 system-name postfix/master[2548]: reload -- version 3.3.22, configuration /etc/postfix
        Nov 7 10:08:11 system-name postfix/smtpd[3933]: connect from system-name.net.lan[127.0.0.1]
        Nov 7 10:08:11 system-name postfix/smtpd[3933]: D9C626B: client=system-name.net.lan[127.0.0.1]
        Nov 7 10:08:11 system-name postfix/cleanup[3936]: D9C626B: message-id=<20221107090811.D9C626B@system-name.net.lan>
        Nov 7 10:08:11 system-name postfix/qmgr[3868]: D9C626B: from=<test-sender@system-name.net.lan>, size=10888, nrcpt=1 (queue active)
        Nov 7 10:08:11 system-name postfix/smtpd[3933]: disconnect from system-name.net.lan[127.0.0.1] ehlo=1 mail=1 rcpt=1 data=1 quit=1 commands=5
        Nov 7 10:08:11 system-name postfix/smtpd[3933]: connect from system-name.net.lan[127.0.0.1]
        Nov 7 10:08:11 system-name postfix/smtpd[3933]: E59146D: client=system-name.net.lan[127.0.0.1]
        Nov 7 10:08:11 system-name postfix/cleanup[3936]: E59146D: message-id=<20221107090811.E59146D@system-name.net.lan>
        Nov 7 10:08:11 system-name amavis[11]: (00011-01) LMTP::10024 /var/spool/vscan/amavis/tmp/amavis-20221107T100811-00011: <test-sender@system-name.net.lan> -> <test-recipient@system-name.net.lan> SIZE=10888 Received: from system-name.net.lan ([127.0.0.1]) by localhost (system-name.net.lan [127.0.0.1]) (amavisd-new, port 10024) with LMTP for <test-recipient@system-name.net.lan>; Mon, 7 Nov 2022 10:08:11 +0100 (CET)
        Nov 7 10:08:12 system-name postfix/qmgr[3868]: E59146D: from=<test-sender@system-name.net.lan>, size=9472, nrcpt=1 (queue active)
        Nov 7 10:08:12 system-name postfix/smtpd[3933]: disconnect from system-name.net.lan[127.0.0.1] ehlo=1 mail=1 rcpt=1 data=1 quit=1 commands=5
        Nov 7 10:08:12 system-name postfix/smtpd[3933]: connect from system-name.net.lan[127.0.0.1]
        Nov 7 10:08:12 system-name amavis[11]: (00011-01) Checking: q4eVqFn169Kq [127.0.0.1] <test-sender@system-name.net.lan> -> <test-recipient@system-name.net.lan>
        Nov 7 10:08:12 system-name amavis[11]: (00011-01) p.path BANNED:1 test-recipient@system-name.net.lan: "P=p003,L=1,M=multipart/mixed | P=p002,L=1/2,M=application/x-msdownload,T=exe,T=exe-ms,N=cloudcar.exe", matching_key="(?^i:\\.exe$)"
        Nov 7 10:08:12 system-name postfix/qmgr[3868]: 4B0CF70: from=<test-sender@system-name.net.lan>, size=164391, nrcpt=1 (queue active)
    """
    # one text per write; each needs its own newline since it is written
    # with plain file_handle.write (see use_logging=False below)
    lines = [line.lstrip() + '\n'
             for line in mail_lines.splitlines() if line.lstrip()]

    # use_logging=False: the logging variant relies on logging.basicConfig,
    # which does nothing once the root logger has handlers, so a second
    # writer in the same process would not log to this test's temp file
    writer = LogFileWriter(self.temp_file, lines, n_writes=len(lines),
                           pause_time=0, use_logging=False)
    writer.start()
    writer.join()    # all lines must be on disk before we start parsing

    n_parsed = 0
    with open(self.temp_file, 'rt') as file_handle:
        parser = LogParser(file_handle, SYS_LOG_PATTERN)
        for _, data, _ in parser:
            self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")
            n_parsed += 1

    # guard against passing without having checked a single line
    self.assertGreater(n_parsed, 0, 'no line was read back from temp file')
def _check_first_lines(self, log_file, pattern, max_lines=100):
    """Parse the first max_lines lines of log_file, fail on an unparsed one

    Opens log_file through LogParser.create_for with the given pattern and
    asserts for each of the first max_lines results that data is not None.
    Shared by the tests below that run against log files of the local system.
    """
    with LogParser.create_for(log_file, pattern) as parser:
        for line_count, (_, data, _) in enumerate(parser):
            if line_count >= max_lines:
                break
            self.assertIsNotNone(data, f"Failed to parse {parser.last_unparsed_line}")

@unittest.skipIf(not os.access('/var/log/messages', os.R_OK),
                 "messages not accessible")
def test_parse_messages(self):
    """Try parsing first 100 lines of messages if running on linux"""
    self._check_first_lines('/var/log/messages', SYS_LOG_PATTERN)

@unittest.skipIf(not os.access('/var/log/syslog', os.R_OK),
                 "syslog not accessible")
def test_parse_syslog(self):
    """Try parsing first 100 lines of syslog if running on linux"""
    self._check_first_lines('/var/log/syslog', SYS_LOG_PATTERN)

@unittest.skipIf(not os.access('/var/log/maillog', os.R_OK),
                 "maillog not accessible")
def test_parse_maillog(self):
    """Try parsing first 100 lines of maillog if running on linux"""
    self._check_first_lines('/var/log/maillog', SYS_LOG_PATTERN)

@unittest.skipIf(not os.access('/var/log/squid/access.log', os.R_OK),
                 "proxy log not accessible")
def test_parse_proxy_log(self):
    """Try parsing first 100 lines of proxy log if running on linux"""
    self._check_first_lines('/var/log/squid/access.log', PROXY_LOG_PATTERN)
400 if __name__ == '__main__':