"""
Evolution Research API — FastAPI backend for agent-model evaluation jobs.

Endpoints:
  POST /api/research                   → start background evaluation job
  GET  /api/research/{id}              → job status & results
  POST /api/research/cell              → evaluate single agent-model pair
  GET  /api/real-fit-report            → serve real-fit-report.json (live from DB)
  GET  /api/models                     → list available models
  GET  /api/evaluation/{agent}/{model} → detailed evaluation record
  POST /api/evolve-agent/start         → start role-fit testing job (evolution-prompt + evolution-skeptic)
"""

import json
import os
import sqlite3
import subprocess
import time
import uuid
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path

from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI(title="Evolution Research API", version="1.1.0")

# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# website issue credentialed requests against this API. Left unchanged because
# existing clients may depend on it — confirm and restrict to known origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Filesystem locations; each one can be overridden through the environment.
JOB_STATE_PATH = Path(os.environ.get("JOB_STATE_PATH", "/app/data/research-jobs.json"))
REPORT_PATH = Path(os.environ.get("REPORT_PATH", "/app/data/real-fit-report.json"))
META_PATH = Path(os.environ.get("META_PATH", "/app/kilo-meta.json"))
EVOLUTION_PATH = Path(os.environ.get("EVOLUTION_PATH", "/app/data/evolution.json"))
ENGINE_PATH = Path(os.environ.get("ENGINE_PATH", "/app/scripts/real-fit-engine.py"))
DB_PATH = Path(os.environ.get("REAL_FIT_DB", REPORT_PATH.parent / "real-fit.db"))

# Column list shared by both lookups in get_evaluation().
_EVALUATION_COLUMNS = (
    "e.id, e.agent_name, e.model, e.prompt_id, e.response, e.scores, "
    "e.total_score, e.explanation, e.evaluator, e.latency_ms, "
    "e.tokens_prompt, e.tokens_response, e.evaluated_at"
)

