97 lines
3.5 KiB
Python
97 lines
3.5 KiB
Python
import uuid
|
|
|
|
from datetime import timedelta
|
|
from typing import Optional
|
|
|
|
from fastapi import (
|
|
APIRouter,
|
|
Cookie,
|
|
Depends,
|
|
HTTPException,
|
|
Response,
|
|
Security,
|
|
status,
|
|
)
|
|
from fastapi.responses import JSONResponse
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
|
|
from .. import __version__
|
|
from ..core.config import settings
|
|
from ..core.security import (
|
|
Token,
|
|
authenticate_user,
|
|
create_access_token,
|
|
device_allowed,
|
|
get_current_active_user,
|
|
)
|
|
from ..db.session import SessionFuture
|
|
from ..models.login_history import LoginHistory
|
|
from ..schemas.user_token import UserToken
|
|
|
|
|
|
# Router on which this module's authentication endpoints ("/token", "/refresh") are registered.
router = APIRouter()
|
|
|
|
|
|
# Lifetime of the device-identification cookies: ten 365-day years, in seconds.
_DEVICE_COOKIE_MAX_AGE = 10 * 365 * 24 * 60 * 60


def _set_device_cookies(target: Response, device) -> None:
    """Attach the device-identifying cookies (id, name, section name) to *target*."""
    target.set_cookie(key="device_id", value=str(device.id), max_age=_DEVICE_COOKIE_MAX_AGE)
    target.set_cookie(key="device", value=device.name, max_age=_DEVICE_COOKIE_MAX_AGE)
    target.set_cookie(key="section", value=device.section.name, max_age=_DEVICE_COOKIE_MAX_AGE)


@router.post("/token", response_model=Token)
async def login_for_access_token(
    response: Response,
    form_data: OAuth2PasswordRequestForm = Depends(),
    device_id: Optional[uuid.UUID] = Cookie(None),
):
    """Authenticate a user from a device and issue a bearer access token.

    Responds with 401 "Incorrect username or password" when the credentials
    are rejected, and with 401 "Device is not registered" when the device is
    not allowed. In the latter case, and on success, the device cookies
    (``device_id``, ``device``, ``section``) are set on the response.
    """
    # NOTE(review): this is an ``async def`` handler, but the session is used
    # through a synchronous ``with`` block -- confirm the DB calls do not block
    # the event loop for long.
    with SessionFuture() as db:
        user = authenticate_user(form_data.username, form_data.password, db)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Incorrect username or password",
                headers={"WWW-Authenticate": "Bearer"},
            )

        allowed, device = device_allowed(user, device_id, db)
        db.flush()
        if allowed:
            db.add(LoginHistory(user.id, device.id))
        # Committed whether or not the device is allowed; presumably
        # device_allowed may have registered a new device row whose id is
        # handed back to the client below -- TODO confirm.
        db.commit()

        if not allowed:
            # A Response returned directly replaces the injected ``response``,
            # so the cookies have to be set on this object instead.
            not_allowed_response = JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                headers={"WWW-Authenticate": "Bearer"},
                content={"detail": "Device is not registered"},
            )
            _set_device_cookies(not_allowed_response, device)
            return not_allowed_response

        _set_device_cookies(response, device)

        # De-duplicate permission names across roles; sorting keeps the
        # "scopes" claim deterministic from one login to the next.
        permission_scopes = sorted(
            {p.name.replace(" ", "-").lower() for r in user.roles for p in r.permissions}
        )
        access_token = create_access_token(
            data={
                "sub": user.name,
                "scopes": ["authenticated", *permission_scopes],
                "userId": str(user.id),
                "lockedOut": user.locked_out,
                "ver": __version__.__version__,
            },
            expires_delta=timedelta(minutes=settings.JWT_TOKEN_EXPIRE_MINUTES),
        )
        return {"access_token": access_token, "token_type": "bearer"}
|
|
|
|
|
|
@router.post("/refresh", response_model=Token)
async def refresh_token(user: UserToken = Security(get_current_active_user)):
    """Issue a new bearer access token for the currently authenticated user.

    The claims are copied from the resolved ``UserToken``; the token lifetime
    is ``settings.JWT_TOKEN_EXPIRE_MINUTES`` minutes.
    """
    lifetime = timedelta(minutes=settings.JWT_TOKEN_EXPIRE_MINUTES)
    claims = {
        "sub": user.name,
        "scopes": user.permissions,
        "userId": str(user.id_),
        "lockedOut": user.locked_out,
        "ver": __version__.__version__,
    }
    renewed = create_access_token(data=claims, expires_delta=lifetime)
    return {"access_token": renewed, "token_type": "bearer"}
|