640 lines
16 KiB
TypeScript
640 lines
16 KiB
TypeScript
import { exec, execFile } from "child_process";
import { promisify } from "util";

import { eq, and, desc, gte } from "drizzle-orm";
import { nanoid } from "nanoid";

import {
  agents,
  agentMetrics,
  agentHistory,
  agentAccessControl,
  type Agent,
  type InsertAgent,
  type AgentMetric,
  type InsertAgentMetric,
} from "../drizzle/schema";
import { getDb } from "./db";
|
|
|
|
// Promise-returning version of child_process.exec (runs a command line through a shell).
const execAsync = promisify(exec);
|
|
|
|
/**
 * Create a new agent row owned by the given user.
 *
 * @param userId - Owner id; it overrides any `userId` carried inside `data`.
 * @param data - Column values for the new agent.
 * @returns The inserted row as re-read from the database, or `null` when the
 *   database handle is unavailable or the row cannot be read back.
 * @throws Re-throws any insert/select error after logging it.
 */
export async function createAgent(
  userId: number,
  data: InsertAgent
): Promise<Agent | null> {
  const db = await getDb();
  if (!db) {
    return null;
  }

  try {
    const row = { ...data, userId };
    const insertResult = await db.insert(agents).values(row);

    // The driver reports the generated primary key on the first result element.
    const newId = Number(insertResult[0].insertId);
    const rows = await db
      .select()
      .from(agents)
      .where(eq(agents.id, newId))
      .limit(1);

    return rows[0] ?? null;
  } catch (error) {
    console.error("[DB] Failed to create agent:", error);
    throw error;
  }
}
|
|
|
|
/**
 * Look up a single agent by primary key.
 *
 * @param agentId - Agent id to search for.
 * @returns The matching row, or `null` when there is no match, the database
 *   handle is unavailable, or the query fails (the failure is logged).
 */
export async function getAgentById(agentId: number): Promise<Agent | null> {
  const db = await getDb();
  if (!db) {
    return null;
  }

  try {
    const matchesId = eq(agents.id, agentId);
    const rows = await db.select().from(agents).where(matchesId).limit(1);
    return rows[0] ?? null;
  } catch (error) {
    console.error("[DB] Failed to get agent:", error);
    return null;
  }
}
|
|
|
|
/**
 * List every agent that belongs to a user.
 *
 * @param userId - Owner id to filter on.
 * @returns The user's agents; an empty array when the database handle is
 *   unavailable or the query fails (the failure is logged).
 */
export async function getUserAgents(userId: number): Promise<Agent[]> {
  const db = await getDb();
  if (!db) {
    return [];
  }

  try {
    const ownedByUser = eq(agents.userId, userId);
    const rows = await db.select().from(agents).where(ownedByUser);
    return rows;
  } catch (error) {
    console.error("[DB] Failed to get user agents:", error);
    return [];
  }
}
|
|
|
|
/**
 * List all agents (system and user-created), ordered by id.
 * Used by the /agents page of the Control Center.
 *
 * @returns Every agent row; an empty array when the database handle is
 *   unavailable or the query fails (the failure is logged).
 */
export async function getAllAgents(): Promise<Agent[]> {
  const db = await getDb();
  if (!db) {
    return [];
  }

  try {
    const rows = await db.select().from(agents).orderBy(agents.id);
    return rows;
  } catch (error) {
    console.error("[DB] Failed to get all agents:", error);
    return [];
  }
}
|
|
|
|
/**
 * List only system agents (rows with `isSystem = true`), ordered by id.
 *
 * @returns The system agents; an empty array when the database handle is
 *   unavailable or the query fails (the failure is logged).
 */
