1 # This Python file uses the following encoding: utf-8
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
24 Utilities for dealing with email.
26 .. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
27 :py:mod:`pyi2ncommon.imap_mailbox`
29 Copyright: Intra2net AG
32 from base64 import b64decode
33 from email.utils import parsedate_to_datetime
34 from email.parser import BytesParser
35 from email import policy
37 # outsourced source, import required for compatibility
38 from .imap_mailbox import ImapMailbox # pylint: disable=unused-import
39 from .mail_validator import * # pylint: disable=unused-import
40 from .sysmisc import replace_file_regex
# Module-level logger, registered under the name 'pyi2ncommon.mail_utils'.
# NOTE(review): `logging` is not imported explicitly in the visible part of
# this file; presumably it arrives through the star import of
# `.mail_validator` -- confirm.
log = logging.getLogger('pyi2ncommon.mail_utils')
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Substitute a header value inside an email file, modifying it in place.

    :param str email_file: path of the email file to modify
    :param str value: replacement text; for 'envelopeto' it is handed to
                      :py:func:`replace_file_regex`, for 'received' it is
                      used as replacement argument of :py:func:`re.sub`
    :param regex: regular expression selecting what gets replaced
    :type regex: str or None
    :param str criterion: which header to prepare, one of 'envelopeto' or
                          'received'
    :raises: :py:class:`ValueError` if `criterion` is neither of the two

    ..todo:: In some cases this function is reusing arnied wrapper's cnf
             value preparation but for email headers.
    """
    if criterion not in ("envelopeto", "received"):
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)

    if criterion == "envelopeto":
        logging.debug("Updating test emails' EnvelopeTo header")
        replace_file_regex(email_file, value, regex=regex)
        return

    logging.debug("Updating test emails' Received header")
    with open(email_file, "r") as source:
        contents = source.read()
        # The substitution is applied two times in a row.
        # NOTE(review): presumably so that matches overlapping with a match
        # of the first pass are replaced as well -- confirm before changing.
        for _ in range(2):
            contents = re.sub(regex, value, contents)
    with open(email_file, "w") as target:
        target.write(contents)
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached files.

    Removes the SMTP envelope surrounding the email if present: a file whose
    first line starts with `EHLO` is skipped up to and including its `DATA`
    line. Only left-over might be a line with a '.' at end of non-multipart
    messages if `headers_only` is False. If such a file ends before any `DATA`
    line is found, the search stops at the end of the file and an empty
    message is parsed.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set this
                              to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      this to True.
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (email policy `strict`) or
                                 just add the defect to message's `defect`
                                 attribute
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    """
    mail_policy = policy.default if new_message_type else policy.compat32
    if raise_on_defect:
        mail_policy += policy.strict

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # There is a smtp envelope, skip to its end. readline() returns
            # b'' once the end of the file is reached, so stop there as well
            # instead of looping forever on an envelope without DATA line.
            while line and line.strip() != b'DATA':
                line = read_handle.readline()
            # the rest is the email plus a trailing '.' (ignored by parser if
            # multipart)
        else:
            read_handle.seek(0)  # forget we read the first line already
        # remember where the actual mail starts in case we have to re-parse
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                             headersonly=False)
    else:
        full_msg = msg
    filenames = (get_filename(part) for part in full_msg.walk())
    return msg, tuple(filename for filename in filenames
                      if filename is not None)
def parse_mail_date(message):
    """
    Convert the 'Date' header of the given message into a datetime.

    Thin wrapper around :py:func:`email.utils.parsedate_to_datetime`.

    Not needed for the newer :py:class:`email.message.EmailMessage`, whose
    `Date` header is automatically parsed to a
    :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if the header is
              missing or empty
    :rtype: :py:class:`datetime.datetime` or None
    """
    raw_date = message.get('Date', '')
    return parsedate_to_datetime(raw_date) if raw_date else None
def get_user_mail_files(user, mailbox='INBOX'):
    """
    Generate the file names of all mails in a mailbox folder of a user.

    Looks at the local cyrus file system below `/datastore/imap-mails/user`,
    not at the imap server.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; each path component is converted using
                        :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    user_root = os.path.join('/datastore', 'imap-mails', 'user', user)

    # a leading "INBOX" denotes the base folder itself, so that
    # "INBOX/sub/dir" is mapped to "sub/dir"
    components = mailbox.split('/')
    if components[0].upper() == 'INBOX':
        del components[0]
    mail_dir = os.path.join(user_root, *map(cyrus_escape, components))

    # only entries whose name starts with digits followed by a dot are yielded
    for entry in os.listdir(mail_dir):
        if re.match(r'\d+\.', entry):
            yield os.path.join(mail_dir, entry)
def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Generate the parsed mails found in a mailbox folder of a user.

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg): the full path
              to the message on disc, followed by whatever
              :py:func:`parse_mail_file` returned for that file
    """
    for mail_path in get_user_mail_files(user, mailbox):
        parsed = parse_mail_file(mail_path, **kwargs)
        yield mail_path, parsed
