Remove api doc headers
[pyi2ncommon] / src / imap_mailbox.py
1 # The software in this package is distributed under the GNU General
2 # Public License version 2 (with a special exception described below).
3 #
4 # A copy of GNU General Public License (GPL) is included in this distribution,
5 # in the file COPYING.GPL.
6 #
7 # As a special exception, if other files instantiate templates or use macros
8 # or inline functions from this file, or you compile this file and link it
9 # with other works to produce a work based on this file, this file
10 # does not by itself cause the resulting work to be covered
11 # by the GNU General Public License.
12 #
13 # However the source code for this file must still be made available
14 # in accordance with section (3) of the GNU General Public License.
15 #
16 # This exception does not invalidate any other reasons why a work based
17 # on this file might be covered by the GNU General Public License.
18 #
19 # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21 """
22 Connection to an IMAP mailbox
23
24 Convenience wrapper around :py:class:`imaplib.IMAP4`
25
26 Features provided in addition to :py:class:`imaplib.IMAP4`::
27   * dealing with unsolicited responses (missing in most other imap libs)
28   * conversion of `BAD` status responses to errors
29   * simple quoting of imap folder names
30   * debug logging to console/file
31   * memory of which folder is currently selected
32   * function to copy/move an email from one folder to another
33   * deal with some of the variability of returned values from `fetch` command
34   * created email message objects from byte data
35
36 TODO:
37   * use SSL connection per default, unencrypted only as option
38
39 Copyright: Intra2net AG
40 """
41
42 import imaplib
43 import logging
44 import time
45 from email import policy
46 from email import message_from_bytes
47
48 log = logging.getLogger('pyi2ncommon.imap_mailbox')
49
50
51 class ImapError(Exception):
52     """Exception raised when some imap command fails."""
53
54     def __init__(self, command, status, message):
55         """Create a new ImapError for given command and response."""
56         super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
57                                         .format(command, status, message))
58         self.command = command
59         self.status = status
60         self.message = message
61
62
63 def quote_imap_folder(folder):
64     """
65     Quote name of a folder if required by IMAP.
66
67     Folder names must be quoted if they contain e.g. spaces. This
68     implementation is incomplete, but should suffice for current use.
69
70     :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
71     :returns: same folder name as bytes, possibly with added quotes
72     :rtype: bytes
73     """
74     if not folder:
75         return b''
76     if folder[0] == '"' and folder[-1] == '"':
77         return folder.encode('ascii', 'strict')    # is already quoted
78     if any(special in folder for special in ' (){}"\\[]'):
79         # see RFC3501 $5.1 and $9
80         return f'"{folder}"'.encode('ascii', 'strict')   # quote the folder
81     return folder.encode('ascii', 'strict')
82
83
84 class ImapMailbox(imaplib.IMAP4):
85     """
86     Convenience to access and query imap mailbox.
87
88     Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
89     (status, data) with status being "OK" or "BAD" or "NO" or so.
90
91     Use as ContextManager::
92
93         with ImapMailbox(username, password) as mbox:
94             mbox.list_all_mail('INBOX')
95             # do something
96
97     Python's imaplib really offers little comfort for users, so we have to deal
98     on our own with:
99     - unsolicited responses
100     - converting BAD response to error
101     - encoding to/from bytes
102     - parsing output like b'[1 2 3]'
103     - quoting of folder names (e.g. add "" around folders with spaces)
104     - ...
105     Before adding too much capability here, consider using libraries that do
106     all that already like ImapClient (which unfortunately does not deal with
107     unsolicited responses)...
108
109     Cannot deal with unicode folder names.
110     """
111
112     def __init__(self, user=None, password=None, host='localhost',
113                  debug_log=False):
114         """
115         Create a new connection to an imap mailbox.
116
117         User and password can be given either here or to method
118         :py:meth:`login`.
119
120         :param str user: Username for imap mailbox, optional
121         :param str password: Password for logging into mailbox
122         :param str host: Host name, defaults to localhost
123         :param debug_log: Either bool to enable/disable debug logging or a file
124                           name to log to that file
125         :type debug_log: bool or str
126         """
127         super(ImapMailbox, self).__init__(host)
128         self.host = host
129         self.user = user
130         self.password = password
131         self._selected = None
132         self._debug_log_handle = None
133         if debug_log:
134             self.debug = 4   # enable debugging in super class
135             # overwrite private debug log function of base class with own.
136             # this is slightly hacky since depends on private implementation
137             if isinstance(debug_log, str):
138                 self._debug_log_handle = open(debug_log, 'at')
139                 self._mesg = self._debug_log_to_file    # logs to file
140             else:
141                 # uses logging instead of print; ignore optional secs argument
142                 self._mesg = lambda message, _=None: \
143                     log.debug('IMAP: ' + message)
144             self._mesg('Debug logging enabled in {}'.format(self))
145
146     def __str__(self):
147         """Create a textual representation of this object."""
148         if self.user is None:
149             return '[ImapMailbox on {}]'.format(self.host)
150         return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
151
152     def __enter__(self):
153         """
154         Called upon entering context manager. Logs in.
155
156         Requires user and password be given to constructor already.
157         """
158         self.login()
159         return self
160
161     def __exit__(self, *args):
162         """Called upon leaving context manager. Logs out. Ignores `args`"""
163         try:
164             self.logout()
165         except OSError:
166             pass
167
168     def _debug_log_to_file(self, message, secs=None):
169         """
170         Redirect debug message to file instead of stderr.
171
172         :param str message: Debug message
173         :param int secs: Timestamp of message, defaults to :py:func:`time.time`
174         """
175         if secs is None:
176             secs = time.time()
177         timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
178         prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
179         for line in message.splitlines():
180             self._debug_log_handle.write(prefix + line)
181         if not message.endswith('\n'):
182             self._debug_log_handle.write('\n')
183
184     def login(self, user=None, password=None):
185         """
186         Log in to mailbox.
187
188         :param str user: Username to use for login. Overrides the one given in
189                          constructor. Must be given if not given to constructor
190         :param str password: Password for login. Same restrictions as user
191         :returns: first return item from imap login.
192         """
193         if user is not None:
194             self.user = user
195         if password is not None:
196             self.password = password
197         if self.user is None:
198             raise ValueError('User must be given to either ImapMailbox '
199                              'constructor or login')
200         if self.password is None:
201             raise ValueError('Password must be given to either ImapMailbox '
202                              'constructor or login')
203         typ, data = super(ImapMailbox, self).login(self.user, self.password)
204         if typ != 'OK':
205             raise ImapError('login', typ, data)
206         self._select('INBOX')
207         return data[0]
208
209     def logout(self):
210         """Log out of mailbox. Returns data returned by imap logout."""
211         typ, data = super(ImapMailbox, self).logout()
212         if self._debug_log_handle:
213             self._debug_log_handle.close()
214         if typ != 'BYE':
215             raise ImapError('logout', typ, data)
216         self._selected = None
217         return data
218
219     def _select(self, folder):
220         """Internal helper that remembers what folder is selected."""
221         if not folder:
222             return
223         if folder == self._selected:
224             return
225         # select clears all unsolicited responses, so no use to clear them now
226         typ, data = self.select(quote_imap_folder(folder))
227         if typ != 'OK':
228             raise ImapError('select', typ, data)
229         self._selected = folder
230
231     def _clear_unsolicited_responses(self, cmd):
232         """
233         Clear mailbox's memory of unsolicited responses for given cmd.
234
235         Next call to `cmd` should only give the expected responses.
236
237         :param str cmd: next command to run on mailbox.
238         """
239         self.response(cmd)
240
241     def list_all_mail(self, folder=None):
242         """List all message ids in given folder."""
243         self._select(folder)
244         self._clear_unsolicited_responses('SEARCH')
245         typ, data = super(ImapMailbox, self).search(None, 'ALL')
246         if typ != 'OK':
247             raise ImapError('search', typ, data)
248         if len(data) != 1:
249             raise ImapError('search', typ,
250                             'Data has not len 1 but {}'.format(len(data)))
251         return tuple(int(message_id) for message_id in data[0].split())
252
253     def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
254         """
255         Copy message with given id from from_folder to to_folder.
256
257         :param str from_folder: Folder containing the message
258         :param message_id: ID of message in `from_folder` that will be copied
259         :type message_id: int or str
260         :param str to_folder: Target folder
261         :param bool del_orig: Delete original message, making this "copy" a
262                               "move" operation
263         """
264         self._select(from_folder)
265         self._clear_unsolicited_responses('COPY')
266         typ, data = super(ImapMailbox, self).copy(str(message_id),
267                                                   quote_imap_folder(to_folder))
268         if typ != 'OK':
269             raise ImapError('copy', typ, data)
270         if del_orig:
271             self._clear_unsolicited_responses('FETCH')
272             typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
273             if typ != 'OK':
274                 raise ImapError('store', typ, data)
275             self._clear_unsolicited_responses('EXPUNGE')
276             typ, _ = self.expunge()
277             if typ != 'OK':
278                 raise ImapError('expunge', typ, data)
279
280     def fetch_mail(self, message_id, folder=None, part='RFC822'):
281         """
282         Fetch and return an email in RFC822 format.
283
284         .. seealso:: :py:meth:`fetch_message`
285
286         :param int message_id: Message id in given folder
287         :param folder: Folder that contains the message or None (default) if
288                        folder is already selected.
289         :type folder: str or None
290         :param str part: Message part to fetch. For simplicity of return args,
291                          only one part can be fetched at a time. Other possible
292                          values: ``UID``, ``BODY[TEXT]``, ...
293         :returns: requested message part
294         :rtype: bytes
295         """
296         self._select(folder)
297         self._clear_unsolicited_responses('FETCH')
298         typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
299         if typ != 'OK':
300             raise ImapError('fetch', typ, data)
301
302         # have to be flexible with returned data here...
303         # should be a list of len 1 of 2-tuples, but reality differs
304         if len(data) == 2 and data[1] == b')':
305             data = data[:1]
306         if len(data) != 1:
307             raise ImapError('fetch', typ,
308                             'Data has not len 1 but {}'.format(len(data)))
309         data = data[0]
310         if part.lower() == 'uid' and isinstance(data, bytes):
311             return data[0]
312         if len(data) != 2:
313             raise ImapError('fetch', typ,
314                             'Data[0] has len {} != 2'.format(len(data)))
315         return data[1]
316
317     def fetch_message(self, message_id, folder=None):
318         """
319         Fetch complete message and convert to message object.
320
321         All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
322         resulting data is parsed into a :py:class:`email.message.EmailMessage`
323         object (for python versions < 3.4: :py:class:`email.message.Message`).
324         """
325         return message_from_bytes(
326             self.fetch_mail(message_id, folder, part='RFC822'),
327             policy=policy.default + policy.strict)