"""Flask front-end for the router looking-glass / dashboard.

Each page route renders a template; the matching ``.../json`` (or POST) route
fetches raw data through the ``modules.parse`` helpers, parses it with the
relevant ``modules.*`` parser and returns it as JSON.
"""

import json
import os
import re
import subprocess
from datetime import datetime, timedelta

import requests
from flask import (
    Flask,
    abort,
    jsonify,
    redirect,
    render_template,
    render_template_string,
    request,
    url_for,
)
from jinja2 import Template

# NOTE(review): the app object is created before the project imports, exactly
# as in the original layout. Confirm nothing relies on this ordering before
# moving it below the imports.
app = Flask(__name__)

from modules.parse import (
    run_arp_curl_command,
    run_bfd_peer_detail_curl_command,
    run_bfd_peers_curl_command,
    run_bgp_curl_command,
    run_bgp_dampeningv4_curl_command,
    run_bgp_dampeningv6_curl_command,
    run_bgp_enable_command,
    run_bgp_neighbor_detail_curl_command,
    run_bgp_reset_command,
    run_bgp_route_curl_command,
    run_bgp_shutdown_command,
    run_dhcpv6_leases_curl_command,
    run_firewall_ipv4_curl_command,
    run_firewall_ipv6_curl_command,
    run_firewall_log_curl_command,
    run_interfaces_curl_command,
    run_neighbors_curl_command,
    run_rpki_cache_connection_curl_command,
    run_rpki_lookup_curl_command,
)
from modules.bgp import parse_bgp_data, generate_bgp_json
from modules.arp import parse_arp_data, generate_arp_json
from modules.neighbors import parse_neighbors_data, generate_neighbors_json
from modules.interfaces import parse_interface_data
from modules.akvorado import get_widget_data
from modules.librenms import get_port_id, fetch_graph_base64
from modules.rpki import parse_rpki_cache_data, parse_rpki_lookup_data
from modules.bfd import parse_bfd_peers_data
from modules.bgp_dampening import parse_dampened_data, generate_dampened_json
from modules.firewall import parse_firewall_data, parse_firewall_log_data
from modules.visual_route import generate_visual_route_graph
from modules.database import get_peer_history, get_total_routes_history, get_unique_peers
from modules.dhcpv6 import parse_dhcpv6_leases_data, generate_dhcpv6_leases_json
from modules.irr_tools import make_irr_api_request


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

# Preset look-back windows for the history endpoints:
# range name -> (window length, aggregation interval in minutes).
_HISTORY_RANGE_PRESETS = {
    '7d': (timedelta(days=7), 30),
    '30d': (timedelta(days=30), 60),
    '90d': (timedelta(days=90), 120),
}
# Used for '24h' and for any unrecognised range value.
_HISTORY_DEFAULT_RANGE = (timedelta(hours=24), 10)

_INVALID_DATE_ERROR = "start_date and end_date must be valid ISO 8601 timestamps"
_NO_DHCPV6_POOLS_ERROR = "No DHCPv6 PD pools configured in .env (DHCPV6_PD_POOLS)."


