barker/barker/barker/core/security.py

123 lines
3.8 KiB
Python

import uuid
from datetime import datetime, timedelta
from typing import List, Optional, Tuple
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import jwt
from jose.exceptions import ExpiredSignatureError
from jwt import PyJWTError
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.orm import Session
from ..core.config import settings
from ..models.device import Device
from ..models.user import User as UserModel
# to get a string like this run:
from ..schemas.user_token import UserToken
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", scopes={})
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str = ""
scopes: List[str] = []
def f7(seq):
seen = set()
seen_add = seen.add
return [x for x in seq if not (x in seen or seen_add(x))]
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def get_user(username: str, id_: str, locked_out: bool, scopes: List[str]) -> UserToken:
return UserToken(
id_=uuid.UUID(id_),
name=username,
locked_out=locked_out,
password="",
permissions=scopes,
)
def authenticate_user(username: str, password: str, db: Session) -> Optional[UserModel]:
user = UserModel.auth(username, password, db)
return user
def device_allowed(user: UserModel, device_id: Optional[uuid.UUID], db: Session) -> Tuple[bool, Device]:
device: Device = db.execute(select(Device).where(Device.id == device_id)).scalars().one_or_none()
if device is None:
device = Device.create(db)
allowed = "add-devices" in set([p.name.replace(" ", "-").lower() for r in user.roles for p in r.permissions])
return allowed or device.enabled, device
async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme),
) -> UserToken:
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (PyJWTError, ValidationError, ExpiredSignatureError):
raise credentials_exception
user = get_user(
username=token_data.username,
id_=payload.get("userId", None),
locked_out=payload.get("lockedOut", True),
scopes=token_scopes,
)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
async def get_current_active_user(
current_user: UserToken = Security(get_current_user, scopes=["authenticated"])
) -> UserToken:
if current_user.locked_out:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user