Remove api doc headers
[pyi2ncommon] / src / mail_validator.py
1 # This Python file uses the following encoding: utf-8
2
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
5 #
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
8 #
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
14 #
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
17 #
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
20 #
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23 """
24 Class :py:class:`MailValidator`, a fully-featured email sender and checker.
25
26 Copyright: Intra2net AG
27 """
28
29 import time
30 import os
31 import difflib
32 import socket
33 from inspect import currentframe
34 import re
35 import subprocess
36 import logging
37
38 import smtplib
39 from email.mime.audio import MIMEAudio
40 from email.mime.base import MIMEBase
41 from email.mime.image import MIMEImage
42 from email.mime.multipart import MIMEMultipart
43 from email.mime.text import MIMEText
44 from email.encoders import encode_base64
45 from email.utils import formatdate
46 from email.parser import Parser
47 import mimetypes
48
49 from . import arnied_wrapper
50
# Module-level logger used by every function and method in this file.
# NOTE(review): the logger name ends in "mail_utils" while the file is listed
# as mail_validator.py -- confirm that the differing name is intentional.
log = logging.getLogger('pyi2ncommon.mail_utils')
52
53
class EmailException(Exception):
    """Common ancestor of all custom errors raised by `MailValidator`."""
58
59
class EmailNotFound(EmailException):
    """An expected email could not be found."""
62
63
class InvalidEmailHeader(EmailException):
    """A checked email header does not satisfy the requested values."""
66
67
class InvalidEmailContent(EmailException):
    """A checked email content part does not satisfy the requested values."""
70
71
class EmailIDError(EmailException):
    """Base class for errors concerning the message id of an email."""
74
75
class MismatchedEmailID(EmailIDError):
    """No candidate email carries the message id that was looked for."""
78
79
class MissingEmailID(EmailIDError):
    """An email file does not contain any message id."""
82
83
class EmailMismatch(EmailException):
    """A target email differs from the source email it is compared with."""
