brewman/brewman/brewman/routers/product.py

379 lines
14 KiB
Python

import uuid
from datetime import datetime
from decimal import Decimal
import brewman.schemas.product as schemas
from fastapi import APIRouter, Depends, HTTPException, Security, status
from sqlalchemy import delete, desc, func, or_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, contains_eager
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionFuture
from ..models.account import Account
from ..models.batch import Batch
from ..models.product import Product
from ..models.rate_contract import RateContract
from ..models.rate_contract_item import RateContractItem
from ..models.stock_keeping_unit import StockKeepingUnit
from ..schemas.product_sku import ProductSku
from ..schemas.user import UserToken
# Router that collects the product endpoints declared in this module.
# No prefix is configured here; presumably it is supplied where this router
# is included in the application — verify at the include site.
router = APIRouter()
@router.post("", response_model=schemas.Product)
def save(
    data: schemas.ProductIn,
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.Product:
    """Create a product together with its stock keeping units.

    The new product's ``code`` is set to the highest existing product code
    plus one (1 when there are no products yet) and its ``account_id`` comes
    from ``Account.all_purchases()``. SKU fractions and yields are rounded to
    5 decimal places, prices to 2.

    Args:
        data: Payload describing the product and its SKUs.
        user: Authenticated user holding the ``products`` scope.

    Returns:
        The persisted product, as built by ``product_info``.

    Raises:
        HTTPException: 422 when the payload carries no SKU; 500 when the
            database layer raises ``SQLAlchemyError``.
    """
    # Reject an unusable payload before opening a session or running a query.
    if not data.skus:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Not enough stock keeping units.",
        )
    try:
        with SessionFuture() as db:
            item = Product(
                name=data.name,
                fraction_units=data.fraction_units,
                product_group_id=data.product_group.id_,
                account_id=Account.all_purchases(),
                is_active=data.is_active,
                is_purchased=data.is_purchased,
                is_sold=data.is_sold,
                protein=data.protein,
                carbohydrate=data.carbohydrate,
                total_sugar=data.total_sugar,
                added_sugar=data.added_sugar,
                total_fat=data.total_fat,
                saturated_fat=data.saturated_fat,
                trans_fat=data.trans_fat,
                cholestrol=data.cholestrol,
                sodium=data.sodium,
                msnf=data.msnf,
                other_solids=data.other_solids,
                total_solids=data.total_solids,
                water=data.water,
            )
            # Next code = max(existing codes) + 1, starting from 1.
            item.code = db.execute(select(func.coalesce(func.max(Product.code), 0) + 1)).scalar_one()
            db.add(item)
            for sku in data.skus:
                db.add(
                    StockKeepingUnit(
                        units=sku.units,
                        fraction=round(sku.fraction, 5),
                        product_yield=round(sku.product_yield, 5),
                        cost_price=round(sku.cost_price, 2),
                        sale_price=round(sku.sale_price, 2),
                        product=item,
                    )
                )
            db.commit()
            return product_info(item)
    except SQLAlchemyError as e:
        # Chain the original error so the traceback keeps the database cause.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.put("/{id_}", response_model=schemas.Product)
