"""SQLite-backed data services for users, products, sales and the blacklist.

SQLITE DATABASE SCHEMA (data.db)

1. Table: users
-----------------------------------
- id          INTEGER PRIMARY KEY AUTOINCREMENT
- email       TEXT UNIQUE NOT NULL
- name        TEXT NOT NULL
- rut         TEXT UNIQUE NOT NULL
- pin_hash    TEXT NOT NULL (encrypted)
- kleincoins  TEXT NOT NULL (encrypted, default value "0")
- created_at  TEXT NOT NULL (creation date in ISO 8601 format)
- permissions INTEGER DEFAULT 0 NOT NULL CHECK (permissions IN (0, 1, 2))
              -- 0: regular user, 1: administrator, 2: superuser
(Stores the user's information with the hashed PIN and the kleincoins
encrypted)

2. Table: products
-----------------------------------
- id          INTEGER PRIMARY KEY
- name        TEXT NOT NULL
- type        TEXT
- description TEXT
- price       REAL NOT NULL
- image       TEXT (image URL)
- status      INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1))
              -- 0: inactive, 1: active
(Stores the products available for sale with their active/inactive status)

3. Table: sales
-----------------------------------
- id      INTEGER PRIMARY KEY AUTOINCREMENT
- user_id INTEGER NOT NULL (references users.id)
- total   REAL NOT NULL (total price of the sale)
- fudo_id TEXT NOT NULL (string ID per sale; intended to be unique, but the
          DDL in ``initialize_db`` does not declare a UNIQUE constraint)
- date    TEXT NOT NULL (date and time in ISO 8601 format)
- table   INTEGER NOT NULL (table number)
(Stores each sale, associated with a user and a table)

4. Table: sale_products
-----------------------------------
- sale_id    INTEGER NOT NULL (references sales.id)
- product_id INTEGER NOT NULL (references products.id)
- quantity   INTEGER NOT NULL DEFAULT 1 (quantity of the product)
(Many-to-many relation between sales and products, with a quantity)

5. Table: blacklist
-----------------------------------
- id      INTEGER PRIMARY KEY AUTOINCREMENT
- user_id INTEGER NOT NULL (references users.id)
(Users that are blocked or not authorised for certain actions)

RELATIONSHIPS:
-----------------------------------
- a user can have many sales
- a sale can have many products (and vice versa), hence the intermediate
  table (sale_products)
- a product can appear in multiple sales
"""

import hmac
import json
import os
import sqlite3
from abc import ABC, abstractmethod
from contextlib import closing
from datetime import datetime
from logging import getLogger
from math import log
from typing import Any, Dict, List, Optional, Sequence, Tuple

from cryptography.fernet import Fernet

from config.settings import BG_DATA_PATH, DB_PATH, PIN_KEY, PRODUCT_DATA_PATH

# Import models
from models.user import User
from models.items import Product
from models.sales import Sale
from models.blacklist import Blacklist

fernet = Fernet(PIN_KEY.encode())
logger = getLogger(__name__)


def _encrypt(plaintext: str) -> str:
    """Encrypt *plaintext* with the module-level Fernet key, returning text."""
    return fernet.encrypt(plaintext.encode()).decode()


def _decrypt(token: str) -> str:
    """Decrypt a text token produced by ``_encrypt``."""
    return fernet.decrypt(token.encode()).decode()


# Base abstract class for data access
class BaseDataService(ABC):
    """Abstract base class for data services.

    Subclasses implement the CRUD methods against the SQLite file at
    ``db_path``. Each operation opens its own short-lived connection.
    """

    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path

    def _get_connection(self) -> sqlite3.Connection:
        """Open and return a new connection to ``self.db_path``."""
        return sqlite3.connect(self.db_path)

    @abstractmethod
    def get_all(self) -> List[Any]:
        """Get all records"""
        pass

    @abstractmethod
    def get_by_id(self, id: int) -> Optional[Any]:
        """Get record by ID"""
        pass

    @abstractmethod
    def create(self, **kwargs) -> int:
        """Create new record"""
        pass

    @abstractmethod
    def update(self, id: int, **kwargs) -> bool:
        """Update record"""
        pass

    @abstractmethod
    def delete(self, id: int) -> bool:
        """Delete record"""
        pass


