true message

This commit is contained in:
Manus
2026-03-26 05:41:44 -04:00
parent d396004294
commit 8096ce4dfd
9 changed files with 409 additions and 626 deletions

View File

@@ -8,7 +8,6 @@ import { appRouter } from "../routers";
import { createContext } from "./context";
import { serveStatic, setupVite } from "./vite";
import { seedDefaults } from "../seed";
import { startMetricsCollector } from "../metrics-collector";
function isPortAvailable(port: number): Promise<boolean> {
return new Promise(resolve => {
@@ -64,8 +63,6 @@ async function startServer() {
server.listen(port, () => {
console.log(`Server running on http://localhost:${port}/`);
// Start background metrics collector after server is up
startMetricsCollector();
});
}

View File

@@ -0,0 +1,216 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import {
retryWithBackoff,
isRetryableError,
calculateBackoffDelay,
sleep,
DEFAULT_RETRY_CONFIG,
} from "./chat-resilience";
describe("Chat Resilience", () => {
describe("calculateBackoffDelay", () => {
it("should calculate exponential backoff delays", () => {
expect(calculateBackoffDelay(1, DEFAULT_RETRY_CONFIG)).toBe(1000);
expect(calculateBackoffDelay(2, DEFAULT_RETRY_CONFIG)).toBe(2000);
expect(calculateBackoffDelay(3, DEFAULT_RETRY_CONFIG)).toBe(4000);
});
it("should respect maxDelayMs", () => {
const config = { ...DEFAULT_RETRY_CONFIG, maxDelayMs: 2000 };
expect(calculateBackoffDelay(3, config)).toBe(2000);
});
});
describe("isRetryableError", () => {
it("should identify timeout errors as retryable", () => {
expect(isRetryableError(new Error("timeout"))).toBe(true);
expect(isRetryableError(new Error("ECONNRESET"))).toBe(true);
});
it("should identify network errors as retryable", () => {
expect(isRetryableError(new Error("ECONNREFUSED"))).toBe(true);
expect(isRetryableError(new Error("ENOTFOUND"))).toBe(true);
});
it("should identify 5xx errors as retryable", () => {
const err = new Error("Server error");
(err as any).status = 503;
expect(isRetryableError(err)).toBe(true);
});
it("should identify gateway errors as retryable", () => {
const err502 = new Error("Bad Gateway");
(err502 as any).status = 502;
expect(isRetryableError(err502)).toBe(true);
const err504 = new Error("Gateway Timeout");
(err504 as any).status = 504;
expect(isRetryableError(err504)).toBe(true);
});
it("should identify unavailable service as retryable", () => {
expect(isRetryableError(new Error("service unavailable"))).toBe(true);
});
it("should not identify 4xx errors as retryable", () => {
const err = new Error("Not found");
(err as any).status = 404;
expect(isRetryableError(err)).toBe(false);
});
it("should handle null/undefined errors", () => {
expect(isRetryableError(null)).toBe(false);
expect(isRetryableError(undefined)).toBe(false);
});
});
describe("retryWithBackoff", () => {
  // Fake timers are the default so backoff sleeps can be advanced instantly.
  // Tests that need real scheduling switch to real timers and restore fakes at the end.
  beforeEach(() => {
    vi.useFakeTimers();
  });

  it("should succeed on first attempt", async () => {
    const fn = vi.fn().mockResolvedValue("success");
    const result = await retryWithBackoff(fn);
    expect(result).toBe("success");
    expect(fn).toHaveBeenCalledTimes(1);
  });

  it("should retry on failure and eventually succeed", async () => {
    const fn = vi
      .fn()
      .mockRejectedValueOnce(new Error("timeout"))
      .mockResolvedValueOnce("success");
    const onRetry = vi.fn();
    const promise = retryWithBackoff(fn, DEFAULT_RETRY_CONFIG, onRetry);
    // Advance time for first retry
    await vi.advanceTimersByTimeAsync(1000);
    const result = await promise;
    expect(result).toBe("success");
    expect(fn).toHaveBeenCalledTimes(2);
    expect(onRetry).toHaveBeenCalledTimes(1);
  });

  it("should fail after max attempts", async () => {
    vi.useRealTimers();
    const fn = vi.fn().mockRejectedValue(new Error("timeout"));
    const onRetry = vi.fn();
    const promise = retryWithBackoff(
      fn,
      // `initialDelayMs` is the RetryConfig field that controls the first wait;
      // keep it tiny so the real-timer run stays fast.
      { ...DEFAULT_RETRY_CONFIG, maxAttempts: 2, initialDelayMs: 10, maxDelayMs: 100 },
      onRetry
    );
    await expect(promise).rejects.toThrow("timeout");
    expect(fn).toHaveBeenCalledTimes(2);
    expect(onRetry).toHaveBeenCalledTimes(1);
    vi.useFakeTimers();
  });

  it("should throw immediately for non-retryable errors", async () => {
    vi.useRealTimers();
    const fn = vi.fn().mockRejectedValue(new Error("Not found"));
    const onRetry = vi.fn();
    // Throwing from the onRetry hook is how a caller aborts the retry loop.
    const promise = retryWithBackoff(fn, DEFAULT_RETRY_CONFIG, (attempt, error) => {
      if (error.message === "Not found") {
        throw error;
      }
      onRetry(attempt, error);
    });
    await expect(promise).rejects.toThrow("Not found");
    expect(fn).toHaveBeenCalledTimes(1);
    expect(onRetry).not.toHaveBeenCalled();
    vi.useFakeTimers();
  });

  it("should call onRetry callback with attempt number and error", async () => {
    const fn = vi
      .fn()
      .mockRejectedValueOnce(new Error("timeout"))
      .mockResolvedValueOnce("success");
    const onRetry = vi.fn();
    const promise = retryWithBackoff(fn, DEFAULT_RETRY_CONFIG, onRetry);
    await vi.advanceTimersByTimeAsync(1000);
    await promise;
    expect(onRetry).toHaveBeenCalledWith(1, expect.objectContaining({ message: "timeout" }));
  });
});
describe("sleep", () => {
  beforeEach(() => {
    vi.useFakeTimers();
  });

  it("should sleep for specified duration", async () => {
    let resolved = false;
    const promise = sleep(1000).then(() => {
      resolved = true;
    });

    // One millisecond short of the requested duration: must still be pending.
    await vi.advanceTimersByTimeAsync(999);
    expect(resolved).toBe(false);

    // Crossing the requested duration releases the sleeper.
    await vi.advanceTimersByTimeAsync(1);
    await promise;
    expect(resolved).toBe(true);
  });
});
describe("Integration: Chat Resilience Flow", () => {
  beforeEach(() => {
    vi.useFakeTimers();
  });

  it("should retry chat on timeout and recover", async () => {
    let attempts = 0;
    const chatFn = vi.fn().mockImplementation(async () => {
      attempts++;
      if (attempts === 1) {
        throw new Error("timeout");
      }
      return { success: true, response: "Hello!" };
    });
    const onRetry = vi.fn();
    const promise = retryWithBackoff(chatFn, DEFAULT_RETRY_CONFIG, onRetry);
    await vi.advanceTimersByTimeAsync(1000);
    const result = await promise;
    expect(result).toEqual({ success: true, response: "Hello!" });
    expect(chatFn).toHaveBeenCalledTimes(2);
    expect(onRetry).toHaveBeenCalledTimes(1);
  });

  it("should handle multiple retries with exponential backoff", async () => {
    vi.useRealTimers();
    let attempts = 0;
    const chatFn = vi.fn().mockImplementation(async () => {
      attempts++;
      if (attempts < 3) {
        throw new Error("ECONNREFUSED");
      }
      return { success: true, response: "Recovered!" };
    });
    const onRetry = vi.fn();
    const promise = retryWithBackoff(
      chatFn,
      // `initialDelayMs` is the RetryConfig field that controls the first wait;
      // with 10 ms and a multiplier of 2 the two real waits are 10 ms and 20 ms.
      { ...DEFAULT_RETRY_CONFIG, maxAttempts: 3, initialDelayMs: 10, maxDelayMs: 100 },
      onRetry
    );
    const result = await promise;
    expect(result).toEqual({ success: true, response: "Recovered!" });
    expect(chatFn).toHaveBeenCalledTimes(3);
    expect(onRetry).toHaveBeenCalledTimes(2);
    vi.useFakeTimers();
  });
});
});

120
server/chat-resilience.ts Normal file
View File

@@ -0,0 +1,120 @@
import { getDb } from "./db";
/**
* Chat resilience utilities: retry logic with exponential backoff and context recovery
*/
/**
 * Settings consumed by retryWithBackoff and calculateBackoffDelay.
 */
export interface RetryConfig {
/** Upper bound on how many times the wrapped function is called, the first call included. */
maxAttempts: number;
/** Wait after the first failed attempt, in milliseconds. */
initialDelayMs: number;
/** Cap applied to every computed wait, in milliseconds. */
maxDelayMs: number;
/** Factor the wait is multiplied by for each further failed attempt. */
backoffMultiplier: number;
}
/**
 * Defaults used by retryWithBackoff when no config is passed: at most 3 calls,
 * with calculateBackoffDelay yielding waits of 1000 ms and then 2000 ms between
 * them (every wait is capped at 8000 ms).
 */
export const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxAttempts: 3,
initialDelayMs: 1000,
maxDelayMs: 8000,
backoffMultiplier: 2,
};
/**
 * Compute how long to wait after a failed attempt.
 *
 * The wait equals `config.initialDelayMs` for attempt 1 and is multiplied by
 * `config.backoffMultiplier` for each further attempt, never exceeding
 * `config.maxDelayMs`.
 *
 * @param attempt - 1-based number of the attempt that just failed.
 * @param config - Backoff settings.
 * @returns The wait in milliseconds.
 */
export function calculateBackoffDelay(attempt: number, config: RetryConfig): number {
  const { initialDelayMs, backoffMultiplier, maxDelayMs } = config;
  const growthFactor = backoffMultiplier ** (attempt - 1);
  const uncappedMs = initialDelayMs * growthFactor;
  return Math.min(uncappedMs, maxDelayMs);
}
/**
 * Pause an async flow for a fixed time.
 *
 * @param ms - How long to wait, in milliseconds.
 * @returns A promise that resolves with no value once the timer fires.
 */
export function sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Run `fn`, retrying with exponential backoff while it keeps failing.
 *
 * `fn` is called up to `config.maxAttempts` times. After every failure except the
 * last, `onRetry` is invoked and the function then waits for the delay returned by
 * `calculateBackoffDelay` before trying again. `onRetry` runs outside any
 * try/catch, so an error thrown from it propagates to the caller and ends the
 * retries — callers can use that to bail out on errors they consider permanent.
 *
 * @param fn - Operation to attempt.
 * @param config - Retry limit and backoff settings.
 * @param onRetry - Optional hook receiving the 1-based number of the failed attempt and its error.
 * @returns The first successful result of `fn`.
 * @throws The error of the final attempt (non-Error rejections are wrapped in an `Error`),
 *   or `Error("Retry failed")` when the loop never ran because `maxAttempts` is below 1.
 */
export async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  config: RetryConfig = DEFAULT_RETRY_CONFIG,
  onRetry?: (attempt: number, error: Error) => void
): Promise<T> {
  let failure: Error | null = null;
  let attempt = 1;

  while (attempt <= config.maxAttempts) {
    try {
      return await fn();
    } catch (caught) {
      failure = caught instanceof Error ? caught : new Error(String(caught));

      // No hook call and no wait after the final attempt; fall through to the throw below.
      if (attempt < config.maxAttempts) {
        const waitMs = calculateBackoffDelay(attempt, config);
        if (onRetry) {
          onRetry(attempt, failure);
        }
        await sleep(waitMs);
      }
    }
    attempt += 1;
  }

  throw failure || new Error("Retry failed");
}
/**
 * Decide whether a failure looks transient and is therefore worth retrying.
 *
 * An error is treated as retryable when any of the following holds:
 * - its `code` or message names a transient network condition
 *   (ECONNRESET, ECONNREFUSED, ENOTFOUND, ETIMEDOUT);
 * - its message mentions "timeout", "timed out" or "unavailable";
 * - its `status` or `code` is a 5xx number (numeric strings are accepted).
 *
 * `status` and `code` are inspected independently, so a string `code` does not hide
 * a 5xx `status`. A message that merely contains the word "service" is not enough.
 *
 * @param error - Anything that was thrown or rejected; may be null/undefined.
 * @returns `true` if the failure looks transient, otherwise `false`.
 */
export function isRetryableError(error: unknown): boolean {
  if (!error) return false;

  // Reading properties off a primitive is harmless (it yields undefined), so one loose view suffices.
  const details = error as { message?: unknown; code?: unknown; status?: unknown };
  const message = String(details.message || error).toLowerCase();
  const codeText = typeof details.code === "string" ? details.code.toLowerCase() : "";

  // Transient network failures, whether reported via `code` or only in the message text.
  const networkCodes = ["econnreset", "econnrefused", "enotfound", "etimedout"];
  if (networkCodes.some((code) => codeText === code || message.includes(code))) return true;

  // Timeouts and "unavailable" responses described in free text.
  const transientPhrases = ["timeout", "timed out", "unavailable"];
  if (transientPhrases.some((phrase) => message.includes(phrase))) return true;

  // 5xx server errors (this range already covers the 502/503/504 gateway statuses).
  const isServerError = (value: unknown): boolean => {
    const numeric = typeof value === "string" ? Number(value) : value;
    return typeof numeric === "number" && numeric >= 500 && numeric < 600;
  };
  return isServerError(details.status) || isServerError(details.code);
}
/**
 * Placeholder for loading a user's recent conversation so a retried chat can be
 * re-seeded with context.
 *
 * No query is implemented yet: the function calls `getDb()` and then resolves to
 * an empty array, leaving callers to fall back on the frontend's in-memory history.
 * Neither `userId` nor `limit` is read at present.
 *
 * @param userId - User whose history is wanted (currently unused).
 * @param limit - Maximum number of messages to return (currently unused; defaults to 10).
 * @returns Always an empty array for now; if `getDb()` throws, the error is logged and `[]` is still returned.
 */
export async function getConversationContext(
  userId: string,
  limit: number = 10
): Promise<Array<{ role: "user" | "assistant" | "system"; content: string }>> {
  const noContext: Array<{ role: "user" | "assistant" | "system"; content: string }> = [];

  try {
    // TODO: query a messages table (userId, role, content, createdAt) once it exists.
    getDb();
  } catch (error) {
    console.error("[ChatResilience] Failed to get conversation context:", error);
  }

  return noContext;
}
/**
 * Write one retry attempt to the console for monitoring.
 *
 * @param attempt - Number of the attempt being reported.
 * @param error - The error that triggered the retry; only its message is printed.
 * @param context - Optional extra data, printed as JSON after the message
 *   (an empty string is printed in its place when omitted).
 */
export function logRetryAttempt(
  attempt: number,
  error: Error,
  context?: Record<string, any>
): void {
  const headline = `[ChatResilience] Retry attempt ${attempt}: ${error.message}`;

  let serializedContext = "";
  if (context) {
    serializedContext = JSON.stringify(context);
  }

  console.log(headline, serializedContext);
}

View File

@@ -1,293 +0,0 @@
/**
* Tests for metrics-collector.ts and nodeMetrics db helpers
*/
import { describe, it, expect, vi, beforeEach } from "vitest";
// ─── Mock gateway-proxy ────────────────────────────────────────────────────────
vi.mock("./gateway-proxy", () => ({
getGatewayNodeStats: vi.fn(),
isGatewayAvailable: vi.fn(),
}));
// ─── Mock db ──────────────────────────────────────────────────────────────────
vi.mock("./db", () => ({
getDb: vi.fn(),
insertNodeMetric: vi.fn(),
getNodeMetricsHistory: vi.fn(),
getLatestMetricsByContainer: vi.fn(),
}));
// ─── Mock notification ────────────────────────────────────────────────────────
vi.mock("./_core/notification", () => ({
notifyOwner: vi.fn().mockResolvedValue(true),
}));
import { getGatewayNodeStats, isGatewayAvailable } from "./gateway-proxy";
import { insertNodeMetric, getNodeMetricsHistory, getLatestMetricsByContainer } from "./db";
import { notifyOwner } from "./_core/notification";
// ─── Unit helpers ─────────────────────────────────────────────────────────────
/**
 * Test-local CPU rule: alert only when usage is strictly above `threshold` (default 80).
 * NOTE(review): collectOnce in metrics-collector compares with `>=`, so the two differ at exactly the threshold.
 */
function isCpuAlert(cpuPct: number, threshold = 80): boolean {
  const exceedsThreshold = cpuPct > threshold;
  return exceedsThreshold;
}
/** Test-local health rule: true when the status text contains "unhealthy", ignoring case. */
function isUnhealthyAlert(status: string): boolean {
  const normalized = status.toLowerCase();
  return normalized.indexOf("unhealthy") !== -1;
}
/** Build the alert headline: a high-CPU title for "cpu", an unhealthy-container title otherwise. */
function alertTitle(containerName: string, reason: "cpu" | "unhealthy"): string {
  switch (reason) {
    case "cpu":
      return `⚠️ High CPU: ${containerName}`;
    default:
      return `🔴 Unhealthy Container: ${containerName}`;
  }
}
/**
 * Build the four-line alert body (container, CPU, memory, status), newline-separated.
 * CPU and memory are rendered with one decimal place.
 */
function alertContent(
  containerName: string,
  cpuPct: number,
  memPct: number,
  status: string
): string {
  const cpuLine = `CPU: ${cpuPct.toFixed(1)}%`;
  const memLine = `Memory: ${memPct.toFixed(1)}%`;
  return `Container: ${containerName}\n${cpuLine}\n${memLine}\nStatus: ${status}`;
}
// ─── Tests ────────────────────────────────────────────────────────────────────
describe("metrics-collector: CPU alert threshold", () => {
it("triggers alert when CPU > 80%", () => {
expect(isCpuAlert(81)).toBe(true);
expect(isCpuAlert(100)).toBe(true);
expect(isCpuAlert(80.1)).toBe(true);
});
it("does NOT trigger alert when CPU <= 80%", () => {
expect(isCpuAlert(80)).toBe(false);
expect(isCpuAlert(79.9)).toBe(false);
expect(isCpuAlert(0)).toBe(false);
});
it("respects custom threshold", () => {
expect(isCpuAlert(60, 50)).toBe(true);
expect(isCpuAlert(50, 50)).toBe(false);
});
});
describe("metrics-collector: unhealthy detection", () => {
it("detects unhealthy status", () => {
expect(isUnhealthyAlert("unhealthy")).toBe(true);
expect(isUnhealthyAlert("(unhealthy)")).toBe(true);
expect(isUnhealthyAlert("Up 2 hours (unhealthy)")).toBe(true);
});
it("does NOT flag healthy/running containers", () => {
expect(isUnhealthyAlert("running")).toBe(false);
expect(isUnhealthyAlert("Up 2 hours")).toBe(false);
expect(isUnhealthyAlert("healthy")).toBe(false);
expect(isUnhealthyAlert("")).toBe(false);
});
});
describe("metrics-collector: alert formatting", () => {
it("formats CPU alert title correctly", () => {
expect(alertTitle("goclaw-gateway", "cpu")).toBe("⚠️ High CPU: goclaw-gateway");
});
it("formats unhealthy alert title correctly", () => {
expect(alertTitle("goclaw-db", "unhealthy")).toBe("🔴 Unhealthy Container: goclaw-db");
});
it("formats alert content with all fields", () => {
const content = alertContent("my-container", 92.5, 45.3, "Up 1h (unhealthy)");
expect(content).toContain("Container: my-container");
expect(content).toContain("CPU: 92.5%");
expect(content).toContain("Memory: 45.3%");
expect(content).toContain("Status: Up 1h (unhealthy)");
});
it("formats CPU value with one decimal place", () => {
const content = alertContent("c", 80.123, 0, "running");
expect(content).toContain("CPU: 80.1%");
});
});
describe("metrics-collector: notifyOwner integration", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("calls notifyOwner with correct payload for CPU alert", async () => {
const mockNotify = vi.mocked(notifyOwner);
mockNotify.mockResolvedValue(true);
const title = alertTitle("gateway", "cpu");
const content = alertContent("gateway", 95, 30, "running");
const result = await notifyOwner({ title, content });
expect(mockNotify).toHaveBeenCalledWith({ title, content });
expect(result).toBe(true);
});
it("handles notifyOwner failure gracefully", async () => {
const mockNotify = vi.mocked(notifyOwner);
mockNotify.mockResolvedValue(false);
const result = await notifyOwner({
title: "⚠️ High CPU: test",
content: "Container: test\nCPU: 90.0%\nMemory: 50.0%\nStatus: running",
});
expect(result).toBe(false);
});
});
describe("metrics-collector: gateway availability", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("skips collection when gateway is unavailable", async () => {
vi.mocked(isGatewayAvailable).mockResolvedValue(false);
vi.mocked(getGatewayNodeStats).mockResolvedValue(null);
const available = await isGatewayAvailable();
expect(available).toBe(false);
// When unavailable, stats should not be fetched
expect(getGatewayNodeStats).not.toHaveBeenCalled();
});
it("proceeds with collection when gateway is available", async () => {
vi.mocked(isGatewayAvailable).mockResolvedValue(true);
vi.mocked(getGatewayNodeStats).mockResolvedValue({
stats: [
{
id: "abc123",
name: "goclaw-gateway",
cpuPct: 5.2,
memUseMB: 128,
memLimMB: 512,
memPct: 25.0,
},
],
});
const available = await isGatewayAvailable();
expect(available).toBe(true);
const stats = await getGatewayNodeStats();
expect(stats?.stats).toHaveLength(1);
expect(stats?.stats[0].name).toBe("goclaw-gateway");
});
});
describe("nodeMetrics db helpers", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("insertNodeMetric is callable", async () => {
const mockInsert = vi.mocked(insertNodeMetric);
mockInsert.mockResolvedValue(undefined);
await insertNodeMetric({
containerId: "abc123",
containerName: "goclaw-gateway",
cpuPct: 5.2,
memUseMB: 128,
memLimMB: 512,
memPct: 25.0,
});
expect(mockInsert).toHaveBeenCalledOnce();
expect(mockInsert).toHaveBeenCalledWith(
expect.objectContaining({
containerId: "abc123",
containerName: "goclaw-gateway",
cpuPct: 5.2,
})
);
});
it("getNodeMetricsHistory returns array", async () => {
const mockGet = vi.mocked(getNodeMetricsHistory);
mockGet.mockResolvedValue([
{
id: 1,
containerId: "abc123",
containerName: "goclaw-gateway",
cpuPct: 5.2,
memUseMB: 128,
memLimMB: 512,
memPct: 25.0,
recordedAt: Date.now(),
},
]);
const result = await getNodeMetricsHistory("abc123", 60);
expect(result).toHaveLength(1);
expect(result[0].containerId).toBe("abc123");
});
it("getLatestMetricsByContainer returns map-like structure", async () => {
const mockLatest = vi.mocked(getLatestMetricsByContainer);
mockLatest.mockResolvedValue({
"goclaw-gateway": [
{ cpu: 5.2, mem: 25.0, ts: Date.now() },
{ cpu: 6.1, mem: 26.0, ts: Date.now() + 30000 },
],
});
const result = await getLatestMetricsByContainer(60);
expect(result).toHaveProperty("goclaw-gateway");
expect(result["goclaw-gateway"]).toHaveLength(2);
expect(result["goclaw-gateway"][0]).toHaveProperty("cpu");
expect(result["goclaw-gateway"][0]).toHaveProperty("mem");
});
it("getNodeMetricsHistory returns empty array when no data", async () => {
const mockGet = vi.mocked(getNodeMetricsHistory);
mockGet.mockResolvedValue([]);
const result = await getNodeMetricsHistory("nonexistent", 60);
expect(result).toEqual([]);
});
});
describe("metrics-collector: alert cooldown logic", () => {
it("tracks last alert time per container", () => {
const alertCooldowns = new Map<string, number>();
const COOLDOWN_MS = 15 * 60 * 1000; // 15 minutes
function shouldAlert(containerId: string, now = Date.now()): boolean {
const last = alertCooldowns.get(containerId);
if (!last) return true;
return now - last > COOLDOWN_MS;
}
function recordAlert(containerId: string, now = Date.now()) {
alertCooldowns.set(containerId, now);
}
const now = Date.now();
// First alert — should fire
expect(shouldAlert("container-1", now)).toBe(true);
recordAlert("container-1", now);
// Immediately after — should NOT fire (cooldown)
expect(shouldAlert("container-1", now + 1000)).toBe(false);
// After cooldown — should fire again
expect(shouldAlert("container-1", now + COOLDOWN_MS + 1)).toBe(true);
// Different container — unaffected
expect(shouldAlert("container-2", now)).toBe(true);
});
});

