1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
24 ------------------------------------------------------
25 Connection to an IMAP mailbox
27 Convenience wrapper around :py:class:`imaplib.IMAP4`
29 Features provided in addition to :py:class:`imaplib.IMAP4`::
30 * dealing with unsolicited responses (missing in most other imap libs)
31 * conversion of `BAD` status responses to errors
32 * simple quoting of imap folder names
33 * debug logging to console/file
34 * memory of which folder is currently selected
35 * function to copy/move an email from one folder to another
36 * deal with some of the variability of returned values from `fetch` command
* creation of email message objects from byte data
39 Copyright: Intra2net AG
43 ------------------------------------------------------
50 from email import policy
51 from email import message_from_bytes
53 log = logging.getLogger('pyi2ncommon.imap_mailbox')
class ImapError(Exception):
    """
    Exception raised when some imap command fails.

    The constructor arguments are kept as attributes `command`, `status` and
    `message` so that handlers can inspect them without parsing the text.
    """

    def __init__(self, command, status, message):
        """
        Create a new ImapError for given command and response.

        :param command: imap command that failed
        :param status: status that was returned for the command
        :param message: data that was returned together with the status
        """
        super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
                                        .format(command, status, message))
        self.command = command
        # keep the status as well; it is part of the error text, so handlers
        # should be able to read it without parsing that text
        self.status = status
        self.message = message
def quote_imap_folder(folder):
    """
    Quote name of a folder if required by IMAP.

    Folder names must be quoted if they contain e.g. spaces. This
    implementation is incomplete, but should suffice for current use.

    Names that need quoting get surrounding double quotes; double quotes and
    backslashes inside such a name are escaped with a backslash. The empty
    name is returned as ``""``. Names that are already enclosed in double
    quotes are returned unchanged.

    :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
    :returns: same folder name, possibly with added quotes
    :rtype: str
    :raises: :py:class:`UnicodeEncodeError` if a name that needs no quoting
             contains non-ascii characters
    """
    if not folder:
        return '""'     # empty name can only be expressed as quoted string
    # require 2 chars so that a lone '"' is not mistaken for a quoted name
    if len(folder) >= 2 and folder[0] == '"' and folder[-1] == '"':
        return folder     # is already quoted
    if any(special in folder for special in ' (){}"\\[]'):
        # see RFC3501 $5.1 and $9
        # backslash first, otherwise the escapes added for '"' get doubled
        escaped = folder.replace('\\', '\\\\').replace('"', '\\"')
        return '"' + escaped + '"'    # quote the folder
    # only check that the name is pure ascii, but return text like the other
    # branches do, so callers always get the same type
    folder.encode('ascii', 'strict')
    return folder
89 class ImapMailbox(imaplib.IMAP4):
91 Convenience to access and query imap mailbox.
93 Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
94 (status, data) with status being "OK" or "BAD" or "NO" or so.
96 Use as ContextManager::
98 with ImapMailbox(username, password) as mbox:
99 mbox.list_all_mail('INBOX')
102 Python's imaplib really offers little comfort for users, so we have to deal
104 - unsolicited responses
105 - converting BAD response to error
106 - encoding to/from bytes
107 - parsing output like b'[1 2 3]'
108 - quoting of folder names (e.g. add "" around folders with spaces)
110 Before adding too much capability here, consider using libraries that do
111 all that already like ImapClient (which unfortunately does not deal with
    unsolicited responses)...
