Update modules/visual_route.py

This commit is contained in:
Blackwhitebear8 2025-07-06 15:01:04 +02:00
parent e2b15d3d55
commit 0aad5c3fd3

View file

@ -56,16 +56,28 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
lines = bgp_data.split('\n') lines = bgp_data.split('\n')
start_index = -1
try: try:
start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1 start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1
except StopIteration: return {"nodes": [], "edges": []} except StopIteration:
print("DEBUG: 'Paths:' header not found. Trying fallback parsing method.")
try:
header_line_index = next(i for i, line in enumerate(lines) if "BGP routing table entry for" in line)
start_index = header_line_index + 1
while start_index < len(lines) and not lines[start_index].strip():
start_index += 1
except StopIteration:
return {"nodes": [], "edges": []}
if start_index == -1 or start_index >= len(lines):
return {"nodes": [], "edges": []}
path_blocks, current_block = [], [] path_blocks, current_block = [], []
for line in lines[start_index:]: for line in lines[start_index:]:
stripped_line = line.strip() stripped_line = line.strip()
first_word_is_digit = stripped_line and stripped_line.split(' ')[0].replace(',', '').isdigit() first_word_is_asn = stripped_line and (stripped_line.split(' ')[0].replace(',', '').isdigit() or stripped_line.startswith('From') or stripped_line.startswith('Local'))
is_local_path = (stripped_line == 'Local') is_new_path_line = (not line.startswith(' ') and first_word_is_asn)
is_new_path_line = (line.startswith(' ') and not line.startswith(' ') and (first_word_is_digit or is_local_path))
if is_new_path_line: if is_new_path_line:
if current_block: path_blocks.append(current_block) if current_block: path_blocks.append(current_block)
current_block = [line] current_block = [line]
@ -76,6 +88,9 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
all_paths, best_path_asns = [], [] all_paths, best_path_asns = [], []
for block in path_blocks: for block in path_blocks:
path_line = block[0] path_line = block[0]
if "From:" in path_line and len(block) > 1:
as_path_line_candidates = [l for l in block if re.search(r'\b\d+\b', l)]
if as_path_line_candidates: path_line = as_path_line_candidates[0]
clean_path_line = path_line.strip() clean_path_line = path_line.strip()
match = re.search(r'[,(]', clean_path_line) match = re.search(r'[,(]', clean_path_line)
if match: clean_path_line = clean_path_line[:match.start()] if match: clean_path_line = clean_path_line[:match.start()]
@ -91,16 +106,13 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
if 'best' in bgp_data: best_path_asns = [] if 'best' in bgp_data: best_path_asns = []
if not all_paths and best_path_asns is not None and not best_path_asns: if not all_paths and best_path_asns is not None and not best_path_asns:
all_paths.append([]) all_paths.append([])
all_asns_in_graph = {asn for path in all_paths for asn in path} all_asns_in_graph = {asn for path in all_paths for asn in path}
if all_asns_in_graph: if all_asns_in_graph:
_bulk_get_as_names(list(all_asns_in_graph)) _bulk_get_as_names(list(all_asns_in_graph))
ordered_paths = [] ordered_paths = []
if best_path_asns is not None: ordered_paths.append(best_path_asns) if best_path_asns is not None: ordered_paths.append(best_path_asns)
for path in all_paths: for path in all_paths:
if path != best_path_asns: ordered_paths.append(path) if path != best_path_asns: ordered_paths.append(path)
node_lanes, node_levels, max_level = {}, {}, 0 node_lanes, node_levels, max_level = {}, {}, 0
Y_SEPARATION, X_SEPARATION = 110, 280 Y_SEPARATION, X_SEPARATION = 110, 280
y_lane_alternator = 1 y_lane_alternator = 1
@ -113,19 +125,15 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
for node_id in full_chain: for node_id in full_chain:
if node_id not in node_lanes: node_lanes[node_id] = lane_y if node_id not in node_lanes: node_lanes[node_id] = lane_y
for path in all_paths: for path in all_paths:
full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
for level, node_id in enumerate(full_chain): for level, node_id in enumerate(full_chain):
node_levels[node_id] = max(node_levels.get(node_id, 0), level) node_levels[node_id] = max(node_levels.get(node_id, 0), level)
max_level = max(max_level, level) max_level = max(max_level, level)
node_levels[prefix] = max_level + 1 node_levels[prefix] = max_level + 1
if prefix not in node_lanes: node_lanes[prefix] = 0 if prefix not in node_lanes: node_lanes[prefix] = 0
nodes, edges = [], [] nodes, edges = [], []
best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix} if best_path_asns is not None else {ROUTER_NAME, prefix} best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix} if best_path_asns is not None else {ROUTER_NAME, prefix}
all_node_ids = set(node_levels.keys()) all_node_ids = set(node_levels.keys())
for node_id in sorted(list(all_node_ids)): for node_id in sorted(list(all_node_ids)):
color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF' color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF'
@ -141,38 +149,48 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
elif is_endpoint: elif is_endpoint:
label = f"<b>{node_id}</b>" label = f"<b>{node_id}</b>"
nodes.append({"id": node_id, "label": label, "color": color, "x": node_levels[node_id] * X_SEPARATION, "y": node_lanes.get(node_id, 0), "fixed": True}) nodes.append({"id": node_id, "label": label, "color": color, "x": node_levels[node_id] * X_SEPARATION, "y": node_lanes.get(node_id, 0), "fixed": True})
edge_map = {} edge_map = {}
def add_edge(u, v, color, width, dashes=False): def add_edge(u, v, color, width, dashes=False):
edge_tuple = tuple(sorted((u, v))) edge_tuple = tuple(sorted((u, v)))
if u == v: return if u == v: return
if edge_tuple not in edge_map or color == '#C0392B': if edge_tuple not in edge_map or color == '#C0392B':
edge_map[edge_tuple] = {"from": u, "to": v, "color": color, "width": width, "dashes": dashes} edge_map[edge_tuple] = {"from": u, "to": v, "color": color, "width": width, "dashes": dashes}
if best_path_asns is not None: if best_path_asns is not None:
path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix] path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix]
for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#C0392B', 3, False) for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#C0392B', 3, False)
for path in all_paths: for path in all_paths:
if path == best_path_asns: continue if path == best_path_asns: continue
path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix] path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix]
for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#2C3E50', 1, True) for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#2C3E50', 1, True)
edges = list(edge_map.values()) edges = list(edge_map.values())
return {"nodes": nodes, "edges": edges} return {"nodes": nodes, "edges": edges}
def generate_visual_route_graph(ip_address_str: str) -> dict: def generate_visual_route_graph(ip_address_str: str) -> dict:
if not ip_address_str: if not ip_address_str:
return {"error": "IP address is required."} return {"error": "IP address is required."}
address_to_lookup = ""
ip_version = ""
if '/' in ip_address_str:
try:
net_obj = ipaddress.ip_network(ip_address_str, strict=False)
ip_version = f"ipv{net_obj.version}"
address_to_lookup = net_obj.with_prefixlen
except ValueError:
return {"error": f"Invalid CIDR notation: {ip_address_str}"}
else:
try: try:
ip_obj = ipaddress.ip_address(ip_address_str) ip_obj = ipaddress.ip_address(ip_address_str)
ip_version = f"ipv{ip_obj.version}" ip_version = f"ipv{ip_obj.version}"
address_to_lookup = ip_address_str
except ValueError: except ValueError:
return {"error": f"Invalid IP address: {ip_address_str}"} return {"error": f"Invalid IP address: {ip_address_str}"}
bgp_data = _get_bgp_data(ip_address_str, ip_version) bgp_data = _get_bgp_data(address_to_lookup, ip_version)
if not bgp_data: if not bgp_data:
return {"error": f"Failed to retrieve BGP data for {ip_address_str}."} return {"error": f"Failed to retrieve BGP data for {address_to_lookup}."}
graph_data = _parse_bgp_paths_to_graph(bgp_data) graph_data = _parse_bgp_paths_to_graph(bgp_data)
if not graph_data.get("nodes"): if not graph_data.get("nodes"):