Create ImapMailbox, a wrapper around python's IMAP4
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Fri, 7 Jun 2019 07:21:15 +0000 (09:21 +0200)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Thu, 8 Aug 2019 09:54:43 +0000 (11:54 +0200)
Python's built-in imap class offers little convenience. Add some
of our own with this class.

src/imap_mailbox.py [new file with mode: 0644]
src/mail_utils.py

diff --git a/src/imap_mailbox.py b/src/imap_mailbox.py
new file mode 100644 (file)
index 0000000..7b79f1f
--- /dev/null
@@ -0,0 +1,279 @@
+# The software in this package is distributed under the GNU General
+# Public License version 2 (with a special exception described below).
+#
+# A copy of GNU General Public License (GPL) is included in this distribution,
+# in the file COPYING.GPL.
+#
+# As a special exception, if other files instantiate templates or use macros
+# or inline functions from this file, or you compile this file and link it
+# with other works to produce a work based on this file, this file
+# does not by itself cause the resulting work to be covered
+# by the GNU General Public License.
+#
+# However the source code for this file must still be made available
+# in accordance with section (3) of the GNU General Public License.
+#
+# This exception does not invalidate any other reasons why a work based
+# on this file might be covered by the GNU General Public License.
+#
+# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
+
+"""
+
+SUMMARY
+------------------------------------------------------
+Connection to an IMAP mailbox
+
+Convenience wrapper around :py:class:`imaplib.IMAP4`
+
+Features provided in addition to :py:class:`imaplib.IMAP4`::
+  * dealing with unsolicited responses (missing in most other imap libs)
+  * conversion of `BAD` status responses to errors
+  * simple quoting of imap folder names
+  * debug logging to console/file
+  * memory of which folder is currently selected
+  * function to copy/move an email from one folder to another
+
+Copyright: Intra2net AG
+
+
+INTERFACE
+------------------------------------------------------
+
+"""
+
+import imaplib
+import logging
+import time
+
+log = logging.getLogger('pyi2ncommon.imap_mailbox')
+
+
+class ImapError(Exception):
+    """Exception raised when some imap command fails."""
+
+    def __init__(self, command, status, message):
+        """Create a new ImapError for given command and response."""
+        super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
+                                        .format(command, status, message))
+        self.command = command
+        self.status = status
+        self.message = message
+
+
+def quote_imap_folder(folder):
+    """
+    Quote name of a folder if required by IMAP.
+
+    Folder names must be quoted if they contain e.g. spaces. This
+    implementation is incomplete, but should suffice for current use.
+
+    :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
+    :returns: same folder name, possibly with added quotes
+    :rtype: byte
+    """
+    if not folder:
+        return folder
+    if folder[0] == '"' and folder[-1] == '"':
+        return folder    # is already quoted
+    if any(special in folder for special in ' (){}"\\[]'):
+        # see RFC3501 $5.1 and $9
+        return '"' + folder + '"'   # quote the folder
+    return folder.encode('ascii', 'strict')
+
+
+class ImapMailbox(imaplib.IMAP4):
+    """
+    Convenience to access and query imap mailbox.
+
+    Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
+    (status, data) with status being "OK" or "BAD" or "NO" or so.
+
+    Use as ContextManager::
+
+        with ImapMailbox(username, password) as mbox:
+            mbox.list_all_mail('INBOX')
+            # do something
+
+    Python's imaplib really offers little comfort for users, so we have to deal
+    on our own with:
+    - unsolicited responses
+    - converting BAD response to error
+    - encoding to/from bytes
+    - parsing output like b'[1 2 3]'
+    - quoting of folder names (e.g. add "" around folders with spaces)
+    - ...
+    Before adding too much capability here, consider using libraries that do
+    all that already like ImapClient (which unfortunately does not deal with
+    unsolicited reponses)...
+
+    Cannot deal with unicode folder names.
+    """
+
+    def __init__(self, user=None, password=None, host='localhost',
+                 debug_log=False):
+        """
+        Create a new connection to an imap mailbox.
+
+        User and password can be given either here or to method
+        :py:meth:`login`.
+
+        :param str user: User name for imap mailbox, optional
+        :param str password: Password for logging into mailbox
+        :param str host: Host name, defaults to localhost
+        :param debug_log: Either bool to enable/disable debug logging or a file
+                          name to log to that file
+        :type debug_log: bool or str
+        """
+        super(ImapMailbox, self).__init__(host)
+        self.host = host
+        self.user = user
+        self.password = password
+        self._selected = None
+        self._debug_log_handle = None
+        if debug_log:
+            self.debug = 4   # enable debugging in super class
+            # overwrite prviate debug log function of base class with own.
+            # this is slightly hacky since depends on private implementation
+            if isinstance(debug_log, str):
+                self._debug_log_handle = open(debug_log, 'at')
+                self._mesg = self._debug_log_to_file    # logs to file
+            else:
+                # uses logging instead of print; ignore optional secs argument
+                self._mesg = lambda message, _=None: \
+                    log.debug('IMAP: ' + message)
+            self._mesg('Debug logging enabled in {}'.format(self))
+
+    def __str__(self):
+        """Create a textual representation of this object."""
+        if self.user is None:
+            return '[ImapMailbox on {}]'.format(self.host)
+        return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
+
+    def __enter__(self):
+        """
+        Called upon entering context manager. Logs in.
+
+        Requires user and password be given to constructor already.
+        """
+        self.login()
+        return self
+
+    def __exit__(self, *args):
+        """Called upon leaving context manager. Logs out. Ignores `args`"""
+        try:
+            self.logout()
+        except OSError:
+            pass
+
+    def _debug_log_to_file(self, message, secs=None):
+        """
+        Redirect debug message to file instead of stderr.
+
+        :param str message: Debug message
+        :param int secs: Timestamp of message, defaults to :py:func:`time.time`
+        """
+        if secs is None:
+            secs = time.time()
+        timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
+        prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
+        for line in message.splitlines():
+            self._debug_log_handle.write(prefix + line)
+        if not message.endswith('\n'):
+            self._debug_log_handle.write('\n')
+
+    def login(self, user=None, password=None):
+        """
+        Log in to mailbox.
+
+        :param str user: User name to use for login. Overrides the one given in
+                         constructor. Must be given if not given to constructor
+        :param str password: Password for login. Same restrictions as user
+        :returns: Whatever imap login returns.
+        """
+        if user is not None:
+            self.user = user
+        if password is not None:
+            self.password = password
+        if self.user is None:
+            raise ValueError('User must be given to either ImapMailbox '
+                             'constructor or login')
+        if self.password is None:
+            raise ValueError('Password must be given to either ImapMailbox '
+                             'constructor or login')
+        typ, data = super(ImapMailbox, self).login(self.user, self.password)
+        if typ != 'OK':
+            raise ImapError('login', typ, data)
+        self._select('INBOX')
+        return data
+
+    def logout(self):
+        """Log out of mailbox. Returns data returned by imap logout."""
+        typ, data = super(ImapMailbox, self).logout()
+        if self._debug_log_handle:
+            self._debug_log_handle.close()
+        if typ != 'BYE':
+            raise ImapError('logout', typ, data)
+        self._selected = None
+        return data
+
+    def _select(self, folder):
+        """Internal helper that remembers what folder is selected."""
+        if not folder:
+            return
+        if folder == self._selected:
+            return
+        # select clears all unsolicited responses, so no use to clear them now
+        typ, data = self.select(quote_imap_folder(folder))
+        if typ != 'OK':
+            raise ImapError('select', typ, data)
+        self._selected = folder
+
+    def _clear_unsolicited_responses(self, cmd):
+        """
+        Clear mailbox's memory of unsolicited responses for given cmd.
+
+        Next call to `cmd` should only give the expected responses.
+
+        :param str cmd: next command to run on mailbox.
+        """
+        self.response(cmd)
+
+    def list_all_mail(self, folder=None):
+        """List all message ids in given folder."""
+        self._select(folder)
+        self._clear_unsolicited_responses('SEARCH')
+        typ, data = super(ImapMailbox, self).search(None, 'ALL')
+        if typ != 'OK':
+            raise ImapError('search', typ, data)
+        if len(data) != 1:
+            raise ImapError('search', typ,
+                            'Data has not len 1 but {}'.format(len(data)))
+        return tuple(int(message_id) for message_id in data[0].split())
+
+    def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
+        """
+        Copy message with given id from from_folder to to_folder.
+
+        :param str from_folder: Folder containing the message
+        :param message_id: ID of message in `from_folder` that will be copied
+        :type message_id: int or str
+        :param str to_folder: Target folder
+        :param bool del_orig: Delete original message, making this "copy" a
+                              "move" operation
+        """
+        self._select(from_folder)
+        self._clear_unsolicited_responses('COPY')
+        typ, data = super(ImapMailbox, self).copy(str(message_id),
+                                                  quote_imap_folder(to_folder))
+        if typ != 'OK':
+            raise ImapError('copy', typ, data)
+        if del_orig:
+            self._clear_unsolicited_responses('FETCH')
+            typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
+            if typ != 'OK':
+                raise ImapError('store', typ, data)
+            self._clear_unsolicited_responses('EXPUNGE')
+            typ, _ = self.expunge()
+            if typ != 'OK':
+                raise ImapError('expunge', typ, data)
index e6a0c7a..a989339 100644 (file)
@@ -57,6 +57,7 @@ from email.parser import Parser
 import mimetypes
 
 from . import arnied_wrapper
+from imap_mailbox import ImapMailbox
 
 log = logging.getLogger('pyi2ncommon.mail_utils')