import { and, desc, eq, gte, lt } from "drizzle-orm";
import { drizzle } from "drizzle-orm/mysql2";
import {
  InsertNodeMetric,
  InsertTask,
  InsertUser,
  NodeMetric,
  Task,
  nodeMetrics,
  tasks,
  users,
} from "../drizzle/schema";
import { ENV } from "./_core/env";

/** Cached drizzle instance; stays `null` until the first successful `getDb()` call. */
let _db: ReturnType<typeof drizzle> | null = null;

/**
 * Lazily create the drizzle instance so local tooling can run without a DB.
 *
 * @returns The cached drizzle instance, or `null` when `DATABASE_URL` is not
 *   set or constructing the drizzle instance threw.
 */
export async function getDb() {
  if (!_db && process.env.DATABASE_URL) {
    try {
      _db = drizzle(process.env.DATABASE_URL);
    } catch (error) {
      console.warn("[Database] Failed to connect:", error);
      _db = null;
    }
  }
  return _db;
}

/**
 * Insert a user row, or update the existing row when the insert hits a
 * duplicate key.
 *
 * Only fields that are explicitly provided (not `undefined`) are written.
 * When no role is given and the user's `openId` equals `ENV.ownerOpenId`, the
 * role is set to `'admin'`. Returns without writing (after a warning) when the
 * database is not available.
 *
 * @param user - User data; `openId` is required.
 * @throws Error when `user.openId` is missing, and re-throws any database
 *   error after logging it.
 */
export async function upsertUser(user: InsertUser): Promise<void> {
  if (!user.openId) {
    throw new Error("User openId is required for upsert");
  }

  const db = await getDb();
  if (!db) {
    console.warn("[Database] Cannot upsert user: database not available");
    return;
  }

  try {
    const values: InsertUser = {
      openId: user.openId,
    };
    const updateSet: Record<string, unknown> = {};

    const textFields = ["name", "email", "loginMethod"] as const;
    type TextField = (typeof textFields)[number];

    // Copy a text field into both the insert values and the update set,
    // skipping fields the caller did not provide at all.
    const assignNullable = (field: TextField) => {
      const value = user[field];
      if (value === undefined) return;
      const normalized = value ?? null;
      values[field] = normalized;
      updateSet[field] = normalized;
    };

    textFields.forEach(assignNullable);

    if (user.lastSignedIn !== undefined) {
      values.lastSignedIn = user.lastSignedIn;
      updateSet.lastSignedIn = user.lastSignedIn;
    }

    if (user.role !== undefined) {
      values.role = user.role;
      updateSet.role = user.role;
    } else if (user.openId === ENV.ownerOpenId) {
      values.role = "admin";
      updateSet.role = "admin";
    }

    if (!values.lastSignedIn) {
      values.lastSignedIn = new Date();
    }

    // The duplicate-key update needs at least one assignment, so fall back to
    // refreshing lastSignedIn when nothing else was provided.
    if (Object.keys(updateSet).length === 0) {
      updateSet.lastSignedIn = new Date();
    }

    await db.insert(users).values(values).onDuplicateKeyUpdate({
      set: updateSet,
    });
  } catch (error) {
    console.error("[Database] Failed to upsert user:", error);
    throw error;
  }
}

/**
 * Look up a user by `openId`.
 *
 * @param openId - Value matched against `users.openId`.
 * @returns The first matching row, or `undefined` when there is no match or
 *   the database is not available. Query errors are not caught here.
 */
export async function getUserByOpenId(openId: string) {
  const db = await getDb();
  if (!db) {
    console.warn("[Database] Cannot get user: database not available");
    return undefined;
  }

  const result = await db
    .select()
    .from(users)
    .where(eq(users.openId, openId))
    .limit(1);

  return result.length > 0 ? result[0] : undefined;
}

// ── Node Metrics ────────────────────────────────────────────────────────────

/** Upper bound on rows returned by `getNodeMetricsHistory` (60min @ 30s intervals). */
const MAX_HISTORY_POINTS = 120;

/**
 * Save a single node metric snapshot.
 *
 * Best-effort: does nothing when the database is not available, and insert
 * failures are logged rather than thrown.
 *
 * @param metric - Row to insert into `nodeMetrics`.
 */
