Update app.py
This commit is contained in:
parent
c06c6b3939
commit
b2af65d0f9
1 changed files with 69 additions and 54 deletions
123
app.py
123
app.py
|
|
@ -2,11 +2,16 @@ import os
|
||||||
import ipaddress
|
import ipaddress
|
||||||
import json
|
import json
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
|
import statistics
|
||||||
|
import socket
|
||||||
from flask import Flask, render_template, jsonify, request, Response
|
from flask import Flask, render_template, jsonify, request, Response
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
from modules.network_tools import execute_command_streaming
|
from modules.network_tools import execute_command_streaming
|
||||||
from modules.visual_route import generate_visual_route_graph, get_raw_bgp_route
|
from modules.visual_route import generate_visual_route_graph, get_raw_bgp_route
|
||||||
|
from modules.mtr_parser import MtrParser
|
||||||
|
from modules.parse import run_bgp_route_curl_command
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
|
@ -15,19 +20,37 @@ app = Flask(__name__)
|
||||||
FQDN_REGEX = re.compile(r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}(?!-d$)[a-zA-Z0-9-]{2,63}$')
|
FQDN_REGEX = re.compile(r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}(?!-d$)[a-zA-Z0-9-]{2,63}$')
|
||||||
MAX_TARGET_LENGTH = 255
|
MAX_TARGET_LENGTH = 255
|
||||||
|
|
||||||
def is_valid_ip(target: str) -> bool:
|
def validate_target(target: str) -> str | None:
|
||||||
try:
|
if not target:
|
||||||
ipaddress.ip_address(target)
|
return "A target is required."
|
||||||
return True
|
if len(target) > MAX_TARGET_LENGTH:
|
||||||
except ValueError:
|
return f"Target exceeds maximum length of {MAX_TARGET_LENGTH} characters."
|
||||||
return False
|
if target.startswith('-') and '.' not in target:
|
||||||
|
return "Invalid target. Input resembles a command-line option."
|
||||||
|
|
||||||
def is_valid_hostname(hostname: str) -> bool:
|
try:
|
||||||
if len(hostname) > MAX_TARGET_LENGTH:
|
ip = ipaddress.ip_address(target)
|
||||||
return False
|
if ip.is_private or ip.is_loopback or ip.is_multicast or ip.is_unspecified:
|
||||||
if is_valid_ip(hostname):
|
return "Target IP address is in a reserved or private range and is not allowed."
|
||||||
return False
|
return None
|
||||||
return FQDN_REGEX.match(hostname) is not None
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if not FQDN_REGEX.match(target):
|
||||||
|
return "Invalid input. Please provide a valid public IP address or FQDN."
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_all_locations_merged():
|
||||||
|
public_locations = json.loads(os.getenv('PUBLIC_LOCATIONS', '{}'))
|
||||||
|
backend_locations = json.loads(os.getenv('BACKEND_LOCATIONS', '{}'))
|
||||||
|
merged_locations = public_locations.copy()
|
||||||
|
for name, backend_config in backend_locations.items():
|
||||||
|
if name in merged_locations:
|
||||||
|
merged_locations[name].update(backend_config)
|
||||||
|
else:
|
||||||
|
merged_locations[name] = backend_config
|
||||||
|
return merged_locations
|
||||||
|
|
||||||
def get_ip_version_from_ip(target):
|
def get_ip_version_from_ip(target):
|
||||||
try:
|
try:
|
||||||
|
|
@ -36,14 +59,6 @@ def get_ip_version_from_ip(target):
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return 'ipv4'
|
return 'ipv4'
|
||||||
|
|
||||||
@app.route("/ping")
|
|
||||||
def ping():
|
|
||||||
return "pong"
|
|
||||||
|
|
||||||
@app.errorhandler(404)
|
|
||||||
def page_not_found(e):
|
|
||||||
return render_template("404.html"), 404
|
|
||||||
|
|
||||||
@app.route('/')
|
@app.route('/')
|
||||||
def index():
|
def index():
|
||||||
return render_template('looking_glass.html')
|
return render_template('looking_glass.html')
|
||||||
|
|
@ -53,8 +68,15 @@ def get_locations():
|
||||||
locations_str = os.getenv('LOCATIONS', '{}')
|
locations_str = os.getenv('LOCATIONS', '{}')
|
||||||
try:
|
try:
|
||||||
locations = json.loads(locations_str)
|
locations = json.loads(locations_str)
|
||||||
|
|
||||||
|
sensitive_keys = {"vyos_api_url", "vyos_api_key", "bgp_vrf_name"}
|
||||||
|
public_locations = {}
|
||||||
|
for name, config in locations.items():
|
||||||
|
public_config = {k: v for k, v in config.items() if k not in sensitive_keys}
|
||||||
|
public_locations[name] = public_config
|
||||||
|
|
||||||
config_data = {
|
config_data = {
|
||||||
"locations": locations,
|
"locations": public_locations,
|
||||||
"client_ip_api": {
|
"client_ip_api": {
|
||||||
"v4": os.getenv('CLIENT_IPV4_API_URL'),
|
"v4": os.getenv('CLIENT_IPV4_API_URL'),
|
||||||
"v6": os.getenv('CLIENT_IPV6_API_URL')
|
"v6": os.getenv('CLIENT_IPV6_API_URL')
|
||||||
|
|
@ -64,57 +86,50 @@ def get_locations():
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500
|
return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500
|
||||||
|
|
||||||
@app.route('/api/client-ip')
|
|
||||||
def client_ip():
|
|
||||||
client_ip_addr = request.headers.get('X-Forwarded-for', request.remote_addr)
|
|
||||||
if client_ip_addr and client_ip_addr.startswith('::ffff:'):
|
|
||||||
client_ip_addr = client_ip_addr[7:]
|
|
||||||
return jsonify({'ip': client_ip_addr})
|
|
||||||
|
|
||||||
|
|
||||||
@app.route('/api/execute', methods=['POST'])
|
@app.route('/api/execute', methods=['POST'])
|
||||||
def execute_command():
|
def execute_command():
|
||||||
data = request.json
|
data = request.json
|
||||||
method = data.get('method')
|
method = data.get('method')
|
||||||
target = data.get('target', '').strip()
|
target = data.get('target', '').strip()
|
||||||
|
|
||||||
if not target:
|
error = validate_target(target)
|
||||||
return jsonify({"error": "A target is required."})
|
if error:
|
||||||
if len(target) > MAX_TARGET_LENGTH:
|
return jsonify({"error": error}), 400
|
||||||
return jsonify({"error": f"Target exceeds maximum length of {MAX_TARGET_LENGTH} characters."})
|
|
||||||
if target.startswith('-'):
|
|
||||||
return jsonify({"error": "Target cannot start with a hyphen."})
|
|
||||||
|
|
||||||
if not (is_valid_ip(target) or is_valid_hostname(target)):
|
|
||||||
return jsonify({"error": "Invalid input. Please provide a valid IP address or a fully qualified domain name."})
|
|
||||||
|
|
||||||
version = get_ip_version_from_ip(target)
|
|
||||||
|
|
||||||
if version == 'ipv6' and method in ['ping', 'mtr', 'traceroute']:
|
|
||||||
method += '6'
|
|
||||||
|
|
||||||
return Response(execute_command_streaming(method, target), mimetype='text/plain')
|
return Response(execute_command_streaming(method, target), mimetype='text/plain')
|
||||||
|
|
||||||
@app.route('/api/bgp_raw_lookup', methods=['POST'])
|
@app.route('/api/bgp_raw_lookup', methods=['POST'])
|
||||||
def bgp_raw_lookup():
|
def bgp_raw_lookup():
|
||||||
target = request.json.get('target', '').strip()
|
data = request.json
|
||||||
data, error = get_raw_bgp_route(target)
|
target = data.get('target', '').strip()
|
||||||
|
location_name = data.get('location')
|
||||||
|
|
||||||
|
locations = get_all_locations_merged()
|
||||||
|
location_config = locations.get(location_name, {})
|
||||||
|
|
||||||
|
data, error = get_raw_bgp_route(target, location_config, bgp_lookup_func=run_bgp_route_curl_command)
|
||||||
|
|
||||||
if error:
|
if error:
|
||||||
return jsonify({"error": error})
|
return jsonify({"error": error}), 400
|
||||||
|
|
||||||
return Response(data, mimetype='text/plain')
|
return Response(data, mimetype='text/plain')
|
||||||
|
|
||||||
@app.route('/api/visualize', methods=['POST'])
|
@app.route('/api/visualize', methods=['POST'])
|
||||||
def visualize_route():
|
def visualize_route():
|
||||||
ip_address_str = request.json.get('ip_address', '').strip()
|
data = request.json
|
||||||
graph_data = generate_visual_route_graph(ip_address_str)
|
ip_address_str = data.get('ip_address', '').strip()
|
||||||
|
location_name = data.get('location')
|
||||||
|
|
||||||
|
locations = get_all_locations_merged()
|
||||||
|
location_config = locations.get(location_name, {})
|
||||||
|
|
||||||
|
graph_data = generate_visual_route_graph(
|
||||||
|
ip_address_str,
|
||||||
|
location_config,
|
||||||
|
bgp_lookup_func=run_bgp_route_curl_command
|
||||||
|
)
|
||||||
|
|
||||||
if "error" in graph_data:
|
if "error" in graph_data:
|
||||||
return jsonify(graph_data)
|
return jsonify(graph_data), 400
|
||||||
|
|
||||||
return jsonify(graph_data)
|
return jsonify(graph_data)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
app.run(host='0.0.0.0', port=5008, debug=True)
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue