import json
from logging import getLogger
from uuid import uuid4

import redis
from cryptography.fernet import Fernet
from fastapi import APIRouter, Depends, Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.exceptions import HTTPException

from auth.security import generate_token
from auth.security import get_current_user
from config.mails import REGISTER_MAIL
from config.messages import ErrorResponse, SuccessResponse, UserResponse
from config.settings import APPNAME, PIN_KEY
from models.user import (
    LoginRequest,
    PinUserRequest,
    RegisterUserRequest,
    User,
    UserIDRequest,
    UserRewardRequest,
)
from services.data_service import UserDataService
from services.email_service import get_email_sender
from services.print_service import print_ticket
from services.logging_service import structured_logger, LogLevel
from utils.rut import validate_rut

fernet = Fernet(PIN_KEY.encode())
logger = getLogger(__name__)

user_data_service = UserDataService()
user_router = APIRouter()

# Lifetime of a registration verification code stored in Redis.
_VERIFICATION_TTL_SECONDS = 3600
# Failed logins allowed before the e-mail address is blocked.
_MAX_LOGIN_ATTEMPTS = 5
# Lifetime of the failed-login counter; refreshed on every failed attempt.
_LOGIN_ATTEMPT_WINDOW_SECONDS = 3600
# How long a blocked e-mail address stays blocked.
_LOGIN_BLOCK_SECONDS = 3600


def _new_redis_client():
    """Return a client for the local Redis instance used by these routes."""
    return redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)


def unique_pin_generate():
    """Generate a random 4-digit PIN and return it as a string.

    The value is drawn from 1000-9999 with the ``secrets`` module because a
    PIN is a credential. Despite the name, this function does not check the
    PIN against any existing PINs.
    """
    import secrets

    return str(secrets.randbelow(9000) + 1000)


@user_router.post("/exists")
async def exists_user(request: UserIDRequest):
    """Check if user exists.

    Returns 200 with ``exists: True`` when ``request.id`` resolves to a user,
    otherwise 404 with ``exists: False``.
    """
    if user_data_service.get_by_id(request.id):
        return JSONResponse(
            status_code=200,
            content={"exists": True, "message": UserResponse.USER_EXISTS},
        )
    return JSONResponse(
        status_code=404,
        content={"exists": False, "message": UserResponse.USER_DOES_NOT_EXIST},
    )


