users.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from logging import getLogger
  2. from math import log
  3. from fastapi import APIRouter
  4. from fastapi.responses import JSONResponse
  5. from httpx import RequestError
  6. import redis
  7. import config
  8. from models import user
  9. from models.user import LoginRequest, UserIDRequest, RegisterUserRequest
  10. from services.data_service import UserDataService
  11. from cryptography.fernet import Fernet
  12. from config.settings import PIN_KEY
  13. from auth.security import generate_token
  14. from services.email_service import send_email
  15. from config.mails import REGISTER_MAIL
  16. from config.settings import APPNAME
  17. from config.messages import ErrorResponse, SuccessResponse, UserResponse
  18. fernet = Fernet(PIN_KEY.encode())
  19. logger = getLogger(__name__)
  20. user_data_service = UserDataService()
  21. user_router = APIRouter()
  22. def unique_pin_generate():
  23. """Generate a unique 4-digit PIN"""
  24. import random
  25. pin = str(random.randint(1000, 9999))
  26. return pin
  27. @user_router.post("/exists")
  28. async def exists_user(request: UserIDRequest):
  29. """Check if user exists"""
  30. user = user_data_service.get_by_id(request.id)
  31. if user:
  32. return JSONResponse(status_code=200, content={"exists": True, "message": UserResponse.USER_EXISTS})
  33. else:
  34. return JSONResponse(status_code=404, content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST})
  35. @user_router.post("/register")
  36. async def register_user(request: RegisterUserRequest):
  37. """Register a new user"""
  38. pin = unique_pin_generate()
  39. userID = user_data_service.create(request.name, request.email, request.rut, pin)
  40. if userID == -1:
  41. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  42. user = user_data_service.get_by_id(userID)
  43. if not user:
  44. return JSONResponse(status_code=500, content={"message": ErrorResponse.USER_CREATION_ERROR})
  45. send_email(
  46. REGISTER_MAIL["subject"],
  47. REGISTER_MAIL["body"],
  48. [user.email], name=user.name, app_name=APPNAME, pin=pin
  49. )
  50. return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS, "data": {
  51. **user.model_dump(exclude={"pin_hash"}),
  52. "token": generate_token(user.email)
  53. }})
  54. @user_router.post("/login")
  55. async def login_user(request: LoginRequest):
  56. """Login user with email and PIN"""
  57. redis_client = redis.Redis(host='localhost', port=6379, db=0)
  58. logger.debug(f"Login attempt for email: {request.email}")
  59. is_blocked = redis_client.get(f"blocked:{request.email}")
  60. if is_blocked:
  61. logger.warning(f"Login attempt for blocked user: {request.email}, locked: {is_blocked}")
  62. return JSONResponse(status_code=403, content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{int(str(redis_client.ttl(f'blocked:{request.email}'))) // 60} minutos")})
  63. else:
  64. logger.info(f"{redis_client.get(f'blocked:{request.email}')}")
  65. user = user_data_service.login(request.email, request.pin)
  66. if user:
  67. # Successful login, return user data and token
  68. redis_client.delete(f"login_attempts:{request.email}")
  69. return JSONResponse(status_code=200, content={"message": SuccessResponse.LOGIN_SUCCESS, "data": {
  70. "id": user.id,
  71. "name": user.name,
  72. "email": user.email,
  73. "kleincoins": user.kleincoins,
  74. "created_at": user.created_at,
  75. "token": generate_token(user.email)
  76. }})
  77. else:
  78. # Failed login: increment attempts in Redis
  79. redis_client.incr(f"login_attempts:{request.email}")
  80. redis_client.expire(f"login_attempts:{request.email}", 3600)
  81. redis_attempts = redis_client.get(f"login_attempts:{request.email}")
  82. attempts = int(redis_attempts) if redis_attempts else 0 # type: ignore
  83. if attempts and int(attempts) >= 5:
  84. # Too many attempts, block login
  85. redis_client.set(f"blocked:{request.email}", "true")
  86. redis_client.expire(f"blocked:{request.email}", 3600)
  87. logger.warning(f"Too many login attempts for {request.email}. User blocked.")
  88. return JSONResponse(status_code=429, content={"message": ErrorResponse.TOO_MANY_ATTEMPTS})
  89. else:
  90. logger.warning(f"Failed login attempt for {request.email}. Attempts: {attempts}")
  91. logger.error("Invalid email or PIN.")
  92. # Return unauthorized with attempts remaining
  93. return JSONResponse(status_code=401, content={"message": ErrorResponse.INVALID_CREDENTIALS, "attempts_remaining": 5 - attempts if attempts else 5})
  94. @user_router.delete("/delete")
  95. async def delete_user(request: UserIDRequest):
  96. """Delete a user by ID"""
  97. user = user_data_service.delete(request.id)
  98. if user:
  99. return JSONResponse(status_code=200, content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": user})
  100. else:
  101. return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND})
  102. @user_router.get("/all")
  103. async def get_all_users():
  104. """Get all users"""
  105. users = list(map(lambda u: u.model_dump(), user_data_service.get_all()))
  106. return JSONResponse(status_code=200, content={"data": users})