Add modules/visual_route.py

This commit is contained in:
Blackwhitebear8 2025-07-05 16:20:38 +02:00
parent 23edb1ab34
commit db55c822b0

181
modules/visual_route.py Normal file
View file

@ -0,0 +1,181 @@
import requests
import re
import json
import socket
import textwrap
import os
import ipaddress
from dotenv import load_dotenv
load_dotenv()
API_URL = os.getenv('BGP_API_URL', 'http://192.168.5.16:5000/bgp-route/lookup')
AS_NAME_CACHE = {}
AS_NAME_WRAP_WIDTH = 25
ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
def _bulk_get_as_names(asn_numbers: list[str]):
lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE]
if not lookup_list:
return
print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...")
query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n"
try:
with socket.create_connection(('bgp.tools', 43), timeout=10) as s:
s.sendall(query.encode('utf-8'))
response_data = b""
while True:
chunk = s.recv(4096)
if not chunk: break
response_data += chunk
response_str = response_data.decode('utf-8')
for line in response_str.splitlines():
parts = line.split('|')
if len(parts) > 1:
asn_num_str = parts[0].strip()
as_name = parts[-1].strip()
if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name
except (socket.error, socket.timeout) as e:
print(f"Bulk AS name lookup failed: {e}")
for asn in lookup_list:
if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = ""
def _get_bgp_data(ip_address: str, ip_version: str) -> str | None:
print(f"Querying API for {ip_version} address: {ip_address}...")
payload = {"ip_version": ip_version, "bgprouteprefix": ip_address}
headers = {"Content-Type": "application/json"}
try:
response = requests.post(API_URL, json=payload, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if data.get("success") and data.get("data"): return data["data"]
else: return None
except requests.exceptions.RequestException: return None
def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
lines = bgp_data.split('\n')
try:
start_index = next(i for i, line in enumerate(lines) if "Paths:" in line) + 1
except StopIteration: return {"nodes": [], "edges": []}
path_blocks, current_block = [], []
for line in lines[start_index:]:
stripped_line = line.strip()
first_word_is_digit = stripped_line and stripped_line.split(' ')[0].replace(',', '').isdigit()
is_local_path = (stripped_line == 'Local')
is_new_path_line = (line.startswith(' ') and not line.startswith(' ') and (first_word_is_digit or is_local_path))
if is_new_path_line:
if current_block: path_blocks.append(current_block)
current_block = [line]
elif stripped_line and current_block:
current_block.append(line)
if current_block: path_blocks.append(current_block)
all_paths, best_path_asns = [], []
for block in path_blocks:
path_line = block[0]
clean_path_line = path_line.strip()
match = re.search(r'[,(]', clean_path_line)
if match: clean_path_line = clean_path_line[:match.start()]
path_asns_raw = re.findall(r'\b(\d+)\b', clean_path_line)
path_asns = list(dict.fromkeys(path_asns_raw))
if path_asns not in all_paths: all_paths.append(path_asns)
if 'best' in "\n".join(block).lower() and not best_path_asns:
best_path_asns = path_asns
if not all_paths and best_path_asns is not None and not best_path_asns:
if any('Local' in block[0] for block in path_blocks):
all_paths.append([])
if 'best' in bgp_data: best_path_asns = []
if not all_paths and best_path_asns is not None and not best_path_asns:
all_paths.append([])
all_asns_in_graph = {asn for path in all_paths for asn in path}
if all_asns_in_graph:
_bulk_get_as_names(list(all_asns_in_graph))
ordered_paths = []
if best_path_asns is not None: ordered_paths.append(best_path_asns)
for path in all_paths:
if path != best_path_asns: ordered_paths.append(path)
node_lanes, node_levels, max_level = {}, {}, 0
Y_SEPARATION, X_SEPARATION = 110, 280
y_lane_alternator = 1
for i, path in enumerate(ordered_paths):
lane_y = 0
if i > 0 or best_path_asns is None:
lane_y = y_lane_alternator * Y_SEPARATION
if y_lane_alternator > 0: y_lane_alternator *= -1
else: y_lane_alternator = (y_lane_alternator * -1) + 1
full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
for node_id in full_chain:
if node_id not in node_lanes: node_lanes[node_id] = lane_y
for path in all_paths:
full_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path]
for level, node_id in enumerate(full_chain):
node_levels[node_id] = max(node_levels.get(node_id, 0), level)
max_level = max(max_level, level)
node_levels[prefix] = max_level + 1
if prefix not in node_lanes: node_lanes[prefix] = 0
nodes, edges = [], []
best_path_node_ids = {f"AS{asn}" for asn in best_path_asns} | {ROUTER_NAME, prefix} if best_path_asns is not None else {ROUTER_NAME, prefix}
all_node_ids = set(node_levels.keys())
for node_id in sorted(list(all_node_ids)):
color = '#FADBD8' if node_id in best_path_node_ids else '#D6DBDF'
label, is_endpoint = node_id, (node_id == ROUTER_NAME or node_id == prefix)
if node_id.startswith('AS'):
asn_number = node_id[2:]
as_name = AS_NAME_CACHE.get(asn_number, "")
if as_name:
wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH))
label = f"<b>{node_id}</b>\n{wrapped_name}"
else:
label = f"<b>{node_id}</b>"
elif is_endpoint:
label = f"<b>{node_id}</b>"
nodes.append({"id": node_id, "label": label, "color": color, "x": node_levels[node_id] * X_SEPARATION, "y": node_lanes.get(node_id, 0), "fixed": True})
edge_map = {}
def add_edge(u, v, color, width, dashes=False):
edge_tuple = tuple(sorted((u, v)))
if u == v: return
if edge_tuple not in edge_map or color == '#C0392B':
edge_map[edge_tuple] = {"from": u, "to": v, "color": color, "width": width, "dashes": dashes}
if best_path_asns is not None:
path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in best_path_asns] + [prefix]
for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#C0392B', 3, False)
for path in all_paths:
if path == best_path_asns: continue
path_chain = [ROUTER_NAME] + [f"AS{asn}" for asn in path] + [prefix]
for i in range(len(path_chain) - 1): add_edge(path_chain[i], path_chain[i+1], '#2C3E50', 1, True)
edges = list(edge_map.values())
return {"nodes": nodes, "edges": edges}
def generate_visual_route_graph(ip_address_str: str) -> dict:
if not ip_address_str:
return {"error": "IP address is required."}
try:
ip_obj = ipaddress.ip_address(ip_address_str)
ip_version = f"ipv{ip_obj.version}"
except ValueError:
return {"error": f"Invalid IP address: {ip_address_str}"}
bgp_data = _get_bgp_data(ip_address_str, ip_version)
if not bgp_data:
return {"error": f"Failed to retrieve BGP data for {ip_address_str}."}
graph_data = _parse_bgp_paths_to_graph(bgp_data)
if not graph_data.get("nodes"):
return {"error": "Could not parse valid AS paths from the API response."}
return graph_data