439 lines
16 KiB
Python
439 lines
16 KiB
Python
import subprocess
|
|
import tempfile
|
|
import uuid
|
|
|
|
from collections import defaultdict
|
|
from datetime import date
|
|
from decimal import Decimal
|
|
from io import BytesIO
|
|
from typing import Sequence
|
|
|
|
import brewman.schemas.recipe as schemas
|
|
import brewman.schemas.recipe_item as rischemas
|
|
|
|
from brewman.models.price import Price
|
|
from brewman.routers.calculate_prices import calculate_prices
|
|
from fastapi import APIRouter, Depends, HTTPException, Request, Security, status
|
|
from fastapi.responses import FileResponse, StreamingResponse
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Alignment, Border, Font, NamedStyle, PatternFill, Side
|
|
from sqlalchemy import delete, func, select
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.orm import Session, contains_eager, joinedload
|
|
|
|
from ..core.security import get_current_active_user as get_user
|
|
from ..db.session import SessionFuture
|
|
from ..models.product import Product
|
|
from ..models.recipe import Recipe
|
|
from ..models.recipe_item import RecipeItem
|
|
from ..models.recipe_template import RecipeTemplate
|
|
from ..models.stock_keeping_unit import StockKeepingUnit
|
|
from ..schemas.user import UserToken
|
|
|
|
|
|
# Router on which the recipe endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
|
|
@router.post("", response_model=schemas.Recipe)
def save(
    data: schemas.RecipeIn,
    request: Request,
    user: UserToken = Security(get_user, scopes=["recipes"]),
) -> schemas.Recipe:
    """Create a recipe with its ingredient rows and return the stored result.

    Args:
        data: Incoming recipe payload (SKU link, descriptive fields, yield
            and ingredient items).
        request: Current request; not used in the body.
        user: Caller resolved through ``Security`` with the ``recipes``
            scope; not used in the body.

    Returns:
        The newly stored recipe as rendered by ``recipe_info``.

    Raises:
        HTTPException: 422 when the recipe's own product is listed as an
            ingredient or when no ingredients are supplied (and whatever
            ``check_recursion`` raises); 500 wrapping any
            ``SQLAlchemyError``.
    """
    try:
        with SessionFuture() as db:
            sku_query = select(StockKeepingUnit).where(StockKeepingUnit.id == data.sku.id_)
            recipe_sku = db.execute(sku_query).scalar_one()

            ingredient_ids = {entry.product.id_ for entry in data.items}
            if recipe_sku.product_id in ingredient_ids:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail="Product cannot also be an ingredient",
                )
            if not data.items:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail="Recipe has no ingredients",
                )

            recipe = Recipe(
                date_=data.date_,
                source=data.source,
                instructions=data.instructions,
                garnishing=data.garnishing,
                plating=data.plating,
                notes=data.notes,
                sku=recipe_sku,
                recipe_yield=round(data.recipe_yield, 2),
            )
            for entry in data.items:
                # scalar_one() raises when the referenced product does not exist.
                product_query = select(Product).where(Product.id == entry.product.id_)
                product: Product = db.execute(product_query).scalar_one()
                ingredient = RecipeItem(
                    product_id=product.id,
                    quantity=round(entry.quantity, 2),
                    description=entry.description,
                )
                recipe.items.append(ingredient)

            db.add(recipe)
            for ingredient in recipe.items:
                ingredient.recipe_id = recipe.id
                db.add(ingredient)

            # Flush first so the recursion check can see the new rows.
            db.flush()
            check_recursion(recipe_sku.product_id, set(), db)
            db.commit()
            return recipe_info(recipe)
    except SQLAlchemyError as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        )
