This commit is contained in:
Blackwhitebear8 2025-06-22 16:33:28 +02:00
commit 5e31dd0214
37 changed files with 2082 additions and 0 deletions

0
modules/__init__.py Normal file
View file

17
modules/akvorado.py Normal file
View file

@ -0,0 +1,17 @@
import os
import requests
from dotenv import load_dotenv
load_dotenv()
BASE_URL = os.getenv("AKVORADO_BASE_URL", "http://localhost:8081/api/v0/console/widget")
def get_widget_data(endpoint: str):
url = f"{BASE_URL}/{endpoint}?0"
try:
res = requests.get(url)
res.raise_for_status()
return res.json()
except requests.RequestException as e:
print(f"Fout bij ophalen van {endpoint}: {e}")
return {"top": []}

21
modules/arp.py Normal file
View file

@ -0,0 +1,21 @@
def parse_arp_data(data):
arp_table = []
if "data" in data:
raw_data = data["data"]
for line in raw_data.split("\n"):
if line.strip() and not line.startswith("Address") and "---" not in line:
arp_info = line.split()
if len(arp_info) >= 4:
arp_table.append({
"address": arp_info[0],
"interface": arp_info[1],
"link_layer_address": arp_info[2],
"state": arp_info[3]
})
return arp_table
def generate_arp_json(arp_table):
return {"arp_table": arp_table}

84
modules/bgp.py Normal file
View file

@ -0,0 +1,84 @@
def parse_bgp_data(data):
ipv4_section = ""
ipv6_section = ""
ipv4_info = {}
ipv6_info = {}
if "data" in data:
raw_data = data["data"]
ipv4_marker = "IPv4 Unicast Summary (VRF bgp):"
ipv6_marker = "IPv6 Unicast Summary (VRF bgp):"
ipv4_start = raw_data.find(ipv4_marker)
ipv6_start = raw_data.find(ipv6_marker)
if ipv4_start != -1:
if ipv6_start != -1:
ipv4_section = raw_data[ipv4_start + len(ipv4_marker):ipv6_start].strip()
else:
ipv4_section = raw_data[ipv4_start + len(ipv4_marker):].strip()
ipv4_info = extract_bgp_info(ipv4_section)
if ipv6_start != -1:
ipv6_section = raw_data[ipv6_start + len(ipv6_marker):].strip()
ipv6_info = extract_bgp_info(ipv6_section)
def process_peers(peer_data):
peers = []
for line in peer_data.split("\n"):
if line.strip().startswith("Neighbor"):
continue
if line.strip():
peer_info = line.split()
if len(peer_info) >= 12:
peers.append({
"neighbor": peer_info[0],
"version": peer_info[1],
"as_number": peer_info[2],
"msg_received": peer_info[3],
"msg_sent": peer_info[4],
"table_version": peer_info[5],
"in_queue": peer_info[6],
"out_queue": peer_info[7],
"up_down": peer_info[8],
"state_pfx_rcd": peer_info[9],
"prefix_sent": peer_info[10],
"description": " ".join(peer_info[11:])
})
return peers
ipv4_peers = process_peers(ipv4_section)
ipv6_peers = process_peers(ipv6_section)
return ipv4_info, ipv4_peers, ipv6_info, ipv6_peers
def extract_bgp_info(raw_data):
lines = raw_data.split("\n")
info = {}
for line in lines:
if "BGP router identifier" in line:
parts = line.split(",")
info["router_id"] = parts[0].split("identifier")[1].strip()
info["local_as"] = parts[1].split("number")[1].strip().split(" ")[0]
if "vrf-id" in parts[-1]:
info["vrf_id"] = parts[-1].split("vrf-id")[1].strip()
if "BGP table version" in line:
info["table_version"] = line.split("version")[1].strip()
if "RIB entries" in line:
parts = line.split(",")
info["rib_entries"] = parts[0].split("entries")[1].strip()
info["rib_memory"] = parts[1].split("using")[1].strip()
if "Peers" in line:
parts = line.split(",")
info["peers"] = parts[0].split("Peers")[1].strip()
info["peers_memory"] = parts[1].split("using")[1].strip()
return info
def generate_bgp_json(ipv4_info, ipv4_peers, ipv6_info, ipv6_peers):
return {
"ipv4_info": ipv4_info,
"ipv4_peers": ipv4_peers,
"ipv6_info": ipv6_info,
"ipv6_peers": ipv6_peers
}

30
modules/interfaces.py Normal file
View file

