brewman/brewman/core/security.py

129 lines
3.6 KiB
Python

import uuid
from datetime import datetime, timedelta
from typing import List, Union
from jwt import PyJWTError
from fastapi import Depends, HTTPException, status, Security
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from pydantic import BaseModel, ValidationError
from sqlalchemy.orm import Session
from jose import jwt
from jose.exceptions import ExpiredSignatureError
from brewman.models.auth import User as UserModel
from ..db.session import SessionLocal
# To generate a random key suitable for SECRET_KEY, run:
#     openssl rand -hex 32
from ..schemas.auth import UserToken
# Key used to sign tokens in create_access_token and to verify them in
# get_current_user.
# NOTE(review): the key is hard-coded in the source; it should presumably be
# loaded from configuration or the environment instead — confirm.
SECRET_KEY = "8546a61262dab7c05ccf2e26abe30bc10966904df6dfd29259ea85dd0844a8e7"
# Signing algorithm passed to jwt.encode and the only one accepted by jwt.decode.
ALGORITHM = "HS256"
# Not referenced in the visible part of this module; presumably the token
# lifetime (in minutes) applied by the login endpoint — verify against callers.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Bearer-token scheme that get_current_user depends on to obtain the raw token;
# declared with "/token" as the token URL and no scopes.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token", scopes={})
# Model pairing an encoded access token with its token type.
# NOTE(review): presumably the response body of the "/token" endpoint — confirm
# against the router. Documented with comments rather than a docstring so the
# generated schema is left unchanged.
class Token(BaseModel):
    # The encoded token string.
    access_token: str
    # The kind of token, as a string.
    token_type: str
# Claims taken from a decoded token: the username and the list of scopes.
# get_current_user builds this from the "sub" and "scopes" payload entries.
class TokenData(BaseModel):
    # The default is None, so the annotation has to admit None explicitly
    # instead of claiming a plain ``str``.
    username: Union[str, None] = None
    # Scope names carried by the token; empty when the token has none.
    scopes: List[str] = []
def f7(seq):
    """Return the items of ``seq`` as a list with duplicates removed.

    The first occurrence of each item is kept and the original order is
    preserved. Items must be hashable because a set tracks what was seen.
    """
    already_seen = set()
    unique_items = []
    for item in seq:
        if item in already_seen:
            continue
        already_seen.add(item)
        unique_items.append(item)
    return unique_items
# Dependency
def get_db():
    """Yield a database session and close it when the consumer is done.

    Generator dependency: a session is created with ``SessionLocal()``,
    yielded to the caller, and closed once the generator is resumed or
    closed, whether or not the caller raised.

    Yields:
        The session object returned by ``SessionLocal()``.
    """
    # Created outside the ``try`` so that a failing ``SessionLocal()``
    # propagates its own error instead of an ``UnboundLocalError`` raised by
    # ``db.close()`` in the ``finally`` clause.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
def create_access_token(*, data: dict, expires_delta: Union[timedelta, None] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: How long the token stays valid, counted from the
            current UTC time. When ``None``, a lifetime of 15 minutes is used.

    Returns:
        The token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Test against None rather than truthiness: ``timedelta(0)`` is falsy and
    # would otherwise be silently replaced by the 15 minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode = data.copy()
    to_encode.update({"exp": datetime.utcnow() + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_user(username: str, id_: str, locked_out: bool, scopes: List[str]) -> UserToken:
    """Assemble a ``UserToken`` from individual values.

    Args:
        username: Stored as the token's ``name``.
        id_: Textual UUID; converted with ``uuid.UUID`` before being stored.
        locked_out: Stored unchanged as ``locked_out``.
        scopes: Stored unchanged as ``permissions``.

    Returns:
        A ``UserToken`` whose ``password`` is always the empty string.
    """
    user_id = uuid.UUID(id_)
    fields = {
        "id_": user_id,
        "name": username,
        "locked_out": locked_out,
        "password": "",
        "permissions": scopes,
    }
    return UserToken(**fields)
def authenticate_user(
    username: str, password: str, db: Session
) -> Union[UserModel, bool]:
    """Check credentials through ``UserModel.auth``.

    Args:
        username: Passed to ``UserModel.auth`` unchanged.
        password: Passed to ``UserModel.auth`` unchanged.
        db: Session handed to ``UserModel.auth``.

    Returns:
        The user object returned by ``UserModel.auth`` when its first result
        is truthy, otherwise ``False``.
    """
    is_match, account = UserModel.auth(username, password, db)
    return account if is_match else False
async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme),
) -> UserToken:
    """Decode the bearer token and return the user it describes.

    Args:
        security_scopes: Scopes required by the dependant; each one must be
            present in the token's ``scopes`` claim.
        token: Raw token string supplied by ``oauth2_scheme``.

    Returns:
        A ``UserToken`` built by ``get_user`` from the ``sub``, ``userId``,
        ``lockedOut`` and ``scopes`` entries of the token payload.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, has no ``sub``, fails ``TokenData``
            validation, or does not yield a valid user; 401 "Not enough
            permissions" when a required scope is missing from the token.
    """
    # ``jwt`` here is python-jose, whose errors derive from its own JWTError
    # and are unrelated to the PyJWTError imported at module level. Imported
    # locally so this function does not depend on a new module-level import.
    from jose.exceptions import JWTError

    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, username=username)
    except (JWTError, PyJWTError, ExpiredSignatureError, ValidationError) as err:
        raise credentials_exception from err
    try:
        user = get_user(
            username=token_data.username,
            id_=payload.get("userId", None),
            locked_out=payload.get("lockedOut", True),
            scopes=token_scopes,
        )
    except (TypeError, ValueError, ValidationError) as err:
        # A missing or malformed "userId" makes uuid.UUID raise; report it as
        # bad credentials instead of letting it surface as a server error.
        raise credentials_exception from err
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user
async def get_current_active_user(
    current_user: UserToken = Security(get_current_user, scopes=["authenticated"])
) -> UserToken:
    """Return the current user unless it is marked as locked out.

    The user is resolved through ``get_current_user`` with the
    ``"authenticated"`` scope required.

    Raises:
        HTTPException: 400 "Inactive user" when ``locked_out`` is truthy.
    """
    if not current_user.locked_out:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")