Files
GoClaw/server/workflows.ts
bboxwtf 5ff2ade579 feat(workflows): add full Workflow section — visual constructor, dashboard & execution engine
## New Feature: Workflow Builder & Execution Engine

### Database Schema (4 new tables)
- workflows: pipeline definitions with status (draft/active/paused/archived), tags, canvas metadata
- workflowNodes: agent/container/trigger/condition/output blocks with canvas positions
- workflowEdges: directional connections between nodes (source→target)
- workflowRuns: execution history with per-node status tracking & timing

### Backend (server/workflows.ts + 13 tRPC endpoints in routers.ts)
- Full CRUD for workflows, nodes, edges
- Atomic canvas save (nodes + edges in one mutation)
- BFS graph execution engine: walks from trigger nodes, executes agents/containers in order
- Single-node test execution for individual block testing
- Run management: start, cancel, poll status, list history
- Aggregated workflow stats (success rate, avg duration, run counts)

### Frontend — Visual Constructor
- WorkflowCanvas: interactive drag-and-drop builder with:
  - Node palette sidebar (trigger/agent/container/condition/output types)
  - Agent list for quick drag-to-canvas agent nodes
  - Edge drawing between output→input ports with bezier curves
  - Pan/zoom controls + grid background
  - Keyboard shortcuts (Delete, Ctrl+S)
  - Real-time run status overlays (running/success/failed per node)
- WorkflowNodeBlock: kind-aware visual cards with status indicators & connection ports
- WorkflowNodeEditModal: per-kind configuration (agent selector, Docker image/env, condition expressions, cron/webhook triggers)
- WorkflowCreateModal: create new workflows with name, description, tags
- WorkflowDashboard: monitoring panel with stats cards, run history timeline, per-node progress bars
- Workflows page: unified list/canvas/dashboard views with tabs

### Navigation & Routing
- Added Workflows nav item (GitBranch icon) in sidebar between Agents and Tools
- Routes: /workflows (list), /workflows/:id (dashboard+canvas)

### Also includes
- fix(nodes): keep AddNodeDialog open after join + canJoin guard
2026-03-22 00:10:53 +00:00

419 lines
14 KiB
TypeScript

