chat.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. import asyncio
  2. import json
  3. from fastapi import Request, HTTPException, Depends
  4. from fastapi.responses import JSONResponse
  5. from pydantic import BaseModel, ConfigDict
  6. from models.chat import ChatCompletionRequest
  7. from models.user import User
  8. from services.openai_service.openai_service import generate_completion
  9. from auth.security import get_current_user
  10. import logging
  11. from fastapi import APIRouter, WebSocket, WebSocketDisconnect
  12. from fastapi.security import HTTPAuthorizationCredentials
  13. from config.messages import SuccessResponse
  # Router on which this module's chat endpoints are registered.
  chat_router = APIRouter()

  # Module-level logger named after this module.
  logger = logging.getLogger(__name__)
  class ConnectedUser(BaseModel):
      """Pairing of an open WebSocket connection with a chat username."""

      # WebSocket is not a pydantic-native type, so arbitrary field types
      # have to be allowed explicitly.
      model_config = ConfigDict(arbitrary_types_allowed=True)

      websocket: WebSocket  # connection object of this client
      username: str  # display name associated with the connection
  # In-memory registry of chat clients; the WebSocket endpoint appends an
  # entry on a "join" event and filters it out again on leave/disconnect.
  connected_users: list[ConnectedUser] = []
  # Seconds of client silence after which the server sends a {"type": "ping"} frame.
  _WS_IDLE_PING_SECONDS = 30.0


  def _drop_client(websocket: WebSocket) -> None:
      """Remove every ``connected_users`` entry that belongs to ``websocket``."""
      # Slice-assign so the module-level list object is mutated, not rebound;
      # other references to the list therefore keep seeing current contents.
      connected_users[:] = [
          user for user in connected_users if user.websocket is not websocket
      ]


  async def _broadcast(event: dict) -> None:
      """Send ``event`` as JSON text to every registered client.

      A client whose send fails is logged and removed from the registry, so
      one broken connection cannot abort delivery to the remaining clients.
      """
      text = json.dumps(event)
      # Iterate over a snapshot: the registry may change while a send is awaited.
      for client in list(connected_users):
          try:
              await client.websocket.send_text(text)
          except Exception as exc:  # isolate per-client transport failures
              logger.warning(
                  f"Dropping chat client {client.username}: send failed ({exc!r})"
              )
              _drop_client(client.websocket)


  @chat_router.websocket("/ws")
  async def chat_irc_endpoint(websocket: WebSocket):
      """WebSocket endpoint for real-time chat interactions.

      The bearer token is read from the ``token`` query parameter and checked
      with ``get_current_user``; unauthenticated connections are closed with
      code 1008. After the handshake the client sends JSON objects whose
      ``type`` field selects the action:

      * ``join``    - register this socket under ``username`` and broadcast it.
      * ``message`` - broadcast ``username`` and ``message`` to all clients.
      * ``leave``   - close this socket and announce the departure.
      * ``pong``    - reply to a server ping; only logged.

      When nothing is received for ``_WS_IDLE_PING_SECONDS`` the server sends
      ``{"type": "ping"}``. Malformed or unknown frames are logged and ignored.
      On any exit the socket is removed from ``connected_users`` and a
      ``leave`` event is broadcast to the remaining clients.
      """
      logger.info("New WebSocket connection attempt")
      # Log parameter names only: the values include the bearer token.
      logger.debug(f"WebSocket query params: {list(websocket.query_params.keys())}")

      token = websocket.query_params.get("token", "")
      try:
          current_user = await get_current_user(
              HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
          )
      except HTTPException:
          # NOTE(review): get_current_user is defined elsewhere; this assumes it
          # reports bad credentials by raising HTTPException - confirm.
          current_user = None
      if not current_user:
          await websocket.close(code=1008)  # Policy Violation
          return

      await websocket.accept()
      logger.info(f"User {current_user.email} connected to WebSocket chat.")

      username = None
      try:
          while True:
              try:
                  data = await asyncio.wait_for(
                      websocket.receive_text(), timeout=_WS_IDLE_PING_SECONDS
                  )
              except asyncio.TimeoutError:
                  # Idle client: probe it instead of dropping the connection.
                  await websocket.send_text(json.dumps({"type": "ping"}))
                  continue

              try:
                  payload = json.loads(data)
              except json.JSONDecodeError:
                  logger.warning(f"Ignoring non-JSON frame from user {current_user.email}")
                  continue
              if not isinstance(payload, dict):
                  logger.warning(f"Ignoring non-object frame from user {current_user.email}")
                  continue

              event_type = payload.get("type")

              if event_type == "pong":
                  logger.info(f"Received pong from user {current_user.email}")
                  continue

              if event_type == "leave":
                  # Prefer the name sent with the event; fall back to the one
                  # already tracked. The announcement itself happens in `finally`.
                  username = payload.get("username") or username
                  await websocket.close()
                  break

              if event_type == "join":
                  joined_as = payload.get("username")
                  if not isinstance(joined_as, str):
                      logger.warning(
                          f"Ignoring join without a valid username from user {current_user.email}"
                      )
                      continue
                  username = joined_as
                  # Replace any earlier registration so a repeated join does not
                  # make this socket receive every broadcast more than once.
                  _drop_client(websocket)
                  connected_users.append(
                      ConnectedUser(websocket=websocket, username=username)
                  )
                  response = {"type": "join", "username": username}
              elif event_type == "message":
                  if "username" not in payload or "message" not in payload:
                      logger.warning(
                          f"Ignoring incomplete message event from user {current_user.email}"
                      )
                      continue
                  username = payload["username"]
                  response = {
                      "type": "message",
                      "username": username,
                      "message": payload["message"],
                  }
              else:
                  # Unknown type: there is nothing meaningful to broadcast.
                  logger.warning(
                      f"Ignoring unknown event type {event_type!r} from user {current_user.email}"
                  )
                  continue

              # Broadcast the event to all connected users.
              await _broadcast(response)
      except WebSocketDisconnect:
          # The client went away without sending "leave"; cleanup is below.
          pass
      finally:
          _drop_client(websocket)
          logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
          await _broadcast(
              {"type": "leave", "username": username if username else "unknown"}
          )
  @chat_router.post("/completions")
  async def chat_completions(
      request_data: ChatCompletionRequest,
      request: Request,
      current_user: User = Depends(get_current_user),
  ):
      """Get chat completions from OpenAI.

      Args:
          request_data: Request body; its ``messages`` are forwarded to
              ``generate_completion``.
          request: Incoming request; its session is read for ``antiAbuseToken``.
          current_user: User resolved by the ``get_current_user`` dependency.

      Returns:
          A ``JSONResponse`` carrying the completion under ``"response"`` and
          ``SuccessResponse.CHAT_RESPONSE_SUCCESS`` under ``"message"``.

      Raises:
          HTTPException: Re-raised unchanged when raised while producing the
              completion; raised with status 500 for any other failure.
      """
      # The session's antiAbuseToken is passed on as the session identifier;
      # "unknown_session" is used when the session has no such entry.
      session_identifier = request.session.get("antiAbuseToken", "unknown_session")
      logger.info(f"Chat completion request from user {current_user.email}")
      try:
          openai_response = await generate_completion(
              request_data.messages,
              session_identifier,
              current_user.name,
              current_user.email,
          )
          logger.info(f"Chat completion successful for user {current_user.email}")
          return JSONResponse(
              {"response": openai_response, "message": SuccessResponse.CHAT_RESPONSE_SUCCESS}
          )
      except HTTPException as e:
          logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}")
          raise
      except Exception as e:
          error_msg = f"Unexpected error in /api/chat/completions for user {current_user.email}: {e}"
          # logger.exception records the traceback, which a plain error() call drops.
          logger.exception(error_msg)
          # Chain the cause so the original failure stays attached to the 500.
          raise HTTPException(
              status_code=500,
              detail="Error interno del servidor al procesar el chat.",
          ) from e