| 1 | # This Python file uses the following encoding: utf-8 |
| 2 | |
| 3 | # The software in this package is distributed under the GNU General |
| 4 | # Public License version 2 (with a special exception described below). |
| 5 | # |
| 6 | # A copy of GNU General Public License (GPL) is included in this distribution, |
| 7 | # in the file COPYING.GPL. |
| 8 | # |
| 9 | # As a special exception, if other files instantiate templates or use macros |
| 10 | # or inline functions from this file, or you compile this file and link it |
| 11 | # with other works to produce a work based on this file, this file |
| 12 | # does not by itself cause the resulting work to be covered |
| 13 | # by the GNU General Public License. |
| 14 | # |
| 15 | # However the source code for this file must still be made available |
| 16 | # in accordance with section (3) of the GNU General Public License. |
| 17 | # |
| 18 | # This exception does not invalidate any other reasons why a work based |
| 19 | # on this file might be covered by the GNU General Public License. |
| 20 | # |
| 21 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> |
| 22 | |
| 23 | """ |
| 24 | Class :py:class:`MailValidator`, a fully-featured email sender and checker. |
| 25 | |
| 26 | Copyright: Intra2net AG |
| 27 | """ |
| 28 | |
| 29 | import time |
| 30 | import os |
| 31 | import difflib |
| 32 | import socket |
| 33 | from inspect import currentframe |
| 34 | import re |
| 35 | import subprocess |
| 36 | import logging |
| 37 | |
| 38 | import smtplib |
| 39 | from email.mime.audio import MIMEAudio |
| 40 | from email.mime.base import MIMEBase |
| 41 | from email.mime.image import MIMEImage |
| 42 | from email.mime.multipart import MIMEMultipart |
| 43 | from email.mime.text import MIMEText |
| 44 | from email.encoders import encode_base64 |
| 45 | from email.utils import formatdate |
| 46 | from email.parser import Parser |
| 47 | import mimetypes |
| 48 | |
| 49 | from . import arnied_wrapper |
| 50 | |
# Module-level logger (named 'pyi2ncommon.mail_utils') used for all progress
# and debug output emitted by the code in this file.
log = logging.getLogger('pyi2ncommon.mail_utils')
| 52 | |
| 53 | |
class EmailException(Exception):
    """Common ancestor of the custom errors raised from `MailValidator`."""
| 58 | |
| 59 | |
class EmailNotFound(EmailException):
    """An expected email file is missing or a source email stayed unmatched."""
| 62 | |
| 63 | |
class InvalidEmailHeader(EmailException):
    """A checked email header does not fulfil the requested conditions."""
| 66 | |
| 67 | |
class InvalidEmailContent(EmailException):
    """A checked email content part does not fulfil the requested conditions."""
| 70 | |
| 71 | |
class EmailIDError(EmailException):
    """Common ancestor of the errors related to message ids."""
| 74 | |
| 75 | |
class MismatchedEmailID(EmailIDError):
    """No candidate message carries the message id that was looked for."""
| 78 | |
| 79 | |
class MissingEmailID(EmailIDError):
    """A message file does not contain any message id."""
| 82 | |
| 83 | |
class EmailMismatch(EmailException):
    """A target email differs from the source email it was compared with."""
