barker/barker/barker/routers/role.py

145 lines
4.2 KiB
Python

import uuid
from typing import List, Optional
from fastapi import APIRouter, HTTPException, status, Depends, Security
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from ..schemas.auth import UserToken
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionLocal
from ..models.auth import Role, Permission
import barker.schemas.auth as schemas
# Router instance that the role endpoints defined in this module register on.
router = APIRouter()
# Dependency
def get_db():
    """Yield a database session and close it once the caller is done with it.

    Written as a generator so it can be used with ``Depends``: the code after
    ``yield`` runs when the consumer has finished with the session.

    Yields:
        The session object returned by ``SessionLocal()``.
    """
    # Create the session *before* entering the try block. If SessionLocal()
    # itself fails there is nothing to close, and the original error must
    # propagate instead of being masked by an UnboundLocalError in `finally`.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@router.post("", response_model=schemas.Role)
def save(
    data: schemas.RoleIn,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Create a role from the posted data and return its description.

    The role is built from ``data.name``, added to the session, and its
    permissions are synchronised with ``data.permissions`` before committing.

    Args:
        data: Name and permission flags for the new role.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        The dictionary built by ``role_info`` for the new role.

    Raises:
        HTTPException: 500 with the error text when SQLAlchemy raises.
    """
    try:
        role = Role(data.name)
        db.add(role)
        add_permissions(role, data.permissions, db)
        db.commit()
        # Built inside the try so that a database failure while reading the
        # role back is handled exactly like a failure while writing it.
        created = role_info(role, db)
    except SQLAlchemyError as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        )
    except Exception:
        # Anything else is not ours to translate: undo and let it propagate.
        db.rollback()
        raise
    return created
@router.put("/{id_}", response_model=schemas.Role)
def update(
    id_: uuid.UUID,
    data: schemas.RoleIn,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Rename an existing role and synchronise its permissions.

    Args:
        id_: Identifier of the role to update.
        data: New name and permission flags for the role.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        The dictionary built by ``role_info`` for the updated role.

    Raises:
        HTTPException: 404 when no role has the given id; 500 with the error
            text when SQLAlchemy raises.
    """
    try:
        item: Optional[Role] = db.query(Role).filter(Role.id == id_).first()
        if item is None:
            # `.first()` returns None for an unknown id; without this guard the
            # attribute assignment below fails with an AttributeError.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Role not found",
            )
        item.name = data.name
        add_permissions(item, data.permissions, db)
        db.commit()
        return role_info(item, db)
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e),
        )
    except Exception:
        # Also reached by the 404 above: roll back, then re-raise unchanged.
        db.rollback()
        raise
def add_permissions(role: Role, permissions: List[schemas.PermissionItem], db):
    """Bring ``role.permissions`` in line with the requested permission flags.

    For every entry in ``permissions``: if it is enabled and the role does not
    hold it yet, the permission is fetched from the database and appended; if
    it is disabled and the role holds it, it is removed. Other entries leave
    the role untouched.

    Args:
        role: Role whose ``permissions`` collection is modified in place.
        permissions: Requested state, one entry per permission (``id_`` and
            ``enabled`` are read from each entry).
        db: Session used to look up permissions that have to be added.
    """
    for requested in permissions:
        # First permission already on the role with the requested id, if any.
        held = next((p for p in role.permissions if p.id == requested.id_), None)
        if requested.enabled:
            if held is None:
                granted = db.query(Permission).filter(Permission.id == requested.id_).one()
                role.permissions.append(granted)
        elif held:
            role.permissions.remove(held)
@router.delete("/{id_}")
def delete(
    id_: uuid.UUID, db: Session = Depends(get_db), user: UserToken = Security(get_user, scopes=["users"]),
):
    """Reject role deletion, which is not implemented.

    The role is still looked up so that an unknown id is reported as such.

    Args:
        id_: Identifier of the role the client asked to delete.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Raises:
        HTTPException: 404 when no role has the given id, otherwise 501
            because deletion is not implemented.
    """
    try:
        item: Optional[Role] = db.query(Role).filter(Role.id == id_).first()
        if item is None:
            # A missing resource is a client-side condition, not a server fault.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Role not found",
            )
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Role deletion not implemented",
        )
    except Exception:
        db.rollback()
        raise
@router.get("")
def show_blank(
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Return the description of an empty role.

    Args:
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        The dictionary ``role_info`` builds when it is given no role: an empty
        name and every permission listed as disabled.
    """
    blank_role = role_info(None, db)
    return blank_role
@router.get("/list", response_model=List[schemas.RoleList])
def show_list(
    db: Session = Depends(get_db), user: UserToken = Security(get_user, scopes=["users"]),
):
    """List all roles, ordered by name, with the names of their permissions.

    Declared as a plain ``def`` like the other handlers in this module: the
    queries below are blocking calls, and an ``async def`` handler would run
    them directly on the event loop.

    Args:
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        One dictionary per role with ``id``, ``name`` and ``permissions``
        (the permission names sorted alphabetically).
    """
    roles = db.query(Role).order_by(Role.name).all()
    return [
        {
            "id": item.id,
            "name": item.name,
            "permissions": [p.name for p in sorted(item.permissions, key=lambda p: p.name)],
        }
        for item in roles
    ]
@router.get("/{id_}", response_model=schemas.Role)
def show_id(
    id_: uuid.UUID, db: Session = Depends(get_db), user: UserToken = Security(get_user, scopes=["users"]),
):
    """Return the description of the role with the given id.

    Args:
        id_: Identifier of the role to show.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        The dictionary built by ``role_info`` for the role.

    Raises:
        HTTPException: 404 when no role has the given id.
    """
    item: Optional[Role] = db.query(Role).filter(Role.id == id_).first()
    if item is None:
        # Without this guard an unknown id would fall through to
        # role_info(None, ...) and be answered with the blank-role template.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Role not found",
        )
    return role_info(item, db)
def role_info(item: Optional[Role], db):
    """Describe a role as a dictionary listing every known permission.

    Args:
        item: Role to describe, or ``None`` for a blank role.
        db: Session used to load all permissions, ordered by name.

    Returns:
        A dictionary with ``name`` and ``permissions`` (plus ``id`` when a
        role is given). Each permission entry carries ``id``, ``name`` and
        ``enabled``; ``enabled`` is true only when the role holds that
        permission, and always false for a blank role.
    """

    def is_enabled(permission) -> bool:
        # A blank role holds nothing; otherwise test membership on the role.
        return item is not None and permission in item.permissions

    if item is None:
        info = {"name": ""}
    else:
        info = {"id": item.id, "name": item.name}

    info["permissions"] = [
        {"id": permission.id, "name": permission.name, "enabled": is_enabled(permission)}
        for permission in db.query(Permission).order_by(Permission.name).all()
    ]
    return info