--- /dev/null
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+"""
+
+SUMMARY
+------------------------------------------------------
+Connection to an IMAP mailbox
+
+Convenience wrapper around :py:class:`imaplib.IMAP4`
+
+Features provided in addition to :py:class:`imaplib.IMAP4`::
+ * dealing with unsolicited responses (missing in most other imap libs)
+ * conversion of `BAD` status responses to errors
+ * simple quoting of imap folder names
+ * debug logging to console/file
+ * memory of which folder is currently selected
+ * function to copy/move an email from one folder to another
+
+Copyright: Intra2net AG
+
+
+INTERFACE
+------------------------------------------------------
+
+"""
+
+import imaplib
+import logging
+import time
+
+log = logging.getLogger('pyi2ncommon.imap_mailbox')
+
+
+class ImapError(Exception):
+ """Exception raised when some imap command fails."""
+
+ def __init__(self, command, status, message):
+ """Create a new ImapError for given command and response."""
+ super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
+ .format(command, status, message))
+ self.command = command
+ self.status = status
+ self.message = message
+
+
+def quote_imap_folder(folder):
+ """
+ Quote name of a folder if required by IMAP.
+
+ Folder names must be quoted if they contain e.g. spaces. This
+ implementation is incomplete, but should suffice for current use.
+
+ :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
+ :returns: same folder name, possibly with added quotes
+ :rtype: byte
+ """
+ if not folder:
+ return folder
+ if folder[0] == '"' and folder[-1] == '"':
+ return folder # is already quoted
+ if any(special in folder for special in ' (){}"\\[]'):
+ # see RFC3501 $5.1 and $9
+ return '"' + folder + '"' # quote the folder
+ return folder.encode('ascii', 'strict')
+
+
+class ImapMailbox(imaplib.IMAP4):
+ """
+ Convenience to access and query imap mailbox.
+
+ Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
+ (status, data) with status being "OK" or "BAD" or "NO" or so.
+
+ Use as ContextManager::
+
+ with ImapMailbox(username, password) as mbox:
+ mbox.list_all_mail('INBOX')
+ # do something
+
+ Python's imaplib really offers little comfort for users, so we have to deal
+ on our own with:
+ - unsolicited responses
+ - converting BAD response to error
+ - encoding to/from bytes
+ - parsing output like b'[1 2 3]'
+ - quoting of folder names (e.g. add "" around folders with spaces)
+ - ...
+ Before adding too much capability here, consider using libraries that do
+ all that already like ImapClient (which unfortunately does not deal with
+ unsolicited reponses)...
+
+ Cannot deal with unicode folder names.
+ """
+
+ def __init__(self, user=None, password=None, host='localhost',
+ debug_log=False):
+ """
+ Create a new connection to an imap mailbox.
+
+ User and password can be given either here or to method
+ :py:meth:`login`.
+
+ :param str user: User name for imap mailbox, optional
+ :param str password: Password for logging into mailbox
+ :param str host: Host name, defaults to localhost
+ :param debug_log: Either bool to enable/disable debug logging or a file
+ name to log to that file
+ :type debug_log: bool or str
+ """
+ super(ImapMailbox, self).__init__(host)
+ self.host = host
+ self.user = user
+ self.password = password
+ self._selected = None
+ self._debug_log_handle = None
+ if debug_log:
+ self.debug = 4 # enable debugging in super class
+ # overwrite prviate debug log function of base class with own.
+ # this is slightly hacky since depends on private implementation
+ if isinstance(debug_log, str):
+ self._debug_log_handle = open(debug_log, 'at')
+ self._mesg = self._debug_log_to_file # logs to file
+ else:
+ # uses logging instead of print; ignore optional secs argument
+ self._mesg = lambda message, _=None: \
+ log.debug('IMAP: ' + message)
+ self._mesg('Debug logging enabled in {}'.format(self))
+
+ def __str__(self):
+ """Create a textual representation of this object."""
+ if self.user is None:
+ return '[ImapMailbox on {}]'.format(self.host)
+ return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
+
+ def __enter__(self):
+ """
+ Called upon entering context manager. Logs in.
+
+ Requires user and password be given to constructor already.
+ """
+ self.login()
+ return self
+
+ def __exit__(self, *args):
+ """Called upon leaving context manager. Logs out. Ignores `args`"""
+ try:
+ self.logout()
+ except OSError:
+ pass
+
+ def _debug_log_to_file(self, message, secs=None):
+ """
+ Redirect debug message to file instead of stderr.
+
+ :param str message: Debug message
+ :param int secs: Timestamp of message, defaults to :py:func:`time.time`
+ """
+ if secs is None:
+ secs = time.time()
+ timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
+ prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
+ for line in message.splitlines():
+ self._debug_log_handle.write(prefix + line)
+ if not message.endswith('\n'):
+ self._debug_log_handle.write('\n')
+
+ def login(self, user=None, password=None):
+ """
+ Log in to mailbox.
+
+ :param str user: User name to use for login. Overrides the one given in
+ constructor. Must be given if not given to constructor
+ :param str password: Password for login. Same restrictions as user
+ :returns: Whatever imap login returns.
+ """
+ if user is not None:
+ self.user = user
+ if password is not None:
+ self.password = password
+ if self.user is None:
+ raise ValueError('User must be given to either ImapMailbox '
+ 'constructor or login')
+ if self.password is None:
+ raise ValueError('Password must be given to either ImapMailbox '
+ 'constructor or login')
+ typ, data = super(ImapMailbox, self).login(self.user, self.password)
+ if typ != 'OK':
+ raise ImapError('login', typ, data)
+ self._select('INBOX')
+ return data
+
+ def logout(self):
+ """Log out of mailbox. Returns data returned by imap logout."""
+ typ, data = super(ImapMailbox, self).logout()
+ if self._debug_log_handle:
+ self._debug_log_handle.close()
+ if typ != 'BYE':
+ raise ImapError('logout', typ, data)
+ self._selected = None
+ return data
+
+ def _select(self, folder):
+ """Internal helper that remembers what folder is selected."""
+ if not folder:
+ return
+ if folder == self._selected:
+ return
+ # select clears all unsolicited responses, so no use to clear them now
+ typ, data = self.select(quote_imap_folder(folder))
+ if typ != 'OK':
+ raise ImapError('select', typ, data)
+ self._selected = folder
+
+ def _clear_unsolicited_responses(self, cmd):
+ """
+ Clear mailbox's memory of unsolicited responses for given cmd.
+
+ Next call to `cmd` should only give the expected responses.
+
+ :param str cmd: next command to run on mailbox.
+ """
+ self.response(cmd)
+
+ def list_all_mail(self, folder=None):
+ """List all message ids in given folder."""
+ self._select(folder)
+ self._clear_unsolicited_responses('SEARCH')
+ typ, data = super(ImapMailbox, self).search(None, 'ALL')
+ if typ != 'OK':
+ raise ImapError('search', typ, data)
+ if len(data) != 1:
+ raise ImapError('search', typ,
+ 'Data has not len 1 but {}'.format(len(data)))
+ return tuple(int(message_id) for message_id in data[0].split())
+
+ def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
+ """
+ Copy message with given id from from_folder to to_folder.
+
+ :param str from_folder: Folder containing the message
+ :param message_id: ID of message in `from_folder` that will be copied
+ :type message_id: int or str
+ :param str to_folder: Target folder
+ :param bool del_orig: Delete original message, making this "copy" a
+ "move" operation
+ """
+ self._select(from_folder)
+ self._clear_unsolicited_responses('COPY')
+ typ, data = super(ImapMailbox, self).copy(str(message_id),
+ quote_imap_folder(to_folder))
+ if typ != 'OK':
+ raise ImapError('copy', typ, data)
+ if del_orig:
+ self._clear_unsolicited_responses('FETCH')
+ typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
+ if typ != 'OK':
+ raise ImapError('store', typ, data)
+ self._clear_unsolicited_responses('EXPUNGE')
+ typ, _ = self.expunge()
+ if typ != 'OK':
+ raise ImapError('expunge', typ, data)