def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Return the text of an email message file as string.

    Meant to complement get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    The first part of type `text/plain` is decoded with the charset given in
    the mail, or with the fallback encoding if the mail names none. Without
    such a part, the first part of type `text/*` is used, and without that
    simply the first part of the message.

    With `include_all_text` set, the texts of all `text/*` parts are returned,
    those of type `text/plain` first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail
    :param bool include_all_text: include all `text/*` parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    def decode_text(part):
        """Decode payload of `part` using its charset or the fallback."""
        charset = part.get_content_charset(fallback_encoding)
        return part.get_payload(decode=True).decode(charset)

    message = parse_mail_file(filename, headers_only=False)

    # first choice: plain text
    texts = [decode_text(part) for part in message.walk()
             if part.get_content_type() == 'text/plain']
    if texts and not include_all_text:
        return texts[0]

    # second choice (or addition, if everything is requested): other "text/"
    for part in message.walk():
        content_type = part.get_content_type()
        if content_type != 'text/plain' and content_type.startswith('text/'):
            texts.append(decode_text(part))

    if texts:
        return texts if include_all_text else texts[0]

    # last resort: descend to the very first non-multipart part
    first_part = message
    while first_part.is_multipart():
        first_part = first_part.get_payload(0)

    text = decode_text(first_part)
    return [text, ] if include_all_text else text
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Translate a user name or mailbox folder name into cyrus notation.

    This is a hack consisting of a fixed list of string replacements that are
    applied one after the other:

    * . --> ^
    * "u --> &APw- , "a --> &AOQ- , "o --> &APY-
      (if need more: this is modified utf-7)
    * inbox, INBOX, Inbox --> (the empty string)
    * / --> . (except if keep_path is True)

    A general modified utf-7 encoder would be nicer but python has none
    builtin (see https://bugs.python.org/issue5305) and an extra lib like
    https://bitbucket.org/mjs0/imapclient/ would be overkill, since callers
    control the input to this function.

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, &, . and $
                       are escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # order matters: later replacements operate on results of earlier ones
    substitutions = [
        ('.', '^'),
        ('ü', '&APw-'),
        ('ä', '&AOQ-'),
        ('ö', '&APY-'),
        ('inbox', ''),
        ('INBOX', ''),
        ('Inbox', ''),
    ]
    if not keep_path:
        substitutions.append(('/', '.'))
    if regex:
        substitutions.extend([
            ('^', r'\^'),
            ('&', r'\&'),
            ('.', r'\.'),
            ('$', r'\$'),
        ])

    escaped = user_or_folder
    for old, new in substitutions:
        escaped = escaped.replace(old, new)
    return escaped
def cyrus_unescape(user_or_folder):
    """
    Revert (some of) the replacements done by :py:func:`cyrus_escape`.

    The empty string becomes 'inbox'; otherwise every '.' becomes '/' and
    afterwards every '^' (with or without preceding backslash) becomes '.'.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder == '':
        return 'inbox'

    unescaped = user_or_folder.replace('.', '/')
    # the backslash-escaped variant has to be handled before the plain one
    for caret in (r'\^', '^'):
        unescaped = unescaped.replace(caret, '.')
    return unescaped
def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64- or Q-encoded.

    For attachments whose complete Content-Disposition header is made of
    encoded words, :py:func:`email.message.Message.get_filename()` does not
    work. This function tries that first and if it fails tries to interprete
    the Content-Disposition of the message part. If all fails, returns
    `failobj`.

    Only for ascii filenames: also unwraps file names if they are line-wrapped.
    But note that this may remove too much whitespace from the filename if
    line-wrapping happened in the same position as the filename's whitespace.
    To get the wrapped version, set param `do_unwrap` to `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to ascii
                           file names
    :returns: either a string or failobj
    :raises: :py:class:`ValueError` if an encoded word uses an encoding other
             than "B" or "Q"
    """
    # local import: only needed for "Q"-encoded words
    import quopri

    # try the old way and unwrap
    filename = message.get_filename(failobj)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='
    #
    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()
    #
    # The form is: "=?charset?encoding?encoded text?="
    SPLIT_REGEX = '\r?\n *'    # should be CRNL but some files miss the \r
    ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # Group 1 is a quoted file name, group 2 an unquoted one. An optional
    # quote group plus a backreference cannot be used for this: in python a
    # backreference to a group that did not participate never matches, so
    # unquoted names would always be rejected.
    LINE_REGEX = r'attachment\s*;\s*filename=(?:"(.+)"|([^"\s].*?))\s*$'

    decoded = []
    for word in re.split(SPLIT_REGEX, message['Content-Disposition']):
        match = re.match(ENCODED_WORD_REGEX, word)
        if not match:
            break
        charset, encoding, data = match.groups()
        kind = encoding.lower()
        if kind == 'b':
            raw = b64decode(data)
        elif kind == 'q':
            # header=True makes the decoder turn '_' into a space, as
            # required for encoded words
            raw = quopri.decodestring(data.encode('ascii'), header=True)
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(raw.decode(charset))
    decoded = u''.join(decoded)

    match = re.match(LINE_REGEX, decoded)
    if not match:
        return failobj
    quoted_name, bare_name = match.groups()
    return bare_name if quoted_name is None else quoted_name