LookingGlas/app.py
2025-08-14 18:00:30 +02:00

143 lines
No EOL
4.4 KiB
Python

import os
import ipaddress
import json
import re
import subprocess
import statistics
import socket
from flask import Flask, render_template, jsonify, request, Response
from dotenv import load_dotenv
from modules.network_tools import execute_command_streaming
from modules.visual_route import generate_visual_route_graph, get_raw_bgp_route
from modules.mtr_parser import MtrParser
from modules.parse import run_bgp_route_curl_command
# Load variables from the .env file before anything reads them; the functions
# in this module fetch their configuration with os.getenv().
load_dotenv()
# The Flask application object that the route decorators in this module
# register their handlers on.
app = Flask(__name__)
FQDN_REGEX = re.compile(r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}(?!-d$)[a-zA-Z0-9-]{2,63}$')
MAX_TARGET_LENGTH = 255
def validate_target(target: str) -> str | None:
if not target:
return "A target is required."
if len(target) > MAX_TARGET_LENGTH:
return f"Target exceeds maximum length of {MAX_TARGET_LENGTH} characters."
if target.startswith('-') and '.' not in target:
return "Invalid target. Input resembles a command-line option."
try:
ip = ipaddress.ip_address(target)
if ip.is_private or ip.is_loopback or ip.is_multicast or ip.is_unspecified:
return "Target IP address is in a reserved or private range and is not allowed."
return None
except ValueError:
pass
if not FQDN_REGEX.match(target):
return "Invalid input. Please provide a valid public IP address or FQDN."
return None
def get_all_locations_merged():
    """Combine the PUBLIC_LOCATIONS and BACKEND_LOCATIONS settings.

    Both environment variables are parsed as JSON (defaulting to ``{}`` when
    unset). For a location present in both, the backend entries are merged
    into the public ones, with backend values winning on key clashes;
    locations found only in the backend setting are added as-is.

    Returns:
        The merged mapping of location name to configuration.
    """
    public = json.loads(os.getenv('PUBLIC_LOCATIONS', '{}'))
    backend = json.loads(os.getenv('BACKEND_LOCATIONS', '{}'))

    combined = public.copy()
    for location, overrides in backend.items():
        if location not in combined:
            combined[location] = overrides
            continue
        combined[location].update(overrides)
    return combined
def get_ip_version_from_ip(target):
    """Return ``"ipv4"`` or ``"ipv6"`` according to the IP literal *target*.

    Anything that does not parse as an IP address (a host name, for example)
    is reported as ``"ipv4"``.
    """
    try:
        version = ipaddress.ip_address(target).version
    except ValueError:
        return 'ipv4'
    return f"ipv{version}"
@app.route("/ping")
def ping():
    """Answer with the fixed text "pong"; usable as a simple availability check."""
    return "pong"
@app.errorhandler(404)
def page_not_found(e):
    """Render the 404.html template with HTTP status 404.

    Args:
        e: The error object passed in by Flask; not used.
    """
    return render_template("404.html"), 404
@app.route('/')
def index():
    """Serve the main page by rendering the looking_glass.html template."""
    return render_template('looking_glass.html')
@app.route('/api/locations')
def get_locations():
    """Return the client-visible location list and client-IP API URLs as JSON.

    The LOCATIONS environment variable is parsed as a JSON object of
    location name -> settings. The VyOS API URL/key and BGP VRF name are
    removed from each location before it is sent. Responds with HTTP 500 and
    an error payload when LOCATIONS is not valid JSON.

    NOTE(review): this reads LOCATIONS, whereas get_all_locations_merged()
    reads PUBLIC_LOCATIONS / BACKEND_LOCATIONS - confirm that is intended.
    """
    hidden = {"vyos_api_url", "vyos_api_key", "bgp_vrf_name"}

    try:
        all_locations = json.loads(os.getenv('LOCATIONS', '{}'))
    except json.JSONDecodeError:
        return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500

    # Copy each location's settings, leaving out the backend-only keys.
    visible = {
        name: {key: value for key, value in settings.items() if key not in hidden}
        for name, settings in all_locations.items()
    }
    return jsonify({
        "locations": visible,
        "client_ip_api": {
            "v4": os.getenv('CLIENT_IPV4_API_URL'),
            "v6": os.getenv('CLIENT_IPV6_API_URL'),
        },
    })
