1 # This Python file uses the following encoding: utf-8
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
"""
------------------------------------------------------
Class :py:class:`MailValidator`, a fully-featured email sender and checker.

Copyright: Intra2net AG
------------------------------------------------------
"""
import difflib
import logging
import mimetypes
import os
import re
import smtplib
import socket
import subprocess
import time
from email.encoders import encode_base64
from email.mime.audio import MIMEAudio
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.parser import Parser
from email.utils import formatdate
from inspect import currentframe

from . import arnied_wrapper
# Module-level logger used by every function in this file.
log = logging.getLogger('pyi2ncommon.mail_utils')
class EmailException(Exception):
    """Base class for custom exceptions raised from `MailValidator`."""


class EmailNotFound(EmailException):
    """An expected email could not be found."""


class InvalidEmailHeader(EmailException):
    """A checked email header did not satisfy the requested values."""


class InvalidEmailContent(EmailException):
    """A checked email content part did not satisfy the requested values."""


class EmailIDError(EmailException):
    """Base class for problems with the id used to match emails."""


class MismatchedEmailID(EmailIDError):
    """No candidate email carries the id that was searched for."""


class MissingEmailID(EmailIDError):
    """An email does not contain any id to match it by."""


class EmailMismatch(EmailException):
    """A target email differs from its source email."""
97 """Class for validation of emails."""
@property
def target_path(self):
    """Path to find target emails (received)."""
    return self._target_path


@target_path.setter
def target_path(self, new_value):
    """Set `target_path`; assigning ``None`` keeps the current value."""
    if new_value is not None:
        self._target_path = new_value
@property
def source_path(self):
    """Path to find source emails (not sent)."""
    return self._source_path


@source_path.setter
def source_path(self, new_value):
    """Set `source_path`; assigning ``None`` keeps the current value."""
    if new_value is not None:
        self._source_path = new_value
@property
def smtp_sender(self):
    """Sender address used when emails are sent via SMTP."""
    return self._smtp_sender


@smtp_sender.setter
def smtp_sender(self, new_value):
    """Set `smtp_sender`; assigning ``None`` keeps the current value."""
    if new_value is not None:
        self._smtp_sender = new_value
def compare_emails_method(self, method="basic"):
    """
    Set email comparison method for validation.

    :param str method: one of "basic", "headers", "existence"
    :raises: :py:class:`ValueError` if chosen method is invalid
    """
    # Map each public method name to the private implementation; the
    # attribute is only looked up once a valid name has been chosen.
    implementations = {
        "basic": "_default_compare_emails",
        "headers": "_compare_emails_by_basic_headers",
        "existence": "_compare_emails_by_existence",
    }
    try:
        attribute = implementations[method]
    except (KeyError, TypeError):
        # TypeError covers unhashable values, which are just as invalid.
        raise ValueError("Invalid email comparison method %s" % method) \
            from None
    self._compare_emails_method = getattr(self, attribute)
compare_emails_method = property(fset=compare_emails_method)
140 def __init__(self, source_path, target_path):
142 Construct a validator instance.
144 :param str source_path: path to find source emails (not sent)
145 :param str target_path: path to find target emails (received)
147 .. note:: The comparison method can be redefined using the variety of
148 private method implementations.
150 self._target_path = target_path
151 self._source_path = source_path
152 self._smtp_sender = "no_source@inject.smtp"
153 self._compare_emails_method = self._default_compare_emails
def inject_emails(self, username, original_user):
    """
    Inject emails from `source_path` to `target_path`.

    This uses the script *restore_mail_inject.pl* which injects the mails
    using IMAP (as opposed to :py:meth:`inject_smtp`).

    :param str username: username for the mail injection script
    :param str original_user: original username for the mail injection;
                              an empty string skips the mailbox dump options
    :returns: output of the injection script
    :rtype: bytes
    :raises: :py:class:`subprocess.CalledProcessError` if the script exits
             with a non-zero status

    .. note:: In order to restore acl rights as well put a mailboxes.dump
              file in the source path.
    """
    log.info("Injecting emails for user %s", username)

    # inject emails from test data; an argument list (no shell) keeps user
    # names and paths with spaces or shell metacharacters intact
    cmd = ["/usr/intranator/bin/restore_mail_inject.pl",
           "-u", username,
           "-s", self.source_path]
    if original_user != "":
        cmd += ["-m", os.path.join(self.source_path, "mailboxes.dump"),
                "-o", original_user]

    result = subprocess.check_output(cmd)
    # NOTE(review): the statement that followed check_output is not visible
    # in the reviewed text; logging and returning the output is assumed.
    log.debug("Injection script output: %s", result)
    return result
181 def _prepare_recipients(self, recipients):
183 Prepare recipient list: ensure list of proper addresses.
185 If given a simple string, make a list of strings out of it.
186 If any recipient is just a username, append "@" + localhost to it.
187 Also check that recipients are just email addresses.
189 hostname = socket.gethostname()
190 if isinstance(recipients, str):
191 recipients = [recipients, ]
193 for recipient in recipients:
195 result.append(recipient)
197 result.append(recipient + '@' + hostname)
198 for bad_char in '<>"\'':
199 if bad_char in recipient:
200 raise ValueError('Recipient must be a "raw" email address,'
201 ' not {!r}'.format(recipient))
def inject_smtp(self, usernames, emails):
    """
    Inject emails from `source_path` using python's SMTP library.

    As opposed to :py:meth:`inject_emails`, this actually sends the mail
    to the local mail server.

    :param usernames: username(s) of the localhost receiver(s) for each
                      email or proper email address(es)
    :type usernames: str or [str]
    :param emails: paths (relative to `source_path`) to files including
                   full emails (header + body) to be sent to each user
    :type emails: [str]
    """
    recipients = self._prepare_recipients(usernames)
    log.info("Sending emails to %s", ','.join(recipients))
    with smtplib.SMTP('localhost') as server:
        for email in emails:
            log.info("Sending email %s", email)
            email_path = os.path.join(self.source_path, email)
            # Read as bytes so the stored message is sent unchanged.
            with open(email_path, 'rb') as file_handle:
                email_content = file_handle.read()
            server.sendmail(self.smtp_sender, recipients, email_content)

    # Wait till SMTP queue is processed
    arnied_wrapper.wait_for_email_transfer()
def verify_email_id(self, email, emails_list, timeout, in_target=True):
    """
    Verify that the id of an email is present in a list.

    Returns that email's match in this list.

    :param str email: email filename
    :param emails_list: email among which the first email has to be found
    :type emails_list: [str]
    :param int timeout: timeout for extracting the source and target emails
    :param bool in_target: whether the verified email is on the target side
    :returns: file name of the email in `emails_list` with the same id
    :rtype: str

    If `in_target` is set to True, `email` is looked up in `source_path`
    and its match is searched among `emails_list` in `target_path`.
    Otherwise `email` is looked up in `target_path` and its match is
    searched among `emails_list` in `source_path`.
    """
    if in_target:
        email_dir, list_dir = self.source_path, self.target_path
    else:
        email_dir, list_dir = self.target_path, self.source_path

    # A single file name is requested, so the first entry is its path.
    email = self._extract_email_paths(email_dir, [email], timeout)[0]
    emails_list = self._extract_email_paths(list_dir, emails_list, timeout)

    email_id = self._extract_message_id(email)
    match = self._find_message_with_id(email_id, emails_list)
    return os.path.basename(match)
