import csv
import os
import json
import secrets
from typing import List, Dict, Union, Annotated
from fastapi import FastAPI, Request, HTTPException, Header, Depends
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from openai import OpenAI
from dotenv import load_dotenv
from starlette.middleware.sessions import SessionMiddleware
from impresora.printer import PrinterUSB
from impresora.order import *
import smtplib
from email.message import EmailMessage
# Load environment variables from .env file
load_dotenv()
import fudo.fudo as fd
# --- Configuration (read once, at import time) ---
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
PORT = int(os.environ.get("PORT", 6001))
# Product ids that load_products() filters out of the catalogue.
EXCLUDED_BEER_IDS = [14, 12, 11]

# SECRET_KEY signs the session cookies. A built-in default keeps the app
# bootable without a .env file, but it is insecure for production, hence the
# warning below.
_DEFAULT_SECRET_KEY = "your_very_very_secret_key_for_signing_cookies_python_v2"
SECRET_KEY = os.environ.get("SECRET_KEY", _DEFAULT_SECRET_KEY)
if SECRET_KEY == _DEFAULT_SECRET_KEY:
    print("WARNING: Using default SECRET_KEY. Please set a strong SECRET_KEY in your .env file for production.")

if not OPENAI_API_KEY:
    print("CRITICAL ERROR: OPENAI_API_KEY environment variable not set. The applicaton will not work correctly.")
    # NOTE: startup is not aborted here; a hard failure
    # (raise ValueError("OPENAI_API_KEY is not set, cannot start application."))
    # was considered but left disabled.
# --- FastAPI application and session support ---
app = FastAPI(title="Web Pedidos Klein - FastAPI Backend")

# SessionMiddleware adds session support using signed cookies (signed with
# SECRET_KEY). Lifetime is one hour, expressed in seconds as Starlette expects;
# the original Express app used maxAge = 60 * 60 * 1000 ms.
_SESSION_MAX_AGE_SECONDS = 60 * 60
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY, max_age=_SESSION_MAX_AGE_SECONDS)
# --- Data file locations ---
# Both JSON files are expected in the same directory as this module. (The
# original project kept the assistant data at web_pedidos/src/data.json; for
# the Python version, copy src/data.json alongside main.py.)
_MODULE_DIR = os.path.dirname(__file__)
BG_DATA_PATH = os.path.join(_MODULE_DIR, "data.json")
PRODUCTS_PATH = os.path.join(_MODULE_DIR, "products.json")
def add_product_to_fudo(product_id: int, quantity: int, table_number: int, comment: str | None = None):
    """Add a product to the sale of a table in Fudo, opening a sale if needed.

    The table is looked up with ``fd.get_table``; the sale returned by
    ``fd.get_active_sale`` is reused, otherwise one is created with
    ``fd.create_sale``; finally the item is added with ``fd.create_item``.

    Returns:
        The created item, or None (after printing an error) as soon as any of
        those steps yields a falsy result.
    """
    table = fd.get_table(table_number)
    if not table:
        print(f"Error: Table {table_number} not found.")
        return None

    # Reuse the open sale when there is one; only create a sale otherwise.
    sale = fd.get_active_sale(table) or fd.create_sale(table['id'])
    if not sale:
        print(f"Error: Could not create sale for table {table_number}.")
        return None

    created_item = fd.create_item(product_id, quantity, sale['id'], comment)
    if created_item:
        return created_item

    print(f"Error: Could not create item for product {product_id}.")
    return None
