Files
GoClaw/gateway/cmd/agent-worker/main.go
¨NW¨ 0f23dffc26 feat(agents): restore agent-worker container architecture + fix chat scroll and parallel chats
- Restore agent-worker from commit 153399f: autonomous HTTP server per agent
  (main.go 597 lines, main_test.go 438 lines, Dockerfile.agent-worker)
- Add container fields to agents table (serviceName, servicePort, containerImage, containerStatus)
- Update executor.go: real delegateToAgent() with HTTP POST to agent containers
- Update db.go: GetAgentByID, UpdateContainerStatus, GetAgentHistory, SaveHistory
- Update orchestrator.go: inject DB into executor for container address resolution
- Add tRPC endpoints: agents.deployContainer, agents.stopContainer, agents.containerStatus
- Add Docker Swarm deploy/stop logic in server/agents.ts
- Add Start/Stop container buttons to Agents.tsx with status badges
- Fix chat auto-scroll: replace ScrollArea with overflow-y-auto for direct scrollTop control
- Fix parallel chats: make isThinking per-conversation (thinkingConvId) instead of global
  so switching between chats works while one is processing
2026-04-10 15:43:33 +01:00

727 lines
22 KiB
Go

// GoClaw Agent Worker — a standalone HTTP server for a single agent.
//
// Each agent runs as its own Docker Swarm service. The worker loads its
// configuration from the shared database by AGENT_ID, runs the LLM loop,
// and accepts concurrent tasks from the Orchestrator and from other agents.
//
// Endpoints:
//
//	GET  /health      — liveness probe
//	GET  /info        — agent configuration (name, model, role)
//	POST /chat        — synchronous chat (runs the LLM loop and waits for the answer)
//	POST /task        — enqueue a task (async, returns a task_id)
//	GET  /tasks       — the agent's tasks (active + recent)
//	GET  /tasks/{id}  — status of a single task
//	GET  /memory      — the last N entries of the agent's history
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/cors"
"github.com/google/uuid"
"github.com/joho/godotenv"
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db"
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/llm"
"git.softuniq.eu/UniqAI/GoClaw/gateway/internal/tools"
)
// ─── Task types ──────────────────────────────────────────────────────────────
// TaskStatus is the lifecycle state of a Task; it is serialized verbatim in JSON.
type TaskStatus string

// Task lifecycle states. A task is created TaskPending, becomes TaskRunning
// when a worker picks it up, and ends as TaskDone or TaskFailed. A task that
// is rejected because the queue is full goes straight from TaskPending to
// TaskFailed. TaskCancelled is declared but not assigned anywhere in this file.
const (
	TaskPending   = TaskStatus("pending")
	TaskRunning   = TaskStatus("running")
	TaskDone      = TaskStatus("done")
	TaskFailed    = TaskStatus("failed")
	TaskCancelled = TaskStatus("cancelled")
)
// Task is one unit of work accepted through POST /task and executed by a
// worker goroutine. Its mutable fields (Status, Result, Error, ToolCalls,
// StartedAt, DoneAt) are written while holding AgentWorker.tasksMu.
type Task struct {
	ID          string         `json:"id"`
	FromAgentID int            `json:"from_agent_id,omitempty"` // delegating agent ID; 0 means a human
	Input       string         `json:"input"`                   // task text handed to the LLM
	CallbackURL string         `json:"callback_url,omitempty"`  // if set, the finished task is POSTed here
	Priority    int            `json:"priority"`                // 0 = normal, 1 = high (recorded only; the queue is FIFO)
	TimeoutSecs int            `json:"timeout_secs"`            // per-task execution budget in seconds
	Status      TaskStatus     `json:"status"`
	Result      string         `json:"result,omitempty"` // final answer when Status is done
	Error       string         `json:"error,omitempty"`  // failure reason when Status is failed
	ToolCalls   []ToolCallStep `json:"tool_calls,omitempty"`
	CreatedAt   time.Time      `json:"created_at"`
	StartedAt   *time.Time     `json:"started_at,omitempty"` // nil until a worker picks the task up
	DoneAt      *time.Time     `json:"done_at,omitempty"`    // nil until the task finishes
}
// ToolCallStep records a single tool invocation made during the LLM loop.
// It is returned to clients (in tasks and /chat responses) for display.
type ToolCallStep struct {
	Tool       string `json:"tool"`             // tool (function) name requested by the model
	Args       any    `json:"args"`             // decoded JSON arguments
	Result     any    `json:"result,omitempty"` // tool output on success
	Error      string `json:"error,omitempty"`  // error message on failure
	Success    bool   `json:"success"`
	DurationMs int64  `json:"duration_ms"` // wall-clock execution time in milliseconds
}
// ChatMessage is one conversation turn as accepted by POST /chat.
// Role is passed through to the LLM unchanged (e.g. "user", "assistant").
type ChatMessage struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}
// ChatRequest is the body of POST /chat (synchronous execution).
type ChatRequest struct {
	Messages []ChatMessage `json:"messages"`           // new turns, appended after the stored history
	Model    string        `json:"model,omitempty"`    // overrides the agent's configured model when non-empty
	MaxIter  int           `json:"max_iter,omitempty"` // overrides the LLM loop iteration cap when > 0
}
// ChatResponse is the body returned by POST /chat; processTask also copies
// its outcome onto the corresponding Task.
type ChatResponse struct {
	Success   bool           `json:"success"`
	Response  string         `json:"response"`        // final assistant text
	ToolCalls []ToolCallStep `json:"tool_calls"`      // every tool call made during the loop
	Model     string         `json:"model"`           // model reported by the LLM response
	Error     string         `json:"error,omitempty"` // set when Success is false
}
// TaskRequest is the body of POST /task (asynchronous execution).
type TaskRequest struct {
	Input       string `json:"input"`                   // task text; required by handleTask
	FromAgentID int    `json:"from_agent_id,omitempty"` // delegating agent; 0 for a human
	CallbackURL string `json:"callback_url,omitempty"`  // optional URL to POST the finished task to
	Priority    int    `json:"priority,omitempty"`
	TimeoutSecs int    `json:"timeout_secs,omitempty"` // <= 0 means the worker's default timeout
}
// ─── Agent Worker ─────────────────────────────────────────────────────────────
// AgentWorker holds the runtime state of one agent process: its configuration,
// LLM and database handles, tool executor, and the in-memory task queue/store.
type AgentWorker struct {
	agentID  int             // AGENT_ID this process serves
	cfg      *db.AgentConfig // agent row loaded at startup
	llm      *llm.Client
	database *db.DB
	executor *tools.Executor // runs tool calls requested by the model

	// taskQueue is the buffered channel the worker goroutines consume.
	taskQueue chan *Task

	// tasksMu guards the tasks map and the mutable fields of every stored task.
	tasksMu sync.RWMutex
	tasks   map[string]*Task // task ID → task

	// recentMu guards recentKeys: IDs of recently enqueued tasks, oldest
	// first, as listed by GET /tasks.
	recentMu   sync.Mutex
	recentKeys []string
}
// Queueing and bookkeeping limits.
const (
	taskQueueDepth   = 100 // capacity of the buffered task queue
	maxRecentTasks   = 50  // task IDs kept in the recent-task ring for GET /tasks
	workerGoroutines = 4   // concurrent task workers per agent
)

// Per-request defaults used when the caller does not supply a value.
const (
	defaultMaxIter = 8   // LLM loop iterations
	defaultTimeout = 120 // task timeout in seconds
)
// newAgentWorker loads the agent's configuration from the database and builds
// its worker state: task queue and store, plus a tool executor rooted at
// "/app".
//
// The executor receives a callback that lists all agents via
// database.ListAgents, exposing id, name, role, model and isActive for each.
// An error is returned when the agent row cannot be loaded.
func newAgentWorker(agentID int, database *db.DB, llmClient *llm.Client) (*AgentWorker, error) {
	cfg, err := database.GetAgentByID(agentID)
	if err != nil {
		return nil, fmt.Errorf("agent %d not found in DB: %w", agentID, err)
	}
	log.Printf("[AgentWorker] Loaded config: id=%d name=%q model=%s", cfg.ID, cfg.Name, cfg.Model)

	listAgents := func() ([]map[string]any, error) {
		rows, listErr := database.ListAgents()
		if listErr != nil {
			return nil, listErr
		}
		summaries := make([]map[string]any, 0, len(rows))
		for _, row := range rows {
			summaries = append(summaries, map[string]any{
				"id":       row.ID,
				"name":     row.Name,
				"role":     row.Role,
				"model":    row.Model,
				"isActive": row.IsActive,
			})
		}
		return summaries, nil
	}

	return &AgentWorker{
		agentID:   agentID,
		cfg:       cfg,
		llm:       llmClient,
		database:  database,
		executor:  tools.NewExecutor("/app", listAgents),
		taskQueue: make(chan *Task, taskQueueDepth),
		tasks:     make(map[string]*Task),
	}, nil
}
// StartWorkers launches workerGoroutines goroutines that consume taskQueue
// until ctx is cancelled. It returns immediately; the goroutines are not
// tracked, so callers cannot wait for them to exit.
func (w *AgentWorker) StartWorkers(ctx context.Context) {
	launched := 0
	for launched < workerGoroutines {
		go w.runWorker(ctx, launched)
		launched++
	}
	log.Printf("[AgentWorker] %d worker goroutines started", launched)
}
// runWorker is the body of one task worker goroutine. It takes tasks from
// taskQueue one at a time and processes them until ctx is cancelled.
//
// A panic while processing a task is recovered and recorded as a task
// failure. Without this, one misbehaving task would crash the whole agent
// process: HTTP handlers are covered by middleware.Recoverer, but these
// goroutines are not.
func (w *AgentWorker) runWorker(ctx context.Context, workerID int) {
	for {
		select {
		case <-ctx.Done():
			log.Printf("[Worker-%d] shutting down", workerID)
			return
		case task := <-w.taskQueue:
			log.Printf("[Worker-%d] processing task %s", workerID, task.ID)
			w.processTaskRecovering(ctx, workerID, task)
		}
	}
}

