refactor: update queue jobs

This commit is contained in:
Mauricio Siu
2024-10-06 14:16:31 -06:00
parent 24e4930fc1
commit 5f56512e56
6 changed files with 53 additions and 8 deletions

View File

@@ -11,13 +11,9 @@ import {
createBackup, createBackup,
findBackupById, findBackupById,
findMariadbByBackupId, findMariadbByBackupId,
findMariadbById,
findMongoByBackupId, findMongoByBackupId,
findMongoById,
findMySqlByBackupId, findMySqlByBackupId,
findMySqlById,
findPostgresByBackupId, findPostgresByBackupId,
findPostgresById,
removeBackupById, removeBackupById,
removeScheduleBackup, removeScheduleBackup,
runMariadbBackup, runMariadbBackup,

View File

@@ -7,6 +7,7 @@
"start": "node --experimental-specifier-resolution=node dist/index.js" "start": "node --experimental-specifier-resolution=node dist/index.js"
}, },
"dependencies": { "dependencies": {
"drizzle-orm": "^0.30.8",
"ioredis": "5.4.1", "ioredis": "5.4.1",
"bullmq": "5.4.2", "bullmq": "5.4.2",
"@hono/zod-validator": "0.3.0", "@hono/zod-validator": "0.3.0",

View File

@@ -6,10 +6,12 @@ import { logger } from "./logger";
import { cleanQueue, getJobRepeatable, removeJob, scheduleJob } from "./queue"; import { cleanQueue, getJobRepeatable, removeJob, scheduleJob } from "./queue";
import { jobQueueSchema } from "./schema"; import { jobQueueSchema } from "./schema";
import { firstWorker, secondWorker } from "./workers"; import { firstWorker, secondWorker } from "./workers";
import { initializeJobs } from "./utils";
const app = new Hono(); const app = new Hono();
cleanQueue(); cleanQueue();
initializeJobs();
app.use(async (c, next) => { app.use(async (c, next) => {
if (c.req.path === "/health") { if (c.req.path === "/health") {
@@ -27,7 +29,7 @@ app.use(async (c, next) => {
app.post("/create-backup", zValidator("json", jobQueueSchema), async (c) => { app.post("/create-backup", zValidator("json", jobQueueSchema), async (c) => {
const data = c.req.valid("json"); const data = c.req.valid("json");
scheduleJob(data); scheduleJob(data);
logger.info("Backup created successfully", data); logger.info({ data }, "Backup created successfully");
return c.json({ message: "Backup created successfully" }); return c.json({ message: "Backup created successfully" });
}); });
@@ -49,7 +51,7 @@ app.post("/update-backup", zValidator("json", jobQueueSchema), async (c) => {
cronSchedule: job.pattern, cronSchedule: job.pattern,
}); });
} }
logger.info("Job removed", result); logger.info({ result }, "Job removed");
} }
scheduleJob(data); scheduleJob(data);
logger.info("Backup updated successfully"); logger.info("Backup updated successfully");

View File

@@ -8,8 +8,12 @@ import {
runMySqlBackup, runMySqlBackup,
runPostgresBackup, runPostgresBackup,
} from "@dokploy/server"; } from "@dokploy/server";
import { db } from "@dokploy/server/dist/db";
import { eq } from "drizzle-orm";
import { logger } from "./logger"; import { logger } from "./logger";
import type { QueueJob } from "./schema"; import type { QueueJob } from "./schema";
import { scheduleJob } from "./queue";
import { backups, server } from "@dokploy/server/dist/db/schema";
export const runJobs = async (job: QueueJob) => { export const runJobs = async (job: QueueJob) => {
try { try {
@@ -40,3 +44,41 @@ export const runJobs = async (job: QueueJob) => {
return true; return true;
}; };
export const initializeJobs = async () => {
logger.info("Setting up Jobs....");
const servers = await db.query.server.findMany({
where: eq(server.enableDockerCleanup, true),
});
for (const server of servers) {
const { serverId } = server;
scheduleJob({
serverId,
type: "server",
cronSchedule: "0 0 * * *",
});
}
logger.info({ Quantity: servers.length }, "Servers Initialized");
const backupsResult = await db.query.backups.findMany({
where: eq(backups.enabled, true),
with: {
mariadb: true,
mysql: true,
postgres: true,
mongo: true,
},
});
for (const backup of backupsResult) {
scheduleJob({
backupId: backup.backupId,
type: "backup",
cronSchedule: backup.schedule,
});
}
logger.info({ Quantity: backupsResult.length }, "Backups Initialized");
};

View File

@@ -2,11 +2,12 @@ import { type Job, Worker } from "bullmq";
import type { QueueJob } from "./schema"; import type { QueueJob } from "./schema";
import { runJobs } from "./utils"; import { runJobs } from "./utils";
import { connection } from "./queue"; import { connection } from "./queue";
import { logger } from "./logger";
export const firstWorker = new Worker( export const firstWorker = new Worker(
"backupQueue", "backupQueue",
async (job: Job<QueueJob>) => { async (job: Job<QueueJob>) => {
console.log("Job received", job.data); logger.info({ data: job.data }, "Job received");
await runJobs(job.data); await runJobs(job.data);
}, },
{ {
@@ -17,7 +18,7 @@ export const firstWorker = new Worker(
export const secondWorker = new Worker( export const secondWorker = new Worker(
"backupQueue", "backupQueue",
async (job: Job<QueueJob>) => { async (job: Job<QueueJob>) => {
console.log(job.data); logger.info({ data: job.data }, "Job received");
await runJobs(job.data); await runJobs(job.data);
}, },
{ {

3
pnpm-lock.yaml generated
View File

@@ -462,6 +462,9 @@ importers:
dotenv: dotenv:
specifier: ^16.3.1 specifier: ^16.3.1
version: 16.4.5 version: 16.4.5
drizzle-orm:
specifier: ^0.30.8
version: 0.30.10(@types/react@18.3.5)(postgres@3.4.4)(react@18.2.0)
hono: hono:
specifier: ^4.5.8 specifier: ^4.5.8
version: 4.5.8 version: 4.5.8