export async function getSystemAgents(): Promise<Agent[]> {
  const db = await getDb();
  if (!db) {
    return [];
  }

  try {
    const isSystemAgent = eq(agents.isSystem, true);
    const rows = await db
      .select()
      .from(agents)
      .where(isSystemAgent)
      .orderBy(agents.id);
    return rows;
  } catch (error) {
    console.error("[DB] Failed to get system agents:", error);
    return [];
  }
}
|
|
|
|
/**
 * Update an agent's configuration and return the resulting row.
 *
 * @param agentId - Id of the agent to update.
 * @param updates - Columns to change. Properties whose value is `undefined`
 *   are ignored; when nothing is left to change, no UPDATE is issued and the
 *   current row is returned as-is.
 * @returns The agent as stored after the update, or `null` when the database
 *   handle is unavailable or the agent does not exist.
 * @throws Re-throws any error raised by the UPDATE statement after logging it.
 */
export async function updateAgent(
  agentId: number,
  updates: Partial<InsertAgent>
): Promise<Agent | null> {
  const db = await getDb();
  if (!db) return null;

  try {
    // Drizzle rejects an empty SET clause ("No values to set"), so only issue
    // the UPDATE when at least one column actually carries a value.
    const hasChanges = Object.values(updates).some(
      value => value !== undefined
    );
    if (hasChanges) {
      await db.update(agents).set(updates).where(eq(agents.id, agentId));
    }

    // Awaited inside the try block so a failure here is logged as well.
    return await getAgentById(agentId);
  } catch (error) {
    console.error("[DB] Failed to update agent:", error);
    throw error;
  }
}
|
|
|
|
/**
 * Delete an agent by id.
 *
 * @param agentId - Id of the agent to remove.
 * @returns `true` when the DELETE statement ran without error (whether or not
 *   a row matched); `false` when the database handle is unavailable or the
 *   statement failed (the failure is logged).
 */
export async function deleteAgent(agentId: number): Promise<boolean> {
  const db = await getDb();
  if (!db) {
    return false;
  }

  let deleted: boolean;
  try {
    await db.delete(agents).where(eq(agents.id, agentId));
    deleted = true;
  } catch (error) {
    console.error("[DB] Failed to delete agent:", error);
    deleted = false;
  }
  return deleted;
}
|
|
|
|
/**
 * Store one request metric for an agent.
 *
 * A fresh `requestId` is generated with `nanoid()` for every call and takes
 * precedence over any `requestId` present in `data`.
 *
 * @param agentId - Agent the metric belongs to.
 * @param data - Metric columns other than `agentId`.
 * @returns The inserted metric re-read from the database, or `null` when the
 *   database handle is unavailable or the row cannot be read back.
 * @throws Re-throws any insert/select error after logging it.
 */
export async function saveMetric(
  agentId: number,
  data: Omit<InsertAgentMetric, "agentId">
): Promise<AgentMetric | null> {
  const db = await getDb();
  if (!db) {
    return null;
  }

  try {
    const row = { ...data, agentId, requestId: nanoid() };
    const insertResult = await db.insert(agentMetrics).values(row);

    // The driver reports the generated primary key on the first result element.
    const newId = Number(insertResult[0].insertId);
    const rows = await db
      .select()
      .from(agentMetrics)
      .where(eq(agentMetrics.id, newId))
      .limit(1);

    return rows[0] ?? null;
  } catch (error) {
    console.error("[DB] Failed to save metric:", error);
    throw error;
  }
}
|
|
|
|
/**
 * Fetch an agent's metrics recorded during the last N hours, newest first.
 *
 * @param agentId - Agent whose metrics are requested.
 * @param hoursBack - Size of the look-back window in hours (default 24).
 * @returns Matching metrics ordered by `createdAt` descending; an empty array
 *   when the database handle is unavailable or the query fails (logged).
 */