114 Cannot deal with unicode folder names.
117 def __init__(self, user=None, password=None, host='localhost',
120 Create a new connection to an imap mailbox.
122 User and password can be given either here or to method
125 :param str user: User name for imap mailbox, optional
126 :param str password: Password for logging into mailbox
127 :param str host: Host name, defaults to localhost
128 :param debug_log: Either bool to enable/disable debug logging or a file
129 name to log to that file
130 :type debug_log: bool or str
132 super(ImapMailbox, self).__init__(host)
135 self.password = password
136 self._selected = None
137 self._debug_log_handle = None
139 self.debug = 4 # enable debugging in super class
            # overwrite private debug log function of base class with own.
141 # this is slightly hacky since depends on private implementation
142 if isinstance(debug_log, str):
143 self._debug_log_handle = open(debug_log, 'at')
144 self._mesg = self._debug_log_to_file # logs to file
146 # uses logging instead of print; ignore optional secs argument
147 self._mesg = lambda message, _=None: \
148 log.debug('IMAP: ' + message)
149 self._mesg('Debug logging enabled in {}'.format(self))
152 """Create a textual representation of this object."""
153 if self.user is None:
154 return '[ImapMailbox on {}]'.format(self.host)
155 return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
159 Called upon entering context manager. Logs in.
161 Requires user and password be given to constructor already.
166 def __exit__(self, *args):
167 """Called upon leaving context manager. Logs out. Ignores `args`"""
def _debug_log_to_file(self, message, secs=None):
    """
    Redirect debug message to file instead of stderr.

    Each line of `message` is written to `self._debug_log_handle` as a line
    of its own, prefixed with the local time of day including milliseconds.

    :param str message: Debug message
    :param secs: Timestamp of message, defaults to :py:func:`time.time`
    :type secs: int or float or None
    """
    if secs is None:
        secs = time.time()
    timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
    prefix = '%s.%03d: ' % (timestamp, int(secs * 1000) % 1000)
    # splitlines() strips the line ends, so every line needs its own newline;
    # otherwise the lines of a multi-line message end up glued together
    self._debug_log_handle.writelines(
        prefix + line + '\n' for line in message.splitlines())
189 def login(self, user=None, password=None):
193 :param str user: User name to use for login. Overrides the one given in
194 constructor. Must be given if not given to constructor
195 :param str password: Password for login. Same restrictions as user
196 :returns: first return item from imap login.
200 if password is not None:
201 self.password = password
202 if self.user is None:
203 raise ValueError('User must be given to either ImapMailbox '
204 'constructor or login')
205 if self.password is None:
206 raise ValueError('Password must be given to either ImapMailbox '
207 'constructor or login')
208 typ, data = super(ImapMailbox, self).login(self.user, self.password)
210 raise ImapError('login', typ, data)
211 self._select('INBOX')
215 """Log out of mailbox. Returns data returned by imap logout."""
216 typ, data = super(ImapMailbox, self).logout()
217 if self._debug_log_handle:
218 self._debug_log_handle.close()
220 raise ImapError('logout', typ, data)
221 self._selected = None
224 def _select(self, folder):
225 """Internal helper that remembers what folder is selected."""
228 if folder == self._selected:
230 # select clears all unsolicited responses, so no use to clear them now
231 typ, data = self.select(quote_imap_folder(folder))
233 raise ImapError('select', typ, data)
234 self._selected = folder
236 def _clear_unsolicited_responses(self, cmd):
238 Clear mailbox's memory of unsolicited responses for given cmd.
240 Next call to `cmd` should only give the expected responses.
242 :param str cmd: next command to run on mailbox.
246 def list_all_mail(self, folder=None):
247 """List all message ids in given folder."""
249 self._clear_unsolicited_responses('SEARCH')
250 typ, data = super(ImapMailbox, self).search(None, 'ALL')
252 raise ImapError('search', typ, data)
254 raise ImapError('search', typ,
255 'Data has not len 1 but {}'.format(len(data)))
256 return tuple(int(message_id) for message_id in data[0].split())
def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
    """
    Copy message with given id from from_folder to to_folder.

    :param str from_folder: Folder containing the message
    :param message_id: ID of message in `from_folder` that will be copied
    :type message_id: int or str
    :param str to_folder: Target folder
    :param bool del_orig: Delete original message, making this "copy" a
                          "move"
    :raises: :py:class:`ImapError` if the copy, store or expunge command
             does not return status OK
    """
    # imap commands need the id as text; convert once so copy and store
    # both accept the documented int ids
    message_id = str(message_id)

    self._select(from_folder)
    self._clear_unsolicited_responses('COPY')
    typ, data = super(ImapMailbox, self).copy(message_id,
                                              quote_imap_folder(to_folder))
    if typ != 'OK':
        raise ImapError('copy', typ, data)
    if not del_orig:
        return

    # deleting = flag the original as deleted, then expunge the folder
    self._clear_unsolicited_responses('FETCH')
    typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
    if typ != 'OK':
        raise ImapError('store', typ, data)
    self._clear_unsolicited_responses('EXPUNGE')
    # keep the expunge response so that a failure reports its own data and
    # not the data left over from the store command
    typ, data = self.expunge()
    if typ != 'OK':
        raise ImapError('expunge', typ, data)
def fetch_mail(self, message_id, folder=None, part='RFC822'):
    """
    Fetch and return an email in RFC822 format.

    .. seealso:: :py:meth:`fetch_message`

    :param int message_id: Message id in given folder
    :param folder: Folder that contains the message or None (default) if
                   folder is already selected.
    :type folder: str or None
    :param str part: Message part to fetch. For simplicity of return args,
                     only one part can be fetched at a time. Other possible
                     values: ``UID``, ``BODY[TEXT]``, ...
    :returns: requested message part
    :raises: :py:class:`ImapError` if the fetch command does not return
             status OK or if the returned data has an unexpected shape
    """
    if folder is not None:
        self._select(folder)
    # the command run next is a fetch, so stale FETCH responses have to go
    self._clear_unsolicited_responses('FETCH')
    typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
    if typ != 'OK':
        raise ImapError('fetch', typ, data)

    # have to be flexible with returned data here...
    # should be a list of len 1 of 2-tuples, but reality differs
    has_closing_paren = len(data) == 2 and data[1] == b')'
    if not has_closing_paren and len(data) != 1:
        raise ImapError('fetch', typ,
                        'Data has not len 1 but {}'.format(len(data)))
    data = data[0]

    # a uid comes back as plain bytes instead of a 2-tuple
    if part.lower() == 'uid' and isinstance(data, bytes):
        return data
    if len(data) != 2:
        raise ImapError('fetch', typ,
                        'Data[0] has len {} != 2'.format(len(data)))
    return data[1]
def fetch_message(self, message_id, folder=None):
    """
    Fetch complete message and convert to message object.

    `message_id` and `folder` are handed on to :py:meth:`fetch_mail`, which
    is asked for part ``RFC822``. The bytes it returns are parsed using the
    combination of the default and the strict email policy.

    :returns: result of :py:func:`email.message_from_bytes` for the mail
    """
    raw_mail = self.fetch_mail(message_id, folder, part='RFC822')
    parse_policy = policy.default + policy.strict
    return message_from_bytes(raw_mail, policy=parse_policy)