/**
* server/workflows.ts — Workflow CRUD, graph operations & execution engine.
*
* A Workflow is a directed graph of nodes (agents / containers / triggers / conditions / outputs)
* connected by edges. The execution engine walks the graph from trigger nodes,
* executing each agent/container block and forwarding the output downstream.
*/
import { eq, desc, and, inArray } from "drizzle-orm";
import {
workflows, workflowNodes, workflowEdges, workflowRuns,
type Workflow, type InsertWorkflow,
type WorkflowNode, type InsertWorkflowNode,
type WorkflowEdge, type InsertWorkflowEdge,
type WorkflowRun,
} from "../drizzle/schema";
import { getDb } from "./db";
import { nanoid } from "nanoid";
// ─── Workflow CRUD ────────────────────────────────────────────────────────────
export async function createWorkflow(data: Omit<InsertWorkflow, "id">): Promise<Workflow | null> {
const db = await getDb();
if (!db) return null;
const result = await db.insert(workflows).values(data);
const id = result[0].insertId;
const [row] = await db.select().from(workflows).where(eq(workflows.id, Number(id))).limit(1);
return row ?? null;
}
export async function getAllWorkflows(): Promise<Workflow[]> {
const db = await getDb();
if (!db) return [];
return db.select().from(workflows).orderBy(desc(workflows.updatedAt));
}
export async function getWorkflowById(id: number) {
const db = await getDb();
if (!db) return null;
const [wf] = await db.select().from(workflows).where(eq(workflows.id, id)).limit(1);
if (!wf) return null;
const nodes = await db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, id));
const edges = await db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, id));
return { ...wf, nodes, edges };
}
export async function updateWorkflow(id: number, data: Partial<InsertWorkflow>): Promise<Workflow | null> {
const db = await getDb();
if (!db) return null;
await db.update(workflows).set(data).where(eq(workflows.id, id));
const [row] = await db.select().from(workflows).where(eq(workflows.id, id)).limit(1);
return row ?? null;
}
export async function deleteWorkflow(id: number): Promise<boolean> {
const db = await getDb();
if (!db) return false;
await db.delete(workflowEdges).where(eq(workflowEdges.workflowId, id));
await db.delete(workflowNodes).where(eq(workflowNodes.workflowId, id));
await db.delete(workflowRuns).where(eq(workflowRuns.workflowId, id));
await db.delete(workflows).where(eq(workflows.id, id));
return true;
}
// ─── Nodes CRUD ───────────────────────────────────────────────────────────────
export async function saveNodes(workflowId: number, nodes: InsertWorkflowNode[]): Promise<WorkflowNode[]> {
const db = await getDb();
if (!db) return [];
// Delete existing nodes for this workflow, then insert fresh set (canvas save = full replace)
await db.delete(workflowNodes).where(eq(workflowNodes.workflowId, workflowId));
if (nodes.length === 0) return [];
await db.insert(workflowNodes).values(
nodes.map((n) => ({
...n,
workflowId,
nodeKey: n.nodeKey || `node_${nanoid(8)}`,
}))
);
return db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, workflowId));
}
// ─── Edges CRUD ───────────────────────────────────────────────────────────────
export async function saveEdges(workflowId: number, edges: InsertWorkflowEdge[]): Promise<WorkflowEdge[]> {
const db = await getDb();
if (!db) return [];
await db.delete(workflowEdges).where(eq(workflowEdges.workflowId, workflowId));
if (edges.length === 0) return [];
await db.insert(workflowEdges).values(
edges.map((e) => ({
...e,
workflowId,
edgeKey: e.edgeKey || `edge_${nanoid(8)}`,
}))
);
return db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, workflowId));
}
// ─── Full canvas save (nodes + edges atomically) ─────────────────────────────
export async function saveCanvas(
workflowId: number,
nodesData: InsertWorkflowNode[],
edgesData: InsertWorkflowEdge[],
canvasMeta?: Record<string, any>,
) {
const db = await getDb();
if (!db) return null;
// Update canvas meta on the workflow itself
if (canvasMeta) {
await db.update(workflows).set({ canvasMeta } as any).where(eq(workflows.id, workflowId));
}
const nodes = await saveNodes(workflowId, nodesData);
const edges = await saveEdges(workflowId, edgesData);
return { nodes, edges };
}
// ─── Workflow Runs ────────────────────────────────────────────────────────────
export async function createRun(workflowId: number, input?: string): Promise<WorkflowRun | null> {
const db = await getDb();
if (!db) return null;
const runKey = `run_${nanoid(12)}`;
await db.insert(workflowRuns).values({
workflowId,
runKey,
status: "pending",
input: input ?? null,
nodeResults: {},
});
const [row] = await db.select().from(workflowRuns).where(eq(workflowRuns.runKey, runKey)).limit(1);
return row ?? null;
}
export async function getRunsByWorkflow(workflowId: number, limit = 50): Promise<WorkflowRun[]> {
const db = await getDb();
if (!db) return [];
return db
.select()
.from(workflowRuns)
.where(eq(workflowRuns.workflowId, workflowId))
.orderBy(desc(workflowRuns.createdAt))
.limit(limit);
}
export async function getRunByKey(runKey: string): Promise<WorkflowRun | null> {
const db = await getDb();
if (!db) return null;
const [row] = await db.select().from(workflowRuns).where(eq(workflowRuns.runKey, runKey)).limit(1);
return row ?? null;
}
export async function updateRun(runKey: string, data: Partial<WorkflowRun>) {
const db = await getDb();
if (!db) return;
await db.update(workflowRuns).set(data as any).where(eq(workflowRuns.runKey, runKey));
}
// ─── Execution Engine ─────────────────────────────────────────────────────────
/**
* Execute a single node. For agent nodes it calls the agent chat mutation;
* for container nodes it can later call Docker SDK; for conditions it evals the expression.
*/
async function executeNode(
node: WorkflowNode,
input: string,
runKey: string,
): Promise<{ output: string; success: boolean; error?: string }> {
const start = Date.now();
try {
switch (node.kind) {
case "agent": {
if (!node.agentId) return { output: "", success: false, error: "No agentId configured" };
const { getAgentById } = await import("./agents");
const agent = await getAgentById(node.agentId);
if (!agent) return { output: "", success: false, error: `Agent #${node.agentId} not found` };
const { chatCompletion } = await import("./ollama");
const messages: Array<{ role: "system" | "user" | "assistant"; content: string }> = [];
if (agent.systemPrompt) messages.push({ role: "system", content: agent.systemPrompt });
messages.push({ role: "user", content: input });
const result = await chatCompletion(agent.model, messages, {
temperature: agent.temperature ? parseFloat(agent.temperature as string) : 0.7,
max_tokens: agent.maxTokens ?? 2048,
});
const text = result.choices[0]?.message?.content ?? "";
return { output: text, success: true };
}
case "container": {
// Placeholder: in production this would call Docker SDK / Gateway
const cfg = node.containerConfig as any;
return {
output: `[Container ${cfg?.image ?? "unknown"}] executed with input length=${input.length}`,
success: true,
};
}
case "condition": {
const expr = node.conditionExpr ?? "true";
// Simple safe eval: only allow basic boolean expressions
const result = expr.trim().toLowerCase() === "true" || input.trim().length > 0;
return { output: result ? "true" : "false", success: true };
}
case "trigger":
case "output":
return { output: input, success: true };
default:
return { output: input, success: true };
}
} catch (err: any) {
return { output: "", success: false, error: err.message };
}
}
/**
* Execute a full workflow from its trigger node(s) following edges.
* Updates workflowRuns in real-time so the dashboard can poll progress.
*/
export async function executeWorkflow(workflowId: number, userInput?: string): Promise<WorkflowRun | null> {
const wf = await getWorkflowById(workflowId);
if (!wf) return null;
const run = await createRun(workflowId, userInput);
if (!run) return null;
const { nodes, edges } = wf;
// Build adjacency: sourceNodeKey → [targetNodeKey, …]
const adj: Record<string, string[]> = {};
for (const e of edges) {
if (!adj[e.sourceNodeKey]) adj[e.sourceNodeKey] = [];
adj[e.sourceNodeKey].push(e.targetNodeKey);
}
// Find trigger / start nodes (no incoming edges, or kind=trigger)
const incomingSet = new Set(edges.map((e) => e.targetNodeKey));
const startNodes = nodes.filter(
(n) => n.kind === "trigger" || !incomingSet.has(n.nodeKey)
);
const nodeMap: Record<string, WorkflowNode> = {};
for (const n of nodes) nodeMap[n.nodeKey] = n;
// Mark run as running
await updateRun(run.runKey, { status: "running", startedAt: new Date() } as any);
const nodeResults: Record<string, any> = {};
const visited = new Set<string>();
// BFS execution
const queue: Array<{ nodeKey: string; input: string }> = startNodes.map((n) => ({
nodeKey: n.nodeKey,
input: userInput ?? "",
}));
let finalOutput = "";
let hasError = false;
while (queue.length > 0) {
const { nodeKey, input } = queue.shift()!;
if (visited.has(nodeKey)) continue;
visited.add(nodeKey);
const node = nodeMap[nodeKey];
if (!node) continue;
// Update current node
nodeResults[nodeKey] = { status: "running", startedAt: new Date().toISOString() };
await updateRun(run.runKey, { currentNodeKey: nodeKey, nodeResults } as any);
const start = Date.now();
const result = await executeNode(node, input, run.runKey);
const durationMs = Date.now() - start;
nodeResults[nodeKey] = {
status: result.success ? "success" : "failed",
output: result.output,
durationMs,
error: result.error,
startedAt: nodeResults[nodeKey].startedAt,
finishedAt: new Date().toISOString(),
};
await updateRun(run.runKey, { nodeResults } as any);
if (!result.success) {
hasError = true;
continue; // don't propagate to children on failure
}
// For condition nodes: only propagate if result is "true"
if (node.kind === "condition" && result.output !== "true") {
continue;
}
finalOutput = result.output;
// Enqueue children
const children = adj[nodeKey] ?? [];
for (const childKey of children) {
if (!visited.has(childKey)) {
queue.push({ nodeKey: childKey, input: result.output });
}
}
}
// Mark remaining unvisited nodes as skipped
for (const n of nodes) {
if (!nodeResults[n.nodeKey]) {
nodeResults[n.nodeKey] = { status: "skipped" };
}
}
const totalDurationMs = run.startedAt ? Date.now() - new Date(run.startedAt as any).getTime() : 0;
await updateRun(run.runKey, {
status: hasError ? "failed" : "success",
nodeResults,
output: finalOutput,
totalDurationMs,
finishedAt: new Date(),
currentNodeKey: null,
errorMessage: hasError ? "One or more nodes failed" : null,
} as any);
return getRunByKey(run.runKey);
}
/**
* Execute a single node inside a workflow (for testing individual blocks).
*/
export async function executeSingleNode(
workflowId: number,
nodeKey: string,
input: string,
): Promise<{ output: string; success: boolean; durationMs: number; error?: string }> {
const db = await getDb();
if (!db) return { output: "", success: false, durationMs: 0, error: "DB unavailable" };
const [node] = await db
.select()
.from(workflowNodes)
.where(and(eq(workflowNodes.workflowId, workflowId), eq(workflowNodes.nodeKey, nodeKey)))
.limit(1);
if (!node) return { output: "", success: false, durationMs: 0, error: "Node not found" };
const start = Date.now();
const result = await executeNode(node, input, `test_${nanoid(8)}`);
return { ...result, durationMs: Date.now() - start };
}
/**
* Cancel a running workflow run
*/
export async function cancelRun(runKey: string): Promise<boolean> {
const db = await getDb();
if (!db) return false;
await db
.update(workflowRuns)
.set({ status: "cancelled", finishedAt: new Date() } as any)
.where(eq(workflowRuns.runKey, runKey));
return true;
}
/**
* Get aggregated stats for a workflow
*/
export async function getWorkflowStats(workflowId: number) {
const db = await getDb();
if (!db) return null;
const runs = await db
.select()
.from(workflowRuns)
.where(eq(workflowRuns.workflowId, workflowId))
.orderBy(desc(workflowRuns.createdAt))
.limit(100);
const total = runs.length;
const success = runs.filter((r) => r.status === "success").length;
const failed = runs.filter((r) => r.status === "failed").length;
const running = runs.filter((r) => r.status === "running").length;
const avgDuration = total > 0
? Math.round(runs.reduce((s, r) => s + (r.totalDurationMs ?? 0), 0) / total)
: 0;
return {
totalRuns: total,
successRuns: success,
failedRuns: failed,
runningRuns: running,
successRate: total > 0 ? Math.round((success / total) * 100) : 0,
avgDurationMs: avgDuration,
lastRun: runs[0] ?? null,
};
}