@user_router.post("/register")
async def register_user(request: RegisterUserRequest):
    """Register a new user.

    Validates the RUT, rejects e-mails/RUTs that are already registered,
    stores the pending registration in Redis under ``verify:<code>`` and
    e-mails the verification code to the user.

    Returns:
        201 when the verification e-mail was sent, 500 if storing the code or
        sending the e-mail failed.

    Raises:
        HTTPException: 400 for an invalid RUT or an already registered
            e-mail/RUT, 500 when the duplicate lookup itself fails.
    """
    logger.info(f"Registration attempt for email: {request.email}")
    structured_logger.log_user_event(
        "User registration attempt",
        LogLevel.INFO,
        {
            "email": request.email,
            "name": request.name,
            "rut": request.rut
        },
        user_email=request.email
    )

    # Validate RUT
    if not validate_rut(request.rut):
        logger.warning(f"Registration failed for {request.email}: invalid RUT {request.rut}")
        structured_logger.log_user_event(
            "Registration failed: invalid RUT",
            LogLevel.WARNING,
            {
                "email": request.email,
                "rut": request.rut,
                "reason": "Invalid RUT format"
            },
            user_email=request.email
        )
        raise HTTPException(status_code=400, detail=ErrorResponse.INVALID_RUT)

    try:
        # Check if user already exists by email
        user = user_data_service.get_by_email(request.email)
        if user:
            logger.warning(f"Registration failed for {request.email}: user already exists")
            structured_logger.log_user_event(
                "Registration failed: user already exists",
                LogLevel.WARNING,
                {
                    "email": request.email,
                    "existing_user_id": user.id,
                    "reason": "Email already registered"
                },
                user_id=user.id,
                user_email=request.email
            )
            # Must be raised: a *returned* HTTPException is serialized as an
            # ordinary response body instead of producing a 400.
            raise HTTPException(status_code=400, detail=UserResponse.USER_ALREADY_EXISTS)

        # Check if RUT already exists
        user = user_data_service.get_by_rut(request.rut)
        if user:
            logger.warning(f"Registration failed for {request.email}: RUT already exists")
            structured_logger.log_user_event(
                "Registration failed: RUT already exists",
                LogLevel.WARNING,
                {
                    "email": request.email,
                    "rut": request.rut,
                    "existing_user_id": user.id,
                    "reason": "RUT already registered"
                },
                user_email=request.email
            )
            raise HTTPException(status_code=400, detail=UserResponse.USER_ALREADY_EXISTS)
    except HTTPException:
        # Let the deliberate 400s through; the handler below is only for
        # unexpected lookup failures.
        raise
    except Exception as e:
        error_msg = f"Database error during user validation: {e}"
        logger.error(error_msg)
        structured_logger.log_database_event(
            "User validation error during registration",
            LogLevel.ERROR,
            {
                "email": request.email,
                "error": str(e),
                "error_type": type(e).__name__
            },
            user_email=request.email
        )
        raise HTTPException(status_code=500, detail={"message": "Error interno del servidor"}) from e

    logger.info(f"Registering user: {request.email}")

    try:
        # Setup Redis client and verification code
        redis_client = _new_redis_client()
        verification_code = str(uuid4())
        user_data = {
            "name": request.name,
            "email": request.email,
            "rut": request.rut
        }
        # Value and TTL are written in one command so the key can never be
        # left behind without an expiry.
        redis_client.set(
            f"verify:{verification_code}",
            json.dumps(user_data),
            ex=_VERIFICATION_TTL_SECONDS,
        )

        # NOTE(review): the verification code is effectively a one-time
        # secret and is written to the logs here and below; confirm this is
        # acceptable for the log retention/access policy.
        structured_logger.log_user_event(
            "Verification code generated for user registration",
            LogLevel.INFO,
            {
                "email": request.email,
                "verification_code": verification_code,
                "expires_in": _VERIFICATION_TTL_SECONDS
            },
            user_email=request.email
        )

        # Send verification email
        get_email_sender().send_email(
            REGISTER_MAIL["subject"],
            REGISTER_MAIL["body"],
            [request.email],
            name=request.name,
            app_name=APPNAME,
            verification_code=verification_code
        )

        structured_logger.log_email_event(
            "Registration verification email sent",
            LogLevel.INFO,
            {
                "email": request.email,
                "verification_code": verification_code
            },
            user_email=request.email
        )

        logger.info(f"Registration initiated successfully for {request.email}")
        return JSONResponse(status_code=201, content={"message": SuccessResponse.USER_CREATED_SUCCESS})

    except Exception as e:
        error_msg = f"Error during registration process for {request.email}: {e}"
        logger.error(error_msg)
        structured_logger.log_user_event(
            "Registration process failed",
            LogLevel.ERROR,
            {
                "email": request.email,
                "error": str(e),
                "error_type": type(e).__name__,
                "step": "verification_setup_or_email"
            },
            user_email=request.email
        )
        return JSONResponse(status_code=500, content={"message": "Error interno del servidor"})


@user_router.post("/create-user")
async def create_user(request: PinUserRequest, q: str):
    """Create a new user with PIN.

    ``q`` is the verification code issued by ``/register``; the pending
    registration data is read from Redis under ``verify:<q>``.

    Returns:
        201 with the user (without ``pin_hash``) and a token on success,
        400 for an unknown/expired code, an invalid PIN or an existing user,
        500 if the user cannot be read back after creation.
    """
    redis_client = _new_redis_client()

    # Read the key once; a second read could see the key expire in between.
    raw_registration = redis_client.get(f"verify:{q}")
    if not raw_registration:
        return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_VERIFICATION_CODE})

    data = json.loads(str(raw_registration))
    name = data.get("name")
    email = data.get("email")
    rut = data.get("rut")

    pin = request.pin
    if not pin or len(pin) != 4:
        return JSONResponse(status_code=400, content={"message": ErrorResponse.INVALID_PIN})

    # NOTE(review): the verification key is not removed after use and stays
    # valid until its TTL runs out; confirm whether it should be deleted.
    user_id = user_data_service.create(name, email, rut, pin)
    if user_id == -1:
        return JSONResponse(status_code=400, content={"message": UserResponse.USER_ALREADY_EXISTS})

    user = user_data_service.get_by_id(user_id)
    if not user:
        return JSONResponse(status_code=500, content={"message": ErrorResponse.USER_CREATION_ERROR})

    return JSONResponse(status_code=201, content={
        "message": SuccessResponse.USER_CREATED_SUCCESS,
        "data": {
            **user.model_dump(exclude={"pin_hash"}),
            "token": generate_token(user.email)
        }
    })


