import asyncio
from logging import getLogger

from fastapi import WebSocket, WebSocketDisconnect, Query, status
from fastapi.routing import APIRouter
from jose import jwt, JWTError

from config.settings import SECRET_KEY
from services.data_service import UserDataService

# Import your existing configuration and services here, e.g.
# from security import SECRET_KEY, ALGORITHM, user_data_service
# (adjust to your project layout).

logger = getLogger(__name__)
debug_router = APIRouter()
user_data_service = UserDataService()


async def validate_ws_token(token: str):
    """Validate a JWT supplied on a WebSocket connection.

    WebSocket routes do not go through the standard HTTPBearer dependency,
    so the token is decoded manually here.

    Args:
        token: Encoded JWT, decoded with ``SECRET_KEY`` using HS256.

    Returns:
        Whatever ``user_data_service.get_by_email`` returns for the ``sub``
        claim, or ``None`` when the token is invalid, has no ``sub`` claim,
        or the lookup fails.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        email = payload.get("sub")
        if email is None:
            return None
        # Look the user up through the existing data service.
        return user_data_service.get_by_email(email)
    except JWTError:
        # Invalid / expired / tampered token: an expected failure, stay quiet.
        return None
    except Exception:
        # Still best-effort (the caller only sees None), but do not hide
        # unexpected failures such as a broken user lookup.
        logger.exception("Unexpected error while validating WebSocket token")
        return None


async def _stop_process(process) -> None:
    """Terminate *process* if it is still running, then reap it."""
    if process.returncode is None:
        try:
            process.terminate()
        except ProcessLookupError:
            # The process exited between the returncode check and the signal.
            pass
    # Always wait so the child is reaped and does not linger as a zombie.
    await process.wait()


async def _close_quietly(websocket: WebSocket, code: int) -> None:
    """Close *websocket* with *code*, ignoring errors if it is already closed."""
    try:
        await websocket.close(code=code)
    except Exception:
        logger.debug("WebSocket was already closed", exc_info=True)


@debug_router.websocket("/logs")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
    """Stream the ``pedidos_express`` journal to an authorized WebSocket client.

    The connection is closed with a policy-violation code when the token is
    invalid or the user's ``permission`` attribute is below 2. Otherwise the
    socket is accepted and every line printed by ``journalctl -f`` is sent
    to the client as a text message until the client disconnects, journalctl
    exits, or an error occurs.

    Args:
        websocket: The incoming WebSocket connection.
        token: JWT passed as the required ``token`` query parameter.
    """
    logger.info("WebSocket connection established")

    # 1. Authentication
    user = await validate_ws_token(token)
    if not user:
        # Close with the policy-violation code when the token is invalid.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return

    # 2. Permission check (permission >= 2).
    # Assumes the user model exposes a numeric 'permission' attribute;
    # a missing or None value is treated as 0 (no access).
    if (getattr(user, "permission", 0) or 0) < 2:
        await websocket.close(
            code=status.WS_1008_POLICY_VIOLATION,
            reason="Permisos insuficientes",
        )
        return

    await websocket.accept()

    # 3. Log streaming
    # Bound before the try block so the cleanup below is safe even when
    # spawning journalctl itself fails.
    process = None
    try:
        # Run journalctl in follow mode (-f) for the specific service;
        # -n 50 sends the last 50 lines as initial context.
        process = await asyncio.create_subprocess_exec(
            "journalctl",
            "-u", "pedidos_express",
            "-f",
            "-n", "50",
            "--output", "cat",
            stdout=asyncio.subprocess.PIPE,
            # stderr is never read here; discarding it prevents journalctl
            # from blocking on a full, undrained pipe.
            stderr=asyncio.subprocess.DEVNULL,
        )

        # NOTE(review): this endpoint never reads from the socket, so a client
        # disconnect is presumably only noticed when the next send fails --
        # confirm against the Starlette version in use.
        while True:
            line = await process.stdout.readline()
            if not line:
                # readline() returns b"" only at EOF, i.e. journalctl exited.
                # Polling again would spin forever without producing data.
                break
            # Decode bytes to text (tolerating invalid UTF-8) and send.
            await websocket.send_text(
                line.decode("utf-8", errors="replace").strip()
            )

        logger.warning("journalctl exited; closing log WebSocket")
        await _close_quietly(websocket, status.WS_1011_INTERNAL_ERROR)
    except WebSocketDisconnect:
        # The client went away; the subprocess is stopped in `finally`.
        pass
    except Exception as e:
        logger.exception("Error en websocket logs: %s", e)
        await _close_quietly(websocket, status.WS_1011_INTERNAL_ERROR)
    finally:
        # Runs on every exit path, including task cancellation, so the
        # journalctl child is never left behind.
        if process is not None:
            await _stop_process(process)