Commit | Line | Data |
---|---|---|
4bcc6621 CH |
1 | # The software in this package is distributed under the GNU General |
2 | # Public License version 2 (with a special exception described below). | |
3 | # | |
4 | # A copy of GNU General Public License (GPL) is included in this distribution, | |
5 | # in the file COPYING.GPL. | |
6 | # | |
7 | # As a special exception, if other files instantiate templates or use macros | |
8 | # or inline functions from this file, or you compile this file and link it | |
9 | # with other works to produce a work based on this file, this file | |
10 | # does not by itself cause the resulting work to be covered | |
11 | # by the GNU General Public License. | |
12 | # | |
13 | # However the source code for this file must still be made available | |
14 | # in accordance with section (3) of the GNU General Public License. | |
15 | # | |
16 | # This exception does not invalidate any other reasons why a work based | |
17 | # on this file might be covered by the GNU General Public License. | |
18 | # | |
19 | # Copyright (c) 2016-2018 Intra2net AG <info@intra2net.com> | |
20 | ||
21 | """ | |
4bcc6621 CH |
22 | Connection to an IMAP mailbox |
23 | ||
24 | Convenience wrapper around :py:class:`imaplib.IMAP4` | |
25 | ||
26 | Features provided in addition to :py:class:`imaplib.IMAP4`:: | |
27 | * dealing with unsolicited responses (missing in most other imap libs) | |
28 | * conversion of `BAD` status responses to errors | |
29 | * simple quoting of imap folder names | |
30 | * debug logging to console/file | |
31 | * memory of which folder is currently selected | |
32 | * function to copy/move an email from one folder to another | |
6535c3f0 CH |
33 | * deal with some of the variability of returned values from `fetch` command |
34 | * creation of email message objects from byte data | |
4bcc6621 | 35 | |
1ddcd5fe CH |
36 | TODO: |
37 | * use SSL connection per default, unencrypted only as option | |
38 | ||
4bcc6621 | 39 | Copyright: Intra2net AG |
4bcc6621 CH |
40 | """ |
41 | ||
42 | import imaplib | |
43 | import logging | |
44 | import time | |
6535c3f0 CH |
45 | from email import policy |
46 | from email import message_from_bytes | |
4bcc6621 CH |
47 | |
# Module-level logger; receives the IMAP debug messages when debug logging is
# enabled with a non-file-name value (see ImapMailbox constructor).
log = logging.getLogger('pyi2ncommon.imap_mailbox')
49 | ||
50 | ||
class ImapError(Exception):
    """
    Error signalling that an IMAP command did not succeed.

    The offending command, the status reported for it and the accompanying
    response data are kept as attributes :py:attr:`command`,
    :py:attr:`status` and :py:attr:`message`.
    """

    def __init__(self, command, status, message):
        """
        Build the error text and remember all three arguments.

        :param command: name of the imap command that failed
        :param status: status returned for the command (e.g. ``NO``)
        :param message: data returned along with the status
        """
        self.command, self.status, self.message = command, status, message
        description = 'IMAP command {!r} returned {}: {}'.format(
            command, status, message)
        super().__init__(description)
61 | ||
62 | ||
def quote_imap_folder(folder):
    """
    Quote name of a folder if required by IMAP.

    Folder names must be sent as quoted strings if they contain characters
    that are not allowed in an IMAP atom, e.g. spaces, parentheses or the list
    wildcards ``%`` and ``*``. Inside a quoted string, backslash and double
    quote are escaped with a backslash (RFC3501 $4.3 and $9).

    A name that already starts and ends with a double quote (and is at least
    2 characters long) is assumed to be quoted properly and is returned
    unchanged, so calling this function twice is harmless.

    This implementation is still incomplete (e.g. no handling of control
    characters or non-ascii names), but should suffice for current use.

    :param str folder: Name of a folder in an IMAP mailbox, possibly quoted
    :returns: same folder name as bytes, possibly with added quotes; empty
              bytes if `folder` is empty or `None`
    :rtype: bytes
    :raises UnicodeEncodeError: if folder name contains non-ascii characters
    """
    if not folder:
        return b''
    # need at least 2 characters; otherwise a lone '"' would be its own
    # opening and closing quote
    if len(folder) >= 2 and folder[0] == '"' and folder[-1] == '"':
        return folder.encode('ascii', 'strict')   # is already quoted
    if any(special in folder for special in ' (){}"\\[]%*'):
        # see RFC3501 $5.1 and $9; escape backslashes first so the
        # backslashes added for the quotes are not escaped again
        escaped = folder.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'.encode('ascii', 'strict')   # quote the folder
    return folder.encode('ascii', 'strict')