def update_route(
    id_: uuid.UUID,
    data: schemas.ProductIn,
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.Product:
    """Update a product and reconcile its stock keeping units.

    Existing SKUs whose id appears in the payload are updated in place.
    Existing SKUs missing from the payload are deleted, unless a ``Batch``
    row references them. Payload SKUs that match no existing SKU are created.
    SKU fractions and yields are rounded to 5 decimal places, prices to 2.

    Args:
        id_: Identifier of the product to update.
        data: New product values and the complete desired list of SKUs.
        user: Authenticated user holding the ``products`` scope.

    Returns:
        The updated product, as built by ``product_info``.

    Raises:
        HTTPException: 404 when the product does not exist; 423 when it is a
            fixture; 422 when the payload carries no SKU; 500 when a SKU to be
            removed is referenced by batches or when the database layer
            raises ``SQLAlchemyError``.
    """
    try:
        with SessionFuture() as db:
            item: Product | None = db.execute(select(Product).where(Product.id == id_)).scalar_one_or_none()
            if item is None:
                # A missing row is a client error, not a server failure.
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Product not found.",
                )
            if item.is_fixture:
                raise HTTPException(
                    status_code=status.HTTP_423_LOCKED,
                    detail=f"{item.name} is a fixture and cannot be edited or deleted.",
                )
            if not data.skus:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail="Not enough stock keeping units.",
                )
            item.name = data.name
            item.fraction_units = data.fraction_units
            item.product_group_id = data.product_group.id_
            item.account_id = Account.all_purchases()
            item.is_active = data.is_active
            item.is_purchased = data.is_purchased
            item.is_sold = data.is_sold
            item.protein = data.protein
            item.carbohydrate = data.carbohydrate
            item.total_sugar = data.total_sugar
            item.added_sugar = data.added_sugar
            item.total_fat = data.total_fat
            item.saturated_fat = data.saturated_fat
            item.trans_fat = data.trans_fat
            item.cholestrol = data.cholestrol
            item.sodium = data.sodium
            item.msnf = data.msnf
            item.other_solids = data.other_solids
            item.total_solids = data.total_solids
            item.water = data.water

            # Consume matches from a copy so the request model is not mutated.
            pending = list(data.skus)
            # Walk a reversed snapshot: SKUs are removed from item.skus below.
            for sku in reversed(list(item.skus)):
                index = next((idx for idx, d in enumerate(pending) if d.id_ == sku.id), None)
                if index is not None:
                    new_data_sku = pending.pop(index)
                    sku.units = new_data_sku.units
                    sku.fraction = round(new_data_sku.fraction, 5)
                    sku.product_yield = round(new_data_sku.product_yield, 5)
                    sku.cost_price = round(new_data_sku.cost_price, 2)
                    sku.sale_price = round(new_data_sku.sale_price, 2)
                    continue
                # The SKU was dropped from the payload: only delete it when
                # no batch refers to it.
                count: int = db.execute(select(func.count()).where(Batch.sku_id == sku.id)).scalar_one()
                if count > 0:
                    raise HTTPException(
                        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                        detail=f"SKU '{sku.units}' has entries and cannot be deleted.",
                    )
                item.skus.remove(sku)
                db.delete(sku)

            # Whatever is left in the payload matched no existing SKU.
            for data_sku in pending:
                new_sku = StockKeepingUnit(
                    units=data_sku.units,
                    fraction=round(data_sku.fraction, 5),
                    product_yield=round(data_sku.product_yield, 5),
                    cost_price=round(data_sku.cost_price, 2),
                    sale_price=round(data_sku.sale_price, 2),
                    product=item,
                )
                db.add(new_sku)
                item.skus.append(new_sku)
            db.commit()
            return product_info(item)
    except SQLAlchemyError as e:
        # Chain the original error so the traceback keeps the database cause.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.delete("/{id_}", response_model=schemas.ProductBlank)
def delete_route(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.ProductBlank:
    """Delete a product and its stock keeping units.

    Deletion is refused for fixtures and for products whose SKUs are joined
    to at least one batch.

    Args:
        id_: Identifier of the product to delete.
        user: Authenticated user holding the ``products`` scope.

    Returns:
        A blank product, as built by ``product_blank``.

    Raises:
        HTTPException: 404 when the product does not exist; 423 when it is a
            fixture (the same status ``update_route`` uses for this case);
            500 when the product has batch entries.
    """
    with SessionFuture() as db:
        item: Product | None = db.execute(select(Product).where(Product.id == id_)).scalar_one_or_none()
        if item is None:
            # A missing row is a client error, not a server failure.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Product not found.",
            )
        if item.is_fixture:
            raise HTTPException(
                status_code=status.HTTP_423_LOCKED,
                detail=f"{item.name} is a fixture and cannot be edited or deleted.",
            )
        # Count the batches reachable through this product's SKUs.
        count: int = db.execute(
            select(func.count()).join(StockKeepingUnit.batches).where(StockKeepingUnit.product_id == id_)
        ).scalar_one()
        if count > 0:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Product has entries and cannot be deleted."
            )
        # SKUs are deleted first, then the product they point to.
        db.execute(delete(StockKeepingUnit).where(StockKeepingUnit.product_id == id_))
        db.execute(delete(Product).where(Product.id == id_))
        db.commit()
    return product_blank()
