barker/barker/barker/routers/reports/menu_engineering_report.py

155 lines
5.5 KiB
Python

import uuid
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from typing import Dict
from fastapi import APIRouter, Cookie, Depends, Security
from sqlalchemy import func, nulls_last, or_, select
from sqlalchemy.orm import Session
from ...core.config import settings
from ...core.security import get_current_active_user as get_user
from ...db.session import SessionFuture
from ...models.inventory import Inventory
from ...models.kot import Kot
from ...models.menu_category import MenuCategory
from ...models.product import Product
from ...models.product_version import ProductVersion
from ...models.sale_category import SaleCategory
from ...models.voucher import Voucher
from ...models.voucher_type import VoucherType
from ...printing.product_sale_report import print_product_sale_report
from ...schemas.menu_engineering_report import MeItem, MeReport
from ...schemas.user_token import UserToken
from . import check_audit_permission, report_finish_date, report_start_date
# Router collecting this module's endpoints: the report itself at "" and
# its print variant at "/print".
router = APIRouter()
@router.get("", response_model=MeReport)
def menu_engineering_report_view(
    start_date: date = Depends(report_start_date),
    finish_date: date = Depends(report_finish_date),
    user: UserToken = Security(get_user, scopes=["product-sale-report"]),
):
    """Return the menu engineering report for the requested date range.

    The start date and the user's permissions are passed to
    ``check_audit_permission`` before any data is read (presumably it
    rejects ranges the user may not audit -- confirm against its
    definition). The report rows are then computed inside a short-lived
    database session and wrapped in a ``MeReport``.
    """
    check_audit_permission(start_date, user.permissions)
    with SessionFuture() as session:
        rows = menu_engineering_report(start_date, finish_date, session)
        response = MeReport(
            startDate=start_date,
            finishDate=finish_date,
            amounts=rows,
        )
    return response
