:py:data:`self.descriptions`
:param str new_data: Text data read from source
:param idx: Index of data source
- :returns: [(description, data)], same as input
- :rtype [(str, str)]
+ :returns: [(description, data, idx], same as input
+ :rtype [(str, str, int)]
"""
- return [(description, data), ]
+ return [(description, data, idx), ]
#: characters to `rstrip()` from end of complete lines
If line is not complete, then buffer it.
Args: see super class method :py:meth:`IterativeReader.prepare_result`
- :returns: list of 2-tuples `(description, line)` where
- `description` is same as arg, and `line` is
+ :returns: list of 3-tuples `(description, line, idx)` where
+ `description` and `idx` are same as args, and `line` is
without trailing newline characters
- :rtype: [(str, str)]
+ :rtype: [(str, str, int)]
"""
all_data = self.line_buffers[idx] + new_data
self.line_buffers[idx] = ''
should_be_no_new_lines = False
for line in all_data.splitlines(True):
if line[-1] in LINE_SPLITTERS:
- result.append((description, line.rstrip(LINE_SPLITTERS)))
+ result.append((description, line.rstrip(LINE_SPLITTERS), idx))
elif should_be_no_new_lines:
raise ValueError('line splitters are not compatible with'
'str.splitlines!')
Try parsing lines.
Args: see super class method :py:meth:`IterativeReader.prepare_result`
- :returns: either a :py:class:`re.Match` if line matched
- :py:data:`self.pattern` or just str if line did not match.
- :rtype: :py:class:`re.Match` OR str
+ :returns: 3-tuples `(description, line, idx)` where `description` and
+ `idx` are same as input args and `line` is either a
+ :py:class:`re.Match` if line matched :py:data:`self.pattern`
+ or just str if line did not match.
+ :rtype: [(str, :py:class:`re.Match` OR str, int)]
"""
# let super class split data into lines
- for _, raw_line in super(LogParser, self).prepare_result(*args):
+ for description, raw_line, idx in \
+ super(LogParser, self).prepare_result(*args):
result = re.match(self.pattern, raw_line)
if result:
- return result
+ return (description, result, idx)
else:
- return raw_line
+ return (description, raw_line, idx)
reader = IterativeReader(file_handle)
self.helper_test_len(reader, 1)
counter = -1 # we may have to adapt this manually
- for desc, text in reader:
+ for desc, text, source_idx in reader:
receive_time = perf_counter()
self.assertEqual(desc, self.temp_file)
+ self.assertEqual(source_idx, 0)
counter += 1
text = text.strip()
if DEBUG:
reader = LineReader(file_handle)
self.helper_test_len(reader, 1)
- for line_expected, (_, line_read) in zip(lines_expected, reader):
+ for line_expected, (_, line_read, _) in zip(lines_expected, reader):
if 'end' in line_read:
break
else: