| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- import asyncio
- import json
- from fastapi import Request, HTTPException, Depends
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel, ConfigDict
- from models.chat import ChatCompletionRequest
- from models.user import User
- from services.openai_service.openai_service import generate_completion
- from auth.security import get_current_user
- import logging
- from fastapi import APIRouter, WebSocket, WebSocketDisconnect
- from fastapi.security import HTTPAuthorizationCredentials
- from config.messages import SuccessResponse
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)

# Router that collects the chat routes declared below.
chat_router = APIRouter()
class ConnectedUser(BaseModel):
    """Registry entry pairing an open chat WebSocket with the username sent on "join"."""

    # WebSocket is not a pydantic-native type, so arbitrary field types must be allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    websocket: WebSocket  # connection used to push broadcast frames to this client
    username: str  # name supplied by the client in its "join" event
# Clients that have sent a "join" event and should receive broadcasts.
# Lives in module memory only, so it is local to this process.
connected_users: list[ConnectedUser] = []
# Seconds of client silence after which the server sends a "ping" frame.
_PING_INTERVAL_SECONDS = 30.0

# Event types that are relayed to the other connected clients.
_RELAYED_EVENTS = ("join", "message", "leave")


def _remove_connection(websocket: WebSocket) -> None:
    """Drop every registry entry bound to *websocket* (no-op when none exist)."""
    # Mutate in place so every holder of the list object observes the removal.
    connected_users[:] = [user for user in connected_users if user.websocket != websocket]


async def _broadcast(message: dict) -> None:
    """Send *message* as JSON to every registered client, pruning unreachable ones."""
    encoded = json.dumps(message)
    # Iterate over a snapshot: other handlers may join or leave while we await a send.
    for client in list(connected_users):
        try:
            await client.websocket.send_text(encoded)
        except Exception as exc:
            # Best-effort delivery: one broken peer must neither abort the broadcast
            # nor be mistaken for the *sending* user disconnecting.
            logger.warning(f"Dropping unreachable WebSocket client {client.username}: {exc}")
            _remove_connection(client.websocket)