# User Data Service
class UserDataService(BaseDataService):
    """Service for managing user data.

    ``pin_hash`` and ``kleincoins`` are stored Fernet-encrypted; the getters
    decrypt them, except ``get_by_email`` which leaves ``pin_hash`` encrypted
    (``login`` relies on that).
    """

    @staticmethod
    def _row_to_user(row: Sequence[Any], decrypt_pin: bool = True) -> User:
        """Build a ``User`` from a ``SELECT * FROM users`` row."""
        return User(
            id=row[0],
            email=row[1],
            name=row[2],
            rut=row[3],
            pin_hash=_decrypt(row[4]) if decrypt_pin else row[4],
            kleincoins=_decrypt(row[5]),
            created_at=row[6],
        )

    def _fetch_user(self, query: str, param: Any,
                    decrypt_pin: bool = True) -> Optional[User]:
        """Run a single-parameter user query and map the first row, if any."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(query, (param,)).fetchone()
        return self._row_to_user(row, decrypt_pin) if row else None

    #region Create
    def create(self, name: str, email: str, rut: str, pin_hash: str) -> int:
        """Add a new user with zero kleincoins.

        Returns the new user ID, or -1 when the insert violates a constraint
        (for example a duplicate email or RUT).
        """
        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    "INSERT INTO users (name, email, rut, pin_hash, kleincoins, created_at) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (name, email, rut, _encrypt(pin_hash), _encrypt("0"),
                     datetime.now().isoformat()),
                )
                conn.commit()
            except sqlite3.IntegrityError as e:
                logger.error(f"Failed to add user: {e}")
                return -1
            user_id = cursor.lastrowid

        if user_id:
            logger.info(f"User added with ID: {user_id}")
            return user_id
        logger.error("Failed to add user.")
        return -1
    #endregion

    #region Read
    def get_all(self) -> List[User]:
        """Get all users, with PIN and kleincoins decrypted."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute("SELECT * FROM users").fetchall()
        return [self._row_to_user(row) for row in rows]

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Get a user by ID, or None when no such user exists."""
        return self._fetch_user("SELECT * FROM users WHERE id = ?", user_id)

    def get_by_email(self, email: str) -> Optional[User]:
        """Get a user by email, or None when no such user exists.

        Unlike the other getters, the returned ``pin_hash`` is still the
        encrypted token as stored in the database.
        """
        user = self._fetch_user(
            "SELECT * FROM users WHERE email = ?", email, decrypt_pin=False
        )
        # Log only whether a row was found: the raw row holds encrypted secrets.
        logger.debug(f"get_by_email: {email}, found: {user is not None}")
        return user

    def login(self, email: str, pin_hashed: str) -> Optional[User]:
        """Return the user when *pin_hashed* matches the stored PIN, else None."""
        user = self.get_by_email(email)
        if user and isinstance(pin_hashed, str):
            stored_pin = _decrypt(user.pin_hash)
            # Constant-time comparison so timing does not leak the stored value.
            if hmac.compare_digest(stored_pin.encode(), pin_hashed.encode()):
                logger.info(f"User {user.email} logged in successfully.")
                return user
        logger.error("Login failed: Invalid email or pin.")
        return None

    def permissions(self, user_id: int) -> int:
        """Get the user's permissions value; 0 when the user does not exist."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                "SELECT permissions FROM users WHERE id = ?", (user_id,)
            ).fetchone()
        return row[0] if row else 0

    def get_by_rut(self, rut: str) -> Optional[User]:
        """Get a user by RUT, or None when no such user exists."""
        return self._fetch_user("SELECT * FROM users WHERE rut = ?", rut)
    #endregion

    #region Update
    def update(self, user_id: int, email=None, name=None, rut=None,
               pin_hash=None, kleincoins=None) -> bool:
        """Update the given user fields.

        Falsy ``email``/``name``/``rut``/``pin_hash`` are skipped;
        ``kleincoins`` is skipped only when None (so 0 can be written).
        Returns True when a row was changed, False when nothing was requested,
        the user does not exist, or a constraint was violated.
        """
        updates: List[str] = []
        params: List[Any] = []
        for column, value in (("email", email), ("name", name), ("rut", rut)):
            if value:
                updates.append(f"{column} = ?")
                params.append(value)
        if pin_hash:
            updates.append("pin_hash = ?")
            params.append(_encrypt(pin_hash))
        if kleincoins is not None:
            updates.append("kleincoins = ?")
            params.append(_encrypt(str(kleincoins)))
        if not updates:
            return False

        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    f"UPDATE users SET {', '.join(updates)} WHERE id = ?",
                    (*params, user_id),
                )
                conn.commit()
            except sqlite3.IntegrityError as e:
                logger.error(f"Failed to update user: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"User with ID {user_id} updated.")
        return success
    #endregion

    #region Delete
    def delete(self, user_id: int) -> bool:
        """Delete a user; returns True when a row was removed."""
        with closing(self._get_connection()) as conn:
            cursor = conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
            conn.commit()
            deleted = cursor.rowcount > 0

        if deleted:
            logger.info(f"User with ID {user_id} deleted.")
        else:
            logger.error(f"Failed to delete user with ID {user_id}.")
        return deleted

    def update_kleincoins(self, user_id: int, kleincoins: int) -> bool:
        """Overwrite the user's kleincoins; returns True when a row changed."""
        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    "UPDATE users SET kleincoins = ? WHERE id = ?",
                    (_encrypt(str(kleincoins)), user_id),
                )
                conn.commit()
            except sqlite3.IntegrityError as e:
                logger.error(f"Failed to update kleincoins: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Kleincoins updated for user {user_id}: {kleincoins}")
        return success

    def get_kleincoins(self, user_id: int) -> Optional[int]:
        """Get the user's kleincoins as an int.

        Returns None when the user does not exist or the stored value cannot
        be decrypted/parsed.
        """
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                "SELECT kleincoins FROM users WHERE id = ?", (user_id,)
            ).fetchone()
        if not row:
            return None
        try:
            return int(_decrypt(row[0]))
        except Exception as e:
            logger.error(f"Failed to decrypt kleincoins for user {user_id}: {e}")
            return None
    #endregion


