Increase version to 1.7.4
[pyi2ncommon] / src / mail_utils.py
... / ...
CommitLineData
1# This Python file uses the following encoding: utf-8
2
3# The software in this package is distributed under the GNU General
4# Public License version 2 (with a special exception described below).
5#
6# A copy of GNU General Public License (GPL) is included in this distribution,
7# in the file COPYING.GPL.
8#
9# As a special exception, if other files instantiate templates or use macros
10# or inline functions from this file, or you compile this file and link it
11# with other works to produce a work based on this file, this file
12# does not by itself cause the resulting work to be covered
13# by the GNU General Public License.
14#
15# However the source code for this file must still be made available
16# in accordance with section (3) of the GNU General Public License.
17#
18# This exception does not invalidate any other reasons why a work based
19# on this file might be covered by the GNU General Public License.
20#
21# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22
23"""
24Utilities for dealing with email.
25
26.. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
27 :py:mod:`pyi2ncommon.imap_mailbox`
28
29Copyright: Intra2net AG
30"""
31
import quopri
from base64 import b64decode
from email import policy
from email.parser import BytesParser
from email.utils import parsedate_to_datetime

# outsourced source, import required for compatibility
from .imap_mailbox import ImapMailbox  # pylint: disable=unused-import
from .mail_validator import *  # pylint: disable=unused-import
from .sysmisc import replace_file_regex
41
42log = logging.getLogger('pyi2ncommon.mail_utils')
43
44
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Replace value in a provided email file.

    For criterion 'envelopeto' the replacement is delegated to
    ``replace_file_regex``. For criterion 'received' every match of `regex`
    in the file text is substituted with `value` (exactly once) and the file
    is rewritten in place.

    :param str email_file: file to use for the replacement
    :param str value: value to replace the first matched group with
    :param regex: regular expression to use when replacing a header value
    :type regex: str or None
    :param str criterion: criterion to use for replacement, one
                          of 'envelopeto' or 'received'
    :raises: :py:class:`ValueError` if the choice of criterion is invalid

    .. todo:: In some cases this function is reusing arnied wrapper's cnf
              value preparation but for email headers.
    """
    if criterion == "envelopeto":
        logging.debug("Updating test emails' EnvelopeTo header")
        replace_file_regex(email_file, value, regex=regex)
    elif criterion == "received":
        logging.debug("Updating test emails' Received header")
        with open(email_file, "r") as file_handle:
            email_text = file_handle.read()
        # Substitute only once: a second pass would replace again inside the
        # freshly inserted text whenever `value` itself matches `regex`.
        email_text = re.sub(regex, value, email_text)
        with open(email_file, "w") as file_handle:
            file_handle.write(email_text)
    else:
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)
74
75
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached files.

    Removes the SMTP envelope surrounding the email if present. Only left-over
    might be a line with a '.' at end of non-multipart messages if
    `headers_only` is False.

    If the file starts with `EHLO` but contains no `DATA` line, the file is
    parsed from its beginning like a regular (non-BSMTP) file, so the problem
    surfaces through the usual defect handling of the email parser.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set this
                              to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      this to True.
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (policy attribute
                                 `raise_on_defect`) or just add the defect to
                                 message's `defects` attribute
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    """
    mail_policy = policy.default if new_message_type else policy.compat32
    if raise_on_defect:
        # Do not use "mail_policy + policy.strict": policy.strict carries a
        # header_factory attribute that Compat32 does not know, so the
        # addition raises TypeError for the compat32 policy.
        mail_policy = mail_policy.clone(raise_on_defect=True)

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # there is a smtp header. skip to its end; readline() returns b''
            # at end of file, which must terminate the search
            while line and line.strip() != b'DATA':
                line = read_handle.readline()
            if not line:
                # no DATA line found: treat the file as a plain message
                read_handle.seek(0)
            # the rest is the email plus a trailing '.' (ignored by parser if
            # multipart)
        else:
            read_handle.seek(0)  # forget we read the first line already
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                             headersonly=False)
    else:
        full_msg = msg
    filenames = (get_filename(part) for part in full_msg.walk())
    return msg, tuple(filename for filename in filenames
                      if filename is not None)
149
150
def parse_mail_date(message):
    """
    Return the 'Date' header of the given message as a datetime.

    Thin convenience wrapper around
    :py:func:`email.utils.parsedate_to_datetime`.

    Not needed for the newer :py:class:`email.message.EmailMessage`, whose
    `Date` header is automatically parsed to a
    :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if header not present
              (or empty)
    :rtype: :py:class:`datetime.datetime` or None
    """
    date_header = message.get('Date', '')
    return parsedate_to_datetime(date_header) if date_header else None
170
171
def get_user_mail_files(user, mailbox='INBOX'):
    """
    Yield the file names of all mails in a folder of the given user.

    Operates on the local cyrus file system below `/datastore/imap-mails`,
    not on the imap server.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; a leading "INBOX" path component is dropped
                        and every remaining component is converted with
                        :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    # adapt paths like "INBOX/sub/dir" to "sub/dir"
    components = mailbox.split('/')
    if components[0].upper() == 'INBOX':
        del components[0]
    escaped = [cyrus_escape(component) for component in components]

    # base folder of user mail, extended by the escaped sub folders
    mail_dir = os.path.join('/datastore', 'imap-mails', 'user', user,
                            *escaped)

    # message files are named "<number>.", skip everything else
    for entry in os.listdir(mail_dir):
        if re.match(r'\d+\.', entry):
            yield os.path.join(mail_dir, entry)
198
199
def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Yield the parsed mails found in a folder of the given user.

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
              full path to the message on disc, and the latter is the outcome
              of :py:func:`parse_mail_file` for that file
    """
    for mail_path in get_user_mail_files(user, mailbox):
        parsed_mail = parse_mail_file(mail_path, **kwargs)
        yield mail_path, parsed_mail
