"""Build a node/edge graph of BGP AS paths for an IP address or prefix.

The BGP route text is fetched from an HTTP lookup API (``API_URL``), the AS
paths are extracted from it, AS names are resolved through the bgp.tools
bulk whois interface, and the result is returned as ``nodes``/``edges``
dictionaries carrying fixed layout coordinates.
"""

import ipaddress
import json
import os
import re
import socket
import textwrap

import requests
from dotenv import load_dotenv

load_dotenv()

API_URL = os.getenv('BGP_API_URL', 'http://192.168.5.16:5000/bgp-route/lookup')
# ASN (digits only, as str) -> AS name; "" records a lookup with no answer.
AS_NAME_CACHE = {}
AS_NAME_WRAP_WIDTH = 25
ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')


def _bulk_get_as_names(asn_numbers: list[str]) -> None:
    """Populate ``AS_NAME_CACHE`` for every ASN in *asn_numbers*.

    ASNs already cached are skipped. The remaining ones are sent in a single
    ``begin``/``end`` bulk query to ``bgp.tools`` port 43. Any ASN that is
    still unresolved afterwards (no answer, or the lookup failed) is cached
    as ``""`` so it is not queried again.

    Args:
        asn_numbers: AS numbers as digit strings, without the ``AS`` prefix.
    """
    # dict.fromkeys drops duplicates while keeping the caller's order.
    lookup_list = [asn for asn in dict.fromkeys(asn_numbers) if asn not in AS_NAME_CACHE]
    if not lookup_list:
        return

    print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...")
    query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n"
    try:
        with socket.create_connection(('bgp.tools', 43), timeout=10) as s:
            s.sendall(query.encode('utf-8'))
            chunks = []
            while True:
                chunk = s.recv(4096)
                if not chunk:
                    break
                chunks.append(chunk)
        # errors="replace": a stray non-UTF-8 byte in one AS name must not
        # abort the whole lookup with UnicodeDecodeError.
        response_str = b"".join(chunks).decode('utf-8', errors='replace')
        for line in response_str.splitlines():
            parts = line.split('|')
            if len(parts) > 1:
                asn_num_str = parts[0].strip()
                as_name = parts[-1].strip()
                # Rows whose first column is not a bare number are ignored.
                if asn_num_str.isdigit():
                    AS_NAME_CACHE[asn_num_str] = as_name
    except OSError as e:
        # socket.error is an alias of OSError and socket.timeout a subclass,
        # so this catches exactly what the previous tuple did.
        print(f"Bulk AS name lookup failed: {e}")

    for asn in lookup_list:
        if asn not in AS_NAME_CACHE:
            AS_NAME_CACHE[asn] = ""