# Blacklist Data Service
class BlacklistDataService(BaseDataService):
    """Service for managing blacklisted users"""

    # Blacklist rows joined with the (possibly missing) user they refer to.
    _SELECT_ENTRIES = (
        "SELECT b.id, b.user_id, u.email, u.name, u.rut "
        "FROM blacklist b LEFT JOIN users u ON b.user_id = u.id"
    )

    @staticmethod
    def _row_to_entry(row: Sequence[Any]) -> Blacklist:
        """Build a ``Blacklist`` from a ``_SELECT_ENTRIES`` row."""
        return Blacklist(
            id=row[0], user_id=row[1], email=row[2], name=row[3], rut=row[4]
        )

    #region Create
    def create(self, user_id: int) -> int:
        """Add a user to the blacklist; returns the entry ID, or -1 on failure."""
        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    "INSERT INTO blacklist (user_id) VALUES (?)", (user_id,)
                )
                conn.commit()
            except sqlite3.IntegrityError as e:
                logger.error(f"Failed to add user to blacklist: {e}")
                return -1
            blacklist_id = cursor.lastrowid

        if blacklist_id:
            logger.info(f"User with ID {user_id} added to blacklist.")
            return blacklist_id
        logger.error(f"Failed to add user with ID {user_id} to blacklist.")
        return -1
    #endregion

    #region Read
    def get_all(self) -> List[Blacklist]:
        """Get all blacklist entries together with the user's email/name/RUT."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(self._SELECT_ENTRIES).fetchall()
        return [self._row_to_entry(row) for row in rows]

    def get_by_id(self, id: int) -> Optional[Blacklist]:
        """Get a blacklist entry by its own ID, or None when absent."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                f"{self._SELECT_ENTRIES} WHERE b.id = ?", (id,)
            ).fetchone()
        return self._row_to_entry(row) if row else None

    def get_blacklisted_user_ids(self) -> List[int]:
        """Get the user IDs of all blacklist entries."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute("SELECT user_id FROM blacklist").fetchall()
        return [row[0] for row in rows]

    def is_user_blacklisted(self, user_id: int) -> bool:
        """Check whether at least one blacklist entry exists for the user."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                "SELECT * FROM blacklist WHERE user_id = ?", (user_id,)
            ).fetchone()
        return row is not None
    #endregion

    #region Update
    def update(self, id: int, **kwargs) -> bool:
        """Update blacklist entry (not commonly used); always returns False."""
        # Blacklist entries typically don't need updates
        return False
    #endregion

    #region Delete
    def delete(self, id: int) -> bool:
        """Remove a blacklist entry by its own ID."""
        with closing(self._get_connection()) as conn:
            cursor = conn.execute("DELETE FROM blacklist WHERE id = ?", (id,))
            conn.commit()
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Blacklist entry with ID {id} removed.")
        else:
            logger.error(f"Failed to remove blacklist entry with ID {id}.")
        return success

    def remove_user_from_blacklist(self, user_id: int) -> bool:
        """Remove every blacklist entry that refers to the given user."""
        with closing(self._get_connection()) as conn:
            cursor = conn.execute(
                "DELETE FROM blacklist WHERE user_id = ?", (user_id,)
            )
            conn.commit()
            success = cursor.rowcount > 0

        if success:
            logger.info(f"User with ID {user_id} removed from blacklist.")
        else:
            logger.error(f"Failed to remove user with ID {user_id} from blacklist.")
        return success
    #endregion


