// GoClaw Agent Worker — автономный HTTP-сервер агента. // // Каждый агент запускается как отдельный Docker Swarm service. // Загружает свой конфиг из общей DB по AGENT_ID, выполняет LLM loop // и принимает параллельные задачи от Orchestrator и других агентов. // // Endpoints: // // GET /health — liveness probe // GET /info — конфиг агента (имя, модель, роль) // POST /chat — синхронный чат (LLM loop, ждёт ответ) // POST /task — поставить задачу в очередь (async, возвращает task_id) // GET /tasks — список задач агента (active + recent) // GET /tasks/{id} — статус конкретной задачи // GET /memory — последние N сообщений из истории агента package main import ( "bytes" "context" "encoding/json" "fmt" "log" "net/http" "os" "os/signal" "strconv" "sync" "syscall" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" "github.com/google/uuid" "github.com/joho/godotenv" "git.softuniq.eu/UniqAI/GoClaw/gateway/internal/db" "git.softuniq.eu/UniqAI/GoClaw/gateway/internal/llm" "git.softuniq.eu/UniqAI/GoClaw/gateway/internal/tools" ) // ─── Task types ────────────────────────────────────────────────────────────── type TaskStatus string const ( TaskPending TaskStatus = "pending" TaskRunning TaskStatus = "running" TaskDone TaskStatus = "done" TaskFailed TaskStatus = "failed" TaskCancelled TaskStatus = "cancelled" ) // Task — единица работы агента, принятая через /task. type Task struct { ID string `json:"id"` FromAgentID int `json:"from_agent_id,omitempty"` // кто делегировал (0 = человек) Input string `json:"input"` // текст задачи CallbackURL string `json:"callback_url,omitempty"` // куда POST результат Priority int `json:"priority"` // 0=normal, 1=high TimeoutSecs int `json:"timeout_secs"` Status TaskStatus `json:"status"` Result string `json:"result,omitempty"` Error string `json:"error,omitempty"` ToolCalls []ToolCallStep `json:"tool_calls,omitempty"` CreatedAt time.Time `json:"created_at"` StartedAt *time.Time `json:"started_at,omitempty"` DoneAt *time.Time `json:"done_at,omitempty"` } // ToolCallStep — шаг вызова инструмента для отображения в UI. type ToolCallStep struct { Tool string `json:"tool"` Args any `json:"args"` Result any `json:"result,omitempty"` Error string `json:"error,omitempty"` Success bool `json:"success"` DurationMs int64 `json:"duration_ms"` } // ChatMessage — сообщение в формате для /chat endpoint. type ChatMessage struct { Role string `json:"role"` Content string `json:"content"` } // ChatRequest — запрос на /chat (синхронный). type ChatRequest struct { Messages []ChatMessage `json:"messages"` Model string `json:"model,omitempty"` // override модели агента MaxIter int `json:"max_iter,omitempty"` // override max iterations } // ChatResponse — ответ /chat. type ChatResponse struct { Success bool `json:"success"` Response string `json:"response"` ToolCalls []ToolCallStep `json:"tool_calls"` Model string `json:"model"` Error string `json:"error,omitempty"` } // TaskRequest — запрос на /task (async). type TaskRequest struct { Input string `json:"input"` FromAgentID int `json:"from_agent_id,omitempty"` CallbackURL string `json:"callback_url,omitempty"` Priority int `json:"priority,omitempty"` TimeoutSecs int `json:"timeout_secs,omitempty"` } // ─── Agent Worker ───────────────────────────────────────────────────────────── type AgentWorker struct { agentID int cfg *db.AgentConfig llm *llm.Client database *db.DB executor *tools.Executor // Task queue — buffered channel taskQueue chan *Task // Task store — in-memory (id → Task) tasksMu sync.RWMutex tasks map[string]*Task // Recent tasks ring buffer (для GET /tasks) recentMu sync.Mutex recentKeys []string } const ( taskQueueDepth = 100 maxRecentTasks = 50 defaultMaxIter = 8 defaultTimeout = 120 workerGoroutines = 4 // параллельных воркеров на агента ) func newAgentWorker(agentID int, database *db.DB, llmClient *llm.Client) (*AgentWorker, error) { cfg, err := database.GetAgentByID(agentID) if err != nil { return nil, fmt.Errorf("agent %d not found in DB: %w", agentID, err) } log.Printf("[AgentWorker] Loaded config: id=%d name=%q model=%s", cfg.ID, cfg.Name, cfg.Model) w := &AgentWorker{ agentID: agentID, cfg: cfg, llm: llmClient, database: database, taskQueue: make(chan *Task, taskQueueDepth), tasks: make(map[string]*Task), } // Tool executor: агент использует подмножество инструментов из allowedTools w.executor = tools.NewExecutor("/app", func() ([]map[string]any, error) { rows, err := database.ListAgents() if err != nil { return nil, err } result := make([]map[string]any, len(rows)) for i, r := range rows { result[i] = map[string]any{ "id": r.ID, "name": r.Name, "role": r.Role, "model": r.Model, "isActive": r.IsActive, } } return result, nil }) return w, nil } // StartWorkers запускает N горутин-воркеров, читающих из taskQueue. func (w *AgentWorker) StartWorkers(ctx context.Context) { for i := 0; i < workerGoroutines; i++ { go w.runWorker(ctx, i) } log.Printf("[AgentWorker] %d worker goroutines started", workerGoroutines) } func (w *AgentWorker) runWorker(ctx context.Context, workerID int) { for { select { case <-ctx.Done(): log.Printf("[Worker-%d] shutting down", workerID) return case task := <-w.taskQueue: log.Printf("[Worker-%d] processing task %s", workerID, task.ID) w.processTask(ctx, task) } } } // EnqueueTask добавляет задачу в очередь и в хранилище. func (w *AgentWorker) EnqueueTask(req TaskRequest) *Task { timeout := req.TimeoutSecs if timeout <= 0 { timeout = defaultTimeout } task := &Task{ ID: uuid.New().String(), FromAgentID: req.FromAgentID, Input: req.Input, CallbackURL: req.CallbackURL, Priority: req.Priority, TimeoutSecs: timeout, Status: TaskPending, CreatedAt: time.Now(), } // Сохранить в store w.tasksMu.Lock() w.tasks[task.ID] = task w.tasksMu.Unlock() // Добавить в recent ring w.recentMu.Lock() w.recentKeys = append(w.recentKeys, task.ID) if len(w.recentKeys) > maxRecentTasks { w.recentKeys = w.recentKeys[len(w.recentKeys)-maxRecentTasks:] } w.recentMu.Unlock() // Отправить в очередь (non-blocking — если очередь полна, вернуть ошибку через Status) select { case w.taskQueue <- task: default: w.tasksMu.Lock() task.Status = TaskFailed task.Error = "task queue is full — agent is overloaded" w.tasksMu.Unlock() log.Printf("[AgentWorker] WARN: task queue full, task %s rejected", task.ID) } return task } // processTask выполняет задачу через LLM loop и обновляет её статус. func (w *AgentWorker) processTask(ctx context.Context, task *Task) { now := time.Now() w.tasksMu.Lock() task.Status = TaskRunning task.StartedAt = &now w.tasksMu.Unlock() // Выполняем чат chatCtx, cancel := context.WithTimeout(ctx, time.Duration(task.TimeoutSecs)*time.Second) defer cancel() messages := []ChatMessage{{Role: "user", Content: task.Input}} resp := w.runChat(chatCtx, messages, "", defaultMaxIter) doneAt := time.Now() w.tasksMu.Lock() task.DoneAt = &doneAt task.ToolCalls = resp.ToolCalls if resp.Success { task.Status = TaskDone task.Result = resp.Response } else { task.Status = TaskFailed task.Error = resp.Error } w.tasksMu.Unlock() log.Printf("[AgentWorker] task %s done: status=%s", task.ID, task.Status) // Отправить результат на callback URL если задан if task.CallbackURL != "" { go w.postCallback(task) } // Сохранить в DB history if w.database != nil { go func() { userMsg := task.Input agentResp := task.Result if task.Status == TaskFailed { agentResp = "[ERROR] " + task.Error } w.database.SaveHistory(db.HistoryInput{ AgentID: w.agentID, UserMessage: userMsg, AgentResponse: agentResp, }) }() } } // runChat — основной LLM loop агента. func (w *AgentWorker) runChat(ctx context.Context, messages []ChatMessage, overrideModel string, maxIter int) ChatResponse { model := w.cfg.Model if overrideModel != "" { model = overrideModel } if maxIter <= 0 { maxIter = defaultMaxIter } // Собрать контекст: системный промпт + история + новые сообщения conv := []llm.Message{} if w.cfg.SystemPrompt != "" { conv = append(conv, llm.Message{Role: "system", Content: w.cfg.SystemPrompt}) } // Загрузить sliding window памяти из DB if w.database != nil { history, err := w.database.GetAgentHistory(w.agentID, 20) if err == nil { for _, h := range history { conv = append(conv, llm.Message{Role: "user", Content: h.UserMessage}) if h.AgentResponse != "" { conv = append(conv, llm.Message{Role: "assistant", Content: h.AgentResponse}) } } } } // Добавить текущие сообщения for _, m := range messages { conv = append(conv, llm.Message{Role: m.Role, Content: m.Content}) } // Получить доступные инструменты агента agentTools := w.getAgentTools() temp := w.cfg.Temperature maxTok := w.cfg.MaxTokens if maxTok == 0 { maxTok = 4096 } var toolCallSteps []ToolCallStep var finalResponse string var lastModel string for iter := 0; iter < maxIter; iter++ { req := llm.ChatRequest{ Model: model, Messages: conv, Temperature: &temp, MaxTokens: &maxTok, } if len(agentTools) > 0 { req.Tools = agentTools req.ToolChoice = "auto" } resp, err := w.llm.Chat(ctx, req) if err != nil { // Fallback без инструментов req.Tools = nil req.ToolChoice = "" resp2, err2 := w.llm.Chat(ctx, req) if err2 != nil { return ChatResponse{ Success: false, Error: fmt.Sprintf("LLM error (model: %s): %v", model, err2), } } if len(resp2.Choices) > 0 { finalResponse = resp2.Choices[0].Message.Content lastModel = resp2.Model } break } if len(resp.Choices) == 0 { break } choice := resp.Choices[0] lastModel = resp.Model if lastModel == "" { lastModel = model } // Инструменты? if choice.FinishReason == "tool_calls" && len(choice.Message.ToolCalls) > 0 { conv = append(conv, choice.Message) for _, tc := range choice.Message.ToolCalls { start := time.Now() result := w.executor.Execute(ctx, tc.Function.Name, tc.Function.Arguments) step := ToolCallStep{ Tool: tc.Function.Name, Success: result.Success, DurationMs: time.Since(start).Milliseconds(), } var argsMap any _ = json.Unmarshal([]byte(tc.Function.Arguments), &argsMap) step.Args = argsMap var toolContent string if result.Success { step.Result = result.Result b, _ := json.Marshal(result.Result) toolContent = string(b) } else { step.Error = result.Error toolContent = fmt.Sprintf(`{"error": %q}`, result.Error) } toolCallSteps = append(toolCallSteps, step) conv = append(conv, llm.Message{ Role: "tool", Content: toolContent, ToolCallID: tc.ID, Name: tc.Function.Name, }) } continue } finalResponse = choice.Message.Content break } return ChatResponse{ Success: true, Response: finalResponse, ToolCalls: toolCallSteps, Model: lastModel, } } // getAgentTools возвращает только те инструменты, которые разрешены агенту. func (w *AgentWorker) getAgentTools() []llm.Tool { allTools := tools.OrchestratorTools() allowed := make(map[string]bool, len(w.cfg.AllowedTools)) for _, t := range w.cfg.AllowedTools { allowed[t] = true } // Если allowedTools пуст — агент получает базовый набор (http_request, file_read) if len(allowed) == 0 { allowed = map[string]bool{ "http_request": true, "file_read": true, "file_list": true, } } var result []llm.Tool for _, td := range allTools { if allowed[td.Function.Name] { result = append(result, llm.Tool{ Type: td.Type, Function: llm.ToolFunction{ Name: td.Function.Name, Description: td.Function.Description, Parameters: td.Function.Parameters, }, }) } } return result } // postCallback отправляет результат задачи на callback URL. func (w *AgentWorker) postCallback(task *Task) { w.tasksMu.RLock() payload, _ := json.Marshal(task) w.tasksMu.RUnlock() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, task.CallbackURL, bytes.NewReader(payload)) if err != nil { log.Printf("[AgentWorker] callback URL invalid for task %s: %v", task.ID, err) return } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { log.Printf("[AgentWorker] callback failed for task %s: %v", task.ID, err) return } resp.Body.Close() log.Printf("[AgentWorker] callback sent for task %s → %s (status %d)", task.ID, task.CallbackURL, resp.StatusCode) } // ─── HTTP Handlers ──────────────────────────────────────────────────────────── func (w *AgentWorker) handleHealth(rw http.ResponseWriter, r *http.Request) { json.NewEncoder(rw).Encode(map[string]any{ "status": "ok", "agentId": w.agentID, "name": w.cfg.Name, "model": w.cfg.Model, "queueLen": len(w.taskQueue), }) } func (w *AgentWorker) handleInfo(rw http.ResponseWriter, r *http.Request) { json.NewEncoder(rw).Encode(map[string]any{ "id": w.cfg.ID, "name": w.cfg.Name, "role": w.cfg.Model, "model": w.cfg.Model, "allowedTools": w.cfg.AllowedTools, "isSystem": w.cfg.IsSystem, }) } func (w *AgentWorker) handleChat(rw http.ResponseWriter, r *http.Request) { var req ChatRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(rw, `{"error":"invalid request body"}`, http.StatusBadRequest) return } if len(req.Messages) == 0 { http.Error(rw, `{"error":"messages required"}`, http.StatusBadRequest) return } timeout := w.cfg.MaxTokens / 10 // грубая оценка if timeout < 30 { timeout = 30 } if timeout > 300 { timeout = 300 } ctx, cancel := context.WithTimeout(r.Context(), time.Duration(timeout)*time.Second) defer cancel() resp := w.runChat(ctx, req.Messages, req.Model, req.MaxIter) rw.Header().Set("Content-Type", "application/json") json.NewEncoder(rw).Encode(resp) } func (w *AgentWorker) handleTask(rw http.ResponseWriter, r *http.Request) { var req TaskRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(rw, `{"error":"invalid request body"}`, http.StatusBadRequest) return } if req.Input == "" { http.Error(rw, `{"error":"input required"}`, http.StatusBadRequest) return } task := w.EnqueueTask(req) rw.Header().Set("Content-Type", "application/json") rw.WriteHeader(http.StatusAccepted) json.NewEncoder(rw).Encode(map[string]any{ "task_id": task.ID, "status": task.Status, "agent_id": w.agentID, "queue_len": len(w.taskQueue), }) } func (w *AgentWorker) handleListTasks(rw http.ResponseWriter, r *http.Request) { w.recentMu.Lock() keys := make([]string, len(w.recentKeys)) copy(keys, w.recentKeys) w.recentMu.Unlock() w.tasksMu.RLock() result := make([]*Task, 0, len(keys)) for i := len(keys) - 1; i >= 0; i-- { if t, ok := w.tasks[keys[i]]; ok { result = append(result, t) } } w.tasksMu.RUnlock() rw.Header().Set("Content-Type", "application/json") json.NewEncoder(rw).Encode(map[string]any{ "tasks": result, "total": len(result), "queueLen": len(w.taskQueue), }) } func (w *AgentWorker) handleGetTask(rw http.ResponseWriter, r *http.Request) { taskID := chi.URLParam(r, "id") w.tasksMu.RLock() task, ok := w.tasks[taskID] w.tasksMu.RUnlock() if !ok { http.Error(rw, `{"error":"task not found"}`, http.StatusNotFound) return } rw.Header().Set("Content-Type", "application/json") json.NewEncoder(rw).Encode(task) } func (w *AgentWorker) handleMemory(rw http.ResponseWriter, r *http.Request) { limitStr := r.URL.Query().Get("limit") limit := 20 if n, err := strconv.Atoi(limitStr); err == nil && n > 0 && n <= 100 { limit = n } if w.database == nil { rw.Header().Set("Content-Type", "application/json") json.NewEncoder(rw).Encode(map[string]any{"messages": []any{}, "total": 0}) return } history, err := w.database.GetAgentHistory(w.agentID, limit) if err != nil { http.Error(rw, `{"error":"failed to load history"}`, http.StatusInternalServerError) return } rw.Header().Set("Content-Type", "application/json") json.NewEncoder(rw).Encode(map[string]any{ "agent_id": w.agentID, "messages": history, "total": len(history), }) } // ─── Main ───────────────────────────────────────────────────────────────────── func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) _ = godotenv.Load("../.env") _ = godotenv.Load(".env") // ── Конфиг из env ──────────────────────────────────────────────────────── agentIDStr := os.Getenv("AGENT_ID") if agentIDStr == "" { log.Fatal("[AgentWorker] AGENT_ID env var is required") } agentID, err := strconv.Atoi(agentIDStr) if err != nil || agentID <= 0 { log.Fatalf("[AgentWorker] AGENT_ID must be a positive integer, got: %q", agentIDStr) } port := os.Getenv("AGENT_PORT") if port == "" { port = "8001" } llmBaseURL := getEnvFirst("LLM_BASE_URL", "OLLAMA_BASE_URL") if llmBaseURL == "" { llmBaseURL = "https://ollama.com/v1" } llmAPIKey := getEnvFirst("LLM_API_KEY", "OLLAMA_API_KEY") dbURL := os.Getenv("DATABASE_URL") if dbURL == "" { log.Fatal("[AgentWorker] DATABASE_URL env var is required") } log.Printf("[AgentWorker] Starting: AGENT_ID=%d PORT=%s LLM=%s", agentID, port, llmBaseURL) // ── DB ─────────────────────────────────────────────────────────────────── database, err := db.Connect(dbURL) if err != nil { log.Fatalf("[AgentWorker] DB connection failed: %v", err) } defer database.Close() // ── LLM Client ─────────────────────────────────────────────────────────── llmClient := llm.NewClient(llmBaseURL, llmAPIKey) // ── Agent Worker ───────────────────────────────────────────────────────── worker, err := newAgentWorker(agentID, database, llmClient) if err != nil { log.Fatalf("[AgentWorker] init failed: %v", err) } // ── Background workers ─────────────────────────────────────────────────── ctx, cancel := context.WithCancel(context.Background()) defer cancel() worker.StartWorkers(ctx) // ── Router ─────────────────────────────────────────────────────────────── r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.RealIP) r.Use(middleware.Logger) r.Use(middleware.Recoverer) r.Use(cors.Handler(cors.Options{ AllowedOrigins: []string{"*"}, AllowedMethods: []string{"GET", "POST", "OPTIONS"}, AllowedHeaders: []string{"Content-Type", "Authorization", "X-Agent-ID"}, })) r.Get("/health", worker.handleHealth) r.Get("/info", worker.handleInfo) r.Post("/chat", worker.handleChat) r.Post("/task", worker.handleTask) r.Get("/tasks", worker.handleListTasks) r.Get("/tasks/{id}", worker.handleGetTask) r.Get("/memory", worker.handleMemory) // ── HTTP Server ─────────────────────────────────────────────────────────── srv := &http.Server{ Addr: ":" + port, Handler: r, ReadTimeout: 30 * time.Second, WriteTimeout: 310 * time.Second, // > max task timeout IdleTimeout: 120 * time.Second, } quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) go func() { log.Printf("[AgentWorker] agent-id=%d listening on :%s", agentID, port) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("[AgentWorker] server error: %v", err) } }() <-quit log.Println("[AgentWorker] shutting down gracefully...") cancel() // stop task workers shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) defer shutdownCancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Printf("[AgentWorker] shutdown error: %v", err) } log.Println("[AgentWorker] stopped.") } func getEnvFirst(keys ...string) string { for _, k := range keys { if v := os.Getenv(k); v != "" { return v } } return "" }