Clean up, remove compat with py < 3.6
[pyi2ncommon] / src / imap_mailbox.py
CommitLineData
4bcc6621
CH
1# The software in this package is distributed under the GNU General
2# Public License version 2 (with a special exception described below).
3#
4# A copy of GNU General Public License (GPL) is included in this distribution,
5# in the file COPYING.GPL.
6#
7# As a special exception, if other files instantiate templates or use macros
8# or inline functions from this file, or you compile this file and link it
9# with other works to produce a work based on this file, this file
10# does not by itself cause the resulting work to be covered
11# by the GNU General Public License.
12#
13# However the source code for this file must still be made available
14# in accordance with section (3) of the GNU General Public License.
15#
16# This exception does not invalidate any other reasons why a work based
17# on this file might be covered by the GNU General Public License.
18#
19# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21"""
22
23SUMMARY
24------------------------------------------------------
25Connection to an IMAP mailbox
26
27Convenience wrapper around :py:class:`imaplib.IMAP4`
28
29Features provided in addition to :py:class:`imaplib.IMAP4`::
30 * dealing with unsolicited responses (missing in most other imap libs)
31 * conversion of `BAD` status responses to errors
32 * simple quoting of imap folder names
33 * debug logging to console/file
34 * memory of which folder is currently selected
35 * function to copy/move an email from one folder to another
6535c3f0
CH
36 * deal with some of the variability of returned values from `fetch` command
37 * created email message objects from byte data
4bcc6621 38
1ddcd5fe
CH
39TODO:
40 * use SSL connection per default, unencrypted only as option
41
4bcc6621
CH
42Copyright: Intra2net AG
43
44
45INTERFACE
46------------------------------------------------------
47
48"""
49
50import imaplib
51import logging
52import time
6535c3f0
CH
53from email import policy
54from email import message_from_bytes
4bcc6621
CH
55
56log = logging.getLogger('pyi2ncommon.imap_mailbox')
57
58
59class ImapError(Exception):
60 """Exception raised when some imap command fails."""
61
62 def __init__(self, command, status, message):
63 """Create a new ImapError for given command and response."""
64 super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
65 .format(command, status, message))
66 self.command = command
67 self.status = status
68 self.message = message
69
70
71def quote_imap_folder(folder):
72 """
73 Quote name of a folder if required by IMAP.
74
75 Folder names must be quoted if they contain e.g. spaces. This
76 implementation is incomplete, but should suffice for current use.
77
78 :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
7628bc48 79 :returns: same folder name as bytes, possibly with added quotes
6535c3f0 80 :rtype: bytes
4bcc6621
CH
81 """
82 if not folder:
7628bc48 83 return b''
4bcc6621 84 if folder[0] == '"' and folder[-1] == '"':
7628bc48 85 return folder.encode('ascii', 'strict') # is already quoted
4bcc6621
CH
86 if any(special in folder for special in ' (){}"\\[]'):
87 # see RFC3501 $5.1 and $9
7628bc48 88 return f'"{folder}"'.encode('ascii', 'strict') # quote the folder
4bcc6621
CH
89 return folder.encode('ascii', 'strict')
90
91
92class ImapMailbox(imaplib.IMAP4):
93 """
94 Convenience to access and query imap mailbox.
95
96 Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
97 (status, data) with status being "OK" or "BAD" or "NO" or so.
98
99 Use as ContextManager::
100
101 with ImapMailbox(username, password) as mbox:
102 mbox.list_all_mail('INBOX')
103 # do something
104
105 Python's imaplib really offers little comfort for users, so we have to deal
106 on our own with:
107 - unsolicited responses
108 - converting BAD response to error
109 - encoding to/from bytes
110 - parsing output like b'[1 2 3]'
111 - quoting of folder names (e.g. add "" around folders with spaces)
112 - ...
113 Before adding too much capability here, consider using libraries that do
114 all that already like ImapClient (which unfortunately does not deal with
7628bc48 115 unsolicited responses)...
4bcc6621
CH
116
117 Cannot deal with unicode folder names.
118 """
119
120 def __init__(self, user=None, password=None, host='localhost',
121 debug_log=False):
122 """
123 Create a new connection to an imap mailbox.
124
125 User and password can be given either here or to method
126 :py:meth:`login`.
127
7628bc48 128 :param str user: Username for imap mailbox, optional
4bcc6621
CH
129 :param str password: Password for logging into mailbox
130 :param str host: Host name, defaults to localhost
131 :param debug_log: Either bool to enable/disable debug logging or a file
132 name to log to that file
133 :type debug_log: bool or str
134 """
135 super(ImapMailbox, self).__init__(host)
136 self.host = host
137 self.user = user
138 self.password = password
139 self._selected = None
140 self._debug_log_handle = None
141 if debug_log:
142 self.debug = 4 # enable debugging in super class
7628bc48 143 # overwrite private debug log function of base class with own.
4bcc6621
CH
144 # this is slightly hacky since depends on private implementation
145 if isinstance(debug_log, str):
146 self._debug_log_handle = open(debug_log, 'at')
147 self._mesg = self._debug_log_to_file # logs to file
148 else:
149 # uses logging instead of print; ignore optional secs argument
150 self._mesg = lambda message, _=None: \
151 log.debug('IMAP: ' + message)
152 self._mesg('Debug logging enabled in {}'.format(self))
153
154 def __str__(self):
155 """Create a textual representation of this object."""
156 if self.user is None:
157 return '[ImapMailbox on {}]'.format(self.host)
158 return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
159
160 def __enter__(self):
161 """
162 Called upon entering context manager. Logs in.
163
164 Requires user and password be given to constructor already.
165 """
166 self.login()
167 return self
168
169 def __exit__(self, *args):
170 """Called upon leaving context manager. Logs out. Ignores `args`"""
171 try:
172 self.logout()
173 except OSError:
174 pass
175
176 def _debug_log_to_file(self, message, secs=None):
177 """
178 Redirect debug message to file instead of stderr.
179
180 :param str message: Debug message
181 :param int secs: Timestamp of message, defaults to :py:func:`time.time`
182 """
183 if secs is None:
184 secs = time.time()
185 timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
186 prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
187 for line in message.splitlines():
188 self._debug_log_handle.write(prefix + line)
189 if not message.endswith('\n'):
190 self._debug_log_handle.write('\n')
191
192 def login(self, user=None, password=None):
193 """
194 Log in to mailbox.
195
7628bc48 196 :param str user: Username to use for login. Overrides the one given in
4bcc6621
CH
197 constructor. Must be given if not given to constructor
198 :param str password: Password for login. Same restrictions as user
6535c3f0 199 :returns: first return item from imap login.
4bcc6621
CH
200 """
201 if user is not None:
202 self.user = user
203 if password is not None:
204 self.password = password
205 if self.user is None:
206 raise ValueError('User must be given to either ImapMailbox '
207 'constructor or login')
208 if self.password is None:
209 raise ValueError('Password must be given to either ImapMailbox '
210 'constructor or login')
211 typ, data = super(ImapMailbox, self).login(self.user, self.password)
212 if typ != 'OK':
213 raise ImapError('login', typ, data)
214 self._select('INBOX')
6535c3f0 215 return data[0]
4bcc6621
CH
216
217 def logout(self):
218 """Log out of mailbox. Returns data returned by imap logout."""
219 typ, data = super(ImapMailbox, self).logout()
220 if self._debug_log_handle:
221 self._debug_log_handle.close()
222 if typ != 'BYE':
223 raise ImapError('logout', typ, data)
224 self._selected = None
225 return data
226
227 def _select(self, folder):
228 """Internal helper that remembers what folder is selected."""
229 if not folder:
230 return
231 if folder == self._selected:
232 return
233 # select clears all unsolicited responses, so no use to clear them now
234 typ, data = self.select(quote_imap_folder(folder))
235 if typ != 'OK':
236 raise ImapError('select', typ, data)
237 self._selected = folder
238
239 def _clear_unsolicited_responses(self, cmd):
240 """
241 Clear mailbox's memory of unsolicited responses for given cmd.
242
243 Next call to `cmd` should only give the expected responses.
244
245 :param str cmd: next command to run on mailbox.
246 """
247 self.response(cmd)
248
249 def list_all_mail(self, folder=None):
250 """List all message ids in given folder."""
251 self._select(folder)
252 self._clear_unsolicited_responses('SEARCH')
253 typ, data = super(ImapMailbox, self).search(None, 'ALL')
254 if typ != 'OK':
255 raise ImapError('search', typ, data)
256 if len(data) != 1:
257 raise ImapError('search', typ,
258 'Data has not len 1 but {}'.format(len(data)))
259 return tuple(int(message_id) for message_id in data[0].split())
260
261 def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
262 """
263 Copy message with given id from from_folder to to_folder.
264
265 :param str from_folder: Folder containing the message
266 :param message_id: ID of message in `from_folder` that will be copied
267 :type message_id: int or str
268 :param str to_folder: Target folder
269 :param bool del_orig: Delete original message, making this "copy" a
270 "move" operation
271 """
272 self._select(from_folder)
273 self._clear_unsolicited_responses('COPY')
274 typ, data = super(ImapMailbox, self).copy(str(message_id),
275 quote_imap_folder(to_folder))
276 if typ != 'OK':
277 raise ImapError('copy', typ, data)
278 if del_orig:
279 self._clear_unsolicited_responses('FETCH')
280 typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
281 if typ != 'OK':
282 raise ImapError('store', typ, data)
283 self._clear_unsolicited_responses('EXPUNGE')
284 typ, _ = self.expunge()
285 if typ != 'OK':
286 raise ImapError('expunge', typ, data)
6535c3f0
CH
287
288 def fetch_mail(self, message_id, folder=None, part='RFC822'):
289 """
290 Fetch and return an email in RFC822 format.
291
292 .. seealso:: :py:meth:`fetch_message`
293
294 :param int message_id: Message id in given folder
295 :param folder: Folder that contains the message or None (default) if
296 folder is already selected.
297 :type folder: str or None
298 :param str part: Message part to fetch. For simplicity of return args,
299 only one part can be fetched at a time. Other possible
300 values: ``UID``, ``BODY[TEXT]``, ...
301 :returns: requested message part
302 :rtype: bytes
303 """
304 self._select(folder)
1ddcd5fe 305 self._clear_unsolicited_responses('FETCH')
6535c3f0
CH
306 typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
307 if typ != 'OK':
308 raise ImapError('fetch', typ, data)
309
310 # have to be flexible with returned data here...
311 # should be a list of len 1 of 2-tuples, but reality differs
312 if len(data) == 2 and data[1] == b')':
313 data = data[:1]
314 if len(data) != 1:
315 raise ImapError('fetch', typ,
316 'Data has not len 1 but {}'.format(len(data)))
317 data = data[0]
318 if part.lower() == 'uid' and isinstance(data, bytes):
319 return data[0]
320 if len(data) != 2:
321 raise ImapError('fetch', typ,
322 'Data[0] has len {} != 2'.format(len(data)))
323 return data[1]
324
325 def fetch_message(self, message_id, folder=None):
326 """
327 Fetch complete message and convert to message object.
328
329 All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
330 resulting data is parsed into a :py:class:`email.message.EmailMessage`
331 object (for python versions < 3.4: :py:class:`email.message.Message`).
332 """
333 return message_from_bytes(
334 self.fetch_mail(message_id, folder, part='RFC822'),
335 policy=policy.default + policy.strict)