From fb2484123fc1126490f9e0b62d858497f9e3c777 Mon Sep 17 00:00:00 2001 From: Blackwhitebear8 Date: Sun, 6 Jul 2025 17:00:31 +0200 Subject: [PATCH] Update modules/visual_route.py --- modules/visual_route.py | 108 ++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 59 deletions(-) diff --git a/modules/visual_route.py b/modules/visual_route.py index f727bf1..c6e7fdc 100644 --- a/modules/visual_route.py +++ b/modules/visual_route.py @@ -20,8 +20,7 @@ ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router') def _bulk_get_as_names(asn_numbers: list[str]): lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE] - if not lookup_list: - return + if not lookup_list: return print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...") query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n" try: @@ -60,51 +59,56 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" lines = bgp_data.split('\n') - start_index = -1 - try: - start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1 - except StopIteration: - try: - header_line_index = next(i for i, line in enumerate(lines) if "BGP routing table entry for" in line) - start_index = header_line_index + 1 - while start_index < len(lines) and not lines[start_index].strip(): - start_index += 1 - except StopIteration: - return {"nodes": [], "edges": []} - if start_index >= len(lines): - return {"nodes": [], "edges": []} - path_blocks, current_block = [], [] - for line in lines[start_index:]: - stripped_line = line.strip() - first_word_is_asn = stripped_line and (stripped_line.split(' ')[0].replace(',', '').isdigit() or stripped_line.startswith('From') or stripped_line.startswith('Local')) - is_new_path_line = (not line.startswith(' ') and first_word_is_asn) - if is_new_path_line: - if current_block: path_blocks.append(current_block) - current_block = [line] - elif stripped_line and current_block: - current_block.append(line) - if current_block: path_blocks.append(current_block) + path_blocks = [] + try: + paths_header_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + current_block = [] + for line in lines[paths_header_index + 1:]: + stripped_line = line.strip() + if not stripped_line: continue + is_new_path_line = False + if line.startswith(' ') and not line.startswith(' '): + first_word = stripped_line.split(' ')[0] + if first_word.isdigit() or stripped_line == "Local": + is_new_path_line = True + + if is_new_path_line: + if current_block: path_blocks.append(current_block) + current_block = [line] + elif current_block: + current_block.append(line) + + if current_block: path_blocks.append(current_block) + except StopIteration: + return {"nodes": [], "edges": []} + all_paths_info, best_path_info = [], None for block in path_blocks: - block_text = "\n".join(block) - block_text_lower = block_text.lower() - is_best = 'best' in block_text_lower - is_multipath = 'multipath' in block_text_lower - path_line = block[0] - if "From:" in path_line and len(block) > 1: - as_path_line_candidates = [l for l in block if re.search(r'\b\d+\b', l)] - if as_path_line_candidates: path_line = as_path_line_candidates[0] - clean_path_line = path_line.strip() - match = re.search(r'[,(]', clean_path_line) - if match: clean_path_line = clean_path_line[:match.start()] - path_asns_raw = re.findall(r'\b(\d+)\b', clean_path_line) + + clean_lines = [line for line in block if not line.strip().startswith("AddPath ID:")] + block_text_for_check = "\n".join(clean_lines) + + is_best = bool(re.search(r'\bbest\b', block_text_for_check, re.IGNORECASE)) + is_multipath = 'multipath' in block_text_for_check.lower() + + block_text_full = "\n".join(block) + path_line = block[0].strip() + path_asns_raw = [] + if path_line != "Local": + for part in path_line.split(' '): + clean_part = part.replace(',', '').strip() + if clean_part.isdigit(): + path_asns_raw.append(clean_part) + else: + break path_asns = list(dict.fromkeys(path_asns_raw)) - local_pref_match = re.search(r'localpref (\d+)', block_text) + + local_pref_match = re.search(r'localpref (\d+)', block_text_full) local_pref = int(local_pref_match.group(1)) if local_pref_match else None - next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text, re.MULTILINE) + next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text_full, re.MULTILINE) next_hop = next_hop_match.group(1) if next_hop_match else None - community_match = re.search(r'Large Community: ([\d:]+)', block_text) + community_match = re.search(r'Large Community: ([\d:]+)', block_text_full) community = community_match.group(1) if community_match else None category = 'other' if community: @@ -124,7 +128,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: nodes, edges = [], [] X_SEPARATION, Y_SEPARATION = 300, 200 - max_path_len = max(len(p['asns']) for p in ordered_paths) if ordered_paths else 0 + max_path_len = max((len(p['asns']) for p in ordered_paths if p['asns']), default=0) nodes.append({"id": ROUTER_NAME, "label": f"{ROUTER_NAME}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) nodes.append({"id": prefix, "label": f"{prefix}", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) @@ -156,34 +160,20 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict: path_node_ids.append(unique_node_id) as_name = AS_NAME_CACHE.get(asn, "") wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else "" - base_label = f"AS{asn}" if j == 0 and path_info['local_pref'] is not None: - base_label += f" (LP: {path_info['local_pref']})" - + base_label += f" (LP: {path_info['local_pref']})" label = f"{base_label}\n{wrapped_name}" - if j == 0 and path_info['next_hop']: label += f"\nNext Hop: {path_info['next_hop']}{style['path_type']}" - nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path}) full_chain = [ROUTER_NAME] + path_node_ids + [prefix] smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85} - for j in range(len(full_chain) - 1): - edges.append({ - "from": full_chain[j], - "to": full_chain[j+1], - "color": style['edge_color'], - "width": style['width'], - "dashes": style['dashes'], - "path_category": path_info['category'], - "is_active": is_active_path, - "smooth": smooth_config, - }) + edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config}) - return {"nodes": nodes, "edges": edges} + return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)} def generate_visual_route_graph(ip_address_str: str) -> dict: if not ip_address_str: