Upload files to "modules"
This commit is contained in:
parent
79615a7de4
commit
dbcf644146
2 changed files with 240 additions and 0 deletions
39
modules/network_tools.py
Normal file
39
modules/network_tools.py
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
import subprocess
|
||||
import re
|
||||
|
||||
def execute_command_streaming(method, target):
|
||||
commands = {
|
||||
'ping': ['ping', '-c', '4', target],
|
||||
'ping6': ['ping', '-c', '4', target],
|
||||
'mtr': ['mtr', '-w', '-r', '-c', '10', target],
|
||||
'mtr6': ['mtr', '-w', '-r', '-c', '10', target],
|
||||
'traceroute': ['traceroute', '-n', target],
|
||||
'traceroute6': ['traceroute', '-n', target],
|
||||
}
|
||||
|
||||
if method not in commands:
|
||||
yield f"Error: Invalid method '{method}'."
|
||||
return
|
||||
|
||||
command = commands[method]
|
||||
|
||||
try:
|
||||
proc = subprocess.Popen(
|
||||
command,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
universal_newlines=True
|
||||
)
|
||||
|
||||
for line in iter(proc.stdout.readline, ''):
|
||||
yield line
|
||||
|
||||
proc.stdout.close()
|
||||
proc.wait()
|
||||
|
||||
except FileNotFoundError:
|
||||
yield f"Error: Command '{command[0]}' not found. Is it installed on the server?"
|
||||
except Exception as e:
|
||||
yield f"An unexpected error occurred: {e}"
|
||||
201
modules/visual_route.py
Normal file
201
modules/visual_route.py
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
import requests
|
||||
import re
|
||||
import json
|
||||
import socket
|
||||
import textwrap
|
||||
import os
|
||||
import ipaddress
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
TRANSIT_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_TRANSIT', '').split(',')))
|
||||
IX_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_IX', '').split(',')))
|
||||
CUSTOMER_COMMUNITIES = set(filter(None, os.getenv('BGP_COMMUNITIES_CUSTOMER', '').split(',')))
|
||||
|
||||
API_URL = os.getenv('BGP_API_URL', 'http://127.0.0.1:5000/bgp-route/lookup')
|
||||
AS_NAME_CACHE = {}
|
||||
AS_NAME_WRAP_WIDTH = 25
|
||||
ROUTER_NAME = os.getenv('BGP_VIS_ROUTER_NAME', 'My Router')
|
||||
MAX_IP_CIDR_LENGTH = 45
|
||||
|
||||
def _bulk_get_as_names(asn_numbers: list[str]):
|
||||
lookup_list = [asn for asn in asn_numbers if asn not in AS_NAME_CACHE]
|
||||
if not lookup_list: return
|
||||
query = "begin\n" + "\n".join(f"AS{asn}" for asn in lookup_list) + "\nend\n"
|
||||
try:
|
||||
with socket.create_connection(('bgp.tools', 43), timeout=10) as s:
|
||||
s.sendall(query.encode('utf-8'))
|
||||
response_data = b""
|
||||
while True:
|
||||
chunk = s.recv(4096)
|
||||
if not chunk: break
|
||||
response_data += chunk
|
||||
response_str = response_data.decode('utf-8')
|
||||
for line in response_str.splitlines():
|
||||
parts = line.split('|')
|
||||
if len(parts) > 1:
|
||||
asn_num_str = parts[0].strip()
|
||||
as_name = parts[-1].strip()
|
||||
if asn_num_str.isdigit(): AS_NAME_CACHE[asn_num_str] = as_name
|
||||
except (socket.error, socket.timeout) as e:
|
||||
print(f"Bulk AS name lookup failed: {e}")
|
||||
for asn in lookup_list:
|
||||
if asn not in AS_NAME_CACHE: AS_NAME_CACHE[asn] = ""
|
||||
|
||||
def _get_bgp_data(ip_address: str, ip_version: str) -> str | None:
|
||||
payload = {"ip_version": ip_version, "bgprouteprefix": ip_address}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
try:
|
||||
response = requests.post(API_URL, json=payload, headers=headers, timeout=10)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
if data.get("success") and data.get("data"): return data["data"]
|
||||
else: return None
|
||||
except requests.exceptions.RequestException: return None
|
||||
|
||||
def _parse_bgp_paths_to_graph(bgp_data: str) -> dict:
|
||||
prefix_match = re.search(r"BGP routing table entry for ([\w:./-]+)", bgp_data)
|
||||
prefix = prefix_match.group(1) if prefix_match else "Unknown Prefix"
|
||||
lines = bgp_data.split('\n')
|
||||
|
||||
path_blocks = []
|
||||
try:
|
||||
paths_header_index = next(i for i, line in enumerate(lines) if "Paths:" in line)
|
||||
current_block = []
|
||||
for line in lines[paths_header_index + 1:]:
|
||||
stripped_line = line.strip()
|
||||
if not stripped_line: continue
|
||||
|
||||
is_new_path_line = False
|
||||
if line.startswith(' ') and not line.startswith(' '):
|
||||
first_word = stripped_line.split(' ')[0]
|
||||
|
||||
cleaned_first_word = first_word.replace(',', '')
|
||||
if cleaned_first_word.isdigit() or stripped_line == "Local":
|
||||
is_new_path_line = True
|
||||
|
||||
if is_new_path_line:
|
||||
if current_block: path_blocks.append(current_block)
|
||||
current_block = [line]
|
||||
elif current_block:
|
||||
current_block.append(line)
|
||||
|
||||
if current_block: path_blocks.append(current_block)
|
||||
except StopIteration:
|
||||
return {"nodes": [], "edges": []}
|
||||
|
||||
all_paths_info, best_path_info = [], None
|
||||
for block in path_blocks:
|
||||
block_text_full = "\n".join(block)
|
||||
clean_lines = [line for line in block if not line.strip().startswith("AddPath ID:")]
|
||||
block_text_for_check = "\n".join(clean_lines)
|
||||
is_best = bool(re.search(r'\bbest\b', block_text_for_check, re.IGNORECASE))
|
||||
is_multipath = 'multipath' in block_text_for_check.lower()
|
||||
|
||||
path_line = block[0].strip()
|
||||
path_asns_raw = []
|
||||
if path_line != "Local":
|
||||
for part in path_line.split(' '):
|
||||
clean_part = part.replace(',', '').strip()
|
||||
if clean_part.isdigit():
|
||||
path_asns_raw.append(clean_part)
|
||||
else:
|
||||
break
|
||||
|
||||
path_asns = path_asns_raw
|
||||
|
||||
local_pref_match = re.search(r'localpref (\d+)', block_text_full)
|
||||
local_pref = int(local_pref_match.group(1)) if local_pref_match else None
|
||||
next_hop_match = re.search(r'^\s*([\da-fA-F:.]+)\s+from', block_text_full, re.MULTILINE)
|
||||
next_hop = next_hop_match.group(1) if next_hop_match else None
|
||||
community_match = re.search(r'Large Community: ([\d:]+)', block_text_full)
|
||||
community = community_match.group(1) if community_match else None
|
||||
category = 'other'
|
||||
if community:
|
||||
if community in TRANSIT_COMMUNITIES: category = 'transit'
|
||||
elif community in IX_COMMUNITIES: category = 'ix'
|
||||
elif community in CUSTOMER_COMMUNITIES: category = 'customer'
|
||||
path_info = {"asns": path_asns, "local_pref": local_pref, "next_hop": next_hop, "is_best": is_best, "is_multipath": is_multipath, "community": community, "category": category}
|
||||
all_paths_info.append(path_info)
|
||||
if is_best and not best_path_info:
|
||||
best_path_info = path_info
|
||||
|
||||
all_asns_in_graph = {asn for path in all_paths_info for asn in path['asns']}
|
||||
if all_asns_in_graph:
|
||||
_bulk_get_as_names(list(all_asns_in_graph))
|
||||
|
||||
ordered_paths = sorted(all_paths_info, key=lambda p: (not p['is_best'], not p['is_multipath']))
|
||||
|
||||
nodes, edges = [], []
|
||||
X_SEPARATION, Y_SEPARATION = 300, 200
|
||||
max_path_len = max((len(p['asns']) for p in ordered_paths if p['asns']), default=0)
|
||||
|
||||
if not ordered_paths:
|
||||
return {"nodes": [], "edges": []}
|
||||
|
||||
nodes.append({"id": ROUTER_NAME, "label": f"<b>{ROUTER_NAME}</b>", "color": '#FADBD8', "x": 0, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
|
||||
nodes.append({"id": prefix, "label": f"<b>{prefix}</b>", "color": '#FADBD8', "x": (max_path_len + 1) * X_SEPARATION, "y": 0, "fixed": True, "path_category": "global", "is_active": True})
|
||||
|
||||
y_pos_counter_up, y_pos_counter_down = 1, 1
|
||||
for i, path_info in enumerate(ordered_paths):
|
||||
lane_y = 0
|
||||
if i > 0:
|
||||
if y_pos_counter_up <= y_pos_counter_down: lane_y = y_pos_counter_up * Y_SEPARATION; y_pos_counter_up += 1
|
||||
else: lane_y = -y_pos_counter_down * Y_SEPARATION; y_pos_counter_down += 1
|
||||
|
||||
style, is_active_path = {}, False
|
||||
if path_info['is_best']:
|
||||
style = {"node_color": '#FADBD8', "edge_color": '#C0392B', "width": 3, "dashes": False, "path_type": " (best)"}; is_active_path = True
|
||||
elif path_info['is_multipath']:
|
||||
style = {"node_color": '#FDEBD0', "edge_color": '#F39C12', "width": 2, "dashes": False, "path_type": " (multipath)"}; is_active_path = True
|
||||
else:
|
||||
style = {"node_color": '#D6DBDF', "edge_color": '#2C3E50', "width": 1, "dashes": True, "path_type": ""}; is_active_path = False
|
||||
|
||||
path_node_ids = []
|
||||
for j, asn in enumerate(path_info['asns']):
|
||||
unique_node_id = f"AS{asn}-{i}-{j}"
|
||||
path_node_ids.append(unique_node_id)
|
||||
as_name = AS_NAME_CACHE.get(asn, ""); wrapped_name = '\n'.join(textwrap.wrap(as_name, width=AS_NAME_WRAP_WIDTH)) if as_name else ""
|
||||
base_label = f"<b>AS{asn}</b>"
|
||||
if j == 0 and path_info['local_pref'] is not None: base_label += f" (LP: {path_info['local_pref']})"
|
||||
label = f"{base_label}\n{wrapped_name}"
|
||||
if j == 0 and path_info['next_hop']: label += f"\n<i>Next Hop: {path_info['next_hop']}{style['path_type']}</i>"
|
||||
nodes.append({"id": unique_node_id, "label": label, "color": style['node_color'], "x": (j + 1) * X_SEPARATION, "y": lane_y, "fixed": True, "path_category": path_info['category'], "is_active": is_active_path})
|
||||
|
||||
full_chain = [ROUTER_NAME] + path_node_ids + [prefix]
|
||||
smooth_config = {"enabled": True, "type": "cubicBezier", "forceDirection": "horizontal", "roundness": 0.85}
|
||||
for j in range(len(full_chain) - 1):
|
||||
edges.append({"from": full_chain[j], "to": full_chain[j+1], "color": style['edge_color'], "width": style['width'], "dashes": style['dashes'], "path_category": path_info['category'], "is_active": is_active_path, "smooth": smooth_config})
|
||||
|
||||
return {"nodes": nodes, "edges": edges, "path_count": len(ordered_paths)}
|
||||
|
||||
|
||||
def generate_visual_route_graph(ip_address_str: str) -> dict:
|
||||
if not ip_address_str:
|
||||
return {"error": "An IP address or prefix is required."}
|
||||
|
||||
if len(ip_address_str) > MAX_IP_CIDR_LENGTH:
|
||||
return {"error": f"Input exceeds maximum length of {MAX_IP_CIDR_LENGTH} characters."}
|
||||
|
||||
address_to_lookup, ip_version = "", ""
|
||||
try:
|
||||
if '/' in ip_address_str:
|
||||
net_obj = ipaddress.ip_network(ip_address_str, strict=False)
|
||||
ip_version, address_to_lookup = f"ipv{net_obj.version}", net_obj.with_prefixlen
|
||||
else:
|
||||
ip_obj = ipaddress.ip_address(ip_address_str)
|
||||
ip_version, address_to_lookup = f"ipv{ip_obj.version}", str(ip_obj)
|
||||
except ValueError:
|
||||
return {"error": f"Invalid input '{ip_address_str}'. Please provide a valid IPv4/IPv6 address or prefix."}
|
||||
|
||||
bgp_data = _get_bgp_data(address_to_lookup, ip_version)
|
||||
if not bgp_data:
|
||||
return {"not_found": True, "target": address_to_lookup}
|
||||
|
||||
graph_data = _parse_bgp_paths_to_graph(bgp_data)
|
||||
|
||||
if not graph_data.get("nodes"):
|
||||
return {"not_found": True, "target": address_to_lookup}
|
||||
|
||||
return graph_data
|
||||
Loading…
Add table
Add a link
Reference in a new issue