| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- from logging import getLogger
- from math import log
- from fastapi import APIRouter
- from fastapi.responses import JSONResponse
- from httpx import RequestError
- import redis
- import config
- from models import user
- from models.user import LoginRequest, UserIDRequest, RegisterUserRequest
- from services.data_service import UserDataService
- from cryptography.fernet import Fernet
- from config.settings import PIN_KEY
- from auth.security import generate_token
- from services.email_service import send_email
- from config.mails import REGISTER_MAIL
- from config.settings import APPNAME
- from config.messages import ErrorResponse, SuccessResponse, UserResponse
# Fernet cipher keyed with PIN_KEY from the settings module.
# NOTE(review): not referenced by the handlers visible in this section;
# presumably used for PIN encryption elsewhere - confirm before removing.
fernet = Fernet(PIN_KEY.encode())
# Module-level logger named after this module.
logger = getLogger(__name__)
# Data-access service instance shared by the route handlers in this module.
user_data_service = UserDataService()
# Router on which the user endpoints in this module are registered.
user_router = APIRouter()
def unique_pin_generate():
    """Generate a random 4-digit PIN.

    The PIN is drawn from the operating system's CSPRNG through ``secrets``
    because it is handed to the user as a login credential; the ``random``
    module is predictable and unsuitable for that purpose.

    Despite the function name, no uniqueness check is performed here: two
    calls may return the same PIN.

    Returns:
        str: A decimal string between "1000" and "9999" inclusive.
    """
    import secrets

    # randbelow(9000) yields 0..8999; shifting by 1000 keeps the
    # 1000..9999 range, so the PIN never starts with a zero.
    return str(secrets.randbelow(9000) + 1000)
@user_router.post("/exists")
async def exists_user(request: UserIDRequest):
    """Report whether a user with the requested ID can be found.

    Responds with 200 and ``exists: True`` when the lookup returns a user,
    and with 404 and ``exists: False`` otherwise.
    """
    found = user_data_service.get_by_id(request.id)
    if not found:
        return JSONResponse(
            status_code=404,
            content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST},
        )
    return JSONResponse(
        status_code=200,
        content={"exists": True, "message": UserResponse.USER_EXISTS},
    )
@user_router.post("/register")
async def register_user(request: RegisterUserRequest):
    """Create a user, e-mail the generated PIN and return the new account.

    Responses:
        400: the data service returned -1 from ``create`` (reported as
            "user already exists").
        500: the freshly created user could not be read back.
        201: the user's fields (without ``pin_hash``) plus a token
            generated from the user's e-mail.
    """
    new_pin = unique_pin_generate()
    new_id = user_data_service.create(
        request.name, request.email, request.rut, new_pin
    )
    if new_id == -1:
        return JSONResponse(
            status_code=400,
            content={"message": UserResponse.USER_ALREADY_EXISTS},
        )

    created = user_data_service.get_by_id(new_id)
    if not created:
        return JSONResponse(
            status_code=500,
            content={"message": ErrorResponse.USER_CREATION_ERROR},
        )

    # The plain PIN only leaves the service through this e-mail.
    send_email(
        REGISTER_MAIL["subject"],
        REGISTER_MAIL["body"],
        [created.email],
        name=created.nombre,
        app_name=APPNAME,
        pin=new_pin,
    )

    payload = created.model_dump(exclude={"pin_hash"})
    payload["token"] = generate_token(created.email)
    return JSONResponse(
        status_code=201,
        content={"message": SuccessResponse.USER_CREATED_SUCCESS, "data": payload},
    )
# Failed attempts allowed before an e-mail address is locked out.
_MAX_LOGIN_ATTEMPTS = 5
# Lifetime, in seconds, of both the attempt counter and the lock.
_LOGIN_LOCK_SECONDS = 3600


