import itertools from time import time import requests from config.settings import DEVELOPMENT import os import redis from logging import getLogger from models.items import Product from concurrent.futures import ThreadPoolExecutor import aiohttp import asyncio logger = getLogger(__name__) api_token = os.getenv('FUDO_API_KEY') api_secret = os.getenv('FUDO_API_SECRET') token = None token_exp = None # Configuración de Redis redis_client = redis.Redis( host=os.getenv('REDIS_HOST', 'localhost'), port=int(os.getenv('REDIS_PORT', 6379)), db=1 if DEVELOPMENT else 0, decode_responses=True ) REDIS_TOKEN_KEY = 'fudo_api_token' def get_token(): """ Obtiene el token de autenticación de Fudo API. Primero verifica si existe un token válido en Redis. Si no existe o ha expirado, solicita uno nuevo y lo guarda en Redis con expiración automática. """ global token, token_exp # Intento de obtener el token desde la RAM if token and token_exp and time() < token_exp: return token try: # Intentar obtener el token desde Redis cached_token = redis_client.get(REDIS_TOKEN_KEY) if cached_token: token = cached_token ttl = redis_client.ttl(REDIS_TOKEN_KEY) if ttl is None or int(str(ttl)) < 0: token_exp = None else: token_exp = int(str(ttl)) + int(time()) return str(cached_token) except Exception as e: logger.error(f"Error al conectar con Redis: {e}") logger.info("Fallback: obteniendo token sin cache") # Si no hay token en cache, solicitar uno nuevo url = 'https://auth.fu.do/api' data = { "apiKey": api_token, "apiSecret": api_secret } r = requests.post(url, data=data) response_data = r.json() token = response_data['token'] expiration_timestamp = response_data['exp'] # Calcular TTL en segundos para Redis current_time = int(time()) ttl_seconds = expiration_timestamp - current_time try: # Guardar el token en Redis con expiración automática if ttl_seconds > 0: redis_client.setex(REDIS_TOKEN_KEY, ttl_seconds, token) else: logger.warning("Warning: El token ya está expirado") except Exception as e: logger.error(f"Error al guardar en Redis: {e}") return token def get_modifiers(): token = get_token() url = 'https://api.fu.do/v1alpha1/product-modifiers?include=product' headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) return r.json() def get_categories(): token = get_token() url = 'https://api.fu.do/v1alpha1/product-categories' headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) return r.json().get("data") def get_category_dict(): categories = get_categories() category_dict = {} for category in categories: category_dict[category["id"]] = category["attributes"]["name"] return category_dict def get_category(id_category:int): token = get_token() url = 'https://api.fu.do/v1alpha1/product-categories/{}'.format(id_category) headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) if r.status_code != 200: logger.error(f"Error al obtener producto: {r.json()['errors']}") data = r.json()["data"] return { "id": data["id"], "name": data["attributes"]["name"], "enableOnlineMenu": data["attributes"].get("enableOnlineMenu", False) } def get_product(id_category:int): url = 'https://api.fu.do/v1alpha1/products/{}'.format(id_category) token = get_token() headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) if r.status_code != 200: logger.error(f"Error al obtener producto: {r.json()['errors']}") data = r.json().get("data") if not data: return None return Product( id=int(data["id"]), name=data["attributes"]["name"], type=get_category(data["relationships"]["productCategory"]["data"]["id"])["name"] or "Producto", price=data["attributes"]["price"], image=data["attributes"]["imageUrl"], description=data["attributes"]["description"], status=1 if data["attributes"]["active"] and data else 0, kitchen_id=data["relationships"]["kitchen"]["data"]["id"], promo_day=None, promo_price=None, promo_id=None ) def get_products(page: int = 1): url = 'https://api.fu.do/v1alpha1/products?page[number]={}'.format(page) token = get_token() headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) return r.json()['data'] async def fetch_page(session, url_template, token, page_num): """ Realiza la petición de una sola página de forma asíncrona. Retorna una tupla (numero_pagina, lista_datos). """ url = url_template.format(page_num) headers = {'Authorization': 'Bearer ' + token} try: async with session.get(url, headers=headers) as response: if response.status == 200: payload = await response.json() data = payload.get('data', []) return page_num, data else: # Si falla (ej. 404 o 500), asumimos fin de datos o error recuperable return page_num, [] except Exception: return page_num, [] async def get_all_indexed_products(): url_template = 'https://api.fu.do/v1alpha1/products?page[number]={}' token = get_token() # Asumiendo que esta función existe y es síncrona products = {} # Configuración de fuerza bruta BATCH_SIZE = 8 # Cantidad de peticiones simultáneas current_page = 1 keep_fetching = True # Configuración de conexión (límite de conexiones simultáneas) connector = aiohttp.TCPConnector(limit=100) async with aiohttp.ClientSession(connector=connector) as session: while keep_fetching: # Crear tareas para el bloque actual (ej: páginas 1 a 50) tasks = [ fetch_page(session, url_template, token, page) for page in range(current_page, current_page + BATCH_SIZE) ] # Ejecutar bloque simultáneamente results = await asyncio.gather(*tasks) # Procesar resultados empty_page_found = False for page_num, data in results: if not data: empty_page_found = True # No rompemos el loop inmediato para procesar datos previos en el batch si existen continue for product in data: if product["attributes"]["active"]: if product["attributes"]["enableQrMenu"]: products[product['id']] = product else: logger.warning(f"Product {product['id']}:{product['attributes']['name']} is not QR-enabled. enableQrMenu={product['attributes']['enableQrMenu']}") # Lógica de parada if empty_page_found: # Si algún request en el bloque volvió vacío, asumimos que llegamos al final keep_fetching = False else: # Si todo el bloque trajo datos, preparamos el siguiente bloque current_page += BATCH_SIZE return products async def get_all_products(): """Método para obtener todos los productos de la base de datos.""" return list((await get_all_indexed_products()).values()) N_PER_PAGE = 100 def _get_page_bounds(page: int, token: str): """ Función helper: Obtiene una página y devuelve el primer y último número de mesa en ella, y los datos. """ url = ( 'https://api.fu.do/v1alpha1/tables' f'?page[number]={page}&page[size]={N_PER_PAGE}' '&include=activeSales&sort=number' ) headers = {'Authorization': 'Bearer ' + token} try: r = requests.get(url, headers=headers, timeout=10) if r.status_code != 200: return None, None, None # Error de API data = r.json().get('data', []) if not data: return 0, 0, [] # Página vacía first_number = data[0]['attributes']['number'] last_number = data[-1]['attributes']['number'] return first_number, last_number, data except requests.RequestException as e: print(f"Error de request en página {page}: {e}") return None, None, None def get_table(number: int): token = get_token() # --- FASE 1: BÚSQUEDA EXPONENCIAL (Encontrar rango) --- # Encontrar un 'high_bound' (página) donde el último N° sea >= 'number' page = 1 low_bound_page = 1 high_bound_page = 1 # Primero, revisamos la página 1 first_num, last_num, page_data = _get_page_bounds(page, token) if first_num is None: return None # Error en la primera petición if not page_data: return None # No hay mesas en total # Si está en la página 1 if number >= first_num and number <= last_num: low_bound_page = 1 high_bound_page = 1 # Si es mayor, empezamos a saltar exponencialmente elif number > last_num: low_bound_page = 2 page_jump = 2 while True: current_page = low_bound_page + page_jump - 1 first, last, data = _get_page_bounds(current_page, token) if not data: # Nos pasamos, el rango es entre low y la página actual high_bound_page = current_page - 1 break if number <= last: # Encontramos el techo. El rango es [low_bound_page, current_page] high_bound_page = current_page break # Si no, actualizamos el 'piso' y duplicamos el salto low_bound_page = current_page + 1 page_jump *= 2 # --- FASE 2: BÚSQUEDA BINARIA (En el rango) --- target_page_data = [] while low_bound_page <= high_bound_page: mid_page = (low_bound_page + high_bound_page) // 2 first, last, data = _get_page_bounds(mid_page, token) if not data: # Página vacía, buscar en la mitad inferior high_bound_page = mid_page - 1 continue if number >= first and number <= last: # ¡Encontramos la página correcta! target_page_data = data break elif number < first: # Está en una página anterior high_bound_page = mid_page - 1 else: # number > last # Está en una página posterior low_bound_page = mid_page + 1 # Filtramos la página que encontramos try: return list(filter(lambda x: x['attributes']['number'] == number, target_page_data))[0] except IndexError: # Esto no debería pasar si la lógica es correcta, # pero es una salvaguarda return None def get_table_items(table_number: int): token = get_token() table = get_table(table_number) if not table: return None active_sale = get_active_sale(table) if not active_sale: return None sale = get_sale(active_sale['id']) if not sale: return None items = sale["data"]['relationships']['items']['data'] # 1. Función ajustada para retornar datos def peticion(url, headers): try: r = requests.get(url, headers=headers) if r.status_code == 200: # Aquí procesas el resultado como necesites return r.json() return None except Exception: return None id_list = list(map(lambda x: f"https://api.fu.do/v1alpha1/items/{x['id']}?include=product", items)) print(id_list) headers = { 'Authorization': 'Bearer ' + token } # 2. Corrección en la ejecución del ThreadPoolExecutor resultados = [] with ThreadPoolExecutor(max_workers=10) as executor: # itertools.repeat repite el header para cada url en id_list resultados = list(executor.map(peticion, id_list, itertools.repeat(headers))) return [ { "id": int(data["data"]["relationships"]["product"]["data"]["id"]), "quantity": data["data"]["attributes"]["quantity"], } for data in resultados ] def get_sale(sale_id:int): url = 'https://api.fu.do/v1alpha1/sales/{}'.format(sale_id) token = get_token() headers = { 'Authorization': 'Bearer ' + token } r = requests.get(url, headers=headers) if r.status_code != 200: logger.error('Error al obtener tablas:' + str(r.json()['errors'])) return None return r.json() def create_sale(table_id:int): url = 'https://api.fu.do/v1alpha1/sales' token = get_token() headers = { 'Authorization': 'Bearer ' + token } data = { "data": { "type": "Sale", "attributes": { "people": 1, "saleType": "EAT-IN", "comment": "Pedido desde la app pedidos express" }, "relationships": { "table": { "data": { "id": str(table_id), "type": "Table" } }, "waiter": { "data": { "type": "User", "id": "76" } } } } } r = requests.post(url, headers=headers, json=data) if r.status_code != 201: logger.error('Error al crear la venta:', r.json()) return None return r.json()["data"] def create_item(product_id:int, quantity:int, sale_id:int, comment = None): url = 'https://api.fu.do/v1alpha1/items' token = get_token() headers = { 'Authorization': 'Bearer ' + token } data = { "quantity": quantity, "origin": "MOBILE", "comment": "Pedido desde pedidos express" + (f" - {comment}" if comment else ""), } data = { "data":{ "type": "Item", "attributes": data, "relationships": { "product": { "data": { "type": "Product", "id": str(product_id) } }, "sale": { "data": { "type": "Sale", "id": str(sale_id) } } }, } } r = requests.post(url, headers=headers, json=data) if r.status_code != 201: logger.error(r.json()) return None return r.json()["data"] def get_active_sale(table): data = table['relationships']['activeSales']['data'] if len(data) == 0: return None return data[0] def clear_token(): """ Elimina el token cached de Redis. Útil cuando el token es inválido o se necesita forzar una renovación. """ try: redis_client.delete(REDIS_TOKEN_KEY) logger.info("Token eliminado del cache") except Exception as e: logger.error(f"Error al eliminar token de Redis: {e}") """ Instrucciones para hacer un pedido: 1. Obtener el token de autenticación con `get_token()` (ahora usa Redis cache). 2. Obtener la mesa con `get_table(numero_de_mesa)`. 3. Ver si tiene una activeSale, en caso contrario crear una con `create_sale(id_mesa)`. 4. Agregar los items a la venta con `create_item(id_producto, cantidad, id_venta, comentario)`. Configuración de Redis: - Host: REDIS_HOST (default: localhost) - Puerto: REDIS_PORT (default: 6379) - Base de datos: REDIS_DB (default: 0) """ if __name__ == "__main__": from rich import print print(get_table_items(106))