diff --git a/modules/__init__.py b/modules/__init__.py
new file mode 100644
index 0000000..06d7405
Binary files /dev/null and b/modules/__init__.py differ
diff --git a/modules/bgp.py b/modules/bgp.py
new file mode 100644
index 0000000..a4101dc
--- /dev/null
+++ b/modules/bgp.py
@@ -0,0 +1,84 @@
+def parse_bgp_data(data):
+ ipv4_section = ""
+ ipv6_section = ""
+ ipv4_info = {}
+ ipv6_info = {}
+
+ if "data" in data:
+ raw_data = data["data"]
+
+ ipv4_marker = "IPv4 Unicast Summary (VRF bgp):"
+ ipv6_marker = "IPv6 Unicast Summary (VRF bgp):"
+
+ ipv4_start = raw_data.find(ipv4_marker)
+ ipv6_start = raw_data.find(ipv6_marker)
+
+ if ipv4_start != -1:
+ if ipv6_start != -1:
+ ipv4_section = raw_data[ipv4_start + len(ipv4_marker):ipv6_start].strip()
+ else:
+ ipv4_section = raw_data[ipv4_start + len(ipv4_marker):].strip()
+ ipv4_info = extract_bgp_info(ipv4_section)
+
+ if ipv6_start != -1:
+ ipv6_section = raw_data[ipv6_start + len(ipv6_marker):].strip()
+ ipv6_info = extract_bgp_info(ipv6_section)
+
+ def process_peers(peer_data):
+ peers = []
+ for line in peer_data.split("\n"):
+ if line.strip().startswith("Neighbor"):
+ continue
+ if line.strip():
+ peer_info = line.split()
+ if len(peer_info) >= 12:
+ peers.append({
+ "neighbor": peer_info[0],
+ "version": peer_info[1],
+ "as_number": peer_info[2],
+ "msg_received": peer_info[3],
+ "msg_sent": peer_info[4],
+ "table_version": peer_info[5],
+ "in_queue": peer_info[6],
+ "out_queue": peer_info[7],
+ "up_down": peer_info[8],
+ "state_pfx_rcd": peer_info[9],
+ "prefix_sent": peer_info[10],
+ "description": " ".join(peer_info[11:])
+ })
+ return peers
+
+ ipv4_peers = process_peers(ipv4_section)
+ ipv6_peers = process_peers(ipv6_section)
+
+ return ipv4_info, ipv4_peers, ipv6_info, ipv6_peers
+
+def extract_bgp_info(raw_data):
+ lines = raw_data.split("\n")
+ info = {}
+ for line in lines:
+ if "BGP router identifier" in line:
+ parts = line.split(",")
+ info["router_id"] = parts[0].split("identifier")[1].strip()
+ info["local_as"] = parts[1].split("number")[1].strip().split(" ")[0]
+ if "vrf-id" in parts[-1]:
+ info["vrf_id"] = parts[-1].split("vrf-id")[1].strip()
+ if "BGP table version" in line:
+ info["table_version"] = line.split("version")[1].strip()
+ if "RIB entries" in line:
+ parts = line.split(",")
+ info["rib_entries"] = parts[0].split("entries")[1].strip()
+ info["rib_memory"] = parts[1].split("using")[1].strip()
+ if "Peers" in line:
+ parts = line.split(",")
+ info["peers"] = parts[0].split("Peers")[1].strip()
+ info["peers_memory"] = parts[1].split("using")[1].strip()
+ return info
+
+def generate_bgp_json(ipv4_info, ipv4_peers, ipv6_info, ipv6_peers):
+ return {
+ "ipv4_info": ipv4_info,
+ "ipv4_peers": ipv4_peers,
+ "ipv6_info": ipv6_info,
+ "ipv6_peers": ipv6_peers
+ }
\ No newline at end of file
diff --git a/modules/parse.py b/modules/parse.py
new file mode 100644
index 0000000..209b354
--- /dev/null
+++ b/modules/parse.py
@@ -0,0 +1,58 @@
+import subprocess
+import json
+import os
+from dotenv import load_dotenv
+
+load_dotenv()
+
+VYOS_API_URL = os.getenv("VYOS_API_URL")
+VYOS_API_KEY = os.getenv("VYOS_API_KEY")
+
+def run_bgp_curl_command():
+ curl_command = [
+ "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
+ "--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"summ\"]}",
+ "--form", f"key={VYOS_API_KEY}"
+ ]
+ response = subprocess.check_output(curl_command, text=True)
+ return json.loads(response)
+
+def run_arp_curl_command():
+ curl_command = [
+ "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
+ "--form", "data={\"op\": \"show\", \"path\": [\"arp\"]}",
+ "--form", f"key={VYOS_API_KEY}"
+ ]
+ response = subprocess.check_output(curl_command, text=True)
+ return json.loads(response)
+
+def run_neighbors_curl_command():
+ curl_command = [
+ "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
+ "--form", "data={\"op\": \"show\", \"path\": [\"ipv6\", \"neighbors\"]}",
+ "--form", f"key={VYOS_API_KEY}"
+ ]
+ response = subprocess.check_output(curl_command, text=True)
+ return json.loads(response)
+
+def run_interfaces_curl_command():
+ curl_command = [
+ "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
+ "--form", "data={\"op\": \"show\", \"path\": [\"interfaces\"]}",
+ "--form", f"key={VYOS_API_KEY}"
+ ]
+ response = subprocess.check_output(curl_command, text=True)
+ return json.loads(response)
+
+def run_bgp_route_curl_command(ip_version, bgprouteprefix):
+ data_json = {
+ "op": "show",
+ "path": ["bgp", "vrf", "bgp", ip_version, bgprouteprefix]
+ }
+ curl_command = [
+ "curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
+ "--form", f"data={json.dumps(data_json)}",
+ "--form", f"key={VYOS_API_KEY}"
+ ]
+ response = subprocess.check_output(curl_command, text=True)
+ return json.loads(response)
\ No newline at end of file
diff --git a/modules/visual_route.py b/modules/visual_route.py
new file mode 100644
index 0000000..611b55f
--- /dev/null
+++ b/modules/visual_route.py
@@ -0,0 +1,189 @@
+import requests
+import re
+import json
+import socket
+import textwrap
+import os
+import ipaddress
+from dotenv import load_dotenv
+
+load_dotenv()
+
+TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').split(',')))
+IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(',')))
+CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(',')))
+
+API_URL = os.getenv('BGP_API_URL', 'http://127.0.0.1:5000/bgp-route/lookup')
+AS_NAME_CACHE = {}
+AS_NAME_WRAP_WIDTH = 25
+ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
+
+def _bulk_get_as_names(asn_numbers: list[str]):
+ lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE]
+ if not lookup_list: return
+ print(f"Performing bulk lookup for {len(lookup_list)} AS numbers...")
+ query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n"
+ try:
+ with socket.create_connection(('bgp.tools', 43), timeout=10) as s:
+ s.sendall(query.encode('utf-8'))
+ response_data = b""
+ while True:
+ chunk = s.recv(4096)
+ if not chunk: break
+ response_data += chunk
+ response_str = response_data.decode('utf-8')
+ for line in response_str.splitlines():
+ parts = line.split('|')
+ if len(parts) > 1:
+ asn_num_str = parts[0].strip()
+ as_name = parts[-1].strip()
+ if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name
+ except (socket.error, socket.timeout) as e:
+ print(f"Bulk AS name lookup failed: {e}")
+ for asn in lookup_list:
+ if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = ""
+
+def _get_bgp_data(ip_address: str, ip_version: str) -> str | None:
+ print(f"Querying API for {ip_version} address: {ip_address}...")
+ payload = {"ip_version": ip_version, "bgprouteprefix": ip_address}
+ headers = {"Content-Type": "application/json"}
+ try:
+ response = requests.post(API_URL, json=payload, headers=headers, timeout=10)
+ response.raise_for_status()
+ data = response.json()
+ if data.get("success") and data.get("data"): return data["data"]
+ else: return None
+ except requests.exceptions.RequestException: return None
+
+def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
+ prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
+ prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
+ lines = bgp_data.split('\n')
+
+ path_blocks = []
+ try:
+ paths_header_index = next(i for i, line in enumerate(lines) if "Paths:" in line)
+ current_block = []
+ for line in lines[paths_header_index + 1:]:
+ stripped_line = line.strip()
+ if not stripped_line: continue
+
+ is_new_path_line = False
+ if line.startswith(' ') and not line.startswith(' '):
+ first_word = stripped_line.split(' ')[0].replace(',', '')
+ if first_word.isdigit():
+ is_new_path_line = True
+
+ if is_new_path_line:
+ if current_block: path_blocks.append(current_block)
+ current_block = [line]
+ elif current_block:
+ current_block.append(line)
+
+ if current_block: path_blocks.append(current_block)
+ except StopIteration:
+ return {"nodes": [], "edges": []}
+
+ all_paths_info = []
+ best_path_info = None
+ for block in path_blocks:
+ block_text = "\n".join(block)
+ clean_lines = [line for line in block if not line.strip().startswith("AddPath ID:")]
+ block_text_for_check = "\n".join(clean_lines)
+ is_best = bool(re.search(r'\bbest\b', block_text_for_check, re.IGNORECASE))
+ is_multipath = 'multipath' in block_text_for_check.lower()
+
+ path_line = block[0].strip()
+ path_asns_raw = []
+ for part in path_line.split(' '):
+ clean_part = part.replace(',', '').strip()
+ if clean_part.isdigit():
+ path_asns_raw.append(clean_part)
+ else:
+ break
+ path_asns = list(dict.fromkeys(path_asns_raw))
+
+ local_pref_match = re.search(r'localpref (\d+)', block_text)
+ local_pref = int(local_pref_match.group(1)) if local_pref_match else None
+ next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text, re.MULTILINE)
+ next_hop = next_hop_match.group(1) if next_hop_match else None
+ community_match = re.search(r'Large Community: ([\d:]+)', block_text)
+ community = community_match.group(1) if community_match else None
+ category = 'other'
+ if community:
+ if community in TRANSIT_COMMUNITIES: category = 'transit'
+ elif community in IX_COMMUNITIES: category = 'ix'
+ elif community in CUSTOMER_COMMUNITIES: category = 'customer'
+ path_info = {"asns": path_asns, "local_pref": local_pref, "next_hop": next_hop, "is_best": is_best, "is_multipath": is_multipath, "community": community, "category": category}
+ all_paths_info.append(path_info)
+ if is_best and not best_path_info:
+ best_path_info = path_info
+
+ all_asns_in_graph = {asn for path in all_paths_info for asn in path['asns']}
+ if all_asns_in_graph:
+ _bulk_get_as_names(list(all_asns_in_graph))
+
+ ordered_paths = sorted(all_paths_info, key=lambda p: (not p['is_best'], not p['is_multipath']))
+
+ nodes, edges = [], []
+ X_SEPARATION, Y_SEPARATION = 300, 200
+ max_path_len = max((len(p['asns']) for p in ordered_paths if p['asns']), default=0)
+ nodes.append({"id": ROUTER_NAME, "label": f"{ROUTER_NAME}", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
+ nodes.append({"id": prefix, "label": f"{prefix}", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
+
+ y_pos_counter_up, y_pos_counter_down = 1, 1
+ for i, path_info in enumerate(ordered_paths):
+ lane_y = 0
+ if i > 0:
+ if y_pos_counter_up <= y_pos_counter_down: lane_y = y_pos_counter_up * Y_SEPARATION; y_pos_counter_up += 1
+ else: lane_y = -y_pos_counter_down * Y_SEPARATION; y_pos_counter_down += 1
+
+ style, is_active_path = {}, False
+ if path_info['is_best']:
+ style = {"node_color": '#FADBD8', "edge_color": '#C0392B', "width": 3, "dashes": False, "path_type": " (best)"}; is_active_path = True
+ elif path_info['is_multipath']:
+ style = {"node_color": '#FDEBD0', "edge_color": '#F39C12', "width": 2, "dashes": False, "path_type": " (multipath)"}; is_active_path = True
+ else:
+ style = {"node_color": '#D6DBDF', "edge_color": '#2C3E50', "width": 1, "dashes": True, "path_type": ""}; is_active_path = False
+
+ path_node_ids = []
+ for j, asn in enumerate(path_info['asns']):
+ unique_node_id = f"AS{asn}-{i}"
+ path_node_ids.append(unique_node_id)
+ as_name = AS_NAME_CACHE.get(asn, ""); wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else ""
+ base_label = f"AS{asn}"
+ if j == 0 and path_info['local_pref'] is not None: base_label += f" (LP: {path_info['local_pref']})"
+ label = f"{base_label}\n{wrapped_name}"
+ if j == 0 and path_info['next_hop']: label += f"\nNext Hop: {path_info['next_hop']}{style['path_type']}"
+ nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path})
+
+ full_chain = [ROUTER_NAME] + path_node_ids + [prefix]
+ smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85}
+ for j in range(len(full_chain) - 1):
+ edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config})
+
+ return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)}
+
+def generate_visual_route_graph(ip_address_str: str) -> dict:
+ if not ip_address_str:
+ return {"error": "IP address is required."}
+ address_to_lookup, ip_version = "", ""
+ if '/' in ip_address_str:
+ try:
+ net_obj = ipaddress.ip_network(ip_address_str, strict=False)
+ ip_version, address_to_lookup = f"ipv{net_obj.version}", net_obj.with_prefixlen
+ except ValueError:
+ return {"error": f"Invalid CIDR notation: {ip_address_str}"}
+ else:
+ try:
+ ip_obj = ipaddress.ip_address(ip_address_str)
+ ip_version, address_to_lookup = f"ipv{ip_obj.version}", ip_address_str
+ except ValueError:
+ return {"error": f"Invalid IP address: {ip_address_str}"}
+ bgp_data = _get_bgp_data(address_to_lookup, ip_version)
+ if not bgp_data:
+ return {"error": f"Failed to retrieve BGP data for {address_to_lookup}."}
+ graph_data = _parse_bgp_paths_to_graph(bgp_data)
+ if not graph_data.get("nodes"):
+ return {"error": "Could not parse valid AS paths from the API response."}
+ return graph_data
\ No newline at end of file