"""HTTP routes for customers and their per-sale-category discounts."""

import uuid

from typing import List

import barker.schemas.customer as schemas

from fastapi import APIRouter, Depends, HTTPException, Security, status
from sqlalchemy import delete, or_, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from ..core.security import get_current_active_user as get_user
from ..db.session import SessionFuture
from ..models.customer import Customer
from ..models.customer_discount import CustomerDiscount
from ..models.sale_category import SaleCategory
from ..schemas.user_token import UserToken


router = APIRouter()


def _sale_categories(db: Session) -> List[SaleCategory]:
    """Return every sale category, ordered by name."""
    return list(db.execute(select(SaleCategory).order_by(SaleCategory.name)).scalars().all())


def _get_customer_or_404(db: Session, id_: uuid.UUID) -> Customer:
    """Load the customer with primary key ``id_`` or raise a 404 ``HTTPException``."""
    item = db.execute(select(Customer).where(Customer.id == id_)).scalar_one_or_none()
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Customer not found",
        )
    return item


@router.post("", response_model=schemas.Customer)
def save(
    data: schemas.CustomerIn,
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
    """Create a customer together with its discounts.

    Args:
        data: Incoming customer fields and discount list.
        user: Caller's token; the ``customers`` scope is requested.

    Returns:
        The stored customer, with one discount entry per sale category.

    Raises:
        HTTPException: 500 carrying the database error text when SQLAlchemy fails.
    """
    try:
        with SessionFuture() as db:
            item = Customer(name=data.name, phone=data.phone, address=data.address, print_in_bill=data.print_in_bill)
            db.add(item)
            add_discounts(item, data.discounts, db)
            db.commit()
            return customer_info(item, _sale_categories(db))
    except SQLAlchemyError as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e


@router.put("/{id_}", response_model=schemas.Customer)
def update_route(
    id_: uuid.UUID,
    data: schemas.CustomerIn,
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
    """Overwrite an existing customer's fields and upsert its discounts.

    Args:
        id_: Primary key of the customer to update.
        data: Replacement customer fields and discount list.
        user: Caller's token; the ``customers`` scope is requested.

    Returns:
        The updated customer, with one discount entry per sale category.

    Raises:
        HTTPException: 404 when no customer has ``id_``; 500 carrying the
            database error text when SQLAlchemy fails.
    """
    try:
        with SessionFuture() as db:
            # A missing row is a client error, not a server fault, so it is
            # reported as 404 rather than falling into the 500 handler below.
            item = _get_customer_or_404(db, id_)
            item.name = data.name
            item.phone = data.phone
            item.address = data.address
            item.print_in_bill = data.print_in_bill
            add_discounts(item, data.discounts, db)
            db.commit()
            return customer_info(item, _sale_categories(db))
    except SQLAlchemyError as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e


def add_discounts(customer: Customer, discounts: List[schemas.DiscountItem], db: Session) -> None:
    """Create or update ``customer``'s discount rows from ``discounts``.

    For each incoming item, the customer's existing discount with the same
    ``sale_category_id`` is updated in place; if there is none, a new
    ``CustomerDiscount`` is created and added to the session. Discount values
    are rounded to 5 decimal places. Existing discounts that are not mentioned
    in ``discounts`` are left untouched. The caller is responsible for
    committing.
    """
    for discount in discounts:
        value = round(discount.discount, 5)
        cd = next((d for d in customer.discounts if d.sale_category_id == discount.id_), None)
        if cd is None:
            cd = CustomerDiscount(discount.id_, value, customer=customer)
            customer.discounts.append(cd)
            db.add(cd)
        else:
            cd.discount = value


@router.delete("/{id_}")
def delete_route(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> None:
    """Delete a customer and its discount rows.

    The discount rows are deleted before the customer row.
    NOTE(review): presumably so they cannot block the customer delete;
    confirm against the foreign-key settings on ``CustomerDiscount``.

    Raises:
        HTTPException: 500 carrying the database error text when SQLAlchemy
            fails, matching ``save`` and ``update_route``.
    """
    try:
        with SessionFuture() as db:
            db.execute(delete(CustomerDiscount).where(CustomerDiscount.customer_id == id_))
            db.execute(delete(Customer).where(Customer.id == id_))
            db.commit()
    except SQLAlchemyError as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e


@router.get("", response_model=schemas.CustomerBlank)
def show_blank(
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.CustomerBlank:
    """Return an empty customer template listing every sale category at discount 0."""
    with SessionFuture() as db:
        return blank_customer_info(_sale_categories(db))


# NOTE: "/list" and "/query" must stay declared before "/{id_}". FastAPI
# matches routes in declaration order, so otherwise those paths would be
# handed to the UUID path parameter and rejected.
@router.get("/list", response_model=List[schemas.Customer])
def show_list(user: UserToken = Depends(get_user)) -> List[schemas.Customer]:
    """Return all customers ordered by name.

    Uses ``Depends(get_user)``, so no specific scope is requested here.
    """
    with SessionFuture() as db:
        sc = _sale_categories(db)
        customers = db.execute(select(Customer).order_by(Customer.name)).scalars().all()
        return [customer_info(item, sc) for item in customers]


@router.get("/query", response_model=List[schemas.Customer])
def show_term(
    q: str,
    current_user: UserToken = Depends(get_user),
) -> List[schemas.Customer]:
    """Search customers by name or phone.

    ``q`` is split on whitespace. Every resulting term must appear
    (case-insensitively, as a substring) in either the customer's name or
    phone, because each term adds another ``WHERE`` clause and those are
    combined with AND. Results are ordered by name.
    """
    query = select(Customer)
    if q is not None:
        for term in q.split():
            pattern = f"%{term}%"
            query = query.where(or_(Customer.name.ilike(pattern), Customer.phone.ilike(pattern)))
    query = query.order_by(Customer.name)
    with SessionFuture() as db:
        sc = _sale_categories(db)
        return [customer_info(item, sc) for item in db.execute(query).scalars().all()]


@router.get("/{id_}", response_model=schemas.Customer)
def show_id(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
    """Return a single customer by primary key.

    Raises:
        HTTPException: 404 when no customer has ``id_``.
    """
    with SessionFuture() as db:
        item = _get_customer_or_404(db, id_)
        return customer_info(item, _sale_categories(db))


def customer_info(item: Customer, sale_categories: List[SaleCategory]) -> schemas.Customer:
    """Build the API representation of ``item``.

    The result carries one discount entry per element of ``sale_categories``,
    in the same order; categories for which the customer has no discount row
    get a discount of 0.
    """
    # Index the customer's discounts once instead of rescanning them for every
    # sale category. setdefault keeps the first row seen for a category, the
    # same row a linear first-match search would pick.
    discount_by_category = {}
    for d in item.discounts:
        discount_by_category.setdefault(d.sale_category_id, d.discount)
    return schemas.Customer(
        id=item.id,
        name=item.name,
        address=item.address,
        phone=item.phone,
        printInBill=item.print_in_bill,
        discounts=[
            {
                "id": sc.id,
                "name": sc.name,
                "discount": discount_by_category.get(sc.id, 0),
            }
            for sc in sale_categories
        ],
    )


def blank_customer_info(sale_categories: List[SaleCategory]) -> schemas.CustomerBlank:
    """Build an empty customer template with a zero discount for every sale category."""
    return schemas.CustomerBlank(
        name="",
        address="",
        phone="",
        printInBill=False,
        discounts=[
            {
                "id": sc.id,
                "name": sc.name,
                "discount": 0,
            }
            for sc in sale_categories
        ],
    )