| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- import asyncio
- import json
- import logging
- from typing import Optional
- from fastapi import Query, Depends, APIRouter, WebSocket, WebSocketDisconnect
- from fastapi.security import HTTPAuthorizationCredentials
- from broadcaster import Broadcast
- from config.settings import DEVELOPMENT
- from models.chat import NotifyRequest
- from models.user import User
- from services.openai_service.openai_service import generate_completion, admin_completion
- from auth.security import get_current_user
- from redis import Redis
- from utils.responses import success_response
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Router on which the chat endpoints in this file are registered.
chat_router = APIRouter()

# Synchronous Redis client shared by every handler in this module.
# Development uses logical database 1, any other environment uses database 0.
redis_client = Redis(
    host='localhost',
    port=6379,
    db=0 if not DEVELOPMENT else 1,
    decode_responses=True,
)

# Pub/sub channel name; development gets its own channel.
broadcast_channel = "chat_dev" if DEVELOPMENT else "chat"
# Fields each supported client event must carry. Event types that are not
# listed here are ignored instead of being processed with missing data.
_REQUIRED_FIELDS = {
    "join": ("username",),
    "message": ("username", "message"),
    "leave": ("username",),
    "ai_message": ("username",),
    "mention": ("username",),
    "pong": (),
}


def _remove_connected_user(field: str, value) -> Optional[str]:
    """Remove the first "connected_users" entry whose *field* equals *value*.

    Entries are the JSON strings written by the "join" event
    (``{"mail": ..., "username": ...}``). Entries that are not valid JSON
    objects are skipped.

    Returns:
        The username stored in the removed entry, or ``None`` when no entry
        matched (or the matching entry carried no username).
    """
    for raw_entry in redis_client.smembers("connected_users"):  # type: ignore
        try:
            entry = json.loads(raw_entry)
        except (TypeError, ValueError):
            continue
        if isinstance(entry, dict) and field in entry and entry[field] == value:
            redis_client.srem("connected_users", raw_entry)
            return entry.get("username")
    return None


