barker/barker/barker/routers/login.py

97 lines
3.5 KiB
Python

import uuid
from datetime import timedelta
from typing import Optional
from fastapi import (
APIRouter,
Cookie,
Depends,
HTTPException,
Response,
Security,
status,
)
from fastapi.responses import JSONResponse
from fastapi.security import OAuth2PasswordRequestForm
from .. import __version__
from ..core.config import settings
from ..core.security import (
Token,
authenticate_user,
create_access_token,
device_allowed,
get_current_active_user,
)
from ..db.session import SessionFuture
from ..models.login_history import LoginHistory
from ..schemas.user_token import UserToken
router = APIRouter()
@router.post("/token", response_model=Token)
async def login_for_access_token(
response: Response,
form_data: OAuth2PasswordRequestForm = Depends(),
device_id: Optional[uuid.UUID] = Cookie(None),
):
with SessionFuture() as db:
user = authenticate_user(form_data.username.strip(), form_data.password.strip(), db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
allowed, device = device_allowed(user, device_id, db)
db.flush()
if allowed:
history = LoginHistory(user.id, device.id)
db.add(history)
db.commit()
response.set_cookie(key="device_id", value=str(device.id), max_age=10 * 365 * 24 * 60 * 60)
response.set_cookie(key="device", value=device.name, max_age=10 * 365 * 24 * 60 * 60)
response.set_cookie(key="section", value=device.section.name, max_age=10 * 365 * 24 * 60 * 60)
if not allowed:
not_allowed_response = JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
headers={"WWW-Authenticate": "Bearer"},
content={"detail": "Device is not registered"},
)
not_allowed_response.set_cookie(key="device_id", value=str(device.id), max_age=10 * 365 * 24 * 60 * 60)
not_allowed_response.set_cookie(key="device", value=device.name, max_age=10 * 365 * 24 * 60 * 60)
not_allowed_response.set_cookie(key="section", value=device.section.name, max_age=10 * 365 * 24 * 60 * 60)
return not_allowed_response
access_token_expires = timedelta(minutes=settings.JWT_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": user.name,
"scopes": ["authenticated"]
+ list(set([p.name.replace(" ", "-").lower() for r in user.roles for p in r.permissions])),
"userId": str(user.id),
"lockedOut": user.locked_out,
"ver": __version__.__version__,
},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/refresh", response_model=Token)
async def refresh_token(user: UserToken = Security(get_current_active_user)):
access_token_expires = timedelta(minutes=settings.JWT_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={
"sub": user.name,
"scopes": user.permissions,
"userId": str(user.id_),
"lockedOut": user.locked_out,
"ver": __version__.__version__,
},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}