refactor: add websockets requests logs

This commit is contained in:
Mauricio Siu
2024-08-25 17:43:00 -06:00
parent 1250949c05
commit 179de344c2
4 changed files with 180 additions and 10 deletions

View File

@@ -10,7 +10,7 @@ import {
type VisibilityState,
flexRender,
} from "@tanstack/react-table";
import { useMemo, useState } from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import type { LogEntry } from "./show-requests";
import { Badge } from "@/components/ui/badge";
import {
@@ -78,6 +78,10 @@ export const priorities = [
},
];
export const RequestsTable = () => {
const [statsLogs, setStatsLogs] = useState<{
data: LogEntry[];
totalCount: number;
}>();
const [statusFilter, setStatusFilter] = useState<string[]>([]);
const [search, setSearch] = useState("");
const [selectedRow, setSelectedRow] = useState<LogEntry>();
@@ -90,12 +94,93 @@ export const RequestsTable = () => {
pageSize: 10,
});
const { data: statsLogs, isLoading } = api.settings.readStatsLogs.useQuery({
sort: sorting[0],
page: pagination,
search,
status: statusFilter,
});
// const { data: statsLogs, isLoading } = api.settings.readStatsLogs.useQuery({
// sort: sorting[0],
// page: pagination,
// search,
// status: statusFilter,
// });
// useEffect(() => {
// const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
// const wsUrl = `${protocol}//${window.location.host}/request-logs`;
// const ws = new WebSocket(wsUrl);
// ws.onopen = () => {
// // Enviar parámetros iniciales
// ws.send(
// JSON.stringify({
// page: pagination,
// sort: sorting[0],
// search: search,
// status: statusFilter,
// }),
// );
// };
// ws.onmessage = (event) => {
// const data = JSON.parse(event.data);
// setStatsLogs(data);
// };
// // return () => ws.close();
// }, [pagination, search, sorting, statusFilter]);
const wsRef = useRef<WebSocket | null>(null);
const isInitialMount = useRef(true);
const sendParams = useCallback(() => {
if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
wsRef.current.send(
JSON.stringify({
page: pagination,
sort: sorting[0],
search: search,
status: statusFilter,
}),
);
} else {
console.log("WebSocket not ready, retrying in 100ms");
setTimeout(sendParams, 100); // Retry after 100ms
}
}, [pagination, sorting, search, statusFilter]);
useEffect(() => {
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const wsUrl = `${protocol}//${window.location.host}/request-logs`;
const newWs = new WebSocket(wsUrl);
newWs.onopen = () => {
console.log("WebSocket connection established");
wsRef.current = newWs;
sendParams(); // Send initial params as soon as the connection is open
};
newWs.onmessage = (event) => {
const data = JSON.parse(event.data);
setStatsLogs(data);
};
newWs.onerror = (error) => {
console.error("WebSocket error:", error);
};
newWs.onclose = () => {
console.log("WebSocket connection closed");
};
return () => {
newWs.close();
};
}, []);
useEffect(() => {
if (isInitialMount.current) {
isInitialMount.current = false;
} else {
sendParams();
}
}, [sendParams]);
const pageCount = useMemo(() => {
if (statsLogs?.totalCount) {
@@ -240,7 +325,7 @@ export const RequestsTable = () => {
colSpan={columns.length}
className="h-24 text-center"
>
{isLoading ? (
{/* {isLoading ? (
<div className="w-full flex-col gap-2 flex items-center justify-center h-[55vh]">
<span className="text-muted-foreground text-lg font-medium">
Loading...
@@ -248,7 +333,7 @@ export const RequestsTable = () => {
</div>
) : (
<>No results.</>
)}
)} */}
</TableCell>
</TableRow>
)}

View File

@@ -53,6 +53,11 @@ import { canAccessToTraefikFiles } from "../services/user";
import { adminProcedure, createTRPCRouter, protectedProcedure } from "../trpc";
import { parseRawConfig, processLogs } from "@/server/utils/access-log/utils";
import { logRotationManager } from "@/server/utils/access-log/handler";
import { observable } from "@trpc/server/observable";
import type { LogEntry } from "@/server/utils/access-log/types";
import EventEmitter from "node:events";
const ee = new EventEmitter();
export const settingsRouter = createTRPCRouter({
reloadServer: adminProcedure.mutation(async () => {

View File

@@ -23,6 +23,7 @@ import {
getPublicIpWithFallback,
setupTerminalWebSocketServer,
} from "./wss/terminal";
import { setupRequestLogsWebSocketServer } from "./wss/requests-logs";
config({ path: ".env" });
const PORT = Number.parseInt(process.env.PORT || "3000", 10);
@@ -39,7 +40,7 @@ void app.prepare().then(async () => {
setupDeploymentLogsWebSocketServer(server);
setupDockerContainerLogsWebSocketServer(server);
setupDockerContainerTerminalWebSocketServer(server);
// setupTraefikLogsWebSocketServer(server);
setupRequestLogsWebSocketServer(server);
setupTerminalWebSocketServer(server);
setupDockerStatsMonitoringSocketServer(server);

View File

@@ -0,0 +1,79 @@
import type http from "node:http";
import { WebSocketServer } from "ws";
import { validateWebSocketRequest } from "../auth/auth";
import { readMonitoringConfig } from "../utils/traefik/application";
import { parseRawConfig } from "../utils/access-log/utils";
import { apiReadStatsLogs } from "../db/schema";
import fs from "node:fs";
import path from "node:path";
import { DYNAMIC_TRAEFIK_PATH } from "../constants";
export const setupRequestLogsWebSocketServer = (
server: http.Server<typeof http.IncomingMessage, typeof http.ServerResponse>,
) => {
const wssTerm = new WebSocketServer({
noServer: true,
path: "/request-logs",
});
server.on("upgrade", (req, socket, head) => {
const { pathname } = new URL(req.url || "", `http://${req.headers.host}`);
if (pathname === "/_next/webpack-hmr") {
return;
}
if (pathname === "/request-logs") {
wssTerm.handleUpgrade(req, socket, head, function done(ws) {
wssTerm.emit("connection", ws, req);
});
}
});
const broadcastUpdate = (input: any) => {
const rawConfig = readMonitoringConfig();
const parsedConfig = parseRawConfig(
rawConfig as string,
input.page,
input.sort,
input.search,
input.status,
);
return parsedConfig;
};
const configPath = path.join(DYNAMIC_TRAEFIK_PATH, "access.log");
// eslint-disable-next-line @typescript-eslint/no-misused-promises
wssTerm.on("connection", async (ws, req) => {
const { user, session } = await validateWebSocketRequest(req);
if (!user || !session) {
ws.close();
return;
}
try {
ws.on("message", (message: string) => {
try {
const input = apiReadStatsLogs.parse(JSON.parse(message));
const parsedConfig = broadcastUpdate(input);
ws.send(JSON.stringify(parsedConfig));
fs.watch(configPath, (eventType) => {
if (eventType === "change") {
const parsedConfig = broadcastUpdate(input);
ws.send(JSON.stringify(parsedConfig));
}
});
} catch (error) {
ws.send(JSON.stringify({ error: "Invalid message format" }));
}
});
} catch (error) {
const errorMessage = (error as Error)?.message;
console.log(errorMessage);
ws.send(errorMessage);
}
});
};