225 lines
No EOL
6.4 KiB
Python
225 lines
No EOL
6.4 KiB
Python
import json
|
|
import subprocess
|
|
import os
|
|
import requests
|
|
import re
|
|
from jinja2 import Template
|
|
from flask import Flask, render_template_string, jsonify, url_for, redirect, render_template, request, abort
|
|
|
|
# WSGI application object; the routes and handlers in this file register on it.
app = Flask(__name__)
|
|
|
|
from modules.parse import run_bgp_curl_command, run_arp_curl_command, run_neighbors_curl_command, run_interfaces_curl_command, run_bgp_route_curl_command, run_rpki_cache_connection_curl_command, run_rpki_lookup_curl_command
|
|
|
|
from modules.bgp import parse_bgp_data, generate_bgp_json
|
|
|
|
from modules.arp import parse_arp_data, generate_arp_json
|
|
|
|
from modules.neighbors import parse_neighbors_data, generate_neighbors_json
|
|
|
|
from modules.interfaces import parse_interface_data
|
|
|
|
from modules.akvorado import get_widget_data
|
|
|
|
from modules.librenms import get_port_id, fetch_graph_base64
|
|
|
|
from modules.rpki import parse_rpki_cache_data, parse_rpki_lookup_data
|
|
|
|
from modules.visual_route import generate_visual_route_graph
|
|
|
|
@app.context_processor
def inject_hostname():
    """Provide ``hostname`` as a template context variable.

    The value is read from the ``HOSTNAME`` environment variable and falls
    back to ``"unknown"`` when that variable is not set.
    """
    current_host = os.getenv("HOSTNAME", "unknown")
    return {"hostname": current_host}
|
|
|
|
@app.route("/ping")
def ping():
    """Liveness endpoint: always answers with the plain text ``pong``."""
    reply = "pong"
    return reply
|
|
|
|
@app.errorhandler(404)
def page_not_found(e):
    """Render the ``404.html`` template with a 404 status code.

    ``e`` is the error object handed over by Flask; it is not used.
    """
    not_found_body = render_template("404.html")
    return not_found_body, 404
|
|
|
|
@app.route('/')
def index():
    """Render ``index.html``, passing ``BGP_TOOLS_IMAGE`` as ``bgp_image``.

    ``bgp_image`` is ``None`` when the environment variable is not set.
    """
    return render_template('index.html', bgp_image=os.getenv("BGP_TOOLS_IMAGE"))
|
|
|
|
@app.route("/bgp")
def bgp():
    """Render the BGP page (``bgp.html``).

    The template is rendered without any context variables, so no BGP data
    is fetched or parsed here; the ``/bgp/json`` route is the one that
    returns the parsed data. This matches the ``/arp`` and ``/neighbors``
    page handlers, which also only render their template.
    """
    return render_template('bgp.html')
|
|
|
|
@app.route("/bgp/json")
def bgp_json():
    """Return the BGP data as JSON.

    Fetches the raw output with ``run_bgp_curl_command``, splits it into
    IPv4/IPv6 info and peers with ``parse_bgp_data`` and serialises the
    structure built by ``generate_bgp_json``.
    """
    raw_output = run_bgp_curl_command()
    v4_info, v4_peers, v6_info, v6_peers = parse_bgp_data(raw_output)
    payload = generate_bgp_json(v4_info, v4_peers, v6_info, v6_peers)
    return jsonify(payload)
|
|
|
|
@app.route("/arp")
def arp():
    """Render the ARP page (``arp.html``) without context variables."""
    page = render_template("arp.html")
    return page
|
|
|
|
@app.route("/arp/json")
def arp_json():
    """Return the ARP table as JSON.

    The raw output of ``run_arp_curl_command`` is parsed by
    ``parse_arp_data`` and shaped by ``generate_arp_json``.
    """
    parsed_table = parse_arp_data(run_arp_curl_command())
    payload = generate_arp_json(parsed_table)
    return jsonify(payload)
|
|
|
|
@app.route("/neighbors")
def neighbors():
    """Render the neighbors page (``neighbors.html``) without context variables."""
    page = render_template("neighbors.html")
    return page