# Product Data Service
class ProductDataService(BaseDataService):
    """Service for managing products"""

    @staticmethod
    def _row_to_product(row: Sequence[Any]) -> Product:
        """Build a ``Product`` from a ``SELECT * FROM products`` row."""
        return Product(
            id=row[0],
            name=row[1],
            type=row[2],
            description=row[3],
            price=row[4],
            image=row[5],
            status=row[6],
        )

    def _query_products(self, query: str,
                        params: Sequence[Any] = ()) -> List[Product]:
        """Run a product query and map every resulting row."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(query, params).fetchall()
        return [self._row_to_product(row) for row in rows]

    #region Create
    def create(self, id: int, name: str, price: float,
               type: Optional[str] = None, description: Optional[str] = None,
               image: Optional[str] = None, status: int = 1) -> int:
        """Add a new product with an explicit ID.

        Returns the product's row ID, or -1 when the insert fails.
        """
        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    "INSERT INTO products (id, name, type, description, price, image, status) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (id, name, type, description, price, image, status),
                )
                conn.commit()
            except sqlite3.Error as e:
                logger.error(f"Failed to add product: {e}")
                return -1
            product_id = cursor.lastrowid

        if product_id:
            logger.info(f"Product added with ID: {product_id}")
            return product_id
        logger.error("Failed to add product.")
        return -1

    def load_data(self, products: List[Dict[str, str]]) -> None:
        """Load multiple products from a list of dictionaries.

        Each dictionary needs ``id``, ``name`` and ``price``; ``type``,
        ``description``, ``image`` and ``status`` are optional. A record that
        cannot be loaded is logged and skipped.
        """
        for product in products:
            try:
                self.create(
                    id=int(product['id']),
                    name=product['name'],
                    # The price column is REAL, so fractional prices must survive.
                    price=float(product['price']),
                    type=product.get('type'),
                    description=product.get('description'),
                    image=product.get('image'),
                    status=int(product.get('status', 1)),  # Default to active (1)
                )
            except Exception as e:
                # Deliberately broad: one malformed record must not abort the load.
                if isinstance(product, dict):
                    label = product.get('name', '<unnamed>')
                else:
                    label = '<invalid record>'
                logger.error(f"Failed to load product {label}: {e}")
    #endregion

    #region Read
    def get_all(self) -> List[Product]:
        """Get all products."""
        return self._query_products("SELECT * FROM products")

    def get_by_id(self, product_id: int) -> Optional[Product]:
        """Get a product by ID, or None when absent."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                "SELECT * FROM products WHERE id = ?", (product_id,)
            ).fetchone()
        return self._row_to_product(row) if row else None

    def get_by_type(self, product_type: str) -> List[Product]:
        """Get the products whose type equals *product_type*."""
        return self._query_products(
            "SELECT * FROM products WHERE type = ?", (product_type,)
        )

    def search_by_name(self, name: str) -> List[Product]:
        """Get the products whose name contains *name* (SQL LIKE match)."""
        return self._query_products(
            "SELECT * FROM products WHERE name LIKE ?", (f"%{name}%",)
        )

    def get_products(self, product_ids: List[int]) -> List[Product]:
        """Get multiple products by their IDs; empty input yields []."""
        if not product_ids:
            return []
        placeholders = ', '.join('?' for _ in product_ids)
        return self._query_products(
            f"SELECT * FROM products WHERE id IN ({placeholders})",
            list(product_ids),
        )
    #endregion

    #region Update
    def update(self, product_id: int, name=None, type=None, description=None,
               price=None, image=None, status=None) -> bool:
        """Update the product fields that are not None.

        Returns True when a row was changed; False when nothing was requested,
        the product does not exist, or the statement failed.
        """
        requested = (
            ("name", name),
            ("type", type),
            ("description", description),
            ("price", price),
            ("image", image),
            ("status", status),
        )
        updates = [f"{column} = ?" for column, value in requested if value is not None]
        params = [value for _, value in requested if value is not None]
        if not updates:
            return False

        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    f"UPDATE products SET {', '.join(updates)} WHERE id = ?",
                    (*params, product_id),
                )
                conn.commit()
            except sqlite3.Error as e:
                logger.error(f"Failed to update product: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Product with ID {product_id} updated.")
        return success

    def get_active_products(self) -> List[Product]:
        """Get only active products (status = 1)"""
        return self._query_products("SELECT * FROM products WHERE status = 1")

    def get_inactive_products(self) -> List[Product]:
        """Get only inactive products (status = 0)"""
        return self._query_products("SELECT * FROM products WHERE status = 0")

    def activate_product(self, product_id: int) -> bool:
        """Activate a product (set status to 1)"""
        return self.update(product_id, status=1)

    def deactivate_product(self, product_id: int) -> bool:
        """Deactivate a product (set status to 0)"""
        return self.update(product_id, status=0)

    def is_product_active(self, product_id: int) -> Optional[bool]:
        """Return whether the product is active, or None when it does not exist."""
        product = self.get_by_id(product_id)
        if product:
            return product.status == 1
        return None
    #endregion

    #region Delete
    def delete(self, product_id: int) -> bool:
        """Delete a product; returns True when a row was removed."""
        with closing(self._get_connection()) as conn:
            cursor = conn.execute(
                "DELETE FROM products WHERE id = ?", (product_id,)
            )
            conn.commit()
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Product with ID {product_id} deleted.")
        else:
            logger.error(f"Failed to delete product with ID {product_id}.")
        return success
    #endregion


