From db55c822b03c218526c84c501ba3fce72aa7be9e Mon Sep 17 00:00:00 2001 From: Blackwhitebear8 Date: Sat, 5 Jul 2025 16:20:38 +0200 Subject: [PATCH] Add modules/visual_route.py --- modules/visual_route.py | 181 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 modules/visual_route.py diff --git a/modules/visual_route.py b/modules/visual_route.py new file mode 100644 index 0000000..b859b77 --- /dev/null +++ b/modules/visual_route.py @@ -0,0 +1,181 @@ +import requests +import re +import json +import socket +import textwrap +import os +import ipaddress +from dotenv import load_dotenv + +load_dotenv() + +API_URL = os.getenv('BGP_API_URL', 'http://192.168.5.16:5000/bgp-route/lookup') +AS_NAME_CACHE = {} +AS_NAME_WRAP_WIDTH = 25 +ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router') + +def _bulk_get_as_names(asn_numbers: list[str]): + lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE] + if not lookup_list: + return + print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...") + query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n" + try: + with socket.create_connection(('bgp.tools', 43), timeout=10) as s: + s.sendall(query.encode('utf-8')) + response_data = b"" + while True: + chunk = s.recv(4096) + if not chunk: break + response_data += chunk + response_str = response_data.decode('utf-8') + for line in response_str.splitlines(): + parts = line.split('|') + if len(parts) > 1: + asn_num_str = parts[0].strip() + as_name = parts[-1].strip() + if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name + except (socket.error, socket.timeout) as e: + print(f"Bulk AS name lookup failed: {e}") + for asn in lookup_list: + if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = "" + +def _get_bgp_data(ip_address: str, ip_version: str) -> str | None: + print(f"Querying API for {ip_version} address: {ip_address}...") + payload = {"ip_version": ip_version, "bgprouteprefix": ip_address} + headers = {"Content-Type": "application/json"} + try: + response = requests.post(API_URL, json=payload, headers=headers, timeout=10) + response.raise_for_status() + data = response.json() + if data.get("success") and data.get("data"): return data["data"] + else: return None + except requests.exceptions.RequestException: return None + +def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: + prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) + prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" + lines = bgp_data.split('\n') + try: + start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1 + except StopIteration: return {"nodes": [], "edges": []} + + path_blocks, current_block = [], [] + for line in lines[start_index:]: + stripped_line = line.strip() + first_word_is_digit = stripped_line and stripped_line.split(' ')[0].replace(',', '').isdigit() + is_local_path = (stripped_line == 'Local') + is_new_path_line = (line.startswith(' ') and not line.startswith(' ') and (first_word_is_digit or is_local_path)) + if is_new_path_line: + if current_block: path_blocks.append(current_block) + current_block = [line] + elif stripped_line and current_block: + current_block.append(line) + if current_block: path_blocks.append(current_block) + + all_paths, best_path_asns = [], [] + for block in path_blocks: + path_line = block[0] + clean_path_line = path_line.strip() + match = re.search(r'[,(]', clean_path_line) + if match: clean_path_line = clean_path_line[:match.start()] + path_asns_raw = re.findall(r'\b(\d+)\b', clean_path_line) + path_asns = list(dict.fromkeys(path_asns_raw)) + if path_asns not in all_paths: all_paths.append(path_asns) + if 'best' in "\n".join(block).lower() and not best_path_asns: + best_path_asns = path_asns + + if not all_paths and best_path_asns is not None and not best_path_asns: + if any('Local' in block[0] for block in path_blocks): + all_paths.append([]) + if 'best' in bgp_data: best_path_asns = [] + if not all_paths and best_path_asns is not None and not best_path_asns: + all_paths.append([]) + + all_asns_in_graph = {asn for path in all_paths for asn in path} + if all_asns_in_graph: + _bulk_get_as_names(list(all_asns_in_graph)) + + ordered_paths = [] + if best_path_asns is not None: ordered_paths.append(best_path_asns) + for path in all_paths: + if path != best_path_asns: ordered_paths.append(path) + + node_lanes, node_levels, max_level = {}, {}, 0 + Y_SEPARATION, X_SEPARATION = 110, 280 + y_lane_alternator = 1 + for i, path in enumerate(ordered_paths): + lane_y = 0 + if i > 0 or best_path_asns is None: + lane_y = y_lane_alternator * Y_SEPARATION + if y_lane_alternator > 0: y_lane_alternator *= -1 + else: y_lane_alternator = (y_lane_alternator * -1) + 1 + full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + for node_id in full_chain: + if node_id not in node_lanes: node_lanes[node_id] = lane_y + + for path in all_paths: + full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + for level, node_id in enumerate(full_chain): + node_levels[node_id] = max(node_levels.get(node_id, 0), level) + max_level = max(max_level, level) + + node_levels[prefix] = max_level + 1 + if prefix not in node_lanes: node_lanes[prefix] = 0 + + nodes, edges = [], [] + best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix} if best_path_asns is not None else {ROUTER_NAME, prefix} + + all_node_ids = set(node_levels.keys()) + for node_id in sorted(list(all_node_ids)): + color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF' + label, is_endpoint = node_id, (node_id == ROUTER_NAME or node_id == prefix) + if node_id.startswith('AS'): + asn_number = node_id[2:] + as_name = AS_NAME_CACHE.get(asn_number, "") + if as_name: + wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) + label = f"{node_id}\n{wrapped_name}" + else: + label = f"{node_id}" + elif is_endpoint: + label = f"{node_id}" + nodes.append({"id": node_id, "label": label, "color": color, "x": node_levels[node_id] * X_SEPARATION, "y": node_lanes.get(node_id, 0), "fixed": True}) + + edge_map = {} + def add_edge(u, v, color, width, dashes=False): + edge_tuple = tuple(sorted((u, v))) + if u == v: return + if edge_tuple not in edge_map or color == '#C0392B': + edge_map[edge_tuple] = {"from": u, "to": v, "color": color, "width": width, "dashes": dashes} + + if best_path_asns is not None: + path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix] + for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#C0392B', 3, False) + + for path in all_paths: + if path == best_path_asns: continue + path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix] + for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#2C3E50', 1, True) + edges = list(edge_map.values()) + + return {"nodes": nodes, "edges": edges} + +def generate_visual_route_graph(ip_address_str: str) -> dict: + if not ip_address_str: + return {"error": "IP address is required."} + try: + ip_obj = ipaddress.ip_address(ip_address_str) + ip_version = f"ipv{ip_obj.version}" + except ValueError: + return {"error": f"Invalid IP address: {ip_address_str}"} + + bgp_data = _get_bgp_data(ip_address_str, ip_version) + if not bgp_data: + return {"error": f"Failed to retrieve BGP data for {ip_address_str}."} + + graph_data = _parse_bgp_paths_to_graph(bgp_data) + if not graph_data.get("nodes"): + return {"error": "Could not parse valid AS paths from the API response."} + + return graph_data \ No newline at end of file