"""Real-time chat routes: a WebSocket endpoint plus REST helpers.

Chat events are fanned out through a ``broadcaster`` Redis channel, while the
set of connected users and the recent chat history are kept in Redis keys.
"""

import asyncio
import inspect
import json
import logging
from typing import Optional

from broadcaster import Broadcast
from fastapi import APIRouter, Depends, Query, WebSocket, WebSocketDisconnect
from fastapi.security import HTTPAuthorizationCredentials
from redis import Redis

from auth.security import get_current_user
from config.settings import DEVELOPMENT
from models.chat import NotifyRequest
from models.user import User
from services.openai_service.openai_service import admin_completion, generate_completion

logger = logging.getLogger(__name__)
chat_router = APIRouter()
redis_client = Redis(host='localhost', port=6379, db=1 if DEVELOPMENT else 0, decode_responses=True)

# NOTE(review): `redis_client` is the synchronous client, so every call below
# blocks the event loop for the duration of the round trip.

BROADCAST_URL = "redis://localhost:6379"
CHAT_CHANNEL = "chat"
CONNECTED_USERS_KEY = "connected_users"
CHAT_HISTORY_KEY = "chat_history"

# Payload keys each known event type must carry before it is processed.
_REQUIRED_FIELDS = {
    "join": ("username",),
    "message": ("username", "message"),
    "leave": ("username",),
    "ai_message": ("username",),
    "notification": ("username", "message"),
    "mention": ("username",),
    "pong": (),
}


def _remove_connected_user(field: str, value: str) -> Optional[str]:
    """Remove the first connected-user entry whose ``field`` equals ``value``.

    Returns the removed entry's username, or ``None`` when nothing matched.
    """
    for raw_entry in redis_client.smembers(CONNECTED_USERS_KEY):
        entry = json.loads(raw_entry)
        if entry.get(field) == value:
            redis_client.srem(CONNECTED_USERS_KEY, raw_entry)
            return entry.get("username")
    return None


def _recent_chat_history(limit: int = 15) -> list:
    """Return the last ``limit`` stored chat messages, decoded from JSON."""
    return [json.loads(item) for item in redis_client.lrange(CHAT_HISTORY_KEY, -limit, -1)]