def send_email():
    """Send the "printer disconnected" alert e-mail through Gmail SMTP (SSL, port 465).

    Sender, recipients and password can be supplied through the environment
    variables ``ALERT_EMAIL_FROM``, ``ALERT_EMAIL_TO`` (comma separated) and
    ``ALERT_EMAIL_PASSWORD``. When a variable is unset, the value that used to
    be hard-coded here is used so existing deployments keep working.

    Sending is best effort: connection and authentication failures are printed
    and reported through the return value instead of being raised, because this
    function is called from inside an exception handler.

    Returns:
        True if the message was handed to the SMTP server, False otherwise.
    """
    sender = os.getenv("ALERT_EMAIL_FROM", 'expresspedidos211@gmail.com')

    recipients_env = os.getenv("ALERT_EMAIL_TO")
    if recipients_env:
        recipients = [addr.strip() for addr in recipients_env.split(",") if addr.strip()]
    else:
        recipients = ['erwinjacimino2003@gmail.com', "mompyn@gmail.com"]

    # SECURITY: a credential committed to source control must be considered
    # leaked. Rotate it and provide the new one via ALERT_EMAIL_PASSWORD; the
    # literal fallback only preserves the previous behaviour until then.
    password = os.getenv("ALERT_EMAIL_PASSWORD")
    if not password:
        print("WARNING: ALERT_EMAIL_PASSWORD not set. Using the built-in e-mail password; "
              "rotate it and configure it through the environment.")
        password = 'drkassszdtgapufg'

    # Build the message: plain-text part plus an HTML alternative.
    msg = EmailMessage()
    msg['Subject'] = 'Impresora Desconectada weon :('
    msg['From'] = sender
    msg['To'] = ", ".join(recipients)
    msg.set_content('Este correo tiene contenido HTML.')
    msg.add_alternative("""
|
🖨️
|
|
¡Impresora Desconectada!
|
|
No se puede establecer conexión con la impresora.
Por favor, verifica la conexión y vuelve a intentarlo.
|
|
🔴 Estado: Desconectada
|
|
""", subtype='html')

    try:
        # The timeout keeps an unreachable SMTP server from hanging the caller.
        with smtplib.SMTP_SSL('smtp.gmail.com', 465, timeout=10) as smtp:
            smtp.login(sender, password)
            smtp.send_message(msg)
    except (smtplib.SMTPException, OSError) as exc:
        print(f"ERROR: Could not send printer alert e-mail: {exc}")
        return False
    return True
def load_bg_data() -> List[Dict[str, str]]:
    """Read the assistant's background data from ``BG_DATA_PATH``.

    Returns:
        The parsed JSON content, or an empty list (after printing an error)
        when the file is missing or is not valid JSON.
    """
    try:
        with open(BG_DATA_PATH, encoding='utf-8') as handle:
            return json.load(handle)
    except FileNotFoundError:
        failure = f"ERROR: Data file not found at {BG_DATA_PATH}. Serving with empty data."
    except json.JSONDecodeError:
        failure = f"ERROR: Could not decode JSON from {BG_DATA_PATH}. Serving with empty data."

    # Only reached when one of the handlers above ran.
    print(failure)
    return []
def load_products() -> List[Dict[str, str]]:
    """Read the product catalogue from ``PRODUCTS_PATH``.

    Entries whose ``id`` appears in ``EXCLUDED_BEER_IDS`` are dropped.

    Returns:
        The filtered product list, or an empty list (after printing an error)
        when the file is missing or is not valid JSON.
    """
    try:
        with open(PRODUCTS_PATH, encoding='utf-8') as handle:
            catalogue = json.load(handle)
            return [entry for entry in catalogue if entry['id'] not in EXCLUDED_BEER_IDS]
    except FileNotFoundError:
        failure = f"ERROR: Data file not found at {PRODUCTS_PATH}. Serving with empty data."
    except json.JSONDecodeError:
        failure = f"ERROR: Could not decode JSON from {PRODUCTS_PATH}. Serving with empty data."

    # Only reached when one of the handlers above ran.
    print(failure)
    return []
# Load both data sets once, at import time. generate_completion() reads
# bg_data_loaded and get_products() returns all_products; neither re-reads
# the files per request.
bg_data_loaded = load_bg_data()
all_products = load_products()
# region --- Pydantic Models for Request/Response Typing ---
# One chat message of a conversation. Instances are dumped with model_dump()
# and appended after the system prompt in generate_completion().
class Message(BaseModel):
    role: str  # role string, forwarded unchanged
    content: str  # message text
# Request body of POST /api/chat/completions.
class ChatCompletionRequest(BaseModel):
    messages: List[Message]  # conversation passed to generate_completion()
    user: str  # written as the line prefix in llm_logs.txt
# One line of an order received from the web client.
class ItemWeb(BaseModel):
    id: int  # passed to add_product_to_fudo() as product_id
    name: str  # used for the printed ticket and the logs.csv entry
    quantity: int  # sent to Fudo and printed on the ticket
    price: float  # printed on the ticket
    itemTotal: float  # not read anywhere in this file
# Request body of POST /api/printer/order.
class OrderWeb(BaseModel):
    customerName: str  # printed on the ticket and logged as userName
    items: List[ItemWeb]  # order lines; must be non-empty
    totalAmount: float  # not read anywhere in this file
    orderDate: str  # logged as-is in logs.csv
    table: int  # table number used for Fudo and for printing; 0 is rejected
