// Package db provides MySQL/TiDB connectivity and agent config queries. package db import ( "database/sql" "encoding/json" "fmt" "log" "strings" _ "github.com/go-sql-driver/mysql" ) // AgentConfig holds the orchestrator/agent configuration loaded from DB. type AgentConfig struct { ID int Name string Model string SystemPrompt string AllowedTools []string Temperature float64 MaxTokens int IsOrchestrator bool IsSystem bool IsActive bool // Container / Swarm fields (Phase A) ServiceName string ServicePort int ContainerImage string ContainerStatus string // "stopped" | "deploying" | "running" | "error" } // AgentRow is a minimal agent representation for listing. type AgentRow struct { ID int `json:"id"` Name string `json:"name"` Role string `json:"role"` Model string `json:"model"` Description string `json:"description"` IsActive bool `json:"isActive"` IsSystem bool `json:"isSystem"` IsOrchestrator bool `json:"isOrchestrator"` } type DB struct { conn *sql.DB } func Connect(dsn string) (*DB, error) { if dsn == "" { return nil, fmt.Errorf("DATABASE_URL is empty") } // Convert mysql:// URL to DSN format if needed dsn = normalizeDSN(dsn) conn, err := sql.Open("mysql", dsn) if err != nil { return nil, fmt.Errorf("failed to open DB: %w", err) } if err := conn.Ping(); err != nil { return nil, fmt.Errorf("failed to ping DB: %w", err) } log.Println("[DB] Connected to MySQL") return &DB{conn: conn}, nil } func (d *DB) Close() { if d.conn != nil { _ = d.conn.Close() } } // GetOrchestratorConfig loads the agent with isOrchestrator=1 from DB. 
func (d *DB) GetOrchestratorConfig() (*AgentConfig, error) { row := d.conn.QueryRow(` SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive, COALESCE(serviceName,''), COALESCE(servicePort,0), COALESCE(containerImage,'goclaw-agent-worker:latest'), COALESCE(containerStatus,'stopped') FROM agents WHERE isOrchestrator = 1 LIMIT 1 `) return scanAgentConfig(row) } // GetAgentByID loads a specific agent by ID. func (d *DB) GetAgentByID(id int) (*AgentConfig, error) { row := d.conn.QueryRow(` SELECT id, name, model, systemPrompt, allowedTools, temperature, maxTokens, isOrchestrator, isSystem, isActive, COALESCE(serviceName,''), COALESCE(servicePort,0), COALESCE(containerImage,'goclaw-agent-worker:latest'), COALESCE(containerStatus,'stopped') FROM agents WHERE id = ? LIMIT 1 `, id) return scanAgentConfig(row) } // ListAgents returns all active agents. func (d *DB) ListAgents() ([]AgentRow, error) { rows, err := d.conn.Query(` SELECT id, name, role, model, COALESCE(description,''), isActive, isSystem, isOrchestrator FROM agents ORDER BY isOrchestrator DESC, isSystem DESC, id ASC `) if err != nil { return nil, err } defer rows.Close() var agents []AgentRow for rows.Next() { var a AgentRow var isActive, isSystem, isOrch int if err := rows.Scan(&a.ID, &a.Name, &a.Role, &a.Model, &a.Description, &isActive, &isSystem, &isOrch); err != nil { continue } a.IsActive = isActive == 1 a.IsSystem = isSystem == 1 a.IsOrchestrator = isOrch == 1 agents = append(agents, a) } return agents, nil } // ─── Helpers ────────────────────────────────────────────────────────────────── func scanAgentConfig(row *sql.Row) (*AgentConfig, error) { var cfg AgentConfig var systemPrompt sql.NullString var allowedToolsJSON sql.NullString var temperature sql.NullFloat64 var maxTokens sql.NullInt64 var isOrch, isSystem, isActive int err := row.Scan( &cfg.ID, &cfg.Name, &cfg.Model, &systemPrompt, &allowedToolsJSON, &temperature, &maxTokens, &isOrch, 
&isSystem, &isActive, &cfg.ServiceName, &cfg.ServicePort, &cfg.ContainerImage, &cfg.ContainerStatus, ) if err != nil { return nil, err } cfg.SystemPrompt = systemPrompt.String cfg.Temperature = temperature.Float64 if cfg.Temperature == 0 { cfg.Temperature = 0.5 } cfg.MaxTokens = int(maxTokens.Int64) if cfg.MaxTokens == 0 { cfg.MaxTokens = 8192 } cfg.IsOrchestrator = isOrch == 1 cfg.IsSystem = isSystem == 1 cfg.IsActive = isActive == 1 if allowedToolsJSON.Valid && allowedToolsJSON.String != "" && allowedToolsJSON.String != "null" { _ = json.Unmarshal([]byte(allowedToolsJSON.String), &cfg.AllowedTools) } return &cfg, nil } // ─── Agent Container Fields ─────────────────────────────────────────────────── // These methods support the agent-worker container architecture where each // agent runs as an autonomous Docker Swarm service. // UpdateContainerStatus updates the container lifecycle state of an agent. func (d *DB) UpdateContainerStatus(agentID int, status, serviceName string, servicePort int) error { if d.conn == nil { return nil } _, err := d.conn.Exec(` UPDATE agents SET containerStatus = ?, serviceName = ?, servicePort = ?, updatedAt = NOW() WHERE id = ? `, status, serviceName, servicePort, agentID) return err } // HistoryInput holds data for one conversation entry. type HistoryInput struct { AgentID int UserMessage string AgentResponse string ConversationID string Status string // "success" | "error" | "pending" } // HistoryRow is a single entry from agentHistory for sliding window memory. type HistoryRow struct { ID int `json:"id"` UserMessage string `json:"userMessage"` AgentResponse string `json:"agentResponse"` ConvID string `json:"conversationId"` } // SaveHistory inserts a row into the agentHistory table. // Non-fatal — logs on error but does not return one. 
func (d *DB) SaveHistory(h HistoryInput) { if d.conn == nil { return } status := h.Status if status == "" { status = "success" } convID := sql.NullString{String: h.ConversationID, Valid: h.ConversationID != ""} resp := sql.NullString{String: h.AgentResponse, Valid: h.AgentResponse != ""} _, err := d.conn.Exec(` INSERT INTO agentHistory (agentId, userMessage, agentResponse, conversationId, status) VALUES (?, ?, ?, ?, ?) `, h.AgentID, truncate(h.UserMessage, 65535), resp, convID, status, ) if err != nil { log.Printf("[DB] SaveHistory error: %v", err) } } // GetAgentHistory returns the last N conversation turns for an agent, oldest first. func (d *DB) GetAgentHistory(agentID, limit int) ([]HistoryRow, error) { if d.conn == nil { return nil, nil } rows, err := d.conn.Query(` SELECT id, userMessage, COALESCE(agentResponse,''), COALESCE(conversationId,'') FROM agentHistory WHERE agentId = ? ORDER BY id DESC LIMIT ? `, agentID, limit) if err != nil { return nil, err } defer rows.Close() var result []HistoryRow for rows.Next() { var h HistoryRow if err := rows.Scan(&h.ID, &h.UserMessage, &h.AgentResponse, &h.ConvID); err != nil { continue } result = append(result, h) } // Reverse so oldest is first (for LLM context ordering) for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 { result[i], result[j] = result[j], result[i] } return result, nil } // truncate caps a string to maxLen bytes. 
// The cut is moved back to the nearest UTF-8 rune boundary so that the result
// never ends in half of a multi-byte character; the result may therefore be
// up to three bytes shorter than maxLen. A non-positive maxLen yields "".
func truncate(s string, maxLen int) string {
	if maxLen <= 0 {
		return ""
	}
	if len(s) <= maxLen {
		return s
	}

	// A byte of the form 10xxxxxx is a UTF-8 continuation byte. If the first
	// dropped byte is one, the cut falls inside a rune, so step back to that
	// rune's lead byte. A rune has at most three continuation bytes.
	cut := maxLen
	for cut > 0 && maxLen-cut < 3 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	if s[cut]&0xC0 == 0x80 {
		// More than three continuation bytes in a row: s is not valid UTF-8,
		// so fall back to the plain byte cut.
		return s[:maxLen]
	}
	return s[:cut]
}

