Clean up, remove compat with py < 3.6
[pyi2ncommon] / src / mail_utils.py
CommitLineData
f49f6323 1# This Python file uses the following encoding: utf-8
11cbb815
PD
2
3# The software in this package is distributed under the GNU General
4# Public License version 2 (with a special exception described below).
5#
6# A copy of GNU General Public License (GPL) is included in this distribution,
7# in the file COPYING.GPL.
8#
9# As a special exception, if other files instantiate templates or use macros
10# or inline functions from this file, or you compile this file and link it
11# with other works to produce a work based on this file, this file
12# does not by itself cause the resulting work to be covered
13# by the GNU General Public License.
14#
15# However the source code for this file must still be made available
16# in accordance with section (3) of the GNU General Public License.
17#
18# This exception does not invalidate any other reasons why a work based
19# on this file might be covered by the GNU General Public License.
20#
21# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
f49f6323
PD
23"""
24
25SUMMARY
26------------------------------------------------------
2ed7100d
CH
27Utilities for dealing with email
28
29.. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
30 :py:mod:`pyi2ncommon.imap_mailbox`
f49f6323
PD
31
32Copyright: Intra2net AG
33
34
35INTERFACE
36------------------------------------------------------
37
38"""
39
b36398e7 40from base64 import b64decode
67177844 41from email.utils import parsedate_to_datetime
1d21262c 42from email.parser import BytesParser
4b44f515 43from email import policy
f49f6323 44
67177844
CH
45# outsourced source, import required for compatibility
46from .imap_mailbox import ImapMailbox # pylint: disable=unused-import
47from .mail_validator import * # pylint: disable=unused-import
f49f6323 48
# module-level logger; used e.g. by create_users() for its progress messages
log = logging.getLogger('pyi2ncommon.mail_utils')
f49f6323
PD
50
51
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Replace a header value in a provided email file (the file is rewritten).

    :param str email_file: file to use for the replacement
    :param str value: replacement text; for criterion 'received' it replaces
                      every match of `regex` in the file, for 'envelopeto' it
                      is handed on to `arnied_wrapper.prep_cnf_value`
    :param regex: regular expression to use when replacing a header value;
                  needed for criterion 'received' since it is given directly
                  to :py:func:`re.sub`
    :type regex: str or None
    :param str criterion: criterion to use for replacement, one
                          of 'envelopeto' or 'received'
    :raises: :py:class:`ValueError` if the choice of criterion is invalid

    In some cases this function is reusing arnied wrapper's cnf value
    preparation but for email headers.
    """
    if criterion == "envelopeto":
        logging.debug("Updating test emails' EnvelopeTo header")
        arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
    elif criterion == "received":
        logging.debug("Updating test emails' Received header")
        with open(email_file, "r") as file_handle:
            email_text = file_handle.read()
        # Substitute exactly once: running the same substitution a second
        # time would replace again inside the freshly inserted text whenever
        # `value` itself matches `regex`.
        email_text = re.sub(regex, value, email_text)
        with open(email_file, "w") as file_handle:
            file_handle.write(email_text)
    else:
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)
f49f6323
PD
81
82
def create_users(usernames, config_file, params):
    """
    Create cyrus users from an absolute path to a user configuration file.

    The mail directory of a user is looked up below the directory given by
    `params["cyrus_user_path"]` (default: "/datastore/imap-mails/user/"),
    with every '.' in the user name replaced by '^'.

    :param usernames: usernames of the created users; iterated several times
    :type usernames: [str]
    :param str config_file: template config file to use for each user
                            configuration
    :param params: values handed to `arnied_wrapper.set_cnf_semidynamic`;
                   the keys "user" and "user_fullname" are overwritten in
                   this dict for every user
    :type params: {str, str}
    :raises: :py:class:`RuntimeError` if the user exists already or cannot be
             created
    """
    log.info("Creating new cyrus users %s", ", ".join(usernames))
    cyrus_user_path = params.get("cyrus_user_path",
                                 "/datastore/imap-mails/user/")

    def mail_dir_exists(name):
        """Tell whether the mail directory of user `name` is present."""
        return os.path.exists(os.path.join(cyrus_user_path,
                                           name.replace(".", "^")))

    # first round: none of the users may be there yet
    for candidate in usernames:
        if mail_dir_exists(candidate):
            raise RuntimeError("The user %s was already created" % candidate)

    # second round: apply the config template once per user
    for new_user in usernames:
        params["user"] = '%i: "%s"' % (-1, new_user)
        params["user_fullname"] = new_user
        params_regex = {"user": r'%s,(-?\d+: ".*")'}
        arnied_wrapper.set_cnf_semidynamic([config_file],
                                           params, params_regex)

    # third round: every user must have a mail directory now
    for created in usernames:
        if not mail_dir_exists(created):
            raise RuntimeError("The user %s could not be created" % created)
        log.info("Added new user %s", created)
    log.info("%s users successfully created!", len(usernames))
120
121
4b44f515
CH
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached files.

    Removes the SMTP envelope surrounding the email if present: if the first
    line starts with `EHLO`, everything up to and including the `DATA` line is
    skipped. Only left-over might be a line with a '.' at end of non-multipart
    messages if `headers_only` is False.

    If the file starts with `EHLO` but contains no `DATA` line, the envelope
    cannot be removed; the whole file is then given to the parser unchanged,
    so the parser deals with the malformed content according to the selected
    policy.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set this
                              to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      this to True.
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (email policy `strict`) or
                                 just add the defect to message's `defects`
                                 attribute
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    """
    msg = None
    start_pos = 0

    if new_message_type:
        mail_policy = policy.default
    else:
        mail_policy = policy.compat32
    if raise_on_defect:
        mail_policy += policy.strict

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # there is a smtp header. skip to its end; an empty `line` means
            # end of file, stop there instead of looping forever
            while line and line.strip() != b'DATA':
                line = read_handle.readline()
            if not line:
                # no DATA line found: cannot strip envelope, parse everything
                read_handle.seek(0)
            # otherwise the rest is the email plus a trailing '.' (ignored by
            # parser if multipart)
        else:
            read_handle.seek(0)   # forget we read the first line already
        # remember where the mail starts in case we need to parse it again
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                             headersonly=False)
    else:
        full_msg = msg
    filenames = [get_filename(part) for part in full_msg.walk()]
    return msg, tuple(filename for filename in filenames
                      if filename is not None)
