Files
GoClaw/gateway/internal/docker/client.go
bboxwtf a8a8ea1ee2 feat(swarm): autonomous agent containers, Swarm Manager with auto-stop, /nodes UI overhaul
## 1. Fix /nodes Swarm Status Display
- Add SwarmStatusBanner component: clear green/red/loading state
- Shows nodeId, managerAddr, isManager badge
- Error state explains what to check (docker.sock mount)
- Header now shows 'swarm unreachable — check gateway' vs 'active'
- swarmOk now checks nodeId presence, not just data existence

## 2. Autonomous Agent Container
- New docker/Dockerfile.agent — builds Go agent binary from gateway/cmd/agent/
- New gateway/cmd/agent/main.go — standalone HTTP microservice:
  * GET /health — liveness probe with idle time info
  * POST /task — receives task, forwards to Gateway orchestrator
  * GET /info  — agent metadata (id, hostname, gateway url)
  * Idle watchdog: calls /api/swarm/agents/{name}/stop after IdleTimeoutMinutes
  * Connects to Swarm overlay network (goclaw-net) → reaches DB/Gateway by DNS
  * Env: AGENT_ID, GATEWAY_URL, DATABASE_URL, IDLE_TIMEOUT_MINUTES

## 3. Swarm Manager Agent (auto-stop after 15min idle)
- New gateway/internal/api/swarm_manager.go:
  * SwarmManager goroutine checks every 60s
  * Scales idle GoClaw agent services to 0 replicas after 15 min
  * Tracks lastActivity from task UpdatedAt timestamps
- New REST endpoints in gateway:
  * GET  /api/swarm/agents           — list agents with idleMinutes
  * POST /api/swarm/agents/{name}/start — scale up agent
  * POST /api/swarm/agents/{name}/stop  — scale to 0
  * DELETE /api/swarm/services/{id}     — remove service permanently
- SwarmManager started as background goroutine in main.go with context cancel

## 4. Docker Client Enhancements
- Added NetworkAttachment type and Networks field to ServiceSpec
- CreateAgentServiceFull(opts) — supports overlay networks, custom labels
- CreateAgentService() delegates to CreateAgentServiceFull for backward compat
- RemoveService(id) — DELETE /v1.44/services/{id}
- GetServiceLastActivity(id) — finds latest task UpdatedAt for idle detection

## 5. tRPC & Gateway Proxy
- New functions: removeSwarmService, listSwarmAgents, startSwarmAgent, stopSwarmAgent
- SwarmAgentInfo type with idleMinutes, lastActivity, desiredReplicas
- createAgentService now accepts networks[] parameter
- New tRPC endpoints: nodes.removeService, nodes.listAgents, nodes.startAgent, nodes.stopAgent

## 6. Nodes.tsx UI Overhaul
- SwarmStatusBanner component at top — no more silent 'connecting…'
- New 'Agents' tab with AgentManagerRow: idle time, auto-stop warning, start/stop/remove buttons
- IdleColor coding: green < 5m, yellow 5-10m, red 10m+ with countdown to auto-stop
- ServiceRow: added Remove button with confirmation dialog
- RemoveConfirmDialog component
- DeployAgentDialog: added overlay networks field, default env includes GATEWAY_URL
- All queries refetch after agent start/stop/remove
2026-03-21 20:37:21 +00:00

665 lines
20 KiB
Go

package docker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os/exec"
"strings"
"time"
)
// DockerClient is a minimal Docker Engine API client that sends plain HTTP
// requests over the daemon's Unix socket.
type DockerClient struct {
	httpClient *http.Client // every request is dialed to the Unix socket
	baseURL    string       // scheme+host prefix prepended to every API path
}

