| 1 | # This Python file uses the following encoding: utf-8 |
| 2 | |
| 3 | # The software in this package is distributed under the GNU General |
| 4 | # Public License version 2 (with a special exception described below). |
| 5 | # |
| 6 | # A copy of GNU General Public License (GPL) is included in this distribution, |
| 7 | # in the file COPYING.GPL. |
| 8 | # |
| 9 | # As a special exception, if other files instantiate templates or use macros |
| 10 | # or inline functions from this file, or you compile this file and link it |
| 11 | # with other works to produce a work based on this file, this file |
| 12 | # does not by itself cause the resulting work to be covered |
| 13 | # by the GNU General Public License. |
| 14 | # |
| 15 | # However the source code for this file must still be made available |
| 16 | # in accordance with section (3) of the GNU General Public License. |
| 17 | # |
| 18 | # This exception does not invalidate any other reasons why a work based |
| 19 | # on this file might be covered by the GNU General Public License. |
| 20 | # |
| 21 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> |
| 22 | |
| 23 | """ |
| 24 | Utilities for dealing with email. |
| 25 | |
| 26 | .. seealso:: :py:mod:`pyi2ncommon.mail_validator`, |
| 27 | :py:mod:`pyi2ncommon.imap_mailbox` |
| 28 | |
| 29 | Copyright: Intra2net AG |
| 30 | """ |
| 31 | |
| 32 | from base64 import b64decode |
| 33 | from email.utils import parsedate_to_datetime |
| 34 | from email.parser import BytesParser |
| 35 | from email import policy |
| 36 | |
| 37 | # outsourced source, import required for compatibility |
| 38 | from .imap_mailbox import ImapMailbox # pylint: disable=unused-import |
| 39 | from .mail_validator import * # pylint: disable=unused-import |
| 40 | from .sysmisc import replace_file_regex |
| 41 | |
# module-level logger, named after this module's dotted path in the package
log = logging.getLogger('pyi2ncommon.mail_utils')
| 43 | |
| 44 | |
def prep_email_header(email_file, value, regex=None, criterion="envelopeto"):
    """
    Replace value in a provided email file.

    With criterion 'envelopeto' the replacement is delegated to
    `replace_file_regex`. With criterion 'received' the file is read, every
    match of `regex` is substituted by `value` (exactly once) and the result
    is written back to the same file.

    :param str email_file: file to use for the replacement
    :param str value: value to replace the first matched group with
    :param regex: regular expression to use when replacing a header value;
                  must be given for the 'received' criterion since it is
                  handed to :py:func:`re.sub` directly
    :type regex: str or None
    :param str criterion: criterion to use for replacement, one
                          of 'envelopeto' or 'received'
    :raises: :py:class:`ValueError` if the choice of criterion is invalid

    .. todo:: In some cases this function is reusing arnied wrapper's cnf
              value preparation but for email headers.
    """
    if criterion == "envelopeto":
        logging.debug("Updating test emails' EnvelopeTo header")
        replace_file_regex(email_file, value, regex=regex)
    elif criterion == "received":
        logging.debug("Updating test emails' Received header")
        with open(email_file, "r") as file_handle:
            email_text = file_handle.read()
        # Substitute only once: re.sub already replaces all matches, and a
        # second pass would corrupt the text whenever `value` itself matches
        # `regex` (e.g. regex 'a' with value 'aa' would end up as 'aaaa').
        email_text = re.sub(regex, value, email_text)
        with open(email_file, "w") as file_handle:
            file_handle.write(email_text)
    else:
        raise ValueError("Invalid header preparation criterion '%s'"
                         % criterion)
