email_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
"""
Email Service with Asynchronous Queue Processing

This service provides email sending functionality with built-in queue
management to prevent SMTP server saturation.

Key features:

1. Asynchronous Email Queue:
   - Multiple calls to send_email() are queued automatically
   - Emails are processed with a 10-second delay between sends
   - A background thread processes the queue continuously

2. Two Send Methods:
   - send_email(): Adds the email to the queue (recommended for most use cases)
   - send_email_sync(): Sends immediately without queuing (for urgent emails)

3. Queue Management:
   - get_queue_size(): Check the number of pending emails
   - is_queue_processing(): Check whether the queue worker is active
   - clear_queue(): Remove all pending emails from the queue

4. Automatic Retry Logic:
   - Handles SMTP disconnections with automatic reconnection
   - Fresh connection for each email to prevent timeouts
   - Comprehensive logging for debugging and monitoring

Usage Example:

    sender = get_email_sender()

    # Queue multiple emails (recommended)
    sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
    sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
    # These will be sent with 10-second intervals automatically

    # Check queue status
    print(f"Emails in queue: {sender.get_queue_size()}")

    # Send urgent email immediately
    sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"])
"""
  31. import asyncio
  32. from config.settings import MAIL, MAIL_PASSWORD
  33. import smtplib
  34. from email.message import EmailMessage
  35. from logging import getLogger
  36. from typing import Optional, Dict, Any
  37. from services.logging_service import structured_logger, LogLevel
  38. from queue import Queue, Empty
  39. from threading import Thread, Lock, current_thread
  40. import time
  41. import random
  42. logger = getLogger(__name__)
  43. class EmailSender:
  44. def __init__(self):
  45. self.email = MAIL
  46. self.password = MAIL_PASSWORD
  47. self._smtp: Optional[smtplib.SMTP_SSL] = None
  48. self._email_queue: Queue = Queue()
  49. self._queue_worker_thread: Optional[Thread] = None
  50. self._queue_lock = Lock()
  51. self._is_processing = False
  52. self._start_queue_worker()
  53. def connect(self):
  54. """Establish SMTP connection - kept for compatibility but not used for persistent connections"""
  55. logger.info("Testing SMTP connection...")
  56. structured_logger.log_email_event(
  57. "Testing SMTP connection",
  58. LogLevel.INFO,
  59. {"email_server": "smtp.gmail.com", "port": 465, "sender_email": self.email}
  60. )
  61. try:
  62. # Test connection
  63. test_smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465)
  64. test_smtp.login(self.email, self.password)
  65. test_smtp.ehlo()
  66. test_smtp.quit()
  67. logger.info("SMTP connection test successful.")
  68. structured_logger.log_email_event(
  69. "SMTP connection test successful",
  70. LogLevel.INFO,
  71. {"sender_email": self.email}
  72. )
  73. except smtplib.SMTPAuthenticationError as e:
  74. error_msg = f"SMTP authentication failed: {e}"
  75. logger.error(error_msg)
  76. structured_logger.log_email_event(
  77. "SMTP authentication failed",
  78. LogLevel.ERROR,
  79. {
  80. "sender_email": self.email,
  81. "error": str(e),
  82. "error_type": "SMTPAuthenticationError"
  83. }
  84. )
  85. raise
  86. except Exception as e:
  87. error_msg = f"Failed to establish SMTP connection: {e}"
  88. logger.error(error_msg)
  89. structured_logger.log_email_event(
  90. "SMTP connection failed",
  91. LogLevel.ERROR,
  92. {
  93. "sender_email": self.email,
  94. "error": str(e),
  95. "error_type": type(e).__name__
  96. }
  97. )
  98. raise
  99. def close(self):
  100. """Close SMTP connection and stop queue worker"""
  101. logger.info("Closing email service.")
  102. structured_logger.log_email_event(
  103. "Closing email service",
  104. LogLevel.INFO,
  105. {"sender_email": self.email}
  106. )
  107. # Stop the queue worker
  108. self._stop_queue_worker()
  109. # Clear any persistent connection (though we don't use them anymore)
  110. if self._smtp:
  111. try:
  112. self._smtp.quit()
  113. except:
  114. pass
  115. finally:
  116. self._smtp = None
  117. def _create_fresh_connection(self):
  118. """Create a fresh SMTP connection for each email send"""
  119. try:
  120. smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=30)
  121. smtp.login(self.email, self.password)
  122. reply = smtp.ehlo()
  123. print(f"SMTP connection established: {reply}")
  124. return smtp
  125. except Exception as e:
  126. logger.error(f"Failed to create fresh SMTP connection: {e}")
  127. raise
  128. def _start_queue_worker(self):
  129. """Start the background thread to process email queue"""
  130. with self._queue_lock:
  131. if not self._is_processing:
  132. self._is_processing = True
  133. self._queue_worker_thread = Thread(target=self._process_email_queue, daemon=True)
  134. self._queue_worker_thread.start()
  135. logger.info("Email queue worker started")
  136. def _stop_queue_worker(self):
  137. """Stop the background thread processing email queue"""
  138. with self._queue_lock:
  139. if self._is_processing:
  140. self._is_processing = False
  141. # Add a sentinel value to wake up the worker thread
  142. self._email_queue.put(None)
  143. # Only join if we're not in the same thread
  144. if (self._queue_worker_thread and
  145. self._queue_worker_thread.is_alive() and
  146. self._queue_worker_thread != current_thread()):
  147. try:
  148. self._queue_worker_thread.join(timeout=10)
  149. except RuntimeError as e:
  150. logger.warning(f"Could not join queue worker thread: {e}")
  151. logger.info("Email queue worker stopped")
  152. def _process_email_queue(self):
  153. """Background worker that processes emails from the queue with 10-second delays"""
  154. logger.info("Email queue processor started")
  155. while self._is_processing:
  156. try:
  157. # Get email from queue (blocking call with timeout)
  158. email_data = self._email_queue.get(timeout=1)
  159. # Check for sentinel value (None) to stop processing
  160. if email_data is None:
  161. break
  162. # Process the email with retry logic
  163. success = self._send_email_with_retry(
  164. email_data['subject'],
  165. email_data['body'],
  166. email_data['to'],
  167. email_data['kwargs']
  168. )
  169. if not success:
  170. logger.error(f"Failed to send email after all retries: {email_data['subject']}")
  171. # Wait 10 seconds before processing next email (unless it's the last one in queue)
  172. if not self._email_queue.empty():
  173. logger.info("Waiting 10 seconds before sending next email...")
  174. time.sleep(10)
  175. except Empty:
  176. # If queue is empty, just continue to check for new emails
  177. continue
  178. except Exception as e:
  179. if self._is_processing: # Only log if we're still supposed to be processing
  180. logger.error(f"Error processing email queue: {e}")
  181. time.sleep(2) # Brief pause before retrying
  182. logger.info("Email queue processor stopped")
  183. def _send_email_with_retry(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any], max_retries: int = 3) -> bool:
  184. """Send email with retry logic using fresh connections"""
  185. for attempt in range(max_retries):
  186. try:
  187. return self._send_email_immediately(subject, body, to, kwargs)
  188. except Exception as e:
  189. if attempt < max_retries - 1:
  190. wait_time = (2 ** attempt) + random.uniform(0, 1)
  191. logger.warning(f"Email send attempt {attempt + 1} failed: {e}. Waiting {wait_time:.2f}s before retry...")
  192. time.sleep(wait_time)
  193. else:
  194. logger.error(f"All email send attempts failed for {to}: {e}")
  195. return False
  196. return False
  197. def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]) -> bool:
  198. """Send email immediately using a fresh SMTP connection"""
  199. logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
  200. structured_logger.log_email_event(
  201. f"Sending queued email with subject: '{subject}'",
  202. LogLevel.INFO,
  203. {
  204. "subject": subject,
  205. "recipients": to,
  206. "sender_email": self.email,
  207. "body_length": len(body),
  208. "kwargs_count": len(kwargs),
  209. "queued": True
  210. }
  211. )
  212. smtp = None
  213. try:
  214. # Create fresh connection for this email
  215. smtp = self._create_fresh_connection()
  216. msg = EmailMessage()
  217. msg['Subject'] = subject
  218. msg['From'] = self.email
  219. msg['To'] = ", ".join(to)
  220. msg.set_content('Este correo tiene contenido HTML.')
  221. msg.add_alternative(body.format(**kwargs), subtype='html')
  222. smtp.send_message(msg)
  223. logger.info(f"Queued email sent to {to} with subject '{subject}'.")
  224. structured_logger.log_email_event(
  225. f"Queued email sent successfully",
  226. LogLevel.INFO,
  227. {
  228. "subject": subject,
  229. "recipients": to,
  230. "sender_email": self.email,
  231. "recipients_count": len(to),
  232. "queued": True
  233. }
  234. )
  235. return True
  236. except Exception as e:
  237. error_msg = f"Failed to send queued email to {to}: {e}"
  238. logger.error(error_msg)
  239. structured_logger.log_email_event(
  240. "Queued email send failed",
  241. LogLevel.ERROR,
  242. {
  243. "subject": subject,
  244. "recipients": to,
  245. "sender_email": self.email,
  246. "error": str(e),
  247. "error_type": type(e).__name__,
  248. "queued": True
  249. }
  250. )
  251. raise
  252. finally:
  253. # Always close the connection
  254. if smtp:
  255. try:
  256. smtp.quit()
  257. except:
  258. pass
  259. def send_email(self, subject: str, body: str, to: list[str], **kwargs):
  260. """Add email to queue for asynchronous sending with 10-second delays"""
  261. logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
  262. structured_logger.log_email_event(
  263. f"Queuing email with subject: '{subject}'",
  264. LogLevel.INFO,
  265. {
  266. "subject": subject,
  267. "recipients": to,
  268. "sender_email": self.email,
  269. "body_length": len(body),
  270. "kwargs_count": len(kwargs),
  271. "queue_size": self._email_queue.qsize() + 1
  272. }
  273. )
  274. # Add email to queue
  275. email_data = {
  276. 'subject': subject,
  277. 'body': body,
  278. 'to': to,
  279. 'kwargs': kwargs
  280. }
  281. self._email_queue.put(email_data)
  282. logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")
  283. def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
  284. """Send email immediately (synchronous) using fresh connection"""
  285. logger.debug(f"Preparing to send email immediately to: {to} with subject: '{subject}'")
  286. structured_logger.log_email_event(
  287. f"Preparing to send email immediately with subject: '{subject}'",
  288. LogLevel.INFO,
  289. {
  290. "subject": subject,
  291. "recipients": to,
  292. "sender_email": self.email,
  293. "body_length": len(body),
  294. "kwargs_count": len(kwargs),
  295. "sync": True
  296. }
  297. )
  298. smtp = None
  299. try:
  300. # Create fresh connection
  301. smtp = self._create_fresh_connection()
  302. msg = EmailMessage()
  303. msg['Subject'] = subject
  304. msg['From'] = self.email
  305. msg['To'] = ", ".join(to)
  306. msg.set_content('Este correo tiene contenido HTML.')
  307. msg.add_alternative(body.format(**kwargs), subtype='html')
  308. smtp.send_message(msg)
  309. logger.info(f"Email sent immediately to {to} with subject '{subject}'.")
  310. structured_logger.log_email_event(
  311. f"Email sent immediately and successfully",
  312. LogLevel.INFO,
  313. {
  314. "subject": subject,
  315. "recipients": to,
  316. "sender_email": self.email,
  317. "recipients_count": len(to),
  318. "sync": True
  319. }
  320. )
  321. except Exception as e:
  322. error_msg = f"Failed to send email immediately to {to}: {e}"
  323. logger.error(error_msg)
  324. structured_logger.log_email_event(
  325. "Immediate email send failed",
  326. LogLevel.ERROR,
  327. {
  328. "subject": subject,
  329. "recipients": to,
  330. "sender_email": self.email,
  331. "error": str(e),
  332. "error_type": type(e).__name__,
  333. "sync": True
  334. }
  335. )
  336. raise
  337. finally:
  338. # Always close the connection
  339. if smtp:
  340. try:
  341. smtp.quit()
  342. except:
  343. pass
  344. def get_queue_size(self) -> int:
  345. """Get the current number of emails in the queue"""
  346. return self._email_queue.qsize()
  347. def is_queue_processing(self) -> bool:
  348. """Check if the queue worker is currently processing emails"""
  349. return self._is_processing and bool(self._queue_worker_thread and self._queue_worker_thread.is_alive())
  350. def clear_queue(self):
  351. """Clear all pending emails from the queue"""
  352. with self._queue_lock:
  353. # Clear the queue
  354. while not self._email_queue.empty():
  355. try:
  356. self._email_queue.get_nowait()
  357. except:
  358. break
  359. logger.info("Email queue cleared")
  360. structured_logger.log_email_event(
  361. "Email queue cleared",
  362. LogLevel.INFO,
  363. {"sender_email": self.email}
  364. )
# Global email sender instance: None until initialize_email_sender() assigns
# an EmailSender to it; read through get_email_sender().
email_sender: Optional[EmailSender] = None
  367. def initialize_email_sender():
  368. """Initialize the global email sender instance"""
  369. global email_sender
  370. try:
  371. email_sender = EmailSender()
  372. # Test the connection during initialization
  373. email_sender.connect()
  374. logger.info("Email sender initialized successfully")
  375. except Exception as e:
  376. logger.error(f"Failed to initialize email sender: {e}")
  377. raise
  378. def get_email_sender() -> EmailSender:
  379. """Get the global email sender instance, initialize if needed"""
  380. global email_sender
  381. if email_sender is None:
  382. initialize_email_sender()
  383. if not email_sender:
  384. raise ValueError("Email sender is not initialized and failed to initialize.")
  385. return email_sender