1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
21 """ Iterative reading of log files
23 Basic Functionality (class :py:class:`IterativeReader`):
24 Runs stat in a loop to find out whether file size has changed. Then reads the
25 new data and forwards that
27 .. todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
28 so can return from read loop
30 :py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
31 it line-wise as is normal for log files
33 :py:class:`LogParser` takes those lines and tries to parse them into fields
34 like date, time, module name, urgency and message.
.. todo:: auto-detect log line layout
"""
import logging
import os
import re

from warnings import warn

from .iter_helpers import zip_longest
from .type_helpers import is_str_or_byte, is_file_obj
class LogReadWarning(UserWarning):
    """Warning category used for all warnings emitted by this module."""
def true_func(unused_argument_but_that_is_ok):
    """ does nothing, always returns True

    Used in place of :py:func:`check_is_used` when the caller does not want
    the "is anyone still writing to this file?" check.

    :param unused_argument_but_that_is_ok: ignored
    :returns: ``True``, always
    """
    return True
def check_is_used(some_file_or_handle):
    """ check whether file is being written to

    to be implemented, e.g. using lsof

    :param some_file_or_handle: file-like object or os-level file descriptor
    :raises NotImplementedError: always -- this is a placeholder
    """
    raise NotImplementedError()
#: counter for sources with no name and no descriptor; used by
#: :py:func:`create_description` to generate unique "unknownN" labels
_create_description_unknown_counter = 0


def create_description(file_obj, file_desc):
    """ create some description for given file-like object / file descriptor

    :param file_obj: file-like object
    :param int file_desc: os-level file descriptor
    :returns: a short text naming the source: the file object's ``name``
              attribute if it has a non-empty one, else ``fileN`` from the
              descriptor, else a generated unique ``unknownN`` label
    :rtype: str
    """
    global _create_description_unknown_counter

    # prefer the file object's own name, if it has a non-empty one
    try:
        name = file_obj.name
        if name:
            return name
    except AttributeError:
        pass

    if file_desc is not None:
        return 'file{0}'.format(file_desc)

    # last resort: invent a unique label
    _create_description_unknown_counter += 1
    return 'unknown{0}'.format(_create_description_unknown_counter)
#: error message for IterativeReader constructor; plain file names are
#: rejected because this module never closes files itself
_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
           'files --> use with open(file_name)!'
