import math from time import time import requests from config.settings import DEVELOPMENT import os import redis from logging import getLogger from models.items import Product logger = getLogger(__name__) api_token = os.getenv('FUDO_API_KEY') api_secret = os.getenv('FUDO_API_SECRET') token = None token_exp = None # Configuración de Redis redis_client = redis.Redis( host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', 6379)), db=1 if DEVELOPMENT else 0, decode_responses=True ) REDIS_TOKEN_KEY = 'fudo_api_token' def get_token(): """ Obtiene el token de autenticación de Fudo API. Primero verifica si existe un token válido en Redis. Si no existe o ha expirado, solicita uno nuevo y lo guarda en Redis con expiración automática. """ global token, token_exp # Intento de obtener el token desde la RAM if token and token_exp and time() < token_exp: return token try: # Intentar obtener el token desde Redis cached_token = redis_client.get(REDIS_TOKEN_KEY) if cached_token: token = cached_token ttl = redis_client.ttl(REDIS_TOKEN_KEY) if ttl is None or int(str(ttl)) < 0: token_exp = None else: token_exp = int(str(ttl)) + int(time()) return str(cached_token) except Exception as e: logger.error(f"Error al conectar con Redis: {e}") logger.info("Fallback: obteniendo token sin cache") # Si no hay token en cache, solicitar uno nuevo url = 'https://auth.fu.do/api' data = { "apiKey": api_token, "apiSecret": api_secret } r = requests.post(url, data=data) response_data = r.json() token = response_data['token'] expiration_timestamp = response_data['exp'] # Calcular TTL en segundos para Redis current_time = int(time()) ttl_seconds = expiration_timestamp - current_time try: # Guardar el token en Redis con expiración automática if ttl_seconds > 0: redis_client.setex(REDIS_TOKEN_KEY, ttl_seconds, token) else: logger.warning("Warning: El token ya está expirado") except Exception as e: logger.error(f"Error al guardar en Redis: {e}") return token def get_categories(): token = get_token() url = 'https://api.fu.do/v1alpha1/product-categories' headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) return r.json() def get_category(id_category:int): token = get_token() url = 'https://api.fu.do/v1alpha1/product-categories/{}'.format(id_category) headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) if r.status_code != 200: logger.error(f"Error al obtener producto: {r.json()['errors']}") data = r.json()["data"] return { "id": data["id"], "name": data["attributes"]["name"], "enableOnlineMenu": data["attributes"].get("enableOnlineMenu", False) } def get_product(id_category:int): url = 'https://api.fu.do/v1alpha1/products/{}'.format(id_category) token = get_token() headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) if r.status_code != 200: logger.error(f"Error al obtener producto: {r.json()['errors']}") data = r.json().get("data") if not data: return None return Product( id=int(data["id"]), name=data["attributes"]["name"], type=get_category(data["relationships"]["productCategory"]["data"]["id"])["name"] or "Producto", price=data["attributes"]["price"], image=data["attributes"]["imageUrl"], description=data["attributes"]["description"], status=1 if data["attributes"]["active"] and data else 0, promo_day=None, promo_price=None, promo_id=None ) def get_products(page: int = 1): url = 'https://api.fu.do/v1alpha1/products?page[number]={}'.format(page) token = get_token() headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) return r.json()['data'] def get_all_products(): """Método para obtener todos los productos de la base de datos. Returns: Diccionario de productos con IDs como claves y datos de productos como valores. """ url = 'https://api.fu.do/v1alpha1/products?page[number]={}' products = {} token = get_token() page = 1 while True: r = requests.get(url.format(page), headers={'Authorization': 'Bearer ' + token}) if r.status_code != 200: if products: return products else: return None data = r.json().get('data') if not data: return products for product in data: products[product['id']] = product page += 1 N_PER_PAGE = 100 def _get_page_bounds(page: int, token: str): """ Función helper: Obtiene una página y devuelve el primer y último número de mesa en ella, y los datos. """ url = ( 'https://api.fu.do/v1alpha1/tables' f'?page[number]={page}&page[size]={N_PER_PAGE}' '&include=activeSales&sort=number' ) headers = {'Authorization': 'Bearer ' + token} try: r = requests.get(url, headers=headers, timeout=10) if r.status_code != 200: return None, None, None # Error de API data = r.json().get('data', []) if not data: return 0, 0, [] # Página vacía first_number = data[0]['attributes']['number'] last_number = data[-1]['attributes']['number'] return first_number, last_number, data except requests.RequestException as e: print(f"Error de request en página {page}: {e}") return None, None, None def get_table(number: int): token = get_token() # --- FASE 1: BÚSQUEDA EXPONENCIAL (Encontrar rango) --- # Encontrar un 'high_bound' (página) donde el último N° sea >= 'number' page = 1 low_bound_page = 1 high_bound_page = 1 # Primero, revisamos la página 1 first_num, last_num, page_data = _get_page_bounds(page, token) if first_num is None: return None # Error en la primera petición if not page_data: return None # No hay mesas en total # Si está en la página 1 if number >= first_num and number <= last_num: low_bound_page = 1 high_bound_page = 1 # Si es mayor, empezamos a saltar exponencialmente elif number > last_num: low_bound_page = 2 page_jump = 2 while True: current_page = low_bound_page + page_jump - 1 first, last, data = _get_page_bounds(current_page, token) if not data: # Nos pasamos, el rango es entre low y la página actual high_bound_page = current_page - 1 break if number <= last: # Encontramos el techo. El rango es [low_bound_page, current_page] high_bound_page = current_page break # Si no, actualizamos el 'piso' y duplicamos el salto low_bound_page = current_page + 1 page_jump *= 2 # --- FASE 2: BÚSQUEDA BINARIA (En el rango) --- target_page_data = [] while low_bound_page <= high_bound_page: mid_page = (low_bound_page + high_bound_page) // 2 first, last, data = _get_page_bounds(mid_page, token) if not data: # Página vacía, buscar en la mitad inferior high_bound_page = mid_page - 1 continue if number >= first and number <= last: # ¡Encontramos la página correcta! target_page_data = data break elif number < first: # Está en una página anterior high_bound_page = mid_page - 1 else: # number > last # Está en una página posterior low_bound_page = mid_page + 1 # Filtramos la página que encontramos try: return list(filter(lambda x: x['attributes']['number'] == number, target_page_data))[0] except IndexError: # Esto no debería pasar si la lógica es correcta, # pero es una salvaguarda return None def get_sale(sale_id:int): url = 'https://api.fu.do/v1alpha1/sales/{}'.format(sale_id) token = get_token() headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) if r.status_code != 200: logger.error('Error al obtener tablas:' + str(r.json()['errors'])) return None return r.json() def create_sale(table_id:int): url = 'https://api.fu.do/v1alpha1/sales' token = get_token() headers = { 'Authorization': 'Bearer ' + token } data = { "data": { "type": "Sale", "attributes": { "people": 1, "saleType": "EAT-IN", "comment": "Pedido desde la app pedidos express" }, "relationships": { "table": { "data": { "id": str(table_id), "type": "Table" } }, "waiter": { "data": { "type": "User", "id": "76" } } } } } r = requests.post(url, headers=headers, json=data) if r.status_code != 201: logger.error('Error al crear la venta:', r.json()) return None return r.json()["data"] def create_item(product_id:int, quantity:int, sale_id:int, comment = None): url = 'https://api.fu.do/v1alpha1/items' token = get_token() headers = { 'Authorization': 'Bearer ' + token } data = { "quantity": quantity, "origin": "MOBILE", "comment": "Pedido desde pedidos express" + (f" - {comment}" if comment else ""), } data = { "data":{ "type": "Item", "attributes": data, "relationships": { "product": { "data": { "type": "Product", "id": str(product_id) } }, "sale": { "data": { "type": "Sale", "id": str(sale_id) } } }, } } r = requests.post(url, headers=headers, json=data) if r.status_code != 201: logger.error(r.json()) return None return r.json()["data"] def get_active_sale(table): data = table['relationships']['activeSales']['data'] if len(data) == 0: return None return data[0] def clear_token(): """ Elimina el token cached de Redis. Útil cuando el token es inválido o se necesita forzar una renovación. """ try: redis_client.delete(REDIS_TOKEN_KEY) logger.info("Token eliminado del cache") except Exception as e: logger.error(f"Error al eliminar token de Redis: {e}") """ Instrucciones para hacer un pedido: 1. Obtener el token de autenticación con `get_token()` (ahora usa Redis cache). 2. Obtener la mesa con `get_table(numero_de_mesa)`. 3. Ver si tiene una activeSale, en caso contrario crear una con `create_sale(id_mesa)`. 4. Agregar los items a la venta con `create_item(id_producto, cantidad, id_venta, comentario)`. Configuración de Redis: - Host: REDIS_HOST (default: localhost) - Puerto: REDIS_PORT (default: 6379) - Base de datos: REDIS_DB (default: 0) """ if __name__ == "__main__": from rich import print print(get_product(1))