@router.get("", response_model=schemas.ProductBlank)
def show_blank(
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.ProductBlank:
    """Return the default values used for a product that is not saved yet.

    Args:
        user: Authenticated user holding the ``products`` scope.

    Returns:
        The blank product built by ``product_blank``.
    """
    blank = product_blank()
    return blank
@router.get("/list", response_model=list[schemas.Product])
def show_list(user: UserToken = Depends(get_user)) -> list[schemas.Product]:
    """List products with their SKUs and product group.

    Products are joined to their product group and SKUs, so a product
    without either does not appear. Results are ordered by active status
    (descending), then product group id, then name.

    Args:
        user: Authenticated user; no specific scope is requested.

    Returns:
        One ``schemas.Product`` per distinct product.
    """
    statement = (
        select(Product)
        .join(Product.product_group)
        .join(Product.skus)
        .options(
            contains_eager(Product.skus),
            contains_eager(Product.product_group),
        )
        .order_by(desc(Product.is_active), Product.product_group_id, Product.name)
    )
    with SessionFuture() as db:
        # unique() collapses the rows multiplied by the join on SKUs.
        products = db.execute(statement).unique().scalars().all()
        return [product_info(product) for product in products]
@router.get("/q-sku", response_model=list[ProductSku])
async def show_term_sku(
    q: str | None = None,  # Query
    a: bool | None = None,  # Active
    p: bool | None = None,  # Is Purchased?
    v: uuid.UUID | None = None,  # Vendor
    d: str | None = None,  # Date
    current_user: UserToken = Depends(get_user),
) -> list[ProductSku]:
    """Search SKUs by product name or unit, one result per SKU.

    Every whitespace-separated word of ``q`` must match, case-insensitively,
    either the product name or the SKU units. When both a vendor and a date
    are given, a matching rate-contract price replaces the SKU's cost price.

    Args:
        q: Search text; ``None`` applies no text filter.
        a: When not ``None``, keep only products with this active flag.
        p: When not ``None``, keep only products with this purchased flag.
        v: Vendor id used for the rate-contract lookup.
        d: Date string used for the rate-contract lookup; it is parsed by
            ``get_rc_price``.
        current_user: Authenticated user; no specific scope is requested.

    Returns:
        ``ProductSku`` entries ordered by product name.
    """
    # NOTE(review): this coroutine makes synchronous session calls, which
    # presumably block the event loop while they run — confirm whether a
    # plain ``def`` route was intended.
    list_ = []
    with SessionFuture() as db:
        query_ = select(Product).join(Product.skus).options(contains_eager(Product.skus))
        if a is not None:
            query_ = query_.filter(Product.is_active == a)
        if p is not None:
            query_ = query_.filter(Product.is_purchased == p)
        if q is not None:
            # str.split() without arguments never yields empty tokens.
            for sub in q.split():
                query_ = query_.filter(
                    or_(Product.name.ilike(f"%{sub}%"), StockKeepingUnit.units.ilike(f"%{sub}%"))
                )
        query_ = query_.order_by(Product.name)
        for item in db.execute(query_).unique().scalars().all():
            for sku in item.skus:  # type: StockKeepingUnit
                # get_rc_price filters on RateContractItem.sku_id, so it must
                # be given the SKU's id rather than the product's id.
                rc_price = get_rc_price(sku.id, d, v, db)
                list_.append(
                    ProductSku(
                        id_=sku.id,
                        name=f"{item.name} ({sku.units})",
                        fraction_units=item.fraction_units,
                        cost_price=sku.cost_price if rc_price is None else rc_price,
                        sale_price=sku.sale_price,
                        is_rate_contracted=rc_price is not None,
                    )
                )
    return list_
@router.get("/q-product", response_model=list[ProductSku])
async def show_term_product(
    q: str | None = None,  # Query
    a: bool | None = None,  # Active
    p: bool | None = None,  # Is Purchased?
    current_user: UserToken = Depends(get_user),
) -> list[ProductSku]:
    """Search products by name, one result per product.

    Every whitespace-separated word of ``q`` must match the product name
    case-insensitively. Results reuse the ``ProductSku`` schema with zero
    prices and ``is_rate_contracted`` set to ``False``.

    Args:
        q: Search text; ``None`` applies no text filter.
        a: When not ``None``, keep only products with this active flag.
        p: When not ``None``, keep only products with this purchased flag.
        current_user: Authenticated user; no specific scope is requested.

    Returns:
        ``ProductSku`` entries ordered by product name.
    """
    statement = select(Product)
    if a is not None:
        statement = statement.filter(Product.is_active == a)
    if p is not None:
        statement = statement.filter(Product.is_purchased == p)
    words = q.split() if q is not None else []
    for word in words:
        if not word.strip():
            continue
        statement = statement.filter(Product.name.ilike(f"%{word}%"))
    statement = statement.order_by(Product.name)

    with SessionFuture() as db:
        products = db.execute(statement).unique().scalars().all()
        return [
            ProductSku(
                id_=product.id,
                name=product.name,
                fraction_units=product.fraction_units,
                cost_price=Decimal(0),
                sale_price=Decimal(0),
                is_rate_contracted=False,
            )
            for product in products
        ]
@router.get("/{id_}", response_model=schemas.Product)
def show_id(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.Product:
    """Return a single product by id.

    Args:
        id_: Identifier of the product to load.
        user: Authenticated user holding the ``products`` scope.

    Returns:
        The product, as built by ``product_info``.

    Raises:
        HTTPException: 404 when no product has this id.
    """
    with SessionFuture() as db:
        item: Product | None = db.execute(select(Product).where(Product.id == id_)).scalar_one_or_none()
        if item is None:
            # A missing row is a client error, not a server failure.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Product not found.",
            )
        return product_info(item)
def product_info(product: Product) -> schemas.Product:
    """Convert a ``Product`` model into its ``schemas.Product`` representation.

    Args:
        product: Model instance whose ``skus`` and ``product_group`` can be
            read.

    Returns:
        The schema object carrying the product's fields, its SKUs and a link
        (id and name) to its product group.
    """
    # Fields copied one-to-one from the model under the same name.
    copied_fields = (
        "protein",
        "carbohydrate",
        "total_sugar",
        "added_sugar",
        "total_fat",
        "saturated_fat",
        "trans_fat",
        "cholestrol",
        "sodium",
        "msnf",
        "other_solids",
        "total_solids",
        "water",
    )
    sku_schemas = []
    for sku in product.skus:
        sku_schemas.append(
            schemas.StockKeepingUnit(
                id_=sku.id,
                units=sku.units,
                fraction=sku.fraction,
                product_yield=sku.product_yield,
                cost_price=sku.cost_price,
                sale_price=sku.sale_price,
            )
        )
    group = product.product_group
    group_link = schemas.ProductGroupLink(id_=group.id, name=group.name)
    return schemas.Product(
        id_=product.id,
        code=product.code,
        name=product.name,
        fraction_units=product.fraction_units,
        skus=sku_schemas,
        is_active=product.is_active,
        is_fixture=product.is_fixture,
        is_purchased=product.is_purchased,
        is_sold=product.is_sold,
        product_group=group_link,
        **{field: getattr(product, field) for field in copied_fields},
    )
def product_blank() -> schemas.ProductBlank:
    """Build the default values for a product that is not saved yet.

    Returns:
        A ``schemas.ProductBlank`` with empty name and units, no SKUs,
        active and purchased set, sold and fixture cleared, and every
        numeric field set to 0.
    """
    zero_fields = (
        "protein",
        "carbohydrate",
        "total_sugar",
        "added_sugar",
        "total_fat",
        "saturated_fat",
        "trans_fat",
        "cholestrol",
        "sodium",
        "msnf",
        "other_solids",
        "total_solids",
        "water",
    )
    zeroed = dict.fromkeys(zero_fields, 0)
    return schemas.ProductBlank(
        name="",
        fraction_units="",
        skus=[],
        is_active=True,
        is_purchased=True,
        is_sold=False,
        is_fixture=False,
        **zeroed,
    )
def get_rc_price(id_: uuid.UUID, d: str | None, v: uuid.UUID | None, db: Session) -> Decimal | None:
    """Look up the rate-contract price for an id, vendor and date.

    Args:
        id_: Value compared against ``RateContractItem.sku_id``.
        d: Date in ``%d-%b-%Y`` format (for example ``01-Jan-2024``).
        v: Vendor id compared against ``RateContract.vendor_id``.
        db: Session used to run the lookup.

    Returns:
        The matching price, or ``None`` when ``d`` or ``v`` is ``None`` or
        no row matches.

    Raises:
        ValueError: ``d`` does not match the expected date format.
    """
    # Without both a date and a vendor there is nothing to look up.
    if not (d is not None and v is not None):
        return None

    on_date = datetime.strptime(d, "%d-%b-%Y")
    # Contracts of this vendor whose validity range contains the date.
    valid_contract_ids = select(RateContract.id).where(
        RateContract.vendor_id == v,
        RateContract.valid_from <= on_date,
        RateContract.valid_till >= on_date,
    )
    price_query = select(RateContractItem.price).where(
        RateContractItem.sku_id == id_,
        RateContractItem.rate_contract_id.in_(valid_contract_ids),
    )
    return db.execute(price_query).scalar_one_or_none()