@user_router.post("/login")
async def login_user(request: LoginRequest, http_request: Request):
    """Login user with email and PIN.

    Failed attempts are counted in Redis under ``login_attempts:<email>``;
    once the counter reaches the limit, ``blocked:<email>`` is set and further
    logins are refused until it expires.

    Returns:
        200 with user data and a token on success, 401 on bad credentials,
        403 when the e-mail is blocked or an admin login lacks permissions,
        429 when this attempt triggered the block, 500 on Redis or other
        unexpected errors.
    """
    logger.info(f"Login attempt for email: {request.email}")
    structured_logger.log_security_event(
        f"Login attempt for user {request.email}",
        LogLevel.INFO,
        {
            "email": request.email,
            "user_agent": http_request.headers.get("user-agent", "unknown"),
            "referer": http_request.headers.get("referer", "unknown"),
            "client_ip": http_request.client.host if http_request.client else "unknown"
        },
        user_email=request.email
    )

    blocked_key = f"blocked:{request.email}"
    attempts_key = f"login_attempts:{request.email}"

    try:
        redis_client = _new_redis_client()

        if redis_client.get(blocked_key):
            try:
                remaining_seconds = int(redis_client.ttl(blocked_key))  # type: ignore
                blocked_minutes = remaining_seconds // 60 if remaining_seconds > 0 else 0
            except (ValueError, TypeError):
                blocked_minutes = 0

            logger.warning(f"Login attempt for blocked user: {request.email}, blocked for {blocked_minutes} minutes")
            structured_logger.log_security_event(
                "Login attempt by blocked user",
                LogLevel.WARNING,
                {
                    "email": request.email,
                    "blocked_time_remaining_minutes": blocked_minutes,
                    "user_agent": http_request.headers.get("user-agent", "unknown")
                },
                user_email=request.email
            )
            return JSONResponse(
                status_code=403,
                content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{blocked_minutes} minutos")}
            )

        # Attempt login
        user = user_data_service.login(request.email, request.pin)

        if user:
            # Successful login
            logger.info(f"Successful login for user: {request.email}")

            referer = http_request.headers.get("referer", "")
            is_admin_login = "admin" in referer
            user_permissions = user_data_service.permissions(user.id)

            # Check admin access: logins coming from an admin page require
            # non-zero permissions.
            if is_admin_login and user_permissions == 0:
                logger.warning(f"Unauthorized admin access attempt by {request.email}")
                structured_logger.log_security_event(
                    "Unauthorized admin access attempt",
                    LogLevel.WARNING,
                    {
                        "email": request.email,
                        "user_id": user.id,
                        "permissions": user_permissions,
                        "referer": referer
                    },
                    user_id=user.id,
                    user_email=request.email
                )
                return JSONResponse(status_code=403, content={"message": UserResponse.NOT_PERMITTED})

            # Clear login attempts and log successful login
            redis_client.delete(attempts_key)

            structured_logger.log_security_event(
                "Successful login",
                LogLevel.INFO,
                {
                    "email": request.email,
                    "user_id": user.id,
                    "user_name": user.name,
                    "permissions": user_permissions,
                    "is_admin_login": is_admin_login,
                    "reward_progress": user.reward_progress
                },
                user_id=user.id,
                user_email=request.email
            )

            # NOTE(review): created_at is passed through as-is; if it is a
            # datetime rather than a string, JSON encoding will fail here.
            return JSONResponse(status_code=200, content={
                "message": SuccessResponse.LOGIN_SUCCESS,
                "data": {
                    "id": user.id,
                    "name": user.name,
                    "email": user.email,
                    "kleincoins": user.kleincoins,
                    "created_at": user.created_at,
                    "token": generate_token(user.email),
                    "reward_progress": user.reward_progress,
                }
            })

        # Failed login: INCR returns the updated counter, so no extra GET is
        # needed to learn the attempt count.
        attempts = int(redis_client.incr(attempts_key))  # type: ignore
        redis_client.expire(attempts_key, _LOGIN_ATTEMPT_WINDOW_SECONDS)

        if attempts >= _MAX_LOGIN_ATTEMPTS:
            # Too many attempts, block login. Flag and TTL are written in one
            # command so the block can never become permanent.
            redis_client.set(blocked_key, "true", ex=_LOGIN_BLOCK_SECONDS)
            logger.warning(f"Too many login attempts for {request.email}. User blocked.")
            structured_logger.log_security_event(
                "User blocked due to too many failed login attempts",
                LogLevel.WARNING,
                {
                    "email": request.email,
                    "failed_attempts": attempts,
                    "blocked_duration_seconds": _LOGIN_BLOCK_SECONDS
                },
                user_email=request.email
            )
            return JSONResponse(status_code=429, content={"message": ErrorResponse.TOO_MANY_ATTEMPTS})

        attempts_remaining = _MAX_LOGIN_ATTEMPTS - attempts
        logger.warning(f"Failed login attempt for {request.email}. Attempts: {attempts}")
        structured_logger.log_security_event(
            "Failed login attempt",
            LogLevel.WARNING,
            {
                "email": request.email,
                "failed_attempts": attempts,
                "attempts_remaining": attempts_remaining,
                "reason": "Invalid credentials"
            },
            user_email=request.email
        )

        # Return unauthorized with attempts remaining
        return JSONResponse(status_code=401, content={
            "message": ErrorResponse.INVALID_CREDENTIALS,
            "attempts_remaining": attempts_remaining
        })

    except redis.RedisError as e:
        error_msg = f"Redis error during login for {request.email}: {e}"
        logger.error(error_msg)
        structured_logger.log_system_event(
            "Redis error during login process",
            LogLevel.ERROR,
            {
                "email": request.email,
                "error": str(e),
                "error_type": "RedisError"
            }
        )
        return JSONResponse(status_code=500, content={"message": "Error interno del servidor"})
    except Exception as e:
        error_msg = f"Unexpected error during login for {request.email}: {e}"
        logger.error(error_msg)
        structured_logger.log_security_event(
            "Unexpected error during login",
            LogLevel.ERROR,
            {
                "email": request.email,
                "error": str(e),
                "error_type": type(e).__name__
            },
            user_email=request.email
        )
        return JSONResponse(status_code=500, content={"message": "Error interno del servidor"})