export async function getAgentMetrics(
  agentId: number,
  hoursBack: number = 24
): Promise<AgentMetric[]> {
  const db = await getDb();
  if (!db) {
    return [];
  }

  try {
    // Lower bound of the time window, computed from the current wall clock.
    const windowStart = new Date(Date.now() - hoursBack * 60 * 60 * 1000);
    const inWindowForAgent = and(
      eq(agentMetrics.agentId, agentId),
      gte(agentMetrics.createdAt, windowStart)
    );

    const rows = await db
      .select()
      .from(agentMetrics)
      .where(inWindowForAgent)
      .orderBy(desc(agentMetrics.createdAt));
    return rows;
  } catch (error) {
    console.error("[DB] Failed to get agent metrics:", error);
    return [];
  }
}
|
|
|
|
/**
 * Aggregate an agent's metrics over the last N hours.
 *
 * @param agentId - Agent whose statistics are requested.
 * @param hoursBack - Size of the look-back window in hours (default 24).
 * @returns Request counts, success rate in percent, rounded average
 *   processing time, token totals, rounded average tokens per request and
 *   the period label (`"<hoursBack>h"`); `null` when the database handle is
 *   unavailable or aggregation fails (logged).
 */
export async function getAgentStats(agentId: number, hoursBack: number = 24) {
  const db = await getDb();
  if (!db) {
    return null;
  }

  try {
    const metrics = await getAgentMetrics(agentId, hoursBack);
    const totalRequests = metrics.length;

    // Single pass over the metrics, accumulating every counter at once.
    let successRequests = 0;
    let errorRequests = 0;
    let processingTimeSum = 0;
    let totalTokens = 0;
    for (const metric of metrics) {
      if (metric.status === "success") {
        successRequests += 1;
      }
      if (metric.status === "error") {
        errorRequests += 1;
      }
      processingTimeSum += metric.processingTimeMs;
      totalTokens += metric.totalTokens || 0;
    }

    // Averages and the rate fall back to 0 when the window is empty.
    const hasMetrics = totalRequests > 0;
    const avgProcessingTime = hasMetrics
      ? processingTimeSum / totalRequests
      : 0;
    const avgTokensPerRequest = hasMetrics ? totalTokens / totalRequests : 0;
    const successRate = hasMetrics
      ? (successRequests / totalRequests) * 100
      : 0;

    return {
      totalRequests,
      successRequests,
      errorRequests,
      successRate,
      avgProcessingTime: Math.round(avgProcessingTime),
      totalTokens,
      avgTokensPerRequest: Math.round(avgTokensPerRequest),
      period: `${hoursBack}h`,
    };
  } catch (error) {
    console.error("[DB] Failed to get agent stats:", error);
    return null;
  }
}
|
|
|
|
/**
 * Fetch the most recent request history entries of an agent.
 *
 * @param agentId - Agent whose history is requested.
 * @param limit - Maximum number of entries to return (default 50).
 * @returns History rows ordered by `createdAt` descending; an empty array
 *   when the database handle is unavailable or the query fails (logged).
 */
export async function getAgentHistory(agentId: number, limit: number = 50) {
  const db = await getDb();
  if (!db) {
    return [];
  }

  try {
    const belongsToAgent = eq(agentHistory.agentId, agentId);
    const rows = await db
      .select()
      .from(agentHistory)
      .where(belongsToAgent)
      .orderBy(desc(agentHistory.createdAt))
      .limit(limit);
    return rows;
  } catch (error) {
    console.error("[DB] Failed to get agent history:", error);
    return [];
  }
}
|
|
|
|
/**
 * Fetch all access-control entries configured for an agent.
 *
 * @param agentId - Agent whose access-control rows are requested.
 * @returns The matching rows; an empty array when the database handle is
 *   unavailable or the query fails (logged).
 */
