#!/usr/bin/env python3
"""Micro API for landing page — reads live agent configs and returns JSON."""

import glob
import http.server
import json
import os
import re
import socketserver
import sys
from collections import Counter
from datetime import datetime, timezone
from urllib.parse import urlsplit

PORT = 8080
FALLBACK_DIR = "/usr/share/nginx/html"

# One "key: value" pair per front-matter line; nested/indented YAML is ignored.
_FRONTMATTER_LINE = re.compile(r"^(\w+):\s*(.+)$")

# "go" must be matched as a whole token: a plain substring test also hits
# names such as "category" or "algorithm".
_GO_TOKEN = re.compile(r"(?<![a-z0-9])(?:go|golang)(?![a-z0-9])")


def find_dir(sub):
    """Return the first existing directory for *sub* among the known roots.

    Roots are tried in order: FALLBACK_DIR, this script's directory, ``/app``
    and the current working directory. Returns ``None`` when none exists.
    """
    candidates = [
        os.path.join(FALLBACK_DIR, sub),
        os.path.join(os.path.dirname(__file__), sub),
        f"/app/{sub}",
        f"./{sub}",
    ]
    for candidate in candidates:
        if os.path.isdir(candidate):
            return candidate
    return None


def find_file(name):
    """Return the first existing path for the API data file *name*, or ``None``."""
    candidates = [
        os.path.join(FALLBACK_DIR, "api", name),
        os.path.join(os.path.dirname(__file__), name),
        f"/app/landing/api/{name}",
    ]
    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate
    return None


def _unquote(value):
    """Drop one pair of matching surrounding quotes from a scalar value."""
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        inner = value[1:-1]
        # Leave values like `"a" and "b"` alone: they are not one quoted scalar.
        if value[0] not in inner:
            return inner
    return value


def parse_frontmatter(path):
    """Parse the leading ``---`` delimited front matter of a markdown file.

    Only flat ``key: value`` lines are understood. Returns a dict of string
    values, or ``None`` when the file is unreadable, does not start with a
    ``---`` line, or has no closing ``---`` line.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError):  # ValueError covers UnicodeDecodeError
        return None

    lines = content.split("\n")
    if lines[0].rstrip() != "---":
        return None

    data = {}
    for line in lines[1:]:
        # The closing delimiter must sit on its own line, so a "---" inside a
        # value (e.g. in a description) no longer truncates the front matter.
        if line.rstrip() == "---":
            return data
        m = _FRONTMATTER_LINE.match(line)
        if m:
            data[m.group(1)] = _unquote(m.group(2).strip())
    return None  # no closing delimiter found


def load_dashboard_data():
    """Load ``dashboard-data.json``; return it only if it lists agents.

    Returns ``None`` when the file is missing, unreadable, not valid JSON,
    not a JSON object, or has an empty/missing ``agents`` entry.
    """
    path = find_file("dashboard-data.json")
    if not path:
        return None
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):  # ValueError covers JSON and decode errors
        return None
    if isinstance(data, dict) and data.get("agents"):
        return data
    return None


def load_real_fit_scores():
    """Return the ``fit_scores`` mapping from the first usable report file.

    Candidate locations are tried in order; unreadable or malformed files are
    skipped. Always returns a dict (empty when nothing usable is found).
    """
    here = os.path.dirname(__file__)
    candidates = [
        os.path.join(here, "real-fit-report.json"),
        os.path.join(os.path.dirname(here), "data", "real-fit-report.json"),
        os.path.join(FALLBACK_DIR, "data", "real-fit-report.json"),
        "/app/agent-evolution/data/real-fit-report.json",
    ]
    for path in candidates:
        if not os.path.isfile(path):
            continue
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            continue
        if not isinstance(data, dict):
            continue
        scores = data.get("fit_scores")
        # Guard against `"fit_scores": null` so callers can rely on a dict.
        return scores if isinstance(scores, dict) else {}
    return {}


def _utc_timestamp():
    """Return the current UTC time as an ISO-8601 string with a ``Z`` suffix."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "") + "Z"


def _iter_md_entries(directory):
    """Yield ``(name, filename, frontmatter)`` for each usable ``*.md`` file.

    Files without parseable front matter or without a ``model`` key are
    skipped. *name* is the filename with only its extension removed.
    """
    if not directory:
        return
    for md_path in sorted(glob.glob(os.path.join(directory, "*.md"))):
        fm = parse_frontmatter(md_path)
        if not fm or not fm.get("model"):
            continue
        filename = os.path.basename(md_path)
        # splitext removes only the trailing extension; str.replace would also
        # eat a ".md" that appears in the middle of the name.
        yield os.path.splitext(filename)[0], filename, fm


