# The software in this package is distributed under the GNU General # Public License version 2 (with a special exception described below). # # A copy of GNU General Public License (GPL) is included in this distribution, # in the file COPYING.GPL. # # As a special exception, if other files instantiate templates or use macros # or inline functions from this file, or you compile this file and link it # with other works to produce a work based on this file, this file # does not by itself cause the resulting work to be covered # by the GNU General Public License. # # However the source code for this file must still be made available # in accordance with section (3) of the GNU General Public License. # # This exception does not invalidate any other reasons why a work based # on this file might be covered by the GNU General Public License. # # Copyright (c) 2016-2018 Intra2net AG """ Connection to an IMAP mailbox Convenience wrapper around :py:class:`imaplib.IMAP4` Features provided in addition to :py:class:`imaplib.IMAP4`:: * dealing with unsolicited responses (missing in most other imap libs) * conversion of `BAD` status responses to errors * simple quoting of imap folder names * debug logging to console/file * memory of which folder is currently selected * function to copy/move an email from one folder to another * deal with some of the variability of returned values from `fetch` command * created email message objects from byte data TODO: * use SSL connection per default, unencrypted only as option Copyright: Intra2net AG """ import imaplib import logging import time from email import policy from email import message_from_bytes log = logging.getLogger('pyi2ncommon.imap_mailbox') class ImapError(Exception): """Exception raised when some imap command fails.""" def __init__(self, command, status, message): """Create a new ImapError for given command and response.""" super(ImapError, self).__init__('IMAP command {!r} returned {}: {}' .format(command, status, message)) self.command = command self.status = status self.message = message def quote_imap_folder(folder): """ Quote name of a folder if required by IMAP. Folder names must be quoted if they contain e.g. spaces. This implementation is incomplete, but should suffice for current use. :param str folder: Name of a folder in an IMAP mailbox, possibly quoted :returns: same folder name as bytes, possibly with added quotes :rtype: bytes """ if not folder: return b'' if folder[0] == '"' and folder[-1] == '"': return folder.encode('ascii', 'strict') # is already quoted if any(special in folder for special in ' (){}"\\[]'): # see RFC3501 $5.1 and $9 return f'"{folder}"'.encode('ascii', 'strict') # quote the folder return folder.encode('ascii', 'strict') class ImapMailbox(imaplib.IMAP4): """ Convenience to access and query imap mailbox. Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple (status, data) with status being "OK" or "BAD" or "NO" or so. Use as ContextManager:: with ImapMailbox(username, password) as mbox: mbox.list_all_mail('INBOX') # do something Python's imaplib really offers little comfort for users, so we have to deal on our own with: - unsolicited responses - converting BAD response to error - encoding to/from bytes - parsing output like b'[1 2 3]' - quoting of folder names (e.g. add "" around folders with spaces) - ... Before adding too much capability here, consider using libraries that do all that already like ImapClient (which unfortunately does not deal with unsolicited responses)... Cannot deal with unicode folder names. """ def __init__(self, user=None, password=None, host='localhost', debug_log=False): """ Create a new connection to an imap mailbox. User and password can be given either here or to method :py:meth:`login`. :param str user: Username for imap mailbox, optional :param str password: Password for logging into mailbox :param str host: Host name, defaults to localhost :param debug_log: Either bool to enable/disable debug logging or a file name to log to that file :type debug_log: bool or str """ super(ImapMailbox, self).__init__(host) self.host = host self.user = user self.password = password self._selected = None self._debug_log_handle = None if debug_log: self.debug = 4 # enable debugging in super class # overwrite private debug log function of base class with own. # this is slightly hacky since depends on private implementation if isinstance(debug_log, str): self._debug_log_handle = open(debug_log, 'at') self._mesg = self._debug_log_to_file # logs to file else: # uses logging instead of print; ignore optional secs argument self._mesg = lambda message, _=None: \ log.debug('IMAP: ' + message) self._mesg('Debug logging enabled in {}'.format(self)) def __str__(self): """Create a textual representation of this object.""" if self.user is None: return '[ImapMailbox on {}]'.format(self.host) return '[ImapMailbox for {}@{}]'.format(self.user, self.host) def __enter__(self): """ Called upon entering context manager. Logs in. Requires user and password be given to constructor already. """ self.login() return self def __exit__(self, *args): """Called upon leaving context manager. Logs out. Ignores `args`""" try: self.logout() except OSError: pass def _debug_log_to_file(self, message, secs=None): """ Redirect debug message to file instead of stderr. :param str message: Debug message :param int secs: Timestamp of message, defaults to :py:func:`time.time` """ if secs is None: secs = time.time() timestamp = time.strftime('%H:%M:%S', time.localtime(secs)) prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000) for line in message.splitlines(): self._debug_log_handle.write(prefix + line) if not message.endswith('\n'): self._debug_log_handle.write('\n') def login(self, user=None, password=None): """ Log in to mailbox. :param str user: Username to use for login. Overrides the one given in constructor. Must be given if not given to constructor :param str password: Password for login. Same restrictions as user :returns: first return item from imap login. """ if user is not None: self.user = user if password is not None: self.password = password if self.user is None: raise ValueError('User must be given to either ImapMailbox ' 'constructor or login') if self.password is None: raise ValueError('Password must be given to either ImapMailbox ' 'constructor or login') typ, data = super(ImapMailbox, self).login(self.user, self.password) if typ != 'OK': raise ImapError('login', typ, data) self._select('INBOX') return data[0] def logout(self): """Log out of mailbox. Returns data returned by imap logout.""" typ, data = super(ImapMailbox, self).logout() if self._debug_log_handle: self._debug_log_handle.close() if typ != 'BYE': raise ImapError('logout', typ, data) self._selected = None return data def _select(self, folder): """Internal helper that remembers what folder is selected.""" if not folder: return if folder == self._selected: return # select clears all unsolicited responses, so no use to clear them now typ, data = self.select(quote_imap_folder(folder)) if typ != 'OK': raise ImapError('select', typ, data) self._selected = folder def _clear_unsolicited_responses(self, cmd): """ Clear mailbox's memory of unsolicited responses for given cmd. Next call to `cmd` should only give the expected responses. :param str cmd: next command to run on mailbox. """ self.response(cmd) def list_all_mail(self, folder=None): """List all message ids in given folder.""" self._select(folder) self._clear_unsolicited_responses('SEARCH') typ, data = super(ImapMailbox, self).search(None, 'ALL') if typ != 'OK': raise ImapError('search', typ, data) if len(data) != 1: raise ImapError('search', typ, 'Data has not len 1 but {}'.format(len(data))) return tuple(int(message_id) for message_id in data[0].split()) def copy_message(self, from_folder, message_id, to_folder, del_orig=False): """ Copy message with given id from from_folder to to_folder. :param str from_folder: Folder containing the message :param message_id: ID of message in `from_folder` that will be copied :type message_id: int or str :param str to_folder: Target folder :param bool del_orig: Delete original message, making this "copy" a "move" operation """ self._select(from_folder) self._clear_unsolicited_responses('COPY') typ, data = super(ImapMailbox, self).copy(str(message_id), quote_imap_folder(to_folder)) if typ != 'OK': raise ImapError('copy', typ, data) if del_orig: self._clear_unsolicited_responses('FETCH') typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)') if typ != 'OK': raise ImapError('store', typ, data) self._clear_unsolicited_responses('EXPUNGE') typ, _ = self.expunge() if typ != 'OK': raise ImapError('expunge', typ, data) def fetch_mail(self, message_id, folder=None, part='RFC822'): """ Fetch and return an email in RFC822 format. .. seealso:: :py:meth:`fetch_message` :param int message_id: Message id in given folder :param folder: Folder that contains the message or None (default) if folder is already selected. :type folder: str or None :param str part: Message part to fetch. For simplicity of return args, only one part can be fetched at a time. Other possible values: ``UID``, ``BODY[TEXT]``, ... :returns: requested message part :rtype: bytes """ self._select(folder) self._clear_unsolicited_responses('FETCH') typ, data = super(ImapMailbox, self).fetch(str(message_id), part) if typ != 'OK': raise ImapError('fetch', typ, data) # have to be flexible with returned data here... # should be a list of len 1 of 2-tuples, but reality differs if len(data) == 2 and data[1] == b')': data = data[:1] if len(data) != 1: raise ImapError('fetch', typ, 'Data has not len 1 but {}'.format(len(data))) data = data[0] if part.lower() == 'uid' and isinstance(data, bytes): return data[0] if len(data) != 2: raise ImapError('fetch', typ, 'Data[0] has len {} != 2'.format(len(data))) return data[1] def fetch_message(self, message_id, folder=None): """ Fetch complete message and convert to message object. All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the resulting data is parsed into a :py:class:`email.message.EmailMessage` object (for python versions < 3.4: :py:class:`email.message.Message`). """ return message_from_bytes( self.fetch_mail(message_id, folder, part='RFC822'), policy=policy.default + policy.strict)