| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240 |
- import json
- from math import log
- import os
- import sqlite3
- from typing import List, Dict, Optional, Any
- from abc import ABC, abstractmethod
- from config.settings import BG_DATA_PATH, DB_PATH, PRODUCT_DATA_PATH
- from logging import getLogger
- from datetime import datetime
- from cryptography.fernet import Fernet
- from config.settings import PIN_KEY
- # Import models
- from models.user import User
- from models.items import Product
- from models.sales import Sale
- from models.blacklist import Blacklist
- fernet = Fernet(PIN_KEY.encode())
- logger = getLogger(__name__)
- """
- ESQUEMA DE BASE DE DATOS SQLITE (data.db)
- 1. Tabla: users
- -----------------------------------
- - id INTEGER PRIMARY KEY AUTOINCREMENT
- - email TEXT UNIQUE NOT NULL
- - name TEXT NOT NULL
- - rut TEXT UNIQUE NOT NULL
- - pin_hash TEXT NOT NULL (encriptado)
- - kleincoins TEXT NOT NULL (encriptado, valor por defecto "0")
- - created_at TEXT NOT NULL (fecha de creación en formato ISO 8601)
- (Guarda la información del usuario con su pin hasheado y kleincoins encriptadas)
- 2. Tabla: products
- -----------------------------------
- - id INTEGER PRIMARY KEY
- - name TEXT NOT NULL
- - type TEXT
- - description TEXT
- - price REAL NOT NULL
- - image TEXT (URL de la imagen)
- - status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo
- (Guarda los productos disponibles para venta con su estado activo/inactivo)
- 3. Tabla: sales
- -----------------------------------
- - id INTEGER PRIMARY KEY AUTOINCREMENT
- - user_id INTEGER NOT NULL (relación a users.id)
- - total REAL NOT NULL (precio total de la venta)
- - fudo_id TEXT UNIQUE NOT NULL (ID string único por venta)
- - date TEXT NOT NULL (fecha y hora en formato ISO 8601)
- - table INTEGER NOT NULL (número de mesa)
- (Guarda cada venta, asociada a un usuario y mesa)
- 4. Tabla: sale_products
- -----------------------------------
- - sale_id INTEGER NOT NULL (relación a sales.id)
- - product_id INTEGER NOT NULL (relación a products.id)
- - quantity INTEGER NOT NULL DEFAULT 1 (cantidad del producto)
- (Relación muchos a muchos entre ventas y productos con cantidad)
- 5. Tabla: blacklist
- -----------------------------------
- - id INTEGER PRIMARY KEY AUTOINCREMENT
- - user_id INTEGER NOT NULL (relación a users.id)
- (Usuarios bloqueados o no autorizados para ciertas acciones)
- RELACIONES:
- -----------------------------------
- - users puede tener muchas sales
- - sales puede tener muchos productos (y viceversa), por eso se usa una tabla intermedia (sale_products)
- - products pueden repetirse en múltiples sales
- """
- # Base abstract class for data access
- class BaseDataService(ABC):
- """Abstract base class for data services"""
-
- def __init__(self, db_path: str = DB_PATH):
- self.db_path = db_path
-
- def _get_connection(self) -> sqlite3.Connection:
- """Get database connection"""
- return sqlite3.connect(self.db_path)
-
- @abstractmethod
- def get_all(self) -> List[Any]:
- """Get all records"""
- pass
-
- @abstractmethod
- def get_by_id(self, id: int) -> Optional[Any]:
- """Get record by ID"""
- pass
-
- @abstractmethod
- def create(self, **kwargs) -> int:
- """Create new record"""
- pass
-
- @abstractmethod
- def update(self, id: int, **kwargs) -> bool:
- """Update record"""
- pass
-
- @abstractmethod
- def delete(self, id: int) -> bool:
- """Delete record"""
- pass
- # User Data Service
- class UserDataService(BaseDataService):
- """Service for managing user data"""
- #region Create
- def create(self, name: str, email: str, rut: str, pin_hash: str) -> int:
- """Add a new user to the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "INSERT INTO users (name, email, rut, pin_hash, kleincoins, created_at) VALUES (?, ?, ?, ?, ?, ?)",
- (name, email, rut, fernet.encrypt(pin_hash.encode()).decode(), fernet.encrypt(b"0").decode(), datetime.now().isoformat())
- )
- conn.commit()
- user_id = cursor.lastrowid
- if user_id:
- logger.info(f"User added with ID: {user_id}")
- return user_id
- else:
- logger.error("Failed to add user.")
- return -1
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to add user: {e}")
- return -1
- finally:
- conn.close()
- #endregion
- #region Read
- def get_all(self) -> List[User]:
- """Get all users from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users")
- users = cursor.fetchall()
- conn.close()
- return [
- User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=fernet.decrypt(user[4].encode()).decode(),
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6]
- ) for user in users
- ]
-
- def get_by_id(self, user_id: int) -> Optional[User]:
- """Get user data from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
- user = cursor.fetchone()
- conn.close()
- if user:
- return User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=fernet.decrypt(user[4].encode()).decode(),
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6]
- )
- return None
-
- def get_by_email(self, email: str) -> Optional[User]:
- """Get user by email"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
- user = cursor.fetchone()
- conn.close()
- logger.debug(f"get_by_email: {email}, user: {user}")
- if user:
- return User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=user[4],
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6]
- )
- return None
-
- def login(self, email: str, pin_hashed: str) -> Optional[User]:
- """Login user by email and pin hash"""
- user = self.get_by_email(email)
- if user and fernet.decrypt(user.pin_hash.encode()).decode() == pin_hashed:
- logger.info(f"User {user.email} logged in successfully.")
- return user
- else:
- logger.error("Login failed: Invalid email or pin.")
- return None
- def permissions(self, user_id: int) -> int:
- """Get user permissions"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT permissions FROM users WHERE id = ?", (user_id,))
- result = cursor.fetchone()
- conn.close()
- if result:
- return result[0]
- return 0
- def get_by_rut(self, rut: str) -> Optional[User]:
- """Get user by RUT"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users WHERE rut = ?", (rut,))
- user = cursor.fetchone()
- conn.close()
- if user:
- return User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=fernet.decrypt(user[4].encode()).decode(),
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6]
- )
- return None
- #endregion
- #region Update
- def update(self, user_id: int, email=None, name=None, rut=None, pin_hash=None, kleincoins=None) -> bool:
- """Update user information in the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- updates = []
- params = []
- if email:
- updates.append("email = ?")
- params.append(email)
- if name:
- updates.append("name = ?")
- params.append(name)
- if rut:
- updates.append("rut = ?")
- params.append(rut)
- if pin_hash:
- updates.append("pin_hash = ?")
- params.append(fernet.encrypt(pin_hash.encode()).decode())
- if kleincoins is not None:
- updates.append("kleincoins = ?")
- params.append(fernet.encrypt(str(kleincoins).encode()).decode())
- if not updates:
- conn.close()
- return False
- try:
- cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = ?", (*params, user_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"User with ID {user_id} updated.")
- return success
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to update user: {e}")
- return False
- finally:
- conn.close()
- #endregion
- #region Delete
- def delete(self, user_id: int) -> bool:
- """Delete a user from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
- conn.commit()
- conn.close()
- if cursor.rowcount > 0:
- logger.info(f"User with ID {user_id} deleted.")
- return True
- else:
- logger.error(f"Failed to delete user with ID {user_id}.")
- return False
-
- def update_kleincoins(self, user_id: int, kleincoins: int) -> bool:
- """Update user's kleincoins"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "UPDATE users SET kleincoins = ? WHERE id = ?",
- (fernet.encrypt(str(kleincoins).encode()).decode(), user_id)
- )
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Kleincoins updated for user {user_id}: {kleincoins}")
- return success
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to update kleincoins: {e}")
- return False
- finally:
- conn.close()
-
- def get_kleincoins(self, user_id: int) -> Optional[int]:
- """Get user's kleincoins"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT kleincoins FROM users WHERE id = ?", (user_id,))
- result = cursor.fetchone()
- conn.close()
- if result:
- try:
- return int(fernet.decrypt(result[0].encode()).decode())
- except Exception as e:
- logger.error(f"Failed to decrypt kleincoins for user {user_id}: {e}")
- return None
- return None
- #endregion
- # Blacklist Data Service
- class BlacklistDataService(BaseDataService):
- """Service for managing blacklisted users"""
- #region Create
- def create(self, user_id: int) -> int:
- """Add a user to the blacklist"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute("INSERT INTO blacklist (user_id) VALUES (?)", (user_id,))
- conn.commit()
- blacklist_id = cursor.lastrowid
- if blacklist_id:
- logger.info(f"User with ID {user_id} added to blacklist.")
- return blacklist_id
- else:
- logger.error(f"Failed to add user with ID {user_id} to blacklist.")
- return -1
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to add user to blacklist: {e}")
- return -1
- finally:
- conn.close()
- #endregion
- #region Read
- def get_all(self) -> List[Blacklist]:
- """Get all blacklisted users"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT b.id, b.user_id, u.email, u.name, u.rut
- FROM blacklist b
- LEFT JOIN users u ON b.user_id = u.id
- """)
- blacklisted = cursor.fetchall()
- conn.close()
- return [
- Blacklist(
- id=row[0],
- user_id=row[1],
- email=row[2],
- name=row[3],
- rut=row[4]
- ) for row in blacklisted
- ]
-
- def get_by_id(self, id: int) -> Optional[Blacklist]:
- """Get blacklist entry by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT b.id, b.user_id, u.email, u.name, u.rut
- FROM blacklist b
- LEFT JOIN users u ON b.user_id = u.id
- WHERE b.id = ?
- """, (id,))
- row = cursor.fetchone()
- conn.close()
- if row:
- return Blacklist(
- id=row[0],
- user_id=row[1],
- email=row[2],
- name=row[3],
- rut=row[4]
- )
- return None
-
- def get_blacklisted_user_ids(self) -> List[int]:
- """Get a list of blacklisted user IDs"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT user_id FROM blacklist")
- blacklisted_users = [row[0] for row in cursor.fetchall()]
- conn.close()
- return blacklisted_users
-
- def is_user_blacklisted(self, user_id: int) -> bool:
- """Check if a user is blacklisted"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM blacklist WHERE user_id = ?", (user_id,))
- blacklisted = cursor.fetchone() is not None
- conn.close()
- return blacklisted
- #endregion
- #region Update
- def update(self, id: int, **kwargs) -> bool:
- """Update blacklist entry (not commonly used)"""
- # Blacklist entries typically don't need updates
- return False
- #endregion
- #region Delete
- def delete(self, id: int) -> bool:
- """Remove a blacklist entry by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM blacklist WHERE id = ?", (id,))
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"Blacklist entry with ID {id} removed.")
- else:
- logger.error(f"Failed to remove blacklist entry with ID {id}.")
- return success
-
- def remove_user_from_blacklist(self, user_id: int) -> bool:
- """Remove a user from the blacklist"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"User with ID {user_id} removed from blacklist.")
- else:
- logger.error(f"Failed to remove user with ID {user_id} from blacklist.")
- return success
- #endregion
- # Product Data Service
- class ProductDataService(BaseDataService):
- """Service for managing products"""
- #region Create
- def create(self, id: int, name: str, price: float, type: Optional[str] = None, description: Optional[str] = None, image: Optional[str] = None, status: int = 1) -> int:
- """Add a new product to the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "INSERT INTO products (id, name, type, description, price, image, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
- (id, name, type, description, price, image, status)
- )
- conn.commit()
- product_id = cursor.lastrowid
- if product_id:
- logger.info(f"Product added with ID: {product_id}")
- return product_id
- else:
- logger.error("Failed to add product.")
- return -1
- except sqlite3.Error as e:
- logger.error(f"Failed to add product: {e}")
- return -1
- finally:
- conn.close()
- def load_data(self, products: List[Dict[str, str]]) -> None:
- """Load multiple products from a list of dictionaries"""
- conn = self._get_connection()
- cursor = conn.cursor()
- for product in products:
- try:
- self.create(
- id=int(product['id']),
- name=product['name'],
- price=int(product['price']),
- type=product.get('type'),
- description=product.get('description'),
- image=product.get('image'),
- status=int(product.get('status', 1)) # Default to active (1)
- )
- except Exception as e:
- logger.error(f"Failed to load product {product['name']}: {e}")
- conn.close()
- #endregion
- #region Read
- def get_all(self) -> List[Product]:
- """Get all products from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products")
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- ) for product in products
- ]
-
- def get_by_id(self, product_id: int) -> Optional[Product]:
- """Get product by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE id = ?", (product_id,))
- product = cursor.fetchone()
- conn.close()
- if product:
- return Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- )
- return None
-
- def get_by_type(self, product_type: str) -> List[Product]:
- """Get products by type"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE type = ?", (product_type,))
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- ) for product in products
- ]
-
- def search_by_name(self, name: str) -> List[Product]:
- """Search products by name"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE name LIKE ?", (f"%{name}%",))
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- ) for product in products
- ]
-
- def get_products(self, product_ids: List[int]) -> List[Product]:
- """Get multiple products by their IDs"""
- if not product_ids:
- return []
- placeholders = ', '.join('?' for _ in product_ids)
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(f"SELECT * FROM products WHERE id IN ({placeholders})", product_ids)
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- ) for product in products
- ]
- #endregion
- #region Update
- def update(self, product_id: int, name=None, type=None, description=None, price=None, image=None, status=None) -> bool:
- """Update product information"""
- conn = self._get_connection()
- cursor = conn.cursor()
- updates = []
- params = []
- if name is not None:
- updates.append("name = ?")
- params.append(name)
- if type is not None:
- updates.append("type = ?")
- params.append(type)
- if description is not None:
- updates.append("description = ?")
- params.append(description)
- if price is not None:
- updates.append("price = ?")
- params.append(price)
- if image is not None:
- updates.append("image = ?")
- params.append(image)
- if status is not None:
- updates.append("status = ?")
- params.append(status)
- if not updates:
- conn.close()
- return False
- try:
- cursor.execute(f"UPDATE products SET {', '.join(updates)} WHERE id = ?", (*params, product_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Product with ID {product_id} updated.")
- return success
- except sqlite3.Error as e:
- logger.error(f"Failed to update product: {e}")
- return False
- finally:
- conn.close()
-
- def get_active_products(self) -> List[Product]:
- """Get only active products (status = 1)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE status = 1")
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- ) for product in products
- ]
-
- def get_inactive_products(self) -> List[Product]:
- """Get only inactive products (status = 0)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE status = 0")
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6]
- ) for product in products
- ]
-
- def activate_product(self, product_id: int) -> bool:
- """Activate a product (set status to 1)"""
- return self.update(product_id, status=1)
-
- def deactivate_product(self, product_id: int) -> bool:
- """Deactivate a product (set status to 0)"""
- return self.update(product_id, status=0)
-
- def is_product_active(self, product_id: int) -> Optional[bool]:
- """Check if a product is active"""
- product = self.get_by_id(product_id)
- if product:
- return product.status == 1
- return None
- #endregion
- #region Delete
- def delete(self, product_id: int) -> bool:
- """Delete a product from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"Product with ID {product_id} deleted.")
- else:
- logger.error(f"Failed to delete product with ID {product_id}.")
- return success
- #endregion
- # Sales Data Service
- class SalesDataService(BaseDataService):
- """Service for managing sales"""
- #region C
- def create(self, user_id: int, fudo_id: str, total: float, table: int, product_ids: List[int], quantities: Optional[List[int]] = None) -> int:
- """Create a new sale with products and quantities"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- fecha = datetime.now().isoformat()
-
- # Insert sale
- cursor.execute(
- "INSERT INTO sales (user_id, total, fudo_id, date, 'table') VALUES (?, ?, ?, ?, ?)",
- (user_id, total, fudo_id, fecha, table)
- )
- sale_id = cursor.lastrowid
-
- if sale_id and product_ids:
- # Insert sale-product relationships with quantities
- if quantities is None:
- quantities = [1] * len(product_ids) # Default quantity 1
-
- for i, product_id in enumerate(product_ids):
- quantity = quantities[i] if i < len(quantities) else 1
- cursor.execute(
- "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (?, ?, ?)",
- (sale_id, product_id, quantity)
- )
-
- conn.commit()
- if sale_id:
- logger.info(f"Sale created with ID: {sale_id}, fudo_id: {fudo_id}")
- return sale_id
- else:
- logger.error("Failed to create sale.")
- return -1
- except sqlite3.Error as e:
- logger.error(f"Failed to create sale: {e}")
- conn.rollback()
- return -1
- finally:
- conn.close()
-
- def add_product_to_sale(self, sale_id: int, product_id: int, quantity: int = 1) -> bool:
- """Add a product to an existing sale with quantity"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (?, ?, ?)",
- (sale_id, product_id, quantity)
- )
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Product {product_id} (qty: {quantity}) added to sale {sale_id}.")
- return success
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to add product to sale: {e}")
- return False
- finally:
- conn.close()
-
- def add_products_to_sale(self, sale_id: int, product_ids: List[int], quantities: Optional[List[int]] = None) -> bool:
- """Add multiple products to an existing sale with quantities"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- if quantities is None:
- quantities = [1] * len(product_ids) # Default quantity 1
-
- for i, product_id in enumerate(product_ids):
- quantity = quantities[i] if i < len(quantities) else 1
- cursor.execute(
- "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (?, ?, ?)",
- (sale_id, product_id, quantity)
- )
-
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Products added to sale {sale_id}.")
- return success
- except sqlite3.IntegrityError as e:
- logger.error(f"Failed to add products to sale: {e}")
- return False
- finally:
- conn.close()
- #endregion
- #region R
- def get_all(self) -> List[Sale]:
- """Get all sales from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- ORDER BY s.date DESC
- """)
- sales = cursor.fetchall()
- conn.close()
- return [
- Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7],
- products=self.get_sale_products(sale[0])
- ) for sale in sales
- ]
-
- def get_by_id(self, sale_id: int) -> Optional[Sale]:
- """Get sale by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.id = ?
- """, (sale_id,))
- sale = cursor.fetchone()
- conn.close()
- if sale:
- return Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7]
- )
- return None
-
- def get_by_fudo_id(self, fudo_id: str) -> Optional[Sale]:
- """Get sale by fudo_id"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.fudo_id = ?
- """, (fudo_id,))
- sale = cursor.fetchone()
- conn.close()
- if sale:
- return Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7]
- )
- return None
-
- def get_by_user(self, user_id: int) -> List[Sale]:
- """Get sales by user ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- """
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s."table", u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.user_id = ?
- ORDER BY s.date DESC
- """,
- (user_id,)
- )
- sales = cursor.fetchall()
- conn.close()
- return [
- Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7],
- products=self.get_sale_products(sale[0])
- ) for sale in sales
- ]
-
- def get_sale_products_ids(self, sale_id: int) -> List[int]:
- """Get product IDs for a specific sale"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT product_id FROM sale_products WHERE sale_id = ?
- """, (sale_id,))
- products = cursor.fetchall()
- conn.close()
- return [product[0] for product in products]
- def get_sale_products(self, sale_id: int) -> List[Product]:
- """Get products for a specific sale with quantities"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT p.id, p.name, p.type, p.description, p.price, p.image, p.status, sp.quantity
- FROM sale_products sp
- JOIN products p ON sp.product_id = p.id
- WHERE sp.sale_id = ?
- """, (sale_id,))
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- quantity=product[7]
- ) for product in products
- ]
-
- def get_by_table(self, table: int) -> List[Sale]:
- """Get sales by table number"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- """
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s."table", u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s."table" = ?
- ORDER BY s.date DESC
- """,
- (table,)
- )
- sales = cursor.fetchall()
- conn.close()
- return [
- Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7]
- ) for sale in sales
- ]
-
- def update_product_quantity(self, sale_id: int, product_id: int, new_quantity: int) -> bool:
- """Update quantity of a product in a sale"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "UPDATE sale_products SET quantity = ? WHERE sale_id = ? AND product_id = ?",
- (new_quantity, sale_id, product_id)
- )
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Updated quantity to {new_quantity} for product {product_id} in sale {sale_id}.")
- return success
- except sqlite3.Error as e:
- logger.error(f"Failed to update product quantity: {e}")
- return False
- finally:
- conn.close()
-
- def get_product_quantity_in_sale(self, sale_id: int, product_id: int) -> Optional[int]:
- """Get quantity of a specific product in a sale"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- "SELECT quantity FROM sale_products WHERE sale_id = ? AND product_id = ?",
- (sale_id, product_id)
- )
- result = cursor.fetchone()
- conn.close()
- return result[0] if result else None
- #endregion
- #region U
- def update(self, sale_id: int, user_id=None, total=None, table=None) -> bool:
- """Update sale information (products should be updated via specific methods)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- updates = []
- params = []
- if user_id is not None:
- updates.append("user_id = ?")
- params.append(user_id)
- if total is not None:
- updates.append("total = ?")
- params.append(total)
- if table is not None:
- updates.append("table = ?")
- params.append(table)
- if not updates:
- conn.close()
- return False
- try:
- cursor.execute(f"UPDATE sales SET {', '.join(updates)} WHERE id = ?", (*params, sale_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Sale with ID {sale_id} updated.")
- return success
- except sqlite3.Error as e:
- logger.error(f"Failed to update sale: {e}")
- return False
- finally:
- conn.close()
- #endregion
- #region D
- def delete(self, sale_id: int) -> bool:
- """Delete a sale and its product relationships"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- # Delete sale-product relationships first
- cursor.execute("DELETE FROM sale_products WHERE sale_id = ?", (sale_id,))
- # Delete the sale
- cursor.execute("DELETE FROM sales WHERE id = ?", (sale_id,))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Sale with ID {sale_id} deleted.")
- else:
- logger.error(f"Failed to delete sale with ID {sale_id}.")
- return success
- except sqlite3.Error as e:
- logger.error(f"Failed to delete sale: {e}")
- return False
- finally:
- conn.close()
-
- def remove_product_from_sale(self, sale_id: int, product_id: int) -> bool:
- """Remove a product from a sale (removes all quantity)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- "DELETE FROM sale_products WHERE sale_id = ? AND product_id = ?",
- (sale_id, product_id)
- )
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"Product {product_id} removed from sale {sale_id}.")
- else:
- logger.error(f"Failed to remove product {product_id} from sale {sale_id}.")
- return success
-
- def decrease_product_quantity(self, sale_id: int, product_id: int, decrease_by: int = 1) -> bool:
- """Decrease quantity of a product in a sale, removes if quantity becomes 0 or less"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- # Get current quantity
- cursor.execute(
- "SELECT quantity FROM sale_products WHERE sale_id = ? AND product_id = ?",
- (sale_id, product_id)
- )
- result = cursor.fetchone()
- if not result:
- return False
-
- current_quantity = result[0]
- new_quantity = current_quantity - decrease_by
-
- if new_quantity <= 0:
- # Remove the product completely
- cursor.execute(
- "DELETE FROM sale_products WHERE sale_id = ? AND product_id = ?",
- (sale_id, product_id)
- )
- logger.info(f"Product {product_id} removed from sale {sale_id} (quantity reached 0).")
- else:
- # Update with new quantity
- cursor.execute(
- "UPDATE sale_products SET quantity = ? WHERE sale_id = ? AND product_id = ?",
- (new_quantity, sale_id, product_id)
- )
- logger.info(f"Product {product_id} quantity decreased to {new_quantity} in sale {sale_id}.")
-
- conn.commit()
- return True
- except sqlite3.Error as e:
- logger.error(f"Failed to decrease product quantity: {e}")
- return False
- finally:
- conn.close()
- #endregion
- # Factory class to get service instances
- class DataServiceFactory:
- """Factory class to create data service instances"""
-
- @staticmethod
- def get_user_service() -> UserDataService:
- """Get user data service instance"""
- return UserDataService()
-
- @staticmethod
- def get_blacklist_service() -> BlacklistDataService:
- """Get blacklist data service instance"""
- return BlacklistDataService()
-
- @staticmethod
- def get_product_service() -> ProductDataService:
- """Get product data service instance"""
- return ProductDataService()
-
- @staticmethod
- def get_sales_service() -> SalesDataService:
- """Get sales data service instance"""
- return SalesDataService()
- # Helper functions for background data
- def load_bg_data() -> List[Dict[str, str]]:
- """Load background data for AI assistant"""
- try:
- with open(BG_DATA_PATH, 'r', encoding='utf-8') as f:
- return json.load(f)
- except FileNotFoundError:
- logger.error(f"Data file not found at {BG_DATA_PATH}. Serving with empty data.")
- return []
- except json.JSONDecodeError:
- logger.error(f"Could not decode JSON from {BG_DATA_PATH}. Serving with empty data.")
- return []
- def initialize_db():
- if os.path.exists(DB_PATH):
- logger.info("Base de datos ya existe, no se necesita inicializar.")
- return
- conn = sqlite3.connect(DB_PATH)
- cursor = conn.cursor()
- # Crear tabla de usuarios
- logger.info("Inicializando base de datos...")
- logger.info("Creando tabla de usuarios...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS users (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- email TEXT UNIQUE NOT NULL,
- name TEXT NOT NULL,
- rut TEXT UNIQUE NOT NULL,
- pin_hash TEXT NOT NULL,
- kleincoins TEXT NOT NULL,
- created_at TEXT NOT NULL,
- permissions INTEGER DEFAULT 0 NOT NULL CHECK (permissions IN (0, 1, 2)), -- 0: Usuario normal, 1: Administrador, 2: Superusuario
- );
- """)
- # Crear tabla de productos
- logger.info("Creando tabla de products...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS products (
- id INTEGER PRIMARY KEY,
- name TEXT NOT NULL,
- type TEXT,
- description TEXT,
- price REAL NOT NULL,
- image TEXT,
- status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo
- );
- """)
- # Crear tabla de ventas
- logger.info("Creando tabla de sales...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS sales (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id INTEGER NOT NULL,
- total REAL NOT NULL,
- fudo_id TEXT NOT NULL,
- date TEXT NOT NULL,
- "table" INTEGER NOT NULL,
- FOREIGN KEY (user_id) REFERENCES users(id)
- );
- """)
- # Crear tabla intermedia para ventas y productos
- logger.info("Creando tabla intermedia de sale_products...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS sale_products (
- sale_id INTEGER NOT NULL,
- product_id INTEGER NOT NULL,
- quantity INTEGER NOT NULL DEFAULT 1,
- FOREIGN KEY (sale_id) REFERENCES sales(id),
- FOREIGN KEY (product_id) REFERENCES products(id)
- );
- """)
- # Crear tabla de blacklist
- logger.info("Creando tabla de blacklist...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS blacklist (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- user_id INTEGER NOT NULL,
- FOREIGN KEY (user_id) REFERENCES users(id)
- );
- """)
- logger.info("Todas las tablas creadas correctamente.")
- logger.info("Cargando datos de productos desde el archivo JSON...")
- products_json = json.loads(open(PRODUCT_DATA_PATH, 'r', encoding='utf-8').read())
- product_service = ProductDataService()
- product_service.load_data(products_json)
- conn.commit()
-
- conn.close()
- logger.info("Base de datos inicializada correctamente.")
- data_bg_loaded = load_bg_data()
|