"""JWT-based authentication helpers for the FastAPI application.

Provides password hashing, access-token creation and a FastAPI dependency
(``get_current_user``) that resolves the bearer token of a request to a user.
"""

from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import Union
# NOTE(review): this name is rebound by ``logger = getLogger(__name__)`` below,
# so the import has no effect; it looks like an accidental IDE auto-import.
from venv import logger

from fastapi import Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from config.settings import SECRET_KEY
from services.data_service import UserDataService

logger = getLogger(__name__)

ALGORITHM = "HS256"
# Token lifetime in days. The previous value (60 * 24 * 7 = 10080) is the
# number of *minutes* in a week; passed to ``timedelta(days=...)`` it produced
# tokens that stayed valid for roughly 27 years.
ACCESS_TOKEN_EXPIRE_DAYS = 7

security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
user_data_service = UserDataService()


class TokenData(BaseModel):
    """Validated claims extracted from a decoded access token."""

    # Value of the token's ``sub`` claim.
    email: str


def hash_password(password: str) -> str:
    """Hash a password using bcrypt."""
    return pwd_context.hash(password)


def generate_token(email: str) -> str:
    """Generate a JWT token for user authentication.

    Args:
        email: Stored in the token's ``sub`` claim.

    Returns:
        The encoded JWT, valid for ``ACCESS_TOKEN_EXPIRE_DAYS`` days.
    """
    token = create_access_token(
        data={"sub": email},
        expires_delta=timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS),
    )
    # Never log the token itself: it is a bearer credential, and anyone able
    # to read the logs could impersonate the user with it.
    logger.debug("Generated token for email %s", email)
    return token


def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Create a JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When ``None``, the token
            expires after one day.

    Returns:
        The JWT encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy and would
    # otherwise silently be replaced by the one-day default.
    if expires_delta is None:
        expires_delta = timedelta(days=1)

    # Timezone-aware replacement for the deprecated ``datetime.utcnow()``.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    logger.debug("Creating access token with data: %s", to_encode)
    logger.debug("Token expiration time: %s", expire)
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def authenticate_user(email: str, password: str) -> bool:
    """Authenticate a user by email and password.

    Returns:
        ``True`` when ``user_data_service.login`` returns a truthy value,
        ``False`` otherwise.
    """
    return bool(user_data_service.login(email, password))


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the bearer token of the current request to a user.

    Args:
        credentials: Bearer credentials extracted by the ``HTTPBearer``
            security scheme.

    Returns:
        The object returned by ``user_data_service.get_by_email`` for the
        email stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no usable
            ``sub`` claim, or no user exists for that email.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="No se pudieron validar las credenciales",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # The raw token is deliberately kept out of the logs (bearer credential).
    logger.debug("Decoding bearer token")
    try:
        payload = jwt.decode(
            credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
        )
    except JWTError:
        logger.error("JWTError: Invalid token")
        raise credentials_exception from None

    email = payload.get("sub")
    if email is None:
        logger.error("Token does not contain email")
        raise credentials_exception

    try:
        token_data = TokenData(email=email)
    except ValidationError:
        # A ``sub`` claim that fails model validation is a bad credential,
        # not a server error, so answer with the same 401.
        logger.error("Token subject is not a valid email value")
        raise credentials_exception from None

    user = user_data_service.get_by_email(token_data.email)
    if user is None:
        logger.error("User not found: %s", token_data.email)
        raise credentials_exception
    return user