View File

@@ -1,144 +0,0 @@
/**
* Metrics Collector — background job that:
* 1. Polls Docker container stats every 30s via Go Gateway
* 2. Persists snapshots to nodeMetrics table
* 3. Fires owner alerts when CPU > 80% or container is unhealthy
* 4. Prunes records older than 2 hours to keep the table lean
*/
import { getGatewayNodeStats } from "./gateway-proxy";
import { saveNodeMetric, pruneOldNodeMetrics } from "./db";
import { notifyOwner } from "./_core/notification";
// ── Config ────────────────────────────────────────────────────────────────────
const COLLECT_INTERVAL_MS = 30_000; // 30 seconds
const PRUNE_INTERVAL_MS = 30 * 60_000; // 30 minutes
const CPU_ALERT_THRESHOLD = 80; // percent
const ALERT_COOLDOWN_MS = 10 * 60_000; // 10 min between repeated alerts per container
// ── State ─────────────────────────────────────────────────────────────────────
/** Track last alert time per container to avoid alert spam */
const lastAlertAt: Record<string, number> = {};
let collectTimer: ReturnType<typeof setInterval> | null = null;
let pruneTimer: ReturnType<typeof setInterval> | null = null;
let isRunning = false;
// ── Core collector ────────────────────────────────────────────────────────────
/**
 * Run one collection pass over the stats returned by `getGatewayNodeStats()`.
 *
 * For every container the snapshot is persisted with `saveNodeMetric`, then an
 * owner alert is sent via `notifyOwner` when CPU is at or above
 * `CPU_ALERT_THRESHOLD` or memory is at or above 95%, unless an alert for that
 * container was already sent within `ALERT_COOLDOWN_MS`.
 *
 * The cooldown only starts once an alert actually went out: a rejected
 * `notifyOwner` call or one that resolves to `false` leaves the container
 * eligible for another attempt on the next pass.
 *
 * @returns How many snapshots were saved and a summary line for each alert sent.
 *   Both are empty/zero when the gateway returned no stats.
 * @throws Whatever `getGatewayNodeStats` or `saveNodeMetric` rejects with.
 */
