ed128df67b9a432d0c9300e561b59f90d1cf4287
[pyi2ncommon] / src / mail_utils.py
1 # This Python file uses the following encoding: utf-8
2
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
5 #
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
8 #
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
14 #
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
17 #
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
20 #
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23 """
24
25 SUMMARY
26 ------------------------------------------------------
27 Utilities for dealing with email
28
29 .. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
30              :py:mod:`pyi2ncommon.imap_mailbox`
31
32 Copyright: Intra2net AG
33
34
35 INTERFACE
36 ------------------------------------------------------
37
38 """
39
40 import os
41 from base64 import b64decode
42 import re
43 import logging
44 from email.utils import parsedate_to_datetime
45 from email.parser import BytesParser
46 from email import policy
47
48 from . import arnied_wrapper
49
50 # outsourced source, import required for compatiblity
51 from .imap_mailbox import ImapMailbox           # pylint: disable=unused-import
52 from .mail_validator import *                   # pylint: disable=unused-import
53
54 log = logging.getLogger('pyi2ncommon.mail_utils')
55
56
57 def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
58     """
59     Replace value in a provided email file.
60
61     :param str email_file: file to use for the replacement
62     :param str value: value to replace the first matched group with
63     :param regex: regular expression to use when replacing a header value
64     :type regex: str or None
65     :param str criterion: criterion to use for replacement, one
66                           of 'envelopeto' or 'received'
67     :raises: :py:class:`ValueError` if the choice of criterion is invalid
68
69     In some cases this function is reusing arnied wrapper's cnf value
70     preparation but for email headers.
71     """
72     if criterion == "envelopeto":
73         logging.debug("Updating test emails' EnvelopeTo header")
74         arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
75     elif criterion == "received":
76         logging.debug("Updating test emails' Received header")
77         with open(email_file, "r") as file_handle:
78             email_text = file_handle.read()
79             email_text = re.sub(regex, value, email_text)
80             email_text = re.sub(regex, value, email_text)
81         with open(email_file, "w") as file_handle:
82             file_handle.write(email_text)
83     else:
84         raise ValueError("Invalid header preparation criterion '%s'"
85                          % criterion)
86
87
88 def create_users(usernames, config_file, params):
89     """
90     Create cyrus users from an absolute path to a user configuration file.
91
92     :param usernames: usernames of the created users
93     :type usernames: [str]
94     :param str config_file: template config file to use for each user
95                             configuration
96     :param params: template config file to use for each user configuration
97     :type params: {str, str}
98     :raises: :py:class:`RuntimeError` if the user exists already or cannot be
99               created
100     """
101     log.info("Creating new cyrus users %s", ", ".join(usernames))
102     cyrus_user_path = params.get("cyrus_user_path",
103                                  "/datastore/imap-mails/user/")
104
105     # check for existence round
106     for username in usernames:
107         if os.path.exists(os.path.join(cyrus_user_path,
108                                        username.replace(".", "^"))):
109             raise RuntimeError("The user %s was already created" % username)
110
111     for username in usernames:
112         params["user"] = '%i: "%s"' % (-1, username)
113         params["user_fullname"] = username
114         params_regex = {"user": r'%s,(-?\d+: ".*")'}
115         arnied_wrapper.set_cnf_semidynamic([config_file],
116                                            params, params_regex)
117
118     for username in usernames:
119         if not os.path.exists(os.path.join(cyrus_user_path,
120                                            username.replace(".", "^"))):
121             raise RuntimeError("The user %s could not be created" % username)
122         else:
123             log.info("Added new user %s", username)
124     log.info("%s users successfully created!", len(usernames))
125
126
127 def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
128                     raise_on_defect=False, new_message_type=False):
129     """
130     Parse given email file (e.g. a banned message).
131
132     This is basically a `email.parser.BytesParser().parse(...)` with given
133     `headers_only` and policy selection, that can also handle BSMTP. As an
134     extra bonus, you can just request headers plus the names of attached files.
135
136     Removes the SMTP envelope surrounding the email if present. Only left-over
137     might be a line with a '.' at end of non-multipart messages if
138      `headers_only` is False.
139
140     :param str file_name: path to the file that contains the email text
141     :param bool headers_only: whether to parse only the email headers; set this
142                               to False, e.g. if you want to check for
143                               attachments using message.walk()
144     :param bool attachment_filenames: if you just want headers and names of
145                                       attached files, set `headers_only` and
146                                       this to True.
147     :param bool raise_on_defect: whether to raise an error if email parser
148                                  encounters a defect (email policy `strict`) or
149                                  just add the defect to message's `defect`
150                                  attribute
151     :param bool new_message_type: whether to return the older
152                                   :py:class:`email.message.Message` (policy
153                                   `compat32`, our default), or the newer
154                                   :py:class:`email.message.EmailMessage` type
155                                   (policy `default`). Big difference!
156     :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
157               `attachment_filenames`
158     :rtype: :py:class:`email.message.Message` or
159              (:py:class:`email.message.Message`, (str)) or
160              one of these two with :py:class:`email.message.EmailMessage`
161     """
162     msg = None
163     start_pos = 0
164
165     if new_message_type:
166         mail_policy = policy.default
167     else:
168         mail_policy = policy.compat32
169     if raise_on_defect:
170         mail_policy += policy.strict
171
172     with open(file_name, 'rb') as read_handle:
173         line = read_handle.readline()
174         if line.startswith(b'EHLO'):
175             # there is a smtp header. skip to its end
176             while line.strip() != b'DATA':
177                 line = read_handle.readline()
178             # the rest is the email plus a trailing '.' (ignored by parser if
179             # multipart)
180         else:
181             read_handle.seek(0)  # forget we read the first line already
182         start_pos = read_handle.tell()
183         msg = BytesParser(policy=mail_policy).parse(read_handle,
184                                                     headersonly=headers_only)
185
186     if not attachment_filenames:
187         return msg
188
189     # otherwise need to parse complete message to get attachment file names
190     if headers_only:
191         with open(file_name, 'rb') as read_handle:
192             read_handle.seek(start_pos)
193             full_msg = BytesParser(policy=mail_policy).parse(read_handle,
194                                                              headersonly=False)
195     else:
196         full_msg = msg
197     filenames = [get_filename(part) for part in full_msg.walk()]
198     return msg, tuple(filename for filename in filenames
199                       if filename is not None)
200
201
202 def parse_mail_date(message):
203     """
204     Parse the 'Date' header of the given message.
205
206     Shortcut for :py:func:`email.utils.parsedate_to_datetime`.
207
208     This is no longer necessary for newer
209     :py:class:`email.message.EmailMessage` since the `Date` Header is
210     automatically parsed to a :py:class:`email.headerregistry.DateHeader`.
211
212     :param message: Email message
213     :type message: :py:class:`email.message.Message`
214     :returns: datetime from Email "Date" header or None if header not present
215     :rtype: :py:class:`datetime.datetime` or None
216     """
217     date_str = message.get('Date', '')
218     if not date_str:
219         return None
220     return parsedate_to_datetime(date_str)
221
222
223 def get_user_mail_files(user, mailbox='INBOX'):
224     """
225     Iterate over mails in given folder of given user; yields file names.
226
227     Works on local cyrus file system, not on imap server.
228
229     :param str user: Name of user whose mailbox is analyzed
230     :param str mailbox: name of mailbox to use, INBOX (default) for base
231                         folder; name is modified using :py:func:`cyrus_escape`
232     :returns: nothing; but yields full path to messages on disc
233     """
234     # base folder of user mail
235     folder = os.path.join('/datastore', 'imap-mails', 'user', user)
236
237     # adapt paths like "INBOX/sub/dir" to "sub/dir"
238     subdirs = mailbox.split('/')
239     if subdirs[0].upper() == 'INBOX':
240         subdirs = subdirs[1:]
241     folder = os.path.join(folder,
242                           *(cyrus_escape(subdir) for subdir in subdirs))
243
244     for filename in os.listdir(folder):
245         if not re.match(r'\d+\.', filename):
246             continue
247         full_path = os.path.join(folder, filename)
248         yield full_path
249
250
251 def get_user_mail(user, mailbox='INBOX', **kwargs):
252     """
253     Iterate over mails in given folder of given user; yields parsed mails.
254
255     :param str user: see :py:func:`get_user_mail_files`
256     :param str mailbox: see :py:func:`get_user_mail_files`
257     :param dict kwargs: all other args are forwarded to
258                         :py:func:`parse_mail_file`
259     :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
260               full path to the message on disc, and the latter is the outcome
261               of :py:func:`parse_mail_file` for that file
262     """
263     for full_path in get_user_mail_files(user, mailbox):
264         yield full_path, parse_mail_file(full_path, **kwargs)
265
266
267 def get_message_text(filename, fallback_encoding='iso8859-1',
268                      include_all_text=False):
269     """
270     Extract message text as string from email message.
271
272     Intended as complementary addition to get_user_mail, e.g. ::
273
274         for filename, msg in get_user_mail(user):
275             # rough filtering based on headers
276             if msg['Subject'] != 'Expected Subject':
277                 continue
278             # get message text for closer inspection
279             text = get_message_text(filename)
280             if 'Expected Text' not in text:
281                 continue
282             ...
283
284     Finds the first part in message that is of type text/plain and decodes it
285     using encoding specified in mail or otherwise fallback encoding. If none
286     found takes first part of type "text/*", or otherwise just the first part.
287
288     If include_all_text is True, all text/* parts are included, with text/plain
289     being the first.
290
291     :param str filename: complete path of message file in filesystem
292     :param str fallback_encoding: Encoding of email text if none is specified
293                                   in mail.
294     :param bool include_all_text: include all "text/*" parts in returned text
295     :returns: text(s) of message
296     :rtype: [str] if include_all_text else str
297     """
298     result = []
299     msg = parse_mail_file(filename, headers_only=False)
300     for part in msg.walk():
301         if part.get_content_type() != 'text/plain':
302             continue
303         encoding = part.get_content_charset(fallback_encoding)
304         result.append(part.get_payload(decode=True).decode(encoding))
305
306     if result and not include_all_text:
307         return result[0]
308
309     # no text/plain found. Try only "text/":
310     for part in msg.walk():
311         cont_type = part.get_content_type()
312         if cont_type.startswith('text/') and cont_type != 'text/plain':
313             encoding = part.get_content_charset(fallback_encoding)
314             result.append(part.get_payload(decode=True).decode(encoding))
315
316     if result:
317         if not include_all_text:
318             return result[0]
319         return result
320
321     # no "text/" found. Just take first part
322     while msg.is_multipart():
323         msg = msg.get_payload(0)
324
325     encoding = msg.get_content_charset(fallback_encoding)
326     if include_all_text:
327         return [msg.get_payload(decode=True).decode(encoding), ]
328     return msg.get_payload(decode=True).decode(encoding)
329
330
331 def cyrus_escape(user_or_folder, keep_path=False, regex=False):
332     """
333     Convert names of users or mailbox folders to cyrus format.
334
335     quite a hack, just does the following hard-coded replacements:
336
337     * . --> ^
338     * / --> .  (except if keep_path is True)
339     * "u --> &APw-  ,  "o --> &APY-  ,  "a --> &AOQ-
340       (if need more: this is modified utf-7)
341     * inbox -->   (the empty string)
342
343     Would like to use a general modified utf-7-encoder/decoder but python has
344     non built-in (see https://bugs.python.org/issue5305) and an extra lib like
345     https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we
346     control the input to this function via params and this is enough umlaut-
347     testing I think...
348
349     :param str user_or_folder: name of the user or folder string to escape
350     :param bool keep_path: do not replace '/' with '.' so can still use result
351                            as path name
352     :param bool regex: result is used in grep or other regex, so ^, . and & are
353                        escaped again with a backslash
354     :returns: escaped user or folder string
355     :rtype: str
356
357     .. seealso:: :py:func:`cyrus_unescape`
358     """
359     temp = user_or_folder.replace('.', '^') \
360         .replace('ü', '&APw-').replace('ä', '&AOQ-') \
361         .replace('ö', '&APY-') \
362         .replace('inbox', '').replace('INBOX', '').replace('Inbox', '')
363     if not keep_path:
364         temp = temp.replace('/', '.')
365     if regex:
366         return temp.replace('^', r'\^').replace('&', r'\&') \
367                    .replace('.', r'\.').replace('$', r'\$')
368     return temp
369
370
371 def cyrus_unescape(user_or_folder):
372     """
373     Undo effects of :py:func:`cyrus_escape` (but not all of them).
374
375     :param str user_or_folder: name of the user or folder string to unescape
376     :returns: unescaped user or folder string
377     :rtype: str
378     """
379     if user_or_folder == '':
380         return 'inbox'
381     return user_or_folder.replace('.', '/')\
382         .replace(r'\^', '.').replace('^', '.')
383
384
385 def get_filename(message, failobj=None, do_unwrap=True):
386     """
387     Get filename of a message part, even if it is base64-encoded.
388
389     For attachments with base64-encoded file name, the
390     :py:func:`email.message.Message.get_filename()` does not work. This
391     function tries that first and if it fails tries to interprete the
392     Content-Disposition of the message part. If all fails, returns `failobj`.
393
394     Only for ascii filenames: also unwraps file names if they are line-wrapped.
395     But note that this may remove too much whitespace from the filename if
396     line-wrapping happend in the same position as the filename's whitespace.
397     To get unwrapped version, set param `do_unwrap` to `False`.
398
399     See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word
400
401     :param message: message part, e.g. from
402                     :py:meth:`email.message.Message.walk`
403     :type message: :py:class:`email.message.Message` or
404                    :py:class:`email.message.EmailMessage`
405     :param failobj: object to return in case of failure (defaults to None)
406     :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
407                            whitespace from file name; only applies to ascii
408                            file names
409     :returns: either a string or failobj
410     """
411     # try the old way and unwrap
412     filename = message.get_filename(failobj)
413
414     if isinstance(filename, bytes) and not filename.startswith(b'=?') \
415             and not filename.endswith(b'?='):
416         filename = filename.decode('utf8')
417
418     if isinstance(filename, str):
419         if do_unwrap:
420             return re.sub('[\\r\\n]+', '', filename)
421         return filename
422
423     if 'Content-Disposition' not in message:
424         return failobj
425
426     # try parsing content-disposition. e.g.:
427     # attachment; filename="2018年度公开课计划表.xlsx"   -->
428     # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
429     # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='
430
431     # This may be a re-implementation of email.utils.collapse_rfc2231_value()
432     # as mentioned in email.message.EmailMessage.get_param()
433
434     # The form is: "=?charset?encoding?encoded text?="
435     SPLIT_REGEX = '\r?\n *'    # should be CRNL but some files miss the \r
436     ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
437     LINE_REGEX = r'attachment\s*;\s*filename=(")?(.+)\1\s*$'
438     decoded = []
439     for word in re.split(SPLIT_REGEX, message['Content-Disposition']):
440         match = re.match(ENCODED_WORD_REGEX, word)
441         if not match:
442             break
443         charset, encoding, data = match.groups()
444         if encoding.lower() == 'b':
445             temp = b64decode(data)
446         elif encoding.lower() == 'q':
447             raise NotImplementedError('use quopri.decodestring, handle _')
448         else:
449             raise ValueError('not allowed according to wikipedia: "{}"'
450                              .format(encoding))
451         decoded.append(temp.decode(charset))
452     decoded = u''.join(decoded)
453
454     match = re.match(LINE_REGEX, decoded)
455     if match:
456         return match.groups()[1]
457     return failobj