email_service.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. """
  2. Email Service with Asynchronous Queue Processing
  3. This service provides email sending functionality with built-in queue management
  4. to prevent SMTP server saturation. Key features:
  5. 1. Asynchronous Email Queue:
  6. - Multiple calls to send_email() are queued automatically
  7. - Emails are processed with a 10-second delay between sends
  8. - Background thread processes the queue continuously
  9. 2. Two Send Methods:
  10. - send_email(): Adds email to queue (recommended for most use cases)
  11. - send_email_sync(): Sends immediately without queuing (for urgent emails)
  12. 3. Queue Management:
  13. - get_queue_size(): Check number of pending emails
  14. - is_queue_processing(): Check if queue worker is active
  15. - clear_queue(): Remove all pending emails from queue
  16. 4. Automatic Retry Logic:
  17. - Handles SMTP disconnections with automatic reconnection
  18. - Fresh connection for each email to prevent timeouts
  19. - Comprehensive logging for debugging and monitoring
  20. Usage Example:
  21. sender = get_email_sender()
  22. # Queue multiple emails (recommended)
  23. sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
  24. sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
  25. # These will be sent with 10-second intervals automatically
  26. # Check queue status
  27. print(f"Emails in queue: {sender.get_queue_size()}")
  28. # Send urgent email immediately
  29. sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"])
  30. """
  31. import asyncio
  32. from config.settings import MAIL, MAIL_PASSWORD
  33. import smtplib
  34. from email.message import EmailMessage
  35. from logging import getLogger
  36. from typing import Optional, Dict, Any
  37. from queue import Queue, Empty
  38. from threading import Thread, Lock, current_thread
  39. import time
  40. import random
  41. logger = getLogger(__name__)
  42. class EmailSender:
  43. def __init__(self):
  44. self.email = MAIL
  45. self.password = MAIL_PASSWORD
  46. self._smtp: Optional[smtplib.SMTP_SSL] = None
  47. self._email_queue: Queue = Queue()
  48. self._queue_worker_thread: Optional[Thread] = None
  49. self._queue_lock = Lock()
  50. self._is_processing = False
  51. self._start_queue_worker()
  52. def connect(self):
  53. """Establish SMTP connection - kept for compatibility but not used for persistent connections"""
  54. logger.info("Testing SMTP connection...")
  55. try:
  56. # Test connection
  57. test_smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465)
  58. test_smtp.login(self.email, self.password)
  59. test_smtp.ehlo()
  60. test_smtp.quit()
  61. logger.info("SMTP connection test successful.")
  62. except smtplib.SMTPAuthenticationError as e:
  63. error_msg = f"SMTP authentication failed: {e}"
  64. logger.error(error_msg)
  65. raise
  66. except Exception as e:
  67. error_msg = f"Failed to establish SMTP connection: {e}"
  68. logger.error(error_msg)
  69. raise
  70. def close(self):
  71. """Close SMTP connection and stop queue worker"""
  72. logger.info("Closing email service.")
  73. # Stop the queue worker
  74. self._stop_queue_worker()
  75. # Clear any persistent connection (though we don't use them anymore)
  76. if self._smtp:
  77. try:
  78. self._smtp.quit()
  79. except:
  80. pass
  81. finally:
  82. self._smtp = None
  83. def _create_fresh_connection(self):
  84. """Create a fresh SMTP connection for each email send"""
  85. try:
  86. smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=30)
  87. smtp.login(self.email, self.password)
  88. reply = smtp.ehlo()
  89. print(f"SMTP connection established: {reply}")
  90. return smtp
  91. except Exception as e:
  92. logger.error(f"Failed to create fresh SMTP connection: {e}")
  93. raise
  94. def _start_queue_worker(self):
  95. """Start the background thread to process email queue"""
  96. with self._queue_lock:
  97. if not self._is_processing:
  98. self._is_processing = True
  99. self._queue_worker_thread = Thread(target=self._process_email_queue, daemon=True)
  100. self._queue_worker_thread.start()
  101. logger.info("Email queue worker started")
  102. def _stop_queue_worker(self):
  103. """Stop the background thread processing email queue"""
  104. with self._queue_lock:
  105. if self._is_processing:
  106. self._is_processing = False
  107. # Add a sentinel value to wake up the worker thread
  108. self._email_queue.put(None)
  109. # Only join if we're not in the same thread
  110. if (self._queue_worker_thread and
  111. self._queue_worker_thread.is_alive() and
  112. self._queue_worker_thread != current_thread()):
  113. try:
  114. self._queue_worker_thread.join(timeout=10)
  115. except RuntimeError as e:
  116. logger.warning(f"Could not join queue worker thread: {e}")
  117. logger.info("Email queue worker stopped")
  118. def _process_email_queue(self):
  119. """Background worker that processes emails from the queue with 10-second delays"""
  120. logger.info("Email queue processor started")
  121. while self._is_processing:
  122. try:
  123. # Get email from queue (blocking call with timeout)
  124. email_data = self._email_queue.get(timeout=1)
  125. # Check for sentinel value (None) to stop processing
  126. if email_data is None:
  127. break
  128. # Process the email with retry logic
  129. success = self._send_email_with_retry(
  130. email_data['subject'],
  131. email_data['body'],
  132. email_data['to'],
  133. email_data['kwargs']
  134. )
  135. if not success:
  136. logger.error(f"Failed to send email after all retries: {email_data['subject']}")
  137. # Wait 10 seconds before processing next email (unless it's the last one in queue)
  138. if not self._email_queue.empty():
  139. logger.info("Waiting 10 seconds before sending next email...")
  140. time.sleep(10)
  141. except Empty:
  142. # If queue is empty, just continue to check for new emails
  143. continue
  144. except Exception as e:
  145. if self._is_processing: # Only log if we're still supposed to be processing
  146. logger.error(f"Error processing email queue: {e}")
  147. time.sleep(2) # Brief pause before retrying
  148. logger.info("Email queue processor stopped")
  149. def _send_email_with_retry(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any], max_retries: int = 3) -> bool:
  150. """Send email with retry logic using fresh connections"""
  151. for attempt in range(max_retries):
  152. try:
  153. return self._send_email_immediately(subject, body, to, kwargs)
  154. except Exception as e:
  155. if attempt < max_retries - 1:
  156. wait_time = (2 ** attempt) + random.uniform(0, 1)
  157. logger.warning(f"Email send attempt {attempt + 1} failed: {e}. Waiting {wait_time:.2f}s before retry...")
  158. time.sleep(wait_time)
  159. else:
  160. logger.error(f"All email send attempts failed for {to}: {e}")
  161. return False
  162. return False
  163. def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]) -> bool:
  164. """Send email immediately using a fresh SMTP connection"""
  165. logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
  166. smtp = None
  167. try:
  168. # Create fresh connection for this email
  169. smtp = self._create_fresh_connection()
  170. msg = EmailMessage()
  171. msg['Subject'] = subject
  172. msg['From'] = self.email
  173. msg['To'] = ", ".join(to)
  174. msg.set_content('Este correo tiene contenido HTML.')
  175. msg.add_alternative(body.format(**kwargs), subtype='html')
  176. smtp.send_message(msg)
  177. logger.info(f"Queued email sent to {to} with subject '{subject}'.")
  178. return True
  179. except Exception as e:
  180. error_msg = f"Failed to send queued email to {to}: {e}"
  181. logger.error(error_msg)
  182. raise
  183. finally:
  184. # Always close the connection
  185. if smtp:
  186. try:
  187. smtp.quit()
  188. except:
  189. pass
  190. def send_email(self, subject: str, body: str, to: list[str], **kwargs):
  191. """Add email to queue for asynchronous sending with 10-second delays"""
  192. logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
  193. # Add email to queue
  194. email_data = {
  195. 'subject': subject,
  196. 'body': body,
  197. 'to': to,
  198. 'kwargs': kwargs
  199. }
  200. self._email_queue.put(email_data)
  201. logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")
  202. def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
  203. """Send email immediately (synchronous) using fresh connection"""
  204. logger.debug(f"Preparing to send email immediately to: {to} with subject: '{subject}'")
  205. smtp = None
  206. try:
  207. # Create fresh connection
  208. smtp = self._create_fresh_connection()
  209. msg = EmailMessage()
  210. msg['Subject'] = subject
  211. msg['From'] = self.email
  212. msg['To'] = ", ".join(to)
  213. msg.set_content('Este correo tiene contenido HTML.')
  214. msg.add_alternative(body.format(**kwargs), subtype='html')
  215. smtp.send_message(msg)
  216. logger.info(f"Email sent immediately to {to} with subject '{subject}'.")
  217. except Exception as e:
  218. error_msg = f"Failed to send email immediately to {to}: {e}"
  219. logger.error(error_msg)
  220. raise
  221. finally:
  222. # Always close the connection
  223. if smtp:
  224. try:
  225. smtp.quit()
  226. except:
  227. pass
  228. def get_queue_size(self) -> int:
  229. """Get the current number of emails in the queue"""
  230. return self._email_queue.qsize()
  231. def is_queue_processing(self) -> bool:
  232. """Check if the queue worker is currently processing emails"""
  233. return self._is_processing and bool(self._queue_worker_thread and self._queue_worker_thread.is_alive())
  234. def clear_queue(self):
  235. """Clear all pending emails from the queue"""
  236. with self._queue_lock:
  237. # Clear the queue
  238. while not self._email_queue.empty():
  239. try:
  240. self._email_queue.get_nowait()
  241. except:
  242. break
  243. logger.info("Email queue cleared")
  244. # Global email sender instance
  245. email_sender: Optional[EmailSender] = None
  246. def initialize_email_sender():
  247. """Initialize the global email sender instance"""
  248. global email_sender
  249. try:
  250. email_sender = EmailSender()
  251. # Test the connection during initialization
  252. email_sender.connect()
  253. logger.info("Email sender initialized successfully")
  254. except Exception as e:
  255. logger.error(f"Failed to initialize email sender: {e}")
  256. raise
  257. def get_email_sender() -> EmailSender:
  258. """Get the global email sender instance, initialize if needed"""
  259. global email_sender
  260. if email_sender is None:
  261. initialize_email_sender()
  262. if not email_sender:
  263. raise ValueError("Email sender is not initialized and failed to initialize.")
  264. return email_sender