barker/barker/barker/routers/product.py

506 lines
18 KiB
Python

import uuid
from datetime import date, timedelta
import barker.schemas.product as schemas
from fastapi import APIRouter, Depends, HTTPException, Security, status
from sqlalchemy import and_, insert, or_, select, update
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session, joinedload
from sqlalchemy.sql.functions import count, func
from ..core.config import settings
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionFuture
from ..models.inventory import Inventory
from ..models.kot import Kot
from ..models.menu_category import MenuCategory
from ..models.modifier_category import ModifierCategory
from ..models.modifier_category_product import ModifierCategoryProduct
from ..models.product import Product
from ..models.product_version import ProductVersion
from ..models.sale_category import SaleCategory
from ..models.voucher import Voucher
from ..schemas.user_token import UserToken
from . import effective_date
# Router on which the product endpoints defined in this module are registered.
router = APIRouter()
@router.post("/list", response_model=list[schemas.Product])
def sort_order(
    data: list[schemas.Product],
    date_: date = Depends(effective_date),
    user: UserToken = Security(get_user, scopes=["products"]),
) -> list[schemas.Product]:
    """Persist the display order of products as given by the posted list.

    Products are numbered from 0 within each menu category, in the order in
    which they appear in ``data``. The number is written to the product
    versions that are valid on ``date_``.

    Args:
        data: Products in the desired order; only ``id_`` and
            ``menu_category.id_`` of each entry are read.
        date_: Effective date used to select the valid product versions.
        user: Authenticated user; the ``products`` scope is required.

    Returns:
        The product list for ``date_`` as built by ``product_list``.

    Raises:
        HTTPException: 500 carrying the database error text when SQLAlchemy
            reports a failure.
    """
    try:
        with SessionFuture() as db:
            # Last position handed out so far, keyed by menu category.
            positions: dict[uuid.UUID, int] = {}
            for item in data:
                category_id = item.menu_category.id_
                position = positions.get(category_id, -1) + 1
                positions[category_id] = position
                db.execute(
                    update(ProductVersion)
                    .where(
                        and_(
                            ProductVersion.product_id == item.id_,
                            or_(
                                ProductVersion.valid_from.is_(None),
                                ProductVersion.valid_from <= date_,
                            ),
                            or_(
                                ProductVersion.valid_till.is_(None),
                                ProductVersion.valid_till >= date_,
                            ),
                        )
                    )
                    .values(sort_order=position)
                )
            db.commit()
            return product_list(date_, db)
    except SQLAlchemyError as e:
        # Chain the cause so the original database traceback stays visible in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.post("", response_model=None)
def save(
    data: schemas.ProductIn,
    date_: date = Depends(effective_date),
    user: UserToken = Security(get_user, scopes=["products"]),
) -> None:
    """Create a product together with its first version.

    The version is valid from ``date_`` with no end date. After it is added,
    ``add_modifiers`` is called for the new product and its menu category.

    Args:
        data: Details of the product to create.
        date_: Effective date; becomes ``valid_from`` of the new version.
        user: Authenticated user; the ``products`` scope is required.

    Raises:
        HTTPException: 500 carrying the database error text when SQLAlchemy
            reports a failure.
    """
    try:
        with SessionFuture() as db:
            item = Product()
            db.add(item)
            # Flush before item.id is referenced by the version below.
            db.flush()
            product_version = ProductVersion(
                product_id=item.id,
                name=data.name,
                units=data.units,
                menu_category_id=data.menu_category.id_,
                sale_category_id=data.sale_category.id_,
                price=data.price,
                has_happy_hour=data.has_happy_hour,
                is_not_available=data.is_not_available,
                quantity=data.quantity,
                valid_from=date_,
                valid_till=None,
            )
            db.add(product_version)
            add_modifiers(item.id, product_version.menu_category_id, date_, db)
            db.commit()
    except SQLAlchemyError as e:
        # Chain the cause so the original database traceback stays visible in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