export async function getAgentAccessControl(agentId: number) {
  const db = await getDb();
  if (!db) {
    return [];
  }

  try {
    const belongsToAgent = eq(agentAccessControl.agentId, agentId);
    const rows = await db
      .select()
      .from(agentAccessControl)
      .where(belongsToAgent);
    return rows;
  } catch (error) {
    console.error("[DB] Failed to get agent access control:", error);
    return [];
  }
}
|
|
|
|
/**
 * Create or update the access-control entry of one tool for an agent.
 *
 * The row is identified by the `(agentId, tool)` pair. The explicit
 * `agentId` and `tool` arguments always win over same-named properties in
 * `updates`, so a call can never re-key the row it is addressing.
 *
 * NOTE(review): the existence check and the following insert are separate
 * statements, so two concurrent calls for the same pair may both insert.
 * Whether a unique index prevents that is not visible from this file.
 *
 * @param agentId - Agent the entry belongs to.
 * @param tool - Tool name the entry applies to.
 * @param updates - Column values to store on the entry.
 * @returns The entry as stored afterwards, or `null` when the database handle
 *   is unavailable or the row cannot be read back.
 * @throws Re-throws any database error after logging it.
 */
export async function updateToolAccess(
  agentId: number,
  tool: string,
  updates: Partial<typeof agentAccessControl.$inferInsert>
) {
  const db = await getDb();
  if (!db) return null;

  try {
    const rowKey = and(
      eq(agentAccessControl.agentId, agentId),
      eq(agentAccessControl.tool, tool)
    );

    // Key columns are spread last so they cannot be overridden by `updates`.
    // Including them also keeps the SET clause non-empty when `updates` is
    // `{}`, which Drizzle would otherwise reject with "No values to set".
    const values = { ...updates, agentId, tool };

    const existing = await db
      .select()
      .from(agentAccessControl)
      .where(rowKey)
      .limit(1);

    if (existing.length > 0) {
      await db.update(agentAccessControl).set(values).where(rowKey);
    } else {
      await db.insert(agentAccessControl).values(values);
    }

    const result = await db
      .select()
      .from(agentAccessControl)
      .where(rowKey)
      .limit(1);

    return result[0] ?? null;
  } catch (error) {
    console.error("[DB] Failed to update tool access:", error);
    throw error;
  }
}
|
|
|
|
/**
 * Append one entry to an agent's request history.
 *
 * @param agentId - Agent the entry belongs to.
 * @param data - The user message, the agent response (a `null` response is
 *   passed to the insert as `undefined`), an optional conversation id and
 *   the request status.
 * @returns The first element of the driver's insert result, or `null` when
 *   the database handle is unavailable or the insert fails (logged).
 */
export async function saveHistory(
  agentId: number,
  data: {
    userMessage: string;
    agentResponse: string | null;
    conversationId?: string;
    status: "pending" | "success" | "error";
  }
) {
  const db = await getDb();
  if (!db) {
    return null;
  }

  const { userMessage, agentResponse, conversationId, status } = data;

  try {
    const insertResult = await db.insert(agentHistory).values({
      agentId,
      userMessage,
      agentResponse: agentResponse ?? undefined,
      conversationId,
      status,
    });
    return insertResult[0];
  } catch (error) {
    console.error("[DB] Failed to save history:", error);
    return null;
  }
}
|
|
|
|
// ─── Container Management (Docker) ─────────────────────────────────────────
|
|
|
|
/**
 * Convert a Node-style `mysql://user:pass@host:port/dbname` URL into the Go
 * MySQL driver DSN form `user:pass@tcp(host:port)/dbname?parseTime=true`.
 *
 * Values that do not start with `mysql://` are treated as already being in
 * Go format and are returned unchanged. Percent-encoded credentials are
 * decoded, a missing port defaults to 3306, and any query string on the
 * source URL is dropped.
 *
 * @returns The DSN, or `null` when the URL cannot be parsed or lacks a user,
 *   host or database name.
 */
