// Package docker is a minimal Docker Engine API client that talks to the
// local daemon over /var/run/docker.sock, plus helpers for running commands
// on the host.
package docker

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os/exec"
	"strings"
	"time"
)

// DockerClient communicates with the Docker daemon via its Unix socket.
type DockerClient struct {
	httpClient *http.Client
	baseURL    string
}

// NewDockerClient creates a client talking to /var/run/docker.sock.
//
// The transport ignores the network/address it is asked to dial and always
// connects to the socket (5s dial timeout); baseURL only provides a valid host
// for request URLs. Each request is bounded by a 30s client timeout.
func NewDockerClient() *DockerClient {
	transport := &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			return (&net.Dialer{Timeout: 5 * time.Second}).DialContext(ctx, "unix", "/var/run/docker.sock")
		},
	}
	return &DockerClient{
		httpClient: &http.Client{Transport: transport, Timeout: 30 * time.Second},
		baseURL:    "http://localhost",
	}
}

// ─── HTTP helpers ─────────────────────────────────────────────────────────────

// do sends req and returns the complete response body. label (for example
// "docker GET /v1.44/info") prefixes every error. A status >= 400 becomes an
// error carrying the daemon's response text; a failure while reading a
// successful response is reported rather than handed on as a short body.
func (c *DockerClient) do(req *http.Request, label string) ([]byte, error) {
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("%s: %w", label, err)
	}
	defer resp.Body.Close()

	body, readErr := io.ReadAll(resp.Body)
	if resp.StatusCode >= 400 {
		// Report the status even if the error body was cut short.
		return nil, fmt.Errorf("%s: status %d: %s", label, resp.StatusCode, string(body))
	}
	if readErr != nil {
		return nil, fmt.Errorf("%s: reading response: %w", label, readErr)
	}
	return body, nil
}

// postJSON JSON-encodes payload and POSTs it to target (a full URL),
// returning the response body as do does.
func (c *DockerClient) postJSON(target, label string, payload interface{}) ([]byte, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("%s: encoding request: %w", label, err)
	}
	req, err := http.NewRequest(http.MethodPost, target, bytes.NewReader(b))
	if err != nil {
		return nil, fmt.Errorf("%s: %w", label, err)
	}
	req.Header.Set("Content-Type", "application/json")
	return c.do(req, label)
}

// get issues a GET for path (appended to baseURL) and JSON-decodes the
// response body into out.
func (c *DockerClient) get(path string, out interface{}) error {
	label := "docker GET " + path
	req, err := http.NewRequest(http.MethodGet, c.baseURL+path, nil)
	if err != nil {
		return fmt.Errorf("%s: %w", label, err)
	}
	body, err := c.do(req, label)
	if err != nil {
		return err
	}
	if err := json.Unmarshal(body, out); err != nil {
		return fmt.Errorf("%s: decoding response: %w", label, err)
	}
	return nil
}

// post JSON-encodes payload, POSTs it to path, and decodes the response into
// out when out is non-nil and the daemon returned a non-empty body.
func (c *DockerClient) post(path string, payload interface{}, out interface{}) error {
	label := "docker POST " + path
	body, err := c.postJSON(c.baseURL+path, label, payload)
	if err != nil {
		return err
	}
	if out == nil || len(body) == 0 {
		return nil
	}
	if err := json.Unmarshal(body, out); err != nil {
		return fmt.Errorf("%s: decoding response: %w", label, err)
	}
	return nil
}

// postUpdate POSTs payload to path with the ?version=<version> query
// parameter that the swarm .../update endpoints require. The response body is
// discarded.
func (c *DockerClient) postUpdate(path string, version int, payload interface{}) error {
	target := fmt.Sprintf("%s%s?version=%d", c.baseURL, path, version)
	_, err := c.postJSON(target, "docker POST(update) "+path, payload)
	return err
}

// ─── Swarm Node Types ─────────────────────────────────────────────────────────

// SwarmNode is the subset of the Engine API node object decoded by this client.
type SwarmNode struct {
	ID            string          `json:"ID"`
	Description   NodeDescription `json:"Description"`
	Status        NodeStatus      `json:"Status"`
	ManagerStatus *ManagerStatus  `json:"ManagerStatus,omitempty"`
	Spec          NodeSpec        `json:"Spec"`
	UpdatedAt     time.Time       `json:"UpdatedAt"`
	CreatedAt     time.Time       `json:"CreatedAt"`
	Version       VersionInfo     `json:"Version"`
}