export async function saveNodeMetric(metric: InsertNodeMetric): Promise<void> {
  const db = await getDb();
  if (!db) return;
  try {
    await db.insert(nodeMetrics).values(metric);
  } catch (error) {
    console.error("[DB] Failed to save node metric:", error);
  }
}

/**
 * Get metrics history for a container over the last N minutes (default 60).
 *
 * @param containerId - Value matched against `nodeMetrics.containerId`.
 * @param minutes - Size of the look-back window in minutes.
 * @returns Up to 120 rows recorded inside the window, newest first. Returns an
 *   empty array when the database is not available or the query fails.
 */
export async function getNodeMetricsHistory(
  containerId: string,
  minutes = 60
): Promise<NodeMetric[]> {
  const db = await getDb();
  if (!db) return [];
  try {
    const since = new Date(Date.now() - minutes * 60 * 1000);
    return await db
      .select()
      .from(nodeMetrics)
      .where(
        // Restrict to the requested window; previously `since` was computed
        // but never applied, so `minutes` had no effect.
        and(
          eq(nodeMetrics.containerId, containerId),
          gte(nodeMetrics.recordedAt, since)
        )
      )
      .orderBy(desc(nodeMetrics.recordedAt))
      .limit(MAX_HISTORY_POINTS);
  } catch (error) {
    console.error("[DB] Failed to get node metrics history:", error);
    return [];
  }
}

/**
 * Get recent metric rows across all containers.
 *
 * Returns the raw rows recorded in the last 30 minutes, newest first, capped
 * at 500. Rows are NOT reduced to one per container here.
 *
 * @returns The matching rows, or an empty array when the database is not
 *   available or the query fails.
 */
export async function getLatestNodeMetrics(): Promise<NodeMetric[]> {
  const db = await getDb();
  if (!db) return [];
  try {
    const since = new Date(Date.now() - 30 * 60 * 1000);
    return await db
      .select()
      .from(nodeMetrics)
      .where(gte(nodeMetrics.recordedAt, since))
      .orderBy(desc(nodeMetrics.recordedAt))
      .limit(500);
  } catch (error) {
    console.error("[DB] Failed to get latest node metrics:", error);
    return [];
  }
}

/**
 * Delete metrics older than N hours to keep table size manageable.
 *
 * Best-effort: does nothing when the database is not available, and failures
 * are logged as warnings rather than thrown.
 *
 * @param hours - Rows with `recordedAt` strictly before now minus this many
 *   hours are deleted.
 */
export async function pruneOldNodeMetrics(hours = 2): Promise<void> {
  const db = await getDb();
  if (!db) return;
  try {
    const cutoff = new Date(Date.now() - hours * 60 * 60 * 1000);
    // `lt` removes everything older than the cutoff. The previous `eq`
    // comparison only matched rows stamped exactly at the cutoff instant.
    await db.delete(nodeMetrics).where(lt(nodeMetrics.recordedAt, cutoff));
  } catch (error) {
    // Non-critical — keep going, but leave a trace so failures are visible.
    console.warn("[DB] Failed to prune old node metrics:", error);
  }
}

// ── Tasks ───────────────────────────────────────────────────────────────────

/**
 * Create a new task and return the stored row.
 *
 * The row is re-read by the id reported for the insert, so concurrent inserts
 * cannot cause another caller's task to be returned. When no usable id is
 * reported, it falls back to the most recently created task.
 *
 * @param task - Row to insert into `tasks`.
 * @returns The stored task, or `null` when the database is not available, the
 *   row cannot be read back, or the insert fails.
 */
export async function createTask(task: InsertTask): Promise<Task | null> {
  const db = await getDb();
  if (!db) return null;
  try {
    const [header] = await db.insert(tasks).values(task);
    const insertedId = Number(header.insertId);

    let rows: Task[];
    if (Number.isInteger(insertedId) && insertedId > 0) {
      rows = await db
        .select()
        .from(tasks)
        .where(eq(tasks.id, insertedId))
        .limit(1);
    } else {
      // No generated id was reported; fall back to the newest row.
      rows = await db
        .select()
        .from(tasks)
        .orderBy(desc(tasks.createdAt))
        .limit(1);
    }

    return rows[0] ?? null;
  } catch (error) {
    console.error("[DB] Failed to create task:", error);
    return null;
  }
}

