import os import ipaddress import json import re import subprocess import statistics import socket from flask import Flask, render_template, jsonify, request, Response from dotenv import load_dotenv from modules.network_tools import execute_command_streaming from modules.visual_route import generate_visual_route_graph, get_raw_bgp_route from modules.mtr_parser import MtrParser from modules.parse import run_bgp_route_curl_command load_dotenv() app = Flask(__name__) FQDN_REGEX = re.compile(r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}(?!-d$)[a-zA-Z0-9-]{2,63}$') MAX_TARGET_LENGTH = 255 def validate_target(target: str) -> str | None: if not target: return "A target is required." if len(target) > MAX_TARGET_LENGTH: return f"Target exceeds maximum length of {MAX_TARGET_LENGTH} characters." if target.startswith('-') and '.' not in target: return "Invalid target. Input resembles a command-line option." try: ip = ipaddress.ip_address(target) if ip.is_private or ip.is_loopback or ip.is_multicast or ip.is_unspecified: return "Target IP address is in a reserved or private range and is not allowed." return None except ValueError: pass if not FQDN_REGEX.match(target): return "Invalid input. Please provide a valid public IP address or FQDN." return None def get_all_locations_merged(): public_locations = json.loads(os.getenv('PUBLIC_LOCATIONS', '{}')) backend_locations = json.loads(os.getenv('BACKEND_LOCATIONS', '{}')) merged_locations = public_locations.copy() for name, backend_config in backend_locations.items(): if name in merged_locations: merged_locations[name].update(backend_config) else: merged_locations[name] = backend_config return merged_locations def get_ip_version_from_ip(target): try: ip = ipaddress.ip_address(target) return f"ipv{ip.version}" except ValueError: return 'ipv4' @app.route('/') def index(): return render_template('looking_glass.html') @app.route('/api/locations') def get_locations(): locations_str = os.getenv('LOCATIONS', '{}') try: locations = json.loads(locations_str) sensitive_keys = {"vyos_api_url", "vyos_api_key", "bgp_vrf_name"} public_locations = {} for name, config in locations.items(): public_config = {k: v for k, v in config.items() if k not in sensitive_keys} public_locations[name] = public_config config_data = { "locations": public_locations, "client_ip_api": { "v4": os.getenv('CLIENT_IPV4_API_URL'), "v6": os.getenv('CLIENT_IPV6_API_URL') } } return jsonify(config_data) except json.JSONDecodeError: return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500 @app.route('/api/execute', methods=['POST']) def execute_command(): data = request.json method = data.get('method') target = data.get('target', '').strip() error = validate_target(target) if error: return jsonify({"error": error}), 400 return Response(execute_command_streaming(method, target), mimetype='text/plain') @app.route('/api/bgp_raw_lookup', methods=['POST']) def bgp_raw_lookup(): data = request.json target = data.get('target', '').strip() location_name = data.get('location') locations = get_all_locations_merged() location_config = locations.get(location_name, {}) data, error = get_raw_bgp_route(target, location_config, bgp_lookup_func=run_bgp_route_curl_command) if error: return jsonify({"error": error}), 400 return Response(data, mimetype='text/plain') @app.route('/api/visualize', methods=['POST']) def visualize_route(): data = request.json ip_address_str = data.get('ip_address', '').strip() location_name = data.get('location') locations = get_all_locations_merged() location_config = locations.get(location_name, {}) graph_data = generate_visual_route_graph( ip_address_str, location_config, bgp_lookup_func=run_bgp_route_curl_command ) if "error" in graph_data: return jsonify(graph_data), 400 return jsonify(graph_data)