82 | ||
83 | ||
class ImapMailbox(imaplib.IMAP4):
    """
    Convenience to access and query imap mailbox.

    Raises :py:class:`ImapError` if commands fail, instead of returning 2-tuple
    (status, data) with status being "OK" or "BAD" or "NO" or so.

    Use as ContextManager::

        with ImapMailbox(username, password) as mbox:
            mbox.list_all_mail('INBOX')
            # do something

    Python's imaplib really offers little comfort for users, so we have to deal
    on our own with:
    - unsolicited responses
    - converting BAD response to error
    - encoding to/from bytes
    - parsing output like b'[1 2 3]'
    - quoting of folder names (e.g. add "" around folders with spaces)
    - ...
    Before adding too much capability here, consider using libraries that do
    all that already like ImapClient (which unfortunately does not deal with
    unsolicited responses)...

    Cannot deal with unicode folder names.
    """

    def __init__(self, user=None, password=None, host='localhost',
                 debug_log=False):
        """
        Create a new connection to an imap mailbox.

        The connection to `host` is opened right away by the base class
        constructor; logging in happens later in :py:meth:`login` (or when
        entering the context manager).

        User and password can be given either here or to method
        :py:meth:`login`.

        :param str user: Username for imap mailbox, optional
        :param str password: Password for logging into mailbox
        :param str host: Host name, defaults to localhost
        :param debug_log: Either bool to enable/disable debug logging or a file
                          name to log to that file
        :type debug_log: bool or str
        :raises OSError: if connection or opening of debug log file fails
        """
        super().__init__(host)
        self.host = host
        self.user = user
        self.password = password
        self._selected = None            # folder selected last, if any
        self._debug_log_handle = None    # open file if logging to file
        if debug_log:
            self.debug = 4   # enable debugging in super class
            # overwrite private debug log function of base class with own.
            # this is slightly hacky since depends on private implementation
            if isinstance(debug_log, str):
                try:
                    self._debug_log_handle = open(debug_log, 'at')
                except OSError:
                    # do not leak the connection opened by base class
                    self.shutdown()
                    raise
                self._mesg = self._debug_log_to_file   # logs to file
            else:
                # uses logging instead of print; ignore optional secs argument
                self._mesg = lambda message, _=None: \
                    log.debug('IMAP: %s', message)
            self._mesg('Debug logging enabled in {}'.format(self))

    def __str__(self):
        """Create a textual representation of this object."""
        if self.user is None:
            return '[ImapMailbox on {}]'.format(self.host)
        return '[ImapMailbox for {}@{}]'.format(self.user, self.host)

    def __enter__(self):
        """
        Called upon entering context manager. Logs in.

        Requires user and password be given to constructor already.
        """
        self.login()
        return self

    def __exit__(self, *args):
        """Called upon leaving context manager. Logs out. Ignores `args`"""
        try:
            self.logout()
        except OSError:
            # best effort: connection may be broken already
            pass

    def _debug_log_to_file(self, message, secs=None):
        """
        Redirect debug message to file instead of stderr.

        Every line of `message` is written as a separate line, each prefixed
        with a timestamp of format ``HH:MM:SS.mmm``. Does nothing if the debug
        log file is not open (any more).

        :param str message: Debug message
        :param secs: Timestamp of message, defaults to :py:func:`time.time`
        :type secs: int or float
        """
        handle = self._debug_log_handle
        if handle is None:
            return    # no log file or closed already in logout
        if secs is None:
            secs = time.time()
        timestamp = time.strftime('%H:%M:%S', time.localtime(secs))
        prefix = '%s.%03d: ' % (timestamp, (secs*1000) % 1000)
        # splitlines removes the line ends, so add one to every line; an
        # empty message still produces one (empty) log line
        handle.writelines(prefix + line + '\n'
                          for line in message.splitlines() or [''])

    def _close_debug_log(self):
        """Close debug log file if open; safe to call more than once."""
        handle, self._debug_log_handle = self._debug_log_handle, None
        if handle is not None:
            handle.close()

    def login(self, user=None, password=None):
        """
        Log in to mailbox and select folder ``INBOX``.

        :param str user: Username to use for login. Overrides the one given in
                         constructor. Must be given if not given to constructor
        :param str password: Password for login. Same restrictions as user
        :returns: first return item from imap login.
        :raises ValueError: if user or password are given neither here nor to
                            constructor
        :raises ImapError: if login or selecting the inbox fails
        """
        if user is not None:
            self.user = user
        if password is not None:
            self.password = password
        if self.user is None:
            raise ValueError('User must be given to either ImapMailbox '
                             'constructor or login')
        if self.password is None:
            raise ValueError('Password must be given to either ImapMailbox '
                             'constructor or login')
        typ, data = super().login(self.user, self.password)
        if typ != 'OK':
            raise ImapError('login', typ, data)
        self._select('INBOX')
        return data[0]

    def logout(self):
        """
        Log out of mailbox. Returns data returned by imap logout.

        The debug log file (if any) is closed and the memory of the selected
        folder is cleared in any case, even if logging out fails.

        :raises ImapError: if server does not answer with ``BYE``
        """
        try:
            typ, data = super().logout()
        finally:
            self._selected = None
            self._close_debug_log()
        if typ != 'BYE':
            raise ImapError('logout', typ, data)
        return data

    def _select(self, folder):
        """
        Internal helper that remembers what folder is selected.

        Does nothing if `folder` is empty/`None` or is selected already.

        :param folder: name of folder to select
        :type folder: str or None
        :raises ImapError: if selecting the folder fails
        """
        if not folder or folder == self._selected:
            return
        # a failed SELECT leaves no folder selected on the server (RFC3501
        # $6.3.1), so forget the old one before trying
        self._selected = None
        # select clears all unsolicited responses, so no use to clear them now
        typ, data = self.select(quote_imap_folder(folder))
        if typ != 'OK':
            raise ImapError('select', typ, data)
        self._selected = folder

    def _clear_unsolicited_responses(self, cmd):
        """
        Clear mailbox's memory of unsolicited responses for given cmd.

        Next call to `cmd` should only give the expected responses.

        :param str cmd: next command to run on mailbox.
        """
        self.response(cmd)

    def list_all_mail(self, folder=None):
        """
        List all message ids in given folder.

        :param folder: Folder to search or None (default) to use the folder
                       that is selected already
        :type folder: str or None
        :returns: ids of all messages in folder
        :rtype: tuple of int
        :raises ImapError: if select or search fail or return unexpected data
        """
        self._select(folder)
        self._clear_unsolicited_responses('SEARCH')
        typ, data = super().search(None, 'ALL')
        if typ != 'OK':
            raise ImapError('search', typ, data)
        if len(data) != 1:
            raise ImapError('search', typ,
                            'Data has not len 1 but {}'.format(len(data)))
        return tuple(int(message_id) for message_id in data[0].split())

    def copy_message(self, from_folder, message_id, to_folder, del_orig=False):
        """
        Copy message with given id from from_folder to to_folder.

        Note that deleting the original is done by flagging it as deleted and
        then running ``EXPUNGE``, which (by RFC3501 $6.4.3) removes all
        messages in `from_folder` that are flagged as deleted.

        :param str from_folder: Folder containing the message
        :param message_id: ID of message in `from_folder` that will be copied
        :type message_id: int or str
        :param str to_folder: Target folder
        :param bool del_orig: Delete original message, making this "copy" a
                              "move" operation
        :raises ImapError: if any of the involved imap commands fails
        """
        # imaplib only accepts str/bytes arguments, ids often are ints
        message_id = str(message_id)
        self._select(from_folder)
        self._clear_unsolicited_responses('COPY')
        typ, data = super().copy(message_id, quote_imap_folder(to_folder))
        if typ != 'OK':
            raise ImapError('copy', typ, data)
        if del_orig:
            # the untagged response to STORE is of type FETCH
            self._clear_unsolicited_responses('FETCH')
            typ, data = self.store(message_id, '+FLAGS', r'(\Deleted)')
            if typ != 'OK':
                raise ImapError('store', typ, data)
            self._clear_unsolicited_responses('EXPUNGE')
            typ, data = self.expunge()
            if typ != 'OK':
                raise ImapError('expunge', typ, data)

    @staticmethod
    def _extract_fetch_payload(typ, data, part):
        """
        Extract the requested message part from response data of a fetch.

        :param str typ: status returned by fetch, only used in error messages
        :param list data: response data as returned by fetch
        :param str part: message part that was requested
        :returns: requested message part
        :rtype: bytes
        :raises ImapError: if data does not have an expected structure
        """
        import re    # only needed here

        # have to be flexible with returned data here...
        # should be a list of len 1 of 2-tuples, but reality differs
        if len(data) == 2 and data[1] == b')':
            data = data[:1]
        if len(data) != 1:
            raise ImapError('fetch', typ,
                            'Data has not len 1 but {}'.format(len(data)))
        item = data[0]
        if item is None:
            # imaplib uses [None] if there was no response of requested type
            raise ImapError('fetch', typ, 'No data returned for message')
        if part.lower() == 'uid' and isinstance(item, bytes):
            # response is a plain line like b'1 (UID 4827)', not a 2-tuple
            match = re.search(rb'\bUID\s+(\d+)', item, re.IGNORECASE)
            return match.group(1) if match else item
        if len(item) != 2:
            raise ImapError('fetch', typ,
                            'Data[0] has len {} != 2'.format(len(item)))
        return item[1]

    def fetch_mail(self, message_id, folder=None, part='RFC822'):
        """
        Fetch and return an email in RFC822 format.

        .. seealso:: :py:meth:`fetch_message`

        :param int message_id: Message id in given folder
        :param folder: Folder that contains the message or None (default) if
                       folder is already selected.
        :type folder: str or None
        :param str part: Message part to fetch. For simplicity of return args,
                         only one part can be fetched at a time. Other possible
                         values: ``UID``, ``BODY[TEXT]``, ...
        :returns: requested message part. For part ``UID`` this is the uid
                  (digits only) if it can be found in the server response,
                  otherwise the complete response line.
        :rtype: bytes
        :raises ImapError: if select or fetch fail or return unexpected data
        """
        self._select(folder)
        self._clear_unsolicited_responses('FETCH')
        typ, data = super().fetch(str(message_id), part)
        if typ != 'OK':
            raise ImapError('fetch', typ, data)
        return self._extract_fetch_payload(typ, data, part)

    def fetch_message(self, message_id, folder=None):
        """
        Fetch complete message and convert to message object.

        All params are forwarded to :py:meth:`fetch_mail`. Afterwards, the
        resulting data is parsed into a :py:class:`email.message.EmailMessage`
        object (for python versions < 3.4: :py:class:`email.message.Message`).
        """
        return message_from_bytes(
            self.fetch_mail(message_id, folder, part='RFC822'),
            policy=policy.default + policy.strict)