// VersionInfo carries an object's version index, which update calls send back
// as the ?version= parameter.
type VersionInfo struct {
	Index int `json:"Index"`
}

// NodeDescription describes a node's host and engine.
type NodeDescription struct {
	Hostname  string    `json:"Hostname"`
	Platform  Platform  `json:"Platform"`
	Resources Resources `json:"Resources"`
	Engine    Engine    `json:"Engine"`
}

// Platform is a node's CPU architecture and operating system.
type Platform struct {
	Architecture string `json:"Architecture"`
	OS           string `json:"OS"`
}

// Resources is a node's total CPU (in nano-CPUs) and memory (in bytes).
type Resources struct {
	NanoCPUs    int64 `json:"NanoCPUs"`
	MemoryBytes int64 `json:"MemoryBytes"`
}

// Engine holds the Docker engine version running on a node.
type Engine struct {
	EngineVersion string `json:"EngineVersion"`
}

// NodeStatus is a node's reported state and address.
type NodeStatus struct {
	State   string `json:"State"`
	Addr    string `json:"Addr"`
	Message string `json:"Message"`
}

// ManagerStatus is present only for manager nodes.
type ManagerStatus struct {
	Addr         string `json:"Addr"`
	Leader       bool   `json:"Leader"`
	Reachability string `json:"Reachability"`
}

// NodeSpec is the user-settable part of a node. Name is modelled (and omitted
// when empty) so that re-posting a fetched spec does not drop it.
type NodeSpec struct {
	Name         string            `json:"Name,omitempty"`
	Role         string            `json:"Role"`
	Availability string            `json:"Availability"`
	Labels       map[string]string `json:"Labels"`
}

// ─── Swarm Service Types ──────────────────────────────────────────────────────

// SwarmService is the subset of the Engine API service object decoded here.
// ServiceStatus is only filled when the service was fetched with ?status=true.
type SwarmService struct {
	ID            string         `json:"ID"`
	Spec          ServiceSpec    `json:"Spec"`
	ServiceStatus *ServiceStatus `json:"ServiceStatus,omitempty"`
	UpdatedAt     time.Time      `json:"UpdatedAt"`
	CreatedAt     time.Time      `json:"CreatedAt"`
	Version       VersionInfo    `json:"Version"`
}

// ServiceSpec is the subset of a service spec this package models. Fields not
// listed here are not decoded and therefore not re-sent if this struct is
// posted back to an update endpoint.
type ServiceSpec struct {
	Name         string              `json:"Name"`
	Mode         ServiceMode         `json:"Mode"`
	TaskTemplate TaskTemplate        `json:"TaskTemplate"`
	EndpointSpec *EndpointSpec       `json:"EndpointSpec,omitempty"`
	Labels       map[string]string   `json:"Labels"`
	Networks     []NetworkAttachment `json:"Networks,omitempty"`
}

// NetworkAttachment attaches a service (or its tasks) to a network, with
// optional DNS aliases.
type NetworkAttachment struct {
	Target  string   `json:"Target"`
	Aliases []string `json:"Aliases,omitempty"`
}

// ServiceMode selects replicated or global scheduling; exactly one is set.
type ServiceMode struct {
	Replicated *ReplicatedService `json:"Replicated,omitempty"`
	Global     *struct{}          `json:"Global,omitempty"`
}

// ReplicatedService holds the desired replica count.
type ReplicatedService struct {
	Replicas int `json:"Replicas"`
}

// TaskTemplate describes the tasks a service runs. Networks is modelled so
// task-level network attachments survive a read-modify-write of the spec.
type TaskTemplate struct {
	ContainerSpec ContainerSpec       `json:"ContainerSpec"`
	Resources     *TaskResources      `json:"Resources,omitempty"`
	Placement     *Placement          `json:"Placement,omitempty"`
	Networks      []NetworkAttachment `json:"Networks,omitempty"`
}

// ContainerSpec is the container part of a task.
type ContainerSpec struct {
	Image  string            `json:"Image"`
	Env    []string          `json:"Env,omitempty"`
	Labels map[string]string `json:"Labels,omitempty"`
}

