# This Python file uses the following encoding: utf-8

# The software in this package is distributed under the GNU General
# Public License version 2 (with a special exception described below).
#
# A copy of GNU General Public License (GPL) is included in this distribution,
# in the file COPYING.GPL.
#
# As a special exception, if other files instantiate templates or use macros
# or inline functions from this file, or you compile this file and link it
# with other works to produce a work based on this file, this file
# does not by itself cause the resulting work to be covered
# by the GNU General Public License.
#
# However the source code for this file must still be made available
# in accordance with section (3) of the GNU General Public License.
#
# This exception does not invalidate any other reasons why a work based
# on this file might be covered by the GNU General Public License.
#
# Copyright (c) 2016-2018 Intra2net AG

"""
Utilities for dealing with email.

.. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
             :py:mod:`pyi2ncommon.imap_mailbox`

Copyright: Intra2net AG
"""

# imported explicitly so this module does not depend on names that merely
# leak through the star import below
import logging
import os
import re
from base64 import b64decode
from email.utils import parsedate_to_datetime
from email.parser import BytesParser
from email import policy

# outsourced source, import required for compatibility
from .imap_mailbox import ImapMailbox           # pylint: disable=unused-import
from .mail_validator import *                   # pylint: disable=unused-import
from .sysmisc import replace_file_regex

log = logging.getLogger('pyi2ncommon.mail_utils')


def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Replace value in a provided email file.

    :param str email_file: file to use for the replacement
    :param str value: value to replace the first matched group with
    :param regex: regular expression to use when replacing a header value
    :type regex: str or None
    :param str criterion: criterion to use for replacement, one
                          of 'envelopeto' or 'received'
    :raises: :py:class:`ValueError` if the choice of criterion is invalid

    ..todo:: In some cases this function is reusing arnied wrapper's cnf value
             preparation but for email headers.
    """
    if criterion == "envelopeto":
        log.debug("Updating test emails' EnvelopeTo header")
        replace_file_regex(email_file, value, regex=regex)
    elif criterion == "received":
        log.debug("Updating test emails' Received header")
        with open(email_file, "r") as file_handle:
            email_text = file_handle.read()
        # NOTE(review): the substitution is run twice; the second pass only
        # has an effect if the output of the first pass matches `regex`
        # again -- TODO confirm whether the second pass is really needed
        email_text = re.sub(regex, value, email_text)
        email_text = re.sub(regex, value, email_text)
        with open(email_file, "w") as file_handle:
            file_handle.write(email_text)
    else:
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)


def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached
    files.

    Removes the SMTP envelope surrounding the email if present. Only left-over
    might be a line with a '.' at end of non-multipart messages if
    `headers_only` is False.

    If the file starts with an SMTP envelope (`EHLO`) but contains no `DATA`
    line, the whole file is consumed while skipping the envelope, a warning
    is logged and the remaining (empty) input is parsed.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set
                              this to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      this to True.
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (email policy `strict`)
                                 or just add the defect to message's `defect`
                                 attribute
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    """
    msg = None
    start_pos = 0

    if new_message_type:
        mail_policy = policy.default
    else:
        mail_policy = policy.compat32
    if raise_on_defect:
        mail_policy += policy.strict

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # there is a smtp header. skip to its end
            while line.strip() != b'DATA':
                line = read_handle.readline()
                if not line:
                    # readline() returns b'' at end of file; without this
                    # check a truncated envelope would loop forever
                    log.warning("SMTP envelope in %s has no DATA line",
                                file_name)
                    break
            # the rest is the email plus a trailing '.' (ignored by parser if
            # multipart)
        else:
            read_handle.seek(0)  # forget we read the first line already
        # remember where the actual message starts for a possible re-parse
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(
                read_handle, headersonly=False)
    else:
        full_msg = msg
    filenames = [get_filename(part) for part in full_msg.walk()]
    return msg, tuple(filename for filename in filenames
                      if filename is not None)


def parse_mail_date(message):
    """
    Parse the 'Date' header of the given message.

    Shortcut for :py:func:`email.utils.parsedate_to_datetime`.

    This is no longer necessary for newer
    :py:class:`email.message.EmailMessage` since the `Date` Header is
    automatically parsed to a :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if header not present
    :rtype: :py:class:`datetime.datetime` or None
    """
    date_str = message.get('Date', '')
    if not date_str:
        return None
    return parsedate_to_datetime(date_str)


def get_user_mail_files(user, mailbox='INBOX'):
    """
    Iterate over mails in given folder of given user; yields file names.

    Works on local cyrus file system, not on imap server.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; name is modified using :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    # base folder of user mail
    folder = os.path.join('/datastore', 'imap-mails', 'user', user)

    # adapt paths like "INBOX/sub/dir" to "sub/dir"
    subdirs = mailbox.split('/')
    if subdirs[0].upper() == 'INBOX':
        subdirs = subdirs[1:]
    folder = os.path.join(folder,
                          *(cyrus_escape(subdir) for subdir in subdirs))

    for filename in os.listdir(folder):
        # only entries named "<digits>.<anything>" are yielded
        if not re.match(r'\d+\.', filename):
            continue
        full_path = os.path.join(folder, filename)
        yield full_path


def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Iterate over mails in given folder of given user; yields parsed mails.

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg) where first is
              the full path to the message on disc, and the latter is the
              outcome of :py:func:`parse_mail_file` for that file
    """
    for full_path in get_user_mail_files(user, mailbox):
        yield full_path, parse_mail_file(full_path, **kwargs)


