| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- import asyncio
- from logging import getLogger
- from fastapi import WebSocket, WebSocketDisconnect, Query, status
- from fastapi.routing import APIRouter
- from services.data_service import UserDataService
- from config.settings import SECRET_KEY
- from jose import jwt, JWTError
- # Importar tus configuraciones y servicios existentes
- # from security import SECRET_KEY, ALGORITHM, user_data_service (Ajusta según tu estructura)
- logger = getLogger(__name__)
- debug_router = APIRouter()
- user_data_service = UserDataService()
- async def validate_ws_token(token: str):
- """Valida el token manualmente para WebSocket ya que no usan HTTPBearer standard"""
- try:
- payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
- email = payload.get("sub")
- if email is None:
- return None
-
- # Busca el usuario usando tu servicio existente
- user = user_data_service.get_by_email(email)
- return user
- except (JWTError, Exception):
- return None
- @debug_router.websocket("/logs")
- async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
- logger.info("WebSocket connection established")
- # 1. Autenticación
- user = await validate_ws_token(token)
-
- if not user:
- # Cierre con código de política violada si el token es inválido
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
- return
- # 2. Verificación de Permisos (Permission >= 2)
- # Asumo que tu modelo User tiene el atributo 'permission'
- if getattr(user, 'permission', 0) < 2:
- await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Permisos insuficientes")
- return
- await websocket.accept()
- # 3. Proceso de Streaming de Logs
- try:
- # Ejecuta journalctl en modo 'follow' (-f) para el servicio específico
- # -n 50 trae las ultimas 50 lineas para contexto inicial
- process = await asyncio.create_subprocess_exec(
- "journalctl", "-u", "pedidos_express", "-f", "-n", "50", "--output", "cat",
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE
- )
- # Bucle de lectura asíncrona
- while True:
- line = await process.stdout.readline()
- if line:
- # Decodificar bytes a string y enviar
- await websocket.send_text(line.decode("utf-8").strip())
- else:
- # Pequeña pausa si no hay datos para no saturar el CPU
- await asyncio.sleep(0.1)
- except WebSocketDisconnect:
- # El cliente se desconectó, matar el proceso de journalctl para no dejar zombies
- if process.returncode is None:
- process.terminate()
- await process.wait()
-
- except Exception as e:
- logger.error(f"Error en websocket logs: {e}")
- if process.returncode is None:
- process.terminate()
- await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
|