// TaskResources holds resource limits and reservations for a task.
type TaskResources struct {
	Limits       *ResourceSpec `json:"Limits,omitempty"`
	Reservations *ResourceSpec `json:"Reservations,omitempty"`
}

// ResourceSpec is a CPU (nano-CPUs) / memory (bytes) amount.
type ResourceSpec struct {
	NanoCPUs    int64 `json:"NanoCPUs,omitempty"`
	MemoryBytes int64 `json:"MemoryBytes,omitempty"`
}

// Placement holds scheduling constraints.
type Placement struct {
	Constraints []string `json:"Constraints,omitempty"`
}

// EndpointSpec lists the ports a service publishes.
type EndpointSpec struct {
	Ports []PortConfig `json:"Ports,omitempty"`
}

// PortConfig maps a container port to a published port.
type PortConfig struct {
	Protocol      string `json:"Protocol"`
	TargetPort    int    `json:"TargetPort"`
	PublishedPort int    `json:"PublishedPort"`
	PublishMode   string `json:"PublishMode"`
}

// ServiceStatus holds task counts returned with ?status=true.
type ServiceStatus struct {
	RunningTasks   int `json:"RunningTasks"`
	DesiredTasks   int `json:"DesiredTasks"`
	CompletedTasks int `json:"CompletedTasks"`
}

// ─── Swarm Task Types ─────────────────────────────────────────────────────────

// SwarmTask is the subset of the Engine API task object decoded here.
type SwarmTask struct {
	ID        string     `json:"ID"`
	ServiceID string     `json:"ServiceID"`
	NodeID    string     `json:"NodeID"`
	Spec      TaskSpec   `json:"Spec"`
	Status    TaskStatus `json:"Status"`
	Slot      int        `json:"Slot"`
	UpdatedAt time.Time  `json:"UpdatedAt"`
	CreatedAt time.Time  `json:"CreatedAt"`
}

// TaskSpec is the container spec a task was created from.
type TaskSpec struct {
	ContainerSpec ContainerSpec `json:"ContainerSpec"`
}

// TaskStatus is a task's current state.
type TaskStatus struct {
	Timestamp       time.Time            `json:"Timestamp"`
	State           string               `json:"State"`
	Message         string               `json:"Message"`
	ContainerStatus *ContainerTaskStatus `json:"ContainerStatus,omitempty"`
}

// ContainerTaskStatus identifies the container backing a task.
type ContainerTaskStatus struct {
	ContainerID string `json:"ContainerID"`
	PID         int    `json:"PID"`
}

// ─── Swarm Info / Tokens ──────────────────────────────────────────────────────

// DockerInfo is the part of GET /info this package decodes.
type DockerInfo struct {
	Swarm SwarmInfo `json:"Swarm"`
}

// SwarmInfo is the local node's view of the swarm.
type SwarmInfo struct {
	NodeID           string          `json:"NodeID"`
	LocalNodeState   string          `json:"LocalNodeState"`
	ControlAvailable bool            `json:"ControlAvailable"`
	Managers         int             `json:"Managers"`
	Nodes            int             `json:"Nodes"`
	RemoteManagers   []RemoteManager `json:"RemoteManagers"`
}

// RemoteManager is a manager node and its address.
type RemoteManager struct {
	NodeID string `json:"NodeID"`
	Addr   string `json:"Addr"`
}

// SwarmSpec is the part of GET /swarm this package decodes.
type SwarmSpec struct {
	JoinTokens JoinTokens `json:"JoinTokens"`
	ID         string     `json:"ID"`
}

// JoinTokens are the worker and manager join tokens.
type JoinTokens struct {
	Worker  string `json:"Worker"`
	Manager string `json:"Manager"`
}

// ─── Container types ──────────────────────────────────────────────────────────

// Container is one entry of GET /containers/json.
type Container struct {
	ID     string            `json:"Id"`
	Names  []string          `json:"Names"`
	Image  string            `json:"Image"`
	State  string            `json:"State"`
	Status string            `json:"Status"`
	Labels map[string]string `json:"Labels"`
}

// ContainerStats is a stats sample: current and previous CPU readings plus
// memory usage.
type ContainerStats struct {
	CPUStats    CPUStats    `json:"cpu_stats"`
	PreCPUStats CPUStats    `json:"precpu_stats"`
	MemoryStats MemoryStats `json:"memory_stats"`
}

