brewman/brewman/brewman/routers/auth/role.py

168 lines
4.5 KiB
Python
Raw Normal View History

import uuid
2020-10-07 15:18:43 +00:00
from typing import List, Optional
2020-10-07 15:18:43 +00:00
import brewman.schemas.auth as schemas
from fastapi import APIRouter, Depends, HTTPException, Security, status
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from ...core.security import get_current_active_user as get_user
from ...db.session import SessionLocal
2020-10-07 15:18:43 +00:00
from ...models.auth import Permission, Role
from ...schemas.auth import UserToken
# Router that the endpoint decorators in this module register their routes on.
router = APIRouter()
# Dependency
def get_db():
    """Yield a database session and always close it afterwards.

    Used through ``Depends(get_db)`` by the endpoints in this module.

    The session is created *before* the ``try`` block on purpose: if
    ``SessionLocal()`` itself fails there is nothing to close, and the
    original error must propagate instead of being masked by an
    ``UnboundLocalError`` raised from ``db.close()`` in ``finally``.

    Yields:
        The session object returned by ``SessionLocal()``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@router.post("", response_model=schemas.Role)
def save(
    data: schemas.RoleIn,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Create a new role from ``data`` and return its ``role_info`` payload.

    Args:
        data: Incoming role definition; ``data.name`` and ``data.permissions``
            are read.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Raises:
        HTTPException: 500 with the stringified error when SQLAlchemy fails.
            Any other exception is re-raised after rolling back.
    """
    try:
        role = Role(data.name)
        db.add(role)
        add_permissions(role, data.permissions, db)
        db.commit()
        payload = role_info(role, db)
        return payload
    except SQLAlchemyError as exc:
        # Database-level failure: undo the transaction and report it.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(exc),
        )
    except Exception:
        # Anything else: still roll back, but let the error through untouched.
        db.rollback()
        raise
@router.put("/{id_}", response_model=schemas.Role)
def update(
    id_: uuid.UUID,
    data: schemas.RoleIn,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Rename the role ``id_`` and apply the permission changes in ``data``.

    Args:
        id_: Primary key of the role to update.
        data: New role definition; ``data.name`` and ``data.permissions``
            are read.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        The ``role_info`` payload of the updated role.

    Raises:
        HTTPException: 404 when no role has the given id; 500 with the
            stringified error when SQLAlchemy fails. Any other exception is
            re-raised after rolling back.
    """
    try:
        item: Optional[Role] = db.query(Role).filter(Role.id == id_).first()
        if item is None:
            # ``first()`` yields None for an unknown id; without this guard
            # the assignment below would fail with an AttributeError.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Role not found",
            )
        item.name = data.name
        add_permissions(item, data.permissions, db)
        db.commit()
        return role_info(item, db)
    except SQLAlchemyError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )
    except Exception:
        # Also reached by the 404 above: roll back, then re-raise unchanged.
        db.rollback()
        raise
def add_permissions(role: Role, permissions: List[schemas.PermissionItem], db):
    """Bring ``role.permissions`` in line with the requested ``permissions``.

    For every requested entry: when it is enabled and the role does not yet
    hold a permission with that id, the ``Permission`` row is fetched with
    ``.one()`` and appended; when it is disabled and the role currently holds
    it, it is removed. All other combinations leave the role untouched.

    Args:
        role: Role whose ``permissions`` collection is modified in place.
        permissions: Requested entries; ``id_`` and ``enabled`` are read.
        db: Session used to look up ``Permission`` rows.
    """
    for requested in permissions:
        # First permission already attached to the role with this id, if any.
        current = next(
            (held for held in role.permissions if held.id == requested.id_),
            None,
        )
        if requested.enabled:
            if current is None:
                granted = (
                    db.query(Permission)
                    .filter(Permission.id == requested.id_)
                    .one()
                )
                role.permissions.append(granted)
        elif current:
            role.permissions.remove(current)
@router.delete("/{id_}")
def delete(
    id_: uuid.UUID,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Reject a role deletion request; deletion itself is not implemented.

    Args:
        id_: Primary key of the role the caller wants to delete.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Raises:
        HTTPException: 404 when no role has the given id (a missing resource
            is a client-side condition, not a server failure); 501 when the
            role exists, because deletion is not implemented.
    """
    try:
        item: Optional[Role] = db.query(Role).filter(Role.id == id_).first()
        if item is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Role not found",
            )
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="Role deletion not implemented",
        )
    except Exception:
        # Every path raises; roll back before letting the error propagate.
        db.rollback()
        raise
@router.get("")
def show_blank(
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Return the ``role_info`` payload for a not-yet-existing role.

    ``role_info`` is called with ``None`` as the role, which produces an empty
    name and every permission listed as disabled.
    """
    blank_role = role_info(None, db)
    return blank_role
@router.get("/list", response_model=List[schemas.RoleList])
def show_list(
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """List all roles, ordered by name, with their sorted permission names.

    Declared as a plain ``def`` like the other handlers in this module: the
    body runs blocking, synchronous queries on ``db``, so it must not be a
    coroutine that would execute them on the event loop.

    Args:
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Returns:
        A list of dicts with the keys ``id``, ``name`` and ``permissions``
        (the role's permission names in ascending order).
    """
    return [
        {
            "id": item.id,
            "name": item.name,
            "permissions": sorted(p.name for p in item.permissions),
        }
        for item in db.query(Role).order_by(Role.name).all()
    ]
@router.get("/{id_}", response_model=schemas.Role)
def show_id(
    id_: uuid.UUID,
    db: Session = Depends(get_db),
    user: UserToken = Security(get_user, scopes=["users"]),
):
    """Return the ``role_info`` payload of the role identified by ``id_``.

    Args:
        id_: Primary key of the role to show.
        db: Database session supplied by ``get_db``.
        user: Authenticated user; the ``users`` scope is required.

    Raises:
        HTTPException: 404 when no role has the given id. Without this check
            ``role_info`` would receive ``None`` and answer with the blank
            role template (which has no ``id``) for a non-existent role.
    """
    item: Optional[Role] = db.query(Role).filter(Role.id == id_).first()
    if item is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Role not found",
        )
    return role_info(item, db)
def role_info(item: Optional[Role], db):
    """Build the dict representation of a role, or of a blank role.

    Every ``Permission`` row (ordered by name) is listed. For an existing role
    each entry's ``enabled`` flag tells whether that permission is in
    ``item.permissions``; for ``item is None`` all flags are ``False``, the
    name is empty and no ``id`` key is present.

    Args:
        item: The role to describe, or ``None`` for the blank template.
        db: Session used to query the permissions.
    """

    def ordered_permissions():
        # All permissions known to the database, sorted by name.
        return db.query(Permission).order_by(Permission.name).all()

    if item is None:
        return {
            "name": "",
            "permissions": [
                {"id": perm.id, "name": perm.name, "enabled": False}
                for perm in ordered_permissions()
            ],
        }

    return {
        "id": item.id,
        "name": item.name,
        "permissions": [
            {
                "id": perm.id,
                "name": perm.name,
                "enabled": perm in item.permissions,
            }
            for perm in ordered_permissions()
        ],
    }