| 86 | |
| 87 | |
class MailValidator:
    """
    Class for sending test emails and validating the delivered ones.

    Source emails are read from `source_path`, delivered emails are expected
    in `target_path`. Source and target messages are paired through their
    ``Autotest-Message-ID`` line and then compared with a selectable
    comparison method (see :py:attr:`compare_emails_method`).
    """

    # Directory entries that are never treated as emails (presumably mail
    # server bookkeeping files and subfolders -- TODO confirm).
    _IGNORED_ENTRIES = frozenset((
        "cyrus.cache", "cyrus.header", "cyrus.index",
        "Entw&APw-rfe", "Gesendete Elemente",
        "Gel&APY-schte Elemente", "mailboxes.dump",
        "tmp",
    ))

    # Line that carries the id used to pair source and target messages.
    _MESSAGE_ID_REGEX = re.compile(r"Autotest-Message-ID: (.+)")

    # Headers compared by the "headers" method, each with the template of the
    # error raised on mismatch; templates take (target value, source value).
    _COMPARED_HEADERS = (
        ("From", "Target message sender %s is too different "
                 "from the source one %s"),
        ("To", "Target message recipient %s is too different "
               "from the source one %s"),
        ("Subject", "Target message subject '%s' is too different "
                    "from the source one '%s'"),
        ("Date", "Target message date %s is too different from "
                 "the source one %s"),
    )

    def __init__(self, source_path, target_path):
        """
        Construct a validator instance.

        :param str source_path: path to find source emails (not sent)
        :param str target_path: path to find target emails (received)

        .. note:: The comparison method can be redefined using the variety of
                  private method implementations.
        """
        self._target_path = target_path
        self._source_path = source_path
        self._smtp_sender = "no_source@inject.smtp"
        self._compare_emails_method = self._default_compare_emails

    @property
    def target_path(self):
        """Path where target (received) emails are looked up."""
        return self._target_path

    @target_path.setter
    def target_path(self, new_value):
        # Assigning None has always been a silent no-op; keep it that way so
        # existing callers are not affected.
        if new_value is not None:
            self._target_path = new_value

    @property
    def source_path(self):
        """Path where source (not sent) emails are looked up."""
        return self._source_path

    @source_path.setter
    def source_path(self, new_value):
        # Assigning None is ignored (backward compatible behavior).
        if new_value is not None:
            self._source_path = new_value

    @property
    def smtp_sender(self):
        """Envelope sender and ``From`` address used when sending via SMTP."""
        return self._smtp_sender

    @smtp_sender.setter
    def smtp_sender(self, new_value):
        # Assigning None is ignored (backward compatible behavior).
        if new_value is not None:
            self._smtp_sender = new_value

    def compare_emails_method(self, method="basic"):
        """
        Set email comparison method for validation.

        :param str method: one of "basic", "headers", "existence"
        :raises: :py:class:`ValueError` if chosen method is invalid
        """
        comparators = {
            "basic": self._default_compare_emails,
            "headers": self._compare_emails_by_basic_headers,
            "existence": self._compare_emails_by_existence,
        }
        try:
            self._compare_emails_method = comparators[method]
        except (KeyError, TypeError):
            # TypeError covers unhashable values, which are just as invalid
            raise ValueError("Invalid email comparison method %s"
                             % method) from None
    # write-only property: `validator.compare_emails_method = "headers"`
    compare_emails_method = property(
        fset=compare_emails_method,
        doc='Write-only: one of "basic", "headers", "existence".')

    def inject_emails(self, username, original_user):
        """
        Inject emails from `source_path` to `target_path`.

        This uses the script *restore_mail_inject.pl* which injects the mails
        using IMAP (as opposed to :py:meth:`inject_smtp`).

        :param str username: username for the mail injection script
        :param str original_user: original username for the mail injection
                                  script; if empty (or None), no mailbox dump
                                  is passed to the script
        :raises: :py:class:`subprocess.CalledProcessError` if the script
                 exits with a non-zero status

        In order to restore acl rights as well put a mailboxes.dump file in
        the source path.
        """
        log.info("Injecting emails for user %s", username)

        # Pass the arguments as a list (no shell involved) so that user names
        # and paths with spaces or shell metacharacters are handed to the
        # script unchanged.
        cmd = ["/usr/intranator/bin/restore_mail_inject.pl",
               "-u", username,
               "-s", self.source_path]
        if original_user:
            cmd += ["-m", os.path.join(self.source_path, "mailboxes.dump"),
                    "-o", original_user]

        result = subprocess.check_output(cmd)
        log.debug(result)

    def _prepare_recipients(self, recipients):
        """
        Prepare recipient list: ensure list of proper addresses.

        If given a simple string, make a list of strings out of it.
        If any recipient is just a username, append "@" + localhost to it.
        Also check that recipients are just email addresses.

        :param recipients: username(s) or email address(es)
        :type recipients: str or [str]
        :returns: list of email addresses
        :rtype: [str]
        :raises: :py:class:`ValueError` if a recipient contains one of the
                 characters ``<``, ``>``, ``"`` or ``'``
        """
        hostname = socket.gethostname()
        if isinstance(recipients, str):
            recipients = [recipients]
        result = []
        for recipient in recipients:
            if any(bad_char in recipient for bad_char in '<>"\''):
                raise ValueError('Recipient must be a "raw" email address,'
                                 ' not {!r}'.format(recipient))
            if '@' in recipient:
                result.append(recipient)
            else:
                result.append(recipient + '@' + hostname)
        return result

    def inject_smtp(self, usernames, emails):
        """
        Inject emails from `source_path` using python's SMTP library.

        As opposed to :py:meth:`inject_emails`, this actually sends the mail
        to the local mail server (meaning filtering, archiving, ... will
        happen).

        :param usernames: username(s) of the localhost receiver(s) for each
                          email or proper email address(es)
        :type usernames: str or [str]
        :param emails: paths to files including full emails (header + body)
                       to be sent to each user
        :type emails: [str]
        """
        recipients = self._prepare_recipients(usernames)
        log.info("Sending emails to %s", ','.join(recipients))
        with smtplib.SMTP('localhost') as server:
            for email in emails:
                log.info("Sending email %s", email)
                # read as bytes: the file is sent exactly as stored
                with open(os.path.join(self.source_path, email), 'rb') \
                        as file_handle:
                    email_content = file_handle.read()
                server.sendmail(self.smtp_sender, recipients, email_content)

        # Wait till SMTP queue is processed
        arnied_wrapper.wait_for_email_transfer()

    def verify_email_id(self, email, emails_list, timeout, in_target=True):
        """
        Verify that the id of an email is present in a list.

        Returns that email's match in this list.

        :param str email: email filename
        :param emails_list: email among which the first email has to be found
        :type emails_list: [str]
        :param int timeout: timeout for extracting the source and target emails
        :param bool in_target: whether the verified email is on the target side
        :returns: file name (without directory) of the matching email
        :rtype: str
        :raises: :py:class:`MismatchedEmailID` if no email of the list has
                 the same id

        If `in_target` is set to True we are getting the target id from the
        target list of a source email. Otherwise, we assume a target email from
        a source list.
        """
        if in_target:
            email_dir, list_dir = self.source_path, self.target_path
        else:
            email_dir, list_dir = self.target_path, self.source_path
        email = self._extract_email_paths(email_dir, [email], timeout)[0]
        emails_list = self._extract_email_paths(list_dir, emails_list,
                                                timeout)

        email_id = self._extract_message_id(email)
        match = self._find_message_with_id(email_id, emails_list)
        return os.path.basename(match)

    def verify_emails(self, source_emails, target_emails, timeout):
        """
        Check injected e-mails for a user.

        :param source_emails: emails at the source location
        :type source_emails: [str]
        :param target_emails: emails at the target (server) location
        :type target_emails: [str]
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`EmailNotFound` if target email is not found on
                 server
        :raises: :py:class:`EmailIDError` if a message has no id or no
                 partner with the same id
        :raises: :py:class:`EmailMismatch` if the comparison method rejects
                 a pair of emails
        """
        source_paths = self._extract_email_paths(self.source_path,
                                                 source_emails, timeout)
        target_paths = self._extract_email_paths(self.target_path,
                                                 target_emails, timeout)

        log.info("Verifying emails at %s with %s", self.target_path,
                 self.source_path)
        for target in target_paths:
            log.info("Verifying email %s", target)
            target_id = self._extract_message_id(target)
            source = self._find_message_with_id(target_id, source_paths)
            # every source email may be matched only once
            source_paths.remove(source)
            # comparators take (source, target, tolerance) in that order
            self._compare_emails_method(source, target, 1)

        if source_paths:
            raise EmailNotFound("%s target mails could not be found on server."
                                "\n%s"
                                % (len(source_paths), "\n".join(source_paths)))
        log.info("All e-mails at %s verified!", self.target_path)

    @staticmethod
    def _alternatives_satisfied(text, present_values, absent_values):
        """
        Tell whether `text` fulfils the present/absent alternatives.

        An empty list imposes no condition. Otherwise at least one of the
        `present_values` has to occur in `text` and at least one of the
        `absent_values` has to be missing from it.
        """
        if present_values and not any(value in text
                                      for value in present_values):
            return False
        if absent_values and all(value in text for value in absent_values):
            return False
        return True

    @staticmethod
    def _decoded_payload(part):
        """Return the payload of a message part as text ("" if it has none)."""
        payload = part.get_payload(decode=True)
        if payload is None:
            # e.g. multipart containers do not have a payload of their own
            return ""
        if not isinstance(payload, bytes):
            return payload
        charset = part.get_content_charset() or "utf-8"
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            # charset declared in the mail is unknown to python
            return payload.decode("utf-8", errors="replace")

    def assert_header(self, emails, header, present_values=None,
                      absent_values=None, timeout=30):
        """
        Check headers for present and missing strings in a list of messages.

        :param emails: emails whose headers will be checked
        :type emails: [str]
        :param str header: header that will be validated for each email
        :param present_values: strings that have to be present in the header
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the header
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailHeader` if email header is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied. A header missing from the email is handled like an empty
        one.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            # undecodable bytes are skipped like in the message id extraction
            with open(email_path, "r", errors="ignore") as email_file:
                verified_email = Parser().parse(email_file, headersonly=True)
            log.debug("Extracted email headers:\n%s", verified_email)

            log.info("Checking header '%s' in %s", header, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            header_value = verified_email[header]
            # a missing header is reported as None by the email package
            searched = "" if header_value is None else str(header_value)
            if not self._alternatives_satisfied(searched, present_values,
                                                absent_values):
                raise InvalidEmailHeader("Message header '%s' in %s is not "
                                         "valid:\n%s"
                                         % (header, email_path, header_value))
            log.info("Message header '%s' in %s is valid!", header, email_path)

    def assert_content(self, emails, content_type, present_values=None,
                       absent_values=None, timeout=30):
        """
        Check content for present/missing strings in a list of messages.

        :param emails: emails whose content will be checked
        :type emails: [str]
        :param str content_type: type of the content that will be checked for
                                 values
        :param present_values: strings that have to be present in the content
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the content
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailContent` if email content is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied. Only the first part of the requested content type is
        checked; if there is none, the content is taken to be empty.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            # undecodable bytes are skipped like in the message id extraction
            with open(email_path, "r", errors="ignore") as email_file:
                verified_email = Parser().parse(email_file)
            log.debug("Extracted email content:\n%s", verified_email)
            content = ""
            for part in verified_email.walk():
                log.debug("Extracted %s part while looking for %s",
                          part.get_content_type(), content_type)
                if part.get_content_type() == content_type:
                    content = self._decoded_payload(part)
                    # NOTE: only one such element is expected
                    break

            log.info("Checking content '%s' in %s", content_type, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            if not self._alternatives_satisfied(content, present_values,
                                                absent_values):
                raise InvalidEmailContent("Message content '%s' in %s is not "
                                          "valid:\n%s"
                                          % (content_type, email_path, content))
            log.info("Message content '%s' in %s is valid!",
                     content_type, email_path)

    def _build_attachment(self, filename):
        """Create the MIME part for attaching `filename` from `source_path`."""
        fullpath = os.path.join(self.source_path, filename)

        # Guess the content type based on the file's extension. Encoding
        # will be ignored, although we should check for simple things like
        # gzip'd or compressed files.
        ctype, encoding = mimetypes.guess_type(fullpath)
        if ctype is None or encoding is not None:
            # No guess could be made, or the file is encoded (compressed),
            # so use a generic bag-of-bits type.
            ctype = 'application/octet-stream'

        maintype, subtype = ctype.split('/', 1)
        log.debug("Creating message containing file %s of mime type %s",
                  filename, ctype)
        if maintype == 'text':
            with open(fullpath, 'rt') as file_handle:
                # Note: we should handle calculating the charset
                part = MIMEText(file_handle.read(), _subtype=subtype)
        elif maintype == 'image':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEImage(file_handle.read(), _subtype=subtype)
        elif maintype == 'audio':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEAudio(file_handle.read(), _subtype=subtype)
        else:
            part = MIMEBase(maintype, subtype)
            with open(fullpath, 'rb') as file_handle:
                part.set_payload(file_handle.read())
            # Encode the payload using Base64
            encode_base64(part)
        # Set the filename parameter
        part.add_header('Content-Disposition', 'attachment',
                        filename=filename)
        return part

    def send_email_with_files(self, usernames, file_list,
                              wait_for_transfer=True,
                              autotest_signature=None,
                              subject="my subject"):
        """
        Send a generated email with optional attachments.

        :param usernames: username(s) of the localhost receiver(s) or proper
                          email address(es)
        :type usernames: str or [str]
        :param file_list: files attached to an email; can be empty or None
        :type file_list: [str] or None
        :param wait_for_transfer: specify whether to wait until arnied_wrapper
                                  confirms email transfer; you can also specify
                                  a fixed timeout (seconds)
        :type wait_for_transfer: bool or int
        :param autotest_signature: text to insert as value for header
                                   X-Autotest-Signature for simpler recognition
                                   of mail (if None do not add header)
        :type autotest_signature: str or None
        :param str subject: Subject of created mails
        """
        text = 'This is an autogenerated email.\n'

        recipients = self._prepare_recipients(usernames)
        # None is accepted as "no attachments"
        attachments = file_list or []

        if attachments:
            msg = MIMEMultipart()
            msg.attach(MIMEText(text, _charset='utf-8'))
        else:
            msg = MIMEText(text, _charset='utf-8')
        msg['From'] = self.smtp_sender
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = subject
        msg['Date'] = formatdate(localtime=True)
        msg.preamble = 'This is a multi-part message in MIME format.\n'
        # record which method created the mail; currentframe() has to be
        # called right here to yield the name of this method
        # (with help from http://stackoverflow.com/questions/5067604/determine-
        #  function-name-from-within-that-function-without-using-traceback)
        msg.add_header('X-Autotest-Creator',
                       self.__class__.__module__ + '.' +
                       self.__class__.__name__ + '.' +
                       currentframe().f_code.co_name)
        if autotest_signature:
            msg.add_header('X-Autotest-Signature', autotest_signature)

        # attach files
        for filename in attachments:
            msg.attach(self._build_attachment(filename))

        log.debug("Message successfully created")

        # send via SMTP
        log.debug("Sending message from %s to %s",
                  self.smtp_sender, ', '.join(recipients))
        with smtplib.SMTP('localhost') as server:
            server.sendmail(self.smtp_sender, recipients, msg.as_string())

        # wait for transfer; complicated by isinstance(False, int) == True
        if wait_for_transfer is False:
            return
        if wait_for_transfer is True:
            arnied_wrapper.wait_for_email_transfer()
        else:
            arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)

    def _extract_email_paths(self, path, emails, timeout):
        """
        Check and return the absolute paths of a list of emails.

        :param str path: directory in which the emails are expected
        :param emails: file names of the emails; if empty, all entries of
                       `path` are used
        :type emails: [str]
        :param int timeout: number of checks per email, done one second
                            apart; values below 1 still result in one check
        :returns: paths of all emails that are not on the ignore list
        :rtype: [str]
        :raises: :py:class:`EmailNotFound` if an email file does not show up
                 in time
        """
        log.debug("Extracting messages %s", emails)
        if not emails:
            emails = os.listdir(path)
        # always look at least once, otherwise emails would be dropped
        # silently for a timeout of 0
        attempts = max(int(timeout), 1)
        email_paths = []
        for expected_email in emails:
            # TODO: this can be improved by matching the emails themselves
            if expected_email in self._IGNORED_ENTRIES:
                continue
            email_path = os.path.join(path, expected_email)
            for attempt in range(attempts):
                if os.path.isfile(email_path):
                    email_paths.append(email_path)
                    break
                if attempt < attempts - 1:
                    # no point in sleeping after the last check
                    time.sleep(1)
            else:
                raise EmailNotFound("Target message %s could not be found "
                                    "on server at %s within %ss"
                                    % (expected_email, path, timeout))
        log.debug("%s mails extracted at %s.", len(email_paths), path)
        return email_paths

    def _find_message_with_id(self, message_id, message_paths):
        """
        Find message with id among a list of message paths.

        :returns: path of the first message carrying `message_id`
        :raises: :py:class:`MismatchedEmailID` if there is no such message
        """
        log.debug("Looking for a match for the message with id %s", message_id)
        for message_path in message_paths:
            extracted_id = self._extract_message_id(message_path)
            log.debug("Extracted id %s from candidate %s", extracted_id,
                      message_path)
            if message_id == extracted_id:
                log.debug("Found match at %s", message_path)
                return message_path
        raise MismatchedEmailID("The message with id %s could not be matched "
                                "or wasn't expected among %s"
                                % (message_id, ", ".join(message_paths)))

    def _extract_message_id(self, message_path):
        """
        Given a message file path extract the Message-ID.

        The id is taken from the last line starting with
        ``Autotest-Message-ID: ``.

        :raises: :py:class:`MissingEmailID` if no Message-ID was found.
        """
        message_id = ""
        with open(message_path, errors='ignore') as file_handle:
            for line in file_handle:
                match_id = self._MESSAGE_ID_REGEX.match(line)
                if match_id is not None:
                    message_id = match_id.group(1).rstrip('\r\n')
        if message_id == "":
            raise MissingEmailID(f"No id was found in target message {message_path}, "
                                 f"so it cannot be properly matched")
        return message_id

    def _default_compare_emails(self, source_email_path, target_email_path,
                                tolerance=1):
        """
        Compare target emails with source ones.

        Uses python provided diff functionality to compare complete mail files.

        :param str source_email_path: path of the source email
        :param str target_email_path: path of the target email
        :param tolerance: minimal similarity ratio (0 to 1) to accept
        :raises: :py:class:`EmailMismatch` if the similarity ratio of the two
                 files is below `tolerance`
        """
        # undecodable bytes are skipped like in the message id extraction
        with open(source_email_path, "r", errors="ignore") as source_file:
            source_email = source_file.read()
        with open(target_email_path, "r", errors="ignore") as target_file:
            target_email = target_file.read()
        matcher = difflib.SequenceMatcher(None, source_email, target_email)
        diffratio = matcher.ratio()
        log.debug("Target message comparison ratio is %s.", diffratio)
        if diffratio < tolerance:
            raise EmailMismatch("Target message is too different from the "
                                "source (similarity ratio %s < tolerance %s)."
                                % (diffratio, tolerance))

    @staticmethod
    def _parse_email_with_body(email_path):
        """Parse an email file; return the message and its first text body."""
        with open(email_path, errors="ignore") as file_handle:
            message = Parser().parse(file_handle)
        for part in message.walk():
            if part.get_content_type() in ["text/plain", "text/html"]:
                return message, part.get_payload()
        return message, ""

    def _compare_emails_by_basic_headers(self, source_email_path,
                                         target_email_path, tolerance=1):
        """
        Compare target emails with source ones.

        Compares the headers From, To, Subject and Date as well as the mail
        "body" (first text/plain or text/html part) for equality.

        Argument `tolerance` not used!

        :raises: :py:class:`EmailMismatch` on the first difference found
        """
        source_email, source_body = \
            self._parse_email_with_body(source_email_path)
        target_email, target_body = \
            self._parse_email_with_body(target_email_path)

        for header, error_template in self._COMPARED_HEADERS:
            if source_email[header] != target_email[header]:
                raise EmailMismatch(error_template % (target_email[header],
                                                      source_email[header]))
        if source_body != target_body:
            raise EmailMismatch("Target message body '%s' is too different "
                                "from the source one '%s'" %
                                (target_body, source_body))

    def _compare_emails_by_existence(self, source_email_path,
                                     target_email_path, tolerance=1):
        """
        Weak email validation based only on presence of file.

        DOES NOT CHECK ANYTHING!

        :returns: always True
        """
        return True