@app.route('/api/execute', methods=['POST'])
def execute_command():
    """Run a network tool against a validated target and stream its output.

    Expects a JSON object with ``method`` and ``target``. The target is
    checked with validate_target(); on success the output of
    execute_command_streaming(method, target) is returned as a plain-text
    response.

    Returns:
        A ``text/plain`` response on success, or a JSON ``{"error": ...}``
        payload with HTTP 400 when the request body or target is invalid.

    NOTE(review): ``method`` is passed through unchecked here; presumably
    execute_command_streaming() restricts it - verify.
    """
    payload = request.json
    # The body may be any JSON value (null, list, ...); only an object carries
    # the fields read below, so reject everything else instead of crashing.
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    method = payload.get('method')
    target = payload.get('target', '')
    if not isinstance(target, str):
        return jsonify({"error": "Target must be a string."}), 400
    target = target.strip()

    error = validate_target(target)
    if error:
        return jsonify({"error": error}), 400
    return Response(execute_command_streaming(method, target), mimetype='text/plain')
@app.route('/api/bgp_raw_lookup', methods=['POST'])
def bgp_raw_lookup():
    """Return the raw BGP route output for a target at a given location.

    Expects a JSON object with ``target`` and ``location``. The location name
    is looked up in get_all_locations_merged(); an unknown name yields an
    empty configuration. The lookup itself is delegated to
    get_raw_bgp_route(), which returns a ``(data, error)`` pair.

    Returns:
        A ``text/plain`` response with the route data, or a JSON
        ``{"error": ...}`` payload with HTTP 400.

    NOTE(review): unlike /api/execute, the target is not passed through
    validate_target() here; presumably get_raw_bgp_route() validates it -
    verify.
    """
    payload = request.json
    # Only a JSON object carries the fields read below; reject null, lists,
    # etc. with a 400 instead of failing on .get().
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    target = payload.get('target', '')
    if not isinstance(target, str):
        return jsonify({"error": "Target must be a string."}), 400
    target = target.strip()

    location_name = payload.get('location')
    locations = get_all_locations_merged()
    # A non-string name (e.g. a list) cannot be a key; treat it as unknown.
    location_config = locations.get(location_name, {}) if isinstance(location_name, str) else {}

    route_output, error = get_raw_bgp_route(
        target,
        location_config,
        bgp_lookup_func=run_bgp_route_curl_command,
    )
    if error:
        return jsonify({"error": error}), 400
    return Response(route_output, mimetype='text/plain')
@app.route('/api/visualize', methods=['POST'])
def visualize_route():
    """Return graph data visualising the route to an IP address.

    Expects a JSON object with ``ip_address`` and ``location``. The location
    name is looked up in get_all_locations_merged(); an unknown name yields
    an empty configuration. The graph is built by
    generate_visual_route_graph().

    Returns:
        The graph data as JSON. When the data contains an ``"error"`` key, or
        the request body is malformed, the response carries HTTP 400.
    """
    payload = request.json
    # Only a JSON object carries the fields read below; reject null, lists,
    # etc. with a 400 instead of failing on .get().
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object."}), 400

    ip_address_str = payload.get('ip_address', '')
    if not isinstance(ip_address_str, str):
        return jsonify({"error": "IP address must be a string."}), 400
    ip_address_str = ip_address_str.strip()

    location_name = payload.get('location')
    locations = get_all_locations_merged()
    # A non-string name (e.g. a list) cannot be a key; treat it as unknown.
    location_config = locations.get(location_name, {}) if isinstance(location_name, str) else {}

    graph_data = generate_visual_route_graph(
        ip_address_str,
        location_config,
        bgp_lookup_func=run_bgp_route_curl_command,
    )
    if "error" in graph_data:
        return jsonify(graph_data), 400
    return jsonify(graph_data)