# endregion --- Pydantic Models for Request/Response Typing ---
# region --- OpenAI Service Logic ---
# Build the client only when a key is configured. Constructing OpenAI() with no
# API key available raises at import time, which would defeat the "warn and
# keep running" handling of a missing OPENAI_API_KEY earlier in this module.
# generate_completion() rejects requests with HTTP 500 before touching the
# client when the key is missing, so None is never dereferenced.
openai_client = OpenAI(api_key=OPENAI_API_KEY) if OPENAI_API_KEY else None
def _build_system_prompt(bg_entries) -> str:
    """Render the assistant's system prompt with the background Q&A data embedded."""
    # json.dumps escapes quotes, backslashes and newlines inside the questions
    # and answers; ensure_ascii=False keeps accented characters readable.
    data_string = "\n".join(
        json.dumps(
            {"pregunta": item.get("q", ""), "respuesta": item.get("ans", "")},
            ensure_ascii=False,
        )
        for item in bg_entries
    )
    return f"""
Eres un asistente de el bar klein, tu nombre es camilo klein, usas emojis para responder.
y ser carismatico con el cliente.
tus responsabilidades son:
- Responder preguntas sobre el menu de el bar klein
- Proporcionar información sobre el menú de el bar klein
- Proporcionar recomendaciones sobre el menú de el bar klein
- Proporcionar información sobre la comida de el bar klein
- No puedes tomar pedidos de clientes, solo informar
- Debes evadir cualquier pregunta que no sea relacionada con el bar klein
para esto usaras los siguientes datos:
{data_string}
"""


async def generate_completion(messages_array: List[Message], session_id: str) -> str:
    """Ask the OpenAI chat model to answer a conversation.

    A system prompt built from ``bg_data_loaded`` is placed before the
    caller's messages.

    Args:
        messages_array: Conversation so far, in order.
        session_id: Identifier used only in the log line.

    Returns:
        The assistant's reply, or the string "-1" when the model returned
        empty content.

    Raises:
        HTTPException: 500 when the API key is not configured or the OpenAI
            call fails.
    """
    import asyncio  # local import so this block does not depend on the file header

    if not OPENAI_API_KEY:
        print("Error: OpenAI API key is not configured.")
        raise HTTPException(status_code=500, detail="OpenAI API key not configured on server.")

    client_messages = [msg.model_dump() for msg in messages_array]
    print(f"[OpenAI Service Python] Session/Token {session_id} sent: {client_messages}")

    processed_messages: List[Dict[str, str]] = [
        {"role": "system", "content": _build_system_prompt(bg_data_loaded)},
        *client_messages,
    ]

    try:
        # The OpenAI client used here is synchronous; run it in a worker thread
        # so the event loop keeps serving other requests while waiting.
        completion = await asyncio.to_thread(
            openai_client.chat.completions.create,
            model="gpt-4o-mini",
            messages=processed_messages,  # type: ignore (OpenAI lib expects list of specific dicts)
            temperature=0.3,
        )
        response_content = completion.choices[0].message.content
    except Exception as e:
        print(f"Error calling OpenAI: {e}")
        # Avoid exposing detailed error messages to the client unless necessary
        raise HTTPException(status_code=500, detail="Error al procesar tu solicitud con OpenAI.") from e

    return response_content if response_content else "-1"
# endregion --- OpenAI Service Logic ---
# --- Security/Token Dependency ---
async def get_session_token(request: Request) -> Union[str, None]:
    """Return the ``antiAbuseToken`` stored in the request's session, or None if absent."""
    session_data = request.session
    return session_data.get("antiAbuseToken")
