security.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. from datetime import datetime, timedelta
  2. from typing import Union
  3. from venv import logger
  4. from fastapi import Depends, HTTPException
  5. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  6. from typing import Annotated
  7. from logging import getLogger
  8. from pydantic import BaseModel
  9. from config.settings import SECRET_KEY
  10. from jose import jwt, JWTError
  11. from passlib.context import CryptContext
  12. from services.data_service import UserDataService
  13. logger = getLogger(__name__)
  14. ALGORITHM = "HS256"
  15. ACCESS_TOKEN_EXPIRE_DAYS = 60 * 24 * 7
  16. security = HTTPBearer()
  17. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  18. user_data_service = UserDataService()
  19. class TokenData(BaseModel):
  20. email: str
  21. def hash_password(password: str) -> str:
  22. """Hash a password using bcrypt."""
  23. return pwd_context.hash(password)
  24. def generate_token(email: str):
  25. """Generate a JWT token for user authentication."""
  26. data = {"sub": email}
  27. expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
  28. token = create_access_token(data=data, expires_delta=expires_delta)
  29. logger.debug(f"Generated token for email {email}: {token}")
  30. return token
  31. def create_access_token(data: dict, expires_delta: timedelta) -> str:
  32. """Create a JWT access token."""
  33. to_encode = data.copy()
  34. if expires_delta:
  35. expire = datetime.utcnow() + expires_delta
  36. else:
  37. expire = datetime.utcnow() + timedelta(days=1)
  38. to_encode.update({"exp": expire})
  39. logger.debug(f"Creating access token with data: {to_encode}")
  40. logger.debug(f"Token expiration time: {expire}")
  41. return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  42. def authenticate_user(email: str, password: str) -> bool:
  43. """Authenticate a user by email and password."""
  44. user = user_data_service.login(email, password)
  45. if not user:
  46. return False
  47. return True
  48. async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
  49. credentials_exception = HTTPException(
  50. status_code=401,
  51. detail="No se pudieron validar las credenciales",
  52. headers={"WWW-Authenticate": "Bearer"},
  53. )
  54. try:
  55. token = credentials.credentials
  56. logger.debug(f"Decoding token: {token}")
  57. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  58. email = payload.get("sub")
  59. if email is None:
  60. logger.error("Token does not contain email")
  61. raise credentials_exception
  62. token_data = TokenData(email=email)
  63. except JWTError:
  64. logger.error("JWTError: Invalid token")
  65. raise credentials_exception
  66. user = user_data_service.get_by_email(token_data.email)
  67. if user is None:
  68. logger.error(f"User not found: {token_data.email}")
  69. raise credentials_exception
  70. return user