// CPUStats is one CPU reading.
type CPUStats struct {
	CPUUsage       CPUUsage `json:"cpu_usage"`
	SystemCPUUsage int64    `json:"system_cpu_usage"`
	OnlineCPUs     int      `json:"online_cpus"`
}

// CPUUsage holds cumulative container CPU counters.
type CPUUsage struct {
	TotalUsage  int64   `json:"total_usage"`
	PercpuUsage []int64 `json:"percpu_usage"`
}

// MemoryStats holds container memory counters.
type MemoryStats struct {
	Usage    int64            `json:"usage"`
	MaxUsage int64            `json:"max_usage"`
	Limit    int64            `json:"limit"`
	Stats    map[string]int64 `json:"stats"`
}

// ─── Methods: Swarm info ──────────────────────────────────────────────────────
────────────────────────────────────────────────────── func (c *DockerClient) IsSwarmActive() bool { var info DockerInfo if err := c.get("/v1.44/info", &info); err != nil { return false } return info.Swarm.LocalNodeState == "active" } func (c *DockerClient) GetSwarmInfo() (*DockerInfo, error) { var info DockerInfo if err := c.get("/v1.44/info", &info); err != nil { return nil, err } return &info, nil } // GetJoinTokens returns the Swarm worker and manager join tokens. // Requires this node to be a swarm manager. func (c *DockerClient) GetJoinTokens() (*SwarmSpec, error) { var spec SwarmSpec if err := c.get("/v1.44/swarm", &spec); err != nil { return nil, err } return &spec, nil } // GetManagerAddr returns the advertise address (IP:2377) for joining this swarm. func (c *DockerClient) GetManagerAddr() string { info, err := c.GetSwarmInfo() if err != nil || len(info.Swarm.RemoteManagers) == 0 { return "" } return info.Swarm.RemoteManagers[0].Addr } // ─── Methods: Nodes ─────────────────────────────────────────────────────────── func (c *DockerClient) ListNodes() ([]SwarmNode, error) { var nodes []SwarmNode if err := c.get("/v1.44/nodes", &nodes); err != nil { return nil, err } return nodes, nil } // UpdateNodeAvailability sets a node's availability (active|pause|drain). func (c *DockerClient) UpdateNodeAvailability(nodeID, availability string) error { // First get current node spec + version var node SwarmNode if err := c.get("/v1.44/nodes/"+nodeID, &node); err != nil { return err } node.Spec.Availability = availability return c.postUpdate("/v1.44/nodes/"+nodeID+"/update", node.Version.Index, node.Spec) } // AddNodeLabel adds a label to a swarm node. 
func (c *DockerClient) AddNodeLabel(nodeID, key, value string) error { var node SwarmNode if err := c.get("/v1.44/nodes/"+nodeID, &node); err != nil { return err } if node.Spec.Labels == nil { node.Spec.Labels = map[string]string{} } node.Spec.Labels[key] = value return c.postUpdate("/v1.44/nodes/"+nodeID+"/update", node.Version.Index, node.Spec) } // ─── Methods: Services ──────────────────────────────────────────────────────── // ListServices returns all swarm services, optionally filtered by label. func (c *DockerClient) ListServices() ([]SwarmService, error) { var services []SwarmService // Include ServiceStatus so running/desired replicas are returned if err := c.get("/v1.44/services?status=true", &services); err != nil { return nil, err } return services, nil } // GetService returns a single service by ID or name. func (c *DockerClient) GetService(idOrName string) (*SwarmService, error) { var svc SwarmService if err := c.get("/v1.44/services/"+idOrName+"?status=true", &svc); err != nil { return nil, err } return &svc, nil } // ScaleService updates the replica count for a replicated service. func (c *DockerClient) ScaleService(idOrName string, replicas int) error { svc, err := c.GetService(idOrName) if err != nil { return err } if svc.Spec.Mode.Replicated == nil { return fmt.Errorf("service %s is not in replicated mode", idOrName) } svc.Spec.Mode.Replicated.Replicas = replicas return c.postUpdate( "/v1.44/services/"+svc.ID+"/update", svc.Version.Index, svc.Spec, ) } // ListServiceTasks returns all tasks for a given service. func (c *DockerClient) ListServiceTasks(serviceID string) ([]SwarmTask, error) { var tasks []SwarmTask filter := fmt.Sprintf(`{"service":["%s"]}`, serviceID) path := "/v1.44/tasks?filters=" + urlEncode(filter) if err := c.get(path, &tasks); err != nil { return nil, err } return tasks, nil } // ListAllTasks returns all swarm tasks (across services). 
func (c *DockerClient) ListAllTasks() ([]SwarmTask, error) { var tasks []SwarmTask if err := c.get("/v1.44/tasks", &tasks); err != nil { return nil, err } return tasks, nil } // CreateAgentService deploys a new swarm service for an AI agent. // image: container image, name: service name, replicas: initial count, // env: environment variables, port: optional published port (0 = none). // CreateAgentServiceOpts holds options for deploying an agent Swarm service. type CreateAgentServiceOpts struct { Name string Image string Replicas int Env []string Port int Networks []string // overlay network names/IDs to attach Labels map[string]string } func (c *DockerClient) CreateAgentService(name, image string, replicas int, env []string, port int) (*SwarmService, error) { return c.CreateAgentServiceFull(CreateAgentServiceOpts{ Name: name, Image: image, Replicas: replicas, Env: env, Port: port, }) } func (c *DockerClient) CreateAgentServiceFull(opts CreateAgentServiceOpts) (*SwarmService, error) { labels := map[string]string{ "goclaw.agent": "true", "goclaw.name": opts.Name, } for k, v := range opts.Labels { labels[k] = v } spec := ServiceSpec{ Name: opts.Name, Mode: ServiceMode{ Replicated: &ReplicatedService{Replicas: opts.Replicas}, }, TaskTemplate: TaskTemplate{ ContainerSpec: ContainerSpec{ Image: opts.Image, Env: opts.Env, }, }, Labels: labels, } if opts.Port > 0 { spec.EndpointSpec = &EndpointSpec{ Ports: []PortConfig{ { Protocol: "tcp", TargetPort: opts.Port, PublishMode: "ingress", }, }, } } if len(opts.Networks) > 0 { for _, net := range opts.Networks { spec.Networks = append(spec.Networks, NetworkAttachment{ Target: net, Aliases: []string{opts.Name}, }) } } var created struct { ID string `json:"ID"` } if err := c.post("/v1.44/services/create", spec, &created); err != nil { return nil, err } return c.GetService(created.ID) } // RemoveService removes a swarm service by ID or name. 
func (c *DockerClient) RemoveService(idOrName string) error { req, err := http.NewRequest(http.MethodDelete, c.baseURL+"/v1.44/services/"+urlEncode(idOrName), nil) if err != nil { return err } resp, err := c.httpClient.Do(req) if err != nil { return fmt.Errorf("docker DELETE service %s: %w", idOrName, err) } defer resp.Body.Close() if resp.StatusCode >= 400 { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("docker DELETE service %s: status %d: %s", idOrName, resp.StatusCode, string(body)) } return nil } // GetServiceLastActivity returns the most recent task update time for a service. // Used to determine whether a service is idle. func (c *DockerClient) GetServiceLastActivity(serviceID string) (time.Time, error) { tasks, err := c.ListServiceTasks(serviceID) if err != nil { return time.Time{}, err } var latest time.Time for _, t := range tasks { if t.UpdatedAt.After(latest) { latest = t.UpdatedAt } } return latest, nil } // ─── Methods: Containers ───────────────────────────────────────────────────── func (c *DockerClient) ListContainers() ([]Container, error) { var containers []Container if err := c.get("/v1.44/containers/json?all=false", &containers); err != nil { return nil, err } return containers, nil } func (c *DockerClient) GetContainerStats(containerID string) (*ContainerStats, error) { var stats ContainerStats if err := c.get(fmt.Sprintf("/v1.44/containers/%s/stats?stream=false", containerID), &stats); err != nil { return nil, err } return &stats, nil } // ─── Host Shell execution ───────────────────────────────────────────────────── // The gateway runs inside a container but has /var/run/docker.sock mounted. // We use `docker exec` against the host PID namespace via a privileged helper, // OR simply run commands via the docker socket by exec-ing into the gateway // container's own shell with nsenter to reach PID 1 on the host. // // Approach: use `nsenter -t 1 -m -u -i -n -p -- ` via the host PID namespace. 
// This requires the container to share the host PID namespace (pid: host) and
// to run privileged (or, presumably, with CAP_SYS_ADMIN and CAP_SYS_PTRACE —
// TODO confirm the minimal capability set). docker-compose.yml is expected to
// grant this; that is not verifiable from this file.
//
// A safer alternative would be reaching the host over SSH or through a
// dedicated privileged sidecar.

