| 1 | # The software in this package is distributed under the GNU General |
| 2 | # Public License version 2 (with a special exception described below). |
| 3 | # |
| 4 | # A copy of GNU General Public License (GPL) is included in this distribution, |
| 5 | # in the file COPYING.GPL. |
| 6 | # |
| 7 | # As a special exception, if other files instantiate templates or use macros |
| 8 | # or inline functions from this file, or you compile this file and link it |
| 9 | # with other works to produce a work based on this file, this file |
| 10 | # does not by itself cause the resulting work to be covered |
| 11 | # by the GNU General Public License. |
| 12 | # |
| 13 | # However the source code for this file must still be made available |
| 14 | # in accordance with section (3) of the GNU General Public License. |
| 15 | # |
| 16 | # This exception does not invalidate any other reasons why a work based |
| 17 | # on this file might be covered by the GNU General Public License. |
| 18 | # |
| 19 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> |
| 20 | |
| 21 | """ |
| 22 | Connection to an IMAP mailbox |
| 23 | |
| 24 | Convenience wrapper around :py:class:`imaplib.IMAP4` |
| 25 | |
| 26 | Features provided in addition to :py:class:`imaplib.IMAP4`:: |
| 27 | * dealing with unsolicited responses (missing in most other imap libs) |
| 28 | * conversion of `BAD` status responses to errors |
| 29 | * simple quoting of imap folder names |
| 30 | * debug logging to console/file |
| 31 | * memory of which folder is currently selected |
| 32 | * function to copy/move an email from one folder to another |
| 33 | * dealing with some of the variability of returned values from `fetch` command |
| 34 | * creation of email message objects from byte data |
| 35 | |
| 36 | TODO: |
| 37 | * use SSL connection per default, unencrypted only as option |
| 38 | |
| 39 | Copyright: Intra2net AG |
| 40 | """ |
| 41 | |
| 42 | import imaplib |
| 43 | import logging |
| 44 | import time |
| 45 | from email import policy |
| 46 | from email import message_from_bytes |
| 47 | |
| 48 | log = logging.getLogger('pyi2ncommon.imap_mailbox') |
| 49 | |
| 50 | |
class ImapError(Exception):
    """Error signalling that an IMAP command did not succeed."""

    def __init__(self, command, status, message):
        """
        Build the error from the failed command and the response to it.

        The three arguments are kept as attributes of the same name and are
        combined into the exception text.

        :param command: IMAP command that failed
        :param status: status that was returned for the command
        :param message: data that was returned along with the status
        """
        text = f'IMAP command {command!r} returned {status}: {message}'
        super().__init__(text)
        self.command, self.status, self.message = command, status, message
| 61 | |
| 62 | |
def quote_imap_folder(folder):
    """
    Quote name of a folder if required by IMAP.

    Folder names must be quoted if they contain e.g. spaces. Inside the added
    quotes, backslashes and double quotes are escaped with a backslash, as
    required for IMAP quoted strings. This implementation is incomplete (e.g.
    no support for non-ascii names), but should suffice for current use.

    :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
    :returns: same folder name as bytes, possibly with added quotes; empty
              bytes if `folder` is empty or None
    :rtype: bytes
    :raises UnicodeEncodeError: if the name contains non-ascii characters
    """
    if not folder:
        return b''
    # a lone '"' is not a quoted string, so require both an opening and a
    # closing quote before treating the name as already quoted
    if len(folder) >= 2 and folder[0] == '"' and folder[-1] == '"':
        return folder.encode('ascii', 'strict')    # is already quoted
    if any(special in folder for special in ' (){}"\\[]'):
        # see RFC3501 $5.1 and $9; backslash must be escaped first so the
        # backslashes added for the quotes are not doubled again
        escaped = folder.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'.encode('ascii', 'strict')   # quote the folder
    return folder.encode('ascii', 'strict')