// processTaskRecovering calls processTask and converts a panic into a failed
// task instead of letting it unwind (and kill) the worker goroutine.
func (w *AgentWorker) processTaskRecovering(ctx context.Context, workerID int, task *Task) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		log.Printf("[Worker-%d] PANIC while processing task %s: %v", workerID, task.ID, rec)
		now := time.Now()
		w.tasksMu.Lock()
		defer w.tasksMu.Unlock()
		// Only overwrite a task that had not already reached a final state.
		if task.Status == TaskPending || task.Status == TaskRunning {
			task.Status = TaskFailed
			task.Error = fmt.Sprintf("internal error: %v", rec)
			task.DoneAt = &now
		}
	}()
	w.processTask(ctx, task)
}
// EnqueueTask registers a new task built from req and hands it to the worker
// pool without blocking.
//
// The task is stored in w.tasks and appended to the recent-task ring before
// it is queued, so it is immediately visible via GET /tasks and
// GET /tasks/{id}. A zero or negative TimeoutSecs defaults to defaultTimeout.
// req.Priority is recorded on the task but does not affect scheduling: the
// queue is a single FIFO channel.
//
// If the queue is full the task is not queued. It is returned already marked
// TaskFailed (with DoneAt set) so the caller can report the rejection.
func (w *AgentWorker) EnqueueTask(req TaskRequest) *Task {
	timeout := req.TimeoutSecs
	if timeout <= 0 {
		timeout = defaultTimeout
	}
	task := &Task{
		ID:          uuid.New().String(),
		FromAgentID: req.FromAgentID,
		Input:       req.Input,
		CallbackURL: req.CallbackURL,
		Priority:    req.Priority,
		TimeoutSecs: timeout,
		Status:      TaskPending,
		CreatedAt:   time.Now(),
	}

	w.tasksMu.Lock()
	w.tasks[task.ID] = task
	w.tasksMu.Unlock()

	w.recentMu.Lock()
	w.recentKeys = append(w.recentKeys, task.ID)
	w.pruneRecentLocked()
	w.recentMu.Unlock()

	// Non-blocking send: when the queue is full, fail the task instead of
	// stalling the HTTP handler.
	select {
	case w.taskQueue <- task:
	default:
		rejectedAt := time.Now()
		w.tasksMu.Lock()
		task.Status = TaskFailed
		task.Error = "task queue is full — agent is overloaded"
		task.DoneAt = &rejectedAt
		w.tasksMu.Unlock()
		log.Printf("[AgentWorker] WARN: task queue full, task %s rejected", task.ID)
	}
	return task
}

// pruneRecentLocked shrinks the recent-task ring back towards maxRecentTasks.
// It drops the oldest *finished* tasks and also deletes them from w.tasks,
// so the task store cannot grow without bound. Pending and running tasks are
// never dropped, so the ring may temporarily exceed maxRecentTasks while many
// tasks are in flight.
//
// The caller must hold w.recentMu; this function acquires w.tasksMu itself.
func (w *AgentWorker) pruneRecentLocked() {
	excess := len(w.recentKeys) - maxRecentTasks
	if excess <= 0 {
		return
	}
	kept := make([]string, 0, len(w.recentKeys))
	w.tasksMu.Lock()
	for _, id := range w.recentKeys {
		if excess > 0 {
			if t, ok := w.tasks[id]; !ok || isFinalStatus(t.Status) {
				delete(w.tasks, id)
				excess--
				continue
			}
		}
		kept = append(kept, id)
	}
	w.tasksMu.Unlock()
	w.recentKeys = kept
}

