security.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from datetime import datetime, timedelta
  2. from typing import Union
  3. from venv import logger
  4. from fastapi import Depends, HTTPException
  5. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  6. from logging import getLogger
  7. from pydantic import BaseModel
  8. from config.settings import SECRET_KEY
  9. from jose import jwt, JWTError
  10. from passlib.context import CryptContext
  11. from services.data_service import UserDataService
  # Module-level logger; this rebinding replaces the `logger` name imported
  # from `venv` at the top of the file.
  logger = getLogger(__name__)

  # Signing algorithm handed to jwt.encode / jwt.decode.
  ALGORITHM = "HS256"

  # Access-token lifetime, in DAYS. generate_token passes this value to
  # timedelta(days=...), so it must be a day count. The earlier expression
  # 60 * 24 * 7 is the number of minutes in a week and produced tokens that
  # stayed valid for 10080 days (~27 years).
  ACCESS_TOKEN_EXPIRE_DAYS = 7

  # FastAPI dependency that extracts the bearer credentials from the request.
  security = HTTPBearer()

  # passlib context configured with bcrypt as its only scheme.
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

  # Shared service instance used for login and user lookup by email.
  user_data_service = UserDataService()
  class TokenData(BaseModel):
      """Validated identity data taken from a decoded access token."""

      # Value of the token's "sub" claim (the user's email).
      email: str
  def hash_password(password: str) -> str:
      """Return the hash of *password* produced by the module's passlib context.

      Args:
          password: Plain-text password to hash.

      Returns:
          The hash string generated by ``pwd_context`` (bcrypt scheme).
      """
      hashed = pwd_context.hash(password)
      return hashed
  def generate_token(email: str) -> str:
      """Generate a signed JWT access token for the given user.

      The email is stored in the ``sub`` claim and the token lifetime is
      ``ACCESS_TOKEN_EXPIRE_DAYS`` days.

      Args:
          email: Email address identifying the user.

      Returns:
          The encoded JWT as returned by ``create_access_token``.
      """
      token = create_access_token(
          data={"sub": email},
          expires_delta=timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS),
      )
      # The token is a bearer credential: anyone who can read it can act as
      # the user, so it must never be written to the logs.
      logger.debug("Generated token for email %s", email)
      return token
  def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None) -> str:
      """Create a signed JWT access token.

      Args:
          data: Claims to embed in the token. The dict is copied, so the
              caller's object is not modified.
          expires_delta: Lifetime of the token. When omitted or falsy, the
              token expires one day from now.

      Returns:
          The JWT encoded with ``SECRET_KEY`` and ``ALGORITHM``.
      """
      # Local import keeps this block self-contained; the file-level import
      # only brings in `datetime` and `timedelta`.
      from datetime import timezone

      to_encode = data.copy()
      lifetime = expires_delta if expires_delta else timedelta(days=1)
      # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated and
      # returns a naive value that is easy to misinterpret as local time.
      expire = datetime.now(timezone.utc) + lifetime
      to_encode["exp"] = expire
      logger.debug("Creating access token with data: %s", to_encode)
      logger.debug("Token expiration time: %s", expire)
      return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  def authenticate_user(email: str, password: str) -> bool:
      """Check a user's credentials through the user data service.

      Args:
          email: Email address supplied by the user.
          password: Password supplied by the user.

      Returns:
          True when ``user_data_service.login`` returns a truthy value,
          False otherwise.
      """
      return bool(user_data_service.login(email, password))
  async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
      """Resolve the authenticated user from the request's bearer token.

      Args:
          credentials: Bearer credentials supplied by the ``security``
              dependency.

      Returns:
          The object returned by ``user_data_service.get_by_email`` for the
          email stored in the token's ``sub`` claim.

      Raises:
          HTTPException: 401 when the token cannot be decoded, carries no
              usable ``sub`` claim, or refers to a user that is not found.
      """
      credentials_exception = HTTPException(
          status_code=401,
          detail="No se pudieron validar las credenciales",
          headers={"WWW-Authenticate": "Bearer"},
      )

      try:
          # The raw token is deliberately not logged: it is a bearer
          # credential and would let any log reader impersonate the user.
          payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
      except JWTError as exc:
          logger.error("JWTError: Invalid token")
          raise credentials_exception from exc

      email = payload.get("sub")
      # Reject a missing or non-string subject here so that building TokenData
      # below cannot fail with a validation error that would escape as a 500.
      if not isinstance(email, str):
          logger.error("Token does not contain email")
          raise credentials_exception
      token_data = TokenData(email=email)

      user = user_data_service.get_by_email(token_data.email)
      if user is None:
          logger.error("User not found: %s", token_data.email)
          raise credentials_exception
      return user