security.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. from typing import Union
  2. from venv import logger
  3. from fastapi import Request, HTTPException, Header, Depends
  4. from typing import Annotated
  5. import secrets
  6. from logging import getLogger
  7. logger = getLogger(__name__)
  async def get_session_token(request: Request) -> Union[str, None]:
      """Return the anti-abuse token stored in the caller's session.

      Args:
          request: Incoming request; its ``session`` mapping is read.

      Returns:
          The value stored under the ``"antiAbuseToken"`` session key, or
          ``None`` when that key is absent.
      """
      return request.session.get("antiAbuseToken")
  async def protect_chat_api(
      request: Request,
      x_app_token: Annotated[Union[str, None], Header(alias="X-App-Token")] = None,
      session_token: Annotated[Union[str, None], Depends(get_session_token)] = None,
  ) -> bool:
      """Protect chat API endpoints by matching a header against the session token.

      Equivalent to the protectChatAPI middleware.

      Args:
          request: Incoming request; only ``request.client`` is read, for logging.
          x_app_token: Value of the ``X-App-Token`` request header, if sent.
          session_token: Token resolved by ``get_session_token``, if any.

      Returns:
          ``True`` when the header value matches the session token.

      Raises:
          HTTPException: 403 when the session holds no token, 401 when the
              ``X-App-Token`` header is missing, 403 when the tokens differ.
      """
      # request.client may be unset, so fall back to a placeholder in the logs.
      client_ip = request.client.host if request.client else "unknown"

      if not session_token:
          logger.error("Session token is not initialized or invalid. IP: %s", client_ip)
          raise HTTPException(
              status_code=403,
              detail="Acceso denegado: Sesión inválida o token no inicializado.",
          )

      if not x_app_token:
          logger.error("X-App-Token is missing. IP: %s", client_ip)
          raise HTTPException(
              status_code=401,
              detail="Acceso denegado: Falta el token X-App-Token.",
          )

      # Constant-time comparison so response timing does not reveal how much of
      # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
      tokens_match = (
          isinstance(x_app_token, str)
          and isinstance(session_token, str)
          and secrets.compare_digest(
              x_app_token.encode("utf-8"), session_token.encode("utf-8")
          )
      )
      if not tokens_match:
          # Deliberately log neither token: the session token is a secret and
          # the received value is attacker-controlled.
          logger.warning("Invalid token attempt. IP: %s", client_ip)
          raise HTTPException(status_code=403, detail="Acceso denegado: Token inválido.")

      return True  # Protection passed
  def generate_anti_abuse_token() -> str:
      """Generate a new anti-abuse token.

      Returns:
          A random hex string built from 32 bytes of ``secrets`` randomness
          (64 hexadecimal characters).
      """
      return secrets.token_hex(32)