def add_modifiers(product_id: uuid.UUID, menu_category_id: uuid.UUID, date_: date, db: Session):
    """Attach to a product the modifier categories shared by its menu category.

    Counts the other product versions valid on ``date_`` in the menu category,
    then counts, per modifier category, the valid product versions of that
    menu category linked to it. Where the two counts are equal, a
    ``ModifierCategoryProduct`` row is inserted for ``product_id``.

    Nothing is committed here; the caller owns the transaction.
    """
    # Predicate pair selecting versions that are valid on date_.
    started = or_(
        ProductVersion.valid_from.is_(None),
        ProductVersion.valid_from <= date_,
    )
    not_ended = or_(
        ProductVersion.valid_till.is_(None),
        ProductVersion.valid_till >= date_,
    )

    sibling_query = select(count(ProductVersion.id)).where(
        and_(
            ProductVersion.menu_category_id == menu_category_id,
            ProductVersion.product_id != product_id,
            started,
            not_ended,
        )
    )
    sibling_total = db.execute(sibling_query).scalar()

    usage_query = (
        select(count(ProductVersion.id), ModifierCategory.id)
        .join(Product.versions)
        .join(ModifierCategoryProduct, Product.id == ModifierCategoryProduct.product_id)
        .join(ModifierCategory, ModifierCategory.id == ModifierCategoryProduct.modifier_category_id)
        .where(
            and_(
                ProductVersion.menu_category_id == menu_category_id,
                started,
                not_ended,
            )
        )
        .group_by(ModifierCategory.id)
    )
    usage_rows = db.execute(usage_query).all()

    for linked_total, modifier_category_id in usage_rows:
        if linked_total != sibling_total:
            continue
        link = insert(ModifierCategoryProduct).values(
            product_id=product_id,
            modifier_category_id=modifier_category_id,
        )
        db.execute(link)
@router.put("/{id_}", response_model=None)
def update_route(
    id_: uuid.UUID,
    data: schemas.ProductIn,
    date_: date = Depends(effective_date),
    user: UserToken = Security(get_user, scopes=["products"]),
) -> None:
    """Update a product as of ``date_``.

    Depending on the version of the product that is valid on ``date_``:

    * If only ``quantity`` / ``sort_order`` differ (or nothing differs), the
      version is updated in place and no new version is created.
    * If the version started on ``date_``, it is overwritten in place.
    * Otherwise the version is closed on the day before ``date_`` and a new
      version valid from ``date_`` is created.

    Args:
        id_: Id of the product (not of the version).
        data: New details of the product.
        date_: Effective date of the change.
        user: Authenticated user; the ``products`` scope is required.

    Raises:
        HTTPException: 404 when no version of the product is valid on
            ``date_``; 500 when the valid version already has an end date or
            when SQLAlchemy reports a failure.
    """
    try:
        with SessionFuture() as db:
            item: ProductVersion | None = db.execute(
                select(ProductVersion)
                .join(ProductVersion.menu_category)
                .where(
                    and_(
                        ProductVersion.product_id == id_,
                        or_(
                            ProductVersion.valid_from.is_(None),
                            ProductVersion.valid_from <= date_,
                        ),
                        or_(
                            ProductVersion.valid_till.is_(None),
                            ProductVersion.valid_till >= date_,
                        ),
                    )
                )
            ).scalar_one_or_none()
            if item is None:
                # A missing product is a client error, not a server failure.
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Product not found",
                )

            versioned_fields_unchanged = (
                item.name == data.name
                and item.units == data.units
                and item.menu_category_id == data.menu_category.id_
                and item.sale_category_id == data.sale_category.id_
                and item.price == data.price
                and item.has_happy_hour == data.has_happy_hour
                and item.is_not_available == data.is_not_available
            )
            if versioned_fields_unchanged:
                if item.quantity != data.quantity or item.sort_order != data.sort_order:
                    item.quantity = data.quantity
                    item.sort_order = data.sort_order
                    db.commit()
                # The product is identical. No need for a new one
                return None

            if item.valid_till is not None:
                # Allow adding a product here splitting the valid from and to, but not implemented right now
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Product has been invalidated",
                )

            if item.valid_from == date_:
                # The version started on the effective date: update it in place.
                item.name = data.name
                item.units = data.units
                item.menu_category_id = data.menu_category.id_
                item.sale_category_id = data.sale_category.id_
                item.price = data.price
                item.has_happy_hour = data.has_happy_hour
                item.is_not_available = data.is_not_available
                item.quantity = data.quantity
                item.sort_order = data.sort_order
                db.commit()
                return None

            # Close the current version and create a new one from the new details.
            item.valid_till = date_ - timedelta(days=1)
            product_version = ProductVersion(
                product_id=item.product_id,
                name=data.name,
                units=data.units,
                menu_category_id=data.menu_category.id_,
                sale_category_id=data.sale_category.id_,
                price=data.price,
                has_happy_hour=data.has_happy_hour,
                is_not_available=data.is_not_available,
                quantity=data.quantity,
                valid_from=date_,
                valid_till=None,
                # NOTE(review): carries over the previous sort order rather than
                # data.sort_order, unlike the in-place branches; confirm intent.
                sort_order=item.sort_order,
            )
            db.add(product_version)
            db.commit()
            return None
    except SQLAlchemyError as e:
        # Chain the cause so the original database traceback stays visible in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.delete("/{id_}", response_model=None)
