Przeglądaj źródła

fix: mail concurrence

latapp 9 miesięcy temu
rodzic
commit
a4d3338679
1 zmienionych plików z 282 dodań i 3 usunięć
  1. 282 3
      services/email_service.py

+ 282 - 3
services/email_service.py

@@ -1,10 +1,52 @@
-import email
+"""
+Email Service with Asynchronous Queue Processing
+
+This service provides email sending functionality with built-in queue management
+to prevent SMTP server saturation. Key features:
+
+1. Asynchronous Email Queue:
+   - Multiple calls to send_email() are queued automatically
+   - Emails are processed with a 5-second delay between sends
+   - Background thread processes the queue continuously
+
+2. Two Send Methods:
+   - send_email(): Adds email to queue (recommended for most use cases)
+   - send_email_sync(): Sends immediately without queuing (for urgent emails)
+
+3. Queue Management:
+   - get_queue_size(): Check number of pending emails
+   - is_queue_processing(): Check if queue worker is active
+   - clear_queue(): Remove all pending emails from queue
+
+4. Automatic Retry Logic:
+   - Handles SMTP disconnections with automatic reconnection
+   - Comprehensive logging for debugging and monitoring
+
+Usage Example:
+    sender = get_email_sender()
+    
+    # Queue multiple emails (recommended)
+    sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
+    sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
+    # These will be sent with 5-second intervals automatically
+    
+    # Check queue status
+    print(f"Emails in queue: {sender.get_queue_size()}")
+    
+    # Send urgent email immediately
+    sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"])
+"""
+
+import asyncio
 from config.settings import MAIL, MAIL_PASSWORD
 import smtplib
 from email.message import EmailMessage
 from logging import getLogger
-from typing import Optional
+from typing import Optional, Dict, Any
 from services.logging_service import structured_logger, LogLevel
+from queue import Queue, Empty
+from threading import Thread, Lock
+import time
 
 logger = getLogger(__name__)
 class EmailSender:
@@ -12,7 +54,12 @@ class EmailSender:
         self.email = MAIL
         self.password = MAIL_PASSWORD
         self._smtp: Optional[smtplib.SMTP_SSL] = None
+        self._email_queue: Queue = Queue()
+        self._queue_worker_thread: Optional[Thread] = None
+        self._queue_lock = Lock()
+        self._is_processing = False
         self.connect()
+        self._start_queue_worker()
 
     def connect(self):
         if self._smtp is None:
@@ -76,8 +123,215 @@ class EmailSender:
             except Exception as e:
                 logger.warning(f"Error closing SMTP connection: {e}")
                 self._smtp = None  # Force reset even if quit fails
