SUMMARY
------------------------------------------------------
-Guest utility for fetchmail, spamassassin, horde and other email functionality tests.
+Guest utility for fetchmail, spamassassin, horde and other email functionality
+tests.
Copyright: Intra2net AG
import re
import subprocess
import logging
-log = logging.getLogger('pyi2ncommon.mail_utils')
import smtplib
from email.mime.audio import MIMEAudio
from . import arnied_wrapper
+log = logging.getLogger('pyi2ncommon.mail_utils')
+
class EmailException(Exception):
+ """Base class for custom exceptions raised from `MailValidator`."""
+
pass
-class EmailNotFound(EmailException):
+class EmailNotFound(EmailException): # pylint: disable=missing-docstring
pass
-class InvalidEmailHeader(EmailException):
+class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring
pass
-class InvalidEmailContent(EmailException):
+class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring
pass
-class EmailIDError(EmailException):
+class EmailIDError(EmailException): # pylint: disable=missing-docstring
pass
-class MismatchedEmailID(EmailIDError):
+class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring
pass
-class MissingEmailID(EmailIDError):
+class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring
pass
-class EmailMismatch(EmailException):
+class EmailMismatch(EmailException): # pylint: disable=missing-docstring
pass
class MailValidator():
"""Class for validation of emails."""
- def target_path(self, v=None):
- if v is not None:
- self._target_path = v
+ def target_path(self, new_value=None):
+ """Getter/Setter for property `target_path`."""
+ if new_value is not None:
+ self._target_path = new_value
else:
return self._target_path
target_path = property(target_path, target_path)
- def source_path(self, v=None):
- if v is not None:
- self._source_path = v
+ def source_path(self, new_value=None):
+ """Getter/Setter for property `source_path`."""
+ if new_value is not None:
+ self._source_path = new_value
else:
return self._source_path
source_path = property(source_path, source_path)
- def smtp_sender(self, v=None):
- if v is not None:
- self._smtp_sender = v
+ def smtp_sender(self, new_value=None):
+ """Getter/Setter for property `smtp_sender`."""
+ if new_value is not None:
+ self._smtp_sender = new_value
else:
return self._smtp_sender
smtp_sender = property(smtp_sender, smtp_sender)
self._target_path = target_path
self._source_path = source_path
self._smtp_sender = "no_source@inject.smtp"
- self._compare_emails_method= self._default_compare_emails
+ self._compare_emails_method = self._default_compare_emails
def inject_emails(self, username, original_user):
"""
Inject emails from `source_path` to `target_path`.
:param str username: username for the mail injection script
- :param str original_user: original username for the mail injection script
+ :param str original_user: original username for the mail injection
+ script
- In order to restore acl rights as well put a mailbox.dump file in the source path.
+ In order to restore acl rights as well put a mailbox.dump file in the
+ source path.
"""
log.info("Injecting emails for user %s", username)
# inject emails from test data
- cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + " -s " + self.source_path
+ cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \
+ " -s " + self.source_path
if original_user != "":
- cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + " -o " + original_user
+ cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \
+ " -o " + original_user
result = subprocess.check_output(cmd, shell=True)
log.debug(result)
for email in emails:
log.info("Sending email %s", email)
- with open(os.path.join(self.source_path, email), 'rb') as f:
- email_content = f.read()
+ with open(os.path.join(self.source_path, email), 'rb') \
+ as file_handle:
+ email_content = file_handle.read()
server.sendmail(self.smtp_sender, users, email_content)
# Wait till SMTP queue is processed
def verify_email_id(self, email, emails_list, timeout, in_target=True):
"""
- Verify that the id of an email is present in a list and return that email's
- match in this list.
+ Verify that the id of an email is present in a list.
+
+ Returns that email's match in this list.
:param str email: email filename
:param emails_list: email among which the first email has to be found
:param int timeout: timeout for extracting the source and target emails
:param bool in_target: whether the verified email is on the target side
- If `in_target` is set to True we are getting the target id from the target list
- of a source email. Otherwise we assume a target email from a source list.
+ If `in_target` is set to True we are getting the target id from the
+ target list of a source email. Otherwise we assume a target email from
+ a source list.
"""
if in_target:
- email = self._extract_email_paths(self.source_path, [email], timeout)[0]
- emails_list = self._extract_email_paths(self.target_path, emails_list, timeout)
+ email = self._extract_email_paths(self.source_path, [email],
+ timeout)[0]
+ emails_list = self._extract_email_paths(self.target_path,
+ emails_list, timeout)
else:
- email = self._extract_email_paths(self.target_path, [email], timeout)[0]
- emails_list = self._extract_email_paths(self.source_path, emails_list, timeout)
+ email = self._extract_email_paths(self.target_path, [email],
+ timeout)[0]
+ emails_list = self._extract_email_paths(self.source_path,
+ emails_list, timeout)
email_id = self._extract_message_id(email)
match = self._find_message_with_id(email_id, emails_list)
:param target_emails: emails at the target (server) location
:type target_emails: [str]
:param int timeout: timeout for extracting the source and target emails
- :raises: :py:class:`EmailNotFound` if target email is not found on server
+ :raises: :py:class:`EmailNotFound` if target email is not found on
+ server
"""
- source_paths = self._extract_email_paths(self.source_path, source_emails, timeout)
- target_paths = self._extract_email_paths(self.target_path, target_emails, timeout)
+ source_paths = self._extract_email_paths(self.source_path,
+ source_emails, timeout)
+ target_paths = self._extract_email_paths(self.target_path,
+ target_emails, timeout)
- log.info("Verifying emails at %s with %s", self.target_path, self.source_path)
+ log.info("Verifying emails at %s with %s", self.target_path,
+ self.source_path)
for target in target_paths:
log.info("Verifying email %s", target)
target_id = self._extract_message_id(target)
self._compare_emails_method(target, source, 1)
if len(source_paths) > 0:
- raise EmailNotFound("%s target mails could not be found on server.\n%s"
+ raise EmailNotFound("%s target mails could not be found on server."
+ "\n%s"
% (len(source_paths), "\n".join(source_paths)))
else:
log.info("All e-mails at %s verified!", self.target_path)
- def assert_header(self, emails, header, present_values=None, absent_values=None, timeout=30):
+ def assert_header(self, emails, header, present_values=None,
+ absent_values=None, timeout=30):
"""
- Check headers for contained and not contained strings in a list of messages.
+ Check headers for present and missing strings in a list of messages.
:param emails: emails whose headers will be checked
:type emails: [str]
:param int timeout: timeout for extracting the source and target emails
:raises: :py:class:`InvalidEmailHeader` if email header is not valid
- Every list of present and respectively absent values contains alternative values.
- At least one of present and one of absent should be satisfied.
+ Every list of present and respectively absent values contains
+ alternative values. At least one of present and one of absent should be
+ satisfied.
"""
- target_paths = self._extract_email_paths(self.target_path, emails, timeout)
+ target_paths = self._extract_email_paths(self.target_path, emails,
+ timeout)
for email_path in target_paths:
with open(email_path, "r") as email_file:
verified_email = Parser().parse(email_file, headersonly=True)
absent_valid = True
if not present_valid and len(present_values) > 0:
- raise InvalidEmailHeader("Message header '%s' in %s is not valid:\n%s"
- % (header, email_path, verified_email[header]))
+ raise InvalidEmailHeader("Message header '%s' in %s is not "
+ "valid:\n%s"
+ % (header, email_path,
+ verified_email[header]))
if not absent_valid and len(absent_values) > 0:
- raise InvalidEmailHeader("Message header '%s' in %s is not valid:\n%s"
- % (header, email_path, verified_email[header]))
+ raise InvalidEmailHeader("Message header '%s' in %s is not "
+ "valid:\n%s"
+ % (header, email_path,
+ verified_email[header]))
log.info("Message header '%s' in %s is valid!", header, email_path)
- def assert_content(self, emails, content_type, present_values=None, absent_values=None, timeout=30):
+ def assert_content(self, emails, content_type, present_values=None,
+ absent_values=None, timeout=30):
"""
- Check headers for contained and not contained strings in a list of messages,
+ Check headers for present/missing strings in a list of messages.
:param emails: emails whose content will be checked
:type emails: [str]
- :param str content_type: type of the content that will be checked for values
+ :param str content_type: type of the content that will be checked for
+ values
:param present_values: strings that have to be present in the content
:type present_values: [str] or None
:param absent_values: strings that have to be absent in the content
:param int timeout: timeout for extracting the source and target emails
:raises: :py:class:`InvalidEmailContent` if email content is not valid
- Every list of present and respectively absent values contains alternative values.
- At least one of present and one of absent should be satisfied.
+ Every list of present and respectively absent values contains
+ alternative values. At least one of present and one of absent should be
+ satisfied.
"""
- target_paths = self._extract_email_paths(self.target_path, emails, timeout)
+ target_paths = self._extract_email_paths(self.target_path, emails,
+ timeout)
for email_path in target_paths:
with open(email_path, "r") as email_file:
verified_email = Parser().parse(email_file)
absent_valid = True
if not present_valid and len(present_values) > 0:
- raise InvalidEmailContent("Message content '%s' in %s is not valid:\n%s"
+ raise InvalidEmailContent("Message content '%s' in %s is not "
+ "valid:\n%s"
% (content_type, email_path, content))
if not absent_valid and len(absent_values) > 0:
- raise InvalidEmailContent("Message content '%s' in %s is not valid:\n%s"
+ raise InvalidEmailContent("Message content '%s' in %s is not "
+ "valid:\n%s"
% (content_type, email_path, content))
- log.info("Message content '%s' in %s is valid!", content_type, email_path)
+ log.info("Message content '%s' in %s is valid!",
+ content_type, email_path)
def send_email_with_files(self, username, file_list,
wait_for_transfer=True,
autotest_signature=None,
subject="my subject"):
"""
- Send a generated email with attachments instead of an .eml file
- containing attachments.
+ Send a generated email with attachments.
:param str username: username of a localhost receiver of the email
:param file_list: files attached to an email
# gzip'd or compressed files.
ctype, encoding = mimetypes.guess_type(fullpath)
if ctype is None or encoding is not None:
- # No guess could be made, or the file is encoded (compressed), so
- # use a generic bag-of-bits type.
+ # No guess could be made, or the file is encoded (compressed),
+ # so use a generic bag-of-bits type.
ctype = 'application/octet-stream'
maintype, subtype = ctype.split('/', 1)
log.debug("Message successfully created")
# send via SMTP
- log.debug("Sending message from %s to %s" %
- (self.smtp_sender, user))
+ log.debug("Sending message from %s to %s" % (self.smtp_sender, user))
with smtplib.SMTP('localhost') as server:
server.sendmail(self.smtp_sender, user, msg.as_string())
for expected_email in emails:
# TODO: this can be improved by matching the emails themselves
if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index",
- "Entw&APw-rfe", "Gesendete Objekte", "Gel&APY-schte Elemente",
- "mailboxes.dump", "tmp"]:
+ "Entw&APw-rfe", "Gesendete Objekte",
+ "Gel&APY-schte Elemente", "mailboxes.dump",
+ "tmp"]:
continue
email_path = os.path.join(path, expected_email)
for i in range(timeout):
email_paths.append(email_path)
break
elif i == timeout - 1:
- raise EmailNotFound("Target message %s could not be found on server at %s within %ss"
+ raise EmailNotFound("Target message %s could not be found "
+ "on server at %s within %ss"
% (expected_email, path, timeout))
time.sleep(1)
log.debug("%s mails extracted at %s.", len(email_paths), path)
log.debug("Looking for a match for the message with id %s", message_id)
for message_path in message_paths:
extracted_id = self._extract_message_id(message_path)
- log.debug("Extracted id %s from candidate %s", extracted_id, message_path)
+ log.debug("Extracted id %s from candidate %s", extracted_id,
+ message_path)
if message_id == extracted_id:
log.debug("Found match at %s", message_path)
return message_path
- raise MismatchedEmailID("The message with id %s could not be matched or wasn't expected among %s"
+ raise MismatchedEmailID("The message with id %s could not be matched "
+ "or wasn't expected among %s"
% (message_id, ", ".join(message_paths)))
def _extract_message_id(self, message_path):
"""
- Given a message file path extract the Message-ID and raise error if
- none was found.
+ Given a message file path extract the Message-ID.
+
+ :raises: :py:class:`MissingEmailID` if no Message-ID was found.
"""
message_id = ""
- with open(message_path, errors='ignore') as f:
- content = f.read()
+ with open(message_path, errors='ignore') as file_handle:
+ content = file_handle.read()
for line in content.split("\n"):
match_id = re.match("Autotest-Message-ID: (.+)", line)
if match_id is not None:
message_id = match_id.group(1).rstrip('\r\n')
if message_id == "":
- raise MissingEmailID("No id was found in target message %s so it cannot be properly matched"
+ raise MissingEmailID("No id was found in target message %s so it "
+ "cannot be properly matched"
% (message_path))
return message_id
- def _default_compare_emails(self, source_email_path, target_email_path, tolerance=1):
- """Use python provided diff functionality to compare target emails with source ones."""
+ def _default_compare_emails(self, source_email_path, target_email_path,
+ tolerance=1):
+ """
+ Compare target emails with source ones.
+
+ Uses python provided diff functionality to compare complete mail files.
+ """
with open(source_email_path, "r") as source_email_file:
source_email = source_email_file.read()
with open(target_email_path, "r") as target_email_file:
target_email = target_email_file.read()
- s = difflib.SequenceMatcher(None, source_email, target_email)
- diffratio = s.ratio()
+ matcher = difflib.SequenceMatcher(None, source_email, target_email)
+ diffratio = matcher.ratio()
log.debug("Target message comparison ratio is %s.", diffratio)
- #log.info("%s $$$ %s", source_email, target_email)
+ # log.info("%s $$$ %s", source_email, target_email)
if diffratio < tolerance:
- raise EmailMismatch("Target message is too different from the source (difference %s < tolerance %s).",
+ raise EmailMismatch("Target message is too different from the "
+ "source (difference %s < tolerance %s).",
diffratio, tolerance)
- def _compare_emails_by_basic_headers(self, source_email_path, target_email_path, tolerance=1):
- """Use python provided diff functionality to compare target emails with source ones."""
- with open(source_email_path, errors="ignore") as f:
- source_email = Parser().parse(f)
+ def _compare_emails_by_basic_headers(self, source_email_path,
+ target_email_path, tolerance=1):
+ """
+ Compare target emails with source ones.
+
+ Uses python provided diff functionality to compare headers and mail
+ "body".
+
+ Argument `tolerance` not used!
+ """
+ with open(source_email_path, errors="ignore") as file_handle:
+ source_email = Parser().parse(file_handle)
source_body = ""
for part in source_email.walk():
if part.get_content_type() in ["text/plain", "text/html"]:
source_body = part.get_payload()
break
- with open(target_email_path, errors="ignore") as f:
- target_email = Parser().parse(f)
+ with open(target_email_path, errors="ignore") as file_handle:
+ target_email = Parser().parse(file_handle)
target_body = ""
for part in target_email.walk():
if part.get_content_type() in ["text/plain", "text/html"]:
break
if source_email['From'] != target_email['From']:
- raise EmailMismatch("Target message sender %s is too different from the source one %s" %
+ raise EmailMismatch("Target message sender %s is too different "
+ "from the source one %s" %
(target_email['From'], source_email['From']))
if source_email['To'] != target_email['To']:
- raise EmailMismatch("Target message recipient %s is too different from the source one %s" %
+ raise EmailMismatch("Target message recipient %s is too different "
+ "from the source one %s" %
(target_email['To'], source_email['To']))
if source_email['Subject'] != target_email['Subject']:
- raise EmailMismatch("Target message subject '%s' is too different from the source one '%s'" %
- (target_email['Subject'], source_email['Subject']))
+ raise EmailMismatch("Target message subject '%s' is too different "
+ "from the source one '%s'" %
+ (target_email['Subject'],
+ source_email['Subject']))
if source_email['Date'] != target_email['Date']:
- raise EmailMismatch("Target message date %s is too different from the source one %s" %
+ raise EmailMismatch("Target message date %s is too different from "
+ "the source one %s" %
(target_email['Date'], source_email['Date']))
if source_body != target_body:
- raise EmailMismatch("Target message body '%s' is too different from the source one '%s'" %
+ raise EmailMismatch("Target message body '%s' is too different "
+ "from the source one '%s'" %
(target_body, source_body))
- def _compare_emails_by_existence(self, source_email_path, target_email_path, tolerance=1):
- """Weak email validation based only on presence of file"""
+ def _compare_emails_by_existence(self, source_email_path,
+ target_email_path, tolerance=1):
+ """
+ Weak email validation based only on presence of file.
+
+ DOES NOT CHECK ANYTHING!
+ """
return True
arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
elif criterion == "received":
logging.debug("Updating test emails' Received header")
- with open(email_file, "r") as f:
- email_text = f.read()
+ with open(email_file, "r") as file_handle:
+ email_text = file_handle.read()
email_text = re.sub(regex, value, email_text)
email_text = re.sub(regex, value, email_text)
- with open(email_file, "w") as f:
- f.write(email_text)
+ with open(email_file, "w") as file_handle:
+ file_handle.write(email_text)
else:
- raise ValueError("Invalid header preparation criterion '%s'" % criterion)
+ raise ValueError("Invalid header preparation criterion '%s'"
+ % criterion)
def create_users(usernames, config_file, params):
:param usernames: usernames of the created users
:type usernames: [str]
- :param str config_file: template config file to use for each user configuration
+ :param str config_file: template config file to use for each user
+ configuration
:param params: template config file to use for each user configuration
:type params: {str, str}
- :raises: :py:class:`RuntimeError` if the user is already or cannot be created
+ :raises: :py:class:`RuntimeError` if the user exists already or cannot be
+ created
"""
log.info("Creating new cyrus users %s", ", ".join(usernames))
- cyrus_user_path = params.get("cyrus_user_path", "/datastore/imap-mails/user/")
+ cyrus_user_path = params.get("cyrus_user_path",
+ "/datastore/imap-mails/user/")
# check for existence round
for username in usernames:
for username in usernames:
params["user"] = '%i: "%s"' % (-1, username)
params["user_fullname"] = username
- params_regex = {"user": '%s,(-?\d+: ".*")'}
+ params_regex = {"user": r'%s,(-?\d+: ".*")'}
arnied_wrapper.set_cnf_semidynamic([config_file],
params, params_regex)
for username in usernames:
- if not os.path.exists(os.path.join(cyrus_user_path, username.replace(".", "^"))):
+ if not os.path.exists(os.path.join(cyrus_user_path,
+ username.replace(".", "^"))):
raise RuntimeError("The user %s could not be created" % username)
else:
log.info("Added new user %s", username)
:rtype: root message object (of class :py:class:`email.message.Message`)
Removes the SMTP envelope surrounding the email if present. Only left-over
- might be a line with a '.' at end of non-multipart messages if `headers_only`
- is False.
+ might be a line with a '.' at end of non-multipart messages if
+ `headers_only` is False.
"""
with open(file_name, 'r') as read_handle:
line = read_handle.readline()
def get_user_mail(user, mailbox='INBOX', **kwargs):
"""
- Iterate over mails in given folder of given user; yields parsed mails
+ Iterate over mails in given folder of given user; yields parsed mails.
:param str mailbox: name of mailbox to use, INBOX (default) for base folder;
name is modified using :py:func:`cyrus_escape`
full path to the message on disc, and the latter is the outcome
of :py:func:`parse_mail_file` for that file
"""
-
folder = os.path.join('/datastore', 'imap-mails', 'user', user)
if mailbox != 'INBOX':
folder = os.path.join(folder, cyrus_escape(mailbox))
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
"""
- Converts names of users or mailbox folders to cyrus format.
+ Convert names of users or mailbox folders to cyrus format.
quite a hack, just does the following hard-coded replacements:
def get_filename(message, failobj=None, do_unwrap=True):
"""
- Get filename of a message part, even if it is base64-encoded
+ Get filename of a message part, even if it is base64-encoded.
For attachments with base64-encoded file name, the
:py:func:`email.message.Message.get_filename()` does not work. This function