# brewman/brewman/brewman/routers/calculate_nutrition.py
#
# 250 lines
# 11 KiB
# Python

import uuid
from ..schemas.nutritional_information import NutritionalInformation
from ..models.product_group import ProductGroup
from fastapi import HTTPException, status
from sqlalchemy import distinct, or_, select, update
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from ..models.product import Product
from ..models.recipe import Recipe
from ..models.recipe_item import RecipeItem
from ..models.stock_keeping_unit import StockKeepingUnit
def calculate_nutrition(db: Session) -> None:
    """Recompute the stored nutrition values of recipe-backed products.

    Collects the ids of products that have at least one recipe and whose
    product group is flagged ``nutritional`` or ``ice_cream``, then calls
    ``calculate`` repeatedly until the pending set is empty. Each pass is
    expected to remove the products it resolved from that set.

    Raises:
        HTTPException: status 500 when the database layer raises a
            ``SQLAlchemyError``, or when a pass resolves no product at all
            (for example recipes that depend on each other in a cycle),
            which would otherwise loop forever.
    """
    try:
        # Products with a recipe that belong to a nutritional / ice-cream group.
        products = set(
            db.execute(
                select(distinct(StockKeepingUnit.product_id))
                .join(StockKeepingUnit.recipes)
                .join(StockKeepingUnit.product)
                .join(Product.product_group)
                .where(or_(ProductGroup.nutritional == True, ProductGroup.ice_cream == True))  # noqa: E712
            )
            .scalars()
            .all()
        )
        db.flush()
        while products:
            pending_before = len(products)
            calculate(products, db)
            if len(products) >= pending_before:
                # No recipe became computable in this pass; retrying would
                # issue the same queries forever.
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail=(
                        "Nutrition could not be calculated for "
                        f"{len(products)} product(s): their recipes have unresolved or circular dependencies"
                    ),
                )
    except SQLAlchemyError as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
def calculate(products: set[uuid.UUID], db: Session) -> None:
    """Compute and store nutrition values for the products that are ready.

    A recipe is ready when its SKU's product is in ``products`` and none of
    its items refers to a product that is still in ``products``. For every
    ready recipe each nutrition field is the sum of ``quantity * value`` over
    the recipe items, divided by the SKU's ``fraction``; the result is written
    to the recipe's product with an UPDATE.

    Args:
        products: Ids of products still waiting to be calculated. Mutated in
            place: every product handled in this pass is removed.
        db: Session used for the queries and updates.
    """
    nutrient_fields = (
        "protein",
        "carbohydrate",
        "total_sugar",
        "added_sugar",
        "total_fat",
        "saturated_fat",
        "trans_fat",
        "cholestrol",
        "sodium",
        "msnf",
        "other_solids",
        "total_solids",
        "water",
    )
    # Recipes that still contain at least one pending product among their items.
    blocked = select(RecipeItem.recipe_id).where(RecipeItem.product_id.in_(products))
    recipes = (
        db.execute(
            select(Recipe).join(Recipe.sku).where(StockKeepingUnit.product_id.in_(products), Recipe.id.notin_(blocked))
        )
        .scalars()
        .all()
    )
    for recipe in recipes:
        fraction = recipe.sku.fraction
        values = {
            field: sum(item.quantity * getattr(item.product, field) for item in recipe.items) / fraction
            for field in nutrient_fields
        }
        db.execute(update(Product).where(Product.id == recipe.sku.product_id).values(**values))
        # discard, not remove: a product with several ready recipes shows up
        # more than once in this loop and must not raise KeyError the second time.
        products.discard(recipe.sku.product_id)
    db.flush()