def delete_route(
    id_: uuid.UUID,
    date_: date = Depends(effective_date),
    user: UserToken = Security(get_user, scopes=["products"]),
) -> None:
    """End a product as of ``date_``.

    If the version valid on ``date_`` started on that very date, the version
    row is deleted; otherwise its ``valid_till`` is set to the day before
    ``date_``. The request is refused when inventory rows of the product are
    attached to vouchers dated on or after ``date_``.

    Args:
        id_: Id of the product (not of the version).
        date_: Effective date of the removal.
        user: Authenticated user; the ``products`` scope is required.

    Raises:
        HTTPException: 404 when no version of the product is valid on
            ``date_``; 422 when the product was billed on or after ``date_``.
    """
    with SessionFuture() as db:
        item: ProductVersion | None = db.execute(
            select(ProductVersion).where(
                and_(
                    ProductVersion.product_id == id_,
                    or_(
                        ProductVersion.valid_from.is_(None),
                        ProductVersion.valid_from <= date_,
                    ),
                    or_(
                        ProductVersion.valid_till.is_(None),
                        ProductVersion.valid_till >= date_,
                    ),
                )
            )
        ).scalar_one_or_none()
        if item is None:
            # A missing product is a client error, not a server failure.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Product not found",
            )

        # Voucher timestamp shifted by the configured offsets and truncated to a day.
        # NOTE(review): presumably maps the timestamp onto the business day; confirm.
        day = func.date_trunc(
            "day", Voucher.date + timedelta(minutes=settings.NEW_DAY_OFFSET_MINUTES - settings.TIMEZONE_OFFSET_MINUTES)
        ).label("day")
        billed = db.execute(
            select(count(Inventory.id))
            .join(Inventory.kot)
            .join(Kot.voucher)
            .where(Inventory.product_id == id_, day >= date_)
        ).scalar_one()
        if billed > 0:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Cannot delete this product as it was billed",
            )

        if item.valid_from == date_:
            db.delete(item)
        else:
            item.valid_till = date_ - timedelta(days=1)
        db.commit()
