Commit | Line | Data |
---|---|---|
f49f6323 | 1 | # This Python file uses the following encoding: utf-8 |
11cbb815 PD |
2 | |
3 | # The software in this package is distributed under the GNU General | |
4 | # Public License version 2 (with a special exception described below). | |
5 | # | |
6 | # A copy of GNU General Public License (GPL) is included in this distribution, | |
7 | # in the file COPYING.GPL. | |
8 | # | |
9 | # As a special exception, if other files instantiate templates or use macros | |
10 | # or inline functions from this file, or you compile this file and link it | |
11 | # with other works to produce a work based on this file, this file | |
12 | # does not by itself cause the resulting work to be covered | |
13 | # by the GNU General Public License. | |
14 | # | |
15 | # However the source code for this file must still be made available | |
16 | # in accordance with section (3) of the GNU General Public License. | |
17 | # | |
18 | # This exception does not invalidate any other reasons why a work based | |
19 | # on this file might be covered by the GNU General Public License. | |
20 | # | |
21 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> | |
22 | ||
f49f6323 PD |
23 | """ |
24 | ||
25 | SUMMARY | |
26 | ------------------------------------------------------ | |
2ed7100d CH |
27 | Utilities for dealing with email |
28 | ||
29 | .. seealso:: :py:mod:`pyi2ncommon.mail_validator`, | |
30 | :py:mod:`pyi2ncommon.imap_mailbox` | |
f49f6323 PD |
31 | |
32 | Copyright: Intra2net AG | |
33 | ||
34 | ||
35 | INTERFACE | |
36 | ------------------------------------------------------ | |
37 | ||
38 | """ | |
39 | ||
b36398e7 | 40 | from base64 import b64decode |
67177844 | 41 | from email.utils import parsedate_to_datetime |
1d21262c | 42 | from email.parser import BytesParser |
4b44f515 | 43 | from email import policy |
f49f6323 | 44 | |
67177844 CH |
# outsourced source, import required for compatibility
46 | from .imap_mailbox import ImapMailbox # pylint: disable=unused-import | |
47 | from .mail_validator import * # pylint: disable=unused-import | |
4965c436 | 48 | from .sysmisc import replace_file_regex |
f49f6323 | 49 | |
# Logger for this module.
# NOTE(review): `logging` is not imported explicitly in the import block
# above; presumably it is provided by the star import from .mail_validator
# -- confirm, or add an explicit `import logging`.
log = logging.getLogger('pyi2ncommon.mail_utils')
f49f6323 PD |
51 | |
52 | ||
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Replace value in a provided email file.

    For criterion 'envelopeto' the work is delegated to
    :py:func:`replace_file_regex`. For criterion 'received' the file is read,
    every match of `regex` is substituted by `value` in a single pass and the
    result is written back to the same file.

    :param str email_file: file to use for the replacement
    :param str value: value to replace the first matched group with
    :param regex: regular expression to use when replacing a header value;
                  required if criterion is 'received'
    :type regex: str or None
    :param str criterion: criterion to use for replacement, one
                          of 'envelopeto' or 'received'
    :raises: :py:class:`ValueError` if the choice of criterion is invalid or
             if criterion is 'received' but no regex was given

    .. todo:: In some cases this function is reusing arnied wrapper's cnf
              value preparation but for email headers.
    """
    if criterion == "envelopeto":
        logging.debug("Updating test emails' EnvelopeTo header")
        replace_file_regex(email_file, value, regex=regex)
    elif criterion == "received":
        if regex is None:
            # re.sub(None, ...) would fail with a hard-to-understand TypeError
            raise ValueError("A regex is required for header preparation "
                             "criterion 'received'")
        logging.debug("Updating test emails' Received header")
        with open(email_file, "r") as file_handle:
            email_text = file_handle.read()
        # substitute exactly once: a second pass would replace again inside
        # already-replaced text whenever `value` itself matches `regex`
        email_text = re.sub(regex, value, email_text)
        with open(email_file, "w") as file_handle:
            file_handle.write(email_text)
    else:
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)
f49f6323 PD |
82 | |
83 | ||
4b44f515 CH |
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached files.

    Removes the SMTP envelope surrounding the email if present: if the first
    line starts with `EHLO`, everything up to and including the `DATA` line is
    skipped. Only left-over might be a line with a '.' at end of non-multipart
    messages if `headers_only` is False.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set this
                              to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      this to True.
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (email policy `strict`) or
                                 just add the defect to message's `defects`
                                 attribute
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    :raises: :py:class:`ValueError` if the file starts with an SMTP envelope
             (`EHLO`) that is never terminated by a `DATA` line
    """
    mail_policy = policy.default if new_message_type else policy.compat32
    if raise_on_defect:
        mail_policy = mail_policy + policy.strict

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # there is a smtp header. skip to its end
            while line.strip() != b'DATA':
                line = read_handle.readline()
                if not line:
                    # readline() returns b'' forever at end of file, so
                    # without this check the loop would never terminate
                    raise ValueError('SMTP envelope in "{}" has no DATA line'
                                     .format(file_name))
            # the rest is the email plus a trailing '.' (ignored by parser if
            # multipart)
        else:
            read_handle.seek(0)  # forget we read the first line already
        # remember where the actual message starts for a possible 2nd parse
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                             headersonly=False)
    else:
        full_msg = msg
    filenames = (get_filename(part) for part in full_msg.walk())
    return msg, tuple(filename for filename in filenames
                      if filename is not None)