def _get_bgp_data(ip_address: str, ip_version: str) -> str | None:
    """POST a route lookup to ``API_URL`` and return its ``data`` field.

    Args:
        ip_address: Address or prefix, sent as ``bgprouteprefix``.
        ip_version: Sent as ``ip_version`` (the caller passes ``ipv4``/``ipv6``).

    Returns:
        The ``data`` value of the JSON reply when the reply is an object with
        truthy ``success`` and ``data`` entries; ``None`` on any request
        failure, HTTP error status, undecodable body or unexpected shape.
    """
    print(f"Querying API for {ip_version} address: {ip_address}...")
    payload = {"ip_version": ip_version, "bgprouteprefix": ip_address}
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(API_URL, json=payload, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException subclass.
        print(f"BGP API request failed: {e}")
        return None

    # Guard against a JSON array/scalar reply, which has no .get().
    if isinstance(data, dict) and data.get("success") and data.get("data"):
        return data["data"]
    return None


def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
    """Turn BGP route text into a ``{"nodes": [...], "edges": [...]}`` graph.

    Steps: find the prefix and the first line after the ``Paths:`` header
    (or, failing that, after the ``BGP routing table entry for`` line),
    group the remaining lines into per-path blocks, pull the AS numbers out
    of each block, then lay the ASes out on levels (x) and lanes (y) between
    ``ROUTER_NAME`` and the prefix.

    Args:
        bgp_data: Raw text of the route lookup. The exact router output
            format is not visible here; the parsing rules below are kept
            exactly as the original author wrote them.

    Returns:
        A dict with ``nodes`` (id, label, color, x, y, fixed) and ``edges``
        (from, to, color, width, dashes). Both lists are empty when no
        starting line for the path section can be found.
    """
    prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
    prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"

    lines = bgp_data.split('\n')
    start_index = -1
    try:
        start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1
    except StopIteration:
        print("DEBUG: 'Paths:' header not found. Trying fallback parsing method.")
        try:
            header_line_index = next(
                i for i, line in enumerate(lines) if "BGP routing table entry for" in line
            )
            start_index = header_line_index + 1
            # Skip blank lines that follow the header.
            while start_index < len(lines) and not lines[start_index].strip():
                start_index += 1
        except StopIteration:
            return {"nodes": [], "edges": []}
    if start_index == -1 or start_index >= len(lines):
        return {"nodes": [], "edges": []}

    # --- Group lines into one block per path -------------------------------
    # A block starts on an unindented line whose first word is a number, or
    # that starts with "From"/"Local"; following non-empty lines join it.
    path_blocks, current_block = [], []
    for line in lines[start_index:]:
        stripped_line = line.strip()
        first_word_is_asn = stripped_line and (
            stripped_line.split(' ')[0].replace(',', '').isdigit()
            or stripped_line.startswith('From')
            or stripped_line.startswith('Local')
        )
        is_new_path_line = (not line.startswith(' ') and first_word_is_asn)
        if is_new_path_line:
            if current_block:
                path_blocks.append(current_block)
            current_block = [line]
        elif stripped_line and current_block:
            current_block.append(line)
    if current_block:
        path_blocks.append(current_block)

    # --- Extract the AS path of every block --------------------------------
    # NOTE(review): best_path_asns starts as [] and is only ever assigned
    # lists, so every "is not None" test below is always true. As a result a
    # direct best-path edge ROUTER_NAME -> prefix is emitted even when no
    # block mentions "best". It looks like None was meant as the "no best
    # path" sentinel; left unchanged because the intended behaviour for that
    # case cannot be confirmed from this file.
    all_paths, best_path_asns = [], []
    for block in path_blocks:
        path_line = block[0]
        if "From:" in path_line and len(block) > 1:
            # The block opened on a "From:" line: use the first line of the
            # block that contains a standalone number instead.
            as_path_line_candidates = [l for l in block if re.search(r'\b\d+\b', l)]
            if as_path_line_candidates:
                path_line = as_path_line_candidates[0]
        clean_path_line = path_line.strip()
        # Everything from the first ',' or '(' onwards is discarded.
        match = re.search(r'[,(]', clean_path_line)
        if match:
            clean_path_line = clean_path_line[:match.start()]
        path_asns_raw = re.findall(r'\b(\d+)\b', clean_path_line)
        # Remove repeated ASNs while keeping their first-seen order.
        path_asns = list(dict.fromkeys(path_asns_raw))
        if path_asns not in all_paths:
            all_paths.append(path_asns)
        if 'best' in "\n".join(block).lower() and not best_path_asns:
            best_path_asns = path_asns

    if not all_paths and best_path_asns is not None and not best_path_asns:
        if any('Local' in block[0] for block in path_blocks):
            all_paths.append([])
            if 'best' in bgp_data:
                best_path_asns = []
    if not all_paths and best_path_asns is not None and not best_path_asns:
        # No path blocks at all: fall back to a single empty path, which
        # yields a direct ROUTER_NAME -> prefix graph.
        all_paths.append([])

    all_asns_in_graph = {asn for path in all_paths for asn in path}
    if all_asns_in_graph:
        _bulk_get_as_names(list(all_asns_in_graph))

    # --- Layout: lanes (y) -------------------------------------------------
    # Best path first so it claims lane 0; other paths take alternating
    # lanes +1, -1, +2, -2, ... times Y_SEPARATION.
    ordered_paths = []
    if best_path_asns is not None:
        ordered_paths.append(best_path_asns)
    for path in all_paths:
        if path != best_path_asns:
            ordered_paths.append(path)

    node_lanes, node_levels, max_level = {}, {}, 0
    Y_SEPARATION, X_SEPARATION = 110, 280
    y_lane_alternator = 1
    for i, path in enumerate(ordered_paths):
        lane_y = 0
        if i > 0 or best_path_asns is None:
            lane_y = y_lane_alternator * Y_SEPARATION
            if y_lane_alternator > 0:
                y_lane_alternator *= -1
            else:
                y_lane_alternator = (y_lane_alternator * -1) + 1
        full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
        for node_id in full_chain:
            # A node keeps the lane of the first path that contains it.
            if node_id not in node_lanes:
                node_lanes[node_id] = lane_y

    # --- Layout: levels (x) ------------------------------------------------
    # A node's level is its deepest position over all paths; the prefix sits
    # one level past the deepest node.
    for path in all_paths:
        full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
        for level, node_id in enumerate(full_chain):
            node_levels[node_id] = max(node_levels.get(node_id, 0), level)
            max_level = max(max_level, level)
    node_levels[prefix] = max_level + 1
    if prefix not in node_lanes:
        node_lanes[prefix] = 0

    # --- Nodes -------------------------------------------------------------
    if best_path_asns is not None:
        best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix}
    else:
        best_path_node_ids = {ROUTER_NAME, prefix}

    nodes = []
    for node_id in sorted(node_levels):
        color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF'
        label = node_id
        if node_id.startswith('AS'):
            as_name = AS_NAME_CACHE.get(node_id[2:], "")
            if as_name:
                wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH))
                label = f"{node_id}\n{wrapped_name}"
        nodes.append({
            "id": node_id,
            "label": label,
            "color": color,
            "x": node_levels[node_id] * X_SEPARATION,
            "y": node_lanes.get(node_id, 0),
            "fixed": True,
        })

    # --- Edges -------------------------------------------------------------
    # Keyed by the unordered node pair so each link appears once.
    edge_map = {}

    def add_edge(u, v, color, width, dashes=False):
        """Record edge u -> v; a best-path (red) edge replaces any existing one."""
        edge_tuple = tuple(sorted((u, v)))
        if u == v:
            return
        if edge_tuple not in edge_map or color == '#C0392B':
            edge_map[edge_tuple] = {
                "from": u, "to": v, "color": color, "width": width, "dashes": dashes,
            }

    if best_path_asns is not None:
        path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix]
        for u, v in zip(path_chain, path_chain[1:]):
            add_edge(u, v, '#C0392B', 3, False)
    for path in all_paths:
        if path == best_path_asns:
            continue
        path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix]
        for u, v in zip(path_chain, path_chain[1:]):
            add_edge(u, v, '#2C3E50', 1, True)

    return {"nodes": nodes, "edges": list(edge_map.values())}


