1 # This Python file uses the following encoding: utf-8
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
26 ------------------------------------------------------
27 Class :py:class:`MailValidator`, a fully-featured email sender and checker.
29 Copyright: Intra2net AG
33 ------------------------------------------------------
41 from inspect import currentframe
47 from email.mime.audio import MIMEAudio
48 from email.mime.base import MIMEBase
49 from email.mime.image import MIMEImage
50 from email.mime.multipart import MIMEMultipart
51 from email.mime.text import MIMEText
52 from email.encoders import encode_base64
53 from email.utils import formatdate
54 from email.parser import Parser
57 from . import arnied_wrapper
59 log = logging.getLogger('pyi2ncommon.mail_utils')
62 class EmailException(Exception):
63 """Base class for custom exceptions raised from `MailValidator`."""
68 class EmailNotFound(EmailException): # pylint: disable=missing-docstring
72 class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring
76 class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring
80 class EmailIDError(EmailException): # pylint: disable=missing-docstring
84 class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring
88 class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring
92 class EmailMismatch(EmailException): # pylint: disable=missing-docstring
96 class MailValidator():
97 """Class for validation of emails."""
99 def target_path(self, new_value=None):
100 """Getter/Setter for property `target_path`."""
101 if new_value is not None:
102 self._target_path = new_value
104 return self._target_path
105 target_path = property(target_path, target_path)
107 def source_path(self, new_value=None):
108 """Getter/Setter for property `source_path`."""
109 if new_value is not None:
110 self._source_path = new_value
112 return self._source_path
113 source_path = property(source_path, source_path)
115 def smtp_sender(self, new_value=None):
116 """Getter/Setter for property `smtp_sender`."""
117 if new_value is not None:
118 self._smtp_sender = new_value
120 return self._smtp_sender
121 smtp_sender = property(smtp_sender, smtp_sender)
123 def compare_emails_method(self, method="basic"):
125 Set email comparison method for validation.
127 :param str method: one of "basic", "headers"
128 :raises: :py:class:`ValueError` if chosen method is invalid
130 if method == "basic":
131 self._compare_emails_method = self._default_compare_emails
132 elif method == "headers":
133 self._compare_emails_method = self._compare_emails_by_basic_headers
134 elif method == "existence":
135 self._compare_emails_method = self._compare_emails_by_existence
137 raise ValueError("Invalid email comparison method %s" % method)
138 compare_emails_method = property(fset=compare_emails_method)
140 def __init__(self, source_path, target_path):
142 Construct a validator instance.
144 :param str source_path: path to find source emails (not sent)
145 :param str target_path: path to find target emails (received)
147 .. note:: The comparison method can be redefined using the variety of
148 private method implementations.
150 self._target_path = target_path
151 self._source_path = source_path
152 self._smtp_sender = "no_source@inject.smtp"
153 self._compare_emails_method = self._default_compare_emails
155 def inject_emails(self, username, original_user):
157 Inject emails from `source_path` to `target_path`.
159 This uses the script *restore_mail_inject.pl* which injects the mails
160 using IMAP (as opposed to :py:meth:`inject_smtp`).
162 :param str username: username for the mail injection script
163 :param str original_user: original username for the mail injection
166 In order to restore acl rights as well put a mailbox.dump file in the
169 log.info("Injecting emails for user %s", username)
171 # inject emails from test data
172 cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \
173 " -s " + self.source_path
174 if original_user != "":
175 cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \
176 " -o " + original_user
178 result = subprocess.check_output(cmd, shell=True)
181 def _prepare_recipients(self, recipients):
183 Prepare recipient list: ensure list of proper addresses.
185 If given a simple string, make a list of strings out of it.
186 If any recipient is just a username, append "@" + localhost to it.
187 Also check that recipients are just email addresses.
189 hostname = socket.gethostname()
190 if isinstance(recipients, str):
191 recipients = [recipients, ]
193 for recipient in recipients:
195 result.append(recipient)
197 result.append(recipient + '@' + hostname)
198 for bad_char in '<>"\'':
199 if bad_char in recipient:
200 raise ValueError('Recipient must be a "raw" email address,'
201 ' not {!r}'.format(recipient))
204 def inject_smtp(self, usernames, emails):
206 Inject emails from `source_path` using python's SMTP library.
208 As opposed to :py:meth:`inject_emails`, this actually sends the mail
209 to the local mail server (meaning filtering, archiving, ... will
212 :param usernames: username(s) of the localhost receiver(s) for each
213 email or proper email address(es)
214 :type usernames: str or [str]
215 :param emails: paths to files including full emails (header + body)
216 to be sent to each user
219 recipients = self._prepare_recipients(usernames)
220 log.info("Sending emails to %s", ','.join(recipients))
221 with smtplib.SMTP('localhost') as server:
223 log.info("Sending email %s", email)
224 with open(os.path.join(self.source_path, email), 'rb') \
226 email_content = file_handle.read()
227 server.sendmail(self.smtp_sender, recipients, email_content)
229 # Wait till SMTP queue is processed
230 arnied_wrapper.wait_for_email_transfer()
232 def verify_email_id(self, email, emails_list, timeout, in_target=True):
234 Verify that the id of an email is present in a list.
236 Returns that email's match in this list.
238 :param str email: email filename
239 :param emails_list: email among which the first email has to be found
240 :type emails_list: [str]
241 :param int timeout: timeout for extracting the source and target emails
242 :param bool in_target: whether the verified email is on the target side
244 If `in_target` is set to True we are getting the target id from the
245 target list of a source email. Otherwise we assume a target email from
249 email = self._extract_email_paths(self.source_path, [email],
251 emails_list = self._extract_email_paths(self.target_path,
252 emails_list, timeout)
254 email = self._extract_email_paths(self.target_path, [email],
256 emails_list = self._extract_email_paths(self.source_path,
257 emails_list, timeout)
259 email_id = self._extract_message_id(email)
260 match = self._find_message_with_id(email_id, emails_list)
261 return os.path.basename(match)
263 def verify_emails(self, source_emails, target_emails, timeout):
265 Check injected e-mails for a user.
267 :param source_emails: emails at the source location
268 :type source_emails: [str]
269 :param target_emails: emails at the target (server) location
270 :type target_emails: [str]
271 :param int timeout: timeout for extracting the source and target emails
272 :raises: :py:class:`EmailNotFound` if target email is not found on
275 source_paths = self._extract_email_paths(self.source_path,
276 source_emails, timeout)
277 target_paths = self._extract_email_paths(self.target_path,
278 target_emails, timeout)
280 log.info("Verifying emails at %s with %s", self.target_path,
282 for target in target_paths:
283 log.info("Verifying email %s", target)
284 target_id = self._extract_message_id(target)
285 source = self._find_message_with_id(target_id, source_paths)
286 source_paths.remove(source)
287 self._compare_emails_method(target, source, 1)
289 if len(source_paths) > 0:
290 raise EmailNotFound("%s target mails could not be found on server."
292 % (len(source_paths), "\n".join(source_paths)))
294 log.info("All e-mails at %s verified!", self.target_path)
296 def assert_header(self, emails, header, present_values=None,
297 absent_values=None, timeout=30):
299 Check headers for present and missing strings in a list of messages.
301 :param emails: emails whose headers will be checked
303 :param str header: header that will be validated for each email
304 :param present_values: strings that have to be present in the header
305 :type present_values: [str] or None
306 :param absent_values: strings that have to be absent in the header
307 :type absent_values: [str] or None
308 :param int timeout: timeout for extracting the source and target emails
309 :raises: :py:class:`InvalidEmailHeader` if email header is not valid
311 Every list of present and respectively absent values contains
312 alternative values. At least one of present and one of absent should be
315 target_paths = self._extract_email_paths(self.target_path, emails,
317 for email_path in target_paths:
318 with open(email_path, "r") as email_file:
319 verified_email = Parser().parse(email_file, headersonly=True)
320 log.debug("Extracted email headers:\n%s", verified_email)
322 log.info("Checking header '%s' in %s", header, email_path)
323 if not present_values:
326 log.info("for present '%s'", "', '".join(present_values))
327 if not absent_values:
330 log.info("for absent '%s'", "', '".join(absent_values))
331 present_valid = False
332 for present in present_values:
333 if present in verified_email[header]:
336 for absent in absent_values:
337 if absent not in verified_email[header]:
340 if not present_valid and len(present_values) > 0:
341 raise InvalidEmailHeader("Message header '%s' in %s is not "
343 % (header, email_path,
344 verified_email[header]))
345 if not absent_valid and len(absent_values) > 0:
346 raise InvalidEmailHeader("Message header '%s' in %s is not "
348 % (header, email_path,
349 verified_email[header]))
350 log.info("Message header '%s' in %s is valid!", header, email_path)
352 def assert_content(self, emails, content_type, present_values=None,
353 absent_values=None, timeout=30):
355 Check headers for present/missing strings in a list of messages.
357 :param emails: emails whose content will be checked
359 :param str content_type: type of the content that will be checked for
361 :param present_values: strings that have to be present in the content
362 :type present_values: [str] or None
363 :param absent_values: strings that have to be absent in the content
364 :type absent_values: [str] or None
365 :param int timeout: timeout for extracting the source and target emails
366 :raises: :py:class:`InvalidEmailContent` if email content is not valid
368 Every list of present and respectively absent values contains
369 alternative values. At least one of present and one of absent should be
372 target_paths = self._extract_email_paths(self.target_path, emails,
374 for email_path in target_paths:
375 with open(email_path, "r") as email_file:
376 verified_email = Parser().parse(email_file)
377 log.debug("Extracted email content:\n%s", verified_email)
379 for part in verified_email.walk():
380 log.debug("Extracted %s part while looking for %s",
381 part.get_content_type(), content_type)
382 if part.get_content_type() == content_type:
383 content = part.get_payload(decode=True)
384 if isinstance(content, bytes):
385 content = content.decode()
386 # NOTE: only one such element is expected
389 log.info("Checking content '%s' in %s", content_type, email_path)
390 if not present_values:
393 log.info("for present '%s'", "', '".join(present_values))
394 if not absent_values:
397 log.info("for absent '%s'", "', '".join(absent_values))
398 present_valid = False
399 for present in present_values:
400 if present in content:
403 for absent in absent_values:
404 if absent not in content:
407 if not present_valid and len(present_values) > 0:
408 raise InvalidEmailContent("Message content '%s' in %s is not "
410 % (content_type, email_path, content))
411 if not absent_valid and len(absent_values) > 0:
412 raise InvalidEmailContent("Message content '%s' in %s is not "
414 % (content_type, email_path, content))
415 log.info("Message content '%s' in %s is valid!",
416 content_type, email_path)
418 def send_email_with_files(self, usernames, file_list,
419 wait_for_transfer=True,
420 autotest_signature=None,
421 subject="my subject"):
423 Send a generated email with optional attachments.
425 :param usernames: username(s) of the localhost receiver(s) or proper
427 :type usernames: str or [str]
428 :param file_list: files attached to an email; can be empty
429 :type file_list: [str]
430 :param wait_for_transfer: specify whether to wait until arnied_wrapper
431 confirms email transfer; you can also specify
432 a fixed timeout (seconds)
433 :type wait_for_transfer: bool or int
434 :param autotest_signature: text to insert as value for header
435 X-Autotest-Signature for simpler recognition
436 of mail (if None do not add header)
437 :type autotest_signature: str or None
439 text = 'This is an autogenerated email.\n'
441 recipients = self._prepare_recipients(usernames)
443 if file_list: # empty or None or so
444 msg = MIMEMultipart() # pylint: disable=redefined-variable-type
445 msg.attach(MIMEText(text, _charset='utf-8'))
447 msg = MIMEText(text, _charset='utf-8') # pylint: disable=redefined-variable-type
448 msg['From'] = self.smtp_sender
449 msg['To'] = ', '.join(recipients)
450 msg['Subject'] = subject
451 msg['Date'] = formatdate(localtime=True)
452 msg.preamble = 'This is a multi-part message in MIME format.\n'
453 msg.add_header('X-Autotest-Creator',
454 self.__class__.__module__ + '.' +
455 self.__class__.__name__ + '.' +
456 currentframe().f_code.co_name)
457 # (with help from http://stackoverflow.com/questions/5067604/determine-
458 # function-name-from-within-that-function-without-using-traceback)
459 if autotest_signature:
460 msg.add_header('X-Autotest-Signature', autotest_signature)
463 for filename in file_list:
464 fullpath = os.path.join(self.source_path, filename)
466 # Guess the content type based on the file's extension. Encoding
467 # will be ignored, although we should check for simple things like
468 # gzip'd or compressed files.
469 ctype, encoding = mimetypes.guess_type(fullpath)
470 if ctype is None or encoding is not None:
471 # No guess could be made, or the file is encoded (compressed),
472 # so use a generic bag-of-bits type.
473 ctype = 'application/octet-stream'
475 maintype, subtype = ctype.split('/', 1)
476 log.debug("Creating message containing file {} of mime type {}"
477 .format(filename, ctype))
479 if maintype == 'text':
480 with open(fullpath, 'rt') as file_handle:
481 # Note: we should handle calculating the charset
482 part = MIMEText(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
483 elif maintype == 'image':
484 with open(fullpath, 'rb') as file_handle:
485 part = MIMEImage(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
486 elif maintype == 'audio':
487 with open(fullpath, 'rb') as file_handle:
488 part = MIMEAudio(file_handle.read(), _subtype=subtype) # pylint:disable=redefined-variable-type
490 part = MIMEBase(maintype, subtype) # pylint:disable=redefined-variable-type
491 with open(fullpath, 'rb') as file_handle:
492 part.set_payload(file_handle.read())
493 # Encode the payload using Base64
495 # Set the filename parameter
496 part.add_header('Content-Disposition', 'attachment',
500 log.debug("Message successfully created")
503 log.debug("Sending message from %s to %s"
504 % (self.smtp_sender, ', '.join(recipients)))
505 with smtplib.SMTP('localhost') as server:
506 server.sendmail(self.smtp_sender, recipients, msg.as_string())
508 # wait for transfer; complicated by isinstance(False, int) == True
509 if wait_for_transfer is False:
511 elif wait_for_transfer is True:
512 arnied_wrapper.wait_for_email_transfer()
514 arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)
516 def _extract_email_paths(self, path, emails, timeout):
517 """Check and return the absolute paths of a list of emails."""
518 log.debug("Extracting messages %s", emails)
520 emails = os.listdir(path)
522 for expected_email in emails:
523 # TODO: this can be improved by matching the emails themselves
524 if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index",
525 "Entw&APw-rfe", "Gesendete Objekte",
526 "Gel&APY-schte Elemente", "mailboxes.dump",
529 email_path = os.path.join(path, expected_email)
530 for i in range(timeout):
531 if os.path.isfile(email_path):
532 email_paths.append(email_path)
534 elif i == timeout - 1:
535 raise EmailNotFound("Target message %s could not be found "
536 "on server at %s within %ss"
537 % (expected_email, path, timeout))
539 log.debug("%s mails extracted at %s.", len(email_paths), path)
542 def _find_message_with_id(self, message_id, message_paths):
543 """Find message with id among a list of message paths."""
544 log.debug("Looking for a match for the message with id %s", message_id)
545 for message_path in message_paths:
546 extracted_id = self._extract_message_id(message_path)
547 log.debug("Extracted id %s from candidate %s", extracted_id,
549 if message_id == extracted_id:
550 log.debug("Found match at %s", message_path)
552 raise MismatchedEmailID("The message with id %s could not be matched "
553 "or wasn't expected among %s"
554 % (message_id, ", ".join(message_paths)))
556 def _extract_message_id(self, message_path):
558 Given a message file path extract the Message-ID.
560 :raises: :py:class:`MissingEmailID` if no Message-ID was found.
563 with open(message_path, errors='ignore') as file_handle:
564 content = file_handle.read()
565 for line in content.split("\n"):
566 match_id = re.match("Autotest-Message-ID: (.+)", line)
567 if match_id is not None:
568 message_id = match_id.group(1).rstrip('\r\n')
570 raise MissingEmailID("No id was found in target message %s so it "
571 "cannot be properly matched"
575 def _default_compare_emails(self, source_email_path, target_email_path,
578 Compare target emails with source ones.
580 Uses python provided diff functionality to compare complete mail files.
582 with open(source_email_path, "r") as source_email_file:
583 source_email = source_email_file.read()
584 with open(target_email_path, "r") as target_email_file:
585 target_email = target_email_file.read()
586 matcher = difflib.SequenceMatcher(None, source_email, target_email)
587 diffratio = matcher.ratio()
588 log.debug("Target message comparison ratio is %s.", diffratio)
589 # log.info("%s $$$ %s", source_email, target_email)
590 if diffratio < tolerance:
591 raise EmailMismatch("Target message is too different from the "
592 "source (difference %s < tolerance %s).",
593 diffratio, tolerance)
595 def _compare_emails_by_basic_headers(self, source_email_path,
596 target_email_path, tolerance=1):
598 Compare target emails with source ones.
600 Uses python provided diff functionality to compare headers and mail
603 Argument `tolerance` not used!
605 with open(source_email_path, errors="ignore") as file_handle:
606 source_email = Parser().parse(file_handle)
608 for part in source_email.walk():
609 if part.get_content_type() in ["text/plain", "text/html"]:
610 source_body = part.get_payload()
613 with open(target_email_path, errors="ignore") as file_handle:
614 target_email = Parser().parse(file_handle)
616 for part in target_email.walk():
617 if part.get_content_type() in ["text/plain", "text/html"]:
618 target_body = part.get_payload()
621 if source_email['From'] != target_email['From']:
622 raise EmailMismatch("Target message sender %s is too different "
623 "from the source one %s" %
624 (target_email['From'], source_email['From']))
625 if source_email['To'] != target_email['To']:
626 raise EmailMismatch("Target message recipient %s is too different "
627 "from the source one %s" %
628 (target_email['To'], source_email['To']))
629 if source_email['Subject'] != target_email['Subject']:
630 raise EmailMismatch("Target message subject '%s' is too different "
631 "from the source one '%s'" %
632 (target_email['Subject'],
633 source_email['Subject']))
634 if source_email['Date'] != target_email['Date']:
635 raise EmailMismatch("Target message date %s is too different from "
636 "the source one %s" %
637 (target_email['Date'], source_email['Date']))
638 if source_body != target_body:
639 raise EmailMismatch("Target message body '%s' is too different "
640 "from the source one '%s'" %
641 (target_body, source_body))
643 def _compare_emails_by_existence(self, source_email_path,
644 target_email_path, tolerance=1):
646 Weak email validation based only on presence of file.
648 DOES NOT CHECK ANYTHING!