| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365 |
- from datetime import datetime
- import json
- from logging import getLogger
- import re
- from uuid import uuid4
- from models import user
- import redis
- from cryptography.fernet import Fernet
- from fastapi import APIRouter, Depends, Request
- from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
- from fastapi.exceptions import HTTPException
- from auth.security import generate_token
- from auth.security import get_current_user
- from config.mails import REGISTER_MAIL, PIN_RECOVERY_MAIL, PIN_SUCCESSFULLY
- from config.messages import ErrorResponse, SuccessResponse, UserResponse
- from config.settings import APPNAME, DEVELOPMENT, PIN_KEY
- from models.user import LoginRequest, PinRecoveryRequest, PinRecoveryValidateRequest, PinUserRequest, RegisterUserRequest, User, UserIDRequest, UserMail, UserRewardRequest
- from services.data_service import BlacklistDataService, UserDataService
- from services.email_service import get_email_sender
- from services.print_service import print_ticket
- import services.recovery_service as recovery_service
- from utils.rut import validate_rut
- fernet = Fernet(PIN_KEY.encode())
- logger = getLogger(__name__)
- user_data_service = UserDataService()
- blacklist_data_service = BlacklistDataService()
- user_router = APIRouter()
- redis_client = redis.Redis(host='localhost', port=6379, db=1 if DEVELOPMENT else 0, decode_responses=True)
- def unique_pin_generate():
- """Generate a unique 4-digit PIN"""
- import random
- pin = str(random.randint(1000, 9999))
- return pin
- @user_router.post("/exists")
- async def exists_user(request: UserIDRequest):
- """Check if user exists"""
- user = user_data_service.get_by_id(request.id)
- if user:
- return JSONResponse(status_code=200, content={"exists": True, "message": UserResponse.USER_EXISTS})
- else:
- return JSONResponse(status_code=404, content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST})
- @user_router.post("/register")
- async def register_user(request: RegisterUserRequest):
- """Register a new user"""
- logger.info(f"Registration attempt for email: {request.email}")
-
-
-
- # Validate RUT
- if not validate_rut(request.rut):
- logger.warning(f"Registration failed for {request.email}: invalid RUT {request.rut}")
-
- raise HTTPException(status_code=400, detail=ErrorResponse.INVALID_RUT)
- # Check if user already exists by email
- try:
- user = user_data_service.get_by_email(request.email)
- if user:
- logger.warning(f"Registration failed for {request.email}: user already exists")
-
- return HTTPException(status_code=400, detail=UserResponse.USER_ALREADY_EXISTS)
-
- # Check if RUT already exists
- user = user_data_service.get_by_rut(request.rut)
- if user:
- logger.warning(f"Registration failed for {request.email}: RUT already exists")
-
- return HTTPException(status_code=400, detail=UserResponse.USER_ALREADY_EXISTS)
- except Exception as e:
- error_msg = f"Database error during user validation: {e}"
- logger.error(error_msg)
-
- logger.info(f"Registering user: {request.email}")
-
- try:
- verification_code = str(uuid4())
-
- user_data = {
- "name": request.name,
- "email": request.email,
- "rut": request.rut
- }
-
- redis_client.set(f"verify:{verification_code}", json.dumps(user_data))
- redis_client.expire(f"verify:{verification_code}", 3600) # Expire in 1 hour
-
- logger.info(f"Verification code generated for {request.email}, code: {verification_code}")
- # Send verification email
- get_email_sender().send_email(
- REGISTER_MAIL["subject"],
- REGISTER_MAIL["body"],
- [request.email],
- name=request.name,
- app_name=APPNAME,
- verification_code=verification_code
- )
-
-
- return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS})
-
- except Exception as e:
- error_msg = f"Error during registration process for {request.email}: {e}"
- logger.error(error_msg)
-
- return JSONResponse(status_code=500, content={"message": "Error interno del servidor"})
- @user_router.post("/create-user")
- async def create_user(request: PinUserRequest, q: str):
- """Create a new user with PIN"""
- data = redis_client.get(f"verify:{q}")
- if not redis_client.get(f"verify:{q}"):
- return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_VERIFICATION_CODE})
- else:
- data = json.loads(str(data))
- name = data.get("name")
- email = data.get("email")
- rut = data.get("rut")
- pin = request.pin
- if not request.pin or len(request.pin) != 4:
- return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_PIN})
- userID = user_data_service.create(name, email, rut, pin)
- if userID == -1:
- return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})
- user = user_data_service.get_by_id(userID)
- if not user:
- logger.error(f"User creation failed for {email}: user not found after creation")
- return JSONResponse(status_code=500, content={"message": ErrorResponse.USER_CREATION_ERROR})
- logger.info(f"User created successfully: {email}")
- return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS, "data": {
- **user.model_dump(exclude={"pin_hash"}),
- "token": generate_token(user.email)
- }})
- @user_router.post("/login")
- async def login_user(request: LoginRequest, http_request: Request):
- """Login user with email and PIN"""
- logger.info(f"Login attempt for email: {request.email}")
-
-
- try:
- is_blocked = redis_client.get(f"blocked:{request.email}")
- if is_blocked:
- try:
- blocked_time_raw = redis_client.ttl(f"blocked:{request.email}")
- blocked_minutes = max(0, int(blocked_time_raw) // 60) if blocked_time_raw and int(blocked_time_raw) > 0 else 0 # type: ignore
- except (ValueError, TypeError):
- blocked_minutes = 0
-
- logger.warning(f"Login attempt for blocked user: {request.email}, blocked for {blocked_minutes} minutes")
-
- return JSONResponse(
- status_code=403,
- content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{blocked_minutes} minutos")}
- )
- # Attempt login
- user = user_data_service.login(request.email, request.pin)
- if user:
- if blacklist_data_service.is_user_blacklisted(user.id):
- logger.warning(f"Login attempt for blacklisted user: {request.email}")
-
- return JSONResponse(
- status_code=403,
- content={"message": UserResponse.USER_BLACKLISTED}
- )
- # Successful login
- logger.info(f"Successful login for user: {request.email}")
-
- # Check admin access
- referer = http_request.headers.get("Origin", "")
- logger.info(f"Login request referer: {referer}")
- if referer and "admin" in referer:
- user_permissions = user_data_service.permissions(user.id)
- if user_permissions == 0:
- logger.warning(f"Unauthorized admin access attempt by {request.email}")
-
- return JSONResponse(status_code=403, content={"message": UserResponse.NOT_PERMITTED})
- # Clear login attempts and log successful login
- redis_client.delete(f"login_attempts:{request.email}")
-
-
-
- return JSONResponse(status_code=200, content={
- "message": SuccessResponse.LOGIN_SUCCESS,
- "data": {
- "id": user.id,
- "name": user.name,
- "email": user.email,
- "kleincoins": user.kleincoins,
- "created_at": user.created_at,
- "token": generate_token(user.email),
- "reward_progress": user.reward_progress,
- }
- })
- else:
- # Failed login: increment attempts in Redis
- redis_client.incr(f"login_attempts:{request.email}")
- redis_client.expire(f"login_attempts:{request.email}", 3600)
- redis_attempts = redis_client.get(f"login_attempts:{request.email}")
- attempts = int(redis_attempts) if redis_attempts else 0 # type: ignore
-
- if attempts >= 5:
- # Too many attempts, block login
- redis_client.set(f"blocked:{request.email}", "true")
- redis_client.expire(f"blocked:{request.email}", 3600)
-
- logger.warning(f"Too many login attempts for {request.email}. User blocked.")
-
- return JSONResponse(status_code=429, content={"message": ErrorResponse.TOO_MANY_ATTEMPTS})
- else:
- logger.warning(f"Failed login attempt for {request.email}. Attempts: {attempts}")
-
-
- # Return unauthorized with attempts remaining
- return JSONResponse(status_code=401, content={
- "message": ErrorResponse.INVALID_CREDENTIALS,
- "attempts_remaining": 5 - attempts if attempts else 5
- })
-
- except redis.RedisError as e:
- error_msg = f"Redis error during login for {request.email}: {e}"
- logger.error(error_msg)
-
- return JSONResponse(status_code=500, content={"message": "Error interno del servidor"})
-
- except Exception as e:
- error_msg = f"Unexpected error during login for {request.email}: {e}"
- logger.error(error_msg)
-
- return JSONResponse(status_code=500, content={"message": "Error interno del servidor"})
- @user_router.delete("/delete")
- async def delete_user(request: UserIDRequest):
- """Delete a user by ID"""
- user = user_data_service.delete(request.id)
- if user:
- return JSONResponse(status_code=200, content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": user})
- else:
- return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND})
- @user_router.post("/pin-recovery")
- async def change_pin(request: PinRecoveryRequest):
- """Change a user's PIN"""
- user = user_data_service.get_by_email(request.email)
- if not user:
- return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND.format(user_id=request.email)})
- real_token = recovery_service.get_token(user.id)
- if real_token and real_token != request.token:
- return JSONResponse(status_code=400, content={"message": "Invalid token"})
- logger.info(f"Pin change, to {request.new_pin} for user {user.email}")
- user_data_service.update(user_id=user.id, pin_hash=request.new_pin)
- sender = get_email_sender()
- sender.send_email(
- to=[user.email],
- subject=PIN_SUCCESSFULLY["subject"],
- body=PIN_SUCCESSFULLY["body"].format(app_name=APPNAME, date=datetime.now().strftime("%Y-%m-%d"), time=datetime.now().strftime("%H:%M:%S"), name=user.name)
- )
- return JSONResponse(status_code=200, content={"message": "Recovery email sent"})
- @user_router.post("/reward")
- async def reward_user(request: UserRewardRequest, user: User = Depends(get_current_user)):
- """Reward a user with 1 free beer"""
- if user.reward_progress < 100:
- return JSONResponse(status_code=400, content={"message": UserResponse.REWARD_INSUFFICIENT_PROGRESS.format(progress=user.reward_progress)})
- if not user:
- return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND.format(user_id=request.id)})
-
- user_data_service.set_reward_progress(user.id, 0)
- print_ticket(request.tableNumber)
- return JSONResponse(status_code=200, content={"message": SuccessResponse.REWARD_SUCCESS, "data": {
- "id": user.id,
- "name": user.name,
- "email": user.email,
- "reward_progress": 0
- }})
- @user_router.get("/user")
- async def get_cur_user(current_user:User = Depends(get_current_user)):
- """Get current user information"""
- return JSONResponse(status_code=200, content={"data": current_user.model_dump(exclude={"pin_hash", "kleincoins", "rut"})})
- @user_router.get("/all")
- async def get_all_users():
- """Get all users"""
- users = list(map(lambda u: u.model_dump(), user_data_service.get_all()))
- return JSONResponse(status_code=200, content={"data": users})
- from fastapi import Query
- verify_router = APIRouter()
- @verify_router.get("/")
- async def verify_user(q: str = Query(..., description="q parameter")):
- """Verify a user by ID"""
- # get url params
- if not redis_client.get(f"verify:{q}"):
- return HTMLResponse(
- content="<h1>Invalid verification code</h1>",
- status_code=400
- )
- return FileResponse(
- "public/verify.html",
- media_type="text/html",
- )
- recovery_pin_router = APIRouter()
- @recovery_pin_router.get("/")
- async def pin_forgot_get():
- """Render the PIN forgot page"""
- return FileResponse(
- "public/pin_forgot.html",
- media_type="text/html",
- )
- @recovery_pin_router.post("/")
- async def pin_forgot_post(request: UserMail):
- """Handle the PIN forgot form submission"""
- user = user_data_service.get_by_email(request.email)
- if not user:
- return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND.format(user_id=request.email)})
- recovery_key = recovery_service.generate_recovery_key(user.id)
- sender = get_email_sender()
- sender.send_email(
- to=[user.email],
- subject=PIN_RECOVERY_MAIL["subject"],
- body=PIN_RECOVERY_MAIL["body"].format(app_name=APPNAME, verification_code=recovery_key,name=user.name)
- )
- # Send recovery_key to user's email
- return JSONResponse(status_code=200, content={"message": SuccessResponse.RECOVERY_EMAIL_SENT})
- @recovery_pin_router.post("/validate")
- async def pin_forgot_validate(request: PinRecoveryValidateRequest):
- """Validate the PIN recovery code"""
- user = user_data_service.get_by_email(request.email)
- if not user:
- return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND.format(user_id=request.email)})
- recovery_data = recovery_service.get_recovery_data(user.id)
- logger.info(f"Recovery data for {request.email}: {recovery_data}|{request.code}")
- if recovery_data.code == -1:
- return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND.format(user_id=request.email)})
- if recovery_data.code != request.code:
- return JSONResponse(status_code=400, content={"message": "Invalid recovery code"})
- token = uuid4().hex
- recovery_service.add_token(user.id, token)
- return JSONResponse(status_code=200, content={"message": "Recovery code validated successfully", "token": token})
|