bifrost/bifrost/digital_ocean.py

137 lines
4.8 KiB
Python

import json
import requests
def create_domain_a_record(domain, name, ip_address, api_url_base, api_token, logger):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {0}".format(api_token),
}
api_url = f"{api_url_base}/domains/{domain}/records"
data = {
"type": "A",
"name": name,
"data": ip_address,
"priority": None,
"port": None,
"ttl": 1800,
"weight": None,
"flags": None,
"tag": None,
}
response = requests.post(api_url, headers=headers, json=data)
if response.status_code > 499:
logger.error(f"[!] [{response.status_code}] Server Error")
return None
elif response.status_code == 404:
logger.error(f"[!] [{response.status_code}] URL not found: [{api_url}]")
return None
elif response.status_code == 401:
logger.error(f"[!] [{response.status_code}] Authentication Failed")
return None
elif response.status_code > 399:
logger.error(f"[!] [{response.status_code}] Bad Request")
logger.error(f"Domain: {domain}, Name: {name}, New IP Address: {ip_address}")
logger.error(response.content)
return None
elif response.status_code > 299:
logger.error(f"[!] [{response.status_code}] Unexpected redirect")
return None
elif response.status_code == 201:
logger.info(f"{domain} updated to {ip_address}")
return json.loads(response.content)
else:
logger.error(
f"[?] Unexpected Error: [HTTP {response.status_code}]: Content: {response.content}"
)
return None
def update_domain_a_record(
domain, record_id, new_ip_address, api_url_base, api_token, logger
):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {0}".format(api_token),
}
api_url = f"{api_url_base}/domains/{domain}/records/{record_id}"
data = {"data": new_ip_address}
response = requests.put(api_url, headers=headers, json=data)
if response.status_code > 499:
logger.error(f"[!] [{response.status_code}] Server Error")
return None
elif response.status_code == 404:
logger.error(f"[!] [{response.status_code}] URL not found: [{api_url}]")
return None
elif response.status_code == 401:
logger.error(f"[!] [{response.status_code}] Authentication Failed")
return None
elif response.status_code > 399:
logger.error(f"[!] [{response.status_code}] Bad Request")
logger.error(
f"Domain: {domain}, Record ID: {record_id}, New IP Address: {new_ip_address}"
)
logger.error(response.content)
return None
elif response.status_code > 299:
logger.error(f"[!] [{response.status_code}] Unexpected redirect")
return None
elif response.status_code == 201:
logger.info(f"{domain} updated to {new_ip_address}")
return json.loads(response.content)
else:
logger.error(
f"[?] Unexpected Error: [HTTP {response.status_code}]: Content: {response.content}"
)
return None
def list_all_domain_records(domain, api_url_base, api_token, logger):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {0}".format(api_token),
}
api_url = f"{api_url_base}/domains/{domain}/records"
response = requests.get(api_url, headers=headers)
if response.status_code == 200:
return json.loads(response.content.decode("utf-8"))["domain_records"]
else:
logger.error(
f"[?] Unexpected Error: [HTTP {response.status_code}]: Content: {response.content}"
)
return None
def get_domain_a_record(domain, name, api_url_base, api_token, logger):
for item in list_all_domain_records(domain, api_url_base, api_token, logger):
if item["name"] == name and item["type"] == "A":
return item
else:
return None
def update_domain(domain, name, new_ip_address, api_url_base, api_token, logger):
record = get_domain_a_record(domain, name, api_url_base, api_token, logger)
if record is None:
logger.info(
f"Creating domain a record for {domain} with ip address {new_ip_address}"
)
create_domain_a_record(
domain, name, new_ip_address, api_url_base, api_token, logger
)
elif record["data"] != new_ip_address:
logger.info(
f'Updating domain a record for {domain}. Old ip address {record["data"]}, new ip address {new_ip_address}'
)
update_domain_a_record(
domain, record["id"], new_ip_address, api_url_base, api_token, logger
)
else:
logger.info(
f'Not updating domain a record for {domain}. IP address {record["data"]} is current'
)