'''
restore-mail-inject.py - Tool to inject mails via IMAP

Copyright (c) 2012 Intra2net AG
'''

import sys
import socket
import imaplib
import re
import logging

# One line of an IMAP LIST response: (flags) "delimiter" name.
# The group names are descriptive only; the code below relies on the
# positional order (flags, delimiter, name) returned by .groups().
MAILBOX_RESP = re.compile(r'\((?P<flags>.*?)\) "(?P<delimiter>.*)" (?P<name>.*)')

# A response fragment of the form: <prefix> (UIDVALIDITY <value>).
# Not used by MailIterator itself; kept as part of the module interface.
UIDVAL_RESP = re.compile(r'(?P<status>.*) \(UIDVALIDITY (?P<uidval>.*)\)')

# One "<identifier> <rights>" pair of an ACL response (bytes pattern, because
# imaplib returns response data as bytes).  Non-greedy-by-construction tokens
# (\S+) are required so that findall() yields one tuple per pair; a greedy
# ".*" would swallow every pair but the last into the first group.
ACLS_RESP = re.compile(rb'(?P<user>\S+) (?P<acls>\S+)')


class MailIterator:
    """This class communicates with the e-mail server."""

    # class attributes
    # imaplib.IMAP4 object for the connection with the IMAP server
    mail_con = None
    # list of (flags, delimiter, mailboxname) tuples for the retrieved
    # mailboxes, sorted by mailbox name in descending order
    mailboxes = None
    # logged in status
    logged_in = None

    def __init__(self, username):
        """Create a connection and a user session, then list the mailboxes.

        *username* is only used in log messages here: the login itself uses
        a fixed account and the proxyauth call is disabled.

        Terminates the process via sys.exit() when connecting or logging in
        fails.  A failure to list the mailboxes is only logged and leaves
        self.mailboxes empty.
        """
        # connect to server
        try:
            self.mail_con = imaplib.IMAP4("intranator.m.i2n")
            # MODIFIED: talk to the local IMAP UNIX socket instead of the
            # TCP connection opened by the constructor.
            # NOTE(review): the constructor's TCP socket is replaced without
            # being closed - confirm whether that is intended.
            imap_socket = socket.socket(socket.AF_UNIX)
            imap_socket.connect("/var/imap/socket/imap")
            self.mail_con.sock = imap_socket
            self.mail_con.file = self.mail_con.sock.makefile('rb')
            logging.info("Connected to mail server.")
        # Reference the exception class through imaplib: self.mail_con is
        # still None when the IMAP4 constructor itself raised.
        except (imaplib.IMAP4.error, socket.error) as ex:
            logging.error("Could not connect to host: %s", ex)
            sys.exit()

        # log in
        # NOTE(review): credentials are hard-coded; consider moving them to
        # configuration.
        try:
            self.mail_con.login("cyrus", "geheim")
            self.logged_in = True
            #self.mail_con.proxyauth(username)
            logging.info("Logged in as %s.", username)
        except imaplib.IMAP4.error as ex:
            self.logged_in = False
            logging.error("Could not log in as user %s: %s", username, ex)
            sys.exit()

        # list mailboxes
        try:
            _result, raw_mailboxes = self.mail_con.list()
        except imaplib.IMAP4.error as ex:
            logging.warning("Could not retrieve mailboxes for user %s: %s",
                            username, ex)
            # continue with an empty list instead of an unbound name
            raw_mailboxes = []
        self.mailboxes = self._parse_list_response(raw_mailboxes)

    @staticmethod
    def _parse_list_response(raw_mailboxes):
        """Turn raw LIST response lines into sorted mailbox tuples.

        Returns a list of (flags, delimiter, name) tuples sorted by name in
        descending order.  Entries that are not bytes (imaplib yields None
        for an empty response) or that do not match MAILBOX_RESP are skipped.
        """
        parsed = []
        for raw in raw_mailboxes or []:
            if not isinstance(raw, bytes):
                continue
            match = MAILBOX_RESP.match(raw.decode('iso-8859-1'))
            if match is None:
                logging.warning("Could not parse mailbox entry %s", raw)
                continue
            parsed.append(match.groups())
        return sorted(parsed, key=lambda box: box[2], reverse=True)

    def __del__(self):
        """Placeholder for closing the connection and the user session.

        Currently does nothing: the cleanup calls below are disabled.
        """
        #if self.logged_in:
        #    self.mail_con.close()
        #    self.mail_con.logout()

    def clear_inbox_acls(self, user):
        """Remove every acl entry on INBOX except the one of *user*.

        *user* may be given as str or bytes.  Failures are logged as
        warnings; nothing is raised for IMAP errors.
        """
        try:
            _result, inbox_acls = self.mail_con.getacl("INBOX")
        except imaplib.IMAP4.error as ex:
            logging.warning("Could not get the acls of INBOX: %s", ex)
            return

        if not inbox_acls or inbox_acls[0] is None:
            logging.debug("No acls returned for INBOX")
            return

        # The response data is bytes, so compare against a bytes identifier;
        # comparing bytes with str is always unequal and would also remove
        # the acl of *user*.
        keep = user.encode('iso-8859-1') if isinstance(user, str) else user

        # Skip the leading 6 bytes ("INBOX "); assumes the server echoes the
        # mailbox name unquoted.
        acl_pairs = ACLS_RESP.findall(inbox_acls[0][6:])
        logging.debug("Retrieved acls from INBOX are %s", acl_pairs)

        for acl_user, _rights in acl_pairs:
            if acl_user == keep:
                continue
            shown = acl_user.decode('iso-8859-1')
            try:
                self.mail_con.deleteacl("INBOX", acl_user)
                logging.debug("Reset acls on INBOX for user %s", shown)
            except imaplib.IMAP4.error as ex:
                logging.warning("Could not reset acls on INBOX for user %s: %s",
                                shown, ex)

    def add_acls(self, mailbox, mailbox_list, original_user, target_user):
        """Add acls to mailbox.

        *mailbox_list* maps mailbox names in internal cyrus notation to a
        mapping of acl user -> rights.  Entries for *original_user* and
        *target_user* are not applied.  Does nothing when *mailbox* has no
        entry in *mailbox_list*.
        """
        # change encoding to internal cyrus format and make folder absolute
        mailbox = mailbox.replace("INBOX/", "user/" + original_user + "/")
        mailbox = mailbox.replace(".", "^")
        mailbox = mailbox.replace("/", ".")

        # find folder to set all acls
        try:
            mbox_acls = mailbox_list[mailbox]
        except KeyError:
            # no rights for the mailbox were found
            return

        for acl_user, rights in mbox_acls.items():
            if acl_user in (target_user, original_user):
                continue
            try:
                self.mail_con.setacl(mailbox, acl_user, rights)
                logging.debug("Set acls %s for user %s on mailbox %s",
                              rights, acl_user, mailbox)
            except imaplib.IMAP4.error as ex:
                logging.warning(
                    "Could not set acls %s for user %s on mailbox %s: %s",
                    rights, acl_user, mailbox, ex)

    def delete_mailboxes(self, deleted_mailbox):
        """Delete specified mailbox or empty inbox.

        Every known mailbox whose name starts with *deleted_mailbox*
        (optionally preceded by a double quote) is deleted.  For "INBOX"
        only mailboxes below it are deleted, not INBOX itself.
        """
        # Escape the name: it is literal text, not a regular expression.
        prefix = '^"?' + re.escape(deleted_mailbox)
        for _flags, delimiter, name in self.mailboxes:
            pattern = prefix
            # if INBOX it cannot be deleted so add delimiter
            if deleted_mailbox == "INBOX":
                pattern += re.escape(delimiter)
            if not re.match(pattern, name):
                continue
            result, data = self.mail_con.delete(name)
            if result == "OK":
                logging.debug("Deleted mailbox %s", name)
            else:
                logging.warning("Could not delete mailbox %s: %s",
                                name, data[0])

    def create_mailbox(self, mailbox):
        """Create new mailbox to inject messages.

        "INBOX" is never created.  A non-OK server result is logged as a
        warning.
        """
        if mailbox == "INBOX":
            return
        result, data = self.mail_con.create(mailbox)
        if result == "OK":
            logging.debug("Creating mailbox %s", mailbox)
        else:
            logging.warning("Could not create mailbox %s: %s",
                            mailbox, data[0])

    def inject_message(self, message, mailbox, internal_date):
        """Inject a message into a mailbox.

        *message* is a str; it is encoded with the default codec and
        appended to *mailbox* with the \\Seen flag and *internal_date*.
        A non-OK server result is logged as a warning.
        """
        result, data = self.mail_con.append(mailbox, "\\Seen", internal_date,
                                            message.encode())
        if result == "OK":
            logging.debug("Appending message to mailbox %s", mailbox)
        else:
            logging.warning("Could not append the e-mail %s: %s",
                            message, data[0])