#!/usr/bin/env python3
"""
Real-Fit Multi-Agent Evaluation Engine (sync/stdlib version — no external deps)

SQLite-backed pipeline that evaluates agent-role × model fit via Ollama API.

Usage:
  python3 real-fit-engine.py --init-db --import-evolution --generate-prompts
  python3 real-fit-engine.py --evaluate-all --models kimi-k2.6,deepseek-v4-pro
  python3 real-fit-engine.py --report
  python3 real-fit-engine.py --recalc --agent lead-developer --old-model qwen3-coder:480b --new-model kimi-k2.6

Configuration (environment variables; a ./.env file is loaded if present):
  OLLAMA_HOST     API base URL (default: https://ollama.com/v1)
  OLLAMA_KEY      bearer token, required for real (non-mock) API calls
  OLLAMA_MOCK     "1" returns a canned response instead of calling the API
  REAL_FIT_DB     SQLite database path
  REPORT_PATH     output path of the JSON report
  EVOLUTION_PATH  input path of the evolution JSON
"""

import sqlite3, json, os, sys, re, time
from contextlib import closing
from glob import glob
from datetime import datetime, timezone
from urllib import request, error as urllib_error
from concurrent.futures import ThreadPoolExecutor, as_completed


def _load_dotenv(path=".env"):
    """Copy KEY=VALUE pairs from *path* into os.environ without overriding.

    Blank lines, lines starting with '#', and lines without '=' are ignored.
    Whitespace around key and value and one pair of matching surrounding
    quotes are removed. Returns True when the file exists.
    """
    if not os.path.isfile(path):
        return False
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key, value = key.strip(), value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            if key:
                os.environ.setdefault(key, value)
    return True


# Load .env BEFORE reading any setting so that every variable (including
# REAL_FIT_DB) can be supplied through it.
_ENV_LOADED = _load_dotenv()

DB_PATH = os.environ.get("REAL_FIT_DB", "agent-evolution/data/real-fit.db")

# Ollama Cloud credentials come from the environment / .env only.
# SECURITY: a bearer token used to be hard-coded here as the fallback value.
# It has been removed; treat the previously committed key as compromised and
# rotate it.
_DEFAULT_KEY = ""
_DEFAULT_HOST = "https://ollama.com/v1"
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", _DEFAULT_HOST)
OLLAMA_KEY = os.environ.get("OLLAMA_KEY", _DEFAULT_KEY)
USE_MOCK = os.environ.get("OLLAMA_MOCK", "0") == "1"

DEFAULT_MODELS = ["kimi-k2.6", "deepseek-v4-pro", "deepseek-v4-flash",
                  "glm-5.1", "qwen3-coder:480b", "qwen3.5-122b"]

# Sentinel prefixes produced by call_ollama for responses that must never be
# scored or stored. "[MOCK]" is included on purpose: mock output was never
# persisted before (it starts with '['), and keeping it out of the database
# prevents it from contaminating real fit scores.
_UNUSABLE_PREFIXES = ("[HTTP ", "[ERROR]", "[FATAL]", "[MOCK]")

_MAX_ATTEMPTS = 3
_RETRYABLE_HTTP_CODES = (429, 502, 503, 504)


def _is_unusable_response(text):
    """Return True for an empty response or one of call_ollama's sentinels."""
    return not text or text.startswith(_UNUSABLE_PREFIXES)


def _require_api_key():
    """Exit with status 1 when a real API call is needed but no key is set."""
    if not USE_MOCK and not OLLAMA_KEY:
        print("[FATAL] OLLAMA_KEY not set. Cannot run real evaluations.", file=sys.stderr)
        sys.exit(1)


def _utc_now():
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


# ================================================================
# SCHEMA
# ================================================================

