feat(schedules): add schedules server

This commit is contained in:
Mauricio Siu
2024-10-05 22:11:38 -06:00
parent 651bf3a303
commit 43555cdabe
19 changed files with 584 additions and 122 deletions

View File

@@ -8,6 +8,8 @@
"start": "node --experimental-specifier-resolution=node dist/index.js"
},
"dependencies": {
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"@hono/zod-validator": "0.3.0",
"zod": "^3.23.4",
"react": "18.2.0",
@@ -16,7 +18,6 @@
"@hono/node-server": "^1.12.1",
"hono": "^4.5.8",
"dotenv": "^16.3.1",
"@upstash/qstash": "2.7.9",
"redis": "4.7.0",
"@nerimity/mimiqueue": "1.2.3"
},

View File

@@ -6,6 +6,7 @@ import { Queue } from "@nerimity/mimiqueue";
import { zValidator } from "@hono/zod-validator";
import { type DeployJob, deployJobSchema } from "./schema";
import { deploy } from "./utils";
import { logger } from "./logger";
const app = new Hono();
const redisClient = createClient({
@@ -14,12 +15,10 @@ const redisClient = createClient({
app.post("/deploy", zValidator("json", deployJobSchema), (c) => {
const data = c.req.valid("json");
queue.add(data, { groupName: data.serverId }).then((res) => {
console.log(res);
});
const res = queue.add(data, { groupName: data.serverId });
return c.json(
{
message: "Deployment started",
message: "Deployment Added",
},
200,
);
@@ -32,17 +31,18 @@ app.get("/health", async (c) => {
const queue = new Queue({
name: "deployments",
process: async (job: DeployJob) => {
console.log(job);
logger.info("Deploying job", job);
return await deploy(job);
},
redisClient,
});
const port = Number.parseInt(process.env.PORT || "3000");
(async () => {
await redisClient.connect();
await redisClient.flushAll();
logger.info("Cleaning Redis");
})();
console.log("Starting Server ✅", port);
const port = Number.parseInt(process.env.PORT || "3000");
logger.info("Starting Deployments Server ✅", port);
serve({ fetch: app.fetch, port });

10
apps/api/src/logger.ts Normal file
View File

@@ -0,0 +1,10 @@
import pino from "pino";
export const logger = pino({
transport: {
target: "pino-pretty",
options: {
colorize: true,
},
},
});

View File

@@ -1,82 +0,0 @@
// import { Hono } from "hono";
// import { Client } from "@upstash/qstash";
// import { serve } from "@hono/node-server";
// import dotenv from "dotenv";
// import Redis from "ioredis";
// dotenv.config();
// const redis = new Redis({
// host: "localhost",
// port: 7777,
// password: "xlfvpQ0ma2BkkkPX",
// });
// // redis.set("test", "test");
// // console.log(await redis.get("test"));
// // console.log(await redis.get("user-1-processing"));
// const app = new Hono();
// console.log("QStash Token:", process.env.PUBLIC_URL);
// const qstash = new Client({
// token: process.env.QSTASH_TOKEN as string,
// });
// const queue = qstash.queue({
// queueName: "deployments",
// });
// // Endpoint que publica un mensaje en QStash
// app.post("/enqueue", async (c) => {
// const { userId, deploymentId } = await c.req.json();
// const response = await qstash.publishJSON({
// url: `${process.env.PUBLIC_URL}/process`, // Endpoint para procesar la tarea
// body: { userId, deploymentId }, // Datos del despliegue
// });
// return c.json({ message: "Task enqueued", id: response.messageId });
// });
// // Endpoint que recibe el mensaje procesado
// app.post("/process", async (c) => {
// const { userId, deploymentId } = await c.req.json();
// const isProcessing = await redis.get(`user-${userId}-processing`);
// console.log(`isProcessing for user ${userId}:`, isProcessing);
// if (isProcessing === "true") {
// console.log(
// `User ${userId} is already processing a deployment. Queuing the next one.`,
// );
// return c.json(
// {
// status: "User is already processing a deployment, waiting...",
// },
// {
// status: 400,
// },
// );
// }
// redis.set(`user-${userId}-processing`, "true");
// try {
// await new Promise((resolve) => setTimeout(resolve, 5000));
// } catch (error) {
// } finally {
// await redis.del(`user-${userId}-processing`);
// }
// return c.json({ status: "Processed", userId, deploymentId });
// });
// // Inicia el servidor en el puerto 3000
// const port = 3000;
// console.log(`Server is running on port http://localhost:${port}`);
// serve({
// fetch: app.fetch,
// port,
// });
// // 18

View File

@@ -5,6 +5,7 @@ import {
apiRemoveBackup,
apiUpdateBackup,
} from "@/server/db/schema";
import { removeJob, schedule } from "@/server/utils/backup";
import {
createBackup,
findBackupById,
@@ -20,6 +21,7 @@ import {
findMongoByBackupId,
findMySqlByBackupId,
findPostgresByBackupId,
IS_CLOUD,
} from "@dokploy/builders";
import { TRPCError } from "@trpc/server";
@@ -33,8 +35,16 @@ export const backupRouter = createTRPCRouter({
const backup = await findBackupById(newBackup.backupId);
if (backup.enabled) {
scheduleBackup(backup);
if (IS_CLOUD && backup.enabled) {
await schedule({
cronSchedule: backup.schedule,
backupId: backup.backupId,
type: "backup",
});
} else {
if (backup.enabled) {
scheduleBackup(backup);
}
}
} catch (error) {
throw new TRPCError({
@@ -55,11 +65,19 @@ export const backupRouter = createTRPCRouter({
await updateBackupById(input.backupId, input);
const backup = await findBackupById(input.backupId);
if (backup.enabled) {
removeScheduleBackup(input.backupId);
scheduleBackup(backup);
if (IS_CLOUD && backup.enabled) {
await schedule({
cronSchedule: backup.schedule,
backupId: backup.backupId,
type: "backup",
});
} else {
removeScheduleBackup(input.backupId);
if (backup.enabled) {
removeScheduleBackup(input.backupId);
scheduleBackup(backup);
} else {
removeScheduleBackup(input.backupId);
}
}
} catch (error) {
throw new TRPCError({
@@ -73,7 +91,15 @@ export const backupRouter = createTRPCRouter({
.mutation(async ({ input }) => {
try {
const value = await removeBackupById(input.backupId);
removeScheduleBackup(input.backupId);
if (IS_CLOUD && value) {
removeJob({
backupId: input.backupId,
cronSchedule: value.schedule,
type: "backup",
});
} else {
removeScheduleBackup(input.backupId);
}
return value;
} catch (error) {
throw new TRPCError({

View File

@@ -0,0 +1,46 @@
type QueueJob =
| {
type: "backup";
cronSchedule: string;
backupId: string;
}
| {
type: "server";
cronSchedule: string;
serverId: string;
};
export const schedule = async (job: QueueJob) => {
try {
const result = await fetch(`${process.env.JOBS_URL}/create-backup`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(job),
});
const data = await result.json();
console.log(data);
return data;
} catch (error) {
console.log(error);
throw error;
}
};
export const removeJob = async (job: QueueJob) => {
try {
const result = await fetch(`${process.env.JOBS_URL}/remove-job`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(job),
});
const data = await result.json();
console.log(data);
return data;
} catch (error) {
console.log(error);
throw error;
}
};

View File

@@ -33,8 +33,6 @@ export const setupDockerContainerLogsWebSocketServer = (
const tail = url.searchParams.get("tail");
const serverId = url.searchParams.get("serverId");
const { user, session } = await validateWebSocketRequest(req);
console.log(wssTerm.clients);
console.log(wssTerm.clients.size);
if (!containerId) {
ws.close(4000, "containerId no provided");

28
apps/schedules/.gitignore vendored Normal file
View File

@@ -0,0 +1,28 @@
# dev
.yarn/
!.yarn/releases
.vscode/*
!.vscode/launch.json
!.vscode/*.code-snippets
.idea/workspace.xml
.idea/usage.statistics.xml
.idea/shelf
# deps
node_modules/
# env
.env
.env.production
# logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
pnpm-debug.log*
lerna-debug.log*
# misc
.DS_Store

8
apps/schedules/README.md Normal file
View File

@@ -0,0 +1,8 @@
```
npm install
npm run dev
```
```
open http://localhost:3000
```

View File

@@ -0,0 +1,30 @@
{
"name": "@dokploy/schedules",
"scripts": {
"dev": "PORT=4001 tsx watch src/index.ts",
"build": "tsc --project tsconfig.json",
"start": "node --experimental-specifier-resolution=node dist/index.js"
},
"dependencies": {
"bullmq": "5.4.2",
"@hono/zod-validator": "0.3.0",
"zod": "^3.23.4",
"react": "18.2.0",
"react-dom": "18.2.0",
"@dokploy/builders": "workspace:*",
"@hono/node-server": "^1.12.1",
"hono": "^4.5.8",
"dotenv": "^16.3.1",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"redis": "4.7.0"
},
"devDependencies": {
"typescript": "^5.4.2",
"@types/react": "^18.2.37",
"@types/react-dom": "^18.2.15",
"@types/node": "^20.11.17",
"tsx": "^4.7.1"
},
"packageManager": "pnpm@9.5.0"
}

View File

@@ -0,0 +1,55 @@
import { serve } from "@hono/node-server";
import { Hono } from "hono";
import "dotenv/config";
import { zValidator } from "@hono/zod-validator";
import { jobQueueSchema } from "./schema";
import { firstWorker, secondWorker } from "./workers";
import { logger } from "./logger";
import { cleanQueue, removeJob, scheduleJob } from "./queue";
const app = new Hono();
cleanQueue();
app.post("/create-backup", zValidator("json", jobQueueSchema), (c) => {
const data = c.req.valid("json");
scheduleJob(data);
logger.info("Backup created successfully", data);
return c.json({ message: "Backup created successfully" });
});
app.post("/remove-job", zValidator("json", jobQueueSchema), async (c) => {
const data = c.req.valid("json");
const result = await removeJob(data);
logger.info("Job removed successfully", data);
return c.json({ message: "Job removed successfully", result });
});
app.get("/health", async (c) => {
return c.json({ status: "ok" });
});
export const gracefulShutdown = async (signal: string) => {
logger.warn(`Received ${signal}, closing server...`);
await firstWorker.close();
await secondWorker.close();
process.exit(0);
};
process.on("SIGINT", () => gracefulShutdown("SIGINT"));
process.on("SIGTERM", () => gracefulShutdown("SIGTERM"));
process.on("uncaughtException", (err) => {
logger.error(err, "Uncaught exception");
});
process.on("unhandledRejection", (reason, promise) => {
logger.error({ promise, reason }, "Unhandled Rejection at: Promise");
});
const port = Number.parseInt(process.env.PORT || "3000");
logger.info("Starting Schedules Server ✅", port);
serve({ fetch: app.fetch, port });

View File

@@ -0,0 +1,10 @@
import pino from "pino";
export const logger = pino({
transport: {
target: "pino-pretty",
options: {
colorize: true,
},
},
});

View File

@@ -0,0 +1,55 @@
import { Queue } from "bullmq";
import type { QueueJob } from "./schema";
import { logger } from "./logger";
export const jobQueue = new Queue("backupQueue", {
connection: {
host: process.env.REDIS_URL,
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
});
export const cleanQueue = async () => {
try {
await jobQueue.obliterate({ force: true });
logger.info("Queue Cleaned");
} catch (error) {
logger.error("Error cleaning queue:", error);
}
};
export const scheduleJob = (job: QueueJob) => {
if (job.type === "backup") {
jobQueue.add(job.backupId, job, {
repeat: {
pattern: job.cronSchedule,
},
});
} else if (job.type === "server") {
jobQueue.add(`${job.serverId}-cleanup`, job, {
repeat: {
pattern: job.cronSchedule,
},
});
}
};
export const removeJob = async (data: QueueJob) => {
if (data.type === "backup") {
const { backupId, cronSchedule } = data;
const result = await jobQueue.removeRepeatable(backupId, {
pattern: cronSchedule,
});
return result;
}
if (data.type === "server") {
const { serverId, cronSchedule } = data;
const result = await jobQueue.removeRepeatable(`${serverId}-cleanup`, {
pattern: cronSchedule,
});
return result;
}
};

View File

@@ -0,0 +1,16 @@
import { z } from "zod";
export const jobQueueSchema = z.discriminatedUnion("type", [
z.object({
cronSchedule: z.string(),
type: z.literal("backup"),
backupId: z.string(),
}),
z.object({
cronSchedule: z.string(),
type: z.literal("server"),
serverId: z.string(),
}),
]);
export type QueueJob = z.infer<typeof jobQueueSchema>;

View File

@@ -0,0 +1,42 @@
import {
cleanUpDockerBuilder,
cleanUpSystemPrune,
cleanUpUnusedImages,
findBackupById,
runMariadbBackup,
runMongoBackup,
runMySqlBackup,
runPostgresBackup,
} from "@dokploy/builders";
import type { QueueJob } from "./schema";
import { logger } from "./logger";
export const runJobs = async (job: QueueJob) => {
try {
if (job.type === "backup") {
const { backupId } = job;
const backup = await findBackupById(backupId);
const { databaseType, postgres, mysql, mongo, mariadb } = backup;
if (databaseType === "postgres" && postgres) {
await runPostgresBackup(postgres, backup);
} else if (databaseType === "mysql" && mysql) {
await runMySqlBackup(mysql, backup);
} else if (databaseType === "mongo" && mongo) {
await runMongoBackup(mongo, backup);
} else if (databaseType === "mariadb" && mariadb) {
await runMariadbBackup(mariadb, backup);
}
}
if (job.type === "server") {
const { serverId } = job;
await cleanUpUnusedImages(serverId);
await cleanUpDockerBuilder(serverId);
await cleanUpSystemPrune(serverId);
// await sendDockerCleanupNotifications();
}
} catch (error) {
logger.error(error);
}
return true;
};

View File

@@ -0,0 +1,28 @@
import type { QueueJob } from "./schema";
import { type Job, Worker } from "bullmq";
import { runJobs } from "./utils";
export const firstWorker = new Worker(
"backupQueue",
async (job: Job<QueueJob>) => {
await runJobs(job.data);
},
{
concurrency: 50,
connection: {
host: process.env.REDIS_URL,
},
},
);
export const secondWorker = new Worker(
"backupQueue",
async (job: Job<QueueJob>) => {
await runJobs(job.data);
},
{
concurrency: 50,
connection: {
host: process.env.REDIS_URL,
},
},
);

View File

@@ -0,0 +1,13 @@
{
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",
"moduleResolution": "Node",
"strict": true,
"skipLibCheck": true,
"outDir": "dist",
"jsx": "react-jsx",
"jsxImportSource": "hono/jsx"
},
"exclude": ["node_modules", "dist"]
}