@router.get("", response_model=schemas.ProductBlank)
def show_blank(
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.ProductBlank:
    """Return an empty product template.

    Text fields are empty, numeric fields are zero, the two flags
    ``hasHappyHour`` / ``isNotAvailable`` are off and ``isActive`` is on.
    The ``products`` scope is required.
    """
    blank = schemas.ProductBlank(
        name="",
        units="",
        price=0,
        quantity=0,
        sortOrder=0,
        hasHappyHour=False,
        isNotAvailable=False,
        isActive=True,
    )
    return blank
@router.get("/list", response_model=list[schemas.Product])
def show_list(date_: date = Depends(effective_date), user: UserToken = Depends(get_user)) -> list[schemas.Product]:
    """Return the products valid on ``date_`` for any authenticated user."""
    with SessionFuture() as session:
        products = product_list(date_, session)
        return products
def product_list(date_: date, db: Session) -> list[schemas.Product]:
    """Return every product version valid on ``date_`` as a schema object.

    Results are ordered by menu category sort order, menu category name,
    product sort order and product name. Menu category and sale category are
    loaded together with each version.
    """
    started = or_(
        ProductVersion.valid_from.is_(None),
        ProductVersion.valid_from <= date_,
    )
    not_ended = or_(
        ProductVersion.valid_till.is_(None),
        ProductVersion.valid_till >= date_,
    )
    statement = (
        select(ProductVersion)
        .join(ProductVersion.menu_category)
        .join(ProductVersion.sale_category)
        .where(and_(started, not_ended))
        .order_by(
            MenuCategory.sort_order,
            MenuCategory.name,
            ProductVersion.sort_order,
            ProductVersion.name,
        )
        .options(
            joinedload(ProductVersion.menu_category, innerjoin=True),
            joinedload(ProductVersion.sale_category, innerjoin=True),
        )
    )
    versions = db.execute(statement).scalars().all()
    return [product_info(version) for version in versions]
@router.get("/query")
def show_term(
    mc: uuid.UUID | None = None,
    sc: uuid.UUID | None = None,
    date_: date = Depends(effective_date),
    current_user: UserToken = Depends(get_user),
):
    """List the products valid on ``date_`` for a menu or sale category.

    Args:
        mc: Menu category id. When given, each product of that category is
            returned in the shape built by ``query_product_info``; products
            with happy hour get a second, happy-hour entry right after.
        sc: Sale category id. When given, each product of that category is
            returned with ``id``, ``name``, ``menuCategory`` and ``price``.
        date_: Effective date used to select the valid product versions.
        current_user: Authenticated user.

    Returns:
        A list of dictionaries; empty when neither ``mc`` nor ``sc`` is given.
    """
    by_menu_category = mc is not None
    by_sale_category = sc is not None
    if not by_menu_category and not by_sale_category:
        # Nothing is ever appended without a filter, so skip the database
        # round trip that would fetch every product only to discard it.
        return []

    list_ = []
    query = select(ProductVersion)
    if by_sale_category:
        query = query.join(ProductVersion.menu_category)
    if by_menu_category:
        query = query.join(ProductVersion.sale_category).join(SaleCategory.tax)
    query = query.where(
        and_(
            or_(
                ProductVersion.valid_from.is_(None),
                ProductVersion.valid_from <= date_,
            ),
            or_(
                ProductVersion.valid_till.is_(None),
                ProductVersion.valid_till >= date_,
            ),
        )
    )
    if by_menu_category:
        query = query.where(ProductVersion.menu_category_id == mc).order_by(
            ProductVersion.sort_order, ProductVersion.name
        )
    if by_sale_category:
        query = query.where(ProductVersion.sale_category_id == sc).order_by(
            MenuCategory.sort_order, ProductVersion.sort_order, ProductVersion.name
        )
    if by_menu_category:
        query = query.options(
            joinedload(ProductVersion.sale_category, innerjoin=True),
            joinedload(ProductVersion.sale_category, SaleCategory.tax, innerjoin=True),
        )
    if by_sale_category:
        query = query.options(
            joinedload(ProductVersion.menu_category, innerjoin=True),
        )

    with SessionFuture() as db:
        for item in db.execute(query).scalars().all():
            if by_sale_category:
                list_.append(
                    {
                        "id": item.product_id,
                        "name": item.full_name,
                        "menuCategory": {
                            "id": item.menu_category_id,
                            "name": item.menu_category.name,
                        },
                        "price": item.price,
                    }
                )
            if by_menu_category:
                list_.append(query_product_info(item, False))
                if item.has_happy_hour:
                    list_.append(query_product_info(item, True))
    return list_
def product_list_of_sale_category(date_: date, db: Session) -> list[schemas.Product]:
    """Return every product version valid on ``date_`` as a schema object.

    The query this function used to build was a line-for-line copy of the one
    in ``product_list``, so it delegates to that function to keep a single
    implementation.

    NOTE(review): despite the name, no sale category filter is applied; the
    function takes no sale category argument.
    """
    return product_list(date_, db)
@router.get("/{id_}", response_model=schemas.Product)
def show_id(
    id_: uuid.UUID,
    date_: date = Depends(effective_date),
    user: UserToken = Security(get_user, scopes=["products"]),
) -> schemas.Product:
    """Return the version of a product that is valid on ``date_``.

    Args:
        id_: Id of the product (not of the version).
        date_: Effective date used to select the valid version.
        user: Authenticated user; the ``products`` scope is required.

    Raises:
        HTTPException: 404 when no version of the product is valid on
            ``date_``.
    """
    with SessionFuture() as db:
        # No ORDER BY: the query is expected to match at most one row.
        item: ProductVersion | None = db.execute(
            select(ProductVersion)
            .join(ProductVersion.sale_category)
            .join(SaleCategory.tax)
            .where(
                and_(
                    ProductVersion.product_id == id_,
                    or_(
                        ProductVersion.valid_from.is_(None),
                        ProductVersion.valid_from <= date_,
                    ),
                    or_(
                        ProductVersion.valid_till.is_(None),
                        ProductVersion.valid_till >= date_,
                    ),
                )
            )
            .options(
                joinedload(ProductVersion.sale_category, innerjoin=True),
                joinedload(ProductVersion.sale_category, SaleCategory.tax, innerjoin=True),
            )
        ).scalar_one_or_none()
        if item is None:
            # A missing product is a client error, not a server failure.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Product not found",
            )
        # Built inside the session: product_info reads item.menu_category,
        # which is not eagerly loaded by the query above.
        return product_info(item)