def menu_engineering_report(s: date, f: date, db: Session) -> list[MeItem]:
    """Build the menu engineering rows for the dates ``s`` to ``f`` inclusive.

    For every product version the query sums the sold quantity and net
    amount of regular bills in the range, grouped by sale category and menu
    category. Each row is then annotated with its share of the quantity and
    of the sales of its sale category.

    Args:
        s: First calendar day of the report.
        f: Last calendar day of the report.
        db: Open SQLAlchemy session used to run the query.

    Returns:
        One ``MeItem`` per product version, ordered by sale category name and
        then by net sales descending (versions without sales last).
    """
    # Shift midnight by the configured new-day / timezone offsets so the
    # range lines up with the stored voucher timestamps.
    # NOTE(review): presumably voucher dates are stored in UTC -- confirm.
    offset = timedelta(minutes=settings.NEW_DAY_OFFSET_MINUTES - settings.TIMEZONE_OFFSET_MINUTES)
    start_date = datetime.combine(s, time()) + offset
    finish_date = datetime.combine(f, time()) + timedelta(days=1) + offset

    # Business day of a voucher, used to pick the product version that was
    # valid when the sale happened.
    day = func.date_trunc("day", Voucher.date - timedelta(minutes=settings.NEW_DAY_OFFSET_MINUTES)).label("day")

    # Inventories hang off the product, not the version, so the validity
    # window below decides which version a sale is attributed to. The outer
    # joins plus the "IS NULL" alternatives keep versions that have no
    # inventory rows at all.
    # NOTE(review): since the date filter lives in WHERE rather than in the
    # join condition, a version whose only sales fall outside the range looks
    # like it is dropped instead of being listed with zero -- confirm intent.
    # NOTE(review): the upper bound is inclusive (<=) although finish_date is
    # the first instant of the following business day -- confirm intent.
    q = (
        select(
            SaleCategory.name,
            MenuCategory.name,
            ProductVersion.id,
            ProductVersion.full_name,
            ProductVersion.price,
            func.sum(Inventory.quantity),
            func.sum(Inventory.net),
        )
        .join(ProductVersion.sale_category)
        .join(ProductVersion.menu_category)
        .join(ProductVersion.product)
        .join(Product.inventories, isouter=True)
        .join(Inventory.kot, isouter=True)
        .join(Kot.voucher, isouter=True)
        .where(
            or_(
                Voucher.date == None,  # noqa: E711
                Voucher.date >= start_date,
            ),
            or_(
                Voucher.date == None,  # noqa: E711
                Voucher.date <= finish_date,
            ),
            or_(
                Voucher.voucher_type == None,  # noqa: E711
                Voucher.voucher_type == VoucherType.REGULAR_BILL,
            ),
            or_(
                ProductVersion.valid_from == None,  # noqa: E711
                ProductVersion.valid_from <= day,
                Voucher.date == None,  # noqa: E711
            ),
            or_(
                ProductVersion.valid_till == None,  # noqa: E711
                ProductVersion.valid_till >= day,
                Voucher.date == None,  # noqa: E711
            ),
        )
        .group_by(
            SaleCategory.name,
            MenuCategory.name,
            ProductVersion.id,
            ProductVersion.full_name,
        )
        .order_by(SaleCategory.name, nulls_last(func.sum(Inventory.net).desc()))
    )
    rows = db.execute(q).all()

    info: list[MeItem] = []
    # Running totals per sale category, used for the percentage columns.
    sc_sales: Dict[str, Decimal] = {}
    sc_quantity: Dict[str, Decimal] = {}
    for sc, mc, id_, name, price, quantity, sales in rows:
        # SUM() over no inventory rows yields NULL; treat that as zero.
        quantity = quantity or Decimal(0)
        sales = sales or Decimal(0)
        sc_sales[sc] = sc_sales.get(sc, Decimal(0)) + sales
        sc_quantity[sc] = sc_quantity.get(sc, Decimal(0)) + quantity
        info.append(
            MeItem(
                id=id_,
                name=name,
                price=price,
                # Fall back to the list price when nothing was sold.
                average=round(sales / quantity) if sales and quantity else price,
                saleCategory=sc,
                menuCategory=mc,
                quantity=quantity,
                sales=sales,
                quantityPercent=Decimal(0),
                salesPercent=Decimal(0),
            )
        )

    for item in info:
        total_quantity = sc_quantity[item.sale_category]
        total_sales = sc_sales[item.sale_category]
        if total_quantity > 0:
            item.quantity_percent = round(item.quantity / total_quantity, 4)
            # A category can have sold units but zero net sales (e.g. fully
            # discounted items); dividing by that total would raise, so the
            # sales share stays at its default of 0 in that case.
            if total_sales != 0:
                item.sales_percent = round(item.sales / total_sales, 4)
    return info
@router.get("/print", response_model=bool)
def print_report(
    start_date: date = Depends(report_start_date),
    finish_date: date = Depends(report_finish_date),
    device_id: uuid.UUID = Cookie(None),
    user: UserToken = Security(get_user, scopes=["product-sale-report"]),
) -> bool:
    """Compute the menu engineering report and hand it to the printing layer.

    The start date and the user's permissions are passed to
    ``check_audit_permission`` first. The report payload carries the user's
    name, both dates formatted as ``DD-Mon-YYYY`` and the report rows; it is
    passed to ``print_product_sale_report`` together with the ``device_id``
    cookie and the open session.

    Returns:
        Always ``True`` once the print call has returned.
    """
    check_audit_permission(start_date, user.permissions)
    date_format = "%d-%b-%Y"
    with SessionFuture() as session:
        payload = {
            "userName": user.name,
            "startDate": start_date.strftime(date_format),
            "finishDate": finish_date.strftime(date_format),
            "amounts": menu_engineering_report(start_date, finish_date, session),
        }
        print_product_sale_report(payload, device_id, session)
    return True