diff --git a/modules/visual_route.py b/modules/visual_route.py
index a95ed40..da10bc8 100644
--- a/modules/visual_route.py
+++ b/modules/visual_route.py
@@ -1,4 +1,3 @@
-import requests
import re
import json
import socket
@@ -13,10 +12,9 @@ TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').
IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(',')))
CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(',')))
-API_URL = os.getenv('BGP_API_URL', 'http://127.0.0.1:5000/bgp-route/lookup')
AS_NAME_CACHE = {}
AS_NAME_WRAP_WIDTH = 25
-ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
+ROUTER_NAME_FALLBACK = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
MAX_IP_CIDR_LENGTH = 45
def _is_valid_ip_or_prefix(target: str) -> bool:
@@ -73,18 +71,25 @@ def _bulk_get_as_names(asn_numbers: list[str]):
for asn in lookup_list:
if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = ""
-def _get_bgp_data(ip_address: str, ip_version: str) -> str | None:
- payload = {"ip_version": ip_version, "bgprouteprefix": ip_address}
- headers = {"Content-Type": "application/json"}
+def _get_bgp_data(ip_address: str, ip_version: str, location_config: dict, bgp_lookup_func) -> str | None:
try:
- response = requests.post(API_URL, json=payload, headers=headers, timeout=10)
- response.raise_for_status()
- data = response.json()
- if data.get("success") and data.get("data"): return data["data"]
- else: return None
- except requests.exceptions.RequestException: return None
+ api_url = location_config.get("vyos_api_url")
+ api_key = location_config.get("vyos_api_key")
+ vrf_name = location_config.get("bgp_vrf_name")
+
+ response_data = bgp_lookup_func(api_url, api_key, vrf_name, ip_version, ip_address)
+
+ if response_data.get("success") and "data" in response_data:
+ return response_data["data"]
+ else:
+ error_message = response_data.get('error', 'Unknown error')
+ print(f"BGP lookup via VyOS API failed for {ip_address}. Error: {error_message}")
+ return None
+ except Exception as e:
+ print(f"An exception occurred during internal BGP lookup: {e}")
+ return None
-def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
+def _parse_bgp_paths_to_graph(bgp_data: str, router_name: str) -> dict:
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
lines = bgp_data.split('\n')
@@ -164,7 +169,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
if not ordered_paths:
return {"nodes": [], "edges": []}
- nodes.append({"id": ROUTER_NAME, "label": f"{ROUTER_NAME}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
+ nodes.append({"id": router_name, "label": f"{router_name}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
nodes.append({"id": prefix, "label": f"{prefix}", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
y_pos_counter_up, y_pos_counter_down = 1, 1
@@ -193,7 +198,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
if j == 0 and path_info['next_hop']: label += f"\nNext Hop: {path_info['next_hop']}{style['path_type']}"
nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path})
- full_chain = [ROUTER_NAME] + path_node_ids + [prefix]
+ full_chain = [router_name] + path_node_ids + [prefix]
smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85}
for j in range(len(full_chain) - 1):
edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config})
@@ -201,7 +206,7 @@ def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)}
-def get_raw_bgp_route(ip_address_str: str) -> tuple[str | None, str | None]:
+def get_raw_bgp_route(ip_address_str: str, location_config: dict, bgp_lookup_func) -> tuple[str | None, str | None]:
ip_address_str, error = _validate_ip_prefix_input(ip_address_str)
if error:
return None, error
@@ -210,14 +215,15 @@ def get_raw_bgp_route(ip_address_str: str) -> tuple[str | None, str | None]:
if not address_to_lookup:
return None, f"Invalid input '{ip_address_str}'."
- bgp_data = _get_bgp_data(address_to_lookup, ip_version)
+ bgp_data = _get_bgp_data(address_to_lookup, ip_version, location_config, bgp_lookup_func)
if not bgp_data:
return None, f"Route information not found for: {address_to_lookup}"
return bgp_data, None
+def generate_visual_route_graph(ip_address_str: str, location_config: dict, bgp_lookup_func) -> dict:
+ router_name = location_config.get('router_name', ROUTER_NAME_FALLBACK)
-def generate_visual_route_graph(ip_address_str: str) -> dict:
ip_address_str, error = _validate_ip_prefix_input(ip_address_str)
if error:
return {"error": error}
@@ -226,11 +232,11 @@ def generate_visual_route_graph(ip_address_str: str) -> dict:
if not address_to_lookup:
return {"error": f"Invalid input '{ip_address_str}'."}
- bgp_data = _get_bgp_data(address_to_lookup, ip_version)
+ bgp_data = _get_bgp_data(address_to_lookup, ip_version, location_config, bgp_lookup_func)
if not bgp_data:
return {"not_found": True, "target": address_to_lookup}
- graph_data = _parse_bgp_paths_to_graph(bgp_data)
+ graph_data = _parse_bgp_paths_to_graph(bgp_data, router_name)
if not graph_data.get("nodes"):
return {"not_found": True, "target": address_to_lookup}