// isFinalStatus reports whether s is a terminal task status.
func isFinalStatus(s TaskStatus) bool {
	switch s {
	case TaskDone, TaskFailed, TaskCancelled:
		return true
	}
	return false
}
// processTask runs one queued task through the LLM loop and records the
// outcome on the task while holding tasksMu.
//
// The LLM loop gets a timeout of task.TimeoutSecs derived from ctx. After the
// result is recorded, the optional callback POST and the history write are
// started in background goroutines; neither is awaited.
func (w *AgentWorker) processTask(ctx context.Context, task *Task) {
	started := time.Now()
	w.tasksMu.Lock()
	task.Status, task.StartedAt = TaskRunning, &started
	w.tasksMu.Unlock()

	budget := time.Duration(task.TimeoutSecs) * time.Second
	chatCtx, cancel := context.WithTimeout(ctx, budget)
	defer cancel()

	outcome := w.runChat(chatCtx, []ChatMessage{{Role: "user", Content: task.Input}}, "", defaultMaxIter)

	finished := time.Now()
	w.tasksMu.Lock()
	task.DoneAt = &finished
	task.ToolCalls = outcome.ToolCalls
	switch {
	case outcome.Success:
		task.Status, task.Result = TaskDone, outcome.Response
	default:
		task.Status, task.Error = TaskFailed, outcome.Error
	}
	w.tasksMu.Unlock()
	log.Printf("[AgentWorker] task %s done: status=%s", task.ID, task.Status)

	if task.CallbackURL != "" {
		go w.postCallback(task)
	}

	if w.database == nil {
		return
	}
	// Persist the exchange so later runs see it in their history window.
	go func() {
		reply := task.Result
		if task.Status == TaskFailed {
			reply = "[ERROR] " + task.Error
		}
		w.database.SaveHistory(db.HistoryInput{
			AgentID:       w.agentID,
			UserMessage:   task.Input,
			AgentResponse: reply,
		})
	}()
}
// runChat drives the agent's LLM loop for one request. It returns the final
// assistant reply together with every tool call made along the way.
//
// Context sent to the model, in order: the agent's system prompt (if any), up
// to 20 stored history entries, then messages. overrideModel replaces the
// configured model when non-empty; maxIter <= 0 means defaultMaxIter.
//
// Each iteration requests a completion with the agent's allowed tools. When
// the model asks for tool calls, they are executed and their results are
// appended to the conversation; any other reply is the final answer. If a
// tool-enabled request fails, the same request is retried once without
// tools, and its reply is taken as the answer.
//
// Success is false when the LLM call (and its retry) fails, when the model
// returns no choices, or when maxIter iterations pass without a final answer.
// ToolCalls holds the tool calls executed so far in every case.
func (w *AgentWorker) runChat(ctx context.Context, messages []ChatMessage, overrideModel string, maxIter int) ChatResponse {
	model := w.cfg.Model
	if overrideModel != "" {
		model = overrideModel
	}
	if maxIter <= 0 {
		maxIter = defaultMaxIter
	}

	conv := w.buildConversation(messages)
	agentTools := w.getAgentTools()

	temp := w.cfg.Temperature
	maxTok := w.cfg.MaxTokens
	if maxTok == 0 {
		maxTok = 4096
	}

	var toolCallSteps []ToolCallStep
	lastModel := model
	fail := func(format string, args ...any) ChatResponse {
		return ChatResponse{
			Success:   false,
			Error:     fmt.Sprintf(format, args...),
			ToolCalls: toolCallSteps,
			Model:     lastModel,
		}
	}

	for iter := 0; iter < maxIter; iter++ {
		req := llm.ChatRequest{
			Model:       model,
			Messages:    conv,
			Temperature: &temp,
			MaxTokens:   &maxTok,
		}
		if len(agentTools) > 0 {
			req.Tools = agentTools
			req.ToolChoice = "auto"
		}

		resp, err := w.llm.Chat(ctx, req)
		if err != nil {
			// Retry once as plain chat (e.g. for models that reject tool definitions).
			req.Tools = nil
			req.ToolChoice = ""
			plain, err2 := w.llm.Chat(ctx, req)
			if err2 != nil {
				return fail("LLM error (model: %s): %v", model, err2)
			}
			if len(plain.Choices) == 0 {
				return fail("LLM returned no choices (model: %s)", model)
			}
			if plain.Model != "" {
				lastModel = plain.Model
			}
			return ChatResponse{
				Success:   true,
				Response:  plain.Choices[0].Message.Content,
				ToolCalls: toolCallSteps,
				Model:     lastModel,
			}
		}

		if len(resp.Choices) == 0 {
			return fail("LLM returned no choices (model: %s)", model)
		}
		choice := resp.Choices[0]
		lastModel = resp.Model
		if lastModel == "" {
			lastModel = model
		}

		if choice.FinishReason != "tool_calls" || len(choice.Message.ToolCalls) == 0 {
			return ChatResponse{
				Success:   true,
				Response:  choice.Message.Content,
				ToolCalls: toolCallSteps,
				Model:     lastModel,
			}
		}

		// Keep the assistant's tool-call message; each tool result below refers
		// back to it via ToolCallID.
		conv = append(conv, choice.Message)
		for _, tc := range choice.Message.ToolCalls {
			start := time.Now()
			result := w.executor.Execute(ctx, tc.Function.Name, tc.Function.Arguments)
			step := ToolCallStep{
				Tool:       tc.Function.Name,
				Success:    result.Success,
				DurationMs: time.Since(start).Milliseconds(),
			}
			// Display only: arguments that are not valid JSON are shown as null.
			var argsMap any
			_ = json.Unmarshal([]byte(tc.Function.Arguments), &argsMap)
			step.Args = argsMap
			if result.Success {
				step.Result = result.Result
			} else {
				step.Error = result.Error
			}
			toolCallSteps = append(toolCallSteps, step)
			conv = append(conv, llm.Message{
				Role:       "tool",
				Content:    toolMessageContent(result.Success, result.Result, result.Error),
				ToolCallID: tc.ID,
				Name:       tc.Function.Name,
			})
		}
	}
	return fail("no final answer after %d iterations (model: %s)", maxIter, model)
}