@chat_router.websocket("/ws")
async def chat_irc_endpoint(websocket: WebSocket) -> None:
    """WebSocket endpoint for real-time chat interactions.

    The bearer token is read from the ``token`` query parameter and resolved
    with ``get_current_user``; the socket is closed with code 1008 when no user
    is returned or authentication raises ``HTTPException``.

    Incoming frames are JSON objects with a ``type`` field:

    * ``join``    - requires a string ``username``; registers the socket and
      broadcasts the join.
    * ``message`` - requires ``username`` and ``message``; broadcast to all
      registered clients.
    * ``leave``   - requires ``username``; unregisters the socket, notifies the
      remaining clients and closes the connection.
    * ``pong``    - reply to the server's ``ping``; only logged.

    Frames that are not valid JSON objects, have an unknown ``type`` or lack a
    required field are logged and ignored. After ``_PING_INTERVAL_SECONDS`` of
    silence the server sends ``{"type": "ping"}``.
    """
    logger.info("New WebSocket connection attempt")
    # Log parameter names only: the query string carries the bearer token.
    logger.debug(f"WebSocket query params: {sorted(websocket.query_params.keys())}")

    token = websocket.query_params.get("token", "")
    try:
        current_user = await get_current_user(
            HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
        )
    except HTTPException:
        # The same dependency guards the HTTP routes, where it signals failure by
        # raising; on a WebSocket that must become a policy-violation close.
        current_user = None
    if not current_user:
        await websocket.close(code=1008)  # Policy Violation
        return

    await websocket.accept()
    logger.info(f"User {current_user.email} connected to WebSocket chat.")
    username = None
    try:
        while True:
            try:
                data = await asyncio.wait_for(
                    websocket.receive_text(), timeout=_PING_INTERVAL_SECONDS
                )
            except asyncio.TimeoutError:
                # Client has been silent for a full interval: probe it with a ping.
                await websocket.send_text(json.dumps({"type": "ping"}))
                continue

            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                logger.warning(f"Ignoring non-JSON WebSocket frame from user {current_user.email}")
                continue
            if not isinstance(payload, dict):
                logger.warning(f"Ignoring non-object WebSocket frame from user {current_user.email}")
                continue

            event_type = payload.get("type")
            if event_type == "pong":
                logger.info(f"Received pong from user {current_user.email}")
                continue
            if event_type not in _RELAYED_EVENTS:
                # Previously this fell through to the broadcast with `response`
                # unbound (or stale from the previous frame).
                logger.warning(
                    f"Ignoring unsupported WebSocket event {event_type!r} from user {current_user.email}"
                )
                continue
            if "username" not in payload or (event_type == "message" and "message" not in payload):
                logger.warning(
                    f"Ignoring incomplete {event_type} event from user {current_user.email}"
                )
                continue
            sender = payload["username"]

            if event_type == "join":
                if not isinstance(sender, str):
                    logger.warning(
                        f"Ignoring join with non-string username from user {current_user.email}"
                    )
                    continue
                username = sender
                # Replace any earlier registration so a repeated join does not
                # make this socket receive every broadcast more than once.
                _remove_connection(websocket)
                connected_users.append(ConnectedUser(websocket=websocket, username=username))
                response = {
                    "type": "join",
                    "username": username,
                }
            elif event_type == "message":
                username = sender
                response = {
                    "type": "message",
                    "username": username,
                    "message": payload["message"],
                }
            else:  # "leave"
                response = {
                    "type": "leave",
                    "username": sender,
                }
                _remove_connection(websocket)
                # Notify the remaining clients before closing; the original
                # left the loop without ever sending this frame.
                await _broadcast(response)
                await websocket.close()
                logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
                break

            # Broadcast the event to all connected users.
            await _broadcast(response)
    except WebSocketDisconnect:
        _remove_connection(websocket)
        logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
        await _broadcast({
            "type": "leave",
            "username": username if username else "unknown",
        })
    finally:
        # Guarantee the socket never stays registered, whatever ended the loop.
        _remove_connection(websocket)
@chat_router.post("/completions")
async def chat_completions(
    request_data: ChatCompletionRequest,
    request: Request,
    current_user: User = Depends(get_current_user),
):
    """Get chat completions from OpenAI.

    Args:
        request_data: Request body; its ``messages`` are forwarded to
            ``generate_completion``.
        request: Incoming request; its session supplies the ``antiAbuseToken``
            used as the session identifier.
        current_user: Authenticated user resolved by ``get_current_user``.

    Returns:
        A ``JSONResponse`` with the completion under ``response`` and a success
        text under ``message``.

    Raises:
        HTTPException: Re-raised unchanged when ``generate_completion`` raises
            one; any other failure is converted into a 500.
    """
    # The session's antiAbuseToken doubles as the identifier passed downstream.
    session_identifier = request.session.get("antiAbuseToken", "unknown_session")
    logger.info(f"Chat completion request from user {current_user.email}")
    try:
        openai_response = await generate_completion(
            request_data.messages,
            session_identifier,
            current_user.name,
            current_user.email,
        )
        logger.info(f"Chat completion successful for user {current_user.email}")
        return JSONResponse({"response": openai_response, "message": SuccessResponse.CHAT_RESPONSE_SUCCESS})
    except HTTPException as e:
        # Already carries a status and detail meant for the client: log and propagate.
        logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}")
        raise
    except Exception as e:
        error_msg = f"Unexpected error in /api/chat/completions for user {current_user.email}: {e}"
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception(error_msg)
        # Chain the cause so the original failure stays visible in tracebacks.
        raise HTTPException(
            status_code=500,
            detail="Error interno del servidor al procesar el chat.",
        ) from e
|