e3339a1d4be631fb6a0afda01f070356d12c7ece
[pyi2ncommon] / src / mail_utils.py
1 # This Python file uses the following encoding: utf-8
2
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
5 #
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
8 #
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
14 #
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
17 #
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
20 #
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23 """
24
25 SUMMARY
26 ------------------------------------------------------
27 Utilities for dealing with email
28
29 .. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
30              :py:mod:`pyi2ncommon.imap_mailbox`
31
32 Copyright: Intra2net AG
33
34
35 INTERFACE
36 ------------------------------------------------------
37
38 """
39
40 from base64 import b64decode
41 from email.utils import parsedate_to_datetime
42 from email.parser import BytesParser
43 from email import policy
44
45 # outsourced source, import required for compatiblity
46 from .imap_mailbox import ImapMailbox           # pylint: disable=unused-import
47 from .mail_validator import *                   # pylint: disable=unused-import
48
49 log = logging.getLogger('pyi2ncommon.mail_utils')
50
51
52 def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
53     """
54     Replace value in a provided email file.
55
56     :param str email_file: file to use for the replacement
57     :param str value: value to replace the first matched group with
58     :param regex: regular expression to use when replacing a header value
59     :type regex: str or None
60     :param str criterion: criterion to use for replacement, one
61                           of 'envelopeto' or 'received'
62     :raises: :py:class:`ValueError` if the choice of criterion is invalid
63
64     In some cases this function is reusing arnied wrapper's cnf value
65     preparation but for email headers.
66     """
67     if criterion == "envelopeto":
68         logging.debug("Updating test emails' EnvelopeTo header")
69         arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
70     elif criterion == "received":
71         logging.debug("Updating test emails' Received header")
72         with open(email_file, "r") as file_handle:
73             email_text = file_handle.read()
74             email_text = re.sub(regex, value, email_text)
75             email_text = re.sub(regex, value, email_text)
76         with open(email_file, "w") as file_handle:
77             file_handle.write(email_text)
78     else:
79         raise ValueError("Invalid header preparation criterion '%s'"
80                          % criterion)
81
82
83 def create_users(usernames, config_file, params):
84     """
85     Create cyrus users from an absolute path to a user configuration file.
86
87     :param usernames: usernames of the created users
88     :type usernames: [str]
89     :param str config_file: template config file to use for each user
90                             configuration
91     :param params: template config file to use for each user configuration
92     :type params: {str, str}
93     :raises: :py:class:`RuntimeError` if the user exists already or cannot be
94               created
95     """
96     log.info("Creating new cyrus users %s", ", ".join(usernames))
97     cyrus_user_path = params.get("cyrus_user_path",
98                                  "/datastore/imap-mails/user/")
99
100     # check for existence round
101     for username in usernames:
102         if os.path.exists(os.path.join(cyrus_user_path,
103                                        username.replace(".", "^"))):
104             raise RuntimeError("The user %s was already created" % username)
105
106     for username in usernames:
107         params["user"] = '%i: "%s"' % (-1, username)
108         params["user_fullname"] = username
109         params_regex = {"user": r'%s,(-?\d+: ".*")'}
110         arnied_wrapper.set_cnf_semidynamic([config_file],
111                                            params, params_regex)
112
113     for username in usernames:
114         if not os.path.exists(os.path.join(cyrus_user_path,
115                                            username.replace(".", "^"))):
116             raise RuntimeError("The user %s could not be created" % username)
117         else:
118             log.info("Added new user %s", username)
119     log.info("%s users successfully created!", len(usernames))
120
121
122 def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
123                     raise_on_defect=False, new_message_type=False):
124     """
125     Parse given email file (e.g. a banned message).
126
127     This is basically a `email.parser.BytesParser().parse(...)` with given
128     `headers_only` and policy selection, that can also handle BSMTP. As an
129     extra bonus, you can just request headers plus the names of attached files.
130
131     Removes the SMTP envelope surrounding the email if present. Only left-over
132     might be a line with a '.' at end of non-multipart messages if
133      `headers_only` is False.
134
135     :param str file_name: path to the file that contains the email text
136     :param bool headers_only: whether to parse only the email headers; set this
137                               to False, e.g. if you want to check for
138                               attachments using message.walk()
139     :param bool attachment_filenames: if you just want headers and names of
140                                       attached files, set `headers_only` and
141                                       this to True.
142     :param bool raise_on_defect: whether to raise an error if email parser
143                                  encounters a defect (email policy `strict`) or
144                                  just add the defect to message's `defect`
145                                  attribute
146     :param bool new_message_type: whether to return the older
147                                   :py:class:`email.message.Message` (policy
148                                   `compat32`, our default), or the newer
149                                   :py:class:`email.message.EmailMessage` type
150                                   (policy `default`). Big difference!
151     :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
152               `attachment_filenames`
153     :rtype: :py:class:`email.message.Message` or
154              (:py:class:`email.message.Message`, (str)) or
155              one of these two with :py:class:`email.message.EmailMessage`
156     """
157     msg = None
158     start_pos = 0
159
160     if new_message_type:
161         mail_policy = policy.default
162     else:
163         mail_policy = policy.compat32
164     if raise_on_defect:
165         mail_policy += policy.strict
166
167     with open(file_name, 'rb') as read_handle:
168         line = read_handle.readline()
169         if line.startswith(b'EHLO'):
170             # there is a smtp header. skip to its end
171             while line.strip() != b'DATA':
172                 line = read_handle.readline()
173             # the rest is the email plus a trailing '.' (ignored by parser if
174             # multipart)
175         else:
176             read_handle.seek(0)  # forget we read the first line already
177         start_pos = read_handle.tell()
178         msg = BytesParser(policy=mail_policy).parse(read_handle,
179                                                     headersonly=headers_only)
180
181     if not attachment_filenames:
182         return msg
183
184     # otherwise need to parse complete message to get attachment file names
185     if headers_only:
186         with open(file_name, 'rb') as read_handle:
187             read_handle.seek(start_pos)
188             full_msg = BytesParser(policy=mail_policy).parse(read_handle,
189                                                              headersonly=False)
190     else:
191         full_msg = msg
192     filenames = [get_filename(part) for part in full_msg.walk()]
193     return msg, tuple(filename for filename in filenames
194                       if filename is not None)
195
196
197 def parse_mail_date(message):
198     """
199     Parse the 'Date' header of the given message.
200
201     Shortcut for :py:func:`email.utils.parsedate_to_datetime`.
202
203     This is no longer necessary for newer
204     :py:class:`email.message.EmailMessage` since the `Date` Header is
205     automatically parsed to a :py:class:`email.headerregistry.DateHeader`.
206
207     :param message: Email message
208     :type message: :py:class:`email.message.Message`
209     :returns: datetime from Email "Date" header or None if header not present
210     :rtype: :py:class:`datetime.datetime` or None
211     """
212     date_str = message.get('Date', '')
213     if not date_str:
214         return None
215     return parsedate_to_datetime(date_str)
216
217
218 def get_user_mail_files(user, mailbox='INBOX'):
219     """
220     Iterate over mails in given folder of given user; yields file names.
221
222     Works on local cyrus file system, not on imap server.
223
224     :param str user: Name of user whose mailbox is analyzed
225     :param str mailbox: name of mailbox to use, INBOX (default) for base
226                         folder; name is modified using :py:func:`cyrus_escape`
227     :returns: nothing; but yields full path to messages on disc
228     """
229     # base folder of user mail
230     folder = os.path.join('/datastore', 'imap-mails', 'user', user)
231
232     # adapt paths like "INBOX/sub/dir" to "sub/dir"
233     subdirs = mailbox.split('/')
234     if subdirs[0].upper() == 'INBOX':
235         subdirs = subdirs[1:]
236     folder = os.path.join(folder,
237                           *(cyrus_escape(subdir) for subdir in subdirs))
238
239     for filename in os.listdir(folder):
240         if not re.match(r'\d+\.', filename):
241             continue
242         full_path = os.path.join(folder, filename)
243         yield full_path
244
245
246 def get_user_mail(user, mailbox='INBOX', **kwargs):
247     """
248     Iterate over mails in given folder of given user; yields parsed mails.
249
250     :param str user: see :py:func:`get_user_mail_files`
251     :param str mailbox: see :py:func:`get_user_mail_files`
252     :param dict kwargs: all other args are forwarded to
253                         :py:func:`parse_mail_file`
254     :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
255               full path to the message on disc, and the latter is the outcome
256               of :py:func:`parse_mail_file` for that file
257     """
258     for full_path in get_user_mail_files(user, mailbox):
259         yield full_path, parse_mail_file(full_path, **kwargs)
260
261
262 def get_message_text(filename, fallback_encoding='iso8859-1',
263                      include_all_text=False):
264     """
265     Extract message text as string from email message.
266
267     Intended as complementary addition to get_user_mail, e.g. ::
268
269         for filename, msg in get_user_mail(user):
270             # rough filtering based on headers
271             if msg['Subject'] != 'Expected Subject':
272                 continue
273             # get message text for closer inspection
274             text = get_message_text(filename)
275             if 'Expected Text' not in text:
276                 continue
277             ...
278
279     Finds the first part in message that is of type text/plain and decodes it
280     using encoding specified in mail or otherwise fallback encoding. If none
281     found takes first part of type "text/*", or otherwise just the first part.
282
283     If include_all_text is True, all text/* parts are included, with text/plain
284     being the first.
285
286     :param str filename: complete path of message file in filesystem
287     :param str fallback_encoding: Encoding of email text if none is specified
288                                   in mail.
289     :param bool include_all_text: include all "text/*" parts in returned text
290     :returns: text(s) of message
291     :rtype: [str] if include_all_text else str
292     """
293     result = []
294     msg = parse_mail_file(filename, headers_only=False)
295     for part in msg.walk():
296         if part.get_content_type() != 'text/plain':
297             continue
298         encoding = part.get_content_charset(fallback_encoding)
299         result.append(part.get_payload(decode=True).decode(encoding))
300
301     if result and not include_all_text:
302         return result[0]
303
304     # no text/plain found. Try only "text/":
305     for part in msg.walk():
306         cont_type = part.get_content_type()
307         if cont_type.startswith('text/') and cont_type != 'text/plain':
308             encoding = part.get_content_charset(fallback_encoding)
309             result.append(part.get_payload(decode=True).decode(encoding))
310
311     if result:
312         if not include_all_text:
313             return result[0]
314         return result
315
316     # no "text/" found. Just take first part
317     while msg.is_multipart():
318         msg = msg.get_payload(0)
319
320     encoding = msg.get_content_charset(fallback_encoding)
321     if include_all_text:
322         return [msg.get_payload(decode=True).decode(encoding), ]
323     return msg.get_payload(decode=True).decode(encoding)
324
325
326 def cyrus_escape(user_or_folder, keep_path=False, regex=False):
327     """
328     Convert names of users or mailbox folders to cyrus format.
329
330     quite a hack, just does the following hard-coded replacements:
331
332     * . --> ^
333     * / --> .  (except if keep_path is True)
334     * "u --> &APw-  ,  "o --> &APY-  ,  "a --> &AOQ-
335       (if need more: this is modified utf-7)
336     * inbox -->   (the empty string)
337
338     Would like to use a general modified utf-7-encoder/decoder but python has
339     none builtin (see https://bugs.python.org/issue5305) and an extra lib like
340     https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we
341     control the input to this function via params and this is enough umlaut-
342     testing I think...
343
344     :param str user_or_folder: name of the user or folder string to escape
345     :param bool keep_path: do not replace '/' with '.' so can still use result
346                            as path name
347     :param bool regex: result is used in grep or other regex, so ^, . and & are
348                        escaped again with a backslash
349     :returns: escaped user or folder string
350     :rtype: str
351
352     .. seealso:: :py:func:`cyrus_unescape`
353     """
354     temp = user_or_folder.replace('.', '^') \
355         .replace('ü', '&APw-').replace('ä', '&AOQ-') \
356         .replace('ö', '&APY-') \
357         .replace('inbox', '').replace('INBOX', '').replace('Inbox', '')
358     if not keep_path:
359         temp = temp.replace('/', '.')
360     if regex:
361         return temp.replace('^', r'\^').replace('&', r'\&') \
362                    .replace('.', r'\.').replace('$', r'\$')
363     return temp
364
365
366 def cyrus_unescape(user_or_folder):
367     """
368     Undo effects of :py:func:`cyrus_escape` (but not all of them).
369
370     :param str user_or_folder: name of the user or folder string to unescape
371     :returns: unescaped user or folder string
372     :rtype: str
373     """
374     if user_or_folder == '':
375         return 'inbox'
376     return user_or_folder.replace('.', '/')\
377         .replace(r'\^', '.').replace('^', '.')
378
379
380 def get_filename(message, failobj=None, do_unwrap=True):
381     """
382     Get filename of a message part, even if it is base64-encoded.
383
384     For attachments with base64-encoded file name, the
385     :py:func:`email.message.Message.get_filename()` does not work. This
386     function tries that first and if it fails tries to interprete the
387     Content-Disposition of the message part. If all fails, returns `failobj`.
388
389     Only for ascii filenames: also unwraps file names if they are line-wrapped.
390     But note that this may remove too much whitespace from the filename if
391     line-wrapping happened in the same position as the filename's whitespace.
392     To get unwrapped version, set param `do_unwrap` to `False`.
393
394     See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word
395
396     :param message: message part, e.g. from
397                     :py:meth:`email.message.Message.walk`
398     :type message: :py:class:`email.message.Message` or
399                    :py:class:`email.message.EmailMessage`
400     :param failobj: object to return in case of failure (defaults to None)
401     :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
402                            whitespace from file name; only applies to ascii
403                            file names
404     :returns: either a string or failobj
405     """
406     # try the old way and unwrap
407     filename = message.get_filename(failobj)
408
409     if isinstance(filename, bytes) and not filename.startswith(b'=?') \
410             and not filename.endswith(b'?='):
411         filename = filename.decode('utf8')
412
413     if isinstance(filename, str):
414         if do_unwrap:
415             return re.sub('[\\r\\n]+', '', filename)
416         return filename
417
418     if 'Content-Disposition' not in message:
419         return failobj
420
421     # try parsing content-disposition. e.g.:
422     # attachment; filename="2018年度公开课计划表.xlsx"   -->
423     # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
424     # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='
425
426     # This may be a re-implementation of email.utils.collapse_rfc2231_value()
427     # as mentioned in email.message.EmailMessage.get_param()
428
429     # The form is: "=?charset?encoding?encoded text?="
430     SPLIT_REGEX = '\r?\n *'    # should be CRNL but some files miss the \r
431     ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
432     LINE_REGEX = r'attachment\s*;\s*filename=(")?(.+)\1\s*$'
433     decoded = []
434     for word in re.split(SPLIT_REGEX, message['Content-Disposition']):
435         match = re.match(ENCODED_WORD_REGEX, word)
436         if not match:
437             break
438         charset, encoding, data = match.groups()
439         if encoding.lower() == 'b':
440             temp = b64decode(data)
441         elif encoding.lower() == 'q':
442             raise NotImplementedError('use quopri.decodestring, handle _')
443         else:
444             raise ValueError('not allowed according to wikipedia: "{}"'
445                              .format(encoding))
446         decoded.append(temp.decode(charset))
447     decoded = u''.join(decoded)
448
449     match = re.match(LINE_REGEX, decoded)
450     if match:
451         return match.groups()[1]
452     return failobj