Permissions are now stored in and gotten from the jwt
This commit is contained in:
parent
fe74ef44bf
commit
2466efb208
@ -1,6 +1,6 @@
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List
|
||||
from typing import List, Union
|
||||
from jwt import PyJWTError
|
||||
|
||||
from fastapi import Depends, HTTPException, status, Security
|
||||
@ -20,10 +20,7 @@ ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
||||
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(
|
||||
tokenUrl="/token",
|
||||
scopes={}
|
||||
)
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", scopes={})
|
||||
|
||||
|
||||
class Token(BaseModel):
|
||||
@ -41,6 +38,7 @@ class User(BaseModel):
|
||||
name: str
|
||||
locked_out: bool = None
|
||||
password: str
|
||||
permissions: List[str]
|
||||
|
||||
|
||||
def f7(seq):
|
||||
@ -69,20 +67,27 @@ def create_access_token(*, data: dict, expires_delta: timedelta = None):
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
def get_user(username: str, db: Session):
|
||||
user = db.query(UserModel).filter(UserModel.name.ilike(username)).first()
|
||||
if user:
|
||||
return User(id_=user.id, name=user.name, locked_out=user.locked_out, password=user.password)
|
||||
def get_user(username: str, id_: str, locked_out: bool, scopes: List[str]):
|
||||
return User(
|
||||
id_=uuid.UUID(id_),
|
||||
name=username,
|
||||
locked_out=locked_out,
|
||||
password="",
|
||||
permissions=scopes,
|
||||
)
|
||||
|
||||
|
||||
def authenticate_user(username: str, password: str, db: Session):
|
||||
def authenticate_user(username: str, password: str, db: Session) -> Union[UserModel, bool]:
|
||||
found, user = UserModel.auth(username, password, db)
|
||||
if not found:
|
||||
return False
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
|
||||
async def get_current_user(
|
||||
security_scopes: SecurityScopes,
|
||||
token: str = Depends(oauth2_scheme),
|
||||
):
|
||||
if security_scopes.scopes:
|
||||
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
|
||||
else:
|
||||
@ -101,7 +106,7 @@ async def get_current_user(security_scopes: SecurityScopes, token: str = Depends
|
||||
token_data = TokenData(scopes=token_scopes, username=username)
|
||||
except (PyJWTError, ValidationError):
|
||||
raise credentials_exception
|
||||
user = get_user(username=token_data.username, db=db)
|
||||
user = get_user(username=token_data.username, id_=payload.get("user_id", None), locked_out=payload.get("locked_out", True), scopes=token_scopes)
|
||||
if user is None:
|
||||
raise credentials_exception
|
||||
for scope in security_scopes.scopes:
|
||||
@ -114,9 +119,9 @@ async def get_current_user(security_scopes: SecurityScopes, token: str = Depends
|
||||
return user
|
||||
|
||||
|
||||
async def get_current_active_user(current_user: User = Security(get_current_user, scopes=["authenticated"])):
|
||||
async def get_current_active_user(
|
||||
current_user: User = Security(get_current_user, scopes=["authenticated"])
|
||||
):
|
||||
if current_user.locked_out:
|
||||
raise HTTPException(status_code=400, detail="Inactive user")
|
||||
return current_user
|
||||
|
||||
|
||||
|
@ -48,6 +48,8 @@ async def login_for_access_token(
|
||||
]
|
||||
)
|
||||
),
|
||||
"user_id": str(user.id),
|
||||
"locked_out": user.locked_out
|
||||
},
|
||||
expires_delta=access_token_expires,
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user