| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from email.policy import HTTP
- import json
- from logging import getLogger
- from math import log
- from os import name
- from uuid import uuid4
- from click import File
- from fastapi import APIRouter, HTTPException
- from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
- from httpx import RequestError
- import redis
- from rsa import verify
- import config
- from models import user
- from models.user import LoginRequest, PinUserRequest, UserIDRequest, RegisterUserRequest
- from services.data_service import UserDataService
- from cryptography.fernet import Fernet
- from config.settings import PIN_KEY
- from auth.security import generate_token
- from services.email_service import send_email
- from config.mails import REGISTER_MAIL
- from config.settings import APPNAME
- from config.messages import ErrorResponse, SuccessResponse, UserResponse
- from utils.rut import validate_rut
# Module-level objects created once at import time.
logger = getLogger(__name__)

# Fernet instance keyed by the configured PIN_KEY (not used by the endpoints
# visible in this part of the file).
fernet = Fernet(PIN_KEY.encode())

# Data-access service used by every user endpoint below.
user_data_service = UserDataService()

# Router that collects the user endpoints declared below.
user_router = APIRouter()
def unique_pin_generate():
    """Generate a random 4-digit PIN as a string.

    The PIN is drawn from 1000-9999 inclusive, so it never starts with a
    zero. Despite the function name, no uniqueness check is performed.
    Randomness comes from the ``secrets`` module because a PIN is a
    credential and must not be predictable.

    Returns:
        str: Four decimal digits.
    """
    import secrets

    # randbelow(9000) yields 0..8999; shifting by 1000 gives 1000..9999.
    return str(secrets.randbelow(9000) + 1000)
@user_router.post("/exists")
async def exists_user(request: UserIDRequest):
    """Report whether a user with the requested ID is stored.

    Responds 200 with ``exists: True`` when the data service returns a user
    for ``request.id``; otherwise responds 404 with ``exists: False``.
    """
    found = user_data_service.get_by_id(request.id)
    if not found:
        return JSONResponse(
            status_code=404,
            content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST},
        )
    return JSONResponse(
        status_code=200,
        content={"exists": True, "message": UserResponse.USER_EXISTS},
    )
@user_router.post("/register")
async def register_user(request: RegisterUserRequest):
    """Start a registration by e-mailing a verification code.

    The request is rejected with 400 when the RUT fails ``validate_rut`` or
    when a user with the same e-mail or RUT already exists. Otherwise the
    submitted name, e-mail and RUT are stored in Redis under
    ``verify:<code>`` for one hour and the code is sent to the given e-mail
    address. No user record is written here; ``/create-user`` reads the
    stored data back using the code.

    Returns:
        JSONResponse: 201 once the e-mail has been handed to ``send_email``,
        400 on any validation failure.
    """
    if not validate_rut(request.rut):
        return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_RUT})

    # E-mail is checked first; the RUT lookup only runs when the e-mail is free.
    already_registered = (
        user_data_service.get_by_email(request.email)
        or user_data_service.get_by_rut(request.rut)
    )
    if already_registered:
        return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})

    redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    verification_code = str(uuid4())
    pending_registration = json.dumps({
        "name": request.name,
        "email": request.email,
        "rut": request.rut,
    })
    # Value and TTL are written in a single command so the key can never be
    # left in Redis without an expiry (1 hour).
    redis_client.set(f"verify:{verification_code}", pending_registration, ex=3600)

    send_email(
        REGISTER_MAIL["subject"],
        REGISTER_MAIL["body"],
        [request.email],
        name=request.name,
        app_name=APPNAME,
        verification_code=verification_code,
    )
    return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS})
@user_router.post("/create-user")
async def create_user(request: PinUserRequest, q: str):
    """Create the user whose registration data is stored under ``verify:<q>``.

    Args:
        request: Body carrying the PIN chosen by the user.
        q: Verification code used as the Redis key suffix.

    Returns:
        JSONResponse:
            * 400 when the code is unknown/expired, the PIN is missing or not
              exactly 4 characters long, or the data service reports the
              user already exists (``create`` returned ``-1``).
            * 500 when the user cannot be read back after creation.
            * 201 with the user data (without ``pin_hash``) and a token.
    """
    redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

    # Read the key exactly once: with two separate GETs the key could expire
    # (or appear) in between and the two reads would disagree.
    raw_payload = redis_client.get(f"verify:{q}")
    if not raw_payload:
        return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_VERIFICATION_CODE})
    data = json.loads(str(raw_payload))

    pin = request.pin
    if not pin or len(pin) != 4:
        return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_PIN})

    user_id = user_data_service.create(data.get("name"), data.get("email"), data.get("rut"), pin)
    if user_id == -1:
        # -1 is the data service's "already exists" result.
        return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})

    user = user_data_service.get_by_id(user_id)
    if not user:
        return JSONResponse(status_code=500, content={"message": ErrorResponse.USER_CREATION_ERROR})

    return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS, "data": {
        **user.model_dump(exclude={"pin_hash"}),
        "token": generate_token(user.email),
    }})