function toGoMysqlDsn(rawUrl: string): string | null {
  if (!rawUrl.startsWith("mysql://")) {
    return rawUrl;
  }

  try {
    const parsed = new URL(rawUrl);
    const database = parsed.pathname.replace(/^\/+/, "");
    if (!parsed.username || !parsed.hostname || !database) {
      return null;
    }

    const user = decodeURIComponent(parsed.username);
    const password = decodeURIComponent(parsed.password);
    const credentials = password ? `${user}:${password}` : user;
    const port = parsed.port || "3306";
    return `${credentials}@tcp(${parsed.hostname}:${port})/${database}?parseTime=true`;
  } catch {
    // Invalid URL or malformed percent-encoding.
    return null;
  }
}

/**
 * Build a log-safe description of a deployment failure.
 *
 * Child-process errors carry the full command line in `message`, which here
 * would include the database password and the LLM API key, so for those only
 * stderr, a timeout note or the exit code is reported.
 */
function describeDeployFailure(error: unknown): string {
  if (typeof error === "object" && error !== null && "cmd" in error) {
    const failure = error as {
      stderr?: unknown;
      killed?: unknown;
      code?: unknown;
    };
    const stderrText =
      typeof failure.stderr === "string" ? failure.stderr.trim() : "";
    if (stderrText) return stderrText;
    if (failure.killed) return "docker command timed out";
    return `docker command failed (exit code ${String(failure.code)})`;
  }
  return error instanceof Error ? error.message : String(error);
}

/**
 * Deploy an agent as a Docker container.
 * Each agent runs in its own container on the goclaw-net bridge network.
 *
 * Steps: remove any previous container with the same name, mark the agent as
 * "deploying", start the container with its environment, wait two seconds,
 * then verify via `docker inspect` that it is running and persist the final
 * status ("running" or "error").
 *
 * Environment read: `AGENT_DATABASE_URL` / `DATABASE_URL` (database location),
 * `LLM_BASE_URL`, `LLM_API_KEY`.
 *
 * @param agentId - Id of the agent to deploy.
 * @returns `success: true` with the container name and port, or
 *   `success: false` with `servicePort: 0` and an error description. Failures
 *   are reported through the return value rather than by throwing.
 */
