|
@@ -5,7 +5,7 @@ from math import log
|
|
|
from os import name
|
|
from os import name
|
|
|
from uuid import uuid4
|
|
from uuid import uuid4
|
|
|
from click import File
|
|
from click import File
|
|
|
-from fastapi import APIRouter, HTTPException
|
|
|
|
|
|
|
+from fastapi import APIRouter, HTTPException, Request
|
|
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
|
|
from httpx import RequestError
|
|
from httpx import RequestError
|
|
|
import redis
|
|
import redis
|
|
@@ -101,13 +101,13 @@ async def create_user(request: PinUserRequest, q: str):
|
|
|
}})
|
|
}})
|
|
|
|
|
|
|
|
@user_router.post("/login")
|
|
@user_router.post("/login")
|
|
|
-async def login_user(request: LoginRequest):
|
|
|
|
|
|
|
+async def login_user(request: LoginRequest, http_request: Request):
|
|
|
"""Login user with email and PIN"""
|
|
"""Login user with email and PIN"""
|
|
|
redis_client = redis.Redis(host='localhost', port=6379, db=0)
|
|
redis_client = redis.Redis(host='localhost', port=6379, db=0)
|
|
|
logger.debug(f"Login attempt for email: {request.email}")
|
|
logger.debug(f"Login attempt for email: {request.email}")
|
|
|
-
|
|
|
|
|
is_blocked = redis_client.get(f"blocked:{request.email}")
|
|
is_blocked = redis_client.get(f"blocked:{request.email}")
|
|
|
|
|
|
|
|
|
|
+
|
|
|
if is_blocked:
|
|
if is_blocked:
|
|
|
logger.warning(f"Login attempt for blocked user: {request.email}, locked: {is_blocked}")
|
|
logger.warning(f"Login attempt for blocked user: {request.email}, locked: {is_blocked}")
|
|
|
return JSONResponse(status_code=403, content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{int(str(redis_client.ttl(f'blocked:{request.email}'))) // 60} minutos")})
|
|
return JSONResponse(status_code=403, content={"message": UserResponse.USER_FORMAT_BLOCKED.format(time=f"{int(str(redis_client.ttl(f'blocked:{request.email}'))) // 60} minutos")})
|
|
@@ -115,6 +115,12 @@ async def login_user(request: LoginRequest):
|
|
|
logger.info(f"{redis_client.get(f'blocked:{request.email}')}")
|
|
logger.info(f"{redis_client.get(f'blocked:{request.email}')}")
|
|
|
|
|
|
|
|
user = user_data_service.login(request.email, request.pin)
|
|
user = user_data_service.login(request.email, request.pin)
|
|
|
|
|
+
|
|
|
|
|
+ referer = http_request.headers.get("referer")
|
|
|
|
|
+ if referer and "admin" in referer:
|
|
|
|
|
+ if not user or user_data_service.permissions(user.id) == 0:
|
|
|
|
|
+ logger.warning(f"Unauthorized admin access attempt by {request.email}")
|
|
|
|
|
+ return JSONResponse(status_code=403, content={"message": UserResponse.NOT_PERMITTED})
|
|
|
if user:
|
|
if user:
|
|
|
# Successful login, return user data and token
|
|
# Successful login, return user data and token
|
|
|
redis_client.delete(f"login_attempts:{request.email}")
|
|
redis_client.delete(f"login_attempts:{request.email}")
|