diff --git a/modules/visual_route.py b/modules/visual_route.py
index 1d2163f..f727bf1 100644
--- a/modules/visual_route.py
+++ b/modules/visual_route.py
@@ -9,6 +9,10 @@ from dotenv import load_dotenv
load_dotenv()
+TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').split(',')))
+IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(',')))
+CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(',')))
+
API_URL = os.getenv('BGP_API_URL', 'http://192.168.5.16:5000/bgp-route/lookup')
AS_NAME_CACHE = {}
AS_NAME_WRAP_WIDTH = 25
@@ -56,12 +60,10 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
lines = bgp_data.split('\n')
-
start_index = -1
try:
start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1
except StopIteration:
- print("DEBUG: 'Paths:' header not found. Trying fallback parsing method.")
try:
header_line_index = next(i for i, line in enumerate(lines) if "BGP routing table entry for" in line)
start_index = header_line_index + 1
@@ -69,10 +71,8 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
start_index += 1
except StopIteration:
return {"nodes": [], "edges": []}
-
- if start_index == -1 or start_index >= len(lines):
+ if start_index >= len(lines):
return {"nodes": [], "edges": []}
-
path_blocks, current_block = [], []
for line in lines[start_index:]:
stripped_line = line.strip()
@@ -85,8 +85,12 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
current_block.append(line)
if current_block: path_blocks.append(current_block)
- all_paths, best_path_asns = [], []
+ all_paths_info, best_path_info = [], None
for block in path_blocks:
+ block_text = "\n".join(block)
+ block_text_lower = block_text.lower()
+ is_best = 'best' in block_text_lower
+ is_multipath = 'multipath' in block_text_lower
path_line = block[0]
if "From:" in path_line and len(block) > 1:
as_path_line_candidates = [l for l in block if re.search(r'\b\d+\b', l)]
@@ -96,104 +100,111 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
if match: clean_path_line = clean_path_line[:match.start()]
path_asns_raw = re.findall(r'\b(\d+)\b', clean_path_line)
path_asns = list(dict.fromkeys(path_asns_raw))
- if path_asns not in all_paths: all_paths.append(path_asns)
- if 'best' in "\n".join(block).lower() and not best_path_asns:
- best_path_asns = path_asns
+ local_pref_match = re.search(r'localpref (\d+)', block_text)
+ local_pref = int(local_pref_match.group(1)) if local_pref_match else None
+ next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text, re.MULTILINE)
+ next_hop = next_hop_match.group(1) if next_hop_match else None
+ community_match = re.search(r'Large Community: ([\d:]+)', block_text)
+ community = community_match.group(1) if community_match else None
+ category = 'other'
+ if community:
+ if community in TRANSIT_COMMUNITIES: category = 'transit'
+ elif community in IX_COMMUNITIES: category = 'ix'
+ elif community in CUSTOMER_COMMUNITIES: category = 'customer'
+ path_info = {"asns": path_asns, "local_pref": local_pref, "next_hop": next_hop, "is_best": is_best, "is_multipath": is_multipath, "community": community, "category": category}
+ all_paths_info.append(path_info)
+ if is_best and not best_path_info:
+ best_path_info = path_info
- if not all_paths and best_path_asns is not None and not best_path_asns:
- if any('Local' in block[0] for block in path_blocks):
- all_paths.append([])
- if 'best' in bgp_data: best_path_asns = []
- if not all_paths and best_path_asns is not None and not best_path_asns:
- all_paths.append([])
- all_asns_in_graph = {asn for path in all_paths for asn in path}
+ all_asns_in_graph = {asn for path in all_paths_info for asn in path['asns']}
if all_asns_in_graph:
_bulk_get_as_names(list(all_asns_in_graph))
- ordered_paths = []
- if best_path_asns is not None: ordered_paths.append(best_path_asns)
- for path in all_paths:
- if path != best_path_asns: ordered_paths.append(path)
- node_lanes, node_levels, max_level = {}, {}, 0
- Y_SEPARATION, X_SEPARATION = 110, 280
- y_lane_alternator = 1
- for i, path in enumerate(ordered_paths):
- lane_y = 0
- if i > 0 or best_path_asns is None:
- lane_y = y_lane_alternator * Y_SEPARATION
- if y_lane_alternator > 0: y_lane_alternator *= -1
- else: y_lane_alternator = (y_lane_alternator * -1) + 1
- full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
- for node_id in full_chain:
- if node_id not in node_lanes: node_lanes[node_id] = lane_y
- for path in all_paths:
- full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
- for level, node_id in enumerate(full_chain):
- node_levels[node_id] = max(node_levels.get(node_id, 0), level)
- max_level = max(max_level, level)
- node_levels[prefix] = max_level + 1
- if prefix not in node_lanes: node_lanes[prefix] = 0
- nodes, edges = [], []
- best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix} if best_path_asns is not None else {ROUTER_NAME, prefix}
- all_node_ids = set(node_levels.keys())
- for node_id in sorted(list(all_node_ids)):
- color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF'
- label, is_endpoint = node_id, (node_id == ROUTER_NAME or node_id == prefix)
- if node_id.startswith('AS'):
- asn_number = node_id[2:]
- as_name = AS_NAME_CACHE.get(asn_number, "")
- if as_name:
- wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH))
- label = f"{node_id}\n{wrapped_name}"
- else:
- label = f"{node_id}"
- elif is_endpoint:
- label = f"{node_id}"
- nodes.append({"id": node_id, "label": label, "color": color, "x": node_levels[node_id] * X_SEPARATION, "y": node_lanes.get(node_id, 0), "fixed": True})
- edge_map = {}
- def add_edge(u, v, color, width, dashes=False):
- edge_tuple = tuple(sorted((u, v)))
- if u == v: return
- if edge_tuple not in edge_map or color == '#C0392B':
- edge_map[edge_tuple] = {"from": u, "to": v, "color": color, "width": width, "dashes": dashes}
- if best_path_asns is not None:
- path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix]
- for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#C0392B', 3, False)
- for path in all_paths:
- if path == best_path_asns: continue
- path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix]
- for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#2C3E50', 1, True)
- edges = list(edge_map.values())
- return {"nodes": nodes, "edges": edges}
+ ordered_paths = sorted(all_paths_info, key=lambda p: (not p['is_best'], not p['is_multipath']))
+
+ nodes, edges = [], []
+ X_SEPARATION, Y_SEPARATION = 300, 200
+ max_path_len = max(len(p['asns']) for p in ordered_paths) if ordered_paths else 0
+ nodes.append({"id": ROUTER_NAME, "label": f"{ROUTER_NAME}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
+ nodes.append({"id": prefix, "label": f"{prefix}", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
+
+ y_pos_counter_up, y_pos_counter_down = 1, 1
+ for i, path_info in enumerate(ordered_paths):
+ lane_y = 0
+ if not path_info['is_best']:
+ if y_pos_counter_up <= y_pos_counter_down:
+ lane_y = y_pos_counter_up * Y_SEPARATION
+ y_pos_counter_up += 1
+ else:
+ lane_y = -y_pos_counter_down * Y_SEPARATION
+ y_pos_counter_down += 1
+
+ style, is_active_path = {}, False
+ if path_info['is_best']:
+ style = {"node_color": '#FADBD8', "edge_color": '#C0392B', "width": 3, "dashes": False, "path_type": " (best)"}
+ is_active_path = True
+ elif path_info['is_multipath']:
+ style = {"node_color": '#FDEBD0', "edge_color": '#F39C12', "width": 2, "dashes": False, "path_type": " (multipath)"}
+ is_active_path = True
+ else:
+ style = {"node_color": '#D6DBDF', "edge_color": '#2C3E50', "width": 1, "dashes": True, "path_type": ""}
+ is_active_path = False
+
+ path_node_ids = []
+ for j, asn in enumerate(path_info['asns']):
+ unique_node_id = f"AS{asn}-{i}"
+ path_node_ids.append(unique_node_id)
+ as_name = AS_NAME_CACHE.get(asn, "")
+ wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else ""
+
+ base_label = f"AS{asn}"
+ if j == 0 and path_info['local_pref'] is not None:
+ base_label += f" (LP: {path_info['local_pref']})"
+
+ label = f"{base_label}\n{wrapped_name}"
+
+ if j == 0 and path_info['next_hop']:
+ label += f"\nNext Hop: {path_info['next_hop']}{style['path_type']}"
+
+ nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path})
+
+ full_chain = [ROUTER_NAME] + path_node_ids + [prefix]
+ smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85}
+
+ for j in range(len(full_chain) - 1):
+ edges.append({
+ "from": full_chain[j],
+ "to": full_chain[j+1],
+ "color": style['edge_color'],
+ "width": style['width'],
+ "dashes": style['dashes'],
+ "path_category": path_info['category'],
+ "is_active": is_active_path,
+ "smooth": smooth_config,
+ })
+
+ return {"nodes": nodes, "edges": edges}
def generate_visual_route_graph(ip_address_str: str) -> dict:
if not ip_address_str:
return {"error": "IP address is required."}
-
- address_to_lookup = ""
- ip_version = ""
-
+ address_to_lookup, ip_version = "", ""
if '/' in ip_address_str:
try:
net_obj = ipaddress.ip_network(ip_address_str, strict=False)
- ip_version = f"ipv{net_obj.version}"
- address_to_lookup = net_obj.with_prefixlen
+ ip_version, address_to_lookup = f"ipv{net_obj.version}", net_obj.with_prefixlen
except ValueError:
return {"error": f"Invalid CIDR notation: {ip_address_str}"}
else:
try:
ip_obj = ipaddress.ip_address(ip_address_str)
- ip_version = f"ipv{ip_obj.version}"
- address_to_lookup = ip_address_str
+ ip_version, address_to_lookup = f"ipv{ip_obj.version}", ip_address_str
except ValueError:
return {"error": f"Invalid IP address: {ip_address_str}"}
-
bgp_data = _get_bgp_data(address_to_lookup, ip_version)
if not bgp_data:
return {"error": f"Failed to retrieve BGP data for {address_to_lookup}."}
-
graph_data = _parse_bgp_paths_to_graph(bgp_data)
if not graph_data.get("nodes"):
return {"error": "Could not parse valid AS paths from the API response."}
-
return graph_data
\ No newline at end of file