Add unit testing for the mailbox acls parsing functionality
[imap-restore-mail] / mail_iterator.py
index 1c4550c..af45da9 100644 (file)
@@ -4,12 +4,14 @@ restore-mail-inject.py - Tool to inject mails via IMAP
 Copyright (c) 2012 Intra2net AG
 '''
 
-import imaplib
+import sys
+import socket, imaplib
 import re
+import logging
 
 MAILBOX_RESP = re.compile(r'\((?P<flags>.*?)\) "(?P<delimiter>.*)" (?P<name>.*)')
 UIDVAL_RESP = re.compile(r'(?P<name>.*) \(UIDVALIDITY (?P<uidval>.*)\)')
-ACLS_RESP = re.compile(r'(?P<name>.*) \(UIDVALIDITY (?P<uidval>.*)\)')
+ACLS_RESP = re.compile(b'(?P<user>.*) (?P<acls>.*)')
 
 class MailIterator:
     """This class communicates with the e-mail server."""
@@ -22,58 +24,70 @@ class MailIterator:
     # logged in status
     logged_in = None
 
-    def __init__(self, server, username, password):
+    def __init__(self, username, server = "intranator.m.i2n"):
         """Creates a connection and a user session."""
-        
+
         # connect to server
         try:
-            self.mail_con = imaplib.IMAP4_SSL(server)
-            print("Connected to %s." % server)
-        except Exception as ex:
-            raise UserWarning("Could not connect to host %s: %s" % (server, ex))
+            self.mail_con = imaplib.IMAP4(server)
+            # MODIFIED
+            imap_socket = socket.socket(socket.AF_UNIX)
+            imap_socket.connect("/var/imap/socket/imap")
+            self.mail_con.sock = imap_socket
+            self.mail_con.file = self.mail_con.sock.makefile('rb')
+            logging.info("Connected to mail server %s", server)
+        except (socket.error, self.mail_con.error) as ex:
+            logging.error("Could not connect to host: %s", ex)
+            sys.exit()
 
         # log in
         try:
-            self.mail_con.login(username, password)
+            self.mail_con.login("cyrus", "geheim")
             self.logged_in = True
-            print("Logged in as %s." % username)
-        except:
+            #self.mail_con.proxyauth(username)
+            logging.info("Logged in as %s.", username)
+        except self.mail_con.error as ex:
             self.logged_in = False
-            raise UserWarning("Could not log in as user " + username)
-            
+            logging.error("Could not log in as user %s: %s", username, ex)
+            sys.exit()
+
         # list mailboxes
         try:
             _result, mailboxes = self.mail_con.list()
-        except (self.mail_con.error):
-            raise UserWarning("Could not retrieve mailboxes for user " + username)
+        except self.mail_con.error as ex:
+            logging.warning("Could not retrieve mailboxes for user %s: %s", username, ex)
         self.mailboxes = []
         for mailbox in mailboxes:
             mailbox = MAILBOX_RESP.match(mailbox.decode('iso-8859-1')).groups()
             self.mailboxes.append(mailbox)
         self.mailboxes = sorted(self.mailboxes, key=lambda box: box[2], reverse=True)
-        
+
         return
 
     def __del__(self):
         """Closes the connection and the user session."""
-        #if self.logged_in:
-        #    self.mail_con.close()
-        #    self.mail_con.logout()
+
+        if self.logged_in:
+            self.mail_con.logout()
 
     def clear_inbox_acls(self, user):
         """Resets the inbox acls for a given user."""
+
         try:
             _result, inbox_acls = self.mail_con.getacl("INBOX")
-        except:
-            print("Could not get the acls of INBOX.")
-        inbox_acls = inbox_acls[0].split(' ')[1:]
-        for acl_user in inbox_acls:
-            if acl_user != user:
+        except self.mail_con.error as ex:
+            logging.warning("Could not get the acls of INBOX: %s", ex)
+            return
+        inbox_acls = ACLS_RESP.findall(inbox_acls[0][6:])
+        logging.debug("Retrieved acls from INBOX are %s", inbox_acls)
+        for acl_ref in inbox_acls:
+            if acl_ref[0] != user:
                 try:
-                    self.mail_con.setacl("INBOX", acl_user, "")
-                    print("Reset acls on INBOX for user %s" % acl_user)
-                except:
-                    print("Could not reset acls on INBOX for user %s" % acl_user)
+                    self.mail_con.deleteacl("INBOX", acl_ref[0])
+                    logging.debug("Reset acls on INBOX for user %s", acl_ref[0].decode('iso-8859-1'))
+                except self.mail_con.error as ex:
+                    logging.warning("Could not reset acls on INBOX for user %s: %s", acl_ref[0], ex)
+
         return
 
     def add_acls(self, mailbox, mailbox_list, original_user, target_user):
@@ -94,14 +108,15 @@ class MailIterator:
             if acl_user != target_user and acl_user != original_user:
                 try:
                     self.mail_con.setacl(mailbox, acl_user, mbox_acls[acl_user])
-                    print("Set acls %s for user %s on mailbox %s." % (mbox_acls[acl_user], acl_user, mailbox))
-                except:
-                    print("Could not set acls %s for user %s on mailbox %s." % (mbox_acls[acl_user], acl_user, mailbox))    
+                    logging.debug("Set acls %s for user %s on mailbox %s", mbox_acls[acl_user], acl_user, mailbox)
+                except self.mail_con.error as ex:
+                    logging.warning("Could not set acls %s for user %s on mailbox %s: %s", mbox_acls[acl_user], acl_user, mailbox, ex)    
 
         return
 
     def delete_mailboxes(self, deleted_mailbox):
         """Delete specified mailbox or empty inbox."""
+
         for mailbox in self.mailboxes:
             pattern = '^\"?' + deleted_mailbox
             # if INBOX it cannot be deleted so add delimiter
@@ -110,26 +125,31 @@ class MailIterator:
             if re.compile(pattern).match(mailbox[2]):
                 result, data = self.mail_con.delete(mailbox[2])
                 if result == "OK":
-                    print("Deleted mailbox %s" % mailbox[2])
+                    logging.debug("Deleted mailbox %s", mailbox[2])
                 else:
-                    print("Could not delete folder %s: %s" % (mailbox[2], data[0]))         
+                    logging.warning("Could not delete mailbox %s: %s", mailbox[2], data[0]) 
+        
         return
 
     def create_mailbox(self, mailbox):
+
         """Create new mailbox to inject messages."""
         if mailbox != "INBOX":
             result, data = self.mail_con.create(mailbox)
             if result == "OK":
-                print("Creating mailbox %s" % mailbox)
+                logging.debug("Creating mailbox %s", mailbox)
             else:
-                print("Could not create mailbox %s: %s" % (mailbox, data[0]))
+                logging.warning("Could not create mailbox %s: %s", mailbox, data[0])
+
         return
 
     def inject_message(self, message, mailbox, internal_date):
+
         """Inject a message into a mailbox."""
-        result, data = self.mail_con.append(mailbox, "\\Seen", internal_date, message)
+        result, data = self.mail_con.append(mailbox, "\\Seen", internal_date, message.encode())
         if result == "OK":
-            print("Appending message to mailbox %s" % mailbox)
+            logging.debug("Appending message to mailbox %s", mailbox)
         else:
-            print("Could not append the e-mail %s: %s" % (message, data[0]))
+            logging.warning("Could not append the e-mail %s: %s", message, data[0])
+
         return
\ No newline at end of file