# This Python file uses the following encoding: utf-8 # The software in this package is distributed under the GNU General # Public License version 2 (with a special exception described below). # # A copy of GNU General Public License (GPL) is included in this distribution, # in the file COPYING.GPL. # # As a special exception, if other files instantiate templates or use macros # or inline functions from this file, or you compile this file and link it # with other works to produce a work based on this file, this file # does not by itself cause the resulting work to be covered # by the GNU General Public License. # # However the source code for this file must still be made available # in accordance with section (3) of the GNU General Public License. # # This exception does not invalidate any other reasons why a work based # on this file might be covered by the GNU General Public License. # # Copyright (c) 2016-2018 Intra2net AG """ Class :py:class:`MailValidator`, a fully-featured email sender and checker. Copyright: Intra2net AG """ import time import os import difflib import socket from inspect import currentframe import re import subprocess import logging import smtplib from email.mime.audio import MIMEAudio from email.mime.base import MIMEBase from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.encoders import encode_base64 from email.utils import formatdate from email.parser import Parser import mimetypes from . import arnied_wrapper log = logging.getLogger('pyi2ncommon.mail_utils') class EmailException(Exception): """Base class for custom exceptions raised from `MailValidator`.""" pass class EmailNotFound(EmailException): # pylint: disable=missing-docstring pass class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring pass class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring pass class EmailIDError(EmailException): # pylint: disable=missing-docstring pass class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring pass class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring pass class EmailMismatch(EmailException): # pylint: disable=missing-docstring pass class MailValidator: """Class for validation of emails.""" def target_path(self, new_value=None): """Getter/Setter for property `target_path`.""" if new_value is not None: self._target_path = new_value else: return self._target_path target_path = property(target_path, target_path) def source_path(self, new_value=None): """Getter/Setter for property `source_path`.""" if new_value is not None: self._source_path = new_value else: return self._source_path source_path = property(source_path, source_path) def smtp_sender(self, new_value=None): """Getter/Setter for property `smtp_sender`.""" if new_value is not None: self._smtp_sender = new_value else: return self._smtp_sender smtp_sender = property(smtp_sender, smtp_sender) def compare_emails_method(self, method="basic"): """ Set email comparison method for validation. :param str method: one of "basic", "headers" :raises: :py:class:`ValueError` if chosen method is invalid """ if method == "basic": self._compare_emails_method = self._default_compare_emails elif method == "headers": self._compare_emails_method = self._compare_emails_by_basic_headers elif method == "existence": self._compare_emails_method = self._compare_emails_by_existence else: raise ValueError("Invalid email comparison method %s" % method) compare_emails_method = property(fset=compare_emails_method) def __init__(self, source_path, target_path): """ Construct a validator instance. :param str source_path: path to find source emails (not sent) :param str target_path: path to find target emails (received) .. note:: The comparison method can be redefined using the variety of private method implementations. """ self._target_path = target_path self._source_path = source_path self._smtp_sender = "no_source@inject.smtp" self._compare_emails_method = self._default_compare_emails def inject_emails(self, username, original_user): """ Inject emails from `source_path` to `target_path`. This uses the script *restore_mail_inject.pl* which injects the mails using IMAP (as opposed to :py:meth:`inject_smtp`). :param str username: username for the mail injection script :param str original_user: original username for the mail injection script In order to restore acl rights as well put a mailbox.dump file in the source path. """ log.info("Injecting emails for user %s", username) # inject emails from test data cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \ " -s " + self.source_path if original_user != "": cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \ " -o " + original_user result = subprocess.check_output(cmd, shell=True) log.debug(result) def _prepare_recipients(self, recipients): """ Prepare recipient list: ensure list of proper addresses. If given a simple string, make a list of strings out of it. If any recipient is just a username, append "@" + localhost to it. Also check that recipients are just email addresses. """ hostname = socket.gethostname() if isinstance(recipients, str): recipients = [recipients, ] result = [] for recipient in recipients: if '@' in recipient: result.append(recipient) else: result.append(recipient + '@' + hostname) for bad_char in '<>"\'': if bad_char in recipient: raise ValueError('Recipient must be a "raw" email address,' ' not {!r}'.format(recipient)) return result def inject_smtp(self, usernames, emails): """ Inject emails from `source_path` using python's SMTP library. As opposed to :py:meth:`inject_emails`, this actually sends the mail to the local mail server (meaning filtering, archiving, ... will happen). :param usernames: username(s) of the localhost receiver(s) for each email or proper email address(es) :type usernames: str or [str] :param emails: paths to files including full emails (header + body) to be sent to each user :type emails: [str] """ recipients = self._prepare_recipients(usernames) log.info("Sending emails to %s", ','.join(recipients)) with smtplib.SMTP('localhost') as server: for email in emails: log.info("Sending email %s", email) with open(os.path.join(self.source_path, email), 'rb') \ as file_handle: email_content = file_handle.read() server.sendmail(self.smtp_sender, recipients, email_content) # Wait till SMTP queue is processed arnied_wrapper.wait_for_email_transfer() def verify_email_id(self, email, emails_list, timeout, in_target=True): """ Verify that the id of an email is present in a list. Returns that email's match in this list. :param str email: email filename :param emails_list: email among which the first email has to be found :type emails_list: [str] :param int timeout: timeout for extracting the source and target emails :param bool in_target: whether the verified email is on the target side If `in_target` is set to True we are getting the target id from the target list of a source email. Otherwise, we assume a target email from a source list. """ if in_target: email = self._extract_email_paths(self.source_path, [email], timeout)[0] emails_list = self._extract_email_paths(self.target_path, emails_list, timeout) else: email = self._extract_email_paths(self.target_path, [email], timeout)[0] emails_list = self._extract_email_paths(self.source_path, emails_list, timeout) email_id = self._extract_message_id(email) match = self._find_message_with_id(email_id, emails_list) return os.path.basename(match) def verify_emails(self, source_emails, target_emails, timeout): """ Check injected e-mails for a user. :param source_emails: emails at the source location :type source_emails: [str] :param target_emails: emails at the target (server) location :type target_emails: [str] :param int timeout: timeout for extracting the source and target emails :raises: :py:class:`EmailNotFound` if target email is not found on server """ source_paths = self._extract_email_paths(self.source_path, source_emails, timeout) target_paths = self._extract_email_paths(self.target_path, target_emails, timeout) log.info("Verifying emails at %s with %s", self.target_path, self.source_path) for target in target_paths: log.info("Verifying email %s", target) target_id = self._extract_message_id(target) source = self._find_message_with_id(target_id, source_paths) source_paths.remove(source) self._compare_emails_method(target, source, 1) if len(source_paths) > 0: raise EmailNotFound("%s target mails could not be found on server." "\n%s" % (len(source_paths), "\n".join(source_paths))) else: log.info("All e-mails at %s verified!", self.target_path) def assert_header(self, emails, header, present_values=None, absent_values=None, timeout=30): """ Check headers for present and missing strings in a list of messages. :param emails: emails whose headers will be checked :type emails: [str] :param str header: header that will be validated for each email :param present_values: strings that have to be present in the header :type present_values: [str] or None :param absent_values: strings that have to be absent in the header :type absent_values: [str] or None :param int timeout: timeout for extracting the source and target emails :raises: :py:class:`InvalidEmailHeader` if email header is not valid Every list of present and respectively absent values contains alternative values. At least one of present and one of absent should be satisfied. """ target_paths = self._extract_email_paths(self.target_path, emails, timeout) for email_path in target_paths: with open(email_path, "r") as email_file: verified_email = Parser().parse(email_file, headersonly=True) log.debug("Extracted email headers:\n%s", verified_email) log.info("Checking header '%s' in %s", header, email_path) if not present_values: present_values = [] else: log.info("for present '%s'", "', '".join(present_values)) if not absent_values: absent_values = [] else: log.info("for absent '%s'", "', '".join(absent_values)) present_valid = False for present in present_values: if present in verified_email[header]: present_valid = True absent_valid = False for absent in absent_values: if absent not in verified_email[header]: absent_valid = True if not present_valid and len(present_values) > 0: raise InvalidEmailHeader("Message header '%s' in %s is not " "valid:\n%s" % (header, email_path, verified_email[header])) if not absent_valid and len(absent_values) > 0: raise InvalidEmailHeader("Message header '%s' in %s is not " "valid:\n%s" % (header, email_path, verified_email[header])) log.info("Message header '%s' in %s is valid!", header, email_path) def assert_content(self, emails, content_type, present_values=None, absent_values=None, timeout=30): """ Check headers for present/missing strings in a list of messages. :param emails: emails whose content will be checked :type emails: [str] :param str content_type: type of the content that will be checked for values :param present_values: strings that have to be present in the content :type present_values: [str] or None :param absent_values: strings that have to be absent in the content :type absent_values: [str] or None :param int timeout: timeout for extracting the source and target emails :raises: :py:class:`InvalidEmailContent` if email content is not valid Every list of present and respectively absent values contains alternative values. At least one of present and one of absent should be satisfied. """ target_paths = self._extract_email_paths(self.target_path, emails, timeout) for email_path in target_paths: with open(email_path, "r") as email_file: verified_email = Parser().parse(email_file) log.debug("Extracted email content:\n%s", verified_email) content = "" for part in verified_email.walk(): log.debug("Extracted %s part while looking for %s", part.get_content_type(), content_type) if part.get_content_type() == content_type: content = part.get_payload(decode=True) if isinstance(content, bytes): content = content.decode() # NOTE: only one such element is expected break log.info("Checking content '%s' in %s", content_type, email_path) if not present_values: present_values = [] else: log.info("for present '%s'", "', '".join(present_values)) if not absent_values: absent_values = [] else: log.info("for absent '%s'", "', '".join(absent_values)) present_valid = False for present in present_values: if present in content: present_valid = True absent_valid = False for absent in absent_values: if absent not in content: absent_valid = True if not present_valid and len(present_values) > 0: raise InvalidEmailContent("Message content '%s' in %s is not " "valid:\n%s" % (content_type, email_path, content)) if not absent_valid and len(absent_values) > 0: raise InvalidEmailContent("Message content '%s' in %s is not " "valid:\n%s" % (content_type, email_path, content)) log.info("Message content '%s' in %s is valid!", content_type, email_path) def send_email_with_files(self, usernames, file_list, wait_for_transfer=True, autotest_signature=None, subject="my subject"): """ Send a generated email with optional attachments. :param usernames: username(s) of the localhost receiver(s) or proper email address(es) :type usernames: str or [str] :param file_list: files attached to an email; can be empty :type file_list: [str] :param wait_for_transfer: specify whether to wait until arnied_wrapper confirms email transfer; you can also specify a fixed timeout (seconds) :type wait_for_transfer: bool or int :param autotest_signature: text to insert as value for header X-Autotest-Signature for simpler recognition of mail (if None do not add header) :type autotest_signature: str or None :param str subject: Subject of created mails """ text = 'This is an autogenerated email.\n' recipients = self._prepare_recipients(usernames) if file_list: # empty or None or so msg = MIMEMultipart() # pylint: disable=redefined-variable-type msg.attach(MIMEText(text, _charset='utf-8')) else: msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type msg['From'] = self.smtp_sender msg['To'] = ', '.join(recipients) msg['Subject'] = subject msg['Date'] = formatdate(localtime=True) msg.preamble = 'This is a multi-part message in MIME format.\n' msg.add_header('X-Autotest-Creator', self.__class__.__module__ + '.' + self.__class__.__name__ + '.' + currentframe().f_code.co_name) # (with help from http://stackoverflow.com/questions/5067604/determine- # function-name-from-within-that-function-without-using-traceback) if autotest_signature: msg.add_header('X-Autotest-Signature', autotest_signature) # attach files for filename in file_list: fullpath = os.path.join(self.source_path, filename) # Guess the content type based on the file's extension. Encoding # will be ignored, although we should check for simple things like # gzip'd or compressed files. ctype, encoding = mimetypes.guess_type(fullpath) if ctype is None or encoding is not None: # No guess could be made, or the file is encoded (compressed), # so use a generic bag-of-bits type. ctype = 'application/octet-stream' maintype, subtype = ctype.split('/', 1) log.debug("Creating message containing file {} of mime type {}" .format(filename, ctype)) part = None if maintype == 'text': with open(fullpath, 'rt') as file_handle: # Note: we should handle calculating the charset part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type elif maintype == 'image': with open(fullpath, 'rb') as file_handle: part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type elif maintype == 'audio': with open(fullpath, 'rb') as file_handle: part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type else: part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type with open(fullpath, 'rb') as file_handle: part.set_payload(file_handle.read()) # Encode the payload using Base64 encode_base64(part) # Set the filename parameter part.add_header('Content-Disposition', 'attachment', filename=filename) msg.attach(part) log.debug("Message successfully created") # send via SMTP log.debug("Sending message from %s to %s" % (self.smtp_sender, ', '.join(recipients))) with smtplib.SMTP('localhost') as server: server.sendmail(self.smtp_sender, recipients, msg.as_string()) # wait for transfer; complicated by isinstance(False, int) == True if wait_for_transfer is False: pass elif wait_for_transfer is True: arnied_wrapper.wait_for_email_transfer() else: arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer) def _extract_email_paths(self, path, emails, timeout): """Check and return the absolute paths of a list of emails.""" log.debug("Extracting messages %s", emails) if len(emails) == 0: emails = os.listdir(path) email_paths = [] for expected_email in emails: # TODO: this can be improved by matching the emails themselves if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index", "Entw&APw-rfe", "Gesendete Elemente", "Gel&APY-schte Elemente", "mailboxes.dump", "tmp"]: continue email_path = os.path.join(path, expected_email) for i in range(timeout): if os.path.isfile(email_path): email_paths.append(email_path) break elif i == timeout - 1: raise EmailNotFound("Target message %s could not be found " "on server at %s within %ss" % (expected_email, path, timeout)) time.sleep(1) log.debug("%s mails extracted at %s.", len(email_paths), path) return email_paths def _find_message_with_id(self, message_id, message_paths): """Find message with id among a list of message paths.""" log.debug("Looking for a match for the message with id %s", message_id) for message_path in message_paths: extracted_id = self._extract_message_id(message_path) log.debug("Extracted id %s from candidate %s", extracted_id, message_path) if message_id == extracted_id: log.debug("Found match at %s", message_path) return message_path raise MismatchedEmailID("The message with id %s could not be matched " "or wasn't expected among %s" % (message_id, ", ".join(message_paths))) def _extract_message_id(self, message_path): """ Given a message file path extract the Message-ID. :raises: :py:class:`MissingEmailID` if no Message-ID was found. """ message_id = "" with open(message_path, errors='ignore') as file_handle: content = file_handle.read() for line in content.split("\n"): match_id = re.match("Autotest-Message-ID: (.+)", line) if match_id is not None: message_id = match_id.group(1).rstrip('\r\n') if message_id == "": raise MissingEmailID(f"No id was found in target message {message_path}, " f"so it cannot be properly matched") return message_id def _default_compare_emails(self, source_email_path, target_email_path, tolerance=1): """ Compare target emails with source ones. Uses python provided diff functionality to compare complete mail files. """ with open(source_email_path, "r") as source_email_file: source_email = source_email_file.read() with open(target_email_path, "r") as target_email_file: target_email = target_email_file.read() matcher = difflib.SequenceMatcher(None, source_email, target_email) diffratio = matcher.ratio() log.debug("Target message comparison ratio is %s.", diffratio) # log.info("%s $$$ %s", source_email, target_email) if diffratio < tolerance: raise EmailMismatch("Target message is too different from the " "source (difference %s < tolerance %s).", diffratio, tolerance) def _compare_emails_by_basic_headers(self, source_email_path, target_email_path, tolerance=1): """ Compare target emails with source ones. Uses python provided diff functionality to compare headers and mail "body". Argument `tolerance` not used! """ with open(source_email_path, errors="ignore") as file_handle: source_email = Parser().parse(file_handle) source_body = "" for part in source_email.walk(): if part.get_content_type() in ["text/plain", "text/html"]: source_body = part.get_payload() break with open(target_email_path, errors="ignore") as file_handle: target_email = Parser().parse(file_handle) target_body = "" for part in target_email.walk(): if part.get_content_type() in ["text/plain", "text/html"]: target_body = part.get_payload() break if source_email['From'] != target_email['From']: raise EmailMismatch("Target message sender %s is too different " "from the source one %s" % (target_email['From'], source_email['From'])) if source_email['To'] != target_email['To']: raise EmailMismatch("Target message recipient %s is too different " "from the source one %s" % (target_email['To'], source_email['To'])) if source_email['Subject'] != target_email['Subject']: raise EmailMismatch("Target message subject '%s' is too different " "from the source one '%s'" % (target_email['Subject'], source_email['Subject'])) if source_email['Date'] != target_email['Date']: raise EmailMismatch("Target message date %s is too different from " "the source one %s" % (target_email['Date'], source_email['Date'])) if source_body != target_body: raise EmailMismatch("Target message body '%s' is too different " "from the source one '%s'" % (target_body, source_body)) def _compare_emails_by_existence(self, source_email_path, target_email_path, tolerance=1): """ Weak email validation based only on presence of file. DOES NOT CHECK ANYTHING! """ return True