Files
GoClaw/server/routers.ts
bboxwtf 13b7ab57c5 feat(chat): add TaskBoard with tabbed Console/Tasks panel, auto-retry loop, and persistent task CRUD
- chatStore.ts: Added TaskBoard data model (Task, TaskSubtask, TaskBoard types),
  CRUD methods (addTask, updateTaskStatus, addSubtask, removeTask, clearTasks),
  progress tracking (getTaskProgress), auto-retry logic on session errors,
  elapsed time tracking with interval timer, localStorage persistence

- TaskBoard.tsx: New interactive component with task list, expandable subtasks,
  priority badges (critical/high/medium/low), creator badges (user/orchestrator/agent),
  status cycling (pending->in_progress->completed), add task form with priority selector,
  progress bar with completion %, elapsed time display, auto-retry toggle

- Chat.tsx: Replaced single Console right panel with tabbed panel (Console + Tasks),
  tab badges showing task completion count, pulsing indicator for active console events,
  Tasks tab imports and renders TaskBoard component

- routers.ts: Added tasks router with CRUD endpoints (list, create, updateStatus,
  addSubtask, delete) backed by MySQL chatTasks table

- schema.ts: Added chatTasks table (taskId, sessionId, content, status, priority,
  createdBy, assignedTo, subtasks JSON, elapsedMs, retryCount, lastError, timestamps)

- Auto-retry: When session errors and tasks remain incomplete, orchestrator
  automatically retries after 3s with context about the TODO board
2026-03-22 13:12:36 +00:00

