Commit | Line | Data |
---|---|---|
67177844 CH |
1 | # This Python file uses the following encoding: utf-8 |
2 | ||
3 | # The software in this package is distributed under the GNU General | |
4 | # Public License version 2 (with a special exception described below). | |
5 | # | |
6 | # A copy of GNU General Public License (GPL) is included in this distribution, | |
7 | # in the file COPYING.GPL. | |
8 | # | |
9 | # As a special exception, if other files instantiate templates or use macros | |
10 | # or inline functions from this file, or you compile this file and link it | |
11 | # with other works to produce a work based on this file, this file | |
12 | # does not by itself cause the resulting work to be covered | |
13 | # by the GNU General Public License. | |
14 | # | |
15 | # However the source code for this file must still be made available | |
16 | # in accordance with section (3) of the GNU General Public License. | |
17 | # | |
18 | # This exception does not invalidate any other reasons why a work based | |
19 | # on this file might be covered by the GNU General Public License. | |
20 | # | |
21 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> | |
22 | ||
23 | """ | |
24 | ||
25 | SUMMARY | |
26 | ------------------------------------------------------ | |
27 | Class :py:class:`MailValidator`, a fully-featured email sender and checker. | |
28 | ||
29 | Copyright: Intra2net AG | |
30 | ||
31 | ||
32 | INTERFACE | |
33 | ------------------------------------------------------ | |
34 | ||
35 | """ | |
36 | ||
37 | import time | |
38 | import os | |
39 | import difflib | |
40 | import socket | |
41 | from inspect import currentframe | |
42 | import re | |
43 | import subprocess | |
44 | import logging | |
45 | ||
46 | import smtplib | |
47 | from email.mime.audio import MIMEAudio | |
48 | from email.mime.base import MIMEBase | |
49 | from email.mime.image import MIMEImage | |
50 | from email.mime.multipart import MIMEMultipart | |
51 | from email.mime.text import MIMEText | |
52 | from email.encoders import encode_base64 | |
53 | from email.utils import formatdate | |
54 | from email.parser import Parser | |
55 | import mimetypes | |
56 | ||
57 | from . import arnied_wrapper | |
58 | ||
59 | log = logging.getLogger('pyi2ncommon.mail_utils') | |
60 | ||
61 | ||
class EmailException(Exception):
    """Common ancestor of all custom errors raised from `MailValidator`."""
66 | ||
67 | ||
class EmailNotFound(EmailException):
    """An expected email could not be found."""
70 | ||
71 | ||
class InvalidEmailHeader(EmailException):
    """A header of a checked email did not pass validation."""
74 | ||
75 | ||
class InvalidEmailContent(EmailException):
    """The content of a checked email did not pass validation."""
78 | ||
79 | ||
class EmailIDError(EmailException):
    """Common ancestor of the errors concerning email message ids."""
82 | ||
83 | ||
class MismatchedEmailID(EmailIDError):
    """No candidate email carries the message id that was looked for."""
86 | ||
87 | ||
class MissingEmailID(EmailIDError):
    """An email does not contain any message id."""
90 | ||
91 | ||
class EmailMismatch(EmailException):
    """A target email differs from the source email it was compared to."""