|
|
|
|
|
|
@router.put("/{id_}", response_model=schemas.Recipe)
async def update_route(
    id_: uuid.UUID,
    data: schemas.RecipeIn,
    request: Request,
    user: UserToken = Security(get_user, scopes=["recipes"]),
) -> schemas.Recipe:
    """Update an existing recipe and reconcile its ingredient rows.

    Stored ingredients whose id appears in the payload are updated in place,
    stored ingredients missing from the payload are removed, and payload
    items that match no stored ingredient are added.

    Args:
        id_: Id of the recipe to update.
        data: New recipe values and the full list of ingredient items.
        request: Current request; not used in the body.
        user: Caller resolved through ``Security`` with the ``recipes``
            scope; not used in the body.

    Returns:
        The updated recipe as rendered by ``recipe_info``.

    Raises:
        HTTPException: 422 when the recipe's own product is listed as an
            ingredient (and whatever ``check_recursion`` raises); 500
            wrapping any ``SQLAlchemyError``.
    """
    # NOTE(review): this coroutine only makes synchronous database calls;
    # confirm whether it should be a plain ``def`` like ``save``.
    try:
        with SessionFuture() as db:
            recipe: Recipe = db.execute(select(Recipe).where(Recipe.id == id_)).scalar_one()

            sku: StockKeepingUnit = db.execute(
                select(StockKeepingUnit).where(StockKeepingUnit.id == data.sku.id_)
            ).scalar_one()
            if sku.product_id in {i.product.id_ for i in data.items}:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail="Product cannot also be an ingredient",
                )
            # NOTE(review): unlike save(), an empty ingredient list is not
            # rejected here, and ``notes`` is not written back although
            # save() stores it -- confirm whether both are intended.

            recipe.sku = sku
            recipe.recipe_yield = round(data.recipe_yield, 2)
            recipe.date_ = data.date_
            recipe.source = data.source
            recipe.instructions = data.instructions
            recipe.garnishing = data.garnishing
            recipe.plating = data.plating

            # Reconcile against a copy so the request payload is left intact.
            pending = list(data.items)
            # Walk a reversed snapshot because rows may be removed from
            # recipe.items while iterating.
            for item in reversed(list(recipe.items)):
                index = next((idx for idx, d in enumerate(pending) if d.id_ == item.id), None)
                if index is None:
                    recipe.items.remove(item)
                    continue
                new_item = pending.pop(index)
                # scalar_one() raises when the referenced product does not exist.
                db.execute(select(Product).where(Product.id == new_item.product.id_)).scalar_one()
                item.product_id = new_item.product.id_
                item.quantity = round(new_item.quantity, 2)
                item.description = new_item.description

            # Whatever is left in ``pending`` matched no stored row: add it.
            for d_item in pending:
                product: Product = db.execute(
                    select(Product).where(Product.id == d_item.product.id_)
                ).scalar_one()
                recipe.items.append(
                    RecipeItem(
                        product_id=product.id,
                        quantity=round(d_item.quantity, 2),
                        description=d_item.description,
                    )
                )

            # Flush first so the recursion check can see the changed rows.
            db.flush()
            check_recursion(sku.product_id, set(), db)
            db.commit()
            return recipe_info(recipe)
    except SQLAlchemyError as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
|
|
|
|
|
|
def check_recursion(product: uuid.UUID, visited: set[uuid.UUID], db: Session) -> None:
    """Walk the ingredient tree below ``product`` and reject cycles.

    Args:
        product: Id of the product whose recipe is inspected.
        visited: Product ids on the path currently being walked; entries are
            added before descending and removed again afterwards.
        db: Session used to look up recipes.

    Raises:
        HTTPException: 422 when ``product`` is already on the current path.
    """
    if product in visited:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Recipe recursion. Some ingredient recipe contains parent recipe.",
        )

    lookup = (
        select(Recipe)
        .join(Recipe.items)
        .join(Recipe.sku)
        .where(StockKeepingUnit.product_id == product)
    )
    found: Recipe | None = db.execute(lookup).unique().scalar_one_or_none()
    if found is None:
        # No recipe found for this product, so nothing further to descend into.
        return

    visited.add(product)
    for ingredient in found.items:
        check_recursion(ingredient.product_id, visited, db)
    visited.remove(product)
|
|
|
|
|
|
@router.delete("/{id_}", response_model=None)
def delete_route(
    id_: uuid.UUID,
    request: Request,
    user: UserToken = Security(get_user, scopes=["recipes"]),
) -> None:
    """Delete a recipe and its ingredient rows unless other recipes use it.

    Args:
        id_: Id of the recipe to delete.
        request: Current request; not used in the body.
        user: Caller resolved through ``Security`` with the ``recipes``
            scope; not used in the body.

    Raises:
        HTTPException: 422 when the product made by this recipe appears as
            an ingredient in some recipe.
    """
    with SessionFuture() as db:
        # SKU of the recipe being deleted -> product of that SKU.
        sku_of_recipe = select(Recipe.sku_id).where(Recipe.id == id_).scalar_subquery()
        product_of_recipe = (
            select(StockKeepingUnit.product_id)
            .where(StockKeepingUnit.id == sku_of_recipe)
            .scalar_subquery()
        )
        # Recipes that list that product as an ingredient.
        dependents_query = select(func.distinct(RecipeItem.recipe_id)).where(
            RecipeItem.product_id == product_of_recipe
        )
        dependents: Sequence[uuid.UUID] = db.execute(dependents_query).scalars().all()
        if dependents:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="This recipe has other recipes dependent on it and cannot be deleted",
            )

        # Ingredient rows are deleted before the recipe row itself.
        db.execute(delete(RecipeItem).where(RecipeItem.recipe_id == id_))
        db.execute(delete(Recipe).where(Recipe.id == id_))

        db.commit()
