Update modules/visual_route.py

This commit is contained in:
Blackwhitebear8 2025-08-14 17:54:33 +02:00
parent 28c196d431
commit 9dd1d08393

View file

@ -1,4 +1,3 @@
import requests
import re
import json
import socket
@ -13,10 +12,9 @@ TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').
IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(',')))
CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(',')))
API_URL = os.getenv('BGP_API_URL', 'http://127.0.0.1:5000/bgp-route/lookup')
AS_NAME_CACHE = {}
AS_NAME_WRAP_WIDTH = 25
ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
ROUTER_NAME_FALLBACK = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
MAX_IP_CIDR_LENGTH = 45
def _is_valid_ip_or_prefix(target: str) -> bool:
@ -73,18 +71,25 @@ def _bulk_get_as_names(asn_numbers: list[str]):
for asn in lookup_list:
if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = ""
def _get_bgp_data(ip_address: str, ip_version: str) -> str | None:
payload = {"ip_version": ip_version, "bgprouteprefix": ip_address}
headers = {"Content-Type": "application/json"}
def _get_bgp_data(ip_address: str, ip_version: str, location_config: dict, bgp_lookup_func) -> str | None:
try:
response = requests.post(API_URL, json=payload, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data.get("success") and data.get("data"): return data["data"]
else: return None
except requests.exceptions.RequestException: return None
api_url = location_config.get("vyos_api_url")
api_key = location_config.get("vyos_api_key")
vrf_name = location_config.get("bgp_vrf_name")
def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
response_data = bgp_lookup_func(api_url, api_key, vrf_name, ip_version, ip_address)
if response_data.get("success") and "data" in response_data:
return response_data["data"]
else:
error_message = response_data.get('error', 'Unknown error')
print(f"BGP lookup via VyOS API failed for {ip_address}. Error: {error_message}")
return None
except Exception as e:
print(f"An exception occurred during internal BGP lookup: {e}")
return None
def _parse_bgp_paths_to_graph(bgp_data: str, router_name: str) -> dict:
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
lines = bgp_data.split('\n')
@ -164,7 +169,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
if not ordered_paths:
return {"nodes": [], "edges": []}
nodes.append({"id": ROUTER_NAME, "label": f"<b>{ROUTER_NAME}</b>", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
nodes.append({"id": router_name, "label": f"<b>{router_name}</b>", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
nodes.append({"id": prefix, "label": f"<b>{prefix}</b>", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
y_pos_counter_up, y_pos_counter_down = 1, 1
@ -193,7 +198,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
if j == 0 and path_info['next_hop']: label += f"\n<i>Next Hop: {path_info['next_hop']}{style['path_type']}</i>"
nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path})
full_chain = [ROUTER_NAME] + path_node_ids + [prefix]
full_chain = [router_name] + path_node_ids + [prefix]
smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85}
for j in range(len(full_chain) - 1):
edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config})
@ -201,7 +206,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)}
def get_raw_bgp_route(ip_address_str: str) -> tuple[str | None, str | None]:
def get_raw_bgp_route(ip_address_str: str, location_config: dict, bgp_lookup_func) -> tuple[str | None, str | None]:
ip_address_str, error = _validate_ip_prefix_input(ip_address_str)
if error:
return None, error
@ -210,14 +215,15 @@ def get_raw_bgp_route(ip_address_str: str) -> tuple[str | None, str | None]:
if not address_to_lookup:
return None, f"Invalid input '{ip_address_str}'."
bgp_data = _get_bgp_data(address_to_lookup, ip_version)
bgp_data = _get_bgp_data(address_to_lookup, ip_version, location_config, bgp_lookup_func)
if not bgp_data:
return None, f"Route information not found for: {address_to_lookup}"
return bgp_data, None
def generate_visual_route_graph(ip_address_str: str, location_config: dict, bgp_lookup_func) -> dict:
router_name = location_config.get('router_name', ROUTER_NAME_FALLBACK)
def generate_visual_route_graph(ip_address_str: str) -> dict:
ip_address_str, error = _validate_ip_prefix_input(ip_address_str)
if error:
return {"error": error}
@ -226,11 +232,11 @@ def generate_visual_route_graph(ip_address_str: str) -> dict:
if not address_to_lookup:
return {"error": f"Invalid input '{ip_address_str}'."}
bgp_data = _get_bgp_data(address_to_lookup, ip_version)
bgp_data = _get_bgp_data(address_to_lookup, ip_version, location_config, bgp_lookup_func)
if not bgp_data:
return {"not_found": True, "target": address_to_lookup}
graph_data = _parse_bgp_paths_to_graph(bgp_data)
graph_data = _parse_bgp_paths_to_graph(bgp_data, router_name)
if not graph_data.get("nodes"):
return {"not_found": True, "target": address_to_lookup}