94 | ||
95 | ||
class MailValidator:
    """
    Class for sending emails and validating the received ones.

    Source emails (not sent yet) are read from `source_path`, received
    emails are looked up in `target_path`. How a received email is compared
    to its source can be chosen through `compare_emails_method`.
    """

    # Directory entries that are never treated as emails when listing or
    # extracting emails from a folder.
    _NON_EMAIL_ENTRIES = ("cyrus.cache", "cyrus.header", "cyrus.index",
                          "Entw&APw-rfe", "Gesendete Objekte",
                          "Gel&APY-schte Elemente", "mailboxes.dump",
                          "tmp")

    # Line carrying the id that is used to match source and target emails.
    _MESSAGE_ID_PATTERN = re.compile(r"Autotest-Message-ID: (.+)")

    @property
    def target_path(self):
        """Path to find target emails (received)."""
        return self._target_path

    @target_path.setter
    def target_path(self, new_value):
        # assigning None has always been a silent no-op; keep that behavior
        if new_value is not None:
            self._target_path = new_value

    @property
    def source_path(self):
        """Path to find source emails (not sent)."""
        return self._source_path

    @source_path.setter
    def source_path(self, new_value):
        # assigning None has always been a silent no-op; keep that behavior
        if new_value is not None:
            self._source_path = new_value

    @property
    def smtp_sender(self):
        """Sender address used when sending emails via SMTP."""
        return self._smtp_sender

    @smtp_sender.setter
    def smtp_sender(self, new_value):
        # assigning None has always been a silent no-op; keep that behavior
        if new_value is not None:
            self._smtp_sender = new_value

    def _set_compare_emails_method(self, method="basic"):
        """
        Set email comparison method for validation.

        :param str method: one of "basic", "headers", "existence"
        :raises: :py:class:`ValueError` if chosen method is invalid
        """
        comparators = {
            "basic": self._default_compare_emails,
            "headers": self._compare_emails_by_basic_headers,
            "existence": self._compare_emails_by_existence,
        }
        try:
            self._compare_emails_method = comparators[method]
        except (KeyError, TypeError):
            # TypeError: unhashable values are invalid method names as well
            raise ValueError("Invalid email comparison method %s"
                             % method) from None

    # write-only property: reading it raises AttributeError
    compare_emails_method = property(
        fset=_set_compare_emails_method,
        doc='Email comparison method: "basic", "headers" or "existence".')

    def __init__(self, source_path, target_path):
        """
        Construct a validator instance.

        :param str source_path: path to find source emails (not sent)
        :param str target_path: path to find target emails (received)

        .. note:: The comparison method can be redefined using the variety of
                  private method implementations.
        """
        self._target_path = target_path
        self._source_path = source_path
        self._smtp_sender = "no_source@inject.smtp"
        self._compare_emails_method = self._default_compare_emails

    def inject_emails(self, username, original_user):
        """
        Inject emails from `source_path` to `target_path`.

        This uses the script *restore_mail_inject.pl* which injects the mails
        using IMAP (as opposed to :py:meth:`inject_smtp`).

        :param str username: username for the mail injection script
        :param str original_user: original username for the mail injection
                                  script; if empty, neither the original
                                  user nor the dump file is passed on
        :raises: :py:class:`subprocess.CalledProcessError` if the script
                 exits with a non-zero status

        In order to restore acl rights as well put a mailboxes.dump file in
        the source path.
        """
        log.info("Injecting emails for user %s", username)

        # inject emails from test data; an argument list (no shell) keeps
        # user names and paths with spaces or shell metacharacters intact
        cmd = ["/usr/intranator/bin/restore_mail_inject.pl",
               "-u", username, "-s", self.source_path]
        if original_user:
            cmd += ["-m", os.path.join(self.source_path, "mailboxes.dump"),
                    "-o", original_user]

        result = subprocess.check_output(cmd)
        log.debug(result)

    def _prepare_recipients(self, recipients):
        """
        Prepare recipient list: ensure list of proper addresses.

        If given a simple string, make a list of strings out of it.
        If any recipient is just a username, append "@" + localhost to it.
        Also check that recipients are just email addresses.

        :param recipients: username(s) or email address(es)
        :type recipients: str or [str]
        :returns: one address per given recipient
        :rtype: [str]
        :raises: :py:class:`ValueError` if a recipient contains one of the
                 characters ``<``, ``>``, ``"`` or ``'``
        """
        hostname = socket.gethostname()
        if isinstance(recipients, str):
            recipients = [recipients]
        result = []
        for recipient in recipients:
            if any(bad_char in recipient for bad_char in '<>"\''):
                raise ValueError('Recipient must be a "raw" email address,'
                                 ' not {!r}'.format(recipient))
            if '@' in recipient:
                result.append(recipient)
            else:
                result.append(recipient + '@' + hostname)
        return result

    def inject_smtp(self, usernames, emails):
        """
        Inject emails from `source_path` using python's SMTP library.

        As opposed to :py:meth:`inject_emails`, this actually sends the mail
        to the local mail server (meaning filtering, archiving, ... will
        happen).

        :param usernames: username(s) of the localhost receiver(s) for each
                          email or proper email address(es)
        :type usernames: str or [str]
        :param emails: paths to files including full emails (header + body)
                       to be sent to each user
        :type emails: [str]
        """
        recipients = self._prepare_recipients(usernames)
        log.info("Sending emails to %s", ','.join(recipients))
        with smtplib.SMTP('localhost') as server:
            for email in emails:
                log.info("Sending email %s", email)
                email_path = os.path.join(self.source_path, email)
                with open(email_path, 'rb') as file_handle:
                    email_content = file_handle.read()
                server.sendmail(self.smtp_sender, recipients, email_content)

        # Wait till SMTP queue is processed
        arnied_wrapper.wait_for_email_transfer()

    def verify_email_id(self, email, emails_list, timeout, in_target=True):
        """
        Verify that the id of an email is present in a list.

        Returns that email's match in this list.

        :param str email: email filename
        :param emails_list: email among which the first email has to be found
        :type emails_list: [str]
        :param int timeout: timeout for extracting the source and target emails
        :param bool in_target: whether the verified email is on the target side
        :returns: file name of the matching email
        :rtype: str

        If `in_target` is set to True we are getting the target id from the
        target list of a source email. Otherwise, we assume a target email from
        a source list.
        """
        if in_target:
            email_root, list_root = self.source_path, self.target_path
        else:
            email_root, list_root = self.target_path, self.source_path

        email_path = self._extract_email_paths(email_root, [email],
                                               timeout)[0]
        candidates = self._extract_email_paths(list_root, emails_list,
                                               timeout)

        email_id = self._extract_message_id(email_path)
        match = self._find_message_with_id(email_id, candidates)
        return os.path.basename(match)

    def verify_emails(self, source_emails, target_emails, timeout):
        """
        Check injected e-mails for a user.

        :param source_emails: emails at the source location
        :type source_emails: [str]
        :param target_emails: emails at the target (server) location
        :type target_emails: [str]
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`EmailNotFound` if target email is not found on
                 server
        """
        source_paths = self._extract_email_paths(self.source_path,
                                                 source_emails, timeout)
        target_paths = self._extract_email_paths(self.target_path,
                                                 target_emails, timeout)

        log.info("Verifying emails at %s with %s", self.target_path,
                 self.source_path)
        for target in target_paths:
            log.info("Verifying email %s", target)
            target_id = self._extract_message_id(target)
            source = self._find_message_with_id(target_id, source_paths)
            # each source email may be matched by one target email only
            source_paths.remove(source)
            # comparison methods take the source email first, then the target
            self._compare_emails_method(source, target, 1)

        if source_paths:
            raise EmailNotFound("%s target mails could not be found on server."
                                "\n%s"
                                % (len(source_paths), "\n".join(source_paths)))
        log.info("All e-mails at %s verified!", self.target_path)

    @staticmethod
    def _alternatives_satisfied(text, present_values, absent_values):
        """
        Check a text against alternative present and absent values.

        :param str text: text to search in
        :param present_values: alternatives of which one has to be contained
        :type present_values: [str]
        :param absent_values: alternatives of which one has to be missing
        :type absent_values: [str]
        :returns: whether the present and the absent requirement are met; an
                  empty list of alternatives is always met
        :rtype: (bool, bool)
        """
        present_ok = (not present_values
                      or any(value in text for value in present_values))
        absent_ok = (not absent_values
                     or any(value not in text for value in absent_values))
        return present_ok, absent_ok

    def assert_header(self, emails, header, present_values=None,
                      absent_values=None, timeout=30):
        """
        Check headers for present and missing strings in a list of messages.

        :param emails: emails whose headers will be checked
        :type emails: [str]
        :param str header: header that will be validated for each email
        :param present_values: strings that have to be present in the header
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the header
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailHeader` if email header is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied. A header missing from the email is treated as empty.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            with open(email_path, "r") as email_file:
                verified_email = Parser().parse(email_file, headersonly=True)
            log.debug("Extracted email headers:\n%s", verified_email)

            log.info("Checking header '%s' in %s", header, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            # a missing header yields None which cannot be searched in
            header_value = verified_email[header]
            searchable = "" if header_value is None else str(header_value)
            present_ok, absent_ok = self._alternatives_satisfied(
                searchable, present_values, absent_values)
            if not (present_ok and absent_ok):
                raise InvalidEmailHeader("Message header '%s' in %s is not "
                                         "valid:\n%s"
                                         % (header, email_path, header_value))
            log.info("Message header '%s' in %s is valid!", header, email_path)

    def assert_content(self, emails, content_type, present_values=None,
                       absent_values=None, timeout=30):
        """
        Check content for present/missing strings in a list of messages.

        :param emails: emails whose content will be checked
        :type emails: [str]
        :param str content_type: type of the content that will be checked for
                                 values
        :param present_values: strings that have to be present in the content
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the content
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailContent` if email content is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied. Only the first part of the given content type is checked;
        if there is none, the content is treated as empty.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            with open(email_path, "r") as email_file:
                verified_email = Parser().parse(email_file)
            log.debug("Extracted email content:\n%s", verified_email)

            content = ""
            for part in verified_email.walk():
                log.debug("Extracted %s part while looking for %s",
                          part.get_content_type(), content_type)
                if part.get_content_type() != content_type:
                    continue
                content = part.get_payload(decode=True)
                if isinstance(content, bytes):
                    content = content.decode()
                elif content is None:
                    # parts without a decodable payload (e.g. multipart
                    # containers) have nothing that could be searched
                    content = ""
                # NOTE: only one such element is expected
                break

            log.info("Checking content '%s' in %s", content_type, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            present_ok, absent_ok = self._alternatives_satisfied(
                content, present_values, absent_values)
            if not (present_ok and absent_ok):
                raise InvalidEmailContent("Message content '%s' in %s is not "
                                          "valid:\n%s"
                                          % (content_type, email_path, content))
            log.info("Message content '%s' in %s is valid!",
                     content_type, email_path)

    def _build_attachment(self, filename):
        """
        Create the MIME part for attaching a file from `source_path`.

        :param str filename: name of the file, relative to `source_path`
        :returns: MIME part with a "Content-Disposition: attachment" header
        """
        fullpath = os.path.join(self.source_path, filename)

        # Guess the content type based on the file's extension. Encoding
        # will be ignored, although we should check for simple things like
        # gzip'd or compressed files.
        ctype, encoding = mimetypes.guess_type(fullpath)
        if ctype is None or encoding is not None:
            # No guess could be made, or the file is encoded (compressed),
            # so use a generic bag-of-bits type.
            ctype = 'application/octet-stream'

        maintype, subtype = ctype.split('/', 1)
        log.debug("Creating message containing file %s of mime type %s",
                  filename, ctype)
        if maintype == 'text':
            with open(fullpath, 'rt') as file_handle:
                # Note: we should handle calculating the charset
                part = MIMEText(file_handle.read(), _subtype=subtype)
        elif maintype == 'image':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEImage(file_handle.read(), _subtype=subtype)
        elif maintype == 'audio':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEAudio(file_handle.read(), _subtype=subtype)
        else:
            part = MIMEBase(maintype, subtype)
            with open(fullpath, 'rb') as file_handle:
                part.set_payload(file_handle.read())
            # Encode the payload using Base64
            encode_base64(part)
        # Set the filename parameter
        part.add_header('Content-Disposition', 'attachment',
                        filename=filename)
        return part

    def send_email_with_files(self, usernames, file_list,
                              wait_for_transfer=True,
                              autotest_signature=None,
                              subject="my subject"):
        """
        Send a generated email with optional attachments.

        :param usernames: username(s) of the localhost receiver(s) or proper
                          email address(es)
        :type usernames: str or [str]
        :param file_list: files attached to an email; can be empty or None
        :type file_list: [str] or None
        :param wait_for_transfer: specify whether to wait until arnied_wrapper
                                  confirms email transfer; you can also specify
                                  a fixed timeout (seconds)
        :type wait_for_transfer: bool or int
        :param autotest_signature: text to insert as value for header
                                   X-Autotest-Signature for simpler recognition
                                   of mail (if None do not add header)
        :type autotest_signature: str or None
        :param str subject: Subject of created mails
        """
        text = 'This is an autogenerated email.\n'

        recipients = self._prepare_recipients(usernames)
        # None means "no attachments", just like an empty list
        file_list = file_list or []

        if file_list:
            msg = MIMEMultipart()
            msg.attach(MIMEText(text, _charset='utf-8'))
        else:
            msg = MIMEText(text, _charset='utf-8')
        msg['From'] = self.smtp_sender
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = subject
        msg['Date'] = formatdate(localtime=True)
        msg.preamble = 'This is a multi-part message in MIME format.\n'
        # currentframe() has to be called right here since the name of this
        # very function becomes part of the header value
        msg.add_header('X-Autotest-Creator',
                       self.__class__.__module__ + '.' +
                       self.__class__.__name__ + '.' +
                       currentframe().f_code.co_name)
        # (with help from http://stackoverflow.com/questions/5067604/determine-
        #  function-name-from-within-that-function-without-using-traceback)
        if autotest_signature:
            msg.add_header('X-Autotest-Signature', autotest_signature)

        # attach files
        for filename in file_list:
            msg.attach(self._build_attachment(filename))

        log.debug("Message successfully created")

        # send via SMTP
        log.debug("Sending message from %s to %s",
                  self.smtp_sender, ', '.join(recipients))
        with smtplib.SMTP('localhost') as server:
            server.sendmail(self.smtp_sender, recipients, msg.as_string())

        # wait for transfer; complicated by isinstance(False, int) == True
        if wait_for_transfer is True:
            arnied_wrapper.wait_for_email_transfer()
        elif wait_for_transfer is not False:
            arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)

    def _extract_email_paths(self, path, emails, timeout):
        """
        Check and return the absolute paths of a list of emails.

        :param str path: folder the emails are expected in
        :param emails: file names of the emails; if empty, all entries of the
                       folder are used
        :type emails: [str]
        :param int timeout: seconds to wait for each email to show up; every
                            email is checked at least once
        :returns: paths of the emails, skipping known non-email entries
        :rtype: [str]
        :raises: :py:class:`EmailNotFound` if an email does not show up in
                 time
        """
        log.debug("Extracting messages %s", emails)
        if not emails:
            emails = os.listdir(path)
        # without at least one attempt missing emails would be dropped
        # silently for timeouts below one second
        attempts = max(timeout, 1)
        email_paths = []
        for expected_email in emails:
            # TODO: this can be improved by matching the emails themselves
            if expected_email in self._NON_EMAIL_ENTRIES:
                continue
            email_path = os.path.join(path, expected_email)
            for attempt in range(attempts):
                if attempt > 0:
                    time.sleep(1)
                if os.path.isfile(email_path):
                    email_paths.append(email_path)
                    break
            else:
                raise EmailNotFound("Target message %s could not be found "
                                    "on server at %s within %ss"
                                    % (expected_email, path, timeout))
        log.debug("%s mails extracted at %s.", len(email_paths), path)
        return email_paths

    def _find_message_with_id(self, message_id, message_paths):
        """
        Find message with id among a list of message paths.

        :param str message_id: id to look for
        :param message_paths: paths of the candidate messages
        :type message_paths: [str]
        :returns: path of the first message carrying the id
        :rtype: str
        :raises: :py:class:`MismatchedEmailID` if no candidate has the id
        """
        log.debug("Looking for a match for the message with id %s", message_id)
        for message_path in message_paths:
            extracted_id = self._extract_message_id(message_path)
            log.debug("Extracted id %s from candidate %s", extracted_id,
                      message_path)
            if message_id == extracted_id:
                log.debug("Found match at %s", message_path)
                return message_path
        raise MismatchedEmailID("The message with id %s could not be matched "
                                "or wasn't expected among %s"
                                % (message_id, ", ".join(message_paths)))

    def _extract_message_id(self, message_path):
        """
        Given a message file path extract the Autotest-Message-ID.

        All lines of the file are searched; if several lines carry an id the
        last one is returned.

        :param str message_path: path of the message file
        :returns: value of the Autotest-Message-ID line
        :rtype: str
        :raises: :py:class:`MissingEmailID` if no Autotest-Message-ID was
                 found.
        """
        message_id = ""
        with open(message_path, errors='ignore') as file_handle:
            for line in file_handle:
                match_id = self._MESSAGE_ID_PATTERN.match(line)
                if match_id is not None:
                    message_id = match_id.group(1).rstrip('\r\n')
        if message_id == "":
            raise MissingEmailID(f"No id was found in target message {message_path}, "
                                 f"so it cannot be properly matched")
        return message_id

    def _default_compare_emails(self, source_email_path, target_email_path,
                                tolerance=1):
        """
        Compare target emails with source ones.

        Uses python provided diff functionality to compare complete mail files.

        :param str source_email_path: path of the source email
        :param str target_email_path: path of the target email
        :param tolerance: minimal similarity ratio (between 0 and 1) that is
                          still accepted
        :raises: :py:class:`EmailMismatch` if the similarity ratio is below
                 the tolerance
        """
        with open(source_email_path, "r") as source_email_file:
            source_email = source_email_file.read()
        with open(target_email_path, "r") as target_email_file:
            target_email = target_email_file.read()
        matcher = difflib.SequenceMatcher(None, source_email, target_email)
        diffratio = matcher.ratio()
        log.debug("Target message comparison ratio is %s.", diffratio)
        if diffratio < tolerance:
            # exceptions do not interpolate extra arguments like logging
            # calls do, so the message has to be formatted explicitly
            raise EmailMismatch("Target message is too different from the "
                                "source (difference %s < tolerance %s)."
                                % (diffratio, tolerance))

    @staticmethod
    def _parse_email_with_body(email_path):
        """
        Parse an email file and pick its first textual body.

        :param str email_path: path of the email file
        :returns: parsed email and the payload of its first text/plain or
                  text/html part ("" if there is no such part)
        """
        with open(email_path, errors="ignore") as file_handle:
            parsed_email = Parser().parse(file_handle)
        body = ""
        for part in parsed_email.walk():
            if part.get_content_type() in ["text/plain", "text/html"]:
                body = part.get_payload()
                break
        return parsed_email, body

    def _compare_emails_by_basic_headers(self, source_email_path,
                                         target_email_path, tolerance=1):
        """
        Compare target emails with source ones.

        Uses the headers From, To, Subject and Date as well as the mail "body"
        (first text/plain or text/html part) which all have to be equal.

        Argument `tolerance` not used!

        :raises: :py:class:`EmailMismatch` at the first difference found
        """
        source_email, source_body = \
            self._parse_email_with_body(source_email_path)
        target_email, target_body = \
            self._parse_email_with_body(target_email_path)

        # checked in this order; each message gets (target, source) values
        header_checks = (
            ('From', "Target message sender %s is too different "
                     "from the source one %s"),
            ('To', "Target message recipient %s is too different "
                   "from the source one %s"),
            ('Subject', "Target message subject '%s' is too different "
                        "from the source one '%s'"),
            ('Date', "Target message date %s is too different from "
                     "the source one %s"),
        )
        for header, message in header_checks:
            if source_email[header] != target_email[header]:
                raise EmailMismatch(message % (target_email[header],
                                               source_email[header]))
        if source_body != target_body:
            raise EmailMismatch("Target message body '%s' is too different "
                                "from the source one '%s'" %
                                (target_body, source_body))

    def _compare_emails_by_existence(self, source_email_path,
                                     target_email_path, tolerance=1):
        """
        Weak email validation based only on presence of file.

        DOES NOT CHECK ANYTHING!

        :returns: always True
        """
        return True