"""Thin wrappers around a VyOS HTTP API, issued through the ``curl`` binary.

Every public ``run_*`` function builds a command path (a list of words),
POSTs it to the API endpoint configured through environment variables and
returns the decoded JSON response.

Environment variables read by this module:

* ``VYOS_API_URL``  - base URL, read once at import time.
* ``VYOS_API_KEY``  - API key, read once at import time.
* ``BGP_VRF_NAME``  - optional VRF name, read on every call that needs it.
"""
import json
import os
import subprocess

try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv is only needed to pull variables out of a local ``.env``
    # file. Without it the module still works with variables supplied by the
    # real process environment.
    pass
else:
    load_dotenv()

VYOS_API_URL = os.getenv("VYOS_API_URL")
VYOS_API_KEY = os.getenv("VYOS_API_KEY")


def _build_curl_command(endpoint, data_payload):
    """Return the ``curl`` argument list for a POST to *endpoint*.

    ``--form-string`` is used instead of ``--form`` so that the values are
    sent literally: with ``--form`` a value starting with ``@`` or ``<`` makes
    curl read a local file, and a ``;`` inside the value is parsed as the
    start of a ``;type=``-style option, which corrupts payloads that contain
    caller-supplied text.

    NOTE(review): ``-k`` disables TLS certificate verification, and the API
    key is passed as a process argument (visible in the process list).
    Both are kept as in the original; confirm they are acceptable.
    """
    return [
        "curl", "-s", "-k", "--location", "--request", "POST",
        f"{VYOS_API_URL}{endpoint}",
        "--form-string", f"data={json.dumps(data_payload)}",
        "--form-string", f"key={VYOS_API_KEY}",
    ]


def _run_curl(endpoint, data_payload):
    """POST *data_payload* (JSON-encoded) to *endpoint* and decode the reply.

    Args:
        endpoint: Path appended verbatim to ``VYOS_API_URL`` (e.g. ``"/show"``).
        data_payload: JSON-serialisable object sent as the ``data`` form field.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        subprocess.CalledProcessError: curl exited with a non-zero status.
        json.JSONDecodeError: the response body was not valid JSON.
    """
    # NOTE(review): no timeout is applied; an unresponsive peer blocks the caller.
    response = subprocess.check_output(
        _build_curl_command(endpoint, data_payload), text=True
    )
    return json.loads(response)


def _show(path):
    """Send a ``show`` operation for *path* to the ``/show`` endpoint."""
    return _run_curl("/show", {"op": "show", "path": path})


def _vrf_name():
    """Return the current ``BGP_VRF_NAME`` value (``None`` when unset)."""
    return os.getenv("BGP_VRF_NAME")


def get_bgp_base_path():
    """Return the leading path words for BGP operational commands.

    ``["bgp", "vrf", <name>]`` when ``BGP_VRF_NAME`` is set to a non-empty
    value, otherwise ``["bgp"]``.
    """
    vrf_name = _vrf_name()
    if vrf_name:
        return ["bgp", "vrf", vrf_name]
    return ["bgp"]


def get_bgp_neighbor_config_path(neighbor_ip):
    """Return the configuration path of BGP neighbor *neighbor_ip*.

    The path is prefixed with ``["vrf", "name", <name>]`` when
    ``BGP_VRF_NAME`` is set to a non-empty value.
    """
    neighbor_path = ["protocols", "bgp", "neighbor", neighbor_ip]
    vrf_name = _vrf_name()
    if vrf_name:
        return ["vrf", "name", vrf_name] + neighbor_path
    return neighbor_path


def get_route_summary_path(ip_version):
    """Return the route-summary path for *ip_version*.

    *ip_version* is used verbatim as the first path word. ``"vrf", <name>``
    is inserted before ``"summary"`` when ``BGP_VRF_NAME`` is non-empty.
    """
    path = [ip_version, "route"]
    vrf_name = _vrf_name()
    if vrf_name:
        path += ["vrf", vrf_name]
    path.append("summary")
    return path


def run_bgp_curl_command():
    """Show ``<bgp base path> summ``."""
    return _show(get_bgp_base_path() + ["summ"])


def run_bgp_route_curl_command(ip_version, bgprouteprefix):
    """Show ``<bgp base path> <ip_version> <bgprouteprefix>``."""
    return _show(get_bgp_base_path() + [ip_version, bgprouteprefix])


