Files
APAW/src/kilocode/agent-manager/pipeline-runner.ts
NW f5966db155 feat(gns2): integrate HybridGiteaClient into PollingSupervisor
- PollingSupervisor now uses HybridGiteaClient (MCP primary, REST fallback)
- Added mcpUrl to PipelineConfig
- Supervisor calls initialize() to detect MCP vs REST mode automatically

Refs: Milestone #67, Issue #107
2026-05-08 22:35:21 +01:00

299 lines
9.2 KiB
TypeScript

// kilocode_change - integrated module
// Pipeline runner - GNS-2 Polling Supervisor for distributed agent workflow
import type { AgentRole } from "./index"
import {
GiteaClient,
logPipelineStep,
logAgentPerformance,
detectRepository
} from "./gitea-client"
import { HybridGiteaClient } from "./mcp-gitea-client"
/**
 * Construction options for {@link PollingSupervisor}.
 *
 * Every field is optional; the supervisor constructor supplies the defaults
 * noted below.
 */
export interface PipelineConfig {
  /**
   * URL of the MCP server, forwarded to `HybridGiteaClient` as `mcpUrl`.
   * NOTE(review): presumably the client falls back to REST when this is
   * omitted or unreachable - confirm against `mcp-gitea-client`.
   */
  mcpUrl?: string
  /** Gitea access token, forwarded to the REST configuration as `token`. */
  giteaToken?: string
  /** Gitea API base URL, forwarded to the REST configuration as `apiUrl`. */
  giteaApiUrl?: string
  /** Efficiency threshold stored by the supervisor; defaults to 7. */
  efficiencyThreshold?: number
  /** When true (the default), the supervisor posts its log events as issue comments. */
  autoLog?: boolean
  /** Delay between polling passes in milliseconds; defaults to 30000. */
  pollIntervalMs?: number
}
/** Identifies the issue a single `supervise` run should watch. */
export interface PipelineRunOptions {
  /** Number of the issue to poll. */
  issueNumber: number
  /**
   * Milestone the issue belongs to. `superviseMilestone` passes it along;
   * `supervise` itself does not read it in this file.
   */
  milestone?: number
}
/** Outcome of one `supervise` run. */
export interface PipelineResult {
  /** True only when the run ended as completed with no recorded errors. */
  success: boolean
  /** Agent named by the last GNS event, or null when none is reported. */
  finalAgent: string | null
  /**
   * Terminal state of the run. The supervisor in this file produces
   * 'completed', 'blocked', 'budget_exhausted' and 'max_steps_reached'.
   */
  finalStatus: string
  /** Distinct agents the supervisor invoked, in first-invocation order. */
  agentsUsed: string[]
  /** Number of polling passes that were started. */
  totalSteps: number
  /** Human-readable problems collected during the run. */
  errors: string[]
}
/**
 * Decision produced by one polling pass. Only `invoke_agent` carries an agent
 * name, so callers never need a non-null assertion to read it.
 */
type SupervisorAction =
  | { type: 'invoke_agent'; agent: string }
  | { type: 'wait' }
  | { type: 'stuck' }
  | { type: 'complete' }

/**
 * GNS-2 Polling Supervisor.
 *
 * Instead of actively dispatching agents in a while-loop, the supervisor
 * periodically polls Gitea for the state of an issue (lock flag, open/closed
 * state, checkpoint, last GNS event, labels, comments) and reacts by
 * assigning the issue to the next agent, waiting, or ending the run.
 *
 * All Gitea access goes through a `HybridGiteaClient` configured with an MCP
 * URL and REST credentials.
 */
export class PollingSupervisor {
  /**
   * Fallback routing from a `status::<name>` label to the agent that should
   * act next. A Map is used rather than an object literal so that label
   * names such as `constructor` cannot resolve to inherited properties.
   * `completed` is terminal and is handled before this table is consulted.
   */
  private static readonly statusRoutes: ReadonlyMap<string, string> = new Map([
    ['new', 'requirement-refiner'],
    ['planned', 'history-miner'],
    ['researching', 'system-analyst'],
    ['designed', 'sdet-engineer'],
    ['testing', 'lead-developer'],
    ['implementing', 'code-skeptic'],
    ['reviewing', 'performance-engineer'],
    ['fixing', 'the-fixer'],
    ['releasing', 'release-manager'],
    ['evaluated', 'evaluator'],
  ])

  private client: HybridGiteaClient
  private efficiencyThreshold: number
  private autoLog: boolean
  private initialized = false
  private pollInterval: number