export async function deployAgentContainer(agentId: number): Promise<{
  success: boolean;
  serviceName: string;
  servicePort: number;
  error?: string;
}> {
  const db = await getDb();
  if (!db)
    return {
      success: false,
      serviceName: "",
      servicePort: 0,
      error: "DB not available",
    };

  const agent = await getAgentById(agentId);
  if (!agent)
    return {
      success: false,
      serviceName: "",
      servicePort: 0,
      error: "Agent not found",
    };

  // Docker is invoked with an argument vector (no shell), so values coming
  // from the database or the environment cannot be interpreted as shell syntax.
  const runDocker = promisify(execFile);

  const containerName = `goclaw-agent-${agentId}`;
  // Ports 8100-8999. NOTE(review): ids that differ by a multiple of 900
  // (e.g. 1 and 901) map to the same host port.
  const servicePort = 8100 + ((agentId - 1) % 900);
  const containerImage = String(
    (agent as any).containerImage || "goclaw-agent-worker:latest"
  );

  const persistStatus = async (fields: Record<string, unknown>) => {
    await db
      .update(agents)
      .set(fields as any)
      .where(eq(agents.id, agentId));
  };

  try {
    // An image name starting with "-" would be parsed by docker as an option.
    if (containerImage.startsWith("-")) {
      throw new Error(`Invalid container image: ${containerImage}`);
    }

    // Remove existing container if any (stop + rm). Best-effort: if the old
    // container survives, `docker run --name` below fails with a name conflict.
    await runDocker("docker", ["rm", "-f", containerName], {
      timeout: 10000,
    }).catch(() => undefined);

    // Update status to deploying
    await persistStatus({
      containerStatus: "deploying",
      serviceName: containerName,
      servicePort,
    });

    // Agent-worker uses Go format: user:pass@tcp(host:port)/dbname?parseTime=true
    // Control-center DATABASE_URL is Node.js format: mysql://user:pass@host:port/dbname
    // SECURITY(review): the fallbacks below embed default credentials in
    // source; prefer requiring AGENT_DATABASE_URL / DATABASE_URL to be set.
    const dbUrlRaw =
      process.env.AGENT_DATABASE_URL ||
      process.env.DATABASE_URL ||
      "mysql://goclaw:goClawPass123@db:3306/goclaw";
    let dbUrl = toGoMysqlDsn(dbUrlRaw);
    if (dbUrl === null) {
      console.warn(
        "[Container] Could not parse database URL; using default DSN"
      );
      dbUrl = "goclaw:goClawPass123@tcp(db:3306)/goclaw?parseTime=true";
    }

    const llmBaseUrl = process.env.LLM_BASE_URL || "https://ollama.com/v1";
    const llmApiKey = process.env.LLM_API_KEY || "";
    const defaultModel = String((agent as any).model || "qwen2.5:7b");

    const runArgs = [
      "run",
      "-d",
      "--name",
      containerName,
      "--network",
      "goclaw_goclaw-net",
      "-p",
      `${servicePort}:${servicePort}`,
      "-e",
      `AGENT_ID=${agentId}`,
      "-e",
      `AGENT_PORT=${servicePort}`,
      "-e",
      `DATABASE_URL=${dbUrl}`,
      "-e",
      `LLM_BASE_URL=${llmBaseUrl}`,
      "-e",
      `LLM_API_KEY=${llmApiKey}`,
      "-e",
      `DEFAULT_MODEL=${defaultModel}`,
      "--restart",
      "unless-stopped",
      containerImage,
    ];

    console.log(
      `[Container] Deploying agent ${agentId}: ${containerName} on port ${servicePort}`
    );

    const { stdout } = await runDocker("docker", runArgs, { timeout: 30000 });

    console.log(
      `[Container] Agent ${agentId} container created: ${String(stdout).trim()}`
    );

    // Wait briefly and verify the container is running
    await new Promise(r => setTimeout(r, 2000));
    const { stdout: inspectOut } = await runDocker(
      "docker",
      ["inspect", "--format", "{{.State.Status}}", containerName],
      { timeout: 5000 }
    );

    const actualStatus = String(inspectOut).trim();
    if (actualStatus !== "running") {
      // `docker logs` replays the container's stdout and stderr on the
      // matching streams; report both.
      const { stdout: logsOut, stderr: logsErr } = await runDocker(
        "docker",
        ["logs", "--tail", "20", containerName],
        { timeout: 5000 }
      );
      console.error(
        `[Container] Agent ${agentId} not running. Status: ${actualStatus}`
      );
      console.error(`[Container] Logs:\n${String(logsOut)}${String(logsErr)}`);

      await persistStatus({ containerStatus: "error" });

      return {
        success: false,
        serviceName: containerName,
        servicePort: 0,
        error: `Container status: ${actualStatus}`,
      };
    }

    // Update status to running
    await persistStatus({
      containerStatus: "running",
      serviceName: containerName,
      servicePort,
    });

    console.log(`[Container] Agent ${agentId} deployed: ${containerName}`);
    return { success: true, serviceName: containerName, servicePort };
  } catch (error: unknown) {
    const message = describeDeployFailure(error);
    console.error(`[Container] Failed to deploy agent ${agentId}:`, message);

    // Update status to error. Guarded so that a failing database write cannot
    // turn the documented failure result into a rejected promise.
    try {
      await persistStatus({ containerStatus: "error" });
    } catch (dbError) {
      console.error(
        `[Container] Failed to record error status for agent ${agentId}:`,
        dbError
      );
    }

    return {
      success: false,
      serviceName: containerName,
      servicePort: 0,
      error: message,
    };
  }
}
|
|
|
|
/**
 * Stop and remove an agent's Docker container.
 *
 * Removal is best-effort: if `docker rm -f` fails (for example because the
 * container no longer exists) the reason is logged and the agent is still
 * marked as "stopped" with its service name and port cleared.
 *
 * @param agentId - Id of the agent whose container should be removed.
 * @returns `{ success: true }` once the database reflects the stopped state;
 *   `{ success: false, error }` when the database handle is unavailable, the
 *   agent does not exist, or the status update fails.
 */