// ExecOnHost runs command with `sh -c` in the host's namespaces (via nsenter
// into PID 1) and returns its stdout followed by its stderr.
//
// Only when nsenter cannot be used at all — it fails to start, or nsenter
// itself reports an error such as missing privileges — is the command run
// inside the container instead. A command that reached the host and merely
// exited non-zero is NOT retried: retrying would execute it a second time,
// duplicating its side effects and hiding the real failure.
//
// Everything, including any fallback, shares one 30 second timeout. On
// failure the returned string holds whatever the command printed, or the
// error text if it printed nothing.
//
// SECURITY: command is interpreted by a shell with host-level privileges;
// never pass it unsanitised external input.
func ExecOnHost(command string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	stdout, stderr, err := runCaptured(exec.CommandContext(ctx,
		"nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--", "sh", "-c", command))
	if err == nil {
		return stdout + stderr, nil
	}
	if !nsenterUnavailable(err, stderr) {
		// The command ran on the host and failed; report that failure as-is.
		return outputOrError(stdout+stderr, err), err
	}

	// nsenter never reached the command: run it in the container's own scope.
	stdout, stderr, err = runCaptured(exec.CommandContext(ctx, "sh", "-c", command))
	if err != nil {
		return outputOrError(stdout+stderr, err), err
	}
	return stdout + stderr, nil
}

