Update modules/visual_route.py

This commit is contained in:
Blackwhitebear8 2025-07-06 17:00:31 +02:00
parent cf5c76e792
commit fb2484123f

View file

@ -20,8 +20,7 @@ ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
def _bulk_get_as_names(asn_numbers: list[str]): def _bulk_get_as_names(asn_numbers: list[str]):
lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE] lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE]
if not lookup_list: if not lookup_list: return
return
print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...") print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...")
query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n" query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n"
try: try:
@ -60,51 +59,56 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
lines = bgp_data.split('\n') lines = bgp_data.split('\n')
start_index = -1
try:
start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1
except StopIteration:
try:
header_line_index = next(i for i, line in enumerate(lines) if "BGP routing table entry for" in line)
start_index = header_line_index + 1
while start_index < len(lines) and not lines[start_index].strip():
start_index += 1
except StopIteration:
return {"nodes": [], "edges": []}
if start_index >= len(lines):
return {"nodes": [], "edges": []}
path_blocks, current_block = [], []
for line in lines[start_index:]:
stripped_line = line.strip()
first_word_is_asn = stripped_line and (stripped_line.split(' ')[0].replace(',', '').isdigit() or stripped_line.startswith('From') or stripped_line.startswith('Local'))
is_new_path_line = (not line.startswith(' ') and first_word_is_asn)
if is_new_path_line:
if current_block: path_blocks.append(current_block)
current_block = [line]
elif stripped_line and current_block:
current_block.append(line)
if current_block: path_blocks.append(current_block)
path_blocks = []
try:
paths_header_index = next(i for i, line in enumerate(lines) if "Paths:" in line)
current_block = []
for line in lines[paths_header_index + 1:]:
stripped_line = line.strip()
if not stripped_line: continue
is_new_path_line = False
if line.startswith(' ') and not line.startswith(' '):
first_word = stripped_line.split(' ')[0]
if first_word.isdigit() or stripped_line == "Local":
is_new_path_line = True
if is_new_path_line:
if current_block: path_blocks.append(current_block)
current_block = [line]
elif current_block:
current_block.append(line)
if current_block: path_blocks.append(current_block)
except StopIteration:
return {"nodes": [], "edges": []}
all_paths_info, best_path_info = [], None all_paths_info, best_path_info = [], None
for block in path_blocks: for block in path_blocks:
block_text = "\n".join(block)
block_text_lower = block_text.lower() clean_lines = [line for line in block if not line.strip().startswith("AddPath ID:")]
is_best = 'best' in block_text_lower block_text_for_check = "\n".join(clean_lines)
is_multipath = 'multipath' in block_text_lower
path_line = block[0] is_best = bool(re.search(r'\bbest\b', block_text_for_check, re.IGNORECASE))
if "From:" in path_line and len(block) > 1: is_multipath = 'multipath' in block_text_for_check.lower()
as_path_line_candidates = [l for l in block if re.search(r'\b\d+\b', l)]
if as_path_line_candidates: path_line = as_path_line_candidates[0] block_text_full = "\n".join(block)
clean_path_line = path_line.strip() path_line = block[0].strip()
match = re.search(r'[,(]', clean_path_line) path_asns_raw = []
if match: clean_path_line = clean_path_line[:match.start()] if path_line != "Local":
path_asns_raw = re.findall(r'\b(\d+)\b', clean_path_line) for part in path_line.split(' '):
clean_part = part.replace(',', '').strip()
if clean_part.isdigit():
path_asns_raw.append(clean_part)
else:
break
path_asns = list(dict.fromkeys(path_asns_raw)) path_asns = list(dict.fromkeys(path_asns_raw))
local_pref_match = re.search(r'localpref (\d+)', block_text)
local_pref_match = re.search(r'localpref (\d+)', block_text_full)
local_pref = int(local_pref_match.group(1)) if local_pref_match else None local_pref = int(local_pref_match.group(1)) if local_pref_match else None
next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text, re.MULTILINE) next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text_full, re.MULTILINE)
next_hop = next_hop_match.group(1) if next_hop_match else None next_hop = next_hop_match.group(1) if next_hop_match else None
community_match = re.search(r'Large Community: ([\d:]+)', block_text) community_match = re.search(r'Large Community: ([\d:]+)', block_text_full)
community = community_match.group(1) if community_match else None community = community_match.group(1) if community_match else None
category = 'other' category = 'other'
if community: if community:
@ -124,7 +128,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
nodes, edges = [], [] nodes, edges = [], []
X_SEPARATION, Y_SEPARATION = 300, 200 X_SEPARATION, Y_SEPARATION = 300, 200
max_path_len = max(len(p['asns']) for p in ordered_paths) if ordered_paths else 0 max_path_len = max((len(p['asns']) for p in ordered_paths if p['asns']), default=0)
nodes.append({"id": ROUTER_NAME, "label": f"<b>{ROUTER_NAME}</b>", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) nodes.append({"id": ROUTER_NAME, "label": f"<b>{ROUTER_NAME}</b>", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
nodes.append({"id": prefix, "label": f"<b>{prefix}</b>", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) nodes.append({"id": prefix, "label": f"<b>{prefix}</b>", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
@ -156,34 +160,20 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
path_node_ids.append(unique_node_id) path_node_ids.append(unique_node_id)
as_name = AS_NAME_CACHE.get(asn, "") as_name = AS_NAME_CACHE.get(asn, "")
wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else "" wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else ""
base_label = f"<b>AS{asn}</b>" base_label = f"<b>AS{asn}</b>"
if j == 0 and path_info['local_pref'] is not None: if j == 0 and path_info['local_pref'] is not None:
base_label += f" (LP: <b>{path_info['local_pref']}</b>)" base_label += f" (LP: {path_info['local_pref']})"
label = f"{base_label}\n{wrapped_name}" label = f"{base_label}\n{wrapped_name}"
if j == 0 and path_info['next_hop']: if j == 0 and path_info['next_hop']:
label += f"\n<i>Next Hop: {path_info['next_hop']}{style['path_type']}</i>" label += f"\n<i>Next Hop: {path_info['next_hop']}{style['path_type']}</i>"
nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path}) nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path})
full_chain = [ROUTER_NAME] + path_node_ids + [prefix] full_chain = [ROUTER_NAME] + path_node_ids + [prefix]
smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85} smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85}
for j in range(len(full_chain) - 1): for j in range(len(full_chain) - 1):
edges.append({ edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config})
"from": full_chain[j],
"to": full_chain[j+1],
"color": style['edge_color'],
"width": style['width'],
"dashes": style['dashes'],
"path_category": path_info['category'],
"is_active": is_active_path,
"smooth": smooth_config,
})
return {"nodes": nodes, "edges": edges} return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)}
def generate_visual_route_graph(ip_address_str: str) -> dict: def generate_visual_route_graph(ip_address_str: str) -> dict:
if not ip_address_str: if not ip_address_str: