1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
24 ------------------------------------------------------
25 Connection to an IMAP mailbox
27 Convenience wrapper around :py:class:`imaplib.IMAP4`
29 Features provided in addition to :py:class:`imaplib.IMAP4`::
30 * dealing with unsolicited responses (missing in most other imap libs)
31 * conversion of `BAD` status responses to errors
32 * simple quoting of imap folder names
33 * debug logging to console/file
34 * memory of which folder is currently selected
35 * function to copy/move an email from one folder to another
36 * deal with some of the variability of returned values from `fetch` command
37 * created email message objects from byte data
40 * use SSL connection per default, unencrypted only as option
42 Copyright: Intra2net AG
46 ------------------------------------------------------
53 from email import policy
54 from email import message_from_bytes
56 log = logging.getLogger('pyi2ncommon.imap_mailbox')
59 class ImapError(Exception):
60 """Exception raised when some imap command fails."""
62 def __init__(self, command, status, message):
63 """Create a new ImapError for given command and response."""
64 super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
65 .format(command, status, message))
66 self.command = command
68 self.message = message
71 def quote_imap_folder(folder):
73 Quote name of a folder if required by IMAP.
75 Folder names must be quoted if they contain e.g. spaces. This
76 implementation is incomplete, but should suffice for current use.
78 :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
79 :returns: same folder name, possibly with added quotes
84 if folder[0] == '"' and folder[-1] == '"':
85 return folder # is already quoted
86 if any(special in folder for special in ' (){}"\\[]'):
87 # see RFC3501 $5.1 and $9
88 return '"' + folder + '"' # quote the folder
89 return folder.encode('ascii', 'strict')
92 class ImapMailbox(imaplib.IMAP4):
94 Convenience to access and query imap mailbox.
96 Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
97 (status, data) with status being "OK" or "BAD" or "NO" or so.
99 Use as ContextManager::
101 with ImapMailbox(username, password) as mbox:
102 mbox.list_all_mail('INBOX')
105 Python's imaplib really offers little comfort for users, so we have to deal
107 - unsolicited responses
108 - converting BAD response to error
109 - encoding to/from bytes
110 - parsing output like b'[1 2 3]'
111 - quoting of folder names (e.g. add "" around folders with spaces)
113 Before adding too much capability here, consider using libraries that do
114 all that already like ImapClient (which unfortunately does not deal with
115 unsolicited reponses)...
117 Cannot deal with unicode folder names.
120 def __init__(self, user=None, password=None, host='localhost',
123 Create a new connection to an imap mailbox.
125 User and password can be given either here or to method
128 :param str user: User name for imap mailbox, optional
129 :param str password: Password for logging into mailbox
130 :param str host: Host name, defaults to localhost
131 :param debug_log: Either bool to enable/disable debug logging or a file
132 name to log to that file
133 :type debug_log: bool or str
135 super(ImapMailbox, self).__init__(host)
138 self.password = password
139 self._selected = None
140 self._debug_log_handle = None
142 self.debug = 4 # enable debugging in super class
143 # overwrite prviate debug log function of base class with own.
144 # this is slightly hacky since depends on private implementation
145 if isinstance(debug_log, str):
146 self._debug_log_handle = open(debug_log, 'at')
147 self._mesg = self._debug_log_to_file # logs to file
149 # uses logging instead of print; ignore optional secs argument
150 self._mesg = lambda message, _=None: \
151 log.debug('IMAP: ' + message)
152 self._mesg('Debug logging enabled in {}'.format(self))
155 """Create a textual representation of this object."""
156 if self.user is None:
157 return '[ImapMailbox on {}]'.format(self.host)
158 return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
162 Called upon entering context manager. Logs in.
164 Requires user and password be given to constructor already.
169 def __exit__(self, *args):
170 """Called upon leaving context manager. Logs out. Ignores `args`"""
176 def _debug_log_to_file(self, message, secs=None):
178 Redirect debug message to file instead of stderr.
180 :param str message: Debug message
181 :param int secs: Timestamp of message, defaults to :py:func:`time.time`
185 timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
186 prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
187 for line in message.splitlines():
188 self._debug_log_handle.write(prefix + line)
189 if not message.endswith('\n'):
190 self._debug_log_handle.write('\n')
192 def login(self, user=None, password=None):
196 :param str user: User name to use for login. Overrides the one given in
197 constructor. Must be given if not given to constructor
198 :param str password: Password for login. Same restrictions as user
199 :returns: first return item from imap login.
203 if password is not None:
204 self.password = password
205 if self.user is None:
206 raise ValueError('User must be given to either ImapMailbox '
207 'constructor or login')
208 if self.password is None:
209 raise ValueError('Password must be given to either ImapMailbox '
210 'constructor or login')
211 typ, data = super(ImapMailbox, self).login(self.user, self.password)
213 raise ImapError('login', typ, data)
214 self._select('INBOX')
218 """Log out of mailbox. Returns data returned by imap logout."""
219 typ, data = super(ImapMailbox, self).logout()
220 if self._debug_log_handle:
221 self._debug_log_handle.close()
223 raise ImapError('logout', typ, data)
224 self._selected = None
227 def _select(self, folder):
228 """Internal helper that remembers what folder is selected."""
231 if folder == self._selected:
233 # select clears all unsolicited responses, so no use to clear them now
234 typ, data = self.select(quote_imap_folder(folder))
236 raise ImapError('select', typ, data)
237 self._selected = folder
239 def _clear_unsolicited_responses(self, cmd):
241 Clear mailbox's memory of unsolicited responses for given cmd.
243 Next call to `cmd` should only give the expected responses.
245 :param str cmd: next command to run on mailbox.
249 def list_all_mail(self, folder=None):
250 """List all message ids in given folder."""
252 self._clear_unsolicited_responses('SEARCH')
253 typ, data = super(ImapMailbox, self).search(None, 'ALL')
255 raise ImapError('search', typ, data)
257 raise ImapError('search', typ,
258 'Data has not len 1 but {}'.format(len(data)))
259 return tuple(int(message_id) for message_id in data[0].split())
261 def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
263 Copy message with given id from from_folder to to_folder.
265 :param str from_folder: Folder containing the message
266 :param message_id: ID of message in `from_folder` that will be copied
267 :type message_id: int or str
268 :param str to_folder: Target folder
269 :param bool del_orig: Delete original message, making this "copy" a
272 self._select(from_folder)
273 self._clear_unsolicited_responses('COPY')
274 typ, data = super(ImapMailbox, self).copy(str(message_id),
275 quote_imap_folder(to_folder))
277 raise ImapError('copy', typ, data)
279 self._clear_unsolicited_responses('FETCH')
280 typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
282 raise ImapError('store', typ, data)
283 self._clear_unsolicited_responses('EXPUNGE')
284 typ, _ = self.expunge()
286 raise ImapError('expunge', typ, data)
288 def fetch_mail(self, message_id, folder=None, part='RFC822'):
290 Fetch and return an email in RFC822 format.
292 .. seealso:: :py:meth:`fetch_message`
294 :param int message_id: Message id in given folder
295 :param folder: Folder that contains the message or None (default) if
296 folder is already selected.
297 :type folder: str or None
298 :param str part: Message part to fetch. For simplicity of return args,
299 only one part can be fetched at a time. Other possible
300 values: ``UID``, ``BODY[TEXT]``, ...
301 :returns: requested message part
305 self._clear_unsolicited_responses('FETCH')
306 typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
308 raise ImapError('fetch', typ, data)
310 # have to be flexible with returned data here...
311 # should be a list of len 1 of 2-tuples, but reality differs
312 if len(data) == 2 and data[1] == b')':
315 raise ImapError('fetch', typ,
316 'Data has not len 1 but {}'.format(len(data)))
318 if part.lower() == 'uid' and isinstance(data, bytes):
321 raise ImapError('fetch', typ,
322 'Data[0] has len {} != 2'.format(len(data)))
325 def fetch_message(self, message_id, folder=None):
327 Fetch complete message and convert to message object.
329 All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
330 resulting data is parsed into a :py:class:`email.message.EmailMessage`
331 object (for python versions < 3.4: :py:class:`email.message.Message`).
333 return message_from_bytes(
334 self.fetch_mail(message_id, folder, part='RFC822'),
335 policy=policy.default + policy.strict)