mirror of
https://github.com/Dokploy/dokploy
synced 2025-06-26 18:27:59 +00:00
Enhance schedule management and job handling features
- Updated the scheduleRouter to manage job scheduling and removal based on the enabled status of schedules, improving job lifecycle management. - Refactored the scheduleJob and removeJob utilities to support scheduling and removing jobs for both server and schedule types. - Introduced a new schema for jobQueue to accommodate schedule jobs, enhancing the flexibility of job definitions. - Improved the runJobs function to execute scheduled jobs based on their enabled status, ensuring proper execution of active schedules. - Enhanced the initialization process for schedules to automatically schedule active jobs from the database, streamlining the setup process.
This commit is contained in:
parent
c87af312ca
commit
ccb141339b
@ -16,27 +16,74 @@ import {
|
||||
createSchedule,
|
||||
updateSchedule,
|
||||
} from "@dokploy/server/services/schedule";
|
||||
|
||||
import { IS_CLOUD, scheduleJob } from "@dokploy/server";
|
||||
import { removeJob, schedule } from "@/server/utils/backup";
|
||||
import { removeScheduleJob } from "@dokploy/server";
|
||||
export const scheduleRouter = createTRPCRouter({
|
||||
create: protectedProcedure
|
||||
.input(createScheduleSchema)
|
||||
.mutation(async ({ input }) => {
|
||||
const schedule = await createSchedule(input);
|
||||
return schedule;
|
||||
const newSchedule = await createSchedule(input);
|
||||
|
||||
if (newSchedule?.enabled) {
|
||||
if (IS_CLOUD) {
|
||||
schedule({
|
||||
scheduleId: newSchedule.scheduleId,
|
||||
type: "schedule",
|
||||
cronSchedule: newSchedule.cronExpression,
|
||||
});
|
||||
} else {
|
||||
scheduleJob(newSchedule);
|
||||
}
|
||||
}
|
||||
return newSchedule;
|
||||
}),
|
||||
|
||||
update: protectedProcedure
|
||||
.input(updateScheduleSchema)
|
||||
.mutation(async ({ input }) => {
|
||||
const schedule = await updateSchedule(input);
|
||||
return schedule;
|
||||
const updatedSchedule = await updateSchedule(input);
|
||||
|
||||
if (IS_CLOUD) {
|
||||
if (updatedSchedule?.enabled) {
|
||||
schedule({
|
||||
scheduleId: updatedSchedule.scheduleId,
|
||||
type: "schedule",
|
||||
cronSchedule: updatedSchedule.cronExpression,
|
||||
});
|
||||
} else {
|
||||
await removeJob({
|
||||
cronSchedule: updatedSchedule.cronExpression,
|
||||
scheduleId: updatedSchedule.scheduleId,
|
||||
type: "schedule",
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (updatedSchedule?.enabled) {
|
||||
removeScheduleJob(updatedSchedule.scheduleId);
|
||||
scheduleJob(updatedSchedule);
|
||||
} else {
|
||||
removeScheduleJob(updatedSchedule.scheduleId);
|
||||
}
|
||||
}
|
||||
return updatedSchedule;
|
||||
}),
|
||||
|
||||
delete: protectedProcedure
|
||||
.input(z.object({ scheduleId: z.string() }))
|
||||
.mutation(async ({ input }) => {
|
||||
const schedule = await findScheduleById(input.scheduleId);
|
||||
await deleteSchedule(input.scheduleId);
|
||||
|
||||
if (IS_CLOUD) {
|
||||
await removeJob({
|
||||
cronSchedule: schedule.cronExpression,
|
||||
scheduleId: schedule.scheduleId,
|
||||
type: "schedule",
|
||||
});
|
||||
} else {
|
||||
removeScheduleJob(schedule.scheduleId);
|
||||
}
|
||||
return true;
|
||||
}),
|
||||
|
||||
|
@ -14,6 +14,11 @@ type QueueJob =
|
||||
type: "server";
|
||||
cronSchedule: string;
|
||||
serverId: string;
|
||||
}
|
||||
| {
|
||||
type: "schedule";
|
||||
cronSchedule: string;
|
||||
scheduleId: string;
|
||||
};
|
||||
export const schedule = async (job: QueueJob) => {
|
||||
try {
|
||||
|
@ -34,8 +34,8 @@ app.use(async (c, next) => {
|
||||
app.post("/create-backup", zValidator("json", jobQueueSchema), async (c) => {
|
||||
const data = c.req.valid("json");
|
||||
scheduleJob(data);
|
||||
logger.info({ data }, "Backup created successfully");
|
||||
return c.json({ message: "Backup created successfully" });
|
||||
logger.info({ data }, `[${data.type}] created successfully`);
|
||||
return c.json({ message: `[${data.type}] created successfully` });
|
||||
});
|
||||
|
||||
app.post("/update-backup", zValidator("json", jobQueueSchema), async (c) => {
|
||||
@ -55,6 +55,12 @@ app.post("/update-backup", zValidator("json", jobQueueSchema), async (c) => {
|
||||
type: "server",
|
||||
cronSchedule: job.pattern,
|
||||
});
|
||||
} else if (data.type === "schedule") {
|
||||
result = await removeJob({
|
||||
scheduleId: data.scheduleId,
|
||||
type: "schedule",
|
||||
cronSchedule: job.pattern,
|
||||
});
|
||||
}
|
||||
logger.info({ result }, "Job removed");
|
||||
}
|
||||
|
@ -36,6 +36,12 @@ export const scheduleJob = (job: QueueJob) => {
|
||||
pattern: job.cronSchedule,
|
||||
},
|
||||
});
|
||||
} else if (job.type === "schedule") {
|
||||
jobQueue.add(job.scheduleId, job, {
|
||||
repeat: {
|
||||
pattern: job.cronSchedule,
|
||||
},
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@ -54,7 +60,13 @@ export const removeJob = async (data: QueueJob) => {
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
if (data.type === "schedule") {
|
||||
const { scheduleId, cronSchedule } = data;
|
||||
const result = await jobQueue.removeRepeatable(scheduleId, {
|
||||
pattern: cronSchedule,
|
||||
});
|
||||
return result;
|
||||
}
|
||||
return false;
|
||||
};
|
||||
|
||||
@ -72,6 +84,11 @@ export const getJobRepeatable = async (
|
||||
const job = repeatableJobs.find((j) => j.name === `${serverId}-cleanup`);
|
||||
return job ? job : null;
|
||||
}
|
||||
if (data.type === "schedule") {
|
||||
const { scheduleId } = data;
|
||||
const job = repeatableJobs.find((j) => j.name === scheduleId);
|
||||
return job ? job : null;
|
||||
}
|
||||
|
||||
return null;
|
||||
};
|
||||
|
@ -11,6 +11,11 @@ export const jobQueueSchema = z.discriminatedUnion("type", [
|
||||
type: z.literal("server"),
|
||||
serverId: z.string(),
|
||||
}),
|
||||
z.object({
|
||||
cronSchedule: z.string(),
|
||||
type: z.literal("schedule"),
|
||||
scheduleId: z.string(),
|
||||
}),
|
||||
]);
|
||||
|
||||
export type QueueJob = z.infer<typeof jobQueueSchema>;
|
||||
|
@ -3,15 +3,17 @@ import {
|
||||
cleanUpSystemPrune,
|
||||
cleanUpUnusedImages,
|
||||
findBackupById,
|
||||
findScheduleById,
|
||||
findServerById,
|
||||
keepLatestNBackups,
|
||||
runCommand,
|
||||
runMariadbBackup,
|
||||
runMongoBackup,
|
||||
runMySqlBackup,
|
||||
runPostgresBackup,
|
||||
} from "@dokploy/server";
|
||||
import { db } from "@dokploy/server/dist/db";
|
||||
import { backups, server } from "@dokploy/server/dist/db/schema";
|
||||
import { backups, schedules, server } from "@dokploy/server/dist/db/schema";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { logger } from "./logger.js";
|
||||
import { scheduleJob } from "./queue.js";
|
||||
@ -57,8 +59,7 @@ export const runJobs = async (job: QueueJob) => {
|
||||
await runMariadbBackup(mariadb, backup);
|
||||
await keepLatestNBackups(backup, server.serverId);
|
||||
}
|
||||
}
|
||||
if (job.type === "server") {
|
||||
} else if (job.type === "server") {
|
||||
const { serverId } = job;
|
||||
const server = await findServerById(serverId);
|
||||
if (server.serverStatus === "inactive") {
|
||||
@ -68,6 +69,12 @@ export const runJobs = async (job: QueueJob) => {
|
||||
await cleanUpUnusedImages(serverId);
|
||||
await cleanUpDockerBuilder(serverId);
|
||||
await cleanUpSystemPrune(serverId);
|
||||
} else if (job.type === "schedule") {
|
||||
const { scheduleId } = job;
|
||||
const schedule = await findScheduleById(scheduleId);
|
||||
if (schedule.enabled) {
|
||||
await runCommand(schedule.scheduleId);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
@ -112,4 +119,17 @@ export const initializeJobs = async () => {
|
||||
});
|
||||
}
|
||||
logger.info({ Quantity: backupsResult.length }, "Backups Initialized");
|
||||
|
||||
const schedulesResult = await db.query.schedules.findMany({
|
||||
where: eq(schedules.enabled, true),
|
||||
});
|
||||
|
||||
for (const schedule of schedulesResult) {
|
||||
scheduleJob({
|
||||
scheduleId: schedule.scheduleId,
|
||||
type: "schedule",
|
||||
cronSchedule: schedule.cronExpression,
|
||||
});
|
||||
}
|
||||
logger.info({ Quantity: schedulesResult.length }, "Schedules Initialized");
|
||||
};
|
||||
|
@ -30,6 +30,7 @@ export * from "./services/github";
|
||||
export * from "./services/gitlab";
|
||||
export * from "./services/gitea";
|
||||
export * from "./services/server";
|
||||
export * from "./services/schedule";
|
||||
export * from "./services/application";
|
||||
export * from "./utils/databases/rebuild";
|
||||
export * from "./setup/config-paths";
|
||||
|
@ -8,10 +8,9 @@ import type {
|
||||
updateScheduleSchema,
|
||||
} from "../db/schema/schedule";
|
||||
import { execAsync, execAsyncRemote } from "../utils/process/execAsync";
|
||||
import { IS_CLOUD, paths } from "../constants";
|
||||
import { paths } from "../constants";
|
||||
import path from "node:path";
|
||||
import { encodeBase64 } from "../utils/docker/utils";
|
||||
import { scheduleJob, removeScheduleJob } from "../utils/schedules/utils";
|
||||
|
||||
export type ScheduleExtended = Awaited<ReturnType<typeof findScheduleById>>;
|
||||
|
||||
@ -29,10 +28,6 @@ export const createSchedule = async (
|
||||
await handleScript(newSchedule);
|
||||
}
|
||||
|
||||
if (newSchedule?.enabled) {
|
||||
scheduleJob(newSchedule);
|
||||
}
|
||||
|
||||
return newSchedule;
|
||||
};
|
||||
|
||||
@ -94,6 +89,13 @@ export const updateSchedule = async (
|
||||
.where(eq(schedules.scheduleId, scheduleId))
|
||||
.returning();
|
||||
|
||||
if (!updatedSchedule) {
|
||||
throw new TRPCError({
|
||||
code: "NOT_FOUND",
|
||||
message: "Schedule not found",
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
updatedSchedule?.scheduleType === "dokploy-server" ||
|
||||
updatedSchedule?.scheduleType === "server"
|
||||
@ -101,16 +103,6 @@ export const updateSchedule = async (
|
||||
await handleScript(updatedSchedule);
|
||||
}
|
||||
|
||||
if (IS_CLOUD) {
|
||||
// scheduleJob(updatedSchedule);
|
||||
} else {
|
||||
if (updatedSchedule?.enabled) {
|
||||
removeScheduleJob(scheduleId);
|
||||
scheduleJob(updatedSchedule);
|
||||
} else {
|
||||
removeScheduleJob(scheduleId);
|
||||
}
|
||||
}
|
||||
return updatedSchedule;
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user