|
|
|
|
|
|
@router.get("", response_model=schemas.RecipeBlank)
def show_blank(
    request: Request,
    user: UserToken = Security(get_user, scopes=["recipes"]),
) -> schemas.RecipeBlank:
    """Return the default values for a new, not-yet-saved recipe.

    Args:
        request: Current request; not used in the body.
        user: Caller resolved through ``Security`` with the ``recipes``
            scope; not used in the body.
    """
    blank = recipe_blank()
    return blank
|
|
|
|
|
|
@router.get("/list", response_model=list[schemas.Recipe])
async def show_list(
    user: UserToken = Depends(get_user),
) -> list[schemas.Recipe]:
    """List every recipe, ordered by SKU id, without ingredient rows.

    Args:
        user: Caller resolved through ``Depends(get_user)``; not used in the
            body.

    Returns:
        One ``schemas.Recipe`` per stored recipe with ``items`` left empty.
    """
    with SessionFuture() as db:
        # SKU and product are loaded together with each recipe.
        query = (
            select(Recipe)
            .options(joinedload(Recipe.sku, innerjoin=True).joinedload(StockKeepingUnit.product, innerjoin=True))
            .order_by(Recipe.sku_id)
        )
        recipes: Sequence[Recipe] = db.execute(query).scalars().all()

        summaries: list[schemas.Recipe] = []
        for recipe in recipes:
            sku = recipe.sku
            product = sku.product
            summaries.append(
                schemas.Recipe(
                    id_=recipe.id,
                    sku=schemas.ProductLink(id_=sku.id, name=product.name),
                    date_=recipe.date_,
                    source=recipe.source,
                    instructions=recipe.instructions,
                    garnishing=recipe.garnishing,
                    plating=recipe.plating,
                    recipe_yield=recipe.recipe_yield,
                    notes=recipe.notes,
                    product_group_id=product.product_group_id,
                    units=sku.units,
                    items=[],
                )
            )
        return summaries
|
|
|
|
|
|
# @router.get("/ingredient-details/{id_}", response_model=ProductSku)
|
|
# def show_ingredient_id(
|
|
# id_: uuid.UUID,
|
|
# user: UserToken = Security(get_user, scopes=["recipes"]),
|
|
# ) -> ProductSku:
|
|
# with SessionFuture() as db:
|
|
# product: Product = db.execute(select(Product).join(Product.skus).where(Product.id == id_)).unique().scalar_one()
|
|
# return ProductSku(
|
|
# id_=id_,
|
|
# name=product.name,
|
|
# fractionUnits=product.fraction_units,
|
|
# isRateContracted=False,
|
|
# )
|
|
|
|
|
|
@router.get("/pdf/{id_}", response_class=FileResponse)
def show_pdf(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["recipes"]),
) -> schemas.Recipe:
    """Look up a recipe by id and return its details.

    No PDF is produced: the LaTeX-based draft below is commented out.

    NOTE(review): the route declares ``response_class=FileResponse`` while
    the body returns the recipe schema -- confirm the intended response.

    Args:
        id_: Id of the recipe to look up.
        user: Caller resolved through ``Security`` with the ``recipes``
            scope; not used in the body.
    """
    with SessionFuture() as db:
        # Disabled draft of the PDF pipeline, kept for reference:
        # template = db.execute(select(RecipeTemplate.text).where(RecipeTemplate.selected == True)).scalar_one()
        # with tempfile.TemporaryDirectory() as tmpdir:
        #     with open(tmpdir + "/recipe.tex", 'w') as rt:
        #         rt.write(template)
        #     subprocess.run(f"docker run -i --rm --name latex --volume \"{tmpdir}\":/data -w /data --user $(id -u):$(id -g) texlive/texlive:latest pdflatex recipe.tex")
        query = select(Recipe).where(Recipe.id == id_)
        recipe: Recipe = db.execute(query).scalar_one()
        return recipe_info(recipe)
