import re

# One row of the prefix table: first token, free-form middle text, last token.
_PREFIX_ROW_RE = re.compile(r'^(\S+)\s+(.+?)\s+(\S+)$')

# One cache server line: address, port, preference, then any trailing text.
_CACHE_LINE_RE = re.compile(
    r'rpki tcp cache ([\d\.:a-fA-F]+) (\d+) pref (\d+)(.*)'
)

# Column header that marks the start of the prefix rows (single-spaced form).
_PREFIX_TABLE_HEADER = "Prefix Length Origin-AS"


def parse_rpki_lookup_data(api_response: dict) -> dict:
    """Parse the raw string data from the RPKI lookup API call.

    Lines are stripped and scanned in order. Blank lines and the
    "RPKI/RTR prefix table" title are ignored, and everything before the
    column header is skipped. After the header, lines containing
    "Number of" are collected as summary lines, and any other line with at
    least three whitespace-separated tokens becomes a prefix entry.

    Args:
        api_response: Mapping with a truthy "success" flag and a "data"
            string holding the raw table text.

    Returns:
        ``{"prefixes": [...], "summary": [...]}`` where each prefix entry is
        a dict with the string keys "prefix", "length" and "as". If
        "success" or "data" is missing or falsy, ``api_response`` itself is
        returned unchanged.
    """
    if not api_response.get("success") or not api_response.get("data"):
        return api_response

    prefixes = []
    summary = []
    header_found = False

    for raw_line in api_response["data"].strip().split('\n'):
        line = raw_line.strip()
        if not line or "RPKI/RTR prefix table" in line:
            continue

        # Collapse runs of whitespace so a column-aligned header (several
        # spaces between the column names) is still recognised.
        if _PREFIX_TABLE_HEADER in " ".join(line.split()):
            header_found = True
            continue

        if not header_found:
            continue

        # Summary lines must be classified before the row regex is tried:
        # "Number of IPv4 Prefixes: 5" has three or more tokens and would
        # otherwise be swallowed as a bogus prefix row.
        if "Number of" in line:
            summary.append(line)
            continue

        match = _PREFIX_ROW_RE.match(line)
        if match:
            prefixes.append({
                "prefix": match.group(1).strip(),
                "length": match.group(2).strip(),
                "as": match.group(3).strip(),
            })

    return {"prefixes": prefixes, "summary": summary}


def parse_rpki_cache_data(api_response: dict) -> list:
    """Parse the raw string data from the RPKI cache connection status call.

    Every line containing ``rpki tcp cache <address> <port> pref <n>`` yields
    one server entry. A server is reported as "Connected" only when the text
    after the preference contains "(connected)"; otherwise it is reported as
    "Not Connected".

    Args:
        api_response: Mapping with a truthy "success" flag and a "data"
            string holding the raw status text.

    Returns:
        A list of dicts with the keys "ip_address", "port", "preference"
        and "status" (all strings). The list is empty if "success" or
        "data" is missing or falsy, or if no line matches.
    """
    if not api_response.get("success") or not api_response.get("data"):
        return []

    servers = []
    for line in api_response["data"].strip().split('\n'):
        match = _CACHE_LINE_RE.search(line)
        if not match:
            continue

        ip_address, port, preference, trailing = match.groups()
        connected = "(connected)" in trailing.strip()
        servers.append({
            "ip_address": ip_address,
            "port": port,
            "preference": preference,
            "status": "Connected" if connected else "Not Connected",
        })

    return servers