import uuid from datetime import date, datetime, time, timedelta from decimal import Decimal from typing import Dict from fastapi import APIRouter, Cookie, Depends, Security from sqlalchemy import func, nulls_last, or_, select from sqlalchemy.orm import Session from ...core.config import settings from ...core.security import get_current_active_user as get_user from ...db.session import SessionFuture from ...models.inventory import Inventory from ...models.kot import Kot from ...models.menu_category import MenuCategory from ...models.product import Product from ...models.product_version import ProductVersion from ...models.sale_category import SaleCategory from ...models.voucher import Voucher from ...models.voucher_type import VoucherType from ...printing.product_sale_report import print_product_sale_report from ...schemas.menu_engineering_report import MeItem, MeReport from ...schemas.user_token import UserToken from . import check_audit_permission, report_finish_date, report_start_date router = APIRouter() @router.get("", response_model=MeReport) def menu_engineering_report_view( start_date: date = Depends(report_start_date), finish_date: date = Depends(report_finish_date), user: UserToken = Security(get_user, scopes=["product-sale-report"]), ): check_audit_permission(start_date, user.permissions) with SessionFuture() as db: return MeReport( startDate=start_date, finishDate=finish_date, amounts=menu_engineering_report(start_date, finish_date, db), ) def menu_engineering_report(s: date, f: date, db: Session): start_date = datetime.combine(s, time()) + timedelta( minutes=settings.NEW_DAY_OFFSET_MINUTES - settings.TIMEZONE_OFFSET_MINUTES ) finish_date = datetime.combine(f, time()) + timedelta( days=1, minutes=settings.NEW_DAY_OFFSET_MINUTES - settings.TIMEZONE_OFFSET_MINUTES ) day = func.date_trunc("day", Voucher.date - timedelta(minutes=settings.NEW_DAY_OFFSET_MINUTES)).label("day") list_ = db.execute( select( SaleCategory.name, MenuCategory.name, ProductVersion.id, ProductVersion.full_name, ProductVersion.price, func.sum(Inventory.quantity), func.sum(Inventory.net), ) .join(ProductVersion.sale_category) .join(ProductVersion.menu_category) .join(ProductVersion.product) .join(Product.inventories, isouter=True) .join(Inventory.kot, isouter=True) .join(Kot.voucher, isouter=True) .where( or_( Voucher.date == None, # noqa: E711 Voucher.date >= start_date, ), or_( Voucher.date == None, # noqa: E711 Voucher.date <= finish_date, ), or_( Voucher.voucher_type == None, # noqa: E711 Voucher.voucher_type == VoucherType.REGULAR_BILL, ), or_( ProductVersion.valid_from == None, # noqa: E711 ProductVersion.valid_from <= day, Voucher.date == None, # noqa: E711 ), or_( ProductVersion.valid_till == None, # noqa: E711 ProductVersion.valid_till >= day, Voucher.date == None, # noqa: E711 ), ) .group_by( SaleCategory.name, MenuCategory.name, ProductVersion.id, ProductVersion.full_name, ) .order_by(SaleCategory.name, nulls_last(func.sum(Inventory.net).desc())) ).all() info: list[MeItem] = [] sc_sales: Dict[str, Decimal] = {} sc_quantity: Dict[str, Decimal] = {} for sc, mc, id_, name, price, quantity, sales in list_: if sc not in sc_sales: sc_sales[sc] = Decimal(0) if sales is not None: sc_sales[sc] += sales if sc not in sc_quantity: sc_quantity[sc] = Decimal(0) if quantity is not None: sc_quantity[sc] += quantity info.append( MeItem( id=id_, name=name, price=price, average=round(sales / quantity) if sales and quantity else price, saleCategory=sc, menuCategory=mc, quantity=quantity or Decimal(0), sales=sales or Decimal(0), quantityPercent=Decimal(0), salesPercent=Decimal(0), ) ) for item in info: if sc_quantity[item.sale_category] > 0: item.quantity_percent = round(item.quantity / sc_quantity[item.sale_category], 4) item.sales_percent = round(item.sales / sc_sales[item.sale_category], 4) return info @router.get("/print", response_model=bool) def print_report( start_date: date = Depends(report_start_date), finish_date: date = Depends(report_finish_date), device_id: uuid.UUID = Cookie(None), user: UserToken = Security(get_user, scopes=["product-sale-report"]), ) -> bool: check_audit_permission(start_date, user.permissions) with SessionFuture() as db: report = { "userName": user.name, "startDate": start_date.strftime("%d-%b-%Y"), "finishDate": finish_date.strftime("%d-%b-%Y"), "amounts": menu_engineering_report(start_date, finish_date, db), } print_product_sale_report(report, device_id, db) return True