From 7ce11847197460f98805c2a2cf73622910a0fbca Mon Sep 17 00:00:00 2001 From: Blackwhitebear8 Date: Sun, 6 Jul 2025 17:59:49 +0200 Subject: [PATCH] Upload files to "modules" --- modules/__init__.py | Bin 0 -> 1024 bytes modules/bgp.py | 84 ++++++++++++++++++ modules/parse.py | 58 ++++++++++++ modules/visual_route.py | 189 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 331 insertions(+) create mode 100644 modules/__init__.py create mode 100644 modules/bgp.py create mode 100644 modules/parse.py create mode 100644 modules/visual_route.py diff --git a/modules/__init__.py b/modules/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..06d7405020018ddf3cacee90fd4af10487da3d20 GIT binary patch literal 1024 ScmZQz7zLvtFd70QH3R?z00031 literal 0 HcmV?d00001 diff --git a/modules/bgp.py b/modules/bgp.py new file mode 100644 index 0000000..a4101dc --- /dev/null +++ b/modules/bgp.py @@ -0,0 +1,84 @@ +def parse_bgp_data(data): + ipv4_section = "" + ipv6_section = "" + ipv4_info = {} + ipv6_info = {} + + if "data" in data: + raw_data = data["data"] + + ipv4_marker = "IPv4 Unicast Summary (VRF bgp):" + ipv6_marker = "IPv6 Unicast Summary (VRF bgp):" + + ipv4_start = raw_data.find(ipv4_marker) + ipv6_start = raw_data.find(ipv6_marker) + + if ipv4_start != -1: + if ipv6_start != -1: + ipv4_section = raw_data[ipv4_start + len(ipv4_marker):ipv6_start].strip() + else: + ipv4_section = raw_data[ipv4_start + len(ipv4_marker):].strip() + ipv4_info = extract_bgp_info(ipv4_section) + + if ipv6_start != -1: + ipv6_section = raw_data[ipv6_start + len(ipv6_marker):].strip() + ipv6_info = extract_bgp_info(ipv6_section) + + def process_peers(peer_data): + peers = [] + for line in peer_data.split("\n"): + if line.strip().startswith("Neighbor"): + continue + if line.strip(): + peer_info = line.split() + if len(peer_info) >= 12: + peers.append({ + "neighbor": peer_info[0], + "version": peer_info[1], + "as_number": peer_info[2], + "msg_received": peer_info[3], + "msg_sent": peer_info[4], + "table_version": peer_info[5], + "in_queue": peer_info[6], + "out_queue": peer_info[7], + "up_down": peer_info[8], + "state_pfx_rcd": peer_info[9], + "prefix_sent": peer_info[10], + "description": " ".join(peer_info[11:]) + }) + return peers + + ipv4_peers = process_peers(ipv4_section) + ipv6_peers = process_peers(ipv6_section) + + return ipv4_info, ipv4_peers, ipv6_info, ipv6_peers + +def extract_bgp_info(raw_data): + lines = raw_data.split("\n") + info = {} + for line in lines: + if "BGP router identifier" in line: + parts = line.split(",") + info["router_id"] = parts[0].split("identifier")[1].strip() + info["local_as"] = parts[1].split("number")[1].strip().split(" ")[0] + if "vrf-id" in parts[-1]: + info["vrf_id"] = parts[-1].split("vrf-id")[1].strip() + if "BGP table version" in line: + info["table_version"] = line.split("version")[1].strip() + if "RIB entries" in line: + parts = line.split(",") + info["rib_entries"] = parts[0].split("entries")[1].strip() + info["rib_memory"] = parts[1].split("using")[1].strip() + if "Peers" in line: + parts = line.split(",") + info["peers"] = parts[0].split("Peers")[1].strip() + info["peers_memory"] = parts[1].split("using")[1].strip() + return info + +def generate_bgp_json(ipv4_info, ipv4_peers, ipv6_info, ipv6_peers): + return { + "ipv4_info": ipv4_info, + "ipv4_peers": ipv4_peers, + "ipv6_info": ipv6_info, + "ipv6_peers": ipv6_peers + } \ No newline at end of file diff --git a/modules/parse.py b/modules/parse.py new file mode 100644 index 0000000..209b354 --- /dev/null +++ b/modules/parse.py @@ -0,0 +1,58 @@ +import subprocess +import json +import os +from dotenv import load_dotenv + +load_dotenv() + +VYOS_API_URL = os.getenv("VYOS_API_URL") +VYOS_API_KEY = os.getenv("VYOS_API_KEY") + +def run_bgp_curl_command(): + curl_command = [ + "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", + "--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"summ\"]}", + "--form", f"key={VYOS_API_KEY}" + ] + response = subprocess.check_output(curl_command, text=True) + return json.loads(response) + +def run_arp_curl_command(): + curl_command = [ + "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", + "--form", "data={\"op\": \"show\", \"path\": [\"arp\"]}", + "--form", f"key={VYOS_API_KEY}" + ] + response = subprocess.check_output(curl_command, text=True) + return json.loads(response) + +def run_neighbors_curl_command(): + curl_command = [ + "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", + "--form", "data={\"op\": \"show\", \"path\": [\"ipv6\", \"neighbors\"]}", + "--form", f"key={VYOS_API_KEY}" + ] + response = subprocess.check_output(curl_command, text=True) + return json.loads(response) + +def run_interfaces_curl_command(): + curl_command = [ + "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", + "--form", "data={\"op\": \"show\", \"path\": [\"interfaces\"]}", + "--form", f"key={VYOS_API_KEY}" + ] + response = subprocess.check_output(curl_command, text=True) + return json.loads(response) + +def run_bgp_route_curl_command(ip_version, bgprouteprefix): + data_json = { + "op": "show", + "path": ["bgp", "vrf", "bgp", ip_version, bgprouteprefix] + } + curl_command = [ + "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show", + "--form", f"data={json.dumps(data_json)}", + "--form", f"key={VYOS_API_KEY}" + ] + response = subprocess.check_output(curl_command, text=True) + return json.loads(response) \ No newline at end of file diff --git a/modules/visual_route.py b/modules/visual_route.py new file mode 100644 index 0000000..611b55f --- /dev/null +++ b/modules/visual_route.py @@ -0,0 +1,189 @@ +import requests +import re +import json +import socket +import textwrap +import os +import ipaddress +from dotenv import load_dotenv + +load_dotenv() + +TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').split(','))) +IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(','))) +CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(','))) + +API_URL = os.getenv('BGP_API_URL', 'http://127.0.0.1:5000/bgp-route/lookup') +AS_NAME_CACHE = {} +AS_NAME_WRAP_WIDTH = 25 +ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router') + +def _bulk_get_as_names(asn_numbers: list[str]): + lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE] + if not lookup_list: return + print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...") + query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n" + try: + with socket.create_connection(('bgp.tools', 43), timeout=10) as s: + s.sendall(query.encode('utf-8')) + response_data = b"" + while True: + chunk = s.recv(4096) + if not chunk: break + response_data += chunk + response_str = response_data.decode('utf-8') + for line in response_str.splitlines(): + parts = line.split('|') + if len(parts) > 1: + asn_num_str = parts[0].strip() + as_name = parts[-1].strip() + if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name + except (socket.error, socket.timeout) as e: + print(f"Bulk AS name lookup failed: {e}") + for asn in lookup_list: + if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = "" + +def _get_bgp_data(ip_address: str, ip_version: str) -> str | None: + print(f"Querying API for {ip_version} address: {ip_address}...") + payload = {"ip_version": ip_version, "bgprouteprefix": ip_address} + headers = {"Content-Type": "application/json"} + try: + response = requests.post(API_URL, json=payload, headers=headers, timeout=10) + response.raise_for_status() + data = response.json() + if data.get("success") and data.get("data"): return data["data"] + else: return None + except requests.exceptions.RequestException: return None + +def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: + prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) + prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" + lines = bgp_data.split('\n') + + path_blocks = [] + try: + paths_header_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + current_block = [] + for line in lines[paths_header_index + 1:]: + stripped_line = line.strip() + if not stripped_line: continue + + is_new_path_line = False + if line.startswith(' ') and not line.startswith(' '): + first_word = stripped_line.split(' ')[0].replace(',', '') + if first_word.isdigit(): + is_new_path_line = True + + if is_new_path_line: + if current_block: path_blocks.append(current_block) + current_block = [line] + elif current_block: + current_block.append(line) + + if current_block: path_blocks.append(current_block) + except StopIteration: + return {"nodes": [], "edges": []} + + all_paths_info = [] + best_path_info = None + for block in path_blocks: + block_text = "\n".join(block) + clean_lines = [line for line in block if not line.strip().startswith("AddPath ID:")] + block_text_for_check = "\n".join(clean_lines) + is_best = bool(re.search(r'\bbest\b', block_text_for_check, re.IGNORECASE)) + is_multipath = 'multipath' in block_text_for_check.lower() + + path_line = block[0].strip() + path_asns_raw = [] + for part in path_line.split(' '): + clean_part = part.replace(',', '').strip() + if clean_part.isdigit(): + path_asns_raw.append(clean_part) + else: + break + path_asns = list(dict.fromkeys(path_asns_raw)) + + local_pref_match = re.search(r'localpref (\d+)', block_text) + local_pref = int(local_pref_match.group(1)) if local_pref_match else None + next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text, re.MULTILINE) + next_hop = next_hop_match.group(1) if next_hop_match else None + community_match = re.search(r'Large Community: ([\d:]+)', block_text) + community = community_match.group(1) if community_match else None + category = 'other' + if community: + if community in TRANSIT_COMMUNITIES: category = 'transit' + elif community in IX_COMMUNITIES: category = 'ix' + elif community in CUSTOMER_COMMUNITIES: category = 'customer' + path_info = {"asns": path_asns, "local_pref": local_pref, "next_hop": next_hop, "is_best": is_best, "is_multipath": is_multipath, "community": community, "category": category} + all_paths_info.append(path_info) + if is_best and not best_path_info: + best_path_info = path_info + + all_asns_in_graph = {asn for path in all_paths_info for asn in path['asns']} + if all_asns_in_graph: + _bulk_get_as_names(list(all_asns_in_graph)) + + ordered_paths = sorted(all_paths_info, key=lambda p: (not p['is_best'], not p['is_multipath'])) + + nodes, edges = [], [] + X_SEPARATION, Y_SEPARATION = 300, 200 + max_path_len = max((len(p['asns']) for p in ordered_paths if p['asns']), default=0) + nodes.append({"id": ROUTER_NAME, "label": f"{ROUTER_NAME}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) + nodes.append({"id": prefix, "label": f"{prefix}", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) + + y_pos_counter_up, y_pos_counter_down = 1, 1 + for i, path_info in enumerate(ordered_paths): + lane_y = 0 + if i > 0: + if y_pos_counter_up <= y_pos_counter_down: lane_y = y_pos_counter_up * Y_SEPARATION; y_pos_counter_up += 1 + else: lane_y = -y_pos_counter_down * Y_SEPARATION; y_pos_counter_down += 1 + + style, is_active_path = {}, False + if path_info['is_best']: + style = {"node_color": '#FADBD8', "edge_color": '#C0392B', "width": 3, "dashes": False, "path_type": " (best)"}; is_active_path = True + elif path_info['is_multipath']: + style = {"node_color": '#FDEBD0', "edge_color": '#F39C12', "width": 2, "dashes": False, "path_type": " (multipath)"}; is_active_path = True + else: + style = {"node_color": '#D6DBDF', "edge_color": '#2C3E50', "width": 1, "dashes": True, "path_type": ""}; is_active_path = False + + path_node_ids = [] + for j, asn in enumerate(path_info['asns']): + unique_node_id = f"AS{asn}-{i}" + path_node_ids.append(unique_node_id) + as_name = AS_NAME_CACHE.get(asn, ""); wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else "" + base_label = f"AS{asn}" + if j == 0 and path_info['local_pref'] is not None: base_label += f" (LP: {path_info['local_pref']})" + label = f"{base_label}\n{wrapped_name}" + if j == 0 and path_info['next_hop']: label += f"\nNext Hop: {path_info['next_hop']}{style['path_type']}" + nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path}) + + full_chain = [ROUTER_NAME] + path_node_ids + [prefix] + smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85} + for j in range(len(full_chain) - 1): + edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config}) + + return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)} + +def generate_visual_route_graph(ip_address_str: str) -> dict: + if not ip_address_str: + return {"error": "IP address is required."} + address_to_lookup, ip_version = "", "" + if '/' in ip_address_str: + try: + net_obj = ipaddress.ip_network(ip_address_str, strict=False) + ip_version, address_to_lookup = f"ipv{net_obj.version}", net_obj.with_prefixlen + except ValueError: + return {"error": f"Invalid CIDR notation: {ip_address_str}"} + else: + try: + ip_obj = ipaddress.ip_address(ip_address_str) + ip_version, address_to_lookup = f"ipv{ip_obj.version}", ip_address_str + except ValueError: + return {"error": f"Invalid IP address: {ip_address_str}"} + bgp_data = _get_bgp_data(address_to_lookup, ip_version) + if not bgp_data: + return {"error": f"Failed to retrieve BGP data for {address_to_lookup}."} + graph_data = _parse_bgp_paths_to_graph(bgp_data) + if not graph_data.get("nodes"): + return {"error": "Could not parse valid AS paths from the API response."} + return graph_data \ No newline at end of file