| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- import asyncio
- import json
- import logging
- from typing import Optional
- from fastapi import Query, Depends, APIRouter, WebSocket, WebSocketDisconnect
- from fastapi.security import HTTPAuthorizationCredentials
- from broadcaster import Broadcast
- from config.settings import DEVELOPMENT
- from models.chat import NotifyRequest
- from models.user import User
- from services.openai_service.openai_service import generate_completion, admin_completion
- from auth.security import get_current_user
- from redis import Redis
- logger = logging.getLogger(__name__)
- chat_router = APIRouter()
- redis_client = Redis(host='localhost', port=6379, db=1 if DEVELOPMENT else 0, decode_responses=True)
- @chat_router.websocket("/ws")
- async def chat_irc_endpoint(websocket: WebSocket):
- """WebSocket endpoint for real-time chat interactions"""
- global connected_users
-
-
- # Backend de broadcast con Redis
- broadcast = Broadcast("redis://localhost:6379")
- logger.info("New WebSocket connection attempt")
- logger.debug(f"WebSocket query params: {websocket.query_params}")
- current_user = await get_current_user(
- HTTPAuthorizationCredentials(
- scheme="Bearer",
- credentials=websocket.query_params.get("token", "")
- )
- )
- if not current_user:
- await websocket.close(code=1008) # Policy Violation
- return
- await websocket.accept()
- logger.info(f"User {current_user.email} connected to WebSocket chat.")
- username = None
- async def reader():
- """Lee mensajes desde Redis y los manda al WebSocket"""
- logger.info("Iniciando lector de mensajes")
- async with broadcast.subscribe(channel="chat") as subscriber:
- logger.info("Subscribed to Redis channel 'chat'")
- logger.debug("Starting message read loop")
- async for event in subscriber:
- try:
- logger.debug(f"Broadcasting message to WebSocket: {event.message}")
- await websocket.send_text(event.message)
- except Exception:
- logger.error("Error sending message to WebSocket, closing connection.")
- break
- await broadcast.connect()
- reader_task = asyncio.create_task(reader())
- try:
- while True:
- try:
-
- data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
- payload = json.loads(data)
- event_type = payload.get("type")
- logger.debug(f"Received event: {event_type} with payload: {payload}")
- if event_type == "join":
- logger.info(f"User {payload['username']} joined the chat.")
- data = {
- "mail": current_user.email,
- "username": payload["username"]
- }
- redis_client.sadd("connected_users", json.dumps(data))
- response = {"type": "join", "username": payload["username"]}
- elif event_type == "message":
- logger.debug(f"Message from {payload['username']}: {payload['message']}")
- message_username = payload["username"]
- message = payload["message"]
- response = {"type": "message", "username": message_username, "message": message}
- redis_client.rpush("chat_history", json.dumps(response))
- redis_client.ltrim("chat_history", -100, -1) # Keep only
- elif event_type == "leave":
- logger.info(f"User {payload['username']} left the chat.")
- message_username = payload["username"]
-
- users = redis_client.smembers("connected_users")
- for user in users:
- user_data = json.loads(user)
- if user_data["username"] == message_username:
- redis_client.srem("connected_users", user)
- break
- response = {"type": "leave", "username": message_username}
- await websocket.close()
- break
- elif event_type == "ai_message":
- messages = redis_client.lrange("chat_history", -15, -1)
- parsed_messages = [json.loads(msg) for msg in messages]
- logger.debug(f"IA Message from {payload['username']}")
- message_username = payload["username"]
- response_content = await generate_completion(parsed_messages, current_user)
- response = {"type": "message", "username": "IAKlein", "message": response_content}
- elif event_type == "notification":
- logger.debug(f"Notification to {payload['username']}")
- notification_message = payload["message"]
- messages = redis_client.lrange("chat_history", -15, -1)
- parsed_messages = [json.loads(msg) for msg in messages]
- response_content = admin_completion(notification_message, parsed_messages)
- response = {"type": "message", "username": "IAKlein", "message": response_content}
-
- elif event_type == "mention":
- logger.debug(f"Mention to {payload['username']}")
- mention_username = payload["username"]
- logger.debug(f"Mention username: {mention_username}, Current username: {username}")
- await broadcast.publish(channel="chat", message=json.dumps({"type": "mentioned", "username": mention_username}))
- continue
- elif event_type == "pong":
- logger.debug(f"Received pong from user {current_user.email}")
- continue
- # Publicar en Redis
- await broadcast.publish(channel="chat", message=json.dumps(response))
- except asyncio.TimeoutError:
- await websocket.send_text(json.dumps({"type": "ping"}))
- except WebSocketDisconnect:
- logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
-
- users = redis_client.smembers("connected_users")
- for user in users:
- user_data = json.loads(user)
- if user_data["mail"] == current_user.email:
- redis_client.srem("connected_users", user)
- break
- response = {"type": "leave", "username": user_data["username"]}
- await broadcast.publish(channel="chat", message=json.dumps(response))
- finally:
- reader_task.cancel()
- await broadcast.disconnect()
- @chat_router.post("/notify")
- async def notify_users(message: NotifyRequest, _: User = Depends(get_current_user)):
- """Send a notification message to all connected users"""
- broadcast = Broadcast("redis://localhost:6379")
- await broadcast.connect()
- await broadcast.publish(channel="chat", message=json.dumps({"type": "notification", "message": message.message}))
- await broadcast.disconnect()
- return {"status": "Notification sent"}
- @chat_router.get("/users")
- async def get_connected_users(q: Optional[str] = Query(None), _: User = Depends(get_current_user)):
- """Get a list of connected users (solo local al worker)"""
- # return {"users": [user.username for user in connected_users if q.lower() in user.username.lower()]}
- all_users = redis_client.smembers("connected_users")
- all_users = [json.loads(user)["username"] for user in all_users]
- if q is None or q.strip() == "":
- return {"users": all_users}
- filtered_users = [user for user in all_users if q.lower() in user.lower()]
- return {"users": filtered_users}
- @chat_router.get("/onlines")
- async def get_online_user_count(_: User = Depends(get_current_user)):
- """Get the count of online users (solo local al worker)"""
- all_users = redis_client.smembers("connected_users")
- return {"count": len(all_users)}
|