@user_router.post("/login")
async def login_user(request: LoginRequest):
    """Log a user in with e-mail and PIN, rate-limited through Redis.

    Two Redis keys are kept per e-mail address:
    ``login_attempts:<email>`` counts failed attempts and
    ``blocked:<email>`` marks a locked account.

    Responses:
        403: the address is currently locked; the message carries the
            remaining lock time in minutes, rounded up.
        200: credentials accepted; the attempt counter is cleared and the
            user's data plus a token are returned.
        429: this failure reached the attempt limit; the address is locked.
        401: credentials rejected; ``attempts_remaining`` tells how many
            tries are left before the lock.

    NOTE(review): the Redis client and the data service are called
    synchronously inside an ``async`` handler - confirm this is acceptable
    for the expected load.
    """
    attempts_key = f"login_attempts:{request.email}"
    blocked_key = f"blocked:{request.email}"

    # The context manager closes the client (and its connections) when the
    # request finishes instead of leaving that to garbage collection.
    with redis.Redis(host='localhost', port=6379, db=0) as redis_client:
        logger.debug("Login attempt for email: %s", request.email)

        is_blocked = redis_client.get(blocked_key)
        if is_blocked:
            logger.warning(
                "Login attempt for blocked user: %s, locked: %s",
                request.email,
                is_blocked,
            )
            # ttl() is negative when the key has no expiry or vanished
            # between the two calls; clamp so no negative time is reported.
            seconds_left = max(int(redis_client.ttl(blocked_key)), 0)  # type: ignore
            # Ceiling division: a lock with 1..60 seconds left reads
            # "1 minutos", never "0 minutos".
            minutes_left = -(-seconds_left // 60)
            return JSONResponse(
                status_code=403,
                content={
                    "message": UserResponse.USER_FORMAT_BLOCKED.format(
                        time=f"{minutes_left} minutos"
                    )
                },
            )

        account = user_data_service.login(request.email, request.pin)
        if account:
            # Successful login resets the failure counter.
            redis_client.delete(attempts_key)
            return JSONResponse(
                status_code=200,
                content={
                    "message": SuccessResponse.LOGIN_SUCCESS,
                    "data": {
                        "id": account.id,
                        "name": account.nombre,
                        "email": account.email,
                        "kleincoins": account.kleincoins,
                        "created_at": account.created_at,
                        "token": generate_token(account.email),
                    },
                },
            )

        # INCR returns the updated counter, so no second read is needed and
        # concurrent failures cannot observe each other's stale value.
        attempts = int(redis_client.incr(attempts_key))  # type: ignore
        redis_client.expire(attempts_key, _LOGIN_LOCK_SECONDS)

        if attempts >= _MAX_LOGIN_ATTEMPTS:
            # Value and expiry are written in one command so the lock can
            # never be left without a TTL.
            redis_client.set(blocked_key, "true", ex=_LOGIN_LOCK_SECONDS)
            logger.warning(
                "Too many login attempts for %s. User blocked.", request.email
            )
            return JSONResponse(
                status_code=429,
                content={"message": ErrorResponse.TOO_MANY_ATTEMPTS},
            )

        logger.warning(
            "Failed login attempt for %s. Attempts: %s", request.email, attempts
        )
        logger.error("Invalid email or PIN.")
        return JSONResponse(
            status_code=401,
            content={
                "message": ErrorResponse.INVALID_CREDENTIALS,
                "attempts_remaining": _MAX_LOGIN_ATTEMPTS - attempts,
            },
        )
@user_router.delete("/delete")
async def delete_user(request: UserIDRequest):
    """Delete the user identified by the requested ID.

    Responds with 200 and whatever the data service returned from the
    deletion when that value is truthy, and with 404 otherwise.
    """
    removed = user_data_service.delete(request.id)
    if not removed:
        return JSONResponse(
            status_code=404,
            content={"message": UserResponse.USER_NOT_FOUND},
        )
    return JSONResponse(
        status_code=200,
        content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": removed},
    )
@user_router.get("/all")
async def get_all_users():
    """Return every user known to the data service.

    Each user is serialised with ``model_dump`` and the ``pin_hash`` field
    is left out, matching what the registration endpoint exposes, so the
    stored credential hash is never sent to clients.

    Returns:
        JSONResponse: status 200 with ``{"data": [<user dict>, ...]}``.
    """
    users = [
        u.model_dump(exclude={"pin_hash"})
        for u in user_data_service.get_all()
    ]
    return JSONResponse(status_code=200, content={"data": users})
|