def verify_emails(self, source_emails, target_emails, timeout):
    """
    Check injected e-mails for a user.

    Every target email is matched to a source email by id and the pair is
    handed to the configured comparison method.

    :param source_emails: emails at the source location
    :type source_emails: [str]
    :param target_emails: emails at the target (server) location
    :type target_emails: [str]
    :param int timeout: timeout for extracting the source and target emails
    :raises: :py:class:`EmailNotFound` if source emails remain that no
             target email was matched to
    """
    source_paths = self._extract_email_paths(self.source_path,
                                             source_emails, timeout)
    target_paths = self._extract_email_paths(self.target_path,
                                             target_emails, timeout)

    log.info("Verifying emails at %s with %s", self.target_path,
             self.source_path)
    for target in target_paths:
        log.info("Verifying email %s", target)
        target_id = self._extract_message_id(target)
        source = self._find_message_with_id(target_id, source_paths)
        # Each source email may be matched by one target email only.
        source_paths.remove(source)
        # The comparison methods take (source, target, tolerance).
        self._compare_emails_method(source, target, 1)

    if len(source_paths) > 0:
        # NOTE(review): the middle line of this message was not visible in
        # the reviewed text; "\n%s" is assumed as the separator.
        raise EmailNotFound("%s target mails could not be found on server."
                            "\n%s"
                            % (len(source_paths), "\n".join(source_paths)))
    log.info("All e-mails at %s verified!", self.target_path)
def assert_header(self, emails, header, present_values=None,
                  absent_values=None, timeout=30):
    """
    Check headers for present and missing strings in a list of messages.

    :param emails: emails whose headers will be checked
    :type emails: [str]
    :param str header: header that will be validated for each email
    :param present_values: strings that have to be present in the header
    :type present_values: [str] or None
    :param absent_values: strings that have to be absent in the header
    :type absent_values: [str] or None
    :param int timeout: timeout for extracting the source and target emails
    :raises: :py:class:`InvalidEmailHeader` if email header is not valid

    Every list of present and respectively absent values contains
    alternative values. At least one of present and one of absent should be
    satisfied. An email without the header fails any present check.
    """
    present_values = present_values or []
    absent_values = absent_values or []

    target_paths = self._extract_email_paths(self.target_path, emails,
                                             timeout)
    for email_path in target_paths:
        with open(email_path, "r") as email_file:
            verified_email = Parser().parse(email_file, headersonly=True)
        log.debug("Extracted email headers:\n%s", verified_email)

        log.info("Checking header '%s' in %s", header, email_path)
        if present_values:
            log.info("for present '%s'", "', '".join(present_values))
        if absent_values:
            log.info("for absent '%s'", "', '".join(absent_values))

        header_value = verified_email[header]
        # A missing header yields None; search an empty string instead so
        # the outcome is InvalidEmailHeader rather than a TypeError.
        searched = "" if header_value is None else str(header_value)
        present_valid = any(present in searched
                            for present in present_values)
        absent_valid = any(absent not in searched
                           for absent in absent_values)

        # NOTE(review): the tail of both messages ("valid:\n%s") was not
        # visible in the reviewed text and is reconstructed.
        if present_values and not present_valid:
            raise InvalidEmailHeader("Message header '%s' in %s is not "
                                     "valid:\n%s"
                                     % (header, email_path, header_value))
        if absent_values and not absent_valid:
            raise InvalidEmailHeader("Message header '%s' in %s is not "
                                     "valid:\n%s"
                                     % (header, email_path, header_value))
        log.info("Message header '%s' in %s is valid!", header, email_path)
