|
|
@@ -0,0 +1,76 @@
|
|
|
+import asyncio
|
|
|
+from fastapi import WebSocket, WebSocketDisconnect, Query, status
|
|
|
+from fastapi.routing import APIRouter
|
|
|
+from services.data_service import UserDataService
|
|
|
+from config.settings import SECRET_KEY
|
|
|
+from jose import jwt, JWTError
|
|
|
+
|
|
|
+# Importar tus configuraciones y servicios existentes
|
|
|
+# from security import SECRET_KEY, ALGORITHM, user_data_service (Ajusta según tu estructura)
|
|
|
+
|
|
|
+debug_router = APIRouter()
|
|
|
+user_data_service = UserDataService()
|
|
|
+
|
|
|
+async def validate_ws_token(token: str):
|
|
|
+ """Valida el token manualmente para WebSocket ya que no usan HTTPBearer standard"""
|
|
|
+ try:
|
|
|
+ payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
|
|
|
+ email = payload.get("sub")
|
|
|
+ if email is None:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # Busca el usuario usando tu servicio existente
|
|
|
+ user = user_data_service.get_by_email(email)
|
|
|
+ return user
|
|
|
+ except (JWTError, Exception):
|
|
|
+ return None
|
|
|
+
|
|
|
+@debug_router.websocket("/logs")
|
|
|
+async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
|
|
|
+ # 1. Autenticación
|
|
|
+ user = await validate_ws_token(token)
|
|
|
+
|
|
|
+ if not user:
|
|
|
+ # Cierre con código de política violada si el token es inválido
|
|
|
+ await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
|
|
|
+ return
|
|
|
+
|
|
|
+ # 2. Verificación de Permisos (Permission >= 2)
|
|
|
+ # Asumo que tu modelo User tiene el atributo 'permission'
|
|
|
+ if getattr(user, 'permission', 0) < 2:
|
|
|
+ await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Permisos insuficientes")
|
|
|
+ return
|
|
|
+
|
|
|
+ await websocket.accept()
|
|
|
+
|
|
|
+ # 3. Proceso de Streaming de Logs
|
|
|
+ try:
|
|
|
+ # Ejecuta journalctl en modo 'follow' (-f) para el servicio específico
|
|
|
+ # -n 50 trae las ultimas 50 lineas para contexto inicial
|
|
|
+ process = await asyncio.create_subprocess_exec(
|
|
|
+ "journalctl", "-u", "pedidos_express", "-f", "-n", "50", "--output", "cat",
|
|
|
+ stdout=asyncio.subprocess.PIPE,
|
|
|
+ stderr=asyncio.subprocess.PIPE
|
|
|
+ )
|
|
|
+
|
|
|
+ # Bucle de lectura asíncrona
|
|
|
+ while True:
|
|
|
+ line = await process.stdout.readline()
|
|
|
+ if line:
|
|
|
+ # Decodificar bytes a string y enviar
|
|
|
+ await websocket.send_text(line.decode("utf-8").strip())
|
|
|
+ else:
|
|
|
+ # Pequeña pausa si no hay datos para no saturar el CPU
|
|
|
+ await asyncio.sleep(0.1)
|
|
|
+
|
|
|
+ except WebSocketDisconnect:
|
|
|
+ # El cliente se desconectó, matar el proceso de journalctl para no dejar zombies
|
|
|
+ if process.returncode is None:
|
|
|
+ process.terminate()
|
|
|
+ await process.wait()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error en websocket logs: {e}")
|
|
|
+ if process.returncode is None:
|
|
|
+ process.terminate()
|
|
|
+ await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
|