/**
 * Get all tasks for an agent, newest first.
 *
 * @param agentId - Value matched against `tasks.agentId`.
 * @returns The matching tasks, or an empty array when the database is not
 *   available or the query fails.
 */
export async function getAgentTasks(agentId: number): Promise<Task[]> {
  const db = await getDb();
  if (!db) return [];
  try {
    return await db
      .select()
      .from(tasks)
      .where(eq(tasks.agentId, agentId))
      .orderBy(desc(tasks.createdAt));
  } catch (error) {
    console.error("[DB] Failed to get agent tasks:", error);
    return [];
  }
}

/**
 * Get tasks for a conversation, newest first.
 *
 * @param conversationId - Value matched against `tasks.conversationId`.
 * @returns The matching tasks, or an empty array when the database is not
 *   available or the query fails.
 */
export async function getConversationTasks(
  conversationId: string
): Promise<Task[]> {
  const db = await getDb();
  if (!db) return [];
  try {
    return await db
      .select()
      .from(tasks)
      .where(eq(tasks.conversationId, conversationId))
      .orderBy(desc(tasks.createdAt));
  } catch (error) {
    console.error("[DB] Failed to get conversation tasks:", error);
    return [];
  }
}

/**
 * Get a single task by ID.
 *
 * @param taskId - Value matched against `tasks.id`.
 * @returns The task, or `null` when it does not exist, the database is not
 *   available, or the query fails.
 */
export async function getTaskById(taskId: number): Promise<Task | null> {
  const db = await getDb();
  if (!db) return null;
  try {
    const result = await db
      .select()
      .from(tasks)
      .where(eq(tasks.id, taskId))
      .limit(1);
    return result.length > 0 ? result[0] : null;
  } catch (error) {
    console.error("[DB] Failed to get task:", error);
    return null;
  }
}

/**
 * Update a task and return the row as stored afterwards.
 *
 * @param taskId - ID of the task to update.
 * @param updates - Columns to overwrite.
 * @returns The task re-read after the update, or `null` when it does not
 *   exist, the database is not available, or the update fails.
 */
export async function updateTask(
  taskId: number,
  updates: Partial<InsertTask>
): Promise<Task | null> {
  const db = await getDb();
  if (!db) return null;
  try {
    await db.update(tasks).set(updates).where(eq(tasks.id, taskId));
    return await getTaskById(taskId);
  } catch (error) {
    console.error("[DB] Failed to update task:", error);
    return null;
  }
}

/**
 * Get tasks for an agent ordered by priority (highest first), then newest
 * first.
 *
 * NOTE(review): despite the name, this query does not filter by task status —
 * it returns every task for the agent. The `tasks` schema is not visible from
 * this file; if it has a status column, a pending filter probably belongs
 * here. Confirm against the schema and callers.
 *
 * @param agentId - Value matched against `tasks.agentId`.
 * @returns The matching tasks, or an empty array when the database is not
 *   available or the query fails.
 */
export async function getPendingAgentTasks(agentId: number): Promise<Task[]> {
  const db = await getDb();
  if (!db) return [];
  try {
    return await db
      .select()
      .from(tasks)
      .where(eq(tasks.agentId, agentId))
      .orderBy(desc(tasks.priority), desc(tasks.createdAt));
  } catch (error) {
    console.error("[DB] Failed to get pending tasks:", error);
    return [];
  }
}

/**
 * Delete a task.
 *
 * @param taskId - Value matched against `tasks.id`.
 * @returns `true` when the delete statement ran without error (including when
 *   no row matched); `false` when the database is not available or the
 *   statement failed.
 */
export async function deleteTask(taskId: number): Promise<boolean> {
  const db = await getDb();
  if (!db) return false;
  try {
    await db.delete(tasks).where(eq(tasks.id, taskId));
    return true;
  } catch (error) {
    console.error("[DB] Failed to delete task:", error);
    return false;
  }
}