1 # This Python file uses the following encoding: utf-8
3 # The software in this package is distributed under the GNU General
4 # Public License version 2 (with a special exception described below).
6 # A copy of GNU General Public License (GPL) is included in this distribution,
7 # in the file COPYING.GPL.
9 # As a special exception, if other files instantiate templates or use macros
10 # or inline functions from this file, or you compile this file and link it
11 # with other works to produce a work based on this file, this file
12 # does not by itself cause the resulting work to be covered
13 # by the GNU General Public License.
15 # However the source code for this file must still be made available
16 # in accordance with section (3) of the GNU General Public License.
18 # This exception does not invalidate any other reasons why a work based
19 # on this file might be covered by the GNU General Public License.
21 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
26 ------------------------------------------------------
27 Utilities for dealing with email
29 .. seealso:: :py:mod:`pyi2ncommon.mail_validator`,
30 :py:mod:`pyi2ncommon.imap_mailbox`
32 Copyright: Intra2net AG
36 ------------------------------------------------------
41 from base64 import b64decode
44 from email.utils import parsedate_to_datetime
45 from email.parser import BytesParser
46 from email import policy
48 from . import arnied_wrapper
# outsourced source, import required for compatibility
51 from .imap_mailbox import ImapMailbox # pylint: disable=unused-import
52 from .mail_validator import * # pylint: disable=unused-import
54 log = logging.getLogger('pyi2ncommon.mail_utils')
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Replace value in a provided email file.

    :param str email_file: file to use for the replacement
    :param str value: value to replace the first matched group with
    :param regex: regular expression to use when replacing a header value;
                  mandatory for criterion 'received'
    :type regex: str or None
    :param str criterion: criterion to use for replacement, one
                          of 'envelopeto' or 'received'
    :raises: :py:class:`ValueError` if the choice of criterion is invalid or
             if criterion is 'received' but no regex was given

    In some cases this function is reusing arnied wrapper's cnf value
    preparation but for email headers.
    """
    if criterion == "envelopeto":
        logging.debug("Updating test emails' EnvelopeTo header")
        arnied_wrapper.prep_cnf_value(email_file, value, regex=regex)
    elif criterion == "received":
        if regex is None:
            # fail early and clearly instead of a TypeError out of re.sub
            raise ValueError("A regex is required for header preparation "
                             "criterion 'received'")
        logging.debug("Updating test emails' Received header")
        with open(email_file, "r") as file_handle:
            email_text = file_handle.read()
        # NOTE(review): the substitution has always been applied twice; the
        # second pass only matters if the first result matches again. Kept
        # to preserve behavior -- confirm whether it is intended.
        for _ in range(2):
            email_text = re.sub(regex, value, email_text)
        with open(email_file, "w") as file_handle:
            file_handle.write(email_text)
    else:
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)
def create_users(usernames, config_file, params):
    """
    Create cyrus users from an absolute path to a user configuration file.

    :param usernames: usernames of the created users
    :type usernames: [str]
    :param str config_file: template config file to use for each user
                            configuration
    :param params: values for the template config; the optional key
                   "cyrus_user_path" overrides the directory in which the
                   user folders are looked up. The mapping passed in is not
                   modified.
    :type params: {str, str}
    :raises: :py:class:`RuntimeError` if the user exists already or cannot be
             created
    """
    log.info("Creating new cyrus users %s", ", ".join(usernames))
    cyrus_user_path = params.get("cyrus_user_path",
                                 "/datastore/imap-mails/user/")
    # cyrus stores the '.' of a user name as '^' on disc
    user_dirs = {username: os.path.join(cyrus_user_path,
                                        username.replace(".", "^"))
                 for username in usernames}

    # check for existence round
    for username in usernames:
        if os.path.exists(user_dirs[username]):
            raise RuntimeError("The user %s was already created" % username)

    params_regex = {"user": r'%s,(-?\d+: ".*")'}
    for username in usernames:
        # work on a copy so the per-user keys do not leak back to the caller
        user_params = params.copy()
        user_params["user"] = '%i: "%s"' % (-1, username)
        user_params["user_fullname"] = username
        arnied_wrapper.set_cnf_semidynamic([config_file],
                                           user_params, params_regex)

    for username in usernames:
        if not os.path.exists(user_dirs[username]):
            raise RuntimeError("The user %s could not be created" % username)
        log.info("Added new user %s", username)
    log.info("%s users successfully created!", len(usernames))
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached files.

    Removes the SMTP envelope surrounding the email if present. Only left-over
    might be a line with a '.' at end of non-multipart messages if
    `headers_only` is False.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set this
                              to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      `attachment_filenames` to True
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (strict parsing) or
                                 just add the defect to message's `defect`
                                 list
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    :raises: :py:class:`ValueError` if the file starts with an SMTP envelope
             that has no `DATA` line
    """
    mail_policy = policy.default if new_message_type else policy.compat32
    if raise_on_defect:
        # `compat32 + policy.strict` is rejected by the email package because
        # `strict` carries EmailPolicy-only settings; cloning with the one
        # flag we need works for both policies
        mail_policy = mail_policy.clone(raise_on_defect=True)

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # there is a smtp header. skip to its end
            while line.strip() != b'DATA':
                line = read_handle.readline()
                if not line:
                    # readline() keeps returning b'' at EOF, so without this
                    # check a truncated envelope would loop forever
                    raise ValueError('SMTP envelope without DATA line in {}'
                                     .format(file_name))
            # the rest is the email plus a trailing '.' (ignored by parser if
            # multipart)
        else:
            read_handle.seek(0)    # forget we read the first line already
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                             headersonly=False)
    else:
        full_msg = msg    # already parsed completely
    filenames = (get_filename(part) for part in full_msg.walk())
    return msg, tuple(filename for filename in filenames
                      if filename is not None)