async def protect_chat_api(
    request: Request,
    x_app_token: Annotated[Union[str, None], Header(alias="X-App-Token")] = None,
    session_token: Annotated[Union[str, None], Depends(get_session_token)] = None
):
    """Dependency that rejects calls whose X-App-Token does not match the session token.

    Equivalent to the former protectChatAPI middleware.

    Args:
        request: Incoming request (unused; kept for interface compatibility).
        x_app_token: Value of the ``X-App-Token`` request header.
        session_token: Token stored in the session, as returned by
            ``get_session_token``.

    Returns:
        True when the check passes.

    Raises:
        HTTPException: 403 when the session has no token or the tokens differ,
            401 when the header is missing.
    """
    if not session_token:
        raise HTTPException(status_code=403, detail="Acceso denegado: Sesión inválida o token no inicializado.")
    if not x_app_token:
        # The header checked above is X-App-Token, so name it correctly here.
        raise HTTPException(status_code=401, detail="Acceso denegado: Falta el token X-App-Token.")

    # Constant-time comparison. Encode first because compare_digest only
    # accepts ASCII-only str, and a header value may contain other characters.
    tokens_match = secrets.compare_digest(
        x_app_token.encode("utf-8"), session_token.encode("utf-8")
    )
    if not tokens_match:
        # Log the attempt for security monitoring, without writing either
        # token value to the logs.
        print("WARN: Invalid token attempt. X-App-Token does not match the session token.")
        raise HTTPException(status_code=403, detail="Acceso denegado: Token inválido.")
    return True  # Protection passed
# Product catalogue endpoint: returns the module-level all_products list
# (loaded once by load_products()) under the "products" key.
@app.get("/api/get_products", summary="Get products")
async def get_products():
    payload = {"products": all_products}
    return JSONResponse(payload)
# --- API Endpoints ---
# Returns the session's anti-abuse token as {"chatToken": ...}. A token already
# stored in the session is reused; otherwise a new 32-byte hex token is
# generated, stored in the session and logged.
@app.get("/api/chat/init-chat", summary="Initialize chat and get anti-abuse token")
async def init_chat(request: Request):
    token = request.session.get("antiAbuseToken")
    if token:
        return JSONResponse({"chatToken": token})

    token = secrets.token_hex(32)
    request.session["antiAbuseToken"] = token  # Store in session
    print(f"Generated new antiAbuseToken for session: {token}")
    return JSONResponse({"chatToken": token})
# Request body of POST /api/existsUser.
class UserCodeRequest(BaseModel):
    user_code: str  # compared against the userCode of each entry in users.json
@app.post("/api/existsUser", summary="Check if user exists")
async def exists_user(request: UserCodeRequest):
    """Resolve a user code to a display name using users.json.

    Returns ``{"success": True, "userName": ...}``. For a code found in
    users.json the stored userName is returned; for any other code the code
    itself is echoed back as the name.
    NOTE(review): unknown codes still report success=True; presumably a
    deliberate guest fallback, confirm with the frontend.

    A missing or undecodable users.json is treated as an empty user list
    instead of failing the request.
    """
    try:
        with open('users.json', 'r', encoding='utf-8') as f:
            users = json.load(f)
    except FileNotFoundError:
        print("ERROR: users.json not found. Treating every user code as unregistered.")
        users = []
    except json.JSONDecodeError:
        print("ERROR: Could not decode JSON from users.json. Treating every user code as unregistered.")
        users = []

    for user in users:
        # .get() so an entry missing a key cannot abort the lookup.
        if user.get('userCode') == request.user_code:
            return JSONResponse({
                "success": True,
                "userName": user.get('userName', request.user_code)
            })

    return JSONResponse({
        "success": True,
        "userName": request.user_code
    })
@app.post("/api/printer/order", summary="Printer order", dependencies=[Depends(protect_chat_api)])
async def printer_order(order: OrderWeb):
    """Register an order in Fudo, print it and append it to logs.csv.

    Steps, in order:
      1. Reject the order with 400 when it has no items or a falsy table.
      2. Add every item to the table's sale via ``add_product_to_fudo``; if any
         item fails, answer 424 with the list of errors (nothing is printed).
      3. Print the ticket; on failure send the alert e-mail and answer 424.
      4. Append the order to logs.csv (best effort).

    On success no body is returned (the response is JSON ``null``).
    """
    print("Printer order received")
    print(order)
    items = order.items
    table = order.table
    # OrderWeb declares ``table: int``, so no separate integer check is needed.
    if not items or not table:
        return JSONResponse(status_code=400, content={"message": "Items and table are required."})

    product_errors = []
    for item in items:
        product = add_product_to_fudo(item.id, item.quantity, table)
        if not product:
            product_errors.append(f"Error adding product {item.id} to table {table}.")
    if product_errors:
        return JSONResponse(status_code=424, content={"message": "Error adding products to table.", "errors": product_errors})

    # No errors: print the order.
    print_order = Order(order.customerName, [Item(item.name, item.price, item.quantity) for item in items])
    try:
        # The printer is created inside the try as well.
        # NOTE(review): assumes PrinterUSB() may itself raise when the device
        # is absent; confirm against impresora.printer.
        printer = PrinterUSB(0xfe6, 0x811e)
        printer.print_order(print_order, table)
    except Exception as exc:
        print(f"ERROR: Could not print order for table {table}: {exc}")
        # Printing failed (e.g. printer disconnected): send the alert e-mail.
        # A failure to send it must not replace the 424 answer below.
        try:
            send_email()
        except Exception as mail_exc:
            print(f"ERROR: Could not send printer alert e-mail: {mail_exc}")
        return JSONResponse(status_code=424, content={"message": "No se pudo imprimir el Pedido, impresora desconectada"})

    # Order log. The header is written when the file is new or empty, and the
    # order row is always written (previously the first order only produced
    # the header and was never logged).
    try:
        needs_header = not os.path.exists('logs.csv') or os.path.getsize('logs.csv') == 0
        with open('logs.csv', 'a', newline='') as f:
            writer = csv.writer(f)
            if needs_header:
                writer.writerow(['userName', 'table', 'orderDate', 'items'])
            writer.writerow([order.customerName, order.table, order.orderDate, [item.name for item in items]])
    except OSError as exc:
        # The order is already registered and printed; a log failure should
        # not make the client believe the order failed.
        print(f"WARN: Could not write order to logs.csv: {exc}")