def _apply_fit_scores(agents, fit_scores):
    """Copy fit, explanation and best model onto agents that have a score."""
    for agent in agents:
        if not isinstance(agent, dict):
            continue
        score = fit_scores.get(agent.get("name"))
        if isinstance(score, dict) and score:
            agent["fit_score"] = score.get("fit")
            agent["fit_explanation"] = score.get("explanation")
            agent["best_model"] = score.get("model")


def build_state_from_md():
    """Build the state payload by scanning agent and command markdown files."""
    agents = [
        {
            "name": name,
            "model": fm.get("model", ""),
            "mode": fm.get("mode", "subagent"),
            "description": fm.get("description", ""),
            "category": infer_category(fm.get("mode", ""), filename),
            "fit_score": None,
            "model_meta": None,
        }
        for name, filename, fm in _iter_md_entries(find_dir(".kilo/agents"))
    ]
    commands = [
        {
            "name": name,
            "model": fm.get("model", ""),
            "mode": fm.get("mode", "command"),
            "description": fm.get("description", ""),
            "fit_score": None,
        }
        for name, _filename, fm in _iter_md_entries(find_dir(".kilo/commands"))
    ]
    # Counter keeps first-seen order, matching a hand-rolled dict tally.
    model_stats = dict(Counter(entry["model"] for entry in agents + commands))

    return {
        "generated": _utc_timestamp(),
        "total_agents": len(agents),
        "total_commands": len(commands),
        "model_distribution": model_stats,
        "agents": agents,
        "commands": commands,
    }


def build_state():
    """Return the state payload served at ``/api/state``.

    Prefers the pre-generated dashboard data; falls back to scanning the
    markdown configs. In both cases available fit scores are merged into the
    agents and the raw mapping is exposed under ``fit_scores``.
    """
    fit_scores = load_real_fit_scores()
    dashboard = load_dashboard_data()

    if dashboard:
        agents = dashboard.get("agents") or []
        commands = dashboard.get("commands") or []
        state = {
            "generated": _utc_timestamp(),
            # Fall back to the real count instead of reporting 0 agents.
            "total_agents": dashboard.get("total_agents", len(agents)),
            "total_commands": len(commands),
            "model_distribution": dashboard.get("model_distribution", {}),
            "agents": agents,
            "commands": commands,
        }
    else:
        state = build_state_from_md()

    _apply_fit_scores(state["agents"], fit_scores)
    state["fit_scores"] = fit_scores
    return state


def infer_category(mode, filename):
    """Guess a display category from keywords in *filename*.

    *mode* is accepted for interface compatibility but is not consulted.
    Rules are checked in priority order; the first match wins and "Core" is
    the default.
    """
    f = filename.lower()
    if "security" in f:
        return "Security"
    if "devops" in f:
        return "DevOps"
    if "frontend" in f or "flutter" in f:
        return "Frontend"
    if "backend" in f or "php" in f or "python" in f or _GO_TOKEN.search(f):
        return "Backend"
    if "test" in f or "sdet" in f:
        return "QA"
    return "Core"


class Handler(http.server.BaseHTTPRequestHandler):
    """Serve the aggregated state as JSON at ``/api/state``; 404 elsewhere."""

    def do_GET(self):
        # Route on the path component only so cache-busting query strings
        # (e.g. /api/state?t=123) still reach the endpoint.
        route = urlsplit(self.path).path
        if route != "/api/state":
            self._respond(404)
            return
        try:
            body = json.dumps(build_state(), ensure_ascii=False).encode("utf-8")
        except Exception as exc:  # request boundary: report, don't drop the socket
            print(
                f"[state-api] failed to build state: {exc!r}",
                file=sys.stderr,
                flush=True,
            )
            self._respond(500, b'{"error": "internal error"}')
            return
        self._respond(200, body)

    def _respond(self, status, body=b""):
        """Send *status* with an optional JSON *body* and explicit length."""
        self.send_response(status)
        if body:
            self.send_header("Content-Type", "application/json; charset=utf-8")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if body:
            self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # silent


class _StateServer(socketserver.TCPServer):
    """TCP server that can rebind its port immediately after a restart."""

    allow_reuse_address = True


def main():
    """Run the HTTP server until interrupted."""
    with _StateServer(("0.0.0.0", PORT), Handler) as httpd:
        # Flush so the line shows up promptly when stdout is a pipe.
        print(f"[state-api] listening on :{PORT}", flush=True)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            pass


if __name__ == "__main__":
    main()