# Sales Data Service
class SalesDataService(BaseDataService):
    """Service for managing sales"""

    # ``table`` is an SQL reserved word, so the column must always be written
    # as a double-quoted identifier.
    _SELECT_SALES = (
        'SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s."table", '
        "u.name, u.email "
        "FROM sales s LEFT JOIN users u ON s.user_id = u.id"
    )
    _INSERT_SALE_PRODUCT = (
        "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (?, ?, ?)"
    )

    def _row_to_sale(self, row: Sequence[Any], with_products: bool = False) -> Sale:
        """Build a ``Sale`` from a ``_SELECT_SALES`` row.

        When *with_products* is true the sale's products are fetched with a
        separate query and passed as ``products``.
        """
        fields: Dict[str, Any] = dict(
            id=row[0],
            user_id=row[1],
            total=row[2],
            fudo_id=row[3],
            date=row[4],
            table=row[5],
            username=row[6],
            user_email=row[7],
        )
        if with_products:
            fields["products"] = self.get_sale_products(row[0])
        return Sale(**fields)

    @staticmethod
    def _pair_with_quantities(product_ids: Sequence[int],
                              quantities: Optional[Sequence[int]]
                              ) -> List[Tuple[int, int]]:
        """Pair each product ID with its quantity, defaulting to 1 when missing."""
        known = quantities or []
        return [
            (product_id, known[i] if i < len(known) else 1)
            for i, product_id in enumerate(product_ids)
        ]

    #region C
    def create(self, user_id: int, fudo_id: str, total: float, table: int,
               product_ids: List[int],
               quantities: Optional[List[int]] = None) -> int:
        """Create a new sale with its products and quantities.

        The sale row and all ``sale_products`` rows are committed together.
        Products without a matching entry in *quantities* get quantity 1.
        Returns the new sale ID, or -1 on failure (the transaction is rolled
        back).
        """
        with closing(self._get_connection()) as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    'INSERT INTO sales (user_id, total, fudo_id, date, "table") '
                    "VALUES (?, ?, ?, ?, ?)",
                    (user_id, total, fudo_id, datetime.now().isoformat(), table),
                )
                sale_id = cursor.lastrowid
                if sale_id and product_ids:
                    cursor.executemany(
                        self._INSERT_SALE_PRODUCT,
                        [
                            (sale_id, product_id, quantity)
                            for product_id, quantity
                            in self._pair_with_quantities(product_ids, quantities)
                        ],
                    )
                conn.commit()
            except sqlite3.Error as e:
                logger.error(f"Failed to create sale: {e}")
                conn.rollback()
                return -1

        if sale_id:
            logger.info(f"Sale created with ID: {sale_id}, fudo_id: {fudo_id}")
            return sale_id
        logger.error("Failed to create sale.")
        return -1

    def add_product_to_sale(self, sale_id: int, product_id: int,
                            quantity: int = 1) -> bool:
        """Add a product to an existing sale with the given quantity."""
        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    self._INSERT_SALE_PRODUCT, (sale_id, product_id, quantity)
                )
                conn.commit()
            except sqlite3.IntegrityError as e:
                logger.error(f"Failed to add product to sale: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Product {product_id} (qty: {quantity}) added to sale {sale_id}.")
        return success

    def add_products_to_sale(self, sale_id: int, product_ids: List[int],
                             quantities: Optional[List[int]] = None) -> bool:
        """Add multiple products to an existing sale in one transaction.

        Products without a matching entry in *quantities* get quantity 1.
        Returns False when *product_ids* is empty or a constraint is violated.
        """
        rows = [
            (sale_id, product_id, quantity)
            for product_id, quantity
            in self._pair_with_quantities(product_ids, quantities)
        ]
        with closing(self._get_connection()) as conn:
            cursor = conn.cursor()
            try:
                cursor.executemany(self._INSERT_SALE_PRODUCT, rows)
                conn.commit()
            except sqlite3.IntegrityError as e:
                logger.error(f"Failed to add products to sale: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Products added to sale {sale_id}.")
        return success
    #endregion

    #region R
    def get_all(self) -> List[Sale]:
        """Get all sales, newest first, each with its products."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                f"{self._SELECT_SALES} ORDER BY s.date DESC"
            ).fetchall()
        return [self._row_to_sale(row, with_products=True) for row in rows]

    def get_by_id(self, sale_id: int) -> Optional[Sale]:
        """Get a sale by ID (without its products), or None when absent."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                f"{self._SELECT_SALES} WHERE s.id = ?", (sale_id,)
            ).fetchone()
        return self._row_to_sale(row) if row else None

    def get_by_fudo_id(self, fudo_id: str) -> Optional[Sale]:
        """Get a sale by fudo_id (without its products), or None when absent."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                f"{self._SELECT_SALES} WHERE s.fudo_id = ?", (fudo_id,)
            ).fetchone()
        return self._row_to_sale(row) if row else None

    def get_by_user(self, user_id: int) -> List[Sale]:
        """Get a user's sales, newest first, each with its products."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                f"{self._SELECT_SALES} WHERE s.user_id = ? ORDER BY s.date DESC",
                (user_id,),
            ).fetchall()
        return [self._row_to_sale(row, with_products=True) for row in rows]

    def get_sale_products_ids(self, sale_id: int) -> List[int]:
        """Get the product IDs recorded for a sale."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                "SELECT product_id FROM sale_products WHERE sale_id = ?",
                (sale_id,),
            ).fetchall()
        return [row[0] for row in rows]

    def get_sale_products(self, sale_id: int) -> List[Product]:
        """Get the products of a sale, each carrying its ``quantity``."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                "SELECT p.id, p.name, p.type, p.description, p.price, p.image, "
                "p.status, sp.quantity "
                "FROM sale_products sp JOIN products p ON sp.product_id = p.id "
                "WHERE sp.sale_id = ?",
                (sale_id,),
            ).fetchall()
        return [
            Product(
                id=row[0],
                name=row[1],
                type=row[2],
                description=row[3],
                price=row[4],
                image=row[5],
                status=row[6],
                quantity=row[7],
            )
            for row in rows
        ]

    def get_by_table(self, table: int) -> List[Sale]:
        """Get the sales of a table number, newest first (without products)."""
        with closing(self._get_connection()) as conn:
            rows = conn.execute(
                f'{self._SELECT_SALES} WHERE s."table" = ? ORDER BY s.date DESC',
                (table,),
            ).fetchall()
        return [self._row_to_sale(row) for row in rows]

    def update_product_quantity(self, sale_id: int, product_id: int,
                                new_quantity: int) -> bool:
        """Set the quantity of a product in a sale; True when a row changed."""
        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    "UPDATE sale_products SET quantity = ? "
                    "WHERE sale_id = ? AND product_id = ?",
                    (new_quantity, sale_id, product_id),
                )
                conn.commit()
            except sqlite3.Error as e:
                logger.error(f"Failed to update product quantity: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Updated quantity to {new_quantity} for product {product_id} in sale {sale_id}.")
        return success

    def get_product_quantity_in_sale(self, sale_id: int,
                                     product_id: int) -> Optional[int]:
        """Get the quantity of a product in a sale, or None when not present."""
        with closing(self._get_connection()) as conn:
            row = conn.execute(
                "SELECT quantity FROM sale_products "
                "WHERE sale_id = ? AND product_id = ?",
                (sale_id, product_id),
            ).fetchone()
        return row[0] if row else None
    #endregion

    #region U
    def update(self, sale_id: int, user_id=None, total=None, table=None) -> bool:
        """Update sale information (products should be updated via specific methods).

        Only arguments that are not None are written. Returns True when a row
        was changed; False when nothing was requested, the sale does not
        exist, or the statement failed.
        """
        requested = (
            ("user_id", user_id),
            ("total", total),
            ('"table"', table),  # reserved word: must be quoted
        )
        updates = [f"{column} = ?" for column, value in requested if value is not None]
        params = [value for _, value in requested if value is not None]
        if not updates:
            return False

        with closing(self._get_connection()) as conn:
            try:
                cursor = conn.execute(
                    f"UPDATE sales SET {', '.join(updates)} WHERE id = ?",
                    (*params, sale_id),
                )
                conn.commit()
            except sqlite3.Error as e:
                logger.error(f"Failed to update sale: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Sale with ID {sale_id} updated.")
        return success
    #endregion

    #region D
    def delete(self, sale_id: int) -> bool:
        """Delete a sale and its product relationships in one transaction."""
        with closing(self._get_connection()) as conn:
            cursor = conn.cursor()
            try:
                # Delete sale-product relationships first
                cursor.execute(
                    "DELETE FROM sale_products WHERE sale_id = ?", (sale_id,)
                )
                # Delete the sale; rowcount below refers to this statement.
                cursor.execute("DELETE FROM sales WHERE id = ?", (sale_id,))
                conn.commit()
            except sqlite3.Error as e:
                logger.error(f"Failed to delete sale: {e}")
                return False
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Sale with ID {sale_id} deleted.")
        else:
            logger.error(f"Failed to delete sale with ID {sale_id}.")
        return success

    def remove_product_from_sale(self, sale_id: int, product_id: int) -> bool:
        """Remove a product from a sale (removes all quantity)"""
        with closing(self._get_connection()) as conn:
            cursor = conn.execute(
                "DELETE FROM sale_products WHERE sale_id = ? AND product_id = ?",
                (sale_id, product_id),
            )
            conn.commit()
            success = cursor.rowcount > 0

        if success:
            logger.info(f"Product {product_id} removed from sale {sale_id}.")
        else:
            logger.error(f"Failed to remove product {product_id} from sale {sale_id}.")
        return success

    def decrease_product_quantity(self, sale_id: int, product_id: int,
                                  decrease_by: int = 1) -> bool:
        """Decrease the quantity of a product in a sale.

        The row is deleted when the quantity would become 0 or less. Returns
        False when the product is not part of the sale or the statement fails.
        """
        with closing(self._get_connection()) as conn:
            cursor = conn.cursor()
            try:
                # Get current quantity
                cursor.execute(
                    "SELECT quantity FROM sale_products "
                    "WHERE sale_id = ? AND product_id = ?",
                    (sale_id, product_id),
                )
                row = cursor.fetchone()
                if not row:
                    return False

                new_quantity = row[0] - decrease_by
                if new_quantity <= 0:
                    # Remove the product completely
                    cursor.execute(
                        "DELETE FROM sale_products "
                        "WHERE sale_id = ? AND product_id = ?",
                        (sale_id, product_id),
                    )
                    logger.info(f"Product {product_id} removed from sale {sale_id} (quantity reached 0).")
                else:
                    # Update with new quantity
                    cursor.execute(
                        "UPDATE sale_products SET quantity = ? "
                        "WHERE sale_id = ? AND product_id = ?",
                        (new_quantity, sale_id, product_id),
                    )
                    logger.info(f"Product {product_id} quantity decreased to {new_quantity} in sale {sale_id}.")
                conn.commit()
                return True
            except sqlite3.Error as e:
                logger.error(f"Failed to decrease product quantity: {e}")
                return False
    #endregion


