1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
22 Connection to an IMAP mailbox
24 Convenience wrapper around :py:class:`imaplib.IMAP4`
26 Features provided in addition to :py:class:`imaplib.IMAP4`::
27 * dealing with unsolicited responses (missing in most other imap libs)
28 * conversion of `BAD` status responses to errors
29 * simple quoting of imap folder names
30 * debug logging to console/file
31 * memory of which folder is currently selected
32 * function to copy/move an email from one folder to another
33 * deal with some of the variability of returned values from `fetch` command
* creation of email message objects from byte data
37 * use SSL connection per default, unencrypted only as option
39 Copyright: Intra2net AG
45 from email import policy
46 from email import message_from_bytes
48 log = logging.getLogger('pyi2ncommon.imap_mailbox')
class ImapError(Exception):
    """
    Exception raised when some imap command fails.

    :ivar str command: name of the imap command that failed
    :ivar str status: status the server returned for that command
    :ivar message: data the server returned alongside the status
    """

    def __init__(self, command, status, message):
        """
        Create a new ImapError for given command and response.

        :param str command: name of the imap command that failed
        :param str status: status returned by the command (e.g. ``NO``)
        :param message: data returned by the command; only used for display
        """
        super().__init__('IMAP command {!r} returned {}: {}'
                         .format(command, status, message))
        # keep all three pieces available so handlers can react to a specific
        # command/status without parsing the formatted text
        self.command = command
        self.status = status
        self.message = message
def quote_imap_folder(folder):
    """
    Quote name of a folder if required by IMAP.

    Folder names must be quoted if they contain e.g. spaces. This
    implementation is incomplete, but should suffice for current use.

    A name that already starts and ends with a double quote (and is at least
    two characters long) is taken to be quoted by the caller and is returned
    unchanged. An empty name is sent as the empty quoted string. When quotes
    are added, embedded double quotes and backslashes are escaped with a
    backslash, as required for IMAP quoted strings.

    :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
    :returns: same folder name as bytes, possibly with added quotes
    :rtype: bytes
    :raises UnicodeEncodeError: if the name contains non-ascii characters
    """
    # need both delimiters, so a lone '"' does not count as quoted
    if len(folder) >= 2 and folder[0] == '"' and folder[-1] == '"':
        return folder.encode('ascii', 'strict')    # is already quoted

    # see RFC3501 $5.1 and $9
    needs_quotes = not folder or any(special in folder
                                     for special in ' (){}"\\[]')
    if needs_quotes:
        # escape backslashes first so the ones added for '"' stay single
        escaped = folder.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'.encode('ascii', 'strict')
    return folder.encode('ascii', 'strict')
