security.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. from datetime import datetime, timedelta
  2. from typing import Union
  3. from fastapi import Depends, HTTPException
  4. from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
  5. from logging import getLogger
  6. from pydantic import BaseModel
  7. from config.settings import SECRET_KEY
  8. from jose import jwt, JWTError
  9. from passlib.context import CryptContext
  10. from models.user import User
  11. from services.data_service import UserDataService
  12. from services.logging_service import structured_logger, LogLevel
  13. logger = getLogger(__name__)
  14. ALGORITHM = "HS256"
  15. ACCESS_TOKEN_EXPIRE_DAYS = 60 * 24 * 7
  16. security = HTTPBearer()
  17. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  18. user_data_service = UserDataService()
  19. class TokenData(BaseModel):
  20. email: str
  21. def hash_password(password: str) -> str:
  22. """Hash a password using bcrypt."""
  23. return pwd_context.hash(password)
  24. def generate_token(email: str):
  25. """Generate a JWT token for user authentication."""
  26. logger.info(f"Generating token for user: {email}")
  27. try:
  28. data = {"sub": email}
  29. expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
  30. token = create_access_token(data=data, expires_delta=expires_delta)
  31. structured_logger.log_security_event(
  32. f"JWT token generated for user {email}",
  33. LogLevel.INFO,
  34. {
  35. "email": email,
  36. "token_length": len(token),
  37. "expires_in_days": ACCESS_TOKEN_EXPIRE_DAYS
  38. },
  39. user_email=email
  40. )
  41. logger.debug(f"Generated token for email {email}: {token[:20]}...")
  42. return token
  43. except Exception as e:
  44. error_msg = f"Failed to generate token for {email}: {e}"
  45. logger.error(error_msg)
  46. structured_logger.log_security_event(
  47. f"Token generation failed for user {email}",
  48. LogLevel.ERROR,
  49. {
  50. "email": email,
  51. "error": str(e),
  52. "error_type": type(e).__name__
  53. },
  54. user_email=email
  55. )
  56. raise
  57. def create_access_token(data: dict, expires_delta: timedelta) -> str:
  58. """Create a JWT access token."""
  59. to_encode = data.copy()
  60. if expires_delta:
  61. expire = datetime.utcnow() + expires_delta
  62. else:
  63. expire = datetime.utcnow() + timedelta(days=1)
  64. to_encode.update({"exp": expire})
  65. logger.debug(f"Creating access token with data: {to_encode}")
  66. logger.debug(f"Token expiration time: {expire}")
  67. return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  68. def authenticate_user(email: str, password: str) -> bool:
  69. """Authenticate a user by email and password."""
  70. logger.info(f"Authentication attempt for user: {email}")
  71. try:
  72. user = user_data_service.login(email, password)
  73. if user:
  74. structured_logger.log_security_event(
  75. f"Successful authentication for user {email}",
  76. LogLevel.INFO,
  77. {
  78. "email": email,
  79. "user_id": user.id,
  80. "user_name": user.name
  81. },
  82. user_id=user.id,
  83. user_email=email
  84. )
  85. logger.info(f"Authentication successful for user: {email}")
  86. return True
  87. else:
  88. structured_logger.log_security_event(
  89. f"Failed authentication attempt for user {email}",
  90. LogLevel.WARNING,
  91. {
  92. "email": email,
  93. "reason": "Invalid credentials"
  94. },
  95. user_email=email
  96. )
  97. logger.warning(f"Authentication failed for user: {email}")
  98. return False
  99. except Exception as e:
  100. error_msg = f"Authentication error for {email}: {e}"
  101. logger.error(error_msg)
  102. structured_logger.log_security_event(
  103. f"Authentication error for user {email}",
  104. LogLevel.ERROR,
  105. {
  106. "email": email,
  107. "error": str(e),
  108. "error_type": type(e).__name__
  109. },
  110. user_email=email
  111. )
  112. return False
  113. async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
  114. credentials_exception = HTTPException(
  115. status_code=401,
  116. detail="No se pudieron validar las credenciales",
  117. headers={"WWW-Authenticate": "Bearer"},
  118. )
  119. try:
  120. token = credentials.credentials
  121. logger.debug(f"Decoding token: {token[:20]}...")
  122. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  123. email = payload.get("sub")
  124. if email is None:
  125. logger.error("Token does not contain email")
  126. structured_logger.log_security_event(
  127. "Token validation failed: missing email",
  128. LogLevel.WARNING,
  129. {"reason": "Token missing email subject"}
  130. )
  131. raise credentials_exception
  132. token_data = TokenData(email=email)
  133. except JWTError as e:
  134. logger.error(f"JWTError: Invalid token - {e}")
  135. structured_logger.log_security_event(
  136. "Token validation failed: JWT error",
  137. LogLevel.WARNING,
  138. {
  139. "error": str(e),
  140. "error_type": "JWTError"
  141. }
  142. )
  143. raise credentials_exception
  144. except Exception as e:
  145. logger.error(f"Unexpected error during token validation: {e}")
  146. structured_logger.log_security_event(
  147. "Token validation failed: unexpected error",
  148. LogLevel.ERROR,
  149. {
  150. "error": str(e),
  151. "error_type": type(e).__name__
  152. }
  153. )
  154. raise credentials_exception
  155. try:
  156. user = user_data_service.get_by_email(token_data.email)
  157. if user is None:
  158. logger.error(f"User not found: {token_data.email}")
  159. structured_logger.log_security_event(
  160. f"Token validation failed: user not found",
  161. LogLevel.WARNING,
  162. {
  163. "email": token_data.email,
  164. "reason": "User not found in database"
  165. },
  166. user_email=token_data.email
  167. )
  168. raise credentials_exception
  169. logger.debug(f"User authenticated successfully: {user.email}")
  170. structured_logger.log_security_event(
  171. f"Token validation successful for user {user.email}",
  172. LogLevel.DEBUG,
  173. {
  174. "user_id": user.id,
  175. "email": user.email,
  176. "permissions": user.permissions
  177. },
  178. user_id=user.id,
  179. user_email=user.email
  180. )
  181. return user
  182. except Exception as e:
  183. logger.error(f"Database error during user lookup: {e}")
  184. structured_logger.log_security_event(
  185. "Token validation failed: database error",
  186. LogLevel.ERROR,
  187. {
  188. "email": token_data.email if 'token_data' in locals() else "unknown",
  189. "error": str(e),
  190. "error_type": type(e).__name__
  191. }
  192. )
  193. raise credentials_exception