Files
GoClaw/gateway/internal/db/db.go
bboxwtf c57d694236 feat(phase21): real Docker Swarm management — live nodes, services, tasks, host shell, agent deployment
## What's implemented

### Go Gateway — New /api/swarm/* endpoints (handlers.go + docker/client.go + db.go)
- GET  /api/swarm/info          — swarm state, manager address, join tokens
- GET  /api/swarm/nodes         — live node list (hostname, IP, CPU, RAM, role, labels)
- POST /api/swarm/nodes/{id}/label        — add/update node label
- POST /api/swarm/nodes/{id}/availability — set node availability (active|pause|drain)
- GET  /api/swarm/services       — all swarm services with replica counts
- POST /api/swarm/services/create — deploy a new agent as a swarm service
- GET  /api/swarm/services/{id}/tasks  — tasks per service (which node runs which replica)
- POST /api/swarm/services/{id}/scale  — scale replicas
- GET  /api/swarm/join-token    — worker/manager join command with token + manager addr
- POST /api/swarm/shell         — execute commands on the HOST via nsenter PID 1

### Docker client (client.go)
- ListServices, GetService, ScaleService, ListServiceTasks, CreateAgentService
- AddNodeLabel, UpdateNodeAvailability (patch node spec via Docker API)
- ExecOnHost (nsenter -t 1 → falls back to container scope)

### DB persistence (db.go)
- UpsertSwarmNodes — stores live node state to swarmNodes table
- UpsertSwarmTokens / GetSwarmTokens — persist join tokens
- Startup goroutine in main.go syncs tokens to DB on gateway start

### Node.js tRPC wrappers (routers.ts + gateway-proxy.ts)
- nodes.swarmInfo, nodes.list, nodes.services, nodes.serviceTasks
- nodes.scaleService, nodes.joinToken, nodes.execShell
- nodes.addNodeLabel, nodes.setAvailability, nodes.deployAgentService

### Frontend — Nodes.tsx (complete rewrite)
- Real swarm overview cards (nodes, managers, services, running tasks)
- Join token cards with copy button for worker & manager tokens
- Node cards with inline availability selector (active/pause/drain) + add-label form
- Services table with Scale dialog + Tasks drawer (replica → node mapping)
- Deploy Agent dialog (image, replicas, env vars, published port)
- Host Shell tab with command history and quick-command buttons

### docker-compose.yml
- gateway now runs with privileged: true + pid: host
  → nsenter can access the host PID namespace for real host-level shell execution

## Verified end-to-end
- GET /api/swarm/info returns manager addr + join tokens ✓
- GET /api/swarm/nodes returns node wsm (2 cores, 3.9 GB) ✓
- POST /api/swarm/services/create → deployed goclaw-test-agent (2 replicas) ✓
- GET /api/swarm/services/{id}/tasks returns task list with nodeId ✓
- POST /api/swarm/services/{id}/scale → scale to 0 ✓
- POST /api/swarm/shell {command:'docker node ls'} → real host output ✓
- tRPC chain: browser → control-center → gateway → docker.sock ✓
2026-03-21 17:23:32 +00:00

677 lines
20 KiB
Go

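// Package db provides MySQL/TiDB connectivity and the gateway's queries for
// agent configuration, LLM providers, chat sessions and events, metrics and
// history, and Docker Swarm node/token state.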
package db
import (
	"database/sql"
	"database/sql/driver"
	"encoding/json"
	"fmt"
	"log"
	"net/url"
	"strings"
	"unicode/utf8"

	_ "github.com/go-sql-driver/mysql"
)
// AgentConfig holds the orchestrator/agent configuration loaded from DB.
// It is populated by scanAgentConfig from one row of the agents table.
type AgentConfig struct {
ID int
Name string
Model string
SystemPrompt string // "" when the systemPrompt column is NULL
AllowedTools []string // decoded from the allowedTools JSON column; left nil when it is NULL, empty or the literal null
Temperature float64 // scanAgentConfig substitutes 0.5 when the column is NULL or 0
MaxTokens int // scanAgentConfig substitutes 8192 when the column is NULL or 0
IsOrchestrator bool
IsSystem bool
IsActive bool
}
// AgentRow is a minimal agent representation for listing (produced by
// ListAgents). It serialises to JSON with camelCase keys.
type AgentRow struct {
ID int `json:"id"`
Name string `json:"name"`
Role string `json:"role"`
Model string `json:"model"`
Description string `json:"description"` // "" when the description column is NULL
IsActive bool `json:"isActive"`
IsSystem bool `json:"isSystem"`
IsOrchestrator bool `json:"isOrchestrator"`
}
// DB wraps the *sql.DB connection pool opened by Connect; release it with
// Close. Many methods treat a nil pool as "not connected" and either return
// an error or do nothing.
type DB struct {
conn *sql.DB
}
// Connect opens a MySQL/TiDB connection pool for dsn and verifies it with a
// ping. dsn may be a go-sql-driver DSN or a mysql:// URL, which is first
// converted by normalizeDSN.
//
// On failure no pool is left open: if the ping fails, the pool created by
// sql.Open is closed before the error is returned.
func Connect(dsn string) (*DB, error) {
	if dsn == "" {
		return nil, fmt.Errorf("DATABASE_URL is empty")
	}
	// Convert mysql:// URL to DSN format if needed.
	conn, err := sql.Open("mysql", normalizeDSN(dsn))
	if err != nil {
		return nil, fmt.Errorf("failed to open DB: %w", err)
	}
	if err := conn.Ping(); err != nil {
		// sql.Open already allocated a pool; release it instead of leaking it.
		_ = conn.Close()
		return nil, fmt.Errorf("failed to ping DB: %w", err)
	}
	log.Println("[DB] Connected to MySQL")
	return &DB{conn: conn}, nil
}
// Close releases the connection pool. It does nothing when no pool was
// opened, and any error from closing is deliberately discarded (best-effort
// shutdown).
func (d *DB) Close() {
	if pool := d.conn; pool != nil {
		_ = pool.Close()
	}
}
// GetOrchestratorConfig loads the agent flagged isOrchestrator = 1.
//
// The query has no ORDER BY, so if several rows carry the flag the database
// decides which one is returned. When no orchestrator exists the error is
// sql.ErrNoRows (passed through from Row.Scan). A DB without a connection
// returns an error instead of panicking, like the other query methods.
func (d *DB) GetOrchestratorConfig() (*AgentConfig, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	row := d.conn.QueryRow(`
SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive
FROM agents
WHERE isOrchestrator = 1
LIMIT 1
`)
	return scanAgentConfig(row)
}
// GetAgentByID loads the agent whose primary key is id.
//
// When no such agent exists the error is sql.ErrNoRows (passed through from
// Row.Scan). A DB without a connection returns an error instead of
// panicking, like the other query methods.
func (d *DB) GetAgentByID(id int) (*AgentConfig, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	row := d.conn.QueryRow(`
SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive
FROM agents
WHERE id = ?
LIMIT 1
`, id)
	return scanAgentConfig(row)
}
// ListAgents returns every agent row, both active and inactive. The
// orchestrator comes first, then system agents, then the rest by ascending id.
//
// A row that fails to scan is logged and skipped. An error raised while
// iterating the result set (for example, a dropped connection) is returned
// rather than silently yielding a truncated list.
func (d *DB) ListAgents() ([]AgentRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	rows, err := d.conn.Query(`
SELECT id, name, role, model, COALESCE(description,''), isActive, isSystem, isOrchestrator
FROM agents
ORDER BY isOrchestrator DESC, isSystem DESC, id ASC
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var agents []AgentRow
	for rows.Next() {
		var a AgentRow
		var isActive, isSystem, isOrch int
		if err := rows.Scan(&a.ID, &a.Name, &a.Role, &a.Model, &a.Description, &isActive, &isSystem, &isOrch); err != nil {
			log.Printf("[DB] ListAgents: skipping unreadable row: %v", err)
			continue
		}
		a.IsActive = isActive == 1
		a.IsSystem = isSystem == 1
		a.IsOrchestrator = isOrch == 1
		agents = append(agents, a)
	}
	// rows.Next returns false both at the end and on error; distinguish them.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return agents, nil
}
// ─── LLM Provider ─────────────────────────────────────────────────────────────
// ProviderRow holds the active LLM provider config from DB.
type ProviderRow struct {
ID int
Name string
BaseURL string
APIKey string // always "" from GetActiveProvider: the stored key is encrypted by Node.js and is not decrypted in Go
}
// GetActiveProvider returns the id, name and base URL of the first
// llmProviders row with isActive = 1. When no provider is active, the error is
// sql.ErrNoRows.
//
// APIKey is always returned empty. The key column is AES-256-GCM encrypted by
// the Node.js server, and the Go gateway has no key to decrypt it. Credentials
// instead reach the gateway via /api/providers/reload, applied with
// UpdateCredentials on the LLM client. For that reason the encrypted column is
// not selected at all.
func (d *DB) GetActiveProvider() (*ProviderRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	var p ProviderRow
	row := d.conn.QueryRow(`
SELECT id, name, baseUrl
FROM llmProviders
WHERE isActive = 1
LIMIT 1
`)
	if err := row.Scan(&p.ID, &p.Name, &p.BaseURL); err != nil {
		return nil, err
	}
	// Explicit for readers: the LLM client falls back to its env-configured key.
	p.APIKey = ""
	return &p, nil
}
// ─── Chat Sessions & Events ───────────────────────────────────────────────────
// ChatSessionRow holds one persistent chat session (one chatSessions row).
// GetSession and GetRecentSessions read nullable text/number columns through
// COALESCE, so missing values appear as "" or 0.
type ChatSessionRow struct {
ID int `json:"id"`
SessionID string `json:"sessionId"`
AgentID int `json:"agentId"`
Status string `json:"status"` // running | done | error
UserMessage string `json:"userMessage"`
FinalResponse string `json:"finalResponse"`
Model string `json:"model"`
TotalTokens int `json:"totalTokens"`
ProcessingTimeMs int64 `json:"processingTimeMs"`
ErrorMessage string `json:"errorMessage"`
CreatedAt string `json:"createdAt"` // NOTE(review): text form of the DB timestamp; format depends on the driver's parseTime setting
UpdatedAt string `json:"updatedAt"`
}
// ChatEventRow holds one event inside a session (one chatEvents row).
// Seq is a per-session sequence number that AppendEvent assigns as
// MAX(seq)+1 for the session.
type ChatEventRow struct {
ID int `json:"id"`
SessionID string `json:"sessionId"`
Seq int `json:"seq"`
EventType string `json:"eventType"` // thinking | tool_call | delta | done | error
Content string `json:"content"`
ToolName string `json:"toolName"`
ToolArgs string `json:"toolArgs"` // JSON string; "" or "null" is stored as SQL NULL and read back by GetEvents as "null"
ToolResult string `json:"toolResult"`
ToolSuccess bool `json:"toolSuccess"` // only persisted for "tool_call" events (NULL otherwise)
DurationMs int `json:"durationMs"`
Model string `json:"model"`
UsageJSON string `json:"usageJson"` // JSON string; same NULL handling as ToolArgs
ErrorMsg string `json:"errorMsg"`
CreatedAt string `json:"createdAt"`
}
// CreateSession inserts a chatSessions row for sessionID with status
// 'running', the owning agentID and the user's message, truncated to 65535
// bytes. Only the insert error is returned; the new row itself is not.
func (d *DB) CreateSession(sessionID, userMessage string, agentID int) error {
	if d.conn == nil {
		return fmt.Errorf("DB not connected")
	}
	const insertSQL = `
INSERT INTO chatSessions (sessionId, agentId, status, userMessage)
VALUES (?, ?, 'running', ?)
`
	msg := truncate(userMessage, 65535)
	_, execErr := d.conn.Exec(insertSQL, sessionID, agentID, msg)
	return execErr
}
// AppendEvent inserts one event row for e.SessionID. The event's seq is
// computed inside the INSERT ... SELECT as MAX(seq)+1 over the session's
// existing events, so the first event gets 1.
//
// Empty optional text/number fields are stored as NULL; empty or "null" JSON
// fields are stored as NULL; ToolSuccess is recorded only for "tool_call"
// events. A nil connection makes this a no-op that returns nil. Insert errors
// are both logged and returned.
//
// NOTE(review): two concurrent calls for the same session can compute the same
// seq unless chatEvents has a unique key on (sessionId, seq). Confirm that
// callers append events for a session sequentially.
func (d *DB) AppendEvent(e ChatEventRow) error {
	if d.conn == nil {
		return nil
	}

	// jsonParam substitutes the JSON literal null for an empty document.
	jsonParam := func(doc string) rawJSON {
		if doc == "" {
			return rawJSON("null")
		}
		return rawJSON(doc)
	}

	// toolSuccess is stored as 0/1 for tool calls and NULL for anything else.
	var success interface{}
	if e.EventType == "tool_call" {
		success = 0
		if e.ToolSuccess {
			success = 1
		}
	}

	const insertEventSQL = `
INSERT INTO chatEvents
(sessionId, seq, eventType, content, toolName, toolArgs,
toolResult, toolSuccess, durationMs, model, usageJson, errorMsg)
SELECT ?, COALESCE(MAX(seq),0)+1, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?
FROM chatEvents WHERE sessionId = ?
`
	if _, err := d.conn.Exec(insertEventSQL,
		e.SessionID,
		e.EventType,
		nullStr(e.Content),
		nullStr(e.ToolName),
		jsonParam(e.ToolArgs),
		nullStr(e.ToolResult),
		success,
		nullInt(e.DurationMs),
		nullStr(e.Model),
		jsonParam(e.UsageJSON),
		nullStr(e.ErrorMsg),
		e.SessionID, // scopes MAX(seq) to this session
	); err != nil {
		log.Printf("[DB] AppendEvent error: %v", err)
		return err
	}
	return nil
}
// MarkSessionDone records the terminal state of a session:
//   - its status (e.g. "done" or "error")
//   - the final response and error message, each truncated to 65535 bytes
//   - the model, total tokens and processing time
//
// It is best-effort: with no connection it does nothing, and update failures
// are logged rather than returned.
func (d *DB) MarkSessionDone(sessionID, status, finalResponse, model, errorMessage string, totalTokens int, processingTimeMs int64) {
	if d.conn == nil {
		return
	}
	const (
		textLimit = 65535
		updateSQL = `
UPDATE chatSessions
SET status=?, finalResponse=?, model=?, totalTokens=?,
processingTimeMs=?, errorMessage=?
WHERE sessionId=?
`
	)
	args := []interface{}{
		status,
		truncate(finalResponse, textLimit),
		model,
		totalTokens,
		processingTimeMs,
		truncate(errorMessage, textLimit),
		sessionID,
	}
	if _, err := d.conn.Exec(updateSQL, args...); err != nil {
		log.Printf("[DB] MarkSessionDone error: %v", err)
	}
}
// GetSession looks up one chat session by its string sessionId. Nullable text
// and numeric columns come back as "" / 0. When no session matches, the
// error is sql.ErrNoRows.
func (d *DB) GetSession(sessionID string) (*ChatSessionRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	const selectSQL = `
SELECT id, sessionId, agentId, status,
COALESCE(userMessage,''),
COALESCE(finalResponse,''),
COALESCE(model,''),
COALESCE(totalTokens,0),
COALESCE(processingTimeMs,0),
COALESCE(errorMessage,''),
createdAt, updatedAt
FROM chatSessions WHERE sessionId=? LIMIT 1
`
	s := new(ChatSessionRow)
	dest := []interface{}{
		&s.ID, &s.SessionID, &s.AgentID, &s.Status,
		&s.UserMessage, &s.FinalResponse, &s.Model,
		&s.TotalTokens, &s.ProcessingTimeMs, &s.ErrorMessage,
		&s.CreatedAt, &s.UpdatedAt,
	}
	if scanErr := d.conn.QueryRow(selectSQL, sessionID).Scan(dest...); scanErr != nil {
		return nil, scanErr
	}
	return s, nil
}
// GetEvents returns the events of sessionID whose seq is greater than
// afterSeq, ordered by seq ascending. Callers can poll incrementally by passing
// the last seq they have seen.
//
// NULL columns are returned as "" / 0, and NULL JSON columns (toolArgs,
// usageJson) as the string "null". A row that fails to scan is logged and
// skipped. An error raised while iterating the result set is returned instead
// of silently yielding a truncated list.
func (d *DB) GetEvents(sessionID string, afterSeq int) ([]ChatEventRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	rows, err := d.conn.Query(`
SELECT id, sessionId, seq, eventType,
COALESCE(content,''), COALESCE(toolName,''),
COALESCE(CAST(toolArgs AS CHAR),'null'),
COALESCE(toolResult,''),
COALESCE(toolSuccess,0),
COALESCE(durationMs,0),
COALESCE(model,''),
COALESCE(CAST(usageJson AS CHAR),'null'),
COALESCE(errorMsg,''),
createdAt
FROM chatEvents
WHERE sessionId=? AND seq > ?
ORDER BY seq ASC
`, sessionID, afterSeq)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []ChatEventRow
	for rows.Next() {
		var e ChatEventRow
		var toolSuccess int
		if err := rows.Scan(
			&e.ID, &e.SessionID, &e.Seq, &e.EventType,
			&e.Content, &e.ToolName, &e.ToolArgs,
			&e.ToolResult, &toolSuccess, &e.DurationMs,
			&e.Model, &e.UsageJSON, &e.ErrorMsg, &e.CreatedAt,
		); err != nil {
			log.Printf("[DB] GetEvents: skipping unreadable row for session %s: %v", sessionID, err)
			continue
		}
		e.ToolSuccess = toolSuccess == 1
		result = append(result, e)
	}
	// rows.Next returns false both at the end and on error; distinguish them.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
// GetRecentSessions returns up to limit sessions, newest first (by
// descending id). limit is passed straight to SQL LIMIT.
//
// NULL columns come back as "" / 0. A row that fails to scan is logged and
// skipped. An error raised while iterating the result set is returned instead
// of silently yielding a truncated list.
func (d *DB) GetRecentSessions(limit int) ([]ChatSessionRow, error) {
	if d.conn == nil {
		return nil, fmt.Errorf("DB not connected")
	}
	rows, err := d.conn.Query(`
SELECT id, sessionId, agentId, status,
COALESCE(userMessage,''),
COALESCE(finalResponse,''),
COALESCE(model,''),
COALESCE(totalTokens,0),
COALESCE(processingTimeMs,0),
COALESCE(errorMessage,''),
createdAt, updatedAt
FROM chatSessions ORDER BY id DESC LIMIT ?
`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []ChatSessionRow
	for rows.Next() {
		var s ChatSessionRow
		if err := rows.Scan(&s.ID, &s.SessionID, &s.AgentID, &s.Status,
			&s.UserMessage, &s.FinalResponse, &s.Model,
			&s.TotalTokens, &s.ProcessingTimeMs, &s.ErrorMessage,
			&s.CreatedAt, &s.UpdatedAt); err != nil {
			log.Printf("[DB] GetRecentSessions: skipping unreadable row: %v", err)
			continue
		}
		result = append(result, s)
	}
	// rows.Next returns false both at the end and on error; distinguish them.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}
// nullStr maps the empty string to SQL NULL (a nil interface value) and
// passes any other string through unchanged.
func nullStr(s string) interface{} {
	var v interface{}
	if s != "" {
		v = s
	}
	return v
}
// nullInt maps 0 to SQL NULL (a nil interface value) and passes any other
// int, including negatives, through unchanged.
func nullInt(n int) interface{} {
	switch n {
	case 0:
		return nil
	default:
		return n
	}
}
// rawJSON is a JSON document sent to MySQL as a plain string parameter.
// Through its driver.Valuer implementation, the empty string and the JSON
// literal "null" become SQL NULL instead of being stored as text.
type rawJSON string

// Value implements driver.Valuer.
func (r rawJSON) Value() (driver.Value, error) {
	switch r {
	case "", "null":
		return nil, nil
	}
	return string(r), nil
}
// ─── Metrics & History ────────────────────────────────────────────────────────
// MetricInput holds data for a single orchestrator request metric. SaveMetric
// writes it to the agentMetrics table.
type MetricInput struct {
AgentID int
RequestID string
UserMessage string
AgentResponse string
InputTokens int
OutputTokens int
TotalTokens int
ProcessingTimeMs int64
Status string // "success" | "error" | "timeout"
ErrorMessage string
ToolsCalled []string // stored JSON-encoded; a nil slice encodes as JSON null
Model string
}
// SaveMetric inserts one row into the agentMetrics table for a single
// orchestrator request.
//
// The user message, agent response and error message are each truncated to
// 65535 bytes. The error message is now capped the same way as elsewhere in
// this package (e.g. MarkSessionDone), so a huge error text cannot make the
// insert fail. ToolsCalled is stored JSON-encoded.
//
// Non-fatal: a nil connection is a no-op and insert errors are logged, not
// returned.
func (d *DB) SaveMetric(m MetricInput) {
	if d.conn == nil {
		return
	}
	// Marshalling a []string cannot fail; a nil slice encodes as JSON null.
	toolsJSON, _ := json.Marshal(m.ToolsCalled)
	_, err := d.conn.Exec(`
INSERT INTO agentMetrics
(agentId, requestId, userMessage, agentResponse,
inputTokens, outputTokens, totalTokens,
processingTimeMs, status, errorMessage, toolsCalled, model)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`,
		m.AgentID,
		m.RequestID,
		truncate(m.UserMessage, 65535),
		truncate(m.AgentResponse, 65535),
		m.InputTokens, m.OutputTokens, m.TotalTokens,
		m.ProcessingTimeMs,
		m.Status,
		truncate(m.ErrorMessage, 65535),
		string(toolsJSON),
		m.Model,
	)
	if err != nil {
		log.Printf("[DB] SaveMetric error: %v", err)
	}
}
// HistoryInput holds data for one conversation entry. SaveHistory writes it
// to the agentHistory table.
type HistoryInput struct {
AgentID int
UserMessage string
AgentResponse string // stored as NULL when empty
ConversationID string // stored as NULL when empty
Status string // "success" | "error" | "pending"; SaveHistory uses "success" when empty
}
// SaveHistory inserts one row into the agentHistory table.
//
// An empty Status defaults to "success". An empty AgentResponse or
// ConversationID is stored as NULL. Both the user message and the agent
// response are truncated to 65535 bytes, matching SaveMetric, so an oversized
// response cannot make the insert fail.
//
// Non-fatal: a nil connection is a no-op and insert errors are logged, not
// returned.
func (d *DB) SaveHistory(h HistoryInput) {
	if d.conn == nil {
		return
	}
	status := h.Status
	if status == "" {
		status = "success"
	}
	convID := sql.NullString{String: h.ConversationID, Valid: h.ConversationID != ""}
	resp := sql.NullString{String: truncate(h.AgentResponse, 65535), Valid: h.AgentResponse != ""}
	_, err := d.conn.Exec(`
INSERT INTO agentHistory (agentId, userMessage, agentResponse, conversationId, status)
VALUES (?, ?, ?, ?, ?)
`,
		h.AgentID,
		truncate(h.UserMessage, 65535),
		resp,
		convID,
		status,
	)
	if err != nil {
		log.Printf("[DB] SaveHistory error: %v", err)
	}
}
// truncate caps s to at most maxLen bytes without splitting a UTF-8 character.
//
// Plain byte slicing can cut a multi-byte rune in half. MySQL's utf8mb4
// columns reject the resulting invalid byte sequence ("Incorrect string
// value"), so the cut point is moved back to the nearest rune boundary. The
// back-off is bounded to the longest possible rune, so malformed input cannot
// shrink the result further. A non-positive maxLen yields "" instead of
// panicking.
func truncate(s string, maxLen int) string {
	if maxLen <= 0 {
		return ""
	}
	if len(s) <= maxLen {
		return s
	}
	cut := maxLen
	// s[cut] exists because len(s) > maxLen. A cut is valid when the byte
	// after it begins a rune.
	for cut > 0 && maxLen-cut < utf8.UTFMax-1 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut]
}
// ─── Swarm Node Persistence ───────────────────────────────────────────────────
// SwarmNodeInput is the data shape that handlers pass to UpsertSwarmNodes.
// Its JSON tags are intended to match the handler's NodeOut struct, so the
// handler's value can be converted through a JSON round-trip.
// NOTE(review): keep the two structs' JSON tags in sync.
type SwarmNodeInput struct {
ID string `json:"id"` // stored as nodeId
Hostname string `json:"hostname"`
Role string `json:"role"` // "manager" also sets isManager=1
State string `json:"state"` // values other than ready|down|disconnected are stored as "ready"
Availability string `json:"availability"` // values other than active|pause|drain are stored as "active"
IP string `json:"ip"` // stored as advertiseAddr
CPUCores int `json:"cpuCores"`
MemTotalMB int64 `json:"memTotalMB"`
DockerVersion string `json:"dockerVersion"` // stored as engineVersion
IsLeader bool `json:"isLeader"`
ManagerAddr string `json:"managerAddr"` // decoded but not persisted by UpsertSwarmNodes
Labels map[string]string `json:"labels"` // stored JSON-encoded
}
// UpsertSwarmNodes inserts or updates one swarmNodes row per node using
// INSERT ... ON DUPLICATE KEY UPDATE. On update it also bumps lastSeenAt.
//
// nodes may be any value whose JSON encoding is an array of objects shaped
// like SwarmNodeInput. Accepting interface{} lets handlers pass their own
// slice type without an import cycle; the value is converted via a JSON
// round-trip.
//
// State and availability values outside the expected sets are coerced to
// "ready" / "active". The function is best-effort and never returns an error:
// conversion failures and per-node upsert errors are logged (previously
// conversion failures were dropped silently). It is intended to be called
// asynchronously so it never blocks an HTTP response.
func (d *DB) UpsertSwarmNodes(nodes interface{}) {
	if d.conn == nil {
		return
	}
	b, err := json.Marshal(nodes)
	if err != nil {
		log.Printf("[DB] UpsertSwarmNodes: cannot encode nodes: %v", err)
		return
	}
	var list []SwarmNodeInput
	if err := json.Unmarshal(b, &list); err != nil {
		log.Printf("[DB] UpsertSwarmNodes: cannot decode nodes: %v", err)
		return
	}
	for _, n := range list {
		// A map[string]string always marshals; a nil map encodes as JSON null.
		labelsJSON, _ := json.Marshal(n.Labels)

		isLeader, isManager := 0, 0
		if n.IsLeader {
			isLeader = 1
		}
		if n.Role == "manager" {
			isManager = 1
		}

		// Coerce unexpected values to safe defaults.
		// NOTE(review): presumably these columns are ENUMs — confirm in the schema.
		state := n.State
		switch state {
		case "ready", "down", "disconnected":
		default:
			state = "ready"
		}
		avail := n.Availability
		switch avail {
		case "active", "pause", "drain":
		default:
			avail = "active"
		}

		_, err := d.conn.Exec(`
INSERT INTO swarmNodes
(nodeId, hostname, role, state, availability, advertiseAddr,
labels, engineVersion, cpuCores, memTotalMB, isManager, isLeader)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
hostname=VALUES(hostname), role=VALUES(role),
state=VALUES(state), availability=VALUES(availability),
advertiseAddr=VALUES(advertiseAddr),
labels=VALUES(labels), engineVersion=VALUES(engineVersion),
cpuCores=VALUES(cpuCores), memTotalMB=VALUES(memTotalMB),
isManager=VALUES(isManager), isLeader=VALUES(isLeader),
lastSeenAt=CURRENT_TIMESTAMP
`,
			n.ID, n.Hostname, n.Role, state, avail, n.IP,
			string(labelsJSON), n.DockerVersion,
			n.CPUCores, n.MemTotalMB, isManager, isLeader,
		)
		if err != nil {
			log.Printf("[DB] UpsertSwarmNodes error for node %s: %v", n.ID, err)
		}
	}
}
// UpsertSwarmTokens saves the current worker/manager join tokens and the
// manager address into the swarmTokens table. It is best-effort: with no
// connection it does nothing, and errors are only logged.
//
// NOTE(review): the INSERT supplies no key column, so ON DUPLICATE KEY UPDATE
// fires only if swarmTokens has a unique index on one of these columns. If its
// only key is an auto-increment id, every call appends a new row, and
// GetSwarmTokens reads the newest one. Confirm against the schema.
func (d *DB) UpsertSwarmTokens(workerToken, managerToken, managerAddr string) {
	if d.conn == nil {
		return
	}
	const upsertSQL = `
INSERT INTO swarmTokens (managerToken, workerToken, managerAddr)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE
managerToken=VALUES(managerToken),
workerToken=VALUES(workerToken),
managerAddr=VALUES(managerAddr)
`
	if _, execErr := d.conn.Exec(upsertSQL, managerToken, workerToken, managerAddr); execErr != nil {
		log.Printf("[DB] UpsertSwarmTokens error: %v", execErr)
	}
}
// GetSwarmTokens returns the most recently stored (highest id) worker token,
// manager token and manager address. NULL columns come back as "". When no
// row has been stored, the error is sql.ErrNoRows.
func (d *DB) GetSwarmTokens() (worker, manager, addr string, err error) {
	if d.conn == nil {
		return "", "", "", fmt.Errorf("DB not connected")
	}
	const selectSQL = `
SELECT COALESCE(workerToken,''), COALESCE(managerToken,''), COALESCE(managerAddr,'')
FROM swarmTokens ORDER BY id DESC LIMIT 1
`
	scanErr := d.conn.QueryRow(selectSQL).Scan(&worker, &manager, &addr)
	return worker, manager, addr, scanErr
}
// ─── Helpers ──────────────────────────────────────────────────────────────────
// scanAgentConfig scans one agents row into an AgentConfig. The columns must
// be, in order: id, name, model, systemPrompt, allowedTools, temperature,
// maxTokens, isOrchestrator, isSystem, isActive.
//
// Defaults are applied as follows:
//   - Temperature becomes 0.5 when the column is NULL or 0.
//   - MaxTokens becomes 8192 when the column is NULL or 0.
//   - SystemPrompt is "" when NULL.
//
// allowedTools is decoded from JSON unless it is NULL, empty or the literal
// null. A malformed value is now logged instead of being silently ignored;
// whatever json.Unmarshal managed to decode is kept, as before. Scan errors,
// including sql.ErrNoRows, are returned unchanged.
func scanAgentConfig(row *sql.Row) (*AgentConfig, error) {
	var (
		cfg                        AgentConfig
		systemPrompt, allowedTools sql.NullString
		temperature                sql.NullFloat64
		maxTokens                  sql.NullInt64
		isOrch, isSystem, isActive int
	)
	if err := row.Scan(
		&cfg.ID, &cfg.Name, &cfg.Model,
		&systemPrompt, &allowedTools,
		&temperature, &maxTokens,
		&isOrch, &isSystem, &isActive,
	); err != nil {
		return nil, err
	}

	cfg.SystemPrompt = systemPrompt.String

	cfg.Temperature = temperature.Float64
	// NOTE(review): an explicit temperature of 0 cannot be told apart from NULL
	// here and is replaced by the default. Check temperature.Valid instead if a
	// stored 0 must be honoured.
	if cfg.Temperature == 0 {
		cfg.Temperature = 0.5
	}

	cfg.MaxTokens = int(maxTokens.Int64)
	if cfg.MaxTokens == 0 {
		cfg.MaxTokens = 8192
	}

	cfg.IsOrchestrator = isOrch == 1
	cfg.IsSystem = isSystem == 1
	cfg.IsActive = isActive == 1

	if raw := allowedTools.String; allowedTools.Valid && raw != "" && raw != "null" {
		if err := json.Unmarshal([]byte(raw), &cfg.AllowedTools); err != nil {
			log.Printf("[DB] agent %d: ignoring malformed allowedTools: %v", cfg.ID, err)
		}
	}
	return &cfg, nil
}
// normalizeDSN converts a URL-style DSN (mysql://user:pass@host:port/db?params)
// into the go-sql-driver/mysql format (user:pass@tcp(host:port)/db?params).
// Inputs without the mysql:// scheme are returned unchanged.
//
// The result always carries parseTime=true and charset=utf8mb4. Parameters
// from the URL are appended after them and joined with '&'; previously they
// produced a second '?' and a malformed DSN. The '/' before the database name
// is always emitted, because the driver requires it even when no database is
// given.
//
// TLS handling follows these rules:
//   - An explicit tls=... parameter is kept as given.
//   - Node.js-style SSL parameters (ssl, sslaccept, sslmode, ssl-mode) are not
//     understood by go-sql-driver/mysql, which would try to SET them as
//     session variables. They are dropped and translated into tls=true, unless
//     their value disables SSL (false, 0, off, disable, disabled).
//   - Otherwise tls=true is added for hosts that look like managed cloud
//     databases (TiDB Cloud, AWS, GCP, Azure).
//
// Percent-encoded user names and passwords are decoded, matching how the
// Node.js side parses the same URL. Credentials end at the last '@', so a raw
// '@' in a password still works. A URL without credentials yields
// "tcp(host)/db?...".
func normalizeDSN(dsn string) string {
	if !strings.HasPrefix(dsn, "mysql://") {
		return dsn
	}
	rest := strings.TrimPrefix(dsn, "mysql://")

	userInfo, hostDB := "", rest
	if at := strings.LastIndex(rest, "@"); at >= 0 {
		userInfo, hostDB = rest[:at], rest[at+1:]
	}

	// host:port[/dbname][?query]
	rawQuery := ""
	if q := strings.Index(hostDB, "?"); q >= 0 {
		hostDB, rawQuery = hostDB[:q], hostDB[q+1:]
	}
	hostPort, dbName := hostDB, ""
	if slash := strings.Index(hostDB, "/"); slash >= 0 {
		hostPort, dbName = hostDB[:slash], hostDB[slash+1:]
	}

	// disablesSSL reports whether a Node-style SSL parameter value turns SSL off.
	disablesSSL := func(v string) bool {
		if u, err := url.QueryUnescape(v); err == nil {
			v = u
		}
		switch strings.ToLower(strings.TrimSpace(v)) {
		case "false", "0", "off", "disable", "disabled":
			return true
		}
		return false
	}

	params := []string{"parseTime=true", "charset=utf8mb4"}
	var explicitTLS, sslSeen, sslOn bool
	for _, kv := range strings.Split(rawQuery, "&") {
		if kv == "" {
			continue
		}
		key, val := kv, ""
		if eq := strings.Index(kv, "="); eq >= 0 {
			key, val = kv[:eq], kv[eq+1:]
		}
		switch strings.ToLower(key) {
		case "tls":
			explicitTLS = true
			params = append(params, kv)
		case "ssl", "sslaccept", "sslmode", "ssl-mode":
			sslSeen = true
			sslOn = sslOn || !disablesSSL(val)
		default:
			params = append(params, kv)
		}
	}

	// TiDB Cloud and other cloud MySQL offerings require TLS; detect by host.
	host := strings.ToLower(hostPort)
	cloudHost := false
	for _, marker := range []string{"tidbcloud", "tidb.cloud", "aws", "gcp", "azure"} {
		if strings.Contains(host, marker) {
			cloudHost = true
			break
		}
	}
	switch {
	case explicitTLS:
		// The caller's tls=... is already in params.
	case sslSeen:
		if sslOn {
			params = append(params, "tls=true")
		}
	case cloudHost:
		params = append(params, "tls=true")
	}

	// decode percent-decodes a userinfo component, keeping it raw if malformed.
	decode := func(s string) string {
		if u, err := url.PathUnescape(s); err == nil {
			return u
		}
		return s
	}
	prefix := ""
	if userInfo != "" {
		user, pass, hasPass := userInfo, "", false
		if colon := strings.Index(userInfo, ":"); colon >= 0 {
			user, pass, hasPass = userInfo[:colon], userInfo[colon+1:], true
		}
		prefix = decode(user)
		if hasPass {
			prefix += ":" + decode(pass)
		}
		prefix += "@"
	}
	return prefix + "tcp(" + hostPort + ")/" + dbName + "?" + strings.Join(params, "&")
}