|
|
|
|
@app.route("/neighbors/json")
def neighborsp_json():
    """Return the neighbors table as JSON.

    The raw output of ``run_neighbors_curl_command`` is parsed by
    ``parse_neighbors_data`` and shaped by ``generate_neighbors_json``.
    """
    # NOTE(review): the function name looks like a typo of "neighbors_json";
    # it is kept because it doubles as the Flask endpoint name.
    parsed_table = parse_neighbors_data(run_neighbors_curl_command())
    payload = generate_neighbors_json(parsed_table)
    return jsonify(payload)
|
|
|
|
@app.route('/interfaces')
def interface_table_page():
    """Render the interfaces page (``interfaces.html``) without context variables."""
    page = render_template("interfaces.html")
    return page
|
|
|
|
@app.route('/interfaces/json')
def interface_table_summary_json():
    """Return the parsed interface data as JSON under ``interface_table``.

    The raw output of ``run_interfaces_curl_command`` is parsed by
    ``parse_interface_data``.
    """
    raw_output = run_interfaces_curl_command()
    payload = {"interface_table": parse_interface_data(raw_output)}
    return jsonify(payload)
|
|
|
|
@app.route('/bgp-route')
def bgp_route_page():
    """Render the BGP route page (``bgp-route.html``) without context variables."""
    page = render_template("bgp-route.html")
    return page
|
|
|
|
@app.route('/bgp-route/lookup', methods=['POST'])
def bgp_route_lookup():
    """Look up a BGP route for the prefix given in the JSON request body.

    Expects a JSON object with ``ip_version`` and ``bgprouteprefix``.

    Returns:
        The result of ``run_bgp_route_curl_command`` as JSON; a 400 JSON
        error when the body is not a JSON object or a required field is
        missing or empty; a 500 JSON error when the lookup raises.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so such requests get the same JSON 400 as missing fields.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        # A JSON array/scalar has no .get(); treat it like an empty object.
        payload = {}

    ip_version = payload.get('ip_version')
    bgprouteprefix = payload.get('bgprouteprefix')

    if not ip_version or not bgprouteprefix:
        return jsonify({"error": "ip_version and bgprouteprefix are required"}), 400

    try:
        result = run_bgp_route_curl_command(ip_version, bgprouteprefix)
        return jsonify(result)
    except Exception as e:
        # Route boundary: report any lookup failure to the client as JSON.
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route('/rpki')
def rpki_page():
    """Render the RPKI page (``rpki.html``) without context variables."""
    page = render_template("rpki.html")
    return page
|
|
|
|
@app.route('/rpki/status')
def rpki_status():
    """Return the parsed RPKI cache connection data as JSON.

    Any exception raised while fetching, parsing or serialising is reported
    as ``{"error": <message>}`` with status 500.
    """
    try:
        cache_output = run_rpki_cache_connection_curl_command()
        return jsonify(parse_rpki_cache_data(cache_output))
    except Exception as exc:
        failure = {"error": str(exc)}
        return jsonify(failure), 500
|
|
|
|
@app.route('/rpki/lookup', methods=['POST'])
def rpki_lookup():
    """Run an RPKI lookup for the ``query`` given in the JSON request body.

    A query made only of ASCII digits is looked up as an AS number; a query
    containing ``/`` is looked up as a prefix.

    Returns:
        The output of ``parse_rpki_lookup_data`` as JSON; a 400 JSON error
        when the query is missing, not a string, or in neither format; a
        500 JSON error when the lookup or parsing raises.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body; a non-object body (array/scalar) is treated the same way.
    payload = request.get_json(silent=True)
    query = payload.get('query') if isinstance(payload, dict) else None

    if not query:
        return jsonify({"error": "Query is required"}), 400

    if not isinstance(query, str):
        # re and the "in" test below need a string; reject instead of
        # failing with a TypeError that would surface as a 500.
        return jsonify({"error": "Invalid query format. Use AS number or prefix."}), 400

    try:
        # fullmatch on [0-9]+ rejects a trailing newline and non-ASCII
        # digits, both of which r'^\d+$' with re.match would accept.
        if re.fullmatch(r'[0-9]+', query):
            result = run_rpki_lookup_curl_command("as-number", query)
        elif '/' in query:
            result = run_rpki_lookup_curl_command("prefix", query)
        else:
            return jsonify({"error": "Invalid query format. Use AS number or prefix."}), 400

        parsed_result = parse_rpki_lookup_data(result)
        return jsonify(parsed_result)

    except Exception as e:
        # Route boundary: report any lookup failure to the client as JSON.
        return jsonify({"error": str(e)}), 500