# Keys of an evaluation record that hold JSON text, mapped to the factory that
# produces the fallback value when the text is missing or not valid JSON.
_JSON_FIELD_DEFAULTS = {"expected_keywords": list, "rubric": dict, "scores": dict}


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def _load_json(path: Path) -> dict:
    """Return the parsed JSON content of *path*, or ``{}`` if it does not exist.

    Invalid JSON still raises ``json.JSONDecodeError``: silently returning an
    empty dict would let a later save overwrite (and lose) the stored data.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}


def _save_json(path: Path, data: dict) -> None:
    """Serialize *data* to *path* as indented JSON, creating parent directories.

    The content is written to a uniquely named sibling file and then moved
    into place with ``os.replace``, so a concurrent reader (the API process or
    a job wrapper process) never observes a half-written file.
    NOTE(review): this assumes *path* is a regular file inside a writable
    directory; replacing a single bind-mounted file would fail — confirm the
    deployment mounts the data directory, not the file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(f"{path.name}.{uuid.uuid4().hex}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave stray temp files behind when the write fails.
        tmp_path.unlink(missing_ok=True)
        raise


def _load_jobs() -> dict:
    """Return all persisted jobs keyed by job id."""
    return _load_json(JOB_STATE_PATH)


def _save_jobs(jobs: dict) -> None:
    """Persist the complete job mapping."""
    _save_json(JOB_STATE_PATH, jobs)


class ResearchRequest(BaseModel):
    """Body of POST /api/research: one agent evaluated against several models."""

    agent: str
    models: list[str]


class CellRequest(BaseModel):
    """Body of POST /api/research/cell: a single agent-model pair."""

    agent: str
    model: str


class EvolveAgentRequest(BaseModel):
    """Body of POST /api/evolve-agent/start."""

    agent: str
    models: list[str]


def _spawn_engine_job(job_id: str, agent: str, models: list[str]) -> None:
    """Run the evaluation engine for *agent* x *models* in a detached process.

    A small wrapper script is started with ``python3 -c``. The wrapper marks
    the job as ``running``, runs the engine script with
    ``--evaluate <agent> --models <comma-separated models> --report``, waits
    for it, and finally stores the return code, stdout and stderr in the job
    record with status ``done`` (exit code 0) or ``error``.

    This function returns immediately; the wrapper is not waited for.
    Values are embedded with ``repr`` so they arrive in the wrapper as plain
    string literals regardless of their content.
    """
    model_arg = ",".join(models)
    # The wrapper replaces the jobs file atomically (temp file + os.replace),
    # matching _save_json. Concurrent read-modify-write cycles from several
    # processes can still lose an update; no locking is done here.
    wrapper_script = f"""
import subprocess, json, time, os

job_id = {repr(job_id)}
job_state_path = os.environ.get('JOB_STATE_PATH', '/app/data/research-jobs.json')
engine_path = os.environ.get('ENGINE_PATH', '/app/scripts/real-fit-engine.py')

def load_jobs():
    try:
        with open(job_state_path) as f:
            return json.load(f)
    except Exception:
        return {{}}

def save_jobs(jobs):
    tmp_path = job_state_path + '.' + str(os.getpid()) + '.tmp'
    with open(tmp_path, 'w') as f:
        json.dump(jobs, f, indent=2)
    os.replace(tmp_path, job_state_path)

jobs = load_jobs()
job = jobs.get(job_id)
if job:
    job['status'] = 'running'
    job['updated_at'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    save_jobs(jobs)

cmd = ['python3', engine_path, '--evaluate', {repr(agent)}, '--models', {repr(model_arg)}, '--report']
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = proc.communicate()

jobs = load_jobs()
job = jobs.get(job_id)
if job:
    job['status'] = 'done' if proc.returncode == 0 else 'error'
    job['progress'] = 100
    job['result'] = {{'returncode': proc.returncode, 'stdout': stdout, 'stderr': stderr}}
    job['updated_at'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    save_jobs(jobs)
"""
    subprocess.Popen(
        ["python3", "-c", wrapper_script],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def _enqueue_job(agent: str, models: list[str], job_type: str | None = None) -> str:
    """Create a ``pending`` job record, persist it, start the engine, return the id.

    *job_type*, when given, is stored under the ``type`` key of the record.
    """
    job_id = str(uuid.uuid4())
    now = _utc_timestamp()

    record: dict = {"id": job_id}
    if job_type is not None:
        record["type"] = job_type
    record.update(
        {
            "agent": agent,
            "models": models,
            "status": "pending",
            "progress": 0,
            "result": None,
            "created_at": now,
            "updated_at": now,
        }
    )

    jobs = _load_jobs()
    jobs[job_id] = record
    # Persist before spawning: the wrapper process looks the job up by id.
    _save_jobs(jobs)
    _spawn_engine_job(job_id, agent, models)
    return job_id


@app.get("/api/models")
def get_models():
    """List every distinct model name referenced by the meta and evolution files.

    Sources: each agent's ``model`` in the meta file, plus each agent's
    ``current.model`` and ``recommendations[].model`` in the evolution file.
    Empty values are skipped; the result is sorted.
    """
    candidates = [
        agent.get("model", "")
        for agent in _load_json(META_PATH).get("agents", {}).values()
    ]

    evolution = _load_json(EVOLUTION_PATH)
    for agent_data in evolution.get("agents", {}).values():
        candidates.append(agent_data.get("current", {}).get("model", ""))
        candidates.extend(
            rec.get("model", "") for rec in agent_data.get("recommendations", [])
        )

    return {"models": sorted({name for name in candidates if name})}


def _prompt_fields(row: sqlite3.Row) -> dict:
    """Map a ``test_prompts`` row to the prompt keys of an evaluation record."""
    return {
        "system_prompt": row["system_prompt"] or "",
        "user_prompt": row["user_prompt"] or "",
        "expected_keywords": row["expected_keywords"] or "[]",
        "rubric": row["rubric"] or "{}",
    }


def _fetch_prompt(cursor: sqlite3.Cursor, prompt_id, agent: str) -> dict:
    """Return prompt data for an evaluation.

    The prompt referenced by *prompt_id* is used when it exists and has a
    non-empty system prompt; otherwise the newest prompt stored for *agent*
    is used. When neither exists, empty defaults are returned.
    """
    if prompt_id:
        cursor.execute(
            "SELECT system_prompt, user_prompt, expected_keywords, rubric FROM test_prompts WHERE id = ?",
            (prompt_id,),
        )
        by_id = cursor.fetchone()
        if by_id and by_id["system_prompt"]:
            return _prompt_fields(by_id)

    cursor.execute(
        "SELECT system_prompt, user_prompt, expected_keywords, rubric FROM test_prompts WHERE agent_name = ? ORDER BY id DESC LIMIT 1",
        (agent,),
    )
    by_agent = cursor.fetchone()
    if by_agent:
        return _prompt_fields(by_agent)

    return {
        "system_prompt": "",
        "user_prompt": "",
        "expected_keywords": "[]",
        "rubric": "{}",
    }


@app.get("/api/evaluation/{agent}/{model}")
def get_evaluation(agent: str, model: str):
    """Return the detailed evaluation record for one agent-model pair.

    The highest-scoring evaluation with a positive score is preferred; if none
    exists, the most recent evaluation of any score is used. The record is
    enriched with the prompt data, and the JSON text fields
    (``expected_keywords``, ``rubric``, ``scores``) are decoded.

    Raises:
        HTTPException: 404 when the database file or the evaluation is missing.
    """
    db_path = str(DB_PATH)
    if not os.path.exists(db_path):
        raise HTTPException(status_code=404, detail="Evaluation database not found")

    # closing() guarantees the connection is released on every exit path,
    # including the 404 below and any sqlite3 error.
    with closing(sqlite3.connect(db_path)) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Step 1: best evaluation for this agent-model pair.
        cursor.execute(
            f"""
            SELECT {_EVALUATION_COLUMNS}
            FROM evaluations e
            WHERE e.agent_name = ? AND e.model = ? AND e.total_score > 0
            ORDER BY e.total_score DESC, e.id DESC
            LIMIT 1
            """,
            (agent, model),
        )
        row = cursor.fetchone()

        if not row:
            # Fallback: any evaluation, even with score 0.
            cursor.execute(
                f"""
                SELECT {_EVALUATION_COLUMNS}
                FROM evaluations e
                WHERE e.agent_name = ? AND e.model = ?
                ORDER BY e.id DESC
                LIMIT 1
                """,
                (agent, model),
            )
            row = cursor.fetchone()

        if not row:
            raise HTTPException(
                status_code=404,
                detail="Evaluation not found for this agent-model pair",
            )

        result = dict(row)
        # Step 2: prompt data — by prompt_id first, then by agent name.
        result.update(_fetch_prompt(cursor, result.get("prompt_id"), agent))

    for key, make_default in _JSON_FIELD_DEFAULTS.items():
        raw = result.get(key)
        if raw is None:
            result[key] = make_default()
        elif isinstance(raw, str):
            try:
                result[key] = json.loads(raw)
            except json.JSONDecodeError:
                result[key] = make_default()

    return result


def _sync_agents_from_meta(db_path: Path, meta_path: Path | None = None) -> None:
    """Import any missing agents from kilo-meta.json into the DB agents table.

    Args:
        db_path: SQLite database containing an ``agents`` table.
        meta_path: Meta file to read. When omitted, ``kilo-meta.json`` three
            directory levels above *db_path* is used.

    Does nothing when the meta file does not exist. Agents already present in
    the table are left untouched.
    """
    if meta_path is None:
        meta_path = db_path.parent.parent.parent / "kilo-meta.json"
    if not meta_path.exists():
        return

    with open(meta_path, encoding="utf-8") as f:
        meta = json.load(f)

    with closing(sqlite3.connect(str(db_path))) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM agents")
        existing = {r[0] for r in cursor.fetchall()}

        new_rows = [
            (
                name,
                info.get("description", ""),
                info.get("category", "meta"),
                info.get("model", ""),
                info.get("color", "#6B7280"),
                datetime.now(timezone.utc).isoformat(),
            )
            for name, info in meta.get("agents", {}).items()
            if name not in existing
        ]
        cursor.executemany(
            "INSERT OR IGNORE INTO agents (name, description, category, current_model, color, updated) VALUES (?, ?, ?, ?, ?, ?)",
            new_rows,
        )
        conn.commit()


def _build_report_from_db(db_path: Path) -> dict:
    """Build real-fit report dynamically from SQLite DB (filtered, objective).

    Returns a dict with ``generated``, ``source``, ``total_evaluations``,
    ``agents`` (per-agent model scores, info and best model) and
    ``fit_scores`` (per-agent highest-scoring model).
    """
    # Prefer the configured meta file (the one get_models reads); fall back to
    # the location derived from the DB path when it is absent.
    _sync_agents_from_meta(db_path, META_PATH if META_PATH.exists() else None)

    with closing(sqlite3.connect(str(db_path))) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        cursor.execute("""
            SELECT name, description, category, current_model
            FROM agents
        """)
        agents_meta = {row["name"]: dict(row) for row in cursor.fetchall()}

        # Only take evaluations that are NOT HTTP error responses
        # AND prefer evaluator='rubric_v2' over 'rubric_v1'
        cursor.execute("""
            SELECT agent_name, model, total_score, evaluator, response
            FROM evaluations
            WHERE total_score > 0
              AND evaluator NOT LIKE '%rubric_v1%'
              AND (response IS NULL OR (response NOT LIKE '%[HTTP %' AND response != ''))
            ORDER BY agent_name, model,
              CASE evaluator
                WHEN 'evolution-skeptic' THEN 0
                WHEN 'rubric_v2' THEN 1
                ELSE 2
              END,
              total_score DESC
        """)
        # Rows arrive best-first within each agent-model pair, so the first
        # row seen for a pair is the one to keep.
        best_evals: dict = {}
        for row in cursor.fetchall():
            best_evals.setdefault(row["agent_name"], {}).setdefault(
                row["model"], row["total_score"]
            )

        # Rebuild fit_scores from selected evaluations only
        cursor.execute("""
            SELECT agent_name, model, MAX(total_score) as best_score
            FROM evaluations
            WHERE total_score > 0
              AND evaluator NOT LIKE '%rubric_v1%'
              AND (response IS NULL OR (response NOT LIKE '%[HTTP %' AND response != ''))
            GROUP BY agent_name, model
        """)
        fit_scores: dict = {}
        for row in cursor.fetchall():
            agent_name = row["agent_name"]
            current = fit_scores.get(agent_name)
            # One row per (agent, model): keep only the highest-scoring model
            # per agent instead of whichever group happens to come last.
            if current is not None and current["fit"] >= row["best_score"]:
                continue
            fit_scores[agent_name] = {
                "model": row["model"],
                "fit": row["best_score"],
                # NOTE(review): the text says "avg" although the value is
                # MAX(total_score); wording kept as-is for existing consumers.
                "explanation": (
                    f"Best model for {row['agent_name']} is {row['model']} "
                    f"with avg score {row['best_score']:.1f}. "
                    "Evaluator preference: evolution-skeptic > rubric_v2 > rubric_v1 (ignored HTTP errors)."
                ),
            }

    agents_report = {}
    for agent_name, meta in agents_meta.items():
        evals = best_evals.get(agent_name, {})
        if evals:
            best_model = max(evals, key=evals.get)
            best_score = evals[best_model]
        else:
            best_model = ""
            best_score = 0.0
        agents_report[agent_name] = {
            "name": agent_name,
            "evaluations": evals,
            "info": [
                meta.get("description") or "",
                meta.get("category") or "",
                meta.get("current_model") or "",
            ],
            "best_model": best_model,
            "best_score": best_score,
        }

    total_evals = sum(len(evals) for evals in best_evals.values())

    return {
        "generated": _utc_timestamp(),
        "source": "real-fit-engine-db-filtered",
        "total_evaluations": total_evals,
        "agents": agents_report,
        "fit_scores": fit_scores,
    }


@app.get("/api/real-fit-report")
def get_real_fit_report():
    """Serve the report built live from the DB, or the JSON file if no DB exists."""
    if os.path.exists(str(DB_PATH)):
        return _build_report_from_db(DB_PATH)
    return _load_json(REPORT_PATH)


@app.post("/api/research")
def start_research(req: ResearchRequest):
    """Queue a background evaluation of one agent against several models."""
    job_id = _enqueue_job(req.agent, req.models)
    return {"job_id": job_id, "status": "pending", "agent": req.agent, "models": req.models}


def _extract_scores_from_report(agent: str, models: list[str]) -> list[dict]:
    """Read real-fit-report.json and return scores for agent x models.

    A model without an entry gets score 0; ``pending`` is true exactly when
    the score equals 0.
    """
    report = _load_json(REPORT_PATH)
    evaluations = report.get("agents", {}).get(agent, {}).get("evaluations", {})

    scored = []
    for model_name in models:
        score = evaluations.get(model_name, 0)
        scored.append({"model": model_name, "score": score, "pending": score == 0})
    return scored


@app.get("/api/research/{job_id}")
def get_research(job_id: str):
    """Return a job record; finished jobs also carry per-model scores.

    Raises:
        HTTPException: 404 when no job with *job_id* exists.
    """
    job = _load_jobs().get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.get("status") == "done" and job.get("result") is not None:
        job["models_scored"] = _extract_scores_from_report(
            job["agent"], job.get("models", [])
        )
    return job


@app.post("/api/research/cell")
def research_cell(req: CellRequest):
    """Queue a background evaluation of a single agent-model pair."""
    job_id = _enqueue_job(req.agent, [req.model])
    return {"job_id": job_id, "status": "pending", "agent": req.agent, "model": req.model}


@app.post("/api/evolve-agent/start")
def start_evolve_agent(req: EvolveAgentRequest):
    """Start a role-fit evaluation job using evolution-prompt and evolution-skeptic.

    For now, this places a job in the queue that will be picked up by the real-fit-engine.
    In the full implementation:
    1. evolution-prompt generates role-specific stress-test prompts from agent definition
    2. Each model in models list is tested with the same prompt
    3. evolution-skeptic evaluates each response with per-dimension rubric scoring
    4. Results are stored in SQLite and report is regenerated
    """
    # Placeholder: spawn the same engine job with evolve-agent type
    # In full implementation, this would spawn a script that:
    # 1. Reads agent definition from .kilo/agents/{agent}.md
    # 2. Calls Ollama API for evolution-prompt to generate test prompts
    # 3. For each model: calls Ollama API, stores response
    # 4. Calls Ollama API for evolution-skeptic to evaluate
    # 5. Stores results in SQLite, rebuilds report
    job_id = _enqueue_job(req.agent, req.models, job_type="evolve-agent")
    return {"job_id": job_id, "status": "pending", "agent": req.agent, "models": req.models}