def _json_payload():
    """Return the request body as a dict, or ``{}`` when it is absent/invalid.

    ``request.json`` can be ``None`` (or abort the request) when the body is
    missing or is not a JSON object; callers here always want to ``.get()``
    fields and answer with their own 400 response instead.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _resolve_history_window(args):
    """Work out the time window and aggregation interval for a history query.

    Args:
        args: the request's query-string mapping. Reads ``start_date`` and
            ``end_date`` (ISO strings) or, when either is missing, ``range``
            (``'24h'``, ``'7d'``, ``'30d'`` or ``'90d'``).

    Returns:
        ``(start_date, end_date, aggregation_interval_minutes)``.

    Raises:
        ValueError: a supplied date is not parseable by ``fromisoformat``.
        TypeError: one supplied date is timezone-aware and the other naive.
    """
    start_raw = args.get('start_date')
    end_raw = args.get('end_date')

    if start_raw and end_raw:
        start_date = datetime.fromisoformat(start_raw)
        end_date = datetime.fromisoformat(end_raw)
        duration_days = (end_date - start_date).total_seconds() / 86400
        # Longer windows are aggregated into coarser buckets.
        if duration_days > 30:
            interval = 120
        elif duration_days > 7:
            interval = 60
        elif duration_days > 2:
            interval = 30
        else:
            interval = 10
        return start_date, end_date, interval

    span, interval = _HISTORY_RANGE_PRESETS.get(
        args.get('range', '24h'), _HISTORY_DEFAULT_RANGE
    )
    end_date = datetime.utcnow()
    return end_date - span, end_date, interval


def _configured_dhcpv6_pools():
    """Return the pool names listed in ``DHCPV6_PD_POOLS`` (comma separated)."""
    raw = os.getenv("DHCPV6_PD_POOLS") or ""
    return [name.strip() for name in raw.split(',') if name.strip()]


# ---------------------------------------------------------------------------
# Generic
# ---------------------------------------------------------------------------

@app.context_processor
def inject_global_vars():
    """Expose ``hostname`` and ``vyos_version`` (from the environment) to all templates."""
    return dict(
        hostname=os.getenv("HOSTNAME", "unknown"),
        vyos_version=os.getenv("VYOS_VERSION", "N/A"),
    )


@app.route("/ping")
def ping():
    """Liveness probe; always answers ``pong``."""
    return "pong"


@app.errorhandler(404)
def page_not_found(e):
    """Render the custom 404 page."""
    return render_template("404.html"), 404


@app.route('/')
def index():
    """Render the landing page, passing the ``BGP_TOOLS_IMAGE`` env value."""
    bgp_image = os.getenv("BGP_TOOLS_IMAGE")
    return render_template('index.html', bgp_image=bgp_image)


# ---------------------------------------------------------------------------
# BGP
# ---------------------------------------------------------------------------

@app.route("/bgp")
def bgp():
    """Render the BGP overview page.

    The original fetched and parsed BGP data here but never passed it to the
    template; that discarded router call has been dropped. ``/bgp/json``
    serves the data.
    """
    return render_template('bgp.html')


@app.route("/bgp/json")
def bgp_json():
    """Return the BGP summary (IPv4/IPv6 info and peers, plus BFD peers) as JSON."""
    bgp_data = run_bgp_curl_command()
    ipv4_info, ipv4_peers, ipv6_info, ipv6_peers = parse_bgp_data(bgp_data)
    bfd_data = run_bfd_peers_curl_command()
    bfd_peers = parse_bfd_peers_data(bfd_data)
    return jsonify(generate_bgp_json(ipv4_info, ipv4_peers, ipv6_info, ipv6_peers, bfd_peers))


@app.route('/bgp/action', methods=['POST'])
def bgp_action():
    """Run a reset/shutdown/enable action against one BGP neighbor.

    Expects a JSON body with ``action`` (``hard_reset``, ``soft_reset``,
    ``shutdown`` or ``enable``) and ``neighbor_ip``. Answers 400 for missing
    or unknown input and 500 when the command fails or raises.
    """
    data = _json_payload()
    action = data.get('action')
    neighbor_ip = data.get('neighbor_ip')

    if not action or not neighbor_ip:
        return jsonify({"success": False, "error": "Missing action or neighbor_ip"}), 400

    try:
        if action == 'hard_reset':
            result = run_bgp_reset_command(neighbor_ip, soft=False)
        elif action == 'soft_reset':
            result = run_bgp_reset_command(neighbor_ip, soft=True)
        elif action == 'shutdown':
            result = run_bgp_shutdown_command(neighbor_ip)
        elif action == 'enable':
            result = run_bgp_enable_command(neighbor_ip)
        else:
            return jsonify({"success": False, "error": "Invalid action"}), 400

        if result.get("success"):
            return jsonify({
                "success": True,
                "message": f"Action '{action}' for neighbor {neighbor_ip} successful.",
                "details": result.get("data"),
            })
        return jsonify({
            "success": False,
            "error": f"Action '{action}' failed.",
            "details": result.get("error"),
        }), 500
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500


@app.route("/bgp/dampened")
def bgp_dampened_page():
    """Render the dampened-paths page."""
    return render_template("bgp_dampened.html")


@app.route("/bgp/dampened/json")
def bgp_dampened_json():
    """Return the dampened IPv4 and IPv6 paths as JSON (500 on any failure)."""
    try:
        ipv4_dampened_raw = run_bgp_dampeningv4_curl_command()
        ipv6_dampened_raw = run_bgp_dampeningv6_curl_command()
        ipv4_paths = parse_dampened_data(ipv4_dampened_raw, "ipv4")
        ipv6_paths = parse_dampened_data(ipv6_dampened_raw, "ipv6")
        return jsonify(generate_dampened_json(ipv4_paths, ipv6_paths))
    except Exception as e:
        return jsonify({"error": "Failed to retrieve BGP dampened data.", "details": str(e)}), 500


# The URL variable is required: the view takes ``peer_ip`` as an argument.
@app.route('/bfd/peer/<peer_ip>')
def bfd_peer_detail(peer_ip):
    """Render the detail page for one BFD peer, or the same page with an error."""
    try:
        raw_data = run_bfd_peer_detail_curl_command(peer_ip)
        bfd_details = raw_data.get('data', 'Geen data gevonden voor deze BFD peer.')
        return render_template('bfd_peer_detail.html', peer_ip=peer_ip, bfd_data=bfd_details)
    except Exception as e:
        error_message = f"Kon de BFD-details niet ophalen: {e}"
        return render_template('bfd_peer_detail.html', peer_ip=peer_ip, error=error_message)


# ---------------------------------------------------------------------------
# ARP / neighbors
# ---------------------------------------------------------------------------

@app.route("/arp")
def arp():
    """Render the ARP table page."""
    return render_template("arp.html")


@app.route("/arp/json")
def arp_json():
    """Return the parsed ARP table as JSON."""
    arp_data = run_arp_curl_command()
    arp_table = parse_arp_data(arp_data)
    return jsonify(generate_arp_json(arp_table))


@app.route("/neighbors")
def neighbors():
    """Render the neighbors page."""
    return render_template("neighbors.html")


@app.route("/neighbors/json")
def neighborsp_json():
    """Return the parsed neighbors table as JSON.

    The function name (and therefore the Flask endpoint name) is kept as-is
    for compatibility with any existing ``url_for`` references.
    """
    neighbors_data = run_neighbors_curl_command()
    neighbors_table = parse_neighbors_data(neighbors_data)
    return jsonify(generate_neighbors_json(neighbors_table))


# Both URL variables are required: the view takes them as arguments.
@app.route('/bgp/neighbor/<ip_version>/<neighbor_ip>')
def bgp_neighbor_detail(ip_version, neighbor_ip):
    """Render the detail page for one BGP neighbor; 404 for an unknown IP version."""
    if ip_version not in ['ipv4', 'ipv6']:
        abort(404)
    try:
        raw_data = run_bgp_neighbor_detail_curl_command(ip_version, neighbor_ip)
        neighbor_details = raw_data.get('data', 'No data found for this neighbor.')
        return render_template('bgp_neighbor_detail.html', neighbor_ip=neighbor_ip, neighbor_data=neighbor_details)
    except Exception as e:
        error_message = f"Unable to retrieve details: {e}"
        return render_template('bgp_neighbor_detail.html', neighbor_ip=neighbor_ip, error=error_message)


# ---------------------------------------------------------------------------
# Interfaces
# ---------------------------------------------------------------------------

@app.route('/interfaces')
def interface_table_page():
    """Render the interfaces page."""
    return render_template("interfaces.html")


@app.route('/interfaces/json')
def interface_table_summary_json():
    """Return the parsed interface table as JSON under ``interface_table``."""
    data = run_interfaces_curl_command()
    interface_table = parse_interface_data(data)
    return jsonify({"interface_table": interface_table})


# ---------------------------------------------------------------------------
# BGP route lookup
# ---------------------------------------------------------------------------

@app.route('/bgp-route')
def bgp_route_page():
    """Render the BGP route lookup page."""
    return render_template("bgp-route.html")


@app.route('/bgp-route/lookup', methods=['POST'])
def bgp_route_lookup():
    """Look up a BGP route; expects JSON with ``ip_version`` and ``bgprouteprefix``."""
    data = _json_payload()
    ip_version = data.get('ip_version')
    bgprouteprefix = data.get('bgprouteprefix')

    if not ip_version or not bgprouteprefix:
        return jsonify({"error": "ip_version and bgprouteprefix are required"}), 400

    try:
        result = run_bgp_route_curl_command(ip_version, bgprouteprefix)
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------------------------------------------------------------------------
# RPKI
# ---------------------------------------------------------------------------

@app.route('/rpki')
def rpki_page():
    """Render the RPKI page."""
    return render_template("rpki.html")


@app.route('/rpki/status')
def rpki_status():
    """Return the parsed RPKI cache-connection status as JSON (500 on failure)."""
    try:
        raw_result = run_rpki_cache_connection_curl_command()
        parsed_data = parse_rpki_cache_data(raw_result)
        return jsonify(parsed_data)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/rpki/lookup', methods=['POST'])
def rpki_lookup():
    """Run an RPKI lookup for an AS number or a prefix.

    Expects JSON with ``query``. An all-digit query is looked up as an AS
    number, a query containing ``/`` as a prefix; anything else is a 400.
    """
    query = _json_payload().get('query')
    if not query:
        return jsonify({"error": "Query is required"}), 400

    # JSON may deliver the AS number as an int; normalise so the regex check
    # cannot raise, and drop stray whitespace before it reaches the command.
    query = str(query).strip()

    try:
        if re.fullmatch(r'\d+', query):
            result = run_rpki_lookup_curl_command("as-number", query)
        elif '/' in query:
            result = run_rpki_lookup_curl_command("prefix", query)
        else:
            return jsonify({"error": "Invalid query format. Use AS number or prefix."}), 400

        parsed_result = parse_rpki_lookup_data(result)
        return jsonify(parsed_result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------------------------------------------------------------------------
# Traffic statistics
# ---------------------------------------------------------------------------

@app.route('/stats')
def stats_page():
    """Render the stats page with the daily graph of ``LIBRENMS_MAIN_PORT``."""
    interface_name = os.getenv("LIBRENMS_MAIN_PORT", "dummy")
    port_id = get_port_id(interface_name)
    if not port_id:
        return render_template("stats.html", interface_name=interface_name, error=True)

    daily = fetch_graph_base64(port_id)
    return render_template(
        "stats.html",
        interface_name=interface_name,
        daily=daily,
        error=False,
    )


@app.route("/stats/src-as")
def stats_srcas_json():
    """Return the ``top/src-as`` widget data as JSON."""
    return jsonify(get_widget_data("top/src-as"))


@app.route("/stats/src-ports")
def stats_srcport_json():
    """Return the ``top/src-port`` widget data as JSON."""
    return jsonify(get_widget_data("top/src-port"))


@app.route("/stats/protocol")
def stats_protocol_json():
    """Return the ``top/protocol`` widget data as JSON."""
    return jsonify(get_widget_data("top/protocol"))


@app.route("/stats/src-country")
def stats_srccountry_json():
    """Return the ``top/src-country`` widget data as JSON."""
    return jsonify(get_widget_data("top/src-country"))


@app.route("/stats/etype")
def stats_etype_json():
    """Return the ``top/etype`` widget data as JSON."""
    return jsonify(get_widget_data("top/etype"))


@app.route("/stats/graph")
def stats_graph_json():
    """Return the ``graph`` widget data as JSON."""
    return jsonify(get_widget_data("graph"))


@app.route("/stats/flow-rate")
def stats_flow_rate_json():
    """Return the ``flow-rate`` widget data as JSON."""
    return jsonify(get_widget_data("flow-rate"))


@app.route("/stats/exporters")
def stats_exporters_json():
    """Return the ``exporters`` widget data as JSON."""
    return jsonify(get_widget_data("exporters"))


# The URL variable is required: the view takes ``interface_name`` as an argument.
@app.route("/port/<interface_name>")
def graph_page(interface_name):
    """Render daily, weekly and monthly graphs for one interface."""
    port_id = get_port_id(interface_name)
    if not port_id:
        return render_template(
            "port.html",
            interface_name=interface_name,
            error=True,
        )

    daily = fetch_graph_base64(port_id)
    weekly = fetch_graph_base64(port_id, days_ago=7)
    monthly = fetch_graph_base64(port_id, days_ago=28)
    return render_template(
        "port.html",
        interface_name=interface_name,
        daily=daily,
        weekly=weekly,
        monthly=monthly,
        error=False,
    )


# ---------------------------------------------------------------------------
# Visual route
# ---------------------------------------------------------------------------

@app.route('/visual-route')
def visual_route_page():
    """Render the visual-route page."""
    return render_template("visual-route.html")


@app.route('/visual-route/graph', methods=['POST'])
def visual_route_graph():
    """Return the route graph for the posted ``ip_address`` (400 if it reports an error)."""
    ip_address_str = _json_payload().get('ip_address')
    graph_data = generate_visual_route_graph(ip_address_str)
    if "error" in graph_data:
        return jsonify(graph_data), 400
    return jsonify(graph_data)


# ---------------------------------------------------------------------------
# Firewall
# ---------------------------------------------------------------------------

@app.route('/firewall')
def firewall_page():
    """Render the firewall page."""
    return render_template("firewall.html")


@app.route('/firewall/json')
def firewall_json():
    """Return the parsed IPv4 and IPv6 firewall rulesets as JSON (500 on failure)."""
    try:
        ipv4_raw = run_firewall_ipv4_curl_command()
        ipv6_raw = run_firewall_ipv6_curl_command()
        ipv4_rulesets = parse_firewall_data(ipv4_raw)
        ipv6_rulesets = parse_firewall_data(ipv6_raw)
        return jsonify({"ipv4": ipv4_rulesets, "ipv6": ipv6_rulesets})
    except Exception as e:
        return jsonify({"error": "Failed to retrieve firewall data.", "details": str(e)}), 500


# All three URL variables are required by the view signature.
# NOTE(review): ``rule_number`` is captured as a plain string; switch to
# ``<int:rule_number>`` if the log command expects an integer.
@app.route('/firewall/log/<ip_version>/<ruleset_name>/<rule_number>')
def firewall_log_page(ip_version, ruleset_name, rule_number):
    """Render the log page for one firewall rule; failures are shown in place of the log."""
    try:
        raw_data = run_firewall_log_curl_command(ip_version, ruleset_name, rule_number)
        log_data = parse_firewall_log_data(raw_data)
    except Exception as e:
        log_data = f"Could not retrieve the logs: {e}"
    return render_template(
        'firewall_log.html',
        ip_version=ip_version,
        ruleset_name=ruleset_name,
        rule_number=rule_number,
        log_data=log_data,
    )


# ---------------------------------------------------------------------------
# History
# ---------------------------------------------------------------------------

# Both URL variables are required by the view signature.
@app.route('/bgp/peer/<ip_version>/<neighbor_ip>/graph')
def bgp_peer_graph_page(ip_version, neighbor_ip):
    """Render the prefix-history graph page for one peer; 404 for an unknown IP version."""
    if ip_version not in ['ipv4', 'ipv6']:
        abort(404)
    return render_template('bgp_peer_graph.html', neighbor_ip=neighbor_ip, ip_version=ip_version)


# The URL variable is required: the view takes ``neighbor_ip`` as an argument.
@app.route('/bgp/peer/<neighbor_ip>/history')
def bgp_peer_history_data(neighbor_ip):
    """Return received/sent prefix history for one peer as JSON.

    Query parameters: ``start_date`` + ``end_date`` (ISO strings) or
    ``range``. Answers 400 when the supplied dates cannot be used.
    """
    try:
        start_date, end_date, interval_minutes = _resolve_history_window(request.args)
    except (ValueError, TypeError):
        return jsonify({"error": _INVALID_DATE_ERROR}), 400

    history_data = get_peer_history(
        neighbor_ip, start_date.isoformat(), end_date.isoformat(), interval_minutes
    )
    return jsonify({
        "labels": [item['timestamp'] for item in history_data],
        "received": [item['prefixes_received'] for item in history_data],
        "sent": [item['prefixes_sent'] for item in history_data],
    })


@app.route('/bgp/history')
def history_page():
    """Render the history page with the list of known peers."""
    peers = get_unique_peers()
    return render_template('history.html', peers=peers)


@app.route('/history/api/total-routes')
def total_routes_history_data():
    """Return total IPv4/IPv6 route counts over time as JSON.

    Query parameters: ``start_date`` + ``end_date`` (ISO strings) or
    ``range``. Answers 400 when the supplied dates cannot be used. Each
    series is aligned to the sorted set of timestamps, with ``null`` where
    an address family has no sample.
    """
    try:
        start_date, end_date, interval_minutes = _resolve_history_window(request.args)
    except (ValueError, TypeError):
        return jsonify({"error": _INVALID_DATE_ERROR}), 400

    history_data = get_total_routes_history(
        start_date.isoformat(), end_date.isoformat(), interval_minutes
    )

    labels = sorted({item['timestamp'] for item in history_data})
    ipv4_map = {
        item['timestamp']: item['total_routes']
        for item in history_data if item['ip_version'] == 'ipv4'
    }
    ipv6_map = {
        item['timestamp']: item['total_routes']
        for item in history_data if item['ip_version'] == 'ipv6'
    }
    return jsonify({
        "labels": labels,
        "ipv4_routes": [ipv4_map.get(label) for label in labels],
        "ipv6_routes": [ipv6_map.get(label) for label in labels],
    })


# ---------------------------------------------------------------------------
# DHCPv6 leases
# ---------------------------------------------------------------------------

@app.route("/dhcpv6-leases")
def dhcpv6_leases_page():
    """Render the DHCPv6 leases page, preselecting the first configured pool."""
    pool_names = _configured_dhcpv6_pools()
    if not pool_names:
        return render_template(
            "dhcpv6_leases.html",
            error=_NO_DHCPV6_POOLS_ERROR,
            pool_names=[],
            selected_pool=None,
        )
    return render_template("dhcpv6_leases.html", pool_names=pool_names, selected_pool=pool_names[0])


@app.route("/dhcpv6-leases/json")
def dhcpv6_leases_json():
    """Return the leases of the pool named by ``?pool=`` as JSON.

    Answers 400 when ``pool`` is missing, 403 when it is not one of the
    configured pools, and 500 when fetching or parsing fails.
    """
    pool_name = request.args.get('pool')
    if not pool_name:
        return jsonify({"error": "Query parameter 'pool' is missing"}), 400

    # Only pools listed in the environment may be queried.
    if pool_name not in _configured_dhcpv6_pools():
        return jsonify({"error": "Invalid pool name"}), 403

    try:
        leases_data = run_dhcpv6_leases_curl_command(pool_name)
        leases_table = parse_dhcpv6_leases_data(leases_data)
        return jsonify(generate_dhcpv6_leases_json(leases_table))
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# ---------------------------------------------------------------------------
# IRR tools
# ---------------------------------------------------------------------------

@app.route('/irr-tools')
def irr_tools_page():
    """Render the IRR tools page with the content returned by ``GET /asns``."""
    data = make_irr_api_request('GET', '/asns')
    content = data.get('content', '')
    error = data.get('error')
    return render_template('irr_tools.html', asns_content=content, error=error)


@app.route('/irr-tools/save', methods=['POST'])
def irr_tools_save():
    """Forward the posted ``content`` to ``POST /asns`` and return the result."""
    content = _json_payload().get('content')
    result = make_irr_api_request('POST', '/asns', json_data={"content": content})
    return jsonify(result)


@app.route('/irr-tools/run', methods=['POST'])
def irr_tools_run():
    """Trigger ``POST /run-update`` on the IRR API and return the result."""
    result = make_irr_api_request('POST', '/run-update')
    return jsonify(result)


@app.route('/irr-tools/log', methods=['GET'])
def irr_tools_log():
    """Return the result of ``GET /log`` from the IRR API."""
    result = make_irr_api_request('GET', '/log')
    return jsonify(result)