  /**
   * @param config Optional settings; missing fields fall back to
   *   efficiencyThreshold 7, autoLog true and a 30 second poll interval.
   */
  constructor(config: PipelineConfig = {}) {
    // Use Hybrid client: MCP first, REST fallback
    this.client = new HybridGiteaClient({
      mcpUrl: config.mcpUrl,
      restConfig: {
        token: config.giteaToken,
        apiUrl: config.giteaApiUrl,
      },
    })
    this.efficiencyThreshold = config.efficiencyThreshold ?? 7
    this.autoLog = config.autoLog ?? true
    this.pollInterval = config.pollIntervalMs ?? 30000 // 30 seconds
  }

  /**
   * Detects the repository, points the client at it and initializes the
   * client. Subsequent calls after a successful run are no-ops.
   */
  async initialize(): Promise<void> {
    if (this.initialized) return
    const { owner, repo } = await detectRepository()
    this.client.setRepository(owner, repo)
    await this.client.initialize()
    this.initialized = true
  }

  /**
   * Polls a single issue until it is closed, locked, out of budget, marked
   * completed, or the safety limit of 100 polling passes is reached.
   *
   * @param options Identifies the issue to watch.
   * @returns A summary of how the run ended, which agents were invoked and
   *   which errors were recorded. Errors thrown by the client are not
   *   caught here and reject the returned promise.
   */
  async supervise(options: PipelineRunOptions): Promise<PipelineResult> {
    await this.initialize()

    const { issueNumber } = options
    const agentsUsed: string[] = []
    const errors: string[] = []
    const maxSteps = 100 // Safety limit
    let steps = 0

    while (steps < maxSteps) {
      steps++

      // Circuit breaker: a locked issue needs a human, so stop immediately.
      if (await this.client.isLocked(issueNumber)) {
        await this.logEvent(issueNumber, '🔒', 'Issue locked by circuit breaker. Manual review required.')
        return {
          success: false,
          finalAgent: null,
          finalStatus: 'blocked',
          agentsUsed,
          totalSteps: steps,
          errors: [...errors, 'Issue locked by circuit breaker'],
        }
      }

      const issue = await this.client.getIssue(issueNumber)
      const checkpoint = await this.client.getCheckpoint(issueNumber)
      const lastEvent = await this.client.getLastGNSEvent(issueNumber)

      if (issue.state === 'closed') {
        return {
          success: errors.length === 0,
          finalAgent: lastEvent?.agent || null,
          finalStatus: 'completed',
          agentsUsed,
          totalSteps: steps,
          errors,
        }
      }

      // `!= null` (not `!== undefined`): a null budget means "unknown", and
      // `null <= 0` is true in JavaScript, which would wrongly lock the issue.
      const remainingBudget = checkpoint?.budget?.remaining
      if (remainingBudget != null && remainingBudget <= 0) {
        await this.client.addLabels(issueNumber, ['budget::exhausted'])
        await this.client.lockIssue(issueNumber)
        await this.logEvent(issueNumber, '💰', 'Budget exhausted. Issue locked.')
        return {
          success: false,
          finalAgent: lastEvent?.agent || null,
          finalStatus: 'budget_exhausted',
          agentsUsed,
          totalSteps: steps,
          errors: [...errors, 'Budget exhausted'],
        }
      }

      const nextAction = await this.determineNextAction(issue, checkpoint, lastEvent)

      switch (nextAction.type) {
        case 'invoke_agent': {
          const agentName = nextAction.agent
          if (!agentsUsed.includes(agentName)) {
            agentsUsed.push(agentName)
          }
          await this.logEvent(
            issueNumber,
            '🚀',
            `Invoking ${agentName} (depth: ${checkpoint?.depth ?? 0}, budget: ${remainingBudget ?? 'unknown'})`
          )
          // In GNS-2 the agent itself reads the issue and acts; the
          // supervisor only reassigns the issue to mark it as triggered.
          await this.client.setAssignee(issueNumber, agentName)
          break
        }
        case 'stuck': {
          await this.logEvent(issueNumber, '⏰', 'Process appears stuck. Last activity older than threshold.')
          // Record the condition once; it can be detected on many passes.
          if (!errors.includes('Process stuck')) {
            errors.push('Process stuck')
          }
          break
        }
        case 'complete':
          return {
            success: errors.length === 0,
            finalAgent: lastEvent?.agent || null,
            finalStatus: 'completed',
            agentsUsed,
            totalSteps: steps,
            errors,
          }
        case 'wait':
          // Nothing to do this pass; fall through to the shared delay.
          break
      }

      await this.sleep(this.pollInterval)
    }

    return {
      success: false,
      finalAgent: null,
      finalStatus: 'max_steps_reached',
      agentsUsed,
      totalSteps: steps,
      errors: [...errors, `Max steps (${maxSteps}) reached`],
    }
  }