@app.post("/api/chat/completions",
          summary="Get chat completions from OpenAI",
          dependencies=[Depends(protect_chat_api)])
async def chat_completions(request_data: ChatCompletionRequest, request: Request):
    """Answer a chat conversation and append the reply to llm_logs.txt.

    Returns ``{"response": <assistant reply>}``. HTTPExceptions raised by
    ``generate_completion`` are passed through; any other error becomes a
    generic 500. Failing to write the log line does not fail the request.
    """
    # Uses the session's antiAbuseToken as an identifier for logging.
    session_identifier = request.session.get("antiAbuseToken", "unknown_session")
    try:
        openai_response = await generate_completion(request_data.messages, session_identifier)
    except HTTPException:
        # Re-raise HTTPExceptions from called functions unchanged.
        raise
    except Exception as e:
        print(f"Unexpected error in /api/chat/completions: {e}")
        raise HTTPException(status_code=500, detail="Error interno del servidor al procesar el chat.")

    try:
        # Append mode creates the file when missing, so no existence check is
        # needed. UTF-8 is explicit because replies contain emojis and
        # accented characters that a non-UTF-8 default encoding cannot write.
        with open("llm_logs.txt", "a", encoding="utf-8") as f:
            f.write(f"{request_data.user}: {openai_response}\n")
    except (OSError, UnicodeError) as e:
        print(f"WARN: Could not write to llm_logs.txt: {e}")

    return JSONResponse({"response": openai_response})
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def serve_index_html():
    """Serve public/index.html at the site root, or 404 when the file is missing."""
    index_path = os.path.join("public", "index.html")
    if os.path.exists(index_path):
        return FileResponse(index_path)
    raise HTTPException(status_code=404, detail="public/index.html not found.")
# Serve the files under ./public from the site root. This is registered after
# all the routes above.
# NOTE(review): presumably so the explicit API routes and "/" are matched
# before the static mount; confirm against Starlette's routing order.
app.mount("/", StaticFiles(directory="public", html=False), name="public_root_assets")
# --- Main Application Runner ---
if __name__ == "__main__":
    if not OPENAI_API_KEY:
        print("FATAL: OPENAI_API_KEY is not set. OpenAI features will fail.")
        print("Please create a .env file with OPENAI_API_KEY='your_key_here'")
        # Only scaffold a template when no .env exists. Mode "x" refuses to
        # overwrite: an existing .env may hold other settings (SECRET_KEY,
        # PORT, ...) that opening with "w" used to wipe out.
        try:
            with open(".env", "x") as f:
                f.write("OPENAI_API_KEY='your_key_here'")
        except FileExistsError:
            print("A .env file already exists; add OPENAI_API_KEY to it.")

    print(f"Servidor corriendo en http://localhost:{PORT}")
    if not os.path.exists(BG_DATA_PATH):
        print(f"ADVERTENCIA: {BG_DATA_PATH} no encontrado. El asistente de IA no tendrá datos específicos del menú.")
    else:
        print(f"Datos del asistente cargados desde: {os.path.abspath(BG_DATA_PATH)}")

    # Imported here so uvicorn is only required when running this file directly.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=PORT)