| 82 | |
| 83 | |
class ImapMailbox(imaplib.IMAP4):
    """
    Convenience to access and query imap mailbox.

    Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
    (status, data) with status being "OK" or "BAD" or "NO" or so.

    Use as ContextManager::

        with ImapMailbox(username, password) as mbox:
            mbox.list_all_mail('INBOX')
            # do something

    Python's imaplib really offers little comfort for users, so we have to deal
    on our own with:
    - unsolicited responses
    - converting BAD response to error
    - encoding to/from bytes
    - parsing output like b'[1 2 3]'
    - quoting of folder names (e.g. add "" around folders with spaces)
    - ...
    Before adding too much capability here, consider using libraries that do
    all that already like ImapClient (which unfortunately does not deal with
    unsolicited responses)...

    Cannot deal with unicode folder names.
    """

    def __init__(self, user=None, password=None, host='localhost',
                 debug_log=False):
        """
        Create a new connection to an imap mailbox.

        User and password can be given either here or to method
        :py:meth:`login`.

        :param str user: Username for imap mailbox, optional
        :param str password: Password for logging into mailbox
        :param str host: Host name, defaults to localhost
        :param debug_log: Either bool to enable/disable debug logging or a file
                          name to log to that file
        :type debug_log: bool or str
        """
        super().__init__(host)
        self.host = host
        self.user = user
        self.password = password
        self._selected = None            # folder selected via _select
        self._debug_log_handle = None    # open file if logging to a file
        if debug_log:
            self.debug = 4   # enable debugging in super class
            # overwrite private debug log function of base class with own.
            # this is slightly hacky since depends on private implementation
            if isinstance(debug_log, str):
                try:
                    self._debug_log_handle = open(debug_log, 'at')
                except OSError:
                    # the base class constructor has already been run for
                    # this host; shut down again instead of leaving it behind
                    self.shutdown()
                    raise
                self._mesg = self._debug_log_to_file   # logs to file
            else:
                # uses logging instead of print; ignore optional secs argument
                self._mesg = lambda message, _=None: \
                    log.debug('IMAP: %s', message)
            self._mesg('Debug logging enabled in {}'.format(self))

    def __str__(self):
        """Create a textual representation of this object."""
        if self.user is None:
            return '[ImapMailbox on {}]'.format(self.host)
        return '[ImapMailbox for {}@{}]'.format(self.user, self.host)

    def __enter__(self):
        """
        Called upon entering context manager. Logs in.

        Requires user and password be given to constructor already.
        """
        self.login()
        return self

    def __exit__(self, *args):
        """Called upon leaving context manager. Logs out. Ignores `args`"""
        try:
            self.logout()
        except OSError:
            # deliberate best effort: a failing connection must not mask
            # whatever happened inside the with-block
            pass

    def _debug_log_to_file(self, message, secs=None):
        """
        Redirect debug message to file instead of stderr.

        Every line of `message` is written as a separate, newline-terminated
        line with a timestamp prefix. Does nothing if no debug log file is
        open (e.g. after :py:meth:`logout`).

        :param str message: Debug message
        :param secs: Timestamp of message, defaults to :py:func:`time.time`
        :type secs: int or float
        """
        handle = self._debug_log_handle
        if handle is None:
            return
        if secs is None:
            secs = time.time()
        timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
        prefix = '%s.%03d: ' % (timestamp, (secs * 1000) % 1000)
        # splitlines() strips the line ends, so add one per line again;
        # an empty message still produces one (empty) log line
        handle.writelines(prefix + line + '\n'
                          for line in (message.splitlines() or ['']))

    def login(self, user=None, password=None):
        """
        Log in to mailbox and select folder INBOX.

        :param str user: Username to use for login. Overrides the one given in
                         constructor. Must be given if not given to constructor
        :param str password: Password for login. Same restrictions as user
        :returns: first return item from imap login.
        :raises ValueError: if user or password is known neither from
                            constructor nor from args
        :raises ImapError: if login or selecting INBOX does not return "OK"
        """
        if user is not None:
            self.user = user
        if password is not None:
            self.password = password
        if self.user is None:
            raise ValueError('User must be given to either ImapMailbox '
                             'constructor or login')
        if self.password is None:
            raise ValueError('Password must be given to either ImapMailbox '
                             'constructor or login')
        typ, data = super().login(self.user, self.password)
        if typ != 'OK':
            raise ImapError('login', typ, data)
        self._select('INBOX')
        return data[0]

    def logout(self):
        """
        Log out of mailbox. Returns data returned by imap logout.

        The debug log file (if any) is closed and the memory of the selected
        folder is cleared even if logging out fails.

        :raises ImapError: if the logout does not return status "BYE"
        """
        try:
            typ, data = super().logout()
        finally:
            self._selected = None
            if self._debug_log_handle is not None:
                self._debug_log_handle.close()
                self._debug_log_handle = None
        if typ != 'BYE':
            raise ImapError('logout', typ, data)
        return data

    def _select(self, folder):
        """
        Internal helper that remembers what folder is selected.

        Does nothing if `folder` is empty/None or is the remembered folder.

        :param folder: name of folder to select
        :type folder: str or None
        :raises ImapError: if selecting the folder does not return "OK"
        """
        if not folder:
            return
        if folder == self._selected:
            return
        # per RFC 3501 a failed SELECT leaves no mailbox selected, so forget
        # the old folder until the new one is confirmed
        self._selected = None
        # select clears all unsolicited responses, so no use to clear them now
        typ, data = self.select(quote_imap_folder(folder))
        if typ != 'OK':
            raise ImapError('select', typ, data)
        self._selected = folder

    def _clear_unsolicited_responses(self, cmd):
        """
        Clear mailbox's memory of unsolicited responses for given cmd.

        Next call to `cmd` should only give the expected responses.

        :param str cmd: next command to run on mailbox.
        """
        self.response(cmd)

    def list_all_mail(self, folder=None):
        """
        List all message ids in given folder.

        :param folder: Folder to search or None (default) if folder is already
                       selected.
        :type folder: str or None
        :returns: ids of all messages found, empty if there are none
        :rtype: tuple of int
        :raises ImapError: if selecting or searching fails or the search
                           result has an unexpected form
        """
        self._select(folder)
        self._clear_unsolicited_responses('SEARCH')
        typ, data = super().search(None, 'ALL')
        if typ != 'OK':
            raise ImapError('search', typ, data)
        if len(data) != 1:
            raise ImapError('search', typ,
                            'Data has not len 1 but {}'.format(len(data)))
        if data[0] is None:    # no search result at all: nothing found
            return ()
        return tuple(int(message_id) for message_id in data[0].split())

    def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
        """
        Copy message with given id from from_folder to to_folder.

        :param str from_folder: Folder containing the message
        :param message_id: ID of message in `from_folder` that will be copied
        :type message_id: int or str
        :param str to_folder: Target folder
        :param bool del_orig: Delete original message, making this "copy" a
                              "move" operation
        :raises ImapError: if any of the imap commands involved (select, copy,
                           store, expunge) does not return "OK"
        """
        # imaplib cannot send int arguments, so convert once for copy AND store
        message_id = str(message_id)
        self._select(from_folder)
        self._clear_unsolicited_responses('COPY')
        typ, data = super().copy(message_id, quote_imap_folder(to_folder))
        if typ != 'OK':
            raise ImapError('copy', typ, data)
        if not del_orig:
            return

        # "move": flag the original as deleted, then expunge it.
        # Results of STORE arrive as FETCH responses, so clear those.
        self._clear_unsolicited_responses('FETCH')
        typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
        if typ != 'OK':
            raise ImapError('store', typ, data)
        self._clear_unsolicited_responses('EXPUNGE')
        typ, data = self.expunge()
        if typ != 'OK':
            raise ImapError('expunge', typ, data)

    def fetch_mail(self, message_id, folder=None, part='RFC822'):
        """
        Fetch and return an email in RFC822 format.

        .. seealso:: :py:meth:`fetch_message`

        :param int message_id: Message id in given folder
        :param folder: Folder that contains the message or None (default) if
                       folder is already selected.
        :type folder: str or None
        :param str part: Message part to fetch. For simplicity of return args,
                         only one part can be fetched at a time. Other possible
                         values: ``UID``, ``BODY[TEXT]``, ...
        :returns: requested message part; if `part` is ``UID`` and the result
                  is a single bytes item, that item is returned unparsed
        :rtype: bytes
        :raises ImapError: if selecting or fetching fails or the fetched data
                           has an unexpected form
        """
        self._select(folder)
        self._clear_unsolicited_responses('FETCH')
        typ, data = super().fetch(str(message_id), part)
        if typ != 'OK':
            raise ImapError('fetch', typ, data)

        # have to be flexible with returned data here...
        # should be a list of len 1 of 2-tuples, but reality differs
        if len(data) == 2 and data[1] == b')':
            data = data[:1]
        if len(data) != 1:
            raise ImapError('fetch', typ,
                            'Data has not len 1 but {}'.format(len(data)))
        data = data[0]
        if data is None:
            raise ImapError('fetch', typ, 'No data returned')
        if part.lower() == 'uid' and isinstance(data, bytes):
            # the result is one bytes item here; indexing it would only give
            # the integer value of its first byte, so return it as a whole
            return data
        if len(data) != 2:
            raise ImapError('fetch', typ,
                            'Data[0] has len {} != 2'.format(len(data)))
        return data[1]

    def fetch_message(self, message_id, folder=None):
        """
        Fetch complete message and convert to message object.

        All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
        resulting data is parsed into a :py:class:`email.message.EmailMessage`
        object (for python versions < 3.4: :py:class:`email.message.Message`).
        """
        raw_mail = self.fetch_mail(message_id, folder, part='RFC822')
        return message_from_bytes(raw_mail,
                                  policy=policy.default + policy.strict)