a27f6122c6ded15fa6b4343d360f0c303510ee18
[pyi2ncommon] / src / imap_mailbox.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """
22
23 SUMMARY
24 ------------------------------------------------------
25 Connection to an IMAP mailbox
26
27 Convenience wrapper around :py:class:`imaplib.IMAP4`
28
29 Features provided in addition to :py:class:`imaplib.IMAP4`::
30   * dealing with unsolicited responses (missing in most other imap libs)
31   * conversion of `BAD` status responses to errors
32   * simple quoting of imap folder names
33   * debug logging to console/file
34   * memory of which folder is currently selected
35   * function to copy/move an email from one folder to another
36   * deal with some of the variability of returned values from `fetch` command
37   * created email message objects from byte data
38
39 Copyright: Intra2net AG
40
41
42 INTERFACE
43 ------------------------------------------------------
44
45 """
46
47 import imaplib
48 import logging
49 import time
50 from email import policy
51 from email import message_from_bytes
52
53 log = logging.getLogger('pyi2ncommon.imap_mailbox')
54
55
56 class ImapError(Exception):
57     """Exception raised when some imap command fails."""
58
59     def __init__(self, command, status, message):
60         """Create a new ImapError for given command and response."""
61         super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
62                                         .format(command, status, message))
63         self.command = command
64         self.status = status
65         self.message = message
66
67
68 def quote_imap_folder(folder):
69     """
70     Quote name of a folder if required by IMAP.
71
72     Folder names must be quoted if they contain e.g. spaces. This
73     implementation is incomplete, but should suffice for current use.
74
75     :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
76     :returns: same folder name, possibly with added quotes
77     :rtype: bytes
78     """
79     if not folder:
80         return folder
81     if folder[0] == '"' and folder[-1] == '"':
82         return folder    # is already quoted
83     if any(special in folder for special in ' (){}"\\[]'):
84         # see RFC3501 $5.1 and $9
85         return '"' + folder + '"'   # quote the folder
86     return folder.encode('ascii', 'strict')
87
88
89 class ImapMailbox(imaplib.IMAP4):
90     """
91     Convenience to access and query imap mailbox.
92
93     Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
94     (status, data) with status being "OK" or "BAD" or "NO" or so.
95
96     Use as ContextManager::
97
98         with ImapMailbox(username, password) as mbox:
99             mbox.list_all_mail('INBOX')
100             # do something
101
102     Python's imaplib really offers little comfort for users, so we have to deal
103     on our own with:
104     - unsolicited responses
105     - converting BAD response to error
106     - encoding to/from bytes
107     - parsing output like b'[1 2 3]'
108     - quoting of folder names (e.g. add "" around folders with spaces)
109     - ...
110     Before adding too much capability here, consider using libraries that do
111     all that already like ImapClient (which unfortunately does not deal with
112     unsolicited reponses)...
113
114     Cannot deal with unicode folder names.
115     """
116
117     def __init__(self, user=None, password=None, host='localhost',
118                  debug_log=False):
119         """
120         Create a new connection to an imap mailbox.
121
122         User and password can be given either here or to method
123         :py:meth:`login`.
124
125         :param str user: User name for imap mailbox, optional
126         :param str password: Password for logging into mailbox
127         :param str host: Host name, defaults to localhost
128         :param debug_log: Either bool to enable/disable debug logging or a file
129                           name to log to that file
130         :type debug_log: bool or str
131         """
132         super(ImapMailbox, self).__init__(host)
133         self.host = host
134         self.user = user
135         self.password = password
136         self._selected = None
137         self._debug_log_handle = None
138         if debug_log:
139             self.debug = 4   # enable debugging in super class
140             # overwrite prviate debug log function of base class with own.
141             # this is slightly hacky since depends on private implementation
142             if isinstance(debug_log, str):
143                 self._debug_log_handle = open(debug_log, 'at')
144                 self._mesg = self._debug_log_to_file    # logs to file
145             else:
146                 # uses logging instead of print; ignore optional secs argument
147                 self._mesg = lambda message, _=None: \
148                     log.debug('IMAP: ' + message)
149             self._mesg('Debug logging enabled in {}'.format(self))
150
151     def __str__(self):
152         """Create a textual representation of this object."""
153         if self.user is None:
154             return '[ImapMailbox on {}]'.format(self.host)
155         return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
156
157     def __enter__(self):
158         """
159         Called upon entering context manager. Logs in.
160
161         Requires user and password be given to constructor already.
162         """
163         self.login()
164         return self
165
166     def __exit__(self, *args):
167         """Called upon leaving context manager. Logs out. Ignores `args`"""
168         try:
169             self.logout()
170         except OSError:
171             pass
172
173     def _debug_log_to_file(self, message, secs=None):
174         """
175         Redirect debug message to file instead of stderr.
176
177         :param str message: Debug message
178         :param int secs: Timestamp of message, defaults to :py:func:`time.time`
179         """
180         if secs is None:
181             secs = time.time()
182         timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
183         prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
184         for line in message.splitlines():
185             self._debug_log_handle.write(prefix + line)
186         if not message.endswith('\n'):
187             self._debug_log_handle.write('\n')
188
189     def login(self, user=None, password=None):
190         """
191         Log in to mailbox.
192
193         :param str user: User name to use for login. Overrides the one given in
194                          constructor. Must be given if not given to constructor
195         :param str password: Password for login. Same restrictions as user
196         :returns: first return item from imap login.
197         """
198         if user is not None:
199             self.user = user
200         if password is not None:
201             self.password = password
202         if self.user is None:
203             raise ValueError('User must be given to either ImapMailbox '
204                              'constructor or login')
205         if self.password is None:
206             raise ValueError('Password must be given to either ImapMailbox '
207                              'constructor or login')
208         typ, data = super(ImapMailbox, self).login(self.user, self.password)
209         if typ != 'OK':
210             raise ImapError('login', typ, data)
211         self._select('INBOX')
212         return data[0]
213
214     def logout(self):
215         """Log out of mailbox. Returns data returned by imap logout."""
216         typ, data = super(ImapMailbox, self).logout()
217         if self._debug_log_handle:
218             self._debug_log_handle.close()
219         if typ != 'BYE':
220             raise ImapError('logout', typ, data)
221         self._selected = None
222         return data
223
224     def _select(self, folder):
225         """Internal helper that remembers what folder is selected."""
226         if not folder:
227             return
228         if folder == self._selected:
229             return
230         # select clears all unsolicited responses, so no use to clear them now
231         typ, data = self.select(quote_imap_folder(folder))
232         if typ != 'OK':
233             raise ImapError('select', typ, data)
234         self._selected = folder
235
236     def _clear_unsolicited_responses(self, cmd):
237         """
238         Clear mailbox's memory of unsolicited responses for given cmd.
239
240         Next call to `cmd` should only give the expected responses.
241
242         :param str cmd: next command to run on mailbox.
243         """
244         self.response(cmd)
245
246     def list_all_mail(self, folder=None):
247         """List all message ids in given folder."""
248         self._select(folder)
249         self._clear_unsolicited_responses('SEARCH')
250         typ, data = super(ImapMailbox, self).search(None, 'ALL')
251         if typ != 'OK':
252             raise ImapError('search', typ, data)
253         if len(data) != 1:
254             raise ImapError('search', typ,
255                             'Data has not len 1 but {}'.format(len(data)))
256         return tuple(int(message_id) for message_id in data[0].split())
257
258     def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
259         """
260         Copy message with given id from from_folder to to_folder.
261
262         :param str from_folder: Folder containing the message
263         :param message_id: ID of message in `from_folder` that will be copied
264         :type message_id: int or str
265         :param str to_folder: Target folder
266         :param bool del_orig: Delete original message, making this "copy" a
267                               "move" operation
268         """
269         self._select(from_folder)
270         self._clear_unsolicited_responses('COPY')
271         typ, data = super(ImapMailbox, self).copy(str(message_id),
272                                                   quote_imap_folder(to_folder))
273         if typ != 'OK':
274             raise ImapError('copy', typ, data)
275         if del_orig:
276             self._clear_unsolicited_responses('FETCH')
277             typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
278             if typ != 'OK':
279                 raise ImapError('store', typ, data)
280             self._clear_unsolicited_responses('EXPUNGE')
281             typ, _ = self.expunge()
282             if typ != 'OK':
283                 raise ImapError('expunge', typ, data)
284
285     def fetch_mail(self, message_id, folder=None, part='RFC822'):
286         """
287         Fetch and return an email in RFC822 format.
288
289         .. seealso:: :py:meth:`fetch_message`
290
291         :param int message_id: Message id in given folder
292         :param folder: Folder that contains the message or None (default) if
293                        folder is already selected.
294         :type folder: str or None
295         :param str part: Message part to fetch. For simplicity of return args,
296                          only one part can be fetched at a time. Other possible
297                          values: ``UID``, ``BODY[TEXT]``, ...
298         :returns: requested message part
299         :rtype: bytes
300         """
301         self._select(folder)
302         self._clear_unsolicited_responses('SEARCH')
303         typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
304         if typ != 'OK':
305             raise ImapError('fetch', typ, data)
306
307         # have to be flexible with returned data here...
308         # should be a list of len 1 of 2-tuples, but reality differs
309         if len(data) == 2 and data[1] == b')':
310             data = data[:1]
311         if len(data) != 1:
312             raise ImapError('fetch', typ,
313                             'Data has not len 1 but {}'.format(len(data)))
314         data = data[0]
315         if part.lower() == 'uid' and isinstance(data, bytes):
316             return data[0]
317         if len(data) != 2:
318             raise ImapError('fetch', typ,
319                             'Data[0] has len {} != 2'.format(len(data)))
320         return data[1]
321
322     def fetch_message(self, message_id, folder=None):
323         """
324         Fetch complete message and convert to message object.
325
326         All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
327         resulting data is parsed into a :py:class:`email.message.EmailMessage`
328         object (for python versions < 3.4: :py:class:`email.message.Message`).
329         """
330         return message_from_bytes(
331             self.fetch_mail(message_id, folder, part='RFC822'),
332             policy=policy.default + policy.strict)