  /**
   * Decides what one polling pass should do, from the issue's timestamp,
   * checkpoint, last GNS event, comments and `status::` label.
   *
   * The parameters stay loosely typed because the client's return types are
   * declared outside this file.
   *
   * @param issue Issue as returned by the client; reads `updated_at`,
   *   `number` and `labels`.
   * @param checkpoint Checkpoint for the issue, or a falsy value when none exists.
   * @param lastEvent Last GNS event; `next_agent` is read when present.
   */
  private async determineNextAction(
    issue: any,
    checkpoint: any | null,
    lastEvent: any | null
  ): Promise<SupervisorAction> {
    const minutesSinceUpdate = (Date.now() - new Date(issue.updated_at).getTime()) / 60000

    // Anything touched within the last minute is given time to settle.
    // The author of the update is not inspected.
    if (minutesSinceUpdate < 1) {
      return { type: 'wait' }
    }

    // No checkpoint yet: treat as a brand-new issue.
    if (!checkpoint) {
      return { type: 'invoke_agent', agent: 'requirement-refiner' }
    }

    // The last event may name the agent that should go next.
    const requestedAgent = lastEvent?.next_agent
    if (requestedAgent) {
      // NOTE(review): this scans the whole comment history, so a response
      // from an earlier cycle also counts as "responded" - confirm intent.
      const comments = await this.client.getComments(issue.number)
      const marker = `## 🔄 ${requestedAgent}`
      const hasResponded = comments.some(
        c => c.user?.login === requestedAgent || (c.body ?? '').includes(marker)
      )
      if (!hasResponded) {
        return { type: 'invoke_agent', agent: requestedAgent }
      }
    }

    // Fallback routing by the first `status::` label; a missing label list
    // or malformed label entries are tolerated and mean status 'new'.
    const labels: any[] = Array.isArray(issue.labels) ? issue.labels : []
    const statusLabel = labels.find(
      l => typeof l?.name === 'string' && l.name.startsWith('status::')
    )
    const status: string = statusLabel?.name.replace('status::', '') || 'new'

    if (status === 'completed') {
      return { type: 'complete' }
    }

    const routedAgent = PollingSupervisor.statusRoutes.get(status)
    if (routedAgent) {
      return { type: 'invoke_agent', agent: routedAgent }
    }

    // NOTE(review): this is only reached for statuses missing from the
    // routing table, so known statuses are never reported as stuck.
    if (minutesSinceUpdate > 10) {
      return { type: 'stuck' }
    }
    return { type: 'wait' }
  }

  /**
   * Supervises every triggered, unlocked issue of a milestone whose status
   * label is one of new/planned/researching/designed/testing.
   *
   * Issues are supervised one after another, each to its own terminal state.
   *
   * @param milestoneId Milestone whose issues should be supervised.
   * @returns One result per supervised issue, in the order the client listed them.
   */
  async superviseMilestone(milestoneId: number): Promise<PipelineResult[]> {
    await this.initialize()
    const triggered = await this.client.getTriggeredIssues({
      milestone: milestoneId,
      labels: ['status::new', 'status::planned', 'status::researching', 'status::designed', 'status::testing'],
      is_locked: false,
    })
    const results: PipelineResult[] = []
    for (const issue of triggered) {
      results.push(await this.supervise({ issueNumber: issue.number, milestone: milestoneId }))
    }
    return results
  }

  /** Posts a timestamped supervisor comment on the issue when `autoLog` is enabled. */
  private async logEvent(issueNumber: number, emoji: string, message: string): Promise<void> {
    if (!this.autoLog) return
    await this.client.createComment(issueNumber, {
      body: `${emoji} **Supervisor**: ${message}\n\n\`\`\`\nTimestamp: ${new Date().toISOString()}\n\`\`\``,
    })
  }

  /** Resolves after `ms` milliseconds. */
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }
}
/**
 * Builds a {@link PollingSupervisor} and initializes it before handing it back.
 *
 * @param config Optional supervisor settings, passed to the constructor unchanged.
 * @returns The supervisor, after its `initialize()` call has resolved.
 */
export async function createPollingSupervisor(config?: PipelineConfig): Promise<PollingSupervisor> {
  const ready = new PollingSupervisor(config)
  await ready.initialize()
  return ready
}
export { GiteaClient }