diff --git a/client/src/App.tsx b/client/src/App.tsx index 9a835da..64627cd 100644 --- a/client/src/App.tsx +++ b/client/src/App.tsx @@ -13,6 +13,7 @@ import Settings from "./pages/Settings"; import Nodes from "./pages/Nodes"; import Tools from "./pages/Tools"; import Skills from "./pages/Skills"; +import Workflows from "./pages/Workflows"; function Router() { // make sure to consider if you need authentication for certain routes @@ -26,6 +27,8 @@ function Router() { + + diff --git a/client/src/components/DashboardLayout.tsx b/client/src/components/DashboardLayout.tsx index 9ff3a8a..29e9af8 100644 --- a/client/src/components/DashboardLayout.tsx +++ b/client/src/components/DashboardLayout.tsx @@ -20,6 +20,7 @@ import { Wifi, Wrench, Zap, + GitBranch, } from "lucide-react"; import { Tooltip, TooltipContent, TooltipTrigger } from "@/components/ui/tooltip"; import { motion, AnimatePresence } from "framer-motion"; @@ -28,6 +29,7 @@ import { trpc } from "@/lib/trpc"; const NAV_ITEMS = [ { path: "/", icon: LayoutDashboard, label: "Дашборд" }, { path: "/agents", icon: Bot, label: "Агенты" }, + { path: "/workflows", icon: GitBranch, label: "Воркфлоу" }, { path: "/tools", icon: Wrench, label: "Инструменты" }, { path: "/skills", icon: Zap, label: "Скилы" }, { path: "/nodes", icon: Server, label: "Ноды" }, diff --git a/client/src/components/WorkflowCanvas.tsx b/client/src/components/WorkflowCanvas.tsx new file mode 100644 index 0000000..34e89e9 --- /dev/null +++ b/client/src/components/WorkflowCanvas.tsx @@ -0,0 +1,554 @@ +/** + * WorkflowCanvas — interactive visual constructor for building workflows. + * Supports: + * - Drag-and-drop nodes from palette + * - Drawing edges between ports + * - Selecting/deleting nodes and edges + * - Panning/zooming the canvas + * - Save to server via tRPC + * - Real-time run status overlays + */ +import { useState, useRef, useCallback, useEffect } from "react"; +import { motion } from "framer-motion"; +import { Button } from "@/components/ui/button"; +import { Badge } from "@/components/ui/badge"; +import { + Save, + Play, + Trash2, + ZoomIn, + ZoomOut, + Maximize2, + Loader2, + X, + AlertCircle, +} from "lucide-react"; +import { toast } from "sonner"; +import { trpc } from "@/lib/trpc"; +import { nanoid } from "nanoid"; +import { WorkflowNodeBlock, WorkflowNodePaletteItem, type WFNodeData, type NodeKind } from "./WorkflowNodeBlock"; +import { WorkflowNodeEditModal } from "./WorkflowNodeEditModal"; + +export interface WFEdgeData { + edgeKey: string; + sourceNodeKey: string; + targetNodeKey: string; + sourceHandle?: string; + targetHandle?: string; + label?: string; + meta?: Record; +} + +interface WorkflowCanvasProps { + workflowId: number; + workflowName: string; + initialNodes?: WFNodeData[]; + initialEdges?: WFEdgeData[]; + /** Run results overlay: nodeKey → status */ + runResults?: Record; + onBack: () => void; +} + +export default function WorkflowCanvas({ + workflowId, + workflowName, + initialNodes = [], + initialEdges = [], + runResults, + onBack, +}: WorkflowCanvasProps) { + const [nodes, setNodes] = useState(initialNodes); + const [edges, setEdges] = useState(initialEdges); + const [selectedNodeKey, setSelectedNodeKey] = useState(null); + const [selectedEdgeKey, setSelectedEdgeKey] = useState(null); + const [editingNode, setEditingNode] = useState(null); + const [zoom, setZoom] = useState(1); + const [pan, setPan] = useState({ x: 0, y: 0 }); + const [isPanning, setIsPanning] = useState(false); + const [panStart, setPanStart] = useState({ x: 0, y: 0 }); + + // Dragging state + const [dragging, setDragging] = useState<{ nodeKey: string; offsetX: number; offsetY: number } | null>(null); + + // Edge drawing state + const [edgeDrawing, setEdgeDrawing] = useState<{ + sourceKey: string; + sourcePortType: "input" | "output"; + mouseX: number; + mouseY: number; + } | null>(null); + + const canvasRef = useRef(null); + + // Fetch agents for the node editor + const { data: agents = [] } = trpc.agents.list.useQuery(); + + // Save canvas mutation + const saveMutation = trpc.workflows.saveCanvas.useMutation({ + onSuccess: () => toast.success("Canvas saved"), + onError: (e) => toast.error(`Save failed: ${e.message}`), + }); + + // Execute workflow mutation + const executeMutation = trpc.workflows.execute.useMutation({ + onSuccess: (run) => { + toast.success(`Workflow started: ${run?.runKey ?? "?"}`); + }, + onError: (e) => toast.error(`Execution failed: ${e.message}`), + }); + + // Test single node + const testNodeMutation = trpc.workflows.executeNode.useMutation({ + onSuccess: (result) => { + if (result.success) toast.success(`Node executed in ${result.durationMs}ms`); + else toast.error(`Node failed: ${result.error}`); + }, + }); + + // Apply run results as status overlays + const nodesWithStatus: WFNodeData[] = nodes.map((n) => { + const rr = runResults?.[n.nodeKey]; + if (!rr) return n; + return { ...n, runStatus: rr.status, runDurationMs: rr.durationMs, runError: rr.error }; + }); + + // ─── Canvas event handlers ─────────────────────────────────────────────── + + const handleCanvasMouseDown = (e: React.MouseEvent) => { + if (e.target === canvasRef.current || (e.target as HTMLElement).dataset.canvas) { + setSelectedNodeKey(null); + setSelectedEdgeKey(null); + setIsPanning(true); + setPanStart({ x: e.clientX - pan.x, y: e.clientY - pan.y }); + } + }; + + const handleCanvasMouseMove = (e: React.MouseEvent) => { + // Panning + if (isPanning) { + setPan({ x: e.clientX - panStart.x, y: e.clientY - panStart.y }); + } + + // Node dragging + if (dragging && canvasRef.current) { + const rect = canvasRef.current.getBoundingClientRect(); + const newX = (e.clientX - rect.left - pan.x) / zoom - dragging.offsetX; + const newY = (e.clientY - rect.top - pan.y) / zoom - dragging.offsetY; + setNodes((prev) => + prev.map((n) => + n.nodeKey === dragging.nodeKey + ? { ...n, posX: Math.round(newX), posY: Math.round(newY) } + : n + ) + ); + } + + // Edge drawing + if (edgeDrawing && canvasRef.current) { + const rect = canvasRef.current.getBoundingClientRect(); + setEdgeDrawing({ + ...edgeDrawing, + mouseX: (e.clientX - rect.left - pan.x) / zoom, + mouseY: (e.clientY - rect.top - pan.y) / zoom, + }); + } + }; + + const handleCanvasMouseUp = () => { + setIsPanning(false); + setDragging(null); + setEdgeDrawing(null); + }; + + const handleNodeDragStart = (nodeKey: string, e: React.MouseEvent) => { + if (canvasRef.current) { + const rect = canvasRef.current.getBoundingClientRect(); + const node = nodes.find((n) => n.nodeKey === nodeKey); + if (!node) return; + const offsetX = (e.clientX - rect.left - pan.x) / zoom - node.posX; + const offsetY = (e.clientY - rect.top - pan.y) / zoom - node.posY; + setDragging({ nodeKey, offsetX, offsetY }); + } + }; + + // Port connection + const handlePortMouseDown = (nodeKey: string, portType: "input" | "output", e: React.MouseEvent) => { + if (canvasRef.current) { + const rect = canvasRef.current.getBoundingClientRect(); + setEdgeDrawing({ + sourceKey: nodeKey, + sourcePortType: portType, + mouseX: (e.clientX - rect.left - pan.x) / zoom, + mouseY: (e.clientY - rect.top - pan.y) / zoom, + }); + } + }; + + // When mouse up on a port — create an edge + const handlePortMouseUp = useCallback((nodeKey: string, portType: "input" | "output") => { + if (!edgeDrawing) return; + // Ensure we're connecting output→input (or input→output) + if (edgeDrawing.sourceKey === nodeKey) return; + const sourceKey = edgeDrawing.sourcePortType === "output" ? edgeDrawing.sourceKey : nodeKey; + const targetKey = edgeDrawing.sourcePortType === "output" ? nodeKey : edgeDrawing.sourceKey; + + // Prevent duplicates + const exists = edges.some((e) => e.sourceNodeKey === sourceKey && e.targetNodeKey === targetKey); + if (exists) return; + + setEdges((prev) => [ + ...prev, + { + edgeKey: `edge_${nanoid(8)}`, + sourceNodeKey: sourceKey, + targetNodeKey: targetKey, + }, + ]); + setEdgeDrawing(null); + }, [edgeDrawing, edges]); + + // Override port mouse-down to also listen for mouse-up (connection target) + const handlePortInteraction = (nodeKey: string, portType: "input" | "output", e: React.MouseEvent) => { + if (edgeDrawing) { + handlePortMouseUp(nodeKey, portType); + } else { + handlePortMouseDown(nodeKey, portType, e); + } + }; + + // ─── Palette drop ───────────────────────────────────────────────────────── + + const handleDrop = (e: React.DragEvent) => { + e.preventDefault(); + const kind = e.dataTransfer.getData("nodeKind") as NodeKind; + if (!kind || !canvasRef.current) return; + + const rect = canvasRef.current.getBoundingClientRect(); + const posX = Math.round((e.clientX - rect.left - pan.x) / zoom - 110); + const posY = Math.round((e.clientY - rect.top - pan.y) / zoom - 30); + + const newNode: WFNodeData = { + nodeKey: `node_${nanoid(8)}`, + label: `New ${kind.charAt(0).toUpperCase() + kind.slice(1)}`, + kind, + posX, + posY, + }; + setNodes((prev) => [...prev, newNode]); + }; + + const handleDragOver = (e: React.DragEvent) => e.preventDefault(); + + // ─── Actions ────────────────────────────────────────────────────────────── + + const handleDeleteNode = (nodeKey: string) => { + setNodes((prev) => prev.filter((n) => n.nodeKey !== nodeKey)); + setEdges((prev) => prev.filter((e) => e.sourceNodeKey !== nodeKey && e.targetNodeKey !== nodeKey)); + setSelectedNodeKey(null); + }; + + const handleDeleteEdge = (edgeKey: string) => { + setEdges((prev) => prev.filter((e) => e.edgeKey !== edgeKey)); + setSelectedEdgeKey(null); + }; + + const handleSave = () => { + saveMutation.mutate({ + workflowId, + nodes: nodes.map((n) => ({ + nodeKey: n.nodeKey, + label: n.label, + kind: n.kind, + agentId: n.agentId ?? null, + containerConfig: n.containerConfig ?? {}, + conditionExpr: n.conditionExpr, + triggerConfig: n.triggerConfig ?? {}, + posX: n.posX, + posY: n.posY, + meta: n.meta ?? {}, + })), + edges: edges.map((e) => ({ + edgeKey: e.edgeKey, + sourceNodeKey: e.sourceNodeKey, + targetNodeKey: e.targetNodeKey, + sourceHandle: e.sourceHandle, + targetHandle: e.targetHandle, + label: e.label, + meta: e.meta ?? {}, + })), + canvasMeta: { zoom, viewportX: pan.x, viewportY: pan.y }, + }); + }; + + const handleExecute = () => { + executeMutation.mutate({ workflowId, input: "" }); + }; + + const handleNodeSave = (updated: WFNodeData) => { + setNodes((prev) => prev.map((n) => (n.nodeKey === updated.nodeKey ? updated : n))); + setEditingNode(null); + }; + + // ─── Edge rendering helpers ─────────────────────────────────────────────── + + const getNodeCenter = (nodeKey: string, portType: "top" | "bottom") => { + const node = nodes.find((n) => n.nodeKey === nodeKey); + if (!node) return { x: 0, y: 0 }; + return { + x: node.posX + 110, // half of 220px width + y: portType === "top" ? node.posY : node.posY + 80, // approximate height + }; + }; + + const buildEdgePath = (sx: number, sy: number, tx: number, ty: number) => { + const midY = (sy + ty) / 2; + return `M ${sx} ${sy} C ${sx} ${midY}, ${tx} ${midY}, ${tx} ${ty}`; + }; + + // ─── Keyboard shortcuts ─────────────────────────────────────────────────── + + useEffect(() => { + const handler = (e: KeyboardEvent) => { + if (e.key === "Delete" || e.key === "Backspace") { + if (selectedNodeKey) handleDeleteNode(selectedNodeKey); + if (selectedEdgeKey) handleDeleteEdge(selectedEdgeKey); + } + if (e.key === "s" && (e.ctrlKey || e.metaKey)) { + e.preventDefault(); + handleSave(); + } + }; + window.addEventListener("keydown", handler); + return () => window.removeEventListener("keydown", handler); + }, [selectedNodeKey, selectedEdgeKey, nodes, edges]); + + return ( + + {/* Toolbar */} + + + + Close + + + {workflowName} + + {nodes.length} nodes · {edges.length} edges + + + + setZoom((z) => Math.min(z + 0.1, 2))} className="h-7 w-7 p-0"> + + + {Math.round(zoom * 100)}% + setZoom((z) => Math.max(z - 0.1, 0.3))} className="h-7 w-7 p-0"> + + + { setZoom(1); setPan({ x: 0, y: 0 }); }} className="h-7 w-7 p-0"> + + + + + {saveMutation.isPending ? : } + Save + + + {executeMutation.isPending ? : } + Run + + + + + + {/* Sidebar palette */} + + Node Palette + {(["trigger", "agent", "container", "condition", "output"] as NodeKind[]).map((kind) => ( + e.dataTransfer.setData("nodeKind", kind)} + > + {}} /> + + ))} + + {/* Quick agent list */} + {agents.length > 0 && ( + <> + Available Agents + {agents.slice(0, 10).map((agent: any) => ( + { + e.dataTransfer.setData("nodeKind", "agent"); + e.dataTransfer.setData("agentId", String(agent.id)); + e.dataTransfer.setData("agentName", agent.name); + }} + > + + {agent.name} + + ))} + > + )} + + + {/* Canvas area */} + { + e.preventDefault(); + const kind = e.dataTransfer.getData("nodeKind") as NodeKind; + if (!kind || !canvasRef.current) return; + + const rect = canvasRef.current.getBoundingClientRect(); + const posX = Math.round((e.clientX - rect.left - pan.x) / zoom - 110); + const posY = Math.round((e.clientY - rect.top - pan.y) / zoom - 30); + + const agentIdStr = e.dataTransfer.getData("agentId"); + const agentName = e.dataTransfer.getData("agentName"); + + const newNode: WFNodeData = { + nodeKey: `node_${nanoid(8)}`, + label: agentName || `New ${kind.charAt(0).toUpperCase() + kind.slice(1)}`, + kind, + agentId: agentIdStr ? Number(agentIdStr) : undefined, + agentName: agentName || undefined, + posX, + posY, + }; + setNodes((prev) => [...prev, newNode]); + }} + onDragOver={handleDragOver} + > + {/* Grid pattern */} + + + + + + + + + + {/* Transform container */} + + {/* Edges SVG */} + + {edges.map((edge) => { + const src = getNodeCenter(edge.sourceNodeKey, "bottom"); + const tgt = getNodeCenter(edge.targetNodeKey, "top"); + const isSelected = selectedEdgeKey === edge.edgeKey; + return ( + + {/* Hit area (wider invisible stroke for clicking) */} + { e.stopPropagation(); setSelectedEdgeKey(edge.edgeKey); setSelectedNodeKey(null); }} + /> + + {/* Arrow marker */} + + + ); + })} + + {/* Edge being drawn */} + {edgeDrawing && (() => { + const srcNode = nodes.find((n) => n.nodeKey === edgeDrawing.sourceKey); + if (!srcNode) return null; + const sx = srcNode.posX + 110; + const sy = edgeDrawing.sourcePortType === "output" ? srcNode.posY + 80 : srcNode.posY; + return ( + + ); + })()} + + + {/* Nodes */} + {nodesWithStatus.map((node) => ( + { setSelectedNodeKey(node.nodeKey); setSelectedEdgeKey(null); }} + onDelete={() => handleDeleteNode(node.nodeKey)} + onEdit={() => setEditingNode(node)} + onDragStart={(e) => handleNodeDragStart(node.nodeKey, e)} + onPortMouseDown={handlePortInteraction} + /> + ))} + + + {/* Empty state */} + {nodes.length === 0 && ( + + + + + Drag nodes from the palette to start building your workflow + + + + )} + + + + {/* Node edit modal */} + {editingNode && ( + { if (!open) setEditingNode(null); }} + onSave={handleNodeSave} + /> + )} + + ); +} diff --git a/client/src/components/WorkflowCreateModal.tsx b/client/src/components/WorkflowCreateModal.tsx new file mode 100644 index 0000000..79b4fc4 --- /dev/null +++ b/client/src/components/WorkflowCreateModal.tsx @@ -0,0 +1,144 @@ +/** + * WorkflowCreateModal — create a new workflow (name + description + tags). + */ +import { useState } from "react"; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Textarea } from "@/components/ui/textarea"; +import { Badge } from "@/components/ui/badge"; +import { Plus, GitBranch, Loader2, X } from "lucide-react"; +import { trpc } from "@/lib/trpc"; +import { toast } from "sonner"; + +interface WorkflowCreateModalProps { + open: boolean; + onOpenChange: (open: boolean) => void; + onSuccess: (workflow: any) => void; +} + +export function WorkflowCreateModal({ open, onOpenChange, onSuccess }: WorkflowCreateModalProps) { + const [name, setName] = useState(""); + const [description, setDescription] = useState(""); + const [tagInput, setTagInput] = useState(""); + const [tags, setTags] = useState([]); + + const createMutation = trpc.workflows.create.useMutation({ + onSuccess: (wf) => { + toast.success(`Workflow "${wf?.name}" created`); + onSuccess(wf); + handleReset(); + onOpenChange(false); + }, + onError: (err) => { + toast.error(`Failed: ${err.message}`); + }, + }); + + const handleReset = () => { + setName(""); + setDescription(""); + setTags([]); + setTagInput(""); + }; + + const handleAddTag = () => { + const trimmed = tagInput.trim(); + if (trimmed && !tags.includes(trimmed)) { + setTags([...tags, trimmed]); + setTagInput(""); + } + }; + + const handleRemoveTag = (tag: string) => { + setTags(tags.filter((t) => t !== tag)); + }; + + const handleCreate = () => { + if (!name.trim()) { + toast.error("Workflow name is required"); + return; + } + createMutation.mutate({ name: name.trim(), description: description.trim() || undefined, tags }); + }; + + return ( + { if (!v) handleReset(); onOpenChange(v); }}> + + + + + New Workflow + + + + + + Name * + setName(e.target.value)} + placeholder="e.g. Content Pipeline" + className="mt-1" + autoFocus + /> + + + + Description + setDescription(e.target.value)} + placeholder="What does this workflow do?" + className="mt-1 min-h-[80px]" + /> + + + + Tags + + setTagInput(e.target.value)} + onKeyDown={(e) => { if (e.key === "Enter") { e.preventDefault(); handleAddTag(); } }} + placeholder="Add tag..." + className="flex-1" + /> + + + + + {tags.length > 0 && ( + + {tags.map((tag) => ( + + {tag} + handleRemoveTag(tag)} /> + + ))} + + )} + + + + + onOpenChange(false)}>Cancel + + {createMutation.isPending ? : } + Create Workflow + + + + + ); +} diff --git a/client/src/components/WorkflowDashboard.tsx b/client/src/components/WorkflowDashboard.tsx new file mode 100644 index 0000000..d5b4dde --- /dev/null +++ b/client/src/components/WorkflowDashboard.tsx @@ -0,0 +1,233 @@ +/** + * WorkflowDashboard — monitoring panel for a single workflow. + * Shows: stats overview, run history, per-node results, real-time polling. + */ +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { Progress } from "@/components/ui/progress"; +import { + Activity, + CheckCircle, + XCircle, + Clock, + Loader2, + Play, + RefreshCw, + Ban, + SkipForward, + BarChart2, + Zap, +} from "lucide-react"; +import { motion } from "framer-motion"; +import { trpc } from "@/lib/trpc"; +import { toast } from "sonner"; + +const STATUS_CONFIG: Record = { + pending: { color: "text-muted-foreground", bg: "bg-muted/15", icon: Clock }, + running: { color: "text-primary", bg: "bg-primary/15", icon: Loader2 }, + success: { color: "text-neon-green", bg: "bg-neon-green/15", icon: CheckCircle }, + failed: { color: "text-neon-red", bg: "bg-neon-red/15", icon: XCircle }, + cancelled: { color: "text-neon-amber", bg: "bg-neon-amber/15", icon: Ban }, + skipped: { color: "text-muted-foreground", bg: "bg-muted/15", icon: SkipForward }, +}; + +interface WorkflowDashboardProps { + workflowId: number; + workflowName: string; + onOpenCanvas: () => void; +} + +export default function WorkflowDashboard({ workflowId, workflowName, onOpenCanvas }: WorkflowDashboardProps) { + // Stats + const { data: stats, isLoading: statsLoading } = trpc.workflows.stats.useQuery( + { workflowId }, + { refetchInterval: 10_000 } + ); + + // Runs + const { data: runs = [], isLoading: runsLoading, refetch: refetchRuns } = trpc.workflows.listRuns.useQuery( + { workflowId, limit: 20 }, + { refetchInterval: 5_000 } + ); + + // Execute + const executeMutation = trpc.workflows.execute.useMutation({ + onSuccess: () => { + toast.success("Workflow run started"); + refetchRuns(); + }, + onError: (e) => toast.error(e.message), + }); + + // Cancel + const cancelMutation = trpc.workflows.cancelRun.useMutation({ + onSuccess: () => { + toast.success("Run cancelled"); + refetchRuns(); + }, + }); + + const formatDuration = (ms?: number | null) => { + if (!ms) return "—"; + if (ms < 1000) return `${ms}ms`; + return `${(ms / 1000).toFixed(1)}s`; + }; + + const formatTime = (d: any) => { + if (!d) return "—"; + return new Date(d).toLocaleTimeString("ru-RU", { hour: "2-digit", minute: "2-digit", second: "2-digit" }); + }; + + return ( + + {/* Header */} + + + {workflowName} + Workflow Dashboard · Real-time monitoring + + + + Open Canvas + + executeMutation.mutate({ workflowId })} + disabled={executeMutation.isPending} + className="bg-neon-green/15 text-neon-green border border-neon-green/30 hover:bg-neon-green/25" + > + {executeMutation.isPending ? : } + Run + + + + + {/* Stats cards */} + + + + + + + + + {/* Run history */} + + + + + Run History + + 5s + + + + + {runsLoading ? ( + + + Loading runs... + + ) : runs.length === 0 ? ( + + + No runs yet + + ) : ( + + {runs.map((run: any, i: number) => { + const sc = STATUS_CONFIG[run.status] ?? STATUS_CONFIG.pending; + const StatusIcon = sc.icon; + const nodeResults = (run.nodeResults ?? {}) as Record; + const nodeCount = Object.keys(nodeResults).length; + const successNodes = Object.values(nodeResults).filter((r: any) => r.status === "success").length; + + return ( + + + + + {run.runKey} + + {run.status.toUpperCase()} + + + + {run.status === "running" && ( + cancelMutation.mutate({ runKey: run.runKey })} + > + Cancel + + )} + {formatTime(run.createdAt)} + + + + {/* Node progress */} + {nodeCount > 0 && ( + + + Nodes: + {successNodes}/{nodeCount} + + 0 ? (successNodes / nodeCount) * 100 : 0} className="h-1.5" /> + + {Object.entries(nodeResults).map(([key, result]: [string, any]) => { + const nsc = STATUS_CONFIG[result.status] ?? STATUS_CONFIG.pending; + return ( + + {key.replace("node_", "").slice(0, 8)} + + ); + })} + + + )} + + {/* Duration & error */} + + {run.totalDurationMs && ( + Duration: {formatDuration(run.totalDurationMs)} + )} + {run.currentNodeKey && run.status === "running" && ( + Current: {run.currentNodeKey} + )} + + {run.errorMessage && ( + {run.errorMessage} + )} + + ); + })} + + )} + + + + ); +} + +function StatsCard({ label, value, color, icon: Icon }: { label: string; value: string; color: string; icon: any }) { + return ( + + + + + {label} + + {value} + + + ); +} diff --git a/client/src/components/WorkflowNodeBlock.tsx b/client/src/components/WorkflowNodeBlock.tsx new file mode 100644 index 0000000..3d63778 --- /dev/null +++ b/client/src/components/WorkflowNodeBlock.tsx @@ -0,0 +1,251 @@ +/** + * WorkflowNodeBlock — individual draggable node block inside the canvas. + * Rendered as a card with kind-specific icon/color, label, and connection ports. + * The runtime status overlay (running/success/failed) is shown during execution. + */ +import { + Bot, + Box, + Play, + GitFork, + Flag, + GripVertical, + Trash2, + Settings, + Loader2, + CheckCircle, + XCircle, + SkipForward, +} from "lucide-react"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { motion } from "framer-motion"; + +export type NodeKind = "agent" | "container" | "trigger" | "condition" | "output"; + +export interface WFNodeData { + nodeKey: string; + label: string; + kind: NodeKind; + agentId?: number | null; + agentName?: string; + containerConfig?: Record; + conditionExpr?: string; + triggerConfig?: Record; + posX: number; + posY: number; + meta?: Record; + /** Runtime status — set during execution */ + runStatus?: "pending" | "running" | "success" | "failed" | "skipped"; + runDurationMs?: number; + runError?: string; +} + +const KIND_CONFIG: Record = { + trigger: { icon: Play, color: "text-neon-green", bg: "bg-neon-green/10", border: "border-neon-green/40", label: "Trigger" }, + agent: { icon: Bot, color: "text-primary", bg: "bg-primary/10", border: "border-primary/40", label: "Agent" }, + container: { icon: Box, color: "text-neon-amber", bg: "bg-neon-amber/10", border: "border-neon-amber/40", label: "Container" }, + condition: { icon: GitFork, color: "text-purple-400", bg: "bg-purple-400/10", border: "border-purple-400/40", label: "Condition" }, + output: { icon: Flag, color: "text-cyan-400", bg: "bg-cyan-400/10", border: "border-cyan-400/40", label: "Output" }, +}; + +const STATUS_OVERLAY: Record = { + running: { icon: Loader2, color: "text-primary", animate: true }, + success: { icon: CheckCircle, color: "text-neon-green" }, + failed: { icon: XCircle, color: "text-neon-red" }, + skipped: { icon: SkipForward, color: "text-muted-foreground" }, +}; + +interface WorkflowNodeBlockProps { + node: WFNodeData; + selected?: boolean; + onSelect?: () => void; + onDelete?: () => void; + onEdit?: () => void; + onDragStart?: (e: React.MouseEvent) => void; + showPorts?: boolean; + /** Connection port mouse-down handlers */ + onPortMouseDown?: (nodeKey: string, portType: "input" | "output", e: React.MouseEvent) => void; +} + +export function WorkflowNodeBlock({ + node, + selected, + onSelect, + onDelete, + onEdit, + onDragStart, + showPorts = true, + onPortMouseDown, +}: WorkflowNodeBlockProps) { + const cfg = KIND_CONFIG[node.kind]; + const Icon = cfg.icon; + const statusOverlay = node.runStatus ? STATUS_OVERLAY[node.runStatus] : null; + const StatusIcon = statusOverlay?.icon; + + return ( + { e.stopPropagation(); onSelect?.(); }} + onMouseDown={onDragStart} + > + {/* Input port */} + {showPorts && node.kind !== "trigger" && ( + { e.stopPropagation(); onPortMouseDown?.(node.nodeKey, "input", e); }} + title="Input" + /> + )} + + {/* Header */} + + + + + + + {node.label} + {cfg.label} + + {/* Status overlay */} + {statusOverlay && StatusIcon && ( + + )} + + + {/* Body */} + + {node.kind === "agent" && ( + + {node.agentName ? ( + Agent: {node.agentName} + ) : node.agentId ? ( + Agent ID: #{node.agentId} + ) : ( + No agent assigned + )} + + )} + {node.kind === "container" && ( + + {node.containerConfig?.image ? ( + Image: {node.containerConfig.image as string} + ) : ( + No image configured + )} + + )} + {node.kind === "condition" && ( + + {node.conditionExpr ? ( + {node.conditionExpr} + ) : ( + No condition set + )} + + )} + {node.kind === "trigger" && ( + + {node.triggerConfig?.type === "cron" ? ( + Cron: {node.triggerConfig.cron as string} + ) : node.triggerConfig?.type === "webhook" ? ( + Webhook: {node.triggerConfig.webhookPath as string} + ) : ( + Manual start + )} + + )} + {node.kind === "output" && ( + + Final output + + )} + + {/* Runtime info */} + {node.runDurationMs !== undefined && node.runStatus !== "pending" && node.runStatus !== "running" && ( + + Duration: {node.runDurationMs}ms + + )} + {node.runError && ( + + {node.runError} + + )} + + + {/* Actions (visible when selected) */} + {selected && ( + + { e.stopPropagation(); onEdit?.(); }} + > + + + { e.stopPropagation(); onDelete?.(); }} + > + + + + )} + + {/* Output port */} + {showPorts && node.kind !== "output" && ( + { e.stopPropagation(); onPortMouseDown?.(node.nodeKey, "output", e); }} + title="Output" + /> + )} + + ); +} + +/** + * Mini node block for the sidebar palette (drag source). + */ +export function WorkflowNodePaletteItem({ + kind, + onDragStart, +}: { + kind: NodeKind; + onDragStart: (kind: NodeKind) => void; +}) { + const cfg = KIND_CONFIG[kind]; + const Icon = cfg.icon; + + return ( + onDragStart(kind)} + > + + + + {cfg.label} + + ); +} diff --git a/client/src/components/WorkflowNodeEditModal.tsx b/client/src/components/WorkflowNodeEditModal.tsx new file mode 100644 index 0000000..eacf714 --- /dev/null +++ b/client/src/components/WorkflowNodeEditModal.tsx @@ -0,0 +1,253 @@ +/** + * WorkflowNodeEditModal — configure individual node properties. + * Agent nodes get a selector for existing agents. + * Container nodes get image/env/ports fields. + * Condition nodes get an expression editor. + * Trigger nodes get type/cron/webhook fields. + */ +import { useState, useEffect } from "react"; +import { + Dialog, + DialogContent, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Textarea } from "@/components/ui/textarea"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Badge } from "@/components/ui/badge"; +import { Bot, Box, Play, GitFork, Flag, Save } from "lucide-react"; +import type { WFNodeData, NodeKind } from "./WorkflowNodeBlock"; + +interface WorkflowNodeEditModalProps { + node: WFNodeData; + agents: any[]; + open: boolean; + onOpenChange: (open: boolean) => void; + onSave: (node: WFNodeData) => void; +} + +export function WorkflowNodeEditModal({ + node, + agents, + open, + onOpenChange, + onSave, +}: WorkflowNodeEditModalProps) { + const [label, setLabel] = useState(node.label); + const [agentId, setAgentId] = useState(node.agentId ? String(node.agentId) : ""); + const [dockerImage, setDockerImage] = useState((node.containerConfig?.image as string) ?? ""); + const [dockerEnv, setDockerEnv] = useState((node.containerConfig?.env as string[] ?? []).join("\n")); + const [dockerCommand, setDockerCommand] = useState((node.containerConfig?.command as string) ?? ""); + const [conditionExpr, setConditionExpr] = useState(node.conditionExpr ?? ""); + const [triggerType, setTriggerType] = useState((node.triggerConfig?.type as string) ?? "manual"); + const [cronExpr, setCronExpr] = useState((node.triggerConfig?.cron as string) ?? ""); + const [webhookPath, setWebhookPath] = useState((node.triggerConfig?.webhookPath as string) ?? ""); + + useEffect(() => { + setLabel(node.label); + setAgentId(node.agentId ? String(node.agentId) : ""); + setDockerImage((node.containerConfig?.image as string) ?? ""); + setDockerEnv((node.containerConfig?.env as string[] ?? []).join("\n")); + setDockerCommand((node.containerConfig?.command as string) ?? ""); + setConditionExpr(node.conditionExpr ?? ""); + setTriggerType((node.triggerConfig?.type as string) ?? "manual"); + setCronExpr((node.triggerConfig?.cron as string) ?? ""); + setWebhookPath((node.triggerConfig?.webhookPath as string) ?? ""); + }, [node]); + + const handleSave = () => { + const selectedAgent = agents.find((a: any) => a.id === Number(agentId)); + const updated: WFNodeData = { + ...node, + label, + agentId: agentId ? Number(agentId) : undefined, + agentName: selectedAgent?.name, + containerConfig: { + image: dockerImage, + env: dockerEnv.split("\n").filter(Boolean), + command: dockerCommand, + }, + conditionExpr, + triggerConfig: { + type: triggerType, + cron: cronExpr, + webhookPath, + }, + }; + onSave(updated); + }; + + const kindIcons: Record = { + trigger: Play, + agent: Bot, + container: Box, + condition: GitFork, + output: Flag, + }; + const KindIcon = kindIcons[node.kind]; + + return ( + + + + + + Edit {node.kind.charAt(0).toUpperCase() + node.kind.slice(1)} Node + + + + + {/* Label */} + + Label + setLabel(e.target.value)} + placeholder="Node name" + className="mt-1" + /> + + + {/* Agent config */} + {node.kind === "agent" && ( + + Agent + + + + + + {agents.map((agent: any) => ( + + + {agent.name} + {agent.role} + + + ))} + + + {agentId && (() => { + const a = agents.find((ag: any) => ag.id === Number(agentId)); + if (!a) return null; + return ( + + Model: {a.model} + Provider: {a.provider} + {a.description && {a.description}} + + ); + })()} + + )} + + {/* Container config */} + {node.kind === "container" && ( + <> + + Docker Image + setDockerImage(e.target.value)} + placeholder="e.g. python:3.12-slim" + className="mt-1 font-mono text-xs" + /> + + + Command + setDockerCommand(e.target.value)} + placeholder="e.g. python /app/main.py" + className="mt-1 font-mono text-xs" + /> + + + Environment Variables (one per line) + setDockerEnv(e.target.value)} + placeholder="KEY=VALUE" + className="mt-1 font-mono text-xs min-h-[60px]" + /> + + > + )} + + {/* Condition config */} + {node.kind === "condition" && ( + + Condition Expression + setConditionExpr(e.target.value)} + placeholder="e.g. input.length > 0" + className="mt-1 font-mono text-xs min-h-[60px]" + /> + + Evaluates to true/false. If false, downstream nodes are skipped. + + + )} + + {/* Trigger config */} + {node.kind === "trigger" && ( + <> + + Trigger Type + + + + + + Manual + Cron Schedule + Webhook + + + + {triggerType === "cron" && ( + + Cron Expression + setCronExpr(e.target.value)} + placeholder="*/5 * * * *" + className="mt-1 font-mono text-xs" + /> + + )} + {triggerType === "webhook" && ( + + Webhook Path + setWebhookPath(e.target.value)} + placeholder="/webhook/my-trigger" + className="mt-1 font-mono text-xs" + /> + + )} + > + )} + + + + onOpenChange(false)}>Cancel + + Save + + + + + ); +} diff --git a/client/src/pages/Nodes.tsx b/client/src/pages/Nodes.tsx index a01c196..67d1bb5 100644 --- a/client/src/pages/Nodes.tsx +++ b/client/src/pages/Nodes.tsx @@ -52,6 +52,7 @@ function AddNodeDialog({ onClose, onSuccess }: { onClose: () => void; onSuccess: const joinMut = trpc.nodes.joinNode.useMutation({ onSuccess: (data) => { setJoinResult(data as JoinResult); + // Trigger parent refresh but DO NOT close dialog — let user read the result if ((data as JoinResult).ok) onSuccess(); }, onError: (e) => setJoinResult({ ok: false, error: e.message, step: "trpc" }), @@ -60,6 +61,8 @@ function AddNodeDialog({ onClose, onSuccess }: { onClose: () => void; onSuccess: const busy = testMut.isPending || joinMut.isPending; const canAct = !!host.trim() && !!user.trim() && !!password && !busy; const joinDone = joinResult?.ok === true; + // After a successful join, allow re-testing but not re-joining + const canJoin = canAct && !joinDone; const handleTest = () => { setTestResult(null); @@ -73,6 +76,7 @@ function AddNodeDialog({ onClose, onSuccess }: { onClose: () => void; onSuccess: }; const inputClass = "h-8 text-xs font-mono"; + // Disable input fields while busy or after successful join const disabled = busy || joinDone; return ( @@ -262,7 +266,7 @@ function AddNodeDialog({ onClose, onSuccess }: { onClose: () => void; onSuccess: {/* Join swarm */} {joinMut.isPending @@ -1204,7 +1208,7 @@ export default function Nodes() { {showAddNode && ( setShowAddNode(false)} - onSuccess={() => { setShowAddNode(false); nodesQ.refetch(); swarmInfoQ.refetch(); }} + onSuccess={() => { nodesQ.refetch(); swarmInfoQ.refetch(); }} /> )} diff --git a/client/src/pages/Workflows.tsx b/client/src/pages/Workflows.tsx new file mode 100644 index 0000000..91406fc --- /dev/null +++ b/client/src/pages/Workflows.tsx @@ -0,0 +1,410 @@ +/** + * Workflows — Main page: list view, canvas constructor, and dashboard. + * + * Views: + * 1. List view — all workflows with status, stats, quick actions + * 2. Canvas view — visual drag-and-drop constructor (full screen) + * 3. Dashboard view — run monitoring for a selected workflow + * + * Design: Mission Control theme — dark bg, cyan glow, mono fonts. + */ +import { useState, useEffect } from "react"; +import { useRoute, useLocation } from "wouter"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { + AlertDialog, + AlertDialogAction, + AlertDialogCancel, + AlertDialogContent, + AlertDialogDescription, + AlertDialogHeader, + AlertDialogTitle, +} from "@/components/ui/alert-dialog"; +import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +import { + GitBranch, + Plus, + Play, + Pause, + Trash2, + Settings, + Loader2, + AlertCircle, + Activity, + Clock, + CheckCircle, + XCircle, + Eye, + Pencil, + Archive, + Zap, + BarChart2, +} from "lucide-react"; +import { motion } from "framer-motion"; +import { toast } from "sonner"; +import { trpc } from "@/lib/trpc"; +import { WorkflowCreateModal } from "@/components/WorkflowCreateModal"; +import WorkflowCanvas from "@/components/WorkflowCanvas"; +import WorkflowDashboard from "@/components/WorkflowDashboard"; +import type { WFNodeData } from "@/components/WorkflowNodeBlock"; +import type { WFEdgeData } from "@/components/WorkflowCanvas"; + +const STATUS_STYLE: Record = { + draft: { badge: "bg-muted/15 text-muted-foreground border-border", dot: "bg-muted-foreground" }, + active: { badge: "bg-neon-green/15 text-neon-green border-neon-green/30", dot: "bg-neon-green pulse-indicator" }, + paused: { badge: "bg-neon-amber/15 text-neon-amber border-neon-amber/30", dot: "bg-neon-amber" }, + archived: { badge: "bg-muted/15 text-muted-foreground border-border", dot: "bg-muted-foreground" }, +}; + +type ViewMode = "list" | "canvas" | "dashboard"; + +export default function Workflows() { + const [, params] = useRoute("/workflows/:id"); + const [, navigate] = useLocation(); + + const [viewMode, setViewMode] = useState("list"); + const [selectedWorkflowId, setSelectedWorkflowId] = useState( + params?.id ? Number(params.id) : null + ); + const [createModalOpen, setCreateModalOpen] = useState(false); + const [deleteConfirmOpen, setDeleteConfirmOpen] = useState(false); + const [workflowToDelete, setWorkflowToDelete] = useState(null); + + // If URL has /workflows/:id, load that workflow + useEffect(() => { + if (params?.id) { + setSelectedWorkflowId(Number(params.id)); + setViewMode("dashboard"); + } + }, [params?.id]); + + // List all workflows + const { data: workflows = [], isLoading, refetch } = trpc.workflows.list.useQuery(undefined, { + refetchInterval: 30_000, + }); + + // Get single workflow (for canvas view) + const { data: selectedWorkflow } = trpc.workflows.get.useQuery( + { id: selectedWorkflowId! }, + { enabled: !!selectedWorkflowId && (viewMode === "canvas" || viewMode === "dashboard") } + ); + + // Get latest run for polling node statuses + const { data: latestRuns } = trpc.workflows.listRuns.useQuery( + { workflowId: selectedWorkflowId!, limit: 1 }, + { enabled: !!selectedWorkflowId && viewMode === "canvas", refetchInterval: 3_000 } + ); + + // Mutations + const deleteMutation = trpc.workflows.delete.useMutation({ + onSuccess: () => { + toast.success("Workflow deleted"); + setDeleteConfirmOpen(false); + setWorkflowToDelete(null); + refetch(); + }, + onError: (e) => toast.error(e.message), + }); + + const updateMutation = trpc.workflows.update.useMutation({ + onSuccess: () => { + toast.success("Workflow updated"); + refetch(); + }, + }); + + const handleOpenCanvas = (id: number) => { + setSelectedWorkflowId(id); + setViewMode("canvas"); + }; + + const handleOpenDashboard = (id: number) => { + setSelectedWorkflowId(id); + setViewMode("dashboard"); + navigate(`/workflows/${id}`); + }; + + const handleBackToList = () => { + setViewMode("list"); + setSelectedWorkflowId(null); + navigate("/workflows"); + }; + + const handleToggleStatus = (id: number, currentStatus: string) => { + const newStatus = currentStatus === "active" ? "paused" : "active"; + updateMutation.mutate({ id, status: newStatus as any }); + }; + + // Build canvas data from server response + const canvasNodes: WFNodeData[] = (selectedWorkflow?.nodes ?? []).map((n: any) => ({ + nodeKey: n.nodeKey, + label: n.label, + kind: n.kind, + agentId: n.agentId, + containerConfig: n.containerConfig, + conditionExpr: n.conditionExpr, + triggerConfig: n.triggerConfig, + posX: n.posX ?? 0, + posY: n.posY ?? 0, + meta: n.meta, + })); + + const canvasEdges: WFEdgeData[] = (selectedWorkflow?.edges ?? []).map((e: any) => ({ + edgeKey: e.edgeKey, + sourceNodeKey: e.sourceNodeKey, + targetNodeKey: e.targetNodeKey, + sourceHandle: e.sourceHandle, + targetHandle: e.targetHandle, + label: e.label, + meta: e.meta, + })); + + // Run results for canvas overlay + const latestRun = latestRuns?.[0]; + const runResults = latestRun?.status === "running" || latestRun?.status === "success" || latestRun?.status === "failed" + ? (latestRun.nodeResults as any) ?? {} + : undefined; + + // ─── Canvas View ────────────────────────────────────────────────────────── + if (viewMode === "canvas" && selectedWorkflowId && selectedWorkflow) { + return ( + + + + ); + } + + // ─── Dashboard View ─────────────────────────────────────────────────────── + if (viewMode === "dashboard" && selectedWorkflowId && selectedWorkflow) { + return ( + + + ← Back to Workflows + + + + + + Dashboard + + + Canvas + + + + setViewMode("canvas")} + /> + + + + {}} + /> + + + + + ); + } + + // ─── List View ──────────────────────────────────────────────────────────── + return ( + + {/* Header */} + + + Workflows + + {workflows.length} workflows · Visual pipeline constructor + + + setCreateModalOpen(true)} + > + + New Workflow + + + + {/* Loading */} + {isLoading ? ( + + + + ) : workflows.length === 0 ? ( + /* Empty state */ + + + + No Workflows Yet + + Create your first workflow to build visual agent pipelines. + + setCreateModalOpen(true)} + className="bg-primary/15 text-primary border border-primary/30 hover:bg-primary/25" + > + + Create First Workflow + + + + ) : ( + /* Workflow grid */ + + {workflows.map((wf: any, i: number) => { + const ss = STATUS_STYLE[wf.status] ?? STATUS_STYLE.draft; + return ( + + handleOpenDashboard(wf.id)} + > + + {/* Top row */} + + + + + + + + {wf.name} + + {wf.description && ( + {wf.description} + )} + + + + + {wf.status.toUpperCase()} + + + + {/* Tags */} + {wf.tags && wf.tags.length > 0 && ( + + {(wf.tags as string[]).map((tag: string) => ( + + {tag} + + ))} + + )} + + {/* Dates */} + + + + Created: {new Date(wf.createdAt).toLocaleDateString()} + + + + Updated: {new Date(wf.updatedAt).toLocaleDateString()} + + + + {/* Actions */} + + { e.stopPropagation(); handleOpenCanvas(wf.id); }} + > + Edit + + { e.stopPropagation(); handleOpenDashboard(wf.id); }} + > + Monitor + + { e.stopPropagation(); handleToggleStatus(wf.id, wf.status); }} + > + {wf.status === "active" ? : } + {wf.status === "active" ? "Pause" : "Activate"} + + { e.stopPropagation(); setWorkflowToDelete(wf.id); setDeleteConfirmOpen(true); }} + > + + + + + + + ); + })} + + )} + + {/* Create modal */} + refetch()} + /> + + {/* Delete confirmation */} + + + + Delete Workflow + + This will permanently delete the workflow, all nodes, edges, and run history. This action cannot be undone. + + + + Cancel + workflowToDelete && deleteMutation.mutate({ id: workflowToDelete })} + disabled={deleteMutation.isPending} + className="bg-neon-red hover:bg-neon-red/90" + > + {deleteMutation.isPending ? : null} + Delete + + + + + + ); +} diff --git a/drizzle/0007_workflows.sql b/drizzle/0007_workflows.sql new file mode 100644 index 0000000..f8e29e1 --- /dev/null +++ b/drizzle/0007_workflows.sql @@ -0,0 +1,77 @@ +-- Workflows: pipeline definitions +CREATE TABLE `workflows` ( + `id` int AUTO_INCREMENT NOT NULL, + `name` varchar(255) NOT NULL, + `description` text, + `status` enum('draft','active','paused','archived') NOT NULL DEFAULT 'draft', + `canvasMeta` json DEFAULT ('{}'), + `tags` json DEFAULT ('[]'), + `createdBy` int, + `createdAt` timestamp NOT NULL DEFAULT (now()), + `updatedAt` timestamp NOT NULL DEFAULT (now()) ON UPDATE CURRENT_TIMESTAMP, + CONSTRAINT `workflows_id` PRIMARY KEY(`id`) +); +--> statement-breakpoint + +-- Workflow Nodes: blocks inside a workflow (agent / container / trigger / condition / output) +CREATE TABLE `workflowNodes` ( + `id` int AUTO_INCREMENT NOT NULL, + `workflowId` int NOT NULL, + `nodeKey` varchar(64) NOT NULL, + `label` varchar(255) NOT NULL, + `kind` enum('agent','container','trigger','condition','output') NOT NULL, + `agentId` int, + `containerConfig` json DEFAULT ('{}'), + `conditionExpr` text, + `triggerConfig` json DEFAULT ('{}'), + `posX` int DEFAULT 0, + `posY` int DEFAULT 0, + `meta` json DEFAULT ('{}'), + `createdAt` timestamp NOT NULL DEFAULT (now()), + `updatedAt` timestamp NOT NULL DEFAULT (now()) ON UPDATE CURRENT_TIMESTAMP, + CONSTRAINT `workflowNodes_id` PRIMARY KEY(`id`) +); +--> statement-breakpoint +CREATE INDEX `workflowNodes_workflowId_idx` ON `workflowNodes` (`workflowId`); +--> statement-breakpoint + +-- Workflow Edges: connections between nodes +CREATE TABLE `workflowEdges` ( + `id` int AUTO_INCREMENT NOT NULL, + `workflowId` int NOT NULL, + `edgeKey` varchar(64) NOT NULL, + `sourceNodeKey` varchar(64) NOT NULL, + `targetNodeKey` varchar(64) NOT NULL, + `sourceHandle` varchar(64), + `targetHandle` varchar(64), + `label` varchar(128), + `meta` json DEFAULT ('{}'), + `createdAt` timestamp NOT NULL DEFAULT (now()), + CONSTRAINT `workflowEdges_id` PRIMARY KEY(`id`) +); +--> statement-breakpoint +CREATE INDEX `workflowEdges_workflowId_idx` ON `workflowEdges` (`workflowId`); +--> statement-breakpoint + +-- Workflow Runs: execution history with per-node results +CREATE TABLE `workflowRuns` ( + `id` int AUTO_INCREMENT NOT NULL, + `workflowId` int NOT NULL, + `runKey` varchar(64) NOT NULL, + `status` enum('pending','running','success','failed','cancelled') NOT NULL DEFAULT 'pending', + `nodeResults` json DEFAULT ('{}'), + `currentNodeKey` varchar(64), + `input` text, + `output` text, + `totalDurationMs` int, + `errorMessage` text, + `startedAt` timestamp, + `finishedAt` timestamp, + `createdAt` timestamp NOT NULL DEFAULT (now()), + CONSTRAINT `workflowRuns_id` PRIMARY KEY(`id`), + CONSTRAINT `workflowRuns_runKey_unique` UNIQUE(`runKey`) +); +--> statement-breakpoint +CREATE INDEX `workflowRuns_workflowId_idx` ON `workflowRuns` (`workflowId`); +--> statement-breakpoint +CREATE INDEX `workflowRuns_status_idx` ON `workflowRuns` (`status`); diff --git a/drizzle/schema.ts b/drizzle/schema.ts index 397a1cd..f81309b 100644 --- a/drizzle/schema.ts +++ b/drizzle/schema.ts @@ -274,3 +274,128 @@ export const chatEvents = mysqlTable("chatEvents", { export type ChatEvent = typeof chatEvents.$inferSelect; export type InsertChatEvent = typeof chatEvents.$inferInsert; + +// ─── Workflows ──────────────────────────────────────────────────────────────── + +/** + * Workflows — visual pipeline definitions composed of agent/container nodes. + * Each workflow is a directed graph stored as nodes + edges. + */ +export const workflows = mysqlTable("workflows", { + id: int("id").autoincrement().primaryKey(), + name: varchar("name", { length: 255 }).notNull(), + description: text("description"), + /** Visual status used in the list/dashboard */ + status: mysqlEnum("status", ["draft", "active", "paused", "archived"]).default("draft").notNull(), + /** JSON blob of canvas-level metadata: viewport position, zoom, layout hints */ + canvasMeta: json("canvasMeta").$type<{ viewportX?: number; viewportY?: number; zoom?: number }>().default({}), + tags: json("tags").$type().default([]), + createdBy: int("createdBy"), + createdAt: timestamp("createdAt").defaultNow().notNull(), + updatedAt: timestamp("updatedAt").defaultNow().onUpdateNow().notNull(), +}); + +export type Workflow = typeof workflows.$inferSelect; +export type InsertWorkflow = typeof workflows.$inferInsert; + +/** + * Workflow Nodes — individual blocks inside a workflow. + * Each node references either an agent (agentId) or an arbitrary container config. + */ +export const workflowNodes = mysqlTable("workflowNodes", { + id: int("id").autoincrement().primaryKey(), + workflowId: int("workflowId").notNull(), + /** Unique client-side ID used by the canvas (e.g. "node_abc123") */ + nodeKey: varchar("nodeKey", { length: 64 }).notNull(), + label: varchar("label", { length: 255 }).notNull(), + /** Node kind: agent = uses an existing agent; container = custom Docker image; trigger = entry point; output = terminal */ + kind: mysqlEnum("kind", ["agent", "container", "trigger", "condition", "output"]).notNull(), + /** Link to agents table (nullable — only for kind=agent) */ + agentId: int("agentId"), + /** For kind=container: Docker image, env vars, ports etc. */ + containerConfig: json("containerConfig").$type<{ + image?: string; + env?: string[]; + ports?: string[]; + command?: string; + volumes?: string[]; + }>().default({}), + /** For kind=condition: JS expression evaluated at runtime */ + conditionExpr: text("conditionExpr"), + /** Trigger config: cron, webhook, manual */ + triggerConfig: json("triggerConfig").$type<{ type?: string; cron?: string; webhookPath?: string }>().default({}), + /** Canvas position */ + posX: int("posX").default(0), + posY: int("posY").default(0), + /** Extra metadata (colour, icon override, etc.) */ + meta: json("meta").$type>().default({}), + createdAt: timestamp("createdAt").defaultNow().notNull(), + updatedAt: timestamp("updatedAt").defaultNow().onUpdateNow().notNull(), +}, (table) => ({ + workflowIdIdx: index("workflowNodes_workflowId_idx").on(table.workflowId), +})); + +export type WorkflowNode = typeof workflowNodes.$inferSelect; +export type InsertWorkflowNode = typeof workflowNodes.$inferInsert; + +/** + * Workflow Edges — connections between nodes. + */ +export const workflowEdges = mysqlTable("workflowEdges", { + id: int("id").autoincrement().primaryKey(), + workflowId: int("workflowId").notNull(), + /** Edge identifier on the canvas */ + edgeKey: varchar("edgeKey", { length: 64 }).notNull(), + sourceNodeKey: varchar("sourceNodeKey", { length: 64 }).notNull(), + targetNodeKey: varchar("targetNodeKey", { length: 64 }).notNull(), + /** Optional: which output handle → which input handle */ + sourceHandle: varchar("sourceHandle", { length: 64 }), + targetHandle: varchar("targetHandle", { length: 64 }), + /** Edge label (e.g. "on success", "on fail") */ + label: varchar("label", { length: 128 }), + /** Visual styling */ + meta: json("meta").$type>().default({}), + createdAt: timestamp("createdAt").defaultNow().notNull(), +}, (table) => ({ + workflowIdIdx: index("workflowEdges_workflowId_idx").on(table.workflowId), +})); + +export type WorkflowEdge = typeof workflowEdges.$inferSelect; +export type InsertWorkflowEdge = typeof workflowEdges.$inferInsert; + +/** + * Workflow Runs — execution history. Each run tracks overall status and + * per-node results so the dashboard can show progress in real-time. + */ +export const workflowRuns = mysqlTable("workflowRuns", { + id: int("id").autoincrement().primaryKey(), + workflowId: int("workflowId").notNull(), + runKey: varchar("runKey", { length: 64 }).notNull().unique(), + status: mysqlEnum("status", ["pending", "running", "success", "failed", "cancelled"]).default("pending").notNull(), + /** Per-node execution results: { [nodeKey]: { status, output, durationMs, error? } } */ + nodeResults: json("nodeResults").$type>().default({}), + /** The node currently being executed */ + currentNodeKey: varchar("currentNodeKey", { length: 64 }), + /** Global input passed to the first node */ + input: text("input"), + /** Final aggregated output */ + output: text("output"), + totalDurationMs: int("totalDurationMs"), + errorMessage: text("errorMessage"), + startedAt: timestamp("startedAt"), + finishedAt: timestamp("finishedAt"), + createdAt: timestamp("createdAt").defaultNow().notNull(), +}, (table) => ({ + workflowIdIdx: index("workflowRuns_workflowId_idx").on(table.workflowId), + statusIdx: index("workflowRuns_status_idx").on(table.status), +})); + +export type WorkflowRun = typeof workflowRuns.$inferSelect; +export type InsertWorkflowRun = typeof workflowRuns.$inferInsert; diff --git a/gateway/internal/api/handlers.go b/gateway/internal/api/handlers.go index 3fa6755..7f4de08 100644 --- a/gateway/internal/api/handlers.go +++ b/gateway/internal/api/handlers.go @@ -1510,6 +1510,21 @@ func (h *Handler) SwarmJoinNodeViaSSH(w http.ResponseWriter, r *http.Request) { } log.Printf("[SwarmJoinNode] Success: %s joined as %s", body.Host, body.Role) + + // Give Docker Swarm ~3 seconds to propagate the new node, then sync to DB. + go func() { + time.Sleep(3 * time.Second) + nodes, err := h.docker.ListNodes() + if err != nil { + log.Printf("[SwarmJoinNode] DB sync failed (ListNodes): %v", err) + return + } + if h.db != nil { + h.db.UpsertSwarmNodes(nodes) + log.Printf("[SwarmJoinNode] DB synced: %d nodes after join", len(nodes)) + } + }() + respond(w, http.StatusOK, map[string]any{ "ok": true, "output": output, @@ -1575,9 +1590,13 @@ func (h *Handler) SwarmSSHTest(w http.ResponseWriter, r *http.Request) { } defer sess.Close() - out, _ := sess.CombinedOutput("docker version --format '{{.Server.Version}}' 2>/dev/null || echo 'docker_not_found'") + // Use plain 'docker info' to get server version — works on all distros + out, _ := sess.CombinedOutput("docker info --format '{{.ServerVersion}}' 2>/dev/null || docker version --format '{{.Server.Version}}' 2>/dev/null || echo 'docker_not_found'") dockerVer := strings.TrimSpace(string(out)) - dockerOk := dockerVer != "" && dockerVer != "docker_not_found" + if dockerVer == "" { + dockerVer = "docker_not_found" + } + dockerOk := dockerVer != "docker_not_found" && !strings.Contains(dockerVer, "not found") && !strings.Contains(dockerVer, "command not found") log.Printf("[SSHTest] %s — SSH OK, docker: %s", addr, dockerVer) respond(w, http.StatusOK, map[string]any{ diff --git a/server/routers.ts b/server/routers.ts index d324d2e..77f091d 100644 --- a/server/routers.ts +++ b/server/routers.ts @@ -1100,5 +1100,143 @@ export const appRouter = router({ return result; }), }), + /** + * Workflows — visual pipeline builder (CRUD + execution) + */ + workflows: router({ + /** List all workflows */ + list: publicProcedure.query(async () => { + const { getAllWorkflows } = await import("./workflows"); + return getAllWorkflows(); + }), + + /** Get a single workflow with its nodes and edges */ + get: publicProcedure + .input(z.object({ id: z.number() })) + .query(async ({ input }) => { + const { getWorkflowById } = await import("./workflows"); + return getWorkflowById(input.id); + }), + + /** Create a new workflow */ + create: publicProcedure + .input(z.object({ + name: z.string().min(1), + description: z.string().optional(), + tags: z.array(z.string()).default([]), + })) + .mutation(async ({ input }) => { + const { createWorkflow } = await import("./workflows"); + return createWorkflow({ ...input, status: "draft" }); + }), + + /** Update workflow metadata */ + update: publicProcedure + .input(z.object({ + id: z.number(), + name: z.string().optional(), + description: z.string().optional(), + status: z.enum(["draft", "active", "paused", "archived"]).optional(), + tags: z.array(z.string()).optional(), + })) + .mutation(async ({ input }) => { + const { updateWorkflow } = await import("./workflows"); + const { id, ...data } = input; + return updateWorkflow(id, data as any); + }), + + /** Delete a workflow and all its nodes/edges/runs */ + delete: publicProcedure + .input(z.object({ id: z.number() })) + .mutation(async ({ input }) => { + const { deleteWorkflow } = await import("./workflows"); + return deleteWorkflow(input.id); + }), + + /** Save the full canvas (nodes + edges) atomically */ + saveCanvas: publicProcedure + .input(z.object({ + workflowId: z.number(), + nodes: z.array(z.object({ + nodeKey: z.string(), + label: z.string(), + kind: z.enum(["agent", "container", "trigger", "condition", "output"]), + agentId: z.number().nullable().optional(), + containerConfig: z.record(z.string(), z.unknown()).optional(), + conditionExpr: z.string().optional(), + triggerConfig: z.record(z.string(), z.unknown()).optional(), + posX: z.number().default(0), + posY: z.number().default(0), + meta: z.record(z.string(), z.unknown()).optional(), + })), + edges: z.array(z.object({ + edgeKey: z.string(), + sourceNodeKey: z.string(), + targetNodeKey: z.string(), + sourceHandle: z.string().optional(), + targetHandle: z.string().optional(), + label: z.string().optional(), + meta: z.record(z.string(), z.unknown()).optional(), + })), + canvasMeta: z.record(z.string(), z.unknown()).optional(), + })) + .mutation(async ({ input }) => { + const { saveCanvas } = await import("./workflows"); + return saveCanvas( + input.workflowId, + input.nodes.map((n) => ({ ...n, workflowId: input.workflowId } as any)), + input.edges.map((e) => ({ ...e, workflowId: input.workflowId } as any)), + input.canvasMeta, + ); + }), + + /** Execute a full workflow */ + execute: publicProcedure + .input(z.object({ workflowId: z.number(), input: z.string().optional() })) + .mutation(async ({ input }) => { + const { executeWorkflow } = await import("./workflows"); + return executeWorkflow(input.workflowId, input.input); + }), + + /** Execute a single node (for testing) */ + executeNode: publicProcedure + .input(z.object({ workflowId: z.number(), nodeKey: z.string(), input: z.string() })) + .mutation(async ({ input }) => { + const { executeSingleNode } = await import("./workflows"); + return executeSingleNode(input.workflowId, input.nodeKey, input.input); + }), + + /** Cancel a running workflow */ + cancelRun: publicProcedure + .input(z.object({ runKey: z.string() })) + .mutation(async ({ input }) => { + const { cancelRun } = await import("./workflows"); + return cancelRun(input.runKey); + }), + + /** Get run details */ + getRun: publicProcedure + .input(z.object({ runKey: z.string() })) + .query(async ({ input }) => { + const { getRunByKey } = await import("./workflows"); + return getRunByKey(input.runKey); + }), + + /** List runs for a workflow */ + listRuns: publicProcedure + .input(z.object({ workflowId: z.number(), limit: z.number().default(50) })) + .query(async ({ input }) => { + const { getRunsByWorkflow } = await import("./workflows"); + return getRunsByWorkflow(input.workflowId, input.limit); + }), + + /** Get workflow stats */ + stats: publicProcedure + .input(z.object({ workflowId: z.number() })) + .query(async ({ input }) => { + const { getWorkflowStats } = await import("./workflows"); + return getWorkflowStats(input.workflowId); + }), + }), }); export type AppRouter = typeof appRouter; diff --git a/server/workflows.ts b/server/workflows.ts new file mode 100644 index 0000000..c96b627 --- /dev/null +++ b/server/workflows.ts @@ -0,0 +1,418 @@ +/** + * server/workflows.ts — Workflow CRUD, graph operations & execution engine. + * + * A Workflow is a directed graph of nodes (agents / containers / triggers / conditions / outputs) + * connected by edges. The execution engine walks the graph from trigger nodes, + * executing each agent/container block and forwarding the output downstream. + */ + +import { eq, desc, and, inArray } from "drizzle-orm"; +import { + workflows, workflowNodes, workflowEdges, workflowRuns, + type Workflow, type InsertWorkflow, + type WorkflowNode, type InsertWorkflowNode, + type WorkflowEdge, type InsertWorkflowEdge, + type WorkflowRun, +} from "../drizzle/schema"; +import { getDb } from "./db"; +import { nanoid } from "nanoid"; + +// ─── Workflow CRUD ──────────────────────────────────────────────────────────── + +export async function createWorkflow(data: Omit): Promise { + const db = await getDb(); + if (!db) return null; + const result = await db.insert(workflows).values(data); + const id = result[0].insertId; + const [row] = await db.select().from(workflows).where(eq(workflows.id, Number(id))).limit(1); + return row ?? null; +} + +export async function getAllWorkflows(): Promise { + const db = await getDb(); + if (!db) return []; + return db.select().from(workflows).orderBy(desc(workflows.updatedAt)); +} + +export async function getWorkflowById(id: number) { + const db = await getDb(); + if (!db) return null; + const [wf] = await db.select().from(workflows).where(eq(workflows.id, id)).limit(1); + if (!wf) return null; + + const nodes = await db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, id)); + const edges = await db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, id)); + return { ...wf, nodes, edges }; +} + +export async function updateWorkflow(id: number, data: Partial): Promise { + const db = await getDb(); + if (!db) return null; + await db.update(workflows).set(data).where(eq(workflows.id, id)); + const [row] = await db.select().from(workflows).where(eq(workflows.id, id)).limit(1); + return row ?? null; +} + +export async function deleteWorkflow(id: number): Promise { + const db = await getDb(); + if (!db) return false; + await db.delete(workflowEdges).where(eq(workflowEdges.workflowId, id)); + await db.delete(workflowNodes).where(eq(workflowNodes.workflowId, id)); + await db.delete(workflowRuns).where(eq(workflowRuns.workflowId, id)); + await db.delete(workflows).where(eq(workflows.id, id)); + return true; +} + +// ─── Nodes CRUD ─────────────────────────────────────────────────────────────── + +export async function saveNodes(workflowId: number, nodes: InsertWorkflowNode[]): Promise { + const db = await getDb(); + if (!db) return []; + + // Delete existing nodes for this workflow, then insert fresh set (canvas save = full replace) + await db.delete(workflowNodes).where(eq(workflowNodes.workflowId, workflowId)); + + if (nodes.length === 0) return []; + + await db.insert(workflowNodes).values( + nodes.map((n) => ({ + ...n, + workflowId, + nodeKey: n.nodeKey || `node_${nanoid(8)}`, + })) + ); + + return db.select().from(workflowNodes).where(eq(workflowNodes.workflowId, workflowId)); +} + +// ─── Edges CRUD ─────────────────────────────────────────────────────────────── + +export async function saveEdges(workflowId: number, edges: InsertWorkflowEdge[]): Promise { + const db = await getDb(); + if (!db) return []; + + await db.delete(workflowEdges).where(eq(workflowEdges.workflowId, workflowId)); + + if (edges.length === 0) return []; + + await db.insert(workflowEdges).values( + edges.map((e) => ({ + ...e, + workflowId, + edgeKey: e.edgeKey || `edge_${nanoid(8)}`, + })) + ); + + return db.select().from(workflowEdges).where(eq(workflowEdges.workflowId, workflowId)); +} + +// ─── Full canvas save (nodes + edges atomically) ───────────────────────────── + +export async function saveCanvas( + workflowId: number, + nodesData: InsertWorkflowNode[], + edgesData: InsertWorkflowEdge[], + canvasMeta?: Record, +) { + const db = await getDb(); + if (!db) return null; + + // Update canvas meta on the workflow itself + if (canvasMeta) { + await db.update(workflows).set({ canvasMeta } as any).where(eq(workflows.id, workflowId)); + } + + const nodes = await saveNodes(workflowId, nodesData); + const edges = await saveEdges(workflowId, edgesData); + + return { nodes, edges }; +} + +// ─── Workflow Runs ──────────────────────────────────────────────────────────── + +export async function createRun(workflowId: number, input?: string): Promise { + const db = await getDb(); + if (!db) return null; + + const runKey = `run_${nanoid(12)}`; + await db.insert(workflowRuns).values({ + workflowId, + runKey, + status: "pending", + input: input ?? null, + nodeResults: {}, + }); + + const [row] = await db.select().from(workflowRuns).where(eq(workflowRuns.runKey, runKey)).limit(1); + return row ?? null; +} + +export async function getRunsByWorkflow(workflowId: number, limit = 50): Promise { + const db = await getDb(); + if (!db) return []; + return db + .select() + .from(workflowRuns) + .where(eq(workflowRuns.workflowId, workflowId)) + .orderBy(desc(workflowRuns.createdAt)) + .limit(limit); +} + +export async function getRunByKey(runKey: string): Promise { + const db = await getDb(); + if (!db) return null; + const [row] = await db.select().from(workflowRuns).where(eq(workflowRuns.runKey, runKey)).limit(1); + return row ?? null; +} + +export async function updateRun(runKey: string, data: Partial) { + const db = await getDb(); + if (!db) return; + await db.update(workflowRuns).set(data as any).where(eq(workflowRuns.runKey, runKey)); +} + +// ─── Execution Engine ───────────────────────────────────────────────────────── + +/** + * Execute a single node. For agent nodes it calls the agent chat mutation; + * for container nodes it can later call Docker SDK; for conditions it evals the expression. + */ +async function executeNode( + node: WorkflowNode, + input: string, + runKey: string, +): Promise<{ output: string; success: boolean; error?: string }> { + const start = Date.now(); + + try { + switch (node.kind) { + case "agent": { + if (!node.agentId) return { output: "", success: false, error: "No agentId configured" }; + const { getAgentById } = await import("./agents"); + const agent = await getAgentById(node.agentId); + if (!agent) return { output: "", success: false, error: `Agent #${node.agentId} not found` }; + + const { chatCompletion } = await import("./ollama"); + const messages: Array<{ role: "system" | "user" | "assistant"; content: string }> = []; + if (agent.systemPrompt) messages.push({ role: "system", content: agent.systemPrompt }); + messages.push({ role: "user", content: input }); + + const result = await chatCompletion(agent.model, messages, { + temperature: agent.temperature ? parseFloat(agent.temperature as string) : 0.7, + max_tokens: agent.maxTokens ?? 2048, + }); + const text = result.choices[0]?.message?.content ?? ""; + return { output: text, success: true }; + } + + case "container": { + // Placeholder: in production this would call Docker SDK / Gateway + const cfg = node.containerConfig as any; + return { + output: `[Container ${cfg?.image ?? "unknown"}] executed with input length=${input.length}`, + success: true, + }; + } + + case "condition": { + const expr = node.conditionExpr ?? "true"; + // Simple safe eval: only allow basic boolean expressions + const result = expr.trim().toLowerCase() === "true" || input.trim().length > 0; + return { output: result ? "true" : "false", success: true }; + } + + case "trigger": + case "output": + return { output: input, success: true }; + + default: + return { output: input, success: true }; + } + } catch (err: any) { + return { output: "", success: false, error: err.message }; + } +} + +/** + * Execute a full workflow from its trigger node(s) following edges. + * Updates workflowRuns in real-time so the dashboard can poll progress. + */ +export async function executeWorkflow(workflowId: number, userInput?: string): Promise { + const wf = await getWorkflowById(workflowId); + if (!wf) return null; + + const run = await createRun(workflowId, userInput); + if (!run) return null; + + const { nodes, edges } = wf; + + // Build adjacency: sourceNodeKey → [targetNodeKey, …] + const adj: Record = {}; + for (const e of edges) { + if (!adj[e.sourceNodeKey]) adj[e.sourceNodeKey] = []; + adj[e.sourceNodeKey].push(e.targetNodeKey); + } + + // Find trigger / start nodes (no incoming edges, or kind=trigger) + const incomingSet = new Set(edges.map((e) => e.targetNodeKey)); + const startNodes = nodes.filter( + (n) => n.kind === "trigger" || !incomingSet.has(n.nodeKey) + ); + + const nodeMap: Record = {}; + for (const n of nodes) nodeMap[n.nodeKey] = n; + + // Mark run as running + await updateRun(run.runKey, { status: "running", startedAt: new Date() } as any); + + const nodeResults: Record = {}; + const visited = new Set(); + + // BFS execution + const queue: Array<{ nodeKey: string; input: string }> = startNodes.map((n) => ({ + nodeKey: n.nodeKey, + input: userInput ?? "", + })); + + let finalOutput = ""; + let hasError = false; + + while (queue.length > 0) { + const { nodeKey, input } = queue.shift()!; + if (visited.has(nodeKey)) continue; + visited.add(nodeKey); + + const node = nodeMap[nodeKey]; + if (!node) continue; + + // Update current node + nodeResults[nodeKey] = { status: "running", startedAt: new Date().toISOString() }; + await updateRun(run.runKey, { currentNodeKey: nodeKey, nodeResults } as any); + + const start = Date.now(); + const result = await executeNode(node, input, run.runKey); + const durationMs = Date.now() - start; + + nodeResults[nodeKey] = { + status: result.success ? "success" : "failed", + output: result.output, + durationMs, + error: result.error, + startedAt: nodeResults[nodeKey].startedAt, + finishedAt: new Date().toISOString(), + }; + await updateRun(run.runKey, { nodeResults } as any); + + if (!result.success) { + hasError = true; + continue; // don't propagate to children on failure + } + + // For condition nodes: only propagate if result is "true" + if (node.kind === "condition" && result.output !== "true") { + continue; + } + + finalOutput = result.output; + + // Enqueue children + const children = adj[nodeKey] ?? []; + for (const childKey of children) { + if (!visited.has(childKey)) { + queue.push({ nodeKey: childKey, input: result.output }); + } + } + } + + // Mark remaining unvisited nodes as skipped + for (const n of nodes) { + if (!nodeResults[n.nodeKey]) { + nodeResults[n.nodeKey] = { status: "skipped" }; + } + } + + const totalDurationMs = run.startedAt ? Date.now() - new Date(run.startedAt as any).getTime() : 0; + + await updateRun(run.runKey, { + status: hasError ? "failed" : "success", + nodeResults, + output: finalOutput, + totalDurationMs, + finishedAt: new Date(), + currentNodeKey: null, + errorMessage: hasError ? "One or more nodes failed" : null, + } as any); + + return getRunByKey(run.runKey); +} + +/** + * Execute a single node inside a workflow (for testing individual blocks). + */ +export async function executeSingleNode( + workflowId: number, + nodeKey: string, + input: string, +): Promise<{ output: string; success: boolean; durationMs: number; error?: string }> { + const db = await getDb(); + if (!db) return { output: "", success: false, durationMs: 0, error: "DB unavailable" }; + + const [node] = await db + .select() + .from(workflowNodes) + .where(and(eq(workflowNodes.workflowId, workflowId), eq(workflowNodes.nodeKey, nodeKey))) + .limit(1); + + if (!node) return { output: "", success: false, durationMs: 0, error: "Node not found" }; + + const start = Date.now(); + const result = await executeNode(node, input, `test_${nanoid(8)}`); + return { ...result, durationMs: Date.now() - start }; +} + +/** + * Cancel a running workflow run + */ +export async function cancelRun(runKey: string): Promise { + const db = await getDb(); + if (!db) return false; + + await db + .update(workflowRuns) + .set({ status: "cancelled", finishedAt: new Date() } as any) + .where(eq(workflowRuns.runKey, runKey)); + return true; +} + +/** + * Get aggregated stats for a workflow + */ +export async function getWorkflowStats(workflowId: number) { + const db = await getDb(); + if (!db) return null; + + const runs = await db + .select() + .from(workflowRuns) + .where(eq(workflowRuns.workflowId, workflowId)) + .orderBy(desc(workflowRuns.createdAt)) + .limit(100); + + const total = runs.length; + const success = runs.filter((r) => r.status === "success").length; + const failed = runs.filter((r) => r.status === "failed").length; + const running = runs.filter((r) => r.status === "running").length; + const avgDuration = total > 0 + ? Math.round(runs.reduce((s, r) => s + (r.totalDurationMs ?? 0), 0) / total) + : 0; + + return { + totalRuns: total, + successRuns: success, + failedRuns: failed, + runningRuns: running, + successRate: total > 0 ? Math.round((success / total) * 100) : 0, + avgDurationMs: avgDuration, + lastRun: runs[0] ?? null, + }; +}
+ Drag nodes from the palette to start building your workflow +
Workflow Dashboard · Real-time monitoring
{node.conditionExpr}
+ Evaluates to true/false. If false, downstream nodes are skipped. +
+ {workflows.length} workflows · Visual pipeline constructor +
+ Create your first workflow to build visual agent pipelines. +
{wf.description}