"""
Email Service with Asynchronous Queue Processing

This service provides email sending functionality with built-in queue
management to prevent SMTP server saturation.

Key features:

1. Asynchronous Email Queue:
   - Multiple calls to send_email() are queued automatically
   - Emails are processed with a 5-second delay between sends
   - Background thread processes the queue continuously

2. Two Send Methods:
   - send_email(): Adds email to queue (recommended for most use cases)
   - send_email_sync(): Sends immediately without queuing (for urgent emails)

3. Queue Management:
   - get_queue_size(): Check number of pending emails
   - is_queue_processing(): Check if queue worker is active
   - clear_queue(): Remove all pending emails from queue

4. Automatic Retry Logic:
   - Handles SMTP disconnections with automatic reconnection
   - Comprehensive logging for debugging and monitoring

Usage Example:

    sender = get_email_sender()

    # Queue multiple emails (recommended)
    sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
    sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
    # These will be sent with 5-second intervals automatically

    # Check queue status
    print(f"Emails in queue: {sender.get_queue_size()}")

    # Send urgent email immediately
    sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"])
"""

import asyncio
import smtplib
import time
from email.message import EmailMessage
from logging import getLogger
from queue import Queue, Empty
from threading import Thread, Lock, current_thread
from typing import Optional, Dict, Any

from config.settings import MAIL, MAIL_PASSWORD
from services.logging_service import structured_logger, LogLevel

logger = getLogger(__name__)