def report_nutrition(db: Session) -> list[NutritionalInformation]:
    """Build the nutrition report for all products involved in recipes.

    Gathers the ids of every product that has at least one recipe, together
    with the id sets returned by ``nut_final``, ``nut_semi`` and
    ``nut_ingredients``, and hands them to ``report``.

    Returns:
        The list of ``NutritionalInformation`` rows produced by ``report``.

    Raises:
        HTTPException: status 500 when the database layer raises a
            ``SQLAlchemyError``. Other exceptions raised by ``report``
            propagate unchanged.
    """
    try:
        # Every product that has at least one recipe; no product-group filter here.
        products = set(
            db.execute(
                select(distinct(StockKeepingUnit.product_id))
                .join(StockKeepingUnit.recipes)
                .join(StockKeepingUnit.product)
                .join(Product.product_group)
            )
            .scalars()
            .all()
        )
        final, semi, ingredients = nut_final(db), nut_semi(db), nut_ingredients(db)
        return report(products, final, semi, ingredients, db)
    except SQLAlchemyError as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
def report(
    products: set[uuid.UUID], final: set[uuid.UUID], semi: set[uuid.UUID], ingredients: set[uuid.UUID], db: Session
) -> list[NutritionalInformation]:
    """Assemble nutrition report rows for ingredients, semi and final products.

    Rows are produced in three stages so that every recipe can inherit the
    ingredient names and allergens of its items:

    1. raw ingredients (products in ``ingredients``), reported under the
       group ``"Ingredients"``;
    2. semi-finished products (``semi``), reported under ``"Semi"``; a recipe
       is deferred to a later pass until all of its items are resolved;
    3. final products (``final``), reported under their product group name.

    A product is reported once, from the first of its recipes that can be
    resolved. Ingredient and allergen lists are sorted.

    Args:
        products: Ids of all products that have a recipe. Mutated in place.
        final: Ids of final products. Mutated in place.
        semi: Ids of semi-finished products. Mutated in place.
        ingredients: Ids of raw ingredients. Mutated in place.
        db: Session used for the queries.

    Returns:
        One ``NutritionalInformation`` per reported product.

    Raises:
        ValueError: if an ingredient id is also in ``products``, if the
            semi-finished recipes cannot be resolved (missing or circular
            dependencies), or if any id is left unprocessed at the end.
    """
    entries: list[NutritionalInformation] = []
    # Keyed by product id so that equally named products cannot overwrite each other.
    ingredient_names: dict[uuid.UUID, set[str]] = {}
    allergens: dict[uuid.UUID, set[str]] = {}

    # Stage 1: raw ingredients seed both lookup tables.
    raw_products = db.execute(select(Product).where(Product.id.in_(ingredients))).scalars().all()
    for ingredient in raw_products:
        if ingredient.id in products:
            raise ValueError("Ingredient cannot be in a recipe")
        ingredients.discard(ingredient.id)
        ingredient_names[ingredient.id] = {ingredient.name}
        # A missing allergen value is treated like an empty string.
        allergens[ingredient.id] = {a.strip() for a in (ingredient.allergen or "").split(",")} - {""}
        entries.append(
            _nutrition_entry(ingredient, "Ingredients", [ingredient.name], sorted(allergens[ingredient.id]))
        )

    # Stage 2: semi-finished products, resolved pass by pass.
    while semi:
        semi_recipes = (
            db.execute(select(Recipe).join(Recipe.sku).where(StockKeepingUnit.product_id.in_(semi))).scalars().all()
        )
        resolved_any = False
        for semi_recipe in semi_recipes:
            semi_product = semi_recipe.sku.product
            if semi_product.id not in semi:
                continue  # already reported through another recipe in this pass
            if any(item.product_id not in ingredient_names for item in semi_recipe.items):
                continue  # depends on a semi product that is not resolved yet
            products.discard(semi_product.id)
            semi.discard(semi_product.id)
            ingredient_names[semi_product.id] = _merge_item_sets(ingredient_names, semi_recipe)
            allergens[semi_product.id] = _merge_item_sets(allergens, semi_recipe)
            entries.append(
                _nutrition_entry(
                    semi_product,
                    "Semi",
                    sorted(ingredient_names[semi_product.id]),
                    sorted(allergens[semi_product.id]),
                )
            )
            resolved_any = True
        if not resolved_any:
            # Nothing could be resolved: another pass would repeat forever.
            raise ValueError("Semi-finished recipes have unresolved or circular dependencies")

    # Stage 3: final products; all of their items were resolved above.
    final_recipes = (
        db.execute(select(Recipe).join(Recipe.sku).where(StockKeepingUnit.product_id.in_(final))).scalars().all()
    )
    for recipe in final_recipes:
        recipe_product = recipe.sku.product
        if recipe_product.id not in final:
            continue  # already reported through another recipe
        products.discard(recipe_product.id)
        final.discard(recipe_product.id)
        ingredient_names[recipe_product.id] = _merge_item_sets(ingredient_names, recipe)
        allergens[recipe_product.id] = _merge_item_sets(allergens, recipe)
        entries.append(
            _nutrition_entry(
                recipe_product,
                recipe_product.product_group.name,
                sorted(ingredient_names[recipe_product.id]),
                sorted(allergens[recipe_product.id]),
            )
        )

    # Every id handed in must have been consumed by one of the stages.
    if products or final or semi or ingredients:
        raise ValueError("They cannot be more than 0")
    return entries


