| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- """
- Email Service with Asynchronous Queue Processing
- This service provides email sending functionality with built-in queue management
- to prevent SMTP server saturation. Key features:
- 1. Asynchronous Email Queue:
- - Multiple calls to send_email() are queued automatically
- - Emails are processed with a 10-second delay between sends
- - Background thread processes the queue continuously
- 2. Two Send Methods:
- - send_email(): Adds email to queue (recommended for most use cases)
- - send_email_sync(): Sends immediately without queuing (for urgent emails)
- 3. Queue Management:
- - get_queue_size(): Check number of pending emails
- - is_queue_processing(): Check if queue worker is active
- - clear_queue(): Remove all pending emails from queue
- 4. Automatic Retry Logic:
- - Handles SMTP disconnections with automatic reconnection
- - Fresh connection for each email to prevent timeouts
- - Comprehensive logging for debugging and monitoring
- Usage Example:
- sender = get_email_sender()
-
- # Queue multiple emails (recommended)
- sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
- sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
- # These will be sent with 10-second intervals automatically
-
- # Check queue status
- print(f"Emails in queue: {sender.get_queue_size()}")
-
- # Send urgent email immediately
- sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"])
- """
- import asyncio
- from config.settings import MAIL, MAIL_PASSWORD
- import smtplib
- from email.message import EmailMessage
- from logging import getLogger
- from typing import Optional, Dict, Any
- from queue import Queue, Empty
- from threading import Thread, Lock, current_thread
- import time
- import random
- logger = getLogger(__name__)
- class EmailSender:
- def __init__(self):
- self.email = MAIL
- self.password = MAIL_PASSWORD
- self._smtp: Optional[smtplib.SMTP_SSL] = None
- self._email_queue: Queue = Queue()
- self._queue_worker_thread: Optional[Thread] = None
- self._queue_lock = Lock()
- self._is_processing = False
- self._start_queue_worker()
- def connect(self):
- """Establish SMTP connection - kept for compatibility but not used for persistent connections"""
- logger.info("Testing SMTP connection...")
-
-
- try:
- # Test connection
- test_smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465)
- test_smtp.login(self.email, self.password)
- test_smtp.ehlo()
- test_smtp.quit()
-
- logger.info("SMTP connection test successful.")
-
-
- except smtplib.SMTPAuthenticationError as e:
- error_msg = f"SMTP authentication failed: {e}"
- logger.error(error_msg)
- raise e
- except Exception as e:
- error_msg = f"Failed to establish SMTP connection: {e}\n line: {e.__traceback__.tb_lineno}"
- logger.error(error_msg)
-
- raise e
- def close(self):
- """Close SMTP connection and stop queue worker"""
- logger.info("Closing email service.")
-
-
- # Stop the queue worker
- self._stop_queue_worker()
-
- # Clear any persistent connection (though we don't use them anymore)
- if self._smtp:
- try:
- self._smtp.quit()
- except:
- pass
- finally:
- self._smtp = None
- def _create_fresh_connection(self):
- """Create a fresh SMTP connection for each email send"""
- try:
- smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=30)
- smtp.login(self.email, self.password)
- reply = smtp.ehlo()
- print(f"SMTP connection established: {reply}")
- return smtp
- except Exception as e:
- logger.error(f"Failed to create fresh SMTP connection: {e}")
- raise
- def _start_queue_worker(self):
- """Start the background thread to process email queue"""
- with self._queue_lock:
- if not self._is_processing:
- self._is_processing = True
- self._queue_worker_thread = Thread(target=self._process_email_queue, daemon=True)
- self._queue_worker_thread.start()
- logger.info("Email queue worker started")
- def _stop_queue_worker(self):
- """Stop the background thread processing email queue"""
- with self._queue_lock:
- if self._is_processing:
- self._is_processing = False
- # Add a sentinel value to wake up the worker thread
- self._email_queue.put(None)
-
- # Only join if we're not in the same thread
- if (self._queue_worker_thread and
- self._queue_worker_thread.is_alive() and
- self._queue_worker_thread != current_thread()):
- try:
- self._queue_worker_thread.join(timeout=10)
- except RuntimeError as e:
- logger.warning(f"Could not join queue worker thread: {e}")
-
- logger.info("Email queue worker stopped")
- def _process_email_queue(self):
- """Background worker that processes emails from the queue with 10-second delays"""
- logger.info("Email queue processor started")
-
- while self._is_processing:
- try:
- # Get email from queue (blocking call with timeout)
- email_data = self._email_queue.get(timeout=1)
-
- # Check for sentinel value (None) to stop processing
- if email_data is None:
- break
-
- # Process the email with retry logic
- success = self._send_email_with_retry(
- email_data['subject'],
- email_data['body'],
- email_data['to'],
- email_data['kwargs']
- )
-
- if not success:
- logger.error(f"Failed to send email after all retries: {email_data['subject']}")
-
- # Wait 10 seconds before processing next email (unless it's the last one in queue)
- if not self._email_queue.empty():
- logger.info("Waiting 10 seconds before sending next email...")
- time.sleep(10)
-
- except Empty:
- # If queue is empty, just continue to check for new emails
- continue
- except Exception as e:
- if self._is_processing: # Only log if we're still supposed to be processing
- logger.error(f"Error processing email queue: {e}")
- time.sleep(2) # Brief pause before retrying
-
- logger.info("Email queue processor stopped")
- def _send_email_with_retry(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any], max_retries: int = 3) -> bool:
- """Send email with retry logic using fresh connections"""
- for attempt in range(max_retries):
- try:
- return self._send_email_immediately(subject, body, to, kwargs)
- except Exception as e:
- if attempt < max_retries - 1:
- wait_time = (2 ** attempt) + random.uniform(0, 1)
- logger.warning(f"Email send attempt {attempt + 1} failed: {e}. Waiting {wait_time:.2f}s before retry...")
- time.sleep(wait_time)
- else:
- logger.error(f"All email send attempts failed for {to}: {e}")
- return False
- return False
- def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]) -> bool:
- """Send email immediately using a fresh SMTP connection"""
- logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
-
-
- smtp = None
- try:
- # Create fresh connection for this email
- smtp = self._create_fresh_connection()
-
- msg = EmailMessage()
- msg['Subject'] = subject
- msg['From'] = self.email
- msg['To'] = ", ".join(to)
- msg.set_content('Este correo tiene contenido HTML.')
- msg.add_alternative(body.format(**kwargs), subtype='html')
-
- smtp.send_message(msg)
-
- logger.info(f"Queued email sent to {to} with subject '{subject}'.")
-
- return True
-
- except Exception as e:
- error_msg = f"Failed to send queued email to {to}: {e}"
- logger.error(error_msg)
-
- raise
- finally:
- # Always close the connection
- if smtp:
- try:
- smtp.quit()
- except:
- pass
- def send_email(self, subject: str, body: str, to: list[str], **kwargs):
- """Add email to queue for asynchronous sending with 10-second delays"""
- logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
-
-
- # Add email to queue
- email_data = {
- 'subject': subject,
- 'body': body,
- 'to': to,
- 'kwargs': kwargs
- }
-
- self._email_queue.put(email_data)
- logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")
- def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
- """Send email immediately (synchronous) using fresh connection"""
- logger.debug(f"Preparing to send email immediately to: {to} with subject: '{subject}'")
-
-
- smtp = None
- try:
- # Create fresh connection
- smtp = self._create_fresh_connection()
-
- msg = EmailMessage()
- msg['Subject'] = subject
- msg['From'] = self.email
- msg['To'] = ", ".join(to)
- msg.set_content('Este correo tiene contenido HTML.')
- msg.add_alternative(body.format(**kwargs), subtype='html')
-
- smtp.send_message(msg)
-
- logger.info(f"Email sent immediately to {to} with subject '{subject}'.")
-
- except Exception as e:
- error_msg = f"Failed to send email immediately to {to}: {e}"
- logger.error(error_msg)
- raise
- finally:
- # Always close the connection
- if smtp:
- try:
- smtp.quit()
- except:
- pass
- def get_queue_size(self) -> int:
- """Get the current number of emails in the queue"""
- return self._email_queue.qsize()
- def is_queue_processing(self) -> bool:
- """Check if the queue worker is currently processing emails"""
- return self._is_processing and bool(self._queue_worker_thread and self._queue_worker_thread.is_alive())
- def clear_queue(self):
- """Clear all pending emails from the queue"""
- with self._queue_lock:
- # Clear the queue
- while not self._email_queue.empty():
- try:
- self._email_queue.get_nowait()
- except:
- break
- logger.info("Email queue cleared")
-
- # Global email sender instance
- email_sender: Optional[EmailSender] = None
- def initialize_email_sender():
- """Initialize the global email sender instance"""
- global email_sender
- try:
- email_sender = EmailSender()
- # Test the connection during initialization
- email_sender.connect()
- logger.info("Email sender initialized successfully")
- except Exception as e:
- logger.error(f"Failed to initialize email sender: {e}")
- raise
- def get_email_sender() -> EmailSender:
- """Get the global email sender instance, initialize if needed"""
- global email_sender
- if email_sender is None:
- initialize_email_sender()
- if not email_sender:
- raise ValueError("Email sender is not initialized and failed to initialize.")
- return email_sender
|