pylint/pydocstyle compliance for mail_utils
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Tue, 4 Dec 2018 12:36:44 +0000 (13:36 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 7 Feb 2019 15:50:39 +0000 (16:50 +0100)
- Wrapped long lines
- Added/Modified doc strings
- Replace single-letter variable names

src/mail_utils.py

index 2bd69c2..fde7a4b 100644 (file)
@@ -24,7 +24,8 @@
 
 SUMMARY
 ------------------------------------------------------
-Guest utility for fetchmail, spamassassin, horde and other email functionality tests.
+Guest utility for fetchmail, spamassassin, horde and other email functionality
+tests.
 
 Copyright: Intra2net AG
 
@@ -43,7 +44,6 @@ from base64 import b64decode
 import re
 import subprocess
 import logging
-log = logging.getLogger('pyi2ncommon.mail_utils')
 
 import smtplib
 from email.mime.audio import MIMEAudio
@@ -58,59 +58,66 @@ import mimetypes
 
 from . import arnied_wrapper
 
+log = logging.getLogger('pyi2ncommon.mail_utils')
+
 
 class EmailException(Exception):
+    """Base class for custom exceptions raised from `MailValidator`."""
+
     pass
 
 
-class EmailNotFound(EmailException):
+class EmailNotFound(EmailException):        # pylint: disable=missing-docstring
     pass
 
 
-class InvalidEmailHeader(EmailException):
+class InvalidEmailHeader(EmailException):   # pylint: disable=missing-docstring
     pass
 
 
-class InvalidEmailContent(EmailException):
+class InvalidEmailContent(EmailException):  # pylint: disable=missing-docstring
     pass
 
 
-class EmailIDError(EmailException):
+class EmailIDError(EmailException):         # pylint: disable=missing-docstring
     pass
 
 
-class MismatchedEmailID(EmailIDError):
+class MismatchedEmailID(EmailIDError):      # pylint: disable=missing-docstring
     pass
 
 
-class MissingEmailID(EmailIDError):
+class MissingEmailID(EmailIDError):         # pylint: disable=missing-docstring
     pass
 
 
-class EmailMismatch(EmailException):
+class EmailMismatch(EmailException):        # pylint: disable=missing-docstring
     pass
 
 
 class MailValidator():
     """Class for validation of emails."""
 
-    def target_path(self, v=None):
-        if v is not None:
-            self._target_path = v
+    def target_path(self, new_value=None):
+        """Getter/Setter for property `target_path`."""
+        if new_value is not None:
+            self._target_path = new_value
         else:
             return self._target_path
     target_path = property(target_path, target_path)
 
-    def source_path(self, v=None):
-        if v is not None:
-            self._source_path = v
+    def source_path(self, new_value=None):
+        """Getter/Setter for property `source_path`."""
+        if new_value is not None:
+            self._source_path = new_value
         else:
             return self._source_path
     source_path = property(source_path, source_path)
 
-    def smtp_sender(self, v=None):
-        if v is not None:
-            self._smtp_sender = v
+    def smtp_sender(self, new_value=None):
+        """Getter/Setter for property `smtp_sender`."""
+        if new_value is not None:
+            self._smtp_sender = new_value
         else:
             return self._smtp_sender
     smtp_sender = property(smtp_sender, smtp_sender)
@@ -145,23 +152,27 @@ class MailValidator():
         self._target_path = target_path
         self._source_path = source_path
         self._smtp_sender = "no_source@inject.smtp"
-        self._compare_emails_method= self._default_compare_emails
+        self._compare_emails_method = self._default_compare_emails
 
     def inject_emails(self, username, original_user):
         """
         Inject emails from `source_path` to `target_path`.
 
         :param str username: username for the mail injection script
-        :param str original_user: original username for the mail injection script
+        :param str original_user: original username for the mail injection
+                                  script
 
-        In order to restore acl rights as well put a mailbox.dump file in the source path.
+        In order to restore acl rights as well put a mailbox.dump file in the
+        source path.
         """
         log.info("Injecting emails for user %s", username)
 
         # inject emails from test data
-        cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + " -s " + self.source_path
+        cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \
+              " -s " + self.source_path
         if original_user != "":
-            cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + " -o " + original_user
+            cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \
+                   " -o " + original_user
 
         result = subprocess.check_output(cmd, shell=True)
         log.debug(result)
@@ -183,8 +194,9 @@ class MailValidator():
 
             for email in emails:
                 log.info("Sending email %s", email)
-                with open(os.path.join(self.source_path, email), 'rb') as f:
-                    email_content = f.read()
+                with open(os.path.join(self.source_path, email), 'rb') \
+                        as file_handle:
+                    email_content = file_handle.read()
                 server.sendmail(self.smtp_sender, users, email_content)
 
         # Wait till SMTP queue is processed
@@ -192,8 +204,9 @@ class MailValidator():
 
     def verify_email_id(self, email, emails_list, timeout, in_target=True):
         """
-        Verify that the id of an email is present in a list and return that email's
-        match in this list.
+        Verify that the id of an email is present in a list.
+
+        Returns that email's match in this list.
 
         :param str email: email filename
         :param emails_list: email among which the first email has to be found
@@ -201,15 +214,20 @@ class MailValidator():
         :param int timeout: timeout for extracting the source and target emails
         :param bool in_target: whether the verified email is on the target side
 
-        If `in_target` is set to True we are getting the target id from the target list
-        of a source email. Otherwise we assume a target email from a source list.
+        If `in_target` is set to True we are getting the target id from the
+        target list of a source email. Otherwise we assume a target email from
+        a source list.
         """
         if in_target:
-            email = self._extract_email_paths(self.source_path, [email], timeout)[0]
-            emails_list = self._extract_email_paths(self.target_path, emails_list, timeout)
+            email = self._extract_email_paths(self.source_path, [email],
+                                              timeout)[0]
+            emails_list = self._extract_email_paths(self.target_path,
+                                                    emails_list, timeout)
         else:
-            email = self._extract_email_paths(self.target_path, [email], timeout)[0]
-            emails_list = self._extract_email_paths(self.source_path, emails_list, timeout)
+            email = self._extract_email_paths(self.target_path, [email],
+                                              timeout)[0]
+            emails_list = self._extract_email_paths(self.source_path,
+                                                    emails_list, timeout)
 
         email_id = self._extract_message_id(email)
         match = self._find_message_with_id(email_id, emails_list)
@@ -224,12 +242,16 @@ class MailValidator():
         :param target_emails: emails at the target (server) location
         :type target_emails: [str]
         :param int timeout: timeout for extracting the source and target emails
-        :raises: :py:class:`EmailNotFound` if target email is not found on server
+        :raises: :py:class:`EmailNotFound` if target email is not found on
+                 server
         """
-        source_paths = self._extract_email_paths(self.source_path, source_emails, timeout)
-        target_paths = self._extract_email_paths(self.target_path, target_emails, timeout)
+        source_paths = self._extract_email_paths(self.source_path,
+                                                 source_emails, timeout)
+        target_paths = self._extract_email_paths(self.target_path,
+                                                 target_emails, timeout)
 
-        log.info("Verifying emails at %s with %s", self.target_path, self.source_path)
+        log.info("Verifying emails at %s with %s", self.target_path,
+                 self.source_path)
         for target in target_paths:
             log.info("Verifying email %s", target)
             target_id = self._extract_message_id(target)
@@ -238,14 +260,16 @@ class MailValidator():
             self._compare_emails_method(target, source, 1)
 
         if len(source_paths) > 0:
-            raise EmailNotFound("%s target mails could not be found on server.\n%s"
+            raise EmailNotFound("%s target mails could not be found on server."
+                                "\n%s"
                                 % (len(source_paths), "\n".join(source_paths)))
         else:
             log.info("All e-mails at %s verified!", self.target_path)
 
-    def assert_header(self, emails, header, present_values=None, absent_values=None, timeout=30):
+    def assert_header(self, emails, header, present_values=None,
+                      absent_values=None, timeout=30):
         """
-        Check headers for contained and not contained strings in a list of messages.
+        Check headers for present and missing strings in a list of messages.
 
         :param emails: emails whose headers will be checked
         :type emails: [str]
@@ -257,10 +281,12 @@ class MailValidator():
         :param int timeout: timeout for extracting the source and target emails
         :raises: :py:class:`InvalidEmailHeader` if email header is not valid
 
-        Every list of present and respectively absent values contains alternative values.
-        At least one of present and one of absent should be satisfied.
+        Every list of present and respectively absent values contains
+        alternative values. At least one of present and one of absent should be
+        satisfied.
         """
-        target_paths = self._extract_email_paths(self.target_path, emails, timeout)
+        target_paths = self._extract_email_paths(self.target_path, emails,
+                                                 timeout)
         for email_path in target_paths:
             with open(email_path, "r") as email_file:
                 verified_email = Parser().parse(email_file, headersonly=True)
@@ -285,20 +311,26 @@ class MailValidator():
                     absent_valid = True
 
             if not present_valid and len(present_values) > 0:
-                raise InvalidEmailHeader("Message header '%s' in %s is not valid:\n%s"
-                                         % (header, email_path, verified_email[header]))
+                raise InvalidEmailHeader("Message header '%s' in %s is not "
+                                         "valid:\n%s"
+                                         % (header, email_path,
+                                            verified_email[header]))
             if not absent_valid and len(absent_values) > 0:
-                raise InvalidEmailHeader("Message header '%s' in %s is not valid:\n%s"
-                                         % (header, email_path, verified_email[header]))
+                raise InvalidEmailHeader("Message header '%s' in %s is not "
+                                         "valid:\n%s"
+                                         % (header, email_path,
+                                            verified_email[header]))
             log.info("Message header '%s' in %s is valid!", header, email_path)
 
-    def assert_content(self, emails, content_type, present_values=None, absent_values=None, timeout=30):
+    def assert_content(self, emails, content_type, present_values=None,
+                       absent_values=None, timeout=30):
         """
-        Check headers for contained and not contained strings in a list of messages,
+        Check headers for present/missing strings in a list of messages.
 
         :param emails: emails whose content will be checked
         :type emails: [str]
-        :param str content_type: type of the content that will be checked for values
+        :param str content_type: type of the content that will be checked for
+                                 values
         :param present_values: strings that have to be present in the content
         :type present_values: [str] or None
         :param absent_values: strings that have to be absent in the content
@@ -306,10 +338,12 @@ class MailValidator():
         :param int timeout: timeout for extracting the source and target emails
         :raises: :py:class:`InvalidEmailContent` if email content is not valid
 
-        Every list of present and respectively absent values contains alternative values.
-        At least one of present and one of absent should be satisfied.
+        Every list of present and respectively absent values contains
+        alternative values. At least one of present and one of absent should be
+        satisfied.
         """
-        target_paths = self._extract_email_paths(self.target_path, emails, timeout)
+        target_paths = self._extract_email_paths(self.target_path, emails,
+                                                 timeout)
         for email_path in target_paths:
             with open(email_path, "r") as email_file:
                 verified_email = Parser().parse(email_file)
@@ -344,20 +378,22 @@ class MailValidator():
                     absent_valid = True
 
             if not present_valid and len(present_values) > 0:
-                raise InvalidEmailContent("Message content '%s' in %s is not valid:\n%s"
+                raise InvalidEmailContent("Message content '%s' in %s is not "
+                                          "valid:\n%s"
                                           % (content_type, email_path, content))
             if not absent_valid and len(absent_values) > 0:
-                raise InvalidEmailContent("Message content '%s' in %s is not valid:\n%s"
+                raise InvalidEmailContent("Message content '%s' in %s is not "
+                                          "valid:\n%s"
                                           % (content_type, email_path, content))
-            log.info("Message content '%s' in %s is valid!", content_type, email_path)
+            log.info("Message content '%s' in %s is valid!",
+                     content_type, email_path)
 
     def send_email_with_files(self, username, file_list,
                               wait_for_transfer=True,
                               autotest_signature=None,
                               subject="my subject"):
         """
-        Send a generated email with attachments instead of an .eml file
-        containing attachments.
+        Send a generated email with attachments.
 
         :param str username: username of a localhost receiver of the email
         :param file_list: files attached to an email
@@ -404,8 +440,8 @@ class MailValidator():
             # gzip'd or compressed files.
             ctype, encoding = mimetypes.guess_type(fullpath)
             if ctype is None or encoding is not None:
-                # No guess could be made, or the file is encoded (compressed), so
-                # use a generic bag-of-bits type.
+                # No guess could be made, or the file is encoded (compressed),
+                # so use a generic bag-of-bits type.
                 ctype = 'application/octet-stream'
 
             maintype, subtype = ctype.split('/', 1)
@@ -436,8 +472,7 @@ class MailValidator():
         log.debug("Message successfully created")
         # send via SMTP
 
-        log.debug("Sending message from %s to %s" %
-                      (self.smtp_sender, user))
+        log.debug("Sending message from %s to %s" % (self.smtp_sender, user))
         with smtplib.SMTP('localhost') as server:
             server.sendmail(self.smtp_sender, user, msg.as_string())
 
@@ -455,8 +490,9 @@ class MailValidator():
         for expected_email in emails:
             # TODO: this can be improved by matching the emails themselves
             if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index",
-                                  "Entw&APw-rfe", "Gesendete Objekte", "Gel&APY-schte Elemente",
-                                  "mailboxes.dump", "tmp"]:
+                                  "Entw&APw-rfe", "Gesendete Objekte",
+                                  "Gel&APY-schte Elemente", "mailboxes.dump",
+                                  "tmp"]:
                 continue
             email_path = os.path.join(path, expected_email)
             for i in range(timeout):
