From b295322cba23b4642c6dd18c5eef6c3dd7e223a6 Mon Sep 17 00:00:00 2001 From: Blackwhitebear8 Date: Wed, 13 Aug 2025 17:12:45 +0200 Subject: [PATCH] Update modules/parse.py --- modules/parse.py | 199 +++++++++++++++++------------------------------ 1 file changed, 70 insertions(+), 129 deletions(-) diff --git a/modules/parse.py b/modules/parse.py index 947579d..48e2546 100644 --- a/modules/parse.py +++ b/modules/parse.py @@ -4,149 +4,90 @@ import os from dotenv import load_dotenv load_dotenv() - VYOS_API_URL = os.getenv("VYOS_API_URL") VYOS_API_KEY = os.getenv("VYOS_API_KEY") +def _run_curl(endpoint, data_payload): + curl_command = [ + "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}{endpoint}", + "--form", f"data={json.dumps(data_payload)}", + "--form", f"key={VYOS_API_KEY}" + ] + response = subprocess.check_output(curl_command, text=True) + return json.loads(response) + +def get_bgp_base_path(): + vrf_name = os.getenv("BGP_VRF_NAME") + if vrf_name: + return ["bgp", "vrf", vrf_name] + else: + return ["bgp"] + +def get_bgp_neighbor_config_path(neighbor_ip): + vrf_name = os.getenv("BGP_VRF_NAME") + if vrf_name: + return ["vrf", "name", vrf_name, "protocols", "bgp", "neighbor", neighbor_ip] + else: + return ["protocols", "bgp", "neighbor", neighbor_ip] + def run_bgp_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"summ\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) + path = get_bgp_base_path() + ["summ"] + return _run_curl("/show", {"op": "show", "path": path}) -def run_arp_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"arp\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - -def run_neighbors_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"ipv6\", \"neighbors\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - -def run_interfaces_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"interfaces\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - def run_bgp_route_curl_command(ip_version, bgprouteprefix): - data_json = { - "op": "show", - "path": ["bgp", "vrf", "bgp", ip_version, bgprouteprefix] - } - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", f"data={json.dumps(data_json)}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) + path = get_bgp_base_path() + [ip_version, bgprouteprefix] + return _run_curl("/show", {"op": "show", "path": path}) -def run_rpki_cache_connection_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"rpki\", \"cache-connection\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - -def run_rpki_lookup_curl_command(lookup_type, query): - data_json = { - "op": "show", - "path": ["rpki", lookup_type, query] - } - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", f"data={json.dumps(data_json)}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - def run_bgp_neighbor_detail_curl_command(ip_version, neighbor_ip): path_ip_version = "ipv4" if ip_version == "ipv4" else "ipv6" - data_json = { - "op": "show", - "path": ["bgp", "vrf", "bgp", path_ip_version, "neighbors", neighbor_ip] - } - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", f"data={json.dumps(data_json)}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - -def run_bfd_peers_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"bfd\", \"peers\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) - -def run_bfd_peer_detail_curl_command(peer_ip): - data_json = { - "op": "show", - "path": ["bfd", "peer", peer_ip] - } - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", f"data={json.dumps(data_json)}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) + path = get_bgp_base_path() + [path_ip_version, "neighbors", neighbor_ip] + return _run_curl("/show", {"op": "show", "path": path}) def run_bgp_dampeningv4_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"ipv4\", \"dampening\", \"dampened-paths\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) + path = get_bgp_base_path() + ["ipv4", "dampening", "dampened-paths"] + return _run_curl("/show", {"op": "show", "path": path}) def run_bgp_dampeningv6_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"ipv6\", \"dampening\", \"dampened-paths\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) + path = get_bgp_base_path() + ["ipv6", "dampening", "dampened-paths"] + return _run_curl("/show", {"op": "show", "path": path}) + +def run_bgp_reset_command(neighbor_ip, soft=False): + path = get_bgp_base_path() + [neighbor_ip] + if soft: + path.append("soft") + return _run_curl("/reset", {'op': 'reset', 'path': path}) + +def run_bgp_shutdown_command(neighbor_ip): + path = get_bgp_neighbor_config_path(neighbor_ip) + ["shutdown"] + return _run_curl("/configure", {'op': 'set', 'path': path}) + +def run_bgp_enable_command(neighbor_ip): + path = get_bgp_neighbor_config_path(neighbor_ip) + ["shutdown"] + return _run_curl("/configure", {'op': 'delete', 'path': path}) + +def run_arp_curl_command(): + return _run_curl("/show", {"op": "show", "path": ["arp"]}) + +def run_neighbors_curl_command(): + return _run_curl("/show", {"op": "show", "path": ["ipv6", "neighbors"]}) + +def run_interfaces_curl_command(): + return _run_curl("/show", {"op": "show", "path": ["interfaces"]}) + +def run_rpki_cache_connection_curl_command(): + return _run_curl("/show", {"op": "show", "path": ["rpki", "cache-connection"]}) + +def run_rpki_lookup_curl_command(lookup_type, query): + return _run_curl("/show", {"op": "show", "path": ["rpki", lookup_type, query]}) + +def run_bfd_peers_curl_command(): + return _run_curl("/show", {"op": "show", "path": ["bfd", "peers"]}) + +def run_bfd_peer_detail_curl_command(peer_ip): + return _run_curl("/show", {"op": "show", "path": ["bfd", "peer", peer_ip]}) def run_firewall_ipv4_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"firewall\", \"ipv4\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) + return _run_curl("/show", {"op": "show", "path": ["firewall", "ipv4"]}) def run_firewall_ipv6_curl_command(): - curl_command = [ - "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", - "--form", "data={\"op\": \"show\", \"path\": [\"firewall\", \"ipv6\"]}", - "--form", f"key={VYOS_API_KEY}" - ] - response = subprocess.check_output(curl_command, text=True) - return json.loads(response) \ No newline at end of file + return _run_curl("/show", {"op": "show", "path": ["firewall", "ipv6"]}) \ No newline at end of file