|
|
@@ -1,15 +1,16 @@
|
|
|
import json
|
|
|
from math import log
|
|
|
import os
|
|
|
+import re
|
|
|
import sqlite3
|
|
|
from typing import List, Dict, Optional, Any
|
|
|
from abc import ABC, abstractmethod
|
|
|
-from config.settings import BG_DATA_PATH, DB_PATH, PRODUCT_DATA_PATH
|
|
|
+from config.settings import BG_DATA_PATH, DB_PATH, IMAGE_PATH, PRODUCT_DATA_PATH, CURRENT_URL
|
|
|
from logging import getLogger
|
|
|
from datetime import datetime
|
|
|
from cryptography.fernet import Fernet
|
|
|
from config.settings import PIN_KEY
|
|
|
-
|
|
|
+import base64 as b64
|
|
|
# Import models
|
|
|
from models.user import User
|
|
|
from models.items import Product
|
|
|
@@ -457,6 +458,12 @@ class ProductDataService(BaseDataService):
|
|
|
"""Add a new product to the database"""
|
|
|
conn = self._get_connection()
|
|
|
cursor = conn.cursor()
|
|
|
+ if image and "base64" in image:
|
|
|
+ extension = re.search(r'data:image/(.*?);base64,', image)
|
|
|
+ image_name = f"{id}_img"
|
|
|
+ if extension:
|
|
|
+ image_name += f".{extension.group(1)}"
|
|
|
+ image = self._image_process(image_name, image)
|
|
|
try:
|
|
|
cursor.execute(
|
|
|
"INSERT INTO products (id, name, type, description, price, image, status) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
|
@@ -514,7 +521,7 @@ class ProductDataService(BaseDataService):
|
|
|
) for product in products
|
|
|
]
|
|
|
|
|
|
- def get_by_id(self, product_id: int) -> Optional[Product]:
|
|
|
+ def get_by_id(self, product_id: int) -> Optional[Product]:
|
|
|
"""Get product by ID"""
|
|
|
conn = self._get_connection()
|
|
|
cursor = conn.cursor()
|
|
|
@@ -594,10 +601,25 @@ class ProductDataService(BaseDataService):
|
|
|
]
|
|
|
#endregion
|
|
|
#region Update
|
|
|
+ def _image_process(self, image: str, base64: str) -> str:
|
|
|
+ """Process image for storage"""
|
|
|
+ if not image or not base64:
|
|
|
+ raise ValueError("Image and base64 data must be provided")
|
|
|
+ image_path = os.path.join(IMAGE_PATH, image)
|
|
|
+ if not os.path.exists(IMAGE_PATH):
|
|
|
+ os.makedirs(IMAGE_PATH)
|
|
|
+ with open(image_path, 'wb') as img_file:
|
|
|
+ img_file.write(b64.b64decode(base64.split(',')[1]))
|
|
|
+ return CURRENT_URL+"/images/" + image # Return url
|
|
|
+
|
|
|
def update(self, product_id: int, name=None, type=None, description=None, price=None, image=None, status=None) -> bool:
|
|
|
"""Update product information"""
|
|
|
conn = self._get_connection()
|
|
|
cursor = conn.cursor()
|
|
|
+ product = self.get_by_id(product_id)
|
|
|
+ if not product:
|
|
|
+ logger.error(f"Product with ID {product_id} not found.")
|
|
|
+ return False
|
|
|
updates = []
|
|
|
params = []
|
|
|
if name is not None:
|
|
|
@@ -613,6 +635,18 @@ class ProductDataService(BaseDataService):
|
|
|
updates.append("price = ?")
|
|
|
params.append(price)
|
|
|
if image is not None:
|
|
|
+ if product.image and product.image != image:
|
|
|
+ if os.path.exists(os.path.join(IMAGE_PATH, product.image)):
|
|
|
+ os.remove(os.path.join(IMAGE_PATH, product.image))
|
|
|
+ try:
|
|
|
+ extension = re.search(r'data:image/(.*?);base64,', image)
|
|
|
+ if not extension:
|
|
|
+ raise ValueError("Invalid image format")
|
|
|
+ extension = extension.group(1)
|
|
|
+ image = self._image_process(f"{product.id}_img.{extension}", image)
|
|
|
+ except ValueError as e:
|
|
|
+ logger.error(f"Failed to process image: {e}")
|
|
|
+ return False
|
|
|
updates.append("image = ?")
|
|
|
params.append(image)
|
|
|
if status is not None:
|
|
|
@@ -692,6 +726,9 @@ class ProductDataService(BaseDataService):
|
|
|
"""Delete a product from the database"""
|
|
|
conn = self._get_connection()
|
|
|
cursor = conn.cursor()
|
|
|
+ product = self.get_by_id(product_id)
|
|
|
+ if product and product.image:
|
|
|
+ os.remove(os.path.join(IMAGE_PATH, product.image.removeprefix(CURRENT_URL + "/images/")))
|
|
|
cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
|
|
|
conn.commit()
|
|
|
success = cursor.rowcount > 0
|