"""JWT-based authentication helpers: password hashing, token creation and
the FastAPI dependency that resolves the current user from a bearer token."""

from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import Union

from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel

from config.settings import SECRET_KEY
from models.user import User
from services.data_service import UserDataService
from services.logging_service import structured_logger, LogLevel

logger = getLogger(__name__)

ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 7

security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
user_data_service = UserDataService()


class TokenData(BaseModel):
    """Validated payload extracted from a decoded token."""

    # Value of the token's "sub" claim.
    email: str


def hash_password(password: str) -> str:
    """Hash a password using bcrypt."""
    return pwd_context.hash(password)


def generate_token(email: str) -> str:
    """Generate a JWT token for user authentication.

    The token carries ``email`` as its ``sub`` claim and expires after
    ``ACCESS_TOKEN_EXPIRE_DAYS`` days. Success and failure are both recorded
    as security events.

    Raises:
        Exception: whatever token creation or logging raised; it is logged
            and re-raised unchanged.
    """
    logger.info(f"Generating token for user: {email}")

    try:
        data = {"sub": email}
        expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
        token = create_access_token(data=data, expires_delta=expires_delta)

        structured_logger.log_security_event(
            f"JWT token generated for user {email}",
            LogLevel.INFO,
            {
                "email": email,
                "token_length": len(token),
                "expires_in_days": ACCESS_TOKEN_EXPIRE_DAYS,
            },
            user_email=email,
        )

        # Only a short prefix is logged, never the full token.
        logger.debug(f"Generated token for email {email}: {token[:20]}...")
        return token
    except Exception as e:
        error_msg = f"Failed to generate token for {email}: {e}"
        logger.error(error_msg)

        structured_logger.log_security_event(
            f"Token generation failed for user {email}",
            LogLevel.ERROR,
            {
                "email": email,
                "error": str(e),
                "error_type": type(e).__name__,
            },
            user_email=email,
        )
        raise


def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Create a JWT access token.

    Args:
        data: Claims to encode; the dict is copied, not mutated.
        expires_delta: Lifetime of the token. A falsy value (``None`` or a
            zero ``timedelta``) falls back to one day.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    to_encode = data.copy()

    # Timezone-aware "now"; datetime.utcnow() is deprecated and naive.
    now = datetime.now(timezone.utc)
    if expires_delta:
        expire = now + expires_delta
    else:
        expire = now + timedelta(days=1)

    to_encode.update({"exp": expire})

    logger.debug(f"Creating access token with data: {to_encode}")
    logger.debug(f"Token expiration time: {expire}")

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def authenticate_user(email: str, password: str) -> bool:
    """Authenticate a user by email and password.

    Delegates the credential check to ``user_data_service.login`` and records
    the outcome as a security event.

    Returns:
        ``True`` when ``login`` returns a truthy user; ``False`` when it does
        not, or when any exception is raised during the attempt (the error is
        logged, not propagated).
    """
    logger.info(f"Authentication attempt for user: {email}")

    try:
        user = user_data_service.login(email, password)

        if user:
            structured_logger.log_security_event(
                f"Successful authentication for user {email}",
                LogLevel.INFO,
                {
                    "email": email,
                    "user_id": user.id,
                    "user_name": user.name,
                },
                user_id=user.id,
                user_email=email,
            )
            logger.info(f"Authentication successful for user: {email}")
            return True

        structured_logger.log_security_event(
            f"Failed authentication attempt for user {email}",
            LogLevel.WARNING,
            {
                "email": email,
                "reason": "Invalid credentials",
            },
            user_email=email,
        )
        logger.warning(f"Authentication failed for user: {email}")
        return False
    except Exception as e:
        # Deliberate best-effort: any failure is treated as "not authenticated".
        error_msg = f"Authentication error for {email}: {e}"
        logger.error(error_msg)

        structured_logger.log_security_event(
            f"Authentication error for user {email}",
            LogLevel.ERROR,
            {
                "email": email,
                "error": str(e),
                "error_type": type(e).__name__,
            },
            user_email=email,
        )
        return False


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> User:
    """Resolve the user identified by the request's bearer token.

    Decodes the token with ``SECRET_KEY``/``ALGORITHM``, reads the ``sub``
    claim as the user's email and looks the user up through
    ``user_data_service.get_by_email``.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, the user does not exist, or the lookup fails.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="No se pudieron validar las credenciales",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        token = credentials.credentials
        logger.debug(f"Decoding token: {token[:20]}...")

        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email = payload.get("sub")

        if email is None:
            logger.error("Token does not contain email")
            structured_logger.log_security_event(
                "Token validation failed: missing email",
                LogLevel.WARNING,
                {"reason": "Token missing email subject"},
            )
            raise credentials_exception

        token_data = TokenData(email=email)
    except HTTPException:
        # Already logged above; without this clause the generic handler
        # below would log the same rejection again as an "unexpected error".
        raise
    except JWTError as e:
        logger.error(f"JWTError: Invalid token - {e}")
        structured_logger.log_security_event(
            "Token validation failed: JWT error",
            LogLevel.WARNING,
            {
                "error": str(e),
                "error_type": "JWTError",
            },
        )
        raise credentials_exception from e
    except Exception as e:
        logger.error(f"Unexpected error during token validation: {e}")
        structured_logger.log_security_event(
            "Token validation failed: unexpected error",
            LogLevel.ERROR,
            {
                "error": str(e),
                "error_type": type(e).__name__,
            },
        )
        raise credentials_exception from e

    try:
        user = user_data_service.get_by_email(token_data.email)
        if user is None:
            logger.error(f"User not found: {token_data.email}")
            structured_logger.log_security_event(
                "Token validation failed: user not found",
                LogLevel.WARNING,
                {
                    "email": token_data.email,
                    "reason": "User not found in database",
                },
                user_email=token_data.email,
            )
            raise credentials_exception

        logger.debug(f"User authenticated successfully: {user.email}")
        structured_logger.log_security_event(
            f"Token validation successful for user {user.email}",
            LogLevel.DEBUG,
            {
                "user_id": user.id,
                "email": user.email,
                "permissions": user.permissions,
            },
            user_id=user.id,
            user_email=user.email,
        )

        return user
    except HTTPException:
        # "User not found" was logged above; do not report it as a
        # database error.
        raise
    except Exception as e:
        logger.error(f"Database error during user lookup: {e}")
        structured_logger.log_security_event(
            "Token validation failed: database error",
            LogLevel.ERROR,
            {
                # token_data is always bound here: the first try block either
                # assigned it or raised.
                "email": token_data.email,
                "error": str(e),
                "error_type": type(e).__name__,
            },
        )
        raise credentials_exception from e