| 74 | |
| 75 | |
def parse_mail_file(file_name, headers_only=True, attachment_filenames=False,
                    raise_on_defect=False, new_message_type=False):
    """
    Parse given email file (e.g. a banned message).

    This is basically a `email.parser.BytesParser().parse(...)` with given
    `headers_only` and policy selection, that can also handle BSMTP. As an
    extra bonus, you can just request headers plus the names of attached files.

    Removes the SMTP envelope surrounding the email if present, i.e. if the
    file starts with `EHLO`, everything up to and including the `DATA` line is
    skipped. Only left-over might be a line with a '.' at end of non-multipart
    messages if `headers_only` is False. If the envelope is truncated (no
    `DATA` line before the end of the file), nothing is left to parse and an
    empty message is returned.

    :param str file_name: path to the file that contains the email text
    :param bool headers_only: whether to parse only the email headers; set this
                              to False, e.g. if you want to check for
                              attachments using message.walk()
    :param bool attachment_filenames: if you just want headers and names of
                                      attached files, set `headers_only` and
                                      this to True.
    :param bool raise_on_defect: whether to raise an error if email parser
                                 encounters a defect (policy option
                                 `raise_on_defect`) or just add the defect to
                                 message's `defects` attribute
    :param bool new_message_type: whether to return the older
                                  :py:class:`email.message.Message` (policy
                                  `compat32`, our default), or the newer
                                  :py:class:`email.message.EmailMessage` type
                                  (policy `default`). Big difference!
    :returns: either msg or 2-tuple `(msg, filenames)` if requested per arg
              `attachment_filenames`
    :rtype: :py:class:`email.message.Message` or
            (:py:class:`email.message.Message`, (str)) or
            one of these two with :py:class:`email.message.EmailMessage`
    """
    mail_policy = policy.default if new_message_type else policy.compat32
    if raise_on_defect:
        # Do not add `policy.strict` here: it is an `EmailPolicy` carrying a
        # `header_factory` setting which `Compat32` does not know, so
        # `compat32 + strict` fails with a TypeError. Cloning with just the
        # one flag works for both policy types.
        mail_policy = mail_policy.clone(raise_on_defect=True)

    with open(file_name, 'rb') as read_handle:
        line = read_handle.readline()
        if line.startswith(b'EHLO'):
            # there is a smtp header. skip to its end; also stop at end of
            # file (readline returns b'' there) so that a truncated envelope
            # cannot keep us looping forever
            while line and line.strip() != b'DATA':
                line = read_handle.readline()
            # the rest is the email plus a trailing '.' (ignored by parser if
            # multipart)
        else:
            read_handle.seek(0)  # forget we read the first line already
        # remember where the actual mail starts in case we need a 2nd pass
        start_pos = read_handle.tell()
        msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                    headersonly=headers_only)

    if not attachment_filenames:
        return msg

    # otherwise need to parse complete message to get attachment file names
    if headers_only:
        with open(file_name, 'rb') as read_handle:
            read_handle.seek(start_pos)
            full_msg = BytesParser(policy=mail_policy).parse(read_handle,
                                                             headersonly=False)
    else:
        full_msg = msg
    filenames = [get_filename(part) for part in full_msg.walk()]
    return msg, tuple(filename for filename in filenames
                      if filename is not None)
| 149 | |
| 150 | |
def parse_mail_date(message):
    """
    Convert the 'Date' header of a message into a datetime object.

    Thin convenience wrapper around
    :py:func:`email.utils.parsedate_to_datetime`.

    Messages of the newer type :py:class:`email.message.EmailMessage` do not
    need this since their `Date` header already is a
    :py:class:`email.headerregistry.DateHeader`.

    :param message: Email message
    :type message: :py:class:`email.message.Message`
    :returns: datetime from Email "Date" header or None if header is missing
              or empty
    :rtype: :py:class:`datetime.datetime` or None
    """
    raw_date = message.get('Date', '')
    return parsedate_to_datetime(raw_date) if raw_date else None
| 170 | |
| 171 | |
def get_user_mail_files(user, mailbox='INBOX'):
    """
    Yield the paths of all mail files in one mailbox folder of a user.

    Looks directly at the local cyrus mail store in the file system; no imap
    server is contacted.

    :param str user: Name of user whose mailbox is analyzed
    :param str mailbox: name of mailbox to use, INBOX (default) for base
                        folder; each path component is converted using
                        :py:func:`cyrus_escape`
    :returns: nothing; but yields full path to messages on disc
    """
    # the user's base folder corresponds to INBOX
    user_dir = os.path.join('/datastore', 'imap-mails', 'user', user)

    # "INBOX/sub/dir" lives in "sub/dir" below the base folder
    path_parts = mailbox.split('/')
    if path_parts[0].upper() == 'INBOX':
        del path_parts[0]
    escaped_parts = [cyrus_escape(part) for part in path_parts]
    mail_dir = os.path.join(user_dir, *escaped_parts)

    # only names starting with "<digits>." are considered mail files
    for entry in os.listdir(mail_dir):
        if re.match(r'\d+\.', entry):
            yield os.path.join(mail_dir, entry)