// buildConversation assembles the model input: the system prompt (if any),
// the agent's stored history replayed as user/assistant turns, then messages.
// A history load failure is logged and the history is skipped, so the
// request still proceeds.
//
// NOTE(review): assumes GetAgentHistory returns entries oldest-first; if it
// returns newest-first, the replayed turns are in reverse order — confirm in db.go.
func (w *AgentWorker) buildConversation(messages []ChatMessage) []llm.Message {
	conv := make([]llm.Message, 0, len(messages)+1)
	if w.cfg.SystemPrompt != "" {
		conv = append(conv, llm.Message{Role: "system", Content: w.cfg.SystemPrompt})
	}
	if w.database != nil {
		history, err := w.database.GetAgentHistory(w.agentID, 20)
		if err != nil {
			log.Printf("[AgentWorker] WARN: loading history for agent %d: %v", w.agentID, err)
		} else {
			for _, h := range history {
				conv = append(conv, llm.Message{Role: "user", Content: h.UserMessage})
				if h.AgentResponse != "" {
					conv = append(conv, llm.Message{Role: "assistant", Content: h.AgentResponse})
				}
			}
		}
	}
	for _, m := range messages {
		conv = append(conv, llm.Message{Role: m.Role, Content: m.Content})
	}
	return conv
}

