email_service.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. """
  2. Email Service with Asynchronous Queue Processing
  3. This service provides email sending functionality with built-in queue management
  4. to prevent SMTP server saturation. Key features:
  5. 1. Asynchronous Email Queue:
  6. - Multiple calls to send_email() are queued automatically
  7. - Emails are processed with a 5-second delay between sends
  8. - Background thread processes the queue continuously
  9. 2. Two Send Methods:
  10. - send_email(): Adds email to queue (recommended for most use cases)
  11. - send_email_sync(): Sends immediately without queuing (for urgent emails)
  12. 3. Queue Management:
  13. - get_queue_size(): Check number of pending emails
  14. - is_queue_processing(): Check if queue worker is active
  15. - clear_queue(): Remove all pending emails from queue
  16. 4. Automatic Retry Logic:
  17. - Handles SMTP disconnections with automatic reconnection
  18. - Comprehensive logging for debugging and monitoring
  19. Usage Example:
  20. sender = get_email_sender()
  21. # Queue multiple emails (recommended)
  22. sender.send_email("Subject 1", "Body 1", ["user1@example.com"])
  23. sender.send_email("Subject 2", "Body 2", ["user2@example.com"])
  24. # These will be sent with 5-second intervals automatically
  25. # Check queue status
  26. print(f"Emails in queue: {sender.get_queue_size()}")
  27. # Send urgent email immediately
  28. sender.send_email_sync("Urgent", "Urgent message", ["admin@example.com"])
  29. """
  30. import asyncio
  31. from config.settings import MAIL, MAIL_PASSWORD
  32. import smtplib
  33. from email.message import EmailMessage
  34. from logging import getLogger
  35. from typing import Optional, Dict, Any
  36. from services.logging_service import structured_logger, LogLevel
  37. from queue import Queue, Empty
  38. from threading import Thread, Lock
  39. import time
  40. logger = getLogger(__name__)
  41. class EmailSender:
  42. def __init__(self):
  43. self.email = MAIL
  44. self.password = MAIL_PASSWORD
  45. self._smtp: Optional[smtplib.SMTP_SSL] = None
  46. self._email_queue: Queue = Queue()
  47. self._queue_worker_thread: Optional[Thread] = None
  48. self._queue_lock = Lock()
  49. self._is_processing = False
  50. self.connect()
  51. self._start_queue_worker()
  52. def connect(self):
  53. if self._smtp is None:
  54. logger.info("Establishing new SMTP connection...")
  55. structured_logger.log_email_event(
  56. "Establishing SMTP connection",
  57. LogLevel.INFO,
  58. {"email_server": "smtp.gmail.com", "port": 465, "sender_email": self.email}
  59. )
  60. try:
  61. self._smtp = smtplib.SMTP_SSL('smtp.gmail.com', 465)
  62. self._smtp.login(self.email, self.password)
  63. self._smtp.ehlo()
  64. logger.info("SMTP connection established and authenticated.")
  65. structured_logger.log_email_event(
  66. "SMTP authentication successful",
  67. LogLevel.INFO,
  68. {"sender_email": self.email}
  69. )
  70. except smtplib.SMTPAuthenticationError as e:
  71. error_msg = f"SMTP authentication failed: {e}"
  72. logger.error(error_msg)
  73. structured_logger.log_email_event(
  74. "SMTP authentication failed",
  75. LogLevel.ERROR,
  76. {
  77. "sender_email": self.email,
  78. "error": str(e),
  79. "error_type": "SMTPAuthenticationError"
  80. }
  81. )
  82. raise
  83. except Exception as e:
  84. error_msg = f"Failed to establish SMTP connection: {e}"
  85. logger.error(error_msg)
  86. structured_logger.log_email_event(
  87. "SMTP connection failed",
  88. LogLevel.ERROR,
  89. {
  90. "sender_email": self.email,
  91. "error": str(e),
  92. "error_type": type(e).__name__
  93. }
  94. )
  95. raise
  96. def close(self):
  97. if self._smtp:
  98. logger.info("Closing SMTP connection.")
  99. structured_logger.log_email_event(
  100. "Closing SMTP connection",
  101. LogLevel.INFO,
  102. {"sender_email": self.email}
  103. )
  104. try:
  105. self._smtp.quit()
  106. self._smtp = None
  107. except Exception as e:
  108. logger.warning(f"Error closing SMTP connection: {e}")
  109. self._smtp = None # Force reset even if quit fails
  110. # Stop the queue worker
  111. self._stop_queue_worker()
  112. def _start_queue_worker(self):
  113. """Start the background thread to process email queue"""
  114. with self._queue_lock:
  115. if not self._is_processing:
  116. self._is_processing = True
  117. self._queue_worker_thread = Thread(target=self._process_email_queue, daemon=True)
  118. self._queue_worker_thread.start()
  119. logger.info("Email queue worker started")
  120. def _stop_queue_worker(self):
  121. """Stop the background thread processing email queue"""
  122. with self._queue_lock:
  123. if self._is_processing:
  124. self._is_processing = False
  125. # Add a sentinel value to wake up the worker thread
  126. self._email_queue.put(None)
  127. if self._queue_worker_thread and self._queue_worker_thread.is_alive():
  128. self._queue_worker_thread.join(timeout=10)
  129. logger.info("Email queue worker stopped")
  130. def _process_email_queue(self):
  131. """Background worker that processes emails from the queue with 5-second delays"""
  132. logger.info("Email queue processor started")
  133. while self._is_processing:
  134. try:
  135. # Get email from queue (blocking call)
  136. email_data = self._email_queue.get(timeout=1)
  137. # Check for sentinel value (None) to stop processing
  138. if email_data is None:
  139. break
  140. # Process the email
  141. self._send_email_immediately(
  142. email_data['subject'],
  143. email_data['body'],
  144. email_data['to'],
  145. email_data['kwargs']
  146. )
  147. # Wait 5 seconds before processing next email (unless it's the last one in queue)
  148. if not self._email_queue.empty():
  149. logger.info("Waiting 5 seconds before sending next email...")
  150. time.sleep(5)
  151. except Empty:
  152. # If queue is empty, just continue to check for new emails
  153. continue
  154. except Exception as e:
  155. if self._is_processing: # Only log if we're still supposed to be processing
  156. logger.error(f"Error processing email queue: {e}")
  157. time.sleep(1) # Brief pause before retrying
  158. logger.info("Email queue processor stopped")
  159. def _send_email_immediately(self, subject: str, body: str, to: list[str], kwargs: Dict[str, Any]):
  160. """Send email immediately without queuing"""
  161. if self._smtp is None:
  162. self.connect()
  163. logger.debug(f"Sending queued email to: {to} with subject: '{subject}'")
  164. structured_logger.log_email_event(
  165. f"Sending queued email with subject: '{subject}'",
  166. LogLevel.INFO,
  167. {
  168. "subject": subject,
  169. "recipients": to,
  170. "sender_email": self.email,
  171. "body_length": len(body),
  172. "kwargs_count": len(kwargs),
  173. "queued": True
  174. }
  175. )
  176. try:
  177. msg = EmailMessage()
  178. msg['Subject'] = subject
  179. msg['From'] = self.email
  180. msg['To'] = ", ".join(to)
  181. msg.set_content('Este correo tiene contenido HTML.')
  182. msg.add_alternative(body.format(**kwargs), subtype='html')
  183. if not self._smtp:
  184. error_msg = "Cannot send email because SMTP connection is not established"
  185. logger.error(error_msg)
  186. structured_logger.log_email_event(
  187. "Queued email send failed: SMTP connection not established",
  188. LogLevel.ERROR,
  189. {
  190. "subject": subject,
  191. "recipients": to,
  192. "sender_email": self.email
  193. }
  194. )
  195. raise ConnectionError(error_msg)
  196. self._smtp.send_message(msg)
  197. logger.info(f"Queued email sent to {to} with subject '{subject}'.")
  198. structured_logger.log_email_event(
  199. f"Queued email sent successfully",
  200. LogLevel.INFO,
  201. {
  202. "subject": subject,
  203. "recipients": to,
  204. "sender_email": self.email,
  205. "recipients_count": len(to),
  206. "queued": True
  207. }
  208. )
  209. except smtplib.SMTPServerDisconnected as e:
  210. logger.warning("SMTP connection disconnected during queued send, retrying...")
  211. structured_logger.log_email_event(
  212. "SMTP disconnected during queued send, attempting retry",
  213. LogLevel.WARNING,
  214. {
  215. "subject": subject,
  216. "recipients": to,
  217. "error": str(e)
  218. }
  219. )
  220. try:
  221. self.close()
  222. self.connect()
  223. if self._smtp:
  224. self._smtp.send_message(msg)
  225. logger.info(f"Queued email resent successfully to {to}.")
  226. structured_logger.log_email_event(
  227. "Queued email sent successfully after retry",
  228. LogLevel.INFO,
  229. {
  230. "subject": subject,
  231. "recipients": to,
  232. "sender_email": self.email,
  233. "retry_attempt": True,
  234. "queued": True
  235. }
  236. )
  237. except Exception as retry_error:
  238. error_msg = f"Failed to resend queued email after retry: {retry_error}"
  239. logger.error(error_msg)
  240. structured_logger.log_email_event(
  241. "Queued email retry failed",
  242. LogLevel.ERROR,
  243. {
  244. "subject": subject,
  245. "recipients": to,
  246. "original_error": str(e),
  247. "retry_error": str(retry_error),
  248. "error_type": type(retry_error).__name__,
  249. "queued": True
  250. }
  251. )
  252. raise
  253. except Exception as e:
  254. error_msg = f"Failed to send queued email to {to}: {e}"
  255. logger.error(error_msg)
  256. structured_logger.log_email_event(
  257. "Queued email send failed",
  258. LogLevel.ERROR,
  259. {
  260. "subject": subject,
  261. "recipients": to,
  262. "sender_email": self.email,
  263. "error": str(e),
  264. "error_type": type(e).__name__,
  265. "queued": True
  266. }
  267. )
  268. raise
  269. def send_email(self, subject: str, body: str, to: list[str], **kwargs):
  270. """Add email to queue for asynchronous sending with 5-second delays"""
  271. logger.debug(f"Queuing email to: {to} with subject: '{subject}'")
  272. structured_logger.log_email_event(
  273. f"Queuing email with subject: '{subject}'",
  274. LogLevel.INFO,
  275. {
  276. "subject": subject,
  277. "recipients": to,
  278. "sender_email": self.email,
  279. "body_length": len(body),
  280. "kwargs_count": len(kwargs),
  281. "queue_size": self._email_queue.qsize() + 1
  282. }
  283. )
  284. # Add email to queue
  285. email_data = {
  286. 'subject': subject,
  287. 'body': body,
  288. 'to': to,
  289. 'kwargs': kwargs
  290. }
  291. self._email_queue.put(email_data)
  292. logger.info(f"Email queued for {to}. Queue size: {self._email_queue.qsize()}")
  293. def send_email_sync(self, subject: str, body: str, to: list[str], **kwargs):
  294. """Send email immediately (synchronous) - for backwards compatibility"""
  295. if self._smtp is None:
  296. self.connect()
  297. logger.debug(f"Preparing to send email to: {to} with subject: '{subject}'")
  298. structured_logger.log_email_event(
  299. f"Preparing to send email with subject: '{subject}'",
  300. LogLevel.INFO,
  301. {
  302. "subject": subject,
  303. "recipients": to,
  304. "sender_email": self.email,
  305. "body_length": len(body),
  306. "kwargs_count": len(kwargs)
  307. }
  308. )
  309. try:
  310. msg = EmailMessage()
  311. msg['Subject'] = subject
  312. msg['From'] = self.email
  313. msg['To'] = ", ".join(to)
  314. msg.set_content('Este correo tiene contenido HTML.')
  315. msg.add_alternative(body.format(**kwargs), subtype='html')
  316. if not self._smtp:
  317. error_msg = "Cannot send email because SMTP connection is not established"
  318. logger.error(error_msg)
  319. structured_logger.log_email_event(
  320. "Email send failed: SMTP connection not established",
  321. LogLevel.ERROR,
  322. {
  323. "subject": subject,
  324. "recipients": to,
  325. "sender_email": self.email
  326. }
  327. )
  328. raise ConnectionError(error_msg)
  329. self._smtp.send_message(msg)
  330. logger.info(f"Email sent to {to} with subject '{subject}'.")
  331. structured_logger.log_email_event(
  332. f"Email sent successfully",
  333. LogLevel.INFO,
  334. {
  335. "subject": subject,
  336. "recipients": to,
  337. "sender_email": self.email,
  338. "recipients_count": len(to)
  339. }
  340. )
  341. except smtplib.SMTPServerDisconnected as e:
  342. logger.warning("SMTP connection disconnected, retrying...")
  343. structured_logger.log_email_event(
  344. "SMTP disconnected during send, attempting retry",
  345. LogLevel.WARNING,
  346. {
  347. "subject": subject,
  348. "recipients": to,
  349. "error": str(e)
  350. }
  351. )
  352. try:
  353. self.close()
  354. self.connect()
  355. if self._smtp:
  356. self._smtp.send_message(msg)
  357. logger.info(f"Email resent successfully to {to}.")
  358. structured_logger.log_email_event(
  359. "Email sent successfully after retry",
  360. LogLevel.INFO,
  361. {
  362. "subject": subject,
  363. "recipients": to,
  364. "sender_email": self.email,
  365. "retry_attempt": True
  366. }
  367. )
  368. except Exception as retry_error:
  369. error_msg = f"Failed to resend email after retry: {retry_error}"
  370. logger.error(error_msg)
  371. structured_logger.log_email_event(
  372. "Email retry failed",
  373. LogLevel.ERROR,
  374. {
  375. "subject": subject,
  376. "recipients": to,
  377. "original_error": str(e),
  378. "retry_error": str(retry_error),
  379. "error_type": type(retry_error).__name__
  380. }
  381. )
  382. self.close()
  383. raise
  384. except Exception as e:
  385. error_msg = f"Failed to send email to {to}: {e}"
  386. logger.error(error_msg)
  387. structured_logger.log_email_event(
  388. "Email send failed",
  389. LogLevel.ERROR,
  390. {
  391. "subject": subject,
  392. "recipients": to,
  393. "sender_email": self.email,
  394. "error": str(e),
  395. "error_type": type(e).__name__
  396. }
  397. )
  398. self.close()
  399. raise
  400. def get_queue_size(self) -> int:
  401. """Get the current number of emails in the queue"""
  402. return self._email_queue.qsize()
  403. def is_queue_processing(self) -> bool:
  404. """Check if the queue worker is currently processing emails"""
  405. return self._is_processing and bool(self._queue_worker_thread and self._queue_worker_thread.is_alive())
  406. def clear_queue(self):
  407. """Clear all pending emails from the queue"""
  408. with self._queue_lock:
  409. # Clear the queue
  410. while not self._email_queue.empty():
  411. try:
  412. self._email_queue.get_nowait()
  413. except:
  414. break
  415. logger.info("Email queue cleared")
  416. structured_logger.log_email_event(
  417. "Email queue cleared",
  418. LogLevel.INFO,
  419. {"sender_email": self.email}
  420. )
  421. email_sender: Optional[EmailSender] = None
  422. def initialize_email_sender():
  423. global email_sender
  424. email_sender = EmailSender()
  425. def get_email_sender() -> EmailSender:
  426. if email_sender is None:
  427. initialize_email_sender()
  428. if not email_sender:
  429. raise ValueError("Email sender is not initialized and failed to initialize.")
  430. return email_sender