|
|
@@ -6,7 +6,7 @@ to prevent SMTP server saturation. Key features:
|
|
|
|
|
|
1. Asynchronous Email Queue:
|
|
|
- Multiple calls to send_email() are queued automatically
|
|
|
- - Emails are processed with a 5-second delay between sends
|
|
|
+ - Emails are processed with a 10-second delay between sends
|
|
|
- Background thread processes the queue continuously
|
|
|
|
|
|
2. Two Send Methods:
|
|
|
@@ -20,6 +20,7 @@ to prevent SMTP server saturation. Key features:
|
|
|
|
|
|
4. Automatic Retry Logic:
|
|
|
- Handles SMTP disconnections with automatic reconnection
|
|
|
+ - Fresh connection for each email to prevent timeouts
|
|
|
- Comprehensive logging for debugging and monitoring
|
|
|
|
|
|
Usage Example:
|
|
|
@@ -28,7 +29,7 @@ Usage Example:
|
|
|
# Queue multiple emails (recommended)
|
|
|
sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
|
|
|
sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
|
|
|
- # These will be sent with 5-second intervals automatically
|
|
|
+ # These will be sent with 10-second intervals automatically
|
|
|
|
|
|
# Check queue status
|
|
|
print(f"Emails in queue: {sender.get_queue_size()}")
|
|
|
@@ -45,10 +46,12 @@ from logging import getLogger
|
|
|
from typing import Optional, Dict, Any
|
|
|
from services.logging_service import structured_logger, LogLevel
|
|
|
from queue import Queue, Empty
|
|
|
-from threading import Thread, Lock
|
|
|
+from threading import Thread, Lock, current_thread
|
|
|
import time
|
|
|
+import random
|
|
|
|
|
|
logger = getLogger(__name__)
|
|
|
+
|
|
|
class EmailSender:
|
|
|
def __init__(self):
|
|
|
self.email = MAIL
|
|
|
@@ -58,74 +61,90 @@ class EmailSender:
|
|
|
self._queue_worker_thread: Optional[Thread] = None
|
|
|
self._queue_lock = Lock()
|
|
|
self._is_processing = False
|
|
|
- self.connect()
|
|
|
self._start_queue_worker()
|
|
|
|
|
|
def connect(self):
|
|
|
- if self._smtp is None:
|
|
|
- logger.info("Establishing new SMTP connection...")
|
|
|
+ """Establish SMTP connection - kept for compatibility but not used for persistent connections"""
|
|
|
+ logger.info("Testing SMTP connection...")
|
|
|
+ structured_logger.log_email_event(
|
|
|
+ "Testing SMTP connection",
|
|
|
+ LogLevel.INFO,
|
|
|
+ {"email_server": "smtp.gmail.com", "port": 465, "sender_email": self.email}
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Test connection
|
|
|
+ test_smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465)
|
|
|
+ test_smtp.login(self.email, self.password)
|
|
|
+ test_smtp.ehlo()
|
|
|
+ test_smtp.quit()
|
|
|
+
|
|
|
+ logger.info("SMTP connection test successful.")
|
|
|
structured_logger.log_email_event(
|
|
|
- "Establishing SMTP connection",
|
|
|
+ "SMTP connection test successful",
|
|
|
LogLevel.INFO,
|
|
|
- {"email_server": "smtp.gmail.com", "port": 465, "sender_email": self.email}
|
|
|
+ {"sender_email": self.email}
|
|
|
)
|
|
|
|
|
|
- try:
|
|
|
- self._smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465)
|
|
|
- self._smtp.login(self.email, self.password)
|
|
|
- self._smtp.ehlo()
|
|
|
-
|
|
|
- logger.info("SMTP connection established and authenticated.")
|
|
|
- structured_logger.log_email_event(
|
|
|
- "SMTP authentication successful",
|
|
|
- LogLevel.INFO,
|
|
|
- {"sender_email": self.email}
|
|
|
- )
|
|
|
-
|
|
|
- except smtplib.SMTPAuthenticationError as e:
|
|
|
- error_msg = f"SMTP authentication failed: {e}"
|
|
|
- logger.error(error_msg)
|
|
|
- structured_logger.log_email_event(
|
|
|
- "SMTP authentication failed",
|
|
|
- LogLevel.ERROR,
|
|
|
- {
|
|
|
- "sender_email": self.email,
|
|
|
- "error": str(e),
|
|
|
- "error_type": "SMTPAuthenticationError"
|
|
|
- }
|
|
|
- )
|
|
|
- raise
|
|
|
- except Exception as e:
|
|
|
- error_msg = f"Failed to establish SMTP connection: {e}"
|
|
|
- logger.error(error_msg)
|
|
|
- structured_logger.log_email_event(
|
|
|
- "SMTP connection failed",
|
|
|
- LogLevel.ERROR,
|
|
|
- {
|
|
|
- "sender_email": self.email,
|
|
|
- "error": str(e),
|
|
|
- "error_type": type(e).__name__
|
|
|
- }
|
|
|
- )
|
|
|
- raise
|
|
|
+ except smtplib.SMTPAuthenticationError as e:
|
|
|
+ error_msg = f"SMTP authentication failed: {e}"
|
|
|
+ logger.error(error_msg)
|
|
|
+ structured_logger.log_email_event(
|
|
|
+ "SMTP authentication failed",
|
|
|
+ LogLevel.ERROR,
|
|
|
+ {
|
|
|
+ "sender_email": self.email,
|
|
|
+ "error": str(e),
|
|
|
+ "error_type": "SMTPAuthenticationError"
|
|
|
+ }
|
|
|
+ )
|
|
|
+ raise
|
|
|
+ except Exception as e:
|
|
|
+ error_msg = f"Failed to establish SMTP connection: {e}"
|
|
|
+ logger.error(error_msg)
|
|
|
+ structured_logger.log_email_event(
|
|
|
+ "SMTP connection failed",
|
|
|
+ LogLevel.ERROR,
|
|
|
+ {
|
|
|
+ "sender_email": self.email,
|
|
|
+ "error": str(e),
|
|
|
+ "error_type": type(e).__name__
|
|
|
+ }
|
|
|
+ )
|
|
|
+ raise
|
|
|
|
|
|
def close(self):
|
|
|
+ """Close SMTP connection and stop queue worker"""
|
|
|
+ logger.info("Closing email service.")
|
|
|
+ structured_logger.log_email_event(
|
|
|
+ "Closing email service",
|
|
|
+ LogLevel.INFO,
|
|
|
+ {"sender_email": self.email}
|
|
|
+ )
|
|
|
+
|
|
|
+ # Stop the queue worker
|
|
|
+ self._stop_queue_worker()
|
|
|
+
|
|
|
+ # Clear any persistent connection (though we don't use them anymore)
|
|
|
if self._smtp:
|
|
|
- logger.info("Closing SMTP connection.")
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Closing SMTP connection",
|
|
|
- LogLevel.INFO,
|
|
|
- {"sender_email": self.email}
|
|
|
- )
|
|
|
try:
|
|
|
self._smtp.quit()
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+ finally:
|
|
|
self._smtp = None
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"Error closing SMTP connection: {e}")
|
|
|
- self._smtp = None # Force reset even if quit fails
|
|
|
-
|
|
|
- # Stop the queue worker
|
|
|
- self._stop_queue_worker()
|
|
|
+
|
|
|
+ def _create_fresh_connection(self):
|
|
|
+ """Create a fresh SMTP connection for each email send"""
|
|
|
+ try:
|
|
|
+ smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=30)
|
|
|
+ smtp.login(self.email, self.password)
|
|
|
+ reply = smtp.ehlo()
|
|
|
+ print(f"SMTP connection established: {reply}")
|
|
|
+ return smtp
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to create fresh SMTP connection: {e}")
|
|
|
+ raise
|
|
|
|
|
|
def _start_queue_worker(self):
|
|
|
"""Start the background thread to process email queue"""
|
|
|
@@ -143,52 +162,74 @@ class EmailSender:
|
|
|
self._is_processing = False
|
|
|
# Add a sentinel value to wake up the worker thread
|
|
|
self._email_queue.put(None)
|
|
|
- if self._queue_worker_thread and self._queue_worker_thread.is_alive():
|
|
|
- self._queue_worker_thread.join(timeout=10)
|
|
|
+
|
|
|
+ # Only join if we're not in the same thread
|
|
|
+ if (self._queue_worker_thread and
|
|
|
+ self._queue_worker_thread.is_alive() and
|
|
|
+ self._queue_worker_thread != current_thread()):
|
|
|
+ try:
|
|
|
+ self._queue_worker_thread.join(timeout=10)
|
|
|
+ except RuntimeError as e:
|
|
|
+ logger.warning(f"Could not join queue worker thread: {e}")
|
|
|
+
|
|
|
logger.info("Email queue worker stopped")
|
|
|
|
|
|
def _process_email_queue(self):
|
|
|
- """Background worker that processes emails from the queue with 5-second delays"""
|
|
|
+ """Background worker that processes emails from the queue with 10-second delays"""
|
|
|
logger.info("Email queue processor started")
|
|
|
|
|
|
while self._is_processing:
|
|
|
try:
|
|
|
- # Get email from queue (blocking call)
|
|
|
+ # Get email from queue (blocking call with timeout)
|
|
|
email_data = self._email_queue.get(timeout=1)
|
|
|
|
|
|
# Check for sentinel value (None) to stop processing
|
|
|
if email_data is None:
|
|
|
break
|
|
|
|
|
|
- # Process the email
|
|
|
- self._send_email_immediately(
|
|
|
+ # Process the email with retry logic
|
|
|
+ success = self._send_email_with_retry(
|
|
|
email_data['subject'],
|
|
|
email_data['body'],
|
|
|
email_data['to'],
|
|
|
email_data['kwargs']
|
|
|
)
|
|
|
|
|
|
- # Wait 5 seconds before processing next email (unless it's the last one in queue)
|
|
|
+ if not success:
|
|
|
+ logger.error(f"Failed to send email after all retries: {email_data['subject']}")
|
|
|
+
|
|
|
+ # Wait 10 seconds before processing next email (unless it's the last one in queue)
|
|
|
if not self._email_queue.empty():
|
|
|
- logger.info("Waiting 5 seconds before sending next email...")
|
|
|
- time.sleep(5)
|
|
|
+ logger.info("Waiting 10 seconds before sending next email...")
|
|
|
+ time.sleep(10)
|
|
|
|
|
|
except Empty:
|
|
|
# If queue is empty, just continue to check for new emails
|
|
|
continue
|
|
|
-
|
|
|
except Exception as e:
|
|
|
if self._is_processing: # Only log if we're still supposed to be processing
|
|
|
logger.error(f"Error processing email queue: {e}")
|
|
|
- time.sleep(1) # Brief pause before retrying
|
|
|
+ time.sleep(2) # Brief pause before retrying
|
|
|
|
|
|
logger.info("Email queue processor stopped")
|
|
|
|
|
|
- def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]):
|
|
|
- """Send email immediately without queuing"""
|
|
|
- if self._smtp is None:
|
|
|
- self.connect()
|
|
|
-
|
|
|
+ def _send_email_with_retry(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any], max_retries: int = 3) -> bool:
|
|
|
+ """Send email with retry logic using fresh connections"""
|
|
|
+ for attempt in range(max_retries):
|
|
|
+ try:
|
|
|
+ return self._send_email_immediately(subject, body, to, kwargs)
|
|
|
+ except Exception as e:
|
|
|
+ if attempt < max_retries - 1:
|
|
|
+ wait_time = (2 ** attempt) + random.uniform(0, 1)
|
|
|
+ logger.warning(f"Email send attempt {attempt + 1} failed: {e}. Waiting {wait_time:.2f}s before retry...")
|
|
|
+ time.sleep(wait_time)
|
|
|
+ else:
|
|
|
+ logger.error(f"All email send attempts failed for {to}: {e}")
|
|
|
+ return False
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]) -> bool:
|
|
|
+ """Send email immediately using a fresh SMTP connection"""
|
|
|
logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
|
|
|
structured_logger.log_email_event(
|
|
|
f"Sending queued email with subject: '{subject}'",
|
|
|
@@ -203,7 +244,11 @@ class EmailSender:
|
|
|
}
|
|
|
)
|
|
|
|
|
|
+ smtp = None
|
|
|
try:
|
|
|
+ # Create fresh connection for this email
|
|
|
+ smtp = self._create_fresh_connection()
|
|
|
+
|
|
|
msg = EmailMessage()
|
|
|
msg['Subject'] = subject
|
|
|
msg['From'] = self.email
|
|
|
@@ -211,21 +256,8 @@ class EmailSender:
|
|
|
msg.set_content('Este correo tiene contenido HTML.')
|
|
|
msg.add_alternative(body.format(**kwargs), subtype='html')
|
|
|
|
|
|
- if not self._smtp:
|
|
|
- error_msg = "Cannot send email because SMTP connection is not established"
|
|
|
- logger.error(error_msg)
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Queued email send failed: SMTP connection not established",
|
|
|
- LogLevel.ERROR,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "sender_email": self.email
|
|
|
- }
|
|
|
- )
|
|
|
- raise ConnectionError(error_msg)
|
|
|
-
|
|
|
- self._smtp.send_message(msg)
|
|
|
+ smtp.send_message(msg)
|
|
|
+
|
|
|
logger.info(f"Queued email sent to {to} with subject '{subject}'.")
|
|
|
structured_logger.log_email_event(
|
|
|
f"Queued email sent successfully",
|
|
|
@@ -238,54 +270,8 @@ class EmailSender:
|
|
|
"queued": True
|
|
|
}
|
|
|
)
|
|
|
+ return True
|
|
|
|
|
|
- except smtplib.SMTPServerDisconnected as e:
|
|
|
- logger.warning("SMTP connection disconnected during queued send, retrying...")
|
|
|
- structured_logger.log_email_event(
|
|
|
- "SMTP disconnected during queued send, attempting retry",
|
|
|
- LogLevel.WARNING,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "error": str(e)
|
|
|
- }
|
|
|
- )
|
|
|
-
|
|
|
- try:
|
|
|
- self.close()
|
|
|
- self.connect()
|
|
|
- if self._smtp:
|
|
|
- self._smtp.send_message(msg)
|
|
|
-
|
|
|
- logger.info(f"Queued email resent successfully to {to}.")
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Queued email sent successfully after retry",
|
|
|
- LogLevel.INFO,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "sender_email": self.email,
|
|
|
- "retry_attempt": True,
|
|
|
- "queued": True
|
|
|
- }
|
|
|
- )
|
|
|
- except Exception as retry_error:
|
|
|
- error_msg = f"Failed to resend queued email after retry: {retry_error}"
|
|
|
- logger.error(error_msg)
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Queued email retry failed",
|
|
|
- LogLevel.ERROR,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "original_error": str(e),
|
|
|
- "retry_error": str(retry_error),
|
|
|
- "error_type": type(retry_error).__name__,
|
|
|
- "queued": True
|
|
|
- }
|
|
|
- )
|
|
|
- raise
|
|
|
-
|
|
|
except Exception as e:
|
|
|
error_msg = f"Failed to send queued email to {to}: {e}"
|
|
|
logger.error(error_msg)
|
|
|
@@ -302,9 +288,16 @@ class EmailSender:
|
|
|
}
|
|
|
)
|
|
|
raise
|
|
|
+ finally:
|
|
|
+ # Always close the connection
|
|
|
+ if smtp:
|
|
|
+ try:
|
|
|
+ smtp.quit()
|
|
|
+ except:
|
|
|
+ pass
|
|
|
|
|
|
def send_email(self, subject: str, body: str, to: list[str], **kwargs):
|
|
|
- """Add email to queue for asynchronous sending with 5-second delays"""
|
|
|
+ """Add email to queue for asynchronous sending with 10-second delays"""
|
|
|
logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
|
|
|
structured_logger.log_email_event(
|
|
|
f"Queuing email with subject: '{subject}'",
|
|
|
@@ -331,24 +324,26 @@ class EmailSender:
|
|
|
logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")
|
|
|
|
|
|
def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
|
|
|
- """Send email immediately (synchronous) - for backwards compatibility"""
|
|
|
- if self._smtp is None:
|
|
|
- self.connect()
|
|
|
-
|
|
|
- logger.debug(f"Preparing to send email to: {to} with subject: '{subject}'")
|
|
|
+ """Send email immediately (synchronous) using fresh connection"""
|
|
|
+ logger.debug(f"Preparing to send email immediately to: {to} with subject: '{subject}'")
|
|
|
structured_logger.log_email_event(
|
|
|
- f"Preparing to send email with subject: '{subject}'",
|
|
|
+ f"Preparing to send email immediately with subject: '{subject}'",
|
|
|
LogLevel.INFO,
|
|
|
{
|
|
|
"subject": subject,
|
|
|
"recipients": to,
|
|
|
"sender_email": self.email,
|
|
|
"body_length": len(body),
|
|
|
- "kwargs_count": len(kwargs)
|
|
|
+ "kwargs_count": len(kwargs),
|
|
|
+ "sync": True
|
|
|
}
|
|
|
)
|
|
|
|
|
|
+ smtp = None
|
|
|
try:
|
|
|
+ # Create fresh connection
|
|
|
+ smtp = self._create_fresh_connection()
|
|
|
+
|
|
|
msg = EmailMessage()
|
|
|
msg['Subject'] = subject
|
|
|
msg['From'] = self.email
|
|
|
@@ -356,95 +351,44 @@ class EmailSender:
|
|
|
msg.set_content('Este correo tiene contenido HTML.')
|
|
|
msg.add_alternative(body.format(**kwargs), subtype='html')
|
|
|
|
|
|
- if not self._smtp:
|
|
|
- error_msg = "Cannot send email because SMTP connection is not established"
|
|
|
- logger.error(error_msg)
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Email send failed: SMTP connection not established",
|
|
|
- LogLevel.ERROR,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "sender_email": self.email
|
|
|
- }
|
|
|
- )
|
|
|
- raise ConnectionError(error_msg)
|
|
|
-
|
|
|
- self._smtp.send_message(msg)
|
|
|
- logger.info(f"Email sent to {to} with subject '{subject}'.")
|
|
|
+ smtp.send_message(msg)
|
|
|
+
|
|
|
+ logger.info(f"Email sent immediately to {to} with subject '{subject}'.")
|
|
|
structured_logger.log_email_event(
|
|
|
- f"Email sent successfully",
|
|
|
+ f"Email sent immediately and successfully",
|
|
|
LogLevel.INFO,
|
|
|
{
|
|
|
"subject": subject,
|
|
|
"recipients": to,
|
|
|
"sender_email": self.email,
|
|
|
- "recipients_count": len(to)
|
|
|
- }
|
|
|
- )
|
|
|
-
|
|
|
- except smtplib.SMTPServerDisconnected as e:
|
|
|
- logger.warning("SMTP connection disconnected, retrying...")
|
|
|
- structured_logger.log_email_event(
|
|
|
- "SMTP disconnected during send, attempting retry",
|
|
|
- LogLevel.WARNING,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "error": str(e)
|
|
|
+ "recipients_count": len(to),
|
|
|
+ "sync": True
|
|
|
}
|
|
|
)
|
|
|
|
|
|
- try:
|
|
|
- self.close()
|
|
|
- self.connect()
|
|
|
- if self._smtp:
|
|
|
- self._smtp.send_message(msg)
|
|
|
-
|
|
|
- logger.info(f"Email resent successfully to {to}.")
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Email sent successfully after retry",
|
|
|
- LogLevel.INFO,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "sender_email": self.email,
|
|
|
- "retry_attempt": True
|
|
|
- }
|
|
|
- )
|
|
|
- except Exception as retry_error:
|
|
|
- error_msg = f"Failed to resend email after retry: {retry_error}"
|
|
|
- logger.error(error_msg)
|
|
|
- structured_logger.log_email_event(
|
|
|
- "Email retry failed",
|
|
|
- LogLevel.ERROR,
|
|
|
- {
|
|
|
- "subject": subject,
|
|
|
- "recipients": to,
|
|
|
- "original_error": str(e),
|
|
|
- "retry_error": str(retry_error),
|
|
|
- "error_type": type(retry_error).__name__
|
|
|
- }
|
|
|
- )
|
|
|
- self.close()
|
|
|
- raise
|
|
|
-
|
|
|
except Exception as e:
|
|
|
- error_msg = f"Failed to send email to {to}: {e}"
|
|
|
+ error_msg = f"Failed to send email immediately to {to}: {e}"
|
|
|
logger.error(error_msg)
|
|
|
structured_logger.log_email_event(
|
|
|
- "Email send failed",
|
|
|
+ "Immediate email send failed",
|
|
|
LogLevel.ERROR,
|
|
|
{
|
|
|
"subject": subject,
|
|
|
"recipients": to,
|
|
|
"sender_email": self.email,
|
|
|
"error": str(e),
|
|
|
- "error_type": type(e).__name__
|
|
|
+ "error_type": type(e).__name__,
|
|
|
+ "sync": True
|
|
|
}
|
|
|
)
|
|
|
- self.close()
|
|
|
raise
|
|
|
+ finally:
|
|
|
+ # Always close the connection
|
|
|
+ if smtp:
|
|
|
+ try:
|
|
|
+ smtp.quit()
|
|
|
+ except:
|
|
|
+ pass
|
|
|
|
|
|
def get_queue_size(self) -> int:
|
|
|
"""Get the current number of emails in the queue"""
|
|
|
@@ -470,13 +414,25 @@ class EmailSender:
|
|
|
{"sender_email": self.email}
|
|
|
)
|
|
|
|
|
|
+
|
|
|
+# Global email sender instance
|
|
|
email_sender: Optional[EmailSender] = None
|
|
|
|
|
|
def initialize_email_sender():
|
|
|
+ """Initialize the global email sender instance"""
|
|
|
global email_sender
|
|
|
- email_sender = EmailSender()
|
|
|
+ try:
|
|
|
+ email_sender = EmailSender()
|
|
|
+ # Test the connection during initialization
|
|
|
+ email_sender.connect()
|
|
|
+ logger.info("Email sender initialized successfully")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to initialize email sender: {e}")
|
|
|
+ raise
|
|
|
|
|
|
def get_email_sender() -> EmailSender:
|
|
|
+ """Get the global email sender instance, initialize if needed"""
|
|
|
+ global email_sender
|
|
|
if email_sender is None:
|
|
|
initialize_email_sender()
|
|
|
if not email_sender:
|