barker/barker/barker/routers/customer.py

187 lines
5.4 KiB
Python

import uuid
from typing import List
import barker.schemas.customer as schemas
from fastapi import APIRouter, Depends, HTTPException, Security, status
from sqlalchemy import or_
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionLocal
from ..models.customer import Customer
from ..models.customer_discount import CustomerDiscount
from ..models.sale_category import SaleCategory
from ..schemas.user_token import UserToken
router = APIRouter()
# Dependency
def get_db():
try:
db = SessionLocal()
yield db
finally:
db.close()
@router.post("", response_model=schemas.Customer)
def save(
data: schemas.CustomerIn,
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
try:
item = Customer(name=data.name, phone=data.phone, address=data.address)
db.add(item)
add_discounts(item, data.discounts, db)
db.commit()
return customer_info(item, db)
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)
except Exception:
db.rollback()
raise
@router.put("/{id_}", response_model=schemas.Customer)
def update(
id_: uuid.UUID,
data: schemas.CustomerIn,
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
try:
item: Customer = db.query(Customer).filter(Customer.id == id_).first()
item.name = data.name
item.phone = data.phone
item.address = data.address
add_discounts(item, data.discounts, db)
db.commit()
return customer_info(item, db)
except SQLAlchemyError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e),
)
except Exception:
db.rollback()
raise
def add_discounts(customer: Customer, discounts: List[schemas.DiscountItem], db: Session):
for discount in discounts:
cd = next((d for d in customer.discounts if d.sale_category_id == discount.id_), None)
if cd is None:
cd = CustomerDiscount(discount.id_, round(discount.discount, 5), customer=customer)
customer.discounts.append(cd)
db.add(cd)
else:
cd.discount = round(discount.discount, 5)
@router.delete("/{id_}")
def delete(
id_: uuid.UUID,
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["customers"]),
):
try:
item: Customer = db.query(Customer).filter(Customer.id == id_).first()
if item is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found",
)
db.delete(item)
db.commit()
except Exception:
db.rollback()
raise
@router.get("", response_model=schemas.CustomerBlank)
def show_blank(
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.CustomerBlank:
return blank_customer_info()
@router.get("/list", response_model=List[schemas.Customer])
def show_list(db: Session = Depends(get_db), user: UserToken = Depends(get_user)) -> List[schemas.Customer]:
return [customer_info_for_list(item) for item in db.query(Customer).order_by(Customer.name).all()]
@router.get("/query", response_model=List[schemas.Customer])
async def show_term(
q: str,
db: Session = Depends(get_db),
current_user: UserToken = Depends(get_user),
) -> List[schemas.Customer]:
query = db.query(Customer)
if q is not None:
for item in q.split():
query = query.filter(or_(Customer.name.ilike(f"%{item}%"), Customer.phone.ilike(f"%{item}%")))
query = query.order_by(Customer.name).all()
return [customer_info_for_list(item) for item in query]
@router.get("/{id_}", response_model=schemas.Customer)
def show_id(
id_: uuid.UUID,
db: Session = Depends(get_db),
user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
item: Customer = db.query(Customer).filter(Customer.id == id_).first()
return customer_info(item, db)
def customer_info(item: Customer, db: Session) -> schemas.Customer:
return schemas.Customer(
id=item.id,
name=item.name,
address=item.address,
phone=item.phone,
discounts=[
{
"id": sc.id,
"name": sc.name,
"discount": next((d.discount for d in item.discounts if d.sale_category_id == sc.id), 0),
}
for sc in db.query(SaleCategory).order_by(SaleCategory.name).all()
],
)
def customer_info_for_list(item: Customer) -> schemas.Customer:
return schemas.Customer(
id=item.id,
name=item.name,
address=item.address,
phone=item.phone,
discounts=[
{
"id": d.sale_category_id,
"name": d.sale_category.name,
"discount": d.discount,
}
for d in item.discounts
if d.discount != 0
],
)
def blank_customer_info() -> schemas.CustomerBlank:
return schemas.CustomerBlank(name="", address="", phone="", discounts=[])