def run_bgp_neighbor_detail_curl_command(ip_version, neighbor_ip):
    """Show ``<bgp base path> <ipv4|ipv6> neighbors <neighbor_ip>``.

    Any *ip_version* other than the exact string ``"ipv4"`` selects ``"ipv6"``.
    """
    family = "ipv4" if ip_version == "ipv4" else "ipv6"
    return _show(get_bgp_base_path() + [family, "neighbors", neighbor_ip])


def run_bgp_dampeningv4_curl_command():
    """Show ``<bgp base path> ipv4 dampening dampened-paths``."""
    return _show(get_bgp_base_path() + ["ipv4", "dampening", "dampened-paths"])


def run_bgp_dampeningv6_curl_command():
    """Show ``<bgp base path> ipv6 dampening dampened-paths``."""
    return _show(get_bgp_base_path() + ["ipv6", "dampening", "dampened-paths"])


def run_bgp_reset_command(neighbor_ip, soft=False):
    """Send a ``reset`` operation for *neighbor_ip* to the ``/reset`` endpoint.

    The path is ``<bgp base path> <neighbor_ip>``, with ``"soft"`` appended
    when *soft* is truthy.
    """
    path = get_bgp_base_path() + [neighbor_ip]
    if soft:
        path.append("soft")
    return _run_curl("/reset", {"op": "reset", "path": path})


def run_bgp_shutdown_command(neighbor_ip):
    """Send a ``set`` of ``<neighbor config path> shutdown`` to ``/configure``."""
    path = get_bgp_neighbor_config_path(neighbor_ip) + ["shutdown"]
    return _run_curl("/configure", {"op": "set", "path": path})


def run_bgp_enable_command(neighbor_ip):
    """Send a ``delete`` of ``<neighbor config path> shutdown`` to ``/configure``."""
    path = get_bgp_neighbor_config_path(neighbor_ip) + ["shutdown"]
    return _run_curl("/configure", {"op": "delete", "path": path})


def run_arp_curl_command():
    """Show ``arp``."""
    return _show(["arp"])


def run_neighbors_curl_command():
    """Show ``ipv6 neighbors``."""
    return _show(["ipv6", "neighbors"])


def run_interfaces_curl_command():
    """Show ``interfaces``."""
    return _show(["interfaces"])


def run_rpki_cache_connection_curl_command():
    """Show ``rpki cache-connection``."""
    return _show(["rpki", "cache-connection"])


def run_rpki_lookup_curl_command(lookup_type, query):
    """Show ``rpki <lookup_type> <query>``; both arguments are sent verbatim."""
    return _show(["rpki", lookup_type, query])


def run_bfd_peers_curl_command():
    """Show ``bfd peers``."""
    return _show(["bfd", "peers"])


def run_bfd_peer_detail_curl_command(peer_ip):
    """Show ``bfd peer <peer_ip>``."""
    return _show(["bfd", "peer", peer_ip])


def run_firewall_ipv4_curl_command():
    """Show ``firewall ipv4``."""
    return _show(["firewall", "ipv4"])


def run_firewall_ipv6_curl_command():
    """Show ``firewall ipv6``."""
    return _show(["firewall", "ipv6"])


def run_ipv4_route_summary_curl_command():
    """Show the route summary built by ``get_route_summary_path("ip")``."""
    return _show(get_route_summary_path("ip"))


def run_ipv6_route_summary_curl_command():
    """Show the route summary built by ``get_route_summary_path("ipv6")``."""
    return _show(get_route_summary_path("ipv6"))


def run_firewall_log_curl_command(ip_version, ruleset_name, rule_number):
    """Show ``log firewall <ip_version> <ruleset words...> rule <rule_number>``.

    *ruleset_name* is lower-cased, underscores are turned into spaces and the
    result is split on whitespace; each resulting word becomes its own path
    component (``"WAN_IN"`` -> ``"wan", "in"``). *rule_number* is appended
    unchanged.
    """
    ruleset_words = ruleset_name.lower().replace('_', ' ').split()
    path = ["log", "firewall", ip_version, *ruleset_words, "rule", rule_number]
    return _show(path)


def run_dhcpv6_leases_curl_command(pool_name):
    """Show ``dhcpv6 server leases pool <pool_name>``."""
    return _show(["dhcpv6", "server", "leases", "pool", pool_name])