فهرست منبع

use broadcast to async

Erwin Jacimino 8 ماه پیش
والد
کامیت
03f0a9af62
2فایلهای تغییر یافته به همراه85 افزوده شده و 51 حذف شده
  1. 1 1
      public/main/js/app.js
  2. 84 50
      routes/chat.py

+ 1 - 1
public/main/js/app.js

@@ -780,7 +780,7 @@ function newUserInChat(userName) {
     chatMessagesElement.appendChild(userClone);
 }
 
-function removeUserFromChat(userName) {
+function userLeftChat(userName) {
     let userTemplate = chatMessagesElement.querySelector("#systemMessageTemplate");
     if (!userTemplate) return;
 

+ 84 - 50
routes/chat.py

@@ -1,104 +1,137 @@
 import asyncio
 import json
-from fastapi import Request, HTTPException, Depends
+import logging
+from fastapi import Request, HTTPException, Depends, APIRouter, WebSocket, WebSocketDisconnect
 from fastapi.responses import JSONResponse
+from fastapi.security import HTTPAuthorizationCredentials
 from pydantic import BaseModel, ConfigDict
+from broadcaster import Broadcast
+
 from models.chat import ChatCompletionRequest
 from models.user import User
 from services.openai_service.openai_service import generate_completion
 from auth.security import get_current_user
-import logging
-from fastapi import APIRouter, WebSocket, WebSocketDisconnect
-from fastapi.security import HTTPAuthorizationCredentials
 from config.messages import SuccessResponse
+from contextlib import asynccontextmanager
+from fastapi import FastAPI
 
 logger = logging.getLogger(__name__)
-
 chat_router = APIRouter()
 
+
 class ConnectedUser(BaseModel):
-    
     model_config = ConfigDict(arbitrary_types_allowed=True)
     websocket: WebSocket
     username: str
 
-connected_users: list[ConnectedUser] = []
+connected_users: list[ConnectedUser] = []  # todavía útil para listar usuarios conectados
+
+
+
 
 @chat_router.websocket("/ws")
 async def chat_irc_endpoint(websocket: WebSocket):
     """WebSocket endpoint for real-time chat interactions"""
     global connected_users
+    
+    
+    # Backend de broadcast con Redis
+    broadcast = Broadcast("redis://localhost:6379")
     logger.info("New WebSocket connection attempt")
     logger.debug(f"WebSocket query params: {websocket.query_params}")
-    current_user = await get_current_user(HTTPAuthorizationCredentials(scheme="Bearer", credentials=websocket.query_params.get("token", "")))
+
+    current_user = await get_current_user(
+        HTTPAuthorizationCredentials(
+            scheme="Bearer",
+            credentials=websocket.query_params.get("token", "")
+        )
+    )
     if not current_user:
         await websocket.close(code=1008)  # Policy Violation
         return
-    
+
     await websocket.accept()
     logger.info(f"User {current_user.email} connected to WebSocket chat.")
+
     username = None
+    stop_event = asyncio.Event()
+    async def reader():
+        """Lee mensajes desde Redis y los manda al WebSocket"""
+        logger.info("Iniciando lector de mensajes")
+        async with broadcast.subscribe(channel="chat") as subscriber:
+            logger.info("Subscribed to Redis channel 'chat'")
+            logger.debug("Starting message read loop")
+            async for event in subscriber:
+                try:
+                    logger.debug(f"Broadcasting message to WebSocket: {event.message}")
+                    await websocket.send_text(event.message)
+                except Exception:
+                    logger.error("Error sending message to WebSocket, closing connection.")
+                    break
+    await broadcast.connect()
+    reader_task = asyncio.create_task(reader())
+
+
     try:
         while True:
             try:
-                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)  # 5 minutes timeout
-                payload= json.loads(data)
+                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
+                payload = json.loads(data)
                 event_type = payload.get("type")
 
                 if event_type == "join":
+                    logger.info(f"User {payload['username']} joined the chat.")
                     username = payload["username"]
                     connected_users.append(ConnectedUser(websocket=websocket, username=username))
-                    response = {
-                        "type": "join",
-                        "username": username
-                    }
-                
+                    response = {"type": "join", "username": username}
+
                 elif event_type == "message":
+                    logger.debug(f"Message from {payload['username']}: {payload['message']}")
                     username = payload["username"]
                     message = payload["message"]
-                    response = {
-                        "type": "message",
-                        "username": username,
-                        "message": message
-                    }
-                
+                    response = {"type": "message", "username": username, "message": message}
+
                 elif event_type == "leave":
-                    response = {
-                        "type": "leave",
-                        "username": payload["username"]
-                    }
-                    connected_users = [user for user in connected_users if user.websocket != websocket]
+                    logger.info(f"User {payload['username']} left the chat.")
+                    response = {"type": "leave", "username": payload["username"]}
+                    connected_users = [u for u in connected_users if u.websocket != websocket]
                     await websocket.close()
-                    logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
                     break
+
                 elif event_type == "pong":
-                    logger.info(f"Received pong from user {current_user.email}")
+                    logger.debug(f"Received pong from user {current_user.email}")
                     continue
 
-                # Broadcast a todos los conectados
-                for client in connected_users:
-                    await client.websocket.send_text(json.dumps(response))
-                # Broadcast the received message to all connected users
+                # Publicar en Redis
+                await broadcast.publish(channel="chat", message=json.dumps(response))
+
             except asyncio.TimeoutError:
-                websocket_text = {
-                    "type": "ping"
-                }
-                await websocket.send_text(json.dumps(websocket_text))
+                await websocket.send_text(json.dumps({"type": "ping"}))
+
     except WebSocketDisconnect:
-        connected_users = [user for user in connected_users if user.websocket != websocket]
+        connected_users = [u for u in connected_users if u.websocket != websocket]
         logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
-        response = {
-            "type": "leave",
-            "username": username if username else "unknown"
-        }
-        for client in connected_users:
-            await client.websocket.send_text(json.dumps(response))
+        response = {"type": "leave", "username": username or "unknown"}
+        await broadcast.publish(channel="chat", message=json.dumps(response))
+
+    finally:
+        reader_task.cancel()
+        await broadcast.disconnect()
+
+
+@chat_router.get("/users")
+async def get_connected_users(_: User = Depends(get_current_user)):
+    """Get a list of connected users (solo local al worker)"""
+    return {"users": [user.username for user in connected_users]}
 
 
 @chat_router.post("/completions")
-async def chat_completions(request_data: ChatCompletionRequest, request: Request, current_user: User = Depends(get_current_user)):
+async def chat_completions(
+    request_data: ChatCompletionRequest,
+    request: Request,
+    current_user: User = Depends(get_current_user)
+):
     """Get chat completions from OpenAI"""
-    # Uses session_token (which is the antiAbuseToken) as an identifier for logging
     session_identifier = request.session.get("antiAbuseToken", "unknown_session")
     logger.info(f"Chat completion request from user {current_user.email}")
 
@@ -109,13 +142,14 @@ async def chat_completions(request_data: ChatCompletionRequest, request: Request
             current_user.name,
             current_user.email
         )
-
         logger.info(f"Chat completion successful for user {current_user.email}")
-        return JSONResponse({"response": openai_response, "message": SuccessResponse.CHAT_RESPONSE_SUCCESS})
+        return JSONResponse({
+            "response": openai_response,
+            "message": SuccessResponse.CHAT_RESPONSE_SUCCESS
+        })
     except HTTPException as e:
         logger.error(f"HTTP error in chat completion for user {current_user.email}: {e.detail}")
         raise
     except Exception as e:
-        error_msg = f"Unexpected error in /api/chat/completions for user {current_user.email}: {e}"
-        logger.error(error_msg)
+        logger.error(f"Unexpected error: {e}")
         raise HTTPException(status_code=500, detail="Error interno del servidor al procesar el chat.")