users.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. from email.policy import HTTP
  2. import json
  3. from logging import getLogger
  4. from math import log
  5. from os import name
  6. from uuid import uuid4
  7. from click import File
  8. from fastapi import APIRouter, HTTPException
  9. from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
  10. from httpx import RequestError
  11. import redis
  12. from rsa import verify
  13. import config
  14. from models import user
  15. from models.user import LoginRequest, PinUserRequest, UserIDRequest, RegisterUserRequest
  16. from services.data_service import UserDataService
  17. from cryptography.fernet import Fernet
  18. from config.settings import PIN_KEY
  19. from auth.security import generate_token
  20. from services.email_service import send_email
  21. from config.mails import REGISTER_MAIL
  22. from config.settings import APPNAME
  23. from config.messages import ErrorResponse, SuccessResponse, UserResponse
  24. fernet = Fernet(PIN_KEY.encode())
  25. logger = getLogger(__name__)
  26. user_data_service = UserDataService()
  27. user_router = APIRouter()
  28. def unique_pin_generate():
  29. """Generate a unique 4-digit PIN"""
  30. import random
  31. pin = str(random.randint(1000, 9999))
  32. return pin
  33. @user_router.post("/exists")
  34. async def exists_user(request: UserIDRequest):
  35. """Check if user exists"""
  36. user = user_data_service.get_by_id(request.id)
  37. if user:
  38. return JSONResponse(status_code=200, content={"exists": True, "message": UserResponse.USER_EXISTS})
  39. else:
  40. return JSONResponse(status_code=404, content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST})
  41. @user_router.post("/register")
  42. async def register_user(request: RegisterUserRequest):
  43. """Register a new user"""
  44. user = user_data_service.get_by_email(request.email)
  45. if user:
  46. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  47. user = user_data_service.get_by_rut(request.rut)
  48. if user:
  49. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  50. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  51. verification_code = str(uuid4())
  52. redis_client.set(f"verify:{verification_code}", json.dumps({
  53. "name": request.name,
  54. "email": request.email,
  55. "rut": request.rut
  56. }))
  57. redis_client.expire(f"verify:{verification_code}", 3600) # Expire in 1 hour
  58. send_email(
  59. REGISTER_MAIL["subject"],
  60. REGISTER_MAIL["body"],
  61. [request.email], name=request.name, app_name=APPNAME, verification_code=verification_code
  62. )
  63. return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS})
  64. @user_router.post("/create-user")
  65. async def create_user(request: PinUserRequest, q: str):
  66. """Create a new user with PIN"""
  67. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  68. data = redis_client.get(f"verify:{q}")
  69. if not redis_client.get(f"verify:{q}"):
  70. return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_VERIFICATION_CODE})
  71. else:
  72. data = json.loads(str(data))
  73. name = data.get("name")
  74. email = data.get("email")
  75. rut = data.get("rut")
  76. pin = request.pin
  77. if not request.pin or len(request.pin) != 4:
  78. return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_PIN})
  79. userID = user_data_service.create(name, email, rut, pin)
  80. if userID == -1:
  81. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  82. user = user_data_service.get_by_id(userID)
  83. if not user:
  84. return JSONResponse(status_code=500, content={"message": ErrorResponse.USER_CREATION_ERROR})
  85. return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS, "data": {
  86. **user.model_dump(exclude={"pin_hash"}),
  87. "token": generate_token(user.email)
  88. }})
  89. @user_router.post("/login")
  90. async def login_user(request: LoginRequest):
  91. """Login user with email and PIN"""
  92. redis_client = redis.Redis(host='localhost', port=6379, db=0)
  93. logger.debug(f"Login attempt for email: {request.email}")
  94. is_blocked = redis_client.get(f"blocked:{request.email}")
  95. if is_blocked:
  96. logger.warning(f"Login attempt for blocked user: {request.email}, locked: {is_blocked}")
  97. return JSONResponse(status_code=403, content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{int(str(redis_client.ttl(f'blocked:{request.email}'))) // 60} minutos")})
  98. else:
  99. logger.info(f"{redis_client.get(f'blocked:{request.email}')}")
  100. user = user_data_service.login(request.email, request.pin)
  101. if user:
  102. # Successful login, return user data and token
  103. redis_client.delete(f"login_attempts:{request.email}")
  104. return JSONResponse(status_code=200, content={"message": SuccessResponse.LOGIN_SUCCESS, "data": {
  105. "id": user.id,
  106. "name": user.name,
  107. "email": user.email,
  108. "kleincoins": user.kleincoins,
  109. "created_at": user.created_at,
  110. "token": generate_token(user.email)
  111. }})
  112. else:
  113. # Failed login: increment attempts in Redis
  114. redis_client.incr(f"login_attempts:{request.email}")
  115. redis_client.expire(f"login_attempts:{request.email}", 3600)
  116. redis_attempts = redis_client.get(f"login_attempts:{request.email}")
  117. attempts = int(redis_attempts) if redis_attempts else 0 # type: ignore
  118. if attempts and int(attempts) >= 5:
  119. # Too many attempts, block login
  120. redis_client.set(f"blocked:{request.email}", "true")
  121. redis_client.expire(f"blocked:{request.email}", 3600)
  122. logger.warning(f"Too many login attempts for {request.email}. User blocked.")
  123. return JSONResponse(status_code=429, content={"message": ErrorResponse.TOO_MANY_ATTEMPTS})
  124. else:
  125. logger.warning(f"Failed login attempt for {request.email}. Attempts: {attempts}")
  126. logger.error("Invalid email or PIN.")
  127. # Return unauthorized with attempts remaining
  128. return JSONResponse(status_code=401, content={"message": ErrorResponse.INVALID_CREDENTIALS, "attempts_remaining": 5 - attempts if attempts else 5})
  129. @user_router.delete("/delete")
  130. async def delete_user(request: UserIDRequest):
  131. """Delete a user by ID"""
  132. user = user_data_service.delete(request.id)
  133. if user:
  134. return JSONResponse(status_code=200, content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": user})
  135. else:
  136. return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND})
  137. @user_router.get("/all")
  138. async def get_all_users():
  139. """Get all users"""
  140. users = list(map(lambda u: u.model_dump(), user_data_service.get_all()))
  141. return JSONResponse(status_code=200, content={"data": users})
  142. from fastapi import Query
  143. verify_router = APIRouter()
  144. @verify_router.get("/")
  145. async def verify_user(q: str = Query(..., description="q parameter")):
  146. """Verify a user by ID"""
  147. # get url params
  148. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  149. return FileResponse(
  150. "public/verify.html",
  151. media_type="text/html",
  152. )