import json import os import re import psycopg2 from config.settings import POSTGRESQL_DB_CONFIG from typing import List, Dict, Optional from abc import ABC from config.settings import BG_DATA_PATH, IMAGE_PATH, PRODUCT_DATA_PATH, CURRENT_URL from toteat.toteat import get_all_products, get_category_dict from logging import getLogger from datetime import datetime from cryptography.fernet import Fernet from rich import print from config.settings import PIN_KEY import base64 as b64 from time import time # Import models from models.user import User from models.items import Product from models.sales import Sale from models.blacklist import Blacklist fernet = Fernet(PIN_KEY.encode()) logger = getLogger(__name__) """ ESQUEMA DE BASE DE DATOS SQLITE (data.db) 1. Tabla: users ----------------------------------- - id SERIAL PRIMARY KEY - email VARCHAR(255) UNIQUE NOT NULL - name VARCHAR(255) NOT NULL - rut VARCHAR(20) UNIQUE NOT NULL - pin_hash TEXT NOT NULL (encriptado) - kleincoins TEXT NOT NULL (encriptado, valor por defecto "0") - created_at TEXT NOT NULL (fecha de creación en formato ISO 8601) - reward_progress INTEGER DEFAULT 0 NOT NULL CHECK (reward_progress >= 0 AND reward_progress <= 100) (Guarda la información del usuario con su pin hasheado y kleincoins encriptadas) 2. Tabla: products ----------------------------------- - id INTEGER PRIMARY KEY - name VARCHAR(255) NOT NULL - type VARCHAR(100) NOT NULL - description TEXT NOT NULL - price INTEGER NOT NULL - image TEXT (URL de la imagen) - status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo - promo_id INTEGER (ID de la promoción asociada, si existe)(puede ser null) - promo_price INTEGER (Precio promocional, si existe)(puede ser null) - promo_day INTEGER (Día de la semana para la promoción, 1-7)(puede ser null) (Guarda los productos disponibles para venta con su estado activo/inactivo) 3. Tabla: sales ----------------------------------- - id SERIAL PRIMARY KEY - user_id INTEGER NOT NULL REFERENCES users.id - total INTEGER NOT NULL (precio total de la venta) - fudo_id TEXT UNIQUE NOT NULL (ID string único por venta) - date TEXT NOT NULL (fecha y hora en formato ISO 8601) - table INTEGER NOT NULL (número de mesa) (Guarda cada venta, asociada a un usuario y mesa) 4. Tabla: sale_products ----------------------------------- - sale_id INTEGER NOT NULL (relación a sales.id) - product_id INTEGER NOT NULL (relación a products.id) - quantity INTEGER NOT NULL DEFAULT 1 (cantidad del producto) (Relación muchos a muchos entre ventas y productos con cantidad) 5. Tabla: blacklist ----------------------------------- - id SERIAL PRIMARY KEY - user_id INTEGER NOT NULL (relación a users.id) (Usuarios bloqueados o no autorizados para ciertas acciones) RELACIONES: ----------------------------------- - users puede tener muchas sales - sales puede tener muchos productos (y viceversa), por eso se usa una tabla intermedia (sale_products) - products pueden repetirse en múltiples sales """ # Base abstract class for data access class BaseDataService(ABC): """Abstract base class for data services""" def __init__(self): self.db_config = POSTGRESQL_DB_CONFIG def _get_connection(self): """Get database connection""" return psycopg2.connect( dbname=self.db_config['dbname'], user=self.db_config['user'], password=self.db_config['password'], host=self.db_config['host'], port=self.db_config['port'] ) # User Data Service class UserDataService(BaseDataService): """Service for managing user data""" #region Create def create(self, name: str, email: str, rut: str, pin_hash: str) -> int: """Add a new user to the database""" conn = self._get_connection() cursor = conn.cursor() try: cursor.execute( "INSERT INTO users (name, email, rut, pin_hash, kleincoins, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id", (name, email, rut, fernet.encrypt(pin_hash.encode()).decode(), fernet.encrypt(b"0").decode(), datetime.now().isoformat()) ) conn.commit() user_id = cursor.fetchone() if user_id: logger.info(f"User added with ID: {user_id[0]}") return user_id[0] else: logger.error("Failed to add user.") return -1 except psycopg2.IntegrityError as e: logger.error(f"Failed to add user: {e}") return -1 finally: conn.close() #endregion #region Read def get_all(self) -> List[User]: """Get all users from the database""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users") users = cursor.fetchall() conn.close() return [ User( id=user[0], email=user[1], name=user[2], rut=user[3], pin_hash=fernet.decrypt(user[4].encode()).decode(), kleincoins=fernet.decrypt(user[5].encode()).decode(), created_at=user[6], permissions=user[7], reward_progress=user[8] ) for user in users ] def get_by_id(self, user_id: int) -> Optional[User]: """Get user data from the database""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,)) user = cursor.fetchone() conn.close() if user: return User( id=user[0], email=user[1], name=user[2], rut=user[3], pin_hash=fernet.decrypt(user[4].encode()).decode(), kleincoins=fernet.decrypt(user[5].encode()).decode(), created_at=user[6], permissions=user[7], reward_progress=user[8] ) return None def get_by_email(self, email: str) -> Optional[User]: """Get user by email""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE email = %s", (email,)) user = cursor.fetchone() conn.close() logger.debug(f"get_by_email: {email}, user: {user}") if user: return User( id=user[0], email=user[1], name=user[2], rut=user[3], pin_hash=user[4], kleincoins=fernet.decrypt(user[5].encode()).decode(), created_at=user[6], permissions=user[7], reward_progress=user[8] ) return None def login(self, email: str, pin_hashed: str) -> Optional[User]: """Login user by email and pin hash""" user = self.get_by_email(email) if user: logger.info(f"comparing {fernet.decrypt(user.pin_hash.encode()).decode()} with {pin_hashed}") if user and fernet.decrypt(user.pin_hash.encode()).decode() == pin_hashed: logger.info(f"User {user.email} logged in successfully.") return user else: logger.error("Login failed: Invalid email or pin.") return None def permissions(self, user_id: int) -> int: """Get user permissions""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT permissions FROM users WHERE id = %s", (user_id,)) result = cursor.fetchone() if not result: logger.error(f"User with ID {user_id} not found.") return 0 logger.info(f"userID: {user_id}, permissions: {result[0]}") result = result[0] conn.close() if result: return result return 0 def get_by_rut(self, rut: str) -> Optional[User]: """Get user by RUT""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE rut = %s", (rut,)) user = cursor.fetchone() if not user: logger.error(f"User with RUT {rut} not found.") conn.close() return None user = user[0] conn.close() if user: return User( id=user[0], email=user[1], name=user[2], rut=user[3], pin_hash=fernet.decrypt(user[4].encode()).decode(), kleincoins=fernet.decrypt(user[5].encode()).decode(), created_at=user[6], permissions=user[7], reward_progress=user[8] ) return None def get_next_id(self) -> int: """Get the next user ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT last_value FROM users_id_seq") result = cursor.fetchone() conn.close() if result and result[0]: return result[0] return 1 #endregion #region Update def update(self, user_id: int, email=None, name=None, rut=None, pin_hash=None, kleincoins=None) -> bool: """Update user information in the database""" conn = self._get_connection() cursor = conn.cursor() updates = [] params = [] if email: updates.append("email = %s") params.append(email) if name: updates.append("name = %s") params.append(name) if rut: updates.append("rut = %s") params.append(rut) if pin_hash: updates.append("pin_hash = %s") params.append(fernet.encrypt(pin_hash.encode()).decode()) if kleincoins is not None: updates.append("kleincoins = %s") params.append(fernet.encrypt(str(kleincoins).encode()).decode()) if not updates: conn.close() return False try: cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = %s", (*params, user_id)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"User with ID {user_id} updated.") return success except psycopg2.IntegrityError as e: logger.error(f"Failed to update user: {e}") return False finally: conn.close() #endregion def set_reward_progress(self, user_id: int, progress: int) -> bool: """Add progress to user's reward""" conn = self._get_connection() cursor = conn.cursor() try: cursor.execute("UPDATE users SET reward_progress = %s WHERE id = %s", (progress, user_id)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Reward progress updated for user {user_id}: {progress}") return success except psycopg2.IntegrityError as e: logger.error(f"Failed to update reward progress: {e}") return False finally: conn.close() #region Delete def delete(self, user_id: int) -> bool: """Delete a user from the database""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM users WHERE id = %s", (user_id,)) conn.commit() conn.close() if cursor.rowcount > 0: logger.info(f"User with ID {user_id} deleted.") return True else: logger.error(f"Failed to delete user with ID {user_id}.") return False def update_kleincoins(self, user_id: int, kleincoins: int) -> bool: """Update user's kleincoins""" conn = self._get_connection() cursor = conn.cursor() try: cursor.execute( "UPDATE users SET kleincoins = %s WHERE id = %s", (fernet.encrypt(str(kleincoins).encode()).decode(), user_id) ) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Kleincoins updated for user {user_id}: {kleincoins}") return success except psycopg2.IntegrityError as e: logger.error(f"Failed to update kleincoins: {e}") return False finally: conn.close() def get_kleincoins(self, user_id: int) -> Optional[int]: """Get user's kleincoins""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT kleincoins FROM users WHERE id = %s", (user_id,)) result = cursor.fetchone() conn.close() if result: try: return int(fernet.decrypt(result[0].encode()).decode()) except Exception as e: logger.error(f"Failed to decrypt kleincoins for user {user_id}: {e}") return None return None #endregion # Blacklist Data Service class BlacklistDataService(BaseDataService): """Service for managing blacklisted users""" #region Create def create(self, user_id: int) -> int: """Add a user to the blacklist""" conn = self._get_connection() cursor = conn.cursor() try: cursor.execute("INSERT INTO blacklist (user_id) VALUES (%s) RETURNING id", (user_id,)) conn.commit() blacklist_id = cursor.fetchone() if blacklist_id: logger.info(f"User with ID {blacklist_id[0]} added to blacklist.") return blacklist_id[0] else: logger.error(f"Failed to add user with ID {user_id} to blacklist.") return -1 except psycopg2.IntegrityError as e: logger.error(f"Failed to add user to blacklist: {e}") return -1 finally: conn.close() #endregion #region Read def get_all(self) -> List[Blacklist]: """Get all blacklisted users""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT b.id, b.user_id, u.email, u.name, u.rut FROM blacklist b LEFT JOIN users u ON b.user_id = u.id """) blacklisted = cursor.fetchall() conn.close() return [ Blacklist( id=row[0], user_id=row[1], email=row[2], name=row[3], rut=row[4] ) for row in blacklisted ] def get_by_id(self, id: int) -> Optional[Blacklist]: """Get blacklist entry by ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT b.id, b.user_id, u.email, u.name, u.rut FROM blacklist b LEFT JOIN users u ON b.user_id = u.id WHERE b.id = %s """, (id,)) row = cursor.fetchone() conn.close() if row: return Blacklist( id=row[0], user_id=row[1], email=row[2], name=row[3], rut=row[4] ) return None def get_blacklisted_user_ids(self) -> List[int]: """Get a list of blacklisted user IDs""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT user_id FROM blacklist") blacklisted_users = [row[0] for row in cursor.fetchall()] conn.close() return blacklisted_users def is_user_blacklisted(self, user_id: int) -> bool: """Check if a user is blacklisted""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM blacklist WHERE user_id = %s", (user_id,)) blacklisted = cursor.fetchone() is not None conn.close() return blacklisted #endregion #region Update def update(self, id: int, **kwargs) -> bool: """Update blacklist entry (not commonly used)""" # Blacklist entries typically don't need updates return False #endregion #region Delete def delete(self, id: int) -> bool: """Remove a blacklist entry by ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM blacklist WHERE id = %s", (id,)) conn.commit() success = cursor.rowcount > 0 conn.close() if success: logger.info(f"Blacklist entry with ID {id} removed.") else: logger.error(f"Failed to remove blacklist entry with ID {id}.") return success def remove_user_from_blacklist(self, user_id: int) -> bool: """Remove a user from the blacklist""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("DELETE FROM blacklist WHERE user_id = %s", (user_id,)) conn.commit() success = cursor.rowcount > 0 conn.close() if success: logger.info(f"User with ID {user_id} removed from blacklist.") else: logger.error(f"Failed to remove user with ID {user_id} from blacklist.") return success #endregion # Product Data Service class ProductDataService(BaseDataService): category_dict = {} product_cache = { "products": [], "expires": 0 } """Service for managing products""" #region Create def create(self, id: str, name: str, price: float, type: Optional[str] = None, description: Optional[str] = None, image: Optional[str] = None, status: int = 1) -> Optional[str]: """Add a new product to the database""" conn = self._get_connection() cursor = conn.cursor() if image and "base64" in image: extension = re.search(r'data:image/(.*%s);base64,', image) image_name = f"{id}_img" if extension: image_name += f".{extension.group(1)}" image = self._image_process(image_name, image) try: cursor.execute( "INSERT INTO products (id, name, type, description, price, image, status) VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id", (id, name, type, description, price, image, status) ) conn.commit() product_id = cursor.fetchone() if product_id: logger.info(f"Product added with ID: {product_id[0]}") return product_id[0] else: logger.error("Failed to add product.") return None except psycopg2.Error as e: logger.error(f"Failed to add product: {e}") return None finally: conn.close() def load_data(self, products: List[Dict[str, str]]) -> None: """Load multiple products from a list of dictionaries""" conn = self._get_connection() cursor = conn.cursor() for product in products: try: self.create( id=str(product['id']), name=product['name'], price=int(product['price']), type=product.get('type'), description=product.get('description'), image=product.get('image'), status=int(product.get('status', 1)) # Default to active (1) ) except Exception as e: logger.error(f"Failed to load product {product['name']}: {e}") conn.close() #endregion #region Read async def get_all(self) -> List[Product]: """Get all products from the database""" return_list = [] beUpdate = False # se ve si la cache expiro o si esta vacia actual_time = time() if self.product_cache['expires'] < actual_time or not self.product_cache['products']: self.product_cache['expires'] = actual_time # Por ahora la cache no expira beUpdate = True else: logger.debug("Product cache not expired") return self.product_cache['products'] products = await get_all_products() for product in products: categoryID = product['relationships']['productCategory']['data']['id'] # en caso de que la categoria no este en el diccionario, se actualiza if categoryID not in self.category_dict: self.category_dict = get_category_dict() try: return_list.append( Product( id=str(product['id']), name=product['attributes']['name'], type=self.category_dict[categoryID], description=product['attributes'].get('description'), price=int(product['attributes'].get('price', 0)), image=product['attributes'].get('imageUrl'), kitchen_id=product['relationships']['kitchen']['data']['id'] or None, status=1 if product['attributes'].get('active') else 0, promo_id=None, promo_price=None, promo_day=None )) except Exception as e: print(e) continue if beUpdate: self.product_cache['products'] = return_list return return_list def get_by_id(self, product_id: str) -> Optional[Product]: """Get product by ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,)) product = cursor.fetchone() conn.close() if product: return Product( id=product[0], name=product[1], type=product[2], description=product[3], price=product[4], image=product[5], status=product[6], promo_id=product[7], promo_price=product[8], promo_day=product[9] ) return None def get_by_type(self, product_type: str) -> List[Product]: """Get products by type""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM products WHERE type = %s", (product_type,)) products = cursor.fetchall() conn.close() return [ Product( id=product[0], name=product[1], type=product[2], description=product[3], price=product[4], image=product[5], status=product[6], promo_id=product[7], promo_price=product[8], promo_day=product[9] ) for product in products ] def search_by_name(self, name: str) -> List[Product]: """Search products by name""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM products WHERE name LIKE %s", (f"%{name}%",)) products = cursor.fetchall() conn.close() return [ Product( id=product[0], name=product[1], type=product[2], description=product[3], price=product[4], image=product[5], status=product[6], promo_id=product[7], promo_price=product[8], promo_day=product[9] ) for product in products ] async def get_products(self, product_ids: List[str]) -> List[Product]: """Get multiple products by their IDs""" products = await self.get_all() products = list(filter(lambda x: x.id in product_ids, products)) return products #endregion #region Update def _image_process(self, image: str, base64: str) -> str: """Process image for storage""" if not image or not base64: raise ValueError("Image and base64 data must be provided") image_path = os.path.join(IMAGE_PATH, image) if not os.path.exists(IMAGE_PATH): os.makedirs(IMAGE_PATH) with open(image_path, 'wb') as img_file: img_file.write(b64.b64decode(base64.split(',')[1])) return CURRENT_URL+"/images/" + image # Return url def update(self, product_id: str, name=None, type=None, description=None, price=None, image=None, status=None) -> bool: """Update product information""" conn = self._get_connection() cursor = conn.cursor() product = self.get_by_id(product_id) if not product: logger.error(f"Product with ID {product_id} not found.") return False updates = [] params = [] if name is not None: updates.append("name = %s") params.append(name) if type is not None: updates.append("type = %s") params.append(type) if description is not None: updates.append("description = %s") params.append(description) if price is not None: updates.append("price = %s") params.append(price) if image is not None: if product.image and product.image != image: if os.path.exists(os.path.join(IMAGE_PATH, product.image)): os.remove(os.path.join(IMAGE_PATH, product.image)) try: logger.info(f"Processing image: {image[:40]}... for product {product_id}") logger.info(f"updates: {updates}, params: {params}") extension = re.search(r'data:image/(.*?);base64,', image[:40]) if not extension: raise ValueError("Invalid image format") extension = extension.group(1) image = self._image_process(f"{product.id}_img.{extension}", image) except ValueError as e: logger.error(f"Failed to process image: {e}") return False updates.append("image = %s") params.append(image) if status is not None: updates.append("status = %s") params.append(status) if not updates: conn.close() return False try: cursor.execute(f"UPDATE products SET {', '.join(updates)} WHERE id = %s", (*params, product_id)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Product with ID {product_id} updated.") return success except psycopg2.Error as e: logger.error(f"Failed to update product: {e}") return False finally: conn.close() def get_active_products(self) -> List[Product]: """Get only active products (status = 1)""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM products WHERE status = 1") products = cursor.fetchall() conn.close() return [ Product( id=product[0], name=product[1], type=product[2], description=product[3], price=product[4], image=product[5], status=product[6], promo_id=product[7], promo_price=product[8], promo_day=product[9] ) for product in products ] def get_inactive_products(self) -> List[Product]: """Get only inactive products (status = 0)""" conn = self._get_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM products WHERE status = 0") products = cursor.fetchall() conn.close() return [ Product( id=product[0], name=product[1], type=product[2], description=product[3], price=product[4], image=product[5], status=product[6], promo_id=product[7], promo_price=product[8], promo_day=product[9] ) for product in products ] def activate_product(self, product_id: str) -> bool: """Activate a product (set status to 1)""" return self.update(product_id, status=1) def deactivate_product(self, product_id: str) -> bool: """Deactivate a product (set status to 0)""" return self.update(product_id, status=0) def is_product_active(self, product_id: str) -> Optional[bool]: """Check if a product is active""" product = self.get_by_id(product_id) if product: return product.status == 1 return None def update_cache(self): self.product_cache['expires'] = time() + 60 * 15 # 15 minutos self.product_cache['products'] = self.get_all() #endregion #region Delete def delete(self, product_id: str) -> bool: """Delete a product from the database""" conn = self._get_connection() cursor = conn.cursor() product = self.get_by_id(product_id) if product and product.image and os.path.exists(os.path.join(IMAGE_PATH, product.image.removeprefix(CURRENT_URL + "/images/"))): os.remove(os.path.join(IMAGE_PATH, product.image.removeprefix(CURRENT_URL + "/images/"))) cursor.execute("DELETE FROM products WHERE id = %s", (product_id,)) conn.commit() success = cursor.rowcount > 0 conn.close() if success: logger.info(f"Product with ID {product_id} deleted.") else: logger.error(f"Failed to delete product with ID {product_id}.") return success #endregion # Sales Data Service class SalesDataService(BaseDataService): """Service for managing sales""" #region C def create(self, user_id: int, fudo_id: str, total: float, table: int, product_ids: List[str], quantities: Optional[List[int]] = None) -> int: """Create a new sale with products and quantities""" conn = self._get_connection() cursor = conn.cursor() try: fecha = datetime.now().isoformat() # Insert sale cursor.execute( "INSERT INTO sales (user_id, total, fudo_id, date, table_number) VALUES (%s, %s, %s, %s, %s) RETURNING id", (user_id, total, fudo_id, fecha, table) ) last_sale = cursor.fetchone() sale_id = last_sale[0] if last_sale else None if sale_id and product_ids: # Insert sale-product relationships with quantities if quantities is None: quantities = [1] * len(product_ids) # Default quantity 1 for i, product_id in enumerate(product_ids): quantity = quantities[i] if i < len(quantities) else 1 cursor.execute( "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (%s, %s, %s)", (sale_id, product_id, quantity) ) conn.commit() if sale_id: logger.info(f"Sale created with ID: {sale_id}, fudo_id: {fudo_id}") return sale_id else: logger.error("Failed to create sale.") return -1 except psycopg2.Error as e: logger.error(f"Failed to create sale: {e}") conn.rollback() return -1 finally: conn.close() def add_product_to_sale(self, sale_id: int, product_id: str, quantity: int = 1) -> bool: """Add a product to an existing sale with quantity""" conn = self._get_connection() cursor = conn.cursor() try: cursor.execute( "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (%s, %s, %s)", (sale_id, product_id, quantity) ) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Product {product_id} (qty: {quantity}) added to sale {sale_id}.") return success except psycopg2.IntegrityError as e: logger.error(f"Failed to add product to sale: {e}") return False finally: conn.close() def add_products_to_sale(self, sale_id: int, product_ids: List[str], quantities: Optional[List[int]] = None) -> bool: """Add multiple products to an existing sale with quantities""" conn = self._get_connection() cursor = conn.cursor() try: if quantities is None: quantities = [1] * len(product_ids) # Default quantity 1 for i, product_id in enumerate(product_ids): quantity = quantities[i] if i < len(quantities) else 1 cursor.execute( "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (%s, %s, %s)", (sale_id, product_id, quantity) ) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Products added to sale {sale_id}.") return success except psycopg2.IntegrityError as e: logger.error(f"Failed to add products to sale: {e}") return False finally: conn.close() #endregion #region R def get_all(self) -> List[Sale]: """Get all sales from the database""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email FROM sales s LEFT JOIN users u ON s.user_id = u.id ORDER BY s.date DESC """) sales = cursor.fetchall() conn.close() return [ Sale( id=sale[0], user_id=sale[1], total=sale[2], fudo_id=sale[3], date=sale[4], table=sale[5], username=sale[6], user_email=sale[7], products=self.get_sale_products(sale[0]) ) for sale in sales ] def get_by_id(self, sale_id: int) -> Optional[Sale]: """Get sale by ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email FROM sales s LEFT JOIN users u ON s.user_id = u.id WHERE s.id = %s """, (sale_id,)) sale = cursor.fetchone() conn.close() if sale: return Sale( id=sale[0], user_id=sale[1], total=sale[2], fudo_id=sale[3], date=sale[4], table=sale[5], username=sale[6], user_email=sale[7] ) return None def get_by_fudo_id(self, fudo_id: str) -> Optional[Sale]: """Get sale by fudo_id""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email FROM sales s LEFT JOIN users u ON s.user_id = u.id WHERE s.fudo_id = %s """, (fudo_id,)) sale = cursor.fetchone() conn.close() if sale: return Sale( id=sale[0], user_id=sale[1], total=sale[2], fudo_id=sale[3], date=sale[4], table=sale[5], username=sale[6], user_email=sale[7] ) return None def get_by_user(self, user_id: int) -> List[Sale]: """Get sales by user ID""" conn = self._get_connection() cursor = conn.cursor() cursor.execute( """ SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table_number, u.name, u.email FROM sales s LEFT JOIN users u ON s.user_id = u.id WHERE s.user_id = %s ORDER BY s.date DESC """, (user_id,) ) sales = cursor.fetchall() conn.close() return [ Sale( id=sale[0], user_id=sale[1], total=sale[2], fudo_id=sale[3], date=datetime.fromisoformat(sale[4]), table=sale[5], username=sale[6], user_email=sale[7], products=self.get_sale_products(sale[0]) ) for sale in sales ] def get_sale_products_ids(self, sale_id: int) -> List[int]: """Get product IDs for a specific sale""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT product_id FROM sale_products WHERE sale_id = %s """, (sale_id,)) products = cursor.fetchall() conn.close() return [product[0] for product in products] def get_sale_products(self, sale_id: int) -> List[Product]: """Get products for a specific sale with quantities""" conn = self._get_connection() cursor = conn.cursor() cursor.execute(""" SELECT p.id, p.name, p.type, p.description, p.price, p.image, p.status, sp.quantity FROM sale_products sp JOIN products p ON sp.product_id = p.id WHERE sp.sale_id = %s """, (sale_id,)) products = cursor.fetchall() conn.close() return [ Product( id=product[0], name=product[1], type=product[2], description=product[3], price=product[4], image=product[5], status=product[6], quantity=product[7], promo_id=None, # Not applicable for sale products promo_price=None, promo_day=None ) for product in products ] def get_by_table(self, table: int) -> List[Sale]: """Get sales by table number""" conn = self._get_connection() cursor = conn.cursor() cursor.execute( """ SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table_number, u.name, u.email FROM sales s LEFT JOIN users u ON s.user_id = u.id WHERE s.table_number = %s ORDER BY s.date DESC """, (table,) ) sales = cursor.fetchall() conn.close() return [ Sale( id=sale[0], user_id=sale[1], total=sale[2], fudo_id=sale[3], date=sale[4], table=sale[5], username=sale[6], user_email=sale[7] ) for sale in sales ] def update_product_quantity(self, sale_id: int, product_id: str, new_quantity: int) -> bool: """Update quantity of a product in a sale""" conn = self._get_connection() cursor = conn.cursor() try: cursor.execute( "UPDATE sale_products SET quantity = %s WHERE sale_id = %s AND product_id = %s", (new_quantity, sale_id, product_id) ) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Updated quantity to {new_quantity} for product {product_id} in sale {sale_id}.") return success except psycopg2.Error as e: logger.error(f"Failed to update product quantity: {e}") return False finally: conn.close() def get_product_quantity_in_sale(self, sale_id: int, product_id: str) -> Optional[int]: """Get quantity of a specific product in a sale""" conn = self._get_connection() cursor = conn.cursor() cursor.execute( "SELECT quantity FROM sale_products WHERE sale_id = %s AND product_id = %s", (sale_id, product_id) ) result = cursor.fetchone() conn.close() return result[0] if result else None #endregion #region U def update(self, sale_id: int, user_id=None, total=None, table=None) -> bool: """Update sale information (products should be updated via specific methods)""" conn = self._get_connection() cursor = conn.cursor() updates = [] params = [] if user_id is not None: updates.append("user_id = %s") params.append(user_id) if total is not None: updates.append("total = %s") params.append(total) if table is not None: updates.append("table = %s") params.append(table) if not updates: conn.close() return False try: cursor.execute(f"UPDATE sales SET {', '.join(updates)} WHERE id = %s", (*params, sale_id)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Sale with ID {sale_id} updated.") return success except psycopg2.Error as e: logger.error(f"Failed to update sale: {e}") return False finally: conn.close() #endregion #region D def delete(self, sale_id: int) -> bool: """Delete a sale and its product relationships""" conn = self._get_connection() cursor = conn.cursor() try: # Delete sale-product relationships first cursor.execute("DELETE FROM sale_products WHERE sale_id = %s", (sale_id,)) # Delete the sale cursor.execute("DELETE FROM sales WHERE id = %s", (sale_id,)) conn.commit() success = cursor.rowcount > 0 if success: logger.info(f"Sale with ID {sale_id} deleted.") else: logger.error(f"Failed to delete sale with ID {sale_id}.") return success except psycopg2.Error as e: logger.error(f"Failed to delete sale: {e}") return False finally: conn.close() def remove_product_from_sale(self, sale_id: int, product_id: str) -> bool: """Remove a product from a sale (removes all quantity)""" conn = self._get_connection() cursor = conn.cursor() cursor.execute( "DELETE FROM sale_products WHERE sale_id = %s AND product_id = %s", (sale_id, product_id) ) conn.commit() success = cursor.rowcount > 0 conn.close() if success: logger.info(f"Product {product_id} removed from sale {sale_id}.") else: logger.error(f"Failed to remove product {product_id} from sale {sale_id}.") return success def decrease_product_quantity(self, sale_id: int, product_id: str, decrease_by: int = 1) -> bool: """Decrease quantity of a product in a sale, removes if quantity becomes 0 or less""" conn = self._get_connection() cursor = conn.cursor() try: # Get current quantity cursor.execute( "SELECT quantity FROM sale_products WHERE sale_id = %s AND product_id = %s", (sale_id, product_id) ) result = cursor.fetchone() if not result: return False current_quantity = result[0] new_quantity = current_quantity - decrease_by if new_quantity <= 0: # Remove the product completely cursor.execute( "DELETE FROM sale_products WHERE sale_id = %s AND product_id = %s", (sale_id, product_id) ) logger.info(f"Product {product_id} removed from sale {sale_id} (quantity reached 0).") else: # Update with new quantity cursor.execute( "UPDATE sale_products SET quantity = %s WHERE sale_id = %s AND product_id = %s", (new_quantity, sale_id, product_id) ) logger.info(f"Product {product_id} quantity decreased to {new_quantity} in sale {sale_id}.") conn.commit() return True except psycopg2.Error as e: logger.error(f"Failed to decrease product quantity: {e}") return False finally: conn.close() #endregion # Factory class to get service instances class DataServiceFactory: """Factory class to create data service instances""" @staticmethod def get_user_service() -> UserDataService: """Get user data service instance""" return UserDataService() @staticmethod def get_blacklist_service() -> BlacklistDataService: """Get blacklist data service instance""" return BlacklistDataService() @staticmethod def get_product_service() -> ProductDataService: """Get product data service instance""" return ProductDataService() @staticmethod def get_sales_service() -> SalesDataService: """Get sales data service instance""" return SalesDataService() # Helper functions for background data def load_bg_data() -> List[Dict[str, str]]: """Load background data for AI assistant""" try: with open(BG_DATA_PATH, 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: logger.error(f"Data file not found at {BG_DATA_PATH}. Serving with empty data.") return [] except json.JSONDecodeError: logger.error(f"Could not decode JSON from {BG_DATA_PATH}. Serving with empty data.") return [] def initialize_db(): # En PostgreSQL, no se crea el archivo de base de datos, así que verificamos si las tablas existen. try: conn = psycopg2.connect( dbname=POSTGRESQL_DB_CONFIG['dbname'], user=POSTGRESQL_DB_CONFIG['user'], password=POSTGRESQL_DB_CONFIG['password'], host=POSTGRESQL_DB_CONFIG['host'], port=POSTGRESQL_DB_CONFIG['port'] ) cursor = conn.cursor() # Verificar si la tabla 'users' ya existe cursor.execute(""" SELECT EXISTS ( SELECT FROM information_schema.tables WHERE table_name = 'users' ); """) exists = cursor.fetchone() if exists: logger.info("La base de datos ya está inicializada, no se necesita inicializar.") conn.close() return conn.close() except Exception as e: logger.error(f"Error comprobando la existencia de tablas: {e}") # Si hay error, continuamos con la inicialización conn = psycopg2.connect( dbname=POSTGRESQL_DB_CONFIG['dbname'], user=POSTGRESQL_DB_CONFIG['user'], password=POSTGRESQL_DB_CONFIG['password'], host=POSTGRESQL_DB_CONFIG['host'], port=POSTGRESQL_DB_CONFIG['port'] ) cursor = conn.cursor() # Crear tabla de usuarios logger.info("Inicializando base de datos...") logger.info("Creando tabla de usuarios...") cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, email VARCHAR(255) UNIQUE NOT NULL, name VARCHAR(255) NOT NULL, rut VARCHAR(20) UNIQUE NOT NULL, pin_hash TEXT NOT NULL, kleincoins TEXT NOT NULL, created_at TEXT NOT NULL, permissions INTEGER DEFAULT 0 NOT NULL CHECK (permissions IN (0, 1, 2)) -- 0: Usuario normal, 1: Administrador, 2: Superusuario reward_progress INTEGER DEFAULT 0 NOT NULL CHECK (reward_progress >= 0 AND reward_progress <= 100) -- Progreso de recompensas ); """) # Crear tabla de productos logger.info("Creando tabla de products...") cursor.execute(""" CREATE TABLE IF NOT EXISTS products ( id TEXT PRIMARY KEY, name VARCHAR(255) NOT NULL, type VARCHAR(100) NOT NULL, description TEXT NOT NULL, price INTEGER NOT NULL, image TEXT, status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo, promo_id INTEGER, promo_price INTEGER, promo_day INTEGER CHECK (promo_day >= 1 AND promo_day <= 7) ); """) # Crear tabla de ventas logger.info("Creando tabla de sales...") cursor.execute(""" CREATE TABLE IF NOT EXISTS sales ( id SERIAL PRIMARY KEY, user_id INTEGER NOT NULL, total INTEGER NOT NULL, fudo_id TEXT UNIQUE NOT NULL, date TEXT NOT NULL, table_number INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ); """) # Crear tabla intermedia para ventas y productos logger.info("Creando tabla intermedia de sale_products...") cursor.execute(""" CREATE TABLE IF NOT EXISTS sale_products ( sale_id INTEGER NOT NULL, product_id TEXT NOT NULL, quantity INTEGER NOT NULL DEFAULT 1, FOREIGN KEY (sale_id) REFERENCES sales(id), FOREIGN KEY (product_id) REFERENCES products(id) ); """) # Crear tabla de blacklist logger.info("Creando tabla de blacklist...") cursor.execute(""" CREATE TABLE IF NOT EXISTS blacklist ( id SERIAL PRIMARY KEY, user_id INTEGER NOT NULL, FOREIGN KEY (user_id) REFERENCES users(id) ); """) logger.info("Todas las tablas creadas correctamente.") logger.info("Cargando datos de productos desde el archivo JSON...") products_json = json.loads(open(PRODUCT_DATA_PATH, 'r', encoding='utf-8').read()) product_service = ProductDataService() product_service.load_data(products_json) conn.commit() conn.close() logger.info("Base de datos inicializada correctamente.") data_bg_loaded = load_bg_data() if __name__ == "__main__": import asyncio data_service = ProductDataService() products = asyncio.run(data_service.get_all()) print(products[0])