|
|
@@ -0,0 +1,1184 @@
|
|
|
+import json
|
|
|
+from math import log
|
|
|
+import os
|
|
|
+import sqlite3
|
|
|
+from typing import List, Dict, Optional, Any, Union
|
|
|
+from abc import ABC, abstractmethod
|
|
|
+from config.settings import BG_DATA_PATH, DB_PATH, PRODUCT_DATA_PATH
|
|
|
+from logging import getLogger
|
|
|
+from datetime import datetime
|
|
|
+import uuid
|
|
|
+from cryptography.fernet import Fernet
|
|
|
+from config.settings import PIN_KEY
|
|
|
+
|
|
|
+# Import models
|
|
|
+from models.user import User
|
|
|
+from models.items import Product, ProductWithQuantity
|
|
|
+from models.sells import Sale
|
|
|
+from models.blacklist import Blacklist
|
|
|
+
|
|
|
+fernet = Fernet(PIN_KEY.encode())
|
|
|
+
|
|
|
+logger = getLogger(__name__)
|
|
|
+
|
|
|
+"""
|
|
|
+ESQUEMA DE BASE DE DATOS SQLITE (data.db)
|
|
|
+
|
|
|
+1. Tabla: users
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- email TEXT UNIQUE NOT NULL
|
|
|
+- nombre TEXT NOT NULL
|
|
|
+- rut TEXT UNIQUE NOT NULL
|
|
|
+- pin_hash TEXT NOT NULL (encriptado)
|
|
|
+- kleincoins TEXT NOT NULL (encriptado, valor por defecto "0")
|
|
|
+- created_at TEXT NOT NULL (fecha de creación en formato ISO 8601)
|
|
|
+(Guarda la información del usuario con su pin hasheado y kleincoins encriptadas)
|
|
|
+
|
|
|
+2. Tabla: productos
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY
|
|
|
+- name TEXT NOT NULL
|
|
|
+- type TEXT
|
|
|
+- description TEXT
|
|
|
+- price REAL NOT NULL
|
|
|
+- image TEXT (URL de la imagen)
|
|
|
+- status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo
|
|
|
+(Guarda los productos disponibles para venta con su estado activo/inactivo)
|
|
|
+
|
|
|
+3. Tabla: ventas
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- user_id INTEGER NOT NULL (relación a users.id)
|
|
|
+- total REAL NOT NULL (precio total de la venta)
|
|
|
+- fudo_id TEXT UNIQUE NOT NULL (ID string único por venta)
|
|
|
+- fecha TEXT NOT NULL (fecha y hora en formato ISO 8601)
|
|
|
+- table INTEGER NOT NULL (número de mesa)
|
|
|
+(Guarda cada venta, asociada a un usuario y mesa)
|
|
|
+
|
|
|
+4. Tabla: venta_productos
|
|
|
+-----------------------------------
|
|
|
+- venta_id INTEGER NOT NULL (relación a ventas.id)
|
|
|
+- producto_id INTEGER NOT NULL (relación a productos.id)
|
|
|
+- cantidad INTEGER NOT NULL DEFAULT 1 (cantidad del producto)
|
|
|
+(Relación muchos a muchos entre ventas y productos con cantidad)
|
|
|
+
|
|
|
+5. Tabla: blacklist
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- user_id INTEGER NOT NULL (relación a users.id)
|
|
|
+(Usuarios bloqueados o no autorizados para ciertas acciones)
|
|
|
+
|
|
|
+RELACIONES:
|
|
|
+-----------------------------------
|
|
|
+- users puede tener muchas ventas
|
|
|
+- ventas puede tener muchos productos (y viceversa), por eso se usa una tabla intermedia (venta_productos)
|
|
|
+- productos pueden repetirse en múltiples ventas
|
|
|
+"""
|
|
|
+
|
|
|
+# Base abstract class for data access
|
|
|
+class BaseDataService(ABC):
|
|
|
+ """Abstract base class for data services"""
|
|
|
+
|
|
|
+ def __init__(self, db_path: str = DB_PATH):
|
|
|
+ self.db_path = db_path
|
|
|
+
|
|
|
+ def _get_connection(self) -> sqlite3.Connection:
|
|
|
+ """Get database connection"""
|
|
|
+ return sqlite3.connect(self.db_path)
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def get_all(self) -> List[Any]:
|
|
|
+ """Get all records"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def get_by_id(self, id: int) -> Optional[Any]:
|
|
|
+ """Get record by ID"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def create(self, **kwargs) -> int:
|
|
|
+ """Create new record"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def update(self, id: int, **kwargs) -> bool:
|
|
|
+ """Update record"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def delete(self, id: int) -> bool:
|
|
|
+ """Delete record"""
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+# User Data Service
|
|
|
+class UserDataService(BaseDataService):
|
|
|
+ """Service for managing user data"""
|
|
|
+ #region Create
|
|
|
+ def create(self, nombre: str, email: str, rut: str, pin_hash: str) -> int:
|
|
|
+ """Add a new user to the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO users (nombre, email, rut, pin_hash, kleincoins, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
|
|
+ (nombre, email, rut, fernet.encrypt(pin_hash.encode()).decode(), fernet.encrypt(b"0").decode(), datetime.now().isoformat())
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ user_id = cursor.lastrowid
|
|
|
+ if user_id:
|
|
|
+ logger.info(f"User added with ID: {user_id}")
|
|
|
+ return user_id
|
|
|
+ else:
|
|
|
+ logger.error("Failed to add user.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to add user: {e}")
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Read
|
|
|
+ def get_all(self) -> List[User]:
|
|
|
+ """Get all users from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users")
|
|
|
+ users = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ User(
|
|
|
+ id=user[0],
|
|
|
+ email=user[1],
|
|
|
+ nombre=user[2],
|
|
|
+ rut=user[3],
|
|
|
+ pin_hash=fernet.decrypt(user[4].encode()).decode(),
|
|
|
+ kleincoins=fernet.decrypt(user[5].encode()).decode(),
|
|
|
+ created_at=user[6]
|
|
|
+ ) for user in users
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, user_id: int) -> Optional[User]:
|
|
|
+ """Get user data from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
|
|
|
+ user = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if user:
|
|
|
+ return User(
|
|
|
+ id=user[0],
|
|
|
+ email=user[1],
|
|
|
+ nombre=user[2],
|
|
|
+ rut=user[3],
|
|
|
+ pin_hash=fernet.decrypt(user[4].encode()).decode(),
|
|
|
+ kleincoins=fernet.decrypt(user[5].encode()).decode(),
|
|
|
+ created_at=user[6]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_email(self, email: str) -> Optional[User]:
|
|
|
+ """Get user by email"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
|
|
|
+ user = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ logger.debug(f"get_by_email: {email}, user: {user}")
|
|
|
+ if user:
|
|
|
+ return User(
|
|
|
+ id=user[0],
|
|
|
+ email=user[1],
|
|
|
+ nombre=user[2],
|
|
|
+ rut=user[3],
|
|
|
+ pin_hash=user[4],
|
|
|
+ kleincoins=fernet.decrypt(user[5].encode()).decode(),
|
|
|
+ created_at=user[6]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+
|
|
|
+ def login(self, email: str, pin_hashed: str) -> Optional[User]:
|
|
|
+ """Login user by email and pin hash"""
|
|
|
+ user = self.get_by_email(email)
|
|
|
+ if user and fernet.decrypt(user.pin_hash.encode()).decode() == pin_hashed:
|
|
|
+ logger.info(f"User {user.email} logged in successfully.")
|
|
|
+ return user
|
|
|
+ else:
|
|
|
+ logger.error("Login failed: Invalid email or pin.")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+ def get_by_rut(self, rut: str) -> Optional[User]:
|
|
|
+ """Get user by RUT"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users WHERE rut = ?", (rut,))
|
|
|
+ user = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if user:
|
|
|
+ return User(
|
|
|
+ id=user[0],
|
|
|
+ email=user[1],
|
|
|
+ nombre=user[2],
|
|
|
+ rut=user[3],
|
|
|
+ pin_hash=fernet.decrypt(user[4].encode()).decode(),
|
|
|
+ kleincoins=fernet.decrypt(user[5].encode()).decode(),
|
|
|
+ created_at=user[6]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+ #endregion
|
|
|
+ #region Update
|
|
|
+ def update(self, user_id: int, email=None, nombre=None, rut=None, pin_hash=None, kleincoins=None) -> bool:
|
|
|
+ """Update user information in the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ updates = []
|
|
|
+ params = []
|
|
|
+ if email:
|
|
|
+ updates.append("email = ?")
|
|
|
+ params.append(email)
|
|
|
+ if nombre:
|
|
|
+ updates.append("nombre = ?")
|
|
|
+ params.append(nombre)
|
|
|
+ if rut:
|
|
|
+ updates.append("rut = ?")
|
|
|
+ params.append(rut)
|
|
|
+ if pin_hash:
|
|
|
+ updates.append("pin_hash = ?")
|
|
|
+ params.append(fernet.encrypt(pin_hash.encode()).decode())
|
|
|
+ if kleincoins is not None:
|
|
|
+ updates.append("kleincoins = ?")
|
|
|
+ params.append(fernet.encrypt(str(kleincoins).encode()).decode())
|
|
|
+ if not updates:
|
|
|
+ conn.close()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", (*params, user_id))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"User with ID {user_id} updated.")
|
|
|
+ return success
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to update user: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Delete
|
|
|
+ def delete(self, user_id: int) -> bool:
|
|
|
+ """Delete a user from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
|
+ conn.commit()
|
|
|
+ conn.close()
|
|
|
+ if cursor.rowcount > 0:
|
|
|
+ logger.info(f"User with ID {user_id} deleted.")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to delete user with ID {user_id}.")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def update_kleincoins(self, user_id: int, kleincoins: int) -> bool:
|
|
|
+ """Update user's kleincoins"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "UPDATE users SET kleincoins = ? WHERE id = ?",
|
|
|
+ (fernet.encrypt(str(kleincoins).encode()).decode(), user_id)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Kleincoins updated for user {user_id}: {kleincoins}")
|
|
|
+ return success
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to update kleincoins: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ def get_kleincoins(self, user_id: int) -> Optional[int]:
|
|
|
+ """Get user's kleincoins"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT kleincoins FROM users WHERE id = ?", (user_id,))
|
|
|
+ result = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if result:
|
|
|
+ try:
|
|
|
+ return int(fernet.decrypt(result[0].encode()).decode())
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to decrypt kleincoins for user {user_id}: {e}")
|
|
|
+ return None
|
|
|
+ return None
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Blacklist Data Service
|
|
|
+class BlacklistDataService(BaseDataService):
|
|
|
+ """Service for managing blacklisted users"""
|
|
|
+ #region Create
|
|
|
+ def create(self, user_id: int) -> int:
|
|
|
+ """Add a user to the blacklist"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute("INSERT INTO blacklist (user_id) VALUES (?)", (user_id,))
|
|
|
+ conn.commit()
|
|
|
+ blacklist_id = cursor.lastrowid
|
|
|
+ if blacklist_id:
|
|
|
+ logger.info(f"User with ID {user_id} added to blacklist.")
|
|
|
+ return blacklist_id
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to add user with ID {user_id} to blacklist.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to add user to blacklist: {e}")
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Read
|
|
|
+ def get_all(self) -> List[Blacklist]:
|
|
|
+ """Get all blacklisted users"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT b.id, b.user_id, u.email, u.nombre, u.rut
|
|
|
+ FROM blacklist b
|
|
|
+ LEFT JOIN users u ON b.user_id = u.id
|
|
|
+ """)
|
|
|
+ blacklisted = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Blacklist(
|
|
|
+ id=row[0],
|
|
|
+ user_id=row[1],
|
|
|
+ email=row[2],
|
|
|
+ nombre=row[3],
|
|
|
+ rut=row[4]
|
|
|
+ ) for row in blacklisted
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, id: int) -> Optional[Blacklist]:
|
|
|
+ """Get blacklist entry by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT b.id, b.user_id, u.email, u.nombre, u.rut
|
|
|
+ FROM blacklist b
|
|
|
+ LEFT JOIN users u ON b.user_id = u.id
|
|
|
+ WHERE b.id = ?
|
|
|
+ """, (id,))
|
|
|
+ row = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if row:
|
|
|
+ return Blacklist(
|
|
|
+ id=row[0],
|
|
|
+ user_id=row[1],
|
|
|
+ email=row[2],
|
|
|
+ nombre=row[3],
|
|
|
+ rut=row[4]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_blacklisted_user_ids(self) -> List[int]:
|
|
|
+ """Get a list of blacklisted user IDs"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT user_id FROM blacklist")
|
|
|
+ blacklisted_users = [row[0] for row in cursor.fetchall()]
|
|
|
+ conn.close()
|
|
|
+ return blacklisted_users
|
|
|
+
|
|
|
+ def is_user_blacklisted(self, user_id: int) -> bool:
|
|
|
+ """Check if a user is blacklisted"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM blacklist WHERE user_id = ?", (user_id,))
|
|
|
+ blacklisted = cursor.fetchone() is not None
|
|
|
+ conn.close()
|
|
|
+ return blacklisted
|
|
|
+ #endregion
|
|
|
+ #region Update
|
|
|
+ def update(self, id: int, **kwargs) -> bool:
|
|
|
+ """Update blacklist entry (not commonly used)"""
|
|
|
+ # Blacklist entries typically don't need updates
|
|
|
+ return False
|
|
|
+ #endregion
|
|
|
+ #region Delete
|
|
|
+ def delete(self, id: int) -> bool:
|
|
|
+ """Remove a blacklist entry by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM blacklist WHERE id = ?", (id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"Blacklist entry with ID {id} removed.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to remove blacklist entry with ID {id}.")
|
|
|
+ return success
|
|
|
+
|
|
|
+ def remove_user_from_blacklist(self, user_id: int) -> bool:
|
|
|
+ """Remove a user from the blacklist"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"User with ID {user_id} removed from blacklist.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to remove user with ID {user_id} from blacklist.")
|
|
|
+ return success
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Product Data Service
|
|
|
+class ProductDataService(BaseDataService):
|
|
|
+ """Service for managing products"""
|
|
|
+ #region Create
|
|
|
+ def create(self, id: int, name: str, price: float, type: Optional[str] = None, description: Optional[str] = None, image: Optional[str] = None, status: int = 1) -> int:
|
|
|
+ """Add a new product to the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO productos (id, name, type, description, price, image, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
|
+ (id, name, type, description, price, image, status)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ product_id = cursor.lastrowid
|
|
|
+ if product_id:
|
|
|
+ logger.info(f"Product added with ID: {product_id}")
|
|
|
+ return product_id
|
|
|
+ else:
|
|
|
+ logger.error("Failed to add product.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to add product: {e}")
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ def load_data(self, products: List[Dict[str, str]]) -> None:
|
|
|
+ """Load multiple products from a list of dictionaries"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ for product in products:
|
|
|
+ try:
|
|
|
+ self.create(
|
|
|
+ id=int(product['id']),
|
|
|
+ name=product['name'],
|
|
|
+ price=int(product['price']),
|
|
|
+ type=product.get('type'),
|
|
|
+ description=product.get('description'),
|
|
|
+ image=product.get('image'),
|
|
|
+ status=int(product.get('status', 1)) # Default to active (1)
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Failed to load product {product['name']}: {e}")
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Read
|
|
|
+ def get_all(self) -> List[Product]:
|
|
|
+ """Get all products from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos")
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, product_id: int) -> Optional[Product]:
|
|
|
+ """Get product by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE id = ?", (product_id,))
|
|
|
+ product = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if product:
|
|
|
+ return Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_type(self, product_type: str) -> List[Product]:
|
|
|
+ """Get products by type"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE type = ?", (product_type,))
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def search_by_name(self, name: str) -> List[Product]:
|
|
|
+ """Search products by name"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE name LIKE ?", (f"%{name}%",))
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_products(self, product_ids: List[int]) -> List[Product]:
|
|
|
+ """Get multiple products by their IDs"""
|
|
|
+ if not product_ids:
|
|
|
+ return []
|
|
|
+ placeholders = ', '.join('?' for _ in product_ids)
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute(f"SELECT * FROM productos WHERE id IN ({placeholders})", product_ids)
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+ #endregion
|
|
|
+ #region Update
|
|
|
+ def update(self, product_id: int, name=None, type=None, description=None, price=None, image=None, status=None) -> bool:
|
|
|
+ """Update product information"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ updates = []
|
|
|
+ params = []
|
|
|
+ if name is not None:
|
|
|
+ updates.append("name = ?")
|
|
|
+ params.append(name)
|
|
|
+ if type is not None:
|
|
|
+ updates.append("type = ?")
|
|
|
+ params.append(type)
|
|
|
+ if description is not None:
|
|
|
+ updates.append("description = ?")
|
|
|
+ params.append(description)
|
|
|
+ if price is not None:
|
|
|
+ updates.append("price = ?")
|
|
|
+ params.append(price)
|
|
|
+ if image is not None:
|
|
|
+ updates.append("image = ?")
|
|
|
+ params.append(image)
|
|
|
+ if status is not None:
|
|
|
+ updates.append("status = ?")
|
|
|
+ params.append(status)
|
|
|
+ if not updates:
|
|
|
+ conn.close()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ cursor.execute(f"UPDATE productos SET {', '.join(updates)} WHERE id = ?", (*params, product_id))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product with ID {product_id} updated.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to update product: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ def get_active_products(self) -> List[Product]:
|
|
|
+ """Get only active products (status = 1)"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE status = 1")
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_inactive_products(self) -> List[Product]:
|
|
|
+ """Get only inactive products (status = 0)"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE status = 0")
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Product(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def activate_product(self, product_id: int) -> bool:
|
|
|
+ """Activate a product (set status to 1)"""
|
|
|
+ return self.update(product_id, status=1)
|
|
|
+
|
|
|
+ def deactivate_product(self, product_id: int) -> bool:
|
|
|
+ """Deactivate a product (set status to 0)"""
|
|
|
+ return self.update(product_id, status=0)
|
|
|
+
|
|
|
+ def is_product_active(self, product_id: int) -> Optional[bool]:
|
|
|
+ """Check if a product is active"""
|
|
|
+ product = self.get_by_id(product_id)
|
|
|
+ if product:
|
|
|
+ return product.status == 1
|
|
|
+ return None
|
|
|
+ #endregion
|
|
|
+ #region Delete
|
|
|
+ def delete(self, product_id: int) -> bool:
|
|
|
+ """Delete a product from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM productos WHERE id = ?", (product_id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product with ID {product_id} deleted.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to delete product with ID {product_id}.")
|
|
|
+ return success
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Sales Data Service
|
|
|
+class SalesDataService(BaseDataService):
|
|
|
+ """Service for managing sales"""
|
|
|
+ #region C
|
|
|
+ def create(self, user_id: int,fudo_id:str, total: float, table: int, product_ids: List[int], quantities: Optional[List[int]] = None) -> int:
|
|
|
+ """Create a new sale with products and quantities"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ fecha = datetime.now().isoformat()
|
|
|
+
|
|
|
+ # Insert sale
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO ventas (user_id, total, fudo_id, fecha, table) VALUES (?, ?, ?, ?, ?)",
|
|
|
+ (user_id, total, fudo_id, fecha, table)
|
|
|
+ )
|
|
|
+ sale_id = cursor.lastrowid
|
|
|
+
|
|
|
+ if sale_id and product_ids:
|
|
|
+ # Insert sale-product relationships with quantities
|
|
|
+ if quantities is None:
|
|
|
+ quantities = [1] * len(product_ids) # Default quantity 1
|
|
|
+
|
|
|
+ for i, product_id in enumerate(product_ids):
|
|
|
+ quantity = quantities[i] if i < len(quantities) else 1
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO venta_productos (venta_id, producto_id, cantidad) VALUES (?, ?, ?)",
|
|
|
+ (sale_id, product_id, quantity)
|
|
|
+ )
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ if sale_id:
|
|
|
+ logger.info(f"Sale created with ID: {sale_id}, fudo_id: {fudo_id}")
|
|
|
+ return sale_id
|
|
|
+ else:
|
|
|
+ logger.error("Failed to create sale.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to create sale: {e}")
|
|
|
+ conn.rollback()
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+
|
|
|
+ def add_product_to_sale(self, sale_id: int, product_id: int, quantity: int = 1) -> bool:
|
|
|
+ """Add a product to an existing sale with quantity"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO venta_productos (venta_id, producto_id, cantidad) VALUES (?, ?, ?)",
|
|
|
+ (sale_id, product_id, quantity)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product {product_id} (qty: {quantity}) added to sale {sale_id}.")
|
|
|
+ return success
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to add product to sale: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ #endregion
|
|
|
+ #region R
|
|
|
+ def get_all(self) -> List[Sale]:
|
|
|
+ """Get all sales from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.fudo_id, v.fecha, v.table, u.nombre, u.email
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ ORDER BY v.fecha DESC
|
|
|
+ """)
|
|
|
+ sales = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Sale(
|
|
|
+ id=sale[0],
|
|
|
+ user_id=sale[1],
|
|
|
+ total=sale[2],
|
|
|
+ fudo_id=sale[3],
|
|
|
+ fecha=sale[4],
|
|
|
+ table=sale[5],
|
|
|
+ user_name=sale[6],
|
|
|
+ user_email=sale[7]
|
|
|
+ ) for sale in sales
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, sale_id: int) -> Optional[Sale]:
|
|
|
+ """Get sale by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.fudo_id, v.fecha, v.table, u.nombre, u.email
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.id = ?
|
|
|
+ """, (sale_id,))
|
|
|
+ sale = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if sale:
|
|
|
+ return Sale(
|
|
|
+ id=sale[0],
|
|
|
+ user_id=sale[1],
|
|
|
+ total=sale[2],
|
|
|
+ fudo_id=sale[3],
|
|
|
+ fecha=sale[4],
|
|
|
+ table=sale[5],
|
|
|
+ user_name=sale[6],
|
|
|
+ user_email=sale[7]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_fudo_id(self, fudo_id: str) -> Optional[Sale]:
|
|
|
+ """Get sale by fudo_id"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.fudo_id, v.fecha, v.table, u.nombre, u.email
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.fudo_id = ?
|
|
|
+ """, (fudo_id,))
|
|
|
+ sale = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if sale:
|
|
|
+ return Sale(
|
|
|
+ id=sale[0],
|
|
|
+ user_id=sale[1],
|
|
|
+ total=sale[2],
|
|
|
+ fudo_id=sale[3],
|
|
|
+ fecha=sale[4],
|
|
|
+ table=sale[5],
|
|
|
+ user_name=sale[6],
|
|
|
+ user_email=sale[7]
|
|
|
+ )
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_user(self, user_id: int) -> List[Sale]:
|
|
|
+ """Get sales by user ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.fudo_id, v.fecha, v.table, u.nombre, u.email
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.user_id = ?
|
|
|
+ ORDER BY v.fecha DESC
|
|
|
+ """, (user_id,))
|
|
|
+ sales = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Sale(
|
|
|
+ id=sale[0],
|
|
|
+ user_id=sale[1],
|
|
|
+ total=sale[2],
|
|
|
+ fudo_id=sale[3],
|
|
|
+ fecha=sale[4],
|
|
|
+ table=sale[5],
|
|
|
+ user_name=sale[6],
|
|
|
+ user_email=sale[7]
|
|
|
+ ) for sale in sales
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_sale_products(self, sale_id: int) -> List[ProductWithQuantity]:
|
|
|
+ """Get products for a specific sale with quantities"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT p.id, p.name, p.type, p.description, p.price, p.image, p.status, vp.cantidad
|
|
|
+ FROM venta_productos vp
|
|
|
+ JOIN productos p ON vp.producto_id = p.id
|
|
|
+ WHERE vp.venta_id = ?
|
|
|
+ """, (sale_id,))
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ ProductWithQuantity(
|
|
|
+ id=product[0],
|
|
|
+ name=product[1],
|
|
|
+ type=product[2],
|
|
|
+ description=product[3],
|
|
|
+ price=product[4],
|
|
|
+ image=product[5],
|
|
|
+ status=product[6],
|
|
|
+ cantidad=product[7]
|
|
|
+ ) for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_table(self, table: int) -> List[Sale]:
|
|
|
+ """Get sales by table number"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.fudo_id, v.fecha, v.table, u.nombre, u.email
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.table = ?
|
|
|
+ ORDER BY v.fecha DESC
|
|
|
+ """, (table,))
|
|
|
+ sales = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ Sale(
|
|
|
+ id=sale[0],
|
|
|
+ user_id=sale[1],
|
|
|
+ total=sale[2],
|
|
|
+ fudo_id=sale[3],
|
|
|
+ fecha=sale[4],
|
|
|
+ table=sale[5],
|
|
|
+ user_name=sale[6],
|
|
|
+ user_email=sale[7]
|
|
|
+ ) for sale in sales
|
|
|
+ ]
|
|
|
+
|
|
|
+ def update_product_quantity(self, sale_id: int, product_id: int, new_quantity: int) -> bool:
|
|
|
+ """Update quantity of a product in a sale"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "UPDATE venta_productos SET cantidad = ? WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (new_quantity, sale_id, product_id)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Updated quantity to {new_quantity} for product {product_id} in sale {sale_id}.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to update product quantity: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ def get_product_quantity_in_sale(self, sale_id: int, product_id: int) -> Optional[int]:
|
|
|
+ """Get quantity of a specific product in a sale"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute(
|
|
|
+ "SELECT cantidad FROM venta_productos WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+ result = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ return result[0] if result else None
|
|
|
+ #endregion
|
|
|
+ #region U
|
|
|
+ def update(self, sale_id: int, user_id=None, total=None, table=None) -> bool:
|
|
|
+ """Update sale information (products should be updated via specific methods)"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ updates = []
|
|
|
+ params = []
|
|
|
+ if user_id is not None:
|
|
|
+ updates.append("user_id = ?")
|
|
|
+ params.append(user_id)
|
|
|
+ if total is not None:
|
|
|
+ updates.append("total = ?")
|
|
|
+ params.append(total)
|
|
|
+ if table is not None:
|
|
|
+ updates.append("table = ?")
|
|
|
+ params.append(table)
|
|
|
+ if not updates:
|
|
|
+ conn.close()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ cursor.execute(f"UPDATE ventas SET {', '.join(updates)} WHERE id = ?", (*params, sale_id))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Sale with ID {sale_id} updated.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to update sale: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region D
|
|
|
+ def delete(self, sale_id: int) -> bool:
|
|
|
+ """Delete a sale and its product relationships"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # Delete sale-product relationships first
|
|
|
+ cursor.execute("DELETE FROM venta_productos WHERE venta_id = ?", (sale_id,))
|
|
|
+ # Delete the sale
|
|
|
+ cursor.execute("DELETE FROM ventas WHERE id = ?", (sale_id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Sale with ID {sale_id} deleted.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to delete sale with ID {sale_id}.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to delete sale: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ def remove_product_from_sale(self, sale_id: int, product_id: int) -> bool:
|
|
|
+ """Remove a product from a sale (removes all quantity)"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute(
|
|
|
+ "DELETE FROM venta_productos WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product {product_id} removed from sale {sale_id}.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to remove product {product_id} from sale {sale_id}.")
|
|
|
+ return success
|
|
|
+
|
|
|
+ def decrease_product_quantity(self, sale_id: int, product_id: int, decrease_by: int = 1) -> bool:
|
|
|
+ """Decrease quantity of a product in a sale, removes if quantity becomes 0 or less"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # Get current quantity
|
|
|
+ cursor.execute(
|
|
|
+ "SELECT cantidad FROM venta_productos WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+ result = cursor.fetchone()
|
|
|
+ if not result:
|
|
|
+ return False
|
|
|
+
|
|
|
+ current_quantity = result[0]
|
|
|
+ new_quantity = current_quantity - decrease_by
|
|
|
+
|
|
|
+ if new_quantity <= 0:
|
|
|
+ # Remove the product completely
|
|
|
+ cursor.execute(
|
|
|
+ "DELETE FROM venta_productos WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+ logger.info(f"Product {product_id} removed from sale {sale_id} (quantity reached 0).")
|
|
|
+ else:
|
|
|
+ # Update with new quantity
|
|
|
+ cursor.execute(
|
|
|
+ "UPDATE venta_productos SET cantidad = ? WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (new_quantity, sale_id, product_id)
|
|
|
+ )
|
|
|
+ logger.info(f"Product {product_id} quantity decreased to {new_quantity} in sale {sale_id}.")
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ return True
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to decrease product quantity: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Factory class to get service instances
|
|
|
+class DataServiceFactory:
|
|
|
+ """Factory class to create data service instances"""
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_user_service() -> UserDataService:
|
|
|
+ """Get user data service instance"""
|
|
|
+ return UserDataService()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_blacklist_service() -> BlacklistDataService:
|
|
|
+ """Get blacklist data service instance"""
|
|
|
+ return BlacklistDataService()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_product_service() -> ProductDataService:
|
|
|
+ """Get product data service instance"""
|
|
|
+ return ProductDataService()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_sales_service() -> SalesDataService:
|
|
|
+ """Get sales data service instance"""
|
|
|
+ return SalesDataService()
|
|
|
+
|
|
|
+
|
|
|
+# Helper functions for background data
|
|
|
+def load_bg_data() -> List[Dict[str, str]]:
|
|
|
+ """Load background data for AI assistant"""
|
|
|
+ try:
|
|
|
+ with open(BG_DATA_PATH, 'r', encoding='utf-8') as f:
|
|
|
+ return json.load(f)
|
|
|
+ except FileNotFoundError:
|
|
|
+ logger.error(f"Data file not found at {BG_DATA_PATH}. Serving with empty data.")
|
|
|
+ return []
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ logger.error(f"Could not decode JSON from {BG_DATA_PATH}. Serving with empty data.")
|
|
|
+ return []
|
|
|
+
|
|
|
+def initialize_db():
|
|
|
+
|
|
|
+ if os.path.exists(DB_PATH):
|
|
|
+ logger.info("Base de datos ya existe, no se necesita inicializar.")
|
|
|
+ return
|
|
|
+
|
|
|
+ conn = sqlite3.connect(DB_PATH)
|
|
|
+ cursor = conn.cursor()
|
|
|
+ # Crear tabla de usuarios
|
|
|
+ logger.info("Inicializando base de datos...")
|
|
|
+ logger.info("Creando tabla de usuarios...")
|
|
|
+ cursor.execute("""
|
|
|
+ CREATE TABLE IF NOT EXISTS users (
|
|
|
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
+ email TEXT UNIQUE NOT NULL,
|
|
|
+ nombre TEXT NOT NULL,
|
|
|
+ rut TEXT UNIQUE NOT NULL,
|
|
|
+ pin_hash TEXT NOT NULL,
|
|
|
+ kleincoins TEXT NOT NULL,
|
|
|
+ created_at TEXT NOT NULL
|
|
|
+ );
|
|
|
+ """)
|
|
|
+
|
|
|
+ # Crear tabla de productos
|
|
|
+ logger.info("Creando tabla de productos...")
|
|
|
+ cursor.execute("""
|
|
|
+ CREATE TABLE IF NOT EXISTS productos (
|
|
|
+ id INTEGER PRIMARY KEY,
|
|
|
+ name TEXT NOT NULL,
|
|
|
+ type TEXT,
|
|
|
+ description TEXT,
|
|
|
+ price REAL NOT NULL,
|
|
|
+ image TEXT,
|
|
|
+ status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo
|
|
|
+ );
|
|
|
+ """)
|
|
|
+
|
|
|
+ # Crear tabla de ventas
|
|
|
+ logger.info("Creando tabla de ventas...")
|
|
|
+ cursor.execute("""
|
|
|
+ CREATE TABLE IF NOT EXISTS ventas (
|
|
|
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
+ user_id INTEGER NOT NULL,
|
|
|
+ total REAL NOT NULL,
|
|
|
+ fudo_id TEXT UNIQUE NOT NULL,
|
|
|
+ fecha TEXT NOT NULL,
|
|
|
+ "table" INTEGER NOT NULL,
|
|
|
+ FOREIGN KEY (user_id) REFERENCES users(id)
|
|
|
+ );
|
|
|
+ """)
|
|
|
+
|
|
|
+ # Crear tabla intermedia para ventas y productos
|
|
|
+ logger.info("Creando tabla intermedia de venta_productos...")
|
|
|
+ cursor.execute("""
|
|
|
+ CREATE TABLE IF NOT EXISTS venta_productos (
|
|
|
+ venta_id INTEGER NOT NULL,
|
|
|
+ producto_id INTEGER NOT NULL,
|
|
|
+ cantidad INTEGER NOT NULL DEFAULT 1,
|
|
|
+ FOREIGN KEY (venta_id) REFERENCES ventas(id),
|
|
|
+ FOREIGN KEY (producto_id) REFERENCES productos(id)
|
|
|
+ );
|
|
|
+ """)
|
|
|
+
|
|
|
+ # Crear tabla de blacklist
|
|
|
+ logger.info("Creando tabla de blacklist...")
|
|
|
+ cursor.execute("""
|
|
|
+ CREATE TABLE IF NOT EXISTS blacklist (
|
|
|
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
|
+ user_id INTEGER NOT NULL,
|
|
|
+ FOREIGN KEY (user_id) REFERENCES users(id)
|
|
|
+ );
|
|
|
+ """)
|
|
|
+
|
|
|
+ logger.info("Todas las tablas creadas correctamente.")
|
|
|
+ logger.info("Cargando datos de productos desde el archivo JSON...")
|
|
|
+ products_json = json.loads(open(PRODUCT_DATA_PATH, 'r', encoding='utf-8').read())
|
|
|
+ product_service = ProductDataService()
|
|
|
+ product_service.load_data(products_json)
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+
|
|
|
+ conn.close()
|
|
|
+ logger.info("Base de datos inicializada correctamente.")
|
|
|
+
|
|
|
+data_bg_loaded = load_bg_data()
|