export async function collectOnce(): Promise<{ saved: number; alerts: string[] }> {
  const result = await getGatewayNodeStats();
  if (!result || !result.stats.length) {
    return { saved: 0, alerts: [] };
  }

  const alerts: string[] = [];
  let saved = 0;

  for (const stat of result.stats) {
    // Persist snapshot (values rounded to two decimals and stored as strings)
    await saveNodeMetric({
      containerId: stat.id,
      containerName: stat.name,
      cpuPercent: String(Math.round(stat.cpuPct * 100) / 100),
      memUsedMb: String(Math.round(stat.memUseMB * 100) / 100),
      memLimitMb: String(Math.round(stat.memLimMB * 100) / 100),
      status: "running",
    });
    saved++;

    // ── Alert logic ──────────────────────────────────────────────────────────
    const now = Date.now();
    const lastAlert = lastAlertAt[stat.id] ?? 0;
    const cooldownExpired = now - lastAlert > ALERT_COOLDOWN_MS;
    if (!cooldownExpired) continue;

    const isCpuHigh = stat.cpuPct >= CPU_ALERT_THRESHOLD;
    // GatewayContainerStat doesn't have status field — detect unhealthy via memPct >= 95
    const isMemCritical = stat.memPct >= 95;
    if (!isCpuHigh && !isMemCritical) continue;

    const reasons: string[] = [];
    if (isCpuHigh) reasons.push(`CPU ${stat.cpuPct.toFixed(1)}% ≥ ${CPU_ALERT_THRESHOLD}%`);
    if (isMemCritical) reasons.push(`Memory ${stat.memPct.toFixed(1)}% ≥ 95%`);

    const memMb = Math.round(stat.memUseMB);
    const title = `⚠️ GoClaw Alert: ${stat.name}`;
    const content = [
      `Container **${stat.name}** requires attention:`,
      ...reasons.map(r => `- ${r}`),
      ``,
      `Memory: ${memMb} MB`,
      `Time: ${new Date().toISOString()}`,
    ].join("\n");

    try {
      const delivered = await notifyOwner({ title, content });
      // NOTE(review): the test mocks model a failed delivery as a resolved `false`;
      // only an explicit `false` is treated as failure so any other result keeps the old path.
      if (delivered === false) {
        console.warn(`[MetricsCollector] Alert for ${stat.name} was not delivered; will retry on the next pass`);
        continue;
      }
      lastAlertAt[stat.id] = now;
      alerts.push(`${stat.name}: ${reasons.join(", ")}`);
      console.log(`[MetricsCollector] Alert sent for ${stat.name}: ${reasons.join(", ")}`);
    } catch (err) {
      console.error(`[MetricsCollector] Failed to send alert for ${stat.name}:`, err);
    }
  }

  return { saved, alerts };
}
// ── Prune ─────────────────────────────────────────────────────────────────────
/**
 * Delete stored metrics via `pruneOldNodeMetrics(2)` and log the outcome.
 * Failures are logged and swallowed so the caller never sees a rejection.
 */