SCHEMA = """
CREATE TABLE IF NOT EXISTS agents (
    name TEXT PRIMARY KEY,
    description TEXT,
    category TEXT,
    current_model TEXT,
    color TEXT,
    updated TEXT
);
CREATE TABLE IF NOT EXISTS models (
    short_name TEXT PRIMARY KEY,
    full_id TEXT,
    if_score REAL,
    swe_bench REAL,
    parameters TEXT,
    context_window TEXT,
    updated TEXT
);
CREATE TABLE IF NOT EXISTS test_prompts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent_name TEXT,
    task_type TEXT,
    system_prompt TEXT,
    user_prompt TEXT,
    expected_keywords TEXT,
    rubric TEXT
);
CREATE TABLE IF NOT EXISTS evaluations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    agent_name TEXT,
    model TEXT,
    prompt_id INTEGER,
    response TEXT,
    latency_ms INTEGER,
    tokens_prompt INTEGER,
    tokens_response INTEGER,
    scores TEXT,
    total_score REAL,
    explanation TEXT,
    evaluated_at TEXT,
    evaluator TEXT
);
CREATE TABLE IF NOT EXISTS recalculations (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    trigger TEXT,
    agent_name TEXT,
    old_model TEXT,
    new_model TEXT,
    old_fit REAL,
    new_fit REAL,
    delta REAL,
    reason TEXT,
    recalculated_at TEXT
);
CREATE TABLE IF NOT EXISTS fit_scores (
    agent_name TEXT PRIMARY KEY,
    model TEXT,
    fit_score REAL,
    dimension_scores TEXT,
    explanation TEXT,
    evaluated_at TEXT,
    FOREIGN KEY (agent_name) REFERENCES agents(name)
);
CREATE INDEX IF NOT EXISTS idx_eval_agent_model ON evaluations(agent_name, model);
CREATE INDEX IF NOT EXISTS idx_recalc_agent ON recalculations(agent_name);
"""

_PROMPTS_FOR_AGENT_SQL = '''
    SELECT id, system_prompt, user_prompt, expected_keywords, rubric
    FROM test_prompts WHERE agent_name = ?'''


def init_db():
    """Create the database file (and its directory) and apply SCHEMA."""
    db_dir = os.path.dirname(DB_PATH)
    # A bare filename has no directory part; os.makedirs("") would raise.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.executescript(SCHEMA)
        conn.commit()
    print(f"[db] Initialized schema in {DB_PATH}")


# ================================================================
# PROMPT GENERATOR
# ================================================================

def _parse_flat_frontmatter(fm_raw):
    """Fallback parser: collect top-level ``key: value`` lines into a dict."""
    fm = {}
    for line in fm_raw.splitlines():
        m = re.match(r'^(\w+):\s*(.+)$', line)
        if m:
            fm[m.group(1)] = m.group(2).strip()
    return fm


