diff --git a/app.py b/app.py new file mode 100644 index 0000000..fb38b90 --- /dev/null +++ b/app.py @@ -0,0 +1,680 @@ +import uvicorn +import httpx +import subprocess +import re +import shutil +import json +from fastapi import FastAPI, WebSocket, Request, WebSocketDisconnect +from fastapi.responses import HTMLResponse, PlainTextResponse, JSONResponse +from fastapi.middleware.cors import CORSMiddleware +from starlette.exceptions import HTTPException as StarletteHTTPException + +app = FastAPI() + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +translations = { + "nl": { + "title_home": "Jouw Publieke IP Adressen", + "title_expert": "Expert Verbindingsanalyse", + "grp_v6": "Jouw IPv6-gegevens", + "grp_v4": "Jouw IPv4-gegevens", + "lbl_ip6": "Jouw IPv6-adres:", + "lbl_ip4": "Jouw IPv4-adres:", + "lbl_isp": "Provider:", + "lbl_lat": "Latency:", + "lbl_lat_v6": "Jouw IPv6-latency:", + "lbl_lat_v4": "Jouw IPv4-latency:", + "btn_copy": "Kopieer", + "link_text": "Text-Only", + "link_expert": "Expert Mode", + "link_home": "Terug naar Home", + "ana_v6": "IPv6 Analyse", + "ana_v4": "IPv4 Analyse", + "lbl_ip": "IP Adres:", + "lbl_asn": "AS Nummer:", + "lbl_tcp": "TCP Latency (Min/Avg/Max):", + "lbl_jit": "Jitter (Stabiliteit):", + "lbl_mss": "Gemeten TCP MSS:", + "stat_load": "Laden...", + "stat_conn": "Verbinden...", + "stat_err": "Fout", + "stat_unkn": "Onbekend", + "est": "(Real-time)", + "404_title": "Pagina Niet Gevonden", + "404_msg": "De opgevraagde pagina bestaat niet.", + "404_btn": "Terug naar Home" + }, + "en": { + "title_home": "Your Public IP Addresses", + "title_expert": "Expert Connection Analysis", + "grp_v6": "Your IPv6 Details", + "grp_v4": "Your IPv4 Details", + "lbl_ip6": "Your IPv6 Address:", + "lbl_ip4": "Your IPv4 Address:", + "lbl_isp": "Provider:", + "lbl_lat": "Latency:", + "lbl_lat_v6": "Your IPv6 Latency:", + "lbl_lat_v4": "Your IPv4 Latency:", + "btn_copy": "Copy", + "link_text": "Text-Only", + "link_expert": "Expert Mode", + "link_home": "Back to Home", + "ana_v6": "IPv6 Analysis", + "ana_v4": "IPv4 Analysis", + "lbl_ip": "IP Address:", + "lbl_asn": "AS Number:", + "lbl_tcp": "TCP Latency (Min/Avg/Max):", + "lbl_jit": "Jitter (Stability):", + "lbl_mss": "Measured TCP MSS:", + "stat_load": "Loading...", + "stat_conn": "Connecting...", + "stat_err": "Error", + "stat_unkn": "Unknown", + "est": "(Real-time)", + "404_title": "Page Not Found", + "404_msg": "The requested page does not exist.", + "404_btn": "Back to Home" + } +} + +style_css = """ + +""" + +def get_real_ip(request: Request): + cf = request.headers.get("cf-connecting-ip") + if cf: return cf + fwd = request.headers.get("x-forwarded-for") + if fwd: return fwd.split(",")[0].strip() + return request.client.host + +def detect_language(request: Request): + accept_language = request.headers.get("accept-language", "").lower() + if "nl" in accept_language: + return "nl" + return "en" + +def get_real_tcp_mss(ip: str, port: str): + if not ip or not port: + return None + + if not shutil.which("ss"): + return "SS tool missing" + + try: + target = f"[{ip}]:{port}" if ":" in ip else f"{ip}:{port}" + + cmd = ["ss", "-n", "-i", "dst", target] + + result = subprocess.run(cmd, capture_output=True, text=True, timeout=1) + output = result.stdout + + match = re.search(r'mss:(\d+)', output) + if match: + return f"{match.group(1)}b" + + return "Unknown" + except Exception: + return "Error" + +async def get_isp_home(ip: str): + try: + url = f"http://ip-api.com/json/{ip}?fields=isp" + async with httpx.AsyncClient() as client: + resp = await client.get(url, timeout=2.0) + return resp.json() + except: + return {} + +async def get_isp_expert(ip: str): + try: + url = f"http://ip-api.com/json/{ip}?fields=isp,as" + async with httpx.AsyncClient() as client: + resp = await client.get(url, timeout=2.0) + data = resp.json() + raw_as = data.get("as", "") + data["as_number"] = raw_as.split(" ")[0] if raw_as else "-" + return data + except: + return {} + +@app.get("/ping", response_class=PlainTextResponse) +async def ping(): + return "pong" + +@app.exception_handler(StarletteHTTPException) +async def custom_404_handler(request: Request, exc): + if exc.status_code == 404: + lang = detect_language(request) + t = translations[lang] + html = f""" + + + + + + + 404 - {t['404_title']} + {style_css} + + +
+

404

+

{t['404_title']}

+

{t['404_msg']}

+ {t['404_btn']} +
+ + + """ + return HTMLResponse(content=html, status_code=404) + return PlainTextResponse(str(exc.detail), status_code=exc.status_code) + +@app.get("/", response_class=HTMLResponse) +async def index(request: Request): + lang = detect_language(request) + t = translations[lang] + + return f""" + + + + + + + {t['title_home']} + {style_css} + + + +
+

{t['title_home']}

+ +
+

{t['grp_v6']}

+
+ 🌐{t['lbl_ip6']} + {t['stat_load']} +
+
+ 🏢{t['lbl_isp']} + ... +
+
+ {t['lbl_lat_v6']} + ... +
+ +
+ +
+

{t['grp_v4']}

+
+ 🌐{t['lbl_ip4']} + {t['stat_load']} +
+
+ 🏢{t['lbl_isp']} + ... +
+
+ {t['lbl_lat_v4']} + ... +
+ +
+
+ + + +
{t['btn_copy']}!
+ + + + +""" + +@app.get("/expert", response_class=HTMLResponse) +async def expert_page(request: Request): + lang = detect_language(request) + t = translations[lang] + + return f""" + + + + + + {t['title_expert']} + {style_css} + + + +
+

{t['title_expert']}

+ + + + + + +
+ + + + +""" + +@app.get("/api/home") +async def api_home(request: Request, format: str = None): + ip = get_real_ip(request) + ua = request.headers.get("user-agent", "").lower() + + if format == "text" or "curl" in ua or "wget" in ua: + return PlainTextResponse(ip) + + details = await get_isp_home(ip) + + return JSONResponse({ + "ip": ip, + "isp": details.get("isp"), + "status": "ok" + }) + +@app.get("/api/expert") +async def api_expert(request: Request): + ip = get_real_ip(request) + port = request.headers.get("x-remote-port") + + details = await get_isp_expert(ip) + + return JSONResponse({ + "ip": ip, + "isp": details.get("isp"), + "as_number": details.get("as_number"), + "status": "ok" + }) + +@app.websocket("/ws") +async def ws_endpoint(websocket: WebSocket): + await websocket.accept() + + real_ip = websocket.headers.get("x-real-ip") or websocket.client.host + real_port = websocket.headers.get("x-remote-port") + + mss_value = "Unknown" + if real_port: + mss_value = get_real_tcp_mss(real_ip, real_port) + + await websocket.send_text(json.dumps({ + "type": "mss_report", + "value": mss_value + })) + + try: + while True: + data = await websocket.receive_text() + if data == "ping": + await websocket.send_text(data) + except WebSocketDisconnect: + pass + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000)