def assert_content(self, emails, content_type, present_values=None,
                   absent_values=None, timeout=30):
    """
    Check content for present/missing strings in a list of messages.

    :param emails: emails whose content will be checked
    :type emails: [str]
    :param str content_type: type of the content that will be checked for
                             values (compared to each part's content type)
    :param present_values: strings that have to be present in the content
    :type present_values: [str] or None
    :param absent_values: strings that have to be absent in the content
    :type absent_values: [str] or None
    :param int timeout: timeout for extracting the source and target emails
    :raises: :py:class:`InvalidEmailContent` if email content is not valid

    Every list of present and respectively absent values contains
    alternative values. At least one of present and one of absent should be
    satisfied. Only the first part of the requested type is checked.
    """
    present_values = present_values or []
    absent_values = absent_values or []

    target_paths = self._extract_email_paths(self.target_path, emails,
                                             timeout)
    for email_path in target_paths:
        with open(email_path, "r") as email_file:
            verified_email = Parser().parse(email_file)
        log.debug("Extracted email content:\n%s", verified_email)

        content = ""
        for part in verified_email.walk():
            log.debug("Extracted %s part while looking for %s",
                      part.get_content_type(), content_type)
            if part.get_content_type() == content_type:
                payload = part.get_payload(decode=True)
                if isinstance(payload, bytes):
                    # Decode with the charset the part declares; fall back
                    # to UTF-8 and never fail on undecodable bytes.
                    charset = part.get_content_charset() or "utf-8"
                    try:
                        content = payload.decode(charset, errors="replace")
                    except LookupError:
                        content = payload.decode("utf-8", errors="replace")
                elif payload is not None:
                    content = payload
                # NOTE: only one such element is expected
                break

        log.info("Checking content '%s' in %s", content_type, email_path)
        if present_values:
            log.info("for present '%s'", "', '".join(present_values))
        if absent_values:
            log.info("for absent '%s'", "', '".join(absent_values))
        present_valid = any(present in content
                            for present in present_values)
        absent_valid = any(absent not in content
                           for absent in absent_values)

        # NOTE(review): the tail of both messages ("valid:\n%s") was not
        # visible in the reviewed text and is reconstructed.
        if present_values and not present_valid:
            raise InvalidEmailContent("Message content '%s' in %s is not "
                                      "valid:\n%s"
                                      % (content_type, email_path, content))
        if absent_values and not absent_valid:
            raise InvalidEmailContent("Message content '%s' in %s is not "
                                      "valid:\n%s"
                                      % (content_type, email_path, content))
        log.info("Message content '%s' in %s is valid!",
                 content_type, email_path)
def send_email_with_files(self, usernames, file_list,
                          wait_for_transfer=True,
                          autotest_signature=None,
                          subject="my subject"):
    """
    Send a generated email with optional attachments.

    :param usernames: username(s) of the localhost receiver(s) or proper
                      email address(es)
    :type usernames: str or [str]
    :param file_list: files (relative to `source_path`) attached to an
                      email; can be empty or None
    :type file_list: [str] or None
    :param wait_for_transfer: specify whether to wait until arnied_wrapper
                              confirms email transfer; you can also specify
                              a fixed timeout (seconds)
    :type wait_for_transfer: bool or int
    :param autotest_signature: text to insert as value for header
                               X-Autotest-Signature for simpler recognition
                               of mail (if None do not add header)
    :type autotest_signature: str or None
    :param str subject: Subject of created mails
    """
    text = 'This is an autogenerated email.\n'

    recipients = self._prepare_recipients(usernames)
    # None means "no attachments", just like an empty list.
    file_list = file_list or []

    if file_list:
        msg = MIMEMultipart()  # pylint: disable=redefined-variable-type
        msg.attach(MIMEText(text, _charset='utf-8'))
    else:
        msg = MIMEText(text, _charset='utf-8')  # pylint: disable=redefined-variable-type
    msg['From'] = self.smtp_sender
    msg['To'] = ', '.join(recipients)
    msg['Subject'] = subject
    msg['Date'] = formatdate(localtime=True)
    msg.preamble = 'This is a multi-part message in MIME format.\n'
    # Record "<module>.<class>.<this function>" as the creator.
    msg.add_header('X-Autotest-Creator',
                   self.__class__.__module__ + '.' +
                   self.__class__.__name__ + '.' +
                   currentframe().f_code.co_name)
    # (with help from http://stackoverflow.com/questions/5067604/determine-
    #  function-name-from-within-that-function-without-using-traceback)
    if autotest_signature:
        msg.add_header('X-Autotest-Signature', autotest_signature)

    for filename in file_list:
        fullpath = os.path.join(self.source_path, filename)

        # Guess the content type based on the file's extension. Encoding
        # will be ignored, although we should check for simple things like
        # gzip'd or compressed files.
        ctype, encoding = mimetypes.guess_type(fullpath)
        if ctype is None or encoding is not None:
            # No guess could be made, or the file is encoded (compressed),
            # so use a generic bag-of-bits type.
            ctype = 'application/octet-stream'

        maintype, subtype = ctype.split('/', 1)
        log.debug("Creating message containing file %s of mime type %s",
                  filename, ctype)

        if maintype == 'text':
            with open(fullpath, 'rt') as file_handle:
                # Note: we should handle calculating the charset
                part = MIMEText(file_handle.read(), _subtype=subtype)  # pylint:disable=redefined-variable-type
        elif maintype == 'image':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEImage(file_handle.read(), _subtype=subtype)  # pylint:disable=redefined-variable-type
        elif maintype == 'audio':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEAudio(file_handle.read(), _subtype=subtype)  # pylint:disable=redefined-variable-type
        else:
            part = MIMEBase(maintype, subtype)  # pylint:disable=redefined-variable-type
            with open(fullpath, 'rb') as file_handle:
                part.set_payload(file_handle.read())
            # Encode the payload using Base64
            encode_base64(part)
        # Set the filename parameter
        part.add_header('Content-Disposition', 'attachment',
                        filename=filename)
        msg.attach(part)

    log.debug("Message successfully created")

    log.debug("Sending message from %s to %s",
              self.smtp_sender, ', '.join(recipients))
    with smtplib.SMTP('localhost') as server:
        server.sendmail(self.smtp_sender, recipients, msg.as_string())

    # wait for transfer; complicated by isinstance(False, int) == True,
    # hence the identity checks against the two bool singletons
    if wait_for_transfer is True:
        arnied_wrapper.wait_for_email_transfer()
    elif wait_for_transfer is not False:
        arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)
