"""
-import time
import os
-import difflib
-import socket
-from inspect import currentframe
from base64 import b64decode
import re
-import subprocess
import logging
-
-import smtplib
-from email.mime.audio import MIMEAudio
-from email.mime.base import MIMEBase
-from email.mime.image import MIMEImage
-from email.mime.multipart import MIMEMultipart
-from email.mime.text import MIMEText
-from email.encoders import encode_base64
-from email.utils import formatdate, parsedate_to_datetime
+from email.utils import parsedate_to_datetime
from email.parser import Parser
-import mimetypes
from . import arnied_wrapper
-from imap_mailbox import ImapMailbox
-
-log = logging.getLogger('pyi2ncommon.mail_utils')
-
-
-class EmailException(Exception):
- """Base class for custom exceptions raised from `MailValidator`."""
-
- pass
-
-
-class EmailNotFound(EmailException): # pylint: disable=missing-docstring
- pass
-
-
-class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring
- pass
-
-
-class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring
- pass
-
-
-class EmailIDError(EmailException): # pylint: disable=missing-docstring
- pass
-
-
-class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring
- pass
-
-class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring
- pass
+# outsourced source, import required for compatiblity
+from .imap_mailbox import ImapMailbox # pylint: disable=unused-import
+from .mail_validator import * # pylint: disable=unused-import
-
-class EmailMismatch(EmailException): # pylint: disable=missing-docstring
- pass
-
-
-class MailValidator():
- """Class for validation of emails."""
-
- def target_path(self, new_value=None):
- """Getter/Setter for property `target_path`."""
- if new_value is not None:
- self._target_path = new_value
- else:
- return self._target_path
- target_path = property(target_path, target_path)
-
- def source_path(self, new_value=None):
- """Getter/Setter for property `source_path`."""
- if new_value is not None:
- self._source_path = new_value
- else:
- return self._source_path
- source_path = property(source_path, source_path)
-
- def smtp_sender(self, new_value=None):
- """Getter/Setter for property `smtp_sender`."""
- if new_value is not None:
- self._smtp_sender = new_value
- else:
- return self._smtp_sender
- smtp_sender = property(smtp_sender, smtp_sender)
-
- def compare_emails_method(self, method="basic"):
- """
- Set email comparison method for validation.
-
- :param str method: one of "basic", "headers"
- :raises: :py:class:`ValueError` if chosen method is invalid
- """
- if method == "basic":
- self._compare_emails_method = self._default_compare_emails
- elif method == "headers":
- self._compare_emails_method = self._compare_emails_by_basic_headers
- elif method == "existence":
- self._compare_emails_method = self._compare_emails_by_existence
- else:
- raise ValueError("Invalid email comparison method %s" % method)
- compare_emails_method = property(fset=compare_emails_method)
-
- def __init__(self, source_path, target_path):
- """
- Construct a validator instance.
-
- :param str source_path: path to find source emails (not sent)
- :param str target_path: path to find target emails (received)
-
- .. note:: The comparison method can be redefined using the variety of
- private method implementations.
- """
- self._target_path = target_path
- self._source_path = source_path
- self._smtp_sender = "no_source@inject.smtp"
- self._compare_emails_method = self._default_compare_emails
-
- def inject_emails(self, username, original_user):
- """
- Inject emails from `source_path` to `target_path`.
-
- :param str username: username for the mail injection script
- :param str original_user: original username for the mail injection
- script
-
- In order to restore acl rights as well put a mailbox.dump file in the
- source path.
- """
- log.info("Injecting emails for user %s", username)
-
- # inject emails from test data
- cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \
- " -s " + self.source_path
- if original_user != "":
- cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \
- " -o " + original_user
-
- result = subprocess.check_output(cmd, shell=True)
- log.debug(result)
-
- def inject_smtp(self, usernames, emails):
- """
- Inject emails from `source_path` using python's SMTP library.
-
- :param usernames: usernames of the localhost receivers for each email
- :type usernames: [str]
- :param emails: emails to be sent to each user
- :type emails: [str]
- """
- usernames_string = ",".join(usernames)
- log.info("Sending emails to %s", usernames_string)
- with smtplib.SMTP('localhost') as server:
- hostname = socket.gethostname()
- users = [username + "@" + hostname for username in usernames]
-
- for email in emails:
- log.info("Sending email %s", email)
- with open(os.path.join(self.source_path, email), 'rb') \
- as file_handle:
- email_content = file_handle.read()
- server.sendmail(self.smtp_sender, users, email_content)
-
- # Wait till SMTP queue is processed
- arnied_wrapper.wait_for_email_transfer()
-
- def verify_email_id(self, email, emails_list, timeout, in_target=True):
- """
- Verify that the id of an email is present in a list.
-
- Returns that email's match in this list.
-
- :param str email: email filename
- :param emails_list: email among which the first email has to be found
- :type emails_list: [str]
- :param int timeout: timeout for extracting the source and target emails
- :param bool in_target: whether the verified email is on the target side
-
- If `in_target` is set to True we are getting the target id from the
- target list of a source email. Otherwise we assume a target email from
- a source list.
- """
- if in_target:
- email = self._extract_email_paths(self.source_path, [email],
- timeout)[0]
- emails_list = self._extract_email_paths(self.target_path,
- emails_list, timeout)
- else:
- email = self._extract_email_paths(self.target_path, [email],
- timeout)[0]
- emails_list = self._extract_email_paths(self.source_path,
- emails_list, timeout)
-
- email_id = self._extract_message_id(email)
- match = self._find_message_with_id(email_id, emails_list)
- return os.path.basename(match)
-
- def verify_emails(self, source_emails, target_emails, timeout):
- """
- Check injected e-mails for a user.
-
- :param source_emails: emails at the source location
- :type source_emails: [str]
- :param target_emails: emails at the target (server) location
- :type target_emails: [str]
- :param int timeout: timeout for extracting the source and target emails
- :raises: :py:class:`EmailNotFound` if target email is not found on
- server
- """
- source_paths = self._extract_email_paths(self.source_path,
- source_emails, timeout)
- target_paths = self._extract_email_paths(self.target_path,
- target_emails, timeout)
-
- log.info("Verifying emails at %s with %s", self.target_path,
- self.source_path)
- for target in target_paths:
- log.info("Verifying email %s", target)
- target_id = self._extract_message_id(target)
- source = self._find_message_with_id(target_id, source_paths)
- source_paths.remove(source)
- self._compare_emails_method(target, source, 1)
-
- if len(source_paths) > 0:
- raise EmailNotFound("%s target mails could not be found on server."
- "\n%s"
- % (len(source_paths), "\n".join(source_paths)))
- else:
- log.info("All e-mails at %s verified!", self.target_path)
-
- def assert_header(self, emails, header, present_values=None,
- absent_values=None, timeout=30):
- """
- Check headers for present and missing strings in a list of messages.
-
- :param emails: emails whose headers will be checked
- :type emails: [str]
- :param str header: header that will be validated for each email
- :param present_values: strings that have to be present in the header
- :type present_values: [str] or None
- :param absent_values: strings that have to be absent in the header
- :type absent_values: [str] or None
- :param int timeout: timeout for extracting the source and target emails
- :raises: :py:class:`InvalidEmailHeader` if email header is not valid
-
- Every list of present and respectively absent values contains
- alternative values. At least one of present and one of absent should be
- satisfied.
- """
- target_paths = self._extract_email_paths(self.target_path, emails,
- timeout)
- for email_path in target_paths:
- with open(email_path, "r") as email_file:
- verified_email = Parser().parse(email_file, headersonly=True)
- log.debug("Extracted email headers:\n%s", verified_email)
-
- log.info("Checking header '%s' in %s", header, email_path)
- if not present_values:
- present_values = []
- else:
- log.info("for present '%s'", "', '".join(present_values))
- if not absent_values:
- absent_values = []
- else:
- log.info("for absent '%s'", "', '".join(absent_values))
- present_valid = False
- for present in present_values:
- if present in verified_email[header]:
- present_valid = True
- absent_valid = False
- for absent in absent_values:
- if absent not in verified_email[header]:
- absent_valid = True
-
- if not present_valid and len(present_values) > 0:
- raise InvalidEmailHeader("Message header '%s' in %s is not "
- "valid:\n%s"
- % (header, email_path,
- verified_email[header]))
- if not absent_valid and len(absent_values) > 0:
- raise InvalidEmailHeader("Message header '%s' in %s is not "
- "valid:\n%s"
- % (header, email_path,
- verified_email[header]))
- log.info("Message header '%s' in %s is valid!", header, email_path)
-
- def assert_content(self, emails, content_type, present_values=None,
- absent_values=None, timeout=30):
- """
- Check headers for present/missing strings in a list of messages.
-
- :param emails: emails whose content will be checked
- :type emails: [str]
- :param str content_type: type of the content that will be checked for
- values
- :param present_values: strings that have to be present in the content
- :type present_values: [str] or None
- :param absent_values: strings that have to be absent in the content
- :type absent_values: [str] or None
- :param int timeout: timeout for extracting the source and target emails
- :raises: :py:class:`InvalidEmailContent` if email content is not valid
-
- Every list of present and respectively absent values contains
- alternative values. At least one of present and one of absent should be
- satisfied.
- """
- target_paths = self._extract_email_paths(self.target_path, emails,
- timeout)
- for email_path in target_paths:
- with open(email_path, "r") as email_file:
- verified_email = Parser().parse(email_file)
- log.debug("Extracted email content:\n%s", verified_email)
- content = ""
- for part in verified_email.walk():
- log.debug("Extracted %s part while looking for %s",
- part.get_content_type(), content_type)
- if part.get_content_type() == content_type:
- content = part.get_payload(decode=True)
- if isinstance(content, bytes):
- content = content.decode()
- # NOTE: only one such element is expected
- break
-
- log.info("Checking content '%s' in %s", content_type, email_path)
- if not present_values:
- present_values = []
- else:
- log.info("for present '%s'", "', '".join(present_values))
- if not absent_values:
- absent_values = []
- else:
- log.info("for absent '%s'", "', '".join(absent_values))
- present_valid = False
- for present in present_values:
- if present in content:
- present_valid = True
- absent_valid = False
- for absent in absent_values:
- if absent not in content:
- absent_valid = True
-
- if not present_valid and len(present_values) > 0:
- raise InvalidEmailContent("Message content '%s' in %s is not "
- "valid:\n%s"
- % (content_type, email_path, content))
- if not absent_valid and len(absent_values) > 0:
- raise InvalidEmailContent("Message content '%s' in %s is not "
- "valid:\n%s"
- % (content_type, email_path, content))
- log.info("Message content '%s' in %s is valid!",
- content_type, email_path)
-
- def send_email_with_files(self, username, file_list,
- wait_for_transfer=True,
- autotest_signature=None,
- subject="my subject"):
- """
- Send a generated email with attachments.
-
- :param str username: username of a localhost receiver of the email
- :param file_list: files attached to an email
- :type file_list: [str]
- :param wait_for_transfer: specify whether to wait until arnied_wrapper
- confirms email transfer; you can also specify
- a fixed timeout (seconds)
- :type wait_for_transfer: bool or int
- :param autotest_signature: text to insert as value for header
- X-Autotest-Signature for simpler recognition
- of mail (if None do not add header)
- :type autotest_signature: str or None
- """
- text = 'This is an autogenerated email.\n'
-
- hostname = socket.gethostname()
- user = username + "@" + hostname
-
- if file_list: # empty or None or so
- msg = MIMEMultipart() # pylint: disable=redefined-variable-type
- msg.attach(MIMEText(text, _charset='utf-8'))
- else:
- msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type
- msg['From'] = self.smtp_sender
- msg['To'] = user
- msg['Subject'] = subject
- msg['Date'] = formatdate(localtime=True)
- msg.preamble = 'This is a multi-part message in MIME format.\n'
- msg.add_header('X-Autotest-Creator',
- self.__class__.__module__ + '.' +
- self.__class__.__name__ + '.' +
- currentframe().f_code.co_name)
- # (with help from http://stackoverflow.com/questions/5067604/determine-
- # function-name-from-within-that-function-without-using-traceback)
- if autotest_signature:
- msg.add_header('X-Autotest-Signature', autotest_signature)
-
- # attach files
- for filename in file_list:
- fullpath = os.path.join(self.source_path, filename)
-
- # Guess the content type based on the file's extension. Encoding
- # will be ignored, although we should check for simple things like
- # gzip'd or compressed files.
- ctype, encoding = mimetypes.guess_type(fullpath)
- if ctype is None or encoding is not None:
- # No guess could be made, or the file is encoded (compressed),
- # so use a generic bag-of-bits type.
- ctype = 'application/octet-stream'
-
- maintype, subtype = ctype.split('/', 1)
- log.debug("Creating message containing file {} of mime type {}"
- .format(filename, ctype))
- part = None
- if maintype == 'text':
- with open(fullpath, 'rt') as file_handle:
- # Note: we should handle calculating the charset
- part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
- elif maintype == 'image':
- with open(fullpath, 'rb') as file_handle:
- part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
- elif maintype == 'audio':
- with open(fullpath, 'rb') as file_handle:
- part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
- else:
- part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type
- with open(fullpath, 'rb') as file_handle:
- part.set_payload(file_handle.read())
- # Encode the payload using Base64
- encode_base64(part)
- # Set the filename parameter
- part.add_header('Content-Disposition', 'attachment',
- filename=filename)
- msg.attach(part)
-
- log.debug("Message successfully created")
- # send via SMTP
-
- log.debug("Sending message from %s to %s" % (self.smtp_sender, user))
- with smtplib.SMTP('localhost') as server:
- server.sendmail(self.smtp_sender, user, msg.as_string())
-
- # wait for transfer; complicated by isinstance(False, int) == True
- if wait_for_transfer is False:
- pass
- elif wait_for_transfer is True:
- arnied_wrapper.wait_for_email_transfer()
- else:
- arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)
-
- def _extract_email_paths(self, path, emails, timeout):
- """Check and return the absolute paths of a list of emails."""
- log.debug("Extracting messages %s", emails)
- if len(emails) == 0:
- emails = os.listdir(path)
- email_paths = []
- for expected_email in emails:
- # TODO: this can be improved by matching the emails themselves
- if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index",
- "Entw&APw-rfe", "Gesendete Objekte",
- "Gel&APY-schte Elemente", "mailboxes.dump",
- "tmp"]:
- continue
- email_path = os.path.join(path, expected_email)
- for i in range(timeout):
- if os.path.isfile(email_path):
- email_paths.append(email_path)
- break
- elif i == timeout - 1:
- raise EmailNotFound("Target message %s could not be found "
- "on server at %s within %ss"
- % (expected_email, path, timeout))
- time.sleep(1)
- log.debug("%s mails extracted at %s.", len(email_paths), path)
- return email_paths
-
- def _find_message_with_id(self, message_id, message_paths):
- """Find message with id among a list of message paths."""
- log.debug("Looking for a match for the message with id %s", message_id)
- for message_path in message_paths:
- extracted_id = self._extract_message_id(message_path)
- log.debug("Extracted id %s from candidate %s", extracted_id,
- message_path)
- if message_id == extracted_id:
- log.debug("Found match at %s", message_path)
- return message_path
- raise MismatchedEmailID("The message with id %s could not be matched "
- "or wasn't expected among %s"
- % (message_id, ", ".join(message_paths)))
-
- def _extract_message_id(self, message_path):
- """
- Given a message file path extract the Message-ID.
-
- :raises: :py:class:`MissingEmailID` if no Message-ID was found.
- """
- message_id = ""
- with open(message_path, errors='ignore') as file_handle:
- content = file_handle.read()
- for line in content.split("\n"):
- match_id = re.match("Autotest-Message-ID: (.+)", line)
- if match_id is not None:
- message_id = match_id.group(1).rstrip('\r\n')
- if message_id == "":
- raise MissingEmailID("No id was found in target message %s so it "
- "cannot be properly matched"
- % (message_path))
- return message_id
-
- def _default_compare_emails(self, source_email_path, target_email_path,
- tolerance=1):
- """
- Compare target emails with source ones.
-
- Uses python provided diff functionality to compare complete mail files.
- """
- with open(source_email_path, "r") as source_email_file:
- source_email = source_email_file.read()
- with open(target_email_path, "r") as target_email_file:
- target_email = target_email_file.read()
- matcher = difflib.SequenceMatcher(None, source_email, target_email)
- diffratio = matcher.ratio()
- log.debug("Target message comparison ratio is %s.", diffratio)
- # log.info("%s $$$ %s", source_email, target_email)
- if diffratio < tolerance:
- raise EmailMismatch("Target message is too different from the "
- "source (difference %s < tolerance %s).",
- diffratio, tolerance)
-
- def _compare_emails_by_basic_headers(self, source_email_path,
- target_email_path, tolerance=1):
- """
- Compare target emails with source ones.
-
- Uses python provided diff functionality to compare headers and mail
- "body".
-
- Argument `tolerance` not used!
- """
- with open(source_email_path, errors="ignore") as file_handle:
- source_email = Parser().parse(file_handle)
- source_body = ""
- for part in source_email.walk():
- if part.get_content_type() in ["text/plain", "text/html"]:
- source_body = part.get_payload()
- break
-
- with open(target_email_path, errors="ignore") as file_handle:
- target_email = Parser().parse(file_handle)
- target_body = ""
- for part in target_email.walk():
- if part.get_content_type() in ["text/plain", "text/html"]:
- target_body = part.get_payload()
- break
-
- if source_email['From'] != target_email['From']:
- raise EmailMismatch("Target message sender %s is too different "
- "from the source one %s" %
- (target_email['From'], source_email['From']))
- if source_email['To'] != target_email['To']:
- raise EmailMismatch("Target message recipient %s is too different "
- "from the source one %s" %
- (target_email['To'], source_email['To']))
- if source_email['Subject'] != target_email['Subject']:
- raise EmailMismatch("Target message subject '%s' is too different "
- "from the source one '%s'" %
- (target_email['Subject'],
- source_email['Subject']))
- if source_email['Date'] != target_email['Date']:
- raise EmailMismatch("Target message date %s is too different from "
- "the source one %s" %
- (target_email['Date'], source_email['Date']))
- if source_body != target_body:
- raise EmailMismatch("Target message body '%s' is too different "
- "from the source one '%s'" %
- (target_body, source_body))
-
- def _compare_emails_by_existence(self, source_email_path,
- target_email_path, tolerance=1):
- """
- Weak email validation based only on presence of file.
-
- DOES NOT CHECK ANYTHING!
- """
- return True
+log = logging.getLogger('pyi2ncommon.mail_utils')
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
--- /dev/null
+# This Python file uses the following encoding: utf-8
+
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+"""
+
+SUMMARY
+------------------------------------------------------
+Class :py:class:`MailValidator`, a fully-featured email sender and checker.
+
+Copyright: Intra2net AG
+
+
+INTERFACE
+------------------------------------------------------
+
+"""
+
+import time
+import os
+import difflib
+import socket
+from inspect import currentframe
+import re
+import subprocess
+import logging
+
+import smtplib
+from email.mime.audio import MIMEAudio
+from email.mime.base import MIMEBase
+from email.mime.image import MIMEImage
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.encoders import encode_base64
+from email.utils import formatdate
+from email.parser import Parser
+import mimetypes
+
+from . import arnied_wrapper
+
+log = logging.getLogger('pyi2ncommon.mail_utils')
+
+
+class EmailException(Exception):
+ """Base class for custom exceptions raised from `MailValidator`."""
+
+ pass
+
+
+class EmailNotFound(EmailException): # pylint: disable=missing-docstring
+ pass
+
+
+class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring
+ pass
+
+
+class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring
+ pass
+
+
+class EmailIDError(EmailException): # pylint: disable=missing-docstring
+ pass
+
+
+class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring
+ pass
+
+
+class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring
+ pass
+
+
+class EmailMismatch(EmailException): # pylint: disable=missing-docstring
+ pass
+
+
+class MailValidator():
+ """Class for validation of emails."""
+
+ def target_path(self, new_value=None):
+ """Getter/Setter for property `target_path`."""
+ if new_value is not None:
+ self._target_path = new_value
+ else:
+ return self._target_path
+ target_path = property(target_path, target_path)
+
+ def source_path(self, new_value=None):
+ """Getter/Setter for property `source_path`."""
+ if new_value is not None:
+ self._source_path = new_value
+ else:
+ return self._source_path
+ source_path = property(source_path, source_path)
+
+ def smtp_sender(self, new_value=None):
+ """Getter/Setter for property `smtp_sender`."""
+ if new_value is not None:
+ self._smtp_sender = new_value
+ else:
+ return self._smtp_sender
+ smtp_sender = property(smtp_sender, smtp_sender)
+
+ def compare_emails_method(self, method="basic"):
+ """
+ Set email comparison method for validation.
+
+ :param str method: one of "basic", "headers"
+ :raises: :py:class:`ValueError` if chosen method is invalid
+ """
+ if method == "basic":
+ self._compare_emails_method = self._default_compare_emails
+ elif method == "headers":
+ self._compare_emails_method = self._compare_emails_by_basic_headers
+ elif method == "existence":
+ self._compare_emails_method = self._compare_emails_by_existence
+ else:
+ raise ValueError("Invalid email comparison method %s" % method)
+ compare_emails_method = property(fset=compare_emails_method)
+
+ def __init__(self, source_path, target_path):
+ """
+ Construct a validator instance.
+
+ :param str source_path: path to find source emails (not sent)
+ :param str target_path: path to find target emails (received)
+
+ .. note:: The comparison method can be redefined using the variety of
+ private method implementations.
+ """
+ self._target_path = target_path
+ self._source_path = source_path
+ self._smtp_sender = "no_source@inject.smtp"
+ self._compare_emails_method = self._default_compare_emails
+
+ def inject_emails(self, username, original_user):
+ """
+ Inject emails from `source_path` to `target_path`.
+
+ :param str username: username for the mail injection script
+ :param str original_user: original username for the mail injection
+ script
+
+ In order to restore acl rights as well put a mailbox.dump file in the
+ source path.
+ """
+ log.info("Injecting emails for user %s", username)
+
+ # inject emails from test data
+ cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \
+ " -s " + self.source_path
+ if original_user != "":
+ cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \
+ " -o " + original_user
+
+ result = subprocess.check_output(cmd, shell=True)
+ log.debug(result)
+
+ def inject_smtp(self, usernames, emails):
+ """
+ Inject emails from `source_path` using python's SMTP library.
+
+ :param usernames: usernames of the localhost receivers for each email
+ :type usernames: [str]
+ :param emails: emails to be sent to each user
+ :type emails: [str]
+ """
+ usernames_string = ",".join(usernames)
+ log.info("Sending emails to %s", usernames_string)
+ with smtplib.SMTP('localhost') as server:
+ hostname = socket.gethostname()
+ users = [username + "@" + hostname for username in usernames]
+
+ for email in emails:
+ log.info("Sending email %s", email)
+ with open(os.path.join(self.source_path, email), 'rb') \
+ as file_handle:
+ email_content = file_handle.read()
+ server.sendmail(self.smtp_sender, users, email_content)
+
+ # Wait till SMTP queue is processed
+ arnied_wrapper.wait_for_email_transfer()
+
+ def verify_email_id(self, email, emails_list, timeout, in_target=True):
+ """
+ Verify that the id of an email is present in a list.
+
+ Returns that email's match in this list.
+
+ :param str email: email filename
+ :param emails_list: email among which the first email has to be found
+ :type emails_list: [str]
+ :param int timeout: timeout for extracting the source and target emails
+ :param bool in_target: whether the verified email is on the target side
+
+ If `in_target` is set to True we are getting the target id from the
+ target list of a source email. Otherwise we assume a target email from
+ a source list.
+ """
+ if in_target:
+ email = self._extract_email_paths(self.source_path, [email],
+ timeout)[0]
+ emails_list = self._extract_email_paths(self.target_path,
+ emails_list, timeout)
+ else:
+ email = self._extract_email_paths(self.target_path, [email],
+ timeout)[0]
+ emails_list = self._extract_email_paths(self.source_path,
+ emails_list, timeout)
+
+ email_id = self._extract_message_id(email)
+ match = self._find_message_with_id(email_id, emails_list)
+ return os.path.basename(match)
+
+ def verify_emails(self, source_emails, target_emails, timeout):
+ """
+ Check injected e-mails for a user.
+
+ :param source_emails: emails at the source location
+ :type source_emails: [str]
+ :param target_emails: emails at the target (server) location
+ :type target_emails: [str]
+ :param int timeout: timeout for extracting the source and target emails
+ :raises: :py:class:`EmailNotFound` if target email is not found on
+ server
+ """
+ source_paths = self._extract_email_paths(self.source_path,
+ source_emails, timeout)
+ target_paths = self._extract_email_paths(self.target_path,
+ target_emails, timeout)
+
+ log.info("Verifying emails at %s with %s", self.target_path,
+ self.source_path)
+ for target in target_paths:
+ log.info("Verifying email %s", target)
+ target_id = self._extract_message_id(target)
+ source = self._find_message_with_id(target_id, source_paths)
+ source_paths.remove(source)
+ self._compare_emails_method(target, source, 1)
+
+ if len(source_paths) > 0:
+ raise EmailNotFound("%s target mails could not be found on server."
+ "\n%s"
+ % (len(source_paths), "\n".join(source_paths)))
+ else:
+ log.info("All e-mails at %s verified!", self.target_path)
+
+ def assert_header(self, emails, header, present_values=None,
+ absent_values=None, timeout=30):
+ """
+ Check headers for present and missing strings in a list of messages.
+
+ :param emails: emails whose headers will be checked
+ :type emails: [str]
+ :param str header: header that will be validated for each email
+ :param present_values: strings that have to be present in the header
+ :type present_values: [str] or None
+ :param absent_values: strings that have to be absent in the header
+ :type absent_values: [str] or None
+ :param int timeout: timeout for extracting the source and target emails
+ :raises: :py:class:`InvalidEmailHeader` if email header is not valid
+
+ Every list of present and respectively absent values contains
+ alternative values. At least one of present and one of absent should be
+ satisfied.
+ """
+ target_paths = self._extract_email_paths(self.target_path, emails,
+ timeout)
+ for email_path in target_paths:
+ with open(email_path, "r") as email_file:
+ verified_email = Parser().parse(email_file, headersonly=True)
+ log.debug("Extracted email headers:\n%s", verified_email)
+
+ log.info("Checking header '%s' in %s", header, email_path)
+ if not present_values:
+ present_values = []
+ else:
+ log.info("for present '%s'", "', '".join(present_values))
+ if not absent_values:
+ absent_values = []
+ else:
+ log.info("for absent '%s'", "', '".join(absent_values))
+ present_valid = False
+ for present in present_values:
+ if present in verified_email[header]:
+ present_valid = True
+ absent_valid = False
+ for absent in absent_values:
+ if absent not in verified_email[header]:
+ absent_valid = True
+
+ if not present_valid and len(present_values) > 0:
+ raise InvalidEmailHeader("Message header '%s' in %s is not "
+ "valid:\n%s"
+ % (header, email_path,
+ verified_email[header]))
+ if not absent_valid and len(absent_values) > 0:
+ raise InvalidEmailHeader("Message header '%s' in %s is not "
+ "valid:\n%s"
+ % (header, email_path,
+ verified_email[header]))
+ log.info("Message header '%s' in %s is valid!", header, email_path)
+
+ def assert_content(self, emails, content_type, present_values=None,
+ absent_values=None, timeout=30):
+ """
+ Check headers for present/missing strings in a list of messages.
+
+ :param emails: emails whose content will be checked
+ :type emails: [str]
+ :param str content_type: type of the content that will be checked for
+ values
+ :param present_values: strings that have to be present in the content
+ :type present_values: [str] or None
+ :param absent_values: strings that have to be absent in the content
+ :type absent_values: [str] or None
+ :param int timeout: timeout for extracting the source and target emails
+ :raises: :py:class:`InvalidEmailContent` if email content is not valid
+
+ Every list of present and respectively absent values contains
+ alternative values. At least one of present and one of absent should be
+ satisfied.
+ """
+ target_paths = self._extract_email_paths(self.target_path, emails,
+ timeout)
+ for email_path in target_paths:
+ with open(email_path, "r") as email_file:
+ verified_email = Parser().parse(email_file)
+ log.debug("Extracted email content:\n%s", verified_email)
+ content = ""
+ for part in verified_email.walk():
+ log.debug("Extracted %s part while looking for %s",
+ part.get_content_type(), content_type)
+ if part.get_content_type() == content_type:
+ content = part.get_payload(decode=True)
+ if isinstance(content, bytes):
+ content = content.decode()
+ # NOTE: only one such element is expected
+ break
+
+ log.info("Checking content '%s' in %s", content_type, email_path)
+ if not present_values:
+ present_values = []
+ else:
+ log.info("for present '%s'", "', '".join(present_values))
+ if not absent_values:
+ absent_values = []
+ else:
+ log.info("for absent '%s'", "', '".join(absent_values))
+ present_valid = False
+ for present in present_values:
+ if present in content:
+ present_valid = True
+ absent_valid = False
+ for absent in absent_values:
+ if absent not in content:
+ absent_valid = True
+
+ if not present_valid and len(present_values) > 0:
+ raise InvalidEmailContent("Message content '%s' in %s is not "
+ "valid:\n%s"
+ % (content_type, email_path, content))
+ if not absent_valid and len(absent_values) > 0:
+ raise InvalidEmailContent("Message content '%s' in %s is not "
+ "valid:\n%s"
+ % (content_type, email_path, content))
+ log.info("Message content '%s' in %s is valid!",
+ content_type, email_path)
+
+ def send_email_with_files(self, username, file_list,
+ wait_for_transfer=True,
+ autotest_signature=None,
+ subject="my subject"):
+ """
+ Send a generated email with attachments.
+
+ :param str username: username of a localhost receiver of the email
+ :param file_list: files attached to an email
+ :type file_list: [str]
+ :param wait_for_transfer: specify whether to wait until arnied_wrapper
+ confirms email transfer; you can also specify
+ a fixed timeout (seconds)
+ :type wait_for_transfer: bool or int
+ :param autotest_signature: text to insert as value for header
+ X-Autotest-Signature for simpler recognition
+ of mail (if None do not add header)
+ :type autotest_signature: str or None
+ """
+ text = 'This is an autogenerated email.\n'
+
+ hostname = socket.gethostname()
+ user = username + "@" + hostname
+
+ if file_list: # empty or None or so
+ msg = MIMEMultipart() # pylint: disable=redefined-variable-type
+ msg.attach(MIMEText(text, _charset='utf-8'))
+ else:
+ msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type
+ msg['From'] = self.smtp_sender
+ msg['To'] = user
+ msg['Subject'] = subject
+ msg['Date'] = formatdate(localtime=True)
+ msg.preamble = 'This is a multi-part message in MIME format.\n'
+ msg.add_header('X-Autotest-Creator',
+ self.__class__.__module__ + '.' +
+ self.__class__.__name__ + '.' +
+ currentframe().f_code.co_name)
+ # (with help from http://stackoverflow.com/questions/5067604/determine-
+ # function-name-from-within-that-function-without-using-traceback)
+ if autotest_signature:
+ msg.add_header('X-Autotest-Signature', autotest_signature)
+
+ # attach files
+ for filename in file_list:
+ fullpath = os.path.join(self.source_path, filename)
+
+ # Guess the content type based on the file's extension. Encoding
+ # will be ignored, although we should check for simple things like
+ # gzip'd or compressed files.
+ ctype, encoding = mimetypes.guess_type(fullpath)
+ if ctype is None or encoding is not None:
+ # No guess could be made, or the file is encoded (compressed),
+ # so use a generic bag-of-bits type.
+ ctype = 'application/octet-stream'
+
+ maintype, subtype = ctype.split('/', 1)
+ log.debug("Creating message containing file {} of mime type {}"
+ .format(filename, ctype))
+ part = None
+ if maintype == 'text':
+ with open(fullpath, 'rt') as file_handle:
+ # Note: we should handle calculating the charset
+ part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
+ elif maintype == 'image':
+ with open(fullpath, 'rb') as file_handle:
+ part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
+ elif maintype == 'audio':
+ with open(fullpath, 'rb') as file_handle:
+ part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
+ else:
+ part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type
+ with open(fullpath, 'rb') as file_handle:
+ part.set_payload(file_handle.read())
+ # Encode the payload using Base64
+ encode_base64(part)
+ # Set the filename parameter
+ part.add_header('Content-Disposition', 'attachment',
+ filename=filename)
+ msg.attach(part)
+
+ log.debug("Message successfully created")
+ # send via SMTP
+
+ log.debug("Sending message from %s to %s" % (self.smtp_sender, user))
+ with smtplib.SMTP('localhost') as server:
+ server.sendmail(self.smtp_sender, user, msg.as_string())
+
+ # wait for transfer; complicated by isinstance(False, int) == True
+ if wait_for_transfer is False:
+ pass
+ elif wait_for_transfer is True:
+ arnied_wrapper.wait_for_email_transfer()
+ else:
+ arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)
+
+ def _extract_email_paths(self, path, emails, timeout):
+ """Check and return the absolute paths of a list of emails."""
+ log.debug("Extracting messages %s", emails)
+ if len(emails) == 0:
+ emails = os.listdir(path)
+ email_paths = []
+ for expected_email in emails:
+ # TODO: this can be improved by matching the emails themselves
+ if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index",
+ "Entw&APw-rfe", "Gesendete Objekte",
+ "Gel&APY-schte Elemente", "mailboxes.dump",
+ "tmp"]:
+ continue
+ email_path = os.path.join(path, expected_email)
+ for i in range(timeout):
+ if os.path.isfile(email_path):
+ email_paths.append(email_path)
+ break
+ elif i == timeout - 1:
+ raise EmailNotFound("Target message %s could not be found "
+ "on server at %s within %ss"
+ % (expected_email, path, timeout))
+ time.sleep(1)
+ log.debug("%s mails extracted at %s.", len(email_paths), path)
+ return email_paths
+
+ def _find_message_with_id(self, message_id, message_paths):
+ """Find message with id among a list of message paths."""
+ log.debug("Looking for a match for the message with id %s", message_id)
+ for message_path in message_paths:
+ extracted_id = self._extract_message_id(message_path)
+ log.debug("Extracted id %s from candidate %s", extracted_id,
+ message_path)
+ if message_id == extracted_id:
+ log.debug("Found match at %s", message_path)
+ return message_path
+ raise MismatchedEmailID("The message with id %s could not be matched "
+ "or wasn't expected among %s"
+ % (message_id, ", ".join(message_paths)))
+
+ def _extract_message_id(self, message_path):
+ """
+ Given a message file path extract the Message-ID.
+
+ :raises: :py:class:`MissingEmailID` if no Message-ID was found.
+ """
+ message_id = ""
+ with open(message_path, errors='ignore') as file_handle:
+ content = file_handle.read()
+ for line in content.split("\n"):
+ match_id = re.match("Autotest-Message-ID: (.+)", line)
+ if match_id is not None:
+ message_id = match_id.group(1).rstrip('\r\n')
+ if message_id == "":
+ raise MissingEmailID("No id was found in target message %s so it "
+ "cannot be properly matched"
+ % (message_path))
+ return message_id
+
+ def _default_compare_emails(self, source_email_path, target_email_path,
+ tolerance=1):
+ """
+ Compare target emails with source ones.
+
+ Uses python provided diff functionality to compare complete mail files.
+ """
+ with open(source_email_path, "r") as source_email_file:
+ source_email = source_email_file.read()
+ with open(target_email_path, "r") as target_email_file:
+ target_email = target_email_file.read()
+ matcher = difflib.SequenceMatcher(None, source_email, target_email)
+ diffratio = matcher.ratio()
+ log.debug("Target message comparison ratio is %s.", diffratio)
+ # log.info("%s $$$ %s", source_email, target_email)
+ if diffratio < tolerance:
+ raise EmailMismatch("Target message is too different from the "
+ "source (difference %s < tolerance %s).",
+ diffratio, tolerance)
+
+ def _compare_emails_by_basic_headers(self, source_email_path,
+ target_email_path, tolerance=1):
+ """
+ Compare target emails with source ones.
+
+ Uses python provided diff functionality to compare headers and mail
+ "body".
+
+ Argument `tolerance` not used!
+ """
+ with open(source_email_path, errors="ignore") as file_handle:
+ source_email = Parser().parse(file_handle)
+ source_body = ""
+ for part in source_email.walk():
+ if part.get_content_type() in ["text/plain", "text/html"]:
+ source_body = part.get_payload()
+ break
+
+ with open(target_email_path, errors="ignore") as file_handle:
+ target_email = Parser().parse(file_handle)
+ target_body = ""
+ for part in target_email.walk():
+ if part.get_content_type() in ["text/plain", "text/html"]:
+ target_body = part.get_payload()
+ break
+
+ if source_email['From'] != target_email['From']:
+ raise EmailMismatch("Target message sender %s is too different "
+ "from the source one %s" %
+ (target_email['From'], source_email['From']))
+ if source_email['To'] != target_email['To']:
+ raise EmailMismatch("Target message recipient %s is too different "
+ "from the source one %s" %
+ (target_email['To'], source_email['To']))
+ if source_email['Subject'] != target_email['Subject']:
+ raise EmailMismatch("Target message subject '%s' is too different "
+ "from the source one '%s'" %
+ (target_email['Subject'],
+ source_email['Subject']))
+ if source_email['Date'] != target_email['Date']:
+ raise EmailMismatch("Target message date %s is too different from "
+ "the source one %s" %
+ (target_email['Date'], source_email['Date']))
+ if source_body != target_body:
+ raise EmailMismatch("Target message body '%s' is too different "
+ "from the source one '%s'" %
+ (target_body, source_body))
+
+ def _compare_emails_by_existence(self, source_email_path,
+ target_email_path, tolerance=1):
+ """
+ Weak email validation based only on presence of file.
+
+ DOES NOT CHECK ANYTHING!
+ """
+ return True