Remove api doc headers
[pyi2ncommon] / src / mail_validator.py
CommitLineData
67177844
CH
1# This Python file uses the following encoding: utf-8
2
3# The software in this package is distributed under the GNU General
4# Public License version 2 (with a special exception described below).
5#
6# A copy of GNU General Public License (GPL) is included in this distribution,
7# in the file COPYING.GPL.
8#
9# As a special exception, if other files instantiate templates or use macros
10# or inline functions from this file, or you compile this file and link it
11# with other works to produce a work based on this file, this file
12# does not by itself cause the resulting work to be covered
13# by the GNU General Public License.
14#
15# However the source code for this file must still be made available
16# in accordance with section (3) of the GNU General Public License.
17#
18# This exception does not invalidate any other reasons why a work based
19# on this file might be covered by the GNU General Public License.
20#
21# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23"""
67177844
CH
24Class :py:class:`MailValidator`, a fully-featured email sender and checker.
25
26Copyright: Intra2net AG
67177844
CH
27"""
28
29import time
30import os
31import difflib
32import socket
33from inspect import currentframe
34import re
35import subprocess
36import logging
37
38import smtplib
39from email.mime.audio import MIMEAudio
40from email.mime.base import MIMEBase
41from email.mime.image import MIMEImage
42from email.mime.multipart import MIMEMultipart
43from email.mime.text import MIMEText
44from email.encoders import encode_base64
45from email.utils import formatdate
46from email.parser import Parser
47import mimetypes
48
49from . import arnied_wrapper
50
# Module-level logger used by every class and method in this file.
# NOTE(review): the logger name still says 'mail_utils' although this file is
# mail_validator.py -- presumably kept so existing logging configuration keeps
# matching; confirm before renaming.
log = logging.getLogger('pyi2ncommon.mail_utils')
52
53
class EmailException(Exception):
    """
    Root of the exception hierarchy raised from `MailValidator`.

    Catch this type to handle every validation failure of this module at once.
    """
58
59
class EmailNotFound(EmailException):
    """An expected email file did not show up or has no counterpart."""
62
63
class InvalidEmailHeader(EmailException):
    """A checked email header failed the present/absent value assertions."""
66
67
class InvalidEmailContent(EmailException):
    """A checked email content part failed the present/absent assertions."""
70
71
class EmailIDError(EmailException):
    """Common base for problems with the id used to match emails."""
74
75
class MismatchedEmailID(EmailIDError):
    """No candidate email carries the id that was looked for."""
78
79
class MissingEmailID(EmailIDError):
    """An email file does not contain any id it could be matched by."""
82
83
class EmailMismatch(EmailException):
    """A target email differs from the source email it was matched with."""
86
87
class MailValidator:
    """
    Class for validation of emails.

    A validator works on two directories: `source_path` holds the emails that
    are injected or sent, `target_path` is where the delivered emails are
    looked up.  Delivered emails are matched to their sources through the
    ``Autotest-Message-ID`` line found in the email files and then compared
    with the currently selected comparison method.
    """

    # Directory entries that are skipped when collecting email files
    # (mail store metadata and sub-folders rather than emails).
    _IGNORED_ENTRIES = frozenset([
        "cyrus.cache", "cyrus.header", "cyrus.index",
        "Entw&APw-rfe", "Gesendete Elemente",
        "Gel&APY-schte Elemente", "mailboxes.dump",
        "tmp",
    ])

    # Line pattern carrying the id used to match source and target emails.
    _MESSAGE_ID_PATTERN = re.compile(r"Autotest-Message-ID: (.+)")

    @property
    def target_path(self):
        """Getter/Setter for property `target_path`."""
        return self._target_path

    @target_path.setter
    def target_path(self, new_value):
        # Assigning None is a no-op: the previous value is kept.
        if new_value is not None:
            self._target_path = new_value

    @property
    def source_path(self):
        """Getter/Setter for property `source_path`."""
        return self._source_path

    @source_path.setter
    def source_path(self, new_value):
        # Assigning None is a no-op: the previous value is kept.
        if new_value is not None:
            self._source_path = new_value

    @property
    def smtp_sender(self):
        """Getter/Setter for property `smtp_sender`."""
        return self._smtp_sender

    @smtp_sender.setter
    def smtp_sender(self, new_value):
        # Assigning None is a no-op: the previous value is kept.
        if new_value is not None:
            self._smtp_sender = new_value

    def _set_compare_emails_method(self, method):
        """
        Set email comparison method for validation.

        :param str method: one of "basic", "headers", "existence"
        :raises: :py:class:`ValueError` if chosen method is invalid
        """
        if method == "basic":
            self._compare_emails_method = self._default_compare_emails
        elif method == "headers":
            self._compare_emails_method = self._compare_emails_by_basic_headers
        elif method == "existence":
            self._compare_emails_method = self._compare_emails_by_existence
        else:
            raise ValueError("Invalid email comparison method %s" % method)

    # Write-only property: assign "basic", "headers" or "existence".
    compare_emails_method = property(
        fset=_set_compare_emails_method,
        doc='Write-only selector of the comparison method: one of "basic", '
            '"headers", "existence".')

    def __init__(self, source_path, target_path):
        """
        Construct a validator instance.

        :param str source_path: path to find source emails (not sent)
        :param str target_path: path to find target emails (received)

        .. note:: The comparison method can be redefined using the variety of
                  private method implementations.
        """
        self._target_path = target_path
        self._source_path = source_path
        self._smtp_sender = "no_source@inject.smtp"
        self._compare_emails_method = self._default_compare_emails

    def inject_emails(self, username, original_user):
        """
        Inject emails from `source_path` to `target_path`.

        This uses the script *restore_mail_inject.pl* which injects the mails
        using IMAP (as opposed to :py:meth:`inject_smtp`).

        :param str username: username for the mail injection script
        :param str original_user: original username for the mail injection
                                  script; if empty, no mailbox dump is passed
        :raises: :py:class:`subprocess.CalledProcessError` if the script
                 exits with a non-zero status

        In order to restore acl rights as well put a mailboxes.dump file in
        the source path.
        """
        log.info("Injecting emails for user %s", username)

        # inject emails from test data; an argument list (no shell) keeps
        # user names and paths with spaces or shell metacharacters intact
        cmd = ["/usr/intranator/bin/restore_mail_inject.pl",
               "-u", username,
               "-s", self.source_path]
        if original_user:
            cmd += ["-m", os.path.join(self.source_path, "mailboxes.dump"),
                    "-o", original_user]

        result = subprocess.check_output(cmd)
        log.debug(result)

    def _prepare_recipients(self, recipients):
        """
        Prepare recipient list: ensure list of proper addresses.

        If given a simple string, make a list of strings out of it.
        If any recipient is just a username, append "@" + localhost to it.
        Also check that recipients are just email addresses.

        :param recipients: username(s) or email address(es)
        :type recipients: str or [str]
        :returns: one address per given recipient
        :rtype: [str]
        :raises: :py:class:`ValueError` if a recipient contains one of the
                 characters ``< > " '``
        """
        hostname = socket.gethostname()
        if isinstance(recipients, str):
            recipients = [recipients]

        addresses = []
        for recipient in recipients:
            if any(bad_char in recipient for bad_char in '<>"\''):
                raise ValueError('Recipient must be a "raw" email address,'
                                 ' not {!r}'.format(recipient))
            if '@' in recipient:
                addresses.append(recipient)
            else:
                addresses.append(recipient + '@' + hostname)
        return addresses

    def inject_smtp(self, usernames, emails):
        """
        Inject emails from `source_path` using python's SMTP library.

        As opposed to :py:meth:`inject_emails`, this actually sends the mail
        to the local mail server (meaning filtering, archiving, ... will
        happen).

        :param usernames: username(s) of the localhost receiver(s) for each
                          email or proper email address(es)
        :type usernames: str or [str]
        :param emails: paths to files including full emails (header + body)
                       to be sent to each user
        :type emails: [str]
        """
        recipients = self._prepare_recipients(usernames)
        log.info("Sending emails to %s", ','.join(recipients))
        with smtplib.SMTP('localhost') as server:
            for email_name in emails:
                log.info("Sending email %s", email_name)
                email_path = os.path.join(self.source_path, email_name)
                # raw bytes are sent as they are, nothing gets re-encoded
                with open(email_path, 'rb') as file_handle:
                    email_content = file_handle.read()
                server.sendmail(self.smtp_sender, recipients, email_content)

        # Wait till SMTP queue is processed
        arnied_wrapper.wait_for_email_transfer()

    def verify_email_id(self, email, emails_list, timeout, in_target=True):
        """
        Verify that the id of an email is present in a list.

        Returns that email's match in this list.

        :param str email: email filename
        :param emails_list: email among which the first email has to be found
        :type emails_list: [str]
        :param int timeout: timeout for extracting the source and target emails
        :param bool in_target: whether the verified email is on the target side
        :returns: file name (without directory) of the matching email
        :rtype: str
        :raises: :py:class:`MissingEmailID` if an email has no id,
                 :py:class:`MismatchedEmailID` if no email in the list matches

        If `in_target` is set to True we are getting the target id from the
        target list of a source email. Otherwise, we assume a target email from
        a source list.
        """
        if in_target:
            email_dir, list_dir = self.source_path, self.target_path
        else:
            email_dir, list_dir = self.target_path, self.source_path

        email_path = self._extract_email_paths(email_dir, [email], timeout)[0]
        candidates = self._extract_email_paths(list_dir, emails_list, timeout)

        email_id = self._extract_message_id(email_path)
        match = self._find_message_with_id(email_id, candidates)
        return os.path.basename(match)

    def verify_emails(self, source_emails, target_emails, timeout):
        """
        Check injected e-mails for a user.

        Every target email is matched to a source email by id and both are
        compared with the selected comparison method.

        :param source_emails: emails at the source location
        :type source_emails: [str]
        :param target_emails: emails at the target (server) location
        :type target_emails: [str]
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`EmailNotFound` if target email is not found on
                 server
        """
        source_paths = self._extract_email_paths(self.source_path,
                                                 source_emails, timeout)
        target_paths = self._extract_email_paths(self.target_path,
                                                 target_emails, timeout)

        log.info("Verifying emails at %s with %s", self.target_path,
                 self.source_path)
        for target in target_paths:
            log.info("Verifying email %s", target)
            target_id = self._extract_message_id(target)
            source = self._find_message_with_id(target_id, source_paths)
            # each source email may be matched only once
            source_paths.remove(source)
            # all comparison methods take (source, target, tolerance)
            self._compare_emails_method(source, target, 1)

        # whatever is left of the sources never arrived at the target
        if source_paths:
            raise EmailNotFound("%s target mails could not be found on server."
                                "\n%s"
                                % (len(source_paths), "\n".join(source_paths)))
        log.info("All e-mails at %s verified!", self.target_path)

    @staticmethod
    def _check_values(text, present_values, absent_values):
        """
        Evaluate the present/absent alternatives against a text.

        :param str text: text that is searched
        :param present_values: alternatives of which one has to be in the text
        :type present_values: [str]
        :param absent_values: alternatives of which one has to be missing
        :type absent_values: [str]
        :returns: whether the present and the absent condition are satisfied;
                  an empty list of alternatives is always satisfied
        :rtype: (bool, bool)
        """
        present_ok = (not present_values
                      or any(value in text for value in present_values))
        absent_ok = (not absent_values
                     or any(value not in text for value in absent_values))
        return present_ok, absent_ok

    def assert_header(self, emails, header, present_values=None,
                      absent_values=None, timeout=30):
        """
        Check headers for present and missing strings in a list of messages.

        :param emails: emails whose headers will be checked
        :type emails: [str]
        :param str header: header that will be validated for each email
        :param present_values: strings that have to be present in the header
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the header
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailHeader` if email header is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied.  A header that is missing from the email is checked like an
        empty header.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            with open(email_path, "r") as email_file:
                verified_email = Parser().parse(email_file, headersonly=True)
            log.debug("Extracted email headers:\n%s", verified_email)

            log.info("Checking header '%s' in %s", header, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            header_value = verified_email[header]
            # the lookup yields None for a missing header; search an empty
            # string then instead of failing with a TypeError
            searched = "" if header_value is None else str(header_value)
            present_ok, absent_ok = self._check_values(
                searched, present_values, absent_values)
            if not (present_ok and absent_ok):
                raise InvalidEmailHeader("Message header '%s' in %s is not "
                                         "valid:\n%s"
                                         % (header, email_path, header_value))
            log.info("Message header '%s' in %s is valid!", header, email_path)

    def assert_content(self, emails, content_type, present_values=None,
                       absent_values=None, timeout=30):
        """
        Check content for present/missing strings in a list of messages.

        Only the first part of each message with a matching content type is
        checked; without such a part the content is an empty string.

        :param emails: emails whose content will be checked
        :type emails: [str]
        :param str content_type: type of the content that will be checked for
                                 values
        :param present_values: strings that have to be present in the content
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the content
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailContent` if email content is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            with open(email_path, "r") as email_file:
                verified_email = Parser().parse(email_file)
            log.debug("Extracted email content:\n%s", verified_email)

            content = ""
            for part in verified_email.walk():
                log.debug("Extracted %s part while looking for %s",
                          part.get_content_type(), content_type)
                if part.get_content_type() == content_type:
                    content = part.get_payload(decode=True)
                    if isinstance(content, bytes):
                        content = content.decode()
                    elif content is None:
                        # container parts have no decodable payload
                        content = ""
                    # NOTE: only one such element is expected
                    break

            log.info("Checking content '%s' in %s", content_type, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            present_ok, absent_ok = self._check_values(
                content, present_values, absent_values)
            if not (present_ok and absent_ok):
                raise InvalidEmailContent("Message content '%s' in %s is not "
                                          "valid:\n%s"
                                          % (content_type, email_path, content))
            log.info("Message content '%s' in %s is valid!",
                     content_type, email_path)

    def _create_attachment(self, filename):
        """
        Build the MIME part for one file below `source_path`.

        :param str filename: name of the file, relative to `source_path`
        :returns: MIME part with a ``Content-Disposition: attachment`` header
        """
        fullpath = os.path.join(self.source_path, filename)

        # Guess the content type based on the file's extension. Encoding
        # will be ignored, although we should check for simple things like
        # gzip'd or compressed files.
        ctype, encoding = mimetypes.guess_type(fullpath)
        if ctype is None or encoding is not None:
            # No guess could be made, or the file is encoded (compressed),
            # so use a generic bag-of-bits type.
            ctype = 'application/octet-stream'

        maintype, subtype = ctype.split('/', 1)
        log.debug("Creating message containing file %s of mime type %s",
                  filename, ctype)

        binary_part_types = {'image': MIMEImage, 'audio': MIMEAudio}
        if maintype == 'text':
            with open(fullpath, 'rt') as file_handle:
                # Note: we should handle calculating the charset
                part = MIMEText(file_handle.read(), _subtype=subtype)
        elif maintype in binary_part_types:
            with open(fullpath, 'rb') as file_handle:
                part = binary_part_types[maintype](file_handle.read(),
                                                   _subtype=subtype)
        else:
            part = MIMEBase(maintype, subtype)
            with open(fullpath, 'rb') as file_handle:
                part.set_payload(file_handle.read())
            # Encode the payload using Base64
            encode_base64(part)

        # Set the filename parameter
        part.add_header('Content-Disposition', 'attachment',
                        filename=filename)
        return part

    def send_email_with_files(self, usernames, file_list,
                              wait_for_transfer=True,
                              autotest_signature=None,
                              subject="my subject"):
        """
        Send a generated email with optional attachments.

        :param usernames: username(s) of the localhost receiver(s) or proper
                          email address(es)
        :type usernames: str or [str]
        :param file_list: files attached to an email; can be empty or None
        :type file_list: [str] or None
        :param wait_for_transfer: specify whether to wait until arnied_wrapper
                                  confirms email transfer; you can also specify
                                  a fixed timeout (seconds)
        :type wait_for_transfer: bool or int
        :param autotest_signature: text to insert as value for header
                                   X-Autotest-Signature for simpler recognition
                                   of mail (if None do not add header)
        :type autotest_signature: str or None
        :param str subject: Subject of created mails
        """
        text = 'This is an autogenerated email.\n'

        recipients = self._prepare_recipients(usernames)
        # None is accepted like an empty list of attachments
        attachments = file_list or []

        if attachments:
            msg = MIMEMultipart()
            msg.attach(MIMEText(text, _charset='utf-8'))
        else:
            msg = MIMEText(text, _charset='utf-8')
        msg['From'] = self.smtp_sender
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = subject
        msg['Date'] = formatdate(localtime=True)
        msg.preamble = 'This is a multi-part message in MIME format.\n'
        # the name of this very method is taken from the running frame, so
        # the lookup has to stay inside this method
        msg.add_header('X-Autotest-Creator',
                       self.__class__.__module__ + '.' +
                       self.__class__.__name__ + '.' +
                       currentframe().f_code.co_name)
        if autotest_signature:
            msg.add_header('X-Autotest-Signature', autotest_signature)

        # attach files
        for filename in attachments:
            msg.attach(self._create_attachment(filename))

        log.debug("Message successfully created")

        # send via SMTP
        log.debug("Sending message from %s to %s",
                  self.smtp_sender, ', '.join(recipients))
        with smtplib.SMTP('localhost') as server:
            server.sendmail(self.smtp_sender, recipients, msg.as_string())

        # wait for transfer; complicated by isinstance(False, int) == True
        if wait_for_transfer is False:
            pass
        elif wait_for_transfer is True:
            arnied_wrapper.wait_for_email_transfer()
        else:
            arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)

    def _extract_email_paths(self, path, emails, timeout):
        """
        Check and return the absolute paths of a list of emails.

        :param str path: directory the emails are expected in
        :param emails: file names to look for; if empty, all entries of the
                       directory are used
        :type emails: [str]
        :param int timeout: seconds to wait for each email to show up; the
                            existence is checked once per second and at least
                            once, even with a timeout of 0
        :returns: paths of the emails, without the ignored directory entries
        :rtype: [str]
        :raises: :py:class:`EmailNotFound` if an email does not show up in time
        """
        log.debug("Extracting messages %s", emails)
        if not emails:
            emails = os.listdir(path)

        # always look at least once, otherwise a timeout of 0 would silently
        # drop every email without ever checking for it
        attempts = max(timeout, 1)
        email_paths = []
        for expected_email in emails:
            # TODO: this can be improved by matching the emails themselves
            if expected_email in self._IGNORED_ENTRIES:
                continue
            email_path = os.path.join(path, expected_email)
            for attempt in range(attempts):
                if os.path.isfile(email_path):
                    email_paths.append(email_path)
                    break
                if attempt == attempts - 1:
                    raise EmailNotFound("Target message %s could not be found "
                                        "on server at %s within %ss"
                                        % (expected_email, path, timeout))
                time.sleep(1)
        log.debug("%s mails extracted at %s.", len(email_paths), path)
        return email_paths

    def _find_message_with_id(self, message_id, message_paths):
        """
        Find message with id among a list of message paths.

        :param str message_id: id to look for
        :param message_paths: paths of the candidate messages
        :type message_paths: [str]
        :returns: path of the first candidate with that id
        :rtype: str
        :raises: :py:class:`MismatchedEmailID` if no candidate has that id
        """
        log.debug("Looking for a match for the message with id %s", message_id)
        for message_path in message_paths:
            extracted_id = self._extract_message_id(message_path)
            log.debug("Extracted id %s from candidate %s", extracted_id,
                      message_path)
            if message_id == extracted_id:
                log.debug("Found match at %s", message_path)
                return message_path
        raise MismatchedEmailID("The message with id %s could not be matched "
                                "or wasn't expected among %s"
                                % (message_id, ", ".join(message_paths)))

    def _extract_message_id(self, message_path):
        """
        Given a message file path extract the Message-ID.

        The id is the text after ``Autotest-Message-ID: `` at the start of a
        line; if several lines match, the last one wins.

        :param str message_path: path of the message file
        :returns: the id without trailing line break characters
        :rtype: str
        :raises: :py:class:`MissingEmailID` if no Message-ID was found.
        """
        message_id = ""
        # undecodable bytes are dropped, only the id line is of interest
        with open(message_path, errors='ignore') as file_handle:
            for line in file_handle:
                match_id = self._MESSAGE_ID_PATTERN.match(line)
                if match_id is not None:
                    message_id = match_id.group(1).rstrip('\r\n')
        if message_id == "":
            raise MissingEmailID(f"No id was found in target message {message_path}, "
                                 f"so it cannot be properly matched")
        return message_id

    def _default_compare_emails(self, source_email_path, target_email_path,
                                tolerance=1):
        """
        Compare target emails with source ones.

        Uses python provided diff functionality to compare complete mail files.

        :param str source_email_path: path of the source email
        :param str target_email_path: path of the target email
        :param tolerance: minimal similarity ratio (0 to 1) that is accepted
        :type tolerance: int or float
        :raises: :py:class:`EmailMismatch` if the ratio is below the tolerance
        """
        with open(source_email_path, "r") as source_email_file:
            source_email = source_email_file.read()
        with open(target_email_path, "r") as target_email_file:
            target_email = target_email_file.read()
        matcher = difflib.SequenceMatcher(None, source_email, target_email)
        diffratio = matcher.ratio()
        log.debug("Target message comparison ratio is %s.", diffratio)
        if diffratio < tolerance:
            # exceptions do not interpolate extra arguments the way logging
            # calls do, so the message is formatted explicitly
            raise EmailMismatch("Target message is too different from the "
                                "source (difference %s < tolerance %s)."
                                % (diffratio, tolerance))

    @staticmethod
    def _parse_email_with_body(email_path):
        """
        Parse an email file and pick the payload of its first text part.

        :param str email_path: path of the email file
        :returns: the parsed message and the payload of its first
                  text/plain or text/html part ("" if there is none)
        """
        # undecodable bytes are dropped instead of aborting the comparison
        with open(email_path, errors="ignore") as file_handle:
            message = Parser().parse(file_handle)
        body = ""
        for part in message.walk():
            if part.get_content_type() in ["text/plain", "text/html"]:
                body = part.get_payload()
                break
        return message, body

    def _compare_emails_by_basic_headers(self, source_email_path,
                                         target_email_path, tolerance=1):
        """
        Compare target emails with source ones.

        Compares the headers From, To, Subject and Date as well as the mail
        "body" (first text/plain or text/html part) for equality.

        Argument `tolerance` not used!

        :param str source_email_path: path of the source email
        :param str target_email_path: path of the target email
        :raises: :py:class:`EmailMismatch` at the first difference found
        """
        source_email, source_body = \
            self._parse_email_with_body(source_email_path)
        target_email, target_body = \
            self._parse_email_with_body(target_email_path)

        # checked in this order; the first differing header is reported
        header_checks = (
            ("From", "Target message sender %s is too different "
                     "from the source one %s"),
            ("To", "Target message recipient %s is too different "
                   "from the source one %s"),
            ("Subject", "Target message subject '%s' is too different "
                        "from the source one '%s'"),
            ("Date", "Target message date %s is too different from "
                     "the source one %s"),
        )
        for header, message in header_checks:
            if source_email[header] != target_email[header]:
                raise EmailMismatch(message % (target_email[header],
                                               source_email[header]))
        if source_body != target_body:
            raise EmailMismatch("Target message body '%s' is too different "
                                "from the source one '%s'" %
                                (target_body, source_body))

    def _compare_emails_by_existence(self, source_email_path,
                                     target_email_path, tolerance=1):
        """
        Weak email validation based only on presence of file.

        DOES NOT CHECK ANYTHING!

        :returns: always True
        """
        return True