def _extract_email_paths(self, path, emails, timeout):
    """
    Check and return the absolute paths of a list of emails.

    :param str path: directory the emails are expected in
    :param emails: email file names; if empty, every entry of `path` is used
    :type emails: [str]
    :param int timeout: seconds to wait for each email file to appear; the
                        file is checked at least once
    :returns: paths of the emails, without the ignored entries
    :rtype: [str]
    :raises: :py:class:`EmailNotFound` if an email file does not appear
             within the timeout
    """
    log.debug("Extracting messages %s", emails)
    if len(emails) == 0:
        emails = os.listdir(path)

    # TODO: this can be improved by matching the emails themselves
    # NOTE(review): the last entry of this list was not visible in the
    # reviewed text; "tmp" is assumed and has to be confirmed.
    ignored = ("cyrus.cache", "cyrus.header", "cyrus.index",
               "Entw&APw-rfe", "Gesendete Elemente",
               "Gel&APY-schte Elemente", "mailboxes.dump",
               "tmp")
    # Always look at least once, even for a zero or negative timeout.
    attempts = max(timeout, 1)

    email_paths = []
    for expected_email in emails:
        if expected_email in ignored:
            continue
        email_path = os.path.join(path, expected_email)
        for attempt in range(attempts):
            if attempt:
                # one second between two consecutive checks
                time.sleep(1)
            if os.path.isfile(email_path):
                email_paths.append(email_path)
                break
        else:
            raise EmailNotFound("Target message %s could not be found "
                                "on server at %s within %ss"
                                % (expected_email, path, timeout))

    log.debug("%s mails extracted at %s.", len(email_paths), path)
    return email_paths
