users.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from email.policy import HTTP
  2. import json
  3. from logging import getLogger
  4. from math import log
  5. from os import name
  6. from uuid import uuid4
  7. from click import File
  8. from fastapi import APIRouter, HTTPException, Request
  9. from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
  10. from httpx import RequestError
  11. import redis
  12. from rsa import verify
  13. import config
  14. from models import user
  15. from models.user import LoginRequest, PinUserRequest, UserIDRequest, RegisterUserRequest
  16. from services.data_service import UserDataService
  17. from cryptography.fernet import Fernet
  18. from config.settings import PIN_KEY
  19. from auth.security import generate_token
  20. from services.email_service import send_email
  21. from config.mails import REGISTER_MAIL
  22. from config.settings import APPNAME
  23. from config.messages import ErrorResponse, SuccessResponse, UserResponse
  24. from utils.rut import validate_rut
  25. fernet = Fernet(PIN_KEY.encode())
  26. logger = getLogger(__name__)
  27. user_data_service = UserDataService()
  28. user_router = APIRouter()
  29. def unique_pin_generate():
  30. """Generate a unique 4-digit PIN"""
  31. import random
  32. pin = str(random.randint(1000, 9999))
  33. return pin
  34. @user_router.post("/exists")
  35. async def exists_user(request: UserIDRequest):
  36. """Check if user exists"""
  37. user = user_data_service.get_by_id(request.id)
  38. if user:
  39. return JSONResponse(status_code=200, content={"exists": True, "message": UserResponse.USER_EXISTS})
  40. else:
  41. return JSONResponse(status_code=404, content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST})
  42. @user_router.post("/register")
  43. async def register_user(request: RegisterUserRequest):
  44. """Register a new user"""
  45. if not validate_rut(request.rut):
  46. return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_RUT})
  47. user = user_data_service.get_by_email(request.email)
  48. if user:
  49. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  50. user = user_data_service.get_by_rut(request.rut)
  51. if user:
  52. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  53. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  54. verification_code = str(uuid4())
  55. redis_client.set(f"verify:{verification_code}", json.dumps({
  56. "name": request.name,
  57. "email": request.email,
  58. "rut": request.rut
  59. }))
  60. redis_client.expire(f"verify:{verification_code}", 3600) # Expire in 1 hour
  61. send_email(
  62. REGISTER_MAIL["subject"],
  63. REGISTER_MAIL["body"],
  64. [request.email], name=request.name, app_name=APPNAME, verification_code=verification_code
  65. )
  66. return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS})
  67. @user_router.post("/create-user")
  68. async def create_user(request: PinUserRequest, q: str):
  69. """Create a new user with PIN"""
  70. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  71. data = redis_client.get(f"verify:{q}")
  72. if not redis_client.get(f"verify:{q}"):
  73. return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_VERIFICATION_CODE})
  74. else:
  75. data = json.loads(str(data))
  76. name = data.get("name")
  77. email = data.get("email")
  78. rut = data.get("rut")
  79. pin = request.pin
  80. if not request.pin or len(request.pin) != 4:
  81. return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_PIN})
  82. userID = user_data_service.create(name, email, rut, pin)
  83. if userID == -1:
  84. return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
  85. user = user_data_service.get_by_id(userID)
  86. if not user:
  87. return JSONResponse(status_code=500, content={"message": ErrorResponse.USER_CREATION_ERROR})
  88. return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS, "data": {
  89. **user.model_dump(exclude={"pin_hash"}),
  90. "token": generate_token(user.email)
  91. }})
  92. @user_router.post("/login")
  93. async def login_user(request: LoginRequest, http_request: Request):
  94. """Login user with email and PIN"""
  95. redis_client = redis.Redis(host='localhost', port=6379, db=0)
  96. logger.debug(f"Login attempt for email: {request.email}")
  97. is_blocked = redis_client.get(f"blocked:{request.email}")
  98. if is_blocked:
  99. logger.warning(f"Login attempt for blocked user: {request.email}, locked: {is_blocked}")
  100. return JSONResponse(status_code=403, content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{int(str(redis_client.ttl(f'blocked:{request.email}'))) // 60} minutos")})
  101. else:
  102. logger.info(f"{redis_client.get(f'blocked:{request.email}')}")
  103. user = user_data_service.login(request.email, request.pin)
  104. referer = http_request.headers.get("referer")
  105. if referer and "admin" in referer:
  106. if not user or user_data_service.permissions(user.id) == 0:
  107. logger.warning(f"Unauthorized admin access attempt by {request.email}")
  108. return JSONResponse(status_code=403, content={"message": UserResponse.NOT_PERMITTED})
  109. if user:
  110. # Successful login, return user data and token
  111. redis_client.delete(f"login_attempts:{request.email}")
  112. return JSONResponse(status_code=200, content={"message": SuccessResponse.LOGIN_SUCCESS, "data": {
  113. "id": user.id,
  114. "name": user.name,
  115. "email": user.email,
  116. "kleincoins": user.kleincoins,
  117. "created_at": user.created_at,
  118. "token": generate_token(user.email)
  119. }})
  120. else:
  121. # Failed login: increment attempts in Redis
  122. redis_client.incr(f"login_attempts:{request.email}")
  123. redis_client.expire(f"login_attempts:{request.email}", 3600)
  124. redis_attempts = redis_client.get(f"login_attempts:{request.email}")
  125. attempts = int(redis_attempts) if redis_attempts else 0 # type: ignore
  126. if attempts and int(attempts) >= 5:
  127. # Too many attempts, block login
  128. redis_client.set(f"blocked:{request.email}", "true")
  129. redis_client.expire(f"blocked:{request.email}", 3600)
  130. logger.warning(f"Too many login attempts for {request.email}. User blocked.")
  131. return JSONResponse(status_code=429, content={"message": ErrorResponse.TOO_MANY_ATTEMPTS})
  132. else:
  133. logger.warning(f"Failed login attempt for {request.email}. Attempts: {attempts}")
  134. logger.error("Invalid email or PIN.")
  135. # Return unauthorized with attempts remaining
  136. return JSONResponse(status_code=401, content={"message": ErrorResponse.INVALID_CREDENTIALS, "attempts_remaining": 5 - attempts if attempts else 5})
  137. @user_router.delete("/delete")
  138. async def delete_user(request: UserIDRequest):
  139. """Delete a user by ID"""
  140. user = user_data_service.delete(request.id)
  141. if user:
  142. return JSONResponse(status_code=200, content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": user})
  143. else:
  144. return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND})
  145. @user_router.get("/all")
  146. async def get_all_users():
  147. """Get all users"""
  148. users = list(map(lambda u: u.model_dump(), user_data_service.get_all()))
  149. return JSONResponse(status_code=200, content={"data": users})
  150. from fastapi import Query
  151. verify_router = APIRouter()
  152. @verify_router.get("/")
  153. async def verify_user(q: str = Query(..., description="q parameter")):
  154. """Verify a user by ID"""
  155. # get url params
  156. redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
  157. if not redis_client.get(f"verify:{q}"):
  158. return HTMLResponse(
  159. content="<h1>Invalid verification code</h1>",
  160. status_code=400
  161. )
  162. return FileResponse(
  163. "public/verify.html",
  164. media_type="text/html",
  165. )