| 198 | |
| 199 | |
def get_user_mail(user, mailbox='INBOX', **kwargs):
    """
    Yield all mails in one mailbox folder of a user, already parsed.

    :param str user: see :py:func:`get_user_mail_files`
    :param str mailbox: see :py:func:`get_user_mail_files`
    :param dict kwargs: all other args are forwarded to
                        :py:func:`parse_mail_file`
    :returns: nothing; but yields 2-tuples (path, email_msg) where first is the
              full path to the message on disc, and the latter is the outcome
              of :py:func:`parse_mail_file` for that file
    """
    for mail_path in get_user_mail_files(user, mailbox):
        parsed = parse_mail_file(mail_path, **kwargs)
        yield mail_path, parsed
| 214 | |
| 215 | |
def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Return the text of an email message file as string.

    Meant to be used together with get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    The message is parsed completely. The text returned is that of the first
    part of type `text/plain`; if there is none, that of the first other
    `text/*` part; if there is none either, that of the very first part of the
    message. Text is decoded using the charset given in the part or, if the
    part does not specify one, using `fallback_encoding`.

    If include_all_text is True, the texts of all `text/*` parts are returned,
    with those of type `text/plain` coming first.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: Encoding of email text if none is specified
                                  in mail.
    :param bool include_all_text: include all `text/*` parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    """
    def decode_part(part):
        """Decode payload of given part using its charset or the fallback."""
        charset = part.get_content_charset(fallback_encoding)
        return part.get_payload(decode=True).decode(charset)

    message = parse_mail_file(filename, headers_only=False)

    # first choice: parts of type text/plain
    texts = [decode_part(part) for part in message.walk()
             if part.get_content_type() == 'text/plain']
    if texts and not include_all_text:
        return texts[0]

    # second choice (or addition if all text is requested): other text/* parts
    for part in message.walk():
        content_type = part.get_content_type()
        if content_type != 'text/plain' and content_type.startswith('text/'):
            texts.append(decode_part(part))
    if texts:
        return texts if include_all_text else texts[0]

    # last resort: descend to the very first non-multipart part
    first_part = message
    while first_part.is_multipart():
        first_part = first_part.get_payload(0)
    text = decode_part(first_part)
    return [text, ] if include_all_text else text
| 278 | |
| 279 | |
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
    """
    Translate a user or mailbox folder name into its cyrus representation.

    This is not a general solution but a fixed list of replacements, applied
    one after the other in this order:

    * . --> ^
    * "u --> &APw- , "a --> &AOQ- , "o --> &APY-
      (this is modified utf-7; extend the list if more is needed)
    * inbox, INBOX, Inbox --> (the empty string)
    * / --> .    (skipped if keep_path is True)
    * ^, &, . and $ get a backslash prepended (only if regex is True)

    A proper modified-utf-7 codec would be nicer but python does not ship one
    (see https://bugs.python.org/issue5305) and pulling in a library like
    https://bitbucket.org/mjs0/imapclient/ just for this would be overkill.
    Since we control what is given to this function, the few umlauts handled
    here should be enough.

    :param str user_or_folder: name of the user or folder string to escape
    :param bool keep_path: do not replace '/' with '.' so can still use result
                           as path name
    :param bool regex: result is used in grep or other regex, so ^, &, . and $
                       are escaped again with a backslash
    :returns: escaped user or folder string
    :rtype: str

    .. seealso:: :py:func:`cyrus_unescape`
    """
    # order matters: later replacements also see the output of earlier ones
    substitutions = [
        ('.', '^'),
        ('ü', '&APw-'),
        ('ä', '&AOQ-'),
        ('ö', '&APY-'),
        ('inbox', ''),
        ('INBOX', ''),
        ('Inbox', ''),
    ]
    if not keep_path:
        substitutions.append(('/', '.'))
    if regex:
        substitutions.extend([
            ('^', r'\^'),
            ('&', r'\&'),
            ('.', r'\.'),
            ('$', r'\$'),
        ])

    escaped = user_or_folder
    for old, new in substitutions:
        escaped = escaped.replace(old, new)
    return escaped
| 318 | |
| 319 | |
def cyrus_unescape(user_or_folder):
    """
    Revert (some of) the replacements done by :py:func:`cyrus_escape`.

    The empty string becomes 'inbox'. Otherwise '.' is turned back into '/'
    and '^' (with or without a preceding backslash) back into '.'.

    :param str user_or_folder: name of the user or folder string to unescape
    :returns: unescaped user or folder string
    :rtype: str
    """
    if user_or_folder == '':
        return 'inbox'
    # dots must be converted first since the carets turn into dots afterwards
    with_slashes = user_or_folder.replace('.', '/')
    without_regex_carets = with_slashes.replace(r'\^', '.')
    return without_regex_carets.replace('^', '.')
| 332 | |
| 333 | |
def get_filename(message, failobj=None, do_unwrap=True):
    """
    Get filename of a message part, even if it is base64-encoded.

    For attachments whose complete Content-Disposition header is an encoded
    word, :py:meth:`email.message.Message.get_filename` does not work. This
    function tries that first and if it fails tries to interpret the
    Content-Disposition of the message part itself: encoded words (one per
    line of the header, base64 or quoted-printable) are decoded, joined and
    the result is searched for `attachment; filename=...` with a quoted or
    unquoted file name. If all fails, returns `failobj`.

    Only for filenames found by the regular method: also unwraps file names if
    they are line-wrapped. But note that this may remove too much whitespace
    from the filename if line-wrapping happened in the same position as the
    filename's whitespace. To get the filename as it was found (still
    containing the line breaks), set param `do_unwrap` to `False`.

    See also: https://en.wikipedia.org/wiki/MIME#Encoded-Word

    :param message: message part, e.g. from
                    :py:meth:`email.message.Message.walk`
    :type message: :py:class:`email.message.Message` or
                   :py:class:`email.message.EmailMessage`
    :param failobj: object to return in case of failure (defaults to None)
    :param bool do_unwrap: undo line-break inserted by mail-creator; may remove
                           whitespace from file name; only applies to file
                           names found by the regular method
    :returns: either a string or failobj
    :raises: :py:class:`ValueError` if an encoded word uses an encoding other
             than `b` or `q`
    """
    # only needed for the rare q-encoded case, so imported locally
    import quopri

    # try the old way and unwrap
    filename = message.get_filename(failobj)

    if isinstance(filename, bytes) and not filename.startswith(b'=?') \
            and not filename.endswith(b'?='):
        filename = filename.decode('utf8')

    if isinstance(filename, str):
        if do_unwrap:
            return re.sub('[\\r\\n]+', '', filename)
        return filename

    if 'Content-Disposition' not in message:
        return failobj

    # try parsing content-disposition. e.g.:
    # attachment; filename="2018年度公开课计划表.xlsx"   -->
    # '=?utf-8?b?YXR0YWNobWVudDsgZmlsZW5hbWU9IjIwMTjlubTluqY=?=\r\n =?utf-8?b?'
    # '5YWs5byA6K++6K6h5YiS6KGoLnhsc3gi?='

    # This may be a re-implementation of email.utils.collapse_rfc2231_value()
    # as mentioned in email.message.EmailMessage.get_param()

    # The form is: "=?charset?encoding?encoded text?="
    split_regex = '\r?\n *'    # should be CRNL but some files miss the \r
    encoded_word_regex = r'\s*=\?([^?]+)\?([^?]+)\?(.*)\?=\s*$'
    # File name is either enclosed in double quotes or not quoted at all. Do
    # not use an optional group plus back-reference for the quotes here: in
    # python a back-reference to a group that did not take part in the match
    # never matches, so unquoted names would always be missed.
    line_regex = r'attachment\s*;\s*filename=(?:"(.+)"|([^"\s].*?))\s*$'

    decoded = []
    for word in re.split(split_regex, message['Content-Disposition']):
        match = re.match(encoded_word_regex, word)
        if not match:
            break      # not (or no longer) an encoded word; use what we have
        charset, encoding, data = match.groups()
        if encoding.lower() == 'b':
            raw = b64decode(data)
        elif encoding.lower() == 'q':
            # header=True makes quopri decode '_' as space, as required for
            # encoded words
            raw = quopri.decodestring(data.encode('ascii'), header=True)
        else:
            raise ValueError('not allowed according to wikipedia: "{}"'
                             .format(encoding))
        decoded.append(raw.decode(charset))

    match = re.match(line_regex, u''.join(decoded))
    if match is None:
        return failobj
    quoted_name, plain_name = match.groups()
    return quoted_name if quoted_name is not None else plain_name