fix(bull): requeue jobs after restart

This commit is contained in:
Gergo Moricz 2024-07-12 13:55:53 +02:00
parent 961b27811d
commit 10957b748b
1 changed files with 20 additions and 15 deletions

View File

@ -118,16 +118,25 @@ if (cluster.isMaster) {
app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => { app.post(`/admin/${process.env.BULL_AUTH_KEY}/shutdown`, async (req, res) => {
try { try {
const wsq = getWebScraperQueue();
console.log("Gracefully shutting down..."); console.log("Gracefully shutting down...");
await getWebScraperQueue().pause(false, true);
res.json({ ok: true });
} catch (error) {
console.error(error);
return res.status(500).json({ error: error.message });
}
});
await wsq.pause(false, true); app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
try {
const wsq = getWebScraperQueue();
const jobs = await wsq.getActive(); const jobs = await wsq.getActive();
console.log("Requeueing", jobs.length, "jobs...");
if (jobs.length > 0) { if (jobs.length > 0) {
console.log("Removing", jobs.length, "jobs..."); console.log(" Removing", jobs.length, "jobs...");
await Promise.all(jobs.map(async x => { await Promise.all(jobs.map(async x => {
await wsq.client.del(await x.lockKey()); await wsq.client.del(await x.lockKey());
@ -136,7 +145,7 @@ if (cluster.isMaster) {
await x.remove(); await x.remove();
})); }));
console.log("Re-adding", jobs.length, "jobs..."); console.log(" Re-adding", jobs.length, "jobs...");
await wsq.addBulk(jobs.map(x => ({ await wsq.addBulk(jobs.map(x => ({
data: x.data, data: x.data,
opts: { opts: {
@ -144,21 +153,17 @@ if (cluster.isMaster) {
}, },
}))); })));
console.log("Done!"); console.log(" Done!");
res.json({ ok: true });
} }
await getWebScraperQueue().resume(false);
res.json({ ok: true });
} catch (error) { } catch (error) {
console.error(error); console.error(error);
return res.status(500).json({ error: error.message }); return res.status(500).json({ error: error.message });
} }
}); });
app.post(`/admin/${process.env.BULL_AUTH_KEY}/unpause`, async (req, res) => {
await getWebScraperQueue().resume(false);
res.json({ ok: true });
});
app.get(`/serverHealthCheck`, async (req, res) => { app.get(`/serverHealthCheck`, async (req, res) => {
try { try {
const webScraperQueue = getWebScraperQueue(); const webScraperQueue = getWebScraperQueue();