def parse_mail_date(message):
    """
    Parse the 'Date' header of the given message.

    Shortcut for :py:func:`email.utils.parsedate_to_datetime`.

    This is no longer necessary for newer
    :py:class:`email.message.EmailMessage` since the `Date` Header is
    automatically parsed to a :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if header not present
    :rtype: :py:class:`datetime.datetime` or None
    """
    date_str = message.get('Date', '')
    if not date_str:
        # header missing or empty; parsedate_to_datetime would raise on ''
        return None
    # str() because the header value may be an email.header.Header object
    return parsedate_to_datetime(str(date_str))
def get_user_mail_files(user, mailbox='INBOX'):
    """
    Iterate over mails in given folder of given user; yields file names.

    Works on local cyrus file system, not on imap server.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; name is modified using :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    # base folder of user mail; a '.' in the user name is stored as '^' on
    # disc (same mapping that create_users applies), already escaped names
    # are left untouched by this
    folder = os.path.join('/datastore', 'imap-mails', 'user',
                          user.replace('.', '^'))

    # adapt paths like "INBOX/sub/dir" to "sub/dir"
    subdirs = mailbox.split('/')
    if subdirs[0].upper() == 'INBOX':
        subdirs = subdirs[1:]
    folder = os.path.join(folder,
                          *(cyrus_escape(subdir) for subdir in subdirs))

    for filename in os.listdir(folder):
        # only names that start with "<digits>." are treated as messages
        if not re.match(r'\d+\.', filename):
            continue
        yield os.path.join(folder, filename)
def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Iterate over mails in given folder of given user; yields parsed mails.

    Thin combination of :py:func:`get_user_mail_files` (which mails exist)
    and :py:func:`parse_mail_file` (what is in them).

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
              full path to the message on disc, and the latter is the outcome
              of :py:func:`parse_mail_file` for that file
    """
    for mail_path in get_user_mail_files(user, mailbox):
        parsed_mail = parse_mail_file(mail_path, **kwargs)
        yield mail_path, parsed_mail
def _decode_part_text(part, fallback_encoding):
    """Decode the payload of one non-multipart message part to a string."""
    payload = part.get_payload(decode=True) or b''
    encoding = part.get_content_charset(fallback_encoding)
    try:
        return payload.decode(encoding)
    except (LookupError, UnicodeDecodeError):
        # charset named in the mail is unknown to python or does not fit the
        # data; use the fallback rather than failing on a sloppy mail
        return payload.decode(fallback_encoding, errors='replace')