f49f6323
PD
195
196
58414aec
CH
def parse_mail_date(message):
    """
    Convert the 'Date' header of the given message into a datetime.

    Thin wrapper around :py:func:`email.utils.parsedate_to_datetime` that
    copes with a missing or empty header.

    Not needed for the newer :py:class:`email.message.EmailMessage`, since
    there the `Date` header is automatically parsed to a
    :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if header not present
    :rtype: :py:class:`datetime.datetime` or None
    """
    header_value = message.get('Date', '')
    return parsedate_to_datetime(header_value) if header_value else None
216
217
f44055b0
CH
def get_user_mail_files(user, mailbox='INBOX'):
    """
    Yield the file names of all mails in the given folder of the given user.

    Works on local cyrus file system, not on imap server: lists the directory
    below `/datastore/imap-mails/user/<user>` that belongs to the mailbox and
    yields every entry whose name starts with digits followed by a dot.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; a leading "INBOX" path component (any case) is
                        dropped, every remaining component is converted using
                        :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    # turn paths like "INBOX/sub/dir" into the components "sub", "dir"
    components = mailbox.split('/')
    if components[0].upper() == 'INBOX':
        components = components[1:]
    escaped_components = [cyrus_escape(component) for component in components]

    # base folder of user mail plus the escaped sub folders
    mail_dir = os.path.join('/datastore', 'imap-mails', 'user', user,
                            *escaped_components)

    for entry in os.listdir(mail_dir):
        if re.match(r'\d+\.', entry):
            yield os.path.join(mail_dir, entry)
244
245
f49f6323
PD
def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Yield the parsed mails in the given folder of the given user.

    Combines :py:func:`get_user_mail_files` (to find the files) with
    :py:func:`parse_mail_file` (to parse each of them).

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
              full path to the message on disc, and the latter is the outcome
              of :py:func:`parse_mail_file` for that file
    """
    mail_paths = get_user_mail_files(user, mailbox)
    for mail_path in mail_paths:
        parsed_mail = parse_mail_file(mail_path, **kwargs)
        yield mail_path, parsed_mail
