chat.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import asyncio
  2. import json
  3. import logging
  4. from typing import Optional
  5. from fastapi import Query, Depends, APIRouter, WebSocket, WebSocketDisconnect
  6. from fastapi.security import HTTPAuthorizationCredentials
  7. from broadcaster import Broadcast
  8. from config.settings import DEVELOPMENT
  9. from models.chat import NotifyRequest
  10. from models.user import User
  11. from services.openai_service.openai_service import generate_completion, admin_completion
  12. from auth.security import get_current_user
  13. from redis import Redis
  14. from utils.responses import success_response
  15. logger = logging.getLogger(__name__)
  16. chat_router = APIRouter()
  17. redis_client = Redis(host='localhost', port=6379, db=1 if DEVELOPMENT else 0, decode_responses=True)
  18. broadcast_channel = "chat" if not DEVELOPMENT else "chat_dev"
  19. @chat_router.websocket("/ws")
  20. async def chat_irc_endpoint(websocket: WebSocket):
  21. """WebSocket endpoint for real-time chat interactions"""
  22. # Backend de broadcast con Redis
  23. broadcast = Broadcast("redis://localhost:6379")
  24. logger.info("New WebSocket connection attempt")
  25. logger.debug(f"WebSocket query params: {websocket.query_params}")
  26. current_user = await get_current_user(
  27. HTTPAuthorizationCredentials(
  28. scheme="Bearer",
  29. credentials=websocket.query_params.get("token", "")
  30. )
  31. )
  32. if not current_user:
  33. await websocket.close(code=1008) # Policy Violation
  34. return
  35. await websocket.accept()
  36. logger.info(f"User {current_user.email} connected to WebSocket chat.")
  37. username = None
  38. async def reader():
  39. """Lee mensajes desde Redis y los manda al WebSocket"""
  40. logger.info("Iniciando lector de mensajes")
  41. async with broadcast.subscribe(channel=broadcast_channel) as subscriber:
  42. logger.info(f"Subscribed to Redis channel '{broadcast_channel}'")
  43. logger.debug("Starting message read loop")
  44. try:
  45. async for event in subscriber: # type: ignore
  46. try:
  47. logger.debug(f"Broadcasting message to WebSocket: {event.message}")
  48. await websocket.send_text(event.message)
  49. except Exception as e:
  50. logger.error(f"Error sending message to WebSocket: {e}, closing connection.")
  51. break
  52. except asyncio.CancelledError:
  53. logger.info("Conection closed by client")
  54. await broadcast.connect()
  55. reader_task = asyncio.create_task(reader())
  56. try:
  57. while True:
  58. try:
  59. data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
  60. logger.debug(f"Received data from WebSocket: {data}")
  61. payload = json.loads(data)
  62. event_type = payload.get("type")
  63. logger.debug(f"Received event: {event_type} with payload: {payload}")
  64. if event_type == "join":
  65. logger.info(f"User {payload['username']} joined the chat.")
  66. data = {
  67. "mail": current_user.email,
  68. "username": payload["username"]
  69. }
  70. redis_client.sadd("connected_users", json.dumps(data))
  71. response = {"type": "join", "username": payload["username"]}
  72. elif event_type == "message":
  73. logger.debug(f"Message from {payload['username']}: {payload['message']}")
  74. message_username = payload["username"]
  75. message = payload["message"]
  76. response = {"type": "message", "username": message_username, "message": message}
  77. redis_client.rpush("chat_history", json.dumps(response))
  78. redis_client.ltrim("chat_history", -100, -1) # Keep only
  79. elif event_type == "leave":
  80. logger.info(f"User {payload['username']} left the chat.")
  81. message_username = payload["username"]
  82. users = redis_client.smembers("connected_users")
  83. for user in users: # type: ignore
  84. user_data = json.loads(user)
  85. if user_data["username"] == message_username:
  86. redis_client.srem("connected_users", user)
  87. break
  88. response = {"type": "leave", "username": message_username}
  89. await websocket.close()
  90. break
  91. elif event_type == "ai_message":
  92. messages = redis_client.lrange("chat_history", -15, -1)
  93. parsed_messages = [json.loads(msg) for msg in messages] # type: ignore
  94. logger.debug(f"IA Message from {payload['username']}")
  95. message_username = payload["username"]
  96. response_content = await generate_completion(parsed_messages, current_user)
  97. redis_client.rpush("chat_history", json.dumps({"type": "ai_message", "username": "IAKlein", "message": response_content}))
  98. response = {"type": "message", "username": "IAKlein", "message": response_content}
  99. elif event_type == "mention":
  100. logger.debug(f"Mention to {payload['username']}")
  101. mention_username = payload["username"]
  102. logger.debug(f"Mention username: {mention_username}, Current username: {username}")
  103. await broadcast.publish(channel=broadcast_channel, message=json.dumps({"type": "mentioned", "username": mention_username}))
  104. continue
  105. elif event_type == "pong":
  106. continue
  107. # Publicar en Redis
  108. await broadcast.publish(channel=broadcast_channel, message=json.dumps(response)) # type: ignore
  109. except asyncio.TimeoutError:
  110. await websocket.send_text(json.dumps({"type": "ping"}))
  111. except WebSocketDisconnect:
  112. logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
  113. users = redis_client.smembers("connected_users")
  114. for user in users: # type: ignore
  115. user_data = json.loads(user)
  116. if user_data["mail"] == current_user.email:
  117. redis_client.srem("connected_users", user)
  118. break
  119. response = {"type": "leave", "username": user_data["username"]} # type: ignore
  120. await broadcast.publish(channel=broadcast_channel, message=json.dumps(response))
  121. finally:
  122. reader_task.cancel()
  123. await broadcast.disconnect()
  124. @chat_router.post("/notify")
  125. async def notify_users(message: NotifyRequest, _: User = Depends(get_current_user)):
  126. broadcast = Broadcast("redis://localhost:6379")
  127. await broadcast.connect()
  128. # Obtener historial de mensajes para generar respuesta de IA
  129. messages = redis_client.lrange("chat_history", -15, -1)
  130. parsed_messages = [json.loads(msg) for msg in messages] # type: ignore
  131. # Generar respuesta de IA para la notificación
  132. logger.debug(f"Processing notification: {message.message}")
  133. response_content = admin_completion(message.message, parsed_messages)
  134. # Enviar directamente la respuesta de IA como mensaje
  135. response = {"type": "message", "username": "IAKlein", "message": response_content}
  136. await broadcast.publish(channel=broadcast_channel, message=json.dumps(response))
  137. await broadcast.disconnect()
  138. return success_response({}, message="Notificación enviada correctamente")
  139. @chat_router.get("/users")
  140. async def get__connected_users(q: Optional[str] = Query(None), _: User = Depends(get_current_user)):
  141. """Get a list of connected users (solo local al worker)"""
  142. # return {"users": [user.username for user in connected_users if q.lower() in user.username.lower()]}
  143. all_users = redis_client.smembers("connected_users")
  144. all_users = [json.loads(user)["username"] for user in all_users] # type: ignore
  145. if q is None or q.strip() == "":
  146. return success_response(data=all_users)
  147. filtered_users = [user for user in all_users if q.lower() in user.lower()]
  148. return success_response(data={"users": filtered_users})
  149. @chat_router.get("/onlines")
  150. async def get_online_user_count(_: User = Depends(get_current_user)):
  151. """Get the count of online users (solo local al worker)"""
  152. all_users = redis_client.smembers("connected_users")
  153. return success_response(data={"count": len(all_users)}) # type: ignore