Root cause:
gateway binary was built before SwarmJoinNodeViaSSH was added → 404 on
/api/swarm/join-node → tRPC threw 'Gateway unavailable' error.
Gateway fixes:
- Rebuilt gateway image (golang:1.23-alpine + go mod tidy → clean go.sum)
- SwarmJoinNodeViaSSH now live in binary (was already coded, not compiled)
- NEW: POST /api/swarm/ssh-test handler (SwarmSSHTest)
• Dials SSH with 10s timeout
• Runs 'docker version' on remote to check Docker availability
• Returns {ok, sshOk, dockerOk, dockerVersion, error, step}
- Route registered: r.Post("/swarm/ssh-test", h.SwarmSSHTest)
Server fixes:
- gateway-proxy.ts: added testSSHConnection() → POST /api/swarm/ssh-test
- routers.ts: added nodes.sshTest mutation (input: host/port/user/password)
UI (client/src/pages/Nodes.tsx) — AddNodeDialog rewritten:
- Separate state for testResult and joinResult (two independent panels)
- NEW: yellow 'Test Connection' button → calls nodes.sshTest
shows SSH OK + Docker version, or human-readable error
- 'Join Swarm' button remains independent
- Human-readable error messages per step:
ssh_connect → 'Cannot connect — check IP, port, SSH running'
docker_join → 'docker swarm join failed: ...'
trpc → 'Gateway unavailable — check gateway container'
- Input changes clear stale test/join results
- ✕ close button in dialog header
- Disabled state unified (busy || joinDone)
1591 lines · 48 KiB · Go
// Package api implements the HTTP REST API for the GoClaw Gateway.
|
|
package api
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"net/http"
	"strconv"
	"strings"
	"time"

	"git.softuniq.eu/UniqAI/GoClaw/gateway/config"
	"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db"
	dockerclient "git.softuniq.eu/UniqAI/GoClaw/gateway/internal/docker"
	"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/llm"
	"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/orchestrator"
	"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/tools"
	"golang.org/x/crypto/ssh"
)
|
|
|
|
// Handler holds all dependencies for HTTP handlers.
type Handler struct {
	cfg    *config.Config             // gateway configuration (request timeouts, etc.)
	llm    *llm.Client                // upstream LLM client (health, models, credentials)
	orch   *orchestrator.Orchestrator // tool-calling orchestration engine
	db     *db.DB                     // may be nil — every handler checks before use
	docker *dockerclient.DockerClient // local Docker / Swarm access
}
|
|
|
|
func NewHandler(cfg *config.Config, llmClient *llm.Client, orch *orchestrator.Orchestrator, database *db.DB) *Handler {
|
|
return &Handler{
|
|
cfg: cfg,
|
|
llm: llmClient,
|
|
orch: orch,
|
|
db: database,
|
|
docker: dockerclient.NewDockerClient(),
|
|
}
|
|
}
|
|
|
|
// GetDockerClient exposes the docker client for use in main.go startup routines.
// (Name kept for API compatibility; idiomatic Go would drop the "Get" prefix.)
func (h *Handler) GetDockerClient() *dockerclient.DockerClient {
	return h.docker
}
|
|
|
|
// ─── Health ───────────────────────────────────────────────────────────────────
|
|
|
|
// GET /health
|
|
func (h *Handler) Health(w http.ResponseWriter, r *http.Request) {
|
|
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
ollamaOK, latency, _ := h.llm.Health(ctx)
|
|
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"status": "ok",
|
|
"service": "goclaw-gateway",
|
|
"version": "1.0.0",
|
|
"timestamp": time.Now().UTC().Format(time.RFC3339),
|
|
"ollama": map[string]any{
|
|
"connected": ollamaOK,
|
|
"latencyMs": latency,
|
|
},
|
|
})
|
|
}
|
|
|
|
// ─── Orchestrator ─────────────────────────────────────────────────────────────
|
|
|
|
// POST /api/orchestrator/chat
|
|
func (h *Handler) OrchestratorChat(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Messages []orchestrator.Message `json:"messages"`
|
|
Model string `json:"model,omitempty"`
|
|
MaxIter int `json:"maxIter,omitempty"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
respondError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
|
|
return
|
|
}
|
|
if len(req.Messages) == 0 {
|
|
respondError(w, http.StatusBadRequest, "messages array is required")
|
|
return
|
|
}
|
|
|
|
log.Printf("[API] POST /api/orchestrator/chat — messages=%d model=%q", len(req.Messages), req.Model)
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), time.Duration(h.cfg.RequestTimeoutSecs)*time.Second)
|
|
defer cancel()
|
|
|
|
result := h.orch.Chat(ctx, req.Messages, req.Model, req.MaxIter)
|
|
respond(w, http.StatusOK, result)
|
|
}
|
|
|
|
// GET /api/orchestrator/config
|
|
func (h *Handler) OrchestratorConfig(w http.ResponseWriter, r *http.Request) {
|
|
cfg := h.orch.GetConfig()
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"id": cfg.ID,
|
|
"name": cfg.Name,
|
|
"model": cfg.Model,
|
|
"temperature": cfg.Temperature,
|
|
"maxTokens": cfg.MaxTokens,
|
|
"allowedTools": cfg.AllowedTools,
|
|
"systemPromptPreview": truncate(cfg.SystemPrompt, 200),
|
|
})
|
|
}
|
|
|
|
// ─── SSE Stream ───────────────────────────────────────────────────────────────
|
|
|
|
// SSE event types emitted by the orchestrator streaming endpoints.
const (
	sseEventToolCall = "tool_call" // a tool was executed; args/result attached
	sseEventDelta    = "delta"     // a chunk of the assistant's response text
	sseEventDone     = "done"      // final event carrying model/usage metadata
	sseEventError    = "error"     // orchestration failed; Error field set
	sseEventThinking = "thinking"  // run started (sent before any output)
)
|
|
|
|
// streamEvent is a single SSE event sent to the client. One struct covers
// every event type; unused fields are omitted from the JSON via omitempty.
type streamEvent struct {
	// Type is one of the sseEvent* constants above.
	Type string `json:"type"`
	// For delta events
	Content string `json:"content,omitempty"`
	// For tool_call events
	Tool string `json:"tool,omitempty"`
	Args any    `json:"args,omitempty"`
	Result any  `json:"result,omitempty"`
	// Pointers so false/0 are still serialized for tool_call events
	// (plain bool/int64 with omitempty would drop them).
	Success    *bool  `json:"success,omitempty"`
	DurationMs *int64 `json:"durationMs,omitempty"`
	// For done events
	Model        string     `json:"model,omitempty"`
	ModelWarning string     `json:"modelWarning,omitempty"`
	Usage        *llm.Usage `json:"usage,omitempty"`
	// For error events
	Error string `json:"error,omitempty"`
}
|
|
|
|
// writeSSE writes a single SSE event to the response writer and flushes.
|
|
func writeSSE(w http.ResponseWriter, flusher http.Flusher, event streamEvent) {
|
|
data, err := json.Marshal(event)
|
|
if err != nil {
|
|
return
|
|
}
|
|
fmt.Fprintf(w, "data: %s\n\n", data)
|
|
flusher.Flush()
|
|
}
|
|
|
|
// POST /api/orchestrator/stream
// SSE endpoint: streams tool-call events and LLM delta tokens in real time.
//
// Wire sequence: one "thinking" event, zero or more "tool_call" events
// while the orchestrator executes tools, then either an "error" event or a
// series of "delta" chunks followed by a "done" event. Every response ends
// with a literal "data: [DONE]" frame.
func (h *Handler) OrchestratorStream(w http.ResponseWriter, r *http.Request) {
	// SSE needs a flush after every event; bail out if the writer can't.
	flusher, ok := w.(http.Flusher)
	if !ok {
		respondError(w, http.StatusInternalServerError, "streaming not supported")
		return
	}

	var req struct {
		Messages []orchestrator.Message `json:"messages"`
		Model    string                 `json:"model,omitempty"`
		MaxIter  int                    `json:"maxIter,omitempty"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		respondError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
		return
	}
	if len(req.Messages) == 0 {
		respondError(w, http.StatusBadRequest, "messages array is required")
		return
	}

	// Set SSE headers (must happen before the first body write).
	w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	log.Printf("[API] POST /api/orchestrator/stream — messages=%d model=%q", len(req.Messages), req.Model)

	// Extract the last user message for history/metrics storage.
	userMessage := ""
	for i := len(req.Messages) - 1; i >= 0; i-- {
		if req.Messages[i].Role == "user" {
			userMessage = req.Messages[i].Content
			break
		}
	}

	// Determine orchestrator agent ID (look for isOrchestrator=1 in DB).
	orchAgentID := 1 // fallback to agent ID 1
	if h.db != nil {
		if cfg, err := h.db.GetOrchestratorConfig(); err == nil && cfg != nil {
			orchAgentID = cfg.ID
		}
	}

	startTime := time.Now()

	ctx, cancel := context.WithTimeout(r.Context(), time.Duration(h.cfg.RequestTimeoutSecs)*time.Second)
	defer cancel()

	// Run orchestration in a goroutine, streaming events via channel.
	type toolEvent struct {
		step orchestrator.ToolCallStep
	}
	// Buffered so short bursts of tool calls don't block the orchestrator
	// while this handler is writing frames to the client.
	toolCh := make(chan toolEvent, 32)
	doneCh := make(chan orchestrator.ChatResult, 1)

	// Custom streaming orchestrator. toolCh is closed only after
	// ChatWithEvents returns, which terminates the range loop below;
	// doneCh is buffered so the goroutine never blocks on the final send.
	go func() {
		result := h.orch.ChatWithEvents(ctx, req.Messages, req.Model, req.MaxIter, func(step orchestrator.ToolCallStep) {
			toolCh <- toolEvent{step: step}
		})
		close(toolCh)
		doneCh <- result
	}()

	// Send thinking event so the UI can show progress immediately.
	writeSSE(w, flusher, streamEvent{Type: sseEventThinking})

	// Drain tool events as they arrive, forwarding each as a tool_call frame.
	for ev := range toolCh {
		// Copy into locals so the pointers below don't alias loop state.
		success := ev.step.Success
		dur := ev.step.DurationMs
		writeSSE(w, flusher, streamEvent{
			Type:       sseEventToolCall,
			Tool:       ev.step.Tool,
			Args:       ev.step.Args,
			Result:     ev.step.Result,
			Success:    &success,
			DurationMs: &dur,
			Error:      ev.step.Error,
		})
	}

	// Get final result (toolCh closed ⇒ the orchestration run has ended).
	result := <-doneCh

	if !result.Success {
		writeSSE(w, flusher, streamEvent{Type: sseEventError, Error: result.Error})
		fmt.Fprintf(w, "data: [DONE]\n\n")
		flusher.Flush()
		// Persist error metric + history (fire-and-forget goroutine).
		if h.db != nil {
			go func() {
				reqID := fmt.Sprintf("orch-%d", time.Now().UnixNano())
				h.db.SaveMetric(db.MetricInput{
					AgentID:          orchAgentID,
					RequestID:        reqID,
					UserMessage:      userMessage,
					ProcessingTimeMs: time.Since(startTime).Milliseconds(),
					Status:           "error",
					ErrorMessage:     result.Error,
					Model:            result.Model,
				})
				h.db.SaveHistory(db.HistoryInput{
					AgentID:       orchAgentID,
					UserMessage:   userMessage,
					AgentResponse: "",
					Status:        "error",
				})
			}()
		}
		return
	}

	// Stream the response in rune-safe chunks (important for UTF-8 / Cyrillic).
	// We convert to []rune first so we never split a multi-byte character.
	const runeChunkSize = 6
	runes := []rune(result.Response)
	for i := 0; i < len(runes); i += runeChunkSize {
		end := i + runeChunkSize
		if end > len(runes) {
			end = len(runes)
		}
		writeSSE(w, flusher, streamEvent{
			Type:    sseEventDelta,
			Content: string(runes[i:end]),
		})
		// Stop promptly if the client disconnected or the request timed out.
		select {
		case <-ctx.Done():
			return
		default:
		}
	}

	// Send done event with model/usage metadata, then the [DONE] sentinel.
	writeSSE(w, flusher, streamEvent{
		Type:         sseEventDone,
		Model:        result.Model,
		ModelWarning: result.ModelWarning,
		Usage:        result.Usage,
	})
	fmt.Fprintf(w, "data: [DONE]\n\n")
	flusher.Flush()

	// Persist metrics + history asynchronously (never blocks the response).
	if h.db != nil {
		go func() {
			reqID := fmt.Sprintf("orch-%d", time.Now().UnixNano())
			var inputTok, outputTok, totalTok int
			if result.Usage != nil {
				inputTok = result.Usage.PromptTokens
				outputTok = result.Usage.CompletionTokens
				totalTok = result.Usage.TotalTokens
			}
			toolNames := make([]string, len(result.ToolCalls))
			for i, tc := range result.ToolCalls {
				toolNames[i] = tc.Tool
			}
			h.db.SaveMetric(db.MetricInput{
				AgentID:          orchAgentID,
				RequestID:        reqID,
				UserMessage:      userMessage,
				AgentResponse:    result.Response,
				InputTokens:      inputTok,
				OutputTokens:     outputTok,
				TotalTokens:      totalTok,
				ProcessingTimeMs: time.Since(startTime).Milliseconds(),
				Status:           "success",
				ToolsCalled:      toolNames,
				Model:            result.Model,
			})
			h.db.SaveHistory(db.HistoryInput{
				AgentID:       orchAgentID,
				UserMessage:   userMessage,
				AgentResponse: result.Response,
				Status:        "success",
			})
		}()
	}
}
|
|
|
|
// ─── Providers Reload ─────────────────────────────────────────────────────────
|
|
|
|
// POST /api/providers/reload
// Node.js calls this after activating a provider, sending the decrypted API key in the body.
// Body: { "name": "...", "baseUrl": "...", "apiKey": "...", "modelDefault": "..." }
//
// Always answers 200 — reload is best-effort and the caller only needs an
// acknowledgement. Preference order: credentials from the request body,
// then the active provider row in the DB, then a no-op.
func (h *Handler) ProvidersReload(w http.ResponseWriter, r *http.Request) {
	// Try to read the decrypted credentials from the request body (preferred path)
	var body struct {
		Name         string `json:"name"`
		BaseURL      string `json:"baseUrl"`
		APIKey       string `json:"apiKey"`
		ModelDefault string `json:"modelDefault"`
	}
	// Decode errors are deliberately tolerated: an empty or invalid body
	// simply falls through to the DB path below.
	if err := json.NewDecoder(r.Body).Decode(&body); err == nil && body.BaseURL != "" {
		h.llm.UpdateCredentials(body.BaseURL, body.APIKey)
		log.Printf("[API] Provider reloaded from body: %s (%s)", body.Name, body.BaseURL)
		respond(w, http.StatusOK, map[string]any{
			"ok":      true,
			"name":    body.Name,
			"baseUrl": body.BaseURL,
		})
		return
	}

	// Fallback: try to read from DB (key will be empty since Go can't decrypt it)
	if h.db != nil {
		provider, err := h.db.GetActiveProvider()
		if err == nil && provider != nil {
			h.llm.UpdateCredentials(provider.BaseURL, provider.APIKey)
			log.Printf("[API] Provider reloaded from DB: %s (%s)", provider.Name, provider.BaseURL)
			respond(w, http.StatusOK, map[string]any{
				"ok":      true,
				"name":    provider.Name,
				"baseUrl": provider.BaseURL,
			})
			return
		}
		if err != nil {
			log.Printf("[API] ProvidersReload: DB error: %v", err)
		}
	}
	// Neither body nor DB yielded credentials; still acknowledge.
	respond(w, http.StatusOK, map[string]any{"ok": true, "note": "No provider data received"})
}
|
|
|
|
// ─── Agents ───────────────────────────────────────────────────────────────────
|
|
|
|
// GET /api/agents
|
|
func (h *Handler) ListAgents(w http.ResponseWriter, r *http.Request) {
|
|
if h.db == nil {
|
|
respond(w, http.StatusOK, map[string]any{"agents": []any{}, "note": "DB not connected"})
|
|
return
|
|
}
|
|
agents, err := h.db.ListAgents()
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, "failed to list agents: "+err.Error())
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{"agents": agents, "count": len(agents)})
|
|
}
|
|
|
|
// GET /api/agents/{id}
|
|
func (h *Handler) GetAgent(w http.ResponseWriter, r *http.Request) {
|
|
idStr := r.PathValue("id")
|
|
id, err := strconv.Atoi(idStr)
|
|
if err != nil {
|
|
respondError(w, http.StatusBadRequest, "invalid agent id")
|
|
return
|
|
}
|
|
if h.db == nil {
|
|
respondError(w, http.StatusServiceUnavailable, "DB not connected")
|
|
return
|
|
}
|
|
agent, err := h.db.GetAgentByID(id)
|
|
if err != nil {
|
|
respondError(w, http.StatusNotFound, "agent not found")
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, agent)
|
|
}
|
|
|
|
// ─── Models ───────────────────────────────────────────────────────────────────
|
|
|
|
// GET /api/models
|
|
func (h *Handler) ListModels(w http.ResponseWriter, r *http.Request) {
|
|
ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
|
|
defer cancel()
|
|
|
|
models, err := h.llm.ListModels(ctx)
|
|
if err != nil {
|
|
respondError(w, http.StatusBadGateway, "failed to fetch models: "+err.Error())
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, models)
|
|
}
|
|
|
|
// ─── Tools ────────────────────────────────────────────────────────────────────
|
|
|
|
// GET /api/tools
|
|
func (h *Handler) ListTools(w http.ResponseWriter, r *http.Request) {
|
|
toolDefs := tools.OrchestratorTools()
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"tools": toolDefs,
|
|
"count": len(toolDefs),
|
|
})
|
|
}
|
|
|
|
// POST /api/tools/execute
|
|
func (h *Handler) ExecuteTool(w http.ResponseWriter, r *http.Request) {
|
|
var req struct {
|
|
Name string `json:"name"`
|
|
Arguments map[string]any `json:"arguments"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
respondError(w, http.StatusBadRequest, "invalid request body: "+err.Error())
|
|
return
|
|
}
|
|
if req.Name == "" {
|
|
respondError(w, http.StatusBadRequest, "tool name is required")
|
|
return
|
|
}
|
|
|
|
argsJSON, _ := json.Marshal(req.Arguments)
|
|
executor := tools.NewExecutor("/", nil)
|
|
result := executor.Execute(r.Context(), req.Name, string(argsJSON))
|
|
respond(w, http.StatusOK, map[string]any{"result": result})
|
|
}
|
|
|
|
// ─── Nodes ────────────────────────────────────────────────────────────────────
|
|
|
|
// NodeInfo is the unified node response sent to the frontend. It is used
// both for real Swarm nodes (ListNodes) and for the synthetic standalone
// node produced by listLocalNode.
type NodeInfo struct {
	ID           string `json:"id"`           // short (12-char) node ID
	Hostname     string `json:"hostname"`
	Role         string `json:"role"`         // "manager", "worker", or "standalone"
	Status       string `json:"status"`
	Availability string `json:"availability"`
	IP           string `json:"ip"`
	OS           string `json:"os"`
	Arch         string `json:"arch"`
	CPUCores     int    `json:"cpuCores"`
	MemTotalMB   int64  `json:"memTotalMB"`
	DockerVersion string `json:"dockerVersion"`
	// IsLeader/ManagerAddr are only populated for swarm manager nodes.
	IsLeader    bool              `json:"isLeader"`
	ManagerAddr string            `json:"managerAddr,omitempty"`
	Labels      map[string]string `json:"labels"` // always non-nil in responses
	UpdatedAt   string            `json:"updatedAt"` // RFC3339, UTC
}
|
|
|
|
// ContainerInfo is a slim container summary per node, returned by the
// non-swarm branch of the nodes listing.
type ContainerInfo struct {
	ID     string `json:"id"`     // short (12-char) container ID
	Name   string `json:"name"`   // container name without the leading slash
	Image  string `json:"image"`
	State  string `json:"state"`
	Status string `json:"status"`
}
|
|
|
|
// GET /api/nodes
|
|
func (h *Handler) ListNodes(w http.ResponseWriter, r *http.Request) {
|
|
// Check if Swarm is active
|
|
swarmActive := h.docker.IsSwarmActive()
|
|
|
|
if swarmActive {
|
|
// Return real Swarm nodes
|
|
nodes, err := h.docker.ListNodes()
|
|
if err != nil {
|
|
log.Printf("[API] ListNodes swarm error: %v — falling back to local info", err)
|
|
h.listLocalNode(w, r)
|
|
return
|
|
}
|
|
|
|
result := make([]NodeInfo, 0, len(nodes))
|
|
for _, n := range nodes {
|
|
info := NodeInfo{
|
|
ID: n.ID[:12],
|
|
Hostname: n.Description.Hostname,
|
|
Role: n.Spec.Role,
|
|
Status: n.Status.State,
|
|
Availability: n.Spec.Availability,
|
|
IP: n.Status.Addr,
|
|
OS: n.Description.Platform.OS,
|
|
Arch: n.Description.Platform.Architecture,
|
|
CPUCores: int(n.Description.Resources.NanoCPUs / 1e9),
|
|
MemTotalMB: n.Description.Resources.MemoryBytes / (1024 * 1024),
|
|
DockerVersion: n.Description.Engine.EngineVersion,
|
|
Labels: n.Spec.Labels,
|
|
UpdatedAt: n.UpdatedAt.UTC().Format(time.RFC3339),
|
|
}
|
|
if n.ManagerStatus != nil {
|
|
info.IsLeader = n.ManagerStatus.Leader
|
|
info.ManagerAddr = n.ManagerStatus.Addr
|
|
}
|
|
if info.Labels == nil {
|
|
info.Labels = map[string]string{}
|
|
}
|
|
result = append(result, info)
|
|
}
|
|
|
|
swarmInfo, _ := h.docker.GetSwarmInfo()
|
|
managers, totalNodes := 0, len(result)
|
|
if swarmInfo != nil {
|
|
managers = swarmInfo.Swarm.Managers
|
|
totalNodes = swarmInfo.Swarm.Nodes
|
|
}
|
|
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"nodes": result,
|
|
"count": len(result),
|
|
"swarmActive": true,
|
|
"managers": managers,
|
|
"totalNodes": totalNodes,
|
|
"fetchedAt": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
return
|
|
}
|
|
|
|
// Swarm not active — return local Docker host info
|
|
h.listLocalNode(w, r)
|
|
}
|
|
|
|
// listLocalNode returns info about the current Docker host as a single "node".
|
|
func (h *Handler) listLocalNode(w http.ResponseWriter, r *http.Request) {
|
|
info, err := h.docker.GetSwarmInfo()
|
|
hostname := "localhost"
|
|
if err == nil && info != nil {
|
|
_ = info // use for future enrichment
|
|
}
|
|
|
|
// Get containers running on this host
|
|
containers, _ := h.docker.ListContainers()
|
|
containerInfos := make([]ContainerInfo, 0, len(containers))
|
|
for _, c := range containers {
|
|
name := c.ID[:12]
|
|
if len(c.Names) > 0 {
|
|
name = c.Names[0]
|
|
if len(name) > 0 && name[0] == '/' {
|
|
name = name[1:]
|
|
}
|
|
}
|
|
containerInfos = append(containerInfos, ContainerInfo{
|
|
ID: c.ID[:12],
|
|
Name: name,
|
|
Image: c.Image,
|
|
State: c.State,
|
|
Status: c.Status,
|
|
})
|
|
}
|
|
|
|
node := NodeInfo{
|
|
ID: "local-01",
|
|
Hostname: hostname,
|
|
Role: "standalone",
|
|
Status: "ready",
|
|
Availability: "active",
|
|
IP: "127.0.0.1",
|
|
DockerVersion: "unknown",
|
|
Labels: map[string]string{},
|
|
UpdatedAt: time.Now().UTC().Format(time.RFC3339),
|
|
}
|
|
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"nodes": []NodeInfo{node},
|
|
"count": 1,
|
|
"swarmActive": false,
|
|
"containers": containerInfos,
|
|
"fetchedAt": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
// GET /api/nodes/stats
|
|
// Returns live container stats (CPU%, RAM) for containers on this host.
|
|
func (h *Handler) NodeStats(w http.ResponseWriter, r *http.Request) {
|
|
containers, err := h.docker.ListContainers()
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, "failed to list containers: "+err.Error())
|
|
return
|
|
}
|
|
|
|
type ContainerStat struct {
|
|
ID string `json:"id"`
|
|
Name string `json:"name"`
|
|
CPUPct float64 `json:"cpuPct"`
|
|
MemUseMB float64 `json:"memUseMB"`
|
|
MemLimMB float64 `json:"memLimMB"`
|
|
MemPct float64 `json:"memPct"`
|
|
}
|
|
|
|
stats := make([]ContainerStat, 0, len(containers))
|
|
for _, c := range containers {
|
|
s, err := h.docker.GetContainerStats(c.ID)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
name := c.ID[:12]
|
|
if len(c.Names) > 0 {
|
|
name = c.Names[0]
|
|
if len(name) > 0 && name[0] == '/' {
|
|
name = name[1:]
|
|
}
|
|
}
|
|
cpuPct := dockerclient.CalcCPUPercent(s)
|
|
memUse := float64(s.MemoryStats.Usage) / (1024 * 1024)
|
|
memLim := float64(s.MemoryStats.Limit) / (1024 * 1024)
|
|
memPct := 0.0
|
|
if memLim > 0 {
|
|
memPct = (memUse / memLim) * 100
|
|
}
|
|
stats = append(stats, ContainerStat{
|
|
ID: c.ID[:12],
|
|
Name: name,
|
|
CPUPct: round2(cpuPct),
|
|
MemUseMB: round2(memUse),
|
|
MemLimMB: round2(memLim),
|
|
MemPct: round2(memPct),
|
|
})
|
|
}
|
|
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"stats": stats,
|
|
"count": len(stats),
|
|
"fetchedAt": time.Now().UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
|
|
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
func respond(w http.ResponseWriter, status int, data any) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
w.WriteHeader(status)
|
|
_ = json.NewEncoder(w).Encode(data)
|
|
}
|
|
|
|
func respondError(w http.ResponseWriter, status int, msg string) {
|
|
respond(w, status, map[string]any{"error": msg})
|
|
}
|
|
|
|
// truncate shortens s to at most n runes, appending "..." when text was
// removed. Counting runes rather than bytes ensures multi-byte UTF-8
// characters (e.g. Cyrillic, which this file handles elsewhere) are never
// split mid-sequence; the previous byte-slicing could emit invalid UTF-8.
func truncate(s string, n int) string {
	if len(s) <= n {
		// Fast path: byte length ≤ n implies rune count ≤ n.
		return s
	}
	r := []rune(s)
	if len(r) <= n {
		return s
	}
	return string(r[:n]) + "..."
}
|
|
|
|
// round2 rounds f to two decimal places (half away from zero). The
// previous int-cast implementation truncated instead of rounding,
// truncated toward zero for negatives, and overflowed for values outside
// the int range.
func round2(f float64) float64 {
	return math.Round(f*100) / 100
}
|
|
|
|
// init previously contained `_ = fmt.Sprintf` to suppress an "unused
// import" error, but fmt is used throughout this file, so the statement
// was dead code and has been removed. The empty init is kept so this edit
// stays a drop-in block replacement.
func init() {}
|
|
|
|
// ─── Persistent Chat Sessions ─────────────────────────────────────────────────
|
|
|
|
// POST /api/chat/session
// Creates a DB session, fires off the orchestrator in the background,
// returns {"sessionId":"..."} immediately. The client polls for events.
//
// The background goroutine is deliberately detached from the request
// context so it survives HTTP disconnects; all progress is recorded as
// chat-event rows that GetChatEvents serves back to the polling client.
func (h *Handler) StartChatSession(w http.ResponseWriter, r *http.Request) {
	if h.db == nil {
		respondError(w, http.StatusServiceUnavailable, "DB not connected — persistent sessions unavailable")
		return
	}

	var req struct {
		Messages  []orchestrator.Message `json:"messages"`
		Model     string                 `json:"model,omitempty"`
		MaxIter   int                    `json:"maxIter,omitempty"`
		SessionID string                 `json:"sessionId,omitempty"` // client can supply its own ID
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		respondError(w, http.StatusBadRequest, "invalid body: "+err.Error())
		return
	}
	if len(req.Messages) == 0 {
		respondError(w, http.StatusBadRequest, "messages array is required")
		return
	}

	// Use client-supplied ID or generate one
	sessionID := req.SessionID
	if sessionID == "" {
		sessionID = fmt.Sprintf("cs-%d", time.Now().UnixNano())
	}

	// Extract last user message for storage
	userMessage := ""
	for i := len(req.Messages) - 1; i >= 0; i-- {
		if req.Messages[i].Role == "user" {
			userMessage = req.Messages[i].Content
			break
		}
	}

	// Resolve orchestrator agent ID (falls back to 1 when no config row).
	orchAgentID := 1
	if cfg, err := h.db.GetOrchestratorConfig(); err == nil && cfg != nil {
		orchAgentID = cfg.ID
	}

	// Create session row in DB
	if err := h.db.CreateSession(sessionID, userMessage, orchAgentID); err != nil {
		respondError(w, http.StatusInternalServerError, "failed to create session: "+err.Error())
		return
	}

	maxIter := req.MaxIter
	if maxIter <= 0 {
		maxIter = 10 // default iteration cap
	}
	model := req.Model

	// Snapshot messages + config for the goroutine
	messages := req.Messages

	// Launch orchestration in a fully detached goroutine.
	// This goroutine runs independently — survives HTTP disconnect.
	go func() {
		startTime := time.Now()

		// Append initial "thinking" event
		_ = h.db.AppendEvent(db.ChatEventRow{
			SessionID: sessionID,
			EventType: "thinking",
		})

		// context.Background (not r.Context): the run must outlive the
		// HTTP request; only the configured request timeout bounds it.
		ctx, cancel := context.WithTimeout(context.Background(),
			time.Duration(h.cfg.RequestTimeoutSecs)*time.Second)
		defer cancel()

		result := h.orch.ChatWithEventsAndRetry(ctx, messages, model, maxIter,
			// onToolCall — store each tool execution as an event
			func(step orchestrator.ToolCallStep) {
				argsJSON, _ := json.Marshal(step.Args)
				resultStr := ""
				if step.Result != nil {
					b, _ := json.Marshal(step.Result)
					resultStr = string(b)
				}
				// AppendEvent errors are ignored: event storage is
				// best-effort and must not abort the run.
				_ = h.db.AppendEvent(db.ChatEventRow{
					SessionID:   sessionID,
					EventType:   "tool_call",
					ToolName:    step.Tool,
					ToolArgs:    string(argsJSON),
					ToolResult:  resultStr,
					ToolSuccess: step.Success,
					DurationMs:  int(step.DurationMs),
					ErrorMsg:    step.Error,
				})
			},
			// onRetry — emit a "thinking" event so the client sees retry progress
			func(attempt int, reason string) {
				msg := fmt.Sprintf("⟳ Retry %d: %s", attempt, reason)
				log.Printf("[Orchestrator] %s", msg)
				_ = h.db.AppendEvent(db.ChatEventRow{
					SessionID: sessionID,
					EventType: "thinking",
					Content:   msg,
				})
			},
		)

		processingMs := time.Since(startTime).Milliseconds()

		if !result.Success {
			_ = h.db.AppendEvent(db.ChatEventRow{
				SessionID: sessionID,
				EventType: "error",
				ErrorMsg:  result.Error,
			})
			h.db.MarkSessionDone(sessionID, "error", "", result.Model, result.Error, 0, processingMs)
			return
		}

		// Append full response as a single delta (client will display it)
		_ = h.db.AppendEvent(db.ChatEventRow{
			SessionID: sessionID,
			EventType: "delta",
			Content:   result.Response,
		})

		// Append done event; usage is serialized as JSON, "null" when absent.
		totalTok := 0
		usageStr := "null"
		if result.Usage != nil {
			totalTok = result.Usage.TotalTokens
			b, _ := json.Marshal(result.Usage)
			usageStr = string(b)
		}
		_ = h.db.AppendEvent(db.ChatEventRow{
			SessionID: sessionID,
			EventType: "done",
			Model:     result.Model,
			UsageJSON: usageStr,
		})

		h.db.MarkSessionDone(sessionID, "done", result.Response, result.Model, "", totalTok, processingMs)

		// Also save to legacy metrics/history tables
		reqID := fmt.Sprintf("orch-%d", time.Now().UnixNano())
		toolNames := make([]string, len(result.ToolCalls))
		for i, tc := range result.ToolCalls {
			toolNames[i] = tc.Tool
		}
		inputTok, outputTok := 0, 0
		if result.Usage != nil {
			inputTok = result.Usage.PromptTokens
			outputTok = result.Usage.CompletionTokens
		}
		h.db.SaveMetric(db.MetricInput{
			AgentID:          orchAgentID,
			RequestID:        reqID,
			UserMessage:      userMessage,
			AgentResponse:    result.Response,
			InputTokens:      inputTok,
			OutputTokens:     outputTok,
			TotalTokens:      totalTok,
			ProcessingTimeMs: processingMs,
			Status:           "success",
			ToolsCalled:      toolNames,
			Model:            result.Model,
		})
		h.db.SaveHistory(db.HistoryInput{
			AgentID:       orchAgentID,
			UserMessage:   userMessage,
			AgentResponse: result.Response,
			Status:        "success",
		})
	}()

	// Respond immediately; the goroutine above continues in the background.
	respond(w, http.StatusOK, map[string]any{
		"sessionId": sessionID,
		"status":    "running",
	})
}
|
|
|
|
// GET /api/chat/session/:id
|
|
func (h *Handler) GetChatSession(w http.ResponseWriter, r *http.Request) {
|
|
sessionID := r.PathValue("id")
|
|
if sessionID == "" {
|
|
respondError(w, http.StatusBadRequest, "sessionId required")
|
|
return
|
|
}
|
|
if h.db == nil {
|
|
respondError(w, http.StatusServiceUnavailable, "DB not connected")
|
|
return
|
|
}
|
|
sess, err := h.db.GetSession(sessionID)
|
|
if err != nil {
|
|
respondError(w, http.StatusNotFound, "session not found")
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, sess)
|
|
}
|
|
|
|
// GET /api/chat/session/:id/events?after=N
|
|
func (h *Handler) GetChatEvents(w http.ResponseWriter, r *http.Request) {
|
|
sessionID := r.PathValue("id")
|
|
if sessionID == "" {
|
|
respondError(w, http.StatusBadRequest, "sessionId required")
|
|
return
|
|
}
|
|
afterSeq := 0
|
|
if v := r.URL.Query().Get("after"); v != "" {
|
|
fmt.Sscanf(v, "%d", &afterSeq)
|
|
}
|
|
if h.db == nil {
|
|
respondError(w, http.StatusServiceUnavailable, "DB not connected")
|
|
return
|
|
}
|
|
events, err := h.db.GetEvents(sessionID, afterSeq)
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
// Also return session status so client knows when to stop polling
|
|
var status string
|
|
if sess, err := h.db.GetSession(sessionID); err == nil {
|
|
status = sess.Status
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"sessionId": sessionID,
|
|
"status": status,
|
|
"events": events,
|
|
})
|
|
}
|
|
|
|
// GET /api/chat/sessions?limit=N
|
|
func (h *Handler) ListChatSessions(w http.ResponseWriter, r *http.Request) {
|
|
if h.db == nil {
|
|
respond(w, http.StatusOK, map[string]any{"sessions": []any{}})
|
|
return
|
|
}
|
|
limit := 50
|
|
if v := r.URL.Query().Get("limit"); v != "" {
|
|
fmt.Sscanf(v, "%d", &limit)
|
|
}
|
|
sessions, err := h.db.GetRecentSessions(limit)
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{"sessions": sessions})
|
|
}
|
|
|
|
// ─── Real Docker Swarm Management ─────────────────────────────────────────────
|
|
|
|
// GET /api/swarm/info
|
|
// Returns swarm status, node count, join tokens, and manager address.
|
|
func (h *Handler) SwarmInfo(w http.ResponseWriter, r *http.Request) {
|
|
info, err := h.docker.GetSwarmInfo()
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, "docker info error: "+err.Error())
|
|
return
|
|
}
|
|
tokens, err := h.docker.GetJoinTokens()
|
|
if err != nil {
|
|
tokens = nil
|
|
}
|
|
managerAddr := h.docker.GetManagerAddr()
|
|
|
|
result := map[string]any{
|
|
"nodeId": info.Swarm.NodeID,
|
|
"localNodeState": info.Swarm.LocalNodeState,
|
|
"isManager": info.Swarm.ControlAvailable,
|
|
"managers": info.Swarm.Managers,
|
|
"nodes": info.Swarm.Nodes,
|
|
"managerAddr": managerAddr,
|
|
}
|
|
if tokens != nil {
|
|
result["joinTokens"] = map[string]string{
|
|
"worker": tokens.JoinTokens.Worker,
|
|
"manager": tokens.JoinTokens.Manager,
|
|
}
|
|
}
|
|
respond(w, http.StatusOK, result)
|
|
}
|
|
|
|
// GET /api/swarm/nodes
// Returns all swarm nodes with their live status, labels, and resource info.
// The node rows are also upserted into the DB asynchronously for history.
func (h *Handler) SwarmNodes(w http.ResponseWriter, r *http.Request) {
	nodes, err := h.docker.ListNodes()
	if err != nil {
		respondError(w, http.StatusInternalServerError, "list nodes: "+err.Error())
		return
	}

	// NodeOut is the per-node JSON shape for this endpoint.
	type NodeOut struct {
		ID            string            `json:"id"`
		Hostname      string            `json:"hostname"`
		Role          string            `json:"role"`
		State         string            `json:"state"`
		Availability  string            `json:"availability"`
		IP            string            `json:"ip"`
		OS            string            `json:"os"`
		Arch          string            `json:"arch"`
		CPUCores      int               `json:"cpuCores"`
		MemTotalMB    int64             `json:"memTotalMB"`
		DockerVersion string            `json:"dockerVersion"`
		IsLeader      bool              `json:"isLeader"`
		ManagerAddr   string            `json:"managerAddr,omitempty"`
		Labels        map[string]string `json:"labels"`
		UpdatedAt     string            `json:"updatedAt"`
	}

	out := make([]NodeOut, 0, len(nodes))
	for _, n := range nodes {
		no := NodeOut{
			ID:            n.ID[:min(12, len(n.ID))], // short ID, clamped so a short ID can't panic
			Hostname:      n.Description.Hostname,
			Role:          n.Spec.Role,
			State:         n.Status.State,
			Availability:  n.Spec.Availability,
			IP:            n.Status.Addr,
			OS:            n.Description.Platform.OS,
			Arch:          n.Description.Platform.Architecture,
			CPUCores:      int(n.Description.Resources.NanoCPUs / 1e9), // NanoCPUs → whole cores
			MemTotalMB:    n.Description.Resources.MemoryBytes / (1024 * 1024),
			DockerVersion: n.Description.Engine.EngineVersion,
			Labels:        n.Spec.Labels,
			UpdatedAt:     n.UpdatedAt.UTC().Format(time.RFC3339),
		}
		if no.Labels == nil {
			no.Labels = map[string]string{} // JSON: emit {} instead of null
		}
		if n.ManagerStatus != nil {
			no.IsLeader = n.ManagerStatus.Leader
			no.ManagerAddr = n.ManagerStatus.Addr
		}
		out = append(out, no)
	}

	// Persist/update nodes in DB for history (fire-and-forget — the
	// response must not wait on the write).
	if h.db != nil {
		go h.db.UpsertSwarmNodes(out)
	}

	respond(w, http.StatusOK, map[string]any{
		"nodes":     out,
		"count":     len(out),
		"fetchedAt": time.Now().UTC().Format(time.RFC3339),
	})
}
|
|
|
|
// POST /api/swarm/nodes/{id}/label
|
|
// Body: { "key": "gpu", "value": "true" }
|
|
func (h *Handler) SwarmAddNodeLabel(w http.ResponseWriter, r *http.Request) {
|
|
nodeID := r.PathValue("id")
|
|
if nodeID == "" {
|
|
respondError(w, http.StatusBadRequest, "nodeId required")
|
|
return
|
|
}
|
|
var body struct {
|
|
Key string `json:"key"`
|
|
Value string `json:"value"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Key == "" {
|
|
respondError(w, http.StatusBadRequest, "key required in body")
|
|
return
|
|
}
|
|
if err := h.docker.AddNodeLabel(nodeID, body.Key, body.Value); err != nil {
|
|
respondError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{"ok": true})
|
|
}
|
|
|
|
// POST /api/swarm/nodes/{id}/availability
|
|
// Body: { "availability": "active|pause|drain" }
|
|
func (h *Handler) SwarmSetNodeAvailability(w http.ResponseWriter, r *http.Request) {
|
|
nodeID := r.PathValue("id")
|
|
var body struct {
|
|
Availability string `json:"availability"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
|
respondError(w, http.StatusBadRequest, "invalid body")
|
|
return
|
|
}
|
|
if body.Availability != "active" && body.Availability != "pause" && body.Availability != "drain" {
|
|
respondError(w, http.StatusBadRequest, "availability must be active|pause|drain")
|
|
return
|
|
}
|
|
if err := h.docker.UpdateNodeAvailability(nodeID, body.Availability); err != nil {
|
|
respondError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{"ok": true})
|
|
}
|
|
|
|
// GET /api/swarm/services
// Returns all swarm services with replica counts and task status.
func (h *Handler) SwarmServices(w http.ResponseWriter, r *http.Request) {
	services, err := h.docker.ListServices()
	if err != nil {
		respondError(w, http.StatusInternalServerError, "list services: "+err.Error())
		return
	}

	// ServiceOut is the flattened wire shape for one swarm service.
	type ServiceOut struct {
		ID              string            `json:"id"`
		Name            string            `json:"name"`
		Image           string            `json:"image"`
		Mode            string            `json:"mode"` // replicated | global
		DesiredReplicas int               `json:"desiredReplicas"`
		RunningTasks    int               `json:"runningTasks"`
		DesiredTasks    int               `json:"desiredTasks"`
		Labels          map[string]string `json:"labels"`
		UpdatedAt       string            `json:"updatedAt"`
		Ports           []string          `json:"ports"`
		IsGoClaw        bool              `json:"isGoClaw"` // goclaw.agent label present
	}

	out := make([]ServiceOut, 0, len(services))
	for _, svc := range services {
		// Mode defaults to "replicated"; flipped only when Global is set.
		mode := "replicated"
		desired := 0
		if svc.Spec.Mode.Replicated != nil {
			desired = svc.Spec.Mode.Replicated.Replicas
		} else if svc.Spec.Mode.Global != nil {
			mode = "global"
		}
		// ServiceStatus may be nil; task counts then fall back to 0.
		running, desiredT := 0, 0
		if svc.ServiceStatus != nil {
			running = svc.ServiceStatus.RunningTasks
			desiredT = svc.ServiceStatus.DesiredTasks
		}
		// Only published (externally exposed) ports are reported,
		// formatted as "published:target/protocol".
		var ports []string
		if svc.Spec.EndpointSpec != nil {
			for _, p := range svc.Spec.EndpointSpec.Ports {
				if p.PublishedPort > 0 {
					ports = append(ports, fmt.Sprintf("%d:%d/%s", p.PublishedPort, p.TargetPort, p.Protocol))
				}
			}
		}
		// Guarantee a JSON object ({} rather than null) for labels.
		labels := svc.Spec.Labels
		if labels == nil {
			labels = map[string]string{}
		}
		// Presence of the label alone marks a GoClaw service here
		// (its value is not inspected).
		_, isGoClaw := labels["goclaw.agent"]
		out = append(out, ServiceOut{
			ID:              svc.ID[:min(12, len(svc.ID))],
			Name:            svc.Spec.Name,
			Image:           svc.Spec.TaskTemplate.ContainerSpec.Image,
			Mode:            mode,
			DesiredReplicas: desired,
			RunningTasks:    running,
			DesiredTasks:    desiredT,
			Labels:          labels,
			UpdatedAt:       svc.UpdatedAt.UTC().Format(time.RFC3339),
			Ports:           ports,
			IsGoClaw:        isGoClaw,
		})
	}
	respond(w, http.StatusOK, map[string]any{"services": out, "count": len(out)})
}
|
|
|
|
// GET /api/swarm/services/{id}/tasks
|
|
// Returns all tasks for a service (shows which node each replica runs on).
|
|
func (h *Handler) SwarmServiceTasks(w http.ResponseWriter, r *http.Request) {
|
|
serviceID := r.PathValue("id")
|
|
if serviceID == "" {
|
|
respondError(w, http.StatusBadRequest, "serviceId required")
|
|
return
|
|
}
|
|
tasks, err := h.docker.ListServiceTasks(serviceID)
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
|
|
type TaskOut struct {
|
|
ID string `json:"id"`
|
|
ServiceID string `json:"serviceId"`
|
|
NodeID string `json:"nodeId"`
|
|
Slot int `json:"slot"`
|
|
State string `json:"state"`
|
|
Message string `json:"message"`
|
|
ContainerID string `json:"containerId"`
|
|
UpdatedAt string `json:"updatedAt"`
|
|
}
|
|
out := make([]TaskOut, 0, len(tasks))
|
|
for _, t := range tasks {
|
|
cid := ""
|
|
if t.Status.ContainerStatus != nil {
|
|
cid = t.Status.ContainerStatus.ContainerID
|
|
if len(cid) > 12 {
|
|
cid = cid[:12]
|
|
}
|
|
}
|
|
out = append(out, TaskOut{
|
|
ID: t.ID[:min(12, len(t.ID))],
|
|
ServiceID: t.ServiceID[:min(12, len(t.ServiceID))],
|
|
NodeID: t.NodeID[:min(12, len(t.NodeID))],
|
|
Slot: t.Slot,
|
|
State: t.Status.State,
|
|
Message: t.Status.Message,
|
|
ContainerID: cid,
|
|
UpdatedAt: t.UpdatedAt.UTC().Format(time.RFC3339),
|
|
})
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{"tasks": out, "count": len(out)})
|
|
}
|
|
|
|
// POST /api/swarm/services/{id}/scale
|
|
// Body: { "replicas": 3 }
|
|
func (h *Handler) SwarmScaleService(w http.ResponseWriter, r *http.Request) {
|
|
serviceID := r.PathValue("id")
|
|
if serviceID == "" {
|
|
respondError(w, http.StatusBadRequest, "serviceId required")
|
|
return
|
|
}
|
|
var body struct {
|
|
Replicas int `json:"replicas"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
|
respondError(w, http.StatusBadRequest, "invalid body: "+err.Error())
|
|
return
|
|
}
|
|
if body.Replicas < 0 || body.Replicas > 100 {
|
|
respondError(w, http.StatusBadRequest, "replicas must be 0-100")
|
|
return
|
|
}
|
|
if err := h.docker.ScaleService(serviceID, body.Replicas); err != nil {
|
|
respondError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
log.Printf("[Swarm] Scaled service %s to %d replicas", serviceID, body.Replicas)
|
|
respond(w, http.StatusOK, map[string]any{"ok": true, "replicas": body.Replicas})
|
|
}
|
|
|
|
// POST /api/swarm/services/create
|
|
// Deploy a new GoClaw agent as a Swarm service.
|
|
// Body: { "name": "agent-researcher", "image": "goclaw-gateway:latest", "replicas": 2, "env": ["KEY=val"], "port": 0, "networks": ["goclaw-net"] }
|
|
func (h *Handler) SwarmCreateService(w http.ResponseWriter, r *http.Request) {
|
|
var body struct {
|
|
Name string `json:"name"`
|
|
Image string `json:"image"`
|
|
Replicas int `json:"replicas"`
|
|
Env []string `json:"env"`
|
|
Port int `json:"port"`
|
|
Networks []string `json:"networks"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Name == "" || body.Image == "" {
|
|
respondError(w, http.StatusBadRequest, "name and image required")
|
|
return
|
|
}
|
|
if body.Replicas <= 0 {
|
|
body.Replicas = 1
|
|
}
|
|
svc, err := h.docker.CreateAgentServiceFull(dockerclient.CreateAgentServiceOpts{
|
|
Name: body.Name,
|
|
Image: body.Image,
|
|
Replicas: body.Replicas,
|
|
Env: body.Env,
|
|
Port: body.Port,
|
|
Networks: body.Networks,
|
|
})
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, "create service: "+err.Error())
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"ok": true,
|
|
"serviceId": svc.ID,
|
|
"name": svc.Spec.Name,
|
|
})
|
|
}
|
|
|
|
// DELETE /api/swarm/services/{id}
|
|
// Remove (stop) a swarm service.
|
|
func (h *Handler) SwarmRemoveService(w http.ResponseWriter, r *http.Request) {
|
|
serviceID := r.PathValue("id")
|
|
if serviceID == "" {
|
|
respondError(w, http.StatusBadRequest, "service id required")
|
|
return
|
|
}
|
|
if err := h.docker.RemoveService(serviceID); err != nil {
|
|
respondError(w, http.StatusInternalServerError, "remove service: "+err.Error())
|
|
return
|
|
}
|
|
log.Printf("[Swarm] Removed service %s", serviceID)
|
|
respond(w, http.StatusOK, map[string]any{"ok": true})
|
|
}
|
|
|
|
// GET /api/swarm/agents
// List all GoClaw agent services with idle time information.
func (h *Handler) SwarmListAgents(w http.ResponseWriter, r *http.Request) {
	services, err := h.docker.ListServices()
	if err != nil {
		respondError(w, http.StatusInternalServerError, "list services: "+err.Error())
		return
	}

	// AgentInfo is the wire shape for one service in the agents listing.
	type AgentInfo struct {
		ID              string    `json:"id"`
		Name            string    `json:"name"`
		Image           string    `json:"image"`
		DesiredReplicas int       `json:"desiredReplicas"`
		RunningTasks    int       `json:"runningTasks"`
		LastActivity    time.Time `json:"lastActivity"`
		IdleMinutes     float64   `json:"idleMinutes"`
		IsGoClaw        bool      `json:"isGoClaw"`
	}

	var agents []AgentInfo
	for _, svc := range services {
		// NOTE: every service is included in the response; IsGoClaw merely
		// flags the ones carrying the goclaw.agent=true label.
		isGoClaw := svc.Spec.Labels["goclaw.agent"] == "true"
		// Global-mode services keep desired = 0 here.
		desired := 0
		if svc.Spec.Mode.Replicated != nil {
			desired = svc.Spec.Mode.Replicated.Replicas
		}
		running := 0
		if svc.ServiceStatus != nil {
			running = svc.ServiceStatus.RunningTasks
		}
		// Best-effort activity lookup: the error is deliberately ignored,
		// and a zero time falls back to the service's own UpdatedAt.
		lastActivity, _ := h.docker.GetServiceLastActivity(svc.ID)
		if lastActivity.IsZero() {
			lastActivity = svc.UpdatedAt
		}
		idle := time.Since(lastActivity).Minutes()
		agents = append(agents, AgentInfo{
			ID:              svc.ID,
			Name:            svc.Spec.Name,
			Image:           svc.Spec.TaskTemplate.ContainerSpec.Image,
			DesiredReplicas: desired,
			RunningTasks:    running,
			LastActivity:    lastActivity,
			IdleMinutes:     idle,
			IsGoClaw:        isGoClaw,
		})
	}
	// Encode [] rather than null when there are no services.
	if agents == nil {
		agents = []AgentInfo{}
	}
	respond(w, http.StatusOK, map[string]any{"agents": agents, "count": len(agents)})
}
|
|
|
|
|
|
// POST /api/swarm/shell
|
|
// Execute a shell command on the HOST system (via nsenter into PID 1).
|
|
// Body: { "command": "docker ps" }
|
|
// Requires the gateway container to run with privileged: true + pid: host
|
|
func (h *Handler) SwarmShell(w http.ResponseWriter, r *http.Request) {
|
|
var body struct {
|
|
Command string `json:"command"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil || body.Command == "" {
|
|
respondError(w, http.StatusBadRequest, "command required")
|
|
return
|
|
}
|
|
// Security: reject obviously dangerous patterns in production
|
|
// (In a real deployment you'd add auth + command whitelisting)
|
|
dangerous := []string{"rm -rf /", "mkfs", "dd if=/dev/zero", ":(){:|:&};:"}
|
|
cmdLower := strings.ToLower(body.Command)
|
|
for _, d := range dangerous {
|
|
if strings.Contains(cmdLower, d) {
|
|
respondError(w, http.StatusForbidden, "dangerous command rejected")
|
|
return
|
|
}
|
|
}
|
|
|
|
log.Printf("[SwarmShell] Executing: %s", body.Command)
|
|
output, err := dockerclient.ExecOnHost(body.Command)
|
|
if err != nil {
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"output": output,
|
|
"error": err.Error(),
|
|
"success": false,
|
|
})
|
|
return
|
|
}
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"output": output,
|
|
"success": true,
|
|
})
|
|
}
|
|
|
|
// GET /api/swarm/join-token?role=worker|manager
|
|
// Returns the join command for adding a new node to the swarm.
|
|
func (h *Handler) SwarmJoinToken(w http.ResponseWriter, r *http.Request) {
|
|
role := r.URL.Query().Get("role")
|
|
if role == "" {
|
|
role = "worker"
|
|
}
|
|
tokens, err := h.docker.GetJoinTokens()
|
|
if err != nil {
|
|
respondError(w, http.StatusInternalServerError, "cannot get join tokens: "+err.Error())
|
|
return
|
|
}
|
|
managerAddr := h.docker.GetManagerAddr()
|
|
|
|
var token string
|
|
if role == "manager" {
|
|
token = tokens.JoinTokens.Manager
|
|
} else {
|
|
token = tokens.JoinTokens.Worker
|
|
}
|
|
|
|
joinCmd := fmt.Sprintf("docker swarm join --token %s %s", token, managerAddr)
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"role": role,
|
|
"token": token,
|
|
"managerAddr": managerAddr,
|
|
"joinCommand": joinCmd,
|
|
})
|
|
}
|
|
|
|
// min returns the smaller of two ints. A package-level helper; on Go 1.21+
// toolchains it shadows the builtin of the same name with identical behavior
// for int arguments.
func min(a, b int) int {
	if a > b {
		return b
	}
	return a
}
|
|
|
|
// POST /api/swarm/join-node
// Connects to a remote host via SSH and runs "docker swarm join ..." to add it to the cluster.
// Body: { "host": "1.2.3.4", "port": 22, "user": "root", "password": "secret", "role": "worker" }
//
// Failures after input validation are reported as HTTP 200 with
// {ok:false, step, error}, where step is one of "ssh_connect",
// "ssh_session", or "docker_join", letting the UI show a per-step message.
func (h *Handler) SwarmJoinNodeViaSSH(w http.ResponseWriter, r *http.Request) {
	var body struct {
		Host     string `json:"host"`
		Port     int    `json:"port"`
		User     string `json:"user"`
		Password string `json:"password"`
		Role     string `json:"role"` // "worker" | "manager"
	}
	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
		respondError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if body.Host == "" || body.User == "" || body.Password == "" {
		respondError(w, http.StatusBadRequest, "host, user and password are required")
		return
	}
	// Defaults: standard SSH port and worker role.
	if body.Port == 0 {
		body.Port = 22
	}
	if body.Role == "" {
		body.Role = "worker"
	}

	// 1. Get join token from local swarm
	tokens, err := h.docker.GetJoinTokens()
	if err != nil {
		respondError(w, http.StatusInternalServerError, "cannot get join tokens: "+err.Error())
		return
	}
	managerAddr := h.docker.GetManagerAddr()
	// Any role other than "manager" falls back to the worker token.
	var token string
	if body.Role == "manager" {
		token = tokens.JoinTokens.Manager
	} else {
		token = tokens.JoinTokens.Worker
	}
	joinCmd := fmt.Sprintf("docker swarm join --token %s %s", token, managerAddr)

	// 2. Dial SSH to the remote host
	sshCfg := &ssh.ClientConfig{
		User: body.User,
		Auth: []ssh.AuthMethod{
			ssh.Password(body.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(), // acceptable for internal cluster management
		Timeout:         15 * time.Second,
	}
	addr := fmt.Sprintf("%s:%d", body.Host, body.Port)
	log.Printf("[SwarmJoinNode] Dialing SSH %s as %s", addr, body.User)

	client, err := ssh.Dial("tcp", addr, sshCfg)
	if err != nil {
		// The join command is echoed back so the operator can run it by hand.
		respond(w, http.StatusOK, map[string]any{
			"ok":      false,
			"step":    "ssh_connect",
			"error":   fmt.Sprintf("SSH connection failed: %s", err.Error()),
			"host":    body.Host,
			"command": joinCmd,
		})
		return
	}
	defer client.Close()

	// 3. Run docker swarm join on the remote node
	sess, err := client.NewSession()
	if err != nil {
		respond(w, http.StatusOK, map[string]any{
			"ok":    false,
			"step":  "ssh_session",
			"error": fmt.Sprintf("SSH session failed: %s", err.Error()),
		})
		return
	}
	defer sess.Close()

	log.Printf("[SwarmJoinNode] Running on %s: %s", body.Host, joinCmd)
	out, err := sess.CombinedOutput(joinCmd)
	output := strings.TrimSpace(string(out))

	if err != nil {
		// Node might already be in the swarm — treat "already" as success
		if strings.Contains(output, "already") || strings.Contains(output, "This node is already") {
			respond(w, http.StatusOK, map[string]any{
				"ok":      true,
				"output":  output,
				"note":    "node is already part of this swarm",
				"command": joinCmd,
			})
			return
		}
		respond(w, http.StatusOK, map[string]any{
			"ok":      false,
			"step":    "docker_join",
			"error":   fmt.Sprintf("docker swarm join failed: %s", err.Error()),
			"output":  output,
			"command": joinCmd,
		})
		return
	}

	log.Printf("[SwarmJoinNode] Success: %s joined as %s", body.Host, body.Role)
	respond(w, http.StatusOK, map[string]any{
		"ok":      true,
		"output":  output,
		"host":    body.Host,
		"role":    body.Role,
		"command": joinCmd,
	})
}
|
|
|
|
// POST /api/swarm/ssh-test
|
|
// Tests SSH connectivity and checks if Docker is accessible on the remote host.
|
|
// Body: { "host": "1.2.3.4", "port": 22, "user": "root", "password": "secret" }
|
|
func (h *Handler) SwarmSSHTest(w http.ResponseWriter, r *http.Request) {
|
|
var body struct {
|
|
Host string `json:"host"`
|
|
Port int `json:"port"`
|
|
User string `json:"user"`
|
|
Password string `json:"password"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
|
respondError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
if body.Host == "" || body.User == "" || body.Password == "" {
|
|
respondError(w, http.StatusBadRequest, "host, user and password are required")
|
|
return
|
|
}
|
|
if body.Port == 0 {
|
|
body.Port = 22
|
|
}
|
|
|
|
sshCfg := &ssh.ClientConfig{
|
|
User: body.User,
|
|
Auth: []ssh.AuthMethod{
|
|
ssh.Password(body.Password),
|
|
},
|
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
addr := fmt.Sprintf("%s:%d", body.Host, body.Port)
|
|
log.Printf("[SSHTest] Dialing %s as %s", addr, body.User)
|
|
|
|
client, err := ssh.Dial("tcp", addr, sshCfg)
|
|
if err != nil {
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"ok": false,
|
|
"step": "ssh_connect",
|
|
"error": fmt.Sprintf("SSH connection failed: %s", err.Error()),
|
|
})
|
|
return
|
|
}
|
|
defer client.Close()
|
|
|
|
// Run a quick docker version check to see if Docker daemon is accessible
|
|
sess, err := client.NewSession()
|
|
if err != nil {
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"ok": false,
|
|
"step": "ssh_session",
|
|
"error": fmt.Sprintf("SSH session failed: %s", err.Error()),
|
|
})
|
|
return
|
|
}
|
|
defer sess.Close()
|
|
|
|
out, _ := sess.CombinedOutput("docker version --format '{{.Server.Version}}' 2>/dev/null || echo 'docker_not_found'")
|
|
dockerVer := strings.TrimSpace(string(out))
|
|
dockerOk := dockerVer != "" && dockerVer != "docker_not_found"
|
|
|
|
log.Printf("[SSHTest] %s — SSH OK, docker: %s", addr, dockerVer)
|
|
respond(w, http.StatusOK, map[string]any{
|
|
"ok": true,
|
|
"sshOk": true,
|
|
"dockerOk": dockerOk,
|
|
"dockerVersion": dockerVer,
|
|
"host": body.Host,
|
|
})
|
|
}
|