Commit | Line | Data |
---|---|---|
67177844 CH |
1 | # This Python file uses the following encoding: utf-8 |
2 | ||
3 | # The software in this package is distributed under the GNU General | |
4 | # Public License version 2 (with a special exception described below). | |
5 | # | |
6 | # A copy of GNU General Public License (GPL) is included in this distribution, | |
7 | # in the file COPYING.GPL. | |
8 | # | |
9 | # As a special exception, if other files instantiate templates or use macros | |
10 | # or inline functions from this file, or you compile this file and link it | |
11 | # with other works to produce a work based on this file, this file | |
12 | # does not by itself cause the resulting work to be covered | |
13 | # by the GNU General Public License. | |
14 | # | |
15 | # However the source code for this file must still be made available | |
16 | # in accordance with section (3) of the GNU General Public License. | |
17 | # | |
18 | # This exception does not invalidate any other reasons why a work based | |
19 | # on this file might be covered by the GNU General Public License. | |
20 | # | |
21 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> | |
22 | ||
23 | """ | |
67177844 CH |
24 | Class :py:class:`MailValidator`, a fully-featured email sender and checker. |
25 | ||
26 | Copyright: Intra2net AG | |
67177844 CH |
27 | """ |
28 | ||
29 | import time | |
30 | import os | |
31 | import difflib | |
32 | import socket | |
33 | from inspect import currentframe | |
34 | import re | |
35 | import subprocess | |
36 | import logging | |
37 | ||
38 | import smtplib | |
39 | from email.mime.audio import MIMEAudio | |
40 | from email.mime.base import MIMEBase | |
41 | from email.mime.image import MIMEImage | |
42 | from email.mime.multipart import MIMEMultipart | |
43 | from email.mime.text import MIMEText | |
44 | from email.encoders import encode_base64 | |
45 | from email.utils import formatdate | |
46 | from email.parser import Parser | |
47 | import mimetypes | |
48 | ||
49 | from . import arnied_wrapper | |
50 | ||
51 | log = logging.getLogger('pyi2ncommon.mail_utils') | |
52 | ||
53 | ||
54 | class EmailException(Exception): | |
55 | """Base class for custom exceptions raised from `MailValidator`.""" | |
56 | ||
57 | pass | |
58 | ||
59 | ||
60 | class EmailNotFound(EmailException): # pylint: disable=missing-docstring | |
61 | pass | |
62 | ||
63 | ||
64 | class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring | |
65 | pass | |
66 | ||
67 | ||
68 | class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring | |
69 | pass | |
70 | ||
71 | ||
72 | class EmailIDError(EmailException): # pylint: disable=missing-docstring | |
73 | pass | |
74 | ||
75 | ||
76 | class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring | |
77 | pass | |
78 | ||
79 | ||
80 | class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring | |
81 | pass | |
82 | ||
83 | ||
84 | class EmailMismatch(EmailException): # pylint: disable=missing-docstring | |
85 | pass | |
86 | ||
87 | ||
7628bc48 | 88 | class MailValidator: |
67177844 CH |
89 | """Class for validation of emails.""" |
90 | ||
91 | def target_path(self, new_value=None): | |
92 | """Getter/Setter for property `target_path`.""" | |
93 | if new_value is not None: | |
94 | self._target_path = new_value | |
95 | else: | |
96 | return self._target_path | |
97 | target_path = property(target_path, target_path) | |
98 | ||
99 | def source_path(self, new_value=None): | |
100 | """Getter/Setter for property `source_path`.""" | |
101 | if new_value is not None: | |
102 | self._source_path = new_value | |
103 | else: | |
104 | return self._source_path | |
105 | source_path = property(source_path, source_path) | |
106 | ||
107 | def smtp_sender(self, new_value=None): | |
108 | """Getter/Setter for property `smtp_sender`.""" | |
109 | if new_value is not None: | |
110 | self._smtp_sender = new_value | |
111 | else: | |
112 | return self._smtp_sender | |
113 | smtp_sender = property(smtp_sender, smtp_sender) | |
114 | ||
115 | def compare_emails_method(self, method="basic"): | |
116 | """ | |
117 | Set email comparison method for validation. | |
118 | ||
119 | :param str method: one of "basic", "headers" | |
120 | :raises: :py:class:`ValueError` if chosen method is invalid | |
121 | """ | |
122 | if method == "basic": | |
123 | self._compare_emails_method = self._default_compare_emails | |
124 | elif method == "headers": | |
125 | self._compare_emails_method = self._compare_emails_by_basic_headers | |
126 | elif method == "existence": | |
127 | self._compare_emails_method = self._compare_emails_by_existence | |
128 | else: | |
129 | raise ValueError("Invalid email comparison method %s" % method) | |
130 | compare_emails_method = property(fset=compare_emails_method) | |
131 | ||
132 | def __init__(self, source_path, target_path): | |
133 | """ | |
134 | Construct a validator instance. | |
135 | ||
136 | :param str source_path: path to find source emails (not sent) | |
137 | :param str target_path: path to find target emails (received) | |
138 | ||
139 | .. note:: The comparison method can be redefined using the variety of | |
140 | private method implementations. | |
141 | """ | |
142 | self._target_path = target_path | |
143 | self._source_path = source_path | |
144 | self._smtp_sender = "no_source@inject.smtp" | |
145 | self._compare_emails_method = self._default_compare_emails | |
146 | ||
147 | def inject_emails(self, username, original_user): | |
148 | """ | |
149 | Inject emails from `source_path` to `target_path`. | |
150 | ||
ec0c7450 CH |
151 | This uses the script *restore_mail_inject.pl* which injects the mails |
152 | using IMAP (as opposed to :py:meth:`inject_smtp`). | |
153 | ||
67177844 CH |
154 | :param str username: username for the mail injection script |
155 | :param str original_user: original username for the mail injection | |
156 | script | |
157 | ||
158 | In order to restore acl rights as well put a mailbox.dump file in the | |
159 | source path. | |
160 | """ | |
161 | log.info("Injecting emails for user %s", username) | |
162 | ||
163 | # inject emails from test data | |
164 | cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \ | |
165 | " -s " + self.source_path | |
166 | if original_user != "": | |
167 | cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \ | |
168 | " -o " + original_user | |
169 | ||
170 | result = subprocess.check_output(cmd, shell=True) | |
171 | log.debug(result) | |
172 | ||
ec0c7450 CH |
173 | def _prepare_recipients(self, recipients): |
174 | """ | |
175 | Prepare recipient list: ensure list of proper addresses. | |
176 | ||
177 | If given a simple string, make a list of strings out of it. | |
178 | If any recipient is just a username, append "@" + localhost to it. | |
179 | Also check that recipients are just email addresses. | |
180 | """ | |
181 | hostname = socket.gethostname() | |
182 | if isinstance(recipients, str): | |
183 | recipients = [recipients, ] | |
184 | result = [] | |
185 | for recipient in recipients: | |
186 | if '@' in recipient: | |
187 | result.append(recipient) | |
188 | else: | |
189 | result.append(recipient + '@' + hostname) | |
190 | for bad_char in '<>"\'': | |
191 | if bad_char in recipient: | |
192 | raise ValueError('Recipient must be a "raw" email address,' | |
193 | ' not {!r}'.format(recipient)) | |
194 | return result | |
195 | ||
67177844 CH |
196 | def inject_smtp(self, usernames, emails): |
197 | """ | |
198 | Inject emails from `source_path` using python's SMTP library. | |
199 | ||
ec0c7450 CH |
200 | As opposed to :py:meth:`inject_emails`, this actually sends the mail |
201 | to the local mail server (meaning filtering, archiving, ... will | |
202 | happen). | |
203 | ||
204 | :param usernames: username(s) of the localhost receiver(s) for each | |
205 | email or proper email address(es) | |
206 | :type usernames: str or [str] | |
207 | :param emails: paths to files including full emails (header + body) | |
208 | to be sent to each user | |
67177844 CH |
209 | :type emails: [str] |
210 | """ | |
ec0c7450 CH |
211 | recipients = self._prepare_recipients(usernames) |
212 | log.info("Sending emails to %s", ','.join(recipients)) | |
67177844 | 213 | with smtplib.SMTP('localhost') as server: |
67177844 CH |
214 | for email in emails: |
215 | log.info("Sending email %s", email) | |
216 | with open(os.path.join(self.source_path, email), 'rb') \ | |
217 | as file_handle: | |
218 | email_content = file_handle.read() | |
ec0c7450 | 219 | server.sendmail(self.smtp_sender, recipients, email_content) |
67177844 CH |
220 | |
221 | # Wait till SMTP queue is processed | |
222 | arnied_wrapper.wait_for_email_transfer() | |
223 | ||
224 | def verify_email_id(self, email, emails_list, timeout, in_target=True): | |
225 | """ | |
226 | Verify that the id of an email is present in a list. | |
227 | ||
228 | Returns that email's match in this list. | |
229 | ||
230 | :param str email: email filename | |
231 | :param emails_list: email among which the first email has to be found | |
232 | :type emails_list: [str] | |
233 | :param int timeout: timeout for extracting the source and target emails | |
234 | :param bool in_target: whether the verified email is on the target side | |
235 | ||
236 | If `in_target` is set to True we are getting the target id from the | |
7628bc48 | 237 | target list of a source email. Otherwise, we assume a target email from |
67177844 CH |
238 | a source list. |
239 | """ | |
240 | if in_target: | |
241 | email = self._extract_email_paths(self.source_path, [email], | |
242 | timeout)[0] | |
243 | emails_list = self._extract_email_paths(self.target_path, | |
244 | emails_list, timeout) | |
245 | else: | |
246 | email = self._extract_email_paths(self.target_path, [email], | |
247 | timeout)[0] | |
248 | emails_list = self._extract_email_paths(self.source_path, | |
249 | emails_list, timeout) | |
250 | ||
251 | email_id = self._extract_message_id(email) | |
252 | match = self._find_message_with_id(email_id, emails_list) | |
253 | return os.path.basename(match) | |
254 | ||
255 | def verify_emails(self, source_emails, target_emails, timeout): | |
256 | """ | |
257 | Check injected e-mails for a user. | |
258 | ||
259 | :param source_emails: emails at the source location | |
260 | :type source_emails: [str] | |
261 | :param target_emails: emails at the target (server) location | |
262 | :type target_emails: [str] | |
263 | :param int timeout: timeout for extracting the source and target emails | |
264 | :raises: :py:class:`EmailNotFound` if target email is not found on | |
265 | server | |
266 | """ | |
267 | source_paths = self._extract_email_paths(self.source_path, | |
268 | source_emails, timeout) | |
269 | target_paths = self._extract_email_paths(self.target_path, | |
270 | target_emails, timeout) | |
271 | ||
272 | log.info("Verifying emails at %s with %s", self.target_path, | |
273 | self.source_path) | |
274 | for target in target_paths: | |
275 | log.info("Verifying email %s", target) | |
276 | target_id = self._extract_message_id(target) | |
277 | source = self._find_message_with_id(target_id, source_paths) | |
278 | source_paths.remove(source) | |
279 | self._compare_emails_method(target, source, 1) | |
280 | ||
281 | if len(source_paths) > 0: | |
282 | raise EmailNotFound("%s target mails could not be found on server." | |
283 | "\n%s" | |
284 | % (len(source_paths), "\n".join(source_paths))) | |
285 | else: | |
286 | log.info("All e-mails at %s verified!", self.target_path) | |
287 | ||
288 | def assert_header(self, emails, header, present_values=None, | |
289 | absent_values=None, timeout=30): | |
290 | """ | |
291 | Check headers for present and missing strings in a list of messages. | |
292 | ||
293 | :param emails: emails whose headers will be checked | |
294 | :type emails: [str] | |
295 | :param str header: header that will be validated for each email | |
296 | :param present_values: strings that have to be present in the header | |
297 | :type present_values: [str] or None | |
298 | :param absent_values: strings that have to be absent in the header | |
299 | :type absent_values: [str] or None | |
300 | :param int timeout: timeout for extracting the source and target emails | |
301 | :raises: :py:class:`InvalidEmailHeader` if email header is not valid | |
302 | ||
303 | Every list of present and respectively absent values contains | |
304 | alternative values. At least one of present and one of absent should be | |
305 | satisfied. | |
306 | """ | |
307 | target_paths = self._extract_email_paths(self.target_path, emails, | |
308 | timeout) | |
309 | for email_path in target_paths: | |
310 | with open(email_path, "r") as email_file: | |
311 | verified_email = Parser().parse(email_file, headersonly=True) | |
312 | log.debug("Extracted email headers:\n%s", verified_email) | |
313 | ||
314 | log.info("Checking header '%s' in %s", header, email_path) | |
315 | if not present_values: | |
316 | present_values = [] | |
317 | else: | |
318 | log.info("for present '%s'", "', '".join(present_values)) | |
319 | if not absent_values: | |
320 | absent_values = [] | |
321 | else: | |
322 | log.info("for absent '%s'", "', '".join(absent_values)) | |
323 | present_valid = False | |
324 | for present in present_values: | |
325 | if present in verified_email[header]: | |
326 | present_valid = True | |
327 | absent_valid = False | |
328 | for absent in absent_values: | |
329 | if absent not in verified_email[header]: | |
330 | absent_valid = True | |
331 | ||
332 | if not present_valid and len(present_values) > 0: | |
333 | raise InvalidEmailHeader("Message header '%s' in %s is not " | |
334 | "valid:\n%s" | |
335 | % (header, email_path, | |
336 | verified_email[header])) | |
337 | if not absent_valid and len(absent_values) > 0: | |
338 | raise InvalidEmailHeader("Message header '%s' in %s is not " | |
339 | "valid:\n%s" | |
340 | % (header, email_path, | |
341 | verified_email[header])) | |
342 | log.info("Message header '%s' in %s is valid!", header, email_path) | |
343 | ||
344 | def assert_content(self, emails, content_type, present_values=None, | |
345 | absent_values=None, timeout=30): | |
346 | """ | |
347 | Check headers for present/missing strings in a list of messages. | |
348 | ||
349 | :param emails: emails whose content will be checked | |
350 | :type emails: [str] | |
351 | :param str content_type: type of the content that will be checked for | |
352 | values | |
353 | :param present_values: strings that have to be present in the content | |
354 | :type present_values: [str] or None | |
355 | :param absent_values: strings that have to be absent in the content | |
356 | :type absent_values: [str] or None | |
357 | :param int timeout: timeout for extracting the source and target emails | |
358 | :raises: :py:class:`InvalidEmailContent` if email content is not valid | |
359 | ||
360 | Every list of present and respectively absent values contains | |
361 | alternative values. At least one of present and one of absent should be | |
362 | satisfied. | |
363 | """ | |
364 | target_paths = self._extract_email_paths(self.target_path, emails, | |
365 | timeout) | |
366 | for email_path in target_paths: | |
367 | with open(email_path, "r") as email_file: | |
368 | verified_email = Parser().parse(email_file) | |
369 | log.debug("Extracted email content:\n%s", verified_email) | |
370 | content = "" | |
371 | for part in verified_email.walk(): | |
372 | log.debug("Extracted %s part while looking for %s", | |
373 | part.get_content_type(), content_type) | |
374 | if part.get_content_type() == content_type: | |
375 | content = part.get_payload(decode=True) | |
376 | if isinstance(content, bytes): | |
377 | content = content.decode() | |
378 | # NOTE: only one such element is expected | |
379 | break | |
380 | ||
381 | log.info("Checking content '%s' in %s", content_type, email_path) | |
382 | if not present_values: | |
383 | present_values = [] | |
384 | else: | |
385 | log.info("for present '%s'", "', '".join(present_values)) | |
386 | if not absent_values: | |
387 | absent_values = [] | |
388 | else: | |
389 | log.info("for absent '%s'", "', '".join(absent_values)) | |
390 | present_valid = False | |
391 | for present in present_values: | |
392 | if present in content: | |
393 | present_valid = True | |
394 | absent_valid = False | |
395 | for absent in absent_values: | |
396 | if absent not in content: | |
397 | absent_valid = True | |
398 | ||
399 | if not present_valid and len(present_values) > 0: | |
400 | raise InvalidEmailContent("Message content '%s' in %s is not " | |
401 | "valid:\n%s" | |
402 | % (content_type, email_path, content)) | |
403 | if not absent_valid and len(absent_values) > 0: | |
404 | raise InvalidEmailContent("Message content '%s' in %s is not " | |
405 | "valid:\n%s" | |
406 | % (content_type, email_path, content)) | |
407 | log.info("Message content '%s' in %s is valid!", | |
408 | content_type, email_path) | |
409 | ||
ec0c7450 | 410 | def send_email_with_files(self, usernames, file_list, |
67177844 CH |
411 | wait_for_transfer=True, |
412 | autotest_signature=None, | |
413 | subject="my subject"): | |
414 | """ | |
ec0c7450 | 415 | Send a generated email with optional attachments. |
67177844 | 416 | |
ec0c7450 CH |
417 | :param usernames: username(s) of the localhost receiver(s) or proper |
418 | email address(es) | |
419 | :type usernames: str or [str] | |
420 | :param file_list: files attached to an email; can be empty | |
67177844 CH |
421 | :type file_list: [str] |
422 | :param wait_for_transfer: specify whether to wait until arnied_wrapper | |
423 | confirms email transfer; you can also specify | |
424 | a fixed timeout (seconds) | |
425 | :type wait_for_transfer: bool or int | |
426 | :param autotest_signature: text to insert as value for header | |
427 | X-Autotest-Signature for simpler recognition | |
428 | of mail (if None do not add header) | |
429 | :type autotest_signature: str or None | |
7628bc48 | 430 | :param str subject: Subject of created mails |
67177844 CH |
431 | """ |
432 | text = 'This is an autogenerated email.\n' | |
433 | ||
ec0c7450 | 434 | recipients = self._prepare_recipients(usernames) |
67177844 CH |
435 | |
436 | if file_list: # empty or None or so | |
437 | msg = MIMEMultipart() # pylint: disable=redefined-variable-type | |
438 | msg.attach(MIMEText(text, _charset='utf-8')) | |
439 | else: | |
440 | msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type | |
441 | msg['From'] = self.smtp_sender | |
ec0c7450 | 442 | msg['To'] = ', '.join(recipients) |
67177844 CH |
443 | msg['Subject'] = subject |
444 | msg['Date'] = formatdate(localtime=True) | |
445 | msg.preamble = 'This is a multi-part message in MIME format.\n' | |
446 | msg.add_header('X-Autotest-Creator', | |
447 | self.__class__.__module__ + '.' + | |
448 | self.__class__.__name__ + '.' + | |
449 | currentframe().f_code.co_name) | |
450 | # (with help from http://stackoverflow.com/questions/5067604/determine- | |
451 | # function-name-from-within-that-function-without-using-traceback) | |
452 | if autotest_signature: | |
453 | msg.add_header('X-Autotest-Signature', autotest_signature) | |
454 | ||
455 | # attach files | |
456 | for filename in file_list: | |
457 | fullpath = os.path.join(self.source_path, filename) | |
458 | ||
459 | # Guess the content type based on the file's extension. Encoding | |
460 | # will be ignored, although we should check for simple things like | |
461 | # gzip'd or compressed files. | |
462 | ctype, encoding = mimetypes.guess_type(fullpath) | |
463 | if ctype is None or encoding is not None: | |
464 | # No guess could be made, or the file is encoded (compressed), | |
465 | # so use a generic bag-of-bits type. | |
466 | ctype = 'application/octet-stream' | |
467 | ||
468 | maintype, subtype = ctype.split('/', 1) | |
469 | log.debug("Creating message containing file {} of mime type {}" | |
470 | .format(filename, ctype)) | |
471 | part = None | |
472 | if maintype == 'text': | |
473 | with open(fullpath, 'rt') as file_handle: | |
474 | # Note: we should handle calculating the charset | |
475 | part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type | |
476 | elif maintype == 'image': | |
477 | with open(fullpath, 'rb') as file_handle: | |
478 | part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type | |
479 | elif maintype == 'audio': | |
480 | with open(fullpath, 'rb') as file_handle: | |
481 | part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type | |
482 | else: | |
483 | part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type | |
484 | with open(fullpath, 'rb') as file_handle: | |
485 | part.set_payload(file_handle.read()) | |
486 | # Encode the payload using Base64 | |
487 | encode_base64(part) | |
488 | # Set the filename parameter | |
489 | part.add_header('Content-Disposition', 'attachment', | |
490 | filename=filename) | |
491 | msg.attach(part) | |
492 | ||
493 | log.debug("Message successfully created") | |
494 | # send via SMTP | |
495 | ||
ec0c7450 CH |
496 | log.debug("Sending message from %s to %s" |
497 | % (self.smtp_sender, ', '.join(recipients))) | |
67177844 | 498 | with smtplib.SMTP('localhost') as server: |
ec0c7450 | 499 | server.sendmail(self.smtp_sender, recipients, msg.as_string()) |
67177844 CH |
500 | |
501 | # wait for transfer; complicated by isinstance(False, int) == True | |
502 | if wait_for_transfer is False: | |
503 | pass | |
504 | elif wait_for_transfer is True: | |
505 | arnied_wrapper.wait_for_email_transfer() | |
506 | else: | |
507 | arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer) | |
508 | ||
509 | def _extract_email_paths(self, path, emails, timeout): | |
510 | """Check and return the absolute paths of a list of emails.""" | |
511 | log.debug("Extracting messages %s", emails) | |
512 | if len(emails) == 0: | |
513 | emails = os.listdir(path) | |
514 | email_paths = [] | |
515 | for expected_email in emails: | |
516 | # TODO: this can be improved by matching the emails themselves | |
517 | if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index", | |
a4bb1cea | 518 | "Entw&APw-rfe", "Gesendete Elemente", |
67177844 CH |
519 | "Gel&APY-schte Elemente", "mailboxes.dump", |
520 | "tmp"]: | |
521 | continue | |
522 | email_path = os.path.join(path, expected_email) | |
523 | for i in range(timeout): | |
524 | if os.path.isfile(email_path): | |
525 | email_paths.append(email_path) | |
526 | break | |
527 | elif i == timeout - 1: | |
528 | raise EmailNotFound("Target message %s could not be found " | |
529 | "on server at %s within %ss" | |
530 | % (expected_email, path, timeout)) | |
531 | time.sleep(1) | |
532 | log.debug("%s mails extracted at %s.", len(email_paths), path) | |
533 | return email_paths | |
534 | ||
535 | def _find_message_with_id(self, message_id, message_paths): | |
536 | """Find message with id among a list of message paths.""" | |
537 | log.debug("Looking for a match for the message with id %s", message_id) | |
538 | for message_path in message_paths: | |
539 | extracted_id = self._extract_message_id(message_path) | |
540 | log.debug("Extracted id %s from candidate %s", extracted_id, | |
541 | message_path) | |
542 | if message_id == extracted_id: | |
543 | log.debug("Found match at %s", message_path) | |
544 | return message_path | |
545 | raise MismatchedEmailID("The message with id %s could not be matched " | |
546 | "or wasn't expected among %s" | |
547 | % (message_id, ", ".join(message_paths))) | |
548 | ||
549 | def _extract_message_id(self, message_path): | |
550 | """ | |
551 | Given a message file path extract the Message-ID. | |
552 | ||
553 | :raises: :py:class:`MissingEmailID` if no Message-ID was found. | |
554 | """ | |
555 | message_id = "" | |
556 | with open(message_path, errors='ignore') as file_handle: | |
557 | content = file_handle.read() | |
558 | for line in content.split("\n"): | |
559 | match_id = re.match("Autotest-Message-ID: (.+)", line) | |
560 | if match_id is not None: | |
561 | message_id = match_id.group(1).rstrip('\r\n') | |
562 | if message_id == "": | |
7628bc48 CH |
563 | raise MissingEmailID(f"No id was found in target message {message_path}, " |
564 | f"so it cannot be properly matched") | |
67177844 CH |
565 | return message_id |
566 | ||
567 | def _default_compare_emails(self, source_email_path, target_email_path, | |
568 | tolerance=1): | |
569 | """ | |
570 | Compare target emails with source ones. | |
571 | ||
572 | Uses python provided diff functionality to compare complete mail files. | |
573 | """ | |
574 | with open(source_email_path, "r") as source_email_file: | |
575 | source_email = source_email_file.read() | |
576 | with open(target_email_path, "r") as target_email_file: | |
577 | target_email = target_email_file.read() | |
578 | matcher = difflib.SequenceMatcher(None, source_email, target_email) | |
579 | diffratio = matcher.ratio() | |
580 | log.debug("Target message comparison ratio is %s.", diffratio) | |
581 | # log.info("%s $$$ %s", source_email, target_email) | |
582 | if diffratio < tolerance: | |
583 | raise EmailMismatch("Target message is too different from the " | |
584 | "source (difference %s < tolerance %s).", | |
585 | diffratio, tolerance) | |
586 | ||
587 | def _compare_emails_by_basic_headers(self, source_email_path, | |
588 | target_email_path, tolerance=1): | |
589 | """ | |
590 | Compare target emails with source ones. | |
591 | ||
592 | Uses python provided diff functionality to compare headers and mail | |
593 | "body". | |
594 | ||
595 | Argument `tolerance` not used! | |
596 | """ | |
597 | with open(source_email_path, errors="ignore") as file_handle: | |
598 | source_email = Parser().parse(file_handle) | |
599 | source_body = "" | |
600 | for part in source_email.walk(): | |
601 | if part.get_content_type() in ["text/plain", "text/html"]: | |
602 | source_body = part.get_payload() | |
603 | break | |
604 | ||
605 | with open(target_email_path, errors="ignore") as file_handle: | |
606 | target_email = Parser().parse(file_handle) | |
607 | target_body = "" | |
608 | for part in target_email.walk(): | |
609 | if part.get_content_type() in ["text/plain", "text/html"]: | |
610 | target_body = part.get_payload() | |
611 | break | |
612 | ||
613 | if source_email['From'] != target_email['From']: | |
614 | raise EmailMismatch("Target message sender %s is too different " | |
615 | "from the source one %s" % | |
616 | (target_email['From'], source_email['From'])) | |
617 | if source_email['To'] != target_email['To']: | |
618 | raise EmailMismatch("Target message recipient %s is too different " | |
619 | "from the source one %s" % | |
620 | (target_email['To'], source_email['To'])) | |
621 | if source_email['Subject'] != target_email['Subject']: | |
622 | raise EmailMismatch("Target message subject '%s' is too different " | |
623 | "from the source one '%s'" % | |
624 | (target_email['Subject'], | |
625 | source_email['Subject'])) | |
626 | if source_email['Date'] != target_email['Date']: | |
627 | raise EmailMismatch("Target message date %s is too different from " | |
628 | "the source one %s" % | |
629 | (target_email['Date'], source_email['Date'])) | |
630 | if source_body != target_body: | |
631 | raise EmailMismatch("Target message body '%s' is too different " | |
632 | "from the source one '%s'" % | |
633 | (target_body, source_body)) | |
634 | ||
635 | def _compare_emails_by_existence(self, source_email_path, | |
636 | target_email_path, tolerance=1): | |
637 | """ | |
638 | Weak email validation based only on presence of file. | |
639 | ||
640 | DOES NOT CHECK ANYTHING! | |
641 | """ | |
642 | return True |