# barker/barker/barker/routers/role.py
#
# 149 lines
# 4.9 KiB
# Python

import uuid
import barker.schemas.role as schemas
from fastapi import APIRouter, HTTPException, Security, status
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from sqlalchemy.sql.functions import count
from ..core.security import get_current_active_user as get_user
from ..db.session import SessionFuture
from ..models.permission import Permission
from ..models.role import Role
from ..models.role_permissions import role_permissions
from ..models.user_roles import user_roles
from ..schemas.user_token import UserToken
# Router on which the role endpoints defined in this module register themselves.
router = APIRouter()
@router.post("", response_model=schemas.Role)
def save(
    data: schemas.RoleIn,
    user: UserToken = Security(get_user, scopes=["users"]),
) -> schemas.Role:
    """Create a role from the submitted name and permission flags.

    Args:
        data: Role name plus the permission entries with their enabled flags.
        user: Caller resolved through ``get_user`` with the ``users`` scope;
            only used for authorization.

    Returns:
        The stored role, listing every permission with an ``enabled`` flag.

    Raises:
        HTTPException: 500 carrying the database error text when SQLAlchemy
            raises while saving.
    """
    try:
        with SessionFuture() as db:
            item = Role(data.name)
            db.add(item)
            add_permissions(item, data.permissions, db)
            db.commit()
            return role_info(item, db)
    except SQLAlchemyError as e:
        # Chain the database error so logs/tracebacks keep the real cause.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.put("/{id_}", response_model=schemas.Role)
def update_route(
    id_: uuid.UUID,
    data: schemas.RoleIn,
    user: UserToken = Security(get_user, scopes=["users"]),
) -> schemas.Role:
    """Rename an existing role and synchronise its permissions.

    Args:
        id_: Identifier of the role to update.
        data: New role name plus the permission entries with enabled flags.
        user: Caller resolved through ``get_user`` with the ``users`` scope;
            only used for authorization.

    Returns:
        The updated role, listing every permission with an ``enabled`` flag.

    Raises:
        HTTPException: 404 when no role has the given id; 500 carrying the
            database error text when SQLAlchemy raises while saving.
    """
    try:
        with SessionFuture() as db:
            item = db.execute(select(Role).where(Role.id == id_)).scalar_one_or_none()
            if item is None:
                # Previously scalar_one() raised NoResultFound here, which is a
                # SQLAlchemyError and was reported to the client as a 500.
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Role not found.",
                )
            item.name = data.name
            add_permissions(item, data.permissions, db)
            db.commit()
            return role_info(item, db)
    except SQLAlchemyError as e:
        # Chain the database error so logs/tracebacks keep the real cause.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.delete("/{id_}", response_model=schemas.RoleBlank)
def delete_route(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["users"]),
) -> schemas.RoleBlank:
    """Delete a role that has neither users nor permissions attached.

    Args:
        id_: Identifier of the role to delete.
        user: Caller resolved through ``get_user`` with the ``users`` scope;
            only used for authorization.

    Returns:
        A blank role template (empty name, every permission disabled).

    Raises:
        HTTPException: 500 when the role is still assigned to users or still
            has permissions; 404 when no role has the given id.
    """
    with SessionFuture() as db:
        assigned_users = db.execute(
            select(count(user_roles.c.id)).where(user_roles.c.role_id == id_)
        ).scalar_one()
        # NOTE(review): this and the next check are client-side conflicts; the
        # 500 status is kept for compatibility with existing callers.
        if assigned_users > 0:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="This Role has been assigned to users and cannot be deleted.",
            )
        attached_permissions = db.execute(
            select(count(role_permissions.c.id)).where(role_permissions.c.role_id == id_)
        ).scalar_one()
        if attached_permissions > 0:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="This Role has permissions and cannot be deleted.",
            )
        item = db.execute(select(Role).where(Role.id == id_)).scalar_one_or_none()
        if item is None:
            # scalar_one() used to raise NoResultFound here, which surfaced as
            # an unhandled 500 for an unknown id.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Role not found.",
            )
        db.delete(item)
        db.commit()
        return role_blank(db)