@user_router.post("/login")
async def login_user(request: LoginRequest):
    """Log a user in with e-mail and PIN, throttling repeated failures.

    Failed attempts are counted per e-mail in Redis under
    ``login_attempts:<email>``; the counter's one-hour expiry is refreshed on
    every failure. On the fifth failure ``blocked:<email>`` is set for one
    hour, and while that key exists every login for the e-mail is refused.

    Returns:
        JSONResponse:
            * 403 while the e-mail is blocked (message includes the
              remaining minutes).
            * 200 with user data and a token on success; the failure counter
              is cleared.
            * 429 on the failure that triggers the block.
            * 401 with ``attempts_remaining`` on any other failure.
    """
    max_attempts = 5
    lockout_seconds = 3600
    attempts_key = f"login_attempts:{request.email}"
    blocked_key = f"blocked:{request.email}"

    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    logger.debug("Login attempt for email: %s", request.email)

    is_blocked = redis_client.get(blocked_key)
    if is_blocked:
        logger.warning("Login attempt for blocked user: %s, locked: %s", request.email, is_blocked)
        # TTL is negative when the key vanished between GET and TTL; clamp to
        # zero so the message never shows a negative duration.
        remaining_seconds = max(int(str(redis_client.ttl(blocked_key))), 0)
        return JSONResponse(status_code=403, content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{remaining_seconds // 60} minutos")})

    user = user_data_service.login(request.email, request.pin)
    if user:
        # Successful login: forget earlier failures and return user data + token.
        redis_client.delete(attempts_key)
        return JSONResponse(status_code=200, content={"message": SuccessResponse.LOGIN_SUCCESS, "data": {
            "id": user.id,
            "name": user.name,
            "email": user.email,
            "kleincoins": user.kleincoins,
            "created_at": user.created_at,
            "token": generate_token(user.email),
        }})

    # Failed login. INCR returns the updated count, so use that value directly
    # instead of reading the key back with a second, racy GET.
    attempts = int(redis_client.incr(attempts_key))  # type: ignore[arg-type]
    redis_client.expire(attempts_key, lockout_seconds)

    if attempts >= max_attempts:
        # Too many attempts: set the block flag together with its TTL in one
        # command so it can never be left without an expiry.
        redis_client.set(blocked_key, "true", ex=lockout_seconds)
        logger.warning("Too many login attempts for %s. User blocked.", request.email)
        return JSONResponse(status_code=429, content={"message": ErrorResponse.TOO_MANY_ATTEMPTS})

    logger.warning("Failed login attempt for %s. Attempts: %s", request.email, attempts)
    logger.error("Invalid email or PIN.")
    # attempts is at least 1 here, so the remaining count is 1..4.
    return JSONResponse(status_code=401, content={"message": ErrorResponse.INVALID_CREDENTIALS, "attempts_remaining": max_attempts - attempts})
@user_router.delete("/delete")
async def delete_user(request: UserIDRequest):
    """Remove the user identified by ``request.id``.

    A truthy result from the data service is echoed back under ``data`` with
    status 200; a falsy result yields 404.
    """
    deleted = user_data_service.delete(request.id)
    if not deleted:
        return JSONResponse(
            status_code=404,
            content={"message": UserResponse.USER_NOT_FOUND},
        )
    return JSONResponse(
        status_code=200,
        content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": deleted},
    )
@user_router.get("/all")
async def get_all_users():
    """Return every stored user as a list of plain dictionaries.

    The ``pin_hash`` field is left out of each entry, matching what
    ``/create-user`` returns, so credential hashes are not exposed.

    Returns:
        JSONResponse: 200 with the list under ``data``.
    """
    users = [u.model_dump(exclude={"pin_hash"}) for u in user_data_service.get_all()]
    return JSONResponse(status_code=200, content={"data": users})
- from fastapi import Query
# Separate router for the verification page endpoint.
verify_router = APIRouter()


@verify_router.get("/")
async def verify_user(q: str = Query(..., description="q parameter")):
    """Serve the verification page for a pending registration code.

    ``q`` is the verification code from the query string. When Redis holds a
    value under ``verify:<q>`` the static page ``public/verify.html`` is
    returned; otherwise a 400 HTML error is returned.
    """
    redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
    pending = redis_client.get(f"verify:{q}")
    if pending:
        return FileResponse("public/verify.html", media_type="text/html")
    return HTMLResponse(content="<h1>Invalid verification code</h1>", status_code=400)
|