From ccb141339bec399351d1a5adf0914160bc6ae49d Mon Sep 17 00:00:00 2001 From: Mauricio Siu <47042324+Siumauricio@users.noreply.github.com> Date: Sat, 3 May 2025 00:54:01 -0600 Subject: [PATCH] Enhance schedule management and job handling features - Updated the scheduleRouter to manage job scheduling and removal based on the enabled status of schedules, improving job lifecycle management. - Refactored the scheduleJob and removeJob utilities to support scheduling and removing jobs for both server and schedule types. - Introduced a new schema for jobQueue to accommodate schedule jobs, enhancing the flexibility of job definitions. - Improved the runJobs function to execute scheduled jobs based on their enabled status, ensuring proper execution of active schedules. - Enhanced the initialization process for schedules to automatically schedule active jobs from the database, streamlining the setup process. --- apps/dokploy/server/api/routers/schedule.ts | 57 +++++++++++++++++++-- apps/dokploy/server/utils/backup.ts | 5 ++ apps/schedules/src/index.ts | 10 +++- apps/schedules/src/queue.ts | 19 ++++++- apps/schedules/src/schema.ts | 5 ++ apps/schedules/src/utils.ts | 26 ++++++++-- packages/server/src/index.ts | 1 + packages/server/src/services/schedule.ts | 24 +++------ 8 files changed, 120 insertions(+), 27 deletions(-) diff --git a/apps/dokploy/server/api/routers/schedule.ts b/apps/dokploy/server/api/routers/schedule.ts index d5e3adeb..2b204adb 100644 --- a/apps/dokploy/server/api/routers/schedule.ts +++ b/apps/dokploy/server/api/routers/schedule.ts @@ -16,27 +16,74 @@ import { createSchedule, updateSchedule, } from "@dokploy/server/services/schedule"; - +import { IS_CLOUD, scheduleJob } from "@dokploy/server"; +import { removeJob, schedule } from "@/server/utils/backup"; +import { removeScheduleJob } from "@dokploy/server"; export const scheduleRouter = createTRPCRouter({ create: protectedProcedure .input(createScheduleSchema) .mutation(async ({ input }) => { - const schedule = await createSchedule(input); - return schedule; + const newSchedule = await createSchedule(input); + + if (newSchedule?.enabled) { + if (IS_CLOUD) { + schedule({ + scheduleId: newSchedule.scheduleId, + type: "schedule", + cronSchedule: newSchedule.cronExpression, + }); + } else { + scheduleJob(newSchedule); + } + } + return newSchedule; }), update: protectedProcedure .input(updateScheduleSchema) .mutation(async ({ input }) => { - const schedule = await updateSchedule(input); - return schedule; + const updatedSchedule = await updateSchedule(input); + + if (IS_CLOUD) { + if (updatedSchedule?.enabled) { + schedule({ + scheduleId: updatedSchedule.scheduleId, + type: "schedule", + cronSchedule: updatedSchedule.cronExpression, + }); + } else { + await removeJob({ + cronSchedule: updatedSchedule.cronExpression, + scheduleId: updatedSchedule.scheduleId, + type: "schedule", + }); + } + } else { + if (updatedSchedule?.enabled) { + removeScheduleJob(updatedSchedule.scheduleId); + scheduleJob(updatedSchedule); + } else { + removeScheduleJob(updatedSchedule.scheduleId); + } + } + return updatedSchedule; }), delete: protectedProcedure .input(z.object({ scheduleId: z.string() })) .mutation(async ({ input }) => { + const schedule = await findScheduleById(input.scheduleId); await deleteSchedule(input.scheduleId); + if (IS_CLOUD) { + await removeJob({ + cronSchedule: schedule.cronExpression, + scheduleId: schedule.scheduleId, + type: "schedule", + }); + } else { + removeScheduleJob(schedule.scheduleId); + } return true; }), diff --git a/apps/dokploy/server/utils/backup.ts b/apps/dokploy/server/utils/backup.ts index 4fc9db93..cf0b6c22 100644 --- a/apps/dokploy/server/utils/backup.ts +++ b/apps/dokploy/server/utils/backup.ts @@ -14,6 +14,11 @@ type QueueJob = type: "server"; cronSchedule: string; serverId: string; + } + | { + type: "schedule"; + cronSchedule: string; + scheduleId: string; }; export const schedule = async (job: QueueJob) => { try { diff --git a/apps/schedules/src/index.ts b/apps/schedules/src/index.ts index 0ef8e930..7ab2b98c 100644 --- a/apps/schedules/src/index.ts +++ b/apps/schedules/src/index.ts @@ -34,8 +34,8 @@ app.use(async (c, next) => { app.post("/create-backup", zValidator("json", jobQueueSchema), async (c) => { const data = c.req.valid("json"); scheduleJob(data); - logger.info({ data }, "Backup created successfully"); - return c.json({ message: "Backup created successfully" }); + logger.info({ data }, `[${data.type}] created successfully`); + return c.json({ message: `[${data.type}] created successfully` }); }); app.post("/update-backup", zValidator("json", jobQueueSchema), async (c) => { @@ -55,6 +55,12 @@ app.post("/update-backup", zValidator("json", jobQueueSchema), async (c) => { type: "server", cronSchedule: job.pattern, }); + } else if (data.type === "schedule") { + result = await removeJob({ + scheduleId: data.scheduleId, + type: "schedule", + cronSchedule: job.pattern, + }); } logger.info({ result }, "Job removed"); } diff --git a/apps/schedules/src/queue.ts b/apps/schedules/src/queue.ts index e751fa6d..5a1efc05 100644 --- a/apps/schedules/src/queue.ts +++ b/apps/schedules/src/queue.ts @@ -36,6 +36,12 @@ export const scheduleJob = (job: QueueJob) => { pattern: job.cronSchedule, }, }); + } else if (job.type === "schedule") { + jobQueue.add(job.scheduleId, job, { + repeat: { + pattern: job.cronSchedule, + }, + }); } }; @@ -54,7 +60,13 @@ export const removeJob = async (data: QueueJob) => { }); return result; } - + if (data.type === "schedule") { + const { scheduleId, cronSchedule } = data; + const result = await jobQueue.removeRepeatable(scheduleId, { + pattern: cronSchedule, + }); + return result; + } return false; }; @@ -72,6 +84,11 @@ export const getJobRepeatable = async ( const job = repeatableJobs.find((j) => j.name === `${serverId}-cleanup`); return job ? job : null; } + if (data.type === "schedule") { + const { scheduleId } = data; + const job = repeatableJobs.find((j) => j.name === scheduleId); + return job ? job : null; + } return null; }; diff --git a/apps/schedules/src/schema.ts b/apps/schedules/src/schema.ts index feadb5a9..32b2536b 100644 --- a/apps/schedules/src/schema.ts +++ b/apps/schedules/src/schema.ts @@ -11,6 +11,11 @@ export const jobQueueSchema = z.discriminatedUnion("type", [ type: z.literal("server"), serverId: z.string(), }), + z.object({ + cronSchedule: z.string(), + type: z.literal("schedule"), + scheduleId: z.string(), + }), ]); export type QueueJob = z.infer; diff --git a/apps/schedules/src/utils.ts b/apps/schedules/src/utils.ts index b0d9b877..32ffbda2 100644 --- a/apps/schedules/src/utils.ts +++ b/apps/schedules/src/utils.ts @@ -3,15 +3,17 @@ import { cleanUpSystemPrune, cleanUpUnusedImages, findBackupById, + findScheduleById, findServerById, keepLatestNBackups, + runCommand, runMariadbBackup, runMongoBackup, runMySqlBackup, runPostgresBackup, } from "@dokploy/server"; import { db } from "@dokploy/server/dist/db"; -import { backups, server } from "@dokploy/server/dist/db/schema"; +import { backups, schedules, server } from "@dokploy/server/dist/db/schema"; import { eq } from "drizzle-orm"; import { logger } from "./logger.js"; import { scheduleJob } from "./queue.js"; @@ -57,8 +59,7 @@ export const runJobs = async (job: QueueJob) => { await runMariadbBackup(mariadb, backup); await keepLatestNBackups(backup, server.serverId); } - } - if (job.type === "server") { + } else if (job.type === "server") { const { serverId } = job; const server = await findServerById(serverId); if (server.serverStatus === "inactive") { @@ -68,6 +69,12 @@ export const runJobs = async (job: QueueJob) => { await cleanUpUnusedImages(serverId); await cleanUpDockerBuilder(serverId); await cleanUpSystemPrune(serverId); + } else if (job.type === "schedule") { + const { scheduleId } = job; + const schedule = await findScheduleById(scheduleId); + if (schedule.enabled) { + await runCommand(schedule.scheduleId); + } } } catch (error) { logger.error(error); @@ -112,4 +119,17 @@ export const initializeJobs = async () => { }); } logger.info({ Quantity: backupsResult.length }, "Backups Initialized"); + + const schedulesResult = await db.query.schedules.findMany({ + where: eq(schedules.enabled, true), + }); + + for (const schedule of schedulesResult) { + scheduleJob({ + scheduleId: schedule.scheduleId, + type: "schedule", + cronSchedule: schedule.cronExpression, + }); + } + logger.info({ Quantity: schedulesResult.length }, "Schedules Initialized"); }; diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 46916638..950403b2 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -30,6 +30,7 @@ export * from "./services/github"; export * from "./services/gitlab"; export * from "./services/gitea"; export * from "./services/server"; +export * from "./services/schedule"; export * from "./services/application"; export * from "./utils/databases/rebuild"; export * from "./setup/config-paths"; diff --git a/packages/server/src/services/schedule.ts b/packages/server/src/services/schedule.ts index a00ffc35..b9a6cd52 100644 --- a/packages/server/src/services/schedule.ts +++ b/packages/server/src/services/schedule.ts @@ -8,10 +8,9 @@ import type { updateScheduleSchema, } from "../db/schema/schedule"; import { execAsync, execAsyncRemote } from "../utils/process/execAsync"; -import { IS_CLOUD, paths } from "../constants"; +import { paths } from "../constants"; import path from "node:path"; import { encodeBase64 } from "../utils/docker/utils"; -import { scheduleJob, removeScheduleJob } from "../utils/schedules/utils"; export type ScheduleExtended = Awaited>; @@ -29,10 +28,6 @@ export const createSchedule = async ( await handleScript(newSchedule); } - if (newSchedule?.enabled) { - scheduleJob(newSchedule); - } - return newSchedule; }; @@ -94,6 +89,13 @@ export const updateSchedule = async ( .where(eq(schedules.scheduleId, scheduleId)) .returning(); + if (!updatedSchedule) { + throw new TRPCError({ + code: "NOT_FOUND", + message: "Schedule not found", + }); + } + if ( updatedSchedule?.scheduleType === "dokploy-server" || updatedSchedule?.scheduleType === "server" @@ -101,16 +103,6 @@ export const updateSchedule = async ( await handleScript(updatedSchedule); } - if (IS_CLOUD) { - // scheduleJob(updatedSchedule); - } else { - if (updatedSchedule?.enabled) { - removeScheduleJob(scheduleId); - scheduleJob(updatedSchedule); - } else { - removeScheduleJob(scheduleId); - } - } return updatedSchedule; };