# Source code for mindroot.coreplugins.email.imap_handler

import imaplib
import email
import chardet
from datetime import datetime, timedelta
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)

class IMAPHandler:
    """Read and flag messages on an IMAP server over SSL, addressing them by UID.

    The handler is configured from a dict:

    * ``email`` / ``password`` -- login credentials (required; a missing key
      raises ``KeyError``).
    * ``imap_server`` -- server host name (``None`` when absent).
    * ``imap_port`` -- server port, default ``993``.
    * ``batch_size`` -- default page size for :meth:`check_emails`, default ``50``.
    * ``max_messages`` -- stored as ``default_max_messages``, default ``100``.
    """

    # IMAP date syntax requires English month abbreviations, whereas
    # ``strftime('%b')`` follows the process locale.
    _IMAP_MONTHS = ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                    'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec')

    def __init__(self, config: Dict):
        self.imap_server = config.get('imap_server')
        self.imap_port = config.get('imap_port', 993)
        self.email = config['email']
        self.password = config['password']
        self.default_batch_size = config.get('batch_size', 50)
        self.default_max_messages = config.get('max_messages', 100)

    def connect(self) -> imaplib.IMAP4:
        """Open an SSL connection to the configured server and log in.

        Returns:
            The authenticated ``imaplib.IMAP4_SSL`` connection.

        Raises:
            Exception: any connection or login error is logged and re-raised.
        """
        try:
            server = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
            try:
                server.login(self.email, self.password)
            except Exception:
                # The socket is already open at this point; release it so a
                # failed login does not leak the connection.
                try:
                    server.shutdown()
                except OSError:
                    pass
                raise
            return server
        except Exception as e:
            logger.error("IMAP Connection error: %s", e)
            raise

    @staticmethod
    def _disconnect(server) -> None:
        """Close the selected mailbox and log out, ignoring cleanup errors."""
        # ``close`` is only legal while a mailbox is selected, so it is expected
        # to fail when an earlier step (e.g. SELECT) did not succeed.
        for action in (server.close, server.logout):
            try:
                action()
            except (imaplib.IMAP4.error, OSError) as e:
                logger.debug("Ignoring error during IMAP disconnect: %s", e)

    @staticmethod
    def _quote_search_value(value) -> str:
        """Return *value* as an IMAP quoted string."""
        text = str(value)
        # CR/LF cannot appear inside a quoted string and would otherwise let a
        # value terminate the command early.
        text = text.replace('\r', ' ').replace('\n', ' ')
        text = text.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{text}"'

    @classmethod
    def _format_imap_date(cls, value) -> str:
        """Format a date/datetime as ``DD-Mon-YYYY`` independent of locale."""
        month = cls._IMAP_MONTHS[value.month - 1]
        return f"{value.day:02d}-{month}-{value.year:04d}"

    def build_search_criteria(self, criteria: Dict = None, last_uid: str = None) -> str:
        """Build the IMAP SEARCH criteria string.

        Args:
            criteria: Optional filters. Recognised keys are ``unread_only``
                (truthy adds ``UNSEEN``), ``since_date`` (a date/datetime),
                ``from`` and ``subject``. When *criteria* is empty or ``None``
                the search defaults to the last 30 days.
            last_uid: When given, restricts the search to UIDs ``1:last_uid``
                (used for pagination).

        Returns:
            The space-joined search terms, or ``'ALL'`` when there are none.
        """
        search_terms = []

        # Add UID range if we're paginating
        if last_uid:
            search_terms.append(f'UID 1:{last_uid}')

        if criteria:
            if criteria.get('unread_only'):
                search_terms.append('UNSEEN')
            if criteria.get('since_date'):
                date_str = self._format_imap_date(criteria['since_date'])
                search_terms.append(f'SINCE "{date_str}"')
            if criteria.get('from'):
                search_terms.append(f'FROM {self._quote_search_value(criteria["from"])}')
            if criteria.get('subject'):
                search_terms.append(f'SUBJECT {self._quote_search_value(criteria["subject"])}')
        else:
            # Default to messages from the last 30 days
            thirty_days_ago = self._format_imap_date(datetime.now() - timedelta(days=30))
            search_terms.append(f'SINCE "{thirty_days_ago}"')

        return ' '.join(search_terms) if search_terms else 'ALL'

    def decode_text(self, text_bytes: bytes, charset: str = None) -> str:
        """Decode *text_bytes* to ``str`` without raising on bad encodings.

        The order of attempts is: the declared *charset* (if any), UTF-8,
        the encoding guessed by ``chardet``, and finally Latin-1.

        Args:
            text_bytes: Raw bytes to decode; empty/``None`` yields ``""``.
            charset: Encoding declared for the text, if known. An unknown or
                mismatching charset is ignored and the heuristics are used.

        Returns:
            The decoded text.
        """
        if not text_bytes:
            return ""

        # Trust the declared charset first: 7-bit encodings such as
        # ISO-2022-JP are also valid UTF-8 and would otherwise decode to
        # escape-sequence garbage.
        if charset:
            try:
                return text_bytes.decode(charset)
            except (LookupError, UnicodeDecodeError):
                pass

        try:
            return text_bytes.decode('utf-8')
        except UnicodeDecodeError:
            pass

        # Detection is best-effort: any failure here (unknown codec name,
        # wrong guess, detector error) simply falls through to Latin-1.
        try:
            detected = chardet.detect(text_bytes)
            if detected and detected['encoding']:
                return text_bytes.decode(detected['encoding'])
        except Exception as e:
            logger.debug("Charset detection failed: %s", e)

        # Latin-1 maps every byte to a code point, so this cannot fail.
        return text_bytes.decode('latin1')

    def _decode_part(self, part) -> str:
        """Return the decoded payload of one message part, or ``""`` if empty."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        return self.decode_text(payload, part.get_content_charset())

    def get_email_body(self, email_message) -> str:
        """Extract the message body as text.

        For multipart messages the first non-empty ``text/plain`` part is
        returned; for single-part messages the payload is returned whatever
        its content type.

        Args:
            email_message: A parsed ``email.message.Message``.

        Returns:
            The decoded body, or ``""`` when no suitable payload exists.
        """
        if not email_message.is_multipart():
            return self._decode_part(email_message)

        for part in email_message.walk():
            if part.get_content_type() != "text/plain":
                continue
            text = self._decode_part(part)
            if text:
                return text
        return ""

    def _fetch_message(self, server, uid: int) -> Dict:
        """Fetch one message by UID and return its headers and body as a dict."""
        # NOTE(review): fetching RFC822 implicitly sets the \Seen flag on the
        # server; BODY.PEEK[] would avoid that -- confirm which is intended.
        status, msg_data = server.uid('fetch', str(uid), '(RFC822)')
        if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
            raise ValueError(f"no message data returned (status {status})")

        email_message = email.message_from_bytes(msg_data[0][1])
        return {
            "id": str(uid),  # Use UID as the message ID
            "message_id": email_message['Message-ID'],
            "in_reply_to": email_message['In-Reply-To'],
            "from": email_message['from'],
            "to": email_message['to'],
            "subject": email_message['subject'],
            "date": email_message['date'],
            "body": self.get_email_body(email_message),
        }

    async def check_emails(self, folder: str = "INBOX", criteria: Dict = None,
                           batch_size: int = None, max_messages: int = None,
                           start_id: str = None) -> Dict:
        """Fetch one page of messages from *folder*, newest UID first.

        Note that the underlying ``imaplib`` calls are synchronous, so this
        coroutine performs blocking network I/O.

        Args:
            folder: Mailbox to select.
            criteria: Filters passed to :meth:`build_search_criteria`.
            batch_size: Page size; defaults to ``default_batch_size``.
            max_messages: Accepted for backward compatibility; it is not
                currently applied to the result.
            start_id: Highest UID to include (the ``next_id`` of the previous
                page).

        Returns:
            A dict with ``success``, ``messages``, ``error`` and ``pagination``
            (``total_messages``, ``batch_size``, ``next_id``, ``has_more``).
            On failure ``success`` is ``False``, ``messages`` is empty,
            ``error`` holds the message and ``pagination`` is ``None``.
        """
        server = None
        try:
            batch_size = batch_size or self.default_batch_size

            server = self.connect()
            status, response = server.select(folder)
            if status != 'OK':
                raise imaplib.IMAP4.error(f"Could not select folder {folder!r}: {response}")

            search_criteria = self.build_search_criteria(criteria, start_id)
            logger.info("Using search criteria: %s", search_criteria)

            # Use UID SEARCH instead of regular SEARCH
            status, data = server.uid('search', None, search_criteria)
            if status != 'OK':
                raise imaplib.IMAP4.error(f"Search failed: {data}")

            # data[0] is None when the server sends no untagged SEARCH reply.
            raw_uids = (data[0] or b'').split() if data else []
            # Sort UIDs in descending order (newest first)
            message_uids = sorted((int(uid) for uid in raw_uids), reverse=True)
            total_messages = len(message_uids)
            logger.info("Found %s total messages", total_messages)

            batch_uids = message_uids[:batch_size]
            email_list = []
            for uid in batch_uids:
                try:
                    email_list.append(self._fetch_message(server, uid))
                except Exception as e:
                    # One unreadable message must not abort the whole page.
                    logger.error("Error processing message UID %s: %s", uid, e)

            # Determine if there are more messages and next starting point
            has_more = len(message_uids) > batch_size
            next_id = str(batch_uids[-1] - 1) if has_more else None

            return {
                "success": True,
                "messages": email_list,
                "error": None,
                "pagination": {
                    "total_messages": total_messages,
                    "batch_size": batch_size,
                    "next_id": next_id,
                    "has_more": has_more,
                },
            }
        except Exception as e:
            logger.error("Error checking emails: %s", e)
            return {
                "success": False,
                "messages": [],
                "error": str(e),
                "pagination": None,
            }
        finally:
            # Always release the connection, including on error paths.
            if server is not None:
                self._disconnect(server)

    async def mark_as_processed(self, message_ids: List[str], folder: str = "INBOX") -> Dict:
        """Set the ``\\Seen`` flag on each UID in *message_ids*.

        Note that the underlying ``imaplib`` calls are synchronous, so this
        coroutine performs blocking network I/O.

        Args:
            message_ids: UIDs of the messages to flag.
            folder: Mailbox containing the messages.

        Returns:
            A dict with ``success`` (``True`` only if nothing failed),
            ``processed``, ``failed`` and ``error``. If the connection or
            folder selection fails, every id is reported as failed.
        """
        server = None
        try:
            server = self.connect()
            status, response = server.select(folder)
            if status != 'OK':
                raise imaplib.IMAP4.error(f"Could not select folder {folder!r}: {response}")

            processed = []
            failed = []
            for msg_id in message_ids:
                try:
                    # Use UID STORE instead of regular STORE
                    status, response = server.uid('store', msg_id, '+FLAGS', '\\Seen')
                    # imaplib returns 'NO' rather than raising, so the status
                    # has to be checked explicitly.
                    if status != 'OK':
                        raise imaplib.IMAP4.error(f"STORE returned {status}: {response}")
                    processed.append(msg_id)
                except Exception as e:
                    logger.error("Error marking message UID %s: %s", msg_id, e)
                    failed.append(msg_id)

            return {
                "success": not failed,
                "processed": processed,
                "failed": failed,
                "error": None if not failed else "Some messages failed to mark",
            }
        except Exception as e:
            logger.error("Error marking messages: %s", e)
            return {
                "success": False,
                "processed": [],
                "failed": message_ids,
                "error": str(e),
            }
        finally:
            # Always release the connection, including on error paths.
            if server is not None:
                self._disconnect(server)