async function pruneOld(): Promise<void> {
  const retentionHours = 2;
  try {
    await pruneOldNodeMetrics(retentionHours);
    console.log("[MetricsCollector] Pruned metrics older than 2h");
  } catch (pruneError) {
    console.error("[MetricsCollector] Prune error:", pruneError);
  }
}
// ── Lifecycle ─────────────────────────────────────────────────────────────────
/**
 * Start the background collector. Calling it while already running only logs a warning.
 *
 * Schedules three things:
 * - a one-off first collection 10 s after start (to let the server finish starting),
 * - a recurring collection every `COLLECT_INTERVAL_MS`,
 * - a recurring prune every `PRUNE_INTERVAL_MS`.
 *
 * Collection errors are logged and never propagate out of the timers.
 */
export function startMetricsCollector(): void {
  if (isRunning) {
    console.warn("[MetricsCollector] Already running, skipping start");
    return;
  }
  isRunning = true;

  // First collection after 10s (let server fully start)
  setTimeout(async () => {
    // This one-shot timer is not tracked, so stopMetricsCollector() cannot clear it;
    // honour a stop that happened during the warm-up instead of collecting after shutdown.
    if (!isRunning) return;

    console.log("[MetricsCollector] Starting first collection...");
    const r = await collectOnce().catch(e => {
      console.error("[MetricsCollector] First collection error:", e);
      return { saved: 0, alerts: [] };
    });
    console.log(`[MetricsCollector] First collection: saved=${r.saved}, alerts=${r.alerts.length}`);
  }, 10_000);

  // Recurring collection every 30s
  collectTimer = setInterval(async () => {
    const r = await collectOnce().catch(e => {
      console.error("[MetricsCollector] Collection error:", e);
      return { saved: 0, alerts: [] };
    });
    if (r.saved > 0) {
      console.log(`[MetricsCollector] Collected ${r.saved} snapshots${r.alerts.length ? `, ${r.alerts.length} alert(s)` : ""}`);
    }
  }, COLLECT_INTERVAL_MS);

  // Prune every 30 minutes
  pruneTimer = setInterval(pruneOld, PRUNE_INTERVAL_MS);

  console.log(`[MetricsCollector] Started — collecting every ${COLLECT_INTERVAL_MS / 1000}s, pruning every ${PRUNE_INTERVAL_MS / 60_000}min`);
}
/**
 * Stop the background collector: clear the collection and prune intervals if they
 * are set, mark the collector as not running, and log that it stopped.
 */