@chat_router.websocket("/ws")
async def chat_irc_endpoint(websocket: WebSocket):
    """WebSocket endpoint for real-time chat interactions.

    The caller authenticates with a ``token`` query parameter. After the
    socket is accepted, a background task forwards everything published on
    the broadcast channel to this socket, while the main loop reads JSON
    events from the client and handles them by their ``type``:

    * ``join``       - record the user in "connected_users" and announce it.
    * ``message``    - append to "chat_history" (trimmed to the last 100
                       entries) and broadcast it.
    * ``leave``      - remove the user, announce it and close the socket.
    * ``ai_message`` - generate a completion from the last 15 history
                       entries, store it and broadcast it as "IAKlein".
    * ``mention``    - broadcast a "mentioned" event for the given username.
    * ``pong``       - reply to the server's keep-alive ping; ignored.

    When no frame arrives for 30 seconds a ``{"type": "ping"}`` frame is sent.
    Malformed frames and unknown event types are logged and skipped.

    NOTE(review): ``username`` comes from the client payload and is not
    checked against the authenticated user, and the Redis client used here is
    synchronous, so its calls run inline in the event loop - confirm both are
    acceptable.
    """
    # Redis-backed broadcast backend.
    broadcast = Broadcast("redis://localhost:6379")
    logger.info("New WebSocket connection attempt")
    # Log only the parameter names: the values include the bearer token.
    logger.debug(f"WebSocket query param names: {list(websocket.query_params.keys())}")
    current_user = await get_current_user(
        HTTPAuthorizationCredentials(
            scheme="Bearer",
            credentials=websocket.query_params.get("token", ""),
        )
    )
    if not current_user:
        await websocket.close(code=1008)  # Policy Violation
        return
    await websocket.accept()
    logger.info(f"User {current_user.email} connected to WebSocket chat.")
    # Username announced by this connection's "join" event (None until then).
    username = None

    async def reader():
        """Read messages from the broadcast channel and send them to the WebSocket."""
        logger.info("Iniciando lector de mensajes")
        async with broadcast.subscribe(channel=broadcast_channel) as subscriber:
            logger.info(f"Subscribed to Redis channel '{broadcast_channel}'")
            logger.debug("Starting message read loop")
            try:
                async for event in subscriber:  # type: ignore
                    try:
                        logger.debug(f"Broadcasting message to WebSocket: {event.message}")
                        await websocket.send_text(event.message)
                    except Exception as e:
                        logger.error(f"Error sending message to WebSocket: {e}, closing connection.")
                        break
            except asyncio.CancelledError:
                logger.info("Conection closed by client")

    await broadcast.connect()
    reader_task = asyncio.create_task(reader())
    try:
        while True:
            try:
                data = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
                logger.debug(f"Received data from WebSocket: {data}")

                # A malformed frame must not tear down the whole connection.
                try:
                    payload = json.loads(data)
                except json.JSONDecodeError:
                    logger.warning("Ignoring WebSocket frame that is not valid JSON")
                    continue
                if not isinstance(payload, dict):
                    logger.warning("Ignoring WebSocket frame that is not a JSON object")
                    continue

                event_type = payload.get("type")
                logger.debug(f"Received event: {event_type} with payload: {payload}")

                required = _REQUIRED_FIELDS.get(event_type) if isinstance(event_type, str) else None
                if required is None:
                    # Previously an unknown type fell through to the publish
                    # call with `response` unbound or left over from an
                    # earlier event.
                    logger.warning(f"Ignoring unsupported event type: {event_type!r}")
                    continue
                missing = [name for name in required if name not in payload]
                if missing:
                    logger.warning(f"Ignoring '{event_type}' event without required fields: {missing}")
                    continue

                if event_type == "pong":
                    continue

                if event_type == "mention":
                    logger.debug(f"Mention to {payload['username']}")
                    mention_username = payload["username"]
                    logger.debug(f"Mention username: {mention_username}, Current username: {username}")
                    await broadcast.publish(
                        channel=broadcast_channel,
                        message=json.dumps({"type": "mentioned", "username": mention_username}),
                    )
                    continue

                if event_type == "join":
                    logger.info(f"User {payload['username']} joined the chat.")
                    username = payload["username"]
                    entry = {
                        "mail": current_user.email,
                        "username": username,
                    }
                    redis_client.sadd("connected_users", json.dumps(entry))
                    response = {"type": "join", "username": username}
                elif event_type == "message":
                    logger.debug(f"Message from {payload['username']}: {payload['message']}")
                    response = {
                        "type": "message",
                        "username": payload["username"],
                        "message": payload["message"],
                    }
                    redis_client.rpush("chat_history", json.dumps(response))
                    redis_client.ltrim("chat_history", -100, -1)  # Keep only the last 100 entries
                elif event_type == "leave":
                    logger.info(f"User {payload['username']} left the chat.")
                    message_username = payload["username"]
                    _remove_connected_user("username", message_username)
                    response = {"type": "leave", "username": message_username}
                    # Publish before closing: the loop exits right after, so
                    # the shared publish call below is never reached.
                    await broadcast.publish(channel=broadcast_channel, message=json.dumps(response))
                    await websocket.close()
                    break
                else:  # "ai_message"
                    messages = redis_client.lrange("chat_history", -15, -1)
                    parsed_messages = [json.loads(msg) for msg in messages]  # type: ignore
                    logger.debug(f"IA Message from {payload['username']}")
                    response_content = await generate_completion(parsed_messages, current_user)
                    redis_client.rpush(
                        "chat_history",
                        json.dumps({"type": "ai_message", "username": "IAKlein", "message": response_content}),
                    )
                    response = {"type": "message", "username": "IAKlein", "message": response_content}

                # Publish to Redis so every subscribed connection receives it.
                await broadcast.publish(channel=broadcast_channel, message=json.dumps(response))
            except asyncio.TimeoutError:
                # Nothing received within the timeout: send a keep-alive ping.
                await websocket.send_text(json.dumps({"type": "ping"}))
    except WebSocketDisconnect:
        logger.info(f"User {current_user.email} disconnected from WebSocket chat.")
        # Announce the departure only when an entry for this e-mail was
        # actually found; otherwise there is no username to announce.
        departed_username = _remove_connected_user("mail", current_user.email)
        if departed_username is not None:
            response = {"type": "leave", "username": departed_username}
            await broadcast.publish(channel=broadcast_channel, message=json.dumps(response))
    finally:
        reader_task.cancel()
        try:
            # Let the reader leave its subscription before the backend is
            # disconnected underneath it.
            await asyncio.gather(reader_task, return_exceptions=True)
        finally:
            await broadcast.disconnect()