f49f6323 PD |
157 | |
158 | ||
58414aec CH |
def parse_mail_date(message):
    """
    Return the 'Date' header of the given message as a datetime.

    Thin wrapper around :py:func:`email.utils.parsedate_to_datetime`.

    Not needed for the newer :py:class:`email.message.EmailMessage`, whose
    `Date` header is already parsed into a
    :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if the header is
              missing or empty
    :rtype: :py:class:`datetime.datetime` or None
    """
    raw_date = message.get('Date', '')
    return parsedate_to_datetime(raw_date) if raw_date else None
178 | ||
179 | ||
f44055b0 CH |
def get_user_mail_files(user, mailbox='INBOX'):
    """
    Yield the file names of all mails in the given folder of the given user.

    Operates on the local cyrus file system, not on an imap server.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; each path component is modified using
                        :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    # turn paths like "INBOX/sub/dir" into the components of "sub/dir"
    path_parts = mailbox.split('/')
    if path_parts[0].upper() == 'INBOX':
        path_parts = path_parts[1:]
    escaped_parts = [cyrus_escape(part) for part in path_parts]

    # user's base mail folder plus the escaped sub folders
    mail_dir = os.path.join('/datastore', 'imap-mails', 'user', user,
                            *escaped_parts)

    # only entries named "<digits>." followed by anything are yielded
    for entry in os.listdir(mail_dir):
        if re.match(r'\d+\.', entry):
            yield os.path.join(mail_dir, entry)
206 | ||
207 | ||
f49f6323 PD |
def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Yield the parsed mails found in the given folder of the given user.

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
              full path to the message on disc, and the latter is the outcome
              of :py:func:`parse_mail_file` for that file
    """
    mail_paths = get_user_mail_files(user, mailbox)
    for mail_path in mail_paths:
        parsed_mail = parse_mail_file(mail_path, **kwargs)
        yield mail_path, parsed_mail
f49f6323 PD |
222 | |
223 | ||
f4dec410 CH |
def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Return the text of an email message file as string(s).

    Meant to complement get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    Looks for the first message part of type `text/plain` and decodes it with
    the charset given in the mail, or with the fallback encoding if the part
    names none. Without any `text/plain` part, the first part of type `text/*`
    is used, and without any of those simply the first part.

    With include_all_text set, all `text/*` parts are returned, the
    `text/plain` ones first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail.
    :param bool include_all_text: include all `text/*` parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    def decoded_text(part):
        """Decode payload of `part` using its charset or the fallback."""
        charset = part.get_content_charset(fallback_encoding)
        return part.get_payload(decode=True).decode(charset)

    msg = parse_mail_file(filename, headers_only=False)

    # first choice: text/plain parts
    texts = [decoded_text(part) for part in msg.walk()
             if part.get_content_type() == 'text/plain']
    if texts and not include_all_text:
        return texts[0]

    # second choice (or addition): all other "text/" parts
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type != 'text/plain' and content_type.startswith('text/'):
            texts.append(decoded_text(part))
    if texts:
        return texts if include_all_text else texts[0]

    # no "text/" at all: descend to the very first non-multipart part
    first_part = msg
    while first_part.is_multipart():
        first_part = first_part.get_payload(0)
    text = decoded_text(first_part)
    return [text, ] if include_all_text else text
