feat(gateway): restore Phase C full agent lifecycle API

- Restored Phase C gateway code (handlers, main.go, docker client, db)
- Added routes: GET /api/agents/running, POST /api/agents (CRUD),
  POST /api/agents/{id}/deploy, POST /api/agents/{id}/stop,
  POST /api/agents/{id}/restart, POST /api/agents/{id}/scale
- Fixed StopAgent: always try to stop by canonical name goclaw-agent-{id}
  even when serviceName is empty in DB
- Fixed DeployAgent: handle 409 conflict by removing existing container
  and retrying once (idempotent deploy)
- Added swarm_manager.go: background SwarmManager for dead-letter recovery
- Added AGENT_NETWORK and AGENT_DB_URL config options
- Updated .gitignore to exclude gateway binaries
- All agents use standalone docker run (not Swarm) on bridge network

Verified on prod: deploy/stop/restart cycle works correctly,
/api/agents/running returns live running agents with containerStatus
This commit is contained in:
bboxwtf
2026-04-19 11:40:39 +00:00
parent dbde22cec1
commit f8e0ca7d5d
16 changed files with 4505 additions and 257 deletions

5
.gitignore vendored
View File

@@ -121,3 +121,8 @@ deploy-secrets
.kilo/
.manus/
AGENTS.md
# Gateway binaries
gateway/gateway
gateway/gateway-new
gateway/agent-worker

View File

@@ -127,30 +127,50 @@ type AgentWorker struct {
// Recent tasks ring buffer (для GET /tasks)
recentMu sync.Mutex
recentKeys []string
// Rate-limiting semaphore — limits concurrent LLM calls.
// Filled with MAX_CONCURRENT_TASKS tokens; each worker acquires one before
// calling runChat() and releases it when done.
rateSem chan struct{}
// maxConcurrent is the configured concurrency limit (exported for /health).
maxConcurrent int
}
const (
taskQueueDepth = 100
maxRecentTasks = 50
defaultMaxIter = 8
defaultTimeout = 120
workerGoroutines = 4 // параллельных воркеров на агента
taskQueueDepth = 100
maxRecentTasks = 50
defaultMaxIter = 8
defaultTimeout = 120
workerGoroutines = 4 // параллельных воркеров на агента
defaultMaxConcurrent = 2 // default simultaneous LLM calls per agent
)
func newAgentWorker(agentID int, database *db.DB, llmClient *llm.Client) (*AgentWorker, error) {
func newAgentWorker(agentID int, database *db.DB, llmClient *llm.Client, maxConcurrent int) (*AgentWorker, error) {
cfg, err := database.GetAgentByID(agentID)
if err != nil {
return nil, fmt.Errorf("agent %d not found in DB: %w", agentID, err)
}
log.Printf("[AgentWorker] Loaded config: id=%d name=%q model=%s", cfg.ID, cfg.Name, cfg.Model)
if maxConcurrent <= 0 {
maxConcurrent = defaultMaxConcurrent
}
// Fill the semaphore with tokens equal to the concurrency limit.
sem := make(chan struct{}, maxConcurrent)
for i := 0; i < maxConcurrent; i++ {
sem <- struct{}{}
}
w := &AgentWorker{
agentID: agentID,
cfg: cfg,
llm: llmClient,
database: database,
taskQueue: make(chan *Task, taskQueueDepth),
tasks: make(map[string]*Task),
agentID: agentID,
cfg: cfg,
llm: llmClient,
database: database,
taskQueue: make(chan *Task, taskQueueDepth),
tasks: make(map[string]*Task),
rateSem: sem,
maxConcurrent: maxConcurrent,
}
// Tool executor: агент использует подмножество инструментов из allowedTools
w.executor = tools.NewExecutor("/app", func() ([]map[string]any, error) {
@@ -234,7 +254,23 @@ func (w *AgentWorker) EnqueueTask(req TaskRequest) *Task {
}
// processTask выполняет задачу через LLM loop и обновляет её статус.
// Acquires a rate-limiting token before invoking the LLM to cap concurrent
// calls at w.maxConcurrent.
func (w *AgentWorker) processTask(ctx context.Context, task *Task) {
// ── Rate limiting: acquire a token ───────────────────────────────────────
// If no token is available, block until one frees up or ctx is cancelled.
select {
case <-ctx.Done():
w.tasksMu.Lock()
task.Status = TaskCancelled
task.Error = "context cancelled before execution"
w.tasksMu.Unlock()
return
case <-w.rateSem:
// acquired
}
defer func() { w.rateSem <- struct{}{} }() // release token
now := time.Now()
w.tasksMu.Lock()
task.Status = TaskRunning
@@ -478,12 +514,17 @@ func (w *AgentWorker) postCallback(task *Task) {
// ─── HTTP Handlers ────────────────────────────────────────────────────────────
func (w *AgentWorker) handleHealth(rw http.ResponseWriter, r *http.Request) {
activeSlots := w.maxConcurrent - len(w.rateSem)
rw.Header().Set("Content-Type", "application/json")
json.NewEncoder(rw).Encode(map[string]any{
"status": "ok",
"agentId": w.agentID,
"name": w.cfg.Name,
"model": w.cfg.Model,
"queueLen": len(w.taskQueue),
"status": "ok",
"agentId": w.agentID,
"name": w.cfg.Name,
"model": w.cfg.Model,
"queueLen": len(w.taskQueue),
"activeTasks": activeSlots,
"maxConcurrent": w.maxConcurrent,
"rateLimitFree": len(w.rateSem),
})
}
@@ -643,7 +684,16 @@ func main() {
log.Fatal("[AgentWorker] DATABASE_URL env var is required")
}
log.Printf("[AgentWorker] Starting: AGENT_ID=%d PORT=%s LLM=%s", agentID, port, llmBaseURL)
// MAX_CONCURRENT_TASKS controls the rate-limiting semaphore (Phase C).
maxConcurrent := defaultMaxConcurrent
if mcStr := os.Getenv("MAX_CONCURRENT_TASKS"); mcStr != "" {
if mc, err := strconv.Atoi(mcStr); err == nil && mc > 0 {
maxConcurrent = mc
}
}
log.Printf("[AgentWorker] Starting: AGENT_ID=%d PORT=%s LLM=%s MAX_CONCURRENT=%d",
agentID, port, llmBaseURL, maxConcurrent)
// ── DB ───────────────────────────────────────────────────────────────────
database, err := db.Connect(dbURL)
@@ -656,7 +706,7 @@ func main() {
llmClient := llm.NewClient(llmBaseURL, llmAPIKey)
// ── Agent Worker ─────────────────────────────────────────────────────────
worker, err := newAgentWorker(agentID, database, llmClient)
worker, err := newAgentWorker(agentID, database, llmClient, maxConcurrent)
if err != nil {
log.Fatalf("[AgentWorker] init failed: %v", err)
}
@@ -724,4 +774,4 @@ func getEnvFirst(keys ...string) string {
}
}
return ""
}
}

View File

@@ -130,11 +130,18 @@ func TestEnqueueTask_DefaultTimeout(t *testing.T) {
// ─── HTTP Handlers ────────────────────────────────────────────────────────────
func makeTestWorker() *AgentWorker {
mc := defaultMaxConcurrent
sem := make(chan struct{}, mc)
for i := 0; i < mc; i++ {
sem <- struct{}{}
}
return &AgentWorker{
agentID: 42,
cfg: mockAgentConfig(),
taskQueue: make(chan *Task, taskQueueDepth),
tasks: make(map[string]*Task),
agentID: 42,
cfg: mockAgentConfig(),
taskQueue: make(chan *Task, taskQueueDepth),
tasks: make(map[string]*Task),
rateSem: sem,
maxConcurrent: mc,
}
}
@@ -435,4 +442,107 @@ func TestWorkerProcessesTask_WithMockLLM(t *testing.T) {
if finalStatus != TaskDone {
t.Errorf("expected task done, got %s", finalStatus)
}
}
}
// ─── Phase C: Rate-limiting tests ─────────────────────────────────────────────
// TestRateLimiting_TokensInitialized checks that a worker whose semaphore has
// been pre-filled holds exactly maxConcurrent tokens and that the channel's
// capacity equals the same limit.
func TestRateLimiting_TokensInitialized(t *testing.T) {
	const limit = 3

	tokens := make(chan struct{}, limit)
	for len(tokens) < cap(tokens) {
		tokens <- struct{}{}
	}

	worker := &AgentWorker{
		agentID:       42,
		cfg:           mockAgentConfig(),
		taskQueue:     make(chan *Task, taskQueueDepth),
		tasks:         make(map[string]*Task),
		rateSem:       tokens,
		maxConcurrent: limit,
	}

	if got := len(worker.rateSem); got != limit {
		t.Errorf("expected %d tokens in semaphore, got %d", limit, got)
	}
	if got := cap(worker.rateSem); got != limit {
		t.Errorf("expected semaphore capacity=%d, got %d", limit, got)
	}
}
// TestRateLimiting_TokenAcquireRelease drains the semaphore and then refills it
// one token at a time, checking the free-token count after each step. This
// mirrors the receive/send pair processTask uses to acquire and release.
func TestRateLimiting_TokenAcquireRelease(t *testing.T) {
	const limit = 2

	tokens := make(chan struct{}, limit)
	for len(tokens) < cap(tokens) {
		tokens <- struct{}{}
	}

	worker := &AgentWorker{
		agentID:       42,
		cfg:           mockAgentConfig(),
		taskQueue:     make(chan *Task, taskQueueDepth),
		tasks:         make(map[string]*Task),
		rateSem:       tokens,
		maxConcurrent: limit,
	}

	// Take every token out of the semaphore.
	for taken := 0; taken < limit; taken++ {
		<-worker.rateSem
	}
	if free := len(worker.rateSem); free != 0 {
		t.Errorf("expected 0 free tokens after acquiring all, got %d", free)
	}

	// Hand the first token back.
	worker.rateSem <- struct{}{}
	if free := len(worker.rateSem); free != 1 {
		t.Errorf("expected 1 free token after release, got %d", free)
	}

	// Hand the second token back; the semaphore is full again.
	worker.rateSem <- struct{}{}
	if free := len(worker.rateSem); free != limit {
		t.Errorf("expected %d free tokens after full release, got %d", limit, free)
	}
}
// TestRateLimiting_HealthShowsActiveTasks takes one token out of a three-token
// semaphore and checks that /health reports maxConcurrent=3, rateLimitFree=2
// and activeTasks=1.
func TestRateLimiting_HealthShowsActiveTasks(t *testing.T) {
	const limit = 3

	tokens := make(chan struct{}, limit)
	for len(tokens) < cap(tokens) {
		tokens <- struct{}{}
	}

	worker := &AgentWorker{
		agentID:       42,
		cfg:           mockAgentConfig(),
		taskQueue:     make(chan *Task, taskQueueDepth),
		tasks:         make(map[string]*Task),
		rateSem:       tokens,
		maxConcurrent: limit,
	}

	// One consumed token stands in for one task currently running.
	<-worker.rateSem

	rec := httptest.NewRecorder()
	worker.handleHealth(rec, httptest.NewRequest(http.MethodGet, "/health", nil))

	var body map[string]any
	if err := json.NewDecoder(rec.Body).Decode(&body); err != nil {
		t.Fatalf("invalid JSON: %v", err)
	}

	// JSON numbers decode as float64; a missing key panics on the assertion,
	// which fails the test.
	asInt := func(key string) int { return int(body[key].(float64)) }

	if asInt("maxConcurrent") != limit {
		t.Errorf("expected maxConcurrent=%d, got %v", limit, body["maxConcurrent"])
	}
	if asInt("rateLimitFree") != limit-1 {
		t.Errorf("expected rateLimitFree=%d, got %v", limit-1, body["rateLimitFree"])
	}
	if asInt("activeTasks") != 1 {
		t.Errorf("expected activeTasks=1, got %v", body["activeTasks"])
	}
}

270
gateway/cmd/agent/main.go Normal file
View File

@@ -0,0 +1,270 @@
// GoClaw Agent Server — autonomous agent microservice
//
// Each agent runs as an independent container in the Docker Swarm overlay
// network. It exposes an HTTP API that the GoClaw Orchestrator can reach
// via the Swarm DNS name (e.g. http://goclaw-agent-researcher:8080).
//
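// The agent:
// - Receives task requests from the orchestrator (POST /task)
// - Calls the LLM via the centrally-managed GoClaw Gateway by forwarding each
//   task to the gateway's /api/orchestrator/chat endpoint
// - Is configured with DATABASE_URL for shared state in the MySQL database
//   (this file loads the setting but does not open a connection itself)
// - Reports its last-activity time (GET /health, GET /info) so the
//   SwarmManager can auto-stop it
// - Asks the gateway to stop it and gracefully shuts down after
//   IDLE_TIMEOUT_MINUTES with no requests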
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)
// ─── Config ──────────────────────────────────────────────────────────────────
// AgentConfig is the agent's runtime configuration. loadConfig fills every
// field from environment variables.
type AgentConfig struct {
// AgentID identifies this agent in log lines, HTTP responses and the
// self-stop URL (env AGENT_ID, default "unnamed-agent").
AgentID string
// Port is the TCP port the HTTP server listens on (env AGENT_PORT, default "8080").
Port string
// GatewayURL is the base URL of the GoClaw Gateway that tasks are forwarded
// to and that self-stop requests are sent to (env GATEWAY_URL).
GatewayURL string
// LLMURL is the LLM base URL (env LLM_BASE_URL).
// NOTE(review): loaded but not read anywhere else in the visible part of this file.
LLMURL string
// LLMAPIKey is the LLM API key (env LLM_API_KEY).
// NOTE(review): loaded but not read anywhere else in the visible part of this file.
LLMAPIKey string
// DatabaseURL is the database connection string (env DATABASE_URL).
// NOTE(review): loaded but not read anywhere else in the visible part of this file.
DatabaseURL string
// IdleTimeoutMinutes is how long the agent may go without activity before
// the idle watchdog asks the gateway to stop it (env IDLE_TIMEOUT_MINUTES, default 15).
IdleTimeoutMinutes int
}
// loadConfig builds the agent configuration from environment variables,
// substituting a default for every value that is unset.
//
// IDLE_TIMEOUT_MINUTES must be a positive integer. Zero, negative and
// non-numeric values are rejected with a log line and the 15-minute default is
// used instead: the idle watchdog compares the idle time against this
// threshold, so a threshold of zero or less would make it stop the agent on
// its very first check.
func loadConfig() AgentConfig {
	const defaultIdleMinutes = 15

	idleMin := defaultIdleMinutes
	if v := os.Getenv("IDLE_TIMEOUT_MINUTES"); v != "" {
		n, err := strconv.Atoi(v)
		switch {
		case err != nil:
			log.Printf("[Agent] ignoring IDLE_TIMEOUT_MINUTES=%q (not an integer), using %d", v, defaultIdleMinutes)
		case n <= 0:
			log.Printf("[Agent] ignoring IDLE_TIMEOUT_MINUTES=%d (must be positive), using %d", n, defaultIdleMinutes)
		default:
			idleMin = n
		}
	}

	port := os.Getenv("AGENT_PORT")
	if port == "" {
		port = "8080"
	}

	return AgentConfig{
		AgentID:            getEnv("AGENT_ID", "unnamed-agent"),
		Port:               port,
		GatewayURL:         getEnv("GATEWAY_URL", "http://goclaw-gateway:18789"),
		LLMURL:             getEnv("LLM_BASE_URL", "https://ollama.com/v1"),
		LLMAPIKey:          os.Getenv("LLM_API_KEY"),
		DatabaseURL:        os.Getenv("DATABASE_URL"),
		IdleTimeoutMinutes: idleMin,
	}
}
// getEnv returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func getEnv(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
// ─── State ───────────────────────────────────────────────────────────────────
// Agent is the runtime state of one agent process: its static configuration,
// the time of the most recent activity, and the HTTP client used for every
// call to the gateway.
type Agent struct {
	cfg AgentConfig

	// mu guards lastActivity. The field is written by request handlers and
	// read by the idle watchdog and by the /health and /info handlers, all of
	// which run on different goroutines.
	mu           sync.Mutex
	lastActivity time.Time

	httpClient *http.Client
}

// NewAgent returns an Agent whose idle clock starts now and whose HTTP client
// gives up on any single request after 120 seconds.
func NewAgent(cfg AgentConfig) *Agent {
	return &Agent{
		cfg:          cfg,
		lastActivity: time.Now(),
		httpClient:   &http.Client{Timeout: 120 * time.Second},
	}
}

// touch records the current time as the moment of the latest activity.
func (a *Agent) touch() {
	a.mu.Lock()
	a.lastActivity = time.Now()
	a.mu.Unlock()
}

// lastActive returns the time most recently recorded by touch (or the
// construction time if touch has never been called).
func (a *Agent) lastActive() time.Time {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.lastActivity
}

// ─── HTTP handlers ────────────────────────────────────────────────────────────

// GET /health — liveness probe
//
// Responds 200 with the agent id, the last-activity timestamp (RFC 3339) and
// the number of minutes elapsed since then. It does not count as activity.
func (a *Agent) handleHealth(w http.ResponseWriter, r *http.Request) {
	last := a.lastActive()
	respond(w, 200, map[string]any{
		"ok":           true,
		"agentId":      a.cfg.AgentID,
		"lastActivity": last.Format(time.RFC3339),
		"idleMinutes":  time.Since(last).Minutes(),
	})
}

// POST /task — receive a task from the orchestrator
// Body: { "sessionId": "abc", "messages": [...], "model": "qwen2.5:7b", "maxIter": 5 }
//
// The messages, model and maxIter fields are forwarded to the gateway's
// /api/orchestrator/chat endpoint and the decoded gateway reply is returned
// under "result". Responses:
//
//	200  gateway answered with a 2xx status and a JSON body
//	400  the request body is not valid JSON
//	500  the outgoing gateway request could not be built
//	502  the gateway was unreachable, answered with a non-2xx status, or
//	     returned a body that is not JSON
func (a *Agent) handleTask(w http.ResponseWriter, r *http.Request) {
	a.touch()

	var body struct {
		SessionID string          `json:"sessionId"`
		Messages  json.RawMessage `json:"messages"`
		Model     string          `json:"model"`
		MaxIter   int             `json:"maxIter"`
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		respondError(w, 400, "invalid request: "+err.Error())
		return
	}

	// Forward the task to the GoClaw Gateway orchestrator
	gatewayURL := a.cfg.GatewayURL + "/api/orchestrator/chat"
	reqBody, err := json.Marshal(map[string]any{
		"messages": body.Messages,
		"model":    body.Model,
		"maxIter":  body.MaxIter,
	})
	if err != nil {
		respondError(w, 500, "request build error: "+err.Error())
		return
	}

	// The incoming request's context is reused so that a client disconnect
	// also cancels the gateway call.
	req, err := http.NewRequestWithContext(r.Context(), "POST", gatewayURL, strings.NewReader(string(reqBody)))
	if err != nil {
		respondError(w, 500, "request build error: "+err.Error())
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := a.httpClient.Do(req)
	if err != nil {
		respondError(w, 502, "gateway error: "+err.Error())
		return
	}
	defer resp.Body.Close()

	// A gateway failure must not be reported to the caller as ok:true / 200.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Best effort: include a bounded excerpt of the gateway's reply. A
		// read error only shortens the excerpt, so it is deliberately ignored.
		detail, _ := io.ReadAll(io.LimitReader(resp.Body, 2048))
		respondError(w, 502, fmt.Sprintf("gateway returned status %d: %s",
			resp.StatusCode, strings.TrimSpace(string(detail))))
		return
	}

	var result map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		respondError(w, 502, "gateway response error: "+err.Error())
		return
	}

	// Touch again so the idle clock restarts when the task finishes, not when
	// it began.
	a.touch()
	respond(w, 200, map[string]any{
		"ok":        true,
		"agentId":   a.cfg.AgentID,
		"sessionId": body.SessionID,
		"result":    result,
	})
}

// GET /info — agent metadata
//
// Responds 200 with the agent id, the OS hostname, the gateway URL, the
// configured idle timeout in minutes and the same activity fields as /health.
func (a *Agent) handleInfo(w http.ResponseWriter, r *http.Request) {
	// A hostname lookup failure just yields an empty "hostname" field.
	hostname, _ := os.Hostname()
	last := a.lastActive()
	respond(w, 200, map[string]any{
		"agentId":      a.cfg.AgentID,
		"hostname":     hostname,
		"gatewayUrl":   a.cfg.GatewayURL,
		"idleTimeout":  a.cfg.IdleTimeoutMinutes,
		"lastActivity": last.Format(time.RFC3339),
		"idleMinutes":  time.Since(last).Minutes(),
	})
}

// ─── Idle watchdog ────────────────────────────────────────────────────────────

// runIdleWatchdog checks every 30 seconds how long the agent has been idle.
// Once the idle time reaches IdleTimeoutMinutes it asks the gateway to stop
// this agent, calls cancel, and returns. It is meant to run in its own
// goroutine.
func (a *Agent) runIdleWatchdog(cancel context.CancelFunc) {
	threshold := time.Duration(a.cfg.IdleTimeoutMinutes) * time.Minute
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		idle := time.Since(a.lastActive())
		if idle >= threshold {
			log.Printf("[Agent %s] Idle for %.1f min — requesting self-stop via gateway",
				a.cfg.AgentID, idle.Minutes())
			a.selfStop()
			cancel()
			return
		}
	}
}
// selfStop sends POST {GatewayURL}/api/swarm/agents/{AgentID}/stop to the
// GoClaw Gateway, asking it to stop this agent, and logs the gateway's status
// code and response body. Failures are logged and otherwise ignored.
func (a *Agent) selfStop() {
	agentID := a.cfg.AgentID
	endpoint := fmt.Sprintf("%s/api/swarm/agents/%s/stop", a.cfg.GatewayURL, agentID)

	stopReq, buildErr := http.NewRequest("POST", endpoint, nil)
	if buildErr != nil {
		log.Printf("[Agent %s] selfStop error building request: %v", agentID, buildErr)
		return
	}

	stopResp, callErr := a.httpClient.Do(stopReq)
	if callErr != nil {
		log.Printf("[Agent %s] selfStop error: %v", agentID, callErr)
		return
	}
	defer stopResp.Body.Close()

	// The body is only used for the log line, so a read error is ignored.
	payload, _ := io.ReadAll(stopResp.Body)
	log.Printf("[Agent %s] selfStop response %d: %s", agentID, stopResp.StatusCode, string(payload))
}
// ─── Helpers ─────────────────────────────────────────────────────────────────
// respond writes data as a JSON response body with the given HTTP status code
// and a Content-Type of application/json.
//
// An encoding or write failure is logged rather than returned: by the time it
// can occur the status line has already been sent, so the response cannot be
// replaced with an error.
func respond(w http.ResponseWriter, status int, data any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(data); err != nil {
		log.Printf("[Agent] writing JSON response (status %d): %v", status, err)
	}
}

// respondError writes a JSON body of the form {"error": msg} with the given
// HTTP status code.
func respondError(w http.ResponseWriter, status int, msg string) {
	respond(w, status, map[string]any{"error": msg})
}
// ─── Main ─────────────────────────────────────────────────────────────────────
// main wires the agent together: it loads configuration from the environment,
// registers the HTTP routes, starts the idle watchdog and the HTTP server in
// background goroutines, and then blocks until either a termination signal
// arrives or the watchdog cancels the context. It then shuts the server down,
// allowing in-flight requests up to 10 seconds to finish.
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
cfg := loadConfig()
agent := NewAgent(cfg)
log.Printf("[Agent] %s starting on port %s (idle timeout: %d min)",
cfg.AgentID, cfg.Port, cfg.IdleTimeoutMinutes)
log.Printf("[Agent] Gateway: %s", cfg.GatewayURL)
// ── HTTP server ──────────────────────────────────────────────────────────
// The "METHOD /path" patterns below need the Go 1.22+ ServeMux.
mux := http.NewServeMux()
mux.HandleFunc("GET /health", agent.handleHealth)
mux.HandleFunc("POST /task", agent.handleTask)
mux.HandleFunc("GET /info", agent.handleInfo)
// WriteTimeout (150 s) is longer than the agent's 120 s gateway client
// timeout, so a slow gateway call can still be answered.
srv := &http.Server{
Addr: ":" + cfg.Port,
Handler: mux,
ReadTimeout: 30 * time.Second,
WriteTimeout: 150 * time.Second,
IdleTimeout: 120 * time.Second,
}
// ctx is cancelled only by the idle watchdog, which receives cancel below.
ctx, cancel := context.WithCancel(context.Background())
// ── Idle watchdog ────────────────────────────────────────────────────────
go agent.runIdleWatchdog(cancel)
// ── Graceful shutdown ────────────────────────────────────────────────────
// Buffered so a signal delivered before the select below is not lost.
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
go func() {
log.Printf("[Agent %s] Listening on :%s", cfg.AgentID, cfg.Port)
// http.ErrServerClosed is the normal result of srv.Shutdown; any other
// error is fatal and exits the process without the graceful shutdown below.
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("[Agent %s] Server error: %v", cfg.AgentID, err)
}
}()
// Block until an OS signal arrives or the watchdog cancels ctx.
select {
case <-quit:
log.Printf("[Agent %s] Signal received — shutting down", cfg.AgentID)
case <-ctx.Done():
log.Printf("[Agent %s] Context cancelled — shutting down", cfg.AgentID)
}
// Give in-flight requests up to 10 seconds to complete.
shutCtx, shutCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutCancel()
if err := srv.Shutdown(shutCtx); err != nil {
log.Printf("[Agent %s] Shutdown error: %v", cfg.AgentID, err)
}
log.Printf("[Agent %s] Stopped.", cfg.AgentID)
}

View File

@@ -47,10 +47,35 @@ func main() {
// ── Orchestrator ─────────────────────────────────────────────────────────
orch := orchestrator.New(llmClient, database, cfg.ProjectRoot)
// Apply retry policy from config
orch.SetRetryPolicy(orchestrator.RetryPolicy{
MaxLLMRetries: cfg.MaxLLMRetries,
InitialDelay: time.Duration(cfg.RetryDelaySecs) * time.Second,
MaxDelay: 30 * time.Second,
RetryOnEmpty: true,
})
log.Printf("[Gateway] LLM retry policy: maxRetries=%d, initialDelay=%ds", cfg.MaxLLMRetries, cfg.RetryDelaySecs)
// ── HTTP Handlers ────────────────────────────────────────────────────────
h := api.NewHandler(cfg, llmClient, orch, database)
// ── Sync Swarm tokens to DB on startup ──────────────────────────────────
go func() {
time.Sleep(3 * time.Second) // wait for Docker daemon readiness
if database != nil {
dockerCl := h.GetDockerClient()
if tokens, err := dockerCl.GetJoinTokens(); err == nil {
addr := dockerCl.GetManagerAddr()
database.UpsertSwarmTokens(
tokens.JoinTokens.Worker,
tokens.JoinTokens.Manager,
addr,
)
log.Printf("[Gateway] Swarm tokens synced to DB. Manager addr: %s", addr)
}
}
}()
// ── Router ───────────────────────────────────────────────────────────────
r := chi.NewRouter()
@@ -76,11 +101,19 @@ func main() {
r.Route("/api", func(r chi.Router) {
// Orchestrator
r.Post("/orchestrator/chat", h.OrchestratorChat)
r.Post("/orchestrator/stream", h.OrchestratorStream)
r.Get("/orchestrator/config", h.OrchestratorConfig)
// Agents
// Agents — CRUD + Container lifecycle (Phase A-C)
r.Get("/agents", h.ListAgents)
r.Get("/agents/running", h.ListRunningAgents) // Phase C: service discovery
r.Post("/agents", h.CreateAgent)
r.Get("/agents/{id}", h.GetAgent)
r.Delete("/agents/{id}", h.DeleteAgent)
r.Post("/agents/{id}/deploy", h.DeployAgent)
r.Post("/agents/{id}/stop", h.StopAgent)
r.Post("/agents/{id}/scale", h.ScaleAgent)
r.Post("/agents/{id}/restart", h.RestartAgent) // Phase C: dead-letter restart
// Models
r.Get("/models", h.ListModels)
@@ -92,8 +125,41 @@ func main() {
// Nodes / Docker Swarm monitoring
r.Get("/nodes", h.ListNodes)
r.Get("/nodes/stats", h.NodeStats)
// Provider config reload (called by Node.js after provider change)
r.Post("/providers/reload", h.ProvidersReload)
// Persistent chat sessions (background processing, DB-backed)
r.Post("/chat/session", h.StartChatSession)
r.Get("/chat/sessions", h.ListChatSessions)
r.Get("/chat/session/{id}", h.GetChatSession)
r.Get("/chat/session/{id}/events", h.GetChatEvents)
// ── Real Docker Swarm Management ─────────────────────────────────────
r.Get("/swarm/info", h.SwarmInfo)
r.Get("/swarm/nodes", h.SwarmNodes)
r.Post("/swarm/nodes/{id}/label", h.SwarmAddNodeLabel)
r.Post("/swarm/nodes/{id}/availability", h.SwarmSetNodeAvailability)
r.Get("/swarm/services", h.SwarmServices)
r.Post("/swarm/services/create", h.SwarmCreateService)
r.Delete("/swarm/services/{id}", h.SwarmRemoveService)
r.Get("/swarm/services/{id}/tasks", h.SwarmServiceTasks)
r.Post("/swarm/services/{id}/scale", h.SwarmScaleService)
r.Get("/swarm/join-token", h.SwarmJoinToken)
r.Post("/swarm/join-node", h.SwarmJoinNodeViaSSH)
r.Post("/swarm/ssh-test", h.SwarmSSHTest)
r.Post("/swarm/shell", h.SwarmShell)
r.Get("/swarm/agents", h.SwarmListAgents)
r.Post("/swarm/agents/{name}/start", h.SwarmStartAgent)
r.Post("/swarm/agents/{name}/stop", h.SwarmStopAgent)
})
// ── Swarm Manager: auto-stop idle agents after 15 min ────────────────────
swarmMgr := api.NewSwarmManager(h, 60*time.Second)
managerCtx, managerCancel := context.WithCancel(context.Background())
go swarmMgr.Start(managerCtx)
defer managerCancel()
// ── Start Server ─────────────────────────────────────────────────────────
srv := &http.Server{
Addr: ":" + cfg.Port,

View File

@@ -46,6 +46,22 @@ type Config struct {
DefaultModel string
MaxToolIterations int
RequestTimeoutSecs int
// Docker overlay network for agent containers
// AGENT_NETWORK — name of the Docker overlay/bridge network agents are attached to.
// Default: goclaw-agents (a dedicated overlay network)
AgentNetwork string
// AGENT_DB_URL — DATABASE_URL passed to agent containers.
// Useful when agents run on an overlay network and the DB hostname differs.
// Falls back to DatabaseURL if not set.
AgentDBURL string
// LLM retry policy
// GATEWAY_MAX_LLM_RETRIES — additional attempts after a failure/empty response (default 3).
MaxLLMRetries int
// GATEWAY_RETRY_DELAY_SECS — initial delay before first retry in seconds (default 2).
RetryDelaySecs int
}
func Load() *Config {
@@ -55,6 +71,8 @@ func Load() *Config {
maxIter, _ := strconv.Atoi(getEnv("GATEWAY_MAX_TOOL_ITERATIONS", "10"))
timeout, _ := strconv.Atoi(getEnv("GATEWAY_REQUEST_TIMEOUT_SECS", "120"))
maxLLMRetries, _ := strconv.Atoi(getEnv("GATEWAY_MAX_LLM_RETRIES", "3"))
retryDelaySecs, _ := strconv.Atoi(getEnv("GATEWAY_RETRY_DELAY_SECS", "2"))
// Resolve LLM base URL — priority: LLM_BASE_URL > OLLAMA_BASE_URL > default cloud
rawLLMURL := getEnvFirst(
@@ -82,6 +100,10 @@ func Load() *Config {
DefaultModel: getEnv("DEFAULT_MODEL", "qwen2.5:7b"),
MaxToolIterations: maxIter,
RequestTimeoutSecs: timeout,
MaxLLMRetries: maxLLMRetries,
RetryDelaySecs: retryDelaySecs,
AgentNetwork: getEnv("AGENT_NETWORK", "goclaw-agents"),
AgentDBURL: getEnv("AGENT_DB_URL", ""),
}
if cfg.LLMAPIKey == "" {

View File

@@ -3,11 +3,15 @@ module git.softuniq.eu/UniqAI/GoClaw/gateway
go 1.23.4
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/go-chi/chi/v5 v5.2.1
github.com/go-chi/cors v1.2.1
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/go-sql-driver/mysql v1.8.1
github.com/google/uuid v1.6.0
github.com/jmoiron/sqlx v1.4.0 // indirect
github.com/joho/godotenv v1.5.1
golang.org/x/crypto v0.37.0
)
require (
filippo.io/edwards25519 v1.1.0 // indirect
golang.org/x/sys v0.32.0 // indirect
)

View File

@@ -8,9 +8,11 @@ github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpv
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,334 @@
package api
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"git.softuniq.eu/UniqAI/GoClaw/gateway/config"
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db"
)
// statusUpdate is one recorded call to mockDB.UpdateContainerStatus.
type statusUpdate struct {
	agentID     int
	status      string
	serviceName string
	servicePort int
}

// mockDB is an in-memory stand-in for the agent-related database methods. It
// stores agent configs in a map and records every create, delete and
// status-update call so a test can inspect them afterwards.
type mockDB struct {
	agents        []db.AgentRow
	configs       map[int]*db.AgentConfig
	created       []db.CreateAgentInput
	deleted       []int
	statusUpdates []statusUpdate
	nextID        int
}

// newMockDB returns an empty mockDB whose first created agent gets id 100.
func newMockDB() *mockDB {
	return &mockDB{
		configs: map[int]*db.AgentConfig{},
		nextID:  100,
	}
}

// ListAgents returns the preset agents slice and never fails.
func (m *mockDB) ListAgents() ([]db.AgentRow, error) {
	return m.agents, nil
}

// GetAgentByID returns the stored config for id. An unknown id yields
// (nil, nil), not an error.
func (m *mockDB) GetAgentByID(id int) (*db.AgentConfig, error) {
	// Indexing a map of pointers gives nil for a missing key.
	return m.configs[id], nil
}

// CreateAgent records the input, stores a config holding the new id, name and
// model, and returns the id. Ids are handed out sequentially from nextID.
func (m *mockDB) CreateAgent(in db.CreateAgentInput) (int, error) {
	m.created = append(m.created, in)

	newID := m.nextID
	m.nextID++

	m.configs[newID] = &db.AgentConfig{
		ID:    newID,
		Name:  in.Name,
		Model: in.Model,
	}
	return newID, nil
}

// DeleteAgent records the id and removes its config, if there is one.
func (m *mockDB) DeleteAgent(id int) error {
	m.deleted = append(m.deleted, id)
	delete(m.configs, id)
	return nil
}

// UpdateContainerStatus records the call and, when the agent exists, copies
// the status, service name and port onto its stored config.
func (m *mockDB) UpdateContainerStatus(agentID int, status, serviceName string, servicePort int) error {
	m.statusUpdates = append(m.statusUpdates, statusUpdate{
		agentID:     agentID,
		status:      status,
		serviceName: serviceName,
		servicePort: servicePort,
	})

	cfg, known := m.configs[agentID]
	if !known {
		return nil
	}
	cfg.ContainerStatus = status
	cfg.ServiceName = serviceName
	cfg.ServicePort = servicePort
	return nil
}

// AssignServicePort always hands back the start of the requested range.
func (m *mockDB) AssignServicePort(start, max int) (int, error) {
	return start, nil
}
// ─── DB adapter: wrap mockDB to satisfy Handler which uses *db.DB ────────────
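// Handler holds a concrete *db.DB, so mockDB cannot be injected into it as-is;
// that would need a handler variant that depends on an interface instead.
// For these tests, we bypass the Handler's db field (it is left nil) and only
// exercise the code paths that return before the database is used.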
// ─── Unit tests for new agent REST endpoints ──────────────────────────────────
// TestCreateAgent_MissingDB checks that CreateAgent answers 503 Service
// Unavailable when the handler has no database.
func TestCreateAgent_MissingDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	payload := bytes.NewBufferString(`{"name":"Test","model":"qwen2.5:7b"}`)
	req := httptest.NewRequest(http.MethodPost, "/api/agents", payload)
	req.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	handler.CreateAgent(rec, req)

	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// TestDeleteAgent_MissingDB checks that DeleteAgent answers 503 Service
// Unavailable for a valid id when the handler has no database.
func TestDeleteAgent_MissingDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	req := httptest.NewRequest(http.MethodDelete, "/api/agents/1", nil)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.DeleteAgent(rec, req)

	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// TestDeleteAgent_InvalidID checks that DeleteAgent answers 400 Bad Request
// when the id path value is not a number.
func TestDeleteAgent_InvalidID(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	req := httptest.NewRequest(http.MethodDelete, "/api/agents/abc", nil)
	req.SetPathValue("id", "abc")

	rec := httptest.NewRecorder()
	handler.DeleteAgent(rec, req)

	if got := rec.Code; got != http.StatusBadRequest {
		t.Errorf("expected 400, got %d", got)
	}
}
// TestDeployAgent_MissingDB checks that DeployAgent answers 503 Service
// Unavailable for a valid id when the handler has no database.
func TestDeployAgent_MissingDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	req := httptest.NewRequest(http.MethodPost, "/api/agents/1/deploy", nil)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.DeployAgent(rec, req)

	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// TestDeployAgent_InvalidID checks that DeployAgent answers 400 Bad Request
// when the id path value is not a number.
func TestDeployAgent_InvalidID(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	req := httptest.NewRequest(http.MethodPost, "/api/agents/xyz/deploy", nil)
	req.SetPathValue("id", "xyz")

	rec := httptest.NewRecorder()
	handler.DeployAgent(rec, req)

	if got := rec.Code; got != http.StatusBadRequest {
		t.Errorf("expected 400, got %d", got)
	}
}
// TestStopAgent_MissingDB checks that StopAgent answers 503 Service
// Unavailable for a valid id when the handler has no database.
func TestStopAgent_MissingDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	req := httptest.NewRequest(http.MethodPost, "/api/agents/1/stop", nil)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.StopAgent(rec, req)

	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// TestScaleAgent_MissingDB checks that ScaleAgent answers 503 Service
// Unavailable for a valid scale request when the handler has no database.
func TestScaleAgent_MissingDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	payload := bytes.NewBufferString(`{"replicas":3}`)
	req := httptest.NewRequest(http.MethodPost, "/api/agents/1/scale", payload)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.ScaleAgent(rec, req)

	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// TestScaleAgent_BadReplicas checks that ScaleAgent answers 400 Bad Request
// when the requested replica count is below 1.
func TestScaleAgent_BadReplicas(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	payload := bytes.NewBufferString(`{"replicas":0}`)
	req := httptest.NewRequest(http.MethodPost, "/api/agents/1/scale", payload)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.ScaleAgent(rec, req)

	if got := rec.Code; got != http.StatusBadRequest {
		t.Errorf("expected 400, got %d", got)
	}
}
// TestListAgents_NoDB checks that ListAgents still answers 200 when the
// handler has no database, and that the JSON body carries a "note" field.
func TestListAgents_NoDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	rec := httptest.NewRecorder()
	handler.ListAgents(rec, httptest.NewRequest(http.MethodGet, "/api/agents", nil))

	if got := rec.Code; got != http.StatusOK {
		t.Errorf("expected 200, got %d", got)
	}

	var payload map[string]any
	if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
		t.Fatal("invalid JSON:", err)
	}
	if _, hasNote := payload["note"]; !hasNote {
		t.Error("expected 'note' field in response when DB is nil")
	}
}
// TestCreateAgent_MissingName posts a body without a "name" field to a handler
// that has no database and accepts either 400 or 503 as the outcome.
// NOTE(review): with db == nil this test cannot tell a name-validation failure
// from the missing-DB response; it only rules out every other status code.
func TestCreateAgent_MissingName(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	payload := bytes.NewBufferString(`{"model":"qwen2.5:7b"}`)
	req := httptest.NewRequest(http.MethodPost, "/api/agents", payload)
	req.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	handler.CreateAgent(rec, req)

	switch rec.Code {
	case http.StatusServiceUnavailable, http.StatusBadRequest:
		// Either outcome is accepted.
	default:
		t.Errorf("expected 400 or 503, got %d", rec.Code)
	}
}
// TestDeployAgent_DockerNil_DBNil checks that DeployAgent answers 503 Service
// Unavailable when the handler has neither a database nor a Docker client.
// A real *db.DB cannot be injected without a live MySQL, so a deploy with a
// database but no Docker client is not covered here.
func TestDeployAgent_DockerNil_DBNil(t *testing.T) {
	handler := &Handler{
		cfg:    &config.Config{},
		db:     nil,
		docker: nil,
	}

	req := httptest.NewRequest(http.MethodPost, "/api/agents/1/deploy", nil)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.DeployAgent(rec, req)

	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// ─── Phase C tests ─────────────────────────────────────────────────────────────
// TestListRunningAgents_NoDB checks that ListRunningAgents answers 200 with a
// zero "count" when the handler has no database.
// NOTE(review): a response with no "count" field at all also passes, because
// the failed type assertion leaves the count at zero.
func TestListRunningAgents_NoDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	rec := httptest.NewRecorder()
	handler.ListRunningAgents(rec, httptest.NewRequest(http.MethodGet, "/api/agents/running", nil))

	if got := rec.Code; got != http.StatusOK {
		t.Errorf("expected 200, got %d", got)
	}

	var payload map[string]any
	if err := json.Unmarshal(rec.Body.Bytes(), &payload); err != nil {
		t.Fatal("invalid JSON:", err)
	}

	count, _ := payload["count"].(float64)
	if count != 0 {
		t.Errorf("expected count=0, got %v", count)
	}
}
// TestRestartAgent_NoDB checks that RestartAgent answers 503 Service
// Unavailable for a valid id when the handler has no database.
func TestRestartAgent_NoDB(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}

	req := httptest.NewRequest(http.MethodPost, "/api/agents/1/restart", nil)
	req.SetPathValue("id", "1")

	rec := httptest.NewRecorder()
	handler.RestartAgent(rec, req)

	// RestartAgent calls DeployAgent which checks DB → 503
	if got := rec.Code; got != http.StatusServiceUnavailable {
		t.Errorf("expected 503, got %d", got)
	}
}
// TestRestartAgent_InvalidID checks that RestartAgent answers 400 when the
// "id" path value is not numeric.
func TestRestartAgent_InvalidID(t *testing.T) {
	handler := &Handler{cfg: &config.Config{}, db: nil}
	recorder := httptest.NewRecorder()

	request := httptest.NewRequest(http.MethodPost, "/api/agents/xyz/restart", nil)
	request.SetPathValue("id", "xyz")
	handler.RestartAgent(recorder, request)

	if status := recorder.Code; status != http.StatusBadRequest {
		t.Errorf("expected 400, got %d", status)
	}
}
// TestSwarmManagerDeadLetter_DockerNil verifies that checkIdleAgents
// returns early without panic when docker is nil.
func TestSwarmManagerDeadLetter_DockerNil(t *testing.T) {
	h := &Handler{cfg: &config.Config{}, db: nil, docker: nil}
	sm := NewSwarmManager(h, 60*time.Second)
	// NewSwarmManager starts a ticker; Start is never called in this test, so
	// nothing else would stop it.
	t.Cleanup(sm.ticker.Stop)

	// Should not panic
	sm.checkIdleAgents()
}

View File

@@ -0,0 +1,196 @@
// Package api: Swarm agent lifecycle manager.
//
// The SwarmManager runs as a background goroutine inside the GoClaw Gateway
// (which is the Swarm manager node). It watches all agent services and
// automatically scales them to 0 replicas after defaultIdleTimeoutMinutes
// minutes of no activity. On each pass it also marks agents whose DB status
// is "running" but whose container/service no longer exists as "error"
// (dead-letter recovery). The orchestrator can call SwarmStartAgent /
// SwarmStopAgent via the REST API to start/stop agents on demand.
//
// Start flow: POST /api/swarm/agents/{name}/start → scale to N replicas (default 1)
// Stop flow:  POST /api/swarm/agents/{name}/stop  → scale to 0
// Auto-stop:  background loop runs every checkInterval (see NewSwarmManager)
//             and scales idle agents to 0
package api
import (
"context"
"encoding/json"
"log"
"net/http"
"time"
)
const (
	// defaultIdleTimeoutMinutes is how many minutes an agent service may go
	// without activity before checkIdleAgents scales it to 0 replicas.
	defaultIdleTimeoutMinutes = 15
	// deadLetterCheckEnabled, when true, makes SwarmManager reconcile the DB
	// containerStatus with the containers/services that actually exist
	// (dead-letter recovery).
	deadLetterCheckEnabled = true
)
// SwarmManager watches agent services and auto-scales them down after idle.
// Create one with NewSwarmManager, run Start in a goroutine, and call Stop
// (or cancel the context given to Start) to end the loop.
type SwarmManager struct {
	handler *Handler      // gateway handler whose docker and db fields checkIdleAgents uses
	ticker  *time.Ticker  // fires once per check interval; stopped when Start returns
	done    chan struct{} // closed by Stop to make Start return
}
// NewSwarmManager builds a SwarmManager bound to h whose ticker fires every
// checkInterval. The ticker starts immediately; the periodic check itself only
// runs once Start is called. checkInterval must be positive because
// time.NewTicker panics otherwise.
func NewSwarmManager(h *Handler, checkInterval time.Duration) *SwarmManager {
	mgr := new(SwarmManager)
	mgr.handler = h
	mgr.ticker = time.NewTicker(checkInterval)
	mgr.done = make(chan struct{})
	return mgr
}
// Start runs the periodic check loop and blocks until Stop is called or ctx
// is cancelled, so callers normally invoke it with `go`. The ticker is stopped
// when the loop exits.
func (m *SwarmManager) Start(ctx context.Context) {
	log.Printf("[SwarmManager] Started — idle timeout %d min", defaultIdleTimeoutMinutes)
	defer m.ticker.Stop()

	cancelled := ctx.Done()
	for {
		select {
		case <-m.ticker.C:
			m.checkIdleAgents()
		case <-cancelled:
			return
		case <-m.done:
			return
		}
	}
}
// Stop signals the background loop to exit.
//
// Calling Stop again after it has already returned is a no-op: the done
// channel is closed only the first time, since closing a closed channel
// panics. Calls racing from several goroutines are not synchronised.
func (m *SwarmManager) Stop() {
	select {
	case <-m.done:
		// Already closed by an earlier Stop.
	default:
		close(m.done)
	}
}
// checkIdleAgents performs one maintenance pass:
//
//  1. Collects the names of live agent containers and Swarm services
//     (those labelled goclaw.agent=true).
//  2. Scales every running agent service that has been idle for at least
//     defaultIdleTimeoutMinutes down to 0 replicas.
//  3. Dead-letter reconciliation: any agent whose DB status is "running" but
//     whose serviceName is not among the live names is set to "error".
//
// It is a no-op when the handler has no Docker client. Step 3 is skipped when
// the container listing failed: the live set would be incomplete and healthy
// agents would be wrongly flagged as "error". A failed service listing does
// not block step 3, so a host that only runs standalone containers is still
// reconciled.
func (m *SwarmManager) checkIdleAgents() {
	h := m.handler
	if h.docker == nil {
		return
	}

	// Build a lookup set of currently-live container/service names (both standalone + Swarm)
	liveContainers := make(map[string]bool)

	// Check standalone containers
	containers, listErr := h.docker.ListContainers()
	containersListed := listErr == nil
	if !containersListed {
		log.Printf("[SwarmManager] list containers error: %v", listErr)
	} else {
		for _, c := range containers {
			if c.Labels["goclaw.agent"] != "true" {
				continue
			}
			for _, name := range c.Names {
				// Docker container names are prefixed with "/"
				clean := name
				if len(clean) > 0 && clean[0] == '/' {
					clean = clean[1:]
				}
				liveContainers[clean] = true
			}
		}
	}

	// Check Swarm services (for legacy/mixed environments)
	services, err := h.docker.ListServices()
	if err != nil {
		log.Printf("[SwarmManager] list services error: %v", err)
	}

	idleThreshold := time.Duration(defaultIdleTimeoutMinutes) * time.Minute
	now := time.Now()

	for _, svc := range services {
		// Only manage services labelled as GoClaw agents
		if svc.Spec.Labels["goclaw.agent"] != "true" {
			continue
		}
		liveContainers[svc.Spec.Name] = true

		// Skip already-stopped services (0 desired replicas)
		desired := 0
		if svc.Spec.Mode.Replicated != nil {
			desired = svc.Spec.Mode.Replicated.Replicas
		}
		if desired == 0 {
			continue
		}

		// Check last activity time; fall back to the service's UpdatedAt when
		// no activity timestamp is available.
		lastActivity, actErr := h.docker.GetServiceLastActivity(svc.ID)
		if actErr != nil || lastActivity.IsZero() {
			lastActivity = svc.UpdatedAt
		}

		idle := now.Sub(lastActivity)
		if idle >= idleThreshold {
			log.Printf("[SwarmManager] Agent '%s' idle for %.1f min → scaling to 0",
				svc.Spec.Name, idle.Minutes())
			if scaleErr := h.docker.ScaleService(svc.ID, 0); scaleErr != nil {
				log.Printf("[SwarmManager] scale-to-0 error for %s: %v", svc.Spec.Name, scaleErr)
			}
		}
	}

	// ── Dead-letter reconciliation (Phase C) ─────────────────────────────────
	// If an agent's DB says "running" but its container/service is gone, update
	// the status to "error" so the UI shows the discrepancy and allows redeploy.
	if !deadLetterCheckEnabled || h.db == nil {
		return
	}
	if !containersListed {
		// Without a container listing every running agent would look dead.
		log.Printf("[SwarmManager] skipping dead-letter check: container list unavailable")
		return
	}

	agents, dbErr := h.db.ListAgents()
	if dbErr != nil {
		log.Printf("[SwarmManager] list agents error: %v", dbErr)
		return
	}
	for _, a := range agents {
		if a.ContainerStatus != "running" || a.ServiceName == "" {
			continue
		}
		if liveContainers[a.ServiceName] {
			continue
		}
		log.Printf("[SwarmManager] Dead-letter: agent %d (%q) marked running but container %q not found — setting status=error",
			a.ID, a.Name, a.ServiceName)
		if updateErr := h.db.UpdateContainerStatus(a.ID, "error", a.ServiceName, a.ServicePort); updateErr != nil {
			log.Printf("[SwarmManager] UpdateContainerStatus error for agent %d: %v", a.ID, updateErr)
		}
	}
}
// ─── HTTP Handlers for agent lifecycle ────────────────────────────────────────

// SwarmStartAgent handles POST /api/swarm/agents/{name}/start.
//
// It starts (scales up) the named agent service. Optional JSON body:
// { "replicas": N }. A missing, malformed or non-positive value falls back to
// 1 replica.
//
// Responses:
//   - 400 when the {name} path value is empty
//   - 503 when the handler has no Docker client
//   - 500 when scaling the service fails
//   - 200 with {"ok": true, "name": ..., "replicas": ...} on success
func (h *Handler) SwarmStartAgent(w http.ResponseWriter, r *http.Request) {
	name := r.PathValue("name")
	if name == "" {
		respondError(w, http.StatusBadRequest, "agent name required")
		return
	}
	// Without this guard the ScaleService call below would run on a nil client.
	if h.docker == nil {
		respondError(w, http.StatusServiceUnavailable, "docker client not available")
		return
	}

	var body struct {
		Replicas int `json:"replicas"`
	}
	// The body is optional: a decode failure is deliberately ignored and the
	// default replica count is used instead.
	_ = json.NewDecoder(r.Body).Decode(&body)
	if body.Replicas <= 0 {
		body.Replicas = 1
	}

	if err := h.docker.ScaleService(name, body.Replicas); err != nil {
		respondError(w, http.StatusInternalServerError, "start agent: "+err.Error())
		return
	}
	log.Printf("[Swarm] Agent '%s' started with %d replica(s)", name, body.Replicas)
	respond(w, http.StatusOK, map[string]any{"ok": true, "name": name, "replicas": body.Replicas})
}
// SwarmStopAgent handles POST /api/swarm/agents/{name}/stop.
//
// It stops the named agent service by scaling it to 0 replicas.
//
// Responses:
//   - 400 when the {name} path value is empty
//   - 503 when the handler has no Docker client
//   - 500 when scaling the service fails
//   - 200 with {"ok": true, "name": ..., "replicas": 0} on success
func (h *Handler) SwarmStopAgent(w http.ResponseWriter, r *http.Request) {
	name := r.PathValue("name")
	if name == "" {
		respondError(w, http.StatusBadRequest, "agent name required")
		return
	}
	// Without this guard the ScaleService call below would run on a nil client.
	if h.docker == nil {
		respondError(w, http.StatusServiceUnavailable, "docker client not available")
		return
	}

	if err := h.docker.ScaleService(name, 0); err != nil {
		respondError(w, http.StatusInternalServerError, "stop agent: "+err.Error())
		return
	}
	log.Printf("[Swarm] Agent '%s' stopped (scaled to 0)", name)
	respond(w, http.StatusOK, map[string]any{"ok": true, "name": name, "replicas": 0})
}

View File

@@ -3,6 +3,7 @@ package db
import (
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
"log"
@@ -20,9 +21,9 @@ type AgentConfig struct {
AllowedTools []string
Temperature float64
MaxTokens int
IsOrchestrator bool
IsSystem bool
IsActive bool
IsOrchestrator bool
IsSystem bool
IsActive bool
// Container / Swarm fields (Phase A)
ServiceName string
ServicePort int
@@ -32,14 +33,34 @@ type AgentConfig struct {
// AgentRow is a minimal agent representation for listing.
type AgentRow struct {
ID int `json:"id"`
Name string `json:"name"`
Role string `json:"role"`
Model string `json:"model"`
Description string `json:"description"`
IsActive bool `json:"isActive"`
IsSystem bool `json:"isSystem"`
IsOrchestrator bool `json:"isOrchestrator"`
ID int `json:"id"`
Name string `json:"name"`
Role string `json:"role"`
Model string `json:"model"`
Description string `json:"description"`
IsActive bool `json:"isActive"`
IsSystem bool `json:"isSystem"`
IsOrchestrator bool `json:"isOrchestrator"`
// Container / Swarm fields
ServiceName string `json:"serviceName"`
ServicePort int `json:"servicePort"`
ContainerImage string `json:"containerImage"`
ContainerStatus string `json:"containerStatus"`
}
// CreateAgentInput holds the fields required to create a new agent in DB.
// Zero values for Temperature, MaxTokens, AllowedTools and ContainerImage are
// replaced with defaults by CreateAgent.
type CreateAgentInput struct {
	Name           string   `json:"name"`
	Role           string   `json:"role"`
	Model          string   `json:"model"`
	Description    string   `json:"description"`
	SystemPrompt   string   `json:"systemPrompt"`
	Temperature    float64  `json:"temperature"`    // 0 → CreateAgent stores 0.7
	MaxTokens      int      `json:"maxTokens"`      // 0 → CreateAgent stores 8192
	AllowedTools   []string `json:"allowedTools"`   // empty → stored as the JSON array "[]"
	IsSystem       bool     `json:"isSystem"`       // stored as 0/1
	IsOrchestrator bool     `json:"isOrchestrator"` // stored as 0/1
	ContainerImage string   `json:"containerImage"` // "" → "goclaw-agent-worker:latest"
}
type DB struct {
@@ -73,8 +94,7 @@ func (d *DB) Close() {
// GetOrchestratorConfig loads the agent with isOrchestrator=1 from DB.
func (d *DB) GetOrchestratorConfig() (*AgentConfig, error) {
row := d.conn.QueryRow(`
SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive,
COALESCE(serviceName,''), COALESCE(servicePort,0), COALESCE(containerImage,'goclaw-agent-worker:latest'), COALESCE(containerStatus,'stopped')
SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive
FROM agents
WHERE isOrchestrator = 1
LIMIT 1
@@ -85,8 +105,7 @@ func (d *DB) GetOrchestratorConfig() (*AgentConfig, error) {
// GetAgentByID loads a specific agent by ID.
func (d *DB) GetAgentByID(id int) (*AgentConfig, error) {
row := d.conn.QueryRow(`
SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive,
COALESCE(serviceName,''), COALESCE(servicePort,0), COALESCE(containerImage,'goclaw-agent-worker:latest'), COALESCE(containerStatus,'stopped')
SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive
FROM agents
WHERE id = ?
LIMIT 1
@@ -94,10 +113,13 @@ func (d *DB) GetAgentByID(id int) (*AgentConfig, error) {
return scanAgentConfig(row)
}
// ListAgents returns all active agents.
// ListAgents returns all agents with container status fields.
func (d *DB) ListAgents() ([]AgentRow, error) {
rows, err := d.conn.Query(`
SELECT id, name, role, model, COALESCE(description,''), isActive, isSystem, isOrchestrator
SELECT id, name, role, model,
COALESCE(description,''), isActive, isSystem, isOrchestrator,
COALESCE(serviceName,''), COALESCE(servicePort,0),
COALESCE(containerImage,''), COALESCE(containerStatus,'stopped')
FROM agents
ORDER BY isOrchestrator DESC, isSystem DESC, id ASC
`)
@@ -110,7 +132,11 @@ func (d *DB) ListAgents() ([]AgentRow, error) {
for rows.Next() {
var a AgentRow
var isActive, isSystem, isOrch int
if err := rows.Scan(&a.ID, &a.Name, &a.Role, &a.Model, &a.Description, &isActive, &isSystem, &isOrch); err != nil {
if err := rows.Scan(
&a.ID, &a.Name, &a.Role, &a.Model, &a.Description,
&isActive, &isSystem, &isOrch,
&a.ServiceName, &a.ServicePort, &a.ContainerImage, &a.ContainerStatus,
); err != nil {
continue
}
a.IsActive = isActive == 1
@@ -121,64 +147,423 @@ func (d *DB) ListAgents() ([]AgentRow, error) {
return agents, nil
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
func scanAgentConfig(row *sql.Row) (*AgentConfig, error) {
var cfg AgentConfig
var systemPrompt sql.NullString
var allowedToolsJSON sql.NullString
var temperature sql.NullFloat64
var maxTokens sql.NullInt64
var isOrch, isSystem, isActive int
err := row.Scan(
&cfg.ID, &cfg.Name, &cfg.Model,
&systemPrompt, &allowedToolsJSON,
&temperature, &maxTokens,
&isOrch, &isSystem, &isActive,
&cfg.ServiceName, &cfg.ServicePort, &cfg.ContainerImage, &cfg.ContainerStatus,
// CreateAgent inserts a new agent into the DB and returns its ID.
//
// Defaults applied to zero-valued input fields: temperature 0.7, maxTokens
// 8192, allowedTools "[]", containerImage "goclaw-agent-worker:latest". The
// row is created with isActive = 1 and containerStatus = 'stopped'.
//
// It returns an error when the DB is not connected, when allowedTools cannot
// be encoded, when the INSERT fails, or when the new row ID cannot be read.
func (d *DB) CreateAgent(in CreateAgentInput) (int, error) {
	if d.conn == nil {
		return 0, fmt.Errorf("DB not connected")
	}

	toolsJSON := "[]"
	if len(in.AllowedTools) > 0 {
		b, err := json.Marshal(in.AllowedTools)
		if err != nil {
			return 0, fmt.Errorf("encode allowedTools: %w", err)
		}
		toolsJSON = string(b)
	}

	// NOTE: an explicit 0 is indistinguishable from "unset" here, so a
	// temperature of exactly 0 cannot be stored through this path.
	temp := in.Temperature
	if temp == 0 {
		temp = 0.7
	}
	maxTok := in.MaxTokens
	if maxTok == 0 {
		maxTok = 8192
	}
	img := in.ContainerImage
	if img == "" {
		img = "goclaw-agent-worker:latest"
	}

	res, err := d.conn.Exec(`
INSERT INTO agents
(name, role, model, description, systemPrompt, temperature, maxTokens,
allowedTools, isActive, isSystem, isOrchestrator,
containerImage, containerStatus, createdAt, updatedAt)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?, ?, 'stopped', NOW(), NOW())
`,
		in.Name, in.Role, in.Model, in.Description, in.SystemPrompt,
		temp, maxTok, toolsJSON,
		boolToInt(in.IsSystem), boolToInt(in.IsOrchestrator),
		img,
	)
	if err != nil {
		return 0, fmt.Errorf("insert agent: %w", err)
	}

	// Report a failure to read the ID rather than silently returning 0 as if
	// it were a valid agent ID.
	id, err := res.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("insert agent: read new id: %w", err)
	}
	return int(id), nil
}
// boolToInt maps true to 1 and false to 0, the form used for the integer
// flag columns written by CreateAgent.
func boolToInt(b bool) int {
	flag := 0
	if b {
		flag = 1
	}
	return flag
}
// DeleteAgent deletes the agent row with the given ID, but only when it is
// not a system agent (isSystem = 0). It returns an error when the DB is not
// connected, when the DELETE fails, or when no row was removed (unknown ID or
// system agent).
func (d *DB) DeleteAgent(id int) error {
	if d.conn == nil {
		return fmt.Errorf("DB not connected")
	}

	result, execErr := d.conn.Exec(`DELETE FROM agents WHERE id = ? AND isSystem = 0`, id)
	if execErr != nil {
		return execErr
	}

	if affected, _ := result.RowsAffected(); affected == 0 {
		return fmt.Errorf("agent %d not found or is a system agent", id)
	}
	return nil
}
// AssignServicePort finds the lowest free port in the half-open range
// [start, start+maxAgents). Ports already stored in agents.servicePort count
// as used.
//
// When the DB is not connected, or the port query itself fails, it falls back
// to returning start (best-effort, offline mode). It returns an error when
// the used-port list could only be read partially, or when every port in the
// range is taken.
func (d *DB) AssignServicePort(start, maxAgents int) (int, error) {
	if d.conn == nil {
		return start, nil // offline — just return start
	}
	rows, err := d.conn.Query(`SELECT COALESCE(servicePort,0) FROM agents WHERE servicePort > 0`)
	if err != nil {
		return start, nil
	}
	defer rows.Close()

	used := map[int]bool{}
	for rows.Next() {
		var p int
		if rows.Scan(&p) == nil && p > 0 {
			used[p] = true
		}
	}
	// An iteration error means the used set may be incomplete; picking a port
	// from it could hand out one that is already taken.
	if err := rows.Err(); err != nil {
		return 0, fmt.Errorf("read used service ports: %w", err)
	}

	end := start + maxAgents // exclusive upper bound
	for port := start; port < end; port++ {
		if !used[port] {
			return port, nil
		}
	}
	// Report the inclusive range that was actually searched.
	return 0, fmt.Errorf("no free port in range %d-%d", start, end-1)
}
// ─── LLM Provider ─────────────────────────────────────────────────────────────
// ProviderRow holds the active LLM provider config from DB.
type ProviderRow struct {
	ID      int
	Name    string
	BaseURL string
	// APIKey is intended to carry the decrypted key (Node.js encrypts it).
	// GetActiveProvider currently always leaves it empty.
	APIKey string // decrypted (Node.js encrypts, Go just reads raw for now)
}
// GetActiveProvider returns the active LLM provider from the llmProviders table.
// Note: The API key is stored AES-256-GCM encrypted by the Node.js server.
// The Go gateway reads the raw encrypted bytes but cannot decrypt them (no shared key in Go).
// The proper flow: Node.js decrypts the key and passes it via /api/providers/reload.
// For now, GetActiveProvider returns the stored encrypted bytes as-is (not useful for direct use).
// Use UpdateCredentials on the LLM client instead.
func (d *DB) GetActiveProvider() (*ProviderRow, error) {
var p ProviderRow
var apiKeyEncrypted sql.NullString
row := d.conn.QueryRow(`
SELECT id, name, baseUrl, COALESCE(apiKeyEncrypted, '')
FROM llmProviders
WHERE isActive = 1
LIMIT 1
`)
err := row.Scan(&p.ID, &p.Name, &p.BaseURL, &apiKeyEncrypted)
if err != nil {
return nil, err
}
cfg.SystemPrompt = systemPrompt.String
cfg.Temperature = temperature.Float64
if cfg.Temperature == 0 {
cfg.Temperature = 0.5
}
cfg.MaxTokens = int(maxTokens.Int64)
if cfg.MaxTokens == 0 {
cfg.MaxTokens = 8192
}
cfg.IsOrchestrator = isOrch == 1
cfg.IsSystem = isSystem == 1
cfg.IsActive = isActive == 1
if allowedToolsJSON.Valid && allowedToolsJSON.String != "" && allowedToolsJSON.String != "null" {
_ = json.Unmarshal([]byte(allowedToolsJSON.String), &cfg.AllowedTools)
}
return &cfg, nil
// We cannot decrypt the key in Go (different crypto impl from Node.js)
// Return empty key — the LLM client will use its env-configured key
p.APIKey = ""
return &p, nil
}
// ─── Agent Container Fields ───────────────────────────────────────────────────
// These methods support the agent-worker container architecture where each
// agent runs as an autonomous Docker Swarm service.
// ─── Chat Sessions & Events ───────────────────────────────────────────────────
// UpdateContainerStatus updates the container lifecycle state of an agent.
func (d *DB) UpdateContainerStatus(agentID int, status, serviceName string, servicePort int) error {
// ChatSessionRow holds one persistent chat session as read from the
// chatSessions table (see GetSession / GetRecentSessions). Nullable text and
// numeric columns are read with COALESCE, so they arrive as "" or 0.
type ChatSessionRow struct {
	ID               int    `json:"id"`        // numeric row ID
	SessionID        string `json:"sessionId"` // string session identifier used for lookups
	AgentID          int    `json:"agentId"`
	Status           string `json:"status"` // running | done | error
	UserMessage      string `json:"userMessage"`
	FinalResponse    string `json:"finalResponse"`
	Model            string `json:"model"`
	TotalTokens      int    `json:"totalTokens"`
	ProcessingTimeMs int64  `json:"processingTimeMs"`
	ErrorMessage     string `json:"errorMessage"`
	CreatedAt        string `json:"createdAt"`
	UpdatedAt        string `json:"updatedAt"`
}
// ChatEventRow holds one event inside a session (a row of the chatEvents
// table). Events of one session are ordered by Seq.
type ChatEventRow struct {
	ID          int    `json:"id"`
	SessionID   string `json:"sessionId"`
	Seq         int    `json:"seq"`       // position within the session; GetEvents orders by it
	EventType   string `json:"eventType"` // thinking | tool_call | delta | done | error
	Content     string `json:"content"`
	ToolName    string `json:"toolName"`
	ToolArgs    string `json:"toolArgs"` // JSON string
	ToolResult  string `json:"toolResult"`
	ToolSuccess bool   `json:"toolSuccess"`
	DurationMs  int    `json:"durationMs"`
	Model       string `json:"model"`
	UsageJSON   string `json:"usageJson"` // JSON string
	ErrorMsg    string `json:"errorMsg"`
	CreatedAt   string `json:"createdAt"`
}
// CreateSession inserts a chatSessions row with status 'running' for the
// given session ID and agent. The user message is capped at 65535 bytes via
// truncate. Only an error is returned; no row is read back.
func (d *DB) CreateSession(sessionID, userMessage string, agentID int) error {
	if d.conn == nil {
		return fmt.Errorf("DB not connected")
	}

	const insertSQL = `
INSERT INTO chatSessions (sessionId, agentId, status, userMessage)
VALUES (?, ?, 'running', ?)
`
	message := truncate(userMessage, 65535)
	_, execErr := d.conn.Exec(insertSQL, sessionID, agentID, message)
	return execErr
}
// AppendEvent inserts a new event row for a session.
// seq is auto-calculated as MAX(seq)+1 for the session.
func (d *DB) AppendEvent(e ChatEventRow) error {
if d.conn == nil {
return nil
}
toolArgs := e.ToolArgs
if toolArgs == "" {
toolArgs = "null"
}
usageJSON := e.UsageJSON
if usageJSON == "" {
usageJSON = "null"
}
var toolSuccessVal interface{}
if e.EventType == "tool_call" {
if e.ToolSuccess {
toolSuccessVal = 1
} else {
toolSuccessVal = 0
}
}
_, err := d.conn.Exec(`
UPDATE agents
SET containerStatus = ?, serviceName = ?, servicePort = ?, updatedAt = NOW()
WHERE id = ?
`, status, serviceName, servicePort, agentID)
INSERT INTO chatEvents
(sessionId, seq, eventType, content, toolName, toolArgs,
toolResult, toolSuccess, durationMs, model, usageJson, errorMsg)
SELECT ?, COALESCE(MAX(seq),0)+1, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?
FROM chatEvents WHERE sessionId = ?
`,
e.SessionID, e.EventType,
nullStr(e.Content), nullStr(e.ToolName), rawJSON(toolArgs),
nullStr(e.ToolResult), toolSuccessVal, nullInt(e.DurationMs),
nullStr(e.Model), rawJSON(usageJSON), nullStr(e.ErrorMsg),
e.SessionID,
)
if err != nil {
log.Printf("[DB] AppendEvent error: %v", err)
}
return err
}
// MarkSessionDone writes the final state of a session: status, final
// response, model, token count, processing time and error message. The
// response and error texts are capped at 65535 bytes. The call is best-effort:
// it does nothing without a DB connection and only logs a failed UPDATE.
func (d *DB) MarkSessionDone(sessionID, status, finalResponse, model, errorMessage string, totalTokens int, processingTimeMs int64) {
	if d.conn == nil {
		return
	}

	const updateSQL = `
UPDATE chatSessions
SET status=?, finalResponse=?, model=?, totalTokens=?,
processingTimeMs=?, errorMessage=?
WHERE sessionId=?
`
	params := []interface{}{
		status,
		truncate(finalResponse, 65535),
		model,
		totalTokens,
		processingTimeMs,
		truncate(errorMessage, 65535),
		sessionID,
	}
	if _, execErr := d.conn.Exec(updateSQL, params...); execErr != nil {
		log.Printf("[DB] MarkSessionDone error: %v", execErr)
	}
}
// GetSession looks up one chat session by its string session ID. It returns
// an error when the DB is not connected or when the row cannot be scanned
// (including the no-rows case reported by database/sql).
func (d *DB) GetSession(sessionID string) (*ChatSessionRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}

	const query = `
SELECT id, sessionId, agentId, status,
COALESCE(userMessage,''),
COALESCE(finalResponse,''),
COALESCE(model,''),
COALESCE(totalTokens,0),
COALESCE(processingTimeMs,0),
COALESCE(errorMessage,''),
createdAt, updatedAt
FROM chatSessions WHERE sessionId=? LIMIT 1
`
	session := new(ChatSessionRow)
	scanErr := d.conn.QueryRow(query, sessionID).Scan(
		&session.ID, &session.SessionID, &session.AgentID, &session.Status,
		&session.UserMessage, &session.FinalResponse, &session.Model,
		&session.TotalTokens, &session.ProcessingTimeMs, &session.ErrorMessage,
		&session.CreatedAt, &session.UpdatedAt,
	)
	if scanErr != nil {
		return nil, scanErr
	}
	return session, nil
}
// GetEvents returns all events for a session with seq > afterSeq (for
// incremental polling), ordered by seq ascending.
//
// Rows that fail to scan are logged and skipped. An error is returned when
// the DB is not connected, when the query fails, or when row iteration ends
// with an error. In the last case the result would otherwise look like a
// complete but shorter list, and a poller advancing its cursor past it could
// miss events.
func (d *DB) GetEvents(sessionID string, afterSeq int) ([]ChatEventRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	rows, err := d.conn.Query(`
SELECT id, sessionId, seq, eventType,
COALESCE(content,''), COALESCE(toolName,''),
COALESCE(CAST(toolArgs AS CHAR),'null'),
COALESCE(toolResult,''),
COALESCE(toolSuccess,0),
COALESCE(durationMs,0),
COALESCE(model,''),
COALESCE(CAST(usageJson AS CHAR),'null'),
COALESCE(errorMsg,''),
createdAt
FROM chatEvents
WHERE sessionId=? AND seq > ?
ORDER BY seq ASC
`, sessionID, afterSeq)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []ChatEventRow
	for rows.Next() {
		var e ChatEventRow
		var toolSuccess int // stored as 0/1 in the DB
		if err := rows.Scan(
			&e.ID, &e.SessionID, &e.Seq, &e.EventType,
			&e.Content, &e.ToolName, &e.ToolArgs,
			&e.ToolResult, &toolSuccess, &e.DurationMs,
			&e.Model, &e.UsageJSON, &e.ErrorMsg, &e.CreatedAt,
		); err != nil {
			log.Printf("[DB] GetEvents scan error: %v", err)
			continue
		}
		e.ToolSuccess = toolSuccess == 1
		result = append(result, e)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
// GetRecentSessions returns the N most recent sessions, newest first
// (ordered by id descending).
//
// Rows that fail to scan are logged and skipped. An error is returned when
// the DB is not connected, when the query fails, or when row iteration ends
// with an error.
func (d *DB) GetRecentSessions(limit int) ([]ChatSessionRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	rows, err := d.conn.Query(`
SELECT id, sessionId, agentId, status,
COALESCE(userMessage,''),
COALESCE(finalResponse,''),
COALESCE(model,''),
COALESCE(totalTokens,0),
COALESCE(processingTimeMs,0),
COALESCE(errorMessage,''),
createdAt, updatedAt
FROM chatSessions ORDER BY id DESC LIMIT ?
`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []ChatSessionRow
	for rows.Next() {
		var s ChatSessionRow
		if err := rows.Scan(&s.ID, &s.SessionID, &s.AgentID, &s.Status,
			&s.UserMessage, &s.FinalResponse, &s.Model,
			&s.TotalTokens, &s.ProcessingTimeMs, &s.ErrorMessage,
			&s.CreatedAt, &s.UpdatedAt); err != nil {
			log.Printf("[DB] GetRecentSessions scan error: %v", err)
			continue
		}
		result = append(result, s)
	}
	// Surface iteration failures instead of returning a silently shortened list.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
// nullStr converts an empty string to nil (so it is bound as SQL NULL) and
// returns any other string unchanged.
func nullStr(s string) interface{} {
	if len(s) > 0 {
		return s
	}
	return nil
}
// nullInt converts 0 to nil (so it is bound as SQL NULL) and returns any
// other int unchanged.
func nullInt(n int) interface{} {
	if n != 0 {
		return n
	}
	return nil
}
// rawJSON is a string that already contains JSON text. It implements
// driver.Valuer so the text is handed to the driver as a plain string rather
// than being encoded a second time.
type rawJSON string

// Value returns nil (SQL NULL) for the empty string and for the literal
// "null"; every other value is returned as a plain string.
func (r rawJSON) Value() (driver.Value, error) {
	switch r {
	case "", "null":
		return nil, nil
	default:
		return string(r), nil
	}
}
// ─── Metrics & History ────────────────────────────────────────────────────────
// MetricInput holds data for a single orchestrator request metric, as written
// to the agentMetrics table by SaveMetric.
type MetricInput struct {
	AgentID          int
	RequestID        string
	UserMessage      string // capped at 65535 bytes by SaveMetric
	AgentResponse    string // capped at 65535 bytes by SaveMetric
	InputTokens      int
	OutputTokens     int
	TotalTokens      int
	ProcessingTimeMs int64
	Status           string // "success" | "error" | "timeout"
	ErrorMessage     string
	ToolsCalled      []string // stored JSON-encoded
	Model            string
}
// SaveMetric writes one row to the agentMetrics table. It is best-effort:
// without a DB connection it does nothing, and an INSERT failure is logged
// rather than returned. The user message and agent response are capped at
// 65535 bytes; ToolsCalled is stored JSON-encoded.
func (d *DB) SaveMetric(m MetricInput) {
	if d.conn == nil {
		return
	}

	const insertSQL = `
INSERT INTO agentMetrics
(agentId, requestId, userMessage, agentResponse,
inputTokens, outputTokens, totalTokens,
processingTimeMs, status, errorMessage, toolsCalled, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	encodedTools, _ := json.Marshal(m.ToolsCalled)
	params := []interface{}{
		m.AgentID,
		m.RequestID,
		truncate(m.UserMessage, 65535),
		truncate(m.AgentResponse, 65535),
		m.InputTokens, m.OutputTokens, m.TotalTokens,
		m.ProcessingTimeMs,
		m.Status,
		m.ErrorMessage,
		string(encodedTools),
		m.Model,
	}
	if _, execErr := d.conn.Exec(insertSQL, params...); execErr != nil {
		log.Printf("[DB] SaveMetric error: %v", execErr)
	}
}
// HistoryInput holds data for one conversation entry.
type HistoryInput struct {
AgentID int
@@ -188,14 +573,6 @@ type HistoryInput struct {
Status string // "success" | "error" | "pending"
}
// HistoryRow is a single entry from agentHistory for sliding window memory.
type HistoryRow struct {
ID int `json:"id"`
UserMessage string `json:"userMessage"`
AgentResponse string `json:"agentResponse"`
ConvID string `json:"conversationId"`
}
// SaveHistory inserts a row into the agentHistory table.
// Non-fatal — logs on error but does not return one.
func (d *DB) SaveHistory(h HistoryInput) {
@@ -223,39 +600,7 @@ func (d *DB) SaveHistory(h HistoryInput) {
}
}
// GetAgentHistory returns the last N conversation turns for an agent, oldest first.
func (d *DB) GetAgentHistory(agentID, limit int) ([]HistoryRow, error) {
if d.conn == nil {
return nil, nil
}
rows, err := d.conn.Query(`
SELECT id, userMessage, COALESCE(agentResponse,''), COALESCE(conversationId,'')
FROM agentHistory
WHERE agentId = ?
ORDER BY id DESC
LIMIT ?
`, agentID, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var result []HistoryRow
for rows.Next() {
var h HistoryRow
if err := rows.Scan(&h.ID, &h.UserMessage, &h.AgentResponse, &h.ConvID); err != nil {
continue
}
result = append(result, h)
}
// Reverse so oldest is first (for LLM context ordering)
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
return result, nil
}
// truncate caps a string to maxLen bytes.
// truncate caps a string to maxLen bytes (not runes — fast path for DB limits).
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
@@ -263,6 +608,154 @@ func truncate(s string, maxLen int) string {
return s[:maxLen]
}
// ─── Swarm Node Persistence ───────────────────────────────────────────────────
// SwarmNodeInput is the data shape that handlers pass to UpsertSwarmNodes.
// It matches the JSON shape from handler's NodeOut struct so we can reuse it.
type SwarmNodeInput struct {
	ID            string            `json:"id"` // stored as swarmNodes.nodeId
	Hostname      string            `json:"hostname"`
	Role          string            `json:"role"`          // "manager" sets isManager = 1
	State         string            `json:"state"`         // ready | down | disconnected; anything else is stored as "ready"
	Availability  string            `json:"availability"`  // active | pause | drain; anything else is stored as "active"
	IP            string            `json:"ip"`            // stored as advertiseAddr
	CPUCores      int               `json:"cpuCores"`
	MemTotalMB    int64             `json:"memTotalMB"`
	DockerVersion string            `json:"dockerVersion"` // stored as engineVersion
	IsLeader      bool              `json:"isLeader"`
	ManagerAddr   string            `json:"managerAddr"` // not written by UpsertSwarmNodes
	Labels        map[string]string `json:"labels"`      // stored JSON-encoded
}
// UpsertSwarmNodes writes each node to the swarmNodes table, inserting new
// rows and refreshing existing ones (ON DUPLICATE KEY UPDATE, which also bumps
// lastSeenAt). It is best-effort: it returns silently without a DB connection
// or when nodes cannot be converted, and per-node failures are only logged.
//
// nodes is accepted as interface{} to avoid a circular import; it is converted
// to []SwarmNodeInput through a JSON round-trip. Unknown state values are
// stored as "ready" and unknown availability values as "active".
func (d *DB) UpsertSwarmNodes(nodes interface{}) {
	if d.conn == nil {
		return
	}

	encoded, marshalErr := json.Marshal(nodes)
	if marshalErr != nil {
		return
	}
	var parsed []SwarmNodeInput
	if json.Unmarshal(encoded, &parsed) != nil {
		return
	}

	const upsertSQL = `
INSERT INTO swarmNodes
(nodeId, hostname, role, state, availability, advertiseAddr,
labels, engineVersion, cpuCores, memTotalMB, isManager, isLeader)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
hostname=VALUES(hostname), role=VALUES(role),
state=VALUES(state), availability=VALUES(availability),
advertiseAddr=VALUES(advertiseAddr),
labels=VALUES(labels), engineVersion=VALUES(engineVersion),
cpuCores=VALUES(cpuCores), memTotalMB=VALUES(memTotalMB),
isManager=VALUES(isManager), isLeader=VALUES(isLeader),
lastSeenAt=CURRENT_TIMESTAMP
`
	// asFlag converts a bool to the 0/1 integer stored in the flag columns.
	asFlag := func(on bool) int {
		if on {
			return 1
		}
		return 0
	}

	for _, node := range parsed {
		labels, _ := json.Marshal(node.Labels)

		state := node.State
		switch state {
		case "ready", "down", "disconnected":
			// accepted as-is
		default:
			state = "ready"
		}

		availability := node.Availability
		switch availability {
		case "active", "pause", "drain":
			// accepted as-is
		default:
			availability = "active"
		}

		_, execErr := d.conn.Exec(upsertSQL,
			node.ID, node.Hostname, node.Role, state, availability, node.IP,
			string(labels), node.DockerVersion,
			node.CPUCores, node.MemTotalMB,
			asFlag(node.Role == "manager"), asFlag(node.IsLeader),
		)
		if execErr != nil {
			log.Printf("[DB] UpsertSwarmNodes error for node %s: %v", node.ID, execErr)
		}
	}
}
// UpsertSwarmTokens saves the swarm join tokens and manager address to the
// swarmTokens table, updating the existing row on a duplicate key. It is
// best-effort: nothing happens without a DB connection and a failed statement
// is only logged.
func (d *DB) UpsertSwarmTokens(workerToken, managerToken, managerAddr string) {
	if d.conn == nil {
		return
	}

	const upsertSQL = `
INSERT INTO swarmTokens (managerToken, workerToken, managerAddr)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
managerToken=VALUES(managerToken),
workerToken=VALUES(workerToken),
managerAddr=VALUES(managerAddr)
`
	// Column order is manager, worker, addr — not the parameter order.
	if _, execErr := d.conn.Exec(upsertSQL, managerToken, workerToken, managerAddr); execErr != nil {
		log.Printf("[DB] UpsertSwarmTokens error: %v", execErr)
	}
}
// GetSwarmTokens reads the most recently stored join tokens (highest id) from
// the swarmTokens table. NULL columns come back as empty strings. It returns
// an error when the DB is not connected or when the row cannot be scanned.
func (d *DB) GetSwarmTokens() (worker, manager, addr string, err error) {
	if d.conn == nil {
		return "", "", "", fmt.Errorf("DB not connected")
	}

	const query = `
SELECT COALESCE(workerToken,''), COALESCE(managerToken,''), COALESCE(managerAddr,'')
FROM swarmTokens ORDER BY id DESC LIMIT 1
`
	err = d.conn.QueryRow(query).Scan(&worker, &manager, &addr)
	return worker, manager, addr, err
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
// scanAgentConfig reads one agents row (id, name, model, systemPrompt,
// allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive)
// into an AgentConfig.
//
// A NULL or zero temperature becomes 0.5 and a NULL or zero maxTokens becomes
// 8192. allowedTools is parsed from JSON when present; a parse failure is
// ignored and leaves AllowedTools empty. The Scan error is returned as-is.
func scanAgentConfig(row *sql.Row) (*AgentConfig, error) {
	var (
		cfg                              AgentConfig
		prompt, toolsRaw                 sql.NullString
		temp                             sql.NullFloat64
		tokenLimit                       sql.NullInt64
		orchFlag, systemFlag, activeFlag int
	)

	if scanErr := row.Scan(
		&cfg.ID, &cfg.Name, &cfg.Model,
		&prompt, &toolsRaw,
		&temp, &tokenLimit,
		&orchFlag, &systemFlag, &activeFlag,
	); scanErr != nil {
		return nil, scanErr
	}

	cfg.SystemPrompt = prompt.String

	if cfg.Temperature = temp.Float64; cfg.Temperature == 0 {
		cfg.Temperature = 0.5
	}
	if cfg.MaxTokens = int(tokenLimit.Int64); cfg.MaxTokens == 0 {
		cfg.MaxTokens = 8192
	}

	// Flag columns hold 0/1.
	cfg.IsOrchestrator = orchFlag == 1
	cfg.IsSystem = systemFlag == 1
	cfg.IsActive = activeFlag == 1

	if raw := toolsRaw.String; toolsRaw.Valid && raw != "" && raw != "null" {
		_ = json.Unmarshal([]byte(raw), &cfg.AllowedTools)
	}
	return &cfg, nil
}
// normalizeDSN converts mysql://user:pass@host:port/db to user:pass@tcp(host:port)/db
func normalizeDSN(dsn string) string {
if !strings.HasPrefix(dsn, "mysql://") {
@@ -304,3 +797,60 @@ func normalizeDSN(dsn string) string {
}
return fmt.Sprintf("%s@tcp(%s)%s?parseTime=true&charset=utf8mb4%s", userInfo, hostPort, dbName, tlsParam)
}
// ─── Agent Container Fields ───────────────────────────────────────────────────
// These methods support the agent-worker container architecture where each
// agent runs as an autonomous Docker Swarm service.
// UpdateContainerStatus stores an agent's container lifecycle state: it sets
// containerStatus, serviceName and servicePort on the agents row with the
// given ID and refreshes updatedAt. Without a DB connection it is a no-op
// that returns nil.
func (d *DB) UpdateContainerStatus(agentID int, status, serviceName string, servicePort int) error {
	if d.conn == nil {
		return nil
	}

	const updateSQL = `
UPDATE agents
SET containerStatus = ?, serviceName = ?, servicePort = ?, updatedAt = NOW()
WHERE id = ?
`
	_, execErr := d.conn.Exec(updateSQL, status, serviceName, servicePort, agentID)
	return execErr
}
// HistoryRow is a single entry from agentHistory for sliding window memory.
// It is the element type returned by GetAgentHistory.
type HistoryRow struct {
	ID            int    `json:"id"`
	UserMessage   string `json:"userMessage"`
	AgentResponse string `json:"agentResponse"`  // "" when the column is NULL
	ConvID        string `json:"conversationId"` // "" when the column is NULL
}
// GetAgentHistory returns the last N conversation turns for an agent, oldest
// first.
//
// The query selects the newest `limit` rows (id descending) and the slice is
// then reversed so it reads chronologically. Without a DB connection it
// returns (nil, nil). Rows that fail to scan are logged and skipped. An error
// is returned when the query fails or when row iteration ends with an error.
func (d *DB) GetAgentHistory(agentID, limit int) ([]HistoryRow, error) {
	if d.conn == nil {
		return nil, nil
	}
	rows, err := d.conn.Query(`
SELECT id, userMessage, COALESCE(agentResponse,''), COALESCE(conversationId,'')
FROM agentHistory
WHERE agentId = ?
ORDER BY id DESC
LIMIT ?
`, agentID, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []HistoryRow
	for rows.Next() {
		var h HistoryRow
		if err := rows.Scan(&h.ID, &h.UserMessage, &h.AgentResponse, &h.ConvID); err != nil {
			log.Printf("[DB] GetAgentHistory scan error: %v", err)
			continue
		}
		result = append(result, h)
	}
	// Surface iteration failures instead of returning a silently shortened history.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Reverse so oldest is first (for LLM context ordering)
	for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
		result[i], result[j] = result[j], result[i]
	}
	return result, nil
}

View File

@@ -1,22 +1,25 @@
package docker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os/exec"
"strings"
"time"
)
// DockerClient communicates with the Docker daemon via Unix socket or TCP.
// DockerClient communicates with the Docker daemon via Unix socket.
type DockerClient struct {
httpClient *http.Client
baseURL string
}
// NewDockerClient creates a client that talks to /var/run/docker.sock.
// NewDockerClient creates a client talking to /var/run/docker.sock.
func NewDockerClient() *DockerClient {
transport := &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
@@ -24,11 +27,13 @@ func NewDockerClient() *DockerClient {
},
}
return &DockerClient{
httpClient: &http.Client{Transport: transport, Timeout: 10 * time.Second},
baseURL: "http://localhost", // host is ignored for unix socket
httpClient: &http.Client{Transport: transport, Timeout: 30 * time.Second},
baseURL: "http://localhost",
}
}
// ─── HTTP helpers ─────────────────────────────────────────────────────────────
func (c *DockerClient) get(path string, out interface{}) error {
resp, err := c.httpClient.Get(c.baseURL + path)
if err != nil {
@@ -42,16 +47,64 @@ func (c *DockerClient) get(path string, out interface{}) error {
return json.Unmarshal(body, out)
}
// ---- Types ----------------------------------------------------------------
// post JSON-encodes payload and POSTs it to path on the Docker API.
// A status of 400 or above becomes an error carrying the response body.
// On success a non-empty body is decoded into out, unless out is nil.
func (c *DockerClient) post(path string, payload interface{}, out interface{}) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	resp, err := c.httpClient.Post(c.baseURL+path, "application/json", bytes.NewReader(encoded))
	if err != nil {
		return fmt.Errorf("docker POST %s: %w", path, err)
	}
	defer resp.Body.Close()

	raw, _ := io.ReadAll(resp.Body)
	if resp.StatusCode >= 400 {
		return fmt.Errorf("docker POST %s: status %d: %s", path, resp.StatusCode, string(raw))
	}
	if out == nil || len(raw) == 0 {
		return nil
	}
	return json.Unmarshal(raw, out)
}
// postUpdate POSTs the JSON-encoded payload to path with a "version" query
// parameter appended, as used by the node and service update calls in this
// file. A status of 400 or above becomes an error carrying the response body.
func (c *DockerClient) postUpdate(path string, version int, payload interface{}) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	target := fmt.Sprintf("%s%s?version=%d", c.baseURL, path, version)
	req, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(encoded))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("docker POST(update) %s: %w", path, err)
	}
	defer resp.Body.Close()

	raw, _ := io.ReadAll(resp.Body)
	if resp.StatusCode < 400 {
		return nil
	}
	return fmt.Errorf("docker POST(update) %s: status %d: %s", path, resp.StatusCode, string(raw))
}
// ─── Swarm Node Types ─────────────────────────────────────────────────────────
type SwarmNode struct {
ID string `json:"ID"`
ID string `json:"ID"`
Description NodeDescription `json:"Description"`
Status NodeStatus `json:"Status"`
Status NodeStatus `json:"Status"`
ManagerStatus *ManagerStatus `json:"ManagerStatus,omitempty"`
Spec NodeSpec `json:"Spec"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
Spec NodeSpec `json:"Spec"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
Version VersionInfo `json:"Version"`
}
// VersionInfo carries the object version index of a swarm node or service.
// The update helpers in this file pass Index as the "version" query parameter.
type VersionInfo struct {
Index int `json:"Index"`
}
type NodeDescription struct {
@@ -82,17 +135,155 @@ type NodeStatus struct {
}
type ManagerStatus struct {
Addr string `json:"Addr"`
Leader bool `json:"Leader"`
Reachability string `json:"Reachability"`
Addr string `json:"Addr"`
Leader bool `json:"Leader"`
Reachability string `json:"Reachability"`
}
type NodeSpec struct {
Role string `json:"Role"`
Availability string `json:"Availability"`
Role string `json:"Role"`
Availability string `json:"Availability"`
Labels map[string]string `json:"Labels"`
}
// ─── Swarm Service Types ──────────────────────────────────────────────────────
// SwarmService is one service object as decoded from the /v1.44/services
// endpoints. ServiceStatus is a pointer and may be absent in a response.
type SwarmService struct {
ID string `json:"ID"`
Spec ServiceSpec `json:"Spec"`
ServiceStatus *ServiceStatus `json:"ServiceStatus,omitempty"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
Version VersionInfo `json:"Version"`
}
// ServiceSpec is the service definition. It is read back inside SwarmService
// and also sent as the request body when a service is created or updated.
type ServiceSpec struct {
Name string `json:"Name"`
Mode ServiceMode `json:"Mode"`
TaskTemplate TaskTemplate `json:"TaskTemplate"`
EndpointSpec *EndpointSpec `json:"EndpointSpec,omitempty"`
Labels map[string]string `json:"Labels"`
Networks []NetworkAttachment `json:"Networks,omitempty"`
}
// NetworkAttachment names a network (Target) a service is attached to and the
// aliases it gets on that network.
type NetworkAttachment struct {
Target string `json:"Target"`
Aliases []string `json:"Aliases,omitempty"`
}
// ServiceMode selects the scheduling mode; ScaleService only accepts services
// whose Replicated field is set.
type ServiceMode struct {
Replicated *ReplicatedService `json:"Replicated,omitempty"`
Global *struct{} `json:"Global,omitempty"`
}
// ReplicatedService holds the replica count of a replicated service.
type ReplicatedService struct {
Replicas int `json:"Replicas"`
}
// TaskTemplate describes the tasks of a service: the container to run plus
// optional resource settings and placement.
type TaskTemplate struct {
ContainerSpec ContainerSpec `json:"ContainerSpec"`
Resources *TaskResources `json:"Resources,omitempty"`
Placement *Placement `json:"Placement,omitempty"`
}
// ContainerSpec is the container part of a task: image, environment and labels.
type ContainerSpec struct {
Image string `json:"Image"`
Env []string `json:"Env,omitempty"`
Labels map[string]string `json:"Labels,omitempty"`
}
// TaskResources groups the optional resource limits and reservations of a task.
type TaskResources struct {
Limits *ResourceSpec `json:"Limits,omitempty"`
Reservations *ResourceSpec `json:"Reservations,omitempty"`
}
// ResourceSpec is a CPU (NanoCPUs) and memory (MemoryBytes) amount; zero
// values are omitted from the JSON.
type ResourceSpec struct {
NanoCPUs int64 `json:"NanoCPUs,omitempty"`
MemoryBytes int64 `json:"MemoryBytes,omitempty"`
}
// Placement carries placement constraint expressions, e.g. "node.role==manager".
type Placement struct {
Constraints []string `json:"Constraints,omitempty"`
}
// EndpointSpec lists the ports a service exposes.
type EndpointSpec struct {
Ports []PortConfig `json:"Ports,omitempty"`
}
// PortConfig maps a published port to a target port for one protocol.
type PortConfig struct {
Protocol string `json:"Protocol"`
TargetPort int `json:"TargetPort"`
PublishedPort int `json:"PublishedPort"`
PublishMode string `json:"PublishMode"`
}
// ServiceStatus holds the running, desired and completed task counts of a
// service; the service queries in this file request it with "status=true".
type ServiceStatus struct {
RunningTasks int `json:"RunningTasks"`
DesiredTasks int `json:"DesiredTasks"`
CompletedTasks int `json:"CompletedTasks"`
}
// ─── Swarm Task Types ─────────────────────────────────────────────────────────
// SwarmTask is one task object as decoded from the /v1.44/tasks endpoint.
// GetServiceLastActivity uses UpdatedAt to find a service's latest activity.
type SwarmTask struct {
ID string `json:"ID"`
ServiceID string `json:"ServiceID"`
NodeID string `json:"NodeID"`
Spec TaskSpec `json:"Spec"`
Status TaskStatus `json:"Status"`
Slot int `json:"Slot"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
}
// TaskSpec is the container definition a task was created from.
type TaskSpec struct {
ContainerSpec ContainerSpec `json:"ContainerSpec"`
}
// TaskStatus is the reported state of a task at Timestamp. ContainerStatus is
// a pointer and may be absent.
type TaskStatus struct {
Timestamp time.Time `json:"Timestamp"`
State string `json:"State"`
Message string `json:"Message"`
ContainerStatus *ContainerTaskStatus `json:"ContainerStatus,omitempty"`
}
// ContainerTaskStatus identifies the container behind a task by ID and PID.
type ContainerTaskStatus struct {
ContainerID string `json:"ContainerID"`
PID int `json:"PID"`
}
// ─── Swarm Info / Tokens ──────────────────────────────────────────────────────
// DockerInfo is the subset of the /v1.44/info response this package reads:
// only the Swarm section.
type DockerInfo struct {
Swarm SwarmInfo `json:"Swarm"`
}
// SwarmInfo describes the local node's view of the swarm. IsSwarmActive
// compares LocalNodeState with "active"; GetManagerAddr reads RemoteManagers.
type SwarmInfo struct {
NodeID string `json:"NodeID"`
LocalNodeState string `json:"LocalNodeState"`
ControlAvailable bool `json:"ControlAvailable"`
Managers int `json:"Managers"`
Nodes int `json:"Nodes"`
RemoteManagers []RemoteManager `json:"RemoteManagers"`
}
// RemoteManager is one manager entry: its node ID and address.
type RemoteManager struct {
NodeID string `json:"NodeID"`
Addr string `json:"Addr"`
}
// SwarmSpec is the subset of the /v1.44/swarm response decoded by
// GetJoinTokens: the swarm ID and its join tokens.
type SwarmSpec struct {
JoinTokens JoinTokens `json:"JoinTokens"`
ID string `json:"ID"`
}
// JoinTokens holds the tokens for joining the swarm as a worker or a manager.
type JoinTokens struct {
Worker string `json:"Worker"`
Manager string `json:"Manager"`
}
// ─── Container types ──────────────────────────────────────────────────────────
type Container struct {
ID string `json:"Id"`
Names []string `json:"Names"`
@@ -109,9 +300,9 @@ type ContainerStats struct {
}
type CPUStats struct {
CPUUsage CPUUsage `json:"cpu_usage"`
SystemCPUUsage int64 `json:"system_cpu_usage"`
OnlineCPUs int `json:"online_cpus"`
CPUUsage CPUUsage `json:"cpu_usage"`
SystemCPUUsage int64 `json:"system_cpu_usage"`
OnlineCPUs int `json:"online_cpus"`
}
type CPUUsage struct {
@@ -120,27 +311,14 @@ type CPUUsage struct {
}
type MemoryStats struct {
Usage int64 `json:"usage"`
MaxUsage int64 `json:"max_usage"`
Limit int64 `json:"limit"`
Usage int64 `json:"usage"`
MaxUsage int64 `json:"max_usage"`
Limit int64 `json:"limit"`
Stats map[string]int64 `json:"stats"`
}
type DockerInfo struct {
Swarm SwarmInfo `json:"Swarm"`
}
// ─── Methods: Swarm info ──────────────────────────────────────────────────────
type SwarmInfo struct {
NodeID string `json:"NodeID"`
LocalNodeState string `json:"LocalNodeState"`
ControlAvailable bool `json:"ControlAvailable"`
Managers int `json:"Managers"`
Nodes int `json:"Nodes"`
}
// ---- Methods ---------------------------------------------------------------
// IsSwarmActive checks if Docker Swarm is initialized.
func (c *DockerClient) IsSwarmActive() bool {
var info DockerInfo
if err := c.get("/v1.44/info", &info); err != nil {
@@ -149,7 +327,6 @@ func (c *DockerClient) IsSwarmActive() bool {
return info.Swarm.LocalNodeState == "active"
}
// GetSwarmInfo returns basic swarm info.
func (c *DockerClient) GetSwarmInfo() (*DockerInfo, error) {
var info DockerInfo
if err := c.get("/v1.44/info", &info); err != nil {
@@ -158,7 +335,27 @@ func (c *DockerClient) GetSwarmInfo() (*DockerInfo, error) {
return &info, nil
}
// ListNodes returns all Swarm nodes (requires manager node).
// GetJoinTokens fetches /v1.44/swarm and returns the decoded swarm ID together
// with the worker and manager join tokens.
// Requires this node to be a swarm manager.
func (c *DockerClient) GetJoinTokens() (*SwarmSpec, error) {
	spec := new(SwarmSpec)
	if err := c.get("/v1.44/swarm", spec); err != nil {
		return nil, err
	}
	return spec, nil
}
// GetManagerAddr returns the address of the first remote manager reported by
// the Docker info endpoint, for use when joining this swarm. It returns ""
// when the info call fails or no remote manager is listed.
func (c *DockerClient) GetManagerAddr() string {
	info, err := c.GetSwarmInfo()
	if err != nil {
		return ""
	}
	if managers := info.Swarm.RemoteManagers; len(managers) > 0 {
		return managers[0].Addr
	}
	return ""
}
// ─── Methods: Nodes ───────────────────────────────────────────────────────────
func (c *DockerClient) ListNodes() ([]SwarmNode, error) {
var nodes []SwarmNode
if err := c.get("/v1.44/nodes", &nodes); err != nil {
@@ -167,7 +364,323 @@ func (c *DockerClient) ListNodes() ([]SwarmNode, error) {
return nodes, nil
}
// ListContainers returns all running containers on this host.
// UpdateNodeAvailability sets a node's availability (active|pause|drain).
//
// It reads the node first to obtain its current spec and version index, then
// posts the modified spec back with that version. nodeID is percent-encoded
// before being placed in the URL path, matching RemoveService and
// StopContainer, so an ID containing "/", "?" or "#" cannot redirect the
// request to a different Docker endpoint.
func (c *DockerClient) UpdateNodeAvailability(nodeID, availability string) error {
	nodePath := "/v1.44/nodes/" + urlEncode(nodeID)

	// First get current node spec + version
	var node SwarmNode
	if err := c.get(nodePath, &node); err != nil {
		return err
	}
	node.Spec.Availability = availability
	return c.postUpdate(nodePath+"/update", node.Version.Index, node.Spec)
}
// AddNodeLabel adds a label to a swarm node, overwriting an existing value
// stored under the same key.
//
// It reads the node first to obtain its current spec and version index, then
// posts the modified spec back with that version. nodeID is percent-encoded
// before being placed in the URL path, matching RemoveService and
// StopContainer, so an ID containing "/", "?" or "#" cannot redirect the
// request to a different Docker endpoint.
func (c *DockerClient) AddNodeLabel(nodeID, key, value string) error {
	nodePath := "/v1.44/nodes/" + urlEncode(nodeID)

	var node SwarmNode
	if err := c.get(nodePath, &node); err != nil {
		return err
	}
	// A node without labels decodes to a nil map; writing to it would panic.
	if node.Spec.Labels == nil {
		node.Spec.Labels = map[string]string{}
	}
	node.Spec.Labels[key] = value
	return c.postUpdate(nodePath+"/update", node.Version.Index, node.Spec)
}
// ─── Methods: Services ────────────────────────────────────────────────────────
// ListServices returns every swarm service. No filter is applied. The request
// asks for status=true so each entry carries its running/desired task counts.
func (c *DockerClient) ListServices() ([]SwarmService, error) {
	var list []SwarmService
	err := c.get("/v1.44/services?status=true", &list)
	if err != nil {
		return nil, err
	}
	return list, nil
}
// GetService returns a single service by ID or name, including its task
// counts (status=true).
//
// idOrName is percent-encoded before being placed in the URL path, matching
// RemoveService, so a value containing "/", "?" or "#" cannot alter the
// request path or its query string.
func (c *DockerClient) GetService(idOrName string) (*SwarmService, error) {
	var svc SwarmService
	if err := c.get("/v1.44/services/"+urlEncode(idOrName)+"?status=true", &svc); err != nil {
		return nil, err
	}
	return &svc, nil
}
// ScaleService sets the replica count of a replicated service. It looks the
// service up, rejects services that are not in replicated mode, and posts the
// modified spec back with the service's current version index.
//
// NOTE(review): the spec that is posted back is the decoded ServiceSpec, so
// any service settings not modelled by that struct are not part of the update
// body. Confirm this is acceptable for services not created by this client.
func (c *DockerClient) ScaleService(idOrName string, replicas int) error {
	svc, err := c.GetService(idOrName)
	if err != nil {
		return err
	}

	mode := svc.Spec.Mode.Replicated
	if mode == nil {
		return fmt.Errorf("service %s is not in replicated mode", idOrName)
	}
	// mode points into svc.Spec, so this changes the spec that is sent.
	mode.Replicas = replicas

	updatePath := "/v1.44/services/" + svc.ID + "/update"
	return c.postUpdate(updatePath, svc.Version.Index, svc.Spec)
}
// ListServiceTasks returns all tasks for a given service.
//
// The "filters" query value is a JSON object of the form {"service":[id]}.
// It is produced with json.Marshal rather than string formatting so that a
// quote or backslash inside serviceID cannot break or extend the filter.
func (c *DockerClient) ListServiceTasks(serviceID string) ([]SwarmTask, error) {
	filter, err := json.Marshal(map[string][]string{"service": {serviceID}})
	if err != nil {
		return nil, err
	}

	var tasks []SwarmTask
	path := "/v1.44/tasks?filters=" + urlEncode(string(filter))
	if err := c.get(path, &tasks); err != nil {
		return nil, err
	}
	return tasks, nil
}
// ListAllTasks fetches /v1.44/tasks without any filter and returns the tasks
// of every service.
func (c *DockerClient) ListAllTasks() ([]SwarmTask, error) {
	var all []SwarmTask
	err := c.get("/v1.44/tasks", &all)
	if err != nil {
		return nil, err
	}
	return all, nil
}
// CreateAgentServiceOpts holds options for deploying an agent Swarm service.
// Port 0 means no port is published; empty Networks, Labels and Constraints
// add nothing to the service spec.
type CreateAgentServiceOpts struct {
Name string
Image string
Replicas int
Env []string
Port int
Networks []string // overlay network names/IDs to attach
Labels map[string]string
Constraints []string // placement constraints, e.g. ["node.role==manager"]
}
// CreateAgentService deploys a new swarm service for an AI agent.
// image: container image, name: service name, replicas: initial count,
// env: environment variables, port: optional published port (0 = none).
// It is a convenience wrapper around CreateAgentServiceFull that sets no
// networks, extra labels or placement constraints.
func (c *DockerClient) CreateAgentService(name, image string, replicas int, env []string, port int) (*SwarmService, error) {
return c.CreateAgentServiceFull(CreateAgentServiceOpts{
Name: name,
Image: image,
Replicas: replicas,
Env: env,
Port: port,
})
}
// CreateAgentServiceFull creates a replicated swarm service from opts and
// returns the service as read back from Docker after creation.
//
// The service always carries the labels "goclaw.agent" and "goclaw.name";
// entries in opts.Labels are applied afterwards and can override them. A
// positive opts.Port is published in host mode with the same published and
// target port. Every network in opts.Networks is attached with the service
// name as alias.
func (c *DockerClient) CreateAgentServiceFull(opts CreateAgentServiceOpts) (*SwarmService, error) {
	labels := make(map[string]string, len(opts.Labels)+2)
	labels["goclaw.agent"] = "true"
	labels["goclaw.name"] = opts.Name
	for key, value := range opts.Labels {
		labels[key] = value
	}

	template := TaskTemplate{
		ContainerSpec: ContainerSpec{Image: opts.Image, Env: opts.Env},
	}
	if len(opts.Constraints) > 0 {
		template.Placement = &Placement{Constraints: opts.Constraints}
	}

	spec := ServiceSpec{
		Name:         opts.Name,
		Mode:         ServiceMode{Replicated: &ReplicatedService{Replicas: opts.Replicas}},
		TaskTemplate: template,
		Labels:       labels,
	}

	if opts.Port > 0 {
		published := PortConfig{
			Protocol:      "tcp",
			TargetPort:    opts.Port,
			PublishedPort: opts.Port,
			PublishMode:   "host",
		}
		spec.EndpointSpec = &EndpointSpec{Ports: []PortConfig{published}}
	}

	for _, network := range opts.Networks {
		attachment := NetworkAttachment{Target: network, Aliases: []string{opts.Name}}
		spec.Networks = append(spec.Networks, attachment)
	}

	var created struct {
		ID string `json:"ID"`
	}
	if err := c.post("/v1.44/services/create", spec, &created); err != nil {
		return nil, err
	}
	return c.GetService(created.ID)
}
// RemoveService deletes the swarm service identified by ID or name. A
// response status of 400 or above is returned as an error that includes the
// response body.
func (c *DockerClient) RemoveService(idOrName string) error {
	endpoint := c.baseURL + "/v1.44/services/" + urlEncode(idOrName)
	req, err := http.NewRequest(http.MethodDelete, endpoint, nil)
	if err != nil {
		return err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("docker DELETE service %s: %w", idOrName, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 400 {
		return nil
	}
	detail, _ := io.ReadAll(resp.Body)
	return fmt.Errorf("docker DELETE service %s: status %d: %s", idOrName, resp.StatusCode, string(detail))
}
// GetServiceLastActivity reports the newest UpdatedAt among the tasks of a
// service, which callers can use to judge whether the service is idle. The
// zero time is returned when the service has no tasks or the lookup fails.
func (c *DockerClient) GetServiceLastActivity(serviceID string) (time.Time, error) {
	var newest time.Time

	tasks, err := c.ListServiceTasks(serviceID)
	if err != nil {
		return time.Time{}, err
	}
	for i := range tasks {
		if updated := tasks[i].UpdatedAt; updated.After(newest) {
			newest = updated
		}
	}
	return newest, nil
}
// ─── Methods: Containers ─────────────────────────────────────────────────────
// RunContainerOpts holds options for running a standalone container.
// RunContainer uses Name both as the container name and as its network alias,
// creates the container on the first entry of Networks and connects the
// remaining entries after start. Port 0 publishes no port.
type RunContainerOpts struct {
Name string
Image string
Env []string
Networks []string // bridge/overlay networks to attach
Port int // host port (also used as container port)
Labels map[string]string
}
// RunContainer creates and starts a standalone Docker container (docker run equivalent).
// Returns the container ID on success.
//
// The container always carries the label "goclaw.agent"; opts.Labels are
// applied afterwards and can override it. A positive opts.Port is exposed and
// bound to the same host port. The restart policy is "unless-stopped".
//
// The container is created on opts.Networks[0] with opts.Name as alias. When
// no network is given, NetworkMode and NetworkingConfig are left out of the
// request entirely, so Docker applies its default instead of receiving an
// endpoint entry keyed by the empty string. Remaining networks are connected
// after start on a best-effort basis.
//
// When creation succeeds but a later step fails, the ID of the created
// container is returned together with the error so the caller can clean up.
func (c *DockerClient) RunContainer(opts RunContainerOpts) (string, error) {
	labels := map[string]string{"goclaw.agent": "true"}
	for k, v := range opts.Labels {
		labels[k] = v
	}

	// Build port bindings: host port -> container port
	exposedPorts := map[string]struct{}{}
	portBindings := map[string][]map[string]string{}
	if opts.Port > 0 {
		key := fmt.Sprintf("%d/tcp", opts.Port)
		exposedPorts[key] = struct{}{}
		portBindings[key] = []map[string]string{{"HostPort": fmt.Sprintf("%d", opts.Port)}}
	}

	hostConfig := map[string]any{
		"PortBindings":  portBindings,
		"RestartPolicy": map[string]any{"Name": "unless-stopped"},
	}
	body := map[string]any{
		"Image":        opts.Image,
		"Env":          opts.Env,
		"Labels":       labels,
		"ExposedPorts": exposedPorts,
		"HostConfig":   hostConfig,
	}

	// Pick first network for creation; additional networks attached after.
	if len(opts.Networks) > 0 && opts.Networks[0] != "" {
		primary := opts.Networks[0]
		hostConfig["NetworkMode"] = primary
		body["NetworkingConfig"] = map[string]any{
			"EndpointsConfig": map[string]any{
				primary: map[string]any{
					"Aliases": []string{opts.Name},
				},
			},
		}
	}

	var created struct {
		ID string `json:"Id"`
	}
	if err := c.post(fmt.Sprintf("/v1.44/containers/create?name=%s", urlEncode(opts.Name)), body, &created); err != nil {
		return "", fmt.Errorf("create container %s: %w", opts.Name, err)
	}

	// Start the container
	startURL := fmt.Sprintf("/v1.44/containers/%s/start", created.ID)
	req, err := http.NewRequest(http.MethodPost, c.baseURL+startURL, nil)
	if err != nil {
		return created.ID, err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return created.ID, fmt.Errorf("start container: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		b, _ := io.ReadAll(resp.Body)
		return created.ID, fmt.Errorf("start container HTTP %d: %s", resp.StatusCode, string(b))
	}

	// Attach additional networks
	for i, network := range opts.Networks {
		if i == 0 || network == "" {
			continue // first one is set at creation; empty names are not networks
		}
		netBody := map[string]any{
			"Container": created.ID,
			"EndpointConfig": map[string]any{
				"Aliases": []string{opts.Name},
			},
		}
		// Best effort: the container is already running on its primary
		// network, so a failed extra attachment does not fail the deploy.
		_ = c.post(fmt.Sprintf("/v1.44/networks/%s/connect", urlEncode(network)), netBody, nil)
	}
	return created.ID, nil
}
// StopContainer stops a standalone container and then force-removes it,
// addressing it by name or ID. The outcome of the stop request is ignored;
// only the removal decides the result, and a removal status of 400 or above
// is returned as an error that includes the response body.
func (c *DockerClient) StopContainer(nameOrID string) error {
	target := c.baseURL + "/v1.44/containers/" + urlEncode(nameOrID)

	// Step 1: stop. Failures are tolerated because the removal below is forced.
	stopReq, _ := http.NewRequest(http.MethodPost, target+"/stop", nil)
	if stopResp, stopErr := c.httpClient.Do(stopReq); stopErr == nil {
		stopResp.Body.Close()
	}

	// Step 2: forced removal.
	rmReq, err := http.NewRequest(http.MethodDelete, target+"?force=true", nil)
	if err != nil {
		return err
	}
	rmResp, err := c.httpClient.Do(rmReq)
	if err != nil {
		return err
	}
	defer rmResp.Body.Close()

	if rmResp.StatusCode < 400 {
		return nil
	}
	detail, _ := io.ReadAll(rmResp.Body)
	return fmt.Errorf("remove container HTTP %d: %s", rmResp.StatusCode, string(detail))
}
func (c *DockerClient) ListContainers() ([]Container, error) {
var containers []Container
if err := c.get("/v1.44/containers/json?all=false", &containers); err != nil {
@@ -176,7 +689,6 @@ func (c *DockerClient) ListContainers() ([]Container, error) {
return containers, nil
}
// GetContainerStats returns one-shot stats for a container (no streaming).
func (c *DockerClient) GetContainerStats(containerID string) (*ContainerStats, error) {
var stats ContainerStats
if err := c.get(fmt.Sprintf("/v1.44/containers/%s/stats?stream=false", containerID), &stats); err != nil {
@@ -185,7 +697,69 @@ func (c *DockerClient) GetContainerStats(containerID string) (*ContainerStats, e
return &stats, nil
}
// CalcCPUPercent computes CPU usage % from two consecutive stats snapshots.
// ─── Host Shell execution ─────────────────────────────────────────────────────
// The gateway runs inside a container but has /var/run/docker.sock mounted.
// We use `docker exec` against the host PID namespace via a privileged helper,
// OR simply run commands via the docker socket by exec-ing into the gateway
// container's own shell with nsenter to reach PID 1 on the host.
//
// Approach: use `nsenter -t 1 -m -u -i -n -p -- <cmd>` via the host PID namespace.
// This requires the container to run with --privileged or SYS_PTRACE capability
// and PID namespace sharing. We add that to docker-compose.yml.
//
// Alternative (safer): exec into host via SSH or a privileged sidecar.
// For now we use nsenter which works when pid:host and privileged: true.
// ExecOnHost runs a shell command on the host via nsenter into PID 1.
// Returns combined stdout+stderr.
//
// The whole call, including a possible fallback, shares one 30 second budget.
//
// The command is re-run inside the gateway container only when nsenter itself
// could not do its job: the binary could not be started, or it exited with a
// diagnostic of its own on stderr. A command that did run on the host and
// exited non-zero is NOT executed a second time; its output and error are
// returned as they are. Without this distinction a failing host command with
// side effects would be repeated in the wrong scope.
//
// On error the returned text is the combined output, or the error message
// when the command produced no output.
func ExecOnHost(command string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Try nsenter (requires pid:host + SYS_ADMIN or privileged)
	stdout, stderr, err := runHostCommand(ctx, "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
		"sh", "-c", command)
	if err == nil {
		return stdout + stderr, nil
	}

	// Fall back only while there is time left and nsenter itself failed.
	if ctx.Err() == nil && isNsenterFailure(err, stderr) {
		stdout, stderr, err = runHostCommand(ctx, "sh", "-c", command)
		if err == nil {
			return stdout + stderr, nil
		}
	}

	combined := stdout + stderr
	if combined == "" {
		combined = err.Error()
	}
	return combined, err
}

// runHostCommand runs name with args under ctx and returns the captured
// stdout and stderr along with the error from running the command.
func runHostCommand(ctx context.Context, name string, args ...string) (stdout, stderr string, err error) {
	var outBuf, errBuf bytes.Buffer
	cmd := exec.CommandContext(ctx, name, args...)
	cmd.Stdout = &outBuf
	cmd.Stderr = &errBuf
	err = cmd.Run()
	return outBuf.String(), errBuf.String(), err
}

// isNsenterFailure reports whether a failed nsenter invocation failed in
// nsenter itself rather than in the command it was asked to run.
func isNsenterFailure(err error, stderr string) bool {
	// Anything other than an exit status means the process never ran
	// (for example the nsenter binary is missing).
	if _, exited := err.(*exec.ExitError); !exited {
		return true
	}
	// nsenter prefixes its own diagnostics with its program name.
	return strings.HasPrefix(strings.TrimSpace(stderr), "nsenter:")
}
// ExecDockerCLI invokes the `docker` CLI binary with the given arguments and
// a 30 second timeout. On success it returns stdout only; on failure it
// returns stdout followed by stderr together with the error.
func ExecDockerCLI(args ...string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var stdout, errOut bytes.Buffer
	cli := exec.CommandContext(ctx, "docker", args...)
	cli.Stdout, cli.Stderr = &stdout, &errOut

	runErr := cli.Run()
	if runErr == nil {
		return stdout.String(), nil
	}
	return stdout.String() + errOut.String(), runErr
}
// CalcCPUPercent computes CPU% from stats snapshot.
func CalcCPUPercent(stats *ContainerStats) float64 {
cpuDelta := float64(stats.CPUStats.CPUUsage.TotalUsage) - float64(stats.PreCPUStats.CPUUsage.TotalUsage)
systemDelta := float64(stats.CPUStats.SystemCPUUsage) - float64(stats.PreCPUStats.SystemCPUUsage)
@@ -198,3 +772,19 @@ func CalcCPUPercent(stats *ContainerStats) float64 {
}
return 0
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
// urlEncode percent-encodes s for use inside a URL path segment or query
// value. ASCII letters, digits and "-", "_", ".", "~" are kept; every other
// byte is written as %XX with upper-case hex digits.
//
// Encoding works on the UTF-8 bytes of s, not on its code points, so a
// non-ASCII character becomes one %XX triplet per byte ("é" -> "%C3%A9").
// Formatting the code point instead would yield sequences such as "%4E2D"
// that are not valid percent-encodings.
func urlEncode(s string) string {
	const upperHex = "0123456789ABCDEF"

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); i++ {
		ch := s[i]
		switch {
		case ch >= 'A' && ch <= 'Z', ch >= 'a' && ch <= 'z', ch >= '0' && ch <= '9',
			ch == '-', ch == '_', ch == '.', ch == '~':
			b.WriteByte(ch)
		default:
			b.WriteByte('%')
			b.WriteByte(upperHex[ch>>4])
			b.WriteByte(upperHex[ch&0x0F])
		}
	}
	return b.String()
}

View File

@@ -2,6 +2,7 @@
package llm
import (
"bufio"
"bytes"
"context"
"encoding/json"
@@ -105,6 +106,13 @@ func NewClient(baseURL, apiKey string) *Client {
}
}
// UpdateCredentials updates the LLM client's base URL and API key at runtime.
// Called when the active provider is changed via the Settings UI.
// Trailing slashes are removed from baseURL before it is stored.
// NOTE(review): the two fields are assigned without any locking in this
// method; confirm callers do not run it concurrently with in-flight requests.
func (c *Client) UpdateCredentials(baseURL, apiKey string) {
c.baseURL = strings.TrimRight(baseURL, "/")
c.apiKey = apiKey
}
func (c *Client) headers() map[string]string {
h := map[string]string{
"Content-Type": "application/json",
@@ -159,7 +167,86 @@ func (c *Client) ListModels(ctx context.Context) (*ModelsResponse, error) {
return &result, nil
}
// Chat sends a chat completion request (non-streaming).
// ChatStream sends a streaming chat completion request (SSE).
// It calls the callback for each chunk received.
//
// onChunk receives every non-empty content delta with done=false, followed by
// exactly one call with an empty delta and done=true when the stream ends:
// on a "[DONE]" event, on finish_reason "stop", or at end of input. A nil
// onChunk is treated as a no-op. Events whose payload is not valid JSON are
// skipped.
//
// An error is returned when the request cannot be built or sent, when the
// server answers with a status other than 200, when ctx is cancelled, or when
// reading the stream fails.
func (c *Client) ChatStream(ctx context.Context, req ChatRequest, onChunk func(delta string, done bool)) error {
	if onChunk == nil {
		onChunk = func(string, bool) {}
	}

	req.Stream = true
	body, err := json.Marshal(req)
	if err != nil {
		return err
	}
	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost,
		c.baseURL+"/chat/completions", bytes.NewReader(body))
	if err != nil {
		return err
	}
	for k, v := range c.headers() {
		httpReq.Header.Set(k, v)
	}
	httpReq.Header.Set("Accept", "text/event-stream")

	// Use a client without timeout for streaming; ctx bounds the request.
	streamClient := &http.Client{Timeout: 0}
	resp, err := streamClient.Do(httpReq)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("ollama stream API error (%d): %s", resp.StatusCode, string(respBody))
	}

	// Parse SSE stream. The Scanner's default line limit is 64 KiB and a
	// single event can exceed it, which would abort the stream with
	// bufio.ErrTooLong; allow lines of up to 1 MiB.
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for scanner.Scan() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		line := scanner.Text()
		// The space after "data:" is optional in the SSE format.
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if data == "[DONE]" {
			onChunk("", true)
			return nil
		}

		var chunk struct {
			Choices []struct {
				Delta struct {
					Content string `json:"content"`
				} `json:"delta"`
				FinishReason *string `json:"finish_reason"`
			} `json:"choices"`
		}
		if err := json.Unmarshal([]byte(data), &chunk); err != nil {
			continue
		}
		if len(chunk.Choices) == 0 {
			continue
		}

		first := chunk.Choices[0]
		if first.Delta.Content != "" {
			onChunk(first.Delta.Content, false)
		}
		if first.FinishReason != nil && *first.FinishReason == "stop" {
			onChunk("", true)
			return nil
		}
	}
	if err := scanner.Err(); err != nil {
		return err
	}
	onChunk("", true)
	return nil
}
func (c *Client) Chat(ctx context.Context, req ChatRequest) (*ChatResponse, error) {
req.Stream = false

View File

@@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db"
@@ -31,13 +32,15 @@ type ToolCallStep struct {
DurationMs int64 `json:"durationMs"`
}
// ChatResult is the response from the orchestrator chat.
type ChatResult struct {
Success bool `json:"success"`
Response string `json:"response"`
ToolCalls []ToolCallStep `json:"toolCalls"`
Model string `json:"model"`
Usage *llm.Usage `json:"usage,omitempty"`
Error string `json:"error,omitempty"`
Success bool `json:"success"`
Response string `json:"response"`
ToolCalls []ToolCallStep `json:"toolCalls"`
Model string `json:"model"`
ModelWarning string `json:"modelWarning,omitempty"`
Usage *llm.Usage `json:"usage,omitempty"`
Error string `json:"error,omitempty"`
}
// OrchestratorConfig is the runtime config loaded from DB or defaults.
@@ -51,6 +54,30 @@ type OrchestratorConfig struct {
MaxTokens int
}
// RetryPolicy describes how failed or empty LLM calls are retried by the
// orchestrator.
type RetryPolicy struct {
	// MaxLLMRetries counts the extra attempts made after the first one fails,
	// so the total number of attempts is MaxLLMRetries + 1. Default: 3.
	MaxLLMRetries int
	// InitialDelay is the pause before the first retry. Default: 2s.
	InitialDelay time.Duration
	// MaxDelay is the upper bound for the exponential back-off. Default: 30s.
	MaxDelay time.Duration
	// RetryOnEmpty makes a response without content count as a soft failure
	// that triggers a retry. Default: true.
	RetryOnEmpty bool
}

// defaultRetryPolicy builds the policy used when none is set explicitly:
// 3 retries, 2s initial delay, 30s delay cap, retry on empty responses.
func defaultRetryPolicy() RetryPolicy {
	var policy RetryPolicy
	policy.MaxLLMRetries = 3
	policy.InitialDelay = 2 * time.Second
	policy.MaxDelay = 30 * time.Second
	policy.RetryOnEmpty = true
	return policy
}
// ─── Default System Prompt ────────────────────────────────────────────────────
const defaultSystemPrompt = `You are GoClaw Orchestrator — the main AI agent managing the GoClaw distributed AI system.
@@ -86,6 +113,7 @@ type Orchestrator struct {
executor *tools.Executor
database *db.DB
projectRoot string
retry RetryPolicy
}
func New(llmClient *llm.Client, database *db.DB, projectRoot string) *Orchestrator {
@@ -93,6 +121,7 @@ func New(llmClient *llm.Client, database *db.DB, projectRoot string) *Orchestrat
llmClient: llmClient,
database: database,
projectRoot: projectRoot,
retry: defaultRetryPolicy(),
}
// Inject agent list function to avoid circular dependency
o.executor = tools.NewExecutor(projectRoot, o.listAgentsFn)
@@ -101,6 +130,11 @@ func New(llmClient *llm.Client, database *db.DB, projectRoot string) *Orchestrat
return o
}
// SetRetryPolicy overrides the default retry policy.
// The policy is stored as given; callLLMWithRetry reads it at the start of
// each call.
func (o *Orchestrator) SetRetryPolicy(p RetryPolicy) {
o.retry = p
}
// GetConfig loads orchestrator config from DB, falls back to defaults.
func (o *Orchestrator) GetConfig() *OrchestratorConfig {
if o.database != nil {
@@ -131,25 +165,188 @@ func (o *Orchestrator) GetConfig() *OrchestratorConfig {
}
}
// Chat runs the full orchestration loop: LLM → tool calls → LLM → response.
func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideModel string, maxIter int) ChatResult {
if maxIter <= 0 {
maxIter = 10
// resolveModel verifies the desired model against the model list reported by
// the LLM API, allowing 8 seconds for the lookup.
//
// The desired model is returned unchanged, with no warning, when it is listed
// or when the list cannot be fetched or is empty. Otherwise the first listed
// model is returned together with a warning naming both models.
func (o *Orchestrator) resolveModel(ctx context.Context, desired string) (model string, warning string) {
	listCtx, cancel := context.WithTimeout(ctx, 8*time.Second)
	defer cancel()

	available, err := o.llmClient.ListModels(listCtx)
	if err != nil || available == nil || len(available.Data) == 0 {
		// Nothing to verify against: keep the caller's choice.
		log.Printf("[Orchestrator] Cannot fetch model list: %v — using %q as-is", err, desired)
		return desired, ""
	}

	for i := range available.Data {
		if available.Data[i].ID == desired {
			return desired, ""
		}
	}

	// The desired model is not offered: substitute the first listed one.
	fallback := available.Data[0].ID
	warning = fmt.Sprintf("model %q not available — using %q instead", desired, fallback)
	log.Printf("[Orchestrator] WARNING: %s", warning)
	return fallback, warning
}
// ─── LLM call with retry ──────────────────────────────────────────────────────
// llmCallResult holds one attempt's outcome.
// It is the return value of callLLMWithRetry: err is set on failure, resp on
// success (or on exhausted retries that still produced a response), and
// attemptNum is the number of the attempt that produced this result.
type llmCallResult struct {
resp *llm.ChatResponse
usedTools bool // whether the call was made with tools enabled
err error
attemptNum int
}
// callLLMWithRetry calls the LLM and retries on error or empty response.
// Tools are only sent on the first attempt; every later attempt strips them
// to avoid repeating a tool-format failure.
//
// Parameters:
//   - req is the request to send; it is copied per attempt and never modified.
//   - model is used in error messages only.
//   - onRetry, if non-nil, is invoked before each back-off with the number of
//     the attempt that just failed and the reason.
//
// The number of attempts is MaxLLMRetries + 1 and never less than one. The
// delay between attempts starts at InitialDelay and doubles up to MaxDelay.
//
// The result carries err when all attempts failed with an error, when the LLM
// never produced a response object, or when ctx was cancelled; cancellation
// stops the retry loop immediately instead of burning the remaining attempts
// against a dead context. When retries are exhausted on an empty response,
// the last response is returned without an error.
func (o *Orchestrator) callLLMWithRetry(
	ctx context.Context,
	req llm.ChatRequest,
	model string,
	onRetry func(attempt int, reason string), // optional event callback (may be nil)
) llmCallResult {
	policy := o.retry
	delay := policy.InitialDelay
	maxAttempts := policy.MaxLLMRetries + 1
	if maxAttempts < 1 {
		// A negative MaxLLMRetries must still allow the initial attempt.
		maxAttempts = 1
	}
	hasTools := len(req.Tools) > 0

	// backoff logs reason and, if another attempt is allowed, notifies
	// onRetry and waits. It reports whether the caller should try again.
	backoff := func(attempt int, reason string) bool {
		log.Printf("[Orchestrator] %s", reason)
		if attempt >= maxAttempts || ctx.Err() != nil {
			return false
		}
		if onRetry != nil {
			onRetry(attempt, reason)
		}
		o.sleep(ctx, delay)
		delay = min(delay*2, policy.MaxDelay)
		return ctx.Err() == nil
	}

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// On attempt > 1, always strip tools (avoid repeated tool-format errors)
		useTools := hasTools && attempt == 1
		r := req
		if !useTools {
			r.Tools = nil
			r.ToolChoice = ""
		}

		resp, err := o.llmClient.Chat(ctx, r)

		// ── Hard error (network, auth, etc.) ─────────────────────────
		if err != nil {
			reason := fmt.Sprintf("LLM error (attempt %d/%d): %v", attempt, maxAttempts, err)
			if backoff(attempt, reason) {
				continue
			}
			if ctxErr := ctx.Err(); ctxErr != nil {
				return llmCallResult{err: ctxErr, attemptNum: attempt}
			}
			return llmCallResult{err: fmt.Errorf("LLM error after %d attempts (model: %s): %w", maxAttempts, model, err), attemptNum: attempt}
		}

		// ── Context cancelled ─────────────────────────────────────────
		if ctxErr := ctx.Err(); ctxErr != nil {
			return llmCallResult{err: ctxErr, attemptNum: attempt}
		}

		// ── Missing response or empty choices ─────────────────────────
		if resp == nil || len(resp.Choices) == 0 {
			reason := fmt.Sprintf("empty choices (attempt %d/%d)", attempt, maxAttempts)
			if backoff(attempt, reason) {
				continue
			}
			if ctxErr := ctx.Err(); ctxErr != nil {
				return llmCallResult{err: ctxErr, attemptNum: attempt}
			}
			if resp == nil {
				// Never hand a nil response to the caller without an error.
				return llmCallResult{err: fmt.Errorf("LLM returned no response after %d attempts (model: %s)", maxAttempts, model), attemptNum: attempt}
			}
			return llmCallResult{resp: resp, usedTools: useTools, attemptNum: attempt}
		}

		content := strings.TrimSpace(resp.Choices[0].Message.Content)
		finishReason := resp.Choices[0].FinishReason

		// ── Empty content AND no tool calls — retry ───────────────────
		if policy.RetryOnEmpty &&
			content == "" &&
			finishReason != "tool_calls" &&
			len(resp.Choices[0].Message.ToolCalls) == 0 {
			reason := fmt.Sprintf("empty response content (attempt %d/%d, finish_reason=%q)", attempt, maxAttempts, finishReason)
			if backoff(attempt, reason) {
				continue
			}
			if ctxErr := ctx.Err(); ctxErr != nil {
				return llmCallResult{err: ctxErr, attemptNum: attempt}
			}
			// Exhausted retries — return what we have (even if empty)
			log.Printf("[Orchestrator] All %d attempts exhausted — returning empty response", maxAttempts)
			return llmCallResult{resp: resp, usedTools: useTools, attemptNum: attempt}
		}

		// ── Success ───────────────────────────────────────────────────
		if attempt > 1 {
			log.Printf("[Orchestrator] Succeeded on attempt %d/%d", attempt, maxAttempts)
		}
		return llmCallResult{resp: resp, usedTools: useTools, attemptNum: attempt}
	}

	// Not reachable: every iteration either continues or returns, and at
	// least one iteration always runs.
	return llmCallResult{err: fmt.Errorf("retry loop exited unexpectedly"), attemptNum: maxAttempts}
}
// sleep blocks until d has elapsed or ctx is done, whichever happens first.
func (o *Orchestrator) sleep(ctx context.Context, d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-ctx.Done():
	}
}
// min reports the shorter of the two durations.
func min(a, b time.Duration) time.Duration {
	if b < a {
		return b
	}
	return a
}
// ─── Core loop (shared by Chat and ChatWithEvents) ────────────────────────────
// loopOptions bundles the inputs of runLoop so that Chat, ChatWithEvents and
// ChatWithEventsAndRetry can all delegate to the same loop implementation.
type loopOptions struct {
// messages is the caller-supplied conversation; runLoop appends it after the
// system prompt when building the LLM conversation.
messages []Message
// overrideModel, when non-empty, is used instead of the configured model.
overrideModel string
// maxIter caps the number of loop iterations; runLoop substitutes 10 when it is <= 0.
maxIter int
// onToolCall, if set, is invoked with each tool-call step once it has been recorded.
onToolCall func(ToolCallStep) // may be nil
// onRetry, if set, is handed to callLLMWithRetry so retry attempts can be reported.
onRetry func(attempt int, reason string) // may be nil
}
func (o *Orchestrator) runLoop(ctx context.Context, opts loopOptions) ChatResult {
if opts.maxIter <= 0 {
opts.maxIter = 10
}
cfg := o.GetConfig()
model := cfg.Model
if overrideModel != "" {
model = overrideModel
if opts.overrideModel != "" {
model = opts.overrideModel
}
log.Printf("[Orchestrator] Chat started: model=%s, messages=%d", model, len(messages))
// Validate model against LLM API — fall back if unavailable (prevents 401/404)
model, modelWarning := o.resolveModel(ctx, model)
log.Printf("[Orchestrator] Loop started: model=%s, messages=%d, maxIter=%d, maxRetries=%d",
model, len(opts.messages), opts.maxIter, o.retry.MaxLLMRetries)
// Build conversation
conv := []llm.Message{
{Role: "system", Content: cfg.SystemPrompt},
}
for _, m := range messages {
for _, m := range opts.messages {
conv = append(conv, llm.Message{Role: m.Role, Content: m.Content})
}
@@ -175,7 +372,7 @@ func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideMod
var lastUsage *llm.Usage
var lastModel string
for iter := 0; iter < maxIter; iter++ {
for iter := 0; iter < opts.maxIter; iter++ {
req := llm.ChatRequest{
Model: model,
Messages: conv,
@@ -185,28 +382,22 @@ func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideMod
ToolChoice: "auto",
}
resp, err := o.llmClient.Chat(ctx, req)
if err != nil {
// Fallback: try without tools
log.Printf("[Orchestrator] LLM error with tools: %v — retrying without tools", err)
req.Tools = nil
req.ToolChoice = ""
resp2, err2 := o.llmClient.Chat(ctx, req)
if err2 != nil {
return ChatResult{
Success: false,
Error: fmt.Sprintf("LLM error (model: %s): %v", model, err2),
}
// ── LLM call with retry ────────────────────────────────────
callRes := o.callLLMWithRetry(ctx, req, model, opts.onRetry)
if callRes.err != nil {
return ChatResult{
Success: false,
ToolCalls: toolCallSteps,
Model: model,
ModelWarning: modelWarning,
Error: callRes.err.Error(),
}
if len(resp2.Choices) > 0 {
finalResponse = resp2.Choices[0].Message.Content
lastUsage = resp2.Usage
lastModel = resp2.Model
}
break
}
resp := callRes.resp
if len(resp.Choices) == 0 {
log.Printf("[Orchestrator] No choices in response — stopping loop at iter %d", iter)
break
}
@@ -217,19 +408,17 @@ func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideMod
lastModel = model
}
// Check if LLM wants to call tools
// ── Tool calls ─────────────────────────────────────────────
if choice.FinishReason == "tool_calls" && len(choice.Message.ToolCalls) > 0 {
// Add assistant message with tool calls to conversation
conv = append(conv, choice.Message)
// Execute each tool call
for _, tc := range choice.Message.ToolCalls {
toolName := tc.Function.Name
argsJSON := tc.Function.Arguments
log.Printf("[Orchestrator] Executing tool: %s args=%s", toolName, argsJSON)
start := time.Now()
result := o.executor.Execute(ctx, toolName, argsJSON)
step := ToolCallStep{
@@ -238,7 +427,6 @@ func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideMod
DurationMs: time.Since(start).Milliseconds(),
}
// Parse args for display
var argsMap any
_ = json.Unmarshal([]byte(argsJSON), &argsMap)
step.Args = argsMap
@@ -255,7 +443,10 @@ func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideMod
toolCallSteps = append(toolCallSteps, step)
// Add tool result to conversation
if opts.onToolCall != nil {
opts.onToolCall(step)
}
conv = append(conv, llm.Message{
Role: "tool",
Content: toolResultContent,
@@ -267,20 +458,70 @@ func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideMod
continue
}
// LLM finished — extract final response
// ── Final response ─────────────────────────────────────────
finalResponse = choice.Message.Content
break
}
return ChatResult{
Success: true,
Response: finalResponse,
ToolCalls: toolCallSteps,
Model: lastModel,
Usage: lastUsage,
Success: true,
Response: finalResponse,
ToolCalls: toolCallSteps,
Model: lastModel,
ModelWarning: modelWarning,
Usage: lastUsage,
}
}
// ─── Public API ───────────────────────────────────────────────────────────────
// Chat executes the orchestration loop (LLM → tool calls → LLM → response)
// with no event callbacks attached and returns the loop's result.
func (o *Orchestrator) Chat(ctx context.Context, messages []Message, overrideModel string, maxIter int) ChatResult {
	var opts loopOptions
	opts.messages = messages
	opts.overrideModel = overrideModel
	opts.maxIter = maxIter
	return o.runLoop(ctx, opts)
}
// ChatWithEvents executes the orchestration loop and reports every tool
// execution through onToolCall (which may be nil). No retry callback is
// attached. Intended for SSE streaming and DB event logging.
func (o *Orchestrator) ChatWithEvents(
	ctx context.Context,
	messages []Message,
	overrideModel string,
	maxIter int,
	onToolCall func(ToolCallStep),
) ChatResult {
	var opts loopOptions
	opts.messages = messages
	opts.overrideModel = overrideModel
	opts.maxIter = maxIter
	opts.onToolCall = onToolCall
	return o.runLoop(ctx, opts)
}
// ChatWithEventsAndRetry is the fully instrumented entry point: in addition to
// reporting tool executions through onToolCall, it forwards onRetry to the
// loop so retry attempts can be streamed to the client. Either callback may
// be nil.
func (o *Orchestrator) ChatWithEventsAndRetry(
	ctx context.Context,
	messages []Message,
	overrideModel string,
	maxIter int,
	onToolCall func(ToolCallStep),
	onRetry func(attempt int, reason string),
) ChatResult {
	var opts loopOptions
	opts.messages = messages
	opts.overrideModel = overrideModel
	opts.maxIter = maxIter
	opts.onToolCall = onToolCall
	opts.onRetry = onRetry
	return o.runLoop(ctx, opts)
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
// listAgentsFn is injected into the tool executor to list agents from DB.
func (o *Orchestrator) listAgentsFn() ([]map[string]any, error) {
if o.database == nil {

View File

@@ -13,6 +13,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db"
@@ -155,15 +156,44 @@ func OrchestratorTools() []ToolDef {
{
Type: "function",
Function: FuncDef{
Name: "delegate_to_agent",
Description: "Delegate a task to a specialized agent (Browser Agent, Tool Builder, Agent Compiler).",
Name: "delegate_to_agent",
Description: "Delegate a task to a specific agent container via A2A protocol. " +
"The agent processes the task with its own LLM and tools. " +
"Use async=true for fire-and-forget with callback_url, or sync (default) to wait for result.",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"agentId": map[string]any{"type": "number", "description": "Agent ID to delegate to"},
"message": map[string]any{"type": "string", "description": "Task description for the agent"},
"agentId": map[string]any{"type": "number", "description": "Target agent ID"},
"task": map[string]any{"type": "string", "description": "Task description / prompt for the agent"},
"async": map[string]any{"type": "boolean", "description": "If true, returns task_id immediately; if false (default), waits for result"},
"callbackUrl": map[string]any{"type": "string", "description": "URL to POST result when async=true"},
"priority": map[string]any{"type": "number", "description": "Task priority 0-10 (default 5)"},
"timeoutSecs": map[string]any{"type": "number", "description": "Max seconds to wait (default 120)"},
},
"required": []string{"agentId", "message"},
"required": []string{"agentId", "task"},
"additionalProperties": false,
},
},
},
{
Type: "function",
Function: FuncDef{
Name: "fanout_agents",
Description: "Send the SAME task to MULTIPLE agents IN PARALLEL and collect all results. " +
"Useful when you need different specialists to work on the same problem simultaneously. " +
"Returns results from all agents as an array.",
Parameters: map[string]any{
"type": "object",
"properties": map[string]any{
"agentIds": map[string]any{
"type": "array",
"items": map[string]any{"type": "number"},
"description": "List of agent IDs to send the task to (max 10)",
},
"task": map[string]any{"type": "string", "description": "Task to send to all agents"},
"timeoutSecs": map[string]any{"type": "number", "description": "Max seconds per agent (default 60)"},
},
"required": []string{"agentIds", "task"},
"additionalProperties": false,
},
},
@@ -226,6 +256,8 @@ func (e *Executor) Execute(ctx context.Context, toolName string, argsJSON string
result, execErr = e.listAgents()
case "delegate_to_agent":
result, execErr = e.delegateToAgent(ctx, args)
case "fanout_agents":
result, execErr = e.fanoutAgents(ctx, args)
default:
return ToolResult{Success: false, Error: fmt.Sprintf("unknown tool: %s", toolName), DurationMs: ms(start)}
}
@@ -456,57 +488,100 @@ func (e *Executor) listAgents() (any, error) {
return map[string]any{"agents": agents, "count": len(agents)}, nil
}
// A2ATaskRequest is the standard agent-to-agent task message format (Phase C).
// postA2ATask JSON-encodes it and POSTs it to an agent's /task endpoint.
type A2ATaskRequest struct {
// TaskID identifies the task; delegateToAgent formats it as "orch-<agentID>-<unix millis>".
TaskID string `json:"task_id"`
// FromAgentID is the sender's agent ID; delegateToAgent uses 0 for the orchestrator.
FromAgentID int `json:"from_agent_id"`
// Task is the task text. Note that it is serialized under the JSON key "input".
Task string `json:"input"`
// CallbackURL is left out of the JSON when empty.
CallbackURL string `json:"callback_url,omitempty"`
// Priority is the task priority; delegateToAgent defaults it to 5.
Priority int `json:"priority"`
// TimeoutSecs is the time limit in seconds; delegateToAgent defaults it to 120.
TimeoutSecs int `json:"timeout_secs"`
}
// delegateToAgent sends a task to an agent's container via A2A HTTP protocol.
// Resolves the agent's service address from DB, respects priority/timeout from args.
// Falls back with a clear message if agent is not deployed/running.
func (e *Executor) delegateToAgent(ctx context.Context, args map[string]any) (any, error) {
agentIDf, _ := args["agentId"].(float64)
agentID := int(agentIDf)
task, _ := args["task"].(string)
if task == "" {
task, _ = args["message"].(string) // backward compat
}
if task == "" {
return nil, fmt.Errorf("task (or message) is required")
return nil, fmt.Errorf("task is required")
}
callbackURL, _ := args["callbackUrl"].(string)
async, _ := args["async"].(bool)
priority := 5
if pf, ok := args["priority"].(float64); ok && pf > 0 {
priority = int(pf)
}
timeoutSecs := 120
if tf, ok := args["timeoutSecs"].(float64); ok && tf > 0 {
timeoutSecs = int(tf)
}
// Resolve agent container address from DB
if e.database != nil {
cfg, err := e.database.GetAgentByID(agentID)
if err == nil && cfg != nil && cfg.ServicePort > 0 && cfg.ContainerStatus == "running" {
// Agent is deployed — call its container via overlay DNS
// Docker Swarm DNS: service name resolves inside overlay network
agentURL := fmt.Sprintf("http://%s:%d", cfg.ServiceName, cfg.ServicePort)
if async {
return e.postAgentTask(ctx, agentURL, agentID, task, callbackURL)
req := A2ATaskRequest{
TaskID: fmt.Sprintf("orch-%d-%d", agentID, time.Now().UnixMilli()),
FromAgentID: 0, // orchestrator
Task: task,
CallbackURL: callbackURL,
Priority: priority,
TimeoutSecs: timeoutSecs,
}
return e.postAgentChat(ctx, agentURL, agentID, task)
if async {
return e.postA2ATask(ctx, agentURL, req)
}
return e.postA2AChat(ctx, agentURL, task, timeoutSecs)
}
if e.database != nil {
cfg, _ := e.database.GetAgentByID(agentID)
status := "unknown"
if cfg != nil {
status = cfg.ContainerStatus
if status == "" {
status = "stopped"
}
}
return map[string]any{
"delegated": false,
"agentId": agentID,
"status": status,
"note": fmt.Sprintf(
"Agent %d container is %q. Deploy it via Web Panel (POST /api/agents/%d/deploy) then retry.",
agentID, status, agentID),
}, nil
}
}
// Fallback: agent not deployed yet — return informational response
return map[string]any{
"delegated": false,
"agentId": agentID,
"task": task,
"note": fmt.Sprintf("Agent %d is not running (containerStatus != running). Deploy it first via Web Panel.", agentID),
"delegated": false,
"agentId": agentID,
"note": "No database connection — cannot resolve agent address.",
}, nil
}
// postAgentTask POSTs to agent's /task endpoint (async, returns task_id).
func (e *Executor) postAgentTask(ctx context.Context, agentURL string, fromAgentID int, task, callbackURL string) (any, error) {
payload, _ := json.Marshal(map[string]any{
"input": task,
"from_agent_id": fromAgentID,
"callback_url": callbackURL,
})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, agentURL+"/task", bytes.NewReader(payload))
// postA2ATask POSTs to agent's /task endpoint using A2A protocol (async).
func (e *Executor) postA2ATask(ctx context.Context, agentURL string, req A2ATaskRequest) (any, error) {
payload, _ := json.Marshal(req)
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, agentURL+"/task", bytes.NewReader(payload))
if err != nil {
return nil, fmt.Errorf("delegate build request: %w", err)
return nil, fmt.Errorf("a2a build request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := e.httpClient.Do(req)
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("X-GoClaw-From", "orchestrator")
resp, err := e.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("delegate HTTP error: %w", err)
return nil, fmt.Errorf("a2a task HTTP error: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
@@ -515,19 +590,27 @@ func (e *Executor) postAgentTask(ctx context.Context, agentURL string, fromAgent
return result, nil
}
// postAgentChat POSTs to agent's /chat endpoint (sync, waits for response).
func (e *Executor) postAgentChat(ctx context.Context, agentURL string, _ int, task string) (any, error) {
// postA2AChat POSTs to agent's /chat endpoint (sync, waits for LLM response).
func (e *Executor) postA2AChat(ctx context.Context, agentURL string, task string, timeoutSecs int) (any, error) {
payload, _ := json.Marshal(map[string]any{
"messages": []map[string]string{{"role": "user", "content": task}},
"messages": []map[string]string{{"role": "user", "content": task}},
"timeout_secs": timeoutSecs,
})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, agentURL+"/chat", bytes.NewReader(payload))
chatCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSecs)*time.Second)
defer cancel()
httpReq, err := http.NewRequestWithContext(chatCtx, http.MethodPost, agentURL+"/chat", bytes.NewReader(payload))
if err != nil {
return nil, fmt.Errorf("delegate build request: %w", err)
return nil, fmt.Errorf("a2a chat request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := e.httpClient.Do(req)
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("X-GoClaw-From", "orchestrator")
// Use a client with longer timeout for sync chats
client := &http.Client{Timeout: time.Duration(timeoutSecs+10) * time.Second}
resp, err := client.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("delegate HTTP error: %w", err)
return nil, fmt.Errorf("a2a chat HTTP error: %w", err)
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
@@ -536,6 +619,107 @@ func (e *Executor) postAgentChat(ctx context.Context, agentURL string, _ int, ta
return result, nil
}
// fanoutAgents sends the same task to several agents in parallel and collects
// one result entry per requested agent, in request order.
//
// Recognised args:
//   - "task" (string, required): prompt forwarded to every agent.
//   - "agentIds" (array of JSON numbers, required): target agent IDs; only the
//     first 10 entries are used.
//   - "timeoutSecs" (number, optional, default 60): per-agent wait limit.
//
// An error is returned only for invalid arguments. Per-agent failures (invalid
// ID, lookup failure, unknown agent, agent not running, HTTP error) are
// reported in the corresponding result entry, and the returned map carries
// the total / succeeded / failed counts alongside the results.
func (e *Executor) fanoutAgents(ctx context.Context, args map[string]any) (any, error) {
	task, _ := args["task"].(string)
	if task == "" {
		return nil, fmt.Errorf("task is required")
	}
	timeoutSecs := 60
	if tf, ok := args["timeoutSecs"].(float64); ok && tf > 0 {
		timeoutSecs = int(tf)
	}

	// JSON arrays decode to []any with float64 elements.
	rawIDs, _ := args["agentIds"].([]any)
	if len(rawIDs) == 0 {
		return nil, fmt.Errorf("agentIds must be a non-empty array")
	}
	if len(rawIDs) > 10 {
		rawIDs = rawIDs[:10] // cap at 10
	}

	type agentResult struct {
		AgentID    int    `json:"agentId"`
		AgentName  string `json:"agentName,omitempty"`
		Success    bool   `json:"success"`
		Result     any    `json:"result,omitempty"`
		Error      string `json:"error,omitempty"`
		Delegated  bool   `json:"delegated"`
		DurationMs int64  `json:"durationMs"`
	}

	// Each goroutine writes only its own slot, so no extra locking is needed.
	results := make([]agentResult, len(rawIDs))

	// Overall deadline: slightly longer than the per-agent timeout so the
	// individual calls normally time out first and report their own error.
	fanCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSecs+5)*time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for i, rawID := range rawIDs {
		idf, ok := rawID.(float64)
		if !ok {
			// Report the bad entry instead of silently querying agent 0.
			results[i] = agentResult{Error: fmt.Sprintf("agentIds[%d] is not a number", i)}
			continue
		}

		wg.Add(1)
		go func(idx, agentID int) {
			defer wg.Done()

			start := time.Now()
			ar := agentResult{AgentID: agentID}
			// Record the outcome (and elapsed time) on every exit path.
			defer func() {
				ar.DurationMs = ms(start)
				results[idx] = ar
			}()

			if e.database == nil {
				ar.Error = "no database connection"
				return
			}

			cfg, err := e.database.GetAgentByID(agentID)
			if err != nil {
				// Keep lookup failures distinguishable from a missing agent.
				ar.Error = fmt.Sprintf("agent %d lookup failed: %v", agentID, err)
				return
			}
			if cfg == nil {
				ar.Error = fmt.Sprintf("agent %d not found", agentID)
				return
			}
			ar.AgentName = cfg.Name

			if cfg.ServicePort == 0 || cfg.ContainerStatus != "running" {
				ar.Error = fmt.Sprintf("agent %q is %q — not running", cfg.Name, cfg.ContainerStatus)
				return
			}

			agentURL := fmt.Sprintf("http://%s:%d", cfg.ServiceName, cfg.ServicePort)
			res, chatErr := e.postA2AChat(fanCtx, agentURL, task, timeoutSecs)
			if chatErr != nil {
				ar.Error = chatErr.Error()
				return
			}
			ar.Success = true
			ar.Delegated = true
			ar.Result = res
		}(i, int(idf))
	}
	wg.Wait()

	succeeded := 0
	for i := range results {
		if results[i].Success {
			succeeded++
		}
	}

	return map[string]any{
		"task":      task,
		"total":     len(results),
		"succeeded": succeeded,
		"failed":    len(results) - succeeded,
		"results":   results,
	}, nil
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
func (e *Executor) resolvePath(path string) string {