@chat_router.post("/notify")
async def notify_users(message: NotifyRequest, _: User = Depends(get_current_user)):
    """Broadcast an AI-generated reply to a notification as a chat message.

    The last 15 entries of "chat_history" are passed, together with the
    notification text, to ``admin_completion``. Its result is published on
    the broadcast channel as a ``message`` event from "IAKlein".

    Args:
        message: Request body; only its ``message`` attribute is read.
        _: Authenticated user, required for access but otherwise unused.

    Returns:
        The value of ``success_response`` with an empty payload.
    """
    # Fetch the message history used as context for the AI response.
    messages = redis_client.lrange("chat_history", -15, -1)
    parsed_messages = [json.loads(msg) for msg in messages]  # type: ignore

    # Generate the AI response for the notification.
    logger.debug(f"Processing notification: {message.message}")
    # NOTE(review): called without `await`, unlike `generate_completion` in
    # the WebSocket handler - confirm `admin_completion` is synchronous.
    response_content = admin_completion(message.message, parsed_messages)

    # Send the AI response directly as a chat message.
    response = {"type": "message", "username": "IAKlein", "message": response_content}

    # Connect only for the publish, and always disconnect, so that a failure
    # while publishing cannot leak the broadcast connection.
    broadcast = Broadcast("redis://localhost:6379")
    await broadcast.connect()
    try:
        await broadcast.publish(channel=broadcast_channel, message=json.dumps(response))
    finally:
        await broadcast.disconnect()

    return success_response({}, message="Notificación enviada correctamente")
@chat_router.get("/users")
async def get__connected_users(q: Optional[str] = Query(None), _: User = Depends(get_current_user)):
    """Return the usernames stored in the Redis "connected_users" set.

    Args:
        q: Optional case-insensitive substring filter. When it is missing or
            contains only whitespace, every username is returned.
        _: Authenticated user, required for access but otherwise unused.

    Returns:
        ``success_response`` wrapping a plain list of usernames when no
        filter applies, or ``{"users": [...]}`` when ``q`` filters the list.

    NOTE(review): the two branches return differently shaped payloads; this
    is kept as-is because callers may depend on either shape.
    """
    raw_entries = redis_client.smembers("connected_users")
    usernames = [json.loads(entry)["username"] for entry in raw_entries]  # type: ignore

    if q is not None and q.strip() != "":
        needle = q.lower()
        matches = [name for name in usernames if needle in name.lower()]
        return success_response(data={"users": matches})

    return success_response(data=usernames)
@chat_router.get("/onlines")
async def get_online_user_count(_: User = Depends(get_current_user)):
    """Return how many entries the Redis "connected_users" set holds.

    Args:
        _: Authenticated user, required for access but otherwise unused.

    Returns:
        ``success_response`` wrapping ``{"count": <number of entries>}``.
    """
    members = redis_client.smembers("connected_users")
    online_total = len(members)  # type: ignore
    return success_response(data={"count": online_total})
|