@router.get("", response_model=schemas.RoleBlank)
def show_blank(
    user: UserToken = Security(get_user, scopes=["users"]),
) -> schemas.RoleBlank:
    """Return the empty role template used to start a new role.

    The template has an empty name and lists every permission as disabled.
    """
    with SessionFuture() as db:
        template = role_blank(db)
    return template
@router.get("/list", response_model=list[schemas.RoleList])
def show_list(
    user: UserToken = Security(get_user, scopes=["users"]),
) -> list[schemas.RoleList]:
    """List all roles ordered by name, each with its permission names sorted."""
    with SessionFuture() as db:
        roles = db.execute(select(Role).order_by(Role.name)).scalars().all()
        listing = []
        for role in roles:
            role_id = role.id
            role_name = role.name
            # Sorting the names directly gives the same list as sorting the
            # permission objects by name and then extracting the names.
            permission_names = sorted(perm.name for perm in role.permissions)
            listing.append(
                schemas.RoleList(
                    id=role_id,
                    name=role_name,
                    permissions=permission_names,
                )
            )
        return listing
@router.get("/{id_}", response_model=schemas.Role)
def show_id(
    id_: uuid.UUID,
    user: UserToken = Security(get_user, scopes=["users"]),
) -> schemas.Role:
    """Fetch a single role together with the state of every permission.

    Args:
        id_: Identifier of the role to load.
        user: Caller resolved through ``get_user`` with the ``users`` scope;
            only used for authorization.

    Returns:
        The role, listing every permission with an ``enabled`` flag.

    Raises:
        HTTPException: 404 when no role has the given id.
    """
    with SessionFuture() as db:
        item = db.execute(select(Role).where(Role.id == id_)).scalar_one_or_none()
        if item is None:
            # scalar_one() used to raise NoResultFound here, which surfaced as
            # an unhandled 500 for an unknown id.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Role not found.",
            )
        return role_info(item, db)
def role_info(item: Role, db: Session) -> schemas.Role:
    """Build the response schema for ``item``.

    Every permission in the database is listed (ordered by name); ``enabled``
    is true for those present in ``item.permissions``.
    """
    role_id = item.id
    role_name = item.name
    every_permission = db.execute(select(Permission).order_by(Permission.name)).scalars().all()
    flagged = [
        schemas.PermissionItem(
            id=perm.id,
            name=perm.name,
            # Membership test already yields a bool.
            enabled=perm in item.permissions,
        )
        for perm in every_permission
    ]
    return schemas.Role(id=role_id, name=role_name, permissions=flagged)
def role_blank(db: Session) -> schemas.RoleBlank:
    """Build an empty role template: blank name, all permissions disabled.

    Permissions are read from the database ordered by name.
    """
    by_name = select(Permission).order_by(Permission.name)
    disabled = []
    for perm in db.execute(by_name).scalars().all():
        disabled.append(schemas.PermissionItem(id=perm.id, name=perm.name, enabled=False))
    return schemas.RoleBlank(name="", permissions=disabled)
def add_permissions(role: Role, permissions: list[schemas.PermissionItem], db: Session):
    """Bring ``role.permissions`` in line with the submitted enabled flags.

    For each submitted entry: an enabled permission the role lacks is loaded
    from the database and appended; a disabled permission the role holds is
    removed. Entries already in the requested state are left alone.
    """
    for requested in permissions:
        # Find the permission on the role that matches this entry, if any.
        held = None
        for candidate in role.permissions:
            if candidate.id == requested.id_:
                held = candidate
                break

        if requested.enabled:
            if held is None:
                lookup = select(Permission).where(Permission.id == requested.id_)
                role.permissions.append(db.execute(lookup).scalar_one())
            continue

        if held:
            role.permissions.remove(held)