/** * server/workflows.ts — Workflow CRUD, graph operations & execution engine. * * A Workflow is a directed graph of nodes (agents / containers / triggers / conditions / outputs) * connected by edges. The execution engine walks the graph from trigger nodes, * executing each agent/container block and forwarding the output downstream. */ import { eq, desc, and, inArray } from "drizzle-orm"; import { workflows, workflowNodes, workflowEdges, workflowRuns, type Workflow, type InsertWorkflow, type WorkflowNode, type InsertWorkflowNode, type WorkflowEdge, type InsertWorkflowEdge, type WorkflowRun, } from "../drizzle/schema"; import { getDb } from "./db"; import { nanoid } from "nanoid"; // ─── Workflow CRUD ──────────────────────────────────────────────────────────── export async function createWorkflow(data: Omit): Promise { const db = await getDb(); if (!db) return null; const result = await db.insert(workflows).values(data); const id = result[0].insertId; const [row] = await db.select().from(workflows).where(eq(workflows.id, Number(id))).limit(1); return row ?? null; } export async function getAllWorkflows(): Promise { const db = await getDb(); if (!db) return []; return db.select().from(workflows).orderBy(desc(workflows.updatedAt)); } export async function getWorkflowById(id: number) { const db = await getDb(); if (!db) return null; const [wf] = await db.select().from(workflows).where(eq(workflows.id, id)).limit(1); if (!wf) return null; const nodes = await db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, id)); const edges = await db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, id)); return { ...wf, nodes, edges }; } export async function updateWorkflow(id: number, data: Partial): Promise { const db = await getDb(); if (!db) return null; await db.update(workflows).set(data).where(eq(workflows.id, id)); const [row] = await db.select().from(workflows).where(eq(workflows.id, id)).limit(1); return row ?? null; } export async function deleteWorkflow(id: number): Promise { const db = await getDb(); if (!db) return false; await db.delete(workflowEdges).where(eq(workflowEdges.workflowId, id)); await db.delete(workflowNodes).where(eq(workflowNodes.workflowId, id)); await db.delete(workflowRuns).where(eq(workflowRuns.workflowId, id)); await db.delete(workflows).where(eq(workflows.id, id)); return true; } // ─── Nodes CRUD ─────────────────────────────────────────────────────────────── export async function saveNodes(workflowId: number, nodes: InsertWorkflowNode[]): Promise { const db = await getDb(); if (!db) return []; // Delete existing nodes for this workflow, then insert fresh set (canvas save = full replace) await db.delete(workflowNodes).where(eq(workflowNodes.workflowId, workflowId)); if (nodes.length === 0) return []; await db.insert(workflowNodes).values( nodes.map((n) => ({ ...n, workflowId, nodeKey: n.nodeKey || `node_${nanoid(8)}`, })) ); return db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, workflowId)); } // ─── Edges CRUD ─────────────────────────────────────────────────────────────── export async function saveEdges(workflowId: number, edges: InsertWorkflowEdge[]): Promise { const db = await getDb(); if (!db) return []; await db.delete(workflowEdges).where(eq(workflowEdges.workflowId, workflowId)); if (edges.length === 0) return []; await db.insert(workflowEdges).values( edges.map((e) => ({ ...e, workflowId, edgeKey: e.edgeKey || `edge_${nanoid(8)}`, })) ); return db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, workflowId)); } // ─── Full canvas save (nodes + edges atomically) ───────────────────────────── export async function saveCanvas( workflowId: number, nodesData: InsertWorkflowNode[], edgesData: InsertWorkflowEdge[], canvasMeta?: Record, ) { const db = await getDb(); if (!db) return null; // Update canvas meta on the workflow itself if (canvasMeta) { await db.update(workflows).set({ canvasMeta } as any).where(eq(workflows.id, workflowId)); } const nodes = await saveNodes(workflowId, nodesData); const edges = await saveEdges(workflowId, edgesData); return { nodes, edges }; } // ─── Workflow Runs ──────────────────────────────────────────────────────────── export async function createRun(workflowId: number, input?: string): Promise { const db = await getDb(); if (!db) return null; const runKey = `run_${nanoid(12)}`; await db.insert(workflowRuns).values({ workflowId, runKey, status: "pending", input: input ?? null, nodeResults: {}, }); const [row] = await db.select().from(workflowRuns).where(eq(workflowRuns.runKey, runKey)).limit(1); return row ?? null; } export async function getRunsByWorkflow(workflowId: number, limit = 50): Promise { const db = await getDb(); if (!db) return []; return db .select() .from(workflowRuns) .where(eq(workflowRuns.workflowId, workflowId)) .orderBy(desc(workflowRuns.createdAt)) .limit(limit); } export async function getRunByKey(runKey: string): Promise { const db = await getDb(); if (!db) return null; const [row] = await db.select().from(workflowRuns).where(eq(workflowRuns.runKey, runKey)).limit(1); return row ?? null; } export async function updateRun(runKey: string, data: Partial) { const db = await getDb(); if (!db) return; await db.update(workflowRuns).set(data as any).where(eq(workflowRuns.runKey, runKey)); } // ─── Execution Engine ───────────────────────────────────────────────────────── /** * Execute a single node. For agent nodes it calls the agent chat mutation; * for container nodes it can later call Docker SDK; for conditions it evals the expression. */ async function executeNode( node: WorkflowNode, input: string, runKey: string, ): Promise<{ output: string; success: boolean; error?: string }> { const start = Date.now(); try { switch (node.kind) { case "agent": { if (!node.agentId) return { output: "", success: false, error: "No agentId configured" }; const { getAgentById } = await import("./agents"); const agent = await getAgentById(node.agentId); if (!agent) return { output: "", success: false, error: `Agent #${node.agentId} not found` }; const { chatCompletion } = await import("./ollama"); const messages: Array<{ role: "system" | "user" | "assistant"; content: string }> = []; if (agent.systemPrompt) messages.push({ role: "system", content: agent.systemPrompt }); messages.push({ role: "user", content: input }); const result = await chatCompletion(agent.model, messages, { temperature: agent.temperature ? parseFloat(agent.temperature as string) : 0.7, max_tokens: agent.maxTokens ?? 2048, }); const text = result.choices[0]?.message?.content ?? ""; return { output: text, success: true }; } case "container": { // Placeholder: in production this would call Docker SDK / Gateway const cfg = node.containerConfig as any; return { output: `[Container ${cfg?.image ?? "unknown"}] executed with input length=${input.length}`, success: true, }; } case "condition": { const expr = node.conditionExpr ?? "true"; // Simple safe eval: only allow basic boolean expressions const result = expr.trim().toLowerCase() === "true" || input.trim().length > 0; return { output: result ? "true" : "false", success: true }; } case "trigger": case "output": return { output: input, success: true }; default: return { output: input, success: true }; } } catch (err: any) { return { output: "", success: false, error: err.message }; } } /** * Execute a full workflow from its trigger node(s) following edges. * Updates workflowRuns in real-time so the dashboard can poll progress. */ export async function executeWorkflow(workflowId: number, userInput?: string): Promise { const wf = await getWorkflowById(workflowId); if (!wf) return null; const run = await createRun(workflowId, userInput); if (!run) return null; const { nodes, edges } = wf; // Build adjacency: sourceNodeKey → [targetNodeKey, …] const adj: Record = {}; for (const e of edges) { if (!adj[e.sourceNodeKey]) adj[e.sourceNodeKey] = []; adj[e.sourceNodeKey].push(e.targetNodeKey); } // Find trigger / start nodes (no incoming edges, or kind=trigger) const incomingSet = new Set(edges.map((e) => e.targetNodeKey)); const startNodes = nodes.filter( (n) => n.kind === "trigger" || !incomingSet.has(n.nodeKey) ); const nodeMap: Record = {}; for (const n of nodes) nodeMap[n.nodeKey] = n; // Mark run as running await updateRun(run.runKey, { status: "running", startedAt: new Date() } as any); const nodeResults: Record = {}; const visited = new Set(); // BFS execution const queue: Array<{ nodeKey: string; input: string }> = startNodes.map((n) => ({ nodeKey: n.nodeKey, input: userInput ?? "", })); let finalOutput = ""; let hasError = false; while (queue.length > 0) { const { nodeKey, input } = queue.shift()!; if (visited.has(nodeKey)) continue; visited.add(nodeKey); const node = nodeMap[nodeKey]; if (!node) continue; // Update current node nodeResults[nodeKey] = { status: "running", startedAt: new Date().toISOString() }; await updateRun(run.runKey, { currentNodeKey: nodeKey, nodeResults } as any); const start = Date.now(); const result = await executeNode(node, input, run.runKey); const durationMs = Date.now() - start; nodeResults[nodeKey] = { status: result.success ? "success" : "failed", output: result.output, durationMs, error: result.error, startedAt: nodeResults[nodeKey].startedAt, finishedAt: new Date().toISOString(), }; await updateRun(run.runKey, { nodeResults } as any); if (!result.success) { hasError = true; continue; // don't propagate to children on failure } // For condition nodes: only propagate if result is "true" if (node.kind === "condition" && result.output !== "true") { continue; } finalOutput = result.output; // Enqueue children const children = adj[nodeKey] ?? []; for (const childKey of children) { if (!visited.has(childKey)) { queue.push({ nodeKey: childKey, input: result.output }); } } } // Mark remaining unvisited nodes as skipped for (const n of nodes) { if (!nodeResults[n.nodeKey]) { nodeResults[n.nodeKey] = { status: "skipped" }; } } const totalDurationMs = run.startedAt ? Date.now() - new Date(run.startedAt as any).getTime() : 0; await updateRun(run.runKey, { status: hasError ? "failed" : "success", nodeResults, output: finalOutput, totalDurationMs, finishedAt: new Date(), currentNodeKey: null, errorMessage: hasError ? "One or more nodes failed" : null, } as any); return getRunByKey(run.runKey); } /** * Execute a single node inside a workflow (for testing individual blocks). */ export async function executeSingleNode( workflowId: number, nodeKey: string, input: string, ): Promise<{ output: string; success: boolean; durationMs: number; error?: string }> { const db = await getDb(); if (!db) return { output: "", success: false, durationMs: 0, error: "DB unavailable" }; const [node] = await db .select() .from(workflowNodes) .where(and(eq(workflowNodes.workflowId, workflowId), eq(workflowNodes.nodeKey, nodeKey))) .limit(1); if (!node) return { output: "", success: false, durationMs: 0, error: "Node not found" }; const start = Date.now(); const result = await executeNode(node, input, `test_${nanoid(8)}`); return { ...result, durationMs: Date.now() - start }; } /** * Cancel a running workflow run */ export async function cancelRun(runKey: string): Promise { const db = await getDb(); if (!db) return false; await db .update(workflowRuns) .set({ status: "cancelled", finishedAt: new Date() } as any) .where(eq(workflowRuns.runKey, runKey)); return true; } /** * Get aggregated stats for a workflow */ export async function getWorkflowStats(workflowId: number) { const db = await getDb(); if (!db) return null; const runs = await db .select() .from(workflowRuns) .where(eq(workflowRuns.workflowId, workflowId)) .orderBy(desc(workflowRuns.createdAt)) .limit(100); const total = runs.length; const success = runs.filter((r) => r.status === "success").length; const failed = runs.filter((r) => r.status === "failed").length; const running = runs.filter((r) => r.status === "running").length; const avgDuration = total > 0 ? Math.round(runs.reduce((s, r) => s + (r.totalDurationMs ?? 0), 0) / total) : 0; return { totalRuns: total, successRuns: success, failedRuns: failed, runningRuns: running, successRate: total > 0 ? Math.round((success / total) * 100) : 0, avgDurationMs: avgDuration, lastRun: runs[0] ?? null, }; }