| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401 |
- import json
- from math import log
- import os
- import re
- import psycopg2
- from config.settings import POSTGRESQL_DB_CONFIG
- from typing import List, Dict, Optional, Any
- from abc import ABC, abstractmethod
- from config.settings import BG_DATA_PATH, IMAGE_PATH, PRODUCT_DATA_PATH, CURRENT_URL
- from logging import getLogger
- from datetime import datetime
- from cryptography.fernet import Fernet
- from config.settings import PIN_KEY
- import base64 as b64
- # Import models
- from models.user import User
- from models.items import Product
- from models.sales import Sale
- from models.blacklist import Blacklist
- fernet = Fernet(PIN_KEY.encode())
- logger = getLogger(__name__)
- """
- ESQUEMA DE BASE DE DATOS SQLITE (data.db)
- 1. Tabla: users
- -----------------------------------
- - id SERIAL PRIMARY KEY
- - email VARCHAR(255) UNIQUE NOT NULL
- - name VARCHAR(255) NOT NULL
- - rut VARCHAR(20) UNIQUE NOT NULL
- - pin_hash TEXT NOT NULL (encriptado)
- - kleincoins TEXT NOT NULL (encriptado, valor por defecto "0")
- - created_at TEXT NOT NULL (fecha de creación en formato ISO 8601)
- - reward_progress INTEGER DEFAULT 0 NOT NULL CHECK (reward_progress >= 0 AND reward_progress <= 100)
- (Guarda la información del usuario con su pin hasheado y kleincoins encriptadas)
- 2. Tabla: products
- -----------------------------------
- - id INTEGER PRIMARY KEY
- - name VARCHAR(255) NOT NULL
- - type VARCHAR(100) NOT NULL
- - description TEXT NOT NULL
- - price INTEGER NOT NULL
- - image TEXT (URL de la imagen)
- - status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo
- - promo_id INTEGER (ID de la promoción asociada, si existe)(puede ser null)
- - promo_price INTEGER (Precio promocional, si existe)(puede ser null)
- - promo_day INTEGER (Día de la semana para la promoción, 1-7)(puede ser null)
- (Guarda los productos disponibles para venta con su estado activo/inactivo)
- 3. Tabla: sales
- -----------------------------------
- - id SERIAL PRIMARY KEY
- - user_id INTEGER NOT NULL REFERENCES users.id
- - total INTEGER NOT NULL (precio total de la venta)
- - fudo_id TEXT UNIQUE NOT NULL (ID string único por venta)
- - date TEXT NOT NULL (fecha y hora en formato ISO 8601)
- - table INTEGER NOT NULL (número de mesa)
- (Guarda cada venta, asociada a un usuario y mesa)
- 4. Tabla: sale_products
- -----------------------------------
- - sale_id INTEGER NOT NULL (relación a sales.id)
- - product_id INTEGER NOT NULL (relación a products.id)
- - quantity INTEGER NOT NULL DEFAULT 1 (cantidad del producto)
- (Relación muchos a muchos entre ventas y productos con cantidad)
- 5. Tabla: blacklist
- -----------------------------------
- - id SERIAL PRIMARY KEY
- - user_id INTEGER NOT NULL (relación a users.id)
- (Usuarios bloqueados o no autorizados para ciertas acciones)
- RELACIONES:
- -----------------------------------
- - users puede tener muchas sales
- - sales puede tener muchos productos (y viceversa), por eso se usa una tabla intermedia (sale_products)
- - products pueden repetirse en múltiples sales
- """
- # Base abstract class for data access
- class BaseDataService(ABC):
- """Abstract base class for data services"""
- def __init__(self):
- self.db_config = POSTGRESQL_DB_CONFIG
- def _get_connection(self):
- """Get database connection"""
- return psycopg2.connect(
- dbname=self.db_config['dbname'],
- user=self.db_config['user'],
- password=self.db_config['password'],
- host=self.db_config['host'],
- port=self.db_config['port']
- )
- @abstractmethod
- def get_all(self) -> List[Any]:
- """Get all records"""
- pass
-
- @abstractmethod
- def get_by_id(self, id: int) -> Optional[Any]:
- """Get record by ID"""
- pass
-
- @abstractmethod
- def create(self, **kwargs) -> int:
- """Create new record"""
- pass
-
- @abstractmethod
- def update(self, id: int, **kwargs) -> bool:
- """Update record"""
- pass
-
- @abstractmethod
- def delete(self, id: int) -> bool:
- """Delete record"""
- pass
- # User Data Service
- class UserDataService(BaseDataService):
- """Service for managing user data"""
- #region Create
- def create(self, name: str, email: str, rut: str, pin_hash: str) -> int:
- """Add a new user to the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "INSERT INTO users (name, email, rut, pin_hash, kleincoins, created_at) VALUES (%s, %s, %s, %s, %s, %s) RETURNING id",
- (name, email, rut, fernet.encrypt(pin_hash.encode()).decode(), fernet.encrypt(b"0").decode(), datetime.now().isoformat())
- )
- conn.commit()
- user_id = cursor.fetchone()
- if user_id:
- logger.info(f"User added with ID: {user_id[0]}")
- return user_id[0]
- else:
- logger.error("Failed to add user.")
- return -1
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to add user: {e}")
- return -1
- finally:
- conn.close()
- #endregion
- #region Read
- def get_all(self) -> List[User]:
- """Get all users from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users")
- users = cursor.fetchall()
- conn.close()
- return [
- User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=fernet.decrypt(user[4].encode()).decode(),
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6],
- permissions=user[7],
- reward_progress=user[8]
- ) for user in users
- ]
-
- def get_by_id(self, user_id: int) -> Optional[User]:
- """Get user data from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
- user = cursor.fetchone()
- conn.close()
- if user:
- return User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=fernet.decrypt(user[4].encode()).decode(),
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6],
- permissions=user[7],
- reward_progress=user[8]
- )
- return None
-
- def get_by_email(self, email: str) -> Optional[User]:
- """Get user by email"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users WHERE email = %s", (email,))
- user = cursor.fetchone()
- conn.close()
- logger.debug(f"get_by_email: {email}, user: {user}")
- if user:
- return User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=user[4],
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6],
- permissions=user[7],
- reward_progress=user[8]
- )
- return None
-
- def login(self, email: str, pin_hashed: str) -> Optional[User]:
- """Login user by email and pin hash"""
- user = self.get_by_email(email)
- if user:
- logger.info(f"comparing {fernet.decrypt(user.pin_hash.encode()).decode()} with {pin_hashed}")
- if user and fernet.decrypt(user.pin_hash.encode()).decode() == pin_hashed:
- logger.info(f"User {user.email} logged in successfully.")
- return user
- else:
- logger.error("Login failed: Invalid email or pin.")
- return None
- def permissions(self, user_id: int) -> int:
- """Get user permissions"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT permissions FROM users WHERE id = %s", (user_id,))
- result = cursor.fetchone()
- if not result:
- logger.error(f"User with ID {user_id} not found.")
- return 0
- logger.info(f"userID: {user_id}, permissions: {result[0]}")
- result = result[0]
- conn.close()
- if result:
- return result
- return 0
- def get_by_rut(self, rut: str) -> Optional[User]:
- """Get user by RUT"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM users WHERE rut = %s", (rut,))
- user = cursor.fetchone()
- if not user:
- logger.error(f"User with RUT {rut} not found.")
- conn.close()
- return None
- user = user[0]
- conn.close()
- if user:
- return User(
- id=user[0],
- email=user[1],
- name=user[2],
- rut=user[3],
- pin_hash=fernet.decrypt(user[4].encode()).decode(),
- kleincoins=fernet.decrypt(user[5].encode()).decode(),
- created_at=user[6],
- permissions=user[7],
- reward_progress=user[8]
- )
- return None
-
- def get_next_id(self) -> int:
- """Get the next user ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT last_value FROM users_id_seq")
- result = cursor.fetchone()
- conn.close()
- if result and result[0]:
- return result[0]
- return 1
- #endregion
- #region Update
- def update(self, user_id: int, email=None, name=None, rut=None, pin_hash=None, kleincoins=None) -> bool:
- """Update user information in the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- updates = []
- params = []
- if email:
- updates.append("email = %s")
- params.append(email)
- if name:
- updates.append("name = %s")
- params.append(name)
- if rut:
- updates.append("rut = %s")
- params.append(rut)
- if pin_hash:
- updates.append("pin_hash = %s")
- params.append(fernet.encrypt(pin_hash.encode()).decode())
- if kleincoins is not None:
- updates.append("kleincoins = %s")
- params.append(fernet.encrypt(str(kleincoins).encode()).decode())
- if not updates:
- conn.close()
- return False
- try:
- cursor.execute(f"UPDATE users SET {', '.join(updates)} WHERE id = %s", (*params, user_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"User with ID {user_id} updated.")
- return success
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to update user: {e}")
- return False
- finally:
- conn.close()
- #endregion
-
- def set_reward_progress(self, user_id: int, progress: int) -> bool:
- """Add progress to user's reward"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute("UPDATE users SET reward_progress = %s WHERE id = %s", (progress, user_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Reward progress updated for user {user_id}: {progress}")
- return success
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to update reward progress: {e}")
- return False
- finally:
- conn.close()
-
- #region Delete
- def delete(self, user_id: int) -> bool:
- """Delete a user from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
- conn.commit()
- conn.close()
- if cursor.rowcount > 0:
- logger.info(f"User with ID {user_id} deleted.")
- return True
- else:
- logger.error(f"Failed to delete user with ID {user_id}.")
- return False
-
- def update_kleincoins(self, user_id: int, kleincoins: int) -> bool:
- """Update user's kleincoins"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "UPDATE users SET kleincoins = %s WHERE id = %s",
- (fernet.encrypt(str(kleincoins).encode()).decode(), user_id)
- )
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Kleincoins updated for user {user_id}: {kleincoins}")
- return success
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to update kleincoins: {e}")
- return False
- finally:
- conn.close()
-
- def get_kleincoins(self, user_id: int) -> Optional[int]:
- """Get user's kleincoins"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT kleincoins FROM users WHERE id = %s", (user_id,))
- result = cursor.fetchone()
- conn.close()
- if result:
- try:
- return int(fernet.decrypt(result[0].encode()).decode())
- except Exception as e:
- logger.error(f"Failed to decrypt kleincoins for user {user_id}: {e}")
- return None
- return None
- #endregion
- # Blacklist Data Service
- class BlacklistDataService(BaseDataService):
- """Service for managing blacklisted users"""
- #region Create
- def create(self, user_id: int) -> int:
- """Add a user to the blacklist"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute("INSERT INTO blacklist (user_id) VALUES (%s) RETURNING id", (user_id,))
- conn.commit()
- blacklist_id = cursor.fetchone()
- if blacklist_id:
- logger.info(f"User with ID {blacklist_id[0]} added to blacklist.")
- return blacklist_id[0]
- else:
- logger.error(f"Failed to add user with ID {user_id} to blacklist.")
- return -1
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to add user to blacklist: {e}")
- return -1
- finally:
- conn.close()
- #endregion
- #region Read
- def get_all(self) -> List[Blacklist]:
- """Get all blacklisted users"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT b.id, b.user_id, u.email, u.name, u.rut
- FROM blacklist b
- LEFT JOIN users u ON b.user_id = u.id
- """)
- blacklisted = cursor.fetchall()
- conn.close()
- return [
- Blacklist(
- id=row[0],
- user_id=row[1],
- email=row[2],
- name=row[3],
- rut=row[4]
- ) for row in blacklisted
- ]
-
- def get_by_id(self, id: int) -> Optional[Blacklist]:
- """Get blacklist entry by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT b.id, b.user_id, u.email, u.name, u.rut
- FROM blacklist b
- LEFT JOIN users u ON b.user_id = u.id
- WHERE b.id = %s
- """, (id,))
- row = cursor.fetchone()
- conn.close()
- if row:
- return Blacklist(
- id=row[0],
- user_id=row[1],
- email=row[2],
- name=row[3],
- rut=row[4]
- )
- return None
-
- def get_blacklisted_user_ids(self) -> List[int]:
- """Get a list of blacklisted user IDs"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT user_id FROM blacklist")
- blacklisted_users = [row[0] for row in cursor.fetchall()]
- conn.close()
- return blacklisted_users
-
- def is_user_blacklisted(self, user_id: int) -> bool:
- """Check if a user is blacklisted"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM blacklist WHERE user_id = %s", (user_id,))
- blacklisted = cursor.fetchone() is not None
- conn.close()
- return blacklisted
- #endregion
- #region Update
- def update(self, id: int, **kwargs) -> bool:
- """Update blacklist entry (not commonly used)"""
- # Blacklist entries typically don't need updates
- return False
- #endregion
- #region Delete
- def delete(self, id: int) -> bool:
- """Remove a blacklist entry by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM blacklist WHERE id = %s", (id,))
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"Blacklist entry with ID {id} removed.")
- else:
- logger.error(f"Failed to remove blacklist entry with ID {id}.")
- return success
-
- def remove_user_from_blacklist(self, user_id: int) -> bool:
- """Remove a user from the blacklist"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("DELETE FROM blacklist WHERE user_id = %s", (user_id,))
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"User with ID {user_id} removed from blacklist.")
- else:
- logger.error(f"Failed to remove user with ID {user_id} from blacklist.")
- return success
- #endregion
- # Product Data Service
- class ProductDataService(BaseDataService):
- """Service for managing products"""
- #region Create
- def create(self, id: int, name: str, price: float, type: Optional[str] = None, description: Optional[str] = None, image: Optional[str] = None, status: int = 1) -> int:
- """Add a new product to the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- if image and "base64" in image:
- extension = re.search(r'data:image/(.*%s);base64,', image)
- image_name = f"{id}_img"
- if extension:
- image_name += f".{extension.group(1)}"
- image = self._image_process(image_name, image)
- try:
- cursor.execute(
- "INSERT INTO products (id, name, type, description, price, image, status) VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id",
- (id, name, type, description, price, image, status)
- )
- conn.commit()
- product_id = cursor.fetchone()
- if product_id:
- logger.info(f"Product added with ID: {product_id[0]}")
- return product_id[0]
- else:
- logger.error("Failed to add product.")
- return -1
- except psycopg2.Error as e:
- logger.error(f"Failed to add product: {e}")
- return -1
- finally:
- conn.close()
- def load_data(self, products: List[Dict[str, str]]) -> None:
- """Load multiple products from a list of dictionaries"""
- conn = self._get_connection()
- cursor = conn.cursor()
- for product in products:
- try:
- self.create(
- id=int(product['id']),
- name=product['name'],
- price=int(product['price']),
- type=product.get('type'),
- description=product.get('description'),
- image=product.get('image'),
- status=int(product.get('status', 1)) # Default to active (1)
- )
- except Exception as e:
- logger.error(f"Failed to load product {product['name']}: {e}")
- conn.close()
- #endregion
- #region Read
- def get_all(self) -> List[Product]:
- """Get all products from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products")
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- ) for product in products
- ]
-
- def get_by_id(self, product_id: int) -> Optional[Product]:
- """Get product by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE id = %s", (product_id,))
- product = cursor.fetchone()
- conn.close()
- if product:
- return Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- )
- return None
-
- def get_by_type(self, product_type: str) -> List[Product]:
- """Get products by type"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE type = %s", (product_type,))
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- ) for product in products
- ]
-
- def search_by_name(self, name: str) -> List[Product]:
- """Search products by name"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE name LIKE %s", (f"%{name}%",))
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- ) for product in products
- ]
-
- def get_products(self, product_ids: List[int]) -> List[Product]:
- """Get multiple products by their IDs"""
- if not product_ids:
- return []
- placeholders = ', '.join('%s' for _ in product_ids)
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(f"SELECT * FROM products WHERE id IN ({placeholders})", product_ids)
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- ) for product in products
- ]
- #endregion
- #region Update
- def _image_process(self, image: str, base64: str) -> str:
- """Process image for storage"""
- if not image or not base64:
- raise ValueError("Image and base64 data must be provided")
- image_path = os.path.join(IMAGE_PATH, image)
- if not os.path.exists(IMAGE_PATH):
- os.makedirs(IMAGE_PATH)
- with open(image_path, 'wb') as img_file:
- img_file.write(b64.b64decode(base64.split(',')[1]))
- return CURRENT_URL+"/images/" + image # Return url
- def update(self, product_id: int, name=None, type=None, description=None, price=None, image=None, status=None) -> bool:
- """Update product information"""
- conn = self._get_connection()
- cursor = conn.cursor()
- product = self.get_by_id(product_id)
- if not product:
- logger.error(f"Product with ID {product_id} not found.")
- return False
- updates = []
- params = []
- if name is not None:
- updates.append("name = %s")
- params.append(name)
- if type is not None:
- updates.append("type = %s")
- params.append(type)
- if description is not None:
- updates.append("description = %s")
- params.append(description)
- if price is not None:
- updates.append("price = %s")
- params.append(price)
- if image is not None:
- if product.image and product.image != image:
- if os.path.exists(os.path.join(IMAGE_PATH, product.image)):
- os.remove(os.path.join(IMAGE_PATH, product.image))
- try:
- logger.info(f"Processing image: {image[:40]}... for product {product_id}")
- logger.info(f"updates: {updates}, params: {params}")
- extension = re.search(r'data:image/(.*?);base64,', image[:40])
- if not extension:
- raise ValueError("Invalid image format")
- extension = extension.group(1)
- image = self._image_process(f"{product.id}_img.{extension}", image)
- except ValueError as e:
- logger.error(f"Failed to process image: {e}")
- return False
- updates.append("image = %s")
- params.append(image)
- if status is not None:
- updates.append("status = %s")
- params.append(status)
- if not updates:
- conn.close()
- return False
- try:
- cursor.execute(f"UPDATE products SET {', '.join(updates)} WHERE id = %s", (*params, product_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Product with ID {product_id} updated.")
- return success
- except psycopg2.Error as e:
- logger.error(f"Failed to update product: {e}")
- return False
- finally:
- conn.close()
-
- def get_active_products(self) -> List[Product]:
- """Get only active products (status = 1)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE status = 1")
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- ) for product in products
- ]
-
- def get_inactive_products(self) -> List[Product]:
- """Get only inactive products (status = 0)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("SELECT * FROM products WHERE status = 0")
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- promo_id=product[7],
- promo_price=product[8],
- promo_day=product[9]
- ) for product in products
- ]
-
- def activate_product(self, product_id: int) -> bool:
- """Activate a product (set status to 1)"""
- return self.update(product_id, status=1)
-
- def deactivate_product(self, product_id: int) -> bool:
- """Deactivate a product (set status to 0)"""
- return self.update(product_id, status=0)
-
- def is_product_active(self, product_id: int) -> Optional[bool]:
- """Check if a product is active"""
- product = self.get_by_id(product_id)
- if product:
- return product.status == 1
- return None
- #endregion
- #region Delete
- def delete(self, product_id: int) -> bool:
- """Delete a product from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- product = self.get_by_id(product_id)
- if product and product.image and os.path.exists(os.path.join(IMAGE_PATH, product.image.removeprefix(CURRENT_URL + "/images/"))):
- os.remove(os.path.join(IMAGE_PATH, product.image.removeprefix(CURRENT_URL + "/images/")))
- cursor.execute("DELETE FROM products WHERE id = %s", (product_id,))
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"Product with ID {product_id} deleted.")
- else:
- logger.error(f"Failed to delete product with ID {product_id}.")
- return success
- #endregion
- # Sales Data Service
- class SalesDataService(BaseDataService):
- """Service for managing sales"""
- #region C
- def create(self, user_id: int, fudo_id: str, total: float, table: int, product_ids: List[int], quantities: Optional[List[int]] = None) -> int:
- """Create a new sale with products and quantities"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- fecha = datetime.now().isoformat()
-
- # Insert sale
- cursor.execute(
- "INSERT INTO sales (user_id, total, fudo_id, date, table_number) VALUES (%s, %s, %s, %s, %s) RETURNING id",
- (user_id, total, fudo_id, fecha, table)
- )
- last_sale = cursor.fetchone()
- sale_id = last_sale[0] if last_sale else None
-
- if sale_id and product_ids:
- # Insert sale-product relationships with quantities
- if quantities is None:
- quantities = [1] * len(product_ids) # Default quantity 1
-
- for i, product_id in enumerate(product_ids):
- quantity = quantities[i] if i < len(quantities) else 1
- cursor.execute(
- "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (%s, %s, %s)",
- (sale_id, product_id, quantity)
- )
-
- conn.commit()
- if sale_id:
- logger.info(f"Sale created with ID: {sale_id}, fudo_id: {fudo_id}")
- return sale_id
- else:
- logger.error("Failed to create sale.")
- return -1
- except psycopg2.Error as e:
- logger.error(f"Failed to create sale: {e}")
- conn.rollback()
- return -1
- finally:
- conn.close()
-
- def add_product_to_sale(self, sale_id: int, product_id: int, quantity: int = 1) -> bool:
- """Add a product to an existing sale with quantity"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (%s, %s, %s)",
- (sale_id, product_id, quantity)
- )
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Product {product_id} (qty: {quantity}) added to sale {sale_id}.")
- return success
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to add product to sale: {e}")
- return False
- finally:
- conn.close()
-
- def add_products_to_sale(self, sale_id: int, product_ids: List[int], quantities: Optional[List[int]] = None) -> bool:
- """Add multiple products to an existing sale with quantities"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- if quantities is None:
- quantities = [1] * len(product_ids) # Default quantity 1
-
- for i, product_id in enumerate(product_ids):
- quantity = quantities[i] if i < len(quantities) else 1
- cursor.execute(
- "INSERT INTO sale_products (sale_id, product_id, quantity) VALUES (%s, %s, %s)",
- (sale_id, product_id, quantity)
- )
-
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Products added to sale {sale_id}.")
- return success
- except psycopg2.IntegrityError as e:
- logger.error(f"Failed to add products to sale: {e}")
- return False
- finally:
- conn.close()
- #endregion
- #region R
- def get_all(self) -> List[Sale]:
- """Get all sales from the database"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- ORDER BY s.date DESC
- """)
- sales = cursor.fetchall()
- conn.close()
- return [
- Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7],
- products=self.get_sale_products(sale[0])
- ) for sale in sales
- ]
-
- def get_by_id(self, sale_id: int) -> Optional[Sale]:
- """Get sale by ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.id = %s
- """, (sale_id,))
- sale = cursor.fetchone()
- conn.close()
- if sale:
- return Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7]
- )
- return None
-
- def get_by_fudo_id(self, fudo_id: str) -> Optional[Sale]:
- """Get sale by fudo_id"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.fudo_id = %s
- """, (fudo_id,))
- sale = cursor.fetchone()
- conn.close()
- if sale:
- return Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7]
- )
- return None
-
- def get_by_user(self, user_id: int) -> List[Sale]:
- """Get sales by user ID"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- """
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table_number, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.user_id = %s
- ORDER BY s.date DESC
- """,
- (user_id,)
- )
- sales = cursor.fetchall()
- conn.close()
- return [
- Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=datetime.fromisoformat(sale[4]),
- table=sale[5],
- username=sale[6],
- user_email=sale[7],
- products=self.get_sale_products(sale[0])
- ) for sale in sales
- ]
-
- def get_sale_products_ids(self, sale_id: int) -> List[int]:
- """Get product IDs for a specific sale"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT product_id FROM sale_products WHERE sale_id = %s
- """, (sale_id,))
- products = cursor.fetchall()
- conn.close()
- return [product[0] for product in products]
- def get_sale_products(self, sale_id: int) -> List[Product]:
- """Get products for a specific sale with quantities"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute("""
- SELECT p.id, p.name, p.type, p.description, p.price, p.image, p.status, sp.quantity
- FROM sale_products sp
- JOIN products p ON sp.product_id = p.id
- WHERE sp.sale_id = %s
- """, (sale_id,))
- products = cursor.fetchall()
- conn.close()
- return [
- Product(
- id=product[0],
- name=product[1],
- type=product[2],
- description=product[3],
- price=product[4],
- image=product[5],
- status=product[6],
- quantity=product[7],
- promo_id=None, # Not applicable for sale products
- promo_price=None,
- promo_day=None
- ) for product in products
- ]
-
- def get_by_table(self, table: int) -> List[Sale]:
- """Get sales by table number"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- """
- SELECT s.id, s.user_id, s.total, s.fudo_id, s.date, s.table_number, u.name, u.email
- FROM sales s
- LEFT JOIN users u ON s.user_id = u.id
- WHERE s.table_number = %s
- ORDER BY s.date DESC
- """,
- (table,)
- )
- sales = cursor.fetchall()
- conn.close()
- return [
- Sale(
- id=sale[0],
- user_id=sale[1],
- total=sale[2],
- fudo_id=sale[3],
- date=sale[4],
- table=sale[5],
- username=sale[6],
- user_email=sale[7]
- ) for sale in sales
- ]
-
- def update_product_quantity(self, sale_id: int, product_id: int, new_quantity: int) -> bool:
- """Update quantity of a product in a sale"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- cursor.execute(
- "UPDATE sale_products SET quantity = %s WHERE sale_id = %s AND product_id = %s",
- (new_quantity, sale_id, product_id)
- )
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Updated quantity to {new_quantity} for product {product_id} in sale {sale_id}.")
- return success
- except psycopg2.Error as e:
- logger.error(f"Failed to update product quantity: {e}")
- return False
- finally:
- conn.close()
-
- def get_product_quantity_in_sale(self, sale_id: int, product_id: int) -> Optional[int]:
- """Get quantity of a specific product in a sale"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- "SELECT quantity FROM sale_products WHERE sale_id = %s AND product_id = %s",
- (sale_id, product_id)
- )
- result = cursor.fetchone()
- conn.close()
- return result[0] if result else None
- #endregion
- #region U
- def update(self, sale_id: int, user_id=None, total=None, table=None) -> bool:
- """Update sale information (products should be updated via specific methods)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- updates = []
- params = []
- if user_id is not None:
- updates.append("user_id = %s")
- params.append(user_id)
- if total is not None:
- updates.append("total = %s")
- params.append(total)
- if table is not None:
- updates.append("table = %s")
- params.append(table)
- if not updates:
- conn.close()
- return False
- try:
- cursor.execute(f"UPDATE sales SET {', '.join(updates)} WHERE id = %s", (*params, sale_id))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Sale with ID {sale_id} updated.")
- return success
- except psycopg2.Error as e:
- logger.error(f"Failed to update sale: {e}")
- return False
- finally:
- conn.close()
- #endregion
- #region D
- def delete(self, sale_id: int) -> bool:
- """Delete a sale and its product relationships"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- # Delete sale-product relationships first
- cursor.execute("DELETE FROM sale_products WHERE sale_id = %s", (sale_id,))
- # Delete the sale
- cursor.execute("DELETE FROM sales WHERE id = %s", (sale_id,))
- conn.commit()
- success = cursor.rowcount > 0
- if success:
- logger.info(f"Sale with ID {sale_id} deleted.")
- else:
- logger.error(f"Failed to delete sale with ID {sale_id}.")
- return success
- except psycopg2.Error as e:
- logger.error(f"Failed to delete sale: {e}")
- return False
- finally:
- conn.close()
-
- def remove_product_from_sale(self, sale_id: int, product_id: int) -> bool:
- """Remove a product from a sale (removes all quantity)"""
- conn = self._get_connection()
- cursor = conn.cursor()
- cursor.execute(
- "DELETE FROM sale_products WHERE sale_id = %s AND product_id = %s",
- (sale_id, product_id)
- )
- conn.commit()
- success = cursor.rowcount > 0
- conn.close()
- if success:
- logger.info(f"Product {product_id} removed from sale {sale_id}.")
- else:
- logger.error(f"Failed to remove product {product_id} from sale {sale_id}.")
- return success
-
- def decrease_product_quantity(self, sale_id: int, product_id: int, decrease_by: int = 1) -> bool:
- """Decrease quantity of a product in a sale, removes if quantity becomes 0 or less"""
- conn = self._get_connection()
- cursor = conn.cursor()
- try:
- # Get current quantity
- cursor.execute(
- "SELECT quantity FROM sale_products WHERE sale_id = %s AND product_id = %s",
- (sale_id, product_id)
- )
- result = cursor.fetchone()
- if not result:
- return False
-
- current_quantity = result[0]
- new_quantity = current_quantity - decrease_by
-
- if new_quantity <= 0:
- # Remove the product completely
- cursor.execute(
- "DELETE FROM sale_products WHERE sale_id = %s AND product_id = %s",
- (sale_id, product_id)
- )
- logger.info(f"Product {product_id} removed from sale {sale_id} (quantity reached 0).")
- else:
- # Update with new quantity
- cursor.execute(
- "UPDATE sale_products SET quantity = %s WHERE sale_id = %s AND product_id = %s",
- (new_quantity, sale_id, product_id)
- )
- logger.info(f"Product {product_id} quantity decreased to {new_quantity} in sale {sale_id}.")
-
- conn.commit()
- return True
- except psycopg2.Error as e:
- logger.error(f"Failed to decrease product quantity: {e}")
- return False
- finally:
- conn.close()
- #endregion
- # Factory class to get service instances
- class DataServiceFactory:
- """Factory class to create data service instances"""
-
- @staticmethod
- def get_user_service() -> UserDataService:
- """Get user data service instance"""
- return UserDataService()
-
- @staticmethod
- def get_blacklist_service() -> BlacklistDataService:
- """Get blacklist data service instance"""
- return BlacklistDataService()
-
- @staticmethod
- def get_product_service() -> ProductDataService:
- """Get product data service instance"""
- return ProductDataService()
-
- @staticmethod
- def get_sales_service() -> SalesDataService:
- """Get sales data service instance"""
- return SalesDataService()
- # Helper functions for background data
- def load_bg_data() -> List[Dict[str, str]]:
- """Load background data for AI assistant"""
- try:
- with open(BG_DATA_PATH, 'r', encoding='utf-8') as f:
- return json.load(f)
- except FileNotFoundError:
- logger.error(f"Data file not found at {BG_DATA_PATH}. Serving with empty data.")
- return []
- except json.JSONDecodeError:
- logger.error(f"Could not decode JSON from {BG_DATA_PATH}. Serving with empty data.")
- return []
-
- def initialize_db():
- # En PostgreSQL, no se crea el archivo de base de datos, así que verificamos si las tablas existen.
- try:
- conn = psycopg2.connect(
- dbname=POSTGRESQL_DB_CONFIG['dbname'],
- user=POSTGRESQL_DB_CONFIG['user'],
- password=POSTGRESQL_DB_CONFIG['password'],
- host=POSTGRESQL_DB_CONFIG['host'],
- port=POSTGRESQL_DB_CONFIG['port']
- )
- cursor = conn.cursor()
- # Verificar si la tabla 'users' ya existe
- cursor.execute("""
- SELECT EXISTS (
- SELECT FROM information_schema.tables
- WHERE table_name = 'users'
- );
- """)
- exists = cursor.fetchone()
- if exists:
- logger.info("La base de datos ya está inicializada, no se necesita inicializar.")
- conn.close()
- return
- conn.close()
- except Exception as e:
- logger.error(f"Error comprobando la existencia de tablas: {e}")
- # Si hay error, continuamos con la inicialización
- conn = psycopg2.connect(
- dbname=POSTGRESQL_DB_CONFIG['dbname'],
- user=POSTGRESQL_DB_CONFIG['user'],
- password=POSTGRESQL_DB_CONFIG['password'],
- host=POSTGRESQL_DB_CONFIG['host'],
- port=POSTGRESQL_DB_CONFIG['port']
- )
- cursor = conn.cursor()
- # Crear tabla de usuarios
- logger.info("Inicializando base de datos...")
- logger.info("Creando tabla de usuarios...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS users (
- id SERIAL PRIMARY KEY,
- email VARCHAR(255) UNIQUE NOT NULL,
- name VARCHAR(255) NOT NULL,
- rut VARCHAR(20) UNIQUE NOT NULL,
- pin_hash TEXT NOT NULL,
- kleincoins TEXT NOT NULL,
- created_at TEXT NOT NULL,
- permissions INTEGER DEFAULT 0 NOT NULL CHECK (permissions IN (0, 1, 2)) -- 0: Usuario normal, 1: Administrador, 2: Superusuario
-
- reward_progress INTEGER DEFAULT 0 NOT NULL CHECK (reward_progress >= 0 AND reward_progress <= 100) -- Progreso de recompensas
- );
- """)
- # Crear tabla de productos
- logger.info("Creando tabla de products...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS products (
- id INTEGER PRIMARY KEY,
- name VARCHAR(255) NOT NULL,
- type VARCHAR(100) NOT NULL,
- description TEXT NOT NULL,
- price INTEGER NOT NULL,
- image TEXT,
- status INTEGER DEFAULT 1 NOT NULL CHECK (status IN (0, 1)) -- 0: Inactivo, 1: Activo,
- promo_id INTEGER,
- promo_price INTEGER,
- promo_day INTEGER CHECK (promo_day >= 1 AND promo_day <= 7)
- );
- """)
- # Crear tabla de ventas
- logger.info("Creando tabla de sales...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS sales (
- id SERIAL PRIMARY KEY,
- user_id INTEGER NOT NULL,
- total INTEGER NOT NULL,
- fudo_id TEXT UNIQUE NOT NULL,
- date TEXT NOT NULL,
- table_number INTEGER NOT NULL,
- FOREIGN KEY (user_id) REFERENCES users(id)
- );
- """)
- # Crear tabla intermedia para ventas y productos
- logger.info("Creando tabla intermedia de sale_products...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS sale_products (
- sale_id INTEGER NOT NULL,
- product_id INTEGER NOT NULL,
- quantity INTEGER NOT NULL DEFAULT 1,
- FOREIGN KEY (sale_id) REFERENCES sales(id),
- FOREIGN KEY (product_id) REFERENCES products(id)
- );
- """)
- # Crear tabla de blacklist
- logger.info("Creando tabla de blacklist...")
- cursor.execute("""
- CREATE TABLE IF NOT EXISTS blacklist (
- id SERIAL PRIMARY KEY,
- user_id INTEGER NOT NULL,
- FOREIGN KEY (user_id) REFERENCES users(id)
- );
- """)
- logger.info("Todas las tablas creadas correctamente.")
- logger.info("Cargando datos de productos desde el archivo JSON...")
- products_json = json.loads(open(PRODUCT_DATA_PATH, 'r', encoding='utf-8').read())
- product_service = ProductDataService()
- product_service.load_data(products_json)
- conn.commit()
-
- conn.close()
- logger.info("Base de datos inicializada correctamente.")
- data_bg_loaded = load_bg_data()
|