def _merge_item_sets(source: dict[uuid.UUID, set[str]], recipe: Recipe) -> set[str]:
    """Union the ``source`` sets of all items of ``recipe`` (empty set for a recipe without items)."""
    return set().union(*(source[item.product_id] for item in recipe.items))


def _nutrition_entry(
    product: Product, group: str, ingredients: list[str], allergen: list[str]
) -> NutritionalInformation:
    """Build the report row for ``product``, copying its stored nutrition values."""
    return NutritionalInformation(
        name=product.name,
        units=product.fraction_units,
        product_group=group,
        description=product.description,
        ingredients=ingredients,
        allergen=allergen,
        protein=product.protein,
        carbohydrate=product.carbohydrate,
        total_sugar=product.total_sugar,
        added_sugar=product.added_sugar,
        total_fat=product.total_fat,
        saturated_fat=product.saturated_fat,
        trans_fat=product.trans_fat,
        cholestrol=product.cholestrol,
        sodium=product.sodium,
        msnf=product.msnf,
        other_solids=product.other_solids,
        total_solids=product.total_solids,
        water=product.water,
    )
def nut_final(db: Session) -> set[uuid.UUID]:
    """Return the ids of products that have a recipe and are not used as a recipe item."""
    recipe_sku_ids = select(Recipe.sku_id)
    products_with_recipe = select(StockKeepingUnit.product_id).where(StockKeepingUnit.id.in_(recipe_sku_ids))
    used_as_item = select(RecipeItem.product_id)
    statement = select(Product.id).where(
        Product.id.notin_(used_as_item),
        Product.id.in_(products_with_recipe),
    )
    product_ids = db.execute(statement).scalars().all()
    return set(product_ids)
def nut_ingredients(db: Session) -> set[uuid.UUID]:
    """Return the ids of products that are used as a recipe item and have no recipe of their own."""
    recipe_sku_ids = select(Recipe.sku_id)
    products_with_recipe = select(StockKeepingUnit.product_id).where(StockKeepingUnit.id.in_(recipe_sku_ids))
    used_as_item = select(RecipeItem.product_id)
    statement = select(Product.id).where(
        Product.id.in_(used_as_item),
        Product.id.notin_(products_with_recipe),
    )
    product_ids = db.execute(statement).scalars().all()
    return set(product_ids)
def nut_semi(db: Session) -> set[uuid.UUID]:
    """Return the ids of products that both have a recipe and are used as a recipe item."""
    recipe_sku_ids = select(Recipe.sku_id)
    products_with_recipe = select(StockKeepingUnit.product_id).where(StockKeepingUnit.id.in_(recipe_sku_ids))
    used_as_item = select(RecipeItem.product_id)
    statement = select(Product.id).where(
        Product.id.in_(used_as_item),
        Product.id.in_(products_with_recipe),
    )
    product_ids = db.execute(statement).scalars().all()
    return set(product_ids)