LookingGlas/app.py

120 lines
No EOL
3.5 KiB
Python

import os
import ipaddress
import json
import re
from flask import Flask, render_template, jsonify, request, Response
from dotenv import load_dotenv
from modules.network_tools import execute_command_streaming
from modules.visual_route import generate_visual_route_graph
load_dotenv()
app = Flask(__name__)
FQDN_REGEX = re.compile(r'^(?!-)(?:[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.){1,126}(?!-d$)[a-zA-Z0-9-]{2,63}$')
MAX_TARGET_LENGTH = 255
def is_valid_ip_or_prefix(target: str) -> bool:
try:
if '/' in target:
ipaddress.ip_network(target, strict=False)
else:
ipaddress.ip_address(target)
return True
except ValueError:
return False
def is_valid_hostname(hostname: str) -> bool:
if len(hostname) > MAX_TARGET_LENGTH:
return False
if is_valid_ip_or_prefix(hostname):
return False
return FQDN_REGEX.match(hostname) is not None
def get_ip_version(target):
try:
ip = ipaddress.ip_address(target)
return f"ipv{ip.version}"
except ValueError:
try:
net = ipaddress.ip_network(target, strict=False)
return f"ipv{net.version}"
except ValueError:
return 'ipv4'
@app.route('/')
def index():
return render_template('looking_glass.html')
@app.route('/api/locations')
def get_locations():
locations_str = os.getenv('LOCATIONS', '{}')
try:
locations = json.loads(locations_str)
config_data = {
"locations": locations,
"client_ip_api": {
"v4": os.getenv('CLIENT_IPV4_API_URL'),
"v6": os.getenv('CLIENT_IPV6_API_URL')
}
}
return jsonify(config_data)
except json.JSONDecodeError:
return jsonify({"error": "Invalid LOCATIONS format in .env file"}), 500
@app.route('/api/client-ip')
def client_ip():
client_ip_addr = request.headers.get('X-Forwarded-for', request.remote_addr)
if client_ip_addr and client_ip_addr.startswith('::ffff:'):
client_ip_addr = client_ip_addr[7:]
return jsonify({'ip': client_ip_addr})
@app.route('/api/execute', methods=['POST'])
def execute_command():
data = request.json
method = data.get('method')
target = data.get('target', '').strip()
if not target:
return Response("Error: A target is required.", status=400, mimetype='text/plain')
if len(target) > MAX_TARGET_LENGTH:
return Response(f"Error: Target exceeds maximum length.", status=400, mimetype='text/plain')
if target.startswith('-'):
return Response("Error: Target cannot start with a hyphen.", status=400, mimetype='text/plain')
is_ip = is_valid_ip_or_prefix(target)
is_host = is_valid_hostname(target)
if not (is_ip or is_host):
return Response(
"Error: Invalid input. Please provide a valid IP, prefix, or fully qualified domain name.",
status=400,
mimetype='text/plain'
)
version = get_ip_version(target)
if version == 'ipv6' and method in ['ping', 'mtr', 'traceroute']:
method += '6'
return Response(execute_command_streaming(method, target), mimetype='text/plain')
@app.route('/api/visualize', methods=['POST'])
def visualize_route():
ip_address_str = request.json.get('ip_address')
graph_data = generate_visual_route_graph(ip_address_str)
if "error" in graph_data:
return jsonify(graph_data), 400
return jsonify(graph_data)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5008, debug=True)