class IterativeReader(object):
    """ reads from a given file

    Uses os.stat(file_obj.fileno()).st_size as measure whether file has
    changed or not; always reads as much data as possible.

    Catches most common exceptions in iteration (not constructor).

    Does not care about closing files, so does not accept file names.

    This is the base for class :py:class:`LineReader` that just has to
    implement a different :py:meth:`prepare_result` method.
    """

    def __init__(self, sources, descs=None, return_when_done=False):
        """ creates a reader; does some basic checks on args

        :param sources: iterable over sources. Sources can be opened file
                        objects or read-opened os-level file descriptors.
                        Calling code has to ensure they are closed properly, so
                        best use this within a "with open(file_name) as
                        file_handle:"-context. If sources is a single file
                        obj/descriptor, both source and desc will be converted
                        to lists of length 1
        :param descs: can be anything of same length as sources. If sources is
                      a single source, then descs is also converted to a list
                      of length 1. If not given (i.e. None), will use
                      :py:func:`create_description` to guess descriptions
        :param bool return_when_done: ignore file_handle if no-one is writing
                                      to it any more. Return from iterator when
                                      all watched files are done (not
                                      implemented yet)
        :raises: OSError when testing fstat on source
        """
        if not sources:
            raise ValueError('need at least some source!')
        elif is_str_or_byte(sources):
            raise ValueError(_STR_ERR.format(sources))
        elif is_file_obj(sources) or isinstance(sources, int):
            # single source given: normalize to 1-element lists
            source_input = [sources, ]
            desc_input = [descs, ]
        else:
            source_input = sources  # assume some iterable
            desc_input = descs

        # now divide sources into os-level file descriptors for os.fstat,
        # and file objects for read()
        self.file_objs = []
        self.file_descs = []  # file descriptOR, not descriptION
        for source in source_input:
            if is_file_obj(source):
                self.file_objs.append(source)
                self.file_descs.append(source.fileno())
            elif isinstance(source, int):
                # wrap os-level descriptor in a file object for read()
                self.file_objs.append(os.fdopen(source))
                self.file_descs.append(source)
            elif is_str_or_byte(source):
                raise ValueError(_STR_ERR.format(source))
            else:
                raise ValueError('source {0} is neither file obj nor file '
                                 'descriptor!'.format(source))

            # try to fstat the new file descriptor just for testing
            os.fstat(self.file_descs[-1])

        # guess descriptions if not given
        if not desc_input:
            self.descriptions = [create_description(obj, file_desc)
                                 for obj, file_desc
                                 in zip(self.file_objs, self.file_descs)]
        else:
            try:
                if len(desc_input) != len(self.file_objs):
                    raise ValueError('need same number of sources and '
                                     'descriptions!')
            except TypeError:
                pass  # desc_input is generator or so

            self.descriptions = []
            for obj, file_desc, description in \
                    zip_longest(self.file_objs, self.file_descs, desc_input):
                if obj is None:
                    raise ValueError('more descriptions than sources!')
                elif description is None:
                    # fill gaps in given descriptions with guessed ones
                    self.descriptions.append(create_description(obj,
                                                                file_desc))
                else:
                    self.descriptions.append(description)

        # size at which each source was last read, and which sources are
        # no longer watched
        self.last_sizes = [0 for _ in self.file_objs]
        self.ignore = [False for _ in self.file_objs]

        if return_when_done:
            self.is_used = check_is_used
        else:
            self.is_used = true_func  # never give up on a source

        for obj, file_desc, description in \
                zip(self.file_objs, self.file_descs, self.descriptions):
            logging.debug('log_read initialized with file descriptor {0}, '
                          'file obj {1}, description "{2}"'
                          .format(file_desc, obj, description))

    def n_sources(self):
        """ :returns: number of sources this reader watches """
        return len(self.file_objs)

    def n_active_sources(self):
        """ :returns: number of sources that are not ignored yet """
        return len(self.ignore) - sum(self.ignore)

    def __iter__(self):
        """ iterate endlessly over watched sources, yielding new data

        For each non-ignored source, compares os.fstat size to the size at
        the previous check; if the file grew, reads everything new and
        yields whatever :py:meth:`prepare_result` makes of it.
        """
        while True:
            for idx, (obj, file_desc, description, last_size, do_ignore) in \
                    enumerate(zip(self.file_objs, self.file_descs,
                                  self.descriptions, self.last_sizes,
                                  self.ignore)):
                if do_ignore:
                    continue

                # get new file size
                new_size = os.fstat(file_desc).st_size

                # compare to old size
                if new_size == last_size:
                    if not self.is_used(file_desc):
                        warn('no one is writing to {0} / {1} -- '
                             'stop watching it!'
                             .format(file_desc, description),
                             category=LogReadWarning)
                        self.ignore[idx] = True
                elif new_size < last_size:
                    # file shrank (e.g. truncation / rotation): just warn;
                    # size is re-checked on the next round
                    warn('{0} / {1} has become smaller ({2} --> {3})!'
                         .format(obj, description, last_size, new_size),
                         category=LogReadWarning)
                else:  # (new_size > last_size)
                    try:
                        new_data = obj.read()
                    except OSError as ose:  # includes IOErrors
                        warn('io error reading from {0} / {1}: {2})'
                             .format(obj, description, ose),
                             category=LogReadWarning)
                        new_data = ''
                    except UnicodeDecodeError as ude:
                        warn('unicode error reading from {0} / {1}: {2}'
                             .format(obj, description, ude),
                             category=LogReadWarning)
                        new_data = ''

                    # post-process the raw data and yield the results
                    to_yield = self.prepare_result(description, new_data, idx)
                    for result in to_yield:
                        yield result

                    # prepare next iteration
                    self.last_sizes[idx] = new_size

    def prepare_result(self, description, data, idx):
        """ from raw new data create some yield-able results

        intended for overwriting in sub-classes

        this function is called from __iter__ for each new data that becomes
        available. It has to return some iterable whose entries are yielded
        from iteration over objects of this class.

        It receives the following args:
        - the description of the source
        - the raw new data
        - the index of the source
        The result must be an iterable of objects, which are yielded as-is.

        This base implementation just returns its input in a list, so new data
        is yielded from __iter__ as-is

        :param str description: description of the source
        :param str data: newly read raw data
        :param int idx: index of the source
        :returns: list with a single (description, data) tuple
        """
        return [(description, data), ]
#: characters that :py:class:`LineReader` treats as end-of-line markers
LINE_SPLITTERS = '\n\r'
class LineReader(IterativeReader):
    """ an IterativeReader that returns new data line-wise

    this means buffering partial line data
    """

    def __init__(self, *args, **kwargs):
        """ forwards all args and kwargs to :py:class:`IterativeReader` """
        super(LineReader, self).__init__(*args, **kwargs)
        # one buffer per source for the trailing, not-yet-complete line
        self.line_buffers = ['' for _ in range(self.n_sources())]

    def prepare_result(self, description, new_data, idx):
        """ take raw new data and split it into lines

        if line is not complete, then buffer it

        returns lines without their newline characters

        :param str description: description of the source
        :param str new_data: newly read raw data
        :param int idx: index of the source
        :returns: list of (description, line) tuples
        :raises ValueError: if str.splitlines produces an unterminated
                            chunk anywhere but at the very end
        """
        # prepend whatever partial line was buffered from the previous read
        all_data = self.line_buffers[idx] + new_data
        self.line_buffers[idx] = ''
        result = []
        should_be_no_new_lines = False
        for line in all_data.splitlines(True):
            if line[-1] in LINE_SPLITTERS:
                result.append((description, line.rstrip(LINE_SPLITTERS)))
            elif should_be_no_new_lines:
                # only the very last chunk may lack a terminator
                raise ValueError('line splitters are not compatible with '
                                 'str.splitlines!')
            else:
                # incomplete last line: keep it until more data arrives
                self.line_buffers[idx] = line
                should_be_no_new_lines = True  # (this should be the last)

        return result
314 class LogParser(LineReader):
315 """ takes lines from LineReader and parses their contents
317 requires a pattern for log lines, auto-detection is not implemented yet
319 Iteration returns re.match result or -- if matching failed -- the original
323 def __init__(self, log_file, pattern=None):
324 """ create a LogParser
326 :param str log_file: name of log file to parse (required!)
327 :param pattern: regexp to split log lines; None (default) to return
330 super(LogParser, self).__init__(log_file)
332 self.pattern = pattern
334 def prepare_result(self, *args):
335 # let super class split data into lines
336 for _, raw_line in super(LogParser, self).prepare_result(*args):
337 result = re.match(self.pattern, raw_line)