import re import json import socket import textwrap import os import ipaddress from dotenv import load_dotenv load_dotenv() TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').split(','))) IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(','))) CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(','))) AS_NAME_CACHE = {} AS_NAME_WRAP_WIDTH = 25 ROUTER_NAME_FALLBACK = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router') MAX_IP_CIDR_LENGTH = 45 def _is_valid_ip_or_prefix(target: str) -> bool: try: if '/' in target: ipaddress.ip_network(target, strict=False) else: ipaddress.ip_address(target) return True except ValueError: return False def _validate_ip_prefix_input(ip_address_str: str): if not ip_address_str: return None, "An IP address or prefix is required." if len(ip_address_str) > MAX_IP_CIDR_LENGTH: return None, f"Input exceeds maximum length of {MAX_IP_CIDR_LENGTH} characters." if not _is_valid_ip_or_prefix(ip_address_str): return None, f"Invalid input '{ip_address_str}'. Please provide a valid IPv4/IPv6 address or prefix." return ip_address_str, None def _get_lookup_params(ip_address_str: str): try: if '/' in ip_address_str: net_obj = ipaddress.ip_network(ip_address_str, strict=False) return f"ipv{net_obj.version}", net_obj.with_prefixlen else: ip_obj = ipaddress.ip_address(ip_address_str) return f"ipv{ip_obj.version}", str(ip_obj) except ValueError: return None, None def _bulk_get_as_names(asn_numbers: list[str]): lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE] if not lookup_list: return query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n" try: with socket.create_connection(('bgp.tools', 43), timeout=10) as s: s.sendall(query.encode('utf-8')) response_data = b"" while True: chunk = s.recv(4096) if not chunk: break response_data += chunk response_str = response_data.decode('utf-8') for line in response_str.splitlines(): parts = line.split('|') if len(parts) > 1: asn_num_str = parts[0].strip() as_name = parts[-1].strip() if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name except (socket.error, socket.timeout) as e: print(f"Bulk AS name lookup failed: {e}") for asn in lookup_list: if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = "" def _get_bgp_data(ip_address: str, ip_version: str, location_config: dict, bgp_lookup_func) -> str | None: try: api_url = location_config.get("vyos_api_url") api_key = location_config.get("vyos_api_key") vrf_name = location_config.get("bgp_vrf_name") response_data = bgp_lookup_func(api_url, api_key, vrf_name, ip_version, ip_address) if response_data.get("success") and "data" in response_data: return response_data["data"] else: error_message = response_data.get('error', 'Unknown error') print(f"BGP lookup via VyOS API failed for {ip_address}. Error: {error_message}") return None except Exception as e: print(f"An exception occurred during internal BGP lookup: {e}") return None def _parse_bgp_paths_to_graph(bgp_data: str, router_name: str) -> dict: prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data) prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix" lines = bgp_data.split('\n') path_blocks = [] try: paths_header_index = next(i for i, line in enumerate(lines) if "Paths:" in line) current_block = [] for line in lines[paths_header_index + 1:]: stripped_line = line.strip() if not stripped_line: continue is_new_path_line = False if line.startswith(' ') and not line.startswith(' '): first_word = stripped_line.split(' ')[0] cleaned_first_word = first_word.replace(',', '') if cleaned_first_word.isdigit() or stripped_line == "Local": is_new_path_line = True if is_new_path_line: if current_block: path_blocks.append(current_block) current_block = [line] elif current_block: current_block.append(line) if current_block: path_blocks.append(current_block) except StopIteration: return {"nodes": [], "edges": []} all_paths_info, best_path_info = [], None for block in path_blocks: block_text_full = "\n".join(block) clean_lines = [line for line in block if not line.strip().startswith("AddPath ID:")] block_text_for_check = "\n".join(clean_lines) is_best = bool(re.search(r'\bbest\b', block_text_for_check, re.IGNORECASE)) is_multipath = 'multipath' in block_text_for_check.lower() path_line = block[0].strip() path_asns_raw = [] if path_line != "Local": for part in path_line.split(' '): clean_part = part.replace(',', '').strip() if clean_part.isdigit(): path_asns_raw.append(clean_part) else: break path_asns = path_asns_raw local_pref_match = re.search(r'localpref (\d+)', block_text_full) local_pref = int(local_pref_match.group(1)) if local_pref_match else None next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text_full, re.MULTILINE) next_hop = next_hop_match.group(1) if next_hop_match else None community_match = re.search(r'Large Community: ([\d:]+)', block_text_full) community = community_match.group(1) if community_match else None category = 'other' if community: if community in TRANSIT_COMMUNITIES: category = 'transit' elif community in IX_COMMUNITIES: category = 'ix' elif community in CUSTOMER_COMMUNITIES: category = 'customer' path_info = {"asns": path_asns, "local_pref": local_pref, "next_hop": next_hop, "is_best": is_best, "is_multipath": is_multipath, "community": community, "category": category} all_paths_info.append(path_info) if is_best and not best_path_info: best_path_info = path_info all_asns_in_graph = {asn for path in all_paths_info for asn in path['asns']} if all_asns_in_graph: _bulk_get_as_names(list(all_asns_in_graph)) ordered_paths = sorted(all_paths_info, key=lambda p: (not p['is_best'], not p['is_multipath'])) nodes, edges = [], [] X_SEPARATION, Y_SEPARATION = 300, 200 max_path_len = max((len(p['asns']) for p in ordered_paths if p['asns']), default=0) if not ordered_paths: return {"nodes": [], "edges": []} nodes.append({"id": router_name, "label": f"{router_name}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) nodes.append({"id": prefix, "label": f"{prefix}", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True}) y_pos_counter_up, y_pos_counter_down = 1, 1 for i, path_info in enumerate(ordered_paths): lane_y = 0 if i > 0: if y_pos_counter_up <= y_pos_counter_down: lane_y = y_pos_counter_up * Y_SEPARATION; y_pos_counter_up += 1 else: lane_y = -y_pos_counter_down * Y_SEPARATION; y_pos_counter_down += 1 style, is_active_path = {}, False if path_info['is_best']: style = {"node_color": '#FADBD8', "edge_color": '#C0392B', "width": 3, "dashes": False, "path_type": " (best)"}; is_active_path = True elif path_info['is_multipath']: style = {"node_color": '#FDEBD0', "edge_color": '#F39C12', "width": 2, "dashes": False, "path_type": " (multipath)"}; is_active_path = True else: style = {"node_color": '#D6DBDF', "edge_color": '#2C3E50', "width": 1, "dashes": True, "path_type": ""}; is_active_path = False path_node_ids = [] for j, asn in enumerate(path_info['asns']): unique_node_id = f"AS{asn}-{i}-{j}" path_node_ids.append(unique_node_id) as_name = AS_NAME_CACHE.get(asn, ""); wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else "" base_label = f"AS{asn}" if j == 0 and path_info['local_pref'] is not None: base_label += f" (LP: {path_info['local_pref']})" label = f"{base_label}\n{wrapped_name}" if j == 0 and path_info['next_hop']: label += f"\nNext Hop: {path_info['next_hop']}{style['path_type']}" nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path}) full_chain = [router_name] + path_node_ids + [prefix] smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85} for j in range(len(full_chain) - 1): edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config}) return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)} def get_raw_bgp_route(ip_address_str: str, location_config: dict, bgp_lookup_func) -> tuple[str | None, str | None]: ip_address_str, error = _validate_ip_prefix_input(ip_address_str) if error: return None, error ip_version, address_to_lookup = _get_lookup_params(ip_address_str) if not address_to_lookup: return None, f"Invalid input '{ip_address_str}'." bgp_data = _get_bgp_data(address_to_lookup, ip_version, location_config, bgp_lookup_func) if not bgp_data: return None, f"Route information not found for: {address_to_lookup}" return bgp_data, None def generate_visual_route_graph(ip_address_str: str, location_config: dict, bgp_lookup_func) -> dict: router_name = location_config.get('router_name', ROUTER_NAME_FALLBACK) ip_address_str, error = _validate_ip_prefix_input(ip_address_str) if error: return {"error": error} ip_version, address_to_lookup = _get_lookup_params(ip_address_str) if not address_to_lookup: return {"error": f"Invalid input '{ip_address_str}'."} bgp_data = _get_bgp_data(address_to_lookup, ip_version, location_config, bgp_lookup_func) if not bgp_data: return {"not_found": True, "target": address_to_lookup} graph_data = _parse_bgp_paths_to_graph(bgp_data, router_name) if not graph_data.get("nodes"): return {"not_found": True, "target": address_to_lookup} return graph_data