1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
21 """ Iterative reading of log files
23 Basic Functionality (class :py:class:`IterativeReader`):
24 Runs stat in a loop to find out whether file size has changed. Then reads the
25 new data and forwards that
27 .. todo:: Want to also use lsof to find out whether file/pipe/socket was closed,
28 so can return from read loop
30 :py:class:`LineReader` takes output of :py:class:`IterativeReader` and returns
31 it line-wise as is normal for log files
33 :py:class:`LogParser` takes those lines and tries to parse them into fields
34 like date, time, module name, urgency and message.
.. todo:: auto-detect log line layout
"""
import logging
import os
import re
from warnings import warn

from .iter_helpers import zip_longest
from .type_helpers import is_str_or_byte, is_file_obj
class LogReadWarning(UserWarning):
    """Warning category emitted by the readers/parsers in this module."""
def true_func(unused_argument_but_that_is_ok):
    """ does nothing, always returns True

    Used in place of :py:func:`check_is_used` when callers do not want
    sources to ever be ignored (see IterativeReader constructor).

    :param unused_argument_but_that_is_ok: ignored
    :returns: True, always
    """
    return True
def check_is_used(some_file_or_handle):
    """ check whether file is being written to

    To be implemented, e.g. using lsof.

    :param some_file_or_handle: file object / os-level file descriptor
    :raises NotImplementedError: always -- this is a stub
    """
    raise NotImplementedError()
# counter used by create_description() to invent unique "unknownN" names
_create_description_unknown_counter = 0
def create_description(file_obj, file_desc):
    """ create some description for given file-like object / file descriptor

    Tries, in order: the object's ``name`` attribute, the os-level file
    descriptor number, and finally a module-global "unknownN" counter.

    :param file_obj: file-like object
    :param int file_desc: os-level file descriptor
    :returns: a short description string for the source
    """
    global _create_description_unknown_counter

    # file objects opened from a path usually carry their name
    try:
        name = file_obj.name
        if name:
            return name
    except AttributeError:
        pass

    if file_desc is not None:
        return 'file{0}'.format(file_desc)

    # last resort: invent a unique name
    _create_description_unknown_counter += 1
    return 'unknown{0}'.format(_create_description_unknown_counter)
#: error message for IterativeReader constructor: file *names* are rejected
#: because this class cannot guarantee closing the files it would open
_STR_ERR = 'not accepting file name "{0}" since cannot guarantee closing ' \
    'files --> use with open(file_name)!'
