log_read: Just warn when file shrinks, try reading anyway
[pyi2ncommon] / src / log_read.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """ Iterative reading of log files
22
23 Basic Functionality (class :py:class:`IterativeReader`):
24 Runs stat in a loop to find out whether file size has changed. Then reads the
25 new data and forwards that
26
27 .. todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
28          so can return from read loop
29
30 :py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
31 it line-wise as is normal for log files
32
33 :py:class:`LogParser` takes those lines and tries to parse them into fields
34 like date, time, module name, urgency and message.
35
36 .. todo:: auto-detect log line layout
37 """
38
39 import os
40 import re
41 from warnings import warn
42 import os.path
43 import logging
44 from .iter_helpers import zip_longest
45 from .type_helpers import is_str_or_byte, is_file_obj
46
47
48 class LogReadWarning(UserWarning):
49     """ warnings issued by classes in this module """
50     pass
51
52
53 def true_func(unused_argument_but_that_is_ok):
54     """ does nothing, always returns True """
55     return True
56
57
58 def check_is_used(some_file_or_handle):
59     """ check whether file is being written to
60
61     to be implemented, e.g. using lsof
62     """
63     raise NotImplementedError()
64
65
66 _create_description_unknown_counter = 0
67
68
69 def create_description(file_obj, file_desc):
70     """ create some description for given file-like object / file descriptor
71
72     :param file_obj: file-like object
73     :param int file_desc: os-level file descriptor
74     :returns: string
75     """
76
77     global _create_description_unknown_counter
78
79     try:
80         desc = file_obj.name
81         if desc:
82             return desc
83     except AttributeError:
84         pass
85
86     if file_desc is not None:
87         return 'file{0}'.format(file_desc)
88     else:
89         _create_description_unknown_counter += 1
90         return 'unknown{0}'.format(_create_description_unknown_counter)
91
92
93 #: error message for IterativeReader constructor
94 _STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
95            'files --> use with open(file_name)!'
96
97
98 class IterativeReader(object):
99     """ reads from a given file
100
101     Uses os.stat(file_obj.fileno()).st_size as measure whether file has changed
102     or not; Always reads as much data as possible
103
104     Catches most common exceptions in iteration (not constructor)
105
106     Does not care about closing files, so does not accept file names
107
108     This is the base for class :py:class:`LineReader` that just has to
109     implement a different :py:meth:`prepare_result` method
110     """
111
112     def __init__(self, sources, descs=None, return_when_done=False):
113         """ creates a reader; does some basic checks on args
114
115         :param sources: iterable over sources. Sources can be opened file
116                         objects or read-opened os-level file descriptors.
117                         Calling code has to ensure they are closed properly, so
118                         best use this within a "with open(file_name) as
119                         file_handle:"-context. If sources is a single file
120                         obj/descriptor, both source and desc will be converted
121                         to lists of length 1
122         :param descs: can be anything of same length as sources. If sources is
123                       a single source, then descs is also converted to a list
124                       of length 1. If not given (i.e. None), will use
125                       :py:func:`create_description` to guess descriptions
126         :param bool return_when_done: ignore file_handle if no-one is writing
127                                       to it any more. Return from iterator when
128                                       all watched files are done (not
129                                       implemented yet)
130         :raises: OSError when testing fstat on source
131         """
132         if not sources:
133             raise ValueError('need at least some source!')
134         elif is_str_or_byte(sources):
135             raise ValueError(_STR_ERR.format(sources))
136         elif is_file_obj(sources) or isinstance(sources, int):
137             source_input = [sources, ]
138             desc_input = [descs, ]
139         else:
140             source_input = sources  # assume some iterable
141             desc_input = descs
142
143         # now divide sources into os-level file descriptors for os.fstat,
144         # and file objects for read()
145         self.file_objs = []
146         self.file_descs = []          # file descriptOR, not descriptION
147         for source in source_input:
148             if is_file_obj(source):
149                 self.file_objs.append(source)
150                 self.file_descs.append(source.fileno())
151             elif isinstance(source, int):
152                 self.file_objs.append(os.fdopen(source))
153                 self.file_descs.append(source)
154             elif is_str_or_byte(source):
155                 raise ValueError(_STR_ERR.format(source))
156             else:
157                 raise ValueError('source {0} is neither file obj nor file '
158                                  'descriptor!')
159
160             # try to fstat the new file descriptor just for testing
161             os.fstat(self.file_descs[-1])
162
163         # guess descriptions if not given
164         if not desc_input:
165             self.descriptions = [create_description(obj, file_desc)
166                                  for obj, file_desc
167                                  in zip(self.file_objs, self.file_descs)]
168         else:
169             try:
170                 if len(desc_input) != len(self.file_objs):
171                     raise ValueError('need same number of sources and '
172                                      'descriptions!')
173             except TypeError:
174                 pass  # desc_input is generator or so
175
176             self.descriptions = []
177             for obj, file_desc, description in \
178                     zip_longest(self.file_objs, self.file_descs, desc_input):
179                 if obj is None:
180                     raise ValueError('more descriptions than sources!')
181                 elif description is None:
182                     self.descriptions.append(create_description(obj,
183                                                                 file_desc))
184                 else:
185                     self.descriptions.append(description)
186
187         self.last_sizes = [0 for _ in self.file_objs]
188         self.ignore = [False for _ in self.file_objs]
189
190         if return_when_done:
191             self.is_used = check_is_used
192         else:
193             self.is_used = true_func
194
195         for obj, file_desc, description in \
196                 zip(self.file_objs, self.file_descs, self.descriptions):
197             logging.debug('log_read initialized with file descriptor {0}, '
198                           'file obj {1}, description "{2}"'
199                           .format(file_desc, obj, description))
200
201     def n_sources(self):
202         return len(self.file_objs)
203
204     def n_active_sources(self):
205         return len(self.ignore) - sum(self.ignore)
206
207     def __iter__(self):
208         while True:
209             for idx, (obj, file_desc, description, last_size, do_ignore) in \
210                     enumerate(zip(self.file_objs, self.file_descs,
211                                   self.descriptions, self.last_sizes,
212                                   self.ignore)):
213                 if do_ignore:
214                     continue
215
216                 # get new file size
217                 new_size = os.fstat(file_desc).st_size
218
219                 # compare to old size
220                 if new_size == last_size:
221                     if not self.is_used(file_desc):
222                         warn('no one is writing to {0} / {1} -- '
223                              'stop watching it!'
224                              .format(file_desc, description),
225                              category=LogReadWarning)
226                         self.ignore[idx] = True
227                 else:
228                     if new_size < last_size:
229                         warn('{0} / {1} has become smaller ({2} --> {3})!'
230                              .format(obj, description, last_size, new_size),
231                              category=LogReadWarning)
232                     try:
233                         new_data = obj.read()
234                     except OSError as ose:    # includes IOErrors
235                         warn('io error reading from {0} / {1}: {2})'
236                              .format(obj, description, ose),
237                              category=LogReadWarning)
238                         new_data = str(ose)
239                     except UnicodeDecodeError as ude:
240                         warn('unicode error reading from {0} / {1}: {2}'
241                              .format(obj, description, ude),
242                              category=LogReadWarning)
243                         new_data = str(ude)
244
245                     # post-processing
246                     to_yield = self.prepare_result(description, new_data, idx)
247                     for result in to_yield:
248                         yield result
249
250                     # prepare next iteration
251                     self.last_sizes[idx] = new_size
252
253     def prepare_result(self, description, data, idx):
254         """ from raw new data create some yield-able results
255
256         to be intended for overwriting in sub-classes
257
258         this function is called from __iter__ for each new data that becomes
259         available. It has to return some iterable whose entries are yielded
260         from iteration over objects of this class.
261
262         It receives the following args:
263         - the description of the source
264         - the data itself
265         - the index of the source
266         The result must be an iterable of objects, which are yielded as-is, so
267         can have any form
268
269         This base implementation just returns its input in a list, so new data
270         is yielded from __iter__ as-is
271         """
272         return [(description, data), ]
273
274
275 LINE_SPLITTERS = '\n\r'
276
277
278 class LineReader(IterativeReader):
279     """ an IterativeReader that returns new data line-wise
280
281     this means buffering partial line data
282     """
283
284     def __init__(self, *args, **kwargs):
285         """ forwards all args and kwargs to :py:class:`IterativeReader` """
286         super(LineReader, self).__init__(*args, **kwargs)
287         self.line_buffers = ['' for _ in range(self.n_sources())]
288
289     def prepare_result(self, description, new_data, idx):
290         """ take raw new data and split it into lines
291
292         if line is not complete, then buffer it
293
294         returns lines without their newline characters
295         """
296         all_data = self.line_buffers[idx] + new_data
297         self.line_buffers[idx] = ''
298         result = []
299         should_be_no_new_lines = False
300         for line in all_data.splitlines(True):
301             if line[-1] in LINE_SPLITTERS:
302                 result.append((description, line.rstrip(LINE_SPLITTERS)))
303             elif should_be_no_new_lines:
304                 raise ValueError('line splitters are not compatible with'
305                                  'str.splitlines!')
306             else:
307                 self.line_buffers[idx] = line
308                 should_be_no_new_lines = True  # (this should be the last)
309
310         return result
311
312
313 class LogParser(LineReader):
314     """ takes lines from LineReader and parses their contents
315
316     requires a pattern for log lines, auto-detection is not implemented yet
317
318     Iteration returns re.match result or -- if matching failed -- the original
319     raw line
320     """
321
322     def __init__(self, log_file, pattern=None):
323         """ create a LogParser
324
325         :param str log_file: name of log file to parse (required!)
326         :param pattern: regexp to split log lines; None (default) to return
327                         line as they are
328         """
329         super(LogParser, self).__init__(log_file)
330
331         self.pattern = pattern
332
333     def prepare_result(self, *args):
334         # let super class split data into lines
335         for _, raw_line in super(LogParser, self).prepare_result(*args):
336             result = re.match(self.pattern, raw_line)
337             if result:
338                 yield result
339             else:
340                 yield raw_line