// toolMessageContent encodes a tool outcome as the JSON text sent back to the
// model in a "tool" message. Successful results are JSON-encoded as-is.
// Failures, and results that cannot be encoded, become {"error":"..."}.
// The error object is built with encoding/json, not fmt %q, because Go
// quoting is not valid JSON for control characters or invalid UTF-8.
func toolMessageContent(success bool, value any, errMsg string) string {
	if success {
		encoded, err := json.Marshal(value)
		if err == nil {
			return string(encoded)
		}
		errMsg = "tool result is not JSON-encodable: " + err.Error()
	}
	// Cannot fail: a map of strings is always encodable.
	encoded, _ := json.Marshal(map[string]string{"error": errMsg})
	return string(encoded)
}
// getAgentTools returns the tool definitions this agent may use: the entries
// of tools.OrchestratorTools() whose names appear in cfg.AllowedTools.
// An agent with no AllowedTools gets a default set (http_request, file_read,
// file_list). The result is nil when nothing matches.
func (w *AgentWorker) getAgentTools() []llm.Tool {
	catalog := tools.OrchestratorTools()

	var permitted map[string]bool
	if len(w.cfg.AllowedTools) == 0 {
		// Default toolset for agents without an explicit allow-list.
		permitted = map[string]bool{
			"http_request": true,
			"file_read":    true,
			"file_list":    true,
		}
	} else {
		permitted = make(map[string]bool, len(w.cfg.AllowedTools))
		for _, name := range w.cfg.AllowedTools {
			permitted[name] = true
		}
	}

	var selected []llm.Tool
	for _, def := range catalog {
		if !permitted[def.Function.Name] {
			continue
		}
		fn := llm.ToolFunction{
			Name:        def.Function.Name,
			Description: def.Function.Description,
			Parameters:  def.Function.Parameters,
		}
		selected = append(selected, llm.Tool{Type: def.Type, Function: fn})
	}
	return selected
}
// postCallback POSTs the JSON-encoded task to task.CallbackURL.
//
// It is best-effort: failures are logged, never retried, and never change the
// task's status. The whole request is bounded by a 10-second timeout.
//
// NOTE(review): CallbackURL comes straight from the /task request body, so any
// caller can make the worker POST to an arbitrary URL (SSRF). Consider an
// allow-list of callback hosts.
func (w *AgentWorker) postCallback(task *Task) {
	// Encode under the read lock so the snapshot is consistent with writers.
	w.tasksMu.RLock()
	payload, err := json.Marshal(task)
	w.tasksMu.RUnlock()
	if err != nil {
		// e.g. a tool returned a value JSON cannot represent; don't POST an empty body.
		log.Printf("[AgentWorker] callback payload for task %s could not be encoded: %v", task.ID, err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, task.CallbackURL,
		bytes.NewReader(payload))
	if err != nil {
		log.Printf("[AgentWorker] callback URL invalid for task %s: %v", task.ID, err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Printf("[AgentWorker] callback failed for task %s: %v", task.ID, err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		log.Printf("[AgentWorker] callback for task %s → %s rejected (status %d)",
			task.ID, task.CallbackURL, resp.StatusCode)
		return
	}
	log.Printf("[AgentWorker] callback sent for task %s → %s (status %d)",
		task.ID, task.CallbackURL, resp.StatusCode)
}
// ─── HTTP Handlers ────────────────────────────────────────────────────────────
// handleHealth serves GET /health: a liveness probe that also reports the
// agent's identity and the current task-queue length.
func (w *AgentWorker) handleHealth(rw http.ResponseWriter, r *http.Request) {
	// Match the other JSON endpoints; the header was previously missing here.
	rw.Header().Set("Content-Type", "application/json")
	json.NewEncoder(rw).Encode(map[string]any{
		"status":   "ok",
		"agentId":  w.agentID,
		"name":     w.cfg.Name,
		"model":    w.cfg.Model,
		"queueLen": len(w.taskQueue),
	})
}
// handleInfo serves GET /info: the agent's loaded configuration.
func (w *AgentWorker) handleInfo(rw http.ResponseWriter, r *http.Request) {
	// Match the other JSON endpoints; the header was previously missing here.
	rw.Header().Set("Content-Type", "application/json")
	json.NewEncoder(rw).Encode(map[string]any{
		"id":   w.cfg.ID,
		"name": w.cfg.Name,
		// NOTE(review): "role" reports the model, which looks like a copy-paste
		// slip (the endpoint is documented as returning the role). Switch it to
		// the agent's role once db.AgentConfig is confirmed to carry that field.
		"role":         w.cfg.Model,
		"model":        w.cfg.Model,
		"allowedTools": w.cfg.AllowedTools,
		"isSystem":     w.cfg.IsSystem,
	})
}
// handleChat serves POST /chat: it runs the LLM loop synchronously and
// responds with the ChatResponse. A response with Success == false is still
// sent with HTTP 200.
//
// The request's timeout is MaxTokens/10 seconds, clamped to [30s, 300s]
// (a rough estimate), and is derived from the request context so a
// disconnecting client cancels the work.
func (w *AgentWorker) handleChat(rw http.ResponseWriter, r *http.Request) {
	var body ChatRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		http.Error(rw, `{"error":"invalid request body"}`, http.StatusBadRequest)
		return
	}
	if len(body.Messages) == 0 {
		http.Error(rw, `{"error":"messages required"}`, http.StatusBadRequest)
		return
	}

	budgetSecs := w.cfg.MaxTokens / 10
	switch {
	case budgetSecs < 30:
		budgetSecs = 30
	case budgetSecs > 300:
		budgetSecs = 300
	}

	chatCtx, cancel := context.WithTimeout(r.Context(), time.Duration(budgetSecs)*time.Second)
	defer cancel()

	result := w.runChat(chatCtx, body.Messages, body.Model, body.MaxIter)
	rw.Header().Set("Content-Type", "application/json")
	json.NewEncoder(rw).Encode(result)
}
// handleTask serves POST /task: it validates the TaskRequest body and
// enqueues it.
//
// Responds 202 Accepted with the task ID when the task was queued. When the
// queue was full and the task was rejected, it responds 503 Service
// Unavailable with the same body shape (status "failed") plus an "error" field.
func (w *AgentWorker) handleTask(rw http.ResponseWriter, r *http.Request) {
	var req TaskRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(rw, `{"error":"invalid request body"}`, http.StatusBadRequest)
		return
	}
	if req.Input == "" {
		http.Error(rw, `{"error":"input required"}`, http.StatusBadRequest)
		return
	}
	task := w.EnqueueTask(req)

	// A worker may already be updating the task; read its mutable fields
	// under the lock. A rejected task is failed without ever having started.
	w.tasksMu.RLock()
	status := task.Status
	rejected := task.Status == TaskFailed && task.StartedAt == nil
	rejectReason := task.Error
	w.tasksMu.RUnlock()

	body := map[string]any{
		"task_id":   task.ID,
		"status":    status,
		"agent_id":  w.agentID,
		"queue_len": len(w.taskQueue),
	}
	code := http.StatusAccepted
	if rejected {
		code = http.StatusServiceUnavailable
		body["error"] = rejectReason
	}
	rw.Header().Set("Content-Type", "application/json")
	rw.WriteHeader(code)
	json.NewEncoder(rw).Encode(body)
}
// handleListTasks serves GET /tasks: the tasks in the recent-task ring,
// newest first, together with the current queue length.
//
// Each task is copied while tasksMu is held. Encoding the shared *Task values
// after unlocking would race with workers updating them.
func (w *AgentWorker) handleListTasks(rw http.ResponseWriter, r *http.Request) {
	w.recentMu.Lock()
	keys := make([]string, len(w.recentKeys))
	copy(keys, w.recentKeys)
	w.recentMu.Unlock()

	w.tasksMu.RLock()
	snapshot := make([]Task, 0, len(keys))
	for i := len(keys) - 1; i >= 0; i-- {
		if t, ok := w.tasks[keys[i]]; ok {
			snapshot = append(snapshot, *t)
		}
	}
	w.tasksMu.RUnlock()

	rw.Header().Set("Content-Type", "application/json")
	json.NewEncoder(rw).Encode(map[string]any{
		"tasks":    snapshot,
		"total":    len(snapshot),
		"queueLen": len(w.taskQueue),
	})
}
func (w *AgentWorker) handleGetTask(rw http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "id")
w.tasksMu.RLock()
task, ok := w.tasks[taskID]
w.tasksMu.RUnlock()
if !ok {
http.Error(rw, `{"error":"task not found"}`, http.StatusNotFound)
return
}
rw.Header().Set("Content-Type", "application/json")
json.NewEncoder(rw).Encode(task)
}
func (w *AgentWorker) handleMemory(rw http.ResponseWriter, r *http.Request) {
limitStr := r.URL.Query().Get("limit")
limit := 20
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 && n <= 100 {
limit = n
}
if w.database == nil {
rw.Header().Set("Content-Type", "application/json")
json.NewEncoder(rw).Encode(map[string]any{"messages": []any{}, "total": 0})
return
}
history, err := w.database.GetAgentHistory(w.agentID, limit)
if err != nil {
http.Error(rw, `{"error":"failed to load history"}`, http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "application/json")
json.NewEncoder(rw).Encode(map[string]any{
"agent_id": w.agentID,
"messages": history,
"total": len(history),
})
}
// ─── Main ─────────────────────────────────────────────────────────────────────
// main starts a single agent worker process.
//
// Environment (optionally seeded from ../.env and ./.env):
//   - AGENT_ID (required): positive integer ID of the agent to serve.
//   - AGENT_PORT: listen port, default 8001.
//   - LLM_BASE_URL or OLLAMA_BASE_URL: LLM endpoint, default https://ollama.com/v1.
//   - LLM_API_KEY or OLLAMA_API_KEY: LLM credential.
//   - DATABASE_URL (required): database connection string.
//
// On SIGINT/SIGTERM the HTTP server is drained (up to 15s) before the task
// workers are cancelled. Running tasks therefore get the same grace period as
// in-flight HTTP requests instead of being cancelled first.
func main() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
	// Both .env files are optional; load errors are deliberately ignored.
	_ = godotenv.Load("../.env")
	_ = godotenv.Load(".env")

	// ── Configuration from env ───────────────────────────────────────────────
	agentIDStr := os.Getenv("AGENT_ID")
	if agentIDStr == "" {
		log.Fatal("[AgentWorker] AGENT_ID env var is required")
	}
	agentID, err := strconv.Atoi(agentIDStr)
	if err != nil || agentID <= 0 {
		log.Fatalf("[AgentWorker] AGENT_ID must be a positive integer, got: %q", agentIDStr)
	}
	port := os.Getenv("AGENT_PORT")
	if port == "" {
		port = "8001"
	}
	llmBaseURL := getEnvFirst("LLM_BASE_URL", "OLLAMA_BASE_URL")
	if llmBaseURL == "" {
		llmBaseURL = "https://ollama.com/v1"
	}
	llmAPIKey := getEnvFirst("LLM_API_KEY", "OLLAMA_API_KEY")
	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		log.Fatal("[AgentWorker] DATABASE_URL env var is required")
	}
	log.Printf("[AgentWorker] Starting: AGENT_ID=%d PORT=%s LLM=%s", agentID, port, llmBaseURL)

	// ── Dependencies ─────────────────────────────────────────────────────────
	database, err := db.Connect(dbURL)
	if err != nil {
		log.Fatalf("[AgentWorker] DB connection failed: %v", err)
	}
	defer database.Close()

	llmClient := llm.NewClient(llmBaseURL, llmAPIKey)

	worker, err := newAgentWorker(agentID, database, llmClient)
	if err != nil {
		log.Fatalf("[AgentWorker] init failed: %v", err)
	}

	// ── Background workers ───────────────────────────────────────────────────
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	worker.StartWorkers(ctx)

	// ── HTTP server ──────────────────────────────────────────────────────────
	srv := &http.Server{
		Addr:         ":" + port,
		Handler:      newWorkerRouter(worker),
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 310 * time.Second, // must exceed handleChat's 300s cap
		IdleTimeout:  120 * time.Second,
	}

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		log.Printf("[AgentWorker] agent-id=%d listening on :%s", agentID, port)
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("[AgentWorker] server error: %v", err)
		}
	}()

	<-quit
	log.Println("[AgentWorker] shutting down gracefully...")

	// Drain HTTP first, then stop the workers. Cancelling the workers first
	// failed running tasks immediately, while the server could still accept
	// /task requests that no worker would ever pick up.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer shutdownCancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Printf("[AgentWorker] shutdown error: %v", err)
	}
	cancel() // stop task workers
	log.Println("[AgentWorker] stopped.")
}

