feat(phase20): persistent background chat sessions — DB-backed polling architecture
ARCHITECTURE:
- Replace SSE stream (breaks on page reload) with DB-backed background sessions
- Go Gateway runs orchestrator in detached goroutine using context.Background()
(survives HTTP disconnect, page reload, and laptop sleep/shutdown)
- Every SSE event (thinking/tool_call/delta/done/error) is persisted to chatEvents table
- Session lifecycle stored in chatSessions table (running→done/error)
- Frontend polls GET /api/orchestrator/getEvents every 1.5 s until status=done
DB CHANGES:
- Migration 0005_chat_sessions.sql: chatSessions + chatEvents tables
- schema.ts: TypeScript types for chatSessions and chatEvents
- db.go: ChatSessionRow and ChatEventRow structs with proper json tags (camelCase)
- db.go: CreateSession, AppendEvent, MarkSessionDone, GetSession, GetEvents, GetRecentSessions
GO GATEWAY:
- handlers.go: StartChatSession — creates DB session, launches goroutine, returns {sessionId} immediately
- handlers.go: GetChatSession, GetChatEvents, ListChatSessions handlers
- main.go: routes POST /api/chat/session, GET /api/chat/session/{id}, GET /api/chat/session/{id}/events, GET /api/chat/sessions
- JSON tags added to ChatSessionRow/ChatEventRow so Go returns camelCase to frontend
NODE.JS SERVER:
- gateway-proxy.ts: startChatSession, getChatSession, getChatEvents, listChatSessions functions
- routers.ts: orchestrator.startSession, .getSession, .getEvents, .listSessions tRPC procedures
FRONTEND:
- chatStore.ts: completely rewritten — uses background sessions + localStorage-based polling resume
* send() calls orchestrator.startSession via tRPC (returns immediately)
* Stores sessionId in localStorage (goclaw-pending-sessions)
* Polls getEvents every 1.5 s, applies events to UI incrementally
* On page reload: _resumePendingSessions() checks pending sessions and resumes polling
* cancel() stops all active polls
- chatStore.ts: conversations persisted to localStorage (v3 key, survives page reload)
- Chat.tsx: updated status texts to 'Фоновая обработка…', 'Обработка в фоне…'
VERIFIED:
- POST /api/chat/session → {sessionId, status:'running'} in <100ms
- Poll events → thinking, delta('Привет!'), done after ~2s
- chatSessions table has rows with status=done, model, totalTokens
- Cyrillic stored correctly in UTF-8
- JSON fields are camelCase: id, sessionId, seq, eventType, content, toolName...
This commit is contained in:
@@ -1,16 +1,18 @@
|
||||
/**
|
||||
* chatStore — глобальный singleton для фонового чата.
|
||||
*
|
||||
* Проблема: React-компонент размонтируется при навигации, разрывая SSE-соединение.
|
||||
* Решение: держим состояние и fetch-соединение вне React-дерева в модуле.
|
||||
* Компонент подписывается через addEventListener и читает snapshotState().
|
||||
* Архитектура:
|
||||
* 1. Пользователь отправляет сообщение → POST /api/trpc/orchestrator.startSession
|
||||
* Go Gateway создаёт запись в chatSessions и запускает горутину фоново.
|
||||
* Ответ: { sessionId } — мгновенно, без ожидания LLM.
|
||||
* 2. Фронтенд опрашивает /api/trpc/orchestrator.getEvents каждые 1.5 сек,
|
||||
* применяя новые события к UI. Polling стартует заново при перезагрузке
|
||||
* страницы, т.к. sessionId хранится в localStorage.
|
||||
* 3. Когда status === "done" | "error" — опрос прекращается.
|
||||
*
|
||||
* Использование:
|
||||
* import { chatStore } from "@/lib/chatStore";
|
||||
* chatStore.send(messages) — запустить запрос
|
||||
* chatStore.getConversations() — получить список диалогов
|
||||
* chatStore.on("update", handler) — подписаться на обновления
|
||||
* chatStore.off("update", handler) — отписаться
|
||||
* Фоновые сессии (хранятся в localStorage):
|
||||
* goclaw-pending-sessions — Map<sessionId, convId> для возобновления опроса.
|
||||
* goclaw-conversations-v3 — список диалогов (сохраняется между загрузками).
|
||||
*/
|
||||
|
||||
import { nanoid } from "nanoid";
|
||||
@@ -37,6 +39,8 @@ export interface ChatMessage {
|
||||
usage?: { prompt_tokens: number; completion_tokens: number; total_tokens: number };
|
||||
isError?: boolean;
|
||||
isStreaming?: boolean;
|
||||
/** sessionId for background sessions — used to resume polling */
|
||||
sessionId?: string;
|
||||
}
|
||||
|
||||
export interface Conversation {
|
||||
@@ -66,12 +70,12 @@ type ConsoleHandler = (entry: ConsoleEntry) => void;
|
||||
|
||||
// ─── Persistence ──────────────────────────────────────────────────────────────
|
||||
|
||||
const STORAGE_KEY = "goclaw-conversations-v2";
|
||||
const CONSOLE_KEY = "goclaw-console-v2";
|
||||
const STORAGE_KEY = "goclaw-conversations-v3";
|
||||
const PENDING_KEY = "goclaw-pending-sessions"; // sessionId → convId
|
||||
|
||||
function loadConversations(): Conversation[] {
|
||||
try {
|
||||
const raw = sessionStorage.getItem(STORAGE_KEY);
|
||||
const raw = localStorage.getItem(STORAGE_KEY);
|
||||
if (!raw) return [];
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
@@ -81,27 +85,27 @@ function loadConversations(): Conversation[] {
|
||||
|
||||
function persistConversations(convs: Conversation[]) {
|
||||
try {
|
||||
sessionStorage.setItem(STORAGE_KEY, JSON.stringify(convs.slice(0, 50)));
|
||||
localStorage.setItem(STORAGE_KEY, JSON.stringify(convs.slice(0, 50)));
|
||||
} catch {}
|
||||
}
|
||||
|
||||
function loadConsole(): ConsoleEntry[] {
|
||||
function loadPending(): Map<string, string> {
|
||||
try {
|
||||
const raw = sessionStorage.getItem(CONSOLE_KEY);
|
||||
if (!raw) return [];
|
||||
return JSON.parse(raw);
|
||||
const raw = localStorage.getItem(PENDING_KEY);
|
||||
if (!raw) return new Map();
|
||||
return new Map(JSON.parse(raw));
|
||||
} catch {
|
||||
return [];
|
||||
return new Map();
|
||||
}
|
||||
}
|
||||
|
||||
function persistConsole(entries: ConsoleEntry[]) {
|
||||
function savePending(m: Map<string, string>) {
|
||||
try {
|
||||
sessionStorage.setItem(CONSOLE_KEY, JSON.stringify(entries.slice(-200)));
|
||||
localStorage.setItem(PENDING_KEY, JSON.stringify([...m.entries()]));
|
||||
} catch {}
|
||||
}
|
||||
|
||||
// ─── Store ────────────────────────────────────────────────────────────────────
|
||||
// ─── Helpers ──────────────────────────────────────────────────────────────────
|
||||
|
||||
function getTs(): string {
|
||||
return new Date().toLocaleTimeString("ru-RU", {
|
||||
@@ -111,12 +115,50 @@ function getTs(): string {
|
||||
});
|
||||
}
|
||||
|
||||
/** Call the tRPC endpoint via raw fetch (avoids React-Query dependency). */
|
||||
async function trpcQuery<T>(
|
||||
path: string,
|
||||
input: unknown,
|
||||
method: "query" | "mutation"
|
||||
): Promise<T> {
|
||||
if (method === "mutation") {
|
||||
const res = await fetch(`/api/trpc/${path}`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ json: input }),
|
||||
signal: AbortSignal.timeout(15_000),
|
||||
});
|
||||
const data = await res.json();
|
||||
if (data?.error) throw new Error(data.error.message ?? "tRPC error");
|
||||
return data?.result?.data?.json ?? data?.result?.data;
|
||||
} else {
|
||||
const encoded = encodeURIComponent(JSON.stringify({ json: input }));
|
||||
const res = await fetch(`/api/trpc/${path}?input=${encoded}`, {
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
const data = await res.json();
|
||||
if (data?.error) throw new Error(data.error.message ?? "tRPC error");
|
||||
return data?.result?.data?.json ?? data?.result?.data;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Store ────────────────────────────────────────────────────────────────────
|
||||
|
||||
class ChatStore {
|
||||
private conversations: Conversation[] = loadConversations();
|
||||
private activeId: string = "";
|
||||
private isThinking = false;
|
||||
private consoleEntries: ConsoleEntry[] = loadConsole();
|
||||
private consoleEntries: ConsoleEntry[] = [];
|
||||
|
||||
/** sessionId → convId for active polls */
|
||||
private activePolls = new Map<string, string>();
|
||||
/** sessionId → lastSeq */
|
||||
private pollSeq = new Map<string, number>();
|
||||
/** sessionId → timeout handle */
|
||||
private pollTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
/** Legacy SSE abort controller (still used as fallback) */
|
||||
private abortController: AbortController | null = null;
|
||||
private isThinking = false;
|
||||
|
||||
private updateListeners = new Set<UpdateHandler>();
|
||||
private consoleListeners = new Set<ConsoleHandler>();
|
||||
@@ -125,6 +167,8 @@ class ChatStore {
|
||||
if (this.conversations.length > 0) {
|
||||
this.activeId = this.conversations[0].id;
|
||||
}
|
||||
// Resume any pending sessions from previous page load
|
||||
this._resumePendingSessions();
|
||||
}
|
||||
|
||||
// ─── Subscriptions ──────────────────────────────────────────────────────────
|
||||
@@ -155,25 +199,15 @@ class ChatStore {
|
||||
|
||||
// ─── Selectors ──────────────────────────────────────────────────────────────
|
||||
|
||||
getConversations(): Conversation[] {
|
||||
return this.conversations;
|
||||
}
|
||||
|
||||
getActiveId(): string {
|
||||
return this.activeId;
|
||||
}
|
||||
|
||||
getConversations(): Conversation[] { return this.conversations; }
|
||||
getActiveId(): string { return this.activeId; }
|
||||
getActive(): Conversation | null {
|
||||
return this.conversations.find((c) => c.id === this.activeId) ?? null;
|
||||
}
|
||||
|
||||
getIsThinking(): boolean {
|
||||
return this.isThinking;
|
||||
}
|
||||
|
||||
getConsole(): ConsoleEntry[] {
|
||||
return this.consoleEntries;
|
||||
return this.isThinking || this.activePolls.size > 0;
|
||||
}
|
||||
getConsole(): ConsoleEntry[] { return this.consoleEntries; }
|
||||
|
||||
// ─── Mutations ──────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -187,7 +221,7 @@ class ChatStore {
|
||||
const welcome: ChatMessage = {
|
||||
id: "welcome",
|
||||
role: "system",
|
||||
content: `${orchName} ready. Type a command or ask anything.`,
|
||||
content: `${orchName} ready. Type a command or ask anything.\n\n*Background mode: requests continue even when you close the tab.*`,
|
||||
timestamp: getTs(),
|
||||
};
|
||||
const conv: Conversation = {
|
||||
@@ -201,7 +235,6 @@ class ChatStore {
|
||||
this.activeId = id;
|
||||
this.consoleEntries = [];
|
||||
persistConversations(this.conversations);
|
||||
persistConsole([]);
|
||||
this.emit("update");
|
||||
return id;
|
||||
}
|
||||
@@ -222,7 +255,6 @@ class ChatStore {
|
||||
|
||||
clearConsole() {
|
||||
this.consoleEntries = [];
|
||||
persistConsole([]);
|
||||
this.emit("update");
|
||||
}
|
||||
|
||||
@@ -234,19 +266,22 @@ class ChatStore {
|
||||
private addConsoleEntry(entry: Omit<ConsoleEntry, "id" | "timestamp">) {
|
||||
const full: ConsoleEntry = { ...entry, id: nanoid(6), timestamp: getTs() };
|
||||
this.consoleEntries = [...this.consoleEntries, full];
|
||||
persistConsole(this.consoleEntries);
|
||||
this.emit("console", full);
|
||||
this.emit("update");
|
||||
}
|
||||
|
||||
// ─── Send Message (SSE) ─────────────────────────────────────────────────────
|
||||
// ─── Background Session Send ─────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Send a message using the background session API.
|
||||
* The Go Gateway processes the request in a detached goroutine — survives
|
||||
* page reloads, laptop sleep, and browser tab closure.
|
||||
*/
|
||||
async send(userText: string, activeConvId?: string) {
|
||||
if (this.isThinking || !userText.trim()) return;
|
||||
if (!userText.trim()) return;
|
||||
if (this.isThinking) return;
|
||||
|
||||
let convId = activeConvId ?? this.activeId;
|
||||
|
||||
// If no conversation exists, create one
|
||||
if (!convId || !this.conversations.find((c) => c.id === convId)) {
|
||||
convId = this.createConversation();
|
||||
}
|
||||
@@ -254,7 +289,7 @@ class ChatStore {
|
||||
const conv = this.conversations.find((c) => c.id === convId);
|
||||
if (!conv) return;
|
||||
|
||||
// Add user message
|
||||
// Add user message immediately
|
||||
const userMsg: ChatMessage = {
|
||||
id: `user-${Date.now()}`,
|
||||
role: "user",
|
||||
@@ -276,8 +311,9 @@ class ChatStore {
|
||||
history: newHistory,
|
||||
}));
|
||||
|
||||
// Placeholder streaming message
|
||||
const assistantId = `resp-${Date.now()}`;
|
||||
// Create placeholder streaming message
|
||||
const sessionId = `cs-${nanoid(12)}`;
|
||||
const assistantId = `resp-${sessionId}`;
|
||||
const placeholder: ChatMessage = {
|
||||
id: assistantId,
|
||||
role: "assistant",
|
||||
@@ -285,6 +321,7 @@ class ChatStore {
|
||||
timestamp: getTs(),
|
||||
isStreaming: true,
|
||||
toolCalls: [],
|
||||
sessionId,
|
||||
};
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
@@ -293,187 +330,343 @@ class ChatStore {
|
||||
|
||||
this.isThinking = true;
|
||||
this.consoleEntries = [];
|
||||
persistConsole([]);
|
||||
this.emit("update");
|
||||
|
||||
// Abort previous
|
||||
this.abortController?.abort();
|
||||
const controller = new AbortController();
|
||||
this.abortController = controller;
|
||||
|
||||
const toolCallsAccumulated: ToolCallStep[] = [];
|
||||
|
||||
try {
|
||||
const res = await fetch("/api/orchestrator/stream", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ messages: newHistory, maxIter: 10 }),
|
||||
signal: controller.signal,
|
||||
});
|
||||
// Start background session on Go Gateway (returns immediately)
|
||||
await trpcQuery<{ sessionId: string; status: string }>(
|
||||
"orchestrator.startSession",
|
||||
{
|
||||
messages: newHistory,
|
||||
sessionId,
|
||||
maxIter: 10,
|
||||
},
|
||||
"mutation"
|
||||
);
|
||||
|
||||
if (!res.ok || !res.body) {
|
||||
throw new Error(`Server error: ${res.status}`);
|
||||
}
|
||||
// Persist to localStorage so we can resume on page reload
|
||||
const pending = loadPending();
|
||||
pending.set(sessionId, convId);
|
||||
savePending(pending);
|
||||
|
||||
this.addConsoleEntry({ type: "thinking" });
|
||||
|
||||
const reader = res.body.getReader();
|
||||
// Single UTF-8 decoder with stream:true — buffers incomplete multi-byte sequences
|
||||
const decoder = new TextDecoder("utf-8");
|
||||
let buffer = "";
|
||||
let streamedContent = "";
|
||||
let finalModel = "";
|
||||
let finalWarning = "";
|
||||
let finalUsage: any = undefined;
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
// Decode chunk — stream:true means decoder holds partial UTF-8 bytes across chunks
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split("\n");
|
||||
// Keep the last (potentially incomplete) line in buffer
|
||||
buffer = lines.pop() ?? "";
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.startsWith("data: ")) continue;
|
||||
const data = line.slice(6).trim();
|
||||
if (data === "[DONE]") continue;
|
||||
|
||||
try {
|
||||
const evt = JSON.parse(data) as {
|
||||
type: string;
|
||||
content?: string;
|
||||
tool?: string;
|
||||
args?: any;
|
||||
result?: any;
|
||||
error?: string;
|
||||
success?: boolean;
|
||||
durationMs?: number;
|
||||
model?: string;
|
||||
modelWarning?: string;
|
||||
usage?: any;
|
||||
};
|
||||
|
||||
switch (evt.type) {
|
||||
case "tool_call": {
|
||||
const step: ToolCallStep = {
|
||||
tool: evt.tool ?? "",
|
||||
args: evt.args ?? {},
|
||||
result: evt.result,
|
||||
error: evt.error,
|
||||
success: evt.success ?? false,
|
||||
durationMs: evt.durationMs ?? 0,
|
||||
};
|
||||
toolCallsAccumulated.push(step);
|
||||
this.addConsoleEntry({ type: "tool_call", ...step });
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantId
|
||||
? { ...m, toolCalls: [...toolCallsAccumulated] }
|
||||
: m
|
||||
),
|
||||
}));
|
||||
break;
|
||||
}
|
||||
case "delta": {
|
||||
streamedContent += evt.content ?? "";
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantId ? { ...m, content: streamedContent } : m
|
||||
),
|
||||
}));
|
||||
this.emit("update");
|
||||
break;
|
||||
}
|
||||
case "done": {
|
||||
finalModel = evt.model ?? "";
|
||||
finalWarning = evt.modelWarning ?? "";
|
||||
finalUsage = evt.usage;
|
||||
this.addConsoleEntry({ type: "done", model: finalModel });
|
||||
break;
|
||||
}
|
||||
case "error": {
|
||||
this.addConsoleEntry({ type: "error", error: evt.error });
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantId
|
||||
? { ...m, content: `Error: ${evt.error}`, isError: true, isStreaming: false }
|
||||
: m
|
||||
),
|
||||
}));
|
||||
this.emit("update");
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// malformed JSON — skip line
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any remaining decoder bytes
|
||||
const remaining = decoder.decode(undefined, { stream: false });
|
||||
if (remaining) {
|
||||
streamedContent += remaining;
|
||||
}
|
||||
|
||||
// Finalize
|
||||
const finalContent = streamedContent || "(no response)";
|
||||
// Start polling
|
||||
this._startPolling(sessionId, convId, assistantId);
|
||||
} catch (err: any) {
|
||||
this.isThinking = false;
|
||||
this.addConsoleEntry({ type: "error", error: err.message });
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
history: [...c.history.filter((h) => h.role !== "assistant" || h.content !== ""),
|
||||
{ role: "assistant" as const, content: finalContent }],
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantId
|
||||
? {
|
||||
...m,
|
||||
content: finalContent,
|
||||
content: `Failed to start background session: ${err.message}`,
|
||||
isError: true,
|
||||
isStreaming: false,
|
||||
toolCalls: toolCallsAccumulated,
|
||||
model: finalModel,
|
||||
modelWarning: finalWarning,
|
||||
usage: finalUsage,
|
||||
}
|
||||
: m
|
||||
),
|
||||
}));
|
||||
} catch (err: any) {
|
||||
if (err.name === "AbortError") {
|
||||
// Cancelled — just remove streaming flag
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantId
|
||||
? { ...m, isStreaming: false, content: m.content || "(cancelled)" }
|
||||
: m
|
||||
),
|
||||
}));
|
||||
} else {
|
||||
this.addConsoleEntry({ type: "error", error: err.message });
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantId
|
||||
? { ...m, content: `Network Error: ${err.message}`, isError: true, isStreaming: false }
|
||||
: m
|
||||
),
|
||||
}));
|
||||
}
|
||||
} finally {
|
||||
this.isThinking = false;
|
||||
this.abortController = null;
|
||||
this.emit("update");
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel the current in-flight request. */
|
||||
// ─── Polling ────────────────────────────────────────────────────────────────
|
||||
|
||||
private _startPolling(sessionId: string, convId: string, assistantMsgId: string) {
|
||||
if (this.activePolls.has(sessionId)) return;
|
||||
this.activePolls.set(sessionId, convId);
|
||||
this.pollSeq.set(sessionId, 0);
|
||||
this._scheduleNextPoll(sessionId, convId, assistantMsgId, 1500);
|
||||
}
|
||||
|
||||
private _scheduleNextPoll(
|
||||
sessionId: string,
|
||||
convId: string,
|
||||
assistantMsgId: string,
|
||||
delayMs: number
|
||||
) {
|
||||
const handle = setTimeout(() => {
|
||||
this._doPoll(sessionId, convId, assistantMsgId);
|
||||
}, delayMs);
|
||||
this.pollTimers.set(sessionId, handle);
|
||||
}
|
||||
|
||||
private async _doPoll(sessionId: string, convId: string, assistantMsgId: string) {
|
||||
const afterSeq = this.pollSeq.get(sessionId) ?? 0;
|
||||
|
||||
try {
|
||||
const result = await trpcQuery<{
|
||||
sessionId: string;
|
||||
status: string;
|
||||
events: Array<{
|
||||
id: number;
|
||||
sessionId: string;
|
||||
seq: number;
|
||||
eventType: "thinking" | "tool_call" | "delta" | "done" | "error";
|
||||
content: string;
|
||||
toolName: string;
|
||||
toolArgs: string;
|
||||
toolResult: string;
|
||||
toolSuccess: boolean;
|
||||
durationMs: number;
|
||||
model: string;
|
||||
usageJson: string;
|
||||
errorMsg: string;
|
||||
createdAt: string;
|
||||
}>;
|
||||
}>("orchestrator.getEvents", { sessionId, afterSeq }, "query");
|
||||
|
||||
if (!result) {
|
||||
// Gateway not available yet — retry
|
||||
this._scheduleNextPoll(sessionId, convId, assistantMsgId, 2000);
|
||||
return;
|
||||
}
|
||||
|
||||
const { status, events } = result;
|
||||
let maxSeq = afterSeq;
|
||||
let lastContent = "";
|
||||
let lastModel = "";
|
||||
let lastUsage: any = undefined;
|
||||
let streamedContent = "";
|
||||
|
||||
// Get current content from message
|
||||
const conv = this.conversations.find((c) => c.id === convId);
|
||||
const existingMsg = conv?.messages.find((m) => m.id === assistantMsgId);
|
||||
streamedContent = existingMsg?.content ?? "";
|
||||
|
||||
for (const ev of events) {
|
||||
if (ev.seq > maxSeq) maxSeq = ev.seq;
|
||||
|
||||
switch (ev.eventType) {
|
||||
case "thinking":
|
||||
this.addConsoleEntry({ type: "thinking" });
|
||||
break;
|
||||
|
||||
case "tool_call": {
|
||||
let args: any = {};
|
||||
try { args = JSON.parse(ev.toolArgs || "{}"); } catch {}
|
||||
let resultVal: any = ev.toolResult;
|
||||
try { if (ev.toolResult) resultVal = JSON.parse(ev.toolResult); } catch {}
|
||||
const step: ToolCallStep = {
|
||||
tool: ev.toolName,
|
||||
args,
|
||||
result: resultVal,
|
||||
error: ev.errorMsg || undefined,
|
||||
success: ev.toolSuccess,
|
||||
durationMs: ev.durationMs,
|
||||
};
|
||||
this.addConsoleEntry({ type: "tool_call", ...step });
|
||||
// Append tool call to message
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantMsgId
|
||||
? { ...m, toolCalls: [...(m.toolCalls ?? []), step] }
|
||||
: m
|
||||
),
|
||||
}));
|
||||
break;
|
||||
}
|
||||
|
||||
case "delta":
|
||||
// The Go gateway stores full response as a single delta
|
||||
streamedContent = ev.content || streamedContent;
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantMsgId ? { ...m, content: streamedContent } : m
|
||||
),
|
||||
}));
|
||||
this.emit("update");
|
||||
break;
|
||||
|
||||
case "done": {
|
||||
lastModel = ev.model;
|
||||
try {
|
||||
const usageObj = JSON.parse(ev.usageJson || "null");
|
||||
if (usageObj) {
|
||||
lastUsage = {
|
||||
prompt_tokens: usageObj.promptTokens ?? usageObj.prompt_tokens ?? 0,
|
||||
completion_tokens: usageObj.completionTokens ?? usageObj.completion_tokens ?? 0,
|
||||
total_tokens: usageObj.totalTokens ?? usageObj.total_tokens ?? 0,
|
||||
};
|
||||
}
|
||||
} catch {}
|
||||
this.addConsoleEntry({ type: "done", model: lastModel });
|
||||
break;
|
||||
}
|
||||
|
||||
case "error":
|
||||
this.addConsoleEntry({ type: "error", error: ev.errorMsg });
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantMsgId
|
||||
? {
|
||||
...m,
|
||||
content: `Error: ${ev.errorMsg}`,
|
||||
isError: true,
|
||||
isStreaming: false,
|
||||
}
|
||||
: m
|
||||
),
|
||||
}));
|
||||
this.emit("update");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
this.pollSeq.set(sessionId, maxSeq);
|
||||
|
||||
if (status === "done" || status === "error") {
|
||||
// Finalize message
|
||||
this.updateConv(convId, (c) => {
|
||||
const msg = c.messages.find((m) => m.id === assistantMsgId);
|
||||
const finalContent = streamedContent || msg?.content || "(no response)";
|
||||
return {
|
||||
...c,
|
||||
history: status === "done"
|
||||
? [
|
||||
...c.history.filter((h) => !(h.role === "assistant" && h.content === "")),
|
||||
{ role: "assistant" as const, content: finalContent },
|
||||
]
|
||||
: c.history,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === assistantMsgId
|
||||
? {
|
||||
...m,
|
||||
content: finalContent,
|
||||
isStreaming: false,
|
||||
model: lastModel || m.model,
|
||||
usage: lastUsage || m.usage,
|
||||
isError: status === "error" ? true : m.isError,
|
||||
}
|
||||
: m
|
||||
),
|
||||
};
|
||||
});
|
||||
|
||||
// Clean up
|
||||
this._stopPolling(sessionId);
|
||||
this.isThinking = this.activePolls.size > 0;
|
||||
this.emit("update");
|
||||
} else {
|
||||
// Session still running — poll again
|
||||
this._scheduleNextPoll(sessionId, convId, assistantMsgId, 1500);
|
||||
}
|
||||
} catch {
|
||||
// Network error — retry with backoff
|
||||
this._scheduleNextPoll(sessionId, convId, assistantMsgId, 3000);
|
||||
}
|
||||
}
|
||||
|
||||
private _stopPolling(sessionId: string) {
|
||||
clearTimeout(this.pollTimers.get(sessionId));
|
||||
this.pollTimers.delete(sessionId);
|
||||
this.activePolls.delete(sessionId);
|
||||
this.pollSeq.delete(sessionId);
|
||||
|
||||
// Remove from persistent pending list
|
||||
const pending = loadPending();
|
||||
pending.delete(sessionId);
|
||||
savePending(pending);
|
||||
}
|
||||
|
||||
/** Resume polling for sessions that were running when page was reloaded. */
|
||||
private async _resumePendingSessions() {
|
||||
const pending = loadPending();
|
||||
if (pending.size === 0) return;
|
||||
|
||||
// Small delay to let React mount first
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 2000));
|
||||
|
||||
for (const [sessionId, convId] of pending.entries()) {
|
||||
// Find the conversation
|
||||
const conv = this.conversations.find((c) => c.id === convId);
|
||||
if (!conv) {
|
||||
// Conversation gone — clean up
|
||||
const p = loadPending();
|
||||
p.delete(sessionId);
|
||||
savePending(p);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find the placeholder message for this session
|
||||
const msgWithSession = conv.messages.find(
|
||||
(m) => m.sessionId === sessionId && m.role === "assistant"
|
||||
);
|
||||
|
||||
// Check current session status from DB
|
||||
try {
|
||||
const sess = await trpcQuery<{
|
||||
status: string;
|
||||
finalResponse: string;
|
||||
model: string;
|
||||
totalTokens: number;
|
||||
}>("orchestrator.getSession", { sessionId }, "query");
|
||||
|
||||
if (!sess) {
|
||||
// Session not found in DB — remove pending
|
||||
const p = loadPending();
|
||||
p.delete(sessionId);
|
||||
savePending(p);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (sess.status === "done" || sess.status === "error") {
|
||||
// Already finished — update message directly from session data
|
||||
if (msgWithSession) {
|
||||
this.updateConv(convId, (c) => ({
|
||||
...c,
|
||||
history: sess.status === "done"
|
||||
? [
|
||||
...c.history.filter((h) => !(h.role === "assistant" && h.content === "")),
|
||||
{ role: "assistant" as const, content: sess.finalResponse },
|
||||
]
|
||||
: c.history,
|
||||
messages: c.messages.map((m) =>
|
||||
m.id === msgWithSession.id
|
||||
? {
|
||||
...m,
|
||||
content: sess.finalResponse || m.content || "(no response)",
|
||||
isStreaming: false,
|
||||
isError: sess.status === "error",
|
||||
model: sess.model || m.model,
|
||||
usage: sess.totalTokens
|
||||
? { prompt_tokens: 0, completion_tokens: 0, total_tokens: sess.totalTokens }
|
||||
: m.usage,
|
||||
}
|
||||
: m
|
||||
),
|
||||
}));
|
||||
}
|
||||
const p = loadPending();
|
||||
p.delete(sessionId);
|
||||
savePending(p);
|
||||
this.emit("update");
|
||||
} else {
|
||||
// Still running — resume polling
|
||||
const assistantMsgId = msgWithSession?.id ?? `resp-${sessionId}`;
|
||||
this.isThinking = true;
|
||||
this._startPolling(sessionId, convId, assistantMsgId);
|
||||
this.emit("update");
|
||||
}
|
||||
} catch {
|
||||
// Gateway not reachable — keep pending for next reload
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel the current in-flight SSE request (legacy). */
|
||||
cancel() {
|
||||
this.abortController?.abort();
|
||||
// Also stop all active polls
|
||||
for (const sessionId of [...this.activePolls.keys()]) {
|
||||
this._stopPolling(sessionId);
|
||||
}
|
||||
this.isThinking = false;
|
||||
this.emit("update");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -228,7 +228,7 @@ function MessageBubble({ msg }: { msg: ChatMessage }) {
|
||||
{msg.isStreaming && (
|
||||
<span className="text-[9px] font-mono text-cyan-400 flex items-center gap-1">
|
||||
<Activity className="w-2.5 h-2.5 animate-pulse" />
|
||||
streaming
|
||||
фоновая обработка…
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
@@ -431,10 +431,11 @@ export default function Chat() {
|
||||
{isThinking && (
|
||||
<button
|
||||
onClick={() => chatStore.cancel()}
|
||||
className="p-1.5 rounded border border-red-500/40 text-red-400 hover:bg-red-500/10 transition-colors"
|
||||
title="Cancel request"
|
||||
className="p-1.5 rounded border border-red-500/40 text-red-400 hover:bg-red-500/10 transition-colors flex items-center gap-1"
|
||||
title="Отменить фоновую обработку"
|
||||
>
|
||||
<StopCircle className="w-3.5 h-3.5" />
|
||||
<span className="text-[9px] font-mono hidden sm:inline">Stop</span>
|
||||
</button>
|
||||
)}
|
||||
|
||||
@@ -528,7 +529,7 @@ export default function Chat() {
|
||||
className="flex items-center gap-2 text-cyan-400 font-mono text-xs pl-10"
|
||||
>
|
||||
<Loader2 className="w-3.5 h-3.5 animate-spin" />
|
||||
<span className="text-muted-foreground">Processing…</span>
|
||||
<span className="text-muted-foreground">Обработка в фоне… (работает даже при перезагрузке страницы)</span>
|
||||
</motion.div>
|
||||
)}
|
||||
</div>
|
||||
@@ -562,7 +563,7 @@ export default function Chat() {
|
||||
value={input}
|
||||
onChange={(e) => setInput(e.target.value)}
|
||||
onKeyDown={(e) => e.key === "Enter" && !e.shiftKey && sendMessage()}
|
||||
placeholder={isThinking ? "Ожидание ответа…" : "Введите команду или вопрос…"}
|
||||
placeholder={isThinking ? "Фоновая обработка… (можете перезагрузить страницу)" : "Введите команду или вопрос…"}
|
||||
disabled={isThinking}
|
||||
className="bg-transparent border-none text-foreground font-mono text-sm placeholder:text-muted-foreground/50 focus-visible:ring-0 focus-visible:ring-offset-0 h-8"
|
||||
/>
|
||||
|
||||
36
drizzle/0005_chat_sessions.sql
Normal file
36
drizzle/0005_chat_sessions.sql
Normal file
@@ -0,0 +1,36 @@
|
||||
-- chatSessions: one row per chat request, survives page reloads
|
||||
CREATE TABLE IF NOT EXISTS chatSessions (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
sessionId VARCHAR(64) NOT NULL UNIQUE,
|
||||
agentId INT NOT NULL DEFAULT 1,
|
||||
status ENUM('running','done','error') NOT NULL DEFAULT 'running',
|
||||
userMessage TEXT NOT NULL,
|
||||
finalResponse TEXT,
|
||||
model VARCHAR(128),
|
||||
totalTokens INT DEFAULT 0,
|
||||
processingTimeMs INT DEFAULT 0,
|
||||
errorMessage TEXT,
|
||||
createdAt TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
updatedAt TIMESTAMP NOT NULL DEFAULT NOW() ON UPDATE NOW(),
|
||||
INDEX chatSessions_status_idx (status),
|
||||
INDEX chatSessions_createdAt_idx (createdAt)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
-- chatEvents: one row per SSE event within a session
|
||||
CREATE TABLE IF NOT EXISTS chatEvents (
|
||||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||||
sessionId VARCHAR(64) NOT NULL,
|
||||
seq INT NOT NULL DEFAULT 0,
|
||||
eventType ENUM('thinking','tool_call','delta','done','error') NOT NULL,
|
||||
content TEXT,
|
||||
toolName VARCHAR(128),
|
||||
toolArgs JSON,
|
||||
toolResult TEXT,
|
||||
toolSuccess TINYINT(1),
|
||||
durationMs INT,
|
||||
model VARCHAR(128),
|
||||
usageJson JSON,
|
||||
errorMsg TEXT,
|
||||
createdAt TIMESTAMP(3) NOT NULL DEFAULT NOW(3),
|
||||
INDEX chatEvents_sessionId_seq_idx (sessionId, seq)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
@@ -223,3 +223,54 @@ export const llmProviders = mysqlTable("llmProviders", {
|
||||
|
||||
export type LlmProvider = typeof llmProviders.$inferSelect;
|
||||
export type InsertLlmProvider = typeof llmProviders.$inferInsert;
|
||||
|
||||
/**
|
||||
* Chat Sessions — persistent server-side chat runs.
|
||||
* Each user message creates one session. The Go gateway processes it
|
||||
* and writes events to chatEvents. The frontend polls for events.
|
||||
*/
|
||||
export const chatSessions = mysqlTable("chatSessions", {
|
||||
id: int("id").autoincrement().primaryKey(),
|
||||
sessionId: varchar("sessionId", { length: 64 }).notNull().unique(),
|
||||
agentId: int("agentId").notNull().default(1),
|
||||
status: mysqlEnum("status", ["running", "done", "error"]).notNull().default("running"),
|
||||
userMessage: text("userMessage").notNull(),
|
||||
finalResponse: text("finalResponse"),
|
||||
model: varchar("model", { length: 128 }),
|
||||
totalTokens: int("totalTokens").default(0),
|
||||
processingTimeMs: int("processingTimeMs").default(0),
|
||||
errorMessage: text("errorMessage"),
|
||||
createdAt: timestamp("createdAt").defaultNow().notNull(),
|
||||
updatedAt: timestamp("updatedAt").defaultNow().onUpdateNow().notNull(),
|
||||
}, (table) => ({
|
||||
statusIdx: index("chatSessions_status_idx").on(table.status),
|
||||
createdAtIdx: index("chatSessions_createdAt_idx").on(table.createdAt),
|
||||
}));
|
||||
|
||||
export type ChatSession = typeof chatSessions.$inferSelect;
|
||||
export type InsertChatSession = typeof chatSessions.$inferInsert;
|
||||
|
||||
/**
|
||||
* Chat Events — individual SSE events written by Go gateway, read by frontend.
|
||||
*/
|
||||
export const chatEvents = mysqlTable("chatEvents", {
|
||||
id: int("id").autoincrement().primaryKey(),
|
||||
sessionId: varchar("sessionId", { length: 64 }).notNull(),
|
||||
seq: int("seq").notNull().default(0),
|
||||
eventType: mysqlEnum("eventType", ["thinking", "tool_call", "delta", "done", "error"]).notNull(),
|
||||
content: text("content"),
|
||||
toolName: varchar("toolName", { length: 128 }),
|
||||
toolArgs: json("toolArgs"),
|
||||
toolResult: text("toolResult"),
|
||||
toolSuccess: boolean("toolSuccess"),
|
||||
durationMs: int("durationMs"),
|
||||
model: varchar("model", { length: 128 }),
|
||||
usageJson: json("usageJson"),
|
||||
errorMsg: text("errorMsg"),
|
||||
createdAt: timestamp("createdAt", { fsp: 3 }).defaultNow().notNull(),
|
||||
}, (table) => ({
|
||||
sessionSeqIdx: index("chatEvents_sessionId_seq_idx").on(table.sessionId, table.seq),
|
||||
}));
|
||||
|
||||
export type ChatEvent = typeof chatEvents.$inferSelect;
|
||||
export type InsertChatEvent = typeof chatEvents.$inferInsert;
|
||||
|
||||
@@ -96,6 +96,12 @@ func main() {
|
||||
|
||||
// Provider config reload (called by Node.js after provider change)
|
||||
r.Post("/providers/reload", h.ProvidersReload)
|
||||
|
||||
// Persistent chat sessions (background processing, DB-backed)
|
||||
r.Post("/chat/session", h.StartChatSession)
|
||||
r.Get("/chat/sessions", h.ListChatSessions)
|
||||
r.Get("/chat/session/{id}", h.GetChatSession)
|
||||
r.Get("/chat/session/{id}/events", h.GetChatEvents)
|
||||
})
|
||||
|
||||
// ── Start Server ─────────────────────────────────────────────────────────
|
||||
|
||||
@@ -677,3 +677,242 @@ func round2(f float64) float64 {
|
||||
func init() {
|
||||
_ = fmt.Sprintf // suppress unused import
|
||||
}
|
||||
|
||||
// ─── Persistent Chat Sessions ─────────────────────────────────────────────────
|
||||
|
||||
// POST /api/chat/session
|
||||
// Creates a DB session, fires off the orchestrator in the background,
|
||||
// returns {"sessionId":"..."} immediately. The client polls for events.
|
||||
func (h *Handler) StartChatSession(w http.ResponseWriter, r *http.Request) {
|
||||
if h.db == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "DB not connected — persistent sessions unavailable")
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Messages []orchestrator.Message `json:"messages"`
|
||||
Model string `json:"model,omitempty"`
|
||||
MaxIter int `json:"maxIter,omitempty"`
|
||||
SessionID string `json:"sessionId,omitempty"` // client can supply its own ID
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
respondError(w, http.StatusBadRequest, "invalid body: "+err.Error())
|
||||
return
|
||||
}
|
||||
if len(req.Messages) == 0 {
|
||||
respondError(w, http.StatusBadRequest, "messages array is required")
|
||||
return
|
||||
}
|
||||
|
||||
// Use client-supplied ID or generate one
|
||||
sessionID := req.SessionID
|
||||
if sessionID == "" {
|
||||
sessionID = fmt.Sprintf("cs-%d", time.Now().UnixNano())
|
||||
}
|
||||
|
||||
// Extract last user message for storage
|
||||
userMessage := ""
|
||||
for i := len(req.Messages) - 1; i >= 0; i-- {
|
||||
if req.Messages[i].Role == "user" {
|
||||
userMessage = req.Messages[i].Content
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve orchestrator agent ID
|
||||
orchAgentID := 1
|
||||
if cfg, err := h.db.GetOrchestratorConfig(); err == nil && cfg != nil {
|
||||
orchAgentID = cfg.ID
|
||||
}
|
||||
|
||||
// Create session row in DB
|
||||
if err := h.db.CreateSession(sessionID, userMessage, orchAgentID); err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "failed to create session: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
maxIter := req.MaxIter
|
||||
if maxIter <= 0 {
|
||||
maxIter = 10
|
||||
}
|
||||
model := req.Model
|
||||
|
||||
// Snapshot messages + config for the goroutine
|
||||
messages := req.Messages
|
||||
|
||||
// Launch orchestration in a fully detached goroutine.
|
||||
// This goroutine runs independently — survives HTTP disconnect.
|
||||
go func() {
|
||||
startTime := time.Now()
|
||||
|
||||
// Append initial "thinking" event
|
||||
_ = h.db.AppendEvent(db.ChatEventRow{
|
||||
SessionID: sessionID,
|
||||
EventType: "thinking",
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(),
|
||||
time.Duration(h.cfg.RequestTimeoutSecs)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
result := h.orch.ChatWithEvents(ctx, messages, model, maxIter, func(step orchestrator.ToolCallStep) {
|
||||
argsJSON, _ := json.Marshal(step.Args)
|
||||
resultStr := ""
|
||||
if step.Result != nil {
|
||||
b, _ := json.Marshal(step.Result)
|
||||
resultStr = string(b)
|
||||
}
|
||||
_ = h.db.AppendEvent(db.ChatEventRow{
|
||||
SessionID: sessionID,
|
||||
EventType: "tool_call",
|
||||
ToolName: step.Tool,
|
||||
ToolArgs: string(argsJSON),
|
||||
ToolResult: resultStr,
|
||||
ToolSuccess: step.Success,
|
||||
DurationMs: int(step.DurationMs),
|
||||
ErrorMsg: step.Error,
|
||||
})
|
||||
})
|
||||
|
||||
processingMs := time.Since(startTime).Milliseconds()
|
||||
|
||||
if !result.Success {
|
||||
_ = h.db.AppendEvent(db.ChatEventRow{
|
||||
SessionID: sessionID,
|
||||
EventType: "error",
|
||||
ErrorMsg: result.Error,
|
||||
})
|
||||
h.db.MarkSessionDone(sessionID, "error", "", result.Model, result.Error, 0, processingMs)
|
||||
return
|
||||
}
|
||||
|
||||
// Append full response as a single delta (client will display it)
|
||||
_ = h.db.AppendEvent(db.ChatEventRow{
|
||||
SessionID: sessionID,
|
||||
EventType: "delta",
|
||||
Content: result.Response,
|
||||
})
|
||||
|
||||
// Append done event
|
||||
totalTok := 0
|
||||
usageStr := "null"
|
||||
if result.Usage != nil {
|
||||
totalTok = result.Usage.TotalTokens
|
||||
b, _ := json.Marshal(result.Usage)
|
||||
usageStr = string(b)
|
||||
}
|
||||
_ = h.db.AppendEvent(db.ChatEventRow{
|
||||
SessionID: sessionID,
|
||||
EventType: "done",
|
||||
Model: result.Model,
|
||||
UsageJSON: usageStr,
|
||||
})
|
||||
|
||||
h.db.MarkSessionDone(sessionID, "done", result.Response, result.Model, "", totalTok, processingMs)
|
||||
|
||||
// Also save to legacy metrics/history tables
|
||||
reqID := fmt.Sprintf("orch-%d", time.Now().UnixNano())
|
||||
toolNames := make([]string, len(result.ToolCalls))
|
||||
for i, tc := range result.ToolCalls {
|
||||
toolNames[i] = tc.Tool
|
||||
}
|
||||
inputTok, outputTok := 0, 0
|
||||
if result.Usage != nil {
|
||||
inputTok = result.Usage.PromptTokens
|
||||
outputTok = result.Usage.CompletionTokens
|
||||
}
|
||||
h.db.SaveMetric(db.MetricInput{
|
||||
AgentID: orchAgentID,
|
||||
RequestID: reqID,
|
||||
UserMessage: userMessage,
|
||||
AgentResponse: result.Response,
|
||||
InputTokens: inputTok,
|
||||
OutputTokens: outputTok,
|
||||
TotalTokens: totalTok,
|
||||
ProcessingTimeMs: processingMs,
|
||||
Status: "success",
|
||||
ToolsCalled: toolNames,
|
||||
Model: result.Model,
|
||||
})
|
||||
h.db.SaveHistory(db.HistoryInput{
|
||||
AgentID: orchAgentID,
|
||||
UserMessage: userMessage,
|
||||
AgentResponse: result.Response,
|
||||
Status: "success",
|
||||
})
|
||||
}()
|
||||
|
||||
respond(w, http.StatusOK, map[string]any{
|
||||
"sessionId": sessionID,
|
||||
"status": "running",
|
||||
})
|
||||
}
|
||||
|
||||
// GET /api/chat/session/:id
|
||||
func (h *Handler) GetChatSession(w http.ResponseWriter, r *http.Request) {
|
||||
sessionID := r.PathValue("id")
|
||||
if sessionID == "" {
|
||||
respondError(w, http.StatusBadRequest, "sessionId required")
|
||||
return
|
||||
}
|
||||
if h.db == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "DB not connected")
|
||||
return
|
||||
}
|
||||
sess, err := h.db.GetSession(sessionID)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusNotFound, "session not found")
|
||||
return
|
||||
}
|
||||
respond(w, http.StatusOK, sess)
|
||||
}
|
||||
|
||||
// GET /api/chat/session/:id/events?after=N
|
||||
func (h *Handler) GetChatEvents(w http.ResponseWriter, r *http.Request) {
|
||||
sessionID := r.PathValue("id")
|
||||
if sessionID == "" {
|
||||
respondError(w, http.StatusBadRequest, "sessionId required")
|
||||
return
|
||||
}
|
||||
afterSeq := 0
|
||||
if v := r.URL.Query().Get("after"); v != "" {
|
||||
fmt.Sscanf(v, "%d", &afterSeq)
|
||||
}
|
||||
if h.db == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "DB not connected")
|
||||
return
|
||||
}
|
||||
events, err := h.db.GetEvents(sessionID, afterSeq)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
// Also return session status so client knows when to stop polling
|
||||
var status string
|
||||
if sess, err := h.db.GetSession(sessionID); err == nil {
|
||||
status = sess.Status
|
||||
}
|
||||
respond(w, http.StatusOK, map[string]any{
|
||||
"sessionId": sessionID,
|
||||
"status": status,
|
||||
"events": events,
|
||||
})
|
||||
}
|
||||
|
||||
// GET /api/chat/sessions?limit=N
|
||||
func (h *Handler) ListChatSessions(w http.ResponseWriter, r *http.Request) {
|
||||
if h.db == nil {
|
||||
respond(w, http.StatusOK, map[string]any{"sessions": []any{}})
|
||||
return
|
||||
}
|
||||
limit := 50
|
||||
if v := r.URL.Query().Get("limit"); v != "" {
|
||||
fmt.Sscanf(v, "%d", &limit)
|
||||
}
|
||||
sessions, err := h.db.GetRecentSessions(limit)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
respond(w, http.StatusOK, map[string]any{"sessions": sessions})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
@@ -149,6 +150,249 @@ func (d *DB) GetActiveProvider() (*ProviderRow, error) {
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// ─── Chat Sessions & Events ───────────────────────────────────────────────────
|
||||
|
||||
// ChatSessionRow holds one persistent chat session.
|
||||
type ChatSessionRow struct {
|
||||
ID int `json:"id"`
|
||||
SessionID string `json:"sessionId"`
|
||||
AgentID int `json:"agentId"`
|
||||
Status string `json:"status"` // running | done | error
|
||||
UserMessage string `json:"userMessage"`
|
||||
FinalResponse string `json:"finalResponse"`
|
||||
Model string `json:"model"`
|
||||
TotalTokens int `json:"totalTokens"`
|
||||
ProcessingTimeMs int64 `json:"processingTimeMs"`
|
||||
ErrorMessage string `json:"errorMessage"`
|
||||
CreatedAt string `json:"createdAt"`
|
||||
UpdatedAt string `json:"updatedAt"`
|
||||
}
|
||||
|
||||
// ChatEventRow holds one event inside a session.
|
||||
type ChatEventRow struct {
|
||||
ID int `json:"id"`
|
||||
SessionID string `json:"sessionId"`
|
||||
Seq int `json:"seq"`
|
||||
EventType string `json:"eventType"` // thinking | tool_call | delta | done | error
|
||||
Content string `json:"content"`
|
||||
ToolName string `json:"toolName"`
|
||||
ToolArgs string `json:"toolArgs"` // JSON string
|
||||
ToolResult string `json:"toolResult"`
|
||||
ToolSuccess bool `json:"toolSuccess"`
|
||||
DurationMs int `json:"durationMs"`
|
||||
Model string `json:"model"`
|
||||
UsageJSON string `json:"usageJson"` // JSON string
|
||||
ErrorMsg string `json:"errorMsg"`
|
||||
CreatedAt string `json:"createdAt"`
|
||||
}
|
||||
|
||||
// CreateSession inserts a new running session and returns its row.
|
||||
func (d *DB) CreateSession(sessionID, userMessage string, agentID int) error {
|
||||
if d.conn == nil {
|
||||
return fmt.Errorf("DB not connected")
|
||||
}
|
||||
_, err := d.conn.Exec(`
|
||||
INSERT INTO chatSessions (sessionId, agentId, status, userMessage)
|
||||
VALUES (?, ?, 'running', ?)
|
||||
`, sessionID, agentID, truncate(userMessage, 65535))
|
||||
return err
|
||||
}
|
||||
|
||||
// AppendEvent inserts a new event row for a session.
|
||||
// seq is auto-calculated as MAX(seq)+1 for the session.
|
||||
func (d *DB) AppendEvent(e ChatEventRow) error {
|
||||
if d.conn == nil {
|
||||
return nil
|
||||
}
|
||||
toolArgs := e.ToolArgs
|
||||
if toolArgs == "" {
|
||||
toolArgs = "null"
|
||||
}
|
||||
usageJSON := e.UsageJSON
|
||||
if usageJSON == "" {
|
||||
usageJSON = "null"
|
||||
}
|
||||
var toolSuccessVal interface{}
|
||||
if e.EventType == "tool_call" {
|
||||
if e.ToolSuccess {
|
||||
toolSuccessVal = 1
|
||||
} else {
|
||||
toolSuccessVal = 0
|
||||
}
|
||||
}
|
||||
_, err := d.conn.Exec(`
|
||||
INSERT INTO chatEvents
|
||||
(sessionId, seq, eventType, content, toolName, toolArgs,
|
||||
toolResult, toolSuccess, durationMs, model, usageJson, errorMsg)
|
||||
SELECT ?, COALESCE(MAX(seq),0)+1, ?, ?, ?, ?,
|
||||
?, ?, ?, ?, ?, ?
|
||||
FROM chatEvents WHERE sessionId = ?
|
||||
`,
|
||||
e.SessionID, e.EventType,
|
||||
nullStr(e.Content), nullStr(e.ToolName), rawJSON(toolArgs),
|
||||
nullStr(e.ToolResult), toolSuccessVal, nullInt(e.DurationMs),
|
||||
nullStr(e.Model), rawJSON(usageJSON), nullStr(e.ErrorMsg),
|
||||
e.SessionID,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("[DB] AppendEvent error: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// MarkSessionDone updates a session to done/error status.
|
||||
func (d *DB) MarkSessionDone(sessionID, status, finalResponse, model, errorMessage string, totalTokens int, processingTimeMs int64) {
|
||||
if d.conn == nil {
|
||||
return
|
||||
}
|
||||
_, err := d.conn.Exec(`
|
||||
UPDATE chatSessions
|
||||
SET status=?, finalResponse=?, model=?, totalTokens=?,
|
||||
processingTimeMs=?, errorMessage=?
|
||||
WHERE sessionId=?
|
||||
`, status,
|
||||
truncate(finalResponse, 65535),
|
||||
model,
|
||||
totalTokens,
|
||||
processingTimeMs,
|
||||
truncate(errorMessage, 65535),
|
||||
sessionID,
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("[DB] MarkSessionDone error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// GetSession returns a single session by its string ID.
|
||||
func (d *DB) GetSession(sessionID string) (*ChatSessionRow, error) {
|
||||
if d.conn == nil {
|
||||
return nil, fmt.Errorf("DB not connected")
|
||||
}
|
||||
row := d.conn.QueryRow(`
|
||||
SELECT id, sessionId, agentId, status,
|
||||
COALESCE(userMessage,''),
|
||||
COALESCE(finalResponse,''),
|
||||
COALESCE(model,''),
|
||||
COALESCE(totalTokens,0),
|
||||
COALESCE(processingTimeMs,0),
|
||||
COALESCE(errorMessage,''),
|
||||
createdAt, updatedAt
|
||||
FROM chatSessions WHERE sessionId=? LIMIT 1
|
||||
`, sessionID)
|
||||
var s ChatSessionRow
|
||||
err := row.Scan(&s.ID, &s.SessionID, &s.AgentID, &s.Status,
|
||||
&s.UserMessage, &s.FinalResponse, &s.Model,
|
||||
&s.TotalTokens, &s.ProcessingTimeMs, &s.ErrorMessage,
|
||||
&s.CreatedAt, &s.UpdatedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
// GetEvents returns all events for a session with seq > afterSeq (for incremental polling).
|
||||
func (d *DB) GetEvents(sessionID string, afterSeq int) ([]ChatEventRow, error) {
|
||||
if d.conn == nil {
|
||||
return nil, fmt.Errorf("DB not connected")
|
||||
}
|
||||
rows, err := d.conn.Query(`
|
||||
SELECT id, sessionId, seq, eventType,
|
||||
COALESCE(content,''), COALESCE(toolName,''),
|
||||
COALESCE(CAST(toolArgs AS CHAR),'null'),
|
||||
COALESCE(toolResult,''),
|
||||
COALESCE(toolSuccess,0),
|
||||
COALESCE(durationMs,0),
|
||||
COALESCE(model,''),
|
||||
COALESCE(CAST(usageJson AS CHAR),'null'),
|
||||
COALESCE(errorMsg,''),
|
||||
createdAt
|
||||
FROM chatEvents
|
||||
WHERE sessionId=? AND seq > ?
|
||||
ORDER BY seq ASC
|
||||
`, sessionID, afterSeq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var result []ChatEventRow
|
||||
for rows.Next() {
|
||||
var e ChatEventRow
|
||||
var toolSuccess int
|
||||
if err := rows.Scan(
|
||||
&e.ID, &e.SessionID, &e.Seq, &e.EventType,
|
||||
&e.Content, &e.ToolName, &e.ToolArgs,
|
||||
&e.ToolResult, &toolSuccess, &e.DurationMs,
|
||||
&e.Model, &e.UsageJSON, &e.ErrorMsg, &e.CreatedAt,
|
||||
); err != nil {
|
||||
continue
|
||||
}
|
||||
e.ToolSuccess = toolSuccess == 1
|
||||
result = append(result, e)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetRecentSessions returns the N most recent sessions.
|
||||
func (d *DB) GetRecentSessions(limit int) ([]ChatSessionRow, error) {
|
||||
if d.conn == nil {
|
||||
return nil, fmt.Errorf("DB not connected")
|
||||
}
|
||||
rows, err := d.conn.Query(`
|
||||
SELECT id, sessionId, agentId, status,
|
||||
COALESCE(userMessage,''),
|
||||
COALESCE(finalResponse,''),
|
||||
COALESCE(model,''),
|
||||
COALESCE(totalTokens,0),
|
||||
COALESCE(processingTimeMs,0),
|
||||
COALESCE(errorMessage,''),
|
||||
createdAt, updatedAt
|
||||
FROM chatSessions ORDER BY id DESC LIMIT ?
|
||||
`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var result []ChatSessionRow
|
||||
for rows.Next() {
|
||||
var s ChatSessionRow
|
||||
if err := rows.Scan(&s.ID, &s.SessionID, &s.AgentID, &s.Status,
|
||||
&s.UserMessage, &s.FinalResponse, &s.Model,
|
||||
&s.TotalTokens, &s.ProcessingTimeMs, &s.ErrorMessage,
|
||||
&s.CreatedAt, &s.UpdatedAt); err != nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, s)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// helper — nil for empty strings
|
||||
func nullStr(s string) interface{} {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// helper — nil for zero int
|
||||
func nullInt(n int) interface{} {
|
||||
if n == 0 {
|
||||
return nil
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// rawJSON wraps a JSON string so it's passed as-is to MySQL (not double-encoded)
|
||||
type rawJSON string
|
||||
|
||||
func (r rawJSON) Value() (driver.Value, error) {
|
||||
if r == "null" || r == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return string(r), nil
|
||||
}
|
||||
|
||||
// ─── Metrics & History ────────────────────────────────────────────────────────
|
||||
|
||||
// MetricInput holds data for a single orchestrator request metric.
|
||||
|
||||
@@ -370,3 +370,113 @@ export async function getGatewayNodeStats(): Promise<GatewayNodeStatsResult | nu
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Persistent Chat Sessions ─────────────────────────────────────────────────
|
||||
|
||||
export interface GatewayChatEvent {
|
||||
id: number;
|
||||
sessionId: string;
|
||||
seq: number;
|
||||
eventType: "thinking" | "tool_call" | "delta" | "done" | "error";
|
||||
content: string;
|
||||
toolName: string;
|
||||
toolArgs: string; // JSON string
|
||||
toolResult: string;
|
||||
toolSuccess: boolean;
|
||||
durationMs: number;
|
||||
model: string;
|
||||
usageJson: string; // JSON string
|
||||
errorMsg: string;
|
||||
createdAt: string;
|
||||
}
|
||||
|
||||
export interface GatewayChatSession {
|
||||
id: number;
|
||||
sessionId: string;
|
||||
agentId: number;
|
||||
status: "running" | "done" | "error";
|
||||
userMessage: string;
|
||||
finalResponse: string;
|
||||
model: string;
|
||||
totalTokens: number;
|
||||
processingTimeMs: number;
|
||||
errorMessage: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a persistent background chat session.
|
||||
* Returns the sessionId immediately; processing continues on the server.
|
||||
*/
|
||||
export async function startChatSession(
|
||||
messages: GatewayMessage[],
|
||||
sessionId: string,
|
||||
model?: string,
|
||||
maxIter = 10
|
||||
): Promise<{ sessionId: string; status: string } | null> {
|
||||
try {
|
||||
const res = await fetch(`${GATEWAY_BASE_URL}/api/chat/session`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ messages, sessionId, model, maxIter }),
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
if (!res.ok) return null;
|
||||
return res.json();
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get session metadata (status, finalResponse, tokens…).
|
||||
*/
|
||||
export async function getChatSession(sessionId: string): Promise<GatewayChatSession | null> {
|
||||
try {
|
||||
const res = await fetch(`${GATEWAY_BASE_URL}/api/chat/session/${sessionId}`, {
|
||||
signal: AbortSignal.timeout(5_000),
|
||||
});
|
||||
if (!res.ok) return null;
|
||||
return res.json();
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch events for a session with seq > afterSeq.
|
||||
* Returns { sessionId, status, events[] }.
|
||||
*/
|
||||
export async function getChatEvents(
|
||||
sessionId: string,
|
||||
afterSeq = 0
|
||||
): Promise<{ sessionId: string; status: string; events: GatewayChatEvent[] } | null> {
|
||||
try {
|
||||
const res = await fetch(
|
||||
`${GATEWAY_BASE_URL}/api/chat/session/${sessionId}/events?after=${afterSeq}`,
|
||||
{ signal: AbortSignal.timeout(5_000) }
|
||||
);
|
||||
if (!res.ok) return null;
|
||||
return res.json();
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List recent sessions (default last 50).
|
||||
*/
|
||||
export async function listChatSessions(
|
||||
limit = 50
|
||||
): Promise<{ sessions: GatewayChatSession[] } | null> {
|
||||
try {
|
||||
const res = await fetch(`${GATEWAY_BASE_URL}/api/chat/sessions?limit=${limit}`, {
|
||||
signal: AbortSignal.timeout(5_000),
|
||||
});
|
||||
if (!res.ok) return null;
|
||||
return res.json();
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,10 @@ import {
|
||||
isGatewayAvailable,
|
||||
getGatewayNodes,
|
||||
getGatewayNodeStats,
|
||||
startChatSession,
|
||||
getChatSession,
|
||||
getChatEvents,
|
||||
listChatSessions,
|
||||
} from "./gateway-proxy";
|
||||
|
||||
// Shared system user id for non-authenticated agent management
|
||||
@@ -719,6 +723,69 @@ export const appRouter = router({
|
||||
.mutation(async ({ input }) => {
|
||||
return executeGatewayTool(input.tool, input.args);
|
||||
}),
|
||||
|
||||
// ── Persistent Background Chat Sessions ──────────────────────────────────
|
||||
// These routes start a session on the Go Gateway and return immediately.
|
||||
// The Go Gateway runs the orchestrator in a detached goroutine — survives
|
||||
// HTTP disconnects, page reloads, and laptop sleep.
|
||||
// The client polls getEvents until status === "done" | "error".
|
||||
|
||||
/** Start a background session. Returns { sessionId, status:"running" }. */
|
||||
startSession: publicProcedure
|
||||
.input(
|
||||
z.object({
|
||||
messages: z.array(
|
||||
z.object({
|
||||
role: z.enum(["user", "assistant", "system"]),
|
||||
content: z.string(),
|
||||
})
|
||||
),
|
||||
sessionId: z.string(),
|
||||
model: z.string().optional(),
|
||||
maxIter: z.number().min(1).max(20).optional(),
|
||||
})
|
||||
)
|
||||
.mutation(async ({ input }) => {
|
||||
const result = await startChatSession(
|
||||
input.messages,
|
||||
input.sessionId,
|
||||
input.model,
|
||||
input.maxIter ?? 10
|
||||
);
|
||||
if (!result) throw new Error("Gateway unavailable — cannot start background session");
|
||||
return result;
|
||||
}),
|
||||
|
||||
/** Get session metadata (status, finalResponse, tokens, model…). */
|
||||
getSession: publicProcedure
|
||||
.input(z.object({ sessionId: z.string() }))
|
||||
.query(async ({ input }) => {
|
||||
const sess = await getChatSession(input.sessionId);
|
||||
if (!sess) throw new Error("Session not found");
|
||||
return sess;
|
||||
}),
|
||||
|
||||
/** Get events for a session after a given seq number (incremental polling). */
|
||||
getEvents: publicProcedure
|
||||
.input(
|
||||
z.object({
|
||||
sessionId: z.string(),
|
||||
afterSeq: z.number().min(0).default(0),
|
||||
})
|
||||
)
|
||||
.query(async ({ input }) => {
|
||||
const result = await getChatEvents(input.sessionId, input.afterSeq);
|
||||
if (!result) return { sessionId: input.sessionId, status: "unknown", events: [] };
|
||||
return result;
|
||||
}),
|
||||
|
||||
/** List recent sessions (default last 50). */
|
||||
listSessions: publicProcedure
|
||||
.input(z.object({ limit: z.number().min(1).max(200).default(50) }))
|
||||
.query(async ({ input }) => {
|
||||
const result = await listChatSessions(input.limit);
|
||||
return result?.sessions ?? [];
|
||||
}),
|
||||
}),
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user