@chat_router.websocket("/ws")
async def chat_irc_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time chat interactions.

    The caller authenticates with a ``token`` query parameter; the socket is
    closed with code 1008 when no user is resolved. After that, each received
    text frame must be a JSON object with a ``type`` of ``join``, ``message``,
    ``leave``, ``ai_message``, ``notification``, ``mention`` or ``pong``.
    Resulting events are published on the chat channel, and a background task
    forwards everything published there back to this socket. A ``ping`` frame
    is sent whenever no frame arrives within 30 seconds.
    """
    # Redis-backed broadcast backend (one connection per WebSocket).
    broadcast = Broadcast(BROADCAST_URL)
    logger.info("New WebSocket connection attempt")
    # Log parameter names only: the query string carries the bearer token.
    logger.debug("WebSocket query params: %s", sorted(websocket.query_params.keys()))

    current_user = await get_current_user(
        HTTPAuthorizationCredentials(
            scheme="Bearer",
            credentials=websocket.query_params.get("token", ""),
        )
    )
    if not current_user:
        await websocket.close(code=1008)  # Policy Violation
        return

    await websocket.accept()
    logger.info(f"User {current_user.email} connected to WebSocket chat.")
    username = None  # set once the client sends a "join" event

    async def reader():
        """Read messages from the chat channel and send them to the WebSocket."""
        logger.info("Iniciando lector de mensajes")
        async with broadcast.subscribe(channel=CHAT_CHANNEL) as subscriber:
            logger.info("Subscribed to Redis channel 'chat'")
            logger.debug("Starting message read loop")
            async for event in subscriber:
                try:
                    logger.debug(f"Broadcasting message to WebSocket: {event.message}")
                    await websocket.send_text(event.message)
                except Exception:
                    logger.error("Error sending message to WebSocket, closing connection.")
                    break

    await broadcast.connect()
    reader_task = asyncio.create_task(reader())

    try:
        while True:
            try:
                raw_frame = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)

                # Frames come from the client: ignore anything that is not a
                # well-formed event instead of tearing the connection down.
                try:
                    payload = json.loads(raw_frame)
                except json.JSONDecodeError:
                    logger.warning("Ignoring non-JSON frame from %s", current_user.email)
                    continue
                if not isinstance(payload, dict):
                    logger.warning("Ignoring non-object payload from %s", current_user.email)
                    continue

                event_type = payload.get("type")
                logger.debug(f"Received event: {event_type} with payload: {payload}")

                required = _REQUIRED_FIELDS.get(event_type) if isinstance(event_type, str) else None
                if required is None:
                    logger.warning("Ignoring unknown event type %r from %s", event_type, current_user.email)
                    continue
                missing = [field for field in required if field not in payload]
                if missing:
                    logger.warning("Ignoring %r event without required fields %s", event_type, missing)
                    continue

                if event_type == "join":
                    logger.info(f"User {payload['username']} joined the chat.")
                    username = payload["username"]
                    entry = {
                        "mail": current_user.email,
                        "username": username,
                    }
                    redis_client.sadd(CONNECTED_USERS_KEY, json.dumps(entry))
                    response = {"type": "join", "username": username}

                elif event_type == "message":
                    logger.debug(f"Message from {payload['username']}: {payload['message']}")
                    response = {
                        "type": "message",
                        "username": payload["username"],
                        "message": payload["message"],
                    }
                    redis_client.rpush(CHAT_HISTORY_KEY, json.dumps(response))
                    # Keep only the 100 most recent messages.
                    redis_client.ltrim(CHAT_HISTORY_KEY, -100, -1)

                elif event_type == "leave":
                    logger.info(f"User {payload['username']} left the chat.")
                    message_username = payload["username"]
                    _remove_connected_user("username", message_username)
                    response = {"type": "leave", "username": message_username}
                    # Publish before leaving the loop; the publish at the
                    # bottom of the loop is skipped by the `break`.
                    await broadcast.publish(channel=CHAT_CHANNEL, message=json.dumps(response))
                    await websocket.close()
                    break

                elif event_type == "ai_message":
                    logger.debug(f"IA Message from {payload['username']}")
                    response_content = await generate_completion(_recent_chat_history(), current_user)
                    response = {"type": "message", "username": "IAKlein", "message": response_content}

                elif event_type == "notification":
                    logger.debug(f"Notification to {payload['username']}")
                    response_content = admin_completion(payload["message"], _recent_chat_history())
                    # NOTE(review): generate_completion is awaited but this call
                    # was not; await the result only if it is awaitable so both
                    # a sync and an async implementation work. Confirm which.
                    if inspect.isawaitable(response_content):
                        response_content = await response_content
                    response = {"type": "message", "username": "IAKlein", "message": response_content}

                elif event_type == "mention":
                    logger.debug(f"Mention to {payload['username']}")
                    mention_username = payload["username"]
                    logger.debug(f"Mention username: {mention_username}, Current username: {username}")
                    await broadcast.publish(
                        channel=CHAT_CHANNEL,
                        message=json.dumps({"type": "mentioned", "username": mention_username}),
                    )
                    continue

                else:  # "pong": reply to our keep-alive ping, nothing to publish
                    logger.debug(f"Received pong from user {current_user.email}")
                    continue

                # Publish the event to every subscriber of the chat channel.
                await broadcast.publish(channel=CHAT_CHANNEL, message=json.dumps(response))

            except asyncio.TimeoutError:
                await websocket.send_text(json.dumps({"type": "ping"}))

    except WebSocketDisconnect:
        logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
        departed_username = _remove_connected_user("mail", current_user.email)
        # Only announce a departure for an entry that was actually removed.
        if departed_username is not None:
            response = {"type": "leave", "username": departed_username}
            await broadcast.publish(channel=CHAT_CHANNEL, message=json.dumps(response))

    finally:
        reader_task.cancel()
        # Wait for the cancelled reader to finish unwinding so it is no longer
        # using the broadcast connection when that connection is closed.
        await asyncio.gather(reader_task, return_exceptions=True)
        await broadcast.disconnect()


@chat_router.post("/notify")
async def notify_users(message: NotifyRequest, _: User = Depends(get_current_user)):
    """Publish a notification event on the chat channel.

    Returns ``{"status": "Notification sent"}`` once the event is published.
    """
    broadcast = Broadcast(BROADCAST_URL)
    await broadcast.connect()
    try:
        await broadcast.publish(
            channel=CHAT_CHANNEL,
            message=json.dumps({"type": "notification", "message": message.message}),
        )
    finally:
        # Close the connection even when publishing fails.
        await broadcast.disconnect()
    return {"status": "Notification sent"}


@chat_router.get("/users")
async def get_connected_users(q: Optional[str] = Query(None), _: User = Depends(get_current_user)):
    """List the usernames stored in the Redis connected-users set.

    When ``q`` is given and not blank, only usernames containing it
    (case-insensitively) are returned.
    """
    usernames = [json.loads(entry)["username"] for entry in redis_client.smembers(CONNECTED_USERS_KEY)]
    if q is None or not q.strip():
        return {"users": usernames}

    needle = q.lower()
    return {"users": [name for name in usernames if needle in name.lower()]}


@chat_router.get("/onlines")
async def get_online_user_count(_: User = Depends(get_current_user)):
    """Return the number of entries in the Redis connected-users set."""
    # SCARD returns the cardinality without transferring the members.
    return {"count": redis_client.scard(CONNECTED_USERS_KEY)}