// Package tools implements the GoClaw Tool Executor. // Each tool corresponds to a function the LLM can call via OpenAI function calling. package tools import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "os" "os/exec" "path/filepath" "strings" "sync" "time" "git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db" ) // ─── Types ──────────────────────────────────────────────────────────────────── type ToolResult struct { Success bool `json:"success"` Result any `json:"result,omitempty"` Error string `json:"error,omitempty"` DurationMs int64 `json:"durationMs"` } // ─── Tool Definitions (OpenAI function calling schema) ──────────────────────── type ToolDef struct { Type string `json:"type"` Function FuncDef `json:"function"` } type FuncDef struct { Name string `json:"name"` Description string `json:"description"` Parameters map[string]any `json:"parameters"` } // OrchestratorTools returns the full list of tools available to the orchestrator. func OrchestratorTools() []ToolDef { return []ToolDef{ { Type: "function", Function: FuncDef{ Name: "shell_exec", Description: "Execute a bash command on the host system. Returns stdout and stderr.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "command": map[string]any{"type": "string", "description": "Bash command to execute"}, "timeout": map[string]any{"type": "number", "description": "Timeout in seconds (default: 30)"}, }, "required": []string{"command"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "file_read", Description: "Read a file from the filesystem. Returns file content as text.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "path": map[string]any{"type": "string", "description": "Absolute or relative file path"}, }, "required": []string{"path"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "file_write", Description: "Write content to a file. Creates parent directories if needed.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "path": map[string]any{"type": "string", "description": "File path to write"}, "content": map[string]any{"type": "string", "description": "Content to write"}, "append": map[string]any{"type": "boolean", "description": "Append instead of overwrite"}, }, "required": []string{"path", "content"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "file_list", Description: "List files and directories at a given path.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "path": map[string]any{"type": "string", "description": "Directory path"}, "recursive": map[string]any{"type": "boolean", "description": "List recursively"}, }, "required": []string{"path"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "http_request", Description: "Make an HTTP request (GET, POST, PUT, DELETE) to any URL.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "url": map[string]any{"type": "string", "description": "Target URL"}, "method": map[string]any{"type": "string", "description": "HTTP method (default: GET)"}, "headers": map[string]any{"type": "object", "description": "Request headers"}, "body": map[string]any{"type": "string", "description": "Request body for POST/PUT"}, }, "required": []string{"url"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "docker_exec", Description: "Execute a Docker CLI command (docker ps, docker logs, docker exec, etc.).", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "command": map[string]any{"type": "string", "description": "Docker command without 'docker' prefix (e.g. 'ps -a', 'logs mycontainer')"}, }, "required": []string{"command"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "list_agents", Description: "List all available specialized agents with their capabilities.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "delegate_to_agent", Description: "Delegate a task to a specific agent container via A2A protocol. " + "The agent processes the task with its own LLM and tools. " + "Use async=true for fire-and-forget with callback_url, or sync (default) to wait for result.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "agentId": map[string]any{"type": "number", "description": "Target agent ID"}, "task": map[string]any{"type": "string", "description": "Task description / prompt for the agent"}, "async": map[string]any{"type": "boolean", "description": "If true, returns task_id immediately; if false (default), waits for result"}, "callbackUrl": map[string]any{"type": "string", "description": "URL to POST result when async=true"}, "priority": map[string]any{"type": "number", "description": "Task priority 0-10 (default 5)"}, "timeoutSecs": map[string]any{"type": "number", "description": "Max seconds to wait (default 120)"}, }, "required": []string{"agentId", "task"}, "additionalProperties": false, }, }, }, { Type: "function", Function: FuncDef{ Name: "fanout_agents", Description: "Send the SAME task to MULTIPLE agents IN PARALLEL and collect all results. " + "Useful when you need different specialists to work on the same problem simultaneously. " + "Returns results from all agents as an array.", Parameters: map[string]any{ "type": "object", "properties": map[string]any{ "agentIds": map[string]any{ "type": "array", "items": map[string]any{"type": "number"}, "description": "List of agent IDs to send the task to (max 10)", }, "task": map[string]any{"type": "string", "description": "Task to send to all agents"}, "timeoutSecs": map[string]any{"type": "number", "description": "Max seconds per agent (default 60)"}, }, "required": []string{"agentIds", "task"}, "additionalProperties": false, }, }, }, } } // ─── Executor ───────────────────────────────────────────────────────────────── type Executor struct { projectRoot string httpClient *http.Client // agentListFn is injected to avoid circular dependency with orchestrator agentListFn func() ([]map[string]any, error) // database is used for delegate_to_agent to look up service address database *db.DB } func NewExecutor(projectRoot string, agentListFn func() ([]map[string]any, error)) *Executor { return &Executor{ projectRoot: projectRoot, httpClient: &http.Client{ Timeout: 60 * time.Second, }, agentListFn: agentListFn, } } // SetDatabase injects the DB reference so delegate_to_agent can resolve agent addresses. func (e *Executor) SetDatabase(database *db.DB) { e.database = database } // Execute dispatches a tool call by name. func (e *Executor) Execute(ctx context.Context, toolName string, argsJSON string) ToolResult { start := time.Now() var args map[string]any if err := json.Unmarshal([]byte(argsJSON), &args); err != nil { return ToolResult{Success: false, Error: "invalid args JSON: " + err.Error(), DurationMs: ms(start)} } var result any var execErr error switch toolName { case "shell_exec": result, execErr = e.shellExec(ctx, args) case "file_read": result, execErr = e.fileRead(args) case "file_write": result, execErr = e.fileWrite(args) case "file_list": result, execErr = e.fileList(args) case "http_request": result, execErr = e.httpRequest(ctx, args) case "docker_exec": result, execErr = e.dockerExec(ctx, args) case "list_agents": result, execErr = e.listAgents() case "delegate_to_agent": result, execErr = e.delegateToAgent(ctx, args) case "fanout_agents": result, execErr = e.fanoutAgents(ctx, args) default: return ToolResult{Success: false, Error: fmt.Sprintf("unknown tool: %s", toolName), DurationMs: ms(start)} } if execErr != nil { return ToolResult{Success: false, Error: execErr.Error(), DurationMs: ms(start)} } return ToolResult{Success: true, Result: result, DurationMs: ms(start)} } // ─── Tool Implementations ───────────────────────────────────────────────────── func (e *Executor) shellExec(ctx context.Context, args map[string]any) (any, error) { command, _ := args["command"].(string) if command == "" { return nil, fmt.Errorf("command is required") } // Safety: block destructive patterns blocked := []string{"rm -rf /", "mkfs", "dd if=/dev/zero", ":(){ :|:& };:"} for _, b := range blocked { if strings.Contains(command, b) { return nil, fmt.Errorf("command blocked for safety: contains '%s'", b) } } timeoutSec := 30 if t, ok := args["timeout"].(float64); ok && t > 0 { timeoutSec = int(t) } ctx2, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) defer cancel() // Prefer bash; fall back to sh (Alpine / minimal containers only have sh) shell := "bash" if _, err := exec.LookPath("bash"); err != nil { shell = "sh" } cmd := exec.CommandContext(ctx2, shell, "-c", command) cmd.Dir = e.projectRoot out, runErr := cmd.CombinedOutput() stdout := string(out) if len(stdout) > 20000 { stdout = stdout[:20000] + "\n...[truncated]" } exitCode := 0 if cmd.ProcessState != nil { exitCode = cmd.ProcessState.ExitCode() } if runErr != nil { // Return partial output even on error — LLM can see what happened return map[string]any{"stdout": stdout, "stderr": runErr.Error(), "exitCode": exitCode}, nil } return map[string]any{"stdout": stdout, "stderr": "", "exitCode": 0}, nil } func (e *Executor) fileRead(args map[string]any) (any, error) { path, _ := args["path"].(string) if path == "" { return nil, fmt.Errorf("path is required") } path = e.resolvePath(path) data, err := os.ReadFile(path) if err != nil { return nil, err } content := string(data) if len(content) > 50000 { content = content[:50000] + "\n...[truncated]" } return map[string]any{"content": content, "size": len(data), "path": path}, nil } func (e *Executor) fileWrite(args map[string]any) (any, error) { path, _ := args["path"].(string) content, _ := args["content"].(string) appendMode, _ := args["append"].(bool) if path == "" { return nil, fmt.Errorf("path is required") } path = e.resolvePath(path) if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { return nil, err } flag := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if appendMode { flag = os.O_WRONLY | os.O_CREATE | os.O_APPEND } f, err := os.OpenFile(path, flag, 0644) if err != nil { return nil, err } defer f.Close() n, err := f.WriteString(content) if err != nil { return nil, err } return map[string]any{"written": n, "path": path, "append": appendMode}, nil } func (e *Executor) fileList(args map[string]any) (any, error) { path, _ := args["path"].(string) if path == "" { path = "." } path = e.resolvePath(path) recursive, _ := args["recursive"].(bool) var entries []map[string]any if recursive { err := filepath.Walk(path, func(p string, info os.FileInfo, err error) error { if err != nil { return nil } rel, _ := filepath.Rel(path, p) entries = append(entries, map[string]any{ "name": rel, "isDir": info.IsDir(), "size": info.Size(), }) return nil }) if err != nil { return nil, err } } else { dirEntries, err := os.ReadDir(path) if err != nil { return nil, err } for _, de := range dirEntries { info, _ := de.Info() size := int64(0) if info != nil { size = info.Size() } entries = append(entries, map[string]any{ "name": de.Name(), "isDir": de.IsDir(), "size": size, }) } } return map[string]any{"path": path, "entries": entries, "count": len(entries)}, nil } func (e *Executor) httpRequest(ctx context.Context, args map[string]any) (any, error) { url, _ := args["url"].(string) if url == "" { return nil, fmt.Errorf("url is required") } method := "GET" if m, ok := args["method"].(string); ok && m != "" { method = strings.ToUpper(m) } var bodyReader io.Reader if body, ok := args["body"].(string); ok && body != "" { bodyReader = strings.NewReader(body) } req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) if err != nil { return nil, err } req.Header.Set("User-Agent", "GoClaw-Gateway/1.0") if headers, ok := args["headers"].(map[string]any); ok { for k, v := range headers { req.Header.Set(k, fmt.Sprintf("%v", v)) } } if bodyReader != nil && req.Header.Get("Content-Type") == "" { req.Header.Set("Content-Type", "application/json") } resp, err := e.httpClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) text := string(respBody) if len(text) > 10000 { text = text[:10000] + "\n...[truncated]" } return map[string]any{ "status": resp.StatusCode, "statusText": resp.Status, "body": text, }, nil } func (e *Executor) dockerExec(ctx context.Context, args map[string]any) (any, error) { command, _ := args["command"].(string) if command == "" { return nil, fmt.Errorf("command is required") } ctx2, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() // Try docker CLI first (available if docker binary is in PATH) parts := strings.Fields("docker " + command) cmd := exec.CommandContext(ctx2, parts[0], parts[1:]...) out, err := cmd.CombinedOutput() output := string(out) if err == nil { if len(output) > 10000 { output = output[:10000] + "\n...[truncated]" } return map[string]any{"output": output}, nil } // Fallback: route via Docker socket using curl (gateway has /var/run/docker.sock mounted) // This works even when docker CLI binary is not installed in the container. subCmd := strings.TrimSpace(command) fields := strings.Fields(subCmd) if len(fields) == 0 { return map[string]any{"output": output, "error": err.Error()}, nil } firstWord := fields[0] var shellCmd string switch firstWord { case "ps": all := "false" if strings.Contains(subCmd, "-a") || strings.Contains(subCmd, "--all") { all = "true" } // Use jq if available, fall back to raw JSON (both alpine gateway and agent-worker have curl) shellCmd = fmt.Sprintf( `curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/json?all=%s" | `+ `jq -r '.[] | (.Id[:12]) + " " + .Image + " " + .Status + " " + (.Names|join(","))' 2>/dev/null || `+ `curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/json?all=%s"`, all, all) case "logs": if len(fields) < 2 { return nil, fmt.Errorf("docker logs requires container name/id") } container := fields[len(fields)-1] tail := "100" shellCmd = fmt.Sprintf( `curl -sf --unix-socket /var/run/docker.sock `+ `"http://localhost/v1.44/containers/%s/logs?stdout=true&stderr=true&tail=%s×tamps=false" 2>&1 | `+ `strings 2>/dev/null || cat`, container, tail) case "inspect": if len(fields) < 2 { return nil, fmt.Errorf("docker inspect requires container name/id") } container := fields[len(fields)-1] shellCmd = fmt.Sprintf( `curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/%s/json"`, container) case "stats": if len(fields) < 2 { return nil, fmt.Errorf("docker stats requires container name/id") } container := fields[len(fields)-1] shellCmd = fmt.Sprintf( `curl -sf --unix-socket /var/run/docker.sock "http://localhost/v1.44/containers/%s/stats?stream=false"`, container) default: return map[string]any{ "output": output, "error": fmt.Sprintf("docker CLI not found in $PATH; socket fallback supports: ps, logs, inspect, stats. Command was: docker %s", command), "hint": "Use shell_exec with 'curl -s --unix-socket /var/run/docker.sock ...' for other Docker API calls", }, nil } fallbackCmd := exec.CommandContext(ctx2, "sh", "-c", shellCmd) fallbackOut, fallbackErr := fallbackCmd.CombinedOutput() result := string(fallbackOut) if len(result) > 10000 { result = result[:10000] + "\n...[truncated]" } if fallbackErr != nil { return map[string]any{"output": result, "error": "socket fallback: " + fallbackErr.Error()}, nil } return map[string]any{"output": result, "via": "docker-socket-api"}, nil } func (e *Executor) listAgents() (any, error) { if e.agentListFn == nil { return map[string]any{"agents": []any{}, "note": "DB not connected"}, nil } agents, err := e.agentListFn() if err != nil { return nil, err } return map[string]any{"agents": agents, "count": len(agents)}, nil } // A2ATaskRequest is the standard agent-to-agent task message format (Phase C). type A2ATaskRequest struct { TaskID string `json:"task_id"` FromAgentID int `json:"from_agent_id"` Task string `json:"input"` CallbackURL string `json:"callback_url,omitempty"` Priority int `json:"priority"` TimeoutSecs int `json:"timeout_secs"` } // delegateToAgent sends a task to an agent's container via A2A HTTP protocol. // Resolves the agent's service address from DB, respects priority/timeout from args. // Falls back with a clear message if agent is not deployed/running. func (e *Executor) delegateToAgent(ctx context.Context, args map[string]any) (any, error) { agentIDf, _ := args["agentId"].(float64) agentID := int(agentIDf) task, _ := args["task"].(string) if task == "" { task, _ = args["message"].(string) // backward compat } if task == "" { return nil, fmt.Errorf("task is required") } callbackURL, _ := args["callbackUrl"].(string) async, _ := args["async"].(bool) priority := 5 if pf, ok := args["priority"].(float64); ok && pf > 0 { priority = int(pf) } timeoutSecs := 120 if tf, ok := args["timeoutSecs"].(float64); ok && tf > 0 { timeoutSecs = int(tf) } // Resolve agent container address from DB if e.database != nil { cfg, err := e.database.GetAgentByID(agentID) if err == nil && cfg != nil && cfg.ServicePort > 0 && cfg.ContainerStatus == "running" { agentURL := fmt.Sprintf("http://%s:%d", cfg.ServiceName, cfg.ServicePort) req := A2ATaskRequest{ TaskID: fmt.Sprintf("orch-%d-%d", agentID, time.Now().UnixMilli()), FromAgentID: 0, // orchestrator Task: task, CallbackURL: callbackURL, Priority: priority, TimeoutSecs: timeoutSecs, } if async { return e.postA2ATask(ctx, agentURL, req) } return e.postA2AChat(ctx, agentURL, task, timeoutSecs) } if e.database != nil { cfg, _ := e.database.GetAgentByID(agentID) status := "unknown" if cfg != nil { status = cfg.ContainerStatus if status == "" { status = "stopped" } } return map[string]any{ "delegated": false, "agentId": agentID, "status": status, "note": fmt.Sprintf( "Agent %d container is %q. Deploy it via Web Panel (POST /api/agents/%d/deploy) then retry.", agentID, status, agentID), }, nil } } return map[string]any{ "delegated": false, "agentId": agentID, "note": "No database connection — cannot resolve agent address.", }, nil } // postA2ATask POSTs to agent's /task endpoint using A2A protocol (async). func (e *Executor) postA2ATask(ctx context.Context, agentURL string, req A2ATaskRequest) (any, error) { payload, _ := json.Marshal(req) httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, agentURL+"/task", bytes.NewReader(payload)) if err != nil { return nil, fmt.Errorf("a2a build request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("X-GoClaw-From", "orchestrator") resp, err := e.httpClient.Do(httpReq) if err != nil { return nil, fmt.Errorf("a2a task HTTP error: %w", err) } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) var result map[string]any _ = json.Unmarshal(body, &result) return result, nil } // postA2AChat POSTs to agent's /chat endpoint (sync, waits for LLM response). func (e *Executor) postA2AChat(ctx context.Context, agentURL string, task string, timeoutSecs int) (any, error) { payload, _ := json.Marshal(map[string]any{ "messages": []map[string]string{{"role": "user", "content": task}}, "timeout_secs": timeoutSecs, }) chatCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSecs)*time.Second) defer cancel() httpReq, err := http.NewRequestWithContext(chatCtx, http.MethodPost, agentURL+"/chat", bytes.NewReader(payload)) if err != nil { return nil, fmt.Errorf("a2a chat request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("X-GoClaw-From", "orchestrator") // Use a client with longer timeout for sync chats client := &http.Client{Timeout: time.Duration(timeoutSecs+10) * time.Second} resp, err := client.Do(httpReq) if err != nil { return nil, fmt.Errorf("a2a chat HTTP error: %w", err) } defer resp.Body.Close() body, _ := io.ReadAll(resp.Body) var result map[string]any _ = json.Unmarshal(body, &result) return result, nil } // fanoutAgents sends the same task to multiple agents in parallel and collects results. func (e *Executor) fanoutAgents(ctx context.Context, args map[string]any) (any, error) { task, _ := args["task"].(string) if task == "" { return nil, fmt.Errorf("task is required") } timeoutSecs := 60 if tf, ok := args["timeoutSecs"].(float64); ok && tf > 0 { timeoutSecs = int(tf) } // Parse agentIds array rawIDs, _ := args["agentIds"].([]any) if len(rawIDs) == 0 { return nil, fmt.Errorf("agentIds must be a non-empty array") } if len(rawIDs) > 10 { rawIDs = rawIDs[:10] // cap at 10 } type agentResult struct { AgentID int `json:"agentId"` AgentName string `json:"agentName,omitempty"` Success bool `json:"success"` Result any `json:"result,omitempty"` Error string `json:"error,omitempty"` Delegated bool `json:"delegated"` DurationMs int64 `json:"durationMs"` } results := make([]agentResult, len(rawIDs)) var wg sync.WaitGroup fanCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSecs+5)*time.Second) defer cancel() for i, rawID := range rawIDs { idf, _ := rawID.(float64) agentID := int(idf) idx := i wg.Add(1) go func() { defer wg.Done() start := time.Now() ar := agentResult{AgentID: agentID} if e.database == nil { ar.Error = "no database connection" results[idx] = ar return } cfg, err := e.database.GetAgentByID(agentID) if err != nil || cfg == nil { ar.Error = fmt.Sprintf("agent %d not found", agentID) results[idx] = ar return } ar.AgentName = cfg.Name if cfg.ServicePort == 0 || cfg.ContainerStatus != "running" { ar.Delegated = false ar.Error = fmt.Sprintf("agent %q is %q — not running", cfg.Name, cfg.ContainerStatus) results[idx] = ar return } agentURL := fmt.Sprintf("http://%s:%d", cfg.ServiceName, cfg.ServicePort) res, chatErr := e.postA2AChat(fanCtx, agentURL, task, timeoutSecs) ar.DurationMs = ms(start) if chatErr != nil { ar.Success = false ar.Error = chatErr.Error() } else { ar.Success = true ar.Delegated = true ar.Result = res } results[idx] = ar }() } wg.Wait() succeeded := 0 for _, r := range results { if r.Success { succeeded++ } } return map[string]any{ "task": task, "total": len(results), "succeeded": succeeded, "failed": len(results) - succeeded, "results": results, }, nil } // ─── Helpers ────────────────────────────────────────────────────────────────── func (e *Executor) resolvePath(path string) string { if filepath.IsAbs(path) { return path } return filepath.Join(e.projectRoot, path) } func ms(start time.Time) int64 { return time.Since(start).Milliseconds() }