| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- from datetime import datetime, timedelta
- from typing import Union
- from fastapi import Depends, HTTPException
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from logging import getLogger
- from pydantic import BaseModel
- from config.settings import SECRET_KEY
- from jose import jwt, JWTError
- from passlib.context import CryptContext
- from models.user import User
- from services.data_service import UserDataService
- logger = getLogger(__name__)
- ALGORITHM = "HS256"
- ACCESS_TOKEN_EXPIRE_DAYS = 7
- security = HTTPBearer()
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- user_data_service = UserDataService()
- class TokenData(BaseModel):
- email: str
- def hash_password(password: str) -> str:
- """Hash a password using bcrypt."""
- return pwd_context.hash(password)
- def generate_token(email: str):
- """Generate a JWT token for user authentication."""
- logger.info(f"Generating token for user: {email}")
-
- try:
- data = {"sub": email}
- expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
- token = create_access_token(data=data, expires_delta=expires_delta)
-
-
- logger.debug(f"Generated token for email {email}: {token[:20]}...")
- return token
-
- except Exception as e:
- error_msg = f"Failed to generate token for {email}: {e}"
- logger.error(error_msg)
-
- raise
- def create_access_token(data: dict, expires_delta: timedelta) -> str:
- """Create a JWT access token."""
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(days=1)
-
- to_encode.update({"exp": expire})
- logger.debug(f"Creating access token with data: {to_encode}")
- logger.debug(f"Token expiration time: {expire}")
- return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- def authenticate_user(email: str, password: str) -> bool:
- """Authenticate a user by email and password."""
- logger.info(f"Authentication attempt for user: {email}")
-
- try:
- user = user_data_service.login(email, password)
-
- if user:
- logger.info(f"Authentication successful for user: {email}")
- return True
- else:
-
- logger.warning(f"Authentication failed for user: {email}")
- return False
-
- except Exception as e:
- error_msg = f"Authentication error for {email}: {e}"
- logger.error(error_msg)
-
- return False
- async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
- credentials_exception = HTTPException(
- status_code=401,
- detail="No se pudieron validar las credenciales",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- try:
- token = credentials.credentials
- logger.debug(f"Decoding token: {token[:20]}...")
-
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- email = payload.get("sub")
-
- if email is None:
- logger.error("Token does not contain email")
-
- raise credentials_exception
-
- token_data = TokenData(email=email)
-
- except JWTError as e:
- logger.error(f"JWTError: Invalid token - {e}")
-
- raise credentials_exception
- except Exception as e:
- logger.error(f"Unexpected error during token validation: {e}")
-
- raise credentials_exception
-
- try:
- user = user_data_service.get_by_email(token_data.email)
-
- if user is None:
- logger.error(f"User not found: {token_data.email}")
-
- raise credentials_exception
-
- logger.debug(f"User authenticated successfully: {user.email}")
-
- return user
-
- except Exception as e:
- logger.error(f"Database error during user lookup: {e}")
-
- raise credentials_exception
|