From: Christian Herdtweck Date: Fri, 7 Jun 2019 07:21:15 +0000 (+0200) Subject: Create ImapMailbox, a wrapper around python's IMAP4 X-Git-Tag: v1.6~1^2~7 X-Git-Url: http://developer.intra2net.com/git/?a=commitdiff_plain;h=4bcc6621720c268684127c5a605e59f1c18eef14;p=pyi2ncommon Create ImapMailbox, a wrapper around python's IMAP4 Python's built-in imap class offers little convenience. Add some of our own with this class. --- diff --git a/src/imap_mailbox.py b/src/imap_mailbox.py new file mode 100644 index 0000000..7b79f1f --- /dev/null +++ b/src/imap_mailbox.py @@ -0,0 +1,279 @@ +# The software in this package is distributed under the GNU General +# Public License version 2 (with a special exception described below). +# +# A copy of GNU General Public License (GPL) is included in this distribution, +# in the file COPYING.GPL. +# +# As a special exception, if other files instantiate templates or use macros +# or inline functions from this file, or you compile this file and link it +# with other works to produce a work based on this file, this file +# does not by itself cause the resulting work to be covered +# by the GNU General Public License. +# +# However the source code for this file must still be made available +# in accordance with section (3) of the GNU General Public License. +# +# This exception does not invalidate any other reasons why a work based +# on this file might be covered by the GNU General Public License. +# +# Copyright (c) 2016-2018 Intra2net AG + +""" + +SUMMARY +------------------------------------------------------ +Connection to an IMAP mailbox + +Convenience wrapper around :py:class:`imaplib.IMAP4` + +Features provided in addition to :py:class:`imaplib.IMAP4`:: + * dealing with unsolicited responses (missing in most other imap libs) + * conversion of `BAD` status responses to errors + * simple quoting of imap folder names + * debug logging to console/file + * memory of which folder is currently selected + * function to copy/move an email from one folder to another + +Copyright: Intra2net AG + + +INTERFACE +------------------------------------------------------ + +""" + +import imaplib +import logging +import time + +log = logging.getLogger('pyi2ncommon.imap_mailbox') + + +class ImapError(Exception): + """Exception raised when some imap command fails.""" + + def __init__(self, command, status, message): + """Create a new ImapError for given command and response.""" + super(ImapError, self).__init__('IMAP command {!r} returned {}: {}' + .format(command, status, message)) + self.command = command + self.status = status + self.message = message + + +def quote_imap_folder(folder): + """ + Quote name of a folder if required by IMAP. + + Folder names must be quoted if they contain e.g. spaces. This + implementation is incomplete, but should suffice for current use. + + :param str folder: Name of a folder in an IMAP mailbox, possibly quoted + :returns: same folder name, possibly with added quotes + :rtype: byte + """ + if not folder: + return folder + if folder[0] == '"' and folder[-1] == '"': + return folder # is already quoted + if any(special in folder for special in ' (){}"\\[]'): + # see RFC3501 $5.1 and $9 + return '"' + folder + '"' # quote the folder + return folder.encode('ascii', 'strict') + + +class ImapMailbox(imaplib.IMAP4): + """ + Convenience to access and query imap mailbox. + + Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple + (status, data) with status being "OK" or "BAD" or "NO" or so. + + Use as ContextManager:: + + with ImapMailbox(username, password) as mbox: + mbox.list_all_mail('INBOX') + # do something + + Python's imaplib really offers little comfort for users, so we have to deal + on our own with: + - unsolicited responses + - converting BAD response to error + - encoding to/from bytes + - parsing output like b'[1 2 3]' + - quoting of folder names (e.g. add "" around folders with spaces) + - ... + Before adding too much capability here, consider using libraries that do + all that already like ImapClient (which unfortunately does not deal with + unsolicited reponses)... + + Cannot deal with unicode folder names. + """ + + def __init__(self, user=None, password=None, host='localhost', + debug_log=False): + """ + Create a new connection to an imap mailbox. + + User and password can be given either here or to method + :py:meth:`login`. + + :param str user: User name for imap mailbox, optional + :param str password: Password for logging into mailbox + :param str host: Host name, defaults to localhost + :param debug_log: Either bool to enable/disable debug logging or a file + name to log to that file + :type debug_log: bool or str + """ + super(ImapMailbox, self).__init__(host) + self.host = host + self.user = user + self.password = password + self._selected = None + self._debug_log_handle = None + if debug_log: + self.debug = 4 # enable debugging in super class + # overwrite prviate debug log function of base class with own. + # this is slightly hacky since depends on private implementation + if isinstance(debug_log, str): + self._debug_log_handle = open(debug_log, 'at') + self._mesg = self._debug_log_to_file # logs to file + else: + # uses logging instead of print; ignore optional secs argument + self._mesg = lambda message, _=None: \ + log.debug('IMAP: ' + message) + self._mesg('Debug logging enabled in {}'.format(self)) + + def __str__(self): + """Create a textual representation of this object.""" + if self.user is None: + return '[ImapMailbox on {}]'.format(self.host) + return '[ImapMailbox for {}@{}]'.format(self.user, self.host) + + def __enter__(self): + """ + Called upon entering context manager. Logs in. + + Requires user and password be given to constructor already. + """ + self.login() + return self + + def __exit__(self, *args): + """Called upon leaving context manager. Logs out. Ignores `args`""" + try: + self.logout() + except OSError: + pass + + def _debug_log_to_file(self, message, secs=None): + """ + Redirect debug message to file instead of stderr. + + :param str message: Debug message + :param int secs: Timestamp of message, defaults to :py:func:`time.time` + """ + if secs is None: + secs = time.time() + timestamp = time.strftime('%H:%M:%S', time.localtime(secs)) + prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000) + for line in message.splitlines(): + self._debug_log_handle.write(prefix + line) + if not message.endswith('\n'): + self._debug_log_handle.write('\n') + + def login(self, user=None, password=None): + """ + Log in to mailbox. + + :param str user: User name to use for login. Overrides the one given in + constructor. Must be given if not given to constructor + :param str password: Password for login. Same restrictions as user + :returns: Whatever imap login returns. + """ + if user is not None: + self.user = user + if password is not None: + self.password = password + if self.user is None: + raise ValueError('User must be given to either ImapMailbox ' + 'constructor or login') + if self.password is None: + raise ValueError('Password must be given to either ImapMailbox ' + 'constructor or login') + typ, data = super(ImapMailbox, self).login(self.user, self.password) + if typ != 'OK': + raise ImapError('login', typ, data) + self._select('INBOX') + return data + + def logout(self): + """Log out of mailbox. Returns data returned by imap logout.""" + typ, data = super(ImapMailbox, self).logout() + if self._debug_log_handle: + self._debug_log_handle.close() + if typ != 'BYE': + raise ImapError('logout', typ, data) + self._selected = None + return data + + def _select(self, folder): + """Internal helper that remembers what folder is selected.""" + if not folder: + return + if folder == self._selected: + return + # select clears all unsolicited responses, so no use to clear them now + typ, data = self.select(quote_imap_folder(folder)) + if typ != 'OK': + raise ImapError('select', typ, data) + self._selected = folder + + def _clear_unsolicited_responses(self, cmd): + """ + Clear mailbox's memory of unsolicited responses for given cmd. + + Next call to `cmd` should only give the expected responses. + + :param str cmd: next command to run on mailbox. + """ + self.response(cmd) + + def list_all_mail(self, folder=None): + """List all message ids in given folder.""" + self._select(folder) + self._clear_unsolicited_responses('SEARCH') + typ, data = super(ImapMailbox, self).search(None, 'ALL') + if typ != 'OK': + raise ImapError('search', typ, data) + if len(data) != 1: + raise ImapError('search', typ, + 'Data has not len 1 but {}'.format(len(data))) + return tuple(int(message_id) for message_id in data[0].split()) + + def copy_message(self, from_folder, message_id, to_folder, del_orig=False): + """ + Copy message with given id from from_folder to to_folder. + + :param str from_folder: Folder containing the message + :param message_id: ID of message in `from_folder` that will be copied + :type message_id: int or str + :param str to_folder: Target folder + :param bool del_orig: Delete original message, making this "copy" a + "move" operation + """ + self._select(from_folder) + self._clear_unsolicited_responses('COPY') + typ, data = super(ImapMailbox, self).copy(str(message_id), + quote_imap_folder(to_folder)) + if typ != 'OK': + raise ImapError('copy', typ, data) + if del_orig: + self._clear_unsolicited_responses('FETCH') + typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)') + if typ != 'OK': + raise ImapError('store', typ, data) + self._clear_unsolicited_responses('EXPUNGE') + typ, _ = self.expunge() + if typ != 'OK': + raise ImapError('expunge', typ, data) diff --git a/src/mail_utils.py b/src/mail_utils.py index e6a0c7a..a989339 100644 --- a/src/mail_utils.py +++ b/src/mail_utils.py @@ -57,6 +57,7 @@ from email.parser import Parser import mimetypes from . import arnied_wrapper +from imap_mailbox import ImapMailbox log = logging.getLogger('pyi2ncommon.mail_utils')