812 lines
25 KiB
Go
812 lines
25 KiB
Go
// Package tools implements the GoClaw Tool Executor.
|
|
// Each tool corresponds to a function the LLM can call via OpenAI function calling.
|
|
package tools
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db"
|
|
)
|
|
|
|
// ─── Types ────────────────────────────────────────────────────────────────────
|
|
|
|
// ToolResult is the value returned by Executor.Execute for a single tool call.
type ToolResult struct {
	// Success is true when the tool returned without an error.
	Success bool `json:"success"`
	// Result is the tool-specific payload; it is omitted from JSON when empty.
	Result any `json:"result,omitempty"`
	// Error is the failure message; it is omitted from JSON when empty.
	Error string `json:"error,omitempty"`
	// DurationMs is the elapsed time of the call in milliseconds.
	DurationMs int64 `json:"durationMs"`
}
|
|
|
|
// ─── Tool Definitions (OpenAI function calling schema) ────────────────────────
|
|
|
|
type ToolDef struct {
|
|
Type string `json:"type"`
|
|
Function FuncDef `json:"function"`
|
|
}
|
|
|
|
// FuncDef describes a single callable function inside a ToolDef.
type FuncDef struct {
	// Name is the identifier Executor.Execute dispatches on.
	Name string `json:"name"`
	// Description is the human-readable explanation of what the function does.
	Description string `json:"description"`
	// Parameters is the JSON-schema object describing the accepted arguments.
	Parameters map[string]any `json:"parameters"`
}
|
|
|
|
// OrchestratorTools returns the full list of tools available to the orchestrator.
|
|
func OrchestratorTools() []ToolDef {
|
|
return []ToolDef{
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "shell_exec",
|
|
Description: "Execute a bash command on the host system. Returns stdout and stderr.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"command": map[string]any{"type": "string", "description": "Bash command to execute"},
|
|
"timeout": map[string]any{"type": "number", "description": "Timeout in seconds (default: 30)"},
|
|
},
|
|
"required": []string{"command"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "file_read",
|
|
Description: "Read a file from the filesystem. Returns file content as text.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"path": map[string]any{"type": "string", "description": "Absolute or relative file path"},
|
|
},
|
|
"required": []string{"path"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "file_write",
|
|
Description: "Write content to a file. Creates parent directories if needed.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"path": map[string]any{"type": "string", "description": "File path to write"},
|
|
"content": map[string]any{"type": "string", "description": "Content to write"},
|
|
"append": map[string]any{"type": "boolean", "description": "Append instead of overwrite"},
|
|
},
|
|
"required": []string{"path", "content"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "file_list",
|
|
Description: "List files and directories at a given path.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"path": map[string]any{"type": "string", "description": "Directory path"},
|
|
"recursive": map[string]any{"type": "boolean", "description": "List recursively"},
|
|
},
|
|
"required": []string{"path"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "http_request",
|
|
Description: "Make an HTTP request (GET, POST, PUT, DELETE) to any URL.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"url": map[string]any{"type": "string", "description": "Target URL"},
|
|
"method": map[string]any{"type": "string", "description": "HTTP method (default: GET)"},
|
|
"headers": map[string]any{"type": "object", "description": "Request headers"},
|
|
"body": map[string]any{"type": "string", "description": "Request body for POST/PUT"},
|
|
},
|
|
"required": []string{"url"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "docker_exec",
|
|
Description: "Execute a Docker CLI command (docker ps, docker logs, docker exec, etc.).",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"command": map[string]any{"type": "string", "description": "Docker command without 'docker' prefix (e.g. 'ps -a', 'logs mycontainer')"},
|
|
},
|
|
"required": []string{"command"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "list_agents",
|
|
Description: "List all available specialized agents with their capabilities.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "delegate_to_agent",
|
|
Description: "Delegate a task to a specific agent container via A2A protocol. " +
|
|
"The agent processes the task with its own LLM and tools. " +
|
|
"Use async=true for fire-and-forget with callback_url, or sync (default) to wait for result.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"agentId": map[string]any{"type": "number", "description": "Target agent ID"},
|
|
"task": map[string]any{"type": "string", "description": "Task description / prompt for the agent"},
|
|
"async": map[string]any{"type": "boolean", "description": "If true, returns task_id immediately; if false (default), waits for result"},
|
|
"callbackUrl": map[string]any{"type": "string", "description": "URL to POST result when async=true"},
|
|
"priority": map[string]any{"type": "number", "description": "Task priority 0-10 (default 5)"},
|
|
"timeoutSecs": map[string]any{"type": "number", "description": "Max seconds to wait (default 120)"},
|
|
},
|
|
"required": []string{"agentId", "task"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Type: "function",
|
|
Function: FuncDef{
|
|
Name: "fanout_agents",
|
|
Description: "Send the SAME task to MULTIPLE agents IN PARALLEL and collect all results. " +
|
|
"Useful when you need different specialists to work on the same problem simultaneously. " +
|
|
"Returns results from all agents as an array.",
|
|
Parameters: map[string]any{
|
|
"type": "object",
|
|
"properties": map[string]any{
|
|
"agentIds": map[string]any{
|
|
"type": "array",
|
|
"items": map[string]any{"type": "number"},
|
|
"description": "List of agent IDs to send the task to (max 10)",
|
|
},
|
|
"task": map[string]any{"type": "string", "description": "Task to send to all agents"},
|
|
"timeoutSecs": map[string]any{"type": "number", "description": "Max seconds per agent (default 60)"},
|
|
},
|
|
"required": []string{"agentIds", "task"},
|
|
"additionalProperties": false,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// ─── Executor ─────────────────────────────────────────────────────────────────
|
|
|
|
type Executor struct {
|
|
projectRoot string
|
|
httpClient *http.Client
|
|
// agentListFn is injected to avoid circular dependency with orchestrator
|
|
agentListFn func() ([]map[string]any, error)
|
|
// database is used for delegate_to_agent to look up service address
|
|
database *db.DB
|
|
}
|
|
|
|
func NewExecutor(projectRoot string, agentListFn func() ([]map[string]any, error)) *Executor {
|
|
return &Executor{
|
|
projectRoot: projectRoot,
|
|
httpClient: &http.Client{
|
|
Timeout: 60 * time.Second,
|
|
},
|
|
agentListFn: agentListFn,
|
|
}
|
|
}
|
|
|
|
// SetDatabase injects the DB reference so delegate_to_agent can resolve agent addresses.
|
|
func (e *Executor) SetDatabase(database *db.DB) {
|
|
e.database = database
|
|
}
|
|
|
|
// Execute dispatches a tool call by name.
|
|
func (e *Executor) Execute(ctx context.Context, toolName string, argsJSON string) ToolResult {
|
|
start := time.Now()
|
|
|
|
var args map[string]any
|
|
if err := json.Unmarshal([]byte(argsJSON), &args); err != nil {
|
|
return ToolResult{Success: false, Error: "invalid args JSON: " + err.Error(), DurationMs: ms(start)}
|
|
}
|
|
|
|
var result any
|
|
var execErr error
|
|
|
|
switch toolName {
|
|
case "shell_exec":
|
|
result, execErr = e.shellExec(ctx, args)
|
|
case "file_read":
|
|
result, execErr = e.fileRead(args)
|
|
case "file_write":
|
|
result, execErr = e.fileWrite(args)
|
|
case "file_list":
|
|
result, execErr = e.fileList(args)
|
|
case "http_request":
|
|
result, execErr = e.httpRequest(ctx, args)
|
|
case "docker_exec":
|
|
result, execErr = e.dockerExec(ctx, args)
|
|
case "list_agents":
|
|
result, execErr = e.listAgents()
|
|
case "delegate_to_agent":
|
|
result, execErr = e.delegateToAgent(ctx, args)
|
|
case "fanout_agents":
|
|
result, execErr = e.fanoutAgents(ctx, args)
|
|
default:
|
|
return ToolResult{Success: false, Error: fmt.Sprintf("unknown tool: %s", toolName), DurationMs: ms(start)}
|
|
}
|
|
|
|
if execErr != nil {
|
|
return ToolResult{Success: false, Error: execErr.Error(), DurationMs: ms(start)}
|
|
}
|
|
return ToolResult{Success: true, Result: result, DurationMs: ms(start)}
|
|
}
|
|
|
|
// ─── Tool Implementations ─────────────────────────────────────────────────────
|
|
|
|
func (e *Executor) shellExec(ctx context.Context, args map[string]any) (any, error) {
|
|
command, _ := args["command"].(string)
|
|
if command == "" {
|
|
return nil, fmt.Errorf("command is required")
|
|
}
|
|
|
|
// Safety: block destructive patterns
|
|
blocked := []string{"rm -rf /", "mkfs", "dd if=/dev/zero", ":(){ :|:& };:"}
|
|
for _, b := range blocked {
|
|
if strings.Contains(command, b) {
|
|
return nil, fmt.Errorf("command blocked for safety: contains '%s'", b)
|
|
}
|
|
}
|
|
|
|
timeoutSec := 30
|
|
if t, ok := args["timeout"].(float64); ok && t > 0 {
|
|
timeoutSec = int(t)
|
|
}
|
|
|
|
ctx2, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
|
|
defer cancel()
|
|
|
|
// Prefer bash; fall back to sh (Alpine / minimal containers only have sh)
|
|
shell := "bash"
|
|
if _, err := exec.LookPath("bash"); err != nil {
|
|
shell = "sh"
|
|
}
|
|
cmd := exec.CommandContext(ctx2, shell, "-c", command)
|
|
cmd.Dir = e.projectRoot
|
|
|
|
out, runErr := cmd.CombinedOutput()
|
|
stdout := string(out)
|
|
if len(stdout) > 20000 {
|
|
stdout = stdout[:20000] + "\n...[truncated]"
|
|
}
|
|
|
|
exitCode := 0
|
|
if cmd.ProcessState != nil {
|
|
exitCode = cmd.ProcessState.ExitCode()
|
|
}
|
|
if runErr != nil {
|
|
// Return partial output even on error — LLM can see what happened
|
|
return map[string]any{"stdout": stdout, "stderr": runErr.Error(), "exitCode": exitCode}, nil
|
|
}
|
|
return map[string]any{"stdout": stdout, "stderr": "", "exitCode": 0}, nil
|
|
}
|
|
|
|
func (e *Executor) fileRead(args map[string]any) (any, error) {
|
|
path, _ := args["path"].(string)
|
|
if path == "" {
|
|
return nil, fmt.Errorf("path is required")
|
|
}
|
|
path = e.resolvePath(path)
|
|
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
content := string(data)
|
|
if len(content) > 50000 {
|
|
content = content[:50000] + "\n...[truncated]"
|
|
}
|
|
return map[string]any{"content": content, "size": len(data), "path": path}, nil
|
|
}
|
|
|
|
func (e *Executor) fileWrite(args map[string]any) (any, error) {
|
|
path, _ := args["path"].(string)
|
|
content, _ := args["content"].(string)
|
|
appendMode, _ := args["append"].(bool)
|
|
|
|
if path == "" {
|
|
return nil, fmt.Errorf("path is required")
|
|
}
|
|
path = e.resolvePath(path)
|
|
|
|
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
flag := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
|
|
if appendMode {
|
|
flag = os.O_WRONLY | os.O_CREATE | os.O_APPEND
|
|
}
|
|
|
|
f, err := os.OpenFile(path, flag, 0644)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer f.Close()
|
|
|
|
n, err := f.WriteString(content)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return map[string]any{"written": n, "path": path, "append": appendMode}, nil
|
|
}
|
|
|
|
func (e *Executor) fileList(args map[string]any) (any, error) {
|
|
path, _ := args["path"].(string)
|
|
if path == "" {
|
|
path = "."
|
|
}
|
|
path = e.resolvePath(path)
|
|
recursive, _ := args["recursive"].(bool)
|
|
|
|
var entries []map[string]any
|
|
|
|
if recursive {
|
|
err := filepath.Walk(path, func(p string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
rel, _ := filepath.Rel(path, p)
|
|
entries = append(entries, map[string]any{
|
|
"name": rel,
|
|
"isDir": info.IsDir(),
|
|
"size": info.Size(),
|
|
})
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
dirEntries, err := os.ReadDir(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, de := range dirEntries {
|
|
info, _ := de.Info()
|
|
size := int64(0)
|
|
if info != nil {
|
|
size = info.Size()
|
|
}
|
|
entries = append(entries, map[string]any{
|
|
"name": de.Name(),
|
|
"isDir": de.IsDir(),
|
|
"size": size,
|
|
})
|
|
}
|
|
}
|
|
|
|
return map[string]any{"path": path, "entries": entries, "count": len(entries)}, nil
|
|
}
|
|
|
|
func (e *Executor) httpRequest(ctx context.Context, args map[string]any) (any, error) {
|
|
url, _ := args["url"].(string)
|
|
if url == "" {
|
|
return nil, fmt.Errorf("url is required")
|
|
}
|
|
method := "GET"
|
|
if m, ok := args["method"].(string); ok && m != "" {
|
|
method = strings.ToUpper(m)
|
|
}
|
|
|
|
var bodyReader io.Reader
|
|
if body, ok := args["body"].(string); ok && body != "" {
|
|
bodyReader = strings.NewReader(body)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("User-Agent", "GoClaw-Gateway/1.0")
|
|
if headers, ok := args["headers"].(map[string]any); ok {
|
|
for k, v := range headers {
|
|
req.Header.Set(k, fmt.Sprintf("%v", v))
|
|
}
|
|
}
|
|
if bodyReader != nil && req.Header.Get("Content-Type") == "" {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
|
|
resp, err := e.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
respBody, _ := io.ReadAll(resp.Body)
|
|
text := string(respBody)
|
|
if len(text) > 10000 {
|
|
text = text[:10000] + "\n...[truncated]"
|
|
}
|
|
|
|
return map[string]any{
|
|
"status": resp.StatusCode,
|
|
"statusText": resp.Status,
|
|
"body": text,
|
|
}, nil
|
|
}
|
|
|
|
func (e *Executor) dockerExec(ctx context.Context, args map[string]any) (any, error) {
|
|
command, _ := args["command"].(string)
|
|
if command == "" {
|
|
return nil, fmt.Errorf("command is required")
|
|
}
|
|
|
|
ctx2, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
|
|
// Try docker CLI first (available if docker binary is in PATH)
|
|
parts := strings.Fields("docker " + command)
|
|
cmd := exec.CommandContext(ctx2, parts[0], parts[1:]...)
|
|
out, err := cmd.CombinedOutput()
|
|
output := string(out)
|
|
if err == nil {
|
|
if len(output) > 10000 {
|
|
output = output[:10000] + "\n...[truncated]"
|
|
}
|
|
return map[string]any{"output": output}, nil
|
|
}
|
|
|
|
// Fallback: route via Docker socket using curl (gateway has /var/run/docker.sock mounted)
|
|
// This works even when docker CLI binary is not installed in the container.
|
|
subCmd := strings.TrimSpace(command)
|
|
fields := strings.Fields(subCmd)
|
|
if len(fields) == 0 {
|
|
return map[string]any{"output": output, "error": err.Error()}, nil
|
|
}
|
|
firstWord := fields[0]
|
|
|
|
var shellCmd string
|
|
switch firstWord {
|
|
case "ps":
|
|
all := "false"
|
|
if strings.Contains(subCmd, "-a") || strings.Contains(subCmd, "--all") {
|
|
all = "true"
|
|
}
|
|
// Use jq if available, fall back to raw JSON (both alpine gateway and agent-worker have curl)
|
|
shellCmd = fmt.Sprintf(
|
|
`curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/json?all=%s" | `+
|
|
`jq -r '.[] | (.Id[:12]) + " " + .Image + " " + .Status + " " + (.Names|join(","))' 2>/dev/null || `+
|
|
`curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/json?all=%s"`,
|
|
all, all)
|
|
case "logs":
|
|
if len(fields) < 2 {
|
|
return nil, fmt.Errorf("docker logs requires container name/id")
|
|
}
|
|
container := fields[len(fields)-1]
|
|
tail := "100"
|
|
shellCmd = fmt.Sprintf(
|
|
`curl -sf --unix-socket /var/run/docker.sock `+
|
|
`"http://localhost/v1.44/containers/%s/logs?stdout=true&stderr=true&tail=%s×tamps=false" 2>&1 | `+
|
|
`strings 2>/dev/null || cat`,
|
|
container, tail)
|
|
case "inspect":
|
|
if len(fields) < 2 {
|
|
return nil, fmt.Errorf("docker inspect requires container name/id")
|
|
}
|
|
container := fields[len(fields)-1]
|
|
shellCmd = fmt.Sprintf(
|
|
`curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/%s/json"`,
|
|
container)
|
|
case "stats":
|
|
if len(fields) < 2 {
|
|
return nil, fmt.Errorf("docker stats requires container name/id")
|
|
}
|
|
container := fields[len(fields)-1]
|
|
shellCmd = fmt.Sprintf(
|
|
`curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/%s/stats?stream=false"`,
|
|
container)
|
|
default:
|
|
return map[string]any{
|
|
"output": output,
|
|
"error": fmt.Sprintf("docker CLI not found in $PATH; socket fallback supports: ps, logs, inspect, stats. Command was: docker %s", command),
|
|
"hint": "Use shell_exec with 'curl -s --unix-socket /var/run/docker.sock ...' for other Docker API calls",
|
|
}, nil
|
|
}
|
|
|
|
fallbackCmd := exec.CommandContext(ctx2, "sh", "-c", shellCmd)
|
|
fallbackOut, fallbackErr := fallbackCmd.CombinedOutput()
|
|
result := string(fallbackOut)
|
|
if len(result) > 10000 {
|
|
result = result[:10000] + "\n...[truncated]"
|
|
}
|
|
if fallbackErr != nil {
|
|
return map[string]any{"output": result, "error": "socket fallback: " + fallbackErr.Error()}, nil
|
|
}
|
|
return map[string]any{"output": result, "via": "docker-socket-api"}, nil
|
|
}
|
|
|
|
func (e *Executor) listAgents() (any, error) {
|
|
if e.agentListFn == nil {
|
|
return map[string]any{"agents": []any{}, "note": "DB not connected"}, nil
|
|
}
|
|
agents, err := e.agentListFn()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return map[string]any{"agents": agents, "count": len(agents)}, nil
|
|
}
|
|
|
|
// A2ATaskRequest is the standard agent-to-agent task message format (Phase C).
// It is the JSON body postA2ATask sends to an agent's /task endpoint.
type A2ATaskRequest struct {
	// TaskID identifies the task.
	TaskID string `json:"task_id"`
	// FromAgentID is the sender's agent ID; delegateToAgent uses 0 for the
	// orchestrator.
	FromAgentID int `json:"from_agent_id"`
	// Task is the task text; note that it is serialized under the key "input".
	Task string `json:"input"`
	// CallbackURL is where the result should be POSTed; omitted when empty.
	CallbackURL string `json:"callback_url,omitempty"`
	// Priority is the task priority.
	Priority int `json:"priority"`
	// TimeoutSecs is the time budget for the task in seconds.
	TimeoutSecs int `json:"timeout_secs"`
}
|
|
|
|
// delegateToAgent sends a task to an agent's container via A2A HTTP protocol.
|
|
// Resolves the agent's service address from DB, respects priority/timeout from args.
|
|
// Falls back with a clear message if agent is not deployed/running.
|
|
func (e *Executor) delegateToAgent(ctx context.Context, args map[string]any) (any, error) {
|
|
agentIDf, _ := args["agentId"].(float64)
|
|
agentID := int(agentIDf)
|
|
|
|
task, _ := args["task"].(string)
|
|
if task == "" {
|
|
task, _ = args["message"].(string) // backward compat
|
|
}
|
|
if task == "" {
|
|
return nil, fmt.Errorf("task is required")
|
|
}
|
|
|
|
callbackURL, _ := args["callbackUrl"].(string)
|
|
async, _ := args["async"].(bool)
|
|
|
|
priority := 5
|
|
if pf, ok := args["priority"].(float64); ok && pf > 0 {
|
|
priority = int(pf)
|
|
}
|
|
timeoutSecs := 120
|
|
if tf, ok := args["timeoutSecs"].(float64); ok && tf > 0 {
|
|
timeoutSecs = int(tf)
|
|
}
|
|
|
|
// Resolve agent container address from DB
|
|
if e.database != nil {
|
|
cfg, err := e.database.GetAgentByID(agentID)
|
|
if err == nil && cfg != nil && cfg.ServicePort > 0 && cfg.ContainerStatus == "running" {
|
|
agentURL := fmt.Sprintf("http://%s:%d", cfg.ServiceName, cfg.ServicePort)
|
|
req := A2ATaskRequest{
|
|
TaskID: fmt.Sprintf("orch-%d-%d", agentID, time.Now().UnixMilli()),
|
|
FromAgentID: 0, // orchestrator
|
|
Task: task,
|
|
CallbackURL: callbackURL,
|
|
Priority: priority,
|
|
TimeoutSecs: timeoutSecs,
|
|
}
|
|
if async {
|
|
return e.postA2ATask(ctx, agentURL, req)
|
|
}
|
|
return e.postA2AChat(ctx, agentURL, task, timeoutSecs)
|
|
}
|
|
if e.database != nil {
|
|
cfg, _ := e.database.GetAgentByID(agentID)
|
|
status := "unknown"
|
|
if cfg != nil {
|
|
status = cfg.ContainerStatus
|
|
if status == "" {
|
|
status = "stopped"
|
|
}
|
|
}
|
|
return map[string]any{
|
|
"delegated": false,
|
|
"agentId": agentID,
|
|
"status": status,
|
|
"note": fmt.Sprintf(
|
|
"Agent %d container is %q. Deploy it via Web Panel (POST /api/agents/%d/deploy) then retry.",
|
|
agentID, status, agentID),
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
return map[string]any{
|
|
"delegated": false,
|
|
"agentId": agentID,
|
|
"note": "No database connection — cannot resolve agent address.",
|
|
}, nil
|
|
}
|
|
|
|
// postA2ATask POSTs to agent's /task endpoint using A2A protocol (async).
|
|
func (e *Executor) postA2ATask(ctx context.Context, agentURL string, req A2ATaskRequest) (any, error) {
|
|
payload, _ := json.Marshal(req)
|
|
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, agentURL+"/task", bytes.NewReader(payload))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("a2a build request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
httpReq.Header.Set("X-GoClaw-From", "orchestrator")
|
|
resp, err := e.httpClient.Do(httpReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("a2a task HTTP error: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
var result map[string]any
|
|
_ = json.Unmarshal(body, &result)
|
|
return result, nil
|
|
}
|
|
|
|
// postA2AChat POSTs to agent's /chat endpoint (sync, waits for LLM response).
|
|
func (e *Executor) postA2AChat(ctx context.Context, agentURL string, task string, timeoutSecs int) (any, error) {
|
|
payload, _ := json.Marshal(map[string]any{
|
|
"messages": []map[string]string{{"role": "user", "content": task}},
|
|
"timeout_secs": timeoutSecs,
|
|
})
|
|
chatCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSecs)*time.Second)
|
|
defer cancel()
|
|
|
|
httpReq, err := http.NewRequestWithContext(chatCtx, http.MethodPost, agentURL+"/chat", bytes.NewReader(payload))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("a2a chat request: %w", err)
|
|
}
|
|
httpReq.Header.Set("Content-Type", "application/json")
|
|
httpReq.Header.Set("X-GoClaw-From", "orchestrator")
|
|
|
|
// Use a client with longer timeout for sync chats
|
|
client := &http.Client{Timeout: time.Duration(timeoutSecs+10) * time.Second}
|
|
resp, err := client.Do(httpReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("a2a chat HTTP error: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
var result map[string]any
|
|
_ = json.Unmarshal(body, &result)
|
|
return result, nil
|
|
}
|
|
|
|
// fanoutAgents sends the same task to multiple agents in parallel and collects results.
|
|
func (e *Executor) fanoutAgents(ctx context.Context, args map[string]any) (any, error) {
|
|
task, _ := args["task"].(string)
|
|
if task == "" {
|
|
return nil, fmt.Errorf("task is required")
|
|
}
|
|
|
|
timeoutSecs := 60
|
|
if tf, ok := args["timeoutSecs"].(float64); ok && tf > 0 {
|
|
timeoutSecs = int(tf)
|
|
}
|
|
|
|
// Parse agentIds array
|
|
rawIDs, _ := args["agentIds"].([]any)
|
|
if len(rawIDs) == 0 {
|
|
return nil, fmt.Errorf("agentIds must be a non-empty array")
|
|
}
|
|
if len(rawIDs) > 10 {
|
|
rawIDs = rawIDs[:10] // cap at 10
|
|
}
|
|
|
|
type agentResult struct {
|
|
AgentID int `json:"agentId"`
|
|
AgentName string `json:"agentName,omitempty"`
|
|
Success bool `json:"success"`
|
|
Result any `json:"result,omitempty"`
|
|
Error string `json:"error,omitempty"`
|
|
Delegated bool `json:"delegated"`
|
|
DurationMs int64 `json:"durationMs"`
|
|
}
|
|
|
|
results := make([]agentResult, len(rawIDs))
|
|
var wg sync.WaitGroup
|
|
|
|
fanCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSecs+5)*time.Second)
|
|
defer cancel()
|
|
|
|
for i, rawID := range rawIDs {
|
|
idf, _ := rawID.(float64)
|
|
agentID := int(idf)
|
|
idx := i
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
start := time.Now()
|
|
ar := agentResult{AgentID: agentID}
|
|
|
|
if e.database == nil {
|
|
ar.Error = "no database connection"
|
|
results[idx] = ar
|
|
return
|
|
}
|
|
|
|
cfg, err := e.database.GetAgentByID(agentID)
|
|
if err != nil || cfg == nil {
|
|
ar.Error = fmt.Sprintf("agent %d not found", agentID)
|
|
results[idx] = ar
|
|
return
|
|
}
|
|
ar.AgentName = cfg.Name
|
|
|
|
if cfg.ServicePort == 0 || cfg.ContainerStatus != "running" {
|
|
ar.Delegated = false
|
|
ar.Error = fmt.Sprintf("agent %q is %q — not running", cfg.Name, cfg.ContainerStatus)
|
|
results[idx] = ar
|
|
return
|
|
}
|
|
|
|
agentURL := fmt.Sprintf("http://%s:%d", cfg.ServiceName, cfg.ServicePort)
|
|
res, chatErr := e.postA2AChat(fanCtx, agentURL, task, timeoutSecs)
|
|
ar.DurationMs = ms(start)
|
|
if chatErr != nil {
|
|
ar.Success = false
|
|
ar.Error = chatErr.Error()
|
|
} else {
|
|
ar.Success = true
|
|
ar.Delegated = true
|
|
ar.Result = res
|
|
}
|
|
results[idx] = ar
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
succeeded := 0
|
|
for _, r := range results {
|
|
if r.Success {
|
|
succeeded++
|
|
}
|
|
}
|
|
|
|
return map[string]any{
|
|
"task": task,
|
|
"total": len(results),
|
|
"succeeded": succeeded,
|
|
"failed": len(results) - succeeded,
|
|
"results": results,
|
|
}, nil
|
|
}
|
|
|
|
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
func (e *Executor) resolvePath(path string) string {
|
|
if filepath.IsAbs(path) {
|
|
return path
|
|
}
|
|
return filepath.Join(e.projectRoot, path)
|
|
}
|
|
|
|
// ms reports how many whole milliseconds have elapsed since start.
func ms(start time.Time) int64 {
	elapsed := time.Since(start)
	return int64(elapsed / time.Millisecond)
}
|