+        
+        # Stop the queue worker
+        self._stop_queue_worker()
+
+    def _start_queue_worker(self):
+        """Start the background thread to process email queue"""
+        with self._queue_lock:
+            if not self._is_processing:
+                self._is_processing = True
+                self._queue_worker_thread = Thread(target=self._process_email_queue, daemon=True)
+                self._queue_worker_thread.start()
+                logger.info("Email queue worker started")
+
+    def _stop_queue_worker(self):
+        """Stop the background thread processing email queue"""
+        with self._queue_lock:
+            if self._is_processing:
+                self._is_processing = False
+                # Add a sentinel value to wake up the worker thread
+                self._email_queue.put(None)
+                if self._queue_worker_thread and self._queue_worker_thread.is_alive():
+                    self._queue_worker_thread.join(timeout=10)
+                logger.info("Email queue worker stopped")
+
+    def _process_email_queue(self):
+        """Background worker that processes emails from the queue with 5-second delays"""
+        logger.info("Email queue processor started")
+        
+        while self._is_processing:
+            try:
+                # Get email from queue (blocking call)
+                email_data = self._email_queue.get(timeout=1)
+                
+                # Check for sentinel value (None) to stop processing
+                if email_data is None:
+                    break
+                
+                # Process the email
+                self._send_email_immediately(
+                    email_data['subject'],
+                    email_data['body'],
+                    email_data['to'],
+                    email_data['kwargs']
+                )
+                
+                # Wait 5 seconds before processing next email (unless it's the last one in queue)
+                if not self._email_queue.empty():
+                    logger.info("Waiting 5 seconds before sending next email...")
+                    time.sleep(5)
+            
+            except Empty:
+                # If queue is empty, just continue to check for new emails
+                continue
+
+            except Exception as e:
+                if self._is_processing:  # Only log if we're still supposed to be processing
+                    logger.error(f"Error processing email queue: {e}")
+                    time.sleep(1)  # Brief pause before retrying
+        
+        logger.info("Email queue processor stopped")
+
+    def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]):
+        """Send email immediately without queuing"""
+        if self._smtp is None:
+            self.connect()
+            
+        logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
+        structured_logger.log_email_event(
+            f"Sending queued email with subject: '{subject}'",
+            LogLevel.INFO,
+            {
+                "subject": subject,
+                "recipients": to,
+                "sender_email": self.email,
+                "body_length": len(body),
+                "kwargs_count": len(kwargs),
+                "queued": True
+            }
+        )
+        
+        try:
+            msg = EmailMessage()
+            msg['Subject'] = subject
+            msg['From'] = self.email
+            msg['To'] = ", ".join(to)
+            msg.set_content('Este correo tiene contenido HTML.')
+            msg.add_alternative(body.format(**kwargs), subtype='html')
+            
+            if not self._smtp:
+                error_msg = "Cannot send email because SMTP connection is not established"
+                logger.error(error_msg)
+                structured_logger.log_email_event(
+                    "Queued email send failed: SMTP connection not established",
+                    LogLevel.ERROR,
+                    {
+                        "subject": subject,
+                        "recipients": to,
+                        "sender_email": self.email
+                    }
+                )
+                raise ConnectionError(error_msg)
+
+            self._smtp.send_message(msg)
+            logger.info(f"Queued email sent to {to} with subject '{subject}'.")
+            structured_logger.log_email_event(
+                f"Queued email sent successfully",
+                LogLevel.INFO,
+                {
+                    "subject": subject,
+                    "recipients": to,
+                    "sender_email": self.email,
+                    "recipients_count": len(to),
+                    "queued": True
+                }
+            )
+            
+        except smtplib.SMTPServerDisconnected as e:
+            logger.warning("SMTP connection disconnected during queued send, retrying...")
+            structured_logger.log_email_event(
+                "SMTP disconnected during queued send, attempting retry",
+                LogLevel.WARNING,
+                {
+                    "subject": subject,
+                    "recipients": to,
+                    "error": str(e)
+                }
+            )
+            
+            try:
+                self.close()
+                self.connect()
+                if self._smtp:
+                    self._smtp.send_message(msg)
+                
+                logger.info(f"Queued email resent successfully to {to}.")
+                structured_logger.log_email_event(
+                    "Queued email sent successfully after retry",
+                    LogLevel.INFO,
+                    {
+                        "subject": subject,
+                        "recipients": to,
+                        "sender_email": self.email,
+                        "retry_attempt": True,
+                        "queued": True
+                    }
+                )
+            except Exception as retry_error:
+                error_msg = f"Failed to resend queued email after retry: {retry_error}"
+                logger.error(error_msg)
+                structured_logger.log_email_event(
+                    "Queued email retry failed",
+                    LogLevel.ERROR,
+                    {
+                        "subject": subject,
+                        "recipients": to,
+                        "original_error": str(e),
+                        "retry_error": str(retry_error),
+                        "error_type": type(retry_error).__name__,
+                        "queued": True
+                    }
+                )
+                raise
+                
+        except Exception as e:
+            error_msg = f"Failed to send queued email to {to}: {e}"
+            logger.error(error_msg)
+            structured_logger.log_email_event(
+                "Queued email send failed",
+                LogLevel.ERROR,
+                {
+                    "subject": subject,
+                    "recipients": to,
+                    "sender_email": self.email,
+                    "error": str(e),
+                    "error_type": type(e).__name__,
+                    "queued": True
+                }
+            )
+            raise
 
     def send_email(self, subject: str, body: str, to: list[str], **kwargs):
+        """Add email to queue for asynchronous sending with 5-second delays"""
+        logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
+        structured_logger.log_email_event(
+            f"Queuing email with subject: '{subject}'",
+            LogLevel.INFO,
+            {
+                "subject": subject,
+                "recipients": to,
+                "sender_email": self.email,
+                "body_length": len(body),
+                "kwargs_count": len(kwargs),
+                "queue_size": self._email_queue.qsize() + 1
+            }
+        )
+        
+        # Add email to queue
+        email_data = {
+            'subject': subject,
+            'body': body,
+            'to': to,
+            'kwargs': kwargs
+        }
+        
+        self._email_queue.put(email_data)
+        logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")
+
+    def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
+        """Send email immediately (synchronous) - for backwards compatibility"""
         if self._smtp is None:
             self.connect()
             
@@ -144,7 +398,8 @@ class EmailSender:
             try:
                 self.close()
                 self.connect()
-                self._smtp.send_message(msg)
+                if self._smtp:
+                    self._smtp.send_message(msg)
                 
                 logger.info(f"Email resent successfully to {to}.")
                 structured_logger.log_email_event(
@@ -191,6 +446,30 @@ class EmailSender:
             self.close()
             raise
 
+    def get_queue_size(self) -> int:
+        """Get the current number of emails in the queue"""
+        return self._email_queue.qsize()
+
+    def is_queue_processing(self) -> bool:
+        """Check if the queue worker is currently processing emails"""
+        return self._is_processing and bool(self._queue_worker_thread and self._queue_worker_thread.is_alive())
+
+    def clear_queue(self):
+        """Clear all pending emails from the queue"""
+        with self._queue_lock:
+            # Clear the queue
+            while not self._email_queue.empty():
+                try:
+                    self._email_queue.get_nowait()
+                except:
+                    break
+            logger.info("Email queue cleared")
+            structured_logger.log_email_event(
+                "Email queue cleared",
+                LogLevel.INFO,
+                {"sender_email": self.email}
+            )
+
 email_sender: Optional[EmailSender] = None
 
 def initialize_email_sender():