Update modules/parse.py
This commit is contained in:
parent
ee5d4804ce
commit
b295322cba
1 changed files with 70 additions and 129 deletions
199
modules/parse.py
199
modules/parse.py
|
|
@ -4,149 +4,90 @@ import os
|
|||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
VYOS_API_URL = os.getenv("VYOS_API_URL")
|
||||
VYOS_API_KEY = os.getenv("VYOS_API_KEY")
|
||||
|
||||
def _run_curl(endpoint, data_payload):
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}{endpoint}",
|
||||
"--form", f"data={json.dumps(data_payload)}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def get_bgp_base_path():
|
||||
vrf_name = os.getenv("BGP_VRF_NAME")
|
||||
if vrf_name:
|
||||
return ["bgp", "vrf", vrf_name]
|
||||
else:
|
||||
return ["bgp"]
|
||||
|
||||
def get_bgp_neighbor_config_path(neighbor_ip):
|
||||
vrf_name = os.getenv("BGP_VRF_NAME")
|
||||
if vrf_name:
|
||||
return ["vrf", "name", vrf_name, "protocols", "bgp", "neighbor", neighbor_ip]
|
||||
else:
|
||||
return ["protocols", "bgp", "neighbor", neighbor_ip]
|
||||
|
||||
def run_bgp_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"summ\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
path = get_bgp_base_path() + ["summ"]
|
||||
return _run_curl("/show", {"op": "show", "path": path})
|
||||
|
||||
def run_arp_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"arp\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_neighbors_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"ipv6\", \"neighbors\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_interfaces_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"interfaces\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_bgp_route_curl_command(ip_version, bgprouteprefix):
|
||||
data_json = {
|
||||
"op": "show",
|
||||
"path": ["bgp", "vrf", "bgp", ip_version, bgprouteprefix]
|
||||
}
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", f"data={json.dumps(data_json)}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
path = get_bgp_base_path() + [ip_version, bgprouteprefix]
|
||||
return _run_curl("/show", {"op": "show", "path": path})
|
||||
|
||||
def run_rpki_cache_connection_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"rpki\", \"cache-connection\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_rpki_lookup_curl_command(lookup_type, query):
|
||||
data_json = {
|
||||
"op": "show",
|
||||
"path": ["rpki", lookup_type, query]
|
||||
}
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", f"data={json.dumps(data_json)}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_bgp_neighbor_detail_curl_command(ip_version, neighbor_ip):
|
||||
path_ip_version = "ipv4" if ip_version == "ipv4" else "ipv6"
|
||||
data_json = {
|
||||
"op": "show",
|
||||
"path": ["bgp", "vrf", "bgp", path_ip_version, "neighbors", neighbor_ip]
|
||||
}
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", f"data={json.dumps(data_json)}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_bfd_peers_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"bfd\", \"peers\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
|
||||
def run_bfd_peer_detail_curl_command(peer_ip):
|
||||
data_json = {
|
||||
"op": "show",
|
||||
"path": ["bfd", "peer", peer_ip]
|
||||
}
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", f"data={json.dumps(data_json)}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
path = get_bgp_base_path() + [path_ip_version, "neighbors", neighbor_ip]
|
||||
return _run_curl("/show", {"op": "show", "path": path})
|
||||
|
||||
def run_bgp_dampeningv4_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"ipv4\", \"dampening\", \"dampened-paths\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
path = get_bgp_base_path() + ["ipv4", "dampening", "dampened-paths"]
|
||||
return _run_curl("/show", {"op": "show", "path": path})
|
||||
|
||||
def run_bgp_dampeningv6_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"ipv6\", \"dampening\", \"dampened-paths\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
path = get_bgp_base_path() + ["ipv6", "dampening", "dampened-paths"]
|
||||
return _run_curl("/show", {"op": "show", "path": path})
|
||||
|
||||
def run_bgp_reset_command(neighbor_ip, soft=False):
|
||||
path = get_bgp_base_path() + [neighbor_ip]
|
||||
if soft:
|
||||
path.append("soft")
|
||||
return _run_curl("/reset", {'op': 'reset', 'path': path})
|
||||
|
||||
def run_bgp_shutdown_command(neighbor_ip):
|
||||
path = get_bgp_neighbor_config_path(neighbor_ip) + ["shutdown"]
|
||||
return _run_curl("/configure", {'op': 'set', 'path': path})
|
||||
|
||||
def run_bgp_enable_command(neighbor_ip):
|
||||
path = get_bgp_neighbor_config_path(neighbor_ip) + ["shutdown"]
|
||||
return _run_curl("/configure", {'op': 'delete', 'path': path})
|
||||
|
||||
def run_arp_curl_command():
|
||||
return _run_curl("/show", {"op": "show", "path": ["arp"]})
|
||||
|
||||
def run_neighbors_curl_command():
|
||||
return _run_curl("/show", {"op": "show", "path": ["ipv6", "neighbors"]})
|
||||
|
||||
def run_interfaces_curl_command():
|
||||
return _run_curl("/show", {"op": "show", "path": ["interfaces"]})
|
||||
|
||||
def run_rpki_cache_connection_curl_command():
|
||||
return _run_curl("/show", {"op": "show", "path": ["rpki", "cache-connection"]})
|
||||
|
||||
def run_rpki_lookup_curl_command(lookup_type, query):
|
||||
return _run_curl("/show", {"op": "show", "path": ["rpki", lookup_type, query]})
|
||||
|
||||
def run_bfd_peers_curl_command():
|
||||
return _run_curl("/show", {"op": "show", "path": ["bfd", "peers"]})
|
||||
|
||||
def run_bfd_peer_detail_curl_command(peer_ip):
|
||||
return _run_curl("/show", {"op": "show", "path": ["bfd", "peer", peer_ip]})
|
||||
|
||||
def run_firewall_ipv4_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"firewall\", \"ipv4\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
return _run_curl("/show", {"op": "show", "path": ["firewall", "ipv4"]})
|
||||
|
||||
def run_firewall_ipv6_curl_command():
|
||||
curl_command = [
|
||||
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
|
||||
"--form", "data={\"op\": \"show\", \"path\": [\"firewall\", \"ipv6\"]}",
|
||||
"--form", f"key={VYOS_API_KEY}"
|
||||
]
|
||||
response = subprocess.check_output(curl_command, text=True)
|
||||
return json.loads(response)
|
||||
return _run_curl("/show", {"op": "show", "path": ["firewall", "ipv6"]})
|
||||
Loading…
Add table
Add a link
Reference in a new issue