luthor/luthor/luthor/routers/product.py

392 lines
13 KiB
Python

# import uuid
#
# from datetime import date, timedelta
# from typing import List
#
# import barker.schemas.product as schemas
#
# from fastapi import APIRouter, Depends, HTTPException, Security, status
# from sqlalchemy import and_, or_
# from sqlalchemy.exc import SQLAlchemyError
# from sqlalchemy.orm import Session, contains_eager, joinedload
#
# from ..core.security import get_current_active_user as get_user
# from ..db.session import SessionLocal
# from ..models.master import MenuCategory, Product, ProductVersion, SaleCategory
# from ..schemas.user_token import UserToken
# from . import effective_date
#
#
# router = APIRouter()
#
#
# # Dependency
# def get_db():
# try:
# db = SessionLocal()
# yield db
# finally:
# db.close()
#
#
# @router.post("/list", response_model=List[schemas.Product])
# def sort_order(
# data: List[schemas.Product],
# date_: date = Depends(effective_date),
# db: Session = Depends(get_db),
# user: UserToken = Security(get_user, scopes=["products"]),
# ):
# try:
# indexes = {}
# for item in data:
# if item.menu_category.id_ in indexes:
# indexes[item.menu_category.id_] += 1
# else:
# indexes[item.menu_category.id_] = 0
# db.query(ProductVersion).filter(
# and_(
# ProductVersion.product_id == item.id_,
# or_(
# ProductVersion.valid_from == None, # noqa: E711
# ProductVersion.valid_from <= date_,
# ),
# or_(
# ProductVersion.valid_till == None, # noqa: E711
# ProductVersion.valid_till >= date_,
# ),
# )
# ).update({ProductVersion.sort_order: indexes[item.menu_category.id_]})
# db.commit()
# return product_list(date_, db)
# except SQLAlchemyError as e:
# db.rollback()
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail=str(e),
# )
# except Exception:
# db.rollback()
# raise
#
#
# @router.post("", response_model=schemas.Product)
# def save(
# data: schemas.ProductIn,
# date_: date = Depends(effective_date),
# db: Session = Depends(get_db),
# user: UserToken = Security(get_user, scopes=["products"]),
# ):
# try:
# item = Product()
# db.add(item)
# db.flush()
# product_version = ProductVersion(
# product_id=item.id,
# name=data.name,
# units=data.units,
# menu_category_id=data.menu_category.id_,
# sale_category_id=data.sale_category.id_,
# price=data.price,
# has_happy_hour=data.has_happy_hour,
# is_not_available=data.is_not_available,
# quantity=data.quantity,
# valid_from=date_,
# valid_till=None,
# )
# db.add(product_version)
# db.commit()
# return product_info(product_version)
# except SQLAlchemyError as e:
# db.rollback()
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail=str(e),
# )
# except Exception:
# db.rollback()
# raise
#
#
# @router.put("/{id_}", response_model=schemas.Product)
# def update(
# id_: uuid.UUID,
# data: schemas.ProductIn,
# date_: date = Depends(effective_date),
# db: Session = Depends(get_db),
# user: UserToken = Security(get_user, scopes=["products"]),
# ) -> schemas.Product:
# try:
# item: ProductVersion = (
# db.query(ProductVersion)
# .join(ProductVersion.menu_category)
# .filter(
# and_(
# ProductVersion.product_id == id_,
# or_(
# ProductVersion.valid_from == None, # noqa: E711
# ProductVersion.valid_from <= date_,
# ),
# or_(
# ProductVersion.valid_till == None, # noqa: E711
# ProductVersion.valid_till >= date_,
# ),
# )
# )
# .first()
# )
# if item.valid_till is not None:
# # Not implemented yet: allow adding a product version here by splitting the valid-from/valid-till range
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail="Product has been invalidated",
# )
# if item.valid_from == date_: # Update the product in place, as the valid-from date is the same
# item.name = data.name
# item.units = data.units
# item.menu_category_id = data.menu_category.id_
# item.sale_category_id = data.sale_category.id_
# item.price = data.price
# item.has_happy_hour = data.has_happy_hour
# item.is_not_available = data.is_not_available
# item.quantity = data.quantity
# db.commit()
# return product_info(item)
# else: # Create a new version of the product from the new details
# item.valid_till = date_ - timedelta(days=1)
# product_version = ProductVersion(
# product_id=item.product_id,
# name=data.name,
# units=data.units,
# menu_category_id=data.menu_category.id_,
# sale_category_id=data.sale_category.id_,
# price=data.price,
# has_happy_hour=data.has_happy_hour,
# is_not_available=data.is_not_available,
# quantity=data.quantity,
# valid_from=date_,
# valid_till=None,
# sort_order=item.sort_order,
# )
# db.add(product_version)
# db.commit()
# return product_info(product_version)
#
# except SQLAlchemyError as e:
# db.rollback()
# raise HTTPException(
# status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
# detail=str(e),
# )
# except Exception:
# db.rollback()
# raise
#
#
# @router.delete("/{id_}")
# def delete(
# id_: uuid.UUID,
# date_: date = Depends(effective_date),
# db: Session = Depends(get_db),
# user: UserToken = Security(get_user, scopes=["products"]),
# ):
# try:
# item: ProductVersion = (
# db.query(ProductVersion)
# .filter(
# and_(
# ProductVersion.product_id == id_,
# or_(
# ProductVersion.valid_from == None, # noqa: E711
# ProductVersion.valid_from <= date_,
# ),
# or_(
# ProductVersion.valid_till == None, # noqa: E711
# ProductVersion.valid_till >= date_,
# ),
# )
# )
# .first()
# )
# if item is None:
# raise HTTPException(
# status_code=status.HTTP_404_NOT_FOUND,
# detail="Product not found",
# )
# elif item.valid_from == date_:
# db.delete(item)
# else:
# item.valid_till = date_ - timedelta(days=1)
# db.commit()
# except Exception:
# db.rollback()
# raise
#
#
# @router.get("", response_model=schemas.ProductBlank)
# def show_blank(
# db: Session = Depends(get_db),
# user: UserToken = Security(get_user, scopes=["products"]),
# ) -> schemas.ProductBlank:
# return schemas.ProductBlank(
# name="",
# units="",
# price=0,
# hasHappyHour=False,
# isNotAvailable=False,
# isActive=True,
# sortOrder=0,
# )
#
#
# @router.get("/list", response_model=List[schemas.Product])
# def show_list(
# date_: date = Depends(effective_date), db: Session = Depends(get_db), user: UserToken = Depends(get_user)
# ) -> List[schemas.Product]:
# return product_list(date_, db)
#
#
# def product_list(date_: date, db: Session) -> List[schemas.Product]:
# return [
# product_info(item)
# for item in db.query(ProductVersion)
# .join(ProductVersion.menu_category)
# .join(ProductVersion.sale_category)
# .filter(
# and_(
# or_(
# ProductVersion.valid_from == None, # noqa: E711
# ProductVersion.valid_from <= date_,
# ),
# or_(
# ProductVersion.valid_till == None, # noqa: E711
# ProductVersion.valid_till >= date_,
# ),
# )
# )
# .order_by(MenuCategory.sort_order)
# .order_by(MenuCategory.name)
# .order_by(ProductVersion.sort_order)
# .order_by(ProductVersion.name)
# .options(
# joinedload(ProductVersion.menu_category, innerjoin=True),
# joinedload(ProductVersion.sale_category, innerjoin=True),
# contains_eager(ProductVersion.menu_category),
# contains_eager(ProductVersion.sale_category),
# )
# .all()
# ]
#
#
# @router.get("/query")
# async def show_term(
# mc: uuid.UUID = None,
# date_: date = Depends(effective_date),
# db: Session = Depends(get_db),
# current_user: UserToken = Depends(get_user),
# ):
# list_ = []
# for item in (
# db.query(ProductVersion)
# .join(ProductVersion.sale_category)
# .join(ProductVersion.sale_category, SaleCategory.tax)
# .filter(
# and_(
# ProductVersion.menu_category_id == mc,
# or_(
# ProductVersion.valid_from == None, # noqa: E711
# ProductVersion.valid_from <= date_,
# ),
# or_(
# ProductVersion.valid_till == None, # noqa: E711
# ProductVersion.valid_till >= date_,
# ),
# )
# )
# .order_by(ProductVersion.sort_order, ProductVersion.name)
# .options(
# joinedload(ProductVersion.sale_category, innerjoin=True),
# joinedload(ProductVersion.sale_category, SaleCategory.tax, innerjoin=True),
# contains_eager(ProductVersion.sale_category),
# contains_eager(ProductVersion.sale_category, SaleCategory.tax),
# )
# .all()
# ):
# list_.append(query_product_info(item, False))
# if item.has_happy_hour:
# list_.append(query_product_info(item, True))
# return list_
#
#
# @router.get("/{id_}", response_model=schemas.Product)
# def show_id(
# id_: uuid.UUID,
# date_: date = Depends(effective_date),
# db: Session = Depends(get_db),
# user: UserToken = Security(get_user, scopes=["products"]),
# ) -> schemas.Product:
# item: ProductVersion = (
# db.query(ProductVersion)
# .join(ProductVersion.sale_category)
# .join(ProductVersion.sale_category, SaleCategory.tax)
# .filter(
# and_(
# ProductVersion.product_id == id_,
# or_(
# ProductVersion.valid_from == None, # noqa: E711
# ProductVersion.valid_from <= date_,
# ),
# or_(
# ProductVersion.valid_till == None, # noqa: E711
# ProductVersion.valid_till >= date_,
# ),
# )
# )
# .order_by(ProductVersion.sort_order, ProductVersion.name)
# .options(
# joinedload(ProductVersion.sale_category, innerjoin=True),
# joinedload(ProductVersion.sale_category, SaleCategory.tax, innerjoin=True),
# contains_eager(ProductVersion.sale_category),
# contains_eager(ProductVersion.sale_category, SaleCategory.tax),
# )
# .first()
# )
# return product_info(item)
#
#
# def query_product_info(item: ProductVersion, happy_hour: bool):
# return {
# "id": item.product_id,
# "name": ("H H " if happy_hour else "") + item.full_name,
# "saleCategory": {
# "id": item.sale_category_id,
# "name": item.sale_category.name,
# },
# "tax": {
# "id": item.sale_category.tax_id,
# "name": item.sale_category.tax.name,
# "rate": item.sale_category.tax.rate,
# },
# "price": item.price,
# "hasHappyHour": happy_hour,
# "isNotAvailable": item.is_not_available,
# "sortOrder": item.sort_order,
# }
#
#
# def product_info(item: ProductVersion) -> schemas.Product:
# return schemas.Product(
# id=item.product_id,
# name=item.name,
# units=item.units,
# menuCategory=schemas.MenuCategoryLink(id=item.menu_category_id, name=item.menu_category.name),
# saleCategory=schemas.SaleCategoryLink(
# id=item.sale_category_id,
# name=item.sale_category.name,
# ),
# price=item.price,
# hasHappyHour=item.has_happy_hour,
# isNotAvailable=item.is_not_available,
# isActive=True,
# sortOrder=item.sort_order,
# )