def query_product_info(item: ProductVersion, happy_hour: bool):
    """Build the dictionary describing a product version for the query list.

    When ``happy_hour`` is true the name is prefixed with ``"H H "`` and
    ``hasHappyHour`` is reported as true; otherwise the plain full name is
    used and ``hasHappyHour`` is false. Sale category and tax details are
    read from the version's ``sale_category`` relationship.
    """
    sale_category = item.sale_category
    tax = sale_category.tax
    name_prefix = "H H " if happy_hour else ""

    sale_category_link = {
        "id": item.sale_category_id,
        "name": sale_category.name,
    }
    tax_link = {
        "id": sale_category.tax_id,
        "name": tax.name,
        "rate": tax.rate,
    }
    return {
        "id": item.product_id,
        "name": name_prefix + item.full_name,
        "saleCategory": sale_category_link,
        "tax": tax_link,
        "price": item.price,
        "hasHappyHour": happy_hour,
        "isNotAvailable": item.is_not_available,
        "sortOrder": item.sort_order,
    }
def product_info(item: ProductVersion) -> schemas.Product:
    """Convert a product version row into the ``schemas.Product`` response.

    The schema ``id`` is the product id, while ``versionId`` is the id of the
    version row itself. ``isActive`` is always reported as true and the menu
    category link is given an empty ``products`` list.
    """
    menu_category_link = schemas.MenuCategoryLink(
        id=item.menu_category_id,
        name=item.menu_category.name,
        products=[],
    )
    sale_category_link = schemas.SaleCategoryLink(
        id=item.sale_category_id,
        name=item.sale_category.name,
    )
    return schemas.Product(
        id=item.product_id,
        versionId=item.id,
        name=item.name,
        units=item.units,
        menuCategory=menu_category_link,
        saleCategory=sale_category_link,
        price=item.price,
        hasHappyHour=item.has_happy_hour,
        isNotAvailable=item.is_not_available,
        quantity=item.quantity,
        isActive=True,
        sortOrder=item.sort_order,
        validFrom=item.valid_from,
        validTill=item.valid_till,
    )