class EmailSender:
    """Send HTML emails over one SMTP_SSL connection, directly or via a queue.

    Constructing an instance opens and authenticates the SMTP connection and
    starts a daemon thread that drains the internal queue.

    NOTE(review): the SMTP connection is used both by the worker thread and by
    callers of ``send_email_sync`` and is not guarded by a lock; confirm
    whether the two paths can actually run concurrently in this application.
    """

    SMTP_HOST = "smtp.gmail.com"
    SMTP_PORT = 465
    # Pause applied after a queued send when more emails are still waiting.
    SEND_INTERVAL_SECONDS = 5

    def __init__(self):
        """Store credentials, connect to the SMTP server and start the worker.

        Raises:
            Exception: whatever ``connect()`` re-raises when the connection
                or the login fails.
        """
        self.email = MAIL
        self.password = MAIL_PASSWORD
        self._smtp: Optional[smtplib.SMTP_SSL] = None
        self._email_queue: Queue = Queue()
        self._queue_worker_thread: Optional[Thread] = None
        # Guards worker start/stop and clear_queue().
        self._queue_lock = Lock()
        self._is_processing = False
        self.connect()
        self._start_queue_worker()

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------

    def connect(self):
        """Open and authenticate the SMTP connection if none is cached.

        Does nothing when a connection object is already stored.

        Raises:
            smtplib.SMTPAuthenticationError: the server rejected the login.
            Exception: any other failure while connecting; re-raised after
                being logged.
        """
        if self._smtp is not None:
            return

        logger.info("Establishing new SMTP connection...")
        structured_logger.log_email_event(
            "Establishing SMTP connection",
            LogLevel.INFO,
            {
                "email_server": self.SMTP_HOST,
                "port": self.SMTP_PORT,
                "sender_email": self.email,
            },
        )

        # Build the connection in a local and publish it on self only once
        # login succeeded; otherwise a half-initialised connection would be
        # cached and every later connect() call would be a no-op.
        smtp: Optional[smtplib.SMTP_SSL] = None
        try:
            smtp = smtplib.SMTP_SSL(self.SMTP_HOST, self.SMTP_PORT)
            smtp.login(self.email, self.password)
            smtp.ehlo()
        except smtplib.SMTPAuthenticationError as e:
            self._discard_connection(smtp)
            error_msg = f"SMTP authentication failed: {e}"
            logger.error(error_msg)
            structured_logger.log_email_event(
                "SMTP authentication failed",
                LogLevel.ERROR,
                {
                    "sender_email": self.email,
                    "error": str(e),
                    "error_type": "SMTPAuthenticationError",
                },
            )
            raise
        except Exception as e:
            self._discard_connection(smtp)
            error_msg = f"Failed to establish SMTP connection: {e}"
            logger.error(error_msg)
            structured_logger.log_email_event(
                "SMTP connection failed",
                LogLevel.ERROR,
                {
                    "sender_email": self.email,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            raise

        self._smtp = smtp
        logger.info("SMTP connection established and authenticated.")
        structured_logger.log_email_event(
            "SMTP authentication successful",
            LogLevel.INFO,
            {"sender_email": self.email},
        )

    @staticmethod
    def _discard_connection(smtp: Optional[smtplib.SMTP]) -> None:
        """Best-effort close of the socket behind *smtp* (no QUIT is sent)."""
        if smtp is None:
            return
        try:
            smtp.close()
        except OSError as e:
            logger.debug(f"Error discarding SMTP connection: {e}")

    def _close_connection(self):
        """Close the SMTP connection only; the queue worker keeps running.

        Used by the reconnect/retry paths. Going through ``close()`` there
        would also stop the worker (and, when called from the worker thread
        itself, try to join the current thread).
        """
        smtp = self._smtp
        if smtp is None:
            return

        # Reset first so the reference is dropped even if quit() fails.
        self._smtp = None
        logger.info("Closing SMTP connection.")
        structured_logger.log_email_event(
            "Closing SMTP connection",
            LogLevel.INFO,
            {"sender_email": self.email},
        )
        try:
            smtp.quit()
        except Exception as e:
            logger.warning(f"Error closing SMTP connection: {e}")
            self._discard_connection(smtp)

    def close(self):
        """Shut the service down: close the SMTP connection and stop the worker."""
        self._close_connection()
        self._stop_queue_worker()

    # ------------------------------------------------------------------
    # Queue worker
    # ------------------------------------------------------------------

    def _start_queue_worker(self):
        """Start the background thread to process email queue"""
        with self._queue_lock:
            if self._is_processing:
                return
            self._is_processing = True
            self._queue_worker_thread = Thread(
                target=self._process_email_queue, daemon=True
            )
            self._queue_worker_thread.start()
            logger.info("Email queue worker started")

    def _stop_queue_worker(self):
        """Stop the background thread processing email queue"""
        with self._queue_lock:
            if not self._is_processing:
                return
            self._is_processing = False
            # Add a sentinel value to wake up the worker thread
            self._email_queue.put(None)
            worker = self._queue_worker_thread
            # A thread cannot join itself (Thread.join raises RuntimeError),
            # so skip the join when the worker is the one shutting down.
            if (
                worker is not None
                and worker is not current_thread()
                and worker.is_alive()
            ):
                worker.join(timeout=10)
            logger.info("Email queue worker stopped")

    def _process_email_queue(self):
        """Background worker that processes emails from the queue with 5-second delays"""
        logger.info("Email queue processor started")

        while self._is_processing:
            try:
                # Blocks for at most one second so the stop flag is re-checked.
                email_data = self._email_queue.get(timeout=1)

                # Check for sentinel value (None) to stop processing
                if email_data is None:
                    break

                self._send_email_immediately(
                    email_data['subject'],
                    email_data['body'],
                    email_data['to'],
                    email_data['kwargs'],
                )

                # Only pause when another email is already waiting.
                if not self._email_queue.empty():
                    logger.info(
                        f"Waiting {self.SEND_INTERVAL_SECONDS} seconds before sending next email..."
                    )
                    time.sleep(self.SEND_INTERVAL_SECONDS)

            except Empty:
                # If queue is empty, just continue to check for new emails
                continue
            except Exception as e:
                # A failed send must not kill the worker; the email is dropped.
                if self._is_processing:  # Only log if we're still supposed to be processing
                    logger.error(f"Error processing email queue: {e}")
                    time.sleep(1)  # Brief pause before retrying

        logger.info("Email queue processor stopped")

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def _build_message(
        self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]
    ) -> EmailMessage:
        """Build the multipart message: plain-text stub plus the HTML body.

        ``body`` is rendered with ``body.format(**kwargs)``, so literal braces
        in the template must be doubled and a missing key raises ``KeyError``.
        """
        msg = EmailMessage()
        msg['Subject'] = subject
        msg['From'] = self.email
        msg['To'] = ", ".join(to)
        msg.set_content('Este correo tiene contenido HTML.')
        msg.add_alternative(body.format(**kwargs), subtype='html')
        return msg

    def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]):
        """Send one queued email now; called by the queue worker.

        On ``SMTPServerDisconnected`` the connection is re-established and the
        send is attempted once more.

        Raises:
            Exception: the send (or its single retry) failed; re-raised after
                being logged.
        """
        if self._smtp is None:
            self.connect()

        logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
        structured_logger.log_email_event(
            f"Sending queued email with subject: '{subject}'",
            LogLevel.INFO,
            {
                "subject": subject,
                "recipients": to,
                "sender_email": self.email,
                "body_length": len(body),
                "kwargs_count": len(kwargs),
                "queued": True,
            },
        )

        try:
            msg = self._build_message(subject, body, to, kwargs)

            if not self._smtp:
                error_msg = "Cannot send email because SMTP connection is not established"
                logger.error(error_msg)
                structured_logger.log_email_event(
                    "Queued email send failed: SMTP connection not established",
                    LogLevel.ERROR,
                    {
                        "subject": subject,
                        "recipients": to,
                        "sender_email": self.email,
                    },
                )
                raise ConnectionError(error_msg)

            self._smtp.send_message(msg)
            logger.info(f"Queued email sent to {to} with subject '{subject}'.")
            structured_logger.log_email_event(
                "Queued email sent successfully",
                LogLevel.INFO,
                {
                    "subject": subject,
                    "recipients": to,
                    "sender_email": self.email,
                    "recipients_count": len(to),
                    "queued": True,
                },
            )

        except smtplib.SMTPServerDisconnected as e:
            logger.warning("SMTP connection disconnected during queued send, retrying...")
            structured_logger.log_email_event(
                "SMTP disconnected during queued send, attempting retry",
                LogLevel.WARNING,
                {
                    "subject": subject,
                    "recipients": to,
                    "error": str(e),
                },
            )
            try:
                # Reset the connection only: this runs on the worker thread,
                # so the worker itself must stay alive.
                self._close_connection()
                self.connect()
                if self._smtp:
                    self._smtp.send_message(msg)
                logger.info(f"Queued email resent successfully to {to}.")
                structured_logger.log_email_event(
                    "Queued email sent successfully after retry",
                    LogLevel.INFO,
                    {
                        "subject": subject,
                        "recipients": to,
                        "sender_email": self.email,
                        "retry_attempt": True,
                        "queued": True,
                    },
                )
            except Exception as retry_error:
                error_msg = f"Failed to resend queued email after retry: {retry_error}"
                logger.error(error_msg)
                structured_logger.log_email_event(
                    "Queued email retry failed",
                    LogLevel.ERROR,
                    {
                        "subject": subject,
                        "recipients": to,
                        "original_error": str(e),
                        "retry_error": str(retry_error),
                        "error_type": type(retry_error).__name__,
                        "queued": True,
                    },
                )
                raise

        except Exception as e:
            error_msg = f"Failed to send queued email to {to}: {e}"
            logger.error(error_msg)
            structured_logger.log_email_event(
                "Queued email send failed",
                LogLevel.ERROR,
                {
                    "subject": subject,
                    "recipients": to,
                    "sender_email": self.email,
                    "error": str(e),
                    "error_type": type(e).__name__,
                    "queued": True,
                },
            )
            raise

    def send_email(self, subject: str, body: str, to: list[str], **kwargs):
        """Add email to queue for asynchronous sending with 5-second delays

        Args:
            subject: subject line.
            body: HTML template; rendered later with ``body.format(**kwargs)``.
            to: recipient addresses.
            **kwargs: values substituted into ``body``.
        """
        logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
        structured_logger.log_email_event(
            f"Queuing email with subject: '{subject}'",
            LogLevel.INFO,
            {
                "subject": subject,
                "recipients": to,
                "sender_email": self.email,
                "body_length": len(body),
                "kwargs_count": len(kwargs),
                "queue_size": self._email_queue.qsize() + 1,
            },
        )

        email_data = {
            'subject': subject,
            'body': body,
            'to': to,
            'kwargs': kwargs,
        }
        self._email_queue.put(email_data)
        logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")

    def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
        """Send email immediately (synchronous) - for backwards compatibility

        On ``SMTPServerDisconnected`` the connection is re-established and the
        send is attempted once more. After a failure the SMTP connection is
        reset so the next send reconnects; the queue worker is left running.

        Raises:
            Exception: the send (or its single retry) failed; re-raised after
                being logged.
        """
        if self._smtp is None:
            self.connect()

        logger.debug(f"Preparing to send email to: {to} with subject: '{subject}'")
        structured_logger.log_email_event(
            f"Preparing to send email with subject: '{subject}'",
            LogLevel.INFO,
            {
                "subject": subject,
                "recipients": to,
                "sender_email": self.email,
                "body_length": len(body),
                "kwargs_count": len(kwargs),
            },
        )

        try:
            msg = self._build_message(subject, body, to, kwargs)

            if not self._smtp:
                error_msg = "Cannot send email because SMTP connection is not established"
                logger.error(error_msg)
                structured_logger.log_email_event(
                    "Email send failed: SMTP connection not established",
                    LogLevel.ERROR,
                    {
                        "subject": subject,
                        "recipients": to,
                        "sender_email": self.email,
                    },
                )
                raise ConnectionError(error_msg)

            self._smtp.send_message(msg)
            logger.info(f"Email sent to {to} with subject '{subject}'.")
            structured_logger.log_email_event(
                "Email sent successfully",
                LogLevel.INFO,
                {
                    "subject": subject,
                    "recipients": to,
                    "sender_email": self.email,
                    "recipients_count": len(to),
                },
            )

        except smtplib.SMTPServerDisconnected as e:
            logger.warning("SMTP connection disconnected, retrying...")
            structured_logger.log_email_event(
                "SMTP disconnected during send, attempting retry",
                LogLevel.WARNING,
                {
                    "subject": subject,
                    "recipients": to,
                    "error": str(e),
                },
            )
            try:
                # Reset the connection only; close() would also stop the
                # queue worker, which nothing restarts afterwards.
                self._close_connection()
                self.connect()
                if self._smtp:
                    self._smtp.send_message(msg)
                logger.info(f"Email resent successfully to {to}.")
                structured_logger.log_email_event(
                    "Email sent successfully after retry",
                    LogLevel.INFO,
                    {
                        "subject": subject,
                        "recipients": to,
                        "sender_email": self.email,
                        "retry_attempt": True,
                    },
                )
            except Exception as retry_error:
                error_msg = f"Failed to resend email after retry: {retry_error}"
                logger.error(error_msg)
                structured_logger.log_email_event(
                    "Email retry failed",
                    LogLevel.ERROR,
                    {
                        "subject": subject,
                        "recipients": to,
                        "original_error": str(e),
                        "retry_error": str(retry_error),
                        "error_type": type(retry_error).__name__,
                    },
                )
                self._close_connection()
                raise

        except Exception as e:
            error_msg = f"Failed to send email to {to}: {e}"
            logger.error(error_msg)
            structured_logger.log_email_event(
                "Email send failed",
                LogLevel.ERROR,
                {
                    "subject": subject,
                    "recipients": to,
                    "sender_email": self.email,
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )
            self._close_connection()
            raise

    # ------------------------------------------------------------------
    # Queue inspection
    # ------------------------------------------------------------------

    def get_queue_size(self) -> int:
        """Get the current number of emails in the queue"""
        return self._email_queue.qsize()

    def is_queue_processing(self) -> bool:
        """Check if the queue worker is currently processing emails"""
        worker = self._queue_worker_thread
        return self._is_processing and bool(worker and worker.is_alive())

    def clear_queue(self):
        """Clear all pending emails from the queue"""
        with self._queue_lock:
            while not self._email_queue.empty():
                try:
                    self._email_queue.get_nowait()
                except Empty:
                    # The worker drained the last item between the check and
                    # the get; nothing is left to remove.
                    break

        logger.info("Email queue cleared")
        structured_logger.log_email_event(
            "Email queue cleared",
            LogLevel.INFO,
            {"sender_email": self.email},
        )


# Module-level singleton, created lazily by get_email_sender().
email_sender: Optional[EmailSender] = None


def initialize_email_sender():
    """Create the module-level EmailSender (connects and starts the worker)."""
    global email_sender
    email_sender = EmailSender()


def get_email_sender() -> EmailSender:
    """Return the shared EmailSender, creating it on first use.

    Raises:
        ValueError: the sender is still unset after initialization.
    """
    if email_sender is None:
        initialize_email_sender()
    if not email_sender:
        raise ValueError("Email sender is not initialized and failed to initialize.")
    return email_sender