Files
dokploy/apps/schedules/src/queue.ts
Mauricio Siu ccb141339b Enhance schedule management and job handling features
- Updated the scheduleRouter to manage job scheduling and removal based on the enabled status of schedules, improving job lifecycle management.
- Refactored the scheduleJob and removeJob utilities to support scheduling and removing jobs for both server and schedule types.
- Introduced a new schema for jobQueue to accommodate schedule jobs, enhancing the flexibility of job definitions.
- Improved the runJobs function to execute scheduled jobs based on their enabled status, ensuring proper execution of active schedules.
- Enhanced the initialization process for schedules to automatically schedule active jobs from the database, streamlining the setup process.
2025-05-03 00:54:01 -06:00

95 lines
2.3 KiB
TypeScript

import { Queue, type RepeatableJob } from "bullmq";
import IORedis from "ioredis";
import { logger } from "./logger.js";
import type { QueueJob } from "./schema.js";
export const connection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
});
export const jobQueue = new Queue("backupQueue", {
connection,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
});
export const cleanQueue = async () => {
try {
await jobQueue.obliterate({ force: true });
logger.info("Queue Cleaned");
} catch (error) {
logger.error("Error cleaning queue:", error);
}
};
export const scheduleJob = (job: QueueJob) => {
if (job.type === "backup") {
jobQueue.add(job.backupId, job, {
repeat: {
pattern: job.cronSchedule,
},
});
} else if (job.type === "server") {
jobQueue.add(`${job.serverId}-cleanup`, job, {
repeat: {
pattern: job.cronSchedule,
},
});
} else if (job.type === "schedule") {
jobQueue.add(job.scheduleId, job, {
repeat: {
pattern: job.cronSchedule,
},
});
}
};
export const removeJob = async (data: QueueJob) => {
if (data.type === "backup") {
const { backupId, cronSchedule } = data;
const result = await jobQueue.removeRepeatable(backupId, {
pattern: cronSchedule,
});
return result;
}
if (data.type === "server") {
const { serverId, cronSchedule } = data;
const result = await jobQueue.removeRepeatable(`${serverId}-cleanup`, {
pattern: cronSchedule,
});
return result;
}
if (data.type === "schedule") {
const { scheduleId, cronSchedule } = data;
const result = await jobQueue.removeRepeatable(scheduleId, {
pattern: cronSchedule,
});
return result;
}
return false;
};
export const getJobRepeatable = async (
data: QueueJob,
): Promise<RepeatableJob | null> => {
const repeatableJobs = await jobQueue.getRepeatableJobs();
if (data.type === "backup") {
const { backupId } = data;
const job = repeatableJobs.find((j) => j.name === backupId);
return job ? job : null;
}
if (data.type === "server") {
const { serverId } = data;
const job = repeatableJobs.find((j) => j.name === `${serverId}-cleanup`);
return job ? job : null;
}
if (data.type === "schedule") {
const { scheduleId } = data;
const job = repeatableJobs.find((j) => j.name === scheduleId);
return job ? job : null;
}
return null;
};