Clean up, remove compat with py < 3.6
[pyi2ncommon] / src / imap_mailbox.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """
22
23 SUMMARY
24 ------------------------------------------------------
25 Connection to an IMAP mailbox
26
27 Convenience wrapper around :py:class:`imaplib.IMAP4`
28
29 Features provided in addition to :py:class:`imaplib.IMAP4`::
30   * dealing with unsolicited responses (missing in most other imap libs)
31   * conversion of `BAD` status responses to errors
32   * simple quoting of imap folder names
33   * debug logging to console/file
34   * memory of which folder is currently selected
35   * function to copy/move an email from one folder to another
36   * deal with some of the variability of returned values from `fetch` command
37   * created email message objects from byte data
38
39 TODO:
40   * use SSL connection per default, unencrypted only as option
41
42 Copyright: Intra2net AG
43
44
45 INTERFACE
46 ------------------------------------------------------
47
48 """
49
50 import imaplib
51 import logging
52 import time
53 from email import policy
54 from email import message_from_bytes
55
56 log = logging.getLogger('pyi2ncommon.imap_mailbox')
57
58
59 class ImapError(Exception):
60     """Exception raised when some imap command fails."""
61
62     def __init__(self, command, status, message):
63         """Create a new ImapError for given command and response."""
64         super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
65                                         .format(command, status, message))
66         self.command = command
67         self.status = status
68         self.message = message
69
70
71 def quote_imap_folder(folder):
72     """
73     Quote name of a folder if required by IMAP.
74
75     Folder names must be quoted if they contain e.g. spaces. This
76     implementation is incomplete, but should suffice for current use.
77
78     :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
79     :returns: same folder name as bytes, possibly with added quotes
80     :rtype: bytes
81     """
82     if not folder:
83         return b''
84     if folder[0] == '"' and folder[-1] == '"':
85         return folder.encode('ascii', 'strict')    # is already quoted
86     if any(special in folder for special in ' (){}"\\[]'):
87         # see RFC3501 $5.1 and $9
88         return f'"{folder}"'.encode('ascii', 'strict')   # quote the folder
89     return folder.encode('ascii', 'strict')
90
91
92 class ImapMailbox(imaplib.IMAP4):
93     """
94     Convenience to access and query imap mailbox.
95
96     Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
97     (status, data) with status being "OK" or "BAD" or "NO" or so.
98
99     Use as ContextManager::
100
101         with ImapMailbox(username, password) as mbox:
102             mbox.list_all_mail('INBOX')
103             # do something
104
105     Python's imaplib really offers little comfort for users, so we have to deal
106     on our own with:
107     - unsolicited responses
108     - converting BAD response to error
109     - encoding to/from bytes
110     - parsing output like b'[1 2 3]'
111     - quoting of folder names (e.g. add "" around folders with spaces)
112     - ...
113     Before adding too much capability here, consider using libraries that do
114     all that already like ImapClient (which unfortunately does not deal with
115     unsolicited responses)...
116
117     Cannot deal with unicode folder names.
118     """
119
120     def __init__(self, user=None, password=None, host='localhost',
121                  debug_log=False):
122         """
123         Create a new connection to an imap mailbox.
124
125         User and password can be given either here or to method
126         :py:meth:`login`.
127
128         :param str user: Username for imap mailbox, optional
129         :param str password: Password for logging into mailbox
130         :param str host: Host name, defaults to localhost
131         :param debug_log: Either bool to enable/disable debug logging or a file
132                           name to log to that file
133         :type debug_log: bool or str
134         """
135         super(ImapMailbox, self).__init__(host)
136         self.host = host
137         self.user = user
138         self.password = password
139         self._selected = None
140         self._debug_log_handle = None
141         if debug_log:
142             self.debug = 4   # enable debugging in super class
143             # overwrite private debug log function of base class with own.
144             # this is slightly hacky since depends on private implementation
145             if isinstance(debug_log, str):
146                 self._debug_log_handle = open(debug_log, 'at')
147                 self._mesg = self._debug_log_to_file    # logs to file
148             else:
149                 # uses logging instead of print; ignore optional secs argument
150                 self._mesg = lambda message, _=None: \
151                     log.debug('IMAP: ' + message)
152             self._mesg('Debug logging enabled in {}'.format(self))
153
154     def __str__(self):
155         """Create a textual representation of this object."""
156         if self.user is None:
157             return '[ImapMailbox on {}]'.format(self.host)
158         return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
159
160     def __enter__(self):
161         """
162         Called upon entering context manager. Logs in.
163
164         Requires user and password be given to constructor already.
165         """
166         self.login()
167         return self
168
169     def __exit__(self, *args):
170         """Called upon leaving context manager. Logs out. Ignores `args`"""
171         try:
172             self.logout()
173         except OSError:
174             pass
175
176     def _debug_log_to_file(self, message, secs=None):
177         """
178         Redirect debug message to file instead of stderr.
179
180         :param str message: Debug message
181         :param int secs: Timestamp of message, defaults to :py:func:`time.time`
182         """
183         if secs is None:
184             secs = time.time()
185         timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
186         prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
187         for line in message.splitlines():
188             self._debug_log_handle.write(prefix + line)
189         if not message.endswith('\n'):
190             self._debug_log_handle.write('\n')
191
192     def login(self, user=None, password=None):
193         """
194         Log in to mailbox.
195
196         :param str user: Username to use for login. Overrides the one given in
197                          constructor. Must be given if not given to constructor
198         :param str password: Password for login. Same restrictions as user
199         :returns: first return item from imap login.
200         """
201         if user is not None:
202             self.user = user
203         if password is not None:
204             self.password = password
205         if self.user is None:
206             raise ValueError('User must be given to either ImapMailbox '
207                              'constructor or login')
208         if self.password is None:
209             raise ValueError('Password must be given to either ImapMailbox '
210                              'constructor or login')
211         typ, data = super(ImapMailbox, self).login(self.user, self.password)
212         if typ != 'OK':
213             raise ImapError('login', typ, data)
214         self._select('INBOX')
215         return data[0]
216
217     def logout(self):
218         """Log out of mailbox. Returns data returned by imap logout."""
219         typ, data = super(ImapMailbox, self).logout()
220         if self._debug_log_handle:
221             self._debug_log_handle.close()
222         if typ != 'BYE':
223             raise ImapError('logout', typ, data)
224         self._selected = None
225         return data
226
227     def _select(self, folder):
228         """Internal helper that remembers what folder is selected."""
229         if not folder:
230             return
231         if folder == self._selected:
232             return
233         # select clears all unsolicited responses, so no use to clear them now
234         typ, data = self.select(quote_imap_folder(folder))
235         if typ != 'OK':
236             raise ImapError('select', typ, data)
237         self._selected = folder
238
239     def _clear_unsolicited_responses(self, cmd):
240         """
241         Clear mailbox's memory of unsolicited responses for given cmd.
242
243         Next call to `cmd` should only give the expected responses.
244
245         :param str cmd: next command to run on mailbox.
246         """
247         self.response(cmd)
248
249     def list_all_mail(self, folder=None):
250         """List all message ids in given folder."""
251         self._select(folder)
252         self._clear_unsolicited_responses('SEARCH')
253         typ, data = super(ImapMailbox, self).search(None, 'ALL')
254         if typ != 'OK':
255             raise ImapError('search', typ, data)
256         if len(data) != 1:
257             raise ImapError('search', typ,
258                             'Data has not len 1 but {}'.format(len(data)))
259         return tuple(int(message_id) for message_id in data[0].split())
260
261     def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
262         """
263         Copy message with given id from from_folder to to_folder.
264
265         :param str from_folder: Folder containing the message
266         :param message_id: ID of message in `from_folder` that will be copied
267         :type message_id: int or str
268         :param str to_folder: Target folder
269         :param bool del_orig: Delete original message, making this "copy" a
270                               "move" operation
271         """
272         self._select(from_folder)
273         self._clear_unsolicited_responses('COPY')
274         typ, data = super(ImapMailbox, self).copy(str(message_id),
275                                                   quote_imap_folder(to_folder))
276         if typ != 'OK':
277             raise ImapError('copy', typ, data)
278         if del_orig:
279             self._clear_unsolicited_responses('FETCH')
280             typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
281             if typ != 'OK':
282                 raise ImapError('store', typ, data)
283             self._clear_unsolicited_responses('EXPUNGE')
284             typ, _ = self.expunge()
285             if typ != 'OK':
286                 raise ImapError('expunge', typ, data)
287
288     def fetch_mail(self, message_id, folder=None, part='RFC822'):
289         """
290         Fetch and return an email in RFC822 format.
291
292         .. seealso:: :py:meth:`fetch_message`
293
294         :param int message_id: Message id in given folder
295         :param folder: Folder that contains the message or None (default) if
296                        folder is already selected.
297         :type folder: str or None
298         :param str part: Message part to fetch. For simplicity of return args,
299                          only one part can be fetched at a time. Other possible
300                          values: ``UID``, ``BODY[TEXT]``, ...
301         :returns: requested message part
302         :rtype: bytes
303         """
304         self._select(folder)
305         self._clear_unsolicited_responses('FETCH')
306         typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
307         if typ != 'OK':
308             raise ImapError('fetch', typ, data)
309
310         # have to be flexible with returned data here...
311         # should be a list of len 1 of 2-tuples, but reality differs
312         if len(data) == 2 and data[1] == b')':
313             data = data[:1]
314         if len(data) != 1:
315             raise ImapError('fetch', typ,
316                             'Data has not len 1 but {}'.format(len(data)))
317         data = data[0]
318         if part.lower() == 'uid' and isinstance(data, bytes):
319             return data[0]
320         if len(data) != 2:
321             raise ImapError('fetch', typ,
322                             'Data[0] has len {} != 2'.format(len(data)))
323         return data[1]
324
325     def fetch_message(self, message_id, folder=None):
326         """
327         Fetch complete message and convert to message object.
328
329         All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
330         resulting data is parsed into a :py:class:`email.message.EmailMessage`
331         object (for python versions < 3.4: :py:class:`email.message.Message`).
332         """
333         return message_from_bytes(
334             self.fetch_mail(message_id, folder, part='RFC822'),
335             policy=policy.default + policy.strict)