# users.py - user account API routes: existence check, registration, login, deletion, listing.
  1. from logging import getLogger
  2. from fastapi import APIRouter
  3. from fastapi.responses import JSONResponse
  4. from httpx import RequestError
  5. import redis
  6. import config
  7. from models import user
  8. from models.user import LoginRequest, UserIDRequest, RegisterUserRequest
  9. from services.data_service import UserDataService
  10. from cryptography.fernet import Fernet
  11. from config.settings import PIN_KEY
  12. from auth.security import generate_token
  13. from services.email_service import send_email
  14. from config.mails import REGISTER_MAIL
  15. from config.settings import APPNAME
  16. fernet = Fernet(PIN_KEY.encode())
  17. logger = getLogger(__name__)
  18. user_data_service = UserDataService()
  19. user_router = APIRouter()
  20. def unique_pin_generate():
  21. """Generate a unique 4-digit PIN"""
  22. import random
  23. pin = str(random.randint(1000, 9999))
  24. return pin
  25. @user_router.post("/exists")
  26. async def exists_user(request: UserIDRequest):
  27. """Check if user exists"""
  28. user = user_data_service.get_by_id(request.id)
  29. if user:
  30. return JSONResponse(status_code=200, content={"exists": True})
  31. else:
  32. return JSONResponse(status_code=404, content={"exists": False, "data": {"message": "User does not exist."}})
  33. @user_router.post("/register")
  34. async def register_user(request: RegisterUserRequest):
  35. """Register a new user"""
  36. pin = unique_pin_generate()
  37. userID = user_data_service.create(request.name, request.email, request.rut, pin)
  38. if userID == -1:
  39. return JSONResponse(status_code=400, content={"message": "User already exists."})
  40. user = user_data_service.get_by_id(userID)
  41. if not user:
  42. return JSONResponse(status_code=500, content={"message": "Error creating user."})
  43. send_email(
  44. REGISTER_MAIL["subject"],
  45. REGISTER_MAIL["body"],
  46. [user.email], name=user.nombre, app_name=APPNAME, pin=pin
  47. )
  48. return JSONResponse(status_code=201, content={"message": "User created successfully.", "data": {
  49. **user.model_dump(exclude={"pin_hash"}),
  50. "token": generate_token(user.email)
  51. }})
  52. @user_router.post("/login")
  53. async def login_user(request: LoginRequest):
  54. """Login user with email and PIN"""
  55. logger.debug(f"Login attempt for email: {request.email}")
  56. user = user_data_service.login(request.email, request.pin)
  57. if user:
  58. # Successful login, return user data and token
  59. return JSONResponse(status_code=200, content={"message": "Login successful.", "data": {
  60. "id": user.id,
  61. "name": user.nombre,
  62. "email": user.email,
  63. "kleincoins": user.kleincoins,
  64. "created_at": user.created_at,
  65. "token": generate_token(user.email)
  66. }})
  67. else:
  68. # Failed login: increment attempts in Redis
  69. redis_client = redis.Redis(host='localhost', port=6379, db=0)
  70. redis_client.incr(f"login_attempts:{request.email}")
  71. redis_client.expire(f"login_attempts:{request.email}", 3600)
  72. redis_attempts = redis_client.get(f"login_attempts:{request.email}")
  73. attempts = int(redis_attempts) if redis_attempts else 0 # type: ignore
  74. if attempts and int(attempts) > 5:
  75. # Too many attempts, block login
  76. return JSONResponse(status_code=429, content={"message": "Too many login attempts. Please try again later."})
  77. else:
  78. # Set flag for last failed login
  79. redis_client.set(f"last_failed_login:{request.email}", "true")
  80. redis_client.expire(f"last_failed_login:{request.email}", 3600)
  81. logger.warning(f"Failed login attempt for {request.email}. Attempts: {attempts}")
  82. logger.error("Invalid email or PIN.")
  83. # Return unauthorized with attempts remaining
  84. return JSONResponse(status_code=401, content={"message": "Invalid email or PIN.", "attempts_remaining": 5 - attempts if attempts else 5})
  85. @user_router.delete("/delete")
  86. async def delete_user(request: UserIDRequest):
  87. """Delete a user by ID"""
  88. user = user_data_service.delete(request.id)
  89. if user:
  90. return JSONResponse(status_code=200, content={"message": "User deleted successfully.", "data": user})
  91. else:
  92. return JSONResponse(status_code=404, content={"message": "User not found."})
  93. @user_router.get("/all")
  94. async def get_all_users():
  95. """Get all users"""
  96. users = list(map(lambda u: u.model_dump(), user_data_service.get_all()))
  97. return JSONResponse(status_code=200, content={"data": users})