From b2af65d0f9ab863d29a4ff6e76d1d3b897af1fb0 Mon Sep 17 00:00:00 2001 From: Blackwhitebear8 Date: Thu, 14 Aug 2025 17:52:24 +0200 Subject: [PATCH] Update app.py --- app.py | 123 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/app.py b/app.py index fe14f28..979238b 100644 --- a/app.py +++ b/app.py @@ -2,11 +2,16 @@ import os import ipaddress import json import re +import subprocess +import statistics +import socket from flask import Flask, render_template, jsonify, request, Response from dotenv import load_dotenv from modules.network_tools import execute_command_streaming from modules.visual_route import generate_visual_route_graph, get_raw_bgp_route +from modules.mtr_parser import MtrParser +from modules.parse import run_bgp_route_curl_command load_dotenv() @@ -15,19 +20,37 @@ app = Flask(__name__) FQDN_REGEX = re.compile(r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}(?!-d$)[a-zA-Z0-9-]{2,63}$') MAX_TARGET_LENGTH = 255 -def is_valid_ip(target: str) -> bool: - try: - ipaddress.ip_address(target) - return True - except ValueError: - return False +def validate_target(target: str) -> str | None: + if not target: + return "A target is required." + if len(target) > MAX_TARGET_LENGTH: + return f"Target exceeds maximum length of {MAX_TARGET_LENGTH} characters." + if target.startswith('-') and '.' not in target: + return "Invalid target. Input resembles a command-line option." -def is_valid_hostname(hostname: str) -> bool: - if len(hostname) > MAX_TARGET_LENGTH: - return False - if is_valid_ip(hostname): - return False - return FQDN_REGEX.match(hostname) is not None + try: + ip = ipaddress.ip_address(target) + if ip.is_private or ip.is_loopback or ip.is_multicast or ip.is_unspecified: + return "Target IP address is in a reserved or private range and is not allowed." + return None + except ValueError: + pass + + if not FQDN_REGEX.match(target): + return "Invalid input. Please provide a valid public IP address or FQDN." + + return None + +def get_all_locations_merged(): + public_locations = json.loads(os.getenv('PUBLIC_LOCATIONS', '{}')) + backend_locations = json.loads(os.getenv('BACKEND_LOCATIONS', '{}')) + merged_locations = public_locations.copy() + for name, backend_config in backend_locations.items(): + if name in merged_locations: + merged_locations[name].update(backend_config) + else: + merged_locations[name] = backend_config + return merged_locations def get_ip_version_from_ip(target): try: @@ -36,14 +59,6 @@ def get_ip_version_from_ip(target): except ValueError: return 'ipv4' -@app.route("/ping") -def ping(): - return "pong" - -@app.errorhandler(404) -def page_not_found(e): - return render_template("404.html"), 404 - @app.route('/') def index(): return render_template('looking_glass.html') @@ -53,8 +68,15 @@ def get_locations(): locations_str = os.getenv('LOCATIONS', '{}') try: locations = json.loads(locations_str) + + sensitive_keys = {"vyos_api_url", "vyos_api_key", "bgp_vrf_name"} + public_locations = {} + for name, config in locations.items(): + public_config = {k: v for k, v in config.items() if k not in sensitive_keys} + public_locations[name] = public_config + config_data = { - "locations": locations, + "locations": public_locations, "client_ip_api": { "v4": os.getenv('CLIENT_IPV4_API_URL'), "v6": os.getenv('CLIENT_IPV6_API_URL') @@ -64,57 +86,50 @@ def get_locations(): except json.JSONDecodeError: return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500 -@app.route('/api/client-ip') -def client_ip(): - client_ip_addr = request.headers.get('X-Forwarded-for', request.remote_addr) - if client_ip_addr and client_ip_addr.startswith('::ffff:'): - client_ip_addr = client_ip_addr[7:] - return jsonify({'ip': client_ip_addr}) - - @app.route('/api/execute', methods=['POST']) def execute_command(): data = request.json method = data.get('method') target = data.get('target', '').strip() - if not target: - return jsonify({"error": "A target is required."}) - if len(target) > MAX_TARGET_LENGTH: - return jsonify({"error": f"Target exceeds maximum length of {MAX_TARGET_LENGTH} characters."}) - if target.startswith('-'): - return jsonify({"error": "Target cannot start with a hyphen."}) - - if not (is_valid_ip(target) or is_valid_hostname(target)): - return jsonify({"error": "Invalid input. Please provide a valid IP address or a fully qualified domain name."}) - - version = get_ip_version_from_ip(target) + error = validate_target(target) + if error: + return jsonify({"error": error}), 400 - if version == 'ipv6' and method in ['ping', 'mtr', 'traceroute']: - method += '6' - return Response(execute_command_streaming(method, target), mimetype='text/plain') @app.route('/api/bgp_raw_lookup', methods=['POST']) def bgp_raw_lookup(): - target = request.json.get('target', '').strip() - data, error = get_raw_bgp_route(target) + data = request.json + target = data.get('target', '').strip() + location_name = data.get('location') + + locations = get_all_locations_merged() + location_config = locations.get(location_name, {}) + + data, error = get_raw_bgp_route(target, location_config, bgp_lookup_func=run_bgp_route_curl_command) if error: - return jsonify({"error": error}) + return jsonify({"error": error}), 400 return Response(data, mimetype='text/plain') @app.route('/api/visualize', methods=['POST']) def visualize_route(): - ip_address_str = request.json.get('ip_address', '').strip() - graph_data = generate_visual_route_graph(ip_address_str) + data = request.json + ip_address_str = data.get('ip_address', '').strip() + location_name = data.get('location') + + locations = get_all_locations_merged() + location_config = locations.get(location_name, {}) + + graph_data = generate_visual_route_graph( + ip_address_str, + location_config, + bgp_lookup_func=run_bgp_route_curl_command + ) if "error" in graph_data: - return jsonify(graph_data) + return jsonify(graph_data), 400 - return jsonify(graph_data) - - -if __name__ == '__main__': - app.run(host='0.0.0.0', port=5008, debug=True) \ No newline at end of file + return jsonify(graph_data) \ No newline at end of file