def _find_message_with_id(self, message_id, message_paths):
    """
    Find message with id among a list of message paths.

    :param str message_id: id to look for
    :param message_paths: paths of the candidate messages
    :type message_paths: [str]
    :returns: path of the first candidate carrying `message_id`
    :rtype: str
    :raises: :py:class:`MismatchedEmailID` if no candidate has the id
    """
    log.debug("Looking for a match for the message with id %s", message_id)
    for message_path in message_paths:
        extracted_id = self._extract_message_id(message_path)
        log.debug("Extracted id %s from candidate %s", extracted_id,
                  message_path)
        if message_id == extracted_id:
            log.debug("Found match at %s", message_path)
            return message_path
    raise MismatchedEmailID("The message with id %s could not be matched "
                            "or wasn't expected among %s"
                            % (message_id, ", ".join(message_paths)))
557 def _extract_message_id(self, message_path):
559 Given a message file path extract the Message-ID.
561 :raises: :py:class:`MissingEmailID` if no Message-ID was found.
564 with open(message_path, errors='ignore') as file_handle:
565 content = file_handle.read()
566 for line in content.split("\n"):
567 match_id = re.match("Autotest-Message-ID: (.+)", line)
568 if match_id is not None:
569 message_id = match_id.group(1).rstrip('\r\n')
571 raise MissingEmailID(f"No id was found in target message {message_path}, "
572 f"so it cannot be properly matched")
def _default_compare_emails(self, source_email_path, target_email_path,
                            tolerance=1):
    """
    Compare target emails with source ones.

    Uses python provided diff functionality to compare complete mail files.

    :param str source_email_path: path of the source email
    :param str target_email_path: path of the target email
    :param tolerance: lowest accepted similarity ratio of the two files
    :type tolerance: int or float
    :raises: :py:class:`EmailMismatch` if the similarity ratio is below
             `tolerance`
    """
    with open(source_email_path, "r") as source_email_file:
        source_email = source_email_file.read()
    with open(target_email_path, "r") as target_email_file:
        target_email = target_email_file.read()
    matcher = difflib.SequenceMatcher(None, source_email, target_email)
    diffratio = matcher.ratio()
    log.debug("Target message comparison ratio is %s.", diffratio)
    if diffratio < tolerance:
        # Format the message here: exceptions do not interpolate extra
        # arguments the way logging calls do.
        raise EmailMismatch("Target message is too different from the "
                            "source (difference %s < tolerance %s)."
                            % (diffratio, tolerance))
595 def _compare_emails_by_basic_headers(self, source_email_path,
596 target_email_path, tolerance=1):
598 Compare target emails with source ones.
600 Uses python provided diff functionality to compare headers and mail
603 Argument `tolerance` not used!
605 with open(source_email_path, errors="ignore") as file_handle:
606 source_email = Parser().parse(file_handle)
608 for part in source_email.walk():
609 if part.get_content_type() in ["text/plain", "text/html"]:
610 source_body = part.get_payload()
613 with open(target_email_path, errors="ignore") as file_handle:
614 target_email = Parser().parse(file_handle)
616 for part in target_email.walk():
617 if part.get_content_type() in ["text/plain", "text/html"]:
618 target_body = part.get_payload()
621 if source_email['From'] != target_email['From']:
622 raise EmailMismatch("Target message sender %s is too different "
623 "from the source one %s" %
624 (target_email['From'], source_email['From']))
625 if source_email['To'] != target_email['To']:
626 raise EmailMismatch("Target message recipient %s is too different "
627 "from the source one %s" %
628 (target_email['To'], source_email['To']))
629 if source_email['Subject'] != target_email['Subject']:
630 raise EmailMismatch("Target message subject '%s' is too different "
631 "from the source one '%s'" %
632 (target_email['Subject'],
633 source_email['Subject']))
634 if source_email['Date'] != target_email['Date']:
635 raise EmailMismatch("Target message date %s is too different from "
636 "the source one %s" %
637 (target_email['Date'], source_email['Date']))
638 if source_body != target_body:
639 raise EmailMismatch("Target message body '%s' is too different "
640 "from the source one '%s'" %
641 (target_body, source_body))
643 def _compare_emails_by_existence(self, source_email_path,
644 target_email_path, tolerance=1):
646 Weak email validation based only on presence of file.
648 DOES NOT CHECK ANYTHING!