@user_router.delete("/delete")
async def delete_user(request: UserIDRequest):
    """Delete a user by ID.

    Returns 200 with whatever the data service returns for the deleted user,
    or 404 when the service returns a falsy result.
    """
    # NOTE(review): unlike /reward and /user, this route has no
    # get_current_user dependency; confirm it is protected elsewhere.
    deleted_user = user_data_service.delete(request.id)
    if not deleted_user:
        return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND})
    return JSONResponse(
        status_code=200,
        content={"message": SuccessResponse.USER_DELETED_SUCCESS, "data": deleted_user},
    )
@user_router.post("/reward") async def reward_user(request: UserRewardRequest, user: User = Depends(get_current_user)): """Reward a user with 1 free beer""" if user.reward_progress < 100: return JSONResponse(status_code=400, content={"message": UserResponse.REWARD_INSUFFICIENT_PROGRESS.format(progress=user.reward_progress)}) if not user: return JSONResponse(status_code=404, content={"message": UserResponse.USER_NOT_FOUND.format(user_id=request.id)}) user_data_service.set_reward_progress(user.id, 0) print_ticket(request.tableNumber) return JSONResponse(status_code=200, content={"message": SuccessResponse.REWARD_SUCCESS, "data": { "id": user.id, "name": user.name, "email": user.email, "reward_progress": 0 }}) @user_router.get("/user") async def get_cur_user(current_user:User = Depends(get_current_user)): """Get current user information""" return JSONResponse(status_code=200, content={"data": current_user.model_dump(exclude={"pin_hash", "kleincoins", "rut"})}) @user_router.get("/all") async def get_all_users(): """Get all users""" users = list(map(lambda u: u.model_dump(), user_data_service.get_all())) return JSONResponse(status_code=200, content={"data": users}) from fastapi import Query verify_router = APIRouter() @verify_router.get("/") async def verify_user(q: str = Query(..., description="q parameter")): """Verify a user by ID""" # get url params redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) if not redis_client.get(f"verify:{q}"): return HTMLResponse( content="