|
|
@@ -1,12 +1,806 @@
|
|
|
import json
|
|
|
-import os
|
|
|
-from typing import List, Dict
|
|
|
-from venv import logger
|
|
|
-from config.settings import BG_DATA_PATH, PRODUCTS_PATH, EXCLUDED_BEER_IDS
|
|
|
+import sqlite3
|
|
|
+from typing import List, Dict, Optional
|
|
|
+from abc import ABC, abstractmethod
|
|
|
+from config.settings import BG_DATA_PATH, DB_PATH
|
|
|
from logging import getLogger
|
|
|
+from datetime import datetime
|
|
|
+import uuid
|
|
|
|
|
|
logger = getLogger(__name__)
|
|
|
|
|
|
+"""
|
|
|
+ESQUEMA DE BASE DE DATOS SQLITE (data.db)
|
|
|
+
|
|
|
+1. Tabla: users
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- correo TEXT UNIQUE NOT NULL
|
|
|
+- nombre TEXT NOT NULL
|
|
|
+- rut TEXT UNIQUE NOT NULL
|
|
|
+- pin_hash TEXT NOT NULL
|
|
|
+(Guarda la información del usuario con su pin hasheado)
|
|
|
+
|
|
|
+2. Tabla: productos
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- name TEXT NOT NULL
|
|
|
+- type TEXT
|
|
|
+- description TEXT
|
|
|
+- price REAL NOT NULL
|
|
|
+- image TEXT (URL de la imagen)
|
|
|
+(Guarda los productos disponibles para venta)
|
|
|
+
|
|
|
+3. Tabla: ventas
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- user_id INTEGER NOT NULL (relación a users.id)
|
|
|
+- total REAL NOT NULL (precio total de la venta)
|
|
|
+- venta_uuid TEXT UNIQUE NOT NULL (ID string único por venta)
|
|
|
+- fecha TEXT NOT NULL (fecha y hora en formato ISO 8601)
|
|
|
+(Guarda cada venta, asociada a un usuario)
|
|
|
+
|
|
|
+4. Tabla: venta_productos
|
|
|
+-----------------------------------
|
|
|
+- venta_id INTEGER NOT NULL (relación a ventas.id)
|
|
|
+- producto_id INTEGER NOT NULL (relación a productos.id)
|
|
|
+(Relación muchos a muchos entre ventas y productos)
|
|
|
+
|
|
|
+5. Tabla: blacklist
|
|
|
+-----------------------------------
|
|
|
+- id INTEGER PRIMARY KEY AUTOINCREMENT
|
|
|
+- user_id INTEGER NOT NULL (relación a users.id)
|
|
|
+(Usuarios bloqueados o no autorizados para ciertas acciones)
|
|
|
+
|
|
|
+RELACIONES:
|
|
|
+-----------------------------------
|
|
|
+- users puede tener muchas ventas
|
|
|
+- ventas puede tener muchos productos (y viceversa), por eso se usa una tabla intermedia (venta_productos)
|
|
|
+- productos pueden repetirse en múltiples ventas
|
|
|
+"""
|
|
|
+
|
|
|
+# Base abstract class for data access
|
|
|
+class BaseDataService(ABC):
|
|
|
+ """Abstract base class for data services"""
|
|
|
+
|
|
|
+ def __init__(self, db_path: str = DB_PATH):
|
|
|
+ self.db_path = db_path
|
|
|
+
|
|
|
+ def _get_connection(self) -> sqlite3.Connection:
|
|
|
+ """Get database connection"""
|
|
|
+ return sqlite3.connect(self.db_path)
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def get_all(self) -> List[Dict]:
|
|
|
+ """Get all records"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def get_by_id(self, id: int) -> Optional[Dict]:
|
|
|
+ """Get record by ID"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def create(self, **kwargs) -> int:
|
|
|
+ """Create new record"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def update(self, id: int, **kwargs) -> bool:
|
|
|
+ """Update record"""
|
|
|
+ pass
|
|
|
+
|
|
|
+ @abstractmethod
|
|
|
+ def delete(self, id: int) -> bool:
|
|
|
+ """Delete record"""
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+# User Data Service
|
|
|
+class UserDataService(BaseDataService):
|
|
|
+ """Service for managing user data"""
|
|
|
+ #region Create
|
|
|
+ def create(self, correo: str, nombre: str, rut: str, pin_hash: str) -> int:
|
|
|
+ """Add a new user to the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO users (correo, nombre, rut, pin_hash) VALUES (?, ?, ?, ?)",
|
|
|
+ (correo, nombre, rut, pin_hash)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ user_id = cursor.lastrowid
|
|
|
+ if user_id:
|
|
|
+ logger.info(f"User added with ID: {user_id}")
|
|
|
+ return user_id
|
|
|
+ else:
|
|
|
+ logger.error("Failed to add user.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to add user: {e}")
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Read
|
|
|
+ def get_all(self) -> List[Dict[str, str]]:
|
|
|
+ """Get all users from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users")
|
|
|
+ users = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": user[0],
|
|
|
+ "correo": user[1],
|
|
|
+ "nombre": user[2],
|
|
|
+ "rut": user[3],
|
|
|
+ "pin_hash": user[4]
|
|
|
+ } for user in users
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, user_id: int) -> Optional[Dict[str, str]]:
|
|
|
+ """Get user data from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
|
|
|
+ user = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if user:
|
|
|
+ return {
|
|
|
+ "id": user[0],
|
|
|
+ "correo": user[1],
|
|
|
+ "nombre": user[2],
|
|
|
+ "rut": user[3],
|
|
|
+ "pin_hash": user[4]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_email(self, correo: str) -> Optional[Dict[str, str]]:
|
|
|
+ """Get user by email"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users WHERE correo = ?", (correo,))
|
|
|
+ user = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if user:
|
|
|
+ return {
|
|
|
+ "id": user[0],
|
|
|
+ "correo": user[1],
|
|
|
+ "nombre": user[2],
|
|
|
+ "rut": user[3],
|
|
|
+ "pin_hash": user[4]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_rut(self, rut: str) -> Optional[Dict[str, str]]:
|
|
|
+ """Get user by RUT"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM users WHERE rut = ?", (rut,))
|
|
|
+ user = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if user:
|
|
|
+ return {
|
|
|
+ "id": user[0],
|
|
|
+ "correo": user[1],
|
|
|
+ "nombre": user[2],
|
|
|
+ "rut": user[3],
|
|
|
+ "pin_hash": user[4]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+ #endregion
|
|
|
+ #region Update
|
|
|
+ def update(self, user_id: int, correo=None, nombre=None, rut=None, pin_hash=None) -> bool:
|
|
|
+ """Update user information in the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ updates = []
|
|
|
+ params = []
|
|
|
+ if correo:
|
|
|
+ updates.append("correo = ?")
|
|
|
+ params.append(correo)
|
|
|
+ if nombre:
|
|
|
+ updates.append("nombre = ?")
|
|
|
+ params.append(nombre)
|
|
|
+ if rut:
|
|
|
+ updates.append("rut = ?")
|
|
|
+ params.append(rut)
|
|
|
+ if pin_hash:
|
|
|
+ updates.append("pin_hash = ?")
|
|
|
+ params.append(pin_hash)
|
|
|
+ if not updates:
|
|
|
+ conn.close()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", (*params, user_id))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"User with ID {user_id} updated.")
|
|
|
+ return success
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to update user: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Delete
|
|
|
+ def delete(self, user_id: int) -> bool:
|
|
|
+ """Delete a user from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
|
|
|
+ conn.commit()
|
|
|
+ conn.close()
|
|
|
+ if cursor.rowcount > 0:
|
|
|
+ logger.info(f"User with ID {user_id} deleted.")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to delete user with ID {user_id}.")
|
|
|
+ return False
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Blacklist Data Service
|
|
|
+class BlacklistDataService(BaseDataService):
|
|
|
+ """Service for managing blacklisted users"""
|
|
|
+ #region Create
|
|
|
+ def create(self, user_id: int) -> int:
|
|
|
+ """Add a user to the blacklist"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute("INSERT INTO blacklist (user_id) VALUES (?)", (user_id,))
|
|
|
+ conn.commit()
|
|
|
+ blacklist_id = cursor.lastrowid
|
|
|
+ if blacklist_id:
|
|
|
+ logger.info(f"User with ID {user_id} added to blacklist.")
|
|
|
+ return blacklist_id
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to add user with ID {user_id} to blacklist.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to add user to blacklist: {e}")
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Read
|
|
|
+ def get_all(self) -> List[Dict]:
|
|
|
+ """Get all blacklisted users"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT b.id, b.user_id, u.correo, u.nombre, u.rut
|
|
|
+ FROM blacklist b
|
|
|
+ LEFT JOIN users u ON b.user_id = u.id
|
|
|
+ """)
|
|
|
+ blacklisted = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": row[0],
|
|
|
+ "user_id": row[1],
|
|
|
+ "correo": row[2],
|
|
|
+ "nombre": row[3],
|
|
|
+ "rut": row[4]
|
|
|
+ } for row in blacklisted
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, id: int) -> Optional[Dict]:
|
|
|
+ """Get blacklist entry by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT b.id, b.user_id, u.correo, u.nombre, u.rut
|
|
|
+ FROM blacklist b
|
|
|
+ LEFT JOIN users u ON b.user_id = u.id
|
|
|
+ WHERE b.id = ?
|
|
|
+ """, (id,))
|
|
|
+ row = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if row:
|
|
|
+ return {
|
|
|
+ "id": row[0],
|
|
|
+ "user_id": row[1],
|
|
|
+ "correo": row[2],
|
|
|
+ "nombre": row[3],
|
|
|
+ "rut": row[4]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_blacklisted_user_ids(self) -> List[int]:
|
|
|
+ """Get a list of blacklisted user IDs"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT user_id FROM blacklist")
|
|
|
+ blacklisted_users = [row[0] for row in cursor.fetchall()]
|
|
|
+ conn.close()
|
|
|
+ return blacklisted_users
|
|
|
+
|
|
|
+ def is_user_blacklisted(self, user_id: int) -> bool:
|
|
|
+ """Check if a user is blacklisted"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM blacklist WHERE user_id = ?", (user_id,))
|
|
|
+ blacklisted = cursor.fetchone() is not None
|
|
|
+ conn.close()
|
|
|
+ return blacklisted
|
|
|
+ #endregion
|
|
|
+ #region Update
|
|
|
+ def update(self, id: int, **kwargs) -> bool:
|
|
|
+ """Update blacklist entry (not commonly used)"""
|
|
|
+ # Blacklist entries typically don't need updates
|
|
|
+ return False
|
|
|
+ #endregion
|
|
|
+ #region Delete
|
|
|
+ def delete(self, id: int) -> bool:
|
|
|
+ """Remove a blacklist entry by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM blacklist WHERE id = ?", (id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"Blacklist entry with ID {id} removed.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to remove blacklist entry with ID {id}.")
|
|
|
+ return success
|
|
|
+
|
|
|
+ def remove_user_from_blacklist(self, user_id: int) -> bool:
|
|
|
+ """Remove a user from the blacklist"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"User with ID {user_id} removed from blacklist.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to remove user with ID {user_id} from blacklist.")
|
|
|
+ return success
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Product Data Service
|
|
|
+class ProductDataService(BaseDataService):
|
|
|
+ """Service for managing products"""
|
|
|
+ #region Create
|
|
|
+ def create(self, name: str, price: float, type: Optional[str] = None, description: Optional[str] = None, image: Optional[str] = None) -> int:
|
|
|
+ """Add a new product to the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO productos (name, type, description, price, image) VALUES (?, ?, ?, ?, ?)",
|
|
|
+ (name, type, description, price, image)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ product_id = cursor.lastrowid
|
|
|
+ if product_id:
|
|
|
+ logger.info(f"Product added with ID: {product_id}")
|
|
|
+ return product_id
|
|
|
+ else:
|
|
|
+ logger.error("Failed to add product.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to add product: {e}")
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Read
|
|
|
+ def get_all(self) -> List[Dict[str, str]]:
|
|
|
+ """Get all products from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos")
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": product[0],
|
|
|
+ "name": product[1],
|
|
|
+ "type": product[2],
|
|
|
+ "description": product[3],
|
|
|
+ "price": product[4],
|
|
|
+ "image": product[5]
|
|
|
+ } for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, product_id: int) -> Optional[Dict]:
|
|
|
+ """Get product by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE id = ?", (product_id,))
|
|
|
+ product = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if product:
|
|
|
+ return {
|
|
|
+ "id": product[0],
|
|
|
+ "name": product[1],
|
|
|
+ "type": product[2],
|
|
|
+ "description": product[3],
|
|
|
+ "price": product[4],
|
|
|
+ "image": product[5]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_type(self, product_type: str) -> List[Dict]:
|
|
|
+ """Get products by type"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE type = ?", (product_type,))
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": product[0],
|
|
|
+ "name": product[1],
|
|
|
+ "type": product[2],
|
|
|
+ "description": product[3],
|
|
|
+ "price": product[4],
|
|
|
+ "image": product[5]
|
|
|
+ } for product in products
|
|
|
+ ]
|
|
|
+
|
|
|
+ def search_by_name(self, name: str) -> List[Dict]:
|
|
|
+ """Search products by name"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("SELECT * FROM productos WHERE name LIKE ?", (f"%{name}%",))
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": product[0],
|
|
|
+ "name": product[1],
|
|
|
+ "type": product[2],
|
|
|
+ "description": product[3],
|
|
|
+ "price": product[4],
|
|
|
+ "image": product[5]
|
|
|
+ } for product in products
|
|
|
+ ]
|
|
|
+ #endregion
|
|
|
+ #region Update
|
|
|
+ def update(self, product_id: int, name=None, type=None, description=None, price=None, image=None) -> bool:
|
|
|
+ """Update product information"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ updates = []
|
|
|
+ params = []
|
|
|
+ if name is not None:
|
|
|
+ updates.append("name = ?")
|
|
|
+ params.append(name)
|
|
|
+ if type is not None:
|
|
|
+ updates.append("type = ?")
|
|
|
+ params.append(type)
|
|
|
+ if description is not None:
|
|
|
+ updates.append("description = ?")
|
|
|
+ params.append(description)
|
|
|
+ if price is not None:
|
|
|
+ updates.append("price = ?")
|
|
|
+ params.append(price)
|
|
|
+ if image is not None:
|
|
|
+ updates.append("image = ?")
|
|
|
+ params.append(image)
|
|
|
+ if not updates:
|
|
|
+ conn.close()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ cursor.execute(f"UPDATE productos SET {', '.join(updates)} WHERE id = ?", (*params, product_id))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product with ID {product_id} updated.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to update product: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region Delete
|
|
|
+ def delete(self, product_id: int) -> bool:
|
|
|
+ """Delete a product from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("DELETE FROM productos WHERE id = ?", (product_id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product with ID {product_id} deleted.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to delete product with ID {product_id}.")
|
|
|
+ return success
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Sales Data Service
|
|
|
+class SalesDataService(BaseDataService):
|
|
|
+ """Service for managing sales"""
|
|
|
+ #region C
|
|
|
+ def create(self, user_id: int, total: float, product_ids: List[int]) -> int:
|
|
|
+ """Create a new sale with products"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # Generate unique UUID for the sale
|
|
|
+ venta_uuid = str(uuid.uuid4())
|
|
|
+ fecha = datetime.now().isoformat()
|
|
|
+
|
|
|
+ # Insert sale
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO ventas (user_id, total, venta_uuid, fecha) VALUES (?, ?, ?, ?)",
|
|
|
+ (user_id, total, venta_uuid, fecha)
|
|
|
+ )
|
|
|
+ sale_id = cursor.lastrowid
|
|
|
+
|
|
|
+ if sale_id and product_ids:
|
|
|
+ # Insert sale-product relationships
|
|
|
+ for product_id in product_ids:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO venta_productos (venta_id, producto_id) VALUES (?, ?)",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ if sale_id:
|
|
|
+ logger.info(f"Sale created with ID: {sale_id}, UUID: {venta_uuid}")
|
|
|
+ return sale_id
|
|
|
+ else:
|
|
|
+ logger.error("Failed to create sale.")
|
|
|
+ return -1
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to create sale: {e}")
|
|
|
+ conn.rollback()
|
|
|
+ return -1
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+
|
|
|
+ def add_product_to_sale(self, sale_id: int, product_id: int) -> bool:
|
|
|
+ """Add a product to an existing sale"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ cursor.execute(
|
|
|
+ "INSERT INTO venta_productos (venta_id, producto_id) VALUES (?, ?)",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product {product_id} added to sale {sale_id}.")
|
|
|
+ return success
|
|
|
+ except sqlite3.IntegrityError as e:
|
|
|
+ logger.error(f"Failed to add product to sale: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ #endregion
|
|
|
+ #region R
|
|
|
+ def get_all(self) -> List[Dict]:
|
|
|
+ """Get all sales from the database"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.venta_uuid, v.fecha, u.nombre, u.correo
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ ORDER BY v.fecha DESC
|
|
|
+ """)
|
|
|
+ sales = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": sale[0],
|
|
|
+ "user_id": sale[1],
|
|
|
+ "total": sale[2],
|
|
|
+ "venta_uuid": sale[3],
|
|
|
+ "fecha": sale[4],
|
|
|
+ "user_name": sale[5],
|
|
|
+ "user_email": sale[6]
|
|
|
+ } for sale in sales
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_by_id(self, sale_id: int) -> Optional[Dict]:
|
|
|
+ """Get sale by ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.venta_uuid, v.fecha, u.nombre, u.correo
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.id = ?
|
|
|
+ """, (sale_id,))
|
|
|
+ sale = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if sale:
|
|
|
+ return {
|
|
|
+ "id": sale[0],
|
|
|
+ "user_id": sale[1],
|
|
|
+ "total": sale[2],
|
|
|
+ "venta_uuid": sale[3],
|
|
|
+ "fecha": sale[4],
|
|
|
+ "user_name": sale[5],
|
|
|
+ "user_email": sale[6]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_uuid(self, venta_uuid: str) -> Optional[Dict]:
|
|
|
+ """Get sale by UUID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.venta_uuid, v.fecha, u.nombre, u.correo
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.venta_uuid = ?
|
|
|
+ """, (venta_uuid,))
|
|
|
+ sale = cursor.fetchone()
|
|
|
+ conn.close()
|
|
|
+ if sale:
|
|
|
+ return {
|
|
|
+ "id": sale[0],
|
|
|
+ "user_id": sale[1],
|
|
|
+ "total": sale[2],
|
|
|
+ "venta_uuid": sale[3],
|
|
|
+ "fecha": sale[4],
|
|
|
+ "user_name": sale[5],
|
|
|
+ "user_email": sale[6]
|
|
|
+ }
|
|
|
+ return None
|
|
|
+
|
|
|
+ def get_by_user(self, user_id: int) -> List[Dict]:
|
|
|
+ """Get sales by user ID"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT v.id, v.user_id, v.total, v.venta_uuid, v.fecha, u.nombre, u.correo
|
|
|
+ FROM ventas v
|
|
|
+ LEFT JOIN users u ON v.user_id = u.id
|
|
|
+ WHERE v.user_id = ?
|
|
|
+ ORDER BY v.fecha DESC
|
|
|
+ """, (user_id,))
|
|
|
+ sales = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": sale[0],
|
|
|
+ "user_id": sale[1],
|
|
|
+ "total": sale[2],
|
|
|
+ "venta_uuid": sale[3],
|
|
|
+ "fecha": sale[4],
|
|
|
+ "user_name": sale[5],
|
|
|
+ "user_email": sale[6]
|
|
|
+ } for sale in sales
|
|
|
+ ]
|
|
|
+
|
|
|
+ def get_sale_products(self, sale_id: int) -> List[Dict]:
|
|
|
+ """Get products for a specific sale"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute("""
|
|
|
+ SELECT p.id, p.name, p.type, p.description, p.price, p.image
|
|
|
+ FROM venta_productos vp
|
|
|
+ JOIN productos p ON vp.producto_id = p.id
|
|
|
+ WHERE vp.venta_id = ?
|
|
|
+ """, (sale_id,))
|
|
|
+ products = cursor.fetchall()
|
|
|
+ conn.close()
|
|
|
+ return [
|
|
|
+ {
|
|
|
+ "id": product[0],
|
|
|
+ "name": product[1],
|
|
|
+ "type": product[2],
|
|
|
+ "description": product[3],
|
|
|
+ "price": product[4],
|
|
|
+ "image": product[5]
|
|
|
+ } for product in products
|
|
|
+ ]
|
|
|
+ #endregion
|
|
|
+ #region U
|
|
|
+ def update(self, sale_id: int, user_id=None, total=None) -> bool:
|
|
|
+ """Update sale information (products cannot be updated directly)"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ updates = []
|
|
|
+ params = []
|
|
|
+ if user_id is not None:
|
|
|
+ updates.append("user_id = ?")
|
|
|
+ params.append(user_id)
|
|
|
+ if total is not None:
|
|
|
+ updates.append("total = ?")
|
|
|
+ params.append(total)
|
|
|
+ if not updates:
|
|
|
+ conn.close()
|
|
|
+ return False
|
|
|
+ try:
|
|
|
+ cursor.execute(f"UPDATE ventas SET {', '.join(updates)} WHERE id = ?", (*params, sale_id))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Sale with ID {sale_id} updated.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to update sale: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+ #endregion
|
|
|
+ #region D
|
|
|
+ def delete(self, sale_id: int) -> bool:
|
|
|
+ """Delete a sale and its product relationships"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # Delete sale-product relationships first
|
|
|
+ cursor.execute("DELETE FROM venta_productos WHERE venta_id = ?", (sale_id,))
|
|
|
+ # Delete the sale
|
|
|
+ cursor.execute("DELETE FROM ventas WHERE id = ?", (sale_id,))
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ if success:
|
|
|
+ logger.info(f"Sale with ID {sale_id} deleted.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to delete sale with ID {sale_id}.")
|
|
|
+ return success
|
|
|
+ except sqlite3.Error as e:
|
|
|
+ logger.error(f"Failed to delete sale: {e}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ def remove_product_from_sale(self, sale_id: int, product_id: int) -> bool:
|
|
|
+ """Remove a product from a sale"""
|
|
|
+ conn = self._get_connection()
|
|
|
+ cursor = conn.cursor()
|
|
|
+ cursor.execute(
|
|
|
+ "DELETE FROM venta_productos WHERE venta_id = ? AND producto_id = ?",
|
|
|
+ (sale_id, product_id)
|
|
|
+ )
|
|
|
+ conn.commit()
|
|
|
+ success = cursor.rowcount > 0
|
|
|
+ conn.close()
|
|
|
+ if success:
|
|
|
+ logger.info(f"Product {product_id} removed from sale {sale_id}.")
|
|
|
+ else:
|
|
|
+ logger.error(f"Failed to remove product {product_id} from sale {sale_id}.")
|
|
|
+ return success
|
|
|
+
|
|
|
+ #endregion
|
|
|
+
|
|
|
+# Factory class to get service instances
|
|
|
+class DataServiceFactory:
|
|
|
+ """Factory class to create data service instances"""
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_user_service() -> UserDataService:
|
|
|
+ """Get user data service instance"""
|
|
|
+ return UserDataService()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_blacklist_service() -> BlacklistDataService:
|
|
|
+ """Get blacklist data service instance"""
|
|
|
+ return BlacklistDataService()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_product_service() -> ProductDataService:
|
|
|
+ """Get product data service instance"""
|
|
|
+ return ProductDataService()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def get_sales_service() -> SalesDataService:
|
|
|
+ """Get sales data service instance"""
|
|
|
+ return SalesDataService()
|
|
|
+
|
|
|
+
|
|
|
+# Helper functions for background data
|
|
|
def load_bg_data() -> List[Dict[str, str]]:
|
|
|
"""Load background data for AI assistant"""
|
|
|
try:
|
|
|
@@ -19,34 +813,3 @@ def load_bg_data() -> List[Dict[str, str]]:
|
|
|
logger.error(f"Could not decode JSON from {BG_DATA_PATH}. Serving with empty data.")
|
|
|
return []
|
|
|
|
|
|
-
|
|
|
-def load_products() -> List[Dict[str, str]]:
|
|
|
- """Load products data excluding beer IDs"""
|
|
|
- try:
|
|
|
- with open(PRODUCTS_PATH, 'r', encoding='utf-8') as f:
|
|
|
- products = json.load(f)
|
|
|
- return list(filter(lambda product: product['id'] not in EXCLUDED_BEER_IDS, products))
|
|
|
- except FileNotFoundError:
|
|
|
- logger.error(f"Data file not found at {PRODUCTS_PATH}. Serving with empty data.")
|
|
|
- return []
|
|
|
- except json.JSONDecodeError:
|
|
|
- logger.error(f"Could not decode JSON from {PRODUCTS_PATH}. Serving with empty data.")
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
-def load_users() -> List[Dict[str, str]]:
|
|
|
- """Load users data"""
|
|
|
- try:
|
|
|
- with open('users.json', 'r') as f:
|
|
|
- return json.load(f)
|
|
|
- except FileNotFoundError:
|
|
|
- logger.error("ERROR: users.json file not found.")
|
|
|
- return []
|
|
|
- except json.JSONDecodeError:
|
|
|
- logger.error("ERROR: Could not decode JSON from users.json.")
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
-# Load data at module level
|
|
|
-bg_data_loaded = load_bg_data()
|
|
|
-all_products = load_products()
|