214
215
def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Return the text of an email message file as string(s).

    Meant to complement :py:func:`get_user_mail`, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    The first `text/plain` part of the message is decoded with the charset
    given in the mail, or with the fallback encoding if there is none. Without
    any `text/plain` part the first other `text/*` part is used, and without
    any text part at all simply the first part of the message.

    With `include_all_text` set, all `text/*` parts are returned, the
    `text/plain` ones first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail.
    :param bool include_all_text: include all `text/*` parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    def decode_part(part):
        """Decode payload of a message part with its (or fallback) charset."""
        charset = part.get_content_charset(fallback_encoding)
        return part.get_payload(decode=True).decode(charset)

    message = parse_mail_file(filename, headers_only=False)

    # first choice: text/plain parts
    texts = [decode_part(part) for part in message.walk()
             if part.get_content_type() == 'text/plain']
    if texts and not include_all_text:
        return texts[0]

    # second choice: any other "text/" part
    for part in message.walk():
        content_type = part.get_content_type()
        if content_type.startswith('text/') and content_type != 'text/plain':
            texts.append(decode_part(part))
    if texts:
        return texts if include_all_text else texts[0]

    # no "text/" found. Descend to the very first leaf part and take that
    first_part = message
    while first_part.is_multipart():
        first_part = first_part.get_payload(0)
    text = decode_part(first_part)
    return [text] if include_all_text else text
278
279
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Convert names of users or mailbox folders to cyrus format.

    This is a hack that only applies a fixed list of replacements, in this
    order:

    * . --> ^
    * "u --> &APw- , "a --> &AOQ- , "o --> &APY-
      (if need more: this is modified utf-7)
    * inbox / INBOX / Inbox --> (the empty string)
    * / --> . (except if keep_path is True)
    * if regex is True: ^, &, . and $ get a backslash prepended

    A general modified utf-7 encoder/decoder would be nicer but python has
    none builtin (see https://bugs.python.org/issue5305) and an extra lib like
    https://bitbucket.org/mjs0/imapclient/ would be overkill, since the input
    to this function is under our control.

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, ., & and $
                       are escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # order matters: later replacements work on the output of earlier ones
    substitutions = [
        ('.', '^'),
        ('ü', '&APw-'),
        ('ä', '&AOQ-'),
        ('ö', '&APY-'),
        ('inbox', ''),
        ('INBOX', ''),
        ('Inbox', ''),
    ]
    if not keep_path:
        substitutions.append(('/', '.'))
    if regex:
        substitutions.extend([
            ('^', r'\^'),
            ('&', r'\&'),
            ('.', r'\.'),
            ('$', r'\$'),
        ])

    escaped = user_or_folder
    for old, new in substitutions:
        escaped = escaped.replace(old, new)
    return escaped
318
319
def cyrus_unescape(user_or_folder):
    """
    Revert (part of) what :py:func:`cyrus_escape` did.

    The empty string becomes 'inbox'; otherwise '.' is turned into '/' and
    afterwards both the regex-escaped and the plain '^' are turned into '.'.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder == '':
        return 'inbox'

    unescaped = user_or_folder
    # '.' must be handled first since the other replacements create new dots
    for escaped_token, plain_token in (('.', '/'), (r'\^', '.'), ('^', '.')):
        unescaped = unescaped.replace(escaped_token, plain_token)
    return unescaped
332
333
def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64-encoded.

    For attachments whose whole Content-Disposition header is a MIME
    encoded-word, :py:func:`email.message.Message.get_filename()` does not
    work. This function tries that first and if it fails tries to interpret
    the Content-Disposition of the message part, decoding both base64 ("B")
    and quoted-printable ("Q") encoded-words. If all fails, returns `failobj`.

    Only for ascii filenames: also unwraps file names if they are line-wrapped.
    But note that this may remove too much whitespace from the filename if
    line-wrapping happened in the same position as the filename's whitespace.
    To get the wrapped (unmodified) version, set param `do_unwrap` to `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to ascii
                           file names
    :returns: either a string or failobj
    :raises: :py:class:`ValueError` if an encoded-word uses an encoding other
             than "B" or "Q"
    """
    # try the old way and unwrap
    filename = message.get_filename(failobj)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='

    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()

    # The form is: "=?charset?encoding?encoded text?="
    SPLIT_REGEX = '\r?\n *'  # should be CRNL but some files miss the \r
    ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # Quoted and unquoted values are separate alternatives: a back-reference
    # to an optional group that did not match never matches in python, so
    # '(")?(.+)\1' silently rejected every unquoted file name.
    LINE_REGEX = r'attachment\s*;\s*filename=(?:"(.+)"|([^"\s].*?))\s*$'

    # str(): with policy compat32 a header containing 8-bit data is handed
    # out as a Header object, which re.split cannot process
    disposition = str(message['Content-Disposition'])

    decoded = []
    for word in re.split(SPLIT_REGEX, disposition):
        match = re.match(ENCODED_WORD_REGEX, word)
        if not match:
            break
        charset, encoding, data = match.groups()
        kind = encoding.lower()
        if kind == 'b':
            raw = b64decode(data)
        elif kind == 'q':
            # header=True makes quopri decode '_' as space (RFC 2047)
            raw = quopri.decodestring(data.encode('ascii'), header=True)
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(raw.decode(charset))

    match = re.match(LINE_REGEX, ''.join(decoded))
    if match:
        # exactly one of the two alternatives took part in the match
        return match.group(1) or match.group(2)
    return failobj