export async function stopAgentContainer(
  agentId: number
): Promise<{ success: boolean; error?: string }> {
  const db = await getDb();
  if (!db) return { success: false, error: "DB not available" };

  const agent = await getAgentById(agentId);
  if (!agent) return { success: false, error: "Agent not found" };

  const containerName = String(
    (agent as any).serviceName || `goclaw-agent-${agentId}`
  );

  // Docker is invoked with an argument vector (no shell), so the stored
  // service name cannot be interpreted as shell syntax.
  const runDocker = promisify(execFile);

  try {
    console.log(`[Container] Stopping agent ${agentId}: ${containerName}`);
    try {
      await runDocker("docker", ["rm", "-f", containerName], {
        timeout: 15000,
      });
    } catch (dockerError: unknown) {
      const reason =
        dockerError instanceof Error
          ? dockerError.message
          : String(dockerError);
      console.log(
        `[Container] Could not remove container ${containerName} (it may already be removed): ${reason}`
      );
    }

    // Update status to stopped
    await db
      .update(agents)
      .set({
        containerStatus: "stopped",
        serviceName: null,
        servicePort: null,
      } as any)
      .where(eq(agents.id, agentId));

    console.log(`[Container] Agent ${agentId} stopped: ${containerName}`);
    return { success: true };
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`[Container] Failed to stop agent ${agentId}:`, message);

    // Retry with the minimal status change. Guarded so that a second database
    // failure still yields the failure result instead of a rejected promise.
    try {
      await db
        .update(agents)
        .set({
          containerStatus: "stopped",
        } as any)
        .where(eq(agents.id, agentId));
    } catch (dbError) {
      console.error(
        `[Container] Failed to record stopped status for agent ${agentId}:`,
        dbError
      );
    }

    return { success: false, error: message };
  }
}
|
|
|
|
/**
 * Get the status of an agent's container.
 *
 * When the agent has a stored service name, the live state reported by
 * `docker inspect` is returned; otherwise the status stored in the database
 * is used (defaulting to "stopped").
 *
 * @param agentId - Id of the agent to inspect.
 * @returns `{ status: "unknown" }` when the database handle is unavailable or
 *   the agent does not exist; `{ status: "stopped" }` when the inspect call
 *   fails (e.g. the container does not exist); otherwise the status together
 *   with the stored service name and port.
 */
export async function getAgentContainerStatus(
  agentId: number
): Promise<{ status: string; serviceName?: string; servicePort?: number }> {
  const db = await getDb();
  if (!db) return { status: "unknown" };

  const agent = await getAgentById(agentId);
  if (!agent) return { status: "unknown" };

  const containerStatus = (agent as any).containerStatus || "stopped";
  const serviceName = (agent as any).serviceName || "";
  const servicePort = (agent as any).servicePort || 0;

  // Without a service name there is no container to probe.
  if (!serviceName) {
    return { status: containerStatus, serviceName, servicePort };
  }

  // Verify actual Docker container state. Docker is invoked with an argument
  // vector (no shell), so the stored name cannot be interpreted as shell syntax.
  const runDocker = promisify(execFile);
  try {
    const { stdout } = await runDocker(
      "docker",
      ["inspect", "--format", "{{.State.Status}}", String(serviceName)],
      { timeout: 5000 }
    );
    const state = String(stdout).trim();
    return { status: state || "stopped", serviceName, servicePort };
  } catch {
    // Container doesn't exist (or docker could not be queried)
    return { status: "stopped" };
  }
}
|