| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from datetime import datetime, timedelta
- from typing import Union
- from venv import logger
- from fastapi import Depends, HTTPException
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
- from logging import getLogger
- from pydantic import BaseModel
- from config.settings import SECRET_KEY
- from jose import jwt, JWTError
- from passlib.context import CryptContext
- from services.data_service import UserDataService
# Module-level logger; this rebinds the `logger` name imported from `venv`
# in the import section above.
logger = getLogger(__name__)

# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Token lifetime in days. generate_token() passes this to timedelta(days=...),
# so it must be a day count. The previous value (60 * 24 * 7) was the number
# of minutes in a week and yielded tokens valid for roughly 27 years.
ACCESS_TOKEN_EXPIRE_DAYS = 7
# Bearer-credentials extractor; injected into get_current_user via Depends().
security = HTTPBearer()

# passlib context used by hash_password; bcrypt is the only configured scheme.
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])

# Shared service instance used for login checks and user lookup by email.
user_data_service = UserDataService()
class TokenData(BaseModel):
    """Data extracted from a decoded access token."""

    # Populated from the token's "sub" claim in get_current_user.
    email: str
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the module's passlib context.

    The context is configured with bcrypt as its only scheme.
    """
    hashed = pwd_context.hash(password)
    return hashed
def generate_token(email: str) -> str:
    """Generate a JWT access token for the given user.

    Args:
        email: Address stored in the token's ``sub`` claim.

    Returns:
        The encoded token returned by ``create_access_token``, set to expire
        ``ACCESS_TOKEN_EXPIRE_DAYS`` days from now.
    """
    token = create_access_token(
        data={"sub": email},
        expires_delta=timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS),
    )
    # The encoded token is a bearer credential: record that one was issued,
    # but never write the token itself to the logs.
    logger.debug("Generated token for email %s", email)
    return token
def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not mutated.
        expires_delta: Lifetime of the token. When ``None`` (the default) the
            token expires one day from now. An explicit zero delta is
            honoured rather than being treated as "not provided".

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Local import keeps this block self-contained; the file-level import
    # only brings in ``datetime`` and ``timedelta``.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(days=1)

    # Timezone-aware replacement for the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    logger.debug("Creating access token with data: %s", to_encode)
    logger.debug("Token expiration time: %s", expire)
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def authenticate_user(email: str, password: str) -> bool:
    """Check an email/password pair against the user data service.

    Returns ``True`` when ``user_data_service.login`` yields a truthy result
    and ``False`` otherwise.
    """
    login_result = user_data_service.login(email, password)
    return bool(login_result)
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Resolve the user identified by the request's bearer token.

    Args:
        credentials: Bearer credentials supplied by the ``security``
            dependency.

    Returns:
        Whatever ``user_data_service.get_by_email`` returns for the email
        carried in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            usable ``sub`` claim, or no matching user is found.
    """
    credentials_exception = HTTPException(
        status_code=401,
        detail="No se pudieron validar las credenciales",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        # The raw token is a bearer credential, so it is deliberately not
        # written to the logs.
        payload = jwt.decode(
            credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
        )
    except JWTError as exc:
        logger.error("JWTError: Invalid token")
        raise credentials_exception from exc

    email = payload.get("sub")
    # A missing or non-string subject is rejected here; otherwise it could
    # fail TokenData validation, which the JWTError handler does not cover.
    if not isinstance(email, str):
        logger.error("Token does not contain email")
        raise credentials_exception
    token_data = TokenData(email=email)

    user = user_data_service.get_by_email(token_data.email)
    if user is None:
        logger.error("User not found: %s", token_data.email)
        raise credentials_exception
    return user
|