From 6717784460ce5aae61482e7c46d050a8065102c0 Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Tue, 25 Jun 2019 13:05:12 +0200 Subject: [PATCH] Move MailValidator into own module mail_utils is getting overcrowded and MailValidator is about half of it and rather independent of the rest. --- src/mail_utils.py | 581 +--------------------------------------------- src/mail_validator.py | 619 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 624 insertions(+), 576 deletions(-) create mode 100644 src/mail_validator.py diff --git a/src/mail_utils.py b/src/mail_utils.py index a989339..b3a0b50 100644 --- a/src/mail_utils.py +++ b/src/mail_utils.py @@ -35,591 +35,20 @@ INTERFACE """ -import time import os -import difflib -import socket -from inspect import currentframe from base64 import b64decode import re -import subprocess import logging - -import smtplib -from email.mime.audio import MIMEAudio -from email.mime.base import MIMEBase -from email.mime.image import MIMEImage -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from email.encoders import encode_base64 -from email.utils import formatdate, parsedate_to_datetime +from email.utils import parsedate_to_datetime from email.parser import Parser -import mimetypes from . import arnied_wrapper -from imap_mailbox import ImapMailbox - -log = logging.getLogger('pyi2ncommon.mail_utils') - - -class EmailException(Exception): - """Base class for custom exceptions raised from `MailValidator`.""" - - pass - - -class EmailNotFound(EmailException): # pylint: disable=missing-docstring - pass - - -class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring - pass - - -class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring - pass - - -class EmailIDError(EmailException): # pylint: disable=missing-docstring - pass - - -class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring - pass - -class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring - pass +# outsourced source, import required for compatiblity +from .imap_mailbox import ImapMailbox # pylint: disable=unused-import +from .mail_validator import * # pylint: disable=unused-import - -class EmailMismatch(EmailException): # pylint: disable=missing-docstring - pass - - -class MailValidator(): - """Class for validation of emails.""" - - def target_path(self, new_value=None): - """Getter/Setter for property `target_path`.""" - if new_value is not None: - self._target_path = new_value - else: - return self._target_path - target_path = property(target_path, target_path) - - def source_path(self, new_value=None): - """Getter/Setter for property `source_path`.""" - if new_value is not None: - self._source_path = new_value - else: - return self._source_path - source_path = property(source_path, source_path) - - def smtp_sender(self, new_value=None): - """Getter/Setter for property `smtp_sender`.""" - if new_value is not None: - self._smtp_sender = new_value - else: - return self._smtp_sender - smtp_sender = property(smtp_sender, smtp_sender) - - def compare_emails_method(self, method="basic"): - """ - Set email comparison method for validation. - - :param str method: one of "basic", "headers" - :raises: :py:class:`ValueError` if chosen method is invalid - """ - if method == "basic": - self._compare_emails_method = self._default_compare_emails - elif method == "headers": - self._compare_emails_method = self._compare_emails_by_basic_headers - elif method == "existence": - self._compare_emails_method = self._compare_emails_by_existence - else: - raise ValueError("Invalid email comparison method %s" % method) - compare_emails_method = property(fset=compare_emails_method) - - def __init__(self, source_path, target_path): - """ - Construct a validator instance. - - :param str source_path: path to find source emails (not sent) - :param str target_path: path to find target emails (received) - - .. note:: The comparison method can be redefined using the variety of - private method implementations. - """ - self._target_path = target_path - self._source_path = source_path - self._smtp_sender = "no_source@inject.smtp" - self._compare_emails_method = self._default_compare_emails - - def inject_emails(self, username, original_user): - """ - Inject emails from `source_path` to `target_path`. - - :param str username: username for the mail injection script - :param str original_user: original username for the mail injection - script - - In order to restore acl rights as well put a mailbox.dump file in the - source path. - """ - log.info("Injecting emails for user %s", username) - - # inject emails from test data - cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \ - " -s " + self.source_path - if original_user != "": - cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \ - " -o " + original_user - - result = subprocess.check_output(cmd, shell=True) - log.debug(result) - - def inject_smtp(self, usernames, emails): - """ - Inject emails from `source_path` using python's SMTP library. - - :param usernames: usernames of the localhost receivers for each email - :type usernames: [str] - :param emails: emails to be sent to each user - :type emails: [str] - """ - usernames_string = ",".join(usernames) - log.info("Sending emails to %s", usernames_string) - with smtplib.SMTP('localhost') as server: - hostname = socket.gethostname() - users = [username + "@" + hostname for username in usernames] - - for email in emails: - log.info("Sending email %s", email) - with open(os.path.join(self.source_path, email), 'rb') \ - as file_handle: - email_content = file_handle.read() - server.sendmail(self.smtp_sender, users, email_content) - - # Wait till SMTP queue is processed - arnied_wrapper.wait_for_email_transfer() - - def verify_email_id(self, email, emails_list, timeout, in_target=True): - """ - Verify that the id of an email is present in a list. - - Returns that email's match in this list. - - :param str email: email filename - :param emails_list: email among which the first email has to be found - :type emails_list: [str] - :param int timeout: timeout for extracting the source and target emails - :param bool in_target: whether the verified email is on the target side - - If `in_target` is set to True we are getting the target id from the - target list of a source email. Otherwise we assume a target email from - a source list. - """ - if in_target: - email = self._extract_email_paths(self.source_path, [email], - timeout)[0] - emails_list = self._extract_email_paths(self.target_path, - emails_list, timeout) - else: - email = self._extract_email_paths(self.target_path, [email], - timeout)[0] - emails_list = self._extract_email_paths(self.source_path, - emails_list, timeout) - - email_id = self._extract_message_id(email) - match = self._find_message_with_id(email_id, emails_list) - return os.path.basename(match) - - def verify_emails(self, source_emails, target_emails, timeout): - """ - Check injected e-mails for a user. - - :param source_emails: emails at the source location - :type source_emails: [str] - :param target_emails: emails at the target (server) location - :type target_emails: [str] - :param int timeout: timeout for extracting the source and target emails - :raises: :py:class:`EmailNotFound` if target email is not found on - server - """ - source_paths = self._extract_email_paths(self.source_path, - source_emails, timeout) - target_paths = self._extract_email_paths(self.target_path, - target_emails, timeout) - - log.info("Verifying emails at %s with %s", self.target_path, - self.source_path) - for target in target_paths: - log.info("Verifying email %s", target) - target_id = self._extract_message_id(target) - source = self._find_message_with_id(target_id, source_paths) - source_paths.remove(source) - self._compare_emails_method(target, source, 1) - - if len(source_paths) > 0: - raise EmailNotFound("%s target mails could not be found on server." - "\n%s" - % (len(source_paths), "\n".join(source_paths))) - else: - log.info("All e-mails at %s verified!", self.target_path) - - def assert_header(self, emails, header, present_values=None, - absent_values=None, timeout=30): - """ - Check headers for present and missing strings in a list of messages. - - :param emails: emails whose headers will be checked - :type emails: [str] - :param str header: header that will be validated for each email - :param present_values: strings that have to be present in the header - :type present_values: [str] or None - :param absent_values: strings that have to be absent in the header - :type absent_values: [str] or None - :param int timeout: timeout for extracting the source and target emails - :raises: :py:class:`InvalidEmailHeader` if email header is not valid - - Every list of present and respectively absent values contains - alternative values. At least one of present and one of absent should be - satisfied. - """ - target_paths = self._extract_email_paths(self.target_path, emails, - timeout) - for email_path in target_paths: - with open(email_path, "r") as email_file: - verified_email = Parser().parse(email_file, headersonly=True) - log.debug("Extracted email headers:\n%s", verified_email) - - log.info("Checking header '%s' in %s", header, email_path) - if not present_values: - present_values = [] - else: - log.info("for present '%s'", "', '".join(present_values)) - if not absent_values: - absent_values = [] - else: - log.info("for absent '%s'", "', '".join(absent_values)) - present_valid = False - for present in present_values: - if present in verified_email[header]: - present_valid = True - absent_valid = False - for absent in absent_values: - if absent not in verified_email[header]: - absent_valid = True - - if not present_valid and len(present_values) > 0: - raise InvalidEmailHeader("Message header '%s' in %s is not " - "valid:\n%s" - % (header, email_path, - verified_email[header])) - if not absent_valid and len(absent_values) > 0: - raise InvalidEmailHeader("Message header '%s' in %s is not " - "valid:\n%s" - % (header, email_path, - verified_email[header])) - log.info("Message header '%s' in %s is valid!", header, email_path) - - def assert_content(self, emails, content_type, present_values=None, - absent_values=None, timeout=30): - """ - Check headers for present/missing strings in a list of messages. - - :param emails: emails whose content will be checked - :type emails: [str] - :param str content_type: type of the content that will be checked for - values - :param present_values: strings that have to be present in the content - :type present_values: [str] or None - :param absent_values: strings that have to be absent in the content - :type absent_values: [str] or None - :param int timeout: timeout for extracting the source and target emails - :raises: :py:class:`InvalidEmailContent` if email content is not valid - - Every list of present and respectively absent values contains - alternative values. At least one of present and one of absent should be - satisfied. - """ - target_paths = self._extract_email_paths(self.target_path, emails, - timeout) - for email_path in target_paths: - with open(email_path, "r") as email_file: - verified_email = Parser().parse(email_file) - log.debug("Extracted email content:\n%s", verified_email) - content = "" - for part in verified_email.walk(): - log.debug("Extracted %s part while looking for %s", - part.get_content_type(), content_type) - if part.get_content_type() == content_type: - content = part.get_payload(decode=True) - if isinstance(content, bytes): - content = content.decode() - # NOTE: only one such element is expected - break - - log.info("Checking content '%s' in %s", content_type, email_path) - if not present_values: - present_values = [] - else: - log.info("for present '%s'", "', '".join(present_values)) - if not absent_values: - absent_values = [] - else: - log.info("for absent '%s'", "', '".join(absent_values)) - present_valid = False - for present in present_values: - if present in content: - present_valid = True - absent_valid = False - for absent in absent_values: - if absent not in content: - absent_valid = True - - if not present_valid and len(present_values) > 0: - raise InvalidEmailContent("Message content '%s' in %s is not " - "valid:\n%s" - % (content_type, email_path, content)) - if not absent_valid and len(absent_values) > 0: - raise InvalidEmailContent("Message content '%s' in %s is not " - "valid:\n%s" - % (content_type, email_path, content)) - log.info("Message content '%s' in %s is valid!", - content_type, email_path) - - def send_email_with_files(self, username, file_list, - wait_for_transfer=True, - autotest_signature=None, - subject="my subject"): - """ - Send a generated email with attachments. - - :param str username: username of a localhost receiver of the email - :param file_list: files attached to an email - :type file_list: [str] - :param wait_for_transfer: specify whether to wait until arnied_wrapper - confirms email transfer; you can also specify - a fixed timeout (seconds) - :type wait_for_transfer: bool or int - :param autotest_signature: text to insert as value for header - X-Autotest-Signature for simpler recognition - of mail (if None do not add header) - :type autotest_signature: str or None - """ - text = 'This is an autogenerated email.\n' - - hostname = socket.gethostname() - user = username + "@" + hostname - - if file_list: # empty or None or so - msg = MIMEMultipart() # pylint: disable=redefined-variable-type - msg.attach(MIMEText(text, _charset='utf-8')) - else: - msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type - msg['From'] = self.smtp_sender - msg['To'] = user - msg['Subject'] = subject - msg['Date'] = formatdate(localtime=True) - msg.preamble = 'This is a multi-part message in MIME format.\n' - msg.add_header('X-Autotest-Creator', - self.__class__.__module__ + '.' + - self.__class__.__name__ + '.' + - currentframe().f_code.co_name) - # (with help from http://stackoverflow.com/questions/5067604/determine- - # function-name-from-within-that-function-without-using-traceback) - if autotest_signature: - msg.add_header('X-Autotest-Signature', autotest_signature) - - # attach files - for filename in file_list: - fullpath = os.path.join(self.source_path, filename) - - # Guess the content type based on the file's extension. Encoding - # will be ignored, although we should check for simple things like - # gzip'd or compressed files. - ctype, encoding = mimetypes.guess_type(fullpath) - if ctype is None or encoding is not None: - # No guess could be made, or the file is encoded (compressed), - # so use a generic bag-of-bits type. - ctype = 'application/octet-stream' - - maintype, subtype = ctype.split('/', 1) - log.debug("Creating message containing file {} of mime type {}" - .format(filename, ctype)) - part = None - if maintype == 'text': - with open(fullpath, 'rt') as file_handle: - # Note: we should handle calculating the charset - part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type - elif maintype == 'image': - with open(fullpath, 'rb') as file_handle: - part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type - elif maintype == 'audio': - with open(fullpath, 'rb') as file_handle: - part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type - else: - part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type - with open(fullpath, 'rb') as file_handle: - part.set_payload(file_handle.read()) - # Encode the payload using Base64 - encode_base64(part) - # Set the filename parameter - part.add_header('Content-Disposition', 'attachment', - filename=filename) - msg.attach(part) - - log.debug("Message successfully created") - # send via SMTP - - log.debug("Sending message from %s to %s" % (self.smtp_sender, user)) - with smtplib.SMTP('localhost') as server: - server.sendmail(self.smtp_sender, user, msg.as_string()) - - # wait for transfer; complicated by isinstance(False, int) == True - if wait_for_transfer is False: - pass - elif wait_for_transfer is True: - arnied_wrapper.wait_for_email_transfer() - else: - arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer) - - def _extract_email_paths(self, path, emails, timeout): - """Check and return the absolute paths of a list of emails.""" - log.debug("Extracting messages %s", emails) - if len(emails) == 0: - emails = os.listdir(path) - email_paths = [] - for expected_email in emails: - # TODO: this can be improved by matching the emails themselves - if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index", - "Entw&APw-rfe", "Gesendete Objekte", - "Gel&APY-schte Elemente", "mailboxes.dump", - "tmp"]: - continue - email_path = os.path.join(path, expected_email) - for i in range(timeout): - if os.path.isfile(email_path): - email_paths.append(email_path) - break - elif i == timeout - 1: - raise EmailNotFound("Target message %s could not be found " - "on server at %s within %ss" - % (expected_email, path, timeout)) - time.sleep(1) - log.debug("%s mails extracted at %s.", len(email_paths), path) - return email_paths - - def _find_message_with_id(self, message_id, message_paths): - """Find message with id among a list of message paths.""" - log.debug("Looking for a match for the message with id %s", message_id) - for message_path in message_paths: - extracted_id = self._extract_message_id(message_path) - log.debug("Extracted id %s from candidate %s", extracted_id, - message_path) - if message_id == extracted_id: - log.debug("Found match at %s", message_path) - return message_path - raise MismatchedEmailID("The message with id %s could not be matched " - "or wasn't expected among %s" - % (message_id, ", ".join(message_paths))) - - def _extract_message_id(self, message_path): - """ - Given a message file path extract the Message-ID. - - :raises: :py:class:`MissingEmailID` if no Message-ID was found. - """ - message_id = "" - with open(message_path, errors='ignore') as file_handle: - content = file_handle.read() - for line in content.split("\n"): - match_id = re.match("Autotest-Message-ID: (.+)", line) - if match_id is not None: - message_id = match_id.group(1).rstrip('\r\n') - if message_id == "": - raise MissingEmailID("No id was found in target message %s so it " - "cannot be properly matched" - % (message_path)) - return message_id - - def _default_compare_emails(self, source_email_path, target_email_path, - tolerance=1): - """ - Compare target emails with source ones. - - Uses python provided diff functionality to compare complete mail files. - """ - with open(source_email_path, "r") as source_email_file: - source_email = source_email_file.read() - with open(target_email_path, "r") as target_email_file: - target_email = target_email_file.read() - matcher = difflib.SequenceMatcher(None, source_email, target_email) - diffratio = matcher.ratio() - log.debug("Target message comparison ratio is %s.", diffratio) - # log.info("%s $$$ %s", source_email, target_email) - if diffratio < tolerance: - raise EmailMismatch("Target message is too different from the " - "source (difference %s < tolerance %s).", - diffratio, tolerance) - - def _compare_emails_by_basic_headers(self, source_email_path, - target_email_path, tolerance=1): - """ - Compare target emails with source ones. - - Uses python provided diff functionality to compare headers and mail - "body". - - Argument `tolerance` not used! - """ - with open(source_email_path, errors="ignore") as file_handle: - source_email = Parser().parse(file_handle) - source_body = "" - for part in source_email.walk(): - if part.get_content_type() in ["text/plain", "text/html"]: - source_body = part.get_payload() - break - - with open(target_email_path, errors="ignore") as file_handle: - target_email = Parser().parse(file_handle) - target_body = "" - for part in target_email.walk(): - if part.get_content_type() in ["text/plain", "text/html"]: - target_body = part.get_payload() - break - - if source_email['From'] != target_email['From']: - raise EmailMismatch("Target message sender %s is too different " - "from the source one %s" % - (target_email['From'], source_email['From'])) - if source_email['To'] != target_email['To']: - raise EmailMismatch("Target message recipient %s is too different " - "from the source one %s" % - (target_email['To'], source_email['To'])) - if source_email['Subject'] != target_email['Subject']: - raise EmailMismatch("Target message subject '%s' is too different " - "from the source one '%s'" % - (target_email['Subject'], - source_email['Subject'])) - if source_email['Date'] != target_email['Date']: - raise EmailMismatch("Target message date %s is too different from " - "the source one %s" % - (target_email['Date'], source_email['Date'])) - if source_body != target_body: - raise EmailMismatch("Target message body '%s' is too different " - "from the source one '%s'" % - (target_body, source_body)) - - def _compare_emails_by_existence(self, source_email_path, - target_email_path, tolerance=1): - """ - Weak email validation based only on presence of file. - - DOES NOT CHECK ANYTHING! - """ - return True +log = logging.getLogger('pyi2ncommon.mail_utils') def prep_email_header(email_file, value, regex=None, criterion="envelopeto"): diff --git a/src/mail_validator.py b/src/mail_validator.py new file mode 100644 index 0000000..aac4a2a --- /dev/null +++ b/src/mail_validator.py @@ -0,0 +1,619 @@ +# This Python file uses the following encoding: utf-8 + +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. +# +# Copyright (c) 2016-2018 Intra2net AG + +""" + +SUMMARY +------------------------------------------------------ +Class :py:class:`MailValidator`, a fully-featured email sender and checker. + +Copyright: Intra2net AG + + +INTERFACE +------------------------------------------------------ + +""" + +import time +import os +import difflib +import socket +from inspect import currentframe +import re +import subprocess +import logging + +import smtplib +from email.mime.audio import MIMEAudio +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email.encoders import encode_base64 +from email.utils import formatdate +from email.parser import Parser +import mimetypes + +from . import arnied_wrapper + +log = logging.getLogger('pyi2ncommon.mail_utils') + + +class EmailException(Exception): + """Base class for custom exceptions raised from `MailValidator`.""" + + pass + + +class EmailNotFound(EmailException): # pylint: disable=missing-docstring + pass + + +class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring + pass + + +class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring + pass + + +class EmailIDError(EmailException): # pylint: disable=missing-docstring + pass + + +class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring + pass + + +class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring + pass + + +class EmailMismatch(EmailException): # pylint: disable=missing-docstring + pass + + +class MailValidator(): + """Class for validation of emails.""" + + def target_path(self, new_value=None): + """Getter/Setter for property `target_path`.""" + if new_value is not None: + self._target_path = new_value + else: + return self._target_path + target_path = property(target_path, target_path) + + def source_path(self, new_value=None): + """Getter/Setter for property `source_path`.""" + if new_value is not None: + self._source_path = new_value + else: + return self._source_path + source_path = property(source_path, source_path) + + def smtp_sender(self, new_value=None): + """Getter/Setter for property `smtp_sender`.""" + if new_value is not None: + self._smtp_sender = new_value + else: + return self._smtp_sender + smtp_sender = property(smtp_sender, smtp_sender) + + def compare_emails_method(self, method="basic"): + """ + Set email comparison method for validation. + + :param str method: one of "basic", "headers" + :raises: :py:class:`ValueError` if chosen method is invalid + """ + if method == "basic": + self._compare_emails_method = self._default_compare_emails + elif method == "headers": + self._compare_emails_method = self._compare_emails_by_basic_headers + elif method == "existence": + self._compare_emails_method = self._compare_emails_by_existence + else: + raise ValueError("Invalid email comparison method %s" % method) + compare_emails_method = property(fset=compare_emails_method) + + def __init__(self, source_path, target_path): + """ + Construct a validator instance. + + :param str source_path: path to find source emails (not sent) + :param str target_path: path to find target emails (received) + + .. note:: The comparison method can be redefined using the variety of + private method implementations. + """ + self._target_path = target_path + self._source_path = source_path + self._smtp_sender = "no_source@inject.smtp" + self._compare_emails_method = self._default_compare_emails + + def inject_emails(self, username, original_user): + """ + Inject emails from `source_path` to `target_path`. + + :param str username: username for the mail injection script + :param str original_user: original username for the mail injection + script + + In order to restore acl rights as well put a mailbox.dump file in the + source path. + """ + log.info("Injecting emails for user %s", username) + + # inject emails from test data + cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \ + " -s " + self.source_path + if original_user != "": + cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \ + " -o " + original_user + + result = subprocess.check_output(cmd, shell=True) + log.debug(result) + + def inject_smtp(self, usernames, emails): + """ + Inject emails from `source_path` using python's SMTP library. + + :param usernames: usernames of the localhost receivers for each email + :type usernames: [str] + :param emails: emails to be sent to each user + :type emails: [str] + """ + usernames_string = ",".join(usernames) + log.info("Sending emails to %s", usernames_string) + with smtplib.SMTP('localhost') as server: + hostname = socket.gethostname() + users = [username + "@" + hostname for username in usernames] + + for email in emails: + log.info("Sending email %s", email) + with open(os.path.join(self.source_path, email), 'rb') \ + as file_handle: + email_content = file_handle.read() + server.sendmail(self.smtp_sender, users, email_content) + + # Wait till SMTP queue is processed + arnied_wrapper.wait_for_email_transfer() + + def verify_email_id(self, email, emails_list, timeout, in_target=True): + """ + Verify that the id of an email is present in a list. + + Returns that email's match in this list. + + :param str email: email filename + :param emails_list: email among which the first email has to be found + :type emails_list: [str] + :param int timeout: timeout for extracting the source and target emails + :param bool in_target: whether the verified email is on the target side + + If `in_target` is set to True we are getting the target id from the + target list of a source email. Otherwise we assume a target email from + a source list. + """ + if in_target: + email = self._extract_email_paths(self.source_path, [email], + timeout)[0] + emails_list = self._extract_email_paths(self.target_path, + emails_list, timeout) + else: + email = self._extract_email_paths(self.target_path, [email], + timeout)[0] + emails_list = self._extract_email_paths(self.source_path, + emails_list, timeout) + + email_id = self._extract_message_id(email) + match = self._find_message_with_id(email_id, emails_list) + return os.path.basename(match) + + def verify_emails(self, source_emails, target_emails, timeout): + """ + Check injected e-mails for a user. + + :param source_emails: emails at the source location + :type source_emails: [str] + :param target_emails: emails at the target (server) location + :type target_emails: [str] + :param int timeout: timeout for extracting the source and target emails + :raises: :py:class:`EmailNotFound` if target email is not found on + server + """ + source_paths = self._extract_email_paths(self.source_path, + source_emails, timeout) + target_paths = self._extract_email_paths(self.target_path, + target_emails, timeout) + + log.info("Verifying emails at %s with %s", self.target_path, + self.source_path) + for target in target_paths: + log.info("Verifying email %s", target) + target_id = self._extract_message_id(target) + source = self._find_message_with_id(target_id, source_paths) + source_paths.remove(source) + self._compare_emails_method(target, source, 1) + + if len(source_paths) > 0: + raise EmailNotFound("%s target mails could not be found on server." + "\n%s" + % (len(source_paths), "\n".join(source_paths))) + else: + log.info("All e-mails at %s verified!", self.target_path) + + def assert_header(self, emails, header, present_values=None, + absent_values=None, timeout=30): + """ + Check headers for present and missing strings in a list of messages. + + :param emails: emails whose headers will be checked + :type emails: [str] + :param str header: header that will be validated for each email + :param present_values: strings that have to be present in the header + :type present_values: [str] or None + :param absent_values: strings that have to be absent in the header + :type absent_values: [str] or None + :param int timeout: timeout for extracting the source and target emails + :raises: :py:class:`InvalidEmailHeader` if email header is not valid + + Every list of present and respectively absent values contains + alternative values. At least one of present and one of absent should be + satisfied. + """ + target_paths = self._extract_email_paths(self.target_path, emails, + timeout) + for email_path in target_paths: + with open(email_path, "r") as email_file: + verified_email = Parser().parse(email_file, headersonly=True) + log.debug("Extracted email headers:\n%s", verified_email) + + log.info("Checking header '%s' in %s", header, email_path) + if not present_values: + present_values = [] + else: + log.info("for present '%s'", "', '".join(present_values)) + if not absent_values: + absent_values = [] + else: + log.info("for absent '%s'", "', '".join(absent_values)) + present_valid = False + for present in present_values: + if present in verified_email[header]: + present_valid = True + absent_valid = False + for absent in absent_values: + if absent not in verified_email[header]: + absent_valid = True + + if not present_valid and len(present_values) > 0: + raise InvalidEmailHeader("Message header '%s' in %s is not " + "valid:\n%s" + % (header, email_path, + verified_email[header])) + if not absent_valid and len(absent_values) > 0: + raise InvalidEmailHeader("Message header '%s' in %s is not " + "valid:\n%s" + % (header, email_path, + verified_email[header])) + log.info("Message header '%s' in %s is valid!", header, email_path) + + def assert_content(self, emails, content_type, present_values=None, + absent_values=None, timeout=30): + """ + Check headers for present/missing strings in a list of messages. + + :param emails: emails whose content will be checked + :type emails: [str] + :param str content_type: type of the content that will be checked for + values + :param present_values: strings that have to be present in the content + :type present_values: [str] or None + :param absent_values: strings that have to be absent in the content + :type absent_values: [str] or None + :param int timeout: timeout for extracting the source and target emails + :raises: :py:class:`InvalidEmailContent` if email content is not valid + + Every list of present and respectively absent values contains + alternative values. At least one of present and one of absent should be + satisfied. + """ + target_paths = self._extract_email_paths(self.target_path, emails, + timeout) + for email_path in target_paths: + with open(email_path, "r") as email_file: + verified_email = Parser().parse(email_file) + log.debug("Extracted email content:\n%s", verified_email) + content = "" + for part in verified_email.walk(): + log.debug("Extracted %s part while looking for %s", + part.get_content_type(), content_type) + if part.get_content_type() == content_type: + content = part.get_payload(decode=True) + if isinstance(content, bytes): + content = content.decode() + # NOTE: only one such element is expected + break + + log.info("Checking content '%s' in %s", content_type, email_path) + if not present_values: + present_values = [] + else: + log.info("for present '%s'", "', '".join(present_values)) + if not absent_values: + absent_values = [] + else: + log.info("for absent '%s'", "', '".join(absent_values)) + present_valid = False + for present in present_values: + if present in content: + present_valid = True + absent_valid = False + for absent in absent_values: + if absent not in content: + absent_valid = True + + if not present_valid and len(present_values) > 0: + raise InvalidEmailContent("Message content '%s' in %s is not " + "valid:\n%s" + % (content_type, email_path, content)) + if not absent_valid and len(absent_values) > 0: + raise InvalidEmailContent("Message content '%s' in %s is not " + "valid:\n%s" + % (content_type, email_path, content)) + log.info("Message content '%s' in %s is valid!", + content_type, email_path) + + def send_email_with_files(self, username, file_list, + wait_for_transfer=True, + autotest_signature=None, + subject="my subject"): + """ + Send a generated email with attachments. + + :param str username: username of a localhost receiver of the email + :param file_list: files attached to an email + :type file_list: [str] + :param wait_for_transfer: specify whether to wait until arnied_wrapper + confirms email transfer; you can also specify + a fixed timeout (seconds) + :type wait_for_transfer: bool or int + :param autotest_signature: text to insert as value for header + X-Autotest-Signature for simpler recognition + of mail (if None do not add header) + :type autotest_signature: str or None + """ + text = 'This is an autogenerated email.\n' + + hostname = socket.gethostname() + user = username + "@" + hostname + + if file_list: # empty or None or so + msg = MIMEMultipart() # pylint: disable=redefined-variable-type + msg.attach(MIMEText(text, _charset='utf-8')) + else: + msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type + msg['From'] = self.smtp_sender + msg['To'] = user + msg['Subject'] = subject + msg['Date'] = formatdate(localtime=True) + msg.preamble = 'This is a multi-part message in MIME format.\n' + msg.add_header('X-Autotest-Creator', + self.__class__.__module__ + '.' + + self.__class__.__name__ + '.' + + currentframe().f_code.co_name) + # (with help from http://stackoverflow.com/questions/5067604/determine- + # function-name-from-within-that-function-without-using-traceback) + if autotest_signature: + msg.add_header('X-Autotest-Signature', autotest_signature) + + # attach files + for filename in file_list: + fullpath = os.path.join(self.source_path, filename) + + # Guess the content type based on the file's extension. Encoding + # will be ignored, although we should check for simple things like + # gzip'd or compressed files. + ctype, encoding = mimetypes.guess_type(fullpath) + if ctype is None or encoding is not None: + # No guess could be made, or the file is encoded (compressed), + # so use a generic bag-of-bits type. + ctype = 'application/octet-stream' + + maintype, subtype = ctype.split('/', 1) + log.debug("Creating message containing file {} of mime type {}" + .format(filename, ctype)) + part = None + if maintype == 'text': + with open(fullpath, 'rt') as file_handle: + # Note: we should handle calculating the charset + part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type + elif maintype == 'image': + with open(fullpath, 'rb') as file_handle: + part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type + elif maintype == 'audio': + with open(fullpath, 'rb') as file_handle: + part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type + else: + part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type + with open(fullpath, 'rb') as file_handle: + part.set_payload(file_handle.read()) + # Encode the payload using Base64 + encode_base64(part) + # Set the filename parameter + part.add_header('Content-Disposition', 'attachment', + filename=filename) + msg.attach(part) + + log.debug("Message successfully created") + # send via SMTP + + log.debug("Sending message from %s to %s" % (self.smtp_sender, user)) + with smtplib.SMTP('localhost') as server: + server.sendmail(self.smtp_sender, user, msg.as_string()) + + # wait for transfer; complicated by isinstance(False, int) == True + if wait_for_transfer is False: + pass + elif wait_for_transfer is True: + arnied_wrapper.wait_for_email_transfer() + else: + arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer) + + def _extract_email_paths(self, path, emails, timeout): + """Check and return the absolute paths of a list of emails.""" + log.debug("Extracting messages %s", emails) + if len(emails) == 0: + emails = os.listdir(path) + email_paths = [] + for expected_email in emails: + # TODO: this can be improved by matching the emails themselves + if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index", + "Entw&APw-rfe", "Gesendete Objekte", + "Gel&APY-schte Elemente", "mailboxes.dump", + "tmp"]: + continue + email_path = os.path.join(path, expected_email) + for i in range(timeout): + if os.path.isfile(email_path): + email_paths.append(email_path) + break + elif i == timeout - 1: + raise EmailNotFound("Target message %s could not be found " + "on server at %s within %ss" + % (expected_email, path, timeout)) + time.sleep(1) + log.debug("%s mails extracted at %s.", len(email_paths), path) + return email_paths + + def _find_message_with_id(self, message_id, message_paths): + """Find message with id among a list of message paths.""" + log.debug("Looking for a match for the message with id %s", message_id) + for message_path in message_paths: + extracted_id = self._extract_message_id(message_path) + log.debug("Extracted id %s from candidate %s", extracted_id, + message_path) + if message_id == extracted_id: + log.debug("Found match at %s", message_path) + return message_path + raise MismatchedEmailID("The message with id %s could not be matched " + "or wasn't expected among %s" + % (message_id, ", ".join(message_paths))) + + def _extract_message_id(self, message_path): + """ + Given a message file path extract the Message-ID. + + :raises: :py:class:`MissingEmailID` if no Message-ID was found. + """ + message_id = "" + with open(message_path, errors='ignore') as file_handle: + content = file_handle.read() + for line in content.split("\n"): + match_id = re.match("Autotest-Message-ID: (.+)", line) + if match_id is not None: + message_id = match_id.group(1).rstrip('\r\n') + if message_id == "": + raise MissingEmailID("No id was found in target message %s so it " + "cannot be properly matched" + % (message_path)) + return message_id + + def _default_compare_emails(self, source_email_path, target_email_path, + tolerance=1): + """ + Compare target emails with source ones. + + Uses python provided diff functionality to compare complete mail files. + """ + with open(source_email_path, "r") as source_email_file: + source_email = source_email_file.read() + with open(target_email_path, "r") as target_email_file: + target_email = target_email_file.read() + matcher = difflib.SequenceMatcher(None, source_email, target_email) + diffratio = matcher.ratio() + log.debug("Target message comparison ratio is %s.", diffratio) + # log.info("%s $$$ %s", source_email, target_email) + if diffratio < tolerance: + raise EmailMismatch("Target message is too different from the " + "source (difference %s < tolerance %s).", + diffratio, tolerance) + + def _compare_emails_by_basic_headers(self, source_email_path, + target_email_path, tolerance=1): + """ + Compare target emails with source ones. + + Uses python provided diff functionality to compare headers and mail + "body". + + Argument `tolerance` not used! + """ + with open(source_email_path, errors="ignore") as file_handle: + source_email = Parser().parse(file_handle) + source_body = "" + for part in source_email.walk(): + if part.get_content_type() in ["text/plain", "text/html"]: + source_body = part.get_payload() + break + + with open(target_email_path, errors="ignore") as file_handle: + target_email = Parser().parse(file_handle) + target_body = "" + for part in target_email.walk(): + if part.get_content_type() in ["text/plain", "text/html"]: + target_body = part.get_payload() + break + + if source_email['From'] != target_email['From']: + raise EmailMismatch("Target message sender %s is too different " + "from the source one %s" % + (target_email['From'], source_email['From'])) + if source_email['To'] != target_email['To']: + raise EmailMismatch("Target message recipient %s is too different " + "from the source one %s" % + (target_email['To'], source_email['To'])) + if source_email['Subject'] != target_email['Subject']: + raise EmailMismatch("Target message subject '%s' is too different " + "from the source one '%s'" % + (target_email['Subject'], + source_email['Subject'])) + if source_email['Date'] != target_email['Date']: + raise EmailMismatch("Target message date %s is too different from " + "the source one %s" % + (target_email['Date'], source_email['Date'])) + if source_body != target_body: + raise EmailMismatch("Target message body '%s' is too different " + "from the source one '%s'" % + (target_body, source_body)) + + def _compare_emails_by_existence(self, source_email_path, + target_email_path, tolerance=1): + """ + Weak email validation based only on presence of file. + + DOES NOT CHECK ANYTHING! + """ + return True -- 1.7.1