# security.py (3.9 KB)
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import Optional, Union

from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel

from config.settings import SECRET_KEY
from models.user import User
from services.data_service import UserDataService
  12. logger = getLogger(__name__)
  13. ALGORITHM = "HS256"
  14. ACCESS_TOKEN_EXPIRE_DAYS = 7
  15. security = HTTPBearer()
  16. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  17. user_data_service = UserDataService()
  18. class TokenData(BaseModel):
  19. email: str
  20. def hash_password(password: str) -> str:
  21. """Hash a password using bcrypt."""
  22. return pwd_context.hash(password)
  23. def generate_token(email: str):
  24. """Generate a JWT token for user authentication."""
  25. logger.info(f"Generating token for user: {email}")
  26. try:
  27. data = {"sub": email}
  28. expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
  29. token = create_access_token(data=data, expires_delta=expires_delta)
  30. logger.debug(f"Generated token for email {email}: {token[:20]}...")
  31. return token
  32. except Exception as e:
  33. error_msg = f"Failed to generate token for {email}: {e}"
  34. logger.error(error_msg)
  35. raise
  36. def create_access_token(data: dict, expires_delta: timedelta) -> str:
  37. """Create a JWT access token."""
  38. to_encode = data.copy()
  39. if expires_delta:
  40. expire = datetime.utcnow() + expires_delta
  41. else:
  42. expire = datetime.utcnow() + timedelta(days=1)
  43. to_encode.update({"exp": expire})
  44. logger.debug(f"Creating access token with data: {to_encode}")
  45. logger.debug(f"Token expiration time: {expire}")
  46. return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  47. def authenticate_user(email: str, password: str) -> bool:
  48. """Authenticate a user by email and password."""
  49. logger.info(f"Authentication attempt for user: {email}")
  50. try:
  51. user = user_data_service.login(email, password)
  52. if user:
  53. logger.info(f"Authentication successful for user: {email}")
  54. return True
  55. else:
  56. logger.warning(f"Authentication failed for user: {email}")
  57. return False
  58. except Exception as e:
  59. error_msg = f"Authentication error for {email}: {e}"
  60. logger.error(error_msg)
  61. return False
  62. async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
  63. credentials_exception = HTTPException(
  64. status_code=401,
  65. detail="No se pudieron validar las credenciales",
  66. headers={"WWW-Authenticate": "Bearer"},
  67. )
  68. try:
  69. token = credentials.credentials
  70. logger.debug(f"Decoding token: {token[:20]}...")
  71. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  72. email = payload.get("sub")
  73. if email is None:
  74. logger.error("Token does not contain email")
  75. raise credentials_exception
  76. token_data = TokenData(email=email)
  77. except JWTError as e:
  78. logger.error(f"JWTError: Invalid token - {e}")
  79. raise credentials_exception
  80. except Exception as e:
  81. logger.error(f"Unexpected error during token validation: {e}")
  82. raise credentials_exception
  83. try:
  84. user = user_data_service.get_by_email(token_data.email)
  85. if user is None:
  86. logger.error(f"User not found: {token_data.email}")
  87. raise credentials_exception
  88. logger.debug(f"User authenticated successfully: {user.email}")
  89. return user
  90. except Exception as e:
  91. logger.error(f"Database error during user lookup: {e}")
  92. raise credentials_exception