From 192a1ce9af9da62df430e35a1c21f54da6296396 Mon Sep 17 00:00:00 2001 From: Blackwhitebear8 Date: Wed, 13 Aug 2025 15:57:58 +0200 Subject: [PATCH] Add modules/bgp_dampening.py --- modules/bgp_dampening.py | 70 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 modules/bgp_dampening.py diff --git a/modules/bgp_dampening.py b/modules/bgp_dampening.py new file mode 100644 index 0000000..3a2d66c --- /dev/null +++ b/modules/bgp_dampening.py @@ -0,0 +1,70 @@ +import collections.abc +import re + +def parse_dampened_data(raw_json_data, ip_version): + dampened_paths = [] + summary_string = "" + + if not raw_json_data or not raw_json_data.get("success") or "data" not in raw_json_data: + return {"paths": [], "summary": "Displayed 0 routes and 0 total paths"} + + text_data = raw_json_data["data"] + + if not isinstance(text_data, str) or not text_data.strip(): + return {"paths": [], "summary": "Displayed 0 routes and 0 total paths"} + + current_prefix = None + lines = text_data.split('\n') + + for line in lines: + stripped_line = line.strip() + + if stripped_line.startswith("Displayed"): + summary_string = stripped_line + continue + + if not stripped_line or stripped_line.startswith("BGP table") or stripped_line.startswith("Default") or \ + stripped_line.startswith("Status codes") or stripped_line.startswith("Nexthop codes") or \ + stripped_line.startswith("Origin codes") or stripped_line.startswith("RPKI") or \ + "Network" in stripped_line: + continue + + parts = line.split() + if len(parts) < 4: + continue + + path_info = { + "metric": "N/A", + "local_pref": "N/A", + "weight": "N/A", + "ip_version": ip_version + } + + is_new_prefix = '/' in parts[1] + + try: + if is_new_prefix: + current_prefix = parts[1] + path_info["status"] = parts[0] + path_info["prefix"] = current_prefix + path_info["from"] = parts[2] + path_info["suppress_remain"] = parts[3] + path_info["path"] = ' '.join(parts[4:]) + dampened_paths.append(path_info) + elif current_prefix: + path_info["status"] = parts[0] + path_info["prefix"] = current_prefix + path_info["from"] = parts[1] + path_info["suppress_remain"] = parts[2] + path_info["path"] = ' '.join(parts[3:]) + dampened_paths.append(path_info) + except IndexError: + continue + + if not summary_string and not dampened_paths: + summary_string = "Displayed 0 routes and 0 total paths" + + return {"paths": dampened_paths, "summary": summary_string} + +def generate_dampened_json(ipv4_data, ipv6_data): + return {"ipv4": ipv4_data, "ipv6": ipv6_data} \ No newline at end of file