Router-tools/modules/parse.py

121 lines
No EOL
4.4 KiB
Python

import subprocess
import json
import os
from dotenv import load_dotenv
# Run python-dotenv's loader before the lookups below so that values it
# provides are visible to os.getenv.
load_dotenv()
# Base URL that _run_curl prefixes to every endpoint; None when unset.
VYOS_API_URL = os.getenv("VYOS_API_URL")
# API key that _run_curl sends as the "key" form field; None when unset.
VYOS_API_KEY = os.getenv("VYOS_API_KEY")
def _run_curl(endpoint, data_payload):
    """POST a request to the VyOS HTTP API through ``curl`` and decode the reply.

    Args:
        endpoint: Path appended verbatim to ``VYOS_API_URL`` (e.g. ``"/show"``).
        data_payload: JSON-serialisable object sent as the ``data`` form field.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        subprocess.CalledProcessError: ``curl`` exited with a non-zero status.
        json.JSONDecodeError: The response body was not valid JSON.
    """
    # --form-string sends each value literally.  Plain --form gives special
    # meaning to a leading "@" or "<" (read the value from a file) and to
    # ";"-introduced options such as ";type=", so a caller-supplied path
    # element or an unusual API key could corrupt the request.
    # NOTE(review): "-k" turns off TLS certificate verification -- confirm
    # this is intended for the target deployment.
    curl_command = [
        "curl", "-s", "-k",
        "--location", "--request", "POST", f"{VYOS_API_URL}{endpoint}",
        "--form-string", f"data={json.dumps(data_payload)}",
        "--form-string", f"key={VYOS_API_KEY}",
    ]
    response = subprocess.check_output(curl_command, text=True)
    return json.loads(response)
def get_bgp_base_path():
    """Return the leading path elements shared by the BGP requests.

    Reads ``BGP_VRF_NAME`` from the environment.  A non-empty value yields
    ``["bgp", "vrf", <name>]``; an unset or empty value yields ``["bgp"]``.
    """
    vrf = os.getenv("BGP_VRF_NAME")
    return ["bgp", "vrf", vrf] if vrf else ["bgp"]
def get_bgp_neighbor_config_path(neighbor_ip):
    """Return the configuration path of a BGP neighbor.

    Args:
        neighbor_ip: Neighbor identifier placed as the last path element.

    Returns:
        ``["protocols", "bgp", "neighbor", neighbor_ip]``, prefixed with
        ``["vrf", "name", <name>]`` when ``BGP_VRF_NAME`` is set to a
        non-empty value in the environment.
    """
    neighbor_path = ["protocols", "bgp", "neighbor", neighbor_ip]
    vrf = os.getenv("BGP_VRF_NAME")
    if not vrf:
        return neighbor_path
    return ["vrf", "name", vrf] + neighbor_path
def get_route_summary_path(ip_version):
    """Return the path for a route-summary request.

    Args:
        ip_version: First path element, passed through unchanged.

    Returns:
        ``[ip_version, "route", "summary"]``, with ``"vrf", <name>`` inserted
        before ``"summary"`` when ``BGP_VRF_NAME`` is set to a non-empty value.
    """
    vrf = os.getenv("BGP_VRF_NAME")
    vrf_scope = ["vrf", vrf] if vrf else []
    return [ip_version, "route", *vrf_scope, "summary"]
def run_bgp_curl_command():
    """Send a ``show`` request for the BGP base path followed by ``"summ"``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": [*get_bgp_base_path(), "summ"]}
    return _run_curl("/show", request)
def run_bgp_route_curl_command(ip_version, bgprouteprefix):
    """Send a ``show`` request for one BGP route entry.

    Args:
        ip_version: Path element appended right after the BGP base path.
        bgprouteprefix: Path element appended last.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {
        "op": "show",
        "path": [*get_bgp_base_path(), ip_version, bgprouteprefix],
    }
    return _run_curl("/show", request)
def run_bgp_neighbor_detail_curl_command(ip_version, neighbor_ip):
    """Send a ``show`` request for a single BGP neighbor.

    Args:
        ip_version: ``"ipv4"`` selects the ``ipv4`` path element; any other
            value selects ``ipv6``.
        neighbor_ip: Neighbor identifier appended as the last path element.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    if ip_version == "ipv4":
        family = "ipv4"
    else:
        family = "ipv6"
    request = {
        "op": "show",
        "path": [*get_bgp_base_path(), family, "neighbors", neighbor_ip],
    }
    return _run_curl("/show", request)
def run_bgp_dampeningv4_curl_command():
    """Send a ``show`` request for ``ipv4 dampening dampened-paths`` under the BGP base path.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    suffix = ["ipv4", "dampening", "dampened-paths"]
    request = {"op": "show", "path": get_bgp_base_path() + suffix}
    return _run_curl("/show", request)
