import asyncio import json import logging from fastapi import Request, HTTPException, Depends, APIRouter, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse from fastapi.security import HTTPAuthorizationCredentials from pydantic import BaseModel, ConfigDict from broadcaster import Broadcast from models.chat import ChatCompletionRequest from models.user import User from services.openai_service.openai_service import generate_completion from auth.security import get_current_user from config.messages import SuccessResponse from contextlib import asynccontextmanager from fastapi import FastAPI logger = logging.getLogger(__name__) chat_router = APIRouter() class ConnectedUser(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) websocket: WebSocket username: str connected_users: list[ConnectedUser] = [] # todavía útil para listar usuarios conectados @chat_router.websocket("/ws") async def chat_irc_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time chat interactions""" global connected_users # Backend de broadcast con Redis broadcast = Broadcast("redis://localhost:6379") logger.info("New WebSocket connection attempt") logger.debug(f"WebSocket query params: {websocket.query_params}") current_user = await get_current_user( HTTPAuthorizationCredentials( scheme="Bearer", credentials=websocket.query_params.get("token", "") ) ) if not current_user: await websocket.close(code=1008) # Policy Violation return await websocket.accept() logger.info(f"User {current_user.email} connected to WebSocket chat.") username = None stop_event = asyncio.Event() async def reader(): """Lee mensajes desde Redis y los manda al WebSocket""" logger.info("Iniciando lector de mensajes") async with broadcast.subscribe(channel="chat") as subscriber: logger.info("Subscribed to Redis channel 'chat'") logger.debug("Starting message read loop") async for event in subscriber: try: logger.debug(f"Broadcasting message to WebSocket: {event.message}") await websocket.send_text(event.message) except Exception: logger.error("Error sending message to WebSocket, closing connection.") break await broadcast.connect() reader_task = asyncio.create_task(reader()) try: while True: try: data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0) payload = json.loads(data) event_type = payload.get("type") if event_type == "join": logger.info(f"User {payload['username']} joined the chat.") username = payload["username"] connected_users.append(ConnectedUser(websocket=websocket, username=username)) response = {"type": "join", "username": username} elif event_type == "message": logger.debug(f"Message from {payload['username']}: {payload['message']}") username = payload["username"] message = payload["message"] response = {"type": "message", "username": username, "message": message} elif event_type == "leave": logger.info(f"User {payload['username']} left the chat.") response = {"type": "leave", "username": payload["username"]} connected_users = [u for u in connected_users if u.websocket != websocket] await websocket.close() break elif event_type == "pong": logger.debug(f"Received pong from user {current_user.email}") continue # Publicar en Redis await broadcast.publish(channel="chat", message=json.dumps(response)) except asyncio.TimeoutError: await websocket.send_text(json.dumps({"type": "ping"})) except WebSocketDisconnect: connected_users = [u for u in connected_users if u.websocket != websocket] logger.info(f"User {current_user.email} disconnected from WebSocket chat.") response = {"type": "leave", "username": username or "unknown"} await broadcast.publish(channel="chat", message=json.dumps(response)) finally: reader_task.cancel() await broadcast.disconnect() @chat_router.get("/users") async def get_connected_users(_: User = Depends(get_current_user)): """Get a list of connected users (solo local al worker)""" return {"users": [user.username for user in connected_users]} @chat_router.post("/completions") async def chat_completions( request_data: ChatCompletionRequest, request: Request, current_user: User = Depends(get_current_user) ): """Get chat completions from OpenAI""" session_identifier = request.session.get("antiAbuseToken", "unknown_session") logger.info(f"Chat completion request from user {current_user.email}") try: openai_response = await generate_completion( request_data.messages, session_identifier, current_user.name, current_user.email ) logger.info(f"Chat completion successful for user {current_user.email}") return JSONResponse({ "response": openai_response, "message": SuccessResponse.CHAT_RESPONSE_SUCCESS }) except HTTPException as e: logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}") raise except Exception as e: logger.error(f"Unexpected error: {e}") raise HTTPException(status_code=500, detail="Error interno del servidor al procesar el chat.")