export function stopMetricsCollector(): void {
  if (collectTimer) {
    clearInterval(collectTimer);
    collectTimer = null;
  }

  if (pruneTimer) {
    clearInterval(pruneTimer);
    pruneTimer = null;
  }

  isRunning = false;
  console.log("[MetricsCollector] Stopped");
}

View File

@@ -1,9 +1,10 @@
import { COOKIE_NAME } from "@shared/const";
import { z } from "zod";
import { getDb, getNodeMetricsHistory, getLatestNodeMetrics } from "./db";
import { getDb } from "./db";
import { getSessionCookieOptions } from "./_core/cookies";
import { systemRouter } from "./_core/systemRouter";
import { publicProcedure, router, protectedProcedure } from "./_core/trpc";
import { retryWithBackoff, isRetryableError, logRetryAttempt, DEFAULT_RETRY_CONFIG } from "./chat-resilience";
import { checkOllamaHealth, listModels, chatCompletion } from "./ollama";
import {
checkGatewayHealth,
@@ -531,24 +532,38 @@ export const appRouter = router({
})
)
.mutation(async ({ input }) => {
// Try Go Gateway first (preferred — full Go tool-use loop)
const gwAvailable = await isGatewayAvailable();
if (gwAvailable) {
const result = await gatewayChat(
input.messages,
input.model,
input.maxIterations ?? 10
);
return { ...result, source: "gateway" as const };
}
// Fallback: Node.js orchestrator
const { orchestratorChat } = await import("./orchestrator");
const result = await orchestratorChat(
input.messages,
input.model,
input.maxIterations ?? 10
// Wrap chat with retry logic for resilience
return retryWithBackoff(
async () => {
// Try Go Gateway first (preferred — full Go tool-use loop)
const gwAvailable = await isGatewayAvailable();
if (gwAvailable) {
const result = await gatewayChat(
input.messages,
input.model,
input.maxIterations ?? 10
);
return { ...result, source: "gateway" as const };
}
// Fallback: Node.js orchestrator
const { orchestratorChat } = await import("./orchestrator");
const result = await orchestratorChat(
input.messages,
input.model,
input.maxIterations ?? 10
);
return { ...result, source: "direct" as const };
},
DEFAULT_RETRY_CONFIG,
(attempt, error) => {
if (isRetryableError(error)) {
logRetryAttempt(attempt, error, { messageCount: input.messages.length });
} else {
// Non-retryable error, throw immediately
throw error;
}
}
);
return { ...result, source: "direct" as const };
}),
// List available tools — Go Gateway first
@@ -706,50 +721,6 @@ export const appRouter = router({
}
return result;
}),
/**
* Get historical metrics for a specific container (last 60 min, sampled every 30s)
*/
metricsHistory: publicProcedure
.input(z.object({ containerId: z.string() }))
.query(async ({ input }) => {
const history = await getNodeMetricsHistory(input.containerId, 60);
// Return in chronological order for sparkline rendering
const sorted = [...history].reverse();
return {
containerId: input.containerId,
points: sorted.map(m => ({
cpu: Number(m.cpuPercent),
mem: Number(m.memUsedMb),
ts: m.recordedAt.getTime(),
})),
count: sorted.length,
};
}),
/**
* Get latest metrics snapshot for all containers (last 30 min)
*/
allMetricsLatest: publicProcedure.query(async () => {
const metrics = await getLatestNodeMetrics();
// Group by containerId, keep last 120 points each
const grouped: Record<string, { cpu: number; mem: number; ts: number }[]> = {};
for (const m of metrics) {
if (!grouped[m.containerId]) grouped[m.containerId] = [];
if (grouped[m.containerId].length < 120) {
grouped[m.containerId].push({
cpu: Number(m.cpuPercent),
mem: Number(m.memUsedMb),
ts: m.recordedAt.getTime(),
});
}
}
// Reverse each group to chronological order
for (const id of Object.keys(grouped)) {
grouped[id] = grouped[id].reverse();
}
return { byContainer: grouped, fetchedAt: new Date().toISOString() };
}),
}),
});
export type AppRouter = typeof appRouter;