|
|
|
|
@app.route('/stats')
def stats_page():
    """Render ``stats.html`` for the interface named by ``LIBRENMS_MAIN_PORT``.

    The interface name defaults to ``"dummy"`` when the variable is unset.
    When ``get_port_id`` returns a falsy value the template is rendered with
    ``error=True`` and no graph; otherwise the result of
    ``fetch_graph_base64`` is passed as ``daily``.
    """
    interface_name = os.getenv("LIBRENMS_MAIN_PORT", "dummy")
    port_id = get_port_id(interface_name)

    if port_id:
        daily_graph = fetch_graph_base64(port_id)
        return render_template(
            "stats.html",
            interface_name=interface_name,
            daily=daily_graph,
            error=False,
        )

    return render_template("stats.html", interface_name=interface_name, error=True)
|
|
|
|
@app.route("/stats/src-as")
def stats_srcas_json():
    """Return ``get_widget_data("top/src-as")`` serialised as JSON."""
    widget = get_widget_data("top/src-as")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/src-ports")
def stats_srcport_json():
    """Return ``get_widget_data("top/src-port")`` serialised as JSON."""
    widget = get_widget_data("top/src-port")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/protocol")
def stats_protocol_json():
    """Return ``get_widget_data("top/protocol")`` serialised as JSON."""
    widget = get_widget_data("top/protocol")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/src-country")
def stats_srccountry_json():
    """Return ``get_widget_data("top/src-country")`` serialised as JSON."""
    widget = get_widget_data("top/src-country")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/etype")
def stats_etype_json():
    """Return ``get_widget_data("top/etype")`` serialised as JSON."""
    widget = get_widget_data("top/etype")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/graph")
def stats_graph_json():
    """Return ``get_widget_data("graph")`` serialised as JSON."""
    widget = get_widget_data("graph")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/flow-rate")
def stats_flow_rate_json():
    """Return ``get_widget_data("flow-rate")`` serialised as JSON."""
    widget = get_widget_data("flow-rate")
    return jsonify(widget)
|
|
|
|
@app.route("/stats/exporters")
def stats_exporters_json():
    """Return ``get_widget_data("exporters")`` serialised as JSON."""
    widget = get_widget_data("exporters")
    return jsonify(widget)
|
|
|
|
@app.route("/port/<interface_name>")
def graph_page(interface_name):
    """Render ``port.html`` with graphs for ``interface_name``.

    When ``get_port_id`` returns a falsy value the template is rendered with
    ``error=True`` and no graphs. Otherwise ``fetch_graph_base64`` is called
    three times: with its default range (``daily``), ``days_ago=7``
    (``weekly``) and ``days_ago=28`` (``monthly``).
    """
    port_id = get_port_id(interface_name)
    if not port_id:
        return render_template("port.html", interface_name=interface_name, error=True)

    # Dict literals evaluate in order, so the fetches run daily, weekly, monthly.
    graphs = {
        "daily": fetch_graph_base64(port_id),
        "weekly": fetch_graph_base64(port_id, days_ago=7),
        "monthly": fetch_graph_base64(port_id, days_ago=28),
    }
    return render_template(
        "port.html",
        interface_name=interface_name,
        error=False,
        **graphs,
    )
|
|
|
|
@app.route('/visual-route')
def visual_route_page():
    """Render the visual route page (``visual-route.html``) without context variables."""
    page = render_template("visual-route.html")
    return page
|
|
|
|
@app.route('/visual-route/graph', methods=['POST'])
def visual_route_graph():
    """Return the visual route graph for ``ip_address`` from the JSON body.

    Returns:
        The dict produced by ``generate_visual_route_graph`` as JSON. The
        status is 400 when that dict contains an ``"error"`` key, or when
        the body is not a JSON object or carries no ``ip_address``.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body; a non-object body (array/scalar) is treated the same way.
    payload = request.get_json(silent=True)
    ip_address_str = payload.get('ip_address') if isinstance(payload, dict) else None

    if not ip_address_str:
        # Same shape as the other lookup endpoints' validation errors.
        return jsonify({"error": "ip_address is required"}), 400

    graph_data = generate_visual_route_graph(ip_address_str)

    if "error" in graph_data:
        return jsonify(graph_data), 400

    return jsonify(graph_data)