debug.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import asyncio
  2. from logging import getLogger
  3. from fastapi import WebSocket, WebSocketDisconnect, Query, status
  4. from fastapi.routing import APIRouter
  5. from services.data_service import UserDataService
  6. from config.settings import SECRET_KEY
  7. from jose import jwt, JWTError
  8. # Importar tus configuraciones y servicios existentes
  9. # from security import SECRET_KEY, ALGORITHM, user_data_service (Ajusta según tu estructura)
  # Module-level singletons shared by the handlers defined in this file.

  # Router that the debug WebSocket routes are registered on.
  debug_router = APIRouter()

  # Logger named after this module.
  logger = getLogger(__name__)

  # Service instance used to look users up by e-mail during token validation.
  user_data_service = UserDataService()
  async def validate_ws_token(token: str):
      """Resolve a JWT supplied to a WebSocket endpoint into a user.

      The token is validated by hand because this endpoint receives it as a
      query parameter rather than through the standard HTTPBearer dependency.

      Args:
          token: Encoded JWT, expected to be signed with ``SECRET_KEY`` using
              HS256 and to carry the user's e-mail in its ``sub`` claim.

      Returns:
          Whatever ``user_data_service.get_by_email`` returns for the e-mail in
          the ``sub`` claim, or ``None`` when the token is invalid, has no
          ``sub`` claim, or the lookup fails. This function never raises.
      """
      try:
          payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
          email = payload.get("sub")
          if email is None:
              return None
          # Look the user up through the existing data service.
          return user_data_service.get_by_email(email)
      except JWTError:
          # Bad signature, expired or malformed token: an expected rejection.
          return None
      except Exception:
          # Anything else (e.g. a failing user lookup) is an operational problem
          # rather than a bad token. The caller still gets None so the connection
          # is rejected, but the failure is logged instead of being hidden.
          # The token itself is deliberately not logged.
          logger.exception("Unexpected error while validating WebSocket token")
          return None
  async def _stop_log_process(process):
      """Terminate the journalctl child if it is still running, then reap it."""
      if process.returncode is None:
          try:
              process.terminate()
          except ProcessLookupError:
              # The child exited between the returncode check and the signal.
              pass
      # Always wait so the child is reaped and never lingers as a zombie.
      await process.wait()


  @debug_router.websocket("/logs")
  async def websocket_endpoint(websocket: WebSocket, token: str = Query(...)):
      """Stream the ``pedidos_express`` journal to an authorised WebSocket client.

      Flow:
          1. Authenticate the ``token`` query parameter via ``validate_ws_token``.
          2. Require ``user.permissions >= 2``.
          3. Accept the socket and forward every line printed by
             ``journalctl -u pedidos_express -f`` as a text frame.

      The connection is closed with code 1008 when authentication or the
      permission check fails, and with code 1011 when streaming fails or the
      journalctl process ends. The child process is always terminated and
      reaped when the handler exits, including on cancellation.

      Args:
          websocket: The incoming WebSocket connection.
          token: JWT passed as a required query parameter.
      """
      logger.info("WebSocket connection established")

      # 1. Authentication: reject with "policy violation" on an invalid token.
      user = await validate_ws_token(token)
      if not user:
          await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
          return

      # 2. Permission check (permissions >= 2).
      # NOTE(review): assumes the user object exposes a numeric `permissions`
      # attribute; a missing attribute is treated as 0 and rejected. Confirm
      # the attribute name against the user model.
      if getattr(user, 'permissions', 0) < 2:
          await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Permisos insuficientes")
          return

      await websocket.accept()

      # 3. Log streaming.
      # Bound before the try block so the cleanup below can tell whether the
      # child was ever started (spawning may itself fail, e.g. missing binary).
      process = None
      try:
          # Follow (-f) the unit's journal; "-n 50" replays the last 50 lines
          # first to give the client some initial context.
          process = await asyncio.create_subprocess_exec(
              "journalctl", "-u", "pedidos_express", "-f", "-n", "50", "--output", "cat",
              stdout=asyncio.subprocess.PIPE,
              # stderr is never read here, so discard it instead of piping it:
              # an undrained pipe could fill up and block the child.
              stderr=asyncio.subprocess.DEVNULL,
          )

          while True:
              line = await process.stdout.readline()  # type: ignore
              if not line:
                  # readline() returns b"" only at EOF, i.e. journalctl exited.
                  # Polling again would spin forever without ever yielding data.
                  break
              # errors="replace" keeps one malformed byte in the journal from
              # tearing down the whole stream.
              await websocket.send_text(line.decode("utf-8", errors="replace").strip())

          # A follow-mode journalctl is not expected to finish on its own.
          logger.warning("journalctl log stream ended unexpectedly")
          await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
      except WebSocketDisconnect:
          # Client went away; nothing to report, cleanup happens in `finally`.
          # NOTE(review): a disconnect is only noticed on the next send, so with
          # a quiet journal the child lives until the next log line arrives.
          pass
      except Exception as e:
          logger.exception("Error en websocket logs: %s", e)
          try:
              await websocket.close(code=status.WS_1011_INTERNAL_ERROR)
          except (RuntimeError, WebSocketDisconnect):
              # The connection is already closed or broken; nothing left to do.
              pass
      finally:
          # Runs on every exit path (disconnect, error, EOF, task cancellation)
          # so the journalctl child is never leaked.
          if process is not None:
              await _stop_log_process(process)