Files
APAW/scripts/real-fit-engine.py
Deploy Bot dbbf4c32e1 feat(landing): add state API service with real-fit score drill-down
- Add apaw-state-api Flask service (landing/api/server.py) that serves
  agent fit scores, best models, and explanations from real-fit.db
- Add nginx proxy rule: /api/state → apaw-state-api:8080
- Add fit-score drill-down modal (click heatmap cell → score breakdown
  + explanation) in api.js, styles.css, and index.html
- Add real-fit-recalc.py script for offline score recalculation from
  stored SQLite responses
- Add real-fit-engine.py (evaluation engine) and sync-dashboard-data.py
- Add Dockerfile ENTRYPOINT + entrypoint.sh for landing container
- Add docker-compose.ollama.yml for local Ollama inference
- Update kilo.jsonc command models and agent-versions.json
- Regenerate index.standalone.html with latest dashboard data
- Add .gitignore entries for __pycache__, runtime data, and backups
2026-05-27 19:53:40 +01:00

566 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Real-Fit Multi-Agent Evaluation Engine (sync/stdlib version — no external deps)
SQLite-backed pipeline that evaluates agent-role × model fit via Ollama API.
Usage:
python3 real-fit-engine.py --init-db --import-evolution --generate-prompts
python3 real-fit-engine.py --evaluate-all --models kimi-k2.6,deepseek-v4-pro-max
python3 real-fit-engine.py --report
python3 real-fit-engine.py --recalc --agent lead-developer --old-model qwen3-coder:480b --new-model kimi-k2.6
Configuration:
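OLLAMA_HOST  base URL of the Ollama API (default: https://api.ollama.com)
OLLAMA_KEY   optional bearer token sent in the Authorization header (default: empty)
OLLAMA_MOCK  set to 1 to return a canned response instead of calling the API (default: 0)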
"""
import sqlite3, json, os, sys, re, time
from glob import glob
from datetime import datetime, timezone
from urllib import request, error as urllib_error
from concurrent.futures import ThreadPoolExecutor, as_completed
# Path of the SQLite database file (relative, so it resolves against the current working directory).
DB_PATH = "agent-evolution/data/real-fit.db"
# Base URL of the Ollama HTTP API; override with the OLLAMA_HOST environment variable.
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "https://api.ollama.com")
# Optional API key; when non-empty, call_ollama sends it as a Bearer Authorization header.
OLLAMA_KEY = os.environ.get("OLLAMA_KEY", "")
# Mock mode is on only when OLLAMA_MOCK is exactly "1"; any other value means real API calls.
USE_MOCK = os.environ.get("OLLAMA_MOCK", "0") == "1" # Default to REAL for this env
# Model short names used when --models is not given on the command line.
DEFAULT_MODELS = ["kimi-k2.6", "deepseek-v4-pro-max", "deepseek-v4-flash",
"glm-5.1", "qwen3-coder:480b", "qwen3.5-122b"]
# ================================================================
# SCHEMA
# ================================================================
# DDL applied by init_db(). Every statement uses IF NOT EXISTS, so re-running it is harmless.
#   agents / models  - filled by import_from_evolution()
#   test_prompts     - rebuilt by generate_prompts(); expected_keywords and rubric hold JSON text
#   evaluations      - one row per (agent, model, prompt) run; scores holds JSON text
#   recalculations   - log rows written by trigger_recalculation()
#   fit_scores       - one row per agent (agent_name is the primary key), written by
#                      compute_aggregates(); dimension_scores holds JSON text
SCHEMA = """
CREATE TABLE IF NOT EXISTS agents (
name TEXT PRIMARY KEY,
description TEXT,
category TEXT,
current_model TEXT,
color TEXT,
updated TEXT
);
CREATE TABLE IF NOT EXISTS models (
short_name TEXT PRIMARY KEY,
full_id TEXT,
if_score REAL,
swe_bench REAL,
parameters TEXT,
context_window TEXT,
updated TEXT
);
CREATE TABLE IF NOT EXISTS test_prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT,
task_type TEXT,
system_prompt TEXT,
user_prompt TEXT,
expected_keywords TEXT,
rubric TEXT
);
CREATE TABLE IF NOT EXISTS evaluations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_name TEXT,
model TEXT,
prompt_id INTEGER,
response TEXT,
latency_ms INTEGER,
tokens_prompt INTEGER,
tokens_response INTEGER,
scores TEXT,
total_score REAL,
explanation TEXT,
evaluated_at TEXT,
evaluator TEXT
);
CREATE TABLE IF NOT EXISTS recalculations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
trigger TEXT,
agent_name TEXT,
old_model TEXT,
new_model TEXT,
old_fit REAL,
new_fit REAL,
delta REAL,
reason TEXT,
recalculated_at TEXT
);
CREATE TABLE IF NOT EXISTS fit_scores (
agent_name TEXT PRIMARY KEY,
model TEXT,
fit_score REAL,
dimension_scores TEXT,
explanation TEXT,
evaluated_at TEXT,
FOREIGN KEY (agent_name) REFERENCES agents(name)
);
CREATE INDEX IF NOT EXISTS idx_eval_agent_model ON evaluations(agent_name, model);
CREATE INDEX IF NOT EXISTS idx_recalc_agent ON recalculations(agent_name);
"""
def init_db():
    """Create the database directory if needed and apply SCHEMA to DB_PATH.

    The connection is always closed, even when the schema script fails.
    """
    db_dir = os.path.dirname(DB_PATH)
    # os.makedirs('') raises FileNotFoundError, so skip it when DB_PATH
    # has no directory component.
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.executescript(SCHEMA)
        conn.commit()
    finally:
        conn.close()
    print(f"[db] Initialized schema in {DB_PATH}")
# ================================================================
# PROMPT GENERATOR
# ================================================================
def parse_frontmatter(path):
    """Parse the leading ``---`` delimited front matter of a text file.

    Only single-line ``key: value`` pairs whose key consists of word
    characters are recognized; this is not a YAML parser, so quotes and
    other syntax stay part of the value.

    Returns a dict of the parsed pairs plus ``_body_snippet`` (at most 300
    characters of the text following the closing delimiter, with newlines
    replaced by spaces). Returns ``{}`` when the file cannot be read or
    decoded as UTF-8, does not start with ``---``, or has no closing ``---``.
    """
    try:
        with open(path, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers
        # UnicodeDecodeError and malformed path strings. Anything else
        # (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return {}
    if not content.startswith('---'):
        return {}
    end = content.find('---', 3)
    if end == -1:
        return {}
    data = {}
    for line in content[3:end].strip().split('\n'):
        m = re.match(r'^(\w+):\s*(.+)$', line)
        if m:
            data[m.group(1)] = m.group(2).strip()
    # Only the first 800 characters of the body are considered before trimming to 300.
    body = content[end + 3:][:800]
    data['_body_snippet'] = body.replace('\n', ' ').strip()[:300]
    return data
# Canned evaluation tasks, keyed by an agent-name fragment that
# generate_task_for_agent() matches against agent names and role text.
# Each entry provides:
#   'system'   - system prompt sent to the model
#   'task'     - user prompt sent to the model
#   'expected' - keywords looked for (case-insensitively) in the response
#   'rubric'   - dimension -> weight mapping; the weights of every entry below sum to 100
TASK_LIBRARY = {
'code-skeptic': {
'system': 'You are a strict code reviewer. Find security issues, logic errors, anti-patterns. Be adversarial but constructive.',
'task': '''Review this function for security vulnerabilities and logic errors. Report: SQL injection, XSS, race conditions, code smells, and suggested fixes.
```typescript
function processPayment(userId, amount, cardToken) {
const q = `UPDATE users SET balance = balance - ${amount} WHERE id = ${userId}`;
db.exec(q);
fetch('/api/charge', { body: JSON.stringify({ cardToken, amount }) });
if (Math.random() > 0.9) { throw new Error('timeout'); }
}
```''',
# NOTE(review): the snippet above contains no localStorage usage; confirm that
# 'localStorage' is an intended expected keyword.
'expected': ['sql injection', 'parameterized', 'race', 'localStorage', 'xss'],
'rubric': {'security': 35, 'logic': 25, 'actionability': 25, 'depth': 15}
},
'workflow-cross-checker': {
'system': 'You are a workflow cross-checker. Before any work begins, ask uncomfortable but important questions that could block the task.',
'task': 'A developer wants to add "admin can delete any user" directly from the UI. Run your cross-check protocol. Identify 5+ potential issues or blockers.',
'expected': ['soft delete', 'audit log', 'cascading', 'permission', 'data retention', 'backup'],
'rubric': {'thoroughness': 35, 'relevance': 30, 'actionability': 20, 'severity_ranking': 15}
},
'lead-developer': {
'system': 'You are lead developer. Write production-ready implementation. Tests MUST pass. Follow SOLID. Max 100 lines per file.',
'task': 'Implement a TaskQueue class with: transaction support, retry with exponential backoff, timeout handling, and Jest tests. TypeScript.',
'expected': ['class TaskQueue', 'async', 'retry', 'timeout', 'test', 'jest'],
'rubric': {'correctness': 30, 'test_coverage': 30, 'code_quality': 25, 'edge_cases': 15}
},
'sdet-engineer': {
'system': 'You are SDET. Write tests BEFORE code. Cover edge cases, nulls, async errors, concurrent access.',
'task': 'Write Jest tests for UserService: createUser, getUser, updateUser, deleteUser. Cover: valid inputs, nulls, duplicates, concurrent updates.',
'expected': ['describe', 'it', 'expect', 'null', 'async', 'mock', 'beforeEach'],
'rubric': {'coverage': 35, 'edge_cases': 30, 'readability': 20, 'mocking': 15}
},
'orchestrator': {
'system': 'You are an Orchestrator. You delegate tasks to subagents. You decide routing, handle errors, and manage budgets.',
'task': 'A user reports: "Build a REST API for ecommerce checkout". Design your delegation plan: which agents to call, in what order, what to do if one fails.',
'expected': ['system-analyst', 'lead-developer', 'code-skeptic', 'sdet-engineer', 'budget', 'parallel'],
'rubric': {'plan_quality': 30, 'agent_selection': 25, 'risk_handling': 25, 'budget_awareness': 20}
},
'system-analyst': {
'system': 'You design technical specifications, data schemas, and API contracts before implementation.',
'task': 'Design the API contract and DB schema for a multi-tenant SaaS billing system. Include rate limiting, audit trails, and idempotency.',
'expected': ['openapi', 'schema', 'idempotency', 'rate limit', 'audit', 'tenant'],
'rubric': {'completeness': 30, 'correctness': 30, 'clarity': 20, 'scalability': 20}
},
'devops-engineer': {
'system': 'You handle Docker, CI/CD, infrastructure. Security first.',
'task': 'Write a multi-stage Dockerfile for a Node.js Next.js app. Include: non-root user, health check, security scan, .dockerignore best practices.',
'expected': ['FROM node', 'USER', 'HEALTHCHECK', 'multi-stage', '.dockerignore'],
'rubric': {'security': 30, 'optimization': 25, 'correctness': 25, 'completeness': 20}
}
}
def generate_task_for_agent(name, role):
    """Pick the evaluation task for an agent.

    Matching is case-insensitive and proceeds from most to least specific:

    1. a TASK_LIBRARY key contained in the agent name;
    2. a key, with dashes turned into spaces, contained in the role text;
    3. any single dash-separated word of a key contained in the role text;
    4. otherwise a generic task built from the name and role.

    Steps 2 and 3 are separate passes so that a loose single-word hit on an
    earlier key (e.g. "engineer" from 'sdet-engineer') cannot pre-empt an
    exact phrase match on a later key (e.g. "devops engineer").

    Returns a dict with 'system', 'task', 'expected' and 'rubric' entries.
    """
    n, r = name.lower(), role.lower()
    for key, task in TASK_LIBRARY.items():
        if key in n:
            return task
    # Exact phrase in the role description wins over single-word hits.
    for key, task in TASK_LIBRARY.items():
        if key.replace('-', ' ') in r:
            return task
    # Loosest fallback: any one word of the key appears in the role.
    for key, task in TASK_LIBRARY.items():
        if any(word in r for word in key.split('-')):
            return task
    return {
        'system': f'You are {name}. {role}',
        'task': f'Demonstrate your expertise as {name} in a realistic complex scenario. Provide a complete working solution.',
        'expected': [name.replace('-', ' ')],
        'rubric': {'relevance': 40, 'completeness': 30, 'correctness': 30}
    }
def generate_prompts():
    """Rebuild the test_prompts table from the agent definition files.

    Deletes all existing prompts, then scans ``.kilo/agents/*.md`` in sorted
    order. Files whose front matter has no ``model`` entry are skipped; every
    other file yields one 'primary' prompt named after the file (without its
    ``.md`` suffix). The delete and the inserts are committed together, and
    the connection is closed even if an error occurs part-way through.
    """
    conn = sqlite3.connect(DB_PATH)
    count = 0
    try:
        conn.execute("DELETE FROM test_prompts")
        for path in sorted(glob('.kilo/agents/*.md')):
            fm = parse_frontmatter(path)
            if not fm.get('model'):
                continue
            name = os.path.splitext(os.path.basename(path))[0]
            # generate_task_for_agent always returns a task dict (it falls
            # back to a generic task), so no emptiness check is needed.
            task = generate_task_for_agent(name, fm.get('description', ''))
            conn.execute('''
                INSERT INTO test_prompts (agent_name, task_type, system_prompt, user_prompt, expected_keywords, rubric)
                VALUES (?, ?, ?, ?, ?, ?)
                ''', (name, 'primary', task['system'], task['task'],
                      json.dumps(task['expected']), json.dumps(task['rubric'])))
            count += 1
        conn.commit()
    finally:
        conn.close()
    print(f"[prompts] Generated {count} test prompts")
# ================================================================
# OLLAMA CLIENT
# ================================================================
def call_ollama(model_short, system_prompt, user_prompt, expected_keywords=None):
    """Send one chat request to the Ollama ``/api/chat`` endpoint.

    Args:
        model_short: Short model name; translated through an internal map,
            unknown names are sent unchanged.
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.
        expected_keywords: Unused; accepted so existing callers keep working.

    Returns:
        ``(text, latency_ms, tokens)`` where ``tokens`` is a dict with
        ``prompt`` and ``response`` counts. Failures never raise: the text is
        ``"[HTTP <code>: <reason>]"`` or ``"[ERROR: <exc>]"`` with zero token
        counts and the time spent until the failure. In mock mode a fixed
        canned tuple is returned without any network access.
    """
    if USE_MOCK:
        return (
            "[MOCK] This is a simulated response for testing the pipeline without API calls.",
            500, {"prompt": 100, "response": 200}
        )
    model_map = {
        'kimi-k2.6': 'kimi-k2.6',
        'deepseek-v4-pro-max': 'deepseek-v4-pro',
        'deepseek-v4-flash': 'deepseek-v4-flash',
        'glm-5.1': 'glm-5.1',
        'qwen3-coder:480b': 'qwen3-coder:480b',
        # NOTE: requests for qwen3.5-122b are answered by kimi-k2.6, so
        # results recorded under that name come from a different model.
        'qwen3.5-122b': 'kimi-k2.6',  # fallback to known working model
    }
    model_ollama = model_map.get(model_short, model_short)
    payload = json.dumps({
        "model": model_ollama,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        "stream": False,
        "options": {"temperature": 0.3, "num_predict": 2048}
    }).encode('utf-8')
    headers = {"Content-Type": "application/json"}
    if OLLAMA_KEY:
        headers["Authorization"] = f"Bearer {OLLAMA_KEY}"
    # Tolerate a trailing slash in OLLAMA_HOST so the URL never contains '//api'.
    req = request.Request(f"{OLLAMA_HOST.rstrip('/')}/api/chat",
                          data=payload, headers=headers,
                          method='POST')
    # monotonic() is immune to wall-clock adjustments while the request runs.
    start = time.monotonic()

    def elapsed_ms():
        return int((time.monotonic() - start) * 1000)

    try:
        with request.urlopen(req, timeout=120) as resp:
            raw = resp.read()
        elapsed = elapsed_ms()
        data = json.loads(raw.decode('utf-8'))
        text = data.get('message', {}).get('content', '')
        return (text, elapsed,
                {"prompt": data.get('prompt_eval_count', 0),
                 "response": data.get('eval_count', 0)})
    except urllib_error.HTTPError as e:
        return (f"[HTTP {e.code}: {e.reason}]", elapsed_ms(), {"prompt": 0, "response": 0})
    except Exception as e:
        # Best-effort by design: timeouts, connection failures and malformed
        # payloads become an error text. Latency is reported here as well,
        # consistent with the HTTP error branch.
        return (f"[ERROR: {e}]", elapsed_ms(), {"prompt": 0, "response": 0})
# ================================================================
# EVALUATOR
# ================================================================
def evaluate_response(response, expected_json, rubric_json):
    """Score a model response with simple text heuristics.

    Args:
        response: Response text; ``None`` is treated as an empty string.
        expected_json: Expected keywords as a list or a JSON-encoded list.
        rubric_json: ``{dimension: weight}`` as a dict or JSON-encoded dict.

    Three heuristic scores (0-100) are computed:
        keyword_coverage - share of expected keywords found, case-insensitive
                           (50 when there are no expected keywords);
        code_presence    - 80 if the text has a code fence, 'function' or
                           'class ', otherwise 30;
        structure        - two points per line, capped at 100.

    With a non-empty rubric the total is the weighted sum of per-dimension
    scores; a rubric dimension that is not one of the three names above
    falls back to the keyword coverage score. Without a rubric the total is
    the plain mean of the three heuristic scores.

    Returns:
        ``{'scores': dict, 'total': float, 'explanation': str}``.
    """
    expected = json.loads(expected_json) if isinstance(expected_json, str) else expected_json
    rubric = json.loads(rubric_json) if isinstance(rubric_json, str) else rubric_json
    # Normalize once so every check below sees a string and a list.
    response = response or ''
    expected = expected or []
    resp_lower = response.lower()
    lines = response.strip().split('\n')
    keyword_hits = sum(1 for kw in expected if kw.lower() in resp_lower)
    keyword_score = min(100, (keyword_hits / len(expected) * 100) if expected else 50)
    has_code = '```' in response or 'function' in resp_lower or 'class ' in resp_lower
    code_score = 80 if has_code else 30
    structure_score = min(100, len(lines) * 2)  # ~50 lines = 100
    scores = {'keyword_coverage': round(keyword_score, 1),
              'code_presence': code_score,
              'structure': round(structure_score, 1)}
    if rubric:
        total = sum((scores.get(dim, keyword_score) / 100) * weight
                    for dim, weight in rubric.items())
    else:
        total = sum(scores.values()) / len(scores)
    explanation = (f"Keywords: {keyword_hits}/{len(expected)}. "
                   f"Lines: {len(lines)}. "
                   f"Code: {'YES' if has_code else 'NO'}. "
                   f"Total={round(total, 1)}")
    return {'scores': scores, 'total': round(total, 1), 'explanation': explanation}
# ================================================================
# PARALLEL BATCH EVALUATION
# ================================================================
def evaluate_one(args):
    """Run a single evaluation task and return its flattened result.

    ``args`` is a 7-tuple: (agent name, model, prompt id, system prompt,
    user prompt, expected keywords, rubric). The model is queried through
    call_ollama and the reply is scored by evaluate_response; the per-score
    dict is returned JSON-encoded under 'scores'.
    """
    (agent_name, model, prompt_id,
     system_prompt, user_prompt, expected, rubric) = args
    reply, latency_ms, token_counts = call_ollama(
        model, system_prompt, user_prompt, expected)
    verdict = evaluate_response(reply, expected, rubric)
    result = {}
    result['agent'] = agent_name
    result['model'] = model
    result['prompt_id'] = prompt_id
    result['response'] = reply
    result['latency'] = latency_ms
    result['tokens'] = token_counts
    result['total'] = verdict['total']
    result['scores'] = json.dumps(verdict['scores'])
    result['explanation'] = verdict['explanation']
    return result
def evaluate_all(models_to_test, max_workers=4):
    """Evaluate agents against models with parallel workers and store results.

    Args:
        models_to_test: Either an iterable of model names, in which case
            every agent in the ``agents`` table is evaluated against each of
            them, or a mapping ``{agent_name: [model, ...]}`` that restricts
            the run to the listed agents (this is what ``--evaluate AGENT``
            passes).
        max_workers: Size of the thread pool issuing model requests.

    Each finished evaluation is inserted and committed immediately so partial
    progress survives an interrupted run. Aggregated fit scores are
    recomputed at the end.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        if isinstance(models_to_test, dict):
            plan = [(name, list(models)) for name, models in models_to_test.items()]
        else:
            models = list(models_to_test)
            agent_rows = conn.execute("SELECT DISTINCT name FROM agents").fetchall()
            plan = [(name, models) for (name,) in agent_rows]
        tasks = []
        for agent_name, models in plan:
            prompts = conn.execute('''
                SELECT id, system_prompt, user_prompt, expected_keywords, rubric
                FROM test_prompts WHERE agent_name = ?''', (agent_name,)).fetchall()
            for pid, system_prompt, user_prompt, expected, rubric in prompts:
                for model in models:
                    tasks.append((agent_name, model, pid, system_prompt,
                                  user_prompt, expected, rubric))
        print(f"[eval] Prepared {len(tasks)} evaluations (agents × models × prompts)")
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            futures = [ex.submit(evaluate_one, t) for t in tasks]
            # Results are consumed on this thread only, so the single
            # connection is never shared with the workers.
            for future in as_completed(futures):
                res = future.result()
                results.append(res)
                conn.execute('''INSERT INTO evaluations
                    (agent_name, model, prompt_id, response, latency_ms, tokens_prompt, tokens_response,
                    scores, total_score, explanation, evaluated_at, evaluator)
                    VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''',
                    (res['agent'], res['model'], res['prompt_id'], res['response'], res['latency'],
                     res['tokens']['prompt'], res['tokens']['response'],
                     res['scores'], res['total'], res['explanation'],
                     datetime.now(timezone.utc).isoformat(), 'rubric_v1'))
                conn.commit()
                print(f" [{res['agent']}] × [{res['model']}] score={res['total']:.1f}")
    finally:
        conn.close()
    print(f"[eval] Stored {len(results)} evaluations")
    compute_aggregates()
def compute_aggregates():
    """Derive one fit_scores row per agent from the stored evaluations.

    For every agent the model with the highest average total_score is
    selected (the first one seen wins ties). The per-dimension scores of
    that agent/model pair are averaged and written, together with a short
    explanation, via INSERT OR REPLACE.
    """
    conn = sqlite3.connect(DB_PATH)
    averages = conn.execute('''
        SELECT agent_name, model, AVG(total_score) as avg_score
        FROM evaluations GROUP BY agent_name, model
        ''').fetchall()

    # agent -> (model, average score) of the best model seen so far
    winners = {}
    for agent, model, avg_score in averages:
        current = winners.get(agent)
        if current is None or avg_score > current[1]:
            winners[agent] = (model, avg_score)

    for agent, winner in winners.items():
        model, avg_score = winner
        # Get dimension breakdown
        score_rows = conn.execute('''
            SELECT scores FROM evaluations WHERE agent_name = ? AND model = ?
            ''', (agent, model)).fetchall()
        sample_count = len(score_rows)
        sums = {}
        for row in score_rows:
            per_dim = json.loads(row[0])
            for dim, value in per_dim.items():
                sums[dim] = sums.get(dim, 0) + value
        dim_avg = {}
        for dim, total in sums.items():
            dim_avg[dim] = round(total / sample_count, 1)
        strongest = max(dim_avg, key=dim_avg.get)
        explanation = (f"Best model for {agent} is {model} with avg score {round(avg_score,1)}. "
                       + f"Strongest dimension: {strongest}.")
        stamp = datetime.now(timezone.utc).isoformat()
        conn.execute('''INSERT OR REPLACE INTO fit_scores
            (agent_name, model, fit_score, dimension_scores, explanation, evaluated_at)
            VALUES (?, ?, ?, ?, ?, ?)''',
            (agent, model, round(avg_score, 1), json.dumps(dim_avg), explanation, stamp))

    conn.commit()
    conn.close()
    print(f"[agg] Computed fit scores for {len(winners)} agents")
# ================================================================
# RECALCULATION TRIGGER
# ================================================================
def trigger_recalculation(agent_name, old_model, new_model, reason="manual"):
    """Re-score an agent on ``new_model`` and log the change in fit.

    The previous fit is the agent's current fit_scores entry (0 when there
    is none); it is looked up by agent only, not by ``old_model``. The new
    fit is the score of one stored test prompt answered by ``new_model``
    (0 when the agent has no prompt). A row is appended to
    ``recalculations`` and the delta (new minus old) is returned.
    """
    conn = sqlite3.connect(DB_PATH)

    previous = conn.execute(
        '''SELECT fit_score FROM fit_scores WHERE agent_name = ?''',
        (agent_name,)).fetchone()
    old_fit = previous[0] if previous else 0

    # Re-evaluate on new model
    prompt_row = conn.execute(
        '''SELECT system_prompt, user_prompt, expected_keywords, rubric
        FROM test_prompts WHERE agent_name = ? LIMIT 1''',
        (agent_name,)).fetchone()
    new_fit = 0
    if prompt_row:
        system_prompt, user_prompt, expected, rubric = prompt_row
        reply, _latency, _tokens = call_ollama(new_model, system_prompt, user_prompt)
        new_fit = evaluate_response(reply, expected, rubric)['total']

    delta = new_fit - old_fit
    stamp = datetime.now(timezone.utc).isoformat()
    # NOTE(review): the `trigger` column receives the same value as `reason`;
    # confirm that this is intended.
    record = (reason, agent_name, old_model, new_model,
              old_fit, new_fit, delta, reason, stamp)
    conn.execute('''INSERT INTO recalculations
        (trigger, agent_name, old_model, new_model, old_fit, new_fit, delta, reason, recalculated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''', record)
    conn.commit()
    conn.close()
    print(f"[recalc] {agent_name}: {old_model}({old_fit:.1f}) → {new_model}({new_fit:.1f}) Δ={delta:+.1f}")
    return delta
# ================================================================
# REPORT / DASHBOARD DATA
# ================================================================
def generate_report():
    """Write the JSON dashboard report and return it as a dict.

    The report contains, per agent, the average score for each evaluated
    model plus the best model and its score, and the contents of the
    fit_scores table. ``total_evaluations`` is the number of evaluation rows
    (the sum of the per-group counts), not the number of agent/model pairs.
    The file is written as UTF-8 to
    ``agent-evolution/data/real-fit-report.json``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # All evaluations per agent per model
        rows = conn.execute('''
            SELECT agent_name, model, AVG(total_score) as avg_score, COUNT(*) as cnt
            FROM evaluations GROUP BY agent_name, model
            ''').fetchall()
        agents = {}
        total_evaluations = 0
        for a, m, s, c in rows:
            total_evaluations += c
            if a not in agents:
                info = conn.execute('SELECT description, category, current_model FROM agents WHERE name = ?', (a,)).fetchone()
                agents[a] = {'name': a, 'evaluations': {}, 'info': info or ()}
            agents[a]['evaluations'][m] = round(s, 1)
        # Best per agent
        for entry in agents.values():
            evs = entry['evaluations']
            best_m = max(evs, key=evs.get)
            entry['best_model'] = best_m
            entry['best_score'] = evs[best_m]
        # Fit scores table
        fit_rows = conn.execute('SELECT agent_name, model, fit_score, explanation FROM fit_scores').fetchall()
        fit_scores = {a: {'model': m, 'fit': s, 'explanation': e}
                      for a, m, s, e in fit_rows}
    finally:
        conn.close()
    report = {
        'generated': datetime.now(timezone.utc).isoformat(),
        'source': 'real-fit-engine',
        'total_evaluations': total_evaluations,
        'agents': agents,
        'fit_scores': fit_scores
    }
    out = 'agent-evolution/data/real-fit-report.json'
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be explicit rather than the platform default.
    with open(out, 'w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"[report] Written {out}: {len(agents)} agents, {total_evaluations} evaluations")
    return report
# ================================================================
# IMPORT REAL DATA
# ================================================================
def import_from_evolution():
    """Load agents and model benchmarks from evolution.json into the database.

    Reads ``agent-evolution/data/evolution.json`` as UTF-8. Every entry of
    its ``agents`` mapping is upserted into ``agents`` from the entry's
    ``current`` section, and every entry of the optional
    ``model_benchmarks`` mapping is upserted into ``models``. The
    ``swe_bench`` column is always stored as NULL. The connection is closed
    even if an insert fails.
    """
    with open('agent-evolution/data/evolution.json', encoding='utf-8') as f:
        evo = json.load(f)
    agents = evo['agents']
    benchmarks = evo.get('model_benchmarks', {})
    conn = sqlite3.connect(DB_PATH)
    try:
        for name, a in agents.items():
            c = a['current']
            conn.execute('''INSERT OR REPLACE INTO agents (name, description, category, current_model, color, updated)
                VALUES (?, ?, ?, ?, ?, ?)''',
                (name, c.get('description', ''), c.get('category', 'General'),
                 c.get('model', ''), c.get('color', ''),
                 datetime.now(timezone.utc).isoformat()))
        for mid, m in benchmarks.items():
            conn.execute('''INSERT OR REPLACE INTO models (short_name, full_id, if_score, swe_bench, parameters, context_window, updated)
                VALUES (?, ?, ?, ?, ?, ?, ?)''',
                (mid, f'ollama-cloud/{mid}', m.get('if_score'), None,
                 m.get('parameters', ''), m.get('context_window', ''),
                 datetime.now(timezone.utc).isoformat()))
        conn.commit()
    finally:
        conn.close()
    print(f"[import] {len(agents)} agents, {len(benchmarks)} models")
# ================================================================
# CLI
# ================================================================
if __name__ == '__main__':
    # Command-line entry point. Several actions may be combined in a single
    # invocation; they run in the fixed order below.
    import argparse
    p = argparse.ArgumentParser(description='Real-Fit Multi-Agent Engine')
    p.add_argument('--init-db', action='store_true')
    p.add_argument('--import-evolution', action='store_true')
    p.add_argument('--generate-prompts', action='store_true')
    p.add_argument('--evaluate', metavar='AGENT')
    p.add_argument('--models', default=','.join(DEFAULT_MODELS))
    p.add_argument('--evaluate-all', action='store_true')
    p.add_argument('--report', action='store_true')
    p.add_argument('--recalc', action='store_true')
    p.add_argument('--agent', help='Agent for recalc')
    p.add_argument('--old-model', help='Old model for recalc')
    p.add_argument('--new-model', help='New model for recalc')
    p.add_argument('--workers', type=int, default=4)
    args = p.parse_args()
    # Reject an incomplete --recalc up front instead of silently ignoring it.
    if args.recalc and not (args.agent and args.old_model and args.new_model):
        p.error('--recalc requires --agent, --old-model and --new-model')
    # Tolerate spaces and stray commas in the --models list.
    models = [m.strip() for m in args.models.split(',') if m.strip()]
    if args.init_db:
        init_db()
    if args.import_evolution:
        import_from_evolution()
    if args.generate_prompts:
        generate_prompts()
    if args.evaluate:
        # Passes an {agent: models} mapping to evaluate_all.
        evaluate_all({args.evaluate: models}, args.workers)
    if args.evaluate_all:
        evaluate_all(models, args.workers)
    if args.report:
        generate_report()
    if args.recalc:
        trigger_recalculation(args.agent, args.old_model, args.new_model)
    if len(sys.argv) == 1:
        p.print_help()
        print("\n=== Workflow ===")
        print(" python3 real-fit-engine.py --init-db --import-evolution --generate-prompts")
        print(" python3 real-fit-engine.py --evaluate-all --models kimi-k2.6,deepseek-v4-pro-max")
        print(" python3 real-fit-engine.py --report")
        print(" python3 real-fit-engine.py --recalc --agent lead-developer --old-model qwen3-coder:480b --new-model kimi-k2.6")
        # Real API calls are the default; mock mode is the opt-in.
        print("\nSet OLLAMA_MOCK=1 to run the pipeline without real Ollama API calls")