brewman/brewman/routers/purchase.py

353 lines
12 KiB
Python

import traceback
import uuid
from decimal import Decimal
from typing import List
from datetime import datetime
from fastapi import (
APIRouter,
HTTPException,
status,
Depends,
Security,
File,
Request,
)
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from .db_image import save_files, update_files
from .voucher import (
voucher_info,
check_voucher_lock_info,
check_voucher_edit_allowed,
blank_voucher,
)
from ..core.session import set_date, get_date
from ..models import Product, AccountBase
from ..schemas.auth import UserToken
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionLocal
from ..models.voucher import Voucher, VoucherType, Batch, Inventory, Journal
import brewman.schemas.voucher as output
import brewman.schemas.input as schema_in
router = APIRouter()
# Dependency
def get_db() -> Session:
try:
db = SessionLocal()
yield db
finally:
db.close()
@router.post("", response_model=output.Voucher)
def save_route(
request: Request,
data: schema_in.PurchaseIn = Depends(schema_in.PurchaseIn.load_form),
db: Session = Depends(get_db),
i: List[bytes] = File(None),
t: List[bytes] = File(None),
user: UserToken = Security(get_user, scopes=["purchase"]),
):
try:
item: Voucher = save(data, user, db)
save_inventories(item, data.inventories, db)
save_journals(item, data.vendor, db)
save_files(item.id, i, t, db)
db.commit()
set_date(data.date_.strftime("%d-%b-%Y"), request.session)
info = voucher_info(item, db)
return info
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e),
)
except Exception:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=traceback.format_exc(),
)
def save(data: schema_in.PurchaseIn, user: UserToken, db: Session) -> Voucher:
check_voucher_lock_info(None, data.date_, db)
voucher = Voucher(
date=data.date_,
narration=data.narration,
is_starred=data.is_starred,
user_id=user.id_,
type_=VoucherType.by_name(data.type_),
)
db.add(voucher)
return voucher
def save_inventories(
voucher: Voucher, inventories: List[schema_in.Inventory], db: Session
):
for item in inventories:
product: Product = db.query(Product).filter(
Product.id == item.product.id_
).first()
batch = Batch(
name=voucher.date,
product=product,
quantity_remaining=item.quantity,
rate=item.rate,
tax=item.tax,
discount=item.discount,
)
db.add(batch)
inventory = Inventory(
id_=item.id_,
product=product,
batch=batch,
quantity=item.quantity,
rate=item.rate,
tax=item.tax,
discount=item.discount,
)
product.price = item.rate
voucher.inventories.append(inventory)
db.add(inventory)
def save_journals(voucher: Voucher, ven: schema_in.AccountLink, db: Session):
vendor = db.query(AccountBase).filter(AccountBase.id == ven.id_).first()
journals = {}
amount = 0
for item in voucher.inventories:
account = item.product.account
amount += round(item.amount, 2)
if account.id in journals:
journals[account.id].amount += round(item.amount, 2)
else:
journals[account.id] = Journal(
debit=1,
cost_centre_id=account.cost_centre_id,
account_id=account.id,
amount=round(item.amount, 2),
)
journals[vendor.id] = Journal(
debit=-1,
cost_centre_id=vendor.cost_centre_id,
account_id=vendor.id,
amount=amount,
)
for item in journals.values():
voucher.journals.append(item)
db.add(item)
@router.put("/{id_}", response_model=output.Voucher)
def update_route(
id_: uuid.UUID,
request: Request,
data: schema_in.PurchaseIn = Depends(schema_in.PurchaseIn.load_form),
db: Session = Depends(get_db),
i: List[bytes] = File(None),
t: List[bytes] = File(None),
user: UserToken = Security(get_user, scopes=["purchase"]),
):
try:
item: Voucher = update(id_, data, user, db)
update_inventory(item, data.inventories, db)
update_journals(item, data.vendor, db)
# journals_valid(voucher)
update_files(item.id, data.files, i, t, db)
db.commit()
set_date(data.date_.strftime("%d-%b-%Y"), request.session)
return voucher_info(item, db)
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e),
)
except Exception:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=traceback.format_exc(),
)
def update(
id_: uuid.UUID, data: schema_in.PurchaseIn, user: UserToken, db: Session
) -> Voucher:
voucher: Voucher = db.query(Voucher).filter(Voucher.id == id_).first()
check_voucher_lock_info(voucher.date, data.date_, db)
check_voucher_edit_allowed(voucher, user)
voucher.date = data.date_
voucher.is_starred = data.is_starred
voucher.narration = data.narration
voucher.user_id = user.id_
voucher.posted = False
voucher.last_edit_date = datetime.utcnow()
return voucher
def update_inventory(
voucher: Voucher, new_inventories: List[schema_in.Inventory], db: Session
):
for it in range(len(voucher.inventories), 0, -1):
item = voucher.inventories[it - 1]
found = False
for j in range(len(new_inventories), 0, -1):
new_inventory = new_inventories[j - 1]
if new_inventory.id_:
product = (
db.query(Product)
.filter(Product.id == new_inventory.product.id_)
.first()
)
found = True
if item.product_id != new_inventory.product.id_:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Product cannot be changed",
)
old_quantity = round(Decimal(item.quantity), 2)
quantity_remaining = round(Decimal(item.batch.quantity_remaining), 2)
if new_inventory.quantity < (old_quantity - quantity_remaining):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{old_quantity - quantity_remaining} is the minimum as it has been issued",
)
item.batch.quantity_remaining -= old_quantity - new_inventory.quantity
item.quantity = new_inventory.quantity
if voucher.date != item.batch.name:
item.batch.name = voucher.date
if voucher.date < item.batch.name:
# TODO: check for issued products which might have been in a back date
pass
item.rate = new_inventory.rate
item.batch.rate = new_inventory.rate
item.discount = new_inventory.discount
item.batch.discount = new_inventory.discount
item.tax = new_inventory.tax
item.batch.tax = new_inventory.tax
product.price = new_inventory.rate
new_inventories.remove(new_inventory)
# TODO: Update all references of the batch with the new rates
break
if not found:
has_been_issued = (
db.query(func.count(Inventory.id))
.filter(Inventory.batch_id == item.batch.id)
.filter(Inventory.id != item.id)
.scalar()
)
if has_been_issued > 0:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"{item.product.name} has been issued, it cannot be deleted",
)
else:
db.delete(item.batch)
db.delete(item)
voucher.inventories.remove(item)
for new_inventory in new_inventories:
product = (
db.query(Product).filter(Product.id == new_inventory.product.id_).first()
)
batch = Batch(
name=voucher.date,
product_id=product.id,
quantity_remaining=new_inventory.quantity,
rate=new_inventory.rate,
tax=new_inventory.tax,
discount=new_inventory.discount,
)
inventory = Inventory(
id_=None,
product_id=product.id,
batch=batch,
quantity=new_inventory.quantity,
rate=new_inventory.rate,
tax=new_inventory.tax,
discount=new_inventory.discount,
)
inventory.voucher_id = voucher.id
db.add(batch)
inventory.batch_id = batch.id
product.price = new_inventory.rate
voucher.inventories.append(inventory)
db.add(inventory)
def update_journals(voucher: Voucher, ven: schema_in.AccountLink, db):
vendor = db.query(AccountBase).filter(AccountBase.id == ven.id_).first()
journals = {}
amount = 0
for item in voucher.inventories:
product = db.query(Product).filter(Product.id == item.product_id).first()
account = product.account
amount += item.amount
if account.id in journals:
journals[account.id].amount += item.amount
else:
journals[account.id] = Journal(
debit=1,
cost_centre_id=account.cost_centre_id,
account_id=account.id,
amount=item.amount,
)
journals[vendor.id] = Journal(
debit=-1,
cost_centre_id=vendor.cost_centre_id,
account_id=vendor.id,
amount=amount,
)
for i in range(len(voucher.journals), 0, -1):
item = voucher.journals[i - 1]
if item.account_id in journals:
item.debit = journals[item.account_id].debit
item.amount = round(journals[item.account_id].amount, 2)
item.cost_centre_id = journals[item.account_id].cost_centre_id
del journals[item.account_id]
else:
db.delete(item)
voucher.journals.remove(item)
for item in journals.values():
item.amount = round(item.amount, 2)
voucher.journals.append(item)
db.add(item)
@router.get("/{id_}", response_model=output.Voucher)
def get_id(
id_: uuid.UUID,
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["purchase"]),
):
try:
item: Voucher = db.query(Voucher).filter(Voucher.id == id_).first()
return voucher_info(item, db)
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e),
)
except Exception:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=traceback.format_exc(),
)
@router.get("", response_model=output.Voucher)
def show_blank(
request: Request,
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["purchase"]),
):
additional_info = {"date": get_date(request.session), "type": "Purchase"}
return blank_voucher(additional_info, db)