def run_bgp_dampeningv6_curl_command():
    """Send a ``show`` request for ``ipv6 dampening dampened-paths`` under the BGP base path.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    suffix = ["ipv6", "dampening", "dampened-paths"]
    request = {"op": "show", "path": get_bgp_base_path() + suffix}
    return _run_curl("/show", request)
def run_bgp_reset_command(neighbor_ip, soft=False):
    """Send a ``reset`` request for a BGP neighbor to the ``/reset`` endpoint.

    Args:
        neighbor_ip: Neighbor identifier appended to the BGP base path.
        soft: When truthy, ``"soft"`` is appended as a final path element.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    soft_suffix = ["soft"] if soft else []
    reset_path = get_bgp_base_path() + [neighbor_ip] + soft_suffix
    return _run_curl("/reset", {"op": "reset", "path": reset_path})
def run_bgp_shutdown_command(neighbor_ip):
    """Send a ``set`` for the neighbor's ``shutdown`` node to ``/configure``.

    Args:
        neighbor_ip: Neighbor whose configuration path is extended with
            ``"shutdown"``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    shutdown_node = [*get_bgp_neighbor_config_path(neighbor_ip), "shutdown"]
    request = {"op": "set", "path": shutdown_node}
    return _run_curl("/configure", request)
def run_bgp_enable_command(neighbor_ip):
    """Send a ``delete`` for the neighbor's ``shutdown`` node to ``/configure``.

    Args:
        neighbor_ip: Neighbor whose configuration path is extended with
            ``"shutdown"``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    shutdown_node = [*get_bgp_neighbor_config_path(neighbor_ip), "shutdown"]
    request = {"op": "delete", "path": shutdown_node}
    return _run_curl("/configure", request)
def run_arp_curl_command():
    """Send a ``show`` request for the path ``["arp"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["arp"]}
    return _run_curl("/show", request)
def run_neighbors_curl_command():
    """Send a ``show`` request for the path ``["ipv6", "neighbors"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["ipv6", "neighbors"]}
    return _run_curl("/show", request)
def run_interfaces_curl_command():
    """Send a ``show`` request for the path ``["interfaces"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["interfaces"]}
    return _run_curl("/show", request)
def run_rpki_cache_connection_curl_command():
    """Send a ``show`` request for the path ``["rpki", "cache-connection"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["rpki", "cache-connection"]}
    return _run_curl("/show", request)
def run_rpki_lookup_curl_command(lookup_type, query):
    """Send a ``show`` request for ``["rpki", lookup_type, query]``.

    Args:
        lookup_type: Path element placed after ``"rpki"``.
        query: Path element placed last.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    lookup_path = ["rpki", lookup_type, query]
    return _run_curl("/show", {"op": "show", "path": lookup_path})
def run_bfd_peers_curl_command():
    """Send a ``show`` request for the path ``["bfd", "peers"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["bfd", "peers"]}
    return _run_curl("/show", request)
def run_bfd_peer_detail_curl_command(peer_ip):
    """Send a ``show`` request for ``["bfd", "peer", peer_ip]``.

    Args:
        peer_ip: Peer identifier placed as the last path element.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    peer_path = ["bfd", "peer", peer_ip]
    return _run_curl("/show", {"op": "show", "path": peer_path})
def run_firewall_ipv4_curl_command():
    """Send a ``show`` request for the path ``["firewall", "ipv4"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["firewall", "ipv4"]}
    return _run_curl("/show", request)
def run_firewall_ipv6_curl_command():
    """Send a ``show`` request for the path ``["firewall", "ipv6"]``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": ["firewall", "ipv6"]}
    return _run_curl("/show", request)
def run_ipv4_route_summary_curl_command():
    """Send a ``show`` request for the IPv4 route summary.

    The path comes from ``get_route_summary_path("ip")``; note the leading
    element is ``"ip"``, not ``"ipv4"``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": get_route_summary_path("ip")}
    return _run_curl("/show", request)
def run_ipv6_route_summary_curl_command():
    """Send a ``show`` request for the IPv6 route summary.

    The path comes from ``get_route_summary_path("ipv6")``.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    request = {"op": "show", "path": get_route_summary_path("ipv6")}
    return _run_curl("/show", request)
def run_firewall_log_curl_command(ip_version, ruleset_name, rule_number):
    """Send a ``show`` request for the log of one firewall rule.

    Args:
        ip_version: Path element placed after ``"log", "firewall"``.
        ruleset_name: Lower-cased, underscores turned into spaces, then split
            on whitespace; each resulting word becomes its own path element.
        rule_number: Placed after ``"rule"`` as the last path element,
            passed through unchanged.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    lowered = ruleset_name.lower()
    words = lowered.replace('_', ' ').split()
    log_path = ["log", "firewall", ip_version, *words, "rule", rule_number]
    return _run_curl("/show", {"op": "show", "path": log_path})
def run_dhcpv6_leases_curl_command(pool_name):
    """Send a ``show`` request for the DHCPv6 server leases of one pool.

    Args:
        pool_name: Pool identifier placed as the last path element.

    Returns:
        The decoded JSON response from ``_run_curl``.
    """
    leases_path = ["dhcpv6", "server", "leases", "pool", pool_name]
    return _run_curl("/show", {"op": "show", "path": leases_path})