@@ -464,7 +500,8 @@ class MailValidator():
                     email_paths.append(email_path)
                     break
                 elif i == timeout - 1:
-                    raise EmailNotFound("Target message %s could not be found on server at %s within %ss"
+                    raise EmailNotFound("Target message %s could not be found "
+                                        "on server at %s within %ss"
                                         % (expected_email, path, timeout))
                 time.sleep(1)
         log.debug("%s mails extracted at %s.", len(email_paths), path)
@@ -475,56 +512,74 @@ class MailValidator():
         log.debug("Looking for a match for the message with id %s", message_id)
         for message_path in message_paths:
             extracted_id = self._extract_message_id(message_path)
-            log.debug("Extracted id %s from candidate %s", extracted_id, message_path)
+            log.debug("Extracted id %s from candidate %s", extracted_id,
+                      message_path)
             if message_id == extracted_id:
                 log.debug("Found match at %s", message_path)
                 return message_path
-        raise MismatchedEmailID("The message with id %s could not be matched or wasn't expected among %s"
+        raise MismatchedEmailID("The message with id %s could not be matched "
+                                "or wasn't expected among %s"
                                 % (message_id, ", ".join(message_paths)))
 
     def _extract_message_id(self, message_path):
         """
-        Given a message file path extract the Message-ID and raise error if
-        none was found.
+        Given a message file path extract the Message-ID.
+
+        :raises: :py:class:`MissingEmailID` if no Message-ID was found.
         """
         message_id = ""
-        with open(message_path, errors='ignore') as f:
-            content = f.read()
+        with open(message_path, errors='ignore') as file_handle:
+            content = file_handle.read()
         for line in content.split("\n"):
             match_id = re.match("Autotest-Message-ID: (.+)", line)
             if match_id is not None:
                 message_id = match_id.group(1).rstrip('\r\n')
         if message_id == "":
-            raise MissingEmailID("No id was found in target message %s so it cannot be properly matched"
+            raise MissingEmailID("No id was found in target message %s so it "
+                                 "cannot be properly matched"
                                  % (message_path))
         return message_id
 
-    def _default_compare_emails(self, source_email_path, target_email_path, tolerance=1):
-        """Use python provided diff functionality to compare target emails with source ones."""
+    def _default_compare_emails(self, source_email_path, target_email_path,
+                                tolerance=1):
+        """
+        Compare target emails with source ones.
+
+        Uses python provided diff functionality to compare complete mail files.
+        """
         with open(source_email_path, "r") as source_email_file:
             source_email = source_email_file.read()
         with open(target_email_path, "r") as target_email_file:
             target_email = target_email_file.read()
-        s = difflib.SequenceMatcher(None, source_email, target_email)
-        diffratio = s.ratio()
+        matcher = difflib.SequenceMatcher(None, source_email, target_email)
+        diffratio = matcher.ratio()
         log.debug("Target message comparison ratio is %s.", diffratio)
-        #log.info("%s $$$ %s", source_email, target_email)
+        # log.info("%s $$$ %s", source_email, target_email)
         if diffratio < tolerance:
-            raise EmailMismatch("Target message is too different from the source (difference %s < tolerance %s).",
+            raise EmailMismatch("Target message is too different from the "
+                                "source (difference %s < tolerance %s).",
                                 diffratio, tolerance)
 
-    def _compare_emails_by_basic_headers(self, source_email_path, target_email_path, tolerance=1):
-        """Use python provided diff functionality to compare target emails with source ones."""
-        with open(source_email_path, errors="ignore") as f:
-            source_email = Parser().parse(f)
+    def _compare_emails_by_basic_headers(self, source_email_path,
+                                         target_email_path, tolerance=1):
+        """
+        Compare target emails with source ones.
+
+        Uses python provided diff functionality to compare headers and mail
+        "body".
+
+        Argument `tolerance` not used!
+        """
+        with open(source_email_path, errors="ignore") as file_handle:
+            source_email = Parser().parse(file_handle)
             source_body = ""
             for part in source_email.walk():
                 if part.get_content_type() in ["text/plain", "text/html"]:
                     source_body = part.get_payload()
                     break
 
-        with open(target_email_path, errors="ignore") as f:
-            target_email = Parser().parse(f)
+        with open(target_email_path, errors="ignore") as file_handle:
+            target_email = Parser().parse(file_handle)
             target_body = ""
             for part in target_email.walk():
                 if part.get_content_type() in ["text/plain", "text/html"]:
@@ -532,23 +587,34 @@ class MailValidator():
                     break
 
         if source_email['From'] != target_email['From']:
-            raise EmailMismatch("Target message sender %s is too different from the source one %s" %
+            raise EmailMismatch("Target message sender %s is too different "
+                                "from the source one %s" %
                                 (target_email['From'], source_email['From']))
         if source_email['To'] != target_email['To']:
-            raise EmailMismatch("Target message recipient %s is too different from the source one %s" %
+            raise EmailMismatch("Target message recipient %s is too different "
+                                "from the source one %s" %
                                 (target_email['To'], source_email['To']))
         if source_email['Subject'] != target_email['Subject']:
-            raise EmailMismatch("Target message subject '%s' is too different from the source one '%s'" %
-                                (target_email['Subject'], source_email['Subject']))
+            raise EmailMismatch("Target message subject '%s' is too different "
+                                "from the source one '%s'" %
+                                (target_email['Subject'],
+                                 source_email['Subject']))
         if source_email['Date'] != target_email['Date']:
-            raise EmailMismatch("Target message date %s is too different from the source one %s" %
+            raise EmailMismatch("Target message date %s is too different from "
+                                "the source one %s" %
                                 (target_email['Date'], source_email['Date']))
         if source_body != target_body:
-            raise EmailMismatch("Target message body '%s' is too different from the source one '%s'" %
+            raise EmailMismatch("Target message body '%s' is too different "
+                                "from the source one '%s'" %
                                 (target_body, source_body))
 
-    def _compare_emails_by_existence(self, source_email_path, target_email_path, tolerance=1):
-        """Weak email validation based only on presence of file"""
+    def _compare_emails_by_existence(self, source_email_path,
+                                     target_email_path, tolerance=1):
+        """
+        Weak email validation based only on presence of file.
+
+        DOES NOT CHECK ANYTHING!
+        """
         return True
 
 
@@ -572,14 +638,15 @@ def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
         arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
     elif criterion == "received":
         logging.debug("Updating test emails' Received header")
-        with open(email_file, "r") as f:
-            email_text = f.read()
+        with open(email_file, "r") as file_handle:
+            email_text = file_handle.read()
             email_text = re.sub(regex, value, email_text)
             email_text = re.sub(regex, value, email_text)
-        with open(email_file, "w") as f:
-            f.write(email_text)
+        with open(email_file, "w") as file_handle:
+            file_handle.write(email_text)
     else:
-        raise ValueError("Invalid header preparation criterion '%s'" % criterion)
+        raise ValueError("Invalid header preparation criterion '%s'"
+                         % criterion)
 
 
 def create_users(usernames, config_file, params):
@@ -588,13 +655,16 @@ def create_users(usernames, config_file, params):
 
     :param usernames: usernames of the created users
     :type usernames: [str]
-    :param str config_file: template config file to use for each user configuration
+    :param str config_file: template config file to use for each user
+                            configuration
     :param params: template config file to use for each user configuration
     :type params: {str, str}
-    :raises: :py:class:`RuntimeError` if the user is already or cannot be created
+    :raises: :py:class:`RuntimeError` if the user exists already or cannot be
+              created
     """
     log.info("Creating new cyrus users %s", ", ".join(usernames))
-    cyrus_user_path = params.get("cyrus_user_path", "/datastore/imap-mails/user/")
+    cyrus_user_path = params.get("cyrus_user_path",
+                                 "/datastore/imap-mails/user/")
 
     # check for existence round
     for username in usernames:
@@ -605,12 +675,13 @@ def create_users(usernames, config_file, params):
     for username in usernames:
         params["user"] = '%i: "%s"' % (-1, username)
         params["user_fullname"] = username
-        params_regex = {"user": '%s,(-?\d+: ".*")'}
+        params_regex = {"user": r'%s,(-?\d+: ".*")'}
         arnied_wrapper.set_cnf_semidynamic([config_file],
                                            params, params_regex)
 
     for username in usernames:
-        if not os.path.exists(os.path.join(cyrus_user_path, username.replace(".", "^"))):
+        if not os.path.exists(os.path.join(cyrus_user_path,
+                                           username.replace(".", "^"))):
             raise RuntimeError("The user %s could not be created" % username)
         else:
             log.info("Added new user %s", username)
@@ -629,8 +700,8 @@ def parse_mail_file(file_name, headers_only=True):
     :rtype: root message object (of class :py:class:`email.message.Message`)
 
     Removes the SMTP envelope surrounding the email if present. Only left-over
-    might be a line with a '.' at end of non-multipart messages if `headers_only`
-    is False.
+    might be a line with a '.' at end of non-multipart messages if
+     `headers_only` is False.
     """
     with open(file_name, 'r') as read_handle:
         line = read_handle.readline()
@@ -647,7 +718,7 @@ def parse_mail_file(file_name, headers_only=True):
 
 def get_user_mail(user, mailbox='INBOX', **kwargs):
     """
-    Iterate over mails in given folder of given user; yields parsed mails
+    Iterate over mails in given folder of given user; yields parsed mails.
 
     :param str mailbox: name of mailbox to use, INBOX (default) for base folder;
                         name is modified using :py:func:`cyrus_escape`
@@ -657,7 +728,6 @@ def get_user_mail(user, mailbox='INBOX', **kwargs):
               full path to the message on disc, and the latter is the outcome
               of :py:func:`parse_mail_file` for that file
     """
-
     folder = os.path.join('/datastore', 'imap-mails', 'user', user)
     if mailbox != 'INBOX':
         folder = os.path.join(folder, cyrus_escape(mailbox))
@@ -733,7 +803,7 @@ def get_message_text(filename, fallback_encoding='iso8859-1',
 
 def cyrus_escape(user_or_folder, keep_path=False, regex=False):
     """
-    Converts names of users or mailbox folders to cyrus format.
+    Convert names of users or mailbox folders to cyrus format.
 
     quite a hack, just does the following hard-coded replacements:
 
@@ -788,7 +858,7 @@ def cyrus_unescape(user_or_folder):
 
 def get_filename(message, failobj=None, do_unwrap=True):
     """
-    Get filename of a message part, even if it is base64-encoded
+    Get filename of a message part, even if it is base64-encoded.
 
     For attachments with base64-encoded file name, the
     :py:func:`email.message.Message.get_filename()` does not work. This function