// NewDockerClient returns a client bound to /var/run/docker.sock.
// Connecting is capped at 5 seconds and each whole request at 30 seconds.
func NewDockerClient() *DockerClient {
	dialer := &net.Dialer{Timeout: 5 * time.Second}

	// The network and address supplied by net/http are ignored on purpose:
	// every connection goes to the local Docker socket.
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		return dialer.DialContext(ctx, "unix", "/var/run/docker.sock")
	}

	client := &http.Client{
		Transport: &http.Transport{DialContext: dialSocket},
		Timeout:   30 * time.Second,
	}
	return &DockerClient{
		httpClient: client,
		baseURL:    "http://localhost",
	}
}
// ─── HTTP helpers ─────────────────────────────────────────────────────────────
// get performs a GET request against the Docker API and decodes the JSON
// response body into out.
//
// path is appended verbatim to the client's base URL, so callers are
// responsible for escaping any dynamic path or query components.
// A response status >= 400 is returned as an error that includes the body.
// If out is nil the response body is read but not decoded.
func (c *DockerClient) get(path string, out interface{}) error {
	resp, err := c.httpClient.Get(c.baseURL + path)
	if err != nil {
		return fmt.Errorf("docker GET %s: %w", path, err)
	}
	defer resp.Body.Close()

	body, readErr := io.ReadAll(resp.Body)
	// Report the HTTP status first: even a partially read body is useful
	// context for an error response.
	if resp.StatusCode >= 400 {
		return fmt.Errorf("docker GET %s: status %d: %s", path, resp.StatusCode, string(body))
	}
	// A failed read would otherwise surface as a misleading JSON error.
	if readErr != nil {
		return fmt.Errorf("docker GET %s: reading response: %w", path, readErr)
	}
	if out == nil {
		return nil
	}
	if err := json.Unmarshal(body, out); err != nil {
		return fmt.Errorf("docker GET %s: decoding response: %w", path, err)
	}
	return nil
}
// post sends payload as a JSON body in a POST request to the Docker API.
//
// path is appended verbatim to the client's base URL. A response status
// >= 400 is returned as an error that includes the response body. When out
// is non-nil and the response has a body, the body is decoded into out.
func (c *DockerClient) post(path string, payload interface{}, out interface{}) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("docker POST %s: encoding payload: %w", path, err)
	}

	resp, err := c.httpClient.Post(c.baseURL+path, "application/json", bytes.NewReader(encoded))
	if err != nil {
		return fmt.Errorf("docker POST %s: %w", path, err)
	}
	defer resp.Body.Close()

	body, readErr := io.ReadAll(resp.Body)
	// Report the HTTP status first: even a partially read body is useful
	// context for an error response.
	if resp.StatusCode >= 400 {
		return fmt.Errorf("docker POST %s: status %d: %s", path, resp.StatusCode, string(body))
	}
	// A failed read would otherwise surface as a misleading JSON error
	// (or be silently treated as "no body").
	if readErr != nil {
		return fmt.Errorf("docker POST %s: reading response: %w", path, readErr)
	}
	if out == nil || len(body) == 0 {
		return nil
	}
	if err := json.Unmarshal(body, out); err != nil {
		return fmt.Errorf("docker POST %s: decoding response: %w", path, err)
	}
	return nil
}
// postUpdate sends payload as a JSON body in a POST request to
// path?version=<version>. The node and service update methods in this file
// pass the Version.Index of the object they just fetched.
//
// The response body is only used for error reporting: a status >= 400 is
// returned as an error that includes it.
func (c *DockerClient) postUpdate(path string, version int, payload interface{}) error {
	encoded, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("docker POST(update) %s: encoding payload: %w", path, err)
	}

	endpoint := fmt.Sprintf("%s%s?version=%d", c.baseURL, path, version)
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(encoded))
	if err != nil {
		return fmt.Errorf("docker POST(update) %s: %w", path, err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("docker POST(update) %s: %w", path, err)
	}
	defer resp.Body.Close()

	// Reading to EOF also lets the transport reuse the connection.
	body, readErr := io.ReadAll(resp.Body)
	if resp.StatusCode >= 400 {
		return fmt.Errorf("docker POST(update) %s: status %d: %s", path, resp.StatusCode, string(body))
	}
	if readErr != nil {
		return fmt.Errorf("docker POST(update) %s: reading response: %w", path, readErr)
	}
	return nil
}
// ─── Swarm Node Types ─────────────────────────────────────────────────────────
// SwarmNode is the JSON shape decoded from the /nodes endpoints.
// Only the fields listed here are decoded; any other field in the response
// is dropped.
type SwarmNode struct {
ID string `json:"ID"`
Description NodeDescription `json:"Description"`
Status NodeStatus `json:"Status"`
// ManagerStatus is a pointer so that an absent value decodes to nil.
// Presumably only manager nodes carry it — confirm against the Docker API.
ManagerStatus *ManagerStatus `json:"ManagerStatus,omitempty"`
Spec NodeSpec `json:"Spec"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
// Version.Index is sent back as the ?version= parameter on updates.
Version VersionInfo `json:"Version"`
}
// VersionInfo carries the object version index used when posting updates.
type VersionInfo struct {
Index int `json:"Index"`
}
// NodeDescription holds the host details reported for a node.
type NodeDescription struct {
Hostname string `json:"Hostname"`
Platform Platform `json:"Platform"`
Resources Resources `json:"Resources"`
Engine Engine `json:"Engine"`
}
// Platform identifies a node's CPU architecture and operating system.
type Platform struct {
Architecture string `json:"Architecture"`
OS string `json:"OS"`
}
// Resources describes a node's CPU and memory capacity.
// NOTE(review): NanoCPUs is presumably in units of 1e-9 CPU — confirm.
type Resources struct {
NanoCPUs int64 `json:"NanoCPUs"`
MemoryBytes int64 `json:"MemoryBytes"`
}
// Engine holds the engine version string reported by a node.
type Engine struct {
EngineVersion string `json:"EngineVersion"`
}
// NodeStatus is the state, address and status message reported for a node.
type NodeStatus struct {
State string `json:"State"`
Addr string `json:"Addr"`
Message string `json:"Message"`
}
// ManagerStatus holds the manager-specific fields of a node.
type ManagerStatus struct {
Addr string `json:"Addr"`
Leader bool `json:"Leader"`
Reachability string `json:"Reachability"`
}
// NodeSpec is the user-modifiable part of a node that this package models.
// Availability and Labels are the fields changed by UpdateNodeAvailability
// and AddNodeLabel.
type NodeSpec struct {
Role string `json:"Role"`
Availability string `json:"Availability"`
Labels map[string]string `json:"Labels"`
}
// ─── Swarm Service Types ──────────────────────────────────────────────────────
// SwarmService is the JSON shape decoded from the /services endpoints.
// Only the fields listed here are decoded; any other field in the response
// is dropped.
type SwarmService struct {
ID string `json:"ID"`
Spec ServiceSpec `json:"Spec"`
// ServiceStatus is nil when the response does not include it.
// ListServices requests it with ?status=true.
ServiceStatus *ServiceStatus `json:"ServiceStatus,omitempty"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
// Version.Index is sent back as the ?version= parameter on updates.
Version VersionInfo `json:"Version"`
}
// ServiceSpec is the subset of a service specification modelled by this
// package. It is used both for decoding responses and as the request body
// when creating a service.
type ServiceSpec struct {
Name string `json:"Name"`
Mode ServiceMode `json:"Mode"`
TaskTemplate TaskTemplate `json:"TaskTemplate"`
EndpointSpec *EndpointSpec `json:"EndpointSpec,omitempty"`
Labels map[string]string `json:"Labels"`
Networks []NetworkAttachment `json:"Networks,omitempty"`
}
// NetworkAttachment names a network to attach a service to, with optional
// aliases on that network.
type NetworkAttachment struct {
Target string `json:"Target"`
Aliases []string `json:"Aliases,omitempty"`
}
// ServiceMode selects the scheduling mode. ScaleService treats a nil
// Replicated as "not in replicated mode".
type ServiceMode struct {
Replicated *ReplicatedService `json:"Replicated,omitempty"`
Global *struct{} `json:"Global,omitempty"`
}
// ReplicatedService holds the desired replica count of a replicated service.
type ReplicatedService struct {
Replicas int `json:"Replicas"`
}
// TaskTemplate describes the tasks a service runs: the container to start
// plus optional resource and placement settings.
type TaskTemplate struct {
ContainerSpec ContainerSpec `json:"ContainerSpec"`
Resources *TaskResources `json:"Resources,omitempty"`
Placement *Placement `json:"Placement,omitempty"`
}
// ContainerSpec is the image, environment and labels of a task's container.
// Env entries are passed through as given by callers — presumably
// "KEY=value" strings; confirm against the Docker API.
type ContainerSpec struct {
Image string `json:"Image"`
Env []string `json:"Env,omitempty"`
Labels map[string]string `json:"Labels,omitempty"`
}
// TaskResources groups the optional resource limits and reservations.
type TaskResources struct {
Limits *ResourceSpec `json:"Limits,omitempty"`
Reservations *ResourceSpec `json:"Reservations,omitempty"`
}
// ResourceSpec is a CPU/memory amount; zero values are omitted when encoded.
type ResourceSpec struct {
NanoCPUs int64 `json:"NanoCPUs,omitempty"`
MemoryBytes int64 `json:"MemoryBytes,omitempty"`
}
// Placement holds scheduling constraint expressions for a service's tasks.
type Placement struct {
Constraints []string `json:"Constraints,omitempty"`
}
// EndpointSpec lists the ports a service exposes.
type EndpointSpec struct {
Ports []PortConfig `json:"Ports,omitempty"`
}
// PortConfig describes one exposed port. None of the fields use omitempty,
// so a zero PublishedPort is encoded explicitly as 0.
type PortConfig struct {
Protocol string `json:"Protocol"`
TargetPort int `json:"TargetPort"`
PublishedPort int `json:"PublishedPort"`
PublishMode string `json:"PublishMode"`
}
// ServiceStatus holds the task counters reported for a service.
type ServiceStatus struct {
RunningTasks int `json:"RunningTasks"`
DesiredTasks int `json:"DesiredTasks"`
CompletedTasks int `json:"CompletedTasks"`
}
// ─── Swarm Task Types ─────────────────────────────────────────────────────────
// SwarmTask is the JSON shape decoded from the /tasks endpoint.
// GetServiceLastActivity uses UpdatedAt to find a service's most recent
// task update.
type SwarmTask struct {
ID string `json:"ID"`
ServiceID string `json:"ServiceID"`
NodeID string `json:"NodeID"`
Spec TaskSpec `json:"Spec"`
Status TaskStatus `json:"Status"`
Slot int `json:"Slot"`
UpdatedAt time.Time `json:"UpdatedAt"`
CreatedAt time.Time `json:"CreatedAt"`
}
// TaskSpec holds the container specification a task was created from.
type TaskSpec struct {
ContainerSpec ContainerSpec `json:"ContainerSpec"`
}
// TaskStatus is the state reported for a task, with its timestamp and message.
type TaskStatus struct {
Timestamp time.Time `json:"Timestamp"`
State string `json:"State"`
Message string `json:"Message"`
// ContainerStatus is nil when the response does not include it.
ContainerStatus *ContainerTaskStatus `json:"ContainerStatus,omitempty"`
}
// ContainerTaskStatus identifies the container backing a task.
type ContainerTaskStatus struct {
ContainerID string `json:"ContainerID"`
PID int `json:"PID"`
}
// ─── Swarm Info / Tokens ──────────────────────────────────────────────────────
// DockerInfo is the part of the /info response this package decodes:
// only the Swarm section.
type DockerInfo struct {
Swarm SwarmInfo `json:"Swarm"`
}
// SwarmInfo describes the local daemon's swarm membership.
// IsSwarmActive compares LocalNodeState against "active".
type SwarmInfo struct {
NodeID string `json:"NodeID"`
LocalNodeState string `json:"LocalNodeState"`
ControlAvailable bool `json:"ControlAvailable"`
Managers int `json:"Managers"`
Nodes int `json:"Nodes"`
// GetManagerAddr returns the Addr of the first entry, if any.
RemoteManagers []RemoteManager `json:"RemoteManagers"`
}
// RemoteManager is one manager known to the local daemon.
type RemoteManager struct {
NodeID string `json:"NodeID"`
Addr string `json:"Addr"`
}
// SwarmSpec is the part of the /swarm response this package decodes:
// the swarm ID and its join tokens.
type SwarmSpec struct {
JoinTokens JoinTokens `json:"JoinTokens"`
ID string `json:"ID"`
}
// JoinTokens holds the worker and manager join tokens.
// NOTE(review): these look like credentials for joining the swarm — avoid
// logging them; confirm handling in callers.
type JoinTokens struct {
Worker string `json:"Worker"`
Manager string `json:"Manager"`
}
// ─── Container types ──────────────────────────────────────────────────────────
// Container is one entry of the /containers/json list response.
// Note the ID field is tagged "Id", unlike the "ID" tag used by the swarm
// types in this file.
type Container struct {
ID string `json:"Id"`
Names []string `json:"Names"`
Image string `json:"Image"`
State string `json:"State"`
Status string `json:"Status"`
Labels map[string]string `json:"Labels"`
}
// ContainerStats is the part of a container stats snapshot this package
// decodes. CalcCPUPercent uses the difference between CPUStats and
// PreCPUStats.
type ContainerStats struct {
CPUStats CPUStats `json:"cpu_stats"`
PreCPUStats CPUStats `json:"precpu_stats"`
MemoryStats MemoryStats `json:"memory_stats"`
}
// CPUStats is one CPU usage sample.
type CPUStats struct {
CPUUsage CPUUsage `json:"cpu_usage"`
SystemCPUUsage int64 `json:"system_cpu_usage"`
// CalcCPUPercent falls back to len(CPUUsage.PercpuUsage) when this is 0.
OnlineCPUs int `json:"online_cpus"`
}
// CPUUsage holds the container's total CPU usage and the per-CPU breakdown.
// NOTE(review): units are presumably nanoseconds — confirm.
type CPUUsage struct {
TotalUsage int64 `json:"total_usage"`
PercpuUsage []int64 `json:"percpu_usage"`
}
// MemoryStats holds the memory usage counters of a container.
// NOTE(review): values are presumably in bytes — confirm.
type MemoryStats struct {
Usage int64 `json:"usage"`
MaxUsage int64 `json:"max_usage"`
Limit int64 `json:"limit"`
Stats map[string]int64 `json:"stats"`
}
// ─── Methods: Swarm info ──────────────────────────────────────────────────────
// IsSwarmActive reports whether the daemon's local swarm node state is
// "active". Any failure to query the daemon is reported as false.
func (c *DockerClient) IsSwarmActive() bool {
	info, err := c.GetSwarmInfo()
	return err == nil && info.Swarm.LocalNodeState == "active"
}
// GetSwarmInfo fetches /v1.44/info and returns the decoded result, or the
// request/decoding error.
func (c *DockerClient) GetSwarmInfo() (*DockerInfo, error) {
	info := new(DockerInfo)
	err := c.get("/v1.44/info", info)
	if err != nil {
		return nil, err
	}
	return info, nil
}
// GetJoinTokens fetches /v1.44/swarm and returns the swarm ID together with
// the worker and manager join tokens.
// The daemon only answers this request on a swarm manager node.
func (c *DockerClient) GetJoinTokens() (*SwarmSpec, error) {
	spec := new(SwarmSpec)
	err := c.get("/v1.44/swarm", spec)
	if err != nil {
		return nil, err
	}
	return spec, nil
}
// GetManagerAddr returns the address of the first remote manager reported by
// the daemon, intended as the address other nodes use to join this swarm
// (typically IP:2377). It returns "" when the swarm info cannot be fetched
// or no manager is listed.
func (c *DockerClient) GetManagerAddr() string {
	info, err := c.GetSwarmInfo()
	if err != nil {
		return ""
	}
	managers := info.Swarm.RemoteManagers
	if len(managers) == 0 {
		return ""
	}
	return managers[0].Addr
}
// ─── Methods: Nodes ───────────────────────────────────────────────────────────
// ListNodes returns every node returned by /v1.44/nodes.
func (c *DockerClient) ListNodes() ([]SwarmNode, error) {
	var result []SwarmNode
	err := c.get("/v1.44/nodes", &result)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// UpdateNodeAvailability sets a node's availability (active|pause|drain).
//
// The node is fetched first to obtain its current spec and version index;
// the spec is then posted back with only "Availability" replaced. The spec
// is handled as raw JSON so that fields this package does not model are
// sent back unchanged instead of being dropped by the update.
func (c *DockerClient) UpdateNodeAvailability(nodeID, availability string) error {
	// Escape the ID so that it cannot alter the request path.
	nodePath := "/v1.44/nodes/" + urlEncode(nodeID)

	var node struct {
		Version VersionInfo                `json:"Version"`
		Spec    map[string]json.RawMessage `json:"Spec"`
	}
	if err := c.get(nodePath, &node); err != nil {
		return err
	}
	if node.Spec == nil {
		node.Spec = make(map[string]json.RawMessage, 1)
	}

	encoded, err := json.Marshal(availability)
	if err != nil {
		return fmt.Errorf("update node %s: encoding availability: %w", nodeID, err)
	}
	node.Spec["Availability"] = encoded

	return c.postUpdate(nodePath+"/update", node.Version.Index, node.Spec)
}
// AddNodeLabel sets the label key=value on a swarm node, overwriting any
// existing value for key.
//
// The node is fetched first to obtain its current spec and version index;
// the spec is then posted back with only "Labels" replaced. The spec is
// handled as raw JSON so that fields this package does not model are sent
// back unchanged instead of being dropped by the update.
func (c *DockerClient) AddNodeLabel(nodeID, key, value string) error {
	// Escape the ID so that it cannot alter the request path.
	nodePath := "/v1.44/nodes/" + urlEncode(nodeID)

	var node struct {
		Version VersionInfo                `json:"Version"`
		Spec    map[string]json.RawMessage `json:"Spec"`
	}
	if err := c.get(nodePath, &node); err != nil {
		return err
	}
	if node.Spec == nil {
		node.Spec = make(map[string]json.RawMessage, 1)
	}

	// A missing or null "Labels" entry leaves labels nil; start from empty.
	var labels map[string]string
	if raw, ok := node.Spec["Labels"]; ok {
		if err := json.Unmarshal(raw, &labels); err != nil {
			return fmt.Errorf("add label to node %s: decoding labels: %w", nodeID, err)
		}
	}
	if labels == nil {
		labels = make(map[string]string, 1)
	}
	labels[key] = value

	encoded, err := json.Marshal(labels)
	if err != nil {
		return fmt.Errorf("add label to node %s: encoding labels: %w", nodeID, err)
	}
	node.Spec["Labels"] = encoded

	return c.postUpdate(nodePath+"/update", node.Version.Index, node.Spec)
}
// ─── Methods: Services ────────────────────────────────────────────────────────
// ListServices returns all swarm services. No filter is applied.
func (c *DockerClient) ListServices() ([]SwarmService, error) {
	// status=true asks the daemon to include ServiceStatus, so that the
	// running/desired replica counts are populated.
	const endpoint = "/v1.44/services?status=true"

	var result []SwarmService
	err := c.get(endpoint, &result)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// GetService returns a single service by ID or name.
//
// An empty idOrName is rejected up front: it would otherwise address the
// service list endpoint and fail with a misleading decoding error.
func (c *DockerClient) GetService(idOrName string) (*SwarmService, error) {
	if idOrName == "" {
		return nil, fmt.Errorf("docker GET service: empty service ID or name")
	}

	// Escape the identifier (as RemoveService does) so that it cannot alter
	// the request path or query.
	// NOTE(review): status=true is documented for the list endpoint; confirm
	// the inspect endpoint honours it, otherwise ServiceStatus stays nil here.
	var svc SwarmService
	if err := c.get("/v1.44/services/"+urlEncode(idOrName)+"?status=true", &svc); err != nil {
		return nil, err
	}
	return &svc, nil
}
// ScaleService updates the replica count for a replicated service.
//
// The service is fetched first to obtain its current spec and version
// index; the spec is then posted back with only Mode.Replicated.Replicas
// replaced. The spec is handled as raw JSON so that every field this package
// does not model is sent back unchanged — a service update replaces the
// whole spec, so posting a partially decoded spec would drop those settings.
//
// It returns an error if replicas is negative or the service is not in
// replicated mode.
func (c *DockerClient) ScaleService(idOrName string, replicas int) error {
	if replicas < 0 {
		return fmt.Errorf("scale service %s: replicas must be >= 0, got %d", idOrName, replicas)
	}

	var svc struct {
		ID      string                     `json:"ID"`
		Version VersionInfo                `json:"Version"`
		Spec    map[string]json.RawMessage `json:"Spec"`
	}
	if err := c.get("/v1.44/services/"+urlEncode(idOrName), &svc); err != nil {
		return err
	}

	// Decode one level at a time so sibling fields stay as raw JSON.
	var mode map[string]json.RawMessage
	if raw, ok := svc.Spec["Mode"]; ok {
		if err := json.Unmarshal(raw, &mode); err != nil {
			return fmt.Errorf("scale service %s: decoding Mode: %w", idOrName, err)
		}
	}
	replicatedRaw, ok := mode["Replicated"]
	if !ok || string(replicatedRaw) == "null" {
		return fmt.Errorf("service %s is not in replicated mode", idOrName)
	}
	var replicated map[string]json.RawMessage
	if err := json.Unmarshal(replicatedRaw, &replicated); err != nil {
		return fmt.Errorf("scale service %s: decoding Mode.Replicated: %w", idOrName, err)
	}
	if replicated == nil {
		replicated = make(map[string]json.RawMessage, 1)
	}

	// Re-encode from the innermost changed value outwards.
	count, err := json.Marshal(replicas)
	if err != nil {
		return fmt.Errorf("scale service %s: encoding replicas: %w", idOrName, err)
	}
	replicated["Replicas"] = count

	newReplicated, err := json.Marshal(replicated)
	if err != nil {
		return fmt.Errorf("scale service %s: encoding Mode.Replicated: %w", idOrName, err)
	}
	mode["Replicated"] = newReplicated

	newMode, err := json.Marshal(mode)
	if err != nil {
		return fmt.Errorf("scale service %s: encoding Mode: %w", idOrName, err)
	}
	svc.Spec["Mode"] = newMode

	return c.postUpdate(
		"/v1.44/services/"+svc.ID+"/update",
		svc.Version.Index,
		svc.Spec,
	)
}
// ListServiceTasks returns all tasks for a given service.
//
// The service filter is encoded with encoding/json rather than string
// formatting, so a serviceID containing quotes or backslashes cannot break
// or inject into the filter document.
func (c *DockerClient) ListServiceTasks(serviceID string) ([]SwarmTask, error) {
	filter, err := json.Marshal(map[string][]string{"service": {serviceID}})
	if err != nil {
		return nil, fmt.Errorf("list tasks of service %s: encoding filter: %w", serviceID, err)
	}

	var tasks []SwarmTask
	path := "/v1.44/tasks?filters=" + urlEncode(string(filter))
	if err := c.get(path, &tasks); err != nil {
		return nil, err
	}
	return tasks, nil
}
// ListAllTasks returns every swarm task, regardless of the service it
// belongs to.
func (c *DockerClient) ListAllTasks() ([]SwarmTask, error) {
	var result []SwarmTask
	err := c.get("/v1.44/tasks", &result)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// CreateAgentServiceOpts holds options for deploying an agent Swarm service.
type CreateAgentServiceOpts struct {
	Name     string            // service name
	Image    string            // container image
	Replicas int               // initial replica count
	Env      []string          // environment variables for the container
	Port     int               // port to expose; 0 means no endpoint is configured
	Networks []string          // overlay network names/IDs to attach
	Labels   map[string]string // extra service labels, merged over the defaults
}

// CreateAgentService deploys a new swarm service for an AI agent.
// image: container image, name: service name, replicas: initial count,
// env: environment variables, port: optional port to expose (0 = none).
//
// It is the short form of CreateAgentServiceFull: no networks and no extra
// labels are configured.
func (c *DockerClient) CreateAgentService(name, image string, replicas int, env []string, port int) (*SwarmService, error) {
	opts := CreateAgentServiceOpts{
		Name:     name,
		Image:    image,
		Replicas: replicas,
		Env:      env,
		Port:     port,
	}
	return c.CreateAgentServiceFull(opts)
}
// CreateAgentServiceFull creates a replicated swarm service for an agent and
// returns the service as reported by the daemon after creation.
//
// The service always carries the labels goclaw.agent=true and
// goclaw.name=<opts.Name>; entries in opts.Labels are merged on top and may
// override them. When opts.Port > 0 it is exposed as a TCP target port in
// ingress mode; no published port is set. Every network in opts.Networks is
// attached with the service name as alias.
//
// If the service is created but the follow-up inspect fails, the returned
// error says so and includes the new service ID, since the service exists at
// that point.
func (c *DockerClient) CreateAgentServiceFull(opts CreateAgentServiceOpts) (*SwarmService, error) {
	labels := make(map[string]string, len(opts.Labels)+2)
	labels["goclaw.agent"] = "true"
	labels["goclaw.name"] = opts.Name
	for k, v := range opts.Labels {
		labels[k] = v
	}

	spec := ServiceSpec{
		Name: opts.Name,
		Mode: ServiceMode{
			Replicated: &ReplicatedService{Replicas: opts.Replicas},
		},
		TaskTemplate: TaskTemplate{
			ContainerSpec: ContainerSpec{
				Image: opts.Image,
				Env:   opts.Env,
			},
		},
		Labels: labels,
	}

	if opts.Port > 0 {
		spec.EndpointSpec = &EndpointSpec{
			Ports: []PortConfig{
				{
					Protocol:    "tcp",
					TargetPort:  opts.Port,
					PublishMode: "ingress",
				},
			},
		}
	}

	if len(opts.Networks) > 0 {
		spec.Networks = make([]NetworkAttachment, 0, len(opts.Networks))
		// Named "network" rather than "net" to avoid shadowing the net package.
		for _, network := range opts.Networks {
			spec.Networks = append(spec.Networks, NetworkAttachment{
				Target:  network,
				Aliases: []string{opts.Name},
			})
		}
	}

	var created struct {
		ID string `json:"ID"`
	}
	if err := c.post("/v1.44/services/create", spec, &created); err != nil {
		return nil, err
	}

	svc, err := c.GetService(created.ID)
	if err != nil {
		return nil, fmt.Errorf("service %s created (ID %s) but could not be inspected: %w", opts.Name, created.ID, err)
	}
	return svc, nil
}
// RemoveService removes a swarm service by ID or name.
//
// An empty idOrName is rejected without contacting the daemon. A response
// status >= 400 is returned as an error that includes the response body.
func (c *DockerClient) RemoveService(idOrName string) error {
	if idOrName == "" {
		return fmt.Errorf("docker DELETE service: empty service ID or name")
	}

	req, err := http.NewRequest(http.MethodDelete, c.baseURL+"/v1.44/services/"+urlEncode(idOrName), nil)
	if err != nil {
		return err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("docker DELETE service %s: %w", idOrName, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		// Best effort: a partially read body is still useful in the message.
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("docker DELETE service %s: status %d: %s", idOrName, resp.StatusCode, string(body))
	}

	// Drain the body so the transport can reuse the connection. The removal
	// already succeeded, so a failure while draining is deliberately ignored.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}
// GetServiceLastActivity reports the latest UpdatedAt timestamp among the
// tasks of a service; callers use it to decide whether the service is idle.
// The zero time.Time is returned when the service has no tasks, or together
// with the error when listing the tasks fails.
func (c *DockerClient) GetServiceLastActivity(serviceID string) (time.Time, error) {
	var newest time.Time

	tasks, err := c.ListServiceTasks(serviceID)
	if err != nil {
		return newest, err
	}
	for i := range tasks {
		if updated := tasks[i].UpdatedAt; newest.Before(updated) {
			newest = updated
		}
	}
	return newest, nil
}
// ─── Methods: Containers ─────────────────────────────────────────────────────
// ListContainers returns the containers reported by the daemon for
// /containers/json with all=false.
func (c *DockerClient) ListContainers() ([]Container, error) {
	var result []Container
	err := c.get("/v1.44/containers/json?all=false", &result)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// GetContainerStats fetches a single, non-streaming stats snapshot for a
// container (stream=false).
//
// An empty containerID is rejected without contacting the daemon, and the ID
// is escaped so that it cannot alter the request path or query.
func (c *DockerClient) GetContainerStats(containerID string) (*ContainerStats, error) {
	if containerID == "" {
		return nil, fmt.Errorf("docker GET container stats: empty container ID")
	}

	var stats ContainerStats
	path := "/v1.44/containers/" + urlEncode(containerID) + "/stats?stream=false"
	if err := c.get(path, &stats); err != nil {
		return nil, err
	}
	return &stats, nil
}
// ─── Host Shell execution ─────────────────────────────────────────────────────
// The gateway runs inside a container. To run a command on the host it uses
// `nsenter -t 1 -m -u -i -n -p -- sh -c <cmd>`, i.e. it enters the mount,
// UTS, IPC, network and PID namespaces of PID 1.
//
// This only reaches the host when the container shares the host PID
// namespace (pid: host) and is allowed to enter those namespaces
// (privileged: true, or equivalent capabilities) — see docker-compose.yml.
//
// Alternative (safer): exec into the host via SSH or a privileged sidecar.

// ExecOnHost runs a shell command on the host via nsenter into PID 1 and
// returns the combined stdout+stderr.
//
// When nsenter itself is unavailable or cannot enter the host namespaces,
// the command is run inside the gateway container instead. When the command
// did run on the host and failed (non-zero exit or timeout), that failure is
// returned as is: the command is NOT run a second time in the container,
// because that would repeat its side effects and report the wrong output.
//
// Each attempt has its own 30 second timeout. On failure the returned string
// is the captured output, or the error text when nothing was captured.
//
// SECURITY: command is handed to `sh -c` unmodified. Callers must never pass
// untrusted input.
func ExecOnHost(command string) (string, error) {
	const timeout = 30 * time.Second

	// failureOutput keeps the original convention for failed runs: captured
	// output, or the error text when nothing was captured.
	failureOutput := func(stdout, stderr *bytes.Buffer, err error) string {
		combined := stdout.String() + stderr.String()
		if combined == "" {
			combined = err.Error()
		}
		return combined
	}

	// Attempt 1: the host, via nsenter.
	hostCtx, cancelHost := context.WithTimeout(context.Background(), timeout)
	defer cancelHost()
	hostCmd := exec.CommandContext(hostCtx, "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
		"sh", "-c", command)
	var hostOut, hostErr bytes.Buffer
	hostCmd.Stdout = &hostOut
	hostCmd.Stderr = &hostErr
	runErr := hostCmd.Run()
	if runErr == nil {
		return hostOut.String() + hostErr.String(), nil
	}

	// The process was started and then timed out: report that, do not retry.
	if hostCtx.Err() != nil {
		return failureOutput(&hostOut, &hostErr, runErr), runErr
	}

	// A non-zero exit can come from nsenter or from the command itself.
	// nsenter prefixes its own diagnostics with "nsenter:"; without that
	// prefix the command is taken to have run on the host and failed.
	if _, exited := runErr.(*exec.ExitError); exited && !strings.HasPrefix(hostErr.String(), "nsenter:") {
		return failureOutput(&hostOut, &hostErr, runErr), runErr
	}

	// Attempt 2: nsenter is missing or was refused — run in container scope,
	// with a fresh timeout.
	localCtx, cancelLocal := context.WithTimeout(context.Background(), timeout)
	defer cancelLocal()
	localCmd := exec.CommandContext(localCtx, "sh", "-c", command)
	var localOut, localErr bytes.Buffer
	localCmd.Stdout = &localOut
	localCmd.Stderr = &localErr
	if err := localCmd.Run(); err != nil {
		return failureOutput(&localOut, &localErr, err), err
	}
	return localOut.String() + localErr.String(), nil
}
// ExecDockerCLI runs the `docker` CLI binary with the given arguments and a
// 30 second timeout. The binary must be available on PATH.
//
// On success only stdout is returned. On failure the result is stdout
// followed by stderr, together with the error from running the command.
func ExecDockerCLI(args ...string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	var stdout, errOut bytes.Buffer
	docker := exec.CommandContext(ctx, "docker", args...)
	docker.Stdout, docker.Stderr = &stdout, &errOut

	err := docker.Run()
	if err == nil {
		return stdout.String(), nil
	}
	return stdout.String() + errOut.String(), err
}
// CalcCPUPercent derives a CPU usage percentage from one stats snapshot by
// comparing its current sample (CPUStats) with its previous one (PreCPUStats).
//
// The result is (container CPU delta / system CPU delta) * CPU count * 100.
// The CPU count is OnlineCPUs, or the length of PercpuUsage when OnlineCPUs
// is 0. It returns 0 unless both deltas are positive.
func CalcCPUPercent(stats *ContainerStats) float64 {
	current, previous := stats.CPUStats, stats.PreCPUStats

	cpuDelta := float64(current.CPUUsage.TotalUsage) - float64(previous.CPUUsage.TotalUsage)
	systemDelta := float64(current.SystemCPUUsage) - float64(previous.SystemCPUUsage)
	if systemDelta <= 0 || cpuDelta <= 0 {
		return 0
	}

	cpus := float64(current.OnlineCPUs)
	if cpus == 0 {
		cpus = float64(len(current.CPUUsage.PercpuUsage))
	}
	return (cpuDelta / systemDelta) * cpus * 100.0
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
// urlEncode percent-encodes s for use in a URL path segment or query value.
//
// ASCII letters, digits and '-', '_', '.', '~' are kept as is. Every other
// byte is written as %XX with upper-case hex digits. Encoding is done per
// byte, not per rune, so non-ASCII text is emitted as the percent-encoded
// bytes of its UTF-8 form (for example "é" becomes "%C3%A9").
func urlEncode(s string) string {
	const upperHex = "0123456789ABCDEF"

	var b strings.Builder
	b.Grow(len(s))
	for i := 0; i < len(s); i++ {
		ch := s[i]
		switch {
		case ch >= 'A' && ch <= 'Z', ch >= 'a' && ch <= 'z', ch >= '0' && ch <= '9',
			ch == '-', ch == '_', ch == '.', ch == '~':
			b.WriteByte(ch)
		default:
			b.WriteByte('%')
			b.WriteByte(upperHex[ch>>4])
			b.WriteByte(upperHex[ch&0x0F])
		}
	}
	return b.String()
}