Update app.py

This commit is contained in:
Blackwhitebear8 2025-10-17 20:09:00 +02:00
parent e303af457a
commit 8d98b58c37

95
app.py
View file

@ -9,7 +9,7 @@ from datetime import datetime, timedelta
app = Flask(__name__)
from modules.parse import run_bgp_curl_command, run_arp_curl_command, run_neighbors_curl_command, run_interfaces_curl_command, run_bgp_route_curl_command, run_rpki_cache_connection_curl_command, run_rpki_lookup_curl_command, run_bgp_neighbor_detail_curl_command, run_bfd_peers_curl_command, run_bfd_peer_detail_curl_command, run_bgp_dampeningv4_curl_command, run_bgp_dampeningv6_curl_command,run_firewall_ipv4_curl_command, run_firewall_ipv6_curl_command, run_bgp_reset_command, run_bgp_shutdown_command, run_bgp_enable_command, run_firewall_log_curl_command
from modules.parse import run_bgp_curl_command, run_arp_curl_command, run_neighbors_curl_command, run_interfaces_curl_command, run_bgp_route_curl_command, run_rpki_cache_connection_curl_command, run_rpki_lookup_curl_command, run_bgp_neighbor_detail_curl_command, run_bfd_peers_curl_command, run_bfd_peer_detail_curl_command, run_bgp_dampeningv4_curl_command, run_bgp_dampeningv6_curl_command,run_firewall_ipv4_curl_command, run_firewall_ipv6_curl_command, run_bgp_reset_command, run_bgp_shutdown_command, run_bgp_enable_command, run_firewall_log_curl_command, run_dhcpv6_leases_curl_command
from modules.bgp import parse_bgp_data, generate_bgp_json
from modules.arp import parse_arp_data, generate_arp_json
from modules.neighbors import parse_neighbors_data, generate_neighbors_json
@ -22,10 +22,14 @@ from modules.bgp_dampening import parse_dampened_data, generate_dampened_json
from modules.firewall import parse_firewall_data, parse_firewall_log_data
from modules.visual_route import generate_visual_route_graph
from modules.database import get_peer_history, get_total_routes_history, get_unique_peers
from modules.dhcpv6 import parse_dhcpv6_leases_data, generate_dhcpv6_leases_json
@app.context_processor
def inject_hostname():
return dict(hostname=os.getenv("HOSTNAME", "unknown"))
def inject_global_vars():
return dict(
hostname=os.getenv("HOSTNAME", "unknown"),
vyos_version=os.getenv("VYOS_VERSION", "N/A")
)
@app.route("/ping")
def ping():
@ -338,17 +342,23 @@ def bgp_peer_graph_page(ip_version, neighbor_ip):
@app.route('/bgp/peer/<path:neighbor_ip>/history')
def bgp_peer_history_data(neighbor_ip):
time_range = request.args.get('range', '24h')
end_date = datetime.utcnow()
if time_range == '7d':
start_date = end_date - timedelta(days=7)
elif time_range == '30d':
start_date = end_date - timedelta(days=30)
elif time_range == '90d':
start_date = end_date - timedelta(days=90)
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if start_date_str and end_date_str:
start_date = datetime.fromisoformat(start_date_str)
end_date = datetime.fromisoformat(end_date_str)
else:
start_date = end_date - timedelta(hours=24)
time_range = request.args.get('range', '24h')
end_date = datetime.utcnow()
if time_range == '7d':
start_date = end_date - timedelta(days=7)
elif time_range == '30d':
start_date = end_date - timedelta(days=30)
elif time_range == '90d':
start_date = end_date - timedelta(days=90)
else:
start_date = end_date - timedelta(hours=24)
history_data = get_peer_history(neighbor_ip, start_date.isoformat(), end_date.isoformat())
@ -369,15 +379,23 @@ def history_page():
@app.route('/history/api/total-routes')
def total_routes_history_data():
time_range = request.args.get('range', '24h')
end_date = datetime.utcnow()
if time_range == '7d':
start_date = end_date - timedelta(days=7)
elif time_range == '30d':
start_date = end_date - timedelta(days=30)
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
if start_date_str and end_date_str:
start_date = datetime.fromisoformat(start_date_str)
end_date = datetime.fromisoformat(end_date_str)
else:
start_date = end_date - timedelta(hours=24)
time_range = request.args.get('range', '24h')
end_date = datetime.utcnow()
if time_range == '7d':
start_date = end_date - timedelta(days=7)
elif time_range == '30d':
start_date = end_date - timedelta(days=30)
elif time_range == '90d':
start_date = end_date - timedelta(days=90)
else: # Default to 24h
start_date = end_date - timedelta(hours=24)
history_data = get_total_routes_history(start_date.isoformat(), end_date.isoformat())
@ -394,4 +412,37 @@ def total_routes_history_data():
response_data["ipv4_routes"].append(ipv4_map.get(label, None))
response_data["ipv6_routes"].append(ipv6_map.get(label, None))
return jsonify(response_data)
return jsonify(response_data)
@app.route("/dhcpv6-leases")
def dhcpv6_leases_page():
pool_names_str = os.getenv("DHCPV6_PD_POOLS")
if not pool_names_str:
return render_template("dhcpv6_leases.html", error="No DHCPv6 PD pools configured in .env (DHCPV6_PD_POOLS).", pool_names=[], selected_pool=None)
pool_names = [pool.strip() for pool in pool_names_str.split(',') if pool.strip()]
if not pool_names:
return render_template("dhcpv6_leases.html", error="No DHCPv6 PD pools configured in .env (DHCPV6_PD_POOLS).", pool_names=[], selected_pool=None)
selected_pool = pool_names[0]
return render_template("dhcpv6_leases.html", pool_names=pool_names, selected_pool=selected_pool)
@app.route("/dhcpv6-leases/json")
def dhcpv6_leases_json():
pool_name = request.args.get('pool')
if not pool_name:
return jsonify({"error": "Query parameter 'pool' is missing"}), 400
pool_names_str = os.getenv("DHCPV6_PD_POOLS", "")
pool_names = [pool.strip() for pool in pool_names_str.split(',') if pool.strip()]
if pool_name not in pool_names:
return jsonify({"error": "Invalid pool name"}), 403
try:
leases_data = run_dhcpv6_leases_curl_command(pool_name)
leases_table = parse_dhcpv6_leases_data(leases_data)
return jsonify(generate_dhcpv6_leases_json(leases_table))
except Exception as e:
return jsonify({"error": str(e)}), 500