class IterativeReader(object):
    """ reads from a given file

    Uses os.stat(file_obj.fileno()).st_size as measure whether file has
    changed or not; Always reads as much data as possible.

    Catches most common exceptions in iteration (not constructor).

    Does not care about closing files, so does not accept file names.

    This is the base for class :py:class:`LineReader` that just has to
    implement a different :py:meth:`prepare_result` method.
    """

    def __init__(self, sources, descs=None, return_when_done=False):
        """ creates a reader; does some basic checks on args

        :param sources: iterable over sources. Sources can be opened file
                        objects or read-opened os-level file descriptors.
                        Calling code has to ensure they are closed properly,
                        so best use this within a "with open(file_name) as
                        file_handle:"-context. If sources is a single file
                        obj/descriptor, both source and desc will be
                        converted to lists of length 1
        :param descs: can be anything of same length as sources. If sources
                      is a single source, then descs is also converted to a
                      list of length 1. If not given (i.e. None), will use
                      :py:func:`create_description` to guess descriptions
        :param bool return_when_done: ignore file_handle if no-one is
                                      writing to it any more. Return from
                                      iterator when all watched files are
                                      done (not implemented yet)
        :raises: OSError when testing fstat on source
        """
        if not sources:
            raise ValueError('need at least some source!')
        elif is_str_or_byte(sources):
            raise ValueError(_STR_ERR.format(sources))
        elif is_file_obj(sources) or isinstance(sources, int):
            # single source given --> wrap in lists of length 1
            source_input = [sources, ]
            desc_input = [descs, ]
        else:
            source_input = sources  # assume some iterable
            desc_input = descs

        # now divide sources into os-level file descriptors for os.fstat,
        # and file objects for read()
        self.file_objs = []
        self.file_descs = []  # file descriptOR, not descriptION
        for source in source_input:
            if is_file_obj(source):
                self.file_objs.append(source)
                self.file_descs.append(source.fileno())
            elif isinstance(source, int):
                self.file_objs.append(os.fdopen(source))
                self.file_descs.append(source)
            elif is_str_or_byte(source):
                raise ValueError(_STR_ERR.format(source))
            else:
                raise ValueError('source {0} is neither file obj nor file '
                                 'descriptor!'.format(source))

            # try to fstat the new file descriptor just for testing
            os.fstat(self.file_descs[-1])

        # guess descriptions if not given
        if not desc_input:
            self.descriptions = [create_description(obj, file_desc)
                                 for obj, file_desc
                                 in zip(self.file_objs, self.file_descs)]
        else:
            try:
                if len(desc_input) != len(self.file_objs):
                    raise ValueError('need same number of sources and '
                                     'descriptions!')
            except TypeError:
                pass  # desc_input is generator or so

            self.descriptions = []
            for obj, file_desc, description in \
                    zip_longest(self.file_objs, self.file_descs, desc_input):
                if obj is None:
                    raise ValueError('more descriptions than sources!')
                elif description is None:
                    self.descriptions.append(create_description(obj,
                                                                file_desc))
                else:
                    self.descriptions.append(description)

        # per-source state: last seen size and "stop watching" flag
        self.last_sizes = [0 for _ in self.file_objs]
        self.ignore = [False for _ in self.file_objs]

        if return_when_done:
            self.is_used = check_is_used
        else:
            self.is_used = true_func

        for obj, file_desc, description in \
                zip(self.file_objs, self.file_descs, self.descriptions):
            logging.debug('log_read initialized with file descriptor {0}, '
                          'file obj {1}, description "{2}"'
                          .format(file_desc, obj, description))

    def n_sources(self):
        """ return the number of sources this reader watches """
        return len(self.file_objs)

    def n_active_sources(self):
        """ return the number of sources that are not being ignored """
        return len(self.ignore) - sum(self.ignore)

    def __iter__(self):
        """ endless iteration over new data in all watched sources """
        while True:
            for idx, (obj, file_desc, description, last_size, do_ignore) in \
                    enumerate(zip(self.file_objs, self.file_descs,
                                  self.descriptions, self.last_sizes,
                                  self.ignore)):
                if do_ignore:
                    continue

                # check file size via os-level stat
                new_size = os.fstat(file_desc).st_size

                # compare to old size
                if new_size == last_size:
                    if not self.is_used(file_desc):
                        warn('no one is writing to {0} / {1} -- '
                             'stop watching it!'
                             .format(file_desc, description),
                             category=LogReadWarning)
                        self.ignore[idx] = True
                    continue

                if new_size < last_size:
                    # file was truncated / rotated
                    warn('{0} / {1} has become smaller ({2} --> {3})!'
                         .format(obj, description, last_size, new_size),
                         category=LogReadWarning)
                try:
                    new_data = obj.read()
                except OSError as ose:  # includes IOErrors
                    warn('io error reading from {0} / {1}: {2})'
                         .format(obj, description, ose),
                         category=LogReadWarning)
                    continue
                except UnicodeDecodeError as ude:
                    warn('unicode error reading from {0} / {1}: {2}'
                         .format(obj, description, ude),
                         category=LogReadWarning)
                    continue

                # post-process the raw new data and yield the results
                to_yield = self.prepare_result(description, new_data, idx)
                for result in to_yield:
                    yield result

                # prepare next iteration
                self.last_sizes[idx] = new_size

    def prepare_result(self, description, data, idx):
        """ from raw new data create some yield-able results

        to be intended for overwriting in sub-classes

        this function is called from __iter__ for each new data that becomes
        available. It has to return some iterable whose entries are yielded
        from iteration over objects of this class.

        It receives the following args:
        - the description of the source
        - the raw new data read from the source
        - the index of the source
        The result must be an iterable of objects, which are yielded as-is.

        This base implementation just returns its input in a list, so new
        data is yielded from __iter__ as-is
        """
        return [(description, data), ]
#: characters treated as line terminators by :py:class:`LineReader`
LINE_SPLITTERS = '\n\r'
class LineReader(IterativeReader):
    """ an IterativeReader that returns new data line-wise

    this means buffering partial line data
    """

    def __init__(self, *args, **kwargs):
        """ forwards all args and kwargs to :py:class:`IterativeReader` """
        super(LineReader, self).__init__(*args, **kwargs)

        # one buffer of not-yet-complete line data per source
        self.line_buffers = ['' for _ in range(self.n_sources())]

    def prepare_result(self, description, new_data, idx):
        """ take raw new data and split it into lines

        if line is not complete, then buffer it

        returns lines without their newline characters

        :param str description: description of the source
        :param str new_data: raw data newly read from the source
        :param int idx: index of the source
        :returns: list of (description, line) tuples
        """
        all_data = self.line_buffers[idx] + new_data
        self.line_buffers[idx] = ''
        result = []
        should_be_no_new_lines = False
        for line in all_data.splitlines(True):
            if line[-1] in LINE_SPLITTERS:
                result.append((description, line.rstrip(LINE_SPLITTERS)))
            elif should_be_no_new_lines:
                # splitlines leaves at most one unterminated chunk, last
                raise ValueError('line splitters are not compatible with '
                                 'str.splitlines!')
            else:
                self.line_buffers[idx] = line
                should_be_no_new_lines = True   # (this should be the last)

        return result
313 class LogParser(LineReader):
314 """ takes lines from LineReader and parses their contents
316 requires a pattern for log lines, auto-detection is not implemented yet
Iteration returns re.match result or -- if matching failed -- the original
line
"""
    def __init__(self, log_file, pattern=None):
        """ create a LogParser

        :param str log_file: name of log file to parse (required!)
        :param pattern: regexp to split log lines; None (default) to return
                        lines un-parsed
        """
        # reading/line-splitting is handled entirely by the base classes
        super(LogParser, self).__init__(log_file)

        self.pattern = pattern
333 def prepare_result(self, *args):
334 # let super class split data into lines
335 for _, raw_line in super(LogParser, self).prepare_result(*args):
336 result = re.match(self.pattern, raw_line)