Remove api doc headers
[pyi2ncommon] / src / imap_mailbox.py
CommitLineData
4bcc6621
CH
1# The software in this package is distributed under the GNU General
2# Public License version 2 (with a special exception described below).
3#
4# A copy of GNU General Public License (GPL) is included in this distribution,
5# in the file COPYING.GPL.
6#
7# As a special exception, if other files instantiate templates or use macros
8# or inline functions from this file, or you compile this file and link it
9# with other works to produce a work based on this file, this file
10# does not by itself cause the resulting work to be covered
11# by the GNU General Public License.
12#
13# However the source code for this file must still be made available
14# in accordance with section (3) of the GNU General Public License.
15#
16# This exception does not invalidate any other reasons why a work based
17# on this file might be covered by the GNU General Public License.
18#
19# Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com>
20
21"""
4bcc6621
CH
22Connection to an IMAP mailbox
23
24Convenience wrapper around :py:class:`imaplib.IMAP4`
25
26Features provided in addition to :py:class:`imaplib.IMAP4`::
27 * dealing with unsolicited responses (missing in most other imap libs)
28 * conversion of `BAD` status responses to errors
29 * simple quoting of imap folder names
30 * debug logging to console/file
31 * memory of which folder is currently selected
32 * function to copy/move an email from one folder to another
6535c3f0
CH
33 * deal with some of the variability of returned values from `fetch` command
34 * created email message objects from byte data
4bcc6621 35
1ddcd5fe
CH
36TODO:
37 * use SSL connection per default, unencrypted only as option
38
4bcc6621 39Copyright: Intra2net AG
4bcc6621
CH
40"""
41
42import imaplib
43import logging
44import time
6535c3f0
CH
45from email import policy
46from email import message_from_bytes
4bcc6621
CH
47
48log = logging.getLogger('pyi2ncommon.imap_mailbox')
49
50
51class ImapError(Exception):
52 """Exception raised when some imap command fails."""
53
54 def __init__(self, command, status, message):
55 """Create a new ImapError for given command and response."""
56 super(ImapError, self).__init__('IMAP command {!r} returned {}: {}'
57 .format(command, status, message))
58 self.command = command
59 self.status = status
60 self.message = message
61
62
63def quote_imap_folder(folder):
64 """
65 Quote name of a folder if required by IMAP.
66
67 Folder names must be quoted if they contain e.g. spaces. This
68 implementation is incomplete, but should suffice for current use.
69
70 :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
7628bc48 71 :returns: same folder name as bytes, possibly with added quotes
6535c3f0 72 :rtype: bytes
4bcc6621
CH
73 """
74 if not folder:
7628bc48 75 return b''
4bcc6621 76 if folder[0] == '"' and folder[-1] == '"':
7628bc48 77 return folder.encode('ascii', 'strict') # is already quoted
4bcc6621
CH
78 if any(special in folder for special in ' (){}"\\[]'):
79 # see RFC3501 $5.1 and $9
7628bc48 80 return f'"{folder}"'.encode('ascii', 'strict') # quote the folder
4bcc6621
CH
81 return folder.encode('ascii', 'strict')
82
83
84class ImapMailbox(imaplib.IMAP4):
85 """
86 Convenience to access and query imap mailbox.
87
88 Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
89 (status, data) with status being "OK" or "BAD" or "NO" or so.
90
91 Use as ContextManager::
92
93 with ImapMailbox(username, password) as mbox:
94 mbox.list_all_mail('INBOX')
95 # do something
96
97 Python's imaplib really offers little comfort for users, so we have to deal
98 on our own with:
99 - unsolicited responses
100 - converting BAD response to error
101 - encoding to/from bytes
102 - parsing output like b'[1 2 3]'
103 - quoting of folder names (e.g. add "" around folders with spaces)
104 - ...
105 Before adding too much capability here, consider using libraries that do
106 all that already like ImapClient (which unfortunately does not deal with
7628bc48 107 unsolicited responses)...
4bcc6621
CH
108
109 Cannot deal with unicode folder names.
110 """
111
112 def __init__(self, user=None, password=None, host='localhost',
113 debug_log=False):
114 """
115 Create a new connection to an imap mailbox.
116
117 User and password can be given either here or to method
118 :py:meth:`login`.
119
7628bc48 120 :param str user: Username for imap mailbox, optional
4bcc6621
CH
121 :param str password: Password for logging into mailbox
122 :param str host: Host name, defaults to localhost
123 :param debug_log: Either bool to enable/disable debug logging or a file
124 name to log to that file
125 :type debug_log: bool or str
126 """
127 super(ImapMailbox, self).__init__(host)
128 self.host = host
129 self.user = user
130 self.password = password
131 self._selected = None
132 self._debug_log_handle = None
133 if debug_log:
134 self.debug = 4 # enable debugging in super class
7628bc48 135 # overwrite private debug log function of base class with own.
4bcc6621
CH
136 # this is slightly hacky since depends on private implementation
137 if isinstance(debug_log, str):
138 self._debug_log_handle = open(debug_log, 'at')
139 self._mesg = self._debug_log_to_file # logs to file
140 else:
141 # uses logging instead of print; ignore optional secs argument
142 self._mesg = lambda message, _=None: \
143 log.debug('IMAP: ' + message)
144 self._mesg('Debug logging enabled in {}'.format(self))
145
146 def __str__(self):
147 """Create a textual representation of this object."""
148 if self.user is None:
149 return '[ImapMailbox on {}]'.format(self.host)
150 return '[ImapMailbox for {}@{}]'.format(self.user, self.host)
151
152 def __enter__(self):
153 """
154 Called upon entering context manager. Logs in.
155
156 Requires user and password be given to constructor already.
157 """
158 self.login()
159 return self
160
161 def __exit__(self, *args):
162 """Called upon leaving context manager. Logs out. Ignores `args`"""
163 try:
164 self.logout()
165 except OSError:
166 pass
167
168 def _debug_log_to_file(self, message, secs=None):
169 """
170 Redirect debug message to file instead of stderr.
171
172 :param str message: Debug message
173 :param int secs: Timestamp of message, defaults to :py:func:`time.time`
174 """
175 if secs is None:
176 secs = time.time()
177 timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
178 prefix = '%s.%03d: ' % (timestamp, (secs*1000)%1000)
179 for line in message.splitlines():
180 self._debug_log_handle.write(prefix + line)
181 if not message.endswith('\n'):
182 self._debug_log_handle.write('\n')
183
184 def login(self, user=None, password=None):
185 """
186 Log in to mailbox.
187
7628bc48 188 :param str user: Username to use for login. Overrides the one given in
4bcc6621
CH
189 constructor. Must be given if not given to constructor
190 :param str password: Password for login. Same restrictions as user
6535c3f0 191 :returns: first return item from imap login.
4bcc6621
CH
192 """
193 if user is not None:
194 self.user = user
195 if password is not None:
196 self.password = password
197 if self.user is None:
198 raise ValueError('User must be given to either ImapMailbox '
199 'constructor or login')
200 if self.password is None:
201 raise ValueError('Password must be given to either ImapMailbox '
202 'constructor or login')
203 typ, data = super(ImapMailbox, self).login(self.user, self.password)
204 if typ != 'OK':
205 raise ImapError('login', typ, data)
206 self._select('INBOX')
6535c3f0 207 return data[0]
4bcc6621
CH
208
209 def logout(self):
210 """Log out of mailbox. Returns data returned by imap logout."""
211 typ, data = super(ImapMailbox, self).logout()
212 if self._debug_log_handle:
213 self._debug_log_handle.close()
214 if typ != 'BYE':
215 raise ImapError('logout', typ, data)
216 self._selected = None
217 return data
218
219 def _select(self, folder):
220 """Internal helper that remembers what folder is selected."""
221 if not folder:
222 return
223 if folder == self._selected:
224 return
225 # select clears all unsolicited responses, so no use to clear them now
226 typ, data = self.select(quote_imap_folder(folder))
227 if typ != 'OK':
228 raise ImapError('select', typ, data)
229 self._selected = folder
230
231 def _clear_unsolicited_responses(self, cmd):
232 """
233 Clear mailbox's memory of unsolicited responses for given cmd.
234
235 Next call to `cmd` should only give the expected responses.
236
237 :param str cmd: next command to run on mailbox.
238 """
239 self.response(cmd)
240
241 def list_all_mail(self, folder=None):
242 """List all message ids in given folder."""
243 self._select(folder)
244 self._clear_unsolicited_responses('SEARCH')
245 typ, data = super(ImapMailbox, self).search(None, 'ALL')
246 if typ != 'OK':
247 raise ImapError('search', typ, data)
248 if len(data) != 1:
249 raise ImapError('search', typ,
250 'Data has not len 1 but {}'.format(len(data)))
251 return tuple(int(message_id) for message_id in data[0].split())
252
253 def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
254 """
255 Copy message with given id from from_folder to to_folder.
256
257 :param str from_folder: Folder containing the message
258 :param message_id: ID of message in `from_folder` that will be copied
259 :type message_id: int or str
260 :param str to_folder: Target folder
261 :param bool del_orig: Delete original message, making this "copy" a
262 "move" operation
263 """
264 self._select(from_folder)
265 self._clear_unsolicited_responses('COPY')
266 typ, data = super(ImapMailbox, self).copy(str(message_id),
267 quote_imap_folder(to_folder))
268 if typ != 'OK':
269 raise ImapError('copy', typ, data)
270 if del_orig:
271 self._clear_unsolicited_responses('FETCH')
272 typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
273 if typ != 'OK':
274 raise ImapError('store', typ, data)
275 self._clear_unsolicited_responses('EXPUNGE')
276 typ, _ = self.expunge()
277 if typ != 'OK':
278 raise ImapError('expunge', typ, data)
6535c3f0
CH
279
280 def fetch_mail(self, message_id, folder=None, part='RFC822'):
281 """
282 Fetch and return an email in RFC822 format.
283
284 .. seealso:: :py:meth:`fetch_message`
285
286 :param int message_id: Message id in given folder
287 :param folder: Folder that contains the message or None (default) if
288 folder is already selected.
289 :type folder: str or None
290 :param str part: Message part to fetch. For simplicity of return args,
291 only one part can be fetched at a time. Other possible
292 values: ``UID``, ``BODY[TEXT]``, ...
293 :returns: requested message part
294 :rtype: bytes
295 """
296 self._select(folder)
1ddcd5fe 297 self._clear_unsolicited_responses('FETCH')
6535c3f0
CH
298 typ, data = super(ImapMailbox, self).fetch(str(message_id), part)
299 if typ != 'OK':
300 raise ImapError('fetch', typ, data)
301
302 # have to be flexible with returned data here...
303 # should be a list of len 1 of 2-tuples, but reality differs
304 if len(data) == 2 and data[1] == b')':
305 data = data[:1]
306 if len(data) != 1:
307 raise ImapError('fetch', typ,
308 'Data has not len 1 but {}'.format(len(data)))
309 data = data[0]
310 if part.lower() == 'uid' and isinstance(data, bytes):
311 return data[0]
312 if len(data) != 2:
313 raise ImapError('fetch', typ,
314 'Data[0] has len {} != 2'.format(len(data)))
315 return data[1]
316
317 def fetch_message(self, message_id, folder=None):
318 """
319 Fetch complete message and convert to message object.
320
321 All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
322 resulting data is parsed into a :py:class:`email.message.EmailMessage`
323 object (for python versions < 3.4: :py:class:`email.message.Message`).
324 """
325 return message_from_bytes(
326 self.fetch_mail(message_id, folder, part='RFC822'),
327 policy=policy.default + policy.strict)