Moved to FastAPI. Awaiting auth
This commit is contained in:
@ -1,17 +1,2 @@
|
||||
from crypt import crypt
|
||||
import os
|
||||
|
||||
|
||||
def htpasswd(username, password, request):
|
||||
settings = request.registry.settings
|
||||
file = settings['biforst.auth']
|
||||
if not os.path.isfile(file):
|
||||
return None
|
||||
users = {}
|
||||
with open(file) as f:
|
||||
for line in f:
|
||||
login, pwd = line.split(':')
|
||||
users[login] = pwd.rstrip('\n')
|
||||
if username in users and crypt(password, users[username]) == users[username]:
|
||||
return [Authenticated]
|
||||
return None
|
||||
|
||||
22
bifrost/auth.py
Normal file
22
bifrost/auth.py
Normal file
@ -0,0 +1,22 @@
|
||||
import os
|
||||
from crypt import crypt
|
||||
|
||||
|
||||
|
||||
# def setup_auth(app):
|
||||
# htpasswd: str = app.config.HTPASSWD
|
||||
# print(htpasswd)
|
||||
#
|
||||
# @auth.verify_password
|
||||
# def htpasswd(username, password):
|
||||
# if not os.path.isfile(htpasswd):
|
||||
# return None
|
||||
# users = {}
|
||||
# with open(htpasswd) as f:
|
||||
# for line in f:
|
||||
# login, pwd = line.split(':')
|
||||
# users[login] = pwd.rstrip('\n')
|
||||
# if username in users:
|
||||
# return crypt(password, users[username]) == users[username]
|
||||
# else:
|
||||
# return False
|
||||
14
bifrost/config.py
Normal file
14
bifrost/config.py
Normal file
@ -0,0 +1,14 @@
|
||||
from environs import Env
|
||||
|
||||
env = Env()
|
||||
env.read_env() # read .env file, if it exists
|
||||
print("env reloaded")
|
||||
|
||||
|
||||
class Settings:
|
||||
debug: bool = env("DEBUG")
|
||||
host: str = env("HOST")
|
||||
port: int = env.int("PORT")
|
||||
api_url_base: str = env("API_URL_BASE")
|
||||
api_token: str = env("API_TOKEN")
|
||||
htpasswd: str = env("HTPASSWD")
|
||||
@ -1,25 +1,27 @@
|
||||
from sanic import Sanic
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
|
||||
from environs import Env
|
||||
import bifrost.routers as routers
|
||||
from .config import Settings as settings
|
||||
|
||||
from bifrost.settings import Settings
|
||||
app = FastAPI()
|
||||
|
||||
app = Sanic(__name__, load_env='BIFROST_')
|
||||
|
||||
@app.get("/")
|
||||
async def root():
|
||||
return {"message": "Hello World"}
|
||||
|
||||
app.include_router(routers.router)
|
||||
|
||||
|
||||
def init():
|
||||
env = Env()
|
||||
env.read_env()
|
||||
uvicorn.run(app, host=settings.host, port=settings.port)
|
||||
# app.add_route(update_view, '/update', methods=['GET'])
|
||||
# # setup_auth(app)
|
||||
|
||||
app.config.from_object(Settings)
|
||||
|
||||
from bifrost.views import update_view
|
||||
app.add_route(update_view, '/update', methods=['GET'])
|
||||
|
||||
app.run(host='0.0.0.0', port=8000, debug=True, access_log=True)
|
||||
app.run(
|
||||
host=app.config.HOST,
|
||||
port=app.config.PORT,
|
||||
debug=app.config.DEBUG,
|
||||
access_log=app.config.ACCESS_LOG
|
||||
)
|
||||
# app.run(
|
||||
# host=app.config.HOST,
|
||||
# port=app.config.PORT,
|
||||
# debug=app.config.DEBUG,
|
||||
# access_log=app.config.ACCESS_LOG
|
||||
# )
|
||||
|
||||
33
bifrost/routers.py
Normal file
33
bifrost/routers.py
Normal file
@ -0,0 +1,33 @@
|
||||
from functools import lru_cache
|
||||
import logging as logger
|
||||
from fastapi import APIRouter, Header, Request, Depends
|
||||
|
||||
from .digital_ocean import update_domain
|
||||
from . import config
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def get_settings():
|
||||
return config.Settings
|
||||
|
||||
@router.get("/update")
|
||||
def update_view(domain: str, request: Request, ip: str = None, x_forwarded_for: str = Header(None), settings: config.Settings = Depends(get_settings)):
|
||||
logger.info('Here is your log')
|
||||
if ip is not None:
|
||||
current_ip = ip
|
||||
elif x_forwarded_for is not None:
|
||||
current_ip = x_forwarded_for
|
||||
else:
|
||||
current_ip = request.ip
|
||||
name, domain = domain.split('.', maxsplit=1)
|
||||
update_domain(domain, name, current_ip, settings.api_url_base, settings.api_token, logger)
|
||||
return {"domain": domain, "name": name, "current_ip": current_ip, "api_base": settings.api_url_base, "api_token": settings.api_token}
|
||||
|
||||
|
||||
# @forbidden_view_config()
|
||||
# def basic_challenge(request):
|
||||
# response = HTTPUnauthorized()
|
||||
# response.headers.update(forget(request))
|
||||
# return response
|
||||
@ -1,9 +0,0 @@
|
||||
from sanic_envconfig import EnvConfig
|
||||
|
||||
|
||||
class Settings(EnvConfig):
|
||||
DEBUG: bool = True
|
||||
HOST: str = '0.0.0.0'
|
||||
PORT: int = 8000
|
||||
API_URL_BASE: str = 'https://api.digitalocean.com/v2'
|
||||
API_TOKEN: str = None
|
||||
@ -1,28 +0,0 @@
|
||||
from sanic.log import logger
|
||||
from sanic.response import json
|
||||
|
||||
from bifrost.digital_ocean import update_domain
|
||||
|
||||
|
||||
async def update_view(request):
|
||||
logger.info('Here is your log')
|
||||
if 'domain' not in request.args or len(request.args['domain']) != 1:
|
||||
return json({"message": "Domain error"})
|
||||
domain = request.args['domain'][0]
|
||||
if 'ip' in request.args and len(request.args['ip']) == 1:
|
||||
current_ip = request.args['ip'][0]
|
||||
else:
|
||||
if 'X-Forwarded-For' in request.headers:
|
||||
current_ip = request.headers['X-Forwarded-For']
|
||||
else:
|
||||
current_ip = request.ip
|
||||
name, domain = domain.split('.', maxsplit=1)
|
||||
update_domain(domain, name, current_ip, request.app.config.API_URL_BASE, request.app.config.API_TOKEN, logger)
|
||||
return json({"domain": domain, "name": name, "current_ip": current_ip, "api_base": request.app.config.API_URL_BASE, "api_token": request.app.config.API_TOKEN})
|
||||
|
||||
|
||||
# @forbidden_view_config()
|
||||
# def basic_challenge(request):
|
||||
# response = HTTPUnauthorized()
|
||||
# response.headers.update(forget(request))
|
||||
# return response
|
||||
Reference in New Issue
Block a user