import uuid

from typing import List

import barker.schemas.customer as schemas

from fastapi import APIRouter, Depends, HTTPException, Security, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from ..core.security import get_current_active_user as get_user
from ..db.session import SessionLocal
from ..models.customer import Customer
from ..schemas.user_token import UserToken


router = APIRouter()


# Dependency
def get_db():
    """Yield a database session and close it once the request is finished.

    The session is created *before* entering the ``try`` block so that a
    failure in ``SessionLocal()`` propagates as-is instead of being masked by
    an ``UnboundLocalError`` raised from ``finally``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@router.post("", response_model=schemas.Customer)
def save(
    data: schemas.CustomerIn,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
    """Create a customer from ``data`` and return the stored record.

    Raises:
        HTTPException: 500 with the database error text when SQLAlchemy
            reports a failure. The transaction is rolled back first.
    """
    try:
        item = Customer(name=data.name, phone=data.phone, address=data.address)
        db.add(item)
        db.commit()
        return customer_info(item)
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
    except Exception:
        # Roll back on anything else and let the original error propagate.
        db.rollback()
        raise


@router.put("/{id_}", response_model=schemas.Customer)
def update(
    id_: uuid.UUID,
    data: schemas.CustomerIn,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
    """Overwrite name, phone and address of the customer identified by ``id_``.

    Raises:
        HTTPException: 404 when no customer has the given id; 500 with the
            database error text when SQLAlchemy reports a failure.
    """
    try:
        item = db.query(Customer).filter(Customer.id == id_).first()
        if item is None:
            # Without this guard the attribute assignments below would fail
            # with AttributeError and surface as an opaque 500.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Customer not found",
            )
        item.name = data.name
        item.phone = data.phone
        item.address = data.address
        db.commit()
        return customer_info(item)
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
    except Exception:
        # Also reached for the 404 above: roll back, then re-raise unchanged.
        db.rollback()
        raise


@router.delete("/{id_}")
def delete(
    id_: uuid.UUID,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["customers"]),
):
    """Delete the customer identified by ``id_``.

    Raises:
        HTTPException: 404 when no customer has the given id.
    """
    try:
        item = db.query(Customer).filter(Customer.id == id_).first()
        if item is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Customer not found",
            )
        db.delete(item)
        db.commit()
    except Exception:
        # Roll back on any failure (including the 404) and re-raise unchanged.
        db.rollback()
        raise
@router.get("", response_model=schemas.CustomerIn)
def show_blank(
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.CustomerIn:
    """Return a customer template whose name, address and phone are empty."""
    return blank_customer_info()


@router.get("/list", response_model=List[schemas.Customer])
def show_list(
    db: Session = Depends(get_db),
    user: UserToken = Depends(get_user),
) -> List[schemas.Customer]:
    """Return every customer, ordered by name."""
    customers = db.query(Customer).order_by(Customer.name).all()
    return [customer_info(item) for item in customers]


@router.get("/query", response_model=List[schemas.Customer])
async def show_term(
    q: str,
    db: Session = Depends(get_db),
    current_user: UserToken = Depends(get_user),
) -> List[schemas.Customer]:
    """Return customers whose phone contains ``q`` (case-insensitive), by name.

    ``q`` is wrapped in ``%`` on both sides and passed to ``ilike``; any ``%``
    or ``_`` inside ``q`` is not escaped and therefore acts as a wildcard.
    """
    # NOTE(review): this coroutine performs blocking ORM calls; kept ``async``
    # to leave the function's interface untouched.
    matches = (
        db.query(Customer)
        .filter(Customer.phone.ilike(f"%{q}%"))
        .order_by(Customer.name)
        .all()
    )
    return [customer_info(item) for item in matches]


@router.get("/{id_}", response_model=schemas.Customer)
def show_id(
    id_: uuid.UUID,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["customers"]),
) -> schemas.Customer:
    """Return the customer identified by ``id_``.

    Raises:
        HTTPException: 404 when no customer has the given id.
    """
    item = db.query(Customer).filter(Customer.id == id_).first()
    if item is None:
        # ``first()`` yields None for an unknown id; without this guard
        # ``customer_info`` would fail with AttributeError (HTTP 500).
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Customer not found",
        )
    return customer_info(item)


def customer_info(item: Customer) -> schemas.Customer:
    """Build the response schema from a ``Customer`` model instance."""
    return schemas.Customer(
        id=item.id,
        name=item.name,
        address=item.address,
        phone=item.phone,
    )


def blank_customer_info() -> schemas.CustomerIn:
    """Build a ``CustomerIn`` with empty name, address and phone."""
    return schemas.CustomerIn(name="", address="", phone="")