"""HTTP endpoints for users: existence check, registration, login, deletion and listing."""

import secrets
from logging import getLogger
from math import log

import redis
from cryptography.fernet import Fernet
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from httpx import RequestError

import config
from models import user
from models.user import LoginRequest, UserIDRequest, RegisterUserRequest
from services.data_service import UserDataService
from config.settings import PIN_KEY
from auth.security import generate_token
from services.email_service import send_email
from config.mails import REGISTER_MAIL
from config.settings import APPNAME
from config.messages import ErrorResponse, SuccessResponse, UserResponse

# Login throttling policy (same values that were previously hard-coded inline).
_MAX_LOGIN_ATTEMPTS = 5
_ATTEMPT_WINDOW_SECONDS = 3600
_BLOCK_SECONDS = 3600

fernet = Fernet(PIN_KEY.encode())
logger = getLogger(__name__)
user_data_service = UserDataService()
user_router = APIRouter()

# Created lazily by _get_redis_client() so importing this module never touches Redis.
_redis_client = None


def _get_redis_client():
    """Return the shared Redis client, creating it on first use.

    A single client (and its connection pool) is reused across requests instead
    of building a new one, with a new pool, on every login call.
    """
    global _redis_client
    if _redis_client is None:
        _redis_client = redis.Redis(host="localhost", port=6379, db=0)
    return _redis_client


def _minutes_remaining(ttl_seconds):
    """Convert a Redis TTL in seconds to whole minutes, rounded up, minimum 1.

    Redis reports -1 (key without expiry) and -2 (key already gone) as TTL
    values; clamping keeps those, and sub-minute TTLs, from being shown to the
    user as "0" or "-1" minutes.
    """
    return max(1, -(-int(ttl_seconds) // 60))


def unique_pin_generate() -> str:
    """Generate a random 4-digit PIN (1000-9999) and return it as a string.

    The PIN is used as a login credential, so it is drawn from the ``secrets``
    CSPRNG rather than ``random``. Despite the function name, nothing here
    checks the PIN against PINs already assigned to other users.
    """
    return str(secrets.randbelow(9000) + 1000)


@user_router.post("/exists")
async def exists_user(request: UserIDRequest):
    """Check whether a user with the given ID exists.

    Returns 200 with ``exists: True`` when the data service finds the user,
    otherwise 404 with ``exists: False``.
    """
    found = user_data_service.get_by_id(request.id)
    if found:
        return JSONResponse(
            status_code=200,
            content={"exists": True, "message": UserResponse.USER_EXISTS},
        )
    return JSONResponse(
        status_code=404,
        content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST},
    )


@user_router.post("/register")
async def register_user(request: RegisterUserRequest):
    """Register a new user and e-mail them their generated PIN.

    Returns:
        400 when the data service reports ``-1`` from ``create``,
        500 when the freshly created user cannot be read back,
        201 with the user data (without ``pin_hash``) and a token otherwise.
    """
    pin = unique_pin_generate()
    user_id = user_data_service.create(request.name, request.email, request.rut, pin)
    if user_id == -1:
        return JSONResponse(
            status_code=400,
            content={"message": UserResponse.USER_ALREADY_EXISTS},
        )

    new_user = user_data_service.get_by_id(user_id)
    if not new_user:
        return JSONResponse(
            status_code=500,
            content={"message": ErrorResponse.USER_CREATION_ERROR},
        )

    # NOTE(review): the user is already persisted at this point; if send_email
    # raises, the PIN is never delivered. Confirm how send_email reports failure.
    send_email(
        REGISTER_MAIL["subject"],
        REGISTER_MAIL["body"],
        [new_user.email],
        name=new_user.nombre,
        app_name=APPNAME,
        pin=pin,
    )
    return JSONResponse(
        status_code=201,
        content={
            "message": SuccessResponse.USER_CREATED_SUCCESS,
            "data": {
                **new_user.model_dump(exclude={"pin_hash"}),
                "token": generate_token(new_user.email),
            },
        },
    )


@user_router.post("/login")
async def login_user(request: LoginRequest):
    """Log a user in with e-mail and PIN, throttling repeated failures via Redis.

    Returns:
        403 while a ``blocked:<email>`` key exists,
        200 with user data and a token on success (the failure counter is reset),
        429 when this failure reaches the attempt limit (the e-mail is blocked),
        401 with ``attempts_remaining`` on any other failure.
    """
    # NOTE(review): redis.Redis is the synchronous client, so every call below
    # blocks the event loop while waiting on Redis.
    redis_client = _get_redis_client()
    blocked_key = f"blocked:{request.email}"
    attempts_key = f"login_attempts:{request.email}"

    logger.debug("Login attempt for email: %s", request.email)

    is_blocked = redis_client.get(blocked_key)
    if is_blocked:
        logger.warning(
            "Login attempt for blocked user: %s, locked: %s", request.email, is_blocked
        )
        minutes = _minutes_remaining(redis_client.ttl(blocked_key))
        return JSONResponse(
            status_code=403,
            content={
                "message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{minutes} minutos")
            },
        )

    account = user_data_service.login(request.email, request.pin)
    if account:
        # Successful login: reset the failure counter and return user data and token.
        redis_client.delete(attempts_key)
        return JSONResponse(
            status_code=200,
            content={
                "message": SuccessResponse.LOGIN_SUCCESS,
                "data": {
                    "id": account.id,
                    "name": account.nombre,
                    "email": account.email,
                    "kleincoins": account.kleincoins,
                    # NOTE(review): must be JSON-serializable as-is; confirm the model's type.
                    "created_at": account.created_at,
                    "token": generate_token(account.email),
                },
            },
        )

    # Failed login: INCR returns the new count atomically, so no follow-up GET is needed.
    attempts = int(redis_client.incr(attempts_key))  # type: ignore
    redis_client.expire(attempts_key, _ATTEMPT_WINDOW_SECONDS)

    if attempts >= _MAX_LOGIN_ATTEMPTS:
        # Set value and expiry in one command so a block can never be left without a TTL.
        redis_client.set(blocked_key, "true", ex=_BLOCK_SECONDS)
        logger.warning("Too many login attempts for %s. User blocked.", request.email)
        return JSONResponse(
            status_code=429,
            content={"message": ErrorResponse.TOO_MANY_ATTEMPTS},
        )

    logger.warning("Failed login attempt for %s. Attempts: %s", request.email, attempts)
    logger.error("Invalid email or PIN.")
    # Unauthorized, with the number of attempts left before the block kicks in.
    return JSONResponse(
        status_code=401,
        content={
            "message": ErrorResponse.INVALID_CREDENTIALS,
            "attempts_remaining": _MAX_LOGIN_ATTEMPTS - attempts,
        },
    )


@user_router.delete("/delete")
async def delete_user(request: UserIDRequest):
    """Delete a user by ID.

    Returns 200 with whatever the data service's ``delete`` returned when it is
    truthy, otherwise 404.
    """
    deleted = user_data_service.delete(request.id)
    if deleted:
        return JSONResponse(
            status_code=200,
            content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": deleted},
        )
    return JSONResponse(
        status_code=404,
        content={"message": UserResponse.USER_NOT_FOUND},
    )


@user_router.get("/all")
async def get_all_users():
    """Return every user known to the data service.

    ``pin_hash`` is excluded from each record, matching what the registration
    response exposes, so credential hashes are never sent to clients.
    """
    users = [u.model_dump(exclude={"pin_hash"}) for u in user_data_service.get_all()]
    return JSONResponse(status_code=200, content={"data": users})