diff --git a/gateway/internal/api/handlers.go b/gateway/internal/api/handlers.go index b897b00..e9e9078 100644 --- a/gateway/internal/api/handlers.go +++ b/gateway/internal/api/handlers.go @@ -172,6 +172,25 @@ func (h *Handler) OrchestratorStream(w http.ResponseWriter, r *http.Request) { log.Printf("[API] POST /api/orchestrator/stream — messages=%d model=%q", len(req.Messages), req.Model) + // Extract the last user message for history/metrics storage + userMessage := "" + for i := len(req.Messages) - 1; i >= 0; i-- { + if req.Messages[i].Role == "user" { + userMessage = req.Messages[i].Content + break + } + } + + // Determine orchestrator agent ID (look for isOrchestrator=1 in DB) + orchAgentID := 1 // fallback to agent ID 1 + if h.db != nil { + if cfg, err := h.db.GetOrchestratorConfig(); err == nil && cfg != nil { + orchAgentID = cfg.ID + } + } + + startTime := time.Now() + ctx, cancel := context.WithTimeout(r.Context(), time.Duration(h.cfg.RequestTimeoutSecs)*time.Second) defer cancel() @@ -216,6 +235,27 @@ func (h *Handler) OrchestratorStream(w http.ResponseWriter, r *http.Request) { writeSSE(w, flusher, streamEvent{Type: sseEventError, Error: result.Error}) fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() + // Persist error metric + history (fire-and-forget goroutine) + if h.db != nil { + go func() { + reqID := fmt.Sprintf("orch-%d", time.Now().UnixNano()) + h.db.SaveMetric(db.MetricInput{ + AgentID: orchAgentID, + RequestID: reqID, + UserMessage: userMessage, + ProcessingTimeMs: time.Since(startTime).Milliseconds(), + Status: "error", + ErrorMessage: result.Error, + Model: result.Model, + }) + h.db.SaveHistory(db.HistoryInput{ + AgentID: orchAgentID, + UserMessage: userMessage, + AgentResponse: "", + Status: "error", + }) + }() + } return } @@ -248,6 +288,42 @@ func (h *Handler) OrchestratorStream(w http.ResponseWriter, r *http.Request) { }) fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() + + // Persist metrics + history asynchronously (never blocks the response) + if h.db != nil { + go func() { + reqID := fmt.Sprintf("orch-%d", time.Now().UnixNano()) + var inputTok, outputTok, totalTok int + if result.Usage != nil { + inputTok = result.Usage.PromptTokens + outputTok = result.Usage.CompletionTokens + totalTok = result.Usage.TotalTokens + } + toolNames := make([]string, len(result.ToolCalls)) + for i, tc := range result.ToolCalls { + toolNames[i] = tc.Tool + } + h.db.SaveMetric(db.MetricInput{ + AgentID: orchAgentID, + RequestID: reqID, + UserMessage: userMessage, + AgentResponse: result.Response, + InputTokens: inputTok, + OutputTokens: outputTok, + TotalTokens: totalTok, + ProcessingTimeMs: time.Since(startTime).Milliseconds(), + Status: "success", + ToolsCalled: toolNames, + Model: result.Model, + }) + h.db.SaveHistory(db.HistoryInput{ + AgentID: orchAgentID, + UserMessage: userMessage, + AgentResponse: result.Response, + Status: "success", + }) + }() + } } // ─── Providers Reload ───────────────────────────────────────────────────────── diff --git a/gateway/internal/db/db.go b/gateway/internal/db/db.go index 9d7eb3b..f4365db 100644 --- a/gateway/internal/db/db.go +++ b/gateway/internal/db/db.go @@ -149,6 +149,98 @@ func (d *DB) GetActiveProvider() (*ProviderRow, error) { return &p, nil } +// ─── Metrics & History ──────────────────────────────────────────────────────── + +// MetricInput holds data for a single orchestrator request metric. +type MetricInput struct { + AgentID int + RequestID string + UserMessage string + AgentResponse string + InputTokens int + OutputTokens int + TotalTokens int + ProcessingTimeMs int64 + Status string // "success" | "error" | "timeout" + ErrorMessage string + ToolsCalled []string + Model string +} + +// SaveMetric inserts a row into the agentMetrics table. +// Non-fatal — logs on error but does not return one. +func (d *DB) SaveMetric(m MetricInput) { + if d.conn == nil { + return + } + toolsJSON, _ := json.Marshal(m.ToolsCalled) + _, err := d.conn.Exec(` + INSERT INTO agentMetrics + (agentId, requestId, userMessage, agentResponse, + inputTokens, outputTokens, totalTokens, + processingTimeMs, status, errorMessage, toolsCalled, model) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, + m.AgentID, + m.RequestID, + truncate(m.UserMessage, 65535), + truncate(m.AgentResponse, 65535), + m.InputTokens, m.OutputTokens, m.TotalTokens, + m.ProcessingTimeMs, + m.Status, + m.ErrorMessage, + string(toolsJSON), + m.Model, + ) + if err != nil { + log.Printf("[DB] SaveMetric error: %v", err) + } +} + +// HistoryInput holds data for one conversation entry. +type HistoryInput struct { + AgentID int + UserMessage string + AgentResponse string + ConversationID string + Status string // "success" | "error" | "pending" +} + +// SaveHistory inserts a row into the agentHistory table. +// Non-fatal — logs on error but does not return one. +func (d *DB) SaveHistory(h HistoryInput) { + if d.conn == nil { + return + } + status := h.Status + if status == "" { + status = "success" + } + convID := sql.NullString{String: h.ConversationID, Valid: h.ConversationID != ""} + resp := sql.NullString{String: h.AgentResponse, Valid: h.AgentResponse != ""} + _, err := d.conn.Exec(` + INSERT INTO agentHistory (agentId, userMessage, agentResponse, conversationId, status) + VALUES (?, ?, ?, ?, ?) + `, + h.AgentID, + truncate(h.UserMessage, 65535), + resp, + convID, + status, + ) + if err != nil { + log.Printf("[DB] SaveHistory error: %v", err) + } +} + +// truncate caps a string to maxLen bytes (not runes — fast path for DB limits). +func truncate(s string, maxLen int) string { + if len(s) <= maxLen { + return s + } + return s[:maxLen] +} + // ─── Helpers ────────────────────────────────────────────────────────────────── func scanAgentConfig(row *sql.Row) (*AgentConfig, error) { diff --git a/server/routers.ts b/server/routers.ts index b333101..83df47d 100644 --- a/server/routers.ts +++ b/server/routers.ts @@ -387,6 +387,20 @@ export const appRouter = router({ status: "success", }); + // Save metric + const { saveMetric } = await import("./agents"); + await saveMetric(input.agentId, { + userMessage: input.message, + agentResponse: response, + inputTokens: result.usage?.prompt_tokens ?? 0, + outputTokens: result.usage?.completion_tokens ?? 0, + totalTokens: result.usage?.total_tokens ?? 0, + processingTimeMs, + status: "success", + toolsCalled: [], + model: result.model ?? agent.model, + }).catch(() => {}); // non-fatal + return { success: true as const, response, @@ -395,12 +409,22 @@ export const appRouter = router({ processingTimeMs, }; } catch (err: any) { + const processingTimeMs = Date.now() - startTime; await saveHistory(input.agentId, { userMessage: input.message, agentResponse: null, conversationId: input.conversationId, status: "error", }); + const { saveMetric } = await import("./agents"); + saveMetric(input.agentId, { + userMessage: input.message, + processingTimeMs, + status: "error", + errorMessage: err.message, + toolsCalled: [], + model: agent.model, + }).catch(() => {}); // non-fatal return { success: false as const, response: "",