import asyncio import json from fastapi import Request, HTTPException, Depends from fastapi.responses import JSONResponse from pydantic import BaseModel, ConfigDict from models.chat import ChatCompletionRequest from models.user import User from services.openai_service.openai_service import generate_completion from auth.security import get_current_user import logging from fastapi import APIRouter, WebSocket, WebSocketDisconnect from fastapi.security import HTTPAuthorizationCredentials from config.messages import SuccessResponse logger = logging.getLogger(__name__) chat_router = APIRouter() class ConnectedUser(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) websocket: WebSocket username: str connected_users: list[ConnectedUser] = [] @chat_router.websocket("/ws") async def chat_irc_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time chat interactions""" global connected_users logger.info("New WebSocket connection attempt") logger.debug(f"WebSocket query params: {websocket.query_params}") current_user = await get_current_user(HTTPAuthorizationCredentials(scheme="Bearer", credentials=websocket.query_params.get("token", ""))) if not current_user: await websocket.close(code=1008) # Policy Violation return await websocket.accept() logger.info(f"User {current_user.email} connected to WebSocket chat.") username = None try: while True: try: data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0) # 5 minutes timeout payload= json.loads(data) event_type = payload.get("type") if event_type == "join": username = payload["username"] connected_users.append(ConnectedUser(websocket=websocket, username=username)) response = { "type": "join", "username": username } elif event_type == "message": username = payload["username"] message = payload["message"] response = { "type": "message", "username": username, "message": message } elif event_type == "leave": response = { "type": "leave", "username": payload["username"] } connected_users = [user for user in connected_users if user.websocket != websocket] await websocket.close() logger.info(f"User {current_user.email} disconnected from WebSocket chat.") break elif event_type == "pong": logger.info(f"Received pong from user {current_user.email}") continue # Broadcast a todos los conectados for client in connected_users: await client.websocket.send_text(json.dumps(response)) # Broadcast the received message to all connected users except asyncio.TimeoutError: websocket_text = { "type": "ping" } await websocket.send_text(json.dumps(websocket_text)) except WebSocketDisconnect: connected_users = [user for user in connected_users if user.websocket != websocket] logger.info(f"User {current_user.email} disconnected from WebSocket chat.") response = { "type": "leave", "username": username if username else "unknown" } for client in connected_users: await client.websocket.send_text(json.dumps(response)) @chat_router.post("/completions") async def chat_completions(request_data: ChatCompletionRequest, request: Request, current_user: User = Depends(get_current_user)): """Get chat completions from OpenAI""" # Uses session_token (which is the antiAbuseToken) as an identifier for logging session_identifier = request.session.get("antiAbuseToken", "unknown_session") logger.info(f"Chat completion request from user {current_user.email}") try: openai_response = await generate_completion( request_data.messages, session_identifier, current_user.name, current_user.email ) logger.info(f"Chat completion successful for user {current_user.email}") return JSONResponse({"response": openai_response, "message": SuccessResponse.CHAT_RESPONSE_SUCCESS}) except HTTPException as e: logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}") raise except Exception as e: error_msg = f"Unexpected error in /api/chat/completions for user {current_user.email}: {e}" logger.error(error_msg) raise HTTPException(status_code=500, detail="Error interno del servidor al procesar el chat.")