@ -0,0 +1,30 @@
def parse_interface_data(data):
interface_table = []
if "data" in data:
raw_data = data["data"]
for line in raw_data.split("\n"):
if line.startswith("Interface") and "IP Address" in line:
continue
if line.startswith("Codes:"):
continue
if line.strip().startswith('-'):
continue
if line.strip():
interface_info = line.split()
if len(interface_info) >= 6:
interface_table.append({
"interface": interface_info[0],
"ip_address": interface_info[1] if interface_info[1] != '-' else 'N/A',
"mac_address": interface_info[2],
"vrf": interface_info[3],
"mtu": interface_info[4],
"status": interface_info[5],
"description": " ".join(interface_info[6:])
})
return interface_table

43
modules/librenms.py Normal file
View file

@ -0,0 +1,43 @@
import os
import time
import base64
import requests
from dotenv import load_dotenv
load_dotenv()
LIBRENMS_URL = os.getenv("LIBRENMS_URL", "https://nms.pixelhosting.nl")
_ports_cache = None
def get_librenms_ports():
global _ports_cache
if _ports_cache is None:
ports_str = os.getenv("LIBRENMS_PORTS", "")
ports = {}
if ports_str:
pairs = ports_str.split(",")
for pair in pairs:
if ":" in pair:
key, val = pair.split(":", 1)
ports[key.strip()] = val.strip()
_ports_cache = ports
return _ports_cache
def get_port_id(interface_name):
return get_librenms_ports().get(interface_name)
def get_timestamp_days_ago(days):
return int(time.time()) - (days * 86400)
def fetch_graph_base64(port_id, days_ago=None):
url = f"{LIBRENMS_URL}/graph.php?id={port_id}&type=port_bits&height=200&width=500"
if days_ago:
url += f"&from={get_timestamp_days_ago(days_ago)}"
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
return base64.b64encode(response.content).decode("utf-8")
except Exception as e:
print(f"[LibreNMS] Error fetching graph: {e}")
return None

21
modules/neighbors.py Normal file
View file

@ -0,0 +1,21 @@
def parse_neighbors_data(data):
neighbors_table = []
if "data" in data:
raw_data = data["data"]
for line in raw_data.split("\n"):
if line.strip() and not line.startswith("Address") and "---" not in line:
neighbors_info = line.split()
if len(neighbors_info) >= 4:
neighbors_table.append({
"address": neighbors_info[0],
"interface": neighbors_info[1],
"link_layer_address": neighbors_info[2],
"state": neighbors_info[3]
})
return neighbors_table
def generate_neighbors_json(neighbors_table):
return {"neighbors_table": neighbors_table}

58
modules/parse.py Normal file
View file

@ -0,0 +1,58 @@
import subprocess
import json
import os
from dotenv import load_dotenv
load_dotenv()
VYOS_API_URL = os.getenv("VYOS_API_URL")
VYOS_API_KEY = os.getenv("VYOS_API_KEY")
def run_bgp_curl_command():
curl_command = [
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
"--form", "data={\"op\": \"show\", \"path\": [\"bgp\", \"vrf\", \"bgp\", \"summ\"]}",
"--form", f"key={VYOS_API_KEY}"
]
response = subprocess.check_output(curl_command, text=True)
return json.loads(response)
def run_arp_curl_command():
curl_command = [
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
"--form", "data={\"op\": \"show\", \"path\": [\"arp\"]}",
"--form", f"key={VYOS_API_KEY}"
]
response = subprocess.check_output(curl_command, text=True)
return json.loads(response)
def run_neighbors_curl_command():
curl_command = [
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
"--form", "data={\"op\": \"show\", \"path\": [\"ipv6\", \"neighbors\"]}",
"--form", f"key={VYOS_API_KEY}"
]
response = subprocess.check_output(curl_command, text=True)
return json.loads(response)
def run_interfaces_curl_command():
curl_command = [
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
"--form", "data={\"op\": \"show\", \"path\": [\"interfaces\"]}",
"--form", f"key={VYOS_API_KEY}"
]
response = subprocess.check_output(curl_command, text=True)
return json.loads(response)
def run_bgp_route_curl_command(ip_version, bgprouteprefix):
data_json = {
"op": "show",
"path": ["bgp", "vrf", "bgp", ip_version, bgprouteprefix]
}
curl_command = [
"curl", "-k", "--location", "--request", "POST", f"{VYOS_API_URL}/show",
"--form", f"data={json.dumps(data_json)}",
"--form", f"key={VYOS_API_KEY}"
]
response = subprocess.check_output(curl_command, text=True)
return json.loads(response)