// runCaptured runs cmd to completion and returns its stdout and stderr.
func runCaptured(cmd *exec.Cmd) (string, string, error) {
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	err := cmd.Run()
	return stdout.String(), stderr.String(), err
}

// nsenterUnavailable reports whether a failed nsenter run never got as far as
// executing the target command: the process could not be started (binary
// missing, context already done, ...) or nsenter exited with one of its own
// diagnostics, which it prefixes with "nsenter: ".
func nsenterUnavailable(err error, stderr string) bool {
	if _, exited := err.(*exec.ExitError); !exited {
		return true
	}
	return strings.HasPrefix(stderr, "nsenter: ")
}

// outputOrError returns output, or err's text when output is empty, so the
// caller always has something human-readable to show.
func outputOrError(output string, err error) string {
	if output == "" {
		return err.Error()
	}
	return output
}

// ExecDockerCLI runs the docker CLI binary as `docker <args...>`. It needs a
// `docker` executable on PATH, which presumably reaches the daemon through the
// mounted socket.
//
// On success only stdout is returned; on failure stdout and stderr are
// concatenated so the caller sees the CLI's error text. The call is bounded
// by a 30 second timeout.
func ExecDockerCLI(args ...string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	stdout, stderr, err := runCaptured(exec.CommandContext(ctx, "docker", args...))
	if err != nil {
		return stdout + stderr, err
	}
	return stdout, nil
}

// CalcCPUPercent computes CPU% from a stats snapshot using the `docker stats`
// formula: the container's share of the system CPU-time delta, scaled by the
// number of online CPUs, so a container saturating N CPUs reports roughly N*100.
func CalcCPUPercent(stats *ContainerStats) float64 { cpuDelta := float64(stats.CPUStats.CPUUsage.TotalUsage) - float64(stats.PreCPUStats.CPUUsage.TotalUsage) systemDelta := float64(stats.CPUStats.SystemCPUUsage) - float64(stats.PreCPUStats.SystemCPUUsage) numCPU := float64(stats.CPUStats.OnlineCPUs) if numCPU == 0 { numCPU = float64(len(stats.CPUStats.CPUUsage.PercpuUsage)) } if systemDelta > 0 && cpuDelta > 0 { return (cpuDelta / systemDelta) * numCPU * 100.0 } return 0 } // ─── Helpers ────────────────────────────────────────────────────────────────── func urlEncode(s string) string { var b strings.Builder for _, r := range s { switch { case r >= 'A' && r <= 'Z', r >= 'a' && r <= 'z', r >= '0' && r <= '9', r == '-', r == '_', r == '.', r == '~': b.WriteRune(r) default: b.WriteString(fmt.Sprintf("%%%02X", r)) } } return b.String() }