"""Flask application exposing a small looking-glass style HTTP API.

Routes defined here render the ``looking_glass.html`` template, publish the
location configuration read from the environment, report the caller's IP,
run a network diagnostic against a validated target, and build a route
graph for an address.
"""
import ipaddress
import json
import os
import re

from dotenv import load_dotenv
from flask import Flask, Response, jsonify, render_template, request

from modules.network_tools import execute_command_streaming
from modules.visual_route import generate_visual_route_graph

load_dotenv()

app = Flask(__name__)

# One or more dot-terminated labels followed by a final label of 2-63
# characters. The lookahead rejects an all-numeric final label so that
# malformed dotted-quads such as "999.999.999.999" are not accepted as
# hostnames.
FQDN_REGEX = re.compile(
    r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}'
    r'(?!\d+$)[a-zA-Z0-9-]{2,63}$'
)
MAX_TARGET_LENGTH = 255

# Methods that get a "6" suffix appended when the target is IPv6.
_IPV6_SUFFIX_METHODS = ('ping', 'mtr', 'traceroute')

# Values of FLASK_DEBUG that enable debug mode when run as a script.
_TRUTHY_ENV_VALUES = ('1', 'true', 'yes', 'on')


def _text_error(message: str, status: int = 400) -> Response:
    """Build a plain-text error response with the given status code."""
    return Response(message, status=status, mimetype='text/plain')


def is_valid_ip_or_prefix(target: str) -> bool:
    """Return True if *target* parses as an IP address or an IP network.

    A value containing ``/`` is parsed as a network (host bits allowed);
    anything else is parsed as a single address.
    """
    try:
        if '/' in target:
            ipaddress.ip_network(target, strict=False)
        else:
            ipaddress.ip_address(target)
    except ValueError:
        return False
    return True


def is_valid_hostname(hostname: str) -> bool:
    """Return True if *hostname* matches ``FQDN_REGEX`` and is not an IP.

    Values longer than ``MAX_TARGET_LENGTH`` and values that parse as an IP
    address or prefix are rejected.
    """
    if len(hostname) > MAX_TARGET_LENGTH:
        return False
    if is_valid_ip_or_prefix(hostname):
        return False
    # fullmatch: "$" alone would also accept a trailing newline.
    return FQDN_REGEX.fullmatch(hostname) is not None


def get_ip_version(target: str) -> str:
    """Return ``'ipv4'`` or ``'ipv6'`` for *target*.

    The target is tried as an address, then as a network. Anything that
    parses as neither (for example a hostname) is reported as ``'ipv4'``.
    """
    try:
        return f"ipv{ipaddress.ip_address(target).version}"
    except ValueError:
        pass
    try:
        return f"ipv{ipaddress.ip_network(target, strict=False).version}"
    except ValueError:
        return 'ipv4'


@app.route('/')
def index():
    """Render the ``looking_glass.html`` template."""
    return render_template('looking_glass.html')


@app.route('/api/locations')
def get_locations():
    """Return the configured locations and client-IP API URLs as JSON.

    ``LOCATIONS`` is read from the environment and parsed as JSON; a parse
    failure produces a 500 response with an ``error`` field.
    """
    try:
        locations = json.loads(os.getenv('LOCATIONS', '{}'))
    except json.JSONDecodeError:
        return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500

    config_data = {
        "locations": locations,
        "client_ip_api": {
            "v4": os.getenv('CLIENT_IPV4_API_URL'),
            "v6": os.getenv('CLIENT_IPV6_API_URL'),
        },
    }
    return jsonify(config_data)


@app.route('/api/client-ip')
def client_ip():
    """Return the caller's IP address as ``{"ip": ...}``.

    The forwarding header is preferred over the socket peer address. That
    header may carry a comma-separated chain of addresses; only the first
    entry is reported. A leading ``::ffff:`` is stripped from the result.
    """
    forwarded = request.headers.get('X-Forwarded-for', '')
    client_ip_addr = forwarded.split(',')[0].strip() or request.remote_addr
    if client_ip_addr and client_ip_addr.startswith('::ffff:'):
        client_ip_addr = client_ip_addr[7:]
    return jsonify({'ip': client_ip_addr})


@app.route('/api/execute', methods=['POST'])
def execute_command():
    """Validate the requested target and stream the command output.

    Expects a JSON object with ``method`` and ``target``. The target must be
    a non-empty string of at most ``MAX_TARGET_LENGTH`` characters that does
    not start with a hyphen and is an IP, prefix, or FQDN. Invalid requests
    get a 400 plain-text response. For IPv6 targets, ``ping``, ``mtr`` and
    ``traceroute`` get a ``6`` suffix before being passed on.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so malformed requests get a 400 rather than an unhandled error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return _text_error("Error: Request body must be a JSON object.")

    target = data.get('target', '')
    if not isinstance(target, str):
        return _text_error("Error: Target must be a string.")
    target = target.strip()

    if not target:
        return _text_error("Error: A target is required.")
    if len(target) > MAX_TARGET_LENGTH:
        return _text_error("Error: Target exceeds maximum length.")
    # A leading hyphen could be interpreted as an option by the command
    # that eventually receives the target.
    if target.startswith('-'):
        return _text_error("Error: Target cannot start with a hyphen.")
    if not (is_valid_ip_or_prefix(target) or is_valid_hostname(target)):
        return _text_error(
            "Error: Invalid input. Please provide a valid IP, prefix, "
            "or fully qualified domain name."
        )

    method = data.get('method')
    if not isinstance(method, str) or not method:
        return _text_error("Error: A method is required.")

    if get_ip_version(target) == 'ipv6' and method in _IPV6_SUFFIX_METHODS:
        method += '6'

    return Response(
        execute_command_streaming(method, target),
        mimetype='text/plain',
    )


@app.route('/api/visualize', methods=['POST'])
def visualize_route():
    """Return route graph data for the ``ip_address`` in the JSON body.

    Responds with 400 when the body is not a JSON object or when the
    generated graph data contains an ``error`` key.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    graph_data = generate_visual_route_graph(payload.get('ip_address'))
    if "error" in graph_data:
        return jsonify(graph_data), 400
    return jsonify(graph_data)


if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must not be
    # reachable on a publicly bound interface; it is opt-in via FLASK_DEBUG.
    debug_enabled = (
        os.getenv('FLASK_DEBUG', '').strip().lower() in _TRUTHY_ENV_VALUES
    )
    app.run(host='0.0.0.0', port=5008, debug=debug_enabled)