if mailbox != 'INBOX':
folder = os.path.join(folder, cyrus_escape(mailbox))
for filename in os.listdir(folder):
- if not re.match('\d+\.', filename):
+ if not re.match(r'\d+\.', filename):
continue
full_path = os.path.join(folder, filename)
yield full_path, parse_mail_file(os.path.join(folder, filename),
**kwargs)
def get_message_text(filename, fallback_encoding='iso8859-1',
                     include_all_text=False):
    """
    Extract message text as string from email message.

    Intended as complementary addition to get_user_mail, e.g. ::

        for filename, msg in get_user_mail(user):
            # rough filtering based on headers
            if msg['Subject'] != 'Expected Subject':
                continue
            # get message text for closer inspection
            text = get_message_text(filename)
            if 'Expected Text' not in text:
                continue
            ...

    Finds the first part in message that is of type text/plain and decodes it
    using encoding specified in mail or otherwise fallback encoding. If none
    found takes first part of type "text/*", or otherwise just the first part.

    If include_all_text is True, all text/* parts are included, with text/plain
    being the first. If the message has no text/* part at all, the result is
    a list containing only the decoded first part.

    If the charset declared for a part is unknown to python or does not match
    the actual payload bytes, decoding is retried with the fallback encoding.
    Only parts that are actually returned are decoded, so a broken part that
    is not part of the result cannot make the call fail.

    :param str filename: complete path of message file in filesystem
    :param str fallback_encoding: encoding used for parts that declare no
                                  charset or whose declared charset fails
    :param bool include_all_text: include all "text/*" parts in returned text
    :returns: text(s) of message
    :rtype: [str] if include_all_text else str
    :raises LookupError: if fallback_encoding is needed but unknown to python
    :raises UnicodeDecodeError: if payload cannot be decoded with the
                                fallback encoding either
    """
    def decode_part(part):
        """Return transfer-decoded payload of non-multipart part as str."""
        payload = part.get_payload(decode=True)
        encoding = part.get_content_charset(fallback_encoding)
        try:
            return payload.decode(encoding)
        except (LookupError, UnicodeDecodeError):
            # charset from mail header is unusable --> use fallback instead
            return payload.decode(fallback_encoding)

    msg = parse_mail_file(filename, headers_only=False)

    # single pass over message tree: sort text parts into plain and other,
    # keeping the order in which walk() visits them
    plain_parts = []
    other_text_parts = []
    for part in msg.walk():
        content_type = part.get_content_type()
        if content_type == 'text/plain':
            plain_parts.append(part)
        elif content_type.startswith('text/'):
            other_text_parts.append(part)

    # text/plain is preferred, so it always comes first
    text_parts = plain_parts + other_text_parts
    if text_parts:
        if include_all_text:
            return [decode_part(part) for part in text_parts]
        return decode_part(text_parts[0])

    # no "text/" found. Just take first part: descend into first child until
    # reaching a part that is not multipart
    first_part = msg
    while first_part.is_multipart():
        first_part = first_part.get_payload(0)

    text = decode_part(first_part)
    if include_all_text:
        return [text]
    return text
+
+
def cyrus_escape(user_or_folder, keep_path=False, regex=False):
"""
Converts names of users or mailbox folders to cyrus format.