Add unit testing for the mailbox acls parsing functionality
[imap-restore-mail] / mail_iterator.py
CommitLineData
67e2ec02
PD
1'''
2restore-mail-inject.py - Tool to inject mails via IMAP
3
4Copyright (c) 2012 Intra2net AG
5'''
6
f797b0fd 7import sys
4a1a0b03 8import socket, imaplib
67e2ec02 9import re
f797b0fd 10import logging
67e2ec02
PD
11
12MAILBOX_RESP = re.compile(r'\((?P<flags>.*?)\) "(?P<delimiter>.*)" (?P<name>.*)')
13UIDVAL_RESP = re.compile(r'(?P<name>.*) \(UIDVALIDITY (?P<uidval>.*)\)')
4a1a0b03 14ACLS_RESP = re.compile(b'(?P<user>.*) (?P<acls>.*)')
67e2ec02
PD
15
16class MailIterator:
17 """This class communicates with the e-mail server."""
18
19 # class attributes
20 # IMAP4_SSL for connection with an IMAP server
21 mail_con = None
22 # list of tuples (uidvalidity, mailboxname) for the retrieved mailboxes
23 mailboxes = None
24 # logged in status
25 logged_in = None
26
0cf4dc33 27 def __init__(self, username, server = "intranator.m.i2n"):
67e2ec02 28 """Creates a connection and a user session."""
9ce1038d 29
b2bbd1f5 30 # connect to server
67e2ec02 31 try:
0cf4dc33 32 self.mail_con = imaplib.IMAP4(server)
4a1a0b03
PD
33 # MODIFIED
34 imap_socket = socket.socket(socket.AF_UNIX)
0cf4dc33 35 imap_socket.connect("/var/imap/socket/imap")
4a1a0b03
PD
36 self.mail_con.sock = imap_socket
37 self.mail_con.file = self.mail_con.sock.makefile('rb')
38f15e57
PD
38 logging.info("Connected to mail server %s", server)
39 except (socket.error, self.mail_con.error) as ex:
40 logging.error("Could not connect to host: %s", ex)
f797b0fd 41 sys.exit()
b2bbd1f5
PD
42
43 # log in
44 try:
7aad8dab 45 self.mail_con.login("cyrus", "geheim")
b2bbd1f5 46 self.logged_in = True
7aad8dab 47 #self.mail_con.proxyauth(username)
38f15e57 48 logging.info("Logged in as %s.", username)
f797b0fd 49 except self.mail_con.error as ex:
67e2ec02 50 self.logged_in = False
38f15e57 51 logging.error("Could not log in as user %s: %s", username, ex)
f797b0fd 52 sys.exit()
9ce1038d 53
b2bbd1f5
PD
54 # list mailboxes
55 try:
56 _result, mailboxes = self.mail_con.list()
f797b0fd 57 except self.mail_con.error as ex:
38f15e57 58 logging.warning("Could not retrieve mailboxes for user %s: %s", username, ex)
67e2ec02
PD
59 self.mailboxes = []
60 for mailbox in mailboxes:
61 mailbox = MAILBOX_RESP.match(mailbox.decode('iso-8859-1')).groups()
62 self.mailboxes.append(mailbox)
63 self.mailboxes = sorted(self.mailboxes, key=lambda box: box[2], reverse=True)
9ce1038d 64
67e2ec02
PD
65 return
66
67 def __del__(self):
68 """Closes the connection and the user session."""
0cf4dc33
PD
69
70 if self.logged_in:
71 self.mail_con.logout()
67e2ec02
PD
72
73 def clear_inbox_acls(self, user):
b2bbd1f5 74 """Resets the inbox acls for a given user."""
0cf4dc33 75
67e2ec02 76 try:
b2bbd1f5 77 _result, inbox_acls = self.mail_con.getacl("INBOX")
f797b0fd 78 except self.mail_con.error as ex:
38f15e57 79 logging.warning("Could not get the acls of INBOX: %s", ex)
f797b0fd 80 return
9ce1038d 81 inbox_acls = ACLS_RESP.findall(inbox_acls[0][6:])
38f15e57 82 logging.debug("Retrieved acls from INBOX are %s", inbox_acls)
9ce1038d
PD
83 for acl_ref in inbox_acls:
84 if acl_ref[0] != user:
67e2ec02 85 try:
f797b0fd 86 self.mail_con.deleteacl("INBOX", acl_ref[0])
38f15e57 87 logging.debug("Reset acls on INBOX for user %s", acl_ref[0].decode('iso-8859-1'))
f797b0fd 88 except self.mail_con.error as ex:
38f15e57 89 logging.warning("Could not reset acls on INBOX for user %s: %s", acl_ref[0], ex)
0cf4dc33 90
67e2ec02
PD
91 return
92
e42bd6a5 93 def add_acls(self, mailbox, mailbox_list, original_user, target_user):
67e2ec02 94 """Add acls to mailbox."""
b2bbd1f5 95
e42bd6a5
PD
96 # change encoding to internal cyrus format and make folder absolute
97 mailbox = mailbox.replace("INBOX/", "user/" + original_user + "/")
67e2ec02
PD
98 mailbox = mailbox.replace(".", "^")
99 mailbox = mailbox.replace("/", ".")
b2bbd1f5 100
b0169e56
PD
101 # find folder to set all acls
102 try:
103 mbox_acls = mailbox_list[mailbox]
104 except KeyError:
105 # no rights for the mailbox were found
106 return
107 for acl_user in mbox_acls:
b0169e56
PD
108 if acl_user != target_user and acl_user != original_user:
109 try:
b2bbd1f5 110 self.mail_con.setacl(mailbox, acl_user, mbox_acls[acl_user])
38f15e57 111 logging.debug("Set acls %s for user %s on mailbox %s", mbox_acls[acl_user], acl_user, mailbox)
f797b0fd 112 except self.mail_con.error as ex:
38f15e57 113 logging.warning("Could not set acls %s for user %s on mailbox %s: %s", mbox_acls[acl_user], acl_user, mailbox, ex)
b2bbd1f5 114
67e2ec02
PD
115 return
116
117 def delete_mailboxes(self, deleted_mailbox):
118 """Delete specified mailbox or empty inbox."""
0cf4dc33 119
67e2ec02
PD
120 for mailbox in self.mailboxes:
121 pattern = '^\"?' + deleted_mailbox
122 # if INBOX it cannot be deleted so add delimiter
123 if (deleted_mailbox == "INBOX"):
124 pattern += mailbox[1]
e42bd6a5 125 if re.compile(pattern).match(mailbox[2]):
67e2ec02 126 result, data = self.mail_con.delete(mailbox[2])
e42bd6a5 127 if result == "OK":
38f15e57 128 logging.debug("Deleted mailbox %s", mailbox[2])
67e2ec02 129 else:
0cf4dc33
PD
130 logging.warning("Could not delete mailbox %s: %s", mailbox[2], data[0])
131
67e2ec02
PD
132 return
133
134 def create_mailbox(self, mailbox):
0cf4dc33 135
67e2ec02 136 """Create new mailbox to inject messages."""
e42bd6a5 137 if mailbox != "INBOX":
67e2ec02 138 result, data = self.mail_con.create(mailbox)
e42bd6a5 139 if result == "OK":
38f15e57 140 logging.debug("Creating mailbox %s", mailbox)
67e2ec02 141 else:
38f15e57 142 logging.warning("Could not create mailbox %s: %s", mailbox, data[0])
0cf4dc33 143
67e2ec02
PD
144 return
145
146 def inject_message(self, message, mailbox, internal_date):
0cf4dc33 147
67e2ec02 148 """Inject a message into a mailbox."""
7aad8dab 149 result, data = self.mail_con.append(mailbox, "\\Seen", internal_date, message.encode())
b2bbd1f5 150 if result == "OK":
38f15e57 151 logging.debug("Appending message to mailbox %s", mailbox)
b2bbd1f5 152 else:
38f15e57 153 logging.warning("Could not append the e-mail %s: %s", message, data[0])
0cf4dc33 154
67e2ec02 155 return