class ImapMailbox(imaplib.IMAP4):
    """
    Convenience to access and query imap mailbox.

    Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
    (status, data) with status being "OK" or "BAD" or "NO" or so.

    Use as ContextManager::

        with ImapMailbox(username, password) as mbox:
            mbox.list_all_mail('INBOX')

    Python's imaplib really offers little comfort for users, so we have to deal
    with the following ourselves:

    - unsolicited responses
    - converting BAD response to error
    - encoding to/from bytes
    - parsing output like b'[1 2 3]'
    - quoting of folder names (e.g. add "" around folders with spaces)

    Before adding too much capability here, consider using libraries that do
    all that already like ImapClient (which unfortunately does not deal with
    unsolicited responses)...

    Cannot deal with unicode folder names.
    """

    def __init__(self, user=None, password=None, host='localhost',
                 debug_log=False):
        """
        Create a new connection to an imap mailbox.

        User and password can be given either here or to method
        :py:meth:`login`.

        :param str user: Username for imap mailbox, optional
        :param str password: Password for logging into mailbox
        :param str host: Host name, defaults to localhost
        :param debug_log: Either bool to enable/disable debug logging or a file
                          name to log to that file
        :type debug_log: bool or str
        """
        super().__init__(host)
        self.user = user
        self.password = password
        self._selected = None            # folder name last selected, if any
        self._debug_log_handle = None    # only set when logging to a file

        if not debug_log:
            return

        self.debug = 4    # enable debugging in super class
        # overwrite private debug log function of base class with own.
        # this is slightly hacky since depends on private implementation
        if isinstance(debug_log, str):
            self._debug_log_handle = open(debug_log, 'at')
            self._mesg = self._debug_log_to_file    # logs to file
        else:
            # uses logging instead of print; ignore optional secs argument
            def _log_debug(message, _=None):
                log.debug('IMAP: %s', message)
            self._mesg = _log_debug
        self._mesg('Debug logging enabled in {}'.format(self))

    def __str__(self):
        """Create a textual representation of this object."""
        if self.user is None:
            return '[ImapMailbox on {}]'.format(self.host)
        return '[ImapMailbox for {}@{}]'.format(self.user, self.host)

    def __enter__(self):
        """
        Called upon entering context manager. Logs in.

        Requires user and password be given to constructor already.

        :returns: this mailbox, so it can be bound with ``as``
        """
        self.login()
        return self

    def __exit__(self, *args):
        """Called upon leaving context manager. Logs out. Ignores `args`"""
        # returns None on purpose: exceptions from the with-body propagate
        self.logout()

    def _debug_log_to_file(self, message, secs=None):
        """
        Redirect debug message to file instead of stderr.

        Every line of the message is written on a line of its own, prefixed
        with a timestamp of millisecond resolution.

        :param str message: Debug message
        :param int secs: Timestamp of message, defaults to :py:func:`time.time`
        """
        if secs is None:
            secs = time.time()
        timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
        prefix = '%s.%03d: ' % (timestamp, int(secs * 1000) % 1000)
        # splitlines() drops the line ends, so add one back per line; an empty
        # message still produces one (empty) entry
        lines = message.splitlines() or ['']
        self._debug_log_handle.writelines(prefix + line + '\n'
                                          for line in lines)

    def login(self, user=None, password=None):
        """
        Log in to the imap server and select folder ``INBOX``.

        :param str user: Username to use for login. Overrides the one given in
                         constructor. Must be given if not given to constructor
        :param str password: Password for login. Same restrictions as user
        :returns: data returned by imap login
        :raises ValueError: if user or password are known neither from the
                            constructor nor from the arguments
        :raises ImapError: if the returned status is not ``OK``
        """
        if user is not None:
            self.user = user
        if password is not None:
            self.password = password
        if self.user is None:
            raise ValueError('User must be given to either ImapMailbox '
                             'constructor or login')
        if self.password is None:
            raise ValueError('Password must be given to either ImapMailbox '
                             'constructor or login')
        typ, data = super().login(self.user, self.password)
        if typ != 'OK':
            raise ImapError('login', typ, data)
        self._select('INBOX')
        return data

    def logout(self):
        """
        Log out of mailbox. Returns data returned by imap logout.

        Also closes the debug log file, if one was opened.

        :raises ImapError: if the returned status is neither ``BYE`` nor ``OK``
        """
        try:
            typ, data = super().logout()
        finally:
            # close the log file even if the logout itself blew up
            if self._debug_log_handle:
                self._debug_log_handle.close()
        # a successful LOGOUT is normally reported through the server's BYE
        if typ not in ('BYE', 'OK'):
            raise ImapError('logout', typ, data)
        self._selected = None
        return data

    def _select(self, folder):
        """
        Internal helper that remembers what folder is selected.

        Does nothing if `folder` is None or is the folder selected last.

        :param folder: name of folder to select or None to keep the current one
        :type folder: str or None
        :raises ImapError: if the returned status is not ``OK``
        """
        if folder is None or folder == self._selected:
            return
        # select clears all unsolicited responses, so no use to clear them now
        typ, data = self.select(quote_imap_folder(folder))
        if typ != 'OK':
            # imaplib leaves the selected state when a select fails, so the
            # folder remembered here is not selected any more either
            self._selected = None
            raise ImapError('select', typ, data)
        self._selected = folder

    def _clear_unsolicited_responses(self, cmd):
        """
        Clear mailbox's memory of unsolicited responses for given cmd.

        Next call to `cmd` should only give the expected responses.

        :param str cmd: next command to run on mailbox.
        """
        # imaplib's response() hands out and forgets the data stored for the
        # given response code; the data itself is of no interest here
        self.response(cmd)

    def list_all_mail(self, folder=None):
        """
        List all message ids in given folder.

        :param folder: Folder to search or None (default) to use the folder
                       that is already selected
        :type folder: str or None
        :returns: ids of all messages in the folder
        :rtype: tuple of int
        :raises ImapError: if the search fails or returns unexpected data
        """
        self._select(folder)
        self._clear_unsolicited_responses('SEARCH')
        typ, data = super().search(None, 'ALL')
        if typ != 'OK':
            raise ImapError('search', typ, data)
        if len(data) != 1:
            raise ImapError('search', typ,
                            'Data has not len 1 but {}'.format(len(data)))
        if data[0] is None:    # imaplib's placeholder for "no response data"
            return ()
        return tuple(int(message_id) for message_id in data[0].split())

    def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
        """
        Copy message with given id from from_folder to to_folder.

        :param str from_folder: Folder containing the message
        :param message_id: ID of message in `from_folder` that will be copied
        :type message_id: int or str
        :param str to_folder: Target folder
        :param bool del_orig: Delete original message, making this "copy" a
                              "move"
        :raises ImapError: if copy, store or expunge do not return ``OK``
        """
        # imaplib only accepts str/bytes arguments, ids may come in as int
        message_set = str(message_id)

        self._select(from_folder)
        self._clear_unsolicited_responses('COPY')
        typ, data = super().copy(message_set, quote_imap_folder(to_folder))
        if typ != 'OK':
            raise ImapError('copy', typ, data)
        if not del_orig:
            return

        # store reports its result as FETCH responses
        self._clear_unsolicited_responses('FETCH')
        typ, data = self.store(message_set, '+FLAGS', r'(\Deleted)')
        if typ != 'OK':
            raise ImapError('store', typ, data)

        # NOTE: expunge removes every message flagged as deleted in the
        # selected folder, not only the one flagged above
        self._clear_unsolicited_responses('EXPUNGE')
        typ, data = self.expunge()
        if typ != 'OK':
            raise ImapError('expunge', typ, data)

    def fetch_mail(self, message_id, folder=None, part='RFC822'):
        """
        Fetch and return an email in RFC822 format.

        .. seealso:: :py:meth:`fetch_message`

        :param int message_id: Message id in given folder
        :param folder: Folder that contains the message or None (default) if
                       folder is already selected.
        :type folder: str or None
        :param str part: Message part to fetch. For simplicity of return args,
                         only one part can be fetched at a time. Other possible
                         values: ``UID``, ``BODY[TEXT]``, ...
        :returns: requested message part
        :raises ImapError: if the fetch fails or returns unexpected data
        """
        self._select(folder)
        self._clear_unsolicited_responses('FETCH')
        typ, data = super().fetch(str(message_id), part)
        if typ != 'OK':
            raise ImapError('fetch', typ, data)

        # have to be flexible with returned data here...
        # should be a list of len 1 of 2-tuples, but reality differs
        if len(data) == 2 and data[1] == b')':
            data = data[:1]    # drop the lone closing parenthesis
        if len(data) != 1:
            raise ImapError('fetch', typ,
                            'Data has not len 1 but {}'.format(len(data)))

        item = data[0]
        if item is None:    # imaplib's placeholder for "no response data"
            raise ImapError('fetch', typ,
                            'No data returned for message {}'
                            .format(message_id))
        if part.lower() == 'uid' and isinstance(item, bytes):
            return item    # short answers arrive as plain bytes, not a tuple
        if len(item) != 2:
            raise ImapError('fetch', typ,
                            'Data[0] has len {} != 2'.format(len(item)))
        return item[1]

    def fetch_message(self, message_id, folder=None):
        """
        Fetch complete message and convert to message object.

        All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
        resulting data is parsed into a :py:class:`email.message.EmailMessage`
        object (for python versions < 3.4: :py:class:`email.message.Message`).
        """
        raw_message = self.fetch_mail(message_id, folder, part='RFC822')
        return message_from_bytes(raw_message,
                                  policy=policy.default + policy.strict)