""" Email Service with Asynchronous Queue Processing This service provides email sending functionality with built-in queue management to prevent SMTP server saturation. Key features: 1. Asynchronous Email Queue: - Multiple calls to send_email() are queued automatically - Emails are processed with a 10-second delay between sends - Background thread processes the queue continuously 2. Two Send Methods: - send_email(): Adds email to queue (recommended for most use cases) - send_email_sync(): Sends immediately without queuing (for urgent emails) 3. Queue Management: - get_queue_size(): Check number of pending emails - is_queue_processing(): Check if queue worker is active - clear_queue(): Remove all pending emails from queue 4. Automatic Retry Logic: - Handles SMTP disconnections with automatic reconnection - Fresh connection for each email to prevent timeouts - Comprehensive logging for debugging and monitoring Usage Example: sender = get_email_sender() # Queue multiple emails (recommended) sender.send_email("Subject 1", "Body 1", ["user1@example.com"]) sender.send_email("Subject 2", "Body 2", ["user2@example.com"]) # These will be sent with 10-second intervals automatically # Check queue status print(f"Emails in queue: {sender.get_queue_size()}") # Send urgent email immediately sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"]) """ import asyncio from config.settings import MAIL, MAIL_PASSWORD import smtplib from email.message import EmailMessage from logging import getLogger from typing import Optional, Dict, Any from queue import Queue, Empty from threading import Thread, Lock, current_thread import time import random logger = getLogger(__name__) class EmailSender: def __init__(self): self.email = MAIL self.password = MAIL_PASSWORD self._smtp: Optional[smtplib.SMTP_SSL] = None self._email_queue: Queue = Queue() self._queue_worker_thread: Optional[Thread] = None self._queue_lock = Lock() self._is_processing = False self._start_queue_worker() def connect(self): """Establish SMTP connection - kept for compatibility but not used for persistent connections""" logger.info("Testing SMTP connection...") try: # Test connection test_smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465) test_smtp.login(self.email, self.password) test_smtp.ehlo() test_smtp.quit() logger.info("SMTP connection test successful.") except smtplib.SMTPAuthenticationError as e: error_msg = f"SMTP authentication failed: {e}" logger.error(error_msg) raise e except Exception as e: error_msg = f"Failed to establish SMTP connection: {e}\n line: {e.__traceback__.tb_lineno}" logger.error(error_msg) raise e def close(self): """Close SMTP connection and stop queue worker""" logger.info("Closing email service.") # Stop the queue worker self._stop_queue_worker() # Clear any persistent connection (though we don't use them anymore) if self._smtp: try: self._smtp.quit() except: pass finally: self._smtp = None def _create_fresh_connection(self): """Create a fresh SMTP connection for each email send""" try: smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=30) smtp.login(self.email, self.password) reply = smtp.ehlo() print(f"SMTP connection established: {reply}") return smtp except Exception as e: logger.error(f"Failed to create fresh SMTP connection: {e}") raise def _start_queue_worker(self): """Start the background thread to process email queue""" with self._queue_lock: if not self._is_processing: self._is_processing = True self._queue_worker_thread = Thread(target=self._process_email_queue, daemon=True) self._queue_worker_thread.start() logger.info("Email queue worker started") def _stop_queue_worker(self): """Stop the background thread processing email queue""" with self._queue_lock: if self._is_processing: self._is_processing = False # Add a sentinel value to wake up the worker thread self._email_queue.put(None) # Only join if we're not in the same thread if (self._queue_worker_thread and self._queue_worker_thread.is_alive() and self._queue_worker_thread != current_thread()): try: self._queue_worker_thread.join(timeout=10) except RuntimeError as e: logger.warning(f"Could not join queue worker thread: {e}") logger.info("Email queue worker stopped") def _process_email_queue(self): """Background worker that processes emails from the queue with 10-second delays""" logger.info("Email queue processor started") while self._is_processing: try: # Get email from queue (blocking call with timeout) email_data = self._email_queue.get(timeout=1) # Check for sentinel value (None) to stop processing if email_data is None: break # Process the email with retry logic success = self._send_email_with_retry( email_data['subject'], email_data['body'], email_data['to'], email_data['kwargs'] ) if not success: logger.error(f"Failed to send email after all retries: {email_data['subject']}") # Wait 10 seconds before processing next email (unless it's the last one in queue) if not self._email_queue.empty(): logger.info("Waiting 10 seconds before sending next email...") time.sleep(10) except Empty: # If queue is empty, just continue to check for new emails continue except Exception as e: if self._is_processing: # Only log if we're still supposed to be processing logger.error(f"Error processing email queue: {e}") time.sleep(2) # Brief pause before retrying logger.info("Email queue processor stopped") def _send_email_with_retry(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any], max_retries: int = 3) -> bool: """Send email with retry logic using fresh connections""" for attempt in range(max_retries): try: return self._send_email_immediately(subject, body, to, kwargs) except Exception as e: if attempt < max_retries - 1: wait_time = (2 ** attempt) + random.uniform(0, 1) logger.warning(f"Email send attempt {attempt + 1} failed: {e}. Waiting {wait_time:.2f}s before retry...") time.sleep(wait_time) else: logger.error(f"All email send attempts failed for {to}: {e}") return False return False def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]) -> bool: """Send email immediately using a fresh SMTP connection""" logger.debug(f"Sending queued email to: {to} with subject: '{subject}'") smtp = None try: # Create fresh connection for this email smtp = self._create_fresh_connection() msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.email msg['To'] = ", ".join(to) msg.set_content('Este correo tiene contenido HTML.') msg.add_alternative(body.format(**kwargs), subtype='html') smtp.send_message(msg) logger.info(f"Queued email sent to {to} with subject '{subject}'.") return True except Exception as e: error_msg = f"Failed to send queued email to {to}: {e}" logger.error(error_msg) raise finally: # Always close the connection if smtp: try: smtp.quit() except: pass def send_email(self, subject: str, body: str, to: list[str], **kwargs): """Add email to queue for asynchronous sending with 10-second delays""" logger.debug(f"Queuing email to: {to} with subject: '{subject}'") # Add email to queue email_data = { 'subject': subject, 'body': body, 'to': to, 'kwargs': kwargs } self._email_queue.put(email_data) logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}") def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs): """Send email immediately (synchronous) using fresh connection""" logger.debug(f"Preparing to send email immediately to: {to} with subject: '{subject}'") smtp = None try: # Create fresh connection smtp = self._create_fresh_connection() msg = EmailMessage() msg['Subject'] = subject msg['From'] = self.email msg['To'] = ", ".join(to) msg.set_content('Este correo tiene contenido HTML.') msg.add_alternative(body.format(**kwargs), subtype='html') smtp.send_message(msg) logger.info(f"Email sent immediately to {to} with subject '{subject}'.") except Exception as e: error_msg = f"Failed to send email immediately to {to}: {e}" logger.error(error_msg) raise finally: # Always close the connection if smtp: try: smtp.quit() except: pass def get_queue_size(self) -> int: """Get the current number of emails in the queue""" return self._email_queue.qsize() def is_queue_processing(self) -> bool: """Check if the queue worker is currently processing emails""" return self._is_processing and bool(self._queue_worker_thread and self._queue_worker_thread.is_alive()) def clear_queue(self): """Clear all pending emails from the queue""" with self._queue_lock: # Clear the queue while not self._email_queue.empty(): try: self._email_queue.get_nowait() except: break logger.info("Email queue cleared") # Global email sender instance email_sender: Optional[EmailSender] = None def initialize_email_sender(): """Initialize the global email sender instance""" global email_sender try: email_sender = EmailSender() # Test the connection during initialization email_sender.connect() logger.info("Email sender initialized successfully") except Exception as e: logger.error(f"Failed to initialize email sender: {e}") raise def get_email_sender() -> EmailSender: """Get the global email sender instance, initialize if needed""" global email_sender if email_sender is None: initialize_email_sender() if not email_sender: raise ValueError("Email sender is not initialized and failed to initialize.") return email_sender