// newWorkerRouter builds the HTTP router that exposes worker's endpoints. It
// uses the chi middleware stack (request ID, real IP, logging, panic recovery)
// and permissive CORS.
//
// NOTE(review): AllowedOrigins "*" lets any web page drive /chat and /task
// (and thus the agent's tools) from a browser whenever the port is reachable;
// consider restricting it to the dashboard origin.
func newWorkerRouter(worker *AgentWorker) http.Handler {
	r := chi.NewRouter()
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(middleware.Logger)
	r.Use(middleware.Recoverer)
	r.Use(cors.Handler(cors.Options{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{"GET", "POST", "OPTIONS"},
		AllowedHeaders: []string{"Content-Type", "Authorization", "X-Agent-ID"},
	}))

	r.Get("/health", worker.handleHealth)
	r.Get("/info", worker.handleInfo)
	r.Post("/chat", worker.handleChat)
	r.Post("/task", worker.handleTask)
	r.Get("/tasks", worker.handleListTasks)
	r.Get("/tasks/{id}", worker.handleGetTask)
	r.Get("/memory", worker.handleMemory)
	return r
}
// getEnvFirst returns the value of the first environment variable in keys
// that is set to a non-empty string, or "" when none is.
func getEnvFirst(keys ...string) string {
	for i := 0; i < len(keys); i++ {
		if val, found := os.LookupEnv(keys[i]); found && val != "" {
			return val
		}
	}
	return ""
}