chat.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. import asyncio
  2. import json
  3. import logging
  4. from fastapi import Request, HTTPException, Depends, APIRouter, WebSocket, WebSocketDisconnect
  5. from fastapi.responses import JSONResponse
  6. from fastapi.security import HTTPAuthorizationCredentials
  7. from pydantic import BaseModel, ConfigDict
  8. from broadcaster import Broadcast
  9. from models.chat import ChatCompletionRequest
  10. from models.user import User
  11. from services.openai_service.openai_service import generate_completion
  12. from auth.security import get_current_user
  13. from config.messages import SuccessResponse
  14. from contextlib import asynccontextmanager
  15. from fastapi import FastAPI
  16. logger = logging.getLogger(__name__)
  17. chat_router = APIRouter()
  18. class ConnectedUser(BaseModel):
  19. model_config = ConfigDict(arbitrary_types_allowed=True)
  20. websocket: WebSocket
  21. username: str
  22. connected_users: list[ConnectedUser] = [] # todavía útil para listar usuarios conectados
  23. @chat_router.websocket("/ws")
  24. async def chat_irc_endpoint(websocket: WebSocket):
  25. """WebSocket endpoint for real-time chat interactions"""
  26. global connected_users
  27. # Backend de broadcast con Redis
  28. broadcast = Broadcast("redis://localhost:6379")
  29. logger.info("New WebSocket connection attempt")
  30. logger.debug(f"WebSocket query params: {websocket.query_params}")
  31. current_user = await get_current_user(
  32. HTTPAuthorizationCredentials(
  33. scheme="Bearer",
  34. credentials=websocket.query_params.get("token", "")
  35. )
  36. )
  37. if not current_user:
  38. await websocket.close(code=1008) # Policy Violation
  39. return
  40. await websocket.accept()
  41. logger.info(f"User {current_user.email} connected to WebSocket chat.")
  42. username = None
  43. stop_event = asyncio.Event()
  44. async def reader():
  45. """Lee mensajes desde Redis y los manda al WebSocket"""
  46. logger.info("Iniciando lector de mensajes")
  47. async with broadcast.subscribe(channel="chat") as subscriber:
  48. logger.info("Subscribed to Redis channel 'chat'")
  49. logger.debug("Starting message read loop")
  50. async for event in subscriber:
  51. try:
  52. logger.debug(f"Broadcasting message to WebSocket: {event.message}")
  53. await websocket.send_text(event.message)
  54. except Exception:
  55. logger.error("Error sending message to WebSocket, closing connection.")
  56. break
  57. await broadcast.connect()
  58. reader_task = asyncio.create_task(reader())
  59. try:
  60. while True:
  61. try:
  62. data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
  63. payload = json.loads(data)
  64. event_type = payload.get("type")
  65. if event_type == "join":
  66. logger.info(f"User {payload['username']} joined the chat.")
  67. username = payload["username"]
  68. connected_users.append(ConnectedUser(websocket=websocket, username=username))
  69. response = {"type": "join", "username": username}
  70. elif event_type == "message":
  71. logger.debug(f"Message from {payload['username']}: {payload['message']}")
  72. username = payload["username"]
  73. message = payload["message"]
  74. response = {"type": "message", "username": username, "message": message}
  75. elif event_type == "leave":
  76. logger.info(f"User {payload['username']} left the chat.")
  77. response = {"type": "leave", "username": payload["username"]}
  78. connected_users = [u for u in connected_users if u.websocket != websocket]
  79. await websocket.close()
  80. break
  81. elif event_type == "pong":
  82. logger.debug(f"Received pong from user {current_user.email}")
  83. continue
  84. # Publicar en Redis
  85. await broadcast.publish(channel="chat", message=json.dumps(response))
  86. except asyncio.TimeoutError:
  87. await websocket.send_text(json.dumps({"type": "ping"}))
  88. except WebSocketDisconnect:
  89. connected_users = [u for u in connected_users if u.websocket != websocket]
  90. logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
  91. response = {"type": "leave", "username": username or "unknown"}
  92. await broadcast.publish(channel="chat", message=json.dumps(response))
  93. finally:
  94. reader_task.cancel()
  95. await broadcast.disconnect()
  96. @chat_router.get("/users")
  97. async def get_connected_users(_: User = Depends(get_current_user)):
  98. """Get a list of connected users (solo local al worker)"""
  99. return {"users": [user.username for user in connected_users]}
  100. @chat_router.post("/completions")
  101. async def chat_completions(
  102. request_data: ChatCompletionRequest,
  103. request: Request,
  104. current_user: User = Depends(get_current_user)
  105. ):
  106. """Get chat completions from OpenAI"""
  107. session_identifier = request.session.get("antiAbuseToken", "unknown_session")
  108. logger.info(f"Chat completion request from user {current_user.email}")
  109. try:
  110. openai_response = await generate_completion(
  111. request_data.messages,
  112. session_identifier,
  113. current_user.name,
  114. current_user.email
  115. )
  116. logger.info(f"Chat completion successful for user {current_user.email}")
  117. return JSONResponse({
  118. "response": openai_response,
  119. "message": SuccessResponse.CHAT_RESPONSE_SUCCESS
  120. })
  121. except HTTPException as e:
  122. logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}")
  123. raise
  124. except Exception as e:
  125. logger.error(f"Unexpected error: {e}")
  126. raise HTTPException(status_code=500, detail="Error interno del servidor al procesar el chat.")