From e108b7d4c5da049c3d77153250d7e2bd6f2603d0 Mon Sep 17 00:00:00 2001 From: Christian Herdtweck Date: Tue, 4 Dec 2018 13:36:44 +0100 Subject: [PATCH] pylint/pydocstyle compliance for mail_utils - Wrapped long lines - Added/Modified doc strings - Replace single-letter variable names --- src/mail_utils.py | 286 +++++++++++++++++++++++++++++++++-------------------- 1 files changed, 178 insertions(+), 108 deletions(-) diff --git a/src/mail_utils.py b/src/mail_utils.py index 2bd69c2..fde7a4b 100644 --- a/src/mail_utils.py +++ b/src/mail_utils.py @@ -24,7 +24,8 @@ SUMMARY ------------------------------------------------------ -Guest utility for fetchmail, spamassassin, horde and other email functionality tests. +Guest utility for fetchmail, spamassassin, horde and other email functionality +tests. Copyright: Intra2net AG @@ -43,7 +44,6 @@ from base64 import b64decode import re import subprocess import logging -log = logging.getLogger('pyi2ncommon.mail_utils') import smtplib from email.mime.audio import MIMEAudio @@ -58,59 +58,66 @@ import mimetypes from . import arnied_wrapper +log = logging.getLogger('pyi2ncommon.mail_utils') + class EmailException(Exception): + """Base class for custom exceptions raised from `MailValidator`.""" + pass -class EmailNotFound(EmailException): +class EmailNotFound(EmailException): # pylint: disable=missing-docstring pass -class InvalidEmailHeader(EmailException): +class InvalidEmailHeader(EmailException): # pylint: disable=missing-docstring pass -class InvalidEmailContent(EmailException): +class InvalidEmailContent(EmailException): # pylint: disable=missing-docstring pass -class EmailIDError(EmailException): +class EmailIDError(EmailException): # pylint: disable=missing-docstring pass -class MismatchedEmailID(EmailIDError): +class MismatchedEmailID(EmailIDError): # pylint: disable=missing-docstring pass -class MissingEmailID(EmailIDError): +class MissingEmailID(EmailIDError): # pylint: disable=missing-docstring pass -class EmailMismatch(EmailException): +class EmailMismatch(EmailException): # pylint: disable=missing-docstring pass class MailValidator(): """Class for validation of emails.""" - def target_path(self, v=None): - if v is not None: - self._target_path = v + def target_path(self, new_value=None): + """Getter/Setter for property `target_path`.""" + if new_value is not None: + self._target_path = new_value else: return self._target_path target_path = property(target_path, target_path) - def source_path(self, v=None): - if v is not None: - self._source_path = v + def source_path(self, new_value=None): + """Getter/Setter for property `source_path`.""" + if new_value is not None: + self._source_path = new_value else: return self._source_path source_path = property(source_path, source_path) - def smtp_sender(self, v=None): - if v is not None: - self._smtp_sender = v + def smtp_sender(self, new_value=None): + """Getter/Setter for property `smtp_sender`.""" + if new_value is not None: + self._smtp_sender = new_value else: return self._smtp_sender smtp_sender = property(smtp_sender, smtp_sender) @@ -145,23 +152,27 @@ class MailValidator(): self._target_path = target_path self._source_path = source_path self._smtp_sender = "no_source@inject.smtp" - self._compare_emails_method= self._default_compare_emails + self._compare_emails_method = self._default_compare_emails def inject_emails(self, username, original_user): """ Inject emails from `source_path` to `target_path`. :param str username: username for the mail injection script - :param str original_user: original username for the mail injection script + :param str original_user: original username for the mail injection + script - In order to restore acl rights as well put a mailbox.dump file in the source path. + In order to restore acl rights as well put a mailbox.dump file in the + source path. """ log.info("Injecting emails for user %s", username) # inject emails from test data - cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + " -s " + self.source_path + cmd = "/usr/intranator/bin/restore_mail_inject.pl -u " + username + \ + " -s " + self.source_path if original_user != "": - cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + " -o " + original_user + cmd += " -m " + os.path.join(self.source_path, "mailboxes.dump") + \ + " -o " + original_user result = subprocess.check_output(cmd, shell=True) log.debug(result) @@ -183,8 +194,9 @@ class MailValidator(): for email in emails: log.info("Sending email %s", email) - with open(os.path.join(self.source_path, email), 'rb') as f: - email_content = f.read() + with open(os.path.join(self.source_path, email), 'rb') \ + as file_handle: + email_content = file_handle.read() server.sendmail(self.smtp_sender, users, email_content) # Wait till SMTP queue is processed @@ -192,8 +204,9 @@ class MailValidator(): def verify_email_id(self, email, emails_list, timeout, in_target=True): """ - Verify that the id of an email is present in a list and return that email's - match in this list. + Verify that the id of an email is present in a list. + + Returns that email's match in this list. :param str email: email filename :param emails_list: email among which the first email has to be found @@ -201,15 +214,20 @@ class MailValidator(): :param int timeout: timeout for extracting the source and target emails :param bool in_target: whether the verified email is on the target side - If `in_target` is set to True we are getting the target id from the target list - of a source email. Otherwise we assume a target email from a source list. + If `in_target` is set to True we are getting the target id from the + target list of a source email. Otherwise we assume a target email from + a source list. """ if in_target: - email = self._extract_email_paths(self.source_path, [email], timeout)[0] - emails_list = self._extract_email_paths(self.target_path, emails_list, timeout) + email = self._extract_email_paths(self.source_path, [email], + timeout)[0] + emails_list = self._extract_email_paths(self.target_path, + emails_list, timeout) else: - email = self._extract_email_paths(self.target_path, [email], timeout)[0] - emails_list = self._extract_email_paths(self.source_path, emails_list, timeout) + email = self._extract_email_paths(self.target_path, [email], + timeout)[0] + emails_list = self._extract_email_paths(self.source_path, + emails_list, timeout) email_id = self._extract_message_id(email) match = self._find_message_with_id(email_id, emails_list) @@ -224,12 +242,16 @@ class MailValidator(): :param target_emails: emails at the target (server) location :type target_emails: [str] :param int timeout: timeout for extracting the source and target emails - :raises: :py:class:`EmailNotFound` if target email is not found on server + :raises: :py:class:`EmailNotFound` if target email is not found on + server """ - source_paths = self._extract_email_paths(self.source_path, source_emails, timeout) - target_paths = self._extract_email_paths(self.target_path, target_emails, timeout) + source_paths = self._extract_email_paths(self.source_path, + source_emails, timeout) + target_paths = self._extract_email_paths(self.target_path, + target_emails, timeout) - log.info("Verifying emails at %s with %s", self.target_path, self.source_path) + log.info("Verifying emails at %s with %s", self.target_path, + self.source_path) for target in target_paths: log.info("Verifying email %s", target) target_id = self._extract_message_id(target) @@ -238,14 +260,16 @@ class MailValidator(): self._compare_emails_method(target, source, 1) if len(source_paths) > 0: - raise EmailNotFound("%s target mails could not be found on server.\n%s" + raise EmailNotFound("%s target mails could not be found on server." + "\n%s" % (len(source_paths), "\n".join(source_paths))) else: log.info("All e-mails at %s verified!", self.target_path) - def assert_header(self, emails, header, present_values=None, absent_values=None, timeout=30): + def assert_header(self, emails, header, present_values=None, + absent_values=None, timeout=30): """ - Check headers for contained and not contained strings in a list of messages. + Check headers for present and missing strings in a list of messages. :param emails: emails whose headers will be checked :type emails: [str] @@ -257,10 +281,12 @@ class MailValidator(): :param int timeout: timeout for extracting the source and target emails :raises: :py:class:`InvalidEmailHeader` if email header is not valid - Every list of present and respectively absent values contains alternative values. - At least one of present and one of absent should be satisfied. + Every list of present and respectively absent values contains + alternative values. At least one of present and one of absent should be + satisfied. """ - target_paths = self._extract_email_paths(self.target_path, emails, timeout) + target_paths = self._extract_email_paths(self.target_path, emails, + timeout) for email_path in target_paths: with open(email_path, "r") as email_file: verified_email = Parser().parse(email_file, headersonly=True) @@ -285,20 +311,26 @@ class MailValidator(): absent_valid = True if not present_valid and len(present_values) > 0: - raise InvalidEmailHeader("Message header '%s' in %s is not valid:\n%s" - % (header, email_path, verified_email[header])) + raise InvalidEmailHeader("Message header '%s' in %s is not " + "valid:\n%s" + % (header, email_path, + verified_email[header])) if not absent_valid and len(absent_values) > 0: - raise InvalidEmailHeader("Message header '%s' in %s is not valid:\n%s" - % (header, email_path, verified_email[header])) + raise InvalidEmailHeader("Message header '%s' in %s is not " + "valid:\n%s" + % (header, email_path, + verified_email[header])) log.info("Message header '%s' in %s is valid!", header, email_path) - def assert_content(self, emails, content_type, present_values=None, absent_values=None, timeout=30): + def assert_content(self, emails, content_type, present_values=None, + absent_values=None, timeout=30): """ - Check headers for contained and not contained strings in a list of messages, + Check headers for present/missing strings in a list of messages. :param emails: emails whose content will be checked :type emails: [str] - :param str content_type: type of the content that will be checked for values + :param str content_type: type of the content that will be checked for + values :param present_values: strings that have to be present in the content :type present_values: [str] or None :param absent_values: strings that have to be absent in the content @@ -306,10 +338,12 @@ class MailValidator(): :param int timeout: timeout for extracting the source and target emails :raises: :py:class:`InvalidEmailContent` if email content is not valid - Every list of present and respectively absent values contains alternative values. - At least one of present and one of absent should be satisfied. + Every list of present and respectively absent values contains + alternative values. At least one of present and one of absent should be + satisfied. """ - target_paths = self._extract_email_paths(self.target_path, emails, timeout) + target_paths = self._extract_email_paths(self.target_path, emails, + timeout) for email_path in target_paths: with open(email_path, "r") as email_file: verified_email = Parser().parse(email_file) @@ -344,20 +378,22 @@ class MailValidator(): absent_valid = True if not present_valid and len(present_values) > 0: - raise InvalidEmailContent("Message content '%s' in %s is not valid:\n%s" + raise InvalidEmailContent("Message content '%s' in %s is not " + "valid:\n%s" % (content_type, email_path, content)) if not absent_valid and len(absent_values) > 0: - raise InvalidEmailContent("Message content '%s' in %s is not valid:\n%s" + raise InvalidEmailContent("Message content '%s' in %s is not " + "valid:\n%s" % (content_type, email_path, content)) - log.info("Message content '%s' in %s is valid!", content_type, email_path) + log.info("Message content '%s' in %s is valid!", + content_type, email_path) def send_email_with_files(self, username, file_list, wait_for_transfer=True, autotest_signature=None, subject="my subject"): """ - Send a generated email with attachments instead of an .eml file - containing attachments. + Send a generated email with attachments. :param str username: username of a localhost receiver of the email :param file_list: files attached to an email @@ -404,8 +440,8 @@ class MailValidator(): # gzip'd or compressed files. ctype, encoding = mimetypes.guess_type(fullpath) if ctype is None or encoding is not None: - # No guess could be made, or the file is encoded (compressed), so - # use a generic bag-of-bits type. + # No guess could be made, or the file is encoded (compressed), + # so use a generic bag-of-bits type. ctype = 'application/octet-stream' maintype, subtype = ctype.split('/', 1) @@ -436,8 +472,7 @@ class MailValidator(): log.debug("Message successfully created") # send via SMTP - log.debug("Sending message from %s to %s" % - (self.smtp_sender, user)) + log.debug("Sending message from %s to %s" % (self.smtp_sender, user)) with smtplib.SMTP('localhost') as server: server.sendmail(self.smtp_sender, user, msg.as_string()) @@ -455,8 +490,9 @@ class MailValidator(): for expected_email in emails: # TODO: this can be improved by matching the emails themselves if expected_email in ["cyrus.cache", "cyrus.header", "cyrus.index", - "Entw&APw-rfe", "Gesendete Objekte", "Gel&APY-schte Elemente", - "mailboxes.dump", "tmp"]: + "Entw&APw-rfe", "Gesendete Objekte", + "Gel&APY-schte Elemente", "mailboxes.dump", + "tmp"]: continue email_path = os.path.join(path, expected_email) for i in range(timeout): @@ -464,7 +500,8 @@ class MailValidator(): email_paths.append(email_path) break elif i == timeout - 1: - raise EmailNotFound("Target message %s could not be found on server at %s within %ss" + raise EmailNotFound("Target message %s could not be found " + "on server at %s within %ss" % (expected_email, path, timeout)) time.sleep(1) log.debug("%s mails extracted at %s.", len(email_paths), path) @@ -475,56 +512,74 @@ class MailValidator(): log.debug("Looking for a match for the message with id %s", message_id) for message_path in message_paths: extracted_id = self._extract_message_id(message_path) - log.debug("Extracted id %s from candidate %s", extracted_id, message_path) + log.debug("Extracted id %s from candidate %s", extracted_id, + message_path) if message_id == extracted_id: log.debug("Found match at %s", message_path) return message_path - raise MismatchedEmailID("The message with id %s could not be matched or wasn't expected among %s" + raise MismatchedEmailID("The message with id %s could not be matched " + "or wasn't expected among %s" % (message_id, ", ".join(message_paths))) def _extract_message_id(self, message_path): """ - Given a message file path extract the Message-ID and raise error if - none was found. + Given a message file path extract the Message-ID. + + :raises: :py:class:`MissingEmailID` if no Message-ID was found. """ message_id = "" - with open(message_path, errors='ignore') as f: - content = f.read() + with open(message_path, errors='ignore') as file_handle: + content = file_handle.read() for line in content.split("\n"): match_id = re.match("Autotest-Message-ID: (.+)", line) if match_id is not None: message_id = match_id.group(1).rstrip('\r\n') if message_id == "": - raise MissingEmailID("No id was found in target message %s so it cannot be properly matched" + raise MissingEmailID("No id was found in target message %s so it " + "cannot be properly matched" % (message_path)) return message_id - def _default_compare_emails(self, source_email_path, target_email_path, tolerance=1): - """Use python provided diff functionality to compare target emails with source ones.""" + def _default_compare_emails(self, source_email_path, target_email_path, + tolerance=1): + """ + Compare target emails with source ones. + + Uses python provided diff functionality to compare complete mail files. + """ with open(source_email_path, "r") as source_email_file: source_email = source_email_file.read() with open(target_email_path, "r") as target_email_file: target_email = target_email_file.read() - s = difflib.SequenceMatcher(None, source_email, target_email) - diffratio = s.ratio() + matcher = difflib.SequenceMatcher(None, source_email, target_email) + diffratio = matcher.ratio() log.debug("Target message comparison ratio is %s.", diffratio) - #log.info("%s $$$ %s", source_email, target_email) + # log.info("%s $$$ %s", source_email, target_email) if diffratio < tolerance: - raise EmailMismatch("Target message is too different from the source (difference %s < tolerance %s).", + raise EmailMismatch("Target message is too different from the " + "source (difference %s < tolerance %s).", diffratio, tolerance) - def _compare_emails_by_basic_headers(self, source_email_path, target_email_path, tolerance=1): - """Use python provided diff functionality to compare target emails with source ones.""" - with open(source_email_path, errors="ignore") as f: - source_email = Parser().parse(f) + def _compare_emails_by_basic_headers(self, source_email_path, + target_email_path, tolerance=1): + """ + Compare target emails with source ones. + + Uses python provided diff functionality to compare headers and mail + "body". + + Argument `tolerance` not used! + """ + with open(source_email_path, errors="ignore") as file_handle: + source_email = Parser().parse(file_handle) source_body = "" for part in source_email.walk(): if part.get_content_type() in ["text/plain", "text/html"]: source_body = part.get_payload() break - with open(target_email_path, errors="ignore") as f: - target_email = Parser().parse(f) + with open(target_email_path, errors="ignore") as file_handle: + target_email = Parser().parse(file_handle) target_body = "" for part in target_email.walk(): if part.get_content_type() in ["text/plain", "text/html"]: @@ -532,23 +587,34 @@ class MailValidator(): break if source_email['From'] != target_email['From']: - raise EmailMismatch("Target message sender %s is too different from the source one %s" % + raise EmailMismatch("Target message sender %s is too different " + "from the source one %s" % (target_email['From'], source_email['From'])) if source_email['To'] != target_email['To']: - raise EmailMismatch("Target message recipient %s is too different from the source one %s" % + raise EmailMismatch("Target message recipient %s is too different " + "from the source one %s" % (target_email['To'], source_email['To'])) if source_email['Subject'] != target_email['Subject']: - raise EmailMismatch("Target message subject '%s' is too different from the source one '%s'" % - (target_email['Subject'], source_email['Subject'])) + raise EmailMismatch("Target message subject '%s' is too different " + "from the source one '%s'" % + (target_email['Subject'], + source_email['Subject'])) if source_email['Date'] != target_email['Date']: - raise EmailMismatch("Target message date %s is too different from the source one %s" % + raise EmailMismatch("Target message date %s is too different from " + "the source one %s" % (target_email['Date'], source_email['Date'])) if source_body != target_body: - raise EmailMismatch("Target message body '%s' is too different from the source one '%s'" % + raise EmailMismatch("Target message body '%s' is too different " + "from the source one '%s'" % (target_body, source_body)) - def _compare_emails_by_existence(self, source_email_path, target_email_path, tolerance=1): - """Weak email validation based only on presence of file""" + def _compare_emails_by_existence(self, source_email_path, + target_email_path, tolerance=1): + """ + Weak email validation based only on presence of file. + + DOES NOT CHECK ANYTHING! + """ return True @@ -572,14 +638,15 @@ def prep_email_header(email_file, value, regex=None, criterion="envelopeto"): arnied_wrapper.prep_cnf_value(email_file, value, regex=regex) elif criterion == "received": logging.debug("Updating test emails' Received header") - with open(email_file, "r") as f: - email_text = f.read() + with open(email_file, "r") as file_handle: + email_text = file_handle.read() email_text = re.sub(regex, value, email_text) email_text = re.sub(regex, value, email_text) - with open(email_file, "w") as f: - f.write(email_text) + with open(email_file, "w") as file_handle: + file_handle.write(email_text) else: - raise ValueError("Invalid header preparation criterion '%s'" % criterion) + raise ValueError("Invalid header preparation criterion '%s'" + % criterion) def create_users(usernames, config_file, params): @@ -588,13 +655,16 @@ def create_users(usernames, config_file, params): :param usernames: usernames of the created users :type usernames: [str] - :param str config_file: template config file to use for each user configuration + :param str config_file: template config file to use for each user + configuration :param params: template config file to use for each user configuration :type params: {str, str} - :raises: :py:class:`RuntimeError` if the user is already or cannot be created + :raises: :py:class:`RuntimeError` if the user exists already or cannot be + created """ log.info("Creating new cyrus users %s", ", ".join(usernames)) - cyrus_user_path = params.get("cyrus_user_path", "/datastore/imap-mails/user/") + cyrus_user_path = params.get("cyrus_user_path", + "/datastore/imap-mails/user/") # check for existence round for username in usernames: @@ -605,12 +675,13 @@ def create_users(usernames, config_file, params): for username in usernames: params["user"] = '%i: "%s"' % (-1, username) params["user_fullname"] = username - params_regex = {"user": '%s,(-?\d+: ".*")'} + params_regex = {"user": r'%s,(-?\d+: ".*")'} arnied_wrapper.set_cnf_semidynamic([config_file], params, params_regex) for username in usernames: - if not os.path.exists(os.path.join(cyrus_user_path, username.replace(".", "^"))): + if not os.path.exists(os.path.join(cyrus_user_path, + username.replace(".", "^"))): raise RuntimeError("The user %s could not be created" % username) else: log.info("Added new user %s", username) @@ -629,8 +700,8 @@ def parse_mail_file(file_name, headers_only=True): :rtype: root message object (of class :py:class:`email.message.Message`) Removes the SMTP envelope surrounding the email if present. Only left-over - might be a line with a '.' at end of non-multipart messages if `headers_only` - is False. + might be a line with a '.' at end of non-multipart messages if + `headers_only` is False. """ with open(file_name, 'r') as read_handle: line = read_handle.readline() @@ -647,7 +718,7 @@ def parse_mail_file(file_name, headers_only=True): def get_user_mail(user, mailbox='INBOX', **kwargs): """ - Iterate over mails in given folder of given user; yields parsed mails + Iterate over mails in given folder of given user; yields parsed mails. :param str mailbox: name of mailbox to use, INBOX (default) for base folder; name is modified using :py:func:`cyrus_escape` @@ -657,7 +728,6 @@ def get_user_mail(user, mailbox='INBOX', **kwargs): full path to the message on disc, and the latter is the outcome of :py:func:`parse_mail_file` for that file """ - folder = os.path.join('/datastore', 'imap-mails', 'user', user) if mailbox != 'INBOX': folder = os.path.join(folder, cyrus_escape(mailbox)) @@ -733,7 +803,7 @@ def get_message_text(filename, fallback_encoding='iso8859-1', def cyrus_escape(user_or_folder, keep_path=False, regex=False): """ - Converts names of users or mailbox folders to cyrus format. + Convert names of users or mailbox folders to cyrus format. quite a hack, just does the following hard-coded replacements: @@ -788,7 +858,7 @@ def cyrus_unescape(user_or_folder): def get_filename(message, failobj=None, do_unwrap=True): """ - Get filename of a message part, even if it is base64-encoded + Get filename of a message part, even if it is base64-encoded. For attachments with base64-encoded file name, the :py:func:`email.message.Message.get_filename()` does not work. This function -- 1.7.1