| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import asyncio
- import json
- import logging
- from fastapi import Request, HTTPException, Depends, APIRouter, WebSocket, WebSocketDisconnect
- from fastapi.responses import JSONResponse
- from fastapi.security import HTTPAuthorizationCredentials
- from pydantic import BaseModel, ConfigDict
- from broadcaster import Broadcast
- from models.chat import ChatCompletionRequest
- from models.user import User
- from services.openai_service.openai_service import generate_completion
- from auth.security import get_current_user
- from config.messages import SuccessResponse
- from contextlib import asynccontextmanager
- from fastapi import FastAPI
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router on which the chat endpoints defined in this module are registered.
chat_router = APIRouter()
class ConnectedUser(BaseModel):
    """Pairing of an open WebSocket with the username announced on it."""

    # WebSocket is not a pydantic-native type, so arbitrary types must be allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # The live connection object for this user.
    websocket: WebSocket
    # The username the client sent in its "join" event.
    username: str
# In-process registry of joined users; still useful for listing connected users.
connected_users: list[ConnectedUser] = []
@chat_router.websocket("/ws")
async def chat_irc_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time chat interactions.

    The client authenticates with a ``token`` query parameter. After the
    connection is accepted, JSON events received from the client
    (``join``, ``message``, ``leave``, ``pong``) are published to the
    ``chat`` channel of the broadcast backend, and every event published on
    that channel is forwarded to this socket by a background reader task.
    If no client frame arrives within 30 seconds a ``ping`` event is sent.

    Malformed frames (invalid JSON, non-object payloads, unknown event types
    or missing fields) are logged and ignored instead of terminating the
    connection.
    """
    global connected_users

    # Redis-backed broadcast backend.
    broadcast = Broadcast("redis://localhost:6379")
    logger.info("New WebSocket connection attempt")
    # Log only the parameter names: the query string carries the bearer token,
    # which must not be written to the logs.
    logger.debug(f"WebSocket query params: {list(websocket.query_params.keys())}")

    try:
        current_user = await get_current_user(
            HTTPAuthorizationCredentials(
                scheme="Bearer",
                credentials=websocket.query_params.get("token", ""),
            )
        )
    except HTTPException:
        # NOTE(review): get_current_user is also used as an HTTP dependency, so
        # it may signal a bad token by raising instead of returning a falsy
        # value; both cases are treated as an authentication failure here.
        current_user = None
    if not current_user:
        await websocket.close(code=1008)  # Policy Violation
        return

    await websocket.accept()
    logger.info(f"User {current_user.email} connected to WebSocket chat.")
    username = None

    async def reader():
        """Read events from the broadcast channel and send them to the WebSocket."""
        logger.info("Iniciando lector de mensajes")
        async with broadcast.subscribe(channel="chat") as subscriber:
            logger.info("Subscribed to Redis channel 'chat'")
            logger.debug("Starting message read loop")
            async for event in subscriber:
                try:
                    logger.debug(f"Broadcasting message to WebSocket: {event.message}")
                    await websocket.send_text(event.message)
                except Exception:
                    logger.error("Error sending message to WebSocket, closing connection.")
                    break

    await broadcast.connect()
    reader_task = asyncio.create_task(reader())
    try:
        while True:
            try:
                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
            except asyncio.TimeoutError:
                # Nothing received within the window: probe the client.
                await websocket.send_text(json.dumps({"type": "ping"}))
                continue

            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                logger.warning(f"Ignoring non-JSON frame from user {current_user.email}")
                continue
            if not isinstance(payload, dict):
                logger.warning(f"Ignoring non-object payload from user {current_user.email}")
                continue

            event_type = payload.get("type")
            if event_type == "pong":
                logger.debug(f"Received pong from user {current_user.email}")
                continue
            if event_type not in ("join", "message", "leave"):
                # Without this guard `response` would be unbound (or stale from
                # a previous iteration) when the publish below runs.
                logger.warning(f"Ignoring unsupported chat event type: {event_type!r}")
                continue

            sender = payload.get("username")
            if not isinstance(sender, str):
                logger.warning(f"Ignoring '{event_type}' event without a valid username")
                continue

            if event_type == "join":
                logger.info(f"User {sender} joined the chat.")
                username = sender
                # Replace any earlier entry for this socket so that repeated
                # joins do not produce duplicates in the registry.
                connected_users = [u for u in connected_users if u.websocket is not websocket]
                connected_users.append(ConnectedUser(websocket=websocket, username=username))
                response = {"type": "join", "username": username}
            elif event_type == "message":
                if "message" not in payload:
                    logger.warning(f"Ignoring 'message' event without a message from {sender}")
                    continue
                logger.debug(f"Message from {sender}: {payload['message']}")
                username = sender
                response = {"type": "message", "username": username, "message": payload["message"]}
            else:  # "leave"
                logger.info(f"User {sender} left the chat.")
                response = {"type": "leave", "username": sender}

            # Publish to Redis.
            await broadcast.publish(channel="chat", message=json.dumps(response))

            if event_type == "leave":
                # Close only after the leave event has been published so the
                # other participants are actually notified.
                await websocket.close()
                break
    except WebSocketDisconnect:
        logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
        response = {"type": "leave", "username": username or "unknown"}
        await broadcast.publish(channel="chat", message=json.dumps(response))
    finally:
        # Runs on every exit path so the registry never keeps a dead socket.
        connected_users = [u for u in connected_users if u.websocket is not websocket]
        reader_task.cancel()
        # Let the reader finish unwinding its subscription before the backend
        # connection is torn down.
        await asyncio.gather(reader_task, return_exceptions=True)
        await broadcast.disconnect()
@chat_router.get("/users")
async def get_connected_users(_: User = Depends(get_current_user)):
    """Return the usernames of connected users (local to this worker only)."""
    usernames = []
    for entry in connected_users:
        usernames.append(entry.username)
    return {"users": usernames}
@chat_router.post("/completions")
async def chat_completions(
    request_data: ChatCompletionRequest,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Get chat completions from OpenAI.

    Forwards the request messages, together with the session's
    ``antiAbuseToken`` (or ``"unknown_session"`` when absent) and the current
    user's name and email, to ``generate_completion`` and wraps the result in
    a JSON response.

    Raises:
        HTTPException: re-raised unchanged when ``generate_completion`` raises
            one; any other exception is converted to a 500 response.
    """
    session_identifier = request.session.get("antiAbuseToken", "unknown_session")
    logger.info(f"Chat completion request from user {current_user.email}")
    try:
        openai_response = await generate_completion(
            request_data.messages,
            session_identifier,
            current_user.name,
            current_user.email,
        )
        logger.info(f"Chat completion successful for user {current_user.email}")
        return JSONResponse({
            "response": openai_response,
            "message": SuccessResponse.CHAT_RESPONSE_SUCCESS,
        })
    except HTTPException as e:
        logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}")
        raise
    except Exception as e:
        # logger.exception records the traceback, which a plain error log drops.
        logger.exception(f"Unexpected error: {e}")
        # Chain the cause so the original failure stays attached to the 500.
        raise HTTPException(
            status_code=500,
            detail="Error interno del servidor al procesar el chat.",
        ) from e
|