def generate_visual_route_graph(ip_address_str: str) -> dict:
    """Look up *ip_address_str* and return its AS-path graph.

    Args:
        ip_address_str: An IPv4/IPv6 address, or a prefix in CIDR notation
            (host bits are allowed and are masked off). Surrounding
            whitespace is ignored.

    Returns:
        The ``{"nodes": ..., "edges": ...}`` graph on success, otherwise a
        dict with a single ``error`` message (missing/invalid input, failed
        lookup, or no parsable paths).
    """
    if not ip_address_str:
        return {"error": "IP address is required."}
    ip_address_str = ip_address_str.strip()
    if not ip_address_str:
        return {"error": "IP address is required."}

    if '/' in ip_address_str:
        try:
            net_obj = ipaddress.ip_network(ip_address_str, strict=False)
        except ValueError:
            return {"error": f"Invalid CIDR notation: {ip_address_str}"}
        ip_version = f"ipv{net_obj.version}"
        address_to_lookup = net_obj.with_prefixlen
    else:
        try:
            ip_obj = ipaddress.ip_address(ip_address_str)
        except ValueError:
            return {"error": f"Invalid IP address: {ip_address_str}"}
        ip_version = f"ipv{ip_obj.version}"
        address_to_lookup = ip_address_str

    bgp_data = _get_bgp_data(address_to_lookup, ip_version)
    if not bgp_data:
        return {"error": f"Failed to retrieve BGP data for {address_to_lookup}."}

    graph_data = _parse_bgp_paths_to_graph(bgp_data)
    if not graph_data.get("nodes"):
        return {"error": "Could not parse valid AS paths from the API response."}
    return graph_data