diff --git a/modules/visual_route.py b/modules/visual_route.py index b859b77..1d2163f 100644 --- a/modules/visual_route.py +++ b/modules/visual_route.py @@ -28,13 +28,13 @@ def _bulk_get_as_names(asn_numbers: list[str]): chunk = s.recv(4096) if not chunk: break response_data += chunk - response_str = response_data.decode('utf-8') - for line in response_str.splitlines(): - parts = line.split('|') - if len(parts) > 1: - asn_num_str = parts[0].strip() - as_name = parts[-1].strip() - if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name + response_str = response_data.decode('utf-8') + for line in response_str.splitlines(): + parts = line.split('|') + if len(parts) > 1: + asn_num_str = parts[0].strip() + as_name = parts[-1].strip() + if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name except (socket.error, socket.timeout) as e: print(f"Bulk AS name lookup failed: {e}") for asn in lookup_list: @@ -56,26 +56,41 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" lines = bgp_data.split('\n') + + start_index = -1 try: start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1 - except StopIteration: return {"nodes": [], "edges": []} + except StopIteration: + print("DEBUG: 'Paths:' header not found. Trying fallback parsing method.") + try: + header_line_index = next(i for i, line in enumerate(lines) if "BGP routing table entry for" in line) + start_index = header_line_index + 1 + while start_index < len(lines) and not lines[start_index].strip(): + start_index += 1 + except StopIteration: + return {"nodes": [], "edges": []} + + if start_index == -1 or start_index >= len(lines): + return {"nodes": [], "edges": []} path_blocks, current_block = [], [] for line in lines[start_index:]: stripped_line = line.strip() - first_word_is_digit = stripped_line and stripped_line.split(' ')[0].replace(',', '').isdigit() - is_local_path = (stripped_line == 'Local') - is_new_path_line = (line.startswith(' ') and not line.startswith(' ') and (first_word_is_digit or is_local_path)) + first_word_is_asn = stripped_line and (stripped_line.split(' ')[0].replace(',', '').isdigit() or stripped_line.startswith('From') or stripped_line.startswith('Local')) + is_new_path_line = (not line.startswith(' ') and first_word_is_asn) if is_new_path_line: if current_block: path_blocks.append(current_block) current_block = [line] elif stripped_line and current_block: current_block.append(line) if current_block: path_blocks.append(current_block) - + all_paths, best_path_asns = [], [] for block in path_blocks: path_line = block[0] + if "From:" in path_line and len(block) > 1: + as_path_line_candidates = [l for l in block if re.search(r'\b\d+\b', l)] + if as_path_line_candidates: path_line = as_path_line_candidates[0] clean_path_line = path_line.strip() match = re.search(r'[,(]', clean_path_line) if match: clean_path_line = clean_path_line[:match.start()] @@ -91,16 +106,13 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: if 'best' in bgp_data: best_path_asns = [] if not all_paths and best_path_asns is not None and not best_path_asns: all_paths.append([]) - all_asns_in_graph = {asn for path in all_paths for asn in path} if all_asns_in_graph: _bulk_get_as_names(list(all_asns_in_graph)) - ordered_paths = [] if best_path_asns is not None: ordered_paths.append(best_path_asns) for path in all_paths: if path != best_path_asns: ordered_paths.append(path) - node_lanes, node_levels, max_level = {}, {}, 0 Y_SEPARATION, X_SEPARATION = 110, 280 y_lane_alternator = 1 @@ -113,19 +125,15 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] for node_id in full_chain: if node_id not in node_lanes: node_lanes[node_id] = lane_y - for path in all_paths: full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] for level, node_id in enumerate(full_chain): node_levels[node_id] = max(node_levels.get(node_id, 0), level) max_level = max(max_level, level) - node_levels[prefix] = max_level + 1 if prefix not in node_lanes: node_lanes[prefix] = 0 - nodes, edges = [], [] best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix} if best_path_asns is not None else {ROUTER_NAME, prefix} - all_node_ids = set(node_levels.keys()) for node_id in sorted(list(all_node_ids)): color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF' @@ -141,38 +149,48 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: elif is_endpoint: label = f"{node_id}" nodes.append({"id": node_id, "label": label, "color": color, "x": node_levels[node_id] * X_SEPARATION, "y": node_lanes.get(node_id, 0), "fixed": True}) - edge_map = {} def add_edge(u, v, color, width, dashes=False): edge_tuple = tuple(sorted((u, v))) if u == v: return if edge_tuple not in edge_map or color == '#C0392B': edge_map[edge_tuple] = {"from": u, "to": v, "color": color, "width": width, "dashes": dashes} - if best_path_asns is not None: path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix] for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#C0392B', 3, False) - for path in all_paths: if path == best_path_asns: continue path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix] for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#2C3E50', 1, True) edges = list(edge_map.values()) - return {"nodes": nodes, "edges": edges} + def generate_visual_route_graph(ip_address_str: str) -> dict: if not ip_address_str: return {"error": "IP address is required."} - try: - ip_obj = ipaddress.ip_address(ip_address_str) - ip_version = f"ipv{ip_obj.version}" - except ValueError: - return {"error": f"Invalid IP address: {ip_address_str}"} - - bgp_data = _get_bgp_data(ip_address_str, ip_version) + + address_to_lookup = "" + ip_version = "" + + if '/' in ip_address_str: + try: + net_obj = ipaddress.ip_network(ip_address_str, strict=False) + ip_version = f"ipv{net_obj.version}" + address_to_lookup = net_obj.with_prefixlen + except ValueError: + return {"error": f"Invalid CIDR notation: {ip_address_str}"} + else: + try: + ip_obj = ipaddress.ip_address(ip_address_str) + ip_version = f"ipv{ip_obj.version}" + address_to_lookup = ip_address_str + except ValueError: + return {"error": f"Invalid IP address: {ip_address_str}"} + + bgp_data = _get_bgp_data(address_to_lookup, ip_version) if not bgp_data: - return {"error": f"Failed to retrieve BGP data for {ip_address_str}."} + return {"error": f"Failed to retrieve BGP data for {address_to_lookup}."} graph_data = _parse_bgp_paths_to_graph(bgp_data) if not graph_data.get("nodes"):