Remove api doc headers
[pyi2ncommon] / src / mail_utils.py
1 # This Python file uses the following encoding: utf-8
2
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
5 #
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
8 #
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
14 #
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
17 #
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
20 #
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23 """
24 Utilities for dealing with email.
25
26 .. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
27              :py:mod:`pyi2ncommon.imap_mailbox`
28
29 Copyright: Intra2net AG
30 """
31
32 from base64 import b64decode
33 from email.utils import parsedate_to_datetime
34 from email.parser import BytesParser
35 from email import policy
36
37 # outsourced source, import required for compatiblity
38 from .imap_mailbox import ImapMailbox           # pylint: disable=unused-import
39 from .mail_validator import *                   # pylint: disable=unused-import
40 from .sysmisc import replace_file_regex
41
42 log = logging.getLogger('pyi2ncommon.mail_utils')
43
44
45 def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
46     """
47     Replace value in a provided email file.
48
49     :param str email_file: file to use for the replacement
50     :param str value: value to replace the first matched group with
51     :param regex: regular expression to use when replacing a header value
52     :type regex: str or None
53     :param str criterion: criterion to use for replacement, one
54                           of 'envelopeto' or 'received'
55     :raises: :py:class:`ValueError` if the choice of criterion is invalid
56
57     ..todo:: In some cases this function is reusing arnied wrapper's cnf
58              value preparation but for email headers.
59     """
60     if criterion == "envelopeto":
61         logging.debug("Updating test emails' EnvelopeTo header")
62         replace_file_regex(email_file, value, regex=regex)
63     elif criterion == "received":
64         logging.debug("Updating test emails' Received header")
65         with open(email_file, "r") as file_handle:
66             email_text = file_handle.read()
67             email_text = re.sub(regex, value, email_text)
68             email_text = re.sub(regex, value, email_text)
69         with open(email_file, "w") as file_handle:
70             file_handle.write(email_text)
71     else:
72         raise ValueError("Invalid header preparation criterion '%s'"
73                          % criterion)
74
75
76 def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
77                     raise_on_defect=False, new_message_type=False):
78     """
79     Parse given email file (e.g. a banned message).
80
81     This is basically a `email.parser.BytesParser().parse(...)` with given
82     `headers_only` and policy selection, that can also handle BSMTP. As an
83     extra bonus, you can just request headers plus the names of attached files.
84
85     Removes the SMTP envelope surrounding the email if present. Only left-over
86     might be a line with a '.' at end of non-multipart messages if
87     `headers_only` is False.
88
89     :param str file_name: path to the file that contains the email text
90     :param bool headers_only: whether to parse only the email headers; set this
91                               to False, e.g. if you want to check for
92                               attachments using message.walk()
93     :param bool attachment_filenames: if you just want headers and names of
94                                       attached files, set `headers_only` and
95                                       this to True.
96     :param bool raise_on_defect: whether to raise an error if email parser
97                                  encounters a defect (email policy `strict`) or
98                                  just add the defect to message's `defect`
99                                  attribute
100     :param bool new_message_type: whether to return the older
101                                   :py:class:`email.message.Message` (policy
102                                   `compat32`, our default), or the newer
103                                   :py:class:`email.message.EmailMessage` type
104                                   (policy `default`). Big difference!
105     :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
106               `attachment_filenames`
107     :rtype: :py:class:`email.message.Message` or
108              (:py:class:`email.message.Message`, (str)) or
109              one of these two with :py:class:`email.message.EmailMessage`
110     """
111     msg = None
112     start_pos = 0
113
114     if new_message_type:
115         mail_policy = policy.default
116     else:
117         mail_policy = policy.compat32
118     if raise_on_defect:
119         mail_policy += policy.strict
120
121     with open(file_name, 'rb') as read_handle:
122         line = read_handle.readline()
123         if line.startswith(b'EHLO'):
124             # there is a smtp header. skip to its end
125             while line.strip() != b'DATA':
126                 line = read_handle.readline()
127             # the rest is the email plus a trailing '.' (ignored by parser if
128             # multipart)
129         else:
130             read_handle.seek(0)  # forget we read the first line already
131         start_pos = read_handle.tell()
132         msg = BytesParser(policy=mail_policy).parse(read_handle,
133                                                     headersonly=headers_only)
134
135     if not attachment_filenames:
136         return msg
137
138     # otherwise need to parse complete message to get attachment file names
139     if headers_only:
140         with open(file_name, 'rb') as read_handle:
141             read_handle.seek(start_pos)
142             full_msg = BytesParser(policy=mail_policy).parse(read_handle,
143                                                              headersonly=False)
144     else:
145         full_msg = msg
146     filenames = [get_filename(part) for part in full_msg.walk()]
147     return msg, tuple(filename for filename in filenames
148                       if filename is not None)
149
150
151 def parse_mail_date(message):
152     """
153     Parse the 'Date' header of the given message.
154
155     Shortcut for :py:func:`email.utils.parsedate_to_datetime`.
156
157     This is no longer necessary for newer
158     :py:class:`email.message.EmailMessage` since the `Date` Header is
159     automatically parsed to a :py:class:`email.headerregistry.DateHeader`.
160
161     :param message: Email message
162     :type message: :py:class:`email.message.Message`
163     :returns: datetime from Email "Date" header or None if header not present
164     :rtype: :py:class:`datetime.datetime` or None
165     """
166     date_str = message.get('Date', '')
167     if not date_str:
168         return None
169     return parsedate_to_datetime(date_str)
170
171
172 def get_user_mail_files(user, mailbox='INBOX'):
173     """
174     Iterate over mails in given folder of given user; yields file names.
175
176     Works on local cyrus file system, not on imap server.
177
178     :param str user: Name of user whose mailbox is analyzed
179     :param str mailbox: name of mailbox to use, INBOX (default) for base
180                         folder; name is modified using :py:func:`cyrus_escape`
181     :returns: nothing; but yields full path to messages on disc
182     """
183     # base folder of user mail
184     folder = os.path.join('/datastore', 'imap-mails', 'user', user)
185
186     # adapt paths like "INBOX/sub/dir" to "sub/dir"
187     subdirs = mailbox.split('/')
188     if subdirs[0].upper() == 'INBOX':
189         subdirs = subdirs[1:]
190     folder = os.path.join(folder,
191                           *(cyrus_escape(subdir) for subdir in subdirs))
192
193     for filename in os.listdir(folder):
194         if not re.match(r'\d+\.', filename):
195             continue
196         full_path = os.path.join(folder, filename)
197         yield full_path
198
199
200 def get_user_mail(user, mailbox='INBOX', **kwargs):
201     """
202     Iterate over mails in given folder of given user; yields parsed mails.
203
204     :param str user: see :py:func:`get_user_mail_files`
205     :param str mailbox: see :py:func:`get_user_mail_files`
206     :param dict kwargs: all other args are forwarded to
207                         :py:func:`parse_mail_file`
208     :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
209               full path to the message on disc, and the latter is the outcome
210               of :py:func:`parse_mail_file` for that file
211     """
212     for full_path in get_user_mail_files(user, mailbox):
213         yield full_path, parse_mail_file(full_path, **kwargs)
214
215
216 def get_message_text(filename, fallback_encoding='iso8859-1',
217                      include_all_text=False):
218     """
219     Extract message text as string from email message.
220
221     Intended as complementary addition to get_user_mail, e.g. ::
222
223         for filename, msg in get_user_mail(user):
224             # rough filtering based on headers
225             if msg['Subject'] != 'Expected Subject':
226                 continue
227             # get message text for closer inspection
228             text = get_message_text(filename)
229             if 'Expected Text' not in text:
230                 continue
231             ...
232
233     Finds the first part in message that is of type `text/plain` and decodes it
234     using encoding specified in mail or otherwise fallback encoding. If none
235     found takes first part of type `text/*`, or otherwise just the first part.
236
237     If include_all_text is True, all `text/*` parts are included, with `text/plain`
238     being the first.
239
240     :param str filename: complete path of message file in filesystem
241     :param str fallback_encoding: Encoding of email text if none is specified
242                                   in mail.
243     :param bool include_all_text: include all `text/*` parts in returned text
244     :returns: text(s) of message
245     :rtype: [str] if include_all_text else str
246     """
247     result = []
248     msg = parse_mail_file(filename, headers_only=False)
249     for part in msg.walk():
250         if part.get_content_type() != 'text/plain':
251             continue
252         encoding = part.get_content_charset(fallback_encoding)
253         result.append(part.get_payload(decode=True).decode(encoding))
254
255     if result and not include_all_text:
256         return result[0]
257
258     # no text/plain found. Try only "text/":
259     for part in msg.walk():
260         cont_type = part.get_content_type()
261         if cont_type.startswith('text/') and cont_type != 'text/plain':
262             encoding = part.get_content_charset(fallback_encoding)
263             result.append(part.get_payload(decode=True).decode(encoding))
264
265     if result:
266         if not include_all_text:
267             return result[0]
268         return result
269
270     # no "text/" found. Just take first part
271     while msg.is_multipart():
272         msg = msg.get_payload(0)
273
274     encoding = msg.get_content_charset(fallback_encoding)
275     if include_all_text:
276         return [msg.get_payload(decode=True).decode(encoding), ]
277     return msg.get_payload(decode=True).decode(encoding)
278
279
280 def cyrus_escape(user_or_folder, keep_path=False, regex=False):
281     """
282     Convert names of users or mailbox folders to cyrus format.
283
284     quite a hack, just does the following hard-coded replacements:
285
286     * . --> ^
287     * / --> .  (except if keep_path is True)
288     * "u --> &APw-  ,  "o --> &APY-  ,  "a --> &AOQ-
289       (if need more: this is modified utf-7)
290     * inbox -->   (the empty string)
291
292     Would like to use a general modified utf-7-encoder/decoder but python has
293     none builtin (see https://bugs.python.org/issue5305) and an extra lib like
294     https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we
295     control the input to this function via params and this is enough umlaut-
296     testing I think...
297
298     :param str user_or_folder: name of the user or folder string to escape
299     :param bool keep_path: do not replace '/' with '.' so can still use result
300                            as path name
301     :param bool regex: result is used in grep or other regex, so ^, . and & are
302                        escaped again with a backslash
303     :returns: escaped user or folder string
304     :rtype: str
305
306     .. seealso:: :py:func:`cyrus_unescape`
307     """
308     temp = user_or_folder.replace('.', '^') \
309         .replace('ü', '&APw-').replace('ä', '&AOQ-') \
310         .replace('ö', '&APY-') \
311         .replace('inbox', '').replace('INBOX', '').replace('Inbox', '')
312     if not keep_path:
313         temp = temp.replace('/', '.')
314     if regex:
315         return temp.replace('^', r'\^').replace('&', r'\&') \
316                    .replace('.', r'\.').replace('$', r'\$')
317     return temp
318
319
320 def cyrus_unescape(user_or_folder):
321     """
322     Undo effects of :py:func:`cyrus_escape` (but not all of them).
323
324     :param str user_or_folder: name of the user or folder string to unescape
325     :returns: unescaped user or folder string
326     :rtype: str
327     """
328     if user_or_folder == '':
329         return 'inbox'
330     return user_or_folder.replace('.', '/')\
331         .replace(r'\^', '.').replace('^', '.')
332
333
334 def get_filename(message, failobj=None, do_unwrap=True):
335     """
336     Get filename of a message part, even if it is base64-encoded.
337
338     For attachments with base64-encoded file name, the
339     :py:func:`email.message.Message.get_filename()` does not work. This
340     function tries that first and if it fails tries to interprete the
341     Content-Disposition of the message part. If all fails, returns `failobj`.
342
343     Only for ascii filenames: also unwraps file names if they are line-wrapped.
344     But note that this may remove too much whitespace from the filename if
345     line-wrapping happened in the same position as the filename's whitespace.
346     To get unwrapped version, set param `do_unwrap` to `False`.
347
348     See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word
349
350     :param message: message part, e.g. from
351                     :py:meth:`email.message.Message.walk`
352     :type message: :py:class:`email.message.Message` or
353                    :py:class:`email.message.EmailMessage`
354     :param failobj: object to return in case of failure (defaults to None)
355     :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
356                            whitespace from file name; only applies to ascii
357                            file names
358     :returns: either a string or failobj
359     """
360     # try the old way and unwrap
361     filename = message.get_filename(failobj)
362
363     if isinstance(filename, bytes) and not filename.startswith(b'=?') \
364             and not filename.endswith(b'?='):
365         filename = filename.decode('utf8')
366
367     if isinstance(filename, str):
368         if do_unwrap:
369             return re.sub('[\\r\\n]+', '', filename)
370         return filename
371
372     if 'Content-Disposition' not in message:
373         return failobj
374
375     # try parsing content-disposition. e.g.:
376     # attachment; filename="2018年度公开课计划表.xlsx"   -->
377     # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
378     # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='
379
380     # This may be a re-implementation of email.utils.collapse_rfc2231_value()
381     # as mentioned in email.message.EmailMessage.get_param()
382
383     # The form is: "=?charset?encoding?encoded text?="
384     SPLIT_REGEX = '\r?\n *'    # should be CRNL but some files miss the \r
385     ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
386     LINE_REGEX = r'attachment\s*;\s*filename=(")?(.+)\1\s*$'
387     decoded = []
388     for word in re.split(SPLIT_REGEX, message['Content-Disposition']):
389         match = re.match(ENCODED_WORD_REGEX, word)
390         if not match:
391             break
392         charset, encoding, data = match.groups()
393         if encoding.lower() == 'b':
394             temp = b64decode(data)
395         elif encoding.lower() == 'q':
396             raise NotImplementedError('use quopri.decodestring, handle _')
397         else:
398             raise ValueError('not allowed according to wikipedia: "{}"'
399                              .format(encoding))
400         decoded.append(temp.decode(charset))
401     decoded = u''.join(decoded)
402
403     match = re.match(LINE_REGEX, decoded)
404     if match:
405         return match.groups()[1]
406     return failobj