1362 lines
46 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { COOKIE_NAME } from "@shared/const";
import { z } from "zod";
import { getDb } from "./db";
import { getSessionCookieOptions } from "./_core/cookies";
import { systemRouter } from "./_core/systemRouter";
import { publicProcedure, router, protectedProcedure } from "./_core/trpc";
import { checkOllamaHealth, listModels, chatCompletion } from "./ollama";
import {
checkGatewayHealth,
gatewayChat,
getGatewayOrchestratorConfig,
getGatewayModels,
getGatewayTools,
executeGatewayTool,
isGatewayAvailable,
getGatewayNodes,
getGatewayNodeStats,
startChatSession,
getChatSession,
getChatEvents,
listChatSessions,
getSwarmInfo,
listSwarmNodes,
listSwarmServices,
getServiceTasks,
scaleSwarmService,
getSwarmJoinToken,
execSwarmShell,
addSwarmNodeLabel,
setNodeAvailability,
createAgentService,
removeSwarmService,
listSwarmAgents,
startSwarmAgent,
stopSwarmAgent,
getOllamaModelInfo,
joinSwarmNodeViaSSH,
testSSHConnection,
} from "./gateway-proxy";
// Shared system user id for non-authenticated agent management
const SYSTEM_USER_ID = 1;
export const appRouter = router({
system: systemRouter,
auth: router({
me: publicProcedure.query(opts => opts.ctx.user),
logout: publicProcedure.mutation(({ ctx }) => {
const cookieOptions = getSessionCookieOptions(ctx.req);
ctx.res.clearCookie(COOKIE_NAME, { ...cookieOptions, maxAge: -1 });
return { success: true } as const;
}),
}),
/**
* LLM Providers — full CRUD backed by DB (llmProviders table).
* API keys stored encrypted with AES-256-GCM; never returned in plaintext to frontend.
*/
providers: router({
/** List all providers (keys masked). */
list: publicProcedure.query(async () => {
const { listProviders } = await import("./providers");
return listProviders();
}),
/** Create a new provider. */
create: publicProcedure
.input(z.object({
name: z.string().min(1),
baseUrl: z.string().url(),
apiKey: z.string(),
modelDefault: z.string().optional(),
notes: z.string().optional(),
setActive: z.boolean().default(false),
}))
.mutation(async ({ input }) => {
const { createProvider } = await import("./providers");
const id = await createProvider(input);
return { id };
}),
/** Update a provider (pass apiKey="" to keep existing key). */
update: publicProcedure
.input(z.object({
id: z.number(),
name: z.string().optional(),
baseUrl: z.string().url().optional(),
apiKey: z.string().optional(),
modelDefault: z.string().optional(),
notes: z.string().optional(),
isActive: z.boolean().optional(),
}))
.mutation(async ({ input }) => {
const { updateProvider } = await import("./providers");
const { id, ...rest } = input;
await updateProvider(id, rest);
return { ok: true };
}),
/** Delete a provider (cannot delete the active one). */
delete: publicProcedure
.input(z.object({ id: z.number() }))
.mutation(async ({ input }) => {
const { deleteProvider } = await import("./providers");
return deleteProvider(input.id);
}),
/** Activate a provider and signal the gateway to reload its config. */
activate: publicProcedure
.input(z.object({ id: z.number() }))
.mutation(async ({ input }) => {
const { activateProvider } = await import("./providers");
await activateProvider(input.id);
return { ok: true };
}),
}),
/**
* System config — returns active LLM provider config (key masked).
* Used by Settings page and AgentDetailModal.
*/
config: router({
providers: publicProcedure.query(async () => {
const { listProviders, getActiveProvider } = await import("./providers");
// Try DB first
try {
const rows = await listProviders();
if (rows.length > 0) {
return {
providers: rows.map((p) => ({
id: p.id.toString(),
name: p.name,
baseUrl: p.baseUrl,
hasKey: !!p.apiKeyHint,
maskedKey: p.apiKeyHint ? `${p.apiKeyHint}${"*".repeat(24)}` : "",
isActive: p.isActive,
modelDefault: p.modelDefault,
})),
};
}
} catch { /* fallback below */ }
// Fallback: read from env
const { ENV } = await import("./_core/env");
const baseUrl = ENV.ollamaBaseUrl || "https://ollama.com/v1";
const apiKey = ENV.ollamaApiKey || "";
const hasKey = apiKey.length > 0;
const maskedKey = hasKey ? `${apiKey.slice(0, 8)}${"*".repeat(Math.max(0, apiKey.length - 8))}` : "";
let providerName = "Ollama Cloud";
if (baseUrl.includes("openai.com")) providerName = "OpenAI";
else if (baseUrl.includes("anthropic.com")) providerName = "Anthropic";
else if (baseUrl.includes("groq.com")) providerName = "Groq";
else if (baseUrl.includes("mistral.ai")) providerName = "Mistral";
else if (!baseUrl.includes("ollama.com")) providerName = "Custom";
return {
providers: [{ id: "primary", name: providerName, baseUrl, hasKey, maskedKey, isActive: true, modelDefault: null }],
};
}),
}),
/**
* Ollama API — серверный прокси для безопасного доступа
* Приоритет: Go Gateway → прямой Ollama
*/
ollama: router({
health: publicProcedure.query(async () => {
// Try Go Gateway first, fall back to direct Ollama
const gwHealth = await checkGatewayHealth();
if (gwHealth.connected) {
return {
connected: true as const,
latencyMs: gwHealth.latencyMs,
source: "gateway" as const,
llm: gwHealth.llm,
error: undefined as string | undefined,
};
}
// Fallback: direct Ollama
const ollamaHealth = await checkOllamaHealth();
return { ...ollamaHealth, source: "direct" as const };
}),
models: publicProcedure.query(async () => {
// Try Go Gateway first
const gwModels = await getGatewayModels();
if (gwModels) {
return {
success: true as const,
models: gwModels.data ?? [],
source: "gateway" as const,
};
}
// Fallback: direct Ollama
try {
const result = await listModels();
return {
success: true as const,
models: result.data ?? [],
source: "direct" as const,
};
} catch (err: any) {
return {
success: false as const,
models: [],
error: err.message,
};
}
}),
/** Fetch model details (context_length, family, quantization…) from Ollama /api/show */
modelInfo: publicProcedure
.input(z.object({ modelId: z.string() }))
.query(async ({ input }) => {
const info = await getOllamaModelInfo(input.modelId);
return info ?? { contextLength: 0 };
}),
chat: publicProcedure
.input(
z.object({
model: z.string(),
messages: z.array(
z.object({
role: z.enum(["system", "user", "assistant"]),
content: z.string(),
})
),
temperature: z.number().optional(),
max_tokens: z.number().optional(),
})
)
.mutation(async ({ input }) => {
try {
const result = await chatCompletion(input.model, input.messages, {
temperature: input.temperature,
max_tokens: input.max_tokens,
});
return {
success: true as const,
response: result.choices[0]?.message?.content ?? "",
model: result.model,
usage: result.usage,
};
} catch (err: any) {
return {
success: false as const,
response: "",
error: err.message,
};
}
}),
}),
/**
* Agents — управление AI-агентами (public — внутренний инструмент)
*/
agents: router({
list: publicProcedure.query(async () => {
// getAllAgents returns both system agents (userId=0) and user-created agents
const { getAllAgents } = await import("./agents");
return getAllAgents();
}),
get: publicProcedure.input(z.object({ id: z.number() })).query(async ({ input }) => {
const { getAgentById } = await import("./agents");
return getAgentById(input.id);
}),
create: publicProcedure
.input(
z.object({
name: z.string().min(1),
description: z.string().optional(),
role: z.string(),
model: z.string(),
provider: z.string(),
temperature: z.number().min(0).max(2).default(0.7),
maxTokens: z.number().default(2048),
topP: z.number().min(0).max(1).default(1.0),
frequencyPenalty: z.number().min(-2).max(2).default(0.0),
presencePenalty: z.number().min(-2).max(2).default(0.0),
systemPrompt: z.string().optional(),
allowedTools: z.array(z.string()).default([]),
allowedDomains: z.array(z.string()).default([]),
tags: z.array(z.string()).default([]),
})
)
.mutation(async ({ input }) => {
const { createAgent } = await import("./agents");
return createAgent(SYSTEM_USER_ID, {
...input,
temperature: input.temperature.toString(),
topP: input.topP.toString(),
frequencyPenalty: input.frequencyPenalty.toString(),
presencePenalty: input.presencePenalty.toString(),
} as any);
}),
update: publicProcedure
.input(
z.object({
id: z.number(),
name: z.string().optional(),
description: z.string().optional(),
model: z.string().optional(),
provider: z.string().optional(),
temperature: z.number().min(0).max(2).optional(),
maxTokens: z.number().optional(),
topP: z.number().min(0).max(1).optional(),
frequencyPenalty: z.number().min(-2).max(2).optional(),
presencePenalty: z.number().min(-2).max(2).optional(),
systemPrompt: z.string().optional(),
allowedTools: z.array(z.string()).optional(),
allowedDomains: z.array(z.string()).optional(),
isActive: z.boolean().optional(),
tags: z.array(z.string()).optional(),
})
)
.mutation(async ({ input }) => {
const { updateAgent } = await import("./agents");
const { id, temperature, topP, frequencyPenalty, presencePenalty, ...rest } = input;
const updates: Record<string, any> = { ...rest };
if (temperature !== undefined) updates.temperature = temperature.toString();
if (topP !== undefined) updates.topP = topP.toString();
if (frequencyPenalty !== undefined) updates.frequencyPenalty = frequencyPenalty.toString();
if (presencePenalty !== undefined) updates.presencePenalty = presencePenalty.toString();
return updateAgent(id, updates as any);
}),
delete: publicProcedure.input(z.object({ id: z.number() })).mutation(async ({ input }) => {
const { deleteAgent } = await import("./agents");
return deleteAgent(input.id);
}),
stats: publicProcedure.input(z.object({ id: z.number(), hoursBack: z.number().default(24) })).query(async ({ input }) => {
const { getAgentStats } = await import("./agents");
return getAgentStats(input.id, input.hoursBack);
}),
metrics: publicProcedure.input(z.object({ id: z.number(), hoursBack: z.number().default(24) })).query(async ({ input }) => {
const { getAgentMetrics } = await import("./agents");
return getAgentMetrics(input.id, input.hoursBack);
}),
history: publicProcedure.input(z.object({ id: z.number(), limit: z.number().default(50) })).query(async ({ input }) => {
const { getAgentHistory } = await import("./agents");
return getAgentHistory(input.id, input.limit);
}),
accessControl: publicProcedure.input(z.object({ id: z.number() })).query(async ({ input }) => {
const { getAgentAccessControl } = await import("./agents");
return getAgentAccessControl(input.id);
}),
updateToolAccess: publicProcedure
.input(
z.object({
agentId: z.number(),
tool: z.string(),
isAllowed: z.boolean(),
maxExecutionsPerHour: z.number().optional(),
timeoutSeconds: z.number().optional(),
allowedPatterns: z.array(z.string()).optional(),
blockedPatterns: z.array(z.string()).optional(),
})
)
.mutation(async ({ input }) => {
const { updateToolAccess } = await import("./agents");
const { agentId, ...updates } = input;
return updateToolAccess(agentId, input.tool, updates);
}),
/**
* Chat with a specific agent using its configuration
*/
chat: publicProcedure
.input(
z.object({
agentId: z.number(),
message: z.string(),
conversationId: z.string().optional(),
})
)
.mutation(async ({ input }) => {
const { getAgentById, saveHistory } = await import("./agents");
const agent = await getAgentById(input.agentId);
if (!agent) {
return { success: false as const, response: "", error: "Agent not found" };
}
const messages: Array<{ role: "system" | "user" | "assistant"; content: string }> = [];
if (agent.systemPrompt) {
messages.push({ role: "system", content: agent.systemPrompt });
}
messages.push({ role: "user", content: input.message });
const startTime = Date.now();
try {
const result = await chatCompletion(
agent.model,
messages,
{
temperature: agent.temperature ? parseFloat(agent.temperature as string) : 0.7,
max_tokens: agent.maxTokens ?? 2048,
}
);
const processingTimeMs = Date.now() - startTime;
const response = result.choices[0]?.message?.content ?? "";
// Save to history
await saveHistory(input.agentId, {
userMessage: input.message,
agentResponse: response,
conversationId: input.conversationId,
status: "success",
});
// Save metric
const { saveMetric } = await import("./agents");
await saveMetric(input.agentId, {
userMessage: input.message,
agentResponse: response,
inputTokens: result.usage?.prompt_tokens ?? 0,
outputTokens: result.usage?.completion_tokens ?? 0,
totalTokens: result.usage?.total_tokens ?? 0,
processingTimeMs,
status: "success",
toolsCalled: [],
model: result.model ?? agent.model,
}).catch(() => {}); // non-fatal
return {
success: true as const,
response,
model: result.model,
usage: result.usage,
processingTimeMs,
};
} catch (err: any) {
const processingTimeMs = Date.now() - startTime;
await saveHistory(input.agentId, {
userMessage: input.message,
agentResponse: null,
conversationId: input.conversationId,
status: "error",
});
const { saveMetric } = await import("./agents");
saveMetric(input.agentId, {
userMessage: input.message,
processingTimeMs,
status: "error",
errorMessage: err.message,
toolsCalled: [],
model: agent.model,
}).catch(() => {}); // non-fatal
return {
success: false as const,
response: "",
error: err.message,
};
}
}),
}),
/**
* Tools — управление инструментами агентов
*/
tools: router({
list: publicProcedure.query(async () => {
const { getAllTools } = await import("./tools");
return getAllTools();
}),
execute: publicProcedure
.input(
z.object({
agentId: z.number(),
tool: z.string(),
params: z.record(z.string(), z.unknown()),
})
)
.mutation(async ({ input }) => {
const { executeTool } = await import("./tools");
return executeTool(input.agentId, input.tool, input.params);
}),
}),
/**
* Browser Agent — управление браузерными сессиями через Puppeteer
*/
browser: router({
createSession: publicProcedure
.input(z.object({ agentId: z.number() }))
.mutation(async ({ input }) => {
const { createBrowserSession } = await import("./browser-agent");
return createBrowserSession(input.agentId);
}),
execute: publicProcedure
.input(
z.object({
sessionId: z.string(),
action: z.object({
type: z.enum(["navigate", "click", "type", "extract", "screenshot", "scroll", "wait", "evaluate", "close"]),
params: z.record(z.string(), z.unknown()),
}),
})
)
.mutation(async ({ input }) => {
const { executeBrowserAction } = await import("./browser-agent");
return executeBrowserAction(input.sessionId, input.action as any);
}),
getSessions: publicProcedure
.input(z.object({ agentId: z.number() }))
.query(async ({ input }) => {
const { getAgentSessions } = await import("./browser-agent");
return getAgentSessions(input.agentId);
}),
closeSession: publicProcedure
.input(z.object({ sessionId: z.string() }))
.mutation(async ({ input }) => {
const { executeBrowserAction } = await import("./browser-agent");
return executeBrowserAction(input.sessionId, { type: "close", params: {} });
}),
closeAllSessions: publicProcedure
.input(z.object({ agentId: z.number() }))
.mutation(async ({ input }) => {
const { closeAllAgentSessions } = await import("./browser-agent");
await closeAllAgentSessions(input.agentId);
return { success: true };
}),
}),
/**
* Tool Builder — генерация и установка новых инструментов через LLM
*/
toolBuilder: router({
generate: publicProcedure
.input(
z.object({
name: z.string().min(1),
description: z.string().min(10),
category: z.string().optional(),
exampleInput: z.string().optional(),
exampleOutput: z.string().optional(),
dangerous: z.boolean().optional(),
})
)
.mutation(async ({ input }) => {
const { generateTool } = await import("./tool-builder");
return generateTool(input);
}),
install: publicProcedure
.input(
z.object({
toolId: z.string(),
name: z.string(),
description: z.string(),
category: z.string(),
dangerous: z.boolean(),
parameters: z.record(z.string(), z.object({
type: z.string(),
description: z.string(),
required: z.boolean().optional(),
})),
implementation: z.string(),
})
)
.mutation(async ({ input }) => {
const { installTool } = await import("./tool-builder");
return installTool(input);
}),
listCustom: publicProcedure.query(async () => {
const { getCustomTools } = await import("./tool-builder");
return getCustomTools();
}),
delete: publicProcedure
.input(z.object({ toolId: z.string() }))
.mutation(async ({ input }) => {
const { deleteTool } = await import("./tool-builder");
return deleteTool(input.toolId);
}),
test: publicProcedure
.input(
z.object({
toolId: z.string(),
params: z.record(z.string(), z.unknown()),
})
)
.mutation(async ({ input }) => {
const { testTool } = await import("./tool-builder");
return testTool(input.toolId, input.params);
}),
}),
/**
* Agent Compiler — компиляция агентов по ТЗ через LLM
*/
agentCompiler: router({
compile: publicProcedure
.input(
z.object({
specification: z.string().min(20),
name: z.string().optional(),
preferredProvider: z.string().optional(),
preferredModel: z.string().optional(),
})
)
.mutation(async ({ input }) => {
const { compileAgentConfig } = await import("./agent-compiler");
return compileAgentConfig({ ...input, userId: SYSTEM_USER_ID });
}),
deploy: publicProcedure
.input(
z.object({
config: z.object({
name: z.string(),
description: z.string(),
role: z.string(),
model: z.string(),
provider: z.string(),
temperature: z.number(),
maxTokens: z.number(),
topP: z.number(),
frequencyPenalty: z.number(),
presencePenalty: z.number(),
systemPrompt: z.string(),
allowedTools: z.array(z.string()),
allowedDomains: z.array(z.string()),
maxRequestsPerHour: z.number(),
tags: z.array(z.string()),
reasoning: z.string(),
}),
})
)
.mutation(async ({ input }) => {
const { deployCompiledAgent } = await import("./agent-compiler");
return deployCompiledAgent(input.config, SYSTEM_USER_ID);
}),
compileAndDeploy: publicProcedure
.input(
z.object({
specification: z.string().min(20),
name: z.string().optional(),
preferredProvider: z.string().optional(),
preferredModel: z.string().optional(),
})
)
.mutation(async ({ input }) => {
const { compileAndDeployAgent } = await import("./agent-compiler");
return compileAndDeployAgent({ ...input, userId: SYSTEM_USER_ID });
}),
}),
/**
* Orchestrator — main AI agent with tool-use loop
* Приоритет: Go Gateway → Node.js fallback
*/
orchestrator: router({
// Get orchestrator config — Go Gateway first, then Node.js DB
getConfig: publicProcedure.query(async () => {
const gwConfig = await getGatewayOrchestratorConfig();
if (gwConfig) {
return { ...gwConfig, source: "gateway" as const };
}
// Fallback: Node.js orchestrator reads from DB directly
const { getOrchestratorConfig } = await import("./orchestrator");
const config = await getOrchestratorConfig();
return { ...config, source: "direct" as const };
}),
chat: publicProcedure
.input(
z.object({
messages: z.array(
z.object({
role: z.enum(["user", "assistant", "system"]),
content: z.string(),
})
),
model: z.string().optional(),
maxIterations: z.number().min(1).max(20).optional(),
})
)
.mutation(async ({ input }) => {
// Try Go Gateway first (preferred — full Go tool-use loop)
const gwAvailable = await isGatewayAvailable();
if (gwAvailable) {
const result = await gatewayChat(
input.messages,
input.model,
input.maxIterations ?? 10
);
return { ...result, source: "gateway" as const };
}
// Fallback: Node.js orchestrator
const { orchestratorChat } = await import("./orchestrator");
const result = await orchestratorChat(
input.messages,
input.model,
input.maxIterations ?? 10
);
return { ...result, source: "direct" as const };
}),
// List available tools — Go Gateway first
tools: publicProcedure.query(async () => {
const gwTools = await getGatewayTools();
if (gwTools) {
return gwTools.map((t) => ({
name: t.name,
description: t.description,
parameters: t.parameters,
source: "gateway" as const,
}));
}
// Fallback: Node.js tool definitions
const { ORCHESTRATOR_TOOLS } = await import("./orchestrator");
return ORCHESTRATOR_TOOLS.map((t) => ({
name: t.function.name,
description: t.function.description,
parameters: t.function.parameters,
source: "direct" as const,
}));
}),
// Gateway health check
gatewayHealth: publicProcedure.query(async () => {
return checkGatewayHealth();
}),
// Execute a single tool via Go Gateway
executeTool: publicProcedure
.input(
z.object({
tool: z.string(),
args: z.record(z.string(), z.unknown()),
})
)
.mutation(async ({ input }) => {
return executeGatewayTool(input.tool, input.args);
}),
// ── Persistent Background Chat Sessions ──────────────────────────────────
// These routes start a session on the Go Gateway and return immediately.
// The Go Gateway runs the orchestrator in a detached goroutine — survives
// HTTP disconnects, page reloads, and laptop sleep.
// The client polls getEvents until status === "done" | "error".
/** Start a background session. Returns { sessionId, status:"running" }. */
startSession: publicProcedure
.input(
z.object({
messages: z.array(
z.object({
role: z.enum(["user", "assistant", "system"]),
content: z.string(),
})
),
sessionId: z.string(),
model: z.string().optional(),
maxIter: z.number().min(1).max(20).optional(),
})
)
.mutation(async ({ input }) => {
const result = await startChatSession(
input.messages,
input.sessionId,
input.model,
input.maxIter ?? 10
);
if (!result) throw new Error("Gateway unavailable — cannot start background session");
return result;
}),
/** Get session metadata (status, finalResponse, tokens, model…). */
getSession: publicProcedure
.input(z.object({ sessionId: z.string() }))
.query(async ({ input }) => {
const sess = await getChatSession(input.sessionId);
if (!sess) throw new Error("Session not found");
return sess;
}),
/** Get events for a session after a given seq number (incremental polling). */
getEvents: publicProcedure
.input(
z.object({
sessionId: z.string(),
afterSeq: z.number().min(0).default(0),
})
)
.query(async ({ input }) => {
const result = await getChatEvents(input.sessionId, input.afterSeq);
if (!result) return { sessionId: input.sessionId, status: "unknown", events: [] };
return result;
}),
/** List recent sessions (default last 50). */
listSessions: publicProcedure
.input(z.object({ limit: z.number().min(1).max(200).default(50) }))
.query(async ({ input }) => {
const result = await listChatSessions(input.limit);
return result?.sessions ?? [];
}),
}),
/**
* Tasks — persistent task board for chat sessions.
* Both the orchestrator and agents can create/update tasks.
*/
tasks: router({
/** List all tasks, optionally filtered by session */
list: publicProcedure
.input(z.object({
sessionId: z.string().optional(),
status: z.enum(["pending", "in_progress", "completed", "failed", "blocked"]).optional(),
limit: z.number().min(1).max(200).default(50),
}).optional())
.query(async ({ input }) => {
const db = await getDb();
if (!db) return [];
const { chatTasks } = await import("../drizzle/schema");
const { desc, eq, and } = await import("drizzle-orm");
let query = db.select().from(chatTasks).orderBy(desc(chatTasks.createdAt)).limit(input?.limit ?? 50);
// Filtering done in JS for simplicity
const rows = await query;
return rows
.filter((r) => !input?.sessionId || r.sessionId === input.sessionId)
.filter((r) => !input?.status || r.status === input.status);
}),
/** Create a new task */
create: publicProcedure
.input(z.object({
taskId: z.string(),
content: z.string().min(1),
priority: z.enum(["critical", "high", "medium", "low"]).default("medium"),
createdBy: z.string().default("user"),
assignedTo: z.string().optional(),
sessionId: z.string().optional(),
}))
.mutation(async ({ input }) => {
const db = await getDb();
if (!db) throw new Error("Database not available");
const { chatTasks } = await import("../drizzle/schema");
await db.insert(chatTasks).values({
taskId: input.taskId,
content: input.content,
priority: input.priority,
createdBy: input.createdBy,
assignedTo: input.assignedTo,
sessionId: input.sessionId,
});
return { ok: true, taskId: input.taskId };
}),
/** Update task status */
updateStatus: publicProcedure
.input(z.object({
taskId: z.string(),
status: z.enum(["pending", "in_progress", "completed", "failed", "blocked"]),
lastError: z.string().optional(),
}))
.mutation(async ({ input }) => {
const db = await getDb();
if (!db) throw new Error("Database not available");
const { chatTasks } = await import("../drizzle/schema");
const { eq, sql } = await import("drizzle-orm");
const updateSet: Record<string, any> = { status: input.status };
if (input.status === "in_progress") {
updateSet.startedAt = new Date();
}
if (input.status === "completed") {
updateSet.completedAt = new Date();
}
if (input.lastError) {
updateSet.lastError = input.lastError;
updateSet.retryCount = sql`${chatTasks.retryCount} + 1`;
}
await db.update(chatTasks).set(updateSet).where(eq(chatTasks.taskId, input.taskId));
return { ok: true };
}),
/** Add a subtask to a task */
addSubtask: publicProcedure
.input(z.object({
taskId: z.string(),
subtask: z.object({
id: z.string(),
content: z.string().min(1),
createdBy: z.string().default("orchestrator"),
}),
}))
.mutation(async ({ input }) => {
const db = await getDb();
if (!db) throw new Error("Database not available");
const { chatTasks } = await import("../drizzle/schema");
const { eq } = await import("drizzle-orm");
const [task] = await db.select().from(chatTasks).where(eq(chatTasks.taskId, input.taskId)).limit(1);
if (!task) throw new Error("Task not found");
const subs = (task.subtasks ?? []) as any[];
subs.push({
id: input.subtask.id,
content: input.subtask.content,
status: "pending",
createdBy: input.subtask.createdBy,
createdAt: Date.now(),
});
await db.update(chatTasks).set({ subtasks: subs }).where(eq(chatTasks.taskId, input.taskId));
return { ok: true };
}),
/** Delete a task */
delete: publicProcedure
.input(z.object({ taskId: z.string() }))
.mutation(async ({ input }) => {
const db = await getDb();
if (!db) throw new Error("Database not available");
const { chatTasks } = await import("../drizzle/schema");
const { eq } = await import("drizzle-orm");
await db.delete(chatTasks).where(eq(chatTasks.taskId, input.taskId));
return { ok: true };
}),
}),
/**
* Dashboard — aggregated real-time stats for the top status bar
*/
dashboard: router({
/**
* Returns aggregated cluster stats:
* - uptime: server process uptime formatted as "Xd Yh Zm"
* - nodes: running container count from Docker
* - agents: active agent count from DB
* - cpu: average CPU% across all containers
* - mem: total RAM used in MB
*/
stats: publicProcedure.query(async () => {
// 1. Server uptime
const uptimeSec = Math.floor(process.uptime());
const days = Math.floor(uptimeSec / 86400);
const hours = Math.floor((uptimeSec % 86400) / 3600);
const mins = Math.floor((uptimeSec % 3600) / 60);
const uptime = days > 0
? `${days}d ${hours}h ${mins}m`
: hours > 0
? `${hours}h ${mins}m`
: `${mins}m`;
// 2. Container / node stats from Go Gateway
const [nodesResult, statsResult] = await Promise.allSettled([
getGatewayNodes(),
getGatewayNodeStats(),
]);
const nodes = nodesResult.status === "fulfilled" && nodesResult.value
? nodesResult.value
: null;
const statsData = statsResult.status === "fulfilled" && statsResult.value
? statsResult.value
: null;
const containerCount = nodes?.containers?.length ?? nodes?.count ?? 0;
const totalContainers = nodes?.containers?.length ?? 0;
// CPU: average across all containers
const cpuPct = statsData?.stats?.length
? statsData.stats.reduce((sum, s) => sum + s.cpuPct, 0) / statsData.stats.length
: 0;
// MEM: total used MB
const memUseMB = statsData?.stats?.length
? statsData.stats.reduce((sum, s) => sum + s.memUseMB, 0)
: 0;
// 3. Active agents from DB
let activeAgents = 0;
try {
const db = await getDb();
if (db) {
const { agents } = await import("../drizzle/schema");
const { count: drizzleCount, eq } = await import("drizzle-orm");
const [{ value }] = await db
.select({ value: drizzleCount() })
.from(agents)
.where(eq(agents.isActive, true));
activeAgents = Number(value);
}
} catch {
// non-fatal
}
return {
uptime,
nodes: `${containerCount} / ${totalContainers || containerCount}`,
agents: activeAgents,
cpuPct: Math.round(cpuPct * 10) / 10,
memUseMB: Math.round(memUseMB),
gatewayOnline: !!nodes,
fetchedAt: new Date().toISOString(),
};
}),
}),
/**
* Nodes — Docker Swarm / standalone Docker monitoring via Go Gateway
*/
nodes: router({
/**
* Full Docker Swarm info: status, node count, manager address, join tokens.
*/
swarmInfo: publicProcedure.query(async () => {
return getSwarmInfo();
}),
/**
* List real Swarm nodes with live state, resources, labels.
* Falls back to the old gateway nodes endpoint if swarm API unavailable.
*/
list: publicProcedure.query(async () => {
// Try real Swarm API first
const swarm = await listSwarmNodes();
if (swarm) return { ...swarm, swarmActive: true, fetchedAt: new Date().toISOString() };
// Fallback: old gateway nodes
const result = await getGatewayNodes();
if (!result) {
return {
nodes: [] as import("./gateway-proxy").GatewayNodeInfo[],
count: 0,
swarmActive: false,
fetchedAt: new Date().toISOString(),
error: "Gateway unavailable — is the Go Gateway running?",
};
}
return result;
}),
/**
* List all Swarm services with replica counts and running task status.
*/
services: publicProcedure.query(async () => {
const result = await listSwarmServices();
return result ?? { services: [], count: 0 };
}),
/**
* Get all tasks for a specific service (where each replica is running).
*/
serviceTasks: publicProcedure
.input(z.object({ serviceId: z.string() }))
.query(async ({ input }) => {
const result = await getServiceTasks(input.serviceId);
return result ?? { tasks: [] };
}),
/**
* Scale a service to N replicas.
*/
scaleService: publicProcedure
.input(z.object({ serviceId: z.string(), replicas: z.number().min(0).max(100) }))
.mutation(async ({ input }) => {
const ok = await scaleSwarmService(input.serviceId, input.replicas);
return { ok };
}),
/**
* Get join token and command for adding a new node.
*/
joinToken: publicProcedure
.input(z.object({ role: z.enum(["worker", "manager"]).default("worker") }))
.query(async ({ input }) => {
return getSwarmJoinToken(input.role);
}),
/**
* Execute a shell command on the HOST system via nsenter.
* Requires gateway container to run with privileged: true + pid: host.
*/
execShell: publicProcedure
.input(z.object({ command: z.string().min(1).max(4096) }))
.mutation(async ({ input }) => {
const result = await execSwarmShell(input.command);
if (!result) throw new Error("Gateway unavailable");
return result;
}),
/**
* Add a label to a swarm node.
*/
addNodeLabel: publicProcedure
.input(z.object({ nodeId: z.string(), key: z.string(), value: z.string() }))
.mutation(async ({ input }) => {
const ok = await addSwarmNodeLabel(input.nodeId, input.key, input.value);
return { ok };
}),
/**
* Set node availability (active | pause | drain).
*/
setAvailability: publicProcedure
.input(z.object({ nodeId: z.string(), availability: z.enum(["active", "pause", "drain"]) }))
.mutation(async ({ input }) => {
const ok = await setNodeAvailability(input.nodeId, input.availability);
return { ok };
}),
/**
* Deploy an agent as a new Swarm service.
*/
deployAgentService: publicProcedure
.input(z.object({
name: z.string().min(1),
image: z.string().min(1),
replicas: z.number().min(1).max(20).default(1),
env: z.array(z.string()).optional(),
port: z.number().optional(),
networks: z.array(z.string()).optional(),
}))
.mutation(async ({ input }) => {
const result = await createAgentService(input);
if (!result) throw new Error("Failed to create service");
return result;
}),
/**
* Remove (stop and delete) a Swarm service.
*/
removeService: publicProcedure
.input(z.object({ serviceId: z.string().min(1) }))
.mutation(async ({ input }) => {
const ok = await removeSwarmService(input.serviceId);
return { ok };
}),
/**
* List all GoClaw agent services with idle time info.
*/
listAgents: publicProcedure.query(async () => {
const result = await listSwarmAgents();
return result ?? { agents: [], count: 0 };
}),
/**
* Start (scale-up) an agent service by name.
*/
startAgent: publicProcedure
.input(z.object({ name: z.string().min(1), replicas: z.number().min(1).max(20).default(1) }))
.mutation(async ({ input }) => {
const ok = await startSwarmAgent(input.name, input.replicas);
return { ok };
}),
/**
* Stop (scale-to-0) an agent service by name.
*/
stopAgent: publicProcedure
.input(z.object({ name: z.string().min(1) }))
.mutation(async ({ input }) => {
const ok = await stopSwarmAgent(input.name);
return { ok };
}),
/**
* SSH into a remote host and run "docker swarm join ..." to add it to the cluster.
* The gateway fetches the join token automatically.
*/
joinNode: publicProcedure
.input(z.object({
host: z.string().min(1),
port: z.number().int().min(1).max(65535).default(22),
user: z.string().min(1),
password: z.string().min(1),
role: z.enum(["worker", "manager"]).default("worker"),
}))
.mutation(async ({ input }) => {
const result = await joinSwarmNodeViaSSH(input);
if (!result) throw new Error("Gateway unavailable — cannot reach SSH endpoint");
return result;
}),
/**
* Test SSH connectivity and Docker availability on a remote host — no swarm join performed.
*/
sshTest: publicProcedure
.input(z.object({
host: z.string().min(1),
port: z.number().int().min(1).max(65535).default(22),
user: z.string().min(1),
password: z.string().min(1),
}))
.mutation(async ({ input }) => {
const result = await testSSHConnection(input);
if (!result) throw new Error("Gateway unavailable — cannot reach SSH test endpoint");
return result;
}),
/**
* Get live container stats (CPU%, RAM) for all running containers.
*/
stats: publicProcedure.query(async () => {
const result = await getGatewayNodeStats();
if (!result) {
return {
stats: [] as import("./gateway-proxy").GatewayContainerStat[],
count: 0,
fetchedAt: new Date().toISOString(),
error: "Gateway unavailable",
};
}
return result;
}),
}),
/**
* Workflows — visual pipeline builder (CRUD + execution)
*/
workflows: router({
/** List all workflows */
list: publicProcedure.query(async () => {
const { getAllWorkflows } = await import("./workflows");
return getAllWorkflows();
}),
/** Get a single workflow with its nodes and edges */
get: publicProcedure
.input(z.object({ id: z.number() }))
.query(async ({ input }) => {
const { getWorkflowById } = await import("./workflows");
return getWorkflowById(input.id);
}),
/** Create a new workflow */
create: publicProcedure
.input(z.object({
name: z.string().min(1),
description: z.string().optional(),
tags: z.array(z.string()).default([]),
}))
.mutation(async ({ input }) => {
const { createWorkflow } = await import("./workflows");
return createWorkflow({ ...input, status: "draft" });
}),
/** Update workflow metadata */
update: publicProcedure
.input(z.object({
id: z.number(),
name: z.string().optional(),
description: z.string().optional(),
status: z.enum(["draft", "active", "paused", "archived"]).optional(),
tags: z.array(z.string()).optional(),
}))
.mutation(async ({ input }) => {
const { updateWorkflow } = await import("./workflows");
const { id, ...data } = input;
return updateWorkflow(id, data as any);
}),
/** Delete a workflow and all its nodes/edges/runs */
delete: publicProcedure
.input(z.object({ id: z.number() }))
.mutation(async ({ input }) => {
const { deleteWorkflow } = await import("./workflows");
return deleteWorkflow(input.id);
}),
/** Save the full canvas (nodes + edges) atomically */
saveCanvas: publicProcedure
.input(z.object({
workflowId: z.number(),
nodes: z.array(z.object({
nodeKey: z.string(),
label: z.string(),
kind: z.enum(["agent", "container", "trigger", "condition", "output"]),
agentId: z.number().nullable().optional(),
containerConfig: z.record(z.string(), z.unknown()).optional(),
conditionExpr: z.string().optional(),
triggerConfig: z.record(z.string(), z.unknown()).optional(),
posX: z.number().default(0),
posY: z.number().default(0),
meta: z.record(z.string(), z.unknown()).optional(),
})),
edges: z.array(z.object({
edgeKey: z.string(),
sourceNodeKey: z.string(),
targetNodeKey: z.string(),
sourceHandle: z.string().optional(),
targetHandle: z.string().optional(),
label: z.string().optional(),
meta: z.record(z.string(), z.unknown()).optional(),
})),
canvasMeta: z.record(z.string(), z.unknown()).optional(),
}))
.mutation(async ({ input }) => {
const { saveCanvas } = await import("./workflows");
return saveCanvas(
input.workflowId,
input.nodes.map((n) => ({ ...n, workflowId: input.workflowId } as any)),
input.edges.map((e) => ({ ...e, workflowId: input.workflowId } as any)),
input.canvasMeta,
);
}),
/** Execute a full workflow */
execute: publicProcedure
.input(z.object({ workflowId: z.number(), input: z.string().optional() }))
.mutation(async ({ input }) => {
const { executeWorkflow } = await import("./workflows");
return executeWorkflow(input.workflowId, input.input);
}),
/** Execute a single node (for testing) */
executeNode: publicProcedure
.input(z.object({ workflowId: z.number(), nodeKey: z.string(), input: z.string() }))
.mutation(async ({ input }) => {
const { executeSingleNode } = await import("./workflows");
return executeSingleNode(input.workflowId, input.nodeKey, input.input);
}),
/** Cancel a running workflow */
cancelRun: publicProcedure
.input(z.object({ runKey: z.string() }))
.mutation(async ({ input }) => {
const { cancelRun } = await import("./workflows");
return cancelRun(input.runKey);
}),
/** Get run details */
getRun: publicProcedure
.input(z.object({ runKey: z.string() }))
.query(async ({ input }) => {
const { getRunByKey } = await import("./workflows");
return getRunByKey(input.runKey);
}),
/** List runs for a workflow */
listRuns: publicProcedure
.input(z.object({ workflowId: z.number(), limit: z.number().default(50) }))
.query(async ({ input }) => {
const { getRunsByWorkflow } = await import("./workflows");
return getRunsByWorkflow(input.workflowId, input.limit);
}),
/** Get workflow stats */
stats: publicProcedure
.input(z.object({ workflowId: z.number() }))
.query(async ({ input }) => {
const { getWorkflowStats } = await import("./workflows");
return getWorkflowStats(input.workflowId);
}),
}),
});
export type AppRouter = typeof appRouter;