Clean up, remove compat with py < 3.6
[pyi2ncommon] / src / mail_validator.py
CommitLineData
67177844
CH
1# This Python file uses the following encoding: utf-8
2
3# The software in this package is distributed under the GNU General
4# Public License version 2 (with a special exception described below).
5#
6# A copy of GNU General Public License (GPL) is included in this distribution,
7# in the file COPYING.GPL.
8#
9# As a special exception, if other files instantiate templates or use macros
10# or inline functions from this file, or you compile this file and link it
11# with other works to produce a work based on this file, this file
12# does not by itself cause the resulting work to be covered
13# by the GNU General Public License.
14#
15# However the source code for this file must still be made available
16# in accordance with section (3) of the GNU General Public License.
17#
18# This exception does not invalidate any other reasons why a work based
19# on this file might be covered by the GNU General Public License.
20#
21# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23"""
24
25SUMMARY
26------------------------------------------------------
27Class :py:class:`MailValidator`, a fully-featured email sender and checker.
28
29Copyright: Intra2net AG
30
31
32INTERFACE
33------------------------------------------------------
34
35"""
36
37import time
38import os
39import difflib
40import socket
41from inspect import currentframe
42import re
43import subprocess
44import logging
45
46import smtplib
47from email.mime.audio import MIMEAudio
48from email.mime.base import MIMEBase
49from email.mime.image import MIMEImage
50from email.mime.multipart import MIMEMultipart
51from email.mime.text import MIMEText
52from email.encoders import encode_base64
53from email.utils import formatdate
54from email.parser import Parser
55import mimetypes
56
57from . import arnied_wrapper
58
59log = logging.getLogger('pyi2ncommon.mail_utils')
60
61
class EmailException(Exception):
    """Common ancestor of all custom errors raised by `MailValidator`."""
66
67
class EmailNotFound(EmailException):
    """An expected email file could not be found."""
70
71
class InvalidEmailHeader(EmailException):
    """A checked email header did not satisfy the expected values."""
74
75
class InvalidEmailContent(EmailException):
    """A checked email content part did not satisfy the expected values."""
78
79
class EmailIDError(EmailException):
    """Common ancestor of errors related to the id of a message."""
82
83
class MismatchedEmailID(EmailIDError):
    """No message with the wanted id exists among the candidates."""
86
87
class MissingEmailID(EmailIDError):
    """A message file does not contain any id to match it by."""
90
91
class EmailMismatch(EmailException):
    """A target email differs from the source email it was compared to."""
94
95
class MailValidator:
    """
    Class for validation of emails.

    An instance knows a *source* path (emails that are injected or sent) and
    a *target* path (emails as they were received). It offers methods to
    inject or send emails and to verify what has arrived at the target.
    """

    # Values accepted by the `compare_emails_method` property, mapped to the
    # name of the private method implementing the comparison.
    _COMPARE_METHODS = {
        "basic": "_default_compare_emails",
        "headers": "_compare_emails_by_basic_headers",
        "existence": "_compare_emails_by_existence",
    }

    # Directory entries that are never treated as emails when extracting
    # email paths.
    _IGNORED_ENTRIES = frozenset([
        "cyrus.cache", "cyrus.header", "cyrus.index",
        "Entw&APw-rfe", "Gesendete Objekte",
        "Gel&APY-schte Elemente", "mailboxes.dump",
        "tmp",
    ])

    @property
    def target_path(self):
        """Getter/Setter for property `target_path`."""
        return self._target_path

    @target_path.setter
    def target_path(self, new_value):
        # Assigning None has always been a silent no-op; keep it that way.
        if new_value is not None:
            self._target_path = new_value

    @property
    def source_path(self):
        """Getter/Setter for property `source_path`."""
        return self._source_path

    @source_path.setter
    def source_path(self, new_value):
        # Assigning None has always been a silent no-op; keep it that way.
        if new_value is not None:
            self._source_path = new_value

    @property
    def smtp_sender(self):
        """Getter/Setter for property `smtp_sender`."""
        return self._smtp_sender

    @smtp_sender.setter
    def smtp_sender(self, new_value):
        # Assigning None has always been a silent no-op; keep it that way.
        if new_value is not None:
            self._smtp_sender = new_value

    def _set_compare_emails_method(self, method="basic"):
        """
        Set email comparison method for validation.

        :param str method: one of "basic", "headers", "existence"
        :raises: :py:class:`ValueError` if chosen method is invalid
        """
        try:
            method_name = self._COMPARE_METHODS[method]
        except (KeyError, TypeError):
            raise ValueError("Invalid email comparison method %s"
                             % method) from None
        self._compare_emails_method = getattr(self, method_name)
    # write-only property: assign one of the accepted method names
    compare_emails_method = property(fset=_set_compare_emails_method)

    def __init__(self, source_path, target_path):
        """
        Construct a validator instance.

        :param str source_path: path to find source emails (not sent)
        :param str target_path: path to find target emails (received)

        .. note:: The comparison method can be redefined using the variety of
                  private method implementations.
        """
        self._target_path = target_path
        self._source_path = source_path
        self._smtp_sender = "no_source@inject.smtp"
        self._compare_emails_method = self._default_compare_emails

    def inject_emails(self, username, original_user):
        """
        Inject emails from `source_path` to `target_path`.

        This uses the script *restore_mail_inject.pl* which injects the mails
        using IMAP (as opposed to :py:meth:`inject_smtp`).

        :param str username: username for the mail injection script
        :param str original_user: original username for the mail injection
                                  script; if empty, the options for the
                                  mailbox dump and original user are omitted
        :raises: :py:class:`subprocess.CalledProcessError` if the script
                 exits with a non-zero status

        In order to restore acl rights as well put a mailboxes.dump file in
        the source path.
        """
        log.info("Injecting emails for user %s", username)

        # inject emails from test data; an argument list (no shell) keeps
        # names and paths containing spaces or shell characters intact
        cmd = ["/usr/intranator/bin/restore_mail_inject.pl",
               "-u", username,
               "-s", self.source_path]
        if original_user:
            cmd += ["-m", os.path.join(self.source_path, "mailboxes.dump"),
                    "-o", original_user]

        result = subprocess.check_output(cmd)
        log.debug(result)

    def _prepare_recipients(self, recipients):
        """
        Prepare recipient list: ensure list of proper addresses.

        If given a simple string, make a list of strings out of it.
        If any recipient is just a username, append "@" + localhost to it.
        Also check that recipients are just email addresses.

        :param recipients: username(s) or email address(es)
        :type recipients: str or [str]
        :returns: list of email addresses
        :rtype: [str]
        :raises: :py:class:`ValueError` if a recipient contains one of the
                 characters ``<``, ``>``, ``"`` or ``'``
        """
        hostname = socket.gethostname()
        if isinstance(recipients, str):
            recipients = [recipients]
        result = []
        for recipient in recipients:
            if any(bad_char in recipient for bad_char in '<>"\''):
                raise ValueError('Recipient must be a "raw" email address,'
                                 ' not {!r}'.format(recipient))
            if '@' in recipient:
                result.append(recipient)
            else:
                result.append(recipient + '@' + hostname)
        return result

    def inject_smtp(self, usernames, emails):
        """
        Inject emails from `source_path` using python's SMTP library.

        As opposed to :py:meth:`inject_emails`, this actually sends the mail
        to the local mail server (meaning filtering, archiving, ... will
        happen).

        :param usernames: username(s) of the localhost receiver(s) for each
                          email or proper email address(es)
        :type usernames: str or [str]
        :param emails: paths to files including full emails (header + body)
                       to be sent to each user
        :type emails: [str]
        """
        recipients = self._prepare_recipients(usernames)
        log.info("Sending emails to %s", ','.join(recipients))
        with smtplib.SMTP('localhost') as server:
            for email_name in emails:
                log.info("Sending email %s", email_name)
                email_path = os.path.join(self.source_path, email_name)
                with open(email_path, 'rb') as file_handle:
                    email_content = file_handle.read()
                server.sendmail(self.smtp_sender, recipients, email_content)

        # Wait till SMTP queue is processed
        arnied_wrapper.wait_for_email_transfer()

    def verify_email_id(self, email, emails_list, timeout, in_target=True):
        """
        Verify that the id of an email is present in a list.

        Returns that email's match in this list.

        :param str email: email filename
        :param emails_list: email among which the first email has to be found
        :type emails_list: [str]
        :param int timeout: timeout for extracting the source and target emails
        :param bool in_target: whether the verified email is on the target side
        :returns: file name (without directory) of the matching email
        :rtype: str
        :raises: :py:class:`MismatchedEmailID` if no email in the list has the
                 same id, :py:class:`MissingEmailID` if an email has no id

        If `in_target` is set to True we are getting the target id from the
        target list of a source email. Otherwise, we assume a target email from
        a source list.
        """
        if in_target:
            email_dir, list_dir = self.source_path, self.target_path
        else:
            email_dir, list_dir = self.target_path, self.source_path
        email = self._extract_email_paths(email_dir, [email], timeout)[0]
        emails_list = self._extract_email_paths(list_dir, emails_list,
                                                timeout)

        email_id = self._extract_message_id(email)
        match = self._find_message_with_id(email_id, emails_list)
        return os.path.basename(match)

    def verify_emails(self, source_emails, target_emails, timeout):
        """
        Check injected e-mails for a user.

        Every target email is matched by id with a source email and then
        compared to it using the configured comparison method.

        :param source_emails: emails at the source location
        :type source_emails: [str]
        :param target_emails: emails at the target (server) location
        :type target_emails: [str]
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`EmailNotFound` if target email is not found on
                 server
        """
        source_paths = self._extract_email_paths(self.source_path,
                                                 source_emails, timeout)
        target_paths = self._extract_email_paths(self.target_path,
                                                 target_emails, timeout)

        log.info("Verifying emails at %s with %s", self.target_path,
                 self.source_path)
        for target in target_paths:
            log.info("Verifying email %s", target)
            target_id = self._extract_message_id(target)
            source = self._find_message_with_id(target_id, source_paths)
            source_paths.remove(source)
            # comparison methods take (source, target, tolerance)
            self._compare_emails_method(source, target, 1)

        # source emails left over here had no counterpart among the targets
        if source_paths:
            raise EmailNotFound("%s target mails could not be found on server."
                                "\n%s"
                                % (len(source_paths), "\n".join(source_paths)))
        log.info("All e-mails at %s verified!", self.target_path)

    @staticmethod
    def _values_satisfied(text, present_values, absent_values):
        """
        Tell whether `text` fulfils the present/absent expectations.

        The values of each list are alternatives: a non-empty list of present
        values requires at least one of them to occur in `text`, a non-empty
        list of absent values requires at least one of them to be missing.
        """
        if present_values and not any(value in text
                                      for value in present_values):
            return False
        if absent_values and all(value in text for value in absent_values):
            return False
        return True

    def assert_header(self, emails, header, present_values=None,
                      absent_values=None, timeout=30):
        """
        Check headers for present and missing strings in a list of messages.

        :param emails: emails whose headers will be checked
        :type emails: [str]
        :param str header: header that will be validated for each email
        :param present_values: strings that have to be present in the header
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the header
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailHeader` if email header is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied. A header missing from the email is treated like an empty
        one.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            with open(email_path, "r") as email_file:
                verified_email = Parser().parse(email_file, headersonly=True)
            log.debug("Extracted email headers:\n%s", verified_email)

            log.info("Checking header '%s' in %s", header, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            # a missing header gives None and a non-ascii one may give a
            # Header object; neither supports the "in" operator
            raw_value = verified_email[header]
            header_value = "" if raw_value is None else str(raw_value)

            if not self._values_satisfied(header_value, present_values,
                                          absent_values):
                raise InvalidEmailHeader("Message header '%s' in %s is not "
                                         "valid:\n%s"
                                         % (header, email_path, raw_value))
            log.info("Message header '%s' in %s is valid!", header, email_path)

    def assert_content(self, emails, content_type, present_values=None,
                       absent_values=None, timeout=30):
        """
        Check content for present/missing strings in a list of messages.

        :param emails: emails whose content will be checked
        :type emails: [str]
        :param str content_type: type of the content that will be checked for
                                 values
        :param present_values: strings that have to be present in the content
        :type present_values: [str] or None
        :param absent_values: strings that have to be absent in the content
        :type absent_values: [str] or None
        :param int timeout: timeout for extracting the source and target emails
        :raises: :py:class:`InvalidEmailContent` if email content is not valid

        Every list of present and respectively absent values contains
        alternative values. At least one of present and one of absent should be
        satisfied. If no part of the requested type is found, the content is
        treated as empty.
        """
        present_values = present_values or []
        absent_values = absent_values or []
        target_paths = self._extract_email_paths(self.target_path, emails,
                                                 timeout)
        for email_path in target_paths:
            with open(email_path, "r") as email_file:
                verified_email = Parser().parse(email_file)
            log.debug("Extracted email content:\n%s", verified_email)

            content = ""
            for part in verified_email.walk():
                log.debug("Extracted %s part while looking for %s",
                          part.get_content_type(), content_type)
                if part.get_content_type() == content_type:
                    payload = part.get_payload(decode=True)
                    if isinstance(payload, bytes):
                        content = payload.decode()
                    elif payload is not None:
                        content = payload
                    # payload is None for multipart containers: keep ""
                    # NOTE: only one such element is expected
                    break

            log.info("Checking content '%s' in %s", content_type, email_path)
            if present_values:
                log.info("for present '%s'", "', '".join(present_values))
            if absent_values:
                log.info("for absent '%s'", "', '".join(absent_values))

            if not self._values_satisfied(content, present_values,
                                          absent_values):
                raise InvalidEmailContent("Message content '%s' in %s is not "
                                          "valid:\n%s"
                                          % (content_type, email_path,
                                             content))
            log.info("Message content '%s' in %s is valid!",
                     content_type, email_path)

    def _build_attachment(self, filename):
        """Create the MIME attachment part for a file below `source_path`."""
        fullpath = os.path.join(self.source_path, filename)

        # Guess the content type based on the file's extension. Encoding
        # will be ignored, although we should check for simple things like
        # gzip'd or compressed files.
        ctype, encoding = mimetypes.guess_type(fullpath)
        if ctype is None or encoding is not None:
            # No guess could be made, or the file is encoded (compressed),
            # so use a generic bag-of-bits type.
            ctype = 'application/octet-stream'

        maintype, subtype = ctype.split('/', 1)
        log.debug("Creating message containing file %s of mime type %s",
                  filename, ctype)
        if maintype == 'text':
            with open(fullpath, 'rt') as file_handle:
                # Note: we should handle calculating the charset
                part = MIMEText(file_handle.read(), _subtype=subtype)
        elif maintype == 'image':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEImage(file_handle.read(), _subtype=subtype)
        elif maintype == 'audio':
            with open(fullpath, 'rb') as file_handle:
                part = MIMEAudio(file_handle.read(), _subtype=subtype)
        else:
            part = MIMEBase(maintype, subtype)
            with open(fullpath, 'rb') as file_handle:
                part.set_payload(file_handle.read())
            # Encode the payload using Base64
            encode_base64(part)
        # Set the filename parameter
        part.add_header('Content-Disposition', 'attachment',
                        filename=filename)
        return part

    def send_email_with_files(self, usernames, file_list,
                              wait_for_transfer=True,
                              autotest_signature=None,
                              subject="my subject"):
        """
        Send a generated email with optional attachments.

        :param usernames: username(s) of the localhost receiver(s) or proper
                          email address(es)
        :type usernames: str or [str]
        :param file_list: files attached to an email; can be empty or None
        :type file_list: [str] or None
        :param wait_for_transfer: specify whether to wait until arnied_wrapper
                                  confirms email transfer; you can also specify
                                  a fixed timeout (seconds)
        :type wait_for_transfer: bool or int
        :param autotest_signature: text to insert as value for header
                                   X-Autotest-Signature for simpler recognition
                                   of mail (if None do not add header)
        :type autotest_signature: str or None
        :param str subject: Subject of created mails
        """
        text = 'This is an autogenerated email.\n'

        recipients = self._prepare_recipients(usernames)
        attachments = file_list or []    # None means "no attachments"

        if attachments:
            msg = MIMEMultipart()
            msg.attach(MIMEText(text, _charset='utf-8'))
        else:
            msg = MIMEText(text, _charset='utf-8')
        msg['From'] = self.smtp_sender
        msg['To'] = ', '.join(recipients)
        msg['Subject'] = subject
        msg['Date'] = formatdate(localtime=True)
        msg.preamble = 'This is a multi-part message in MIME format.\n'
        # currentframe() must be called right here so that the header names
        # this very method
        msg.add_header('X-Autotest-Creator',
                       self.__class__.__module__ + '.' +
                       self.__class__.__name__ + '.' +
                       currentframe().f_code.co_name)
        # (with help from http://stackoverflow.com/questions/5067604/determine-
        #  function-name-from-within-that-function-without-using-traceback)
        if autotest_signature:
            msg.add_header('X-Autotest-Signature', autotest_signature)

        # attach files
        for filename in attachments:
            msg.attach(self._build_attachment(filename))

        log.debug("Message successfully created")

        # send via SMTP
        log.debug("Sending message from %s to %s",
                  self.smtp_sender, ', '.join(recipients))
        with smtplib.SMTP('localhost') as server:
            server.sendmail(self.smtp_sender, recipients, msg.as_string())

        # wait for transfer; complicated by isinstance(False, int) == True
        if wait_for_transfer is False:
            return
        if wait_for_transfer is True:
            arnied_wrapper.wait_for_email_transfer()
        else:
            arnied_wrapper.wait_for_email_transfer(timeout=wait_for_transfer)

    def _extract_email_paths(self, path, emails, timeout):
        """
        Check and return the absolute paths of a list of emails.

        :param str path: directory containing the emails
        :param emails: file names of the emails; if empty, all entries of
                       `path` are used
        :type emails: [str]
        :param int timeout: number of checks (one per second) to wait for
                            each email to appear; at least one check is done
        :returns: paths of all emails that were found
        :rtype: [str]
        :raises: :py:class:`EmailNotFound` if an email does not show up in
                 time
        """
        log.debug("Extracting messages %s", emails)
        if not emails:
            emails = os.listdir(path)
        # always look at least once, otherwise a timeout <= 0 would silently
        # drop the email from the result without any error
        attempts = max(timeout, 1)
        email_paths = []
        for expected_email in emails:
            # TODO: this can be improved by matching the emails themselves
            if expected_email in self._IGNORED_ENTRIES:
                continue
            email_path = os.path.join(path, expected_email)
            for attempt in range(attempts):
                if os.path.isfile(email_path):
                    email_paths.append(email_path)
                    break
                if attempt < attempts - 1:
                    time.sleep(1)
            else:
                raise EmailNotFound("Target message %s could not be found "
                                    "on server at %s within %ss"
                                    % (expected_email, path, timeout))
        log.debug("%s mails extracted at %s.", len(email_paths), path)
        return email_paths

    def _find_message_with_id(self, message_id, message_paths):
        """
        Find message with id among a list of message paths.

        :returns: path of the first message whose id equals `message_id`
        :raises: :py:class:`MismatchedEmailID` if there is no such message
        """
        log.debug("Looking for a match for the message with id %s", message_id)
        for message_path in message_paths:
            extracted_id = self._extract_message_id(message_path)
            log.debug("Extracted id %s from candidate %s", extracted_id,
                      message_path)
            if message_id == extracted_id:
                log.debug("Found match at %s", message_path)
                return message_path
        raise MismatchedEmailID("The message with id %s could not be matched "
                                "or wasn't expected among %s"
                                % (message_id, ", ".join(message_paths)))

    def _extract_message_id(self, message_path):
        """
        Given a message file path extract the Message-ID.

        The id is the value of the last line starting with
        ``Autotest-Message-ID: `` anywhere in the file.

        :raises: :py:class:`MissingEmailID` if no Message-ID was found.
        """
        message_id = ""
        with open(message_path, errors='ignore') as file_handle:
            content = file_handle.read()
        for line in content.split("\n"):
            match_id = re.match(r"Autotest-Message-ID: (.+)", line)
            if match_id is not None:
                # no break: a later occurrence overrides an earlier one
                message_id = match_id.group(1).rstrip('\r\n')
        if message_id == "":
            raise MissingEmailID(f"No id was found in target message {message_path}, "
                                 f"so it cannot be properly matched")
        return message_id

    def _default_compare_emails(self, source_email_path, target_email_path,
                                tolerance=1):
        """
        Compare target emails with source ones.

        Uses python provided diff functionality to compare complete mail files.

        :param str source_email_path: path of the source email
        :param str target_email_path: path of the target email
        :param float tolerance: minimal similarity ratio (0..1) accepted
        :raises: :py:class:`EmailMismatch` if the similarity ratio of both
                 files is below `tolerance`
        """
        with open(source_email_path, "r") as source_email_file:
            source_email = source_email_file.read()
        with open(target_email_path, "r") as target_email_file:
            target_email = target_email_file.read()
        matcher = difflib.SequenceMatcher(None, source_email, target_email)
        diffratio = matcher.ratio()
        log.debug("Target message comparison ratio is %s.", diffratio)
        if diffratio < tolerance:
            # exceptions do not interpolate extra arguments like logging does
            raise EmailMismatch("Target message is too different from the "
                                "source (difference %s < tolerance %s)."
                                % (diffratio, tolerance))

    @staticmethod
    def _parse_email_with_body(email_path):
        """Parse an email file; return the message and its first text body."""
        with open(email_path, errors="ignore") as file_handle:
            message = Parser().parse(file_handle)
        body = ""
        for part in message.walk():
            if part.get_content_type() in ("text/plain", "text/html"):
                body = part.get_payload()
                break
        return message, body

    def _compare_emails_by_basic_headers(self, source_email_path,
                                         target_email_path, tolerance=1):
        """
        Compare target emails with source ones.

        Compares the headers From, To, Subject and Date as well as the mail
        "body" (first text/plain or text/html part) for equality.

        Argument `tolerance` not used!

        :raises: :py:class:`EmailMismatch` on the first difference found
        """
        source_email, source_body = \
            self._parse_email_with_body(source_email_path)
        target_email, target_body = \
            self._parse_email_with_body(target_email_path)

        header_checks = (
            ("From", "Target message sender %s is too different "
                     "from the source one %s"),
            ("To", "Target message recipient %s is too different "
                   "from the source one %s"),
            ("Subject", "Target message subject '%s' is too different "
                        "from the source one '%s'"),
            ("Date", "Target message date %s is too different from "
                     "the source one %s"),
        )
        for header, template in header_checks:
            if source_email[header] != target_email[header]:
                raise EmailMismatch(template % (target_email[header],
                                                source_email[header]))
        if source_body != target_body:
            raise EmailMismatch("Target message body '%s' is too different "
                                "from the source one '%s'" %
                                (target_body, source_body))

    def _compare_emails_by_existence(self, source_email_path,
                                     target_email_path, tolerance=1):
        """
        Weak email validation based only on presence of file.

        DOES NOT CHECK ANYTHING!
        """
        return True