def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Extract message text as string from email message.

    Intended as complementary addition to get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    Finds the first part in message that is of type text/plain and decodes it
    using encoding specified in mail or otherwise fallback encoding. If none
    found takes first part of type "text/*", or otherwise just the first part.

    If include_all_text is True, all text/* parts are included, with text/plain
    parts first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail, or if the specified one cannot be
                                  used to decode the text
    :param bool include_all_text: include all "text/*" parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    msg = parse_mail_file(filename, headers_only=False)

    # text/plain parts take precedence over all other "text/*" parts
    plain_parts = []
    other_text_parts = []
    for part in msg.walk():
        cont_type = part.get_content_type()
        if cont_type == 'text/plain':
            plain_parts.append(part)
        elif cont_type.startswith('text/'):
            other_text_parts.append(part)

    text_parts = plain_parts + other_text_parts
    if text_parts:
        if include_all_text:
            return [_decode_part_text(part, fallback_encoding)
                    for part in text_parts]
        return _decode_part_text(text_parts[0], fallback_encoding)

    # no "text/" found. Just take first part
    first_part = msg
    while first_part.is_multipart() and first_part.get_payload():
        first_part = first_part.get_payload(0)

    text = _decode_part_text(first_part, fallback_encoding)
    if include_all_text:
        return [text, ]
    return text
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Convert names of users or mailbox folders to cyrus format.

    quite a hack, just does the following hard-coded replacements:

    * . --> ^
    * / --> . (except if keep_path is True)
    * "u --> &APw- , "o --> &APY- , "a --> &AOQ- ,
      "U --> &ANw- , "O --> &ANY- , "A --> &AMQ- , sz --> &AN8-
      (if need more: this is modified utf-7)
    * inbox -->   (the empty string)

    Would like to use a general modified utf-7-encoder/decoder but python has
    non built-in (see https://bugs.python.org/issue5305) and an extra lib like
    https://bitbucket.org/mjs0/imapclient/ would be overkill. After all, we
    control the input to this function via params and this is enough for the
    umlauts we use.

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, ., & and $
                       are escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # order matters: '.' has to become '^' before '/' may become '.'
    replacements = (
        ('.', '^'),
        ('ü', '&APw-'), ('ä', '&AOQ-'), ('ö', '&APY-'),
        ('Ü', '&ANw-'), ('Ä', '&AMQ-'), ('Ö', '&ANY-'), ('ß', '&AN8-'),
        ('inbox', ''), ('INBOX', ''), ('Inbox', ''),
    )
    temp = user_or_folder
    for plain, escaped in replacements:
        temp = temp.replace(plain, escaped)

    if not keep_path:
        temp = temp.replace('/', '.')

    if regex:
        for special in ('^', '&', '.', '$'):
            temp = temp.replace(special, '\\' + special)
    return temp
def cyrus_unescape(user_or_folder):
    """
    Undo effects of :py:func:`cyrus_escape` (but not all of them).

    Reverts the '.'/'^' and '/'/'.' replacements, also for names that were
    escaped for use in a regex. Umlaut sequences are left as they are.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder == '':
        return 'inbox'
    # strip backslashes added for regex use first, otherwise the '.' of an
    # escaped '\.' would be converted on its own and leave the backslash behind
    temp = user_or_folder
    for special in ('^', '.', '&', '$'):
        temp = temp.replace('\\' + special, special)
    return temp.replace('.', '/').replace('^', '.')
def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64-encoded.

    For attachments with base64-encoded file name, the
    :py:func:`email.message.Message.get_filename()` does not work. This
    function tries that first and if it fails tries to interpret the
    Content-Disposition of the message part. If all fails, returns `failobj`.

    Only for ascii filenames: also unwraps file names if they are line-wrapped.
    But note that this may remove too much whitespace from the filename if
    line-wrapping happened in the same position as the filename's whitespace.
    To get the file name as it is, including line breaks, set param
    `do_unwrap` to `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to ascii
                           file names
    :returns: either a string or failobj
    :raises: :py:class:`ValueError` if an encoded word uses an encoding other
             than "b" (base64) or "q" (quoted-printable)
    """
    import quopri    # only needed for the rare q-encoded case below

    # try the old way and unwrap
    filename = message.get_filename(failobj)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='

    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()

    # The form is: "=?charset?encoding?encoded text?="
    SPLIT_REGEX = '\r?\n *'  # should be CRNL but some files miss the \r
    ENCODED_WORD_REGEX = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # The quote group is `("?)` and not `(")?`: a backreference to a group
    # that did not take part in the match never matches in python, which
    # made every unquoted file name fail
    LINE_REGEX = r'attachment\s*;\s*filename=("?)(.+?)\1\s*$'
    decoded = []
    # str(): with the compat32 policy the value may be a Header object
    for word in re.split(SPLIT_REGEX, str(message['Content-Disposition'])):
        match = re.match(ENCODED_WORD_REGEX, word)
        if not match:
            break
        charset, encoding, data = match.groups()
        if encoding.lower() == 'b':
            temp = b64decode(data)
        elif encoding.lower() == 'q':
            # header=True also converts '_' to space as required in headers
            temp = quopri.decodestring(data.encode('ascii'), header=True)
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(temp.decode(charset))
    decoded = u''.join(decoded)

    match = re.match(LINE_REGEX, decoded)
    if match:
        return match.groups()[1]
    return failobj