def _decode_part(part, fallback_encoding):
    """
    Return the transfer-decoded payload of a message part as text.

    :param part: non-multipart message part
    :type part: :py:class:`email.message.Message`
    :param str fallback_encoding: charset to use if the part specifies none
    :returns: decoded text of the part
    :rtype: str
    """
    encoding = part.get_content_charset(fallback_encoding)
    return part.get_payload(decode=True).decode(encoding)


def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Extract message text as string from email message.

    Intended as complementary addition to get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    Finds the first part in message that is of type `text/plain` and decodes
    it using encoding specified in mail or otherwise fallback encoding. If
    none found takes first part of type `text/*`, or otherwise just the first
    part.

    If include_all_text is True, all `text/*` parts are included, with
    `text/plain` being the first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail.
    :param bool include_all_text: include all `text/*` parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    result = []
    msg = parse_mail_file(filename, headers_only=False)

    # first pass: text/plain parts only
    for part in msg.walk():
        if part.get_content_type() != 'text/plain':
            continue
        result.append(_decode_part(part, fallback_encoding))

    if result and not include_all_text:
        return result[0]

    # no text/plain found (or all text requested). Try other "text/" parts
    for part in msg.walk():
        cont_type = part.get_content_type()
        if cont_type.startswith('text/') and cont_type != 'text/plain':
            result.append(_decode_part(part, fallback_encoding))

    if result:
        if not include_all_text:
            return result[0]
        return result

    # no "text/" found. Just take first part
    while msg.is_multipart():
        msg = msg.get_payload(0)

    text = _decode_part(msg, fallback_encoding)
    if include_all_text:
        return [text, ]
    return text


def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Convert names of users or mailbox folders to cyrus format.

    quite a hack, just does the following hard-coded replacements:

    * . --> ^
    * / --> .  (except if keep_path is True)
    * ü --> &APw- , ö --> &APY- , ä --> &AOQ-
      (if need more: this is modified utf-7)
    * inbox -->   (the empty string)

    Would like to use a general modified utf-7-encoder/decoder but python has
    none builtin (see https://bugs.python.org/issue5305) and an extra lib like
    https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we
    control the input to this function via params and this is enough umlaut-
    testing I think...

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, . and & are
                       escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # the order matters: dots of the input become '^' before '/' is turned
    # into the cyrus hierarchy separator '.'
    temp = user_or_folder.replace('.', '^') \
        .replace('ü', '&APw-').replace('ä', '&AOQ-') \
        .replace('ö', '&APY-') \
        .replace('inbox', '').replace('INBOX', '').replace('Inbox', '')
    if not keep_path:
        temp = temp.replace('/', '.')
    if regex:
        return temp.replace('^', r'\^').replace('&', r'\&') \
                   .replace('.', r'\.').replace('$', r'\$')
    return temp


def cyrus_unescape(user_or_folder):
    """
    Undo effects of :py:func:`cyrus_escape` (but not all of them).

    Only the '.'/'/' and '^'/'.' replacements are reverted; the empty string
    is mapped back to 'inbox'.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder == '':
        return 'inbox'
    return user_or_folder.replace('.', '/')\
        .replace(r'\^', '.').replace('^', '.')


def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64-encoded.

    For attachments with base64-encoded file name, the
    :py:func:`email.message.Message.get_filename()` does not work. This
    function tries that first and if it fails tries to interpret the
    Content-Disposition of the message part. If all fails, returns `failobj`.

    Only for ascii filenames: also unwraps file names if they are line-wrapped.
    But note that this may remove too much whitespace from the filename if
    line-wrapping happened in the same position as the filename's whitespace.
    To keep the line breaks in the returned name, set param `do_unwrap` to
    `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to ascii
                           file names
    :returns: either a string or failobj
    :raises: :py:class:`NotImplementedError` if the Content-Disposition is
             Q-encoded; :py:class:`ValueError` if its encoding is neither
             'b' nor 'q'
    """
    # try the old way and unwrap
    filename = message.get_filename(failobj)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='

    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()

    # The form is: "=?charset?encoding?encoded text?="
    SPLIT_REGEX = '\r?\n *'    # should be CRNL but some files miss the \r
    ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # Two explicit alternatives (quoted / unquoted) instead of an optional
    # quote group plus backreference: in Python a backreference to a group
    # that did not take part in the match never matches, so unquoted names
    # could not be recognized that way.
    LINE_REGEX = r'attachment\s*;\s*filename=(?:"(.+)"|([^"].*?))\s*$'

    decoded = []
    for word in re.split(SPLIT_REGEX, message['Content-Disposition']):
        match = re.match(ENCODED_WORD_REGEX, word)
        if not match:
            # stop at the first chunk that is not an encoded word
            break
        charset, encoding, data = match.groups()
        if encoding.lower() == 'b':
            temp = b64decode(data)
        elif encoding.lower() == 'q':
            raise NotImplementedError('use quopri.decodestring, handle _')
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(temp.decode(charset))
    decoded = u''.join(decoded)

    match = re.match(LINE_REGEX, decoded)
    if match:
        quoted_name, bare_name = match.groups()
        if quoted_name is not None:
            return quoted_name
        return bare_name
    return failobj