86
87
88 class MailValidator:
89     """Class for validation of emails."""
90
91     def target_path(self, new_value=None):
92         """Getter/Setter for property `target_path`."""
93         if new_value is not None:
94             self._target_path = new_value
95         else:
96             return self._target_path
97     target_path = property(target_path, target_path)
98
99     def source_path(self, new_value=None):
100         """Getter/Setter for property `source_path`."""
101         if new_value is not None:
102             self._source_path = new_value
103         else:
104             return self._source_path
105     source_path = property(source_path, source_path)
106
107     def smtp_sender(self, new_value=None):
108         """Getter/Setter for property `smtp_sender`."""
109         if new_value is not None:
110             self._smtp_sender = new_value
111         else:
112             return self._smtp_sender
113     smtp_sender = property(smtp_sender, smtp_sender)
114
115     def compare_emails_method(self, method="basic"):
116         """
117         Set email comparison method for validation.
118
119         :param str method: one of "basic", "headers"
120         :raises: :py:class:`ValueError` if chosen method is invalid
121         """
122         if method == "basic":
123             self._compare_emails_method = self._default_compare_emails
124         elif method == "headers":
125             self._compare_emails_method = self._compare_emails_by_basic_headers
126         elif method == "existence":
127             self._compare_emails_method = self._compare_emails_by_existence
128         else:
129             raise ValueError("Invalid email comparison method %s" % method)
130     compare_emails_method = property(fset=compare_emails_method)
131
132     def __init__(self, source_path, target_path):
133         """
134         Construct a validator instance.
135
136         :param str source_path: path to find source emails (not sent)
137         :param str target_path: path to find target emails (received)
138
139         .. note:: The comparison method can be redefined using the variety of
140             private method implementations.
141         """
142         self._target_path = target_path
143         self._source_path = source_path
144         self._smtp_sender = "no_source@inject.smtp"
145         self._compare_emails_method = self._default_compare_emails
146
147     def inject_emails(self, username, original_user):
148         """
149         Inject emails from `source_path` to `target_path`.
150
151         This uses the script *restore_mail_inject.pl* which injects the mails
152         using IMAP (as opposed to :py:meth:`inject_smtp`).
153
154         :param str username: username for the mail injection script
155         :param str original_user: original username for the mail injection
156                                   script
157
158         In order to restore acl rights as well put a mailbox.dump file in the
159         source path.
160         """
161         log.info("Injecting emails for user %s", username)
162
163         # inject emails from test data
164         cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \
165               " -s " + self.source_path
166         if original_user != "":
167             cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \
168                    " -o " + original_user
169
170         result = subprocess.check_output(cmd, shell=True)
171         log.debug(result)
172
173     def _prepare_recipients(self, recipients):
174         """
175         Prepare recipient list: ensure list of proper addresses.
176
177         If given a simple string, make a list of strings out of it.
178         If any recipient is just a username, append "@" + localhost to it.
179         Also check that recipients are just email addresses.
180         """
181         hostname = socket.gethostname()
182         if isinstance(recipients, str):
183             recipients = [recipients, ]
184         result = []
185         for recipient in recipients:
186             if '@' in recipient:
187                 result.append(recipient)
188             else:
189                 result.append(recipient + '@' + hostname)
190             for bad_char in '<>"\'':
191                 if bad_char in recipient:
192                     raise ValueError('Recipient must be a "raw" email address,'
193                                      ' not {!r}'.format(recipient))
194         return result
195
196     def inject_smtp(self, usernames, emails):
197         """
198         Inject emails from `source_path` using python's SMTP library.
199
200         As opposed to :py:meth:`inject_emails`, this actually sends the mail
201         to the local mail server (meaning filtering, archiving, ... will
202         happen).
203
204         :param usernames: username(s) of the localhost receiver(s) for each
205                           email or proper email address(es)
206         :type usernames: str or [str]
207         :param emails: paths to files including full emails (header + body)
208                        to be sent to each user
209         :type emails: [str]
210         """
211         recipients = self._prepare_recipients(usernames)
212         log.info("Sending emails to %s", ','.join(recipients))
213         with smtplib.SMTP('localhost') as server:
214             for email in emails:
215                 log.info("Sending email %s", email)
216                 with open(os.path.join(self.source_path, email), 'rb') \
217                         as file_handle:
218                     email_content = file_handle.read()
219                 server.sendmail(self.smtp_sender, recipients, email_content)
220
221         # Wait till SMTP queue is processed
222         arnied_wrapper.wait_for_email_transfer()
223
224     def verify_email_id(self, email, emails_list, timeout, in_target=True):
225         """
226         Verify that the id of an email is present in a list.
227
228         Returns that email's match in this list.
229
230         :param str email: email filename
231         :param emails_list: email among which the first email has to be found
232         :type emails_list: [str]
233         :param int timeout: timeout for extracting the source and target emails
234         :param bool in_target: whether the verified email is on the target side
235
236         If `in_target` is set to True we are getting the target id from the
237         target list of a source email. Otherwise, we assume a target email from
238         a source list.
239         """
240         if in_target:
241             email = self._extract_email_paths(self.source_path, [email],
242                                               timeout)[0]
243             emails_list = self._extract_email_paths(self.target_path,
244                                                     emails_list, timeout)
245         else:
246             email = self._extract_email_paths(self.target_path, [email],
247                                               timeout)[0]
248             emails_list = self._extract_email_paths(self.source_path,
249                                                     emails_list, timeout)
250
251         email_id = self._extract_message_id(email)
252         match = self._find_message_with_id(email_id, emails_list)
253         return os.path.basename(match)
254
255     def verify_emails(self, source_emails, target_emails, timeout):
256         """
257         Check injected e-mails for a user.
258
259         :param source_emails: emails at the source location
260         :type source_emails: [str]
261         :param target_emails: emails at the target (server) location
262         :type target_emails: [str]
263         :param int timeout: timeout for extracting the source and target emails
264         :raises: :py:class:`EmailNotFound` if target email is not found on
265                  server
266         """
267         source_paths = self._extract_email_paths(self.source_path,
268                                                  source_emails, timeout)
269         target_paths = self._extract_email_paths(self.target_path,
270                                                  target_emails, timeout)
271
272         log.info("Verifying emails at %s with %s", self.target_path,
273                  self.source_path)
274         for target in target_paths:
275             log.info("Verifying email %s", target)
276             target_id = self._extract_message_id(target)
277             source = self._find_message_with_id(target_id, source_paths)
278             source_paths.remove(source)
279             self._compare_emails_method(target, source, 1)
280
281         if len(source_paths) > 0:
282             raise EmailNotFound("%s target mails could not be found on server."
283                                 "\n%s"
284                                 % (len(source_paths), "\n".join(source_paths)))
285         else:
286             log.info("All e-mails at %s verified!", self.target_path)
287
288     def assert_header(self, emails, header, present_values=None,
289                       absent_values=None, timeout=30):
290         """
291         Check headers for present and missing strings in a list of messages.
292
293         :param emails: emails whose headers will be checked
294         :type emails: [str]
295         :param str header: header that will be validated for each email
296         :param present_values: strings that have to be present in the header
297         :type present_values: [str] or None
298         :param absent_values: strings that have to be absent in the header
299         :type absent_values: [str] or None
300         :param int timeout: timeout for extracting the source and target emails
301         :raises: :py:class:`InvalidEmailHeader` if email header is not valid
302
303         Every list of present and respectively absent values contains
304         alternative values. At least one of present and one of absent should be
305         satisfied.
306         """
307         target_paths = self._extract_email_paths(self.target_path, emails,
308                                                  timeout)
309         for email_path in target_paths:
310             with open(email_path, "r") as email_file:
311                 verified_email = Parser().parse(email_file, headersonly=True)
312                 log.debug("Extracted email headers:\n%s", verified_email)
313
314             log.info("Checking header '%s' in %s", header, email_path)
315             if not present_values:
316                 present_values = []
317             else:
318                 log.info("for present '%s'", "', '".join(present_values))
319             if not absent_values:
320                 absent_values = []
321             else:
322                 log.info("for absent '%s'", "', '".join(absent_values))
323             present_valid = False
324             for present in present_values:
325                 if present in verified_email[header]:
326                     present_valid = True
327             absent_valid = False
328             for absent in absent_values:
329                 if absent not in verified_email[header]:
330                     absent_valid = True
331
332             if not present_valid and len(present_values) > 0:
333                 raise InvalidEmailHeader("Message header '%s' in %s is not "
334                                          "valid:\n%s"
335                                          % (header, email_path,
336                                             verified_email[header]))
337             if not absent_valid and len(absent_values) > 0:
338                 raise InvalidEmailHeader("Message header '%s' in %s is not "
339                                          "valid:\n%s"
340                                          % (header, email_path,
341                                             verified_email[header]))
342             log.info("Message header '%s' in %s is valid!", header, email_path)
343
344     def assert_content(self, emails, content_type, present_values=None,
345                        absent_values=None, timeout=30):
346         """
347         Check headers for present/missing strings in a list of messages.
348
349         :param emails: emails whose content will be checked
350         :type emails: [str]
351         :param str content_type: type of the content that will be checked for
352                                  values
353         :param present_values: strings that have to be present in the content
354         :type present_values: [str] or None
355         :param absent_values: strings that have to be absent in the content
356         :type absent_values: [str] or None
357         :param int timeout: timeout for extracting the source and target emails
358         :raises: :py:class:`InvalidEmailContent` if email content is not valid
359
360         Every list of present and respectively absent values contains
361         alternative values. At least one of present and one of absent should be
362         satisfied.
363         """
364         target_paths = self._extract_email_paths(self.target_path, emails,
365                                                  timeout)
366         for email_path in target_paths:
367             with open(email_path, "r") as email_file:
368                 verified_email = Parser().parse(email_file)
369             log.debug("Extracted email content:\n%s", verified_email)
370             content = ""
371             for part in verified_email.walk():
372                 log.debug("Extracted %s part while looking for %s",
373                           part.get_content_type(), content_type)
374                 if part.get_content_type() == content_type:
375                     content = part.get_payload(decode=True)
376                     if isinstance(content, bytes):
377                         content = content.decode()
378                     # NOTE: only one such element is expected
379                     break
380
381             log.info("Checking content '%s' in %s", content_type, email_path)
382             if not present_values:
383                 present_values = []
384             else:
385                 log.info("for present '%s'", "', '".join(present_values))
386             if not absent_values:
387                 absent_values = []
388             else:
389                 log.info("for absent '%s'", "', '".join(absent_values))
390             present_valid = False
391             for present in present_values:
392                 if present in content:
393                     present_valid = True
394             absent_valid = False
395             for absent in absent_values:
396                 if absent not in content:
397                     absent_valid = True
398
399             if not present_valid and len(present_values) > 0:
400                 raise InvalidEmailContent("Message content '%s' in %s is not "
401                                           "valid:\n%s"
402                                           % (content_type, email_path, content))
403             if not absent_valid and len(absent_values) > 0:
404                 raise InvalidEmailContent("Message content '%s' in %s is not "
405                                           "valid:\n%s"
406                                           % (content_type, email_path, content))
407             log.info("Message content '%s' in %s is valid!",
408                      content_type, email_path)
409
410     def send_email_with_files(self, usernames, file_list,
411                               wait_for_transfer=True,
412                               autotest_signature=None,
413                               subject="my subject"):
414         """
415         Send a generated email with optional attachments.
416
417         :param usernames: username(s) of the localhost receiver(s) or proper
418                           email address(es)
419         :type usernames: str or [str]
420         :param file_list: files attached to an email; can be empty
421         :type file_list: [str]
422         :param wait_for_transfer: specify whether to wait until arnied_wrapper
423                                   confirms email transfer; you can also specify
424                                   a fixed timeout (seconds)
425         :type wait_for_transfer: bool or int
426         :param autotest_signature: text to insert as value for header
427                                    X-Autotest-Signature for simpler recognition
428                                    of mail (if None do not add header)
429         :type autotest_signature: str or None
430         :param str subject: Subject of created mails
431         """
432         text = 'This is an autogenerated email.\n'
433
434         recipients = self._prepare_recipients(usernames)
435
436         if file_list:   # empty or None or so
437             msg = MIMEMultipart()     # pylint: disable=redefined-variable-type
438             msg.attach(MIMEText(text, _charset='utf-8'))
439         else:
440             msg = MIMEText(text, _charset='utf-8')  # pylint: disable=redefined-variable-type
441         msg['From'] = self.smtp_sender
442         msg['To'] = ', '.join(recipients)
443         msg['Subject'] = subject
444         msg['Date'] = formatdate(localtime=True)
445         msg.preamble = 'This is a multi-part message in MIME format.\n'
446         msg.add_header('X-Autotest-Creator',
447                        self.__class__.__module__ + '.' +
448                        self.__class__.__name__ + '.' +
449                        currentframe().f_code.co_name)
450         # (with help from http://stackoverflow.com/questions/5067604/determine-
451         #      function-name-from-within-that-function-without-using-traceback)
452         if autotest_signature:
453             msg.add_header('X-Autotest-Signature', autotest_signature)
454
455         # attach files
456         for filename in file_list:
457             fullpath = os.path.join(self.source_path, filename)
458
459             # Guess the content type based on the file's extension.  Encoding
460             # will be ignored, although we should check for simple things like
461             # gzip'd or compressed files.
462             ctype, encoding = mimetypes.guess_type(fullpath)
463             if ctype is None or encoding is not None:
464                 # No guess could be made, or the file is encoded (compressed),
465                 # so use a generic bag-of-bits type.
466                 ctype = 'application/octet-stream'
467
468             maintype, subtype = ctype.split('/', 1)
469             log.debug("Creating message containing file {} of mime type {}"
470                       .format(filename, ctype))
471             part = None
472             if maintype == 'text':
473                 with open(fullpath, 'rt') as file_handle:
474                     # Note: we should handle calculating the charset
475                     part = MIMEText(file_handle.read(), _subtype=subtype)   # pylint:disable=redefined-variable-type
476             elif maintype == 'image':
477                 with open(fullpath, 'rb') as file_handle:
478                     part = MIMEImage(file_handle.read(), _subtype=subtype)  # pylint:disable=redefined-variable-type
479             elif maintype == 'audio':
480                 with open(fullpath, 'rb') as file_handle:
481                     part = MIMEAudio(file_handle.read(), _subtype=subtype)  # pylint:disable=redefined-variable-type
482             else:
483                 part = MIMEBase(maintype, subtype)                          # pylint:disable=redefined-variable-type
484                 with open(fullpath, 'rb') as file_handle:
485                     part.set_payload(file_handle.read())
486                 # Encode the payload using Base64
487                 encode_base64(part)
488             # Set the filename parameter
489             part.add_header('Content-Disposition', 'attachment',
490                             filename=filename)
491             msg.attach(part)
492
493         log.debug("Message successfully created")
494         # send via SMTP
495
496         log.debug("Sending message from %s to %s"
497                   % (self.smtp_sender, ', '.join(recipients)))
498         with smtplib.SMTP('localhost') as server:
499             server.sendmail(self.smtp_sender, recipients, msg.as_string())
500
501         # wait for transfer; complicated by isinstance(False, int) == True
502         if wait_for_transfer is False:
503             pass
504         elif wait_for_transfer is True:
505             arnied_wrapper.wait_for_email_transfer()
506         else:
507             arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)
508
509     def _extract_email_paths(self, path, emails, timeout):
510         """Check and return the absolute paths of a list of emails."""
511         log.debug("Extracting messages %s", emails)
512         if len(emails) == 0:
513             emails = os.listdir(path)
514         email_paths = []
515         for expected_email in emails:
516             # TODO: this can be improved by matching the emails themselves
517             if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index",
518                                   "Entw&APw-rfe", "Gesendete Elemente",
519                                   "Gel&APY-schte Elemente", "mailboxes.dump",
520                                   "tmp"]:
521                 continue
522             email_path = os.path.join(path, expected_email)
523             for i in range(timeout):
524                 if os.path.isfile(email_path):
525                     email_paths.append(email_path)
526                     break
527                 elif i == timeout - 1:
528                     raise EmailNotFound("Target message %s could not be found "
529                                         "on server at %s within %ss"
530                                         % (expected_email, path, timeout))
531                 time.sleep(1)
532         log.debug("%s mails extracted at %s.", len(email_paths), path)
533         return email_paths
534
535     def _find_message_with_id(self, message_id, message_paths):
536         """Find message with id among a list of message paths."""
537         log.debug("Looking for a match for the message with id %s", message_id)
538         for message_path in message_paths:
539             extracted_id = self._extract_message_id(message_path)
540             log.debug("Extracted id %s from candidate %s", extracted_id,
541                       message_path)
542             if message_id == extracted_id:
543                 log.debug("Found match at %s", message_path)
544                 return message_path
545         raise MismatchedEmailID("The message with id %s could not be matched "
546                                 "or wasn't expected among %s"
547                                 % (message_id, ", ".join(message_paths)))
548
549     def _extract_message_id(self, message_path):
550         """
551         Given a message file path extract the Message-ID.
552
553         :raises: :py:class:`MissingEmailID` if no Message-ID was found.
554         """
555         message_id = ""
556         with open(message_path, errors='ignore') as file_handle:
557             content = file_handle.read()
558         for line in content.split("\n"):
559             match_id = re.match("Autotest-Message-ID: (.+)", line)
560             if match_id is not None:
561                 message_id = match_id.group(1).rstrip('\r\n')
562         if message_id == "":
563             raise MissingEmailID(f"No id was found in target message {message_path}, "
564                                  f"so it cannot be properly matched")
565         return message_id
566
567     def _default_compare_emails(self, source_email_path, target_email_path,
568                                 tolerance=1):
569         """
570         Compare target emails with source ones.
571
572         Uses python provided diff functionality to compare complete mail files.
573         """
574         with open(source_email_path, "r") as source_email_file:
575             source_email = source_email_file.read()
576         with open(target_email_path, "r") as target_email_file:
577             target_email = target_email_file.read()
578         matcher = difflib.SequenceMatcher(None, source_email, target_email)
579         diffratio = matcher.ratio()
580         log.debug("Target message comparison ratio is %s.", diffratio)
581         # log.info("%s $$$ %s", source_email, target_email)
582         if diffratio < tolerance:
583             raise EmailMismatch("Target message is too different from the "
584                                 "source (difference %s < tolerance %s).",
585                                 diffratio, tolerance)
586
587     def _compare_emails_by_basic_headers(self, source_email_path,
588                                          target_email_path, tolerance=1):
589         """
590         Compare target emails with source ones.
591
592         Uses python provided diff functionality to compare headers and mail
593         "body".
594
595         Argument `tolerance` not used!
596         """
597         with open(source_email_path, errors="ignore") as file_handle:
598             source_email = Parser().parse(file_handle)
599             source_body = ""
600             for part in source_email.walk():
601                 if part.get_content_type() in ["text/plain", "text/html"]:
602                     source_body = part.get_payload()
603                     break
604
605         with open(target_email_path, errors="ignore") as file_handle:
606             target_email = Parser().parse(file_handle)
607             target_body = ""
608             for part in target_email.walk():
609                 if part.get_content_type() in ["text/plain", "text/html"]:
610                     target_body = part.get_payload()
611                     break
612
613         if source_email['From'] != target_email['From']:
614             raise EmailMismatch("Target message sender %s is too different "
615                                 "from the source one %s" %
616                                 (target_email['From'], source_email['From']))
617         if source_email['To'] != target_email['To']:
618             raise EmailMismatch("Target message recipient %s is too different "
619                                 "from the source one %s" %
620                                 (target_email['To'], source_email['To']))
621         if source_email['Subject'] != target_email['Subject']:
622             raise EmailMismatch("Target message subject '%s' is too different "
623                                 "from the source one '%s'" %
624                                 (target_email['Subject'],
625                                  source_email['Subject']))
626         if source_email['Date'] != target_email['Date']:
627             raise EmailMismatch("Target message date %s is too different from "
628                                 "the source one %s" %
629                                 (target_email['Date'], source_email['Date']))
630         if source_body != target_body:
631             raise EmailMismatch("Target message body '%s' is too different "
632                                 "from the source one '%s'" %
633                                 (target_body, source_body))
634
635     def _compare_emails_by_existence(self, source_email_path,
636                                      target_email_path, tolerance=1):
637         """
638         Weak email validation based only on presence of file.
639
640         DOES NOT CHECK ANYTHING!
641         """
642         return True