Source code for mindroot.coreplugins.email.email_provider

from typing import Dict, List
from .smtp_handler import SMTPHandler
from .imap_handler import IMAPHandler
import logging

logger = logging.getLogger(__name__)

[docs] class EmailProvider: def __init__(self, config: Dict): self.config = config self.smtp = SMTPHandler(config) self.imap = IMAPHandler(config)
[docs] async def send_email(self, to: str, subject: str, body: str, reply_to_message: Dict = None, headers: Dict = None) -> Dict: """Send an email using SMTP""" return await self.smtp.send_email( to=to, subject=subject, body=body, reply_to_message=reply_to_message, headers=headers )
[docs] async def check_emails(self, folder: str = "INBOX", criteria: Dict = None, batch_size: int = None, max_messages: int = None, start_id: str = None) -> Dict: """Check for emails using IMAP""" return await self.imap.check_emails( folder=folder, criteria=criteria, batch_size=batch_size, max_messages=max_messages, start_id=start_id )
[docs] async def mark_as_processed(self, message_ids: List[str], folder: str = "INBOX") -> Dict: """Mark messages as processed using IMAP""" return await self.imap.mark_as_processed( message_ids=message_ids, folder=folder )