|
|
|
|
|
|
@router.get("/xlsx", response_class=StreamingResponse)
def get_report(
    p: uuid.UUID | None = None,
    t: uuid.UUID | None = None,
) -> StreamingResponse:
    """Build the recipe costing workbook and return it as a download.

    Prices are recalculated for period ``t`` and committed first; the
    workbook is then built from the stored prices of that period and from
    the recipes, optionally restricted to one product group.

    NOTE(review): unlike the other routes in this module, no user dependency
    is declared here -- confirm that is intended.

    Args:
        p: Optional product group id; when given only recipes whose product
            belongs to that group are included.
        t: Period id passed to ``calculate_prices`` and used to select the
            ``Price`` rows.

    Returns:
        A streaming response carrying ``recipe.xlsx`` as an attachment.
    """
    with SessionFuture() as db:
        calculate_prices(t, db)
        db.commit()

    with SessionFuture() as db:
        price_rows = (
            db.execute(select(Price).where(Price.period_id == t).options(joinedload(Price.product, innerjoin=True)))
            .unique()
            .scalars()
            .all()
        )
        prices: list[tuple[str, str, Decimal]] = [
            (i.product.name, i.product.fraction_units, i.price) for i in price_rows
        ]

    with SessionFuture() as db:
        q = (
            select(Recipe)
            .join(Recipe.sku)
            .join(StockKeepingUnit.product)
            .join(Product.product_group)
            .options(
                joinedload(Recipe.items, innerjoin=True).joinedload(RecipeItem.product, innerjoin=True),
                contains_eager(Recipe.sku, StockKeepingUnit.product, Product.product_group),
            )
        )
        if p is not None:
            # The joins above already link recipe -> SKU -> product, so only
            # the group predicate is needed as a filter.
            q = q.where(Product.product_group_id == p)
        list_: Sequence[Recipe] = db.execute(q).unique().scalars().all()
        e = excel(prices, sorted(list_, key=lambda r: r.sku.product.name))
        e.seek(0)

    headers = {"Content-Disposition": "attachment; filename = recipe.xlsx"}
    # Registered media type for .xlsx workbooks ("text/xlsx" is not one).
    xlsx_media_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    return StreamingResponse(e, media_type=xlsx_media_type, headers=headers)
|
|
|
|
|
|
def excel(prices: list[tuple[str, str, Decimal]], recipes: list[Recipe]) -> BytesIO:
    """Render prices and recipes into an in-memory xlsx workbook.

    The first sheet, "Rate List", holds one row per price. Every product
    group found among ``recipes`` gets its own sheet on which the group's
    recipes are stacked: a title row, a header row, then one row per
    ingredient whose rate is looked up from "Rate List".

    Args:
        prices: ``(name, units, rate, ...)`` tuples; only the first three
            elements are written.
        recipes: Recipes to render, in the order they should appear.

    Returns:
        A ``BytesIO`` holding the saved workbook (position is left at the
        end of the data).
    """
    wb = Workbook()
    rate_sheet = wb.active
    rate_sheet.title = "Rate List"
    for column, heading in enumerate(("Name", "Units", "Rate"), start=1):
        rate_sheet.cell(row=1, column=column, value=heading)
    for row_no, (name, units, rate, *_ignored) in enumerate(prices, start=2):
        rate_sheet.cell(row=row_no, column=1, value=name)
        rate_sheet.cell(row=row_no, column=2, value=units)
        rate_sheet.cell(row=row_no, column=3, value=rate)

    # Sorted so the sheet order is the same on every run. The worksheet
    # objects returned by create_sheet are kept so that a group is always
    # written to the sheet created for it, even if its title had to be
    # adjusted (e.g. a group that is itself called "Rate List").
    group_names = sorted({x.sku.product.product_group.name for x in recipes})
    sheets = {group_name: wb.create_sheet(group_name) for group_name in group_names}

    # Next free row on each group's sheet.
    rows: defaultdict[str, int] = defaultdict(lambda: 1)
    register_styles(wb)
    for recipe in recipes:
        group = recipe.sku.product.product_group.name
        ws = sheets[group]
        row = rows[group]
        ings = len(recipe.items)
        # Ingredient rows start two rows below the title row (header between).
        ing_from = row + 2
        ing_till = ing_from + ings - 1
        # With no ingredients the range would be reversed and pick up
        # neighbouring rows, so write a plain zero instead.
        total = f"=SUM(E{ing_from}:E{ing_till})" if ings else 0

        ws.cell(row=row, column=1, value=recipe.sku.product.name).style = "recipe_name"
        ws.cell(row=row, column=2, value=recipe.sku.units).style = "recipe_unit"
        ws.cell(row=row, column=3, value=recipe.recipe_yield).style = "recipe_name"
        ws.cell(row=row, column=4).style = "recipe_name"
        ws.cell(row=row, column=5, value=total).style = "recipe_name"

        row += 1
        for column, heading in enumerate(("Ingredients", "Unit", "Qty", "Rate", "Amount"), start=1):
            ws.cell(row=row, column=column, value=heading).style = "header"

        for item in recipe.items:
            row += 1
            ws.cell(row=row, column=1, value=item.product.name).style = "ing"
            ws.cell(row=row, column=2, value=item.product.fraction_units).style = "unit"
            ws.cell(row=row, column=3, value=item.quantity).style = "ing"
            # Look up this row's ingredient name explicitly instead of
            # passing the whole column A as the lookup value.
            ws.cell(row=row, column=4, value=f"=VLOOKUP(A{row},'Rate List'!A:C,3,0)").style = "ing"
            ws.cell(row=row, column=5, value=f"=C{row}*D{row}").style = "ing"
        rows[group] = row + 1

    virtual_workbook = BytesIO()
    wb.save(virtual_workbook)
    return virtual_workbook