// normalizeDSN converts a mysql:// URL into the DSN form used by
// go-sql-driver/mysql:
//
//	mysql://user:pass@host:port/db?k=v → user:pass@tcp(host:port)/db?parseTime=true&charset=utf8mb4&k=v
//
// A string without the mysql:// scheme is returned unchanged.
//
// Parameter handling:
//   - parseTime=true and charset=utf8mb4 are added unless the URL already sets
//     those keys;
//   - other URL query parameters are carried over after the defaults;
//   - an explicit tls= parameter is kept as given;
//   - ssl / sslmode / ssl-mode (spellings used by other MySQL clients) are
//     translated to tls=true, or tls=false for a disabling value. This is a
//     best-effort mapping and does not reproduce individual ssl modes;
//   - with no TLS parameter at all, tls=true is added when the host looks
//     like a cloud endpoint (substring heuristic).
//
// A URL without a database still gets the "/" separator, and a URL without
// credentials produces a DSN with no user part.
func normalizeDSN(dsn string) string {
	if !strings.HasPrefix(dsn, "mysql://") {
		return dsn
	}
	rest := strings.TrimPrefix(dsn, "mysql://")

	// Split the query off first, so that an '@' or '/' inside a parameter
	// value cannot be mistaken for the credentials or path separator.
	query := ""
	if q := strings.Index(rest, "?"); q >= 0 {
		rest, query = rest[:q], rest[q+1:]
	}

	// The last '@' separates credentials from the host, which lets a
	// password contain '@'. creds keeps its trailing '@'.
	creds, hostDB := "", rest
	if at := strings.LastIndex(rest, "@"); at >= 0 {
		creds, hostDB = rest[:at+1], rest[at+1:]
	}

	// host:port/db → host:port and /db. The slash stays even without a
	// database name.
	hostPort, dbPath := hostDB, "/"
	if slash := strings.Index(hostDB, "/"); slash >= 0 {
		hostPort, dbPath = hostDB[:slash], hostDB[slash:]
	}

	var (
		extra        []string // caller parameters passed through unchanged
		tlsParam     string   // final "tls=..." parameter, if any
		explicitTLS  bool     // the caller supplied tls= directly
		hasParseTime bool
		hasCharset   bool
	)
	for _, kv := range strings.Split(query, "&") {
		if kv == "" {
			continue
		}
		key, val := kv, ""
		if eq := strings.Index(kv, "="); eq >= 0 {
			key, val = kv[:eq], kv[eq+1:]
		}

		switch lower := strings.ToLower(key); {
		case key == "tls":
			tlsParam, explicitTLS = kv, true
			continue
		case lower == "ssl" || lower == "sslmode" || lower == "ssl-mode":
			// An explicit tls= always wins over a translated ssl flag.
			if !explicitTLS {
				switch strings.ToLower(val) {
				case "false", "0", "off", "disable", "disabled":
					tlsParam = "tls=false"
				default:
					tlsParam = "tls=true"
				}
			}
			continue
		case key == "parseTime":
			hasParseTime = true
		case key == "charset":
			hasCharset = true
		}
		extra = append(extra, kv)
	}

	// TiDB Cloud and other cloud MySQL require TLS — detect by host pattern
	// when the caller expressed no preference.
	if tlsParam == "" &&
		(strings.Contains(hostPort, "tidbcloud") ||
			strings.Contains(hostPort, "tidb.cloud") ||
			strings.Contains(hostPort, "aws") ||
			strings.Contains(hostPort, "gcp") ||
			strings.Contains(hostPort, "azure")) {
		tlsParam = "tls=true"
	}

	params := make([]string, 0, len(extra)+3)
	if !hasParseTime {
		params = append(params, "parseTime=true")
	}
	if !hasCharset {
		params = append(params, "charset=utf8mb4")
	}
	params = append(params, extra...)
	if tlsParam != "" {
		params = append(params, tlsParam)
	}

	return fmt.Sprintf("%stcp(%s)%s?%s", creds, hostPort, dbPath, strings.Join(params, "&"))
}