f49f6323
PD
260
261
f4dec410
CH
def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Return the text of the email message stored in the given file.

    Meant to be used together with get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    The file is parsed completely (not just the headers). Parts of type
    text/plain are preferred; they are decoded with the charset given in the
    part or, if there is none, with `fallback_encoding`. Without any
    text/plain part the other "text/*" parts are used, and without those the
    first non-multipart part of the message.

    If include_all_text is True, all text/* parts are included, with text/plain
    being the first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail.
    :param bool include_all_text: include all "text/*" parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    def decode_part(part):
        """Decode payload of `part` using its charset or the fallback."""
        charset = part.get_content_charset(fallback_encoding)
        return part.get_payload(decode=True).decode(charset)

    msg = parse_mail_file(filename, headers_only=False)

    # first choice: text/plain parts
    texts = [decode_part(part) for part in msg.walk()
             if part.get_content_type() == 'text/plain']
    if texts and not include_all_text:
        return texts[0]

    # second choice (or addition if all text is wanted): other "text/" parts
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type != 'text/plain' and content_type.startswith('text/'):
            texts.append(decode_part(part))

    if texts:
        return texts if include_all_text else texts[0]

    # no "text/" part at all: descend to the very first non-multipart part
    first_part = msg
    while first_part.is_multipart():
        first_part = first_part.get_payload(0)

    only_text = decode_part(first_part)
    return [only_text, ] if include_all_text else only_text
324
325
f49f6323
PD
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Convert names of users or mailbox folders to cyrus format.

    This is a hack that only performs a fixed list of replacements, in this
    order:

    * . --> ^
    * "u --> &APw- , "a --> &AOQ- , "o --> &APY-
      (if need more: this is modified utf-7)
    * inbox, INBOX, Inbox --> (the empty string)
    * / --> .   (except if keep_path is True)
    * ^ & . $ get a backslash prepended (only if regex is True)

    A general modified utf-7-encoder/decoder would be nicer but python has
    none builtin (see https://bugs.python.org/issue5305) and an extra lib like
    https://bitbucket.org/mjs0/imapclient/ would be overkill, given that the
    input to this function is controlled via params.

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, . and & are
                       escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # (old, new) pairs; they are applied one after the other, order matters
    substitutions = [
        ('.', '^'),
        ('ü', '&APw-'),
        ('ä', '&AOQ-'),
        ('ö', '&APY-'),
        ('inbox', ''),
        ('INBOX', ''),
        ('Inbox', ''),
    ]
    if not keep_path:
        substitutions.append(('/', '.'))
    if regex:
        substitutions.extend([
            ('^', r'\^'),
            ('&', r'\&'),
            ('.', r'\.'),
            ('$', r'\$'),
        ])

    escaped = user_or_folder
    for old, new in substitutions:
        escaped = escaped.replace(old, new)
    return escaped
f49f6323
PD
364
365
def cyrus_unescape(user_or_folder):
    """
    Revert (some of) the replacements done by :py:func:`cyrus_escape`.

    The empty string becomes 'inbox'; otherwise '.' is turned into '/' and
    '^' (with or without a preceding backslash) into '.'.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder != '':
        with_slashes = user_or_folder.replace('.', '/')
        # handle the regex-escaped caret first, then the plain one
        without_escaped_caret = with_slashes.replace(r'\^', '.')
        return without_escaped_caret.replace('^', '.')
    return 'inbox'
b36398e7
CH
378
379
def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64-encoded.

    For attachments whose complete Content-Disposition is wrapped into
    encoded-words, the :py:func:`email.message.Message.get_filename()` does
    not work. This function tries that first and if it fails tries to
    interpret the Content-Disposition of the message part, decoding
    encoded-words with encoding "b" (base64) or "q" (quoted-printable). File
    names in the decoded Content-Disposition may be quoted or unquoted. If all
    fails, returns `failobj`.

    Only for ascii filenames: also unwraps file names if they are line-wrapped.
    But note that this may remove too much whitespace from the filename if
    line-wrapping happened in the same position as the filename's whitespace.
    To get the original (still wrapped) version, set param `do_unwrap` to
    `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to ascii
                           file names
    :returns: either a string or failobj
    :raises: :py:class:`ValueError` if an encoded-word uses an encoding other
             than "b" or "q"
    """
    # local import: only needed here, for "q"-encoded words
    import quopri

    # try the old way and unwrap
    filename = message.get_filename(failobj)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='

    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()

    # The form is: "=?charset?encoding?encoded text?="
    SPLIT_REGEX = '\r?\n *'    # should be CRNL but some files miss the \r
    ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # The quote group must always take part in the match (possibly empty):
    # a back-reference to a group that did not match never matches, which
    # would make unquoted file names impossible to find. The name itself is
    # matched non-greedily so trailing whitespace is not part of it.
    LINE_REGEX = r'attachment\s*;\s*filename=("?)(.+?)\1\s*$'
    decoded = []
    for word in re.split(SPLIT_REGEX, message['Content-Disposition']):
        match = re.match(ENCODED_WORD_REGEX, word)
        if not match:
            break
        charset, encoding, data = match.groups()
        if encoding.lower() == 'b':
            temp = b64decode(data)
        elif encoding.lower() == 'q':
            # header=True makes '_' decode to a space, as needed for headers
            temp = quopri.decodestring(data.encode('ascii'), header=True)
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(temp.decode(charset))
    decoded = u''.join(decoded)

    match = re.match(LINE_REGEX, decoded)
    if match:
        return match.groups()[1]
    return failobj