|
|
|
|
|
|
def register_styles(wb: Workbook) -> tuple[NamedStyle, NamedStyle, NamedStyle, NamedStyle, NamedStyle]:
    """Create the five named cell styles and register them on ``wb``.

    All styles share a thin black border on every side; they differ in bold
    font, fill colour and horizontal centring.

    Args:
        wb: Workbook the styles are added to.

    Returns:
        The styles in registration order: ``recipe_name``, ``recipe_unit``,
        ``header``, ``ing``, ``unit``.
    """
    thin = Side(style="thin", color="000000")

    def build(name: str, *, bold: bool = False, fill: str | None = None, centred: bool = False) -> NamedStyle:
        # Assemble one style from the optional traits and register it.
        style = NamedStyle(name=name)
        if bold:
            style.font = Font(bold=True)
        style.border = Border(left=thin, top=thin, right=thin, bottom=thin)
        if fill is not None:
            style.fill = PatternFill(fill_type=None, start_color=fill, end_color=fill, patternType="solid")
        if centred:
            style.alignment = Alignment(horizontal="center")
        wb.add_named_style(style)
        return style

    recipe_name = build("recipe_name", bold=True, fill="FFFF00")
    recipe_unit = build("recipe_unit", bold=True, fill="FFFF00", centred=True)
    header = build("header", bold=True, fill="92D050", centred=True)
    ing = build("ing")
    unit = build("unit", centred=True)
    return recipe_name, recipe_unit, header, ing, unit
|
|
|
|
|
|
@router.get("/{id_}", response_model=schemas.Recipe)
def show_id(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["recipes"]),
) -> schemas.Recipe:
    """Return one recipe, including its ingredient rows.

    Args:
        id_: Id of the recipe to fetch.
        user: Caller resolved through ``Security`` with the ``recipes``
            scope; not used in the body.
    """
    with SessionFuture() as db:
        query = select(Recipe).where(Recipe.id == id_)
        recipe: Recipe = db.execute(query).scalar_one()
        return recipe_info(recipe)
|
|
|
|
|
|
def recipe_info(recipe: Recipe) -> schemas.Recipe:
    """Convert a ``Recipe`` row and its ingredients into the API schema.

    SKU and ingredient names are rendered as ``"<product name> (<units>)"``.

    Args:
        recipe: Recipe whose ``sku``, ``sku.product``, ``items`` and each
            item's ``product`` are read.

    Returns:
        The populated ``schemas.Recipe``.
    """
    return schemas.Recipe(
        id_=recipe.id,
        sku=schemas.ProductLink(
            id_=recipe.sku_id,
            name=f"{recipe.sku.product.name} ({recipe.sku.units})",
        ),
        date_=recipe.date_,
        source=recipe.source,
        instructions=recipe.instructions,
        garnishing=recipe.garnishing,
        plating=recipe.plating,
        recipe_yield=recipe.recipe_yield,
        # Return the stored notes; fall back to "" (the value this function
        # used to return unconditionally) when none are set.
        notes=recipe.notes or "",
        items=[
            rischemas.RecipeItem(
                id_=item.id,
                product=schemas.ProductLink(
                    id_=item.product.id,
                    name=f"{item.product.name} ({item.product.fraction_units})",
                ),
                quantity=round(item.quantity, 2),
                description=item.description,
            )
            for item in recipe.items
        ],
    )
|
|
|
|
|
|
def recipe_blank() -> schemas.RecipeBlank:
    """Build the defaults for a new recipe.

    Returns:
        A ``schemas.RecipeBlank`` dated today, with empty text fields, a
        yield of 1 and no items.
    """
    today = date.today()
    unit_yield = Decimal(1)
    blank = schemas.RecipeBlank(
        date_=today,
        source="",
        instructions="",
        garnishing="",
        plating="",
        recipe_yield=unit_yield,
        notes="",
        items=[],
    )
    return blank
|