# Factory class to get service instances
class DataServiceFactory:
    """Factory class to create data service instances.

    Every call returns a new service bound to the default ``DB_PATH``.
    """

    @staticmethod
    def get_user_service() -> UserDataService:
        """Get user data service instance"""
        return UserDataService()

    @staticmethod
    def get_blacklist_service() -> BlacklistDataService:
        """Get blacklist data service instance"""
        return BlacklistDataService()

    @staticmethod
    def get_product_service() -> ProductDataService:
        """Get product data service instance"""
        return ProductDataService()

    @staticmethod
    def get_sales_service() -> SalesDataService:
        """Get sales data service instance"""
        return SalesDataService()


# Helper functions for background data
def load_bg_data() -> List[Dict[str, str]]:
    """Load background data for AI assistant.

    Returns the parsed JSON content of ``BG_DATA_PATH``, or an empty list when
    the file is missing or is not valid JSON.
    """
    try:
        with open(BG_DATA_PATH, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        logger.error(f"Data file not found at {BG_DATA_PATH}. Serving with empty data.")
        return []
    except json.JSONDecodeError:
        logger.error(f"Could not decode JSON from {BG_DATA_PATH}. Serving with empty data.")
        return []


def initialize_db() -> None:
    """Create the database file, its tables and the initial product rows.

    Does nothing when a file already exists at ``DB_PATH``. Otherwise creates
    the five tables and then loads the products found in the JSON file at
    ``PRODUCT_DATA_PATH``. Errors from SQLite or from reading the JSON file
    propagate to the caller.
    """
    if os.path.exists(DB_PATH):
        logger.info("Base de datos ya existe, no se necesita inicializar.")
        return

    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.cursor()

        # Create the users table
        logger.info("Inicializando base de datos...")
        logger.info("Creando tabla de usuarios...")
        # No trailing comma after the last column: SQLite rejects it.
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                rut TEXT UNIQUE NOT NULL,
                pin_hash TEXT NOT NULL,
                kleincoins TEXT NOT NULL,
                created_at TEXT NOT NULL,
                permissions INTEGER DEFAULT 0 NOT NULL CHECK (permissions IN (0, 1, 2)) -- 0: Usuario normal, 1: Administrador, 2: Superusuario
            );
        """)

        # Create the products table
        logger.info("Creando tabla de products...")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                type TEXT,
                description TEXT,
                price REAL NOT NULL,
                image TEXT,
                status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo
            );
        """)

        # Create the sales table
        logger.info("Creando tabla de sales...")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sales (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                total REAL NOT NULL,
                fudo_id TEXT NOT NULL,
                date TEXT NOT NULL,
                "table" INTEGER NOT NULL,
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
        """)

        # Create the intermediate table linking sales and products
        logger.info("Creando tabla intermedia de sale_products...")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS sale_products (
                sale_id INTEGER NOT NULL,
                product_id INTEGER NOT NULL,
                quantity INTEGER NOT NULL DEFAULT 1,
                FOREIGN KEY (sale_id) REFERENCES sales(id),
                FOREIGN KEY (product_id) REFERENCES products(id)
            );
        """)

        # Create the blacklist table
        logger.info("Creando tabla de blacklist...")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS blacklist (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                FOREIGN KEY (user_id) REFERENCES users(id)
            );
        """)

        # Commit the schema before the product loader opens its own connections.
        conn.commit()
        logger.info("Todas las tablas creadas correctamente.")

    logger.info("Cargando datos de productos desde el archivo JSON...")
    with open(PRODUCT_DATA_PATH, 'r', encoding='utf-8') as f:
        products_json = json.load(f)
    product_service = ProductDataService()
    product_service.load_data(products_json)

    logger.info("Base de datos inicializada correctamente.")


data_bg_loaded = load_bg_data()