286 | ||
287 | ||
f49f6323 PD |
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Translate a user name or mailbox folder name into cyrus format.

    This is a hack that only performs these hard-coded replacements, in this
    order:

    * . --> ^
    * "u --> &APw- , "a --> &AOQ- , "o --> &APY-
      (if need more: this is modified utf-7)
    * inbox, INBOX, Inbox --> (the empty string)
    * / --> . (except if keep_path is True)
    * if regex is True: ^, &, . and $ get a backslash prepended

    A general modified utf-7-encoder/decoder would be nicer but python has
    none builtin (see https://bugs.python.org/issue5305) and an extra lib like
    https://bitbucket.org/mjs0/imapclient/ would be overkill. The input to
    this function is controlled via params, so this amount of umlaut handling
    is considered sufficient.

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, &, . and $
                       are escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # (old, new) pairs; order matters since later pairs see earlier results
    replacements = [
        ('.', '^'),
        ('ü', '&APw-'),
        ('ä', '&AOQ-'),
        ('ö', '&APY-'),
        ('inbox', ''),
        ('INBOX', ''),
        ('Inbox', ''),
    ]
    if not keep_path:
        replacements.append(('/', '.'))
    if regex:
        replacements.extend((char, '\\' + char) for char in '^&.$')

    escaped = user_or_folder
    for old, new in replacements:
        escaped = escaped.replace(old, new)
    return escaped
f49f6323 PD |
326 | |
327 | ||
def cyrus_unescape(user_or_folder):
    """
    Revert (some of) the replacements done by :py:func:`cyrus_escape`.

    The empty string becomes 'inbox'; otherwise '.' becomes '/' and
    afterwards both the regex-escaped '\\^' and the plain '^' become '.'.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder == '':
        return 'inbox'

    unescaped = user_or_folder
    # order matters: dots must become slashes before carets become dots
    for old, new in (('.', '/'), (r'\^', '.'), ('^', '.')):
        unescaped = unescaped.replace(old, new)
    return unescaped
b36398e7 CH |
340 | |
341 | ||
def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64-encoded.

    For attachments whose complete Content-Disposition header is wrapped into
    MIME encoded-words, :py:meth:`email.message.Message.get_filename` does
    not work. This function tries that first and if it fails tries to
    interpret the Content-Disposition of the message part. If all fails,
    returns `failobj`.

    Only for file names found by the message's own `get_filename`: also
    unwraps file names if they are line-wrapped. But note that this may remove
    too much whitespace from the filename if line-wrapping happened in the
    same position as the filename's whitespace. To get the original (still
    wrapped) version, set param `do_unwrap` to `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to file
                           names returned by the message's `get_filename`
    :returns: either a string or failobj
    :raises: :py:class:`ValueError` if an encoded-word uses an encoding other
             than `b` (base64) or `q` (quoted-printable)
    """
    import quopri   # only needed here, for q-encoded words

    # Try the regular way first. Use a private marker instead of failobj, so
    # a str/bytes failobj cannot be mistaken for a real file name below
    missing = object()
    filename = message.get_filename(missing)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='

    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()

    # The form is: "=?charset?encoding?encoded text?="
    split_regex = '\r?\n *'  # should be CRNL but some files miss the \r
    encoded_word_regex = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # the quote group must always participate ("?) instead of (")? since a
    # back-reference to a group that did not match never matches, which
    # would make all unquoted file names fail
    line_regex = r'attachment\s*;\s*filename=("?)(.+?)\1\s*$'

    decoded = []
    for word in re.split(split_regex, message['Content-Disposition']):
        match = re.match(encoded_word_regex, word)
        if not match:
            break    # not an encoded word; keep what was decoded so far
        charset, encoding, data = match.groups()
        encoding = encoding.lower()
        if encoding == 'b':
            raw = b64decode(data)
        elif encoding == 'q':
            # header=True: an underscore stands for a space in encoded-words
            raw = quopri.decodestring(data.encode('ascii'), header=True)
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(raw.decode(charset))

    match = re.match(line_regex, u''.join(decoded))
    if match:
        return match.group(2)
    return failobj