def parse_frontmatter(path):
    """Parse YAML frontmatter and body from an agent markdown file.

    Returns a dict of the frontmatter keys plus ``_body`` (first 1200
    characters of the body) and ``_body_snippet`` (first 300 characters with
    newlines flattened). Returns ``{}`` when the file cannot be read or has
    no ``---`` delimited frontmatter.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError):
        return {}
    if not content.startswith('---'):
        return {}
    parts = content.split('---', 2)
    if len(parts) < 3:
        return {}
    fm_raw = parts[1].strip()
    body = parts[2].strip()

    fm = None
    try:
        # PyYAML is optional; both a missing module and a parse error fall
        # through to the regex-based parser below.
        import yaml
        fm = yaml.safe_load(fm_raw)
    except Exception:
        fm = None
    if fm is None:
        fm = {} if not fm_raw else _parse_flat_frontmatter(fm_raw)
    elif not isinstance(fm, dict):
        # YAML that parses to a list/scalar cannot hold the _body keys.
        fm = _parse_flat_frontmatter(fm_raw)

    body_text = body[:1200]
    fm['_body'] = body_text
    fm['_body_snippet'] = body_text.replace('\n', ' ').strip()[:300]
    return fm


def generate_task_for_agent(name, fm):
    """Generate a realistic task prompt from the agent's actual markdown definition.

    Args:
        name: agent name (the markdown file name without extension).
        fm: frontmatter dict as returned by parse_frontmatter; any non-dict
            is treated as empty.

    Returns:
        dict with ``system`` and ``task`` prompt strings, ``expected``
        (up to 12 de-duplicated keyword strings) and ``rubric`` (dimension
        name → weight).
    """
    meta = fm if isinstance(fm, dict) else {}
    # A YAML "description:" with no value yields None; never leak "None"
    # into the system prompt.
    description = str(meta.get('description') or '')
    body = str(meta.get('_body') or '')[:1500]
    system = f"You are {name}. {description}"

    # Build a task from real agent instructions: the first 8 non-empty lines
    # that are not headings, rules or table rows.
    lines = body.splitlines()
    instruction_lines = []
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith(('#', '---', '|')):
            instruction_lines.append(stripped)
            if len(instruction_lines) >= 8:
                break

    if len(instruction_lines) >= 3:
        task = (
            "Based on your role definition below, respond to the following scenario as you would in production.\n\n"
            "Your role instructions:\n" + '\n'.join(instruction_lines) +
            "\n\nNow, given this incoming task: \"A team member has submitted a pull request with several issues."
            " What do you do?\", provide your full response."
        )
    else:
        task = f"Demonstrate your expertise as {name} in a realistic complex scenario. Provide a complete working solution."

    expected = [name.replace('-', ' ')]
    if description:
        expected.extend(description.lower().split()[:5])
    for line in lines:
        item = line.strip()
        if item.startswith(('-', '*')):
            keyword = item.lstrip('-*').strip().lower()
            # An empty keyword is a substring of every response and would
            # count as a guaranteed hit.
            if keyword:
                expected.append(keyword)
    expected = list(dict.fromkeys(expected))[:12]

    rubric = {'relevance': 40, 'completeness': 30, 'correctness': 30}
    return {
        'system': system,
        'task': task,
        'expected': expected,
        'rubric': rubric
    }


def generate_prompts():
    """Rebuild test_prompts from every .kilo/agents/*.md that declares a model."""
    count = 0
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # Delete and inserts share one transaction; it is only committed once
        # every file has been processed.
        conn.execute("DELETE FROM test_prompts")
        for path in sorted(glob('.kilo/agents/*.md')):
            fm = parse_frontmatter(path)
            if not fm.get('model'):
                continue
            name = os.path.basename(path)[:-3]
            task = generate_task_for_agent(name, fm)
            conn.execute('''
                INSERT INTO test_prompts (agent_name, task_type, system_prompt, user_prompt, expected_keywords, rubric)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (name, 'primary', task['system'], task['task'],
                  json.dumps(task['expected']), json.dumps(task['rubric'])))
            count += 1
        conn.commit()
    print(f"[prompts] Generated {count} test prompts")


# ================================================================
# OLLAMA CLIENT
# ================================================================

def call_ollama(model_short, system_prompt, user_prompt, expected_keywords=None, timeout=120):
    """Call Ollama API with retries. Returns (response_text, latency_ms, token_info_dict).

    Failures are reported in-band: the returned text starts with "[HTTP ",
    "[ERROR]" or "[FATAL]" and the token dict is empty. HTTP 429/502/503/504
    and URLError are retried with exponential backoff (up to 3 attempts);
    any other failure returns immediately. ``expected_keywords`` is accepted
    for call compatibility and is not used.
    """
    if USE_MOCK:
        return (
            "[MOCK] This is a simulated response for testing the pipeline without API calls.",
            500,
            {"prompt": 100, "response": 200}
        )
    if not OLLAMA_KEY:
        return "[FATAL] OLLAMA_KEY not set. Cannot run real evaluations.", 0, {}

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OLLAMA_KEY}",
    }
    body = json.dumps({
        "model": model_short,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.2,
    }).encode("utf-8")
    url = f"{OLLAMA_HOST.rstrip('/')}/chat/completions"
    req = request.Request(url, data=body, headers=headers, method="POST")

    latency = 0
    last_error = "no attempt made"
    for attempt in range(1, _MAX_ATTEMPTS + 1):
        start = time.time()
        try:
            with request.urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read().decode("utf-8"))
                latency = int((time.time() - start) * 1000)
            choices = data.get("choices") or [{}]
            content = choices[0].get("message", {}).get("content", "") or ""
            usage = data.get("usage") or {}
            tokens = {
                "prompt": usage.get("prompt_tokens", 0),
                "response": usage.get("completion_tokens", 0),
            }
            return content, latency, tokens
        except urllib_error.HTTPError as e:
            # HTTPError is a URLError subclass, so it must be handled first.
            latency = int((time.time() - start) * 1000)
            if e.code not in _RETRYABLE_HTTP_CODES:
                return f"[HTTP {e.code}] {e.read().decode('utf-8', 'ignore')[:200]}", latency, {}
            last_error = f"HTTP {e.code}"
        except urllib_error.URLError as e:
            latency = int((time.time() - start) * 1000)
            last_error = str(e.reason)
        except Exception as e:
            # Boundary handler: malformed JSON, unexpected payload shape,
            # read timeouts, etc. are reported in-band, not raised.
            latency = int((time.time() - start) * 1000)
            return f"[ERROR] {type(e).__name__}: {str(e)[:200]}", latency, {}

        # Back off only when another attempt will actually follow.
        if attempt < _MAX_ATTEMPTS:
            wait = 2 ** attempt
            print(f"  [retry] {model_short}: {last_error} → sleeping {wait}s (attempt {attempt}/{_MAX_ATTEMPTS})")
            time.sleep(wait)
    return f"[FATAL] All retries exhausted ({last_error})", latency, {}


# ================================================================
# EVALUATOR
# ================================================================

def evaluate_response(response, expected_json, rubric_json):
    """Rubric-based evaluation. Returns dict with dimension scores mapped to rubric keys.

    Args:
        response: model output; None is treated as an empty string.
        expected_json: keyword list, or its JSON encoding.
        rubric_json: mapping of dimension name → weight, or its JSON
            encoding. Dimensions unknown to the heuristics score 50.

    Returns:
        ``{'scores': {dimension: score}, 'total': float, 'explanation': str}``.
        With a rubric, total is the sum of score/100 * weight; without one it
        is the mean of all heuristic scores.
    """
    expected = json.loads(expected_json) if isinstance(expected_json, str) else expected_json
    rubric = json.loads(rubric_json) if isinstance(rubric_json, str) else rubric_json
    response = response or ''
    # Normalise keywords once; empty ones would match every response.
    expected = [str(kw).lower() for kw in (expected or []) if str(kw).strip()]

    resp_lower = response.lower()
    lines = response.strip().split('\n')

    # 1. Keyword coverage (generic)
    keyword_hits = sum(1 for kw in expected if kw in resp_lower)
    keyword_score = min(100, (keyword_hits / max(1, len(expected)) * 100))

    # 2. Code presence
    has_code = '```' in response or 'function' in resp_lower or 'class ' in resp_lower or 'def ' in resp_lower
    code_score = 100 if has_code else 20

    # 3. Structure (response depth)
    structure_score = min(100, max(10, len(lines) * 2))

    # 4. Actionability (does it suggest fixes/actions?)
    actionability = 0
    if any(w in resp_lower for w in ['fix', 'suggest', 'recommend', 'should', 'refactor', 'replace']):
        actionability = 85
    elif any(w in resp_lower for w in ['use', 'add', 'remove', 'change', 'improve', 'consider']):
        actionability = 60

    # 5. Depth (content length, capped)
    depth = min(100, len(response) / 40)

    # 6. Relevance (does response mention role-specific terms?)
    relevance = min(100, keyword_score * 0.8 + 20)

    mentions_security = any(k in resp_lower for k in ['sql', 'inject', 'xss', 'csrf'])

    # Map rubrics to actual computed scores via heuristics
    generic_scores = {
        'keyword_coverage': round(keyword_score, 1),
        'code_presence': code_score,
        'structure': round(structure_score, 1),
        'actionability': round(actionability, 1),
        'depth': round(depth, 1),
        'relevance': round(relevance, 1),
        # Rubric-specific mappings (fallback chain)
        'security': round(max(keyword_score, code_score, actionability), 1)
                    if mentions_security else round(keyword_score * 0.7, 1),
        'logic': round(structure_score * 0.8, 1),
        'correctness': round((code_score + keyword_score) / 2, 1),
        'completeness': round((keyword_score + structure_score) / 2, 1),
        'thoroughness': round((keyword_score + depth) / 2, 1),
        'clarity': round(structure_score * 0.9, 1),
        'coverage': round(keyword_score, 1),
        'edge_cases': round((keyword_score + depth) / 2, 1),
        'readability': round(structure_score * 0.85, 1),
        'mocking': code_score if 'mock' in resp_lower else round(code_score * 0.5, 1),
        'plan_quality': round((keyword_score + structure_score) / 2, 1),
        'agent_selection': round(keyword_score, 1),
        'risk_handling': actionability,
        'budget_awareness': round(keyword_score, 1),
        'scalability': round(structure_score * 0.7, 1),
        'optimization': actionability,
    }

    if rubric:
        total = sum((generic_scores.get(dim, 50) / 100) * weight
                    for dim, weight in rubric.items())
    else:
        total = sum(generic_scores.values()) / len(generic_scores)

    explanation = (f"Keywords: {keyword_hits}/{len(expected)}. "
                   f"Lines: {len(lines)}. "
                   f"Code: {'YES' if has_code else 'NO'}. "
                   f"Total={round(total, 1)}")

    return {'scores': generic_scores, 'total': round(total, 1), 'explanation': explanation}


# ================================================================
# PARALLEL BATCH EVALUATION
# ================================================================

def evaluate_one(args):
    """Run one (agent, model, prompt) task: call the model and score the reply.

    ``args`` is the tuple (agent_name, model, prompt_id, system_prompt,
    user_prompt, expected_keywords, rubric). Performs no database access, so
    it is what evaluate_all submits to worker threads.
    """
    agent_name, model, pid, system, user, expected, rubric = args
    resp, latency, tokens = call_ollama(model, system, user, expected)
    ev = evaluate_response(resp, expected, rubric)
    return {
        'agent': agent_name, 'model': model, 'prompt_id': pid,
        'response': resp, 'latency': latency, 'tokens': tokens,
        'total': ev['total'], 'scores': json.dumps(ev['scores']),
        'explanation': ev['explanation'],
        'is_error': _is_unusable_response(resp)
    }


def _should_skip(agent_name, model):
    """Check if we already have a non-error evaluation for this agent × model.

    Returns the stored total_score of one such evaluation, or None.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute('''
            SELECT total_score FROM evaluations
            WHERE agent_name = ? AND model = ?
              AND response IS NOT NULL AND LENGTH(response) > 0
              AND response NOT LIKE '[HTTP %'
              AND response NOT LIKE '[ERROR]%'
              AND response NOT LIKE '[FATAL]%'
              AND response NOT LIKE '[MOCK]%'
            LIMIT 1''', (agent_name, model)).fetchone()
    return row[0] if row else None


def _store_evaluation(conn, res):
    """Insert one evaluate_one result into evaluations and commit."""
    tokens = res.get('tokens') or {}
    conn.execute('''INSERT INTO evaluations
        (agent_name, model, prompt_id, response, latency_ms, tokens_prompt, tokens_response,
         scores, total_score, explanation, evaluated_at, evaluator)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''',
        (res['agent'], res['model'], res['prompt_id'], res['response'], res['latency'],
         tokens.get('prompt', 0), tokens.get('response', 0),
         res['scores'], res['total'], res['explanation'],
         _utc_now(), 'rubric_v1'))
    conn.commit()


def evaluate_single(agent_name, model, conn=None):
    """Evaluate one agent × model. Reuses optional open connection.

    When ``conn`` is given it is used for reads and writes and left open for
    the caller; otherwise a private connection is opened and closed here.
    Returns the list of stored (non-error) result dicts.
    """
    owns_conn = conn is None
    if owns_conn:
        conn = sqlite3.connect(DB_PATH)
    try:
        prompts = conn.execute(_PROMPTS_FOR_AGENT_SQL, (agent_name,)).fetchall()
        results = []
        for pid, system, user, exp, rub in prompts:
            res = evaluate_one((agent_name, model, pid, system, user, exp, rub))
            if res.get('is_error'):
                print(f"  [SKIP] {agent_name} × {model}: error response — {res['response'][:200]}")
                continue
            _store_evaluation(conn, res)
            print(f"  [{res['agent']}] × [{res['model']}] score={res['total']:.1f}")
            results.append(res)
        return results
    finally:
        if owns_conn:
            conn.close()


def _pending_tasks(conn, agent_names, models):
    """Build exactly one evaluate_one task per pending agent × model × prompt."""
    tasks = []
    for agent_name in agent_names:
        prompts = conn.execute(_PROMPTS_FOR_AGENT_SQL, (agent_name,)).fetchall()
        for model in models:
            existing = _should_skip(agent_name, model)
            if existing is not None:
                print(f"  Already evaluated: {agent_name} × {model} = {existing:.1f} (skipping)")
                continue
            for pid, system, user, exp, rub in prompts:
                tasks.append((agent_name, model, pid, system, user, exp, rub))
    return tasks


def evaluate_all(models_to_test, max_workers=4, agent_filter=None):
    """Evaluate agents × models with parallel workers.

    Args:
        models_to_test: list of model name strings (e.g. ['kimi-k2.6', 'glm-5.1'])
        max_workers: thread pool size
        agent_filter: optional agent name to limit evaluation to one agent

    Agent × model pairs that already have a stored evaluation are skipped.
    Results are written from the calling thread as workers finish, then
    compute_aggregates() is run.
    """
    if isinstance(models_to_test, dict):
        print("[error] evaluate_all received a dict instead of a list. "
              "Use --evaluate-all --models m1,m2 for all agents, or pass a list.")
        sys.exit(1)

    with closing(sqlite3.connect(DB_PATH)) as conn:
        if agent_filter:
            agent_names = [agent_filter]
        else:
            agent_names = [row[0] for row in
                           conn.execute("SELECT DISTINCT name FROM agents").fetchall()]
        tasks = _pending_tasks(conn, agent_names, list(models_to_test))
        print(f"[eval] Prepared {len(tasks)} evaluations (agents × models × prompts)")

        results = []
        with ThreadPoolExecutor(max_workers=max(1, max_workers)) as ex:
            futures = {ex.submit(evaluate_one, t): t for t in tasks}
            for future in as_completed(futures):
                try:
                    res = future.result()
                except Exception as exc:
                    # One broken task (e.g. invalid rubric JSON) must not
                    # abort the rest of the batch.
                    agent_name, model = futures[future][:2]
                    print(f"  [SKIP] {agent_name} × {model}: {type(exc).__name__}: {exc}")
                    continue
                if res.get('is_error'):
                    print(f"  [SKIP] {res['agent']} × {res['model']}: error response — {res['response'][:200]}")
                    continue
                results.append(res)
                _store_evaluation(conn, res)
                print(f"  [{res['agent']}] × [{res['model']}] score={res['total']:.1f}")

    print(f"[eval] Stored {len(results)} evaluations")
    compute_aggregates()


def compute_aggregates():
    """Compute per-agent model fit scores from evaluation averages.

    For every agent the model with the highest average total_score is written
    to fit_scores together with the per-dimension averages for that model.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute('''
            SELECT agent_name, model, AVG(total_score) as avg_score
            FROM evaluations
            GROUP BY agent_name, model
        ''').fetchall()

        # For each agent pick best model (groups with no numeric score are ignored)
        best = {}
        for agent, model, avg in rows:
            if avg is None:
                continue
            if agent not in best or avg > best[agent][1]:
                best[agent] = (model, avg)

        for agent, (model, avg) in best.items():
            # Get dimension breakdown
            score_rows = conn.execute('''
                SELECT scores FROM evaluations
                WHERE agent_name = ? AND model = ?
            ''', (agent, model)).fetchall()
            sums, counts = {}, {}
            for (score_json,) in score_rows:
                try:
                    scores = json.loads(score_json) if score_json else {}
                except ValueError:
                    continue
                if not isinstance(scores, dict):
                    continue
                for dim, value in scores.items():
                    if isinstance(value, (int, float)):
                        sums[dim] = sums.get(dim, 0) + value
                        counts[dim] = counts.get(dim, 0) + 1
            dim_avg = {dim: round(sums[dim] / counts[dim], 1) for dim in sums}

            explanation = f"Best model for {agent} is {model} with avg score {round(avg, 1)}. "
            if dim_avg:
                explanation += f"Strongest dimension: {max(dim_avg, key=dim_avg.get)}."

            conn.execute('''INSERT OR REPLACE INTO fit_scores
                (agent_name, model, fit_score, dimension_scores, explanation, evaluated_at)
                VALUES (?, ?, ?, ?, ?, ?)''',
                (agent, model, round(avg, 1), json.dumps(dim_avg), explanation, _utc_now()))
        conn.commit()
    print(f"[agg] Computed fit scores for {len(best)} agents")


# ================================================================
# RECALCULATION TRIGGER
# ================================================================

def trigger_recalculation(agent_name, old_model, new_model, reason="manual"):
    """After model or prompt change, re-evaluate and log delta.

    Scores the agent's first test prompt on ``new_model`` and records
    new_fit - old_fit in recalculations. Returns the delta, or 0.0 without
    recording anything when the API call yields no usable response.

    NOTE(review): old_fit is read from fit_scores, which compute_aggregates
    fills with the agent's best model; it is not looked up by ``old_model``.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        old_row = conn.execute('''SELECT fit_score FROM fit_scores
            WHERE agent_name = ?''', (agent_name,)).fetchone()
        prompt = conn.execute('''SELECT system_prompt, user_prompt, expected_keywords, rubric
            FROM test_prompts WHERE agent_name = ? LIMIT 1''', (agent_name,)).fetchone()
    old_fit = old_row[0] if old_row and old_row[0] is not None else 0

    # Re-evaluate on new model (no connection is held during the network call)
    if prompt:
        system, user, exp, rub = prompt
        resp, _latency, _tokens = call_ollama(new_model, system, user)
        if _is_unusable_response(resp):
            # Scoring an error string would log a meaningless delta.
            print(f"[recalc] {agent_name}: no usable response from {new_model} — {(resp or '')[:200]}")
            return 0.0
        new_fit = evaluate_response(resp, exp, rub)['total']
    else:
        new_fit = 0

    delta = new_fit - old_fit
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # Both the trigger and the reason column receive ``reason``.
        conn.execute('''INSERT INTO recalculations
            (trigger, agent_name, old_model, new_model, old_fit, new_fit, delta, reason, recalculated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
            (reason, agent_name, old_model, new_model, old_fit, new_fit, delta, reason, _utc_now()))
        conn.commit()
    print(f"[recalc] {agent_name}: {old_model}({old_fit:.1f}) → {new_model}({new_fit:.1f}) Δ={delta:+.1f}")
    return delta


# ================================================================
# REPORT / DASHBOARD DATA
# ================================================================

def generate_report():
    """Write the JSON report (REPORT_PATH) and return it as a dict.

    The report holds, per agent, the average score of every evaluated model
    and the best one, plus the fit_scores table; ``total_evaluations`` is the
    number of evaluation rows.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # All evaluations per agent per model
        rows = conn.execute('''
            SELECT agent_name, model, AVG(total_score) as avg_score, COUNT(*) as cnt
            FROM evaluations
            GROUP BY agent_name, model
        ''').fetchall()

        agents = {}
        total_evaluations = 0
        for agent, model, avg, cnt in rows:
            total_evaluations += cnt
            if avg is None:
                continue
            if agent not in agents:
                info = conn.execute(
                    'SELECT description, category, current_model FROM agents WHERE name = ?',
                    (agent,)).fetchone()
                agents[agent] = {'name': agent, 'evaluations': {}, 'info': info or ()}
            agents[agent]['evaluations'][model] = round(avg, 1)

        # Best per agent
        for entry in agents.values():
            evs = entry['evaluations']
            best_m = max(evs, key=evs.get)
            entry['best_model'] = best_m
            entry['best_score'] = evs[best_m]

        # Fit scores table
        fit_rows = conn.execute(
            'SELECT agent_name, model, fit_score, explanation FROM fit_scores').fetchall()

    fit_scores = {a: {'model': m, 'fit': s, 'explanation': e} for a, m, s, e in fit_rows}

    report = {
        'generated': _utc_now(),
        'source': 'real-fit-engine',
        'total_evaluations': total_evaluations,
        'agents': agents,
        'fit_scores': fit_scores
    }

    out = os.environ.get('REPORT_PATH', 'agent-evolution/data/real-fit-report.json')
    out_dir = os.path.dirname(out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII text, so the encoding must not
    # depend on the platform default.
    with open(out, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"[report] Written {out}: {len(agents)} agents, {total_evaluations} evaluations")
    return report


# ================================================================
# IMPORT REAL DATA
# ================================================================

def import_from_evolution():
    """Load agents and model benchmarks from the evolution JSON (EVOLUTION_PATH)."""
    evo_path = os.environ.get('EVOLUTION_PATH', 'agent-evolution/data/evolution.json')
    with open(evo_path, encoding='utf-8') as f:
        evo = json.load(f)

    agents = evo.get('agents', {})
    benchmarks = evo.get('model_benchmarks', {})
    with closing(sqlite3.connect(DB_PATH)) as conn:
        for name, a in agents.items():
            c = a.get('current') or {}
            conn.execute('''INSERT OR REPLACE INTO agents
                (name, description, category, current_model, color, updated)
                VALUES (?, ?, ?, ?, ?, ?)''',
                (name, c.get('description', ''), c.get('category', 'General'),
                 c.get('model', ''), c.get('color', ''), _utc_now()))

        for mid, m in benchmarks.items():
            # swe_bench is stored as NULL; it is not read from the input.
            conn.execute('''INSERT OR REPLACE INTO models
                (short_name, full_id, if_score, swe_bench, parameters, context_window, updated)
                VALUES (?, ?, ?, ?, ?, ?, ?)''',
                (mid, f'ollama-cloud/{mid}', m.get('if_score'), None,
                 m.get('parameters', ''), m.get('context_window', ''), _utc_now()))
        conn.commit()
    print(f"[import] {len(agents)} agents, {len(benchmarks)} models")


# ================================================================
# CLI
# ================================================================

def main(argv=None):
    """Command-line entry point; ``argv`` defaults to sys.argv[1:]."""
    import argparse
    cli_args = sys.argv[1:] if argv is None else list(argv)

    p = argparse.ArgumentParser(description='Real-Fit Multi-Agent Engine')
    p.add_argument('--init-db', action='store_true')
    p.add_argument('--import-evolution', action='store_true')
    p.add_argument('--generate-prompts', action='store_true')
    p.add_argument('--evaluate', metavar='AGENT')
    p.add_argument('--models', default=','.join(DEFAULT_MODELS))
    p.add_argument('--evaluate-all', action='store_true')
    p.add_argument('--report', action='store_true')
    p.add_argument('--recalc', action='store_true')
    p.add_argument('--agent', help='Agent for recalc')
    p.add_argument('--old-model', help='Old model for recalc')
    p.add_argument('--new-model', help='New model for recalc')
    p.add_argument('--workers', type=int, default=4)
    args = p.parse_args(cli_args)

    models = [m.strip() for m in args.models.split(',') if m.strip()]
    if (args.evaluate or args.evaluate_all) and not models:
        p.error('--models must name at least one model')
    # Fail loudly instead of silently ignoring an incomplete --recalc.
    if args.recalc and not (args.agent and args.old_model and args.new_model):
        p.error('--recalc requires --agent, --old-model and --new-model')
    if args.evaluate or args.evaluate_all or args.recalc:
        _require_api_key()

    if args.init_db:
        init_db()
    if args.import_evolution:
        import_from_evolution()
    if args.generate_prompts:
        generate_prompts()
    if args.evaluate:
        for model in models:
            existing = _should_skip(args.evaluate, model)
            if existing is not None:
                print(f"Already evaluated: {args.evaluate} x {model} = {existing:.1f} (skipping)")
                continue
            evaluate_single(args.evaluate, model)
    if args.evaluate_all:
        evaluate_all(models, args.workers)
    if args.report:
        generate_report()
    if args.recalc:
        trigger_recalculation(args.agent, args.old_model, args.new_model)

    if not cli_args:
        p.print_help()
        print("\n=== Workflow ===")
        print("  python3 real-fit-engine.py --init-db --import-evolution --generate-prompts")
        print("  python3 real-fit-engine.py --evaluate-all --models kimi-k2.6,deepseek-v4-pro")
        print("  python3 real-fit-engine.py --report")
        print("  python3 real-fit-engine.py --recalc --agent lead-developer --old-model qwen3-coder:480b --new-model kimi-k2.6")
        print("\nSet OLLAMA_KEY for real API calls, or OLLAMA_MOCK=1 to run without them")


if __name__ == '__main__':
    main()