Files
dokploy/apps/schedules/src/queue.ts
Mauricio Siu d6124aae81 refactor: clean up code formatting and improve error handling in job scheduling
- Simplified code formatting for better readability in various components.
- Updated job scheduling functions to handle errors gracefully, ensuring that failures in scheduling do not disrupt the overall process.
- Enhanced logging for better traceability of job scheduling issues.

These changes improve code maintainability and user experience by providing clearer error messages and more organized code structure.
2026-04-11 10:04:29 -06:00

111 lines
2.8 KiB
TypeScript

import { Queue, type RepeatableJob } from "bullmq";
import { logger } from "./logger.js";
import type { QueueJob } from "./schema.js";
/**
 * Shared BullMQ queue used by every helper in this module.
 *
 * Despite the "backupQueue" name, `scheduleJob` in this file registers
 * backup, server, schedule and volume-backup jobs on this same queue.
 */
export const jobQueue = new Queue("backupQueue", {
connection: {
// The `!` only silences the type checker: there is no runtime check here
// that REDIS_URL is actually set.
url: process.env.REDIS_URL!,
},
defaultJobOptions: {
// Applied to every job added to this queue: BullMQ drops the job record
// once it completes or fails instead of keeping it in Redis.
removeOnComplete: true,
removeOnFail: true,
},
});
/**
 * Best-effort wipe of the whole queue.
 *
 * Calls `obliterate` with `force: true` and logs "Queue Cleaned" on success.
 * Any failure is logged and swallowed, so this function never rejects.
 */
export const cleanQueue = async (): Promise<void> => {
	try {
		await jobQueue.obliterate({ force: true });
		logger.info("Queue Cleaned");
	} catch (cause) {
		// Deliberately not rethrown: a failed cleanup is reported, not fatal.
		logger.error("Error cleaning queue:", cause);
	}
};
/**
 * Registers `job` as a repeatable (cron-driven) job on the shared queue.
 *
 * The repeatable job's name depends on the job type:
 * - "backup"        -> `backupId`
 * - "server"        -> `${serverId}-cleanup`
 * - "schedule"      -> `scheduleId` (also sets `tz`, defaulting to "UTC")
 * - "volume-backup" -> `volumeBackupId`
 *
 * Any other type is ignored and nothing is added.
 *
 * @param job Job payload; it is stored as the job data and its
 *   `cronSchedule` becomes the repeat pattern.
 */
export const scheduleJob = async (job: QueueJob) => {
	let jobName: string;
	// Only "schedule" jobs carry a timezone; the other types leave `tz` unset.
	let tz: string | undefined;

	switch (job.type) {
		case "backup":
			jobName = job.backupId;
			break;
		case "server":
			jobName = `${job.serverId}-cleanup`;
			break;
		case "schedule":
			jobName = job.scheduleId;
			tz = job.timezone || "UTC";
			break;
		case "volume-backup":
			jobName = job.volumeBackupId;
			break;
		default:
			return;
	}

	// Omit the `tz` key entirely when no timezone applies, so the options are
	// exactly `{ pattern }` for non-schedule jobs.
	const repeat =
		tz === undefined
			? { pattern: job.cronSchedule }
			: { pattern: job.cronSchedule, tz };

	await jobQueue.add(jobName, job, { repeat });
};
/**
 * Removes the repeatable job that `scheduleJob` registers for `data`.
 *
 * BullMQ identifies a repeatable job by its name together with its repeat
 * options, so the options passed here must mirror the ones used when the job
 * was added.
 *
 * @param data Job payload; its type selects the job name and its
 *   `cronSchedule` is used as the repeat pattern.
 * @returns The boolean returned by `removeRepeatable`, or `false` when the
 *   job type is not one of the four handled here.
 */
export const removeJob = async (data: QueueJob) => {
	if (data.type === "backup") {
		const { backupId, cronSchedule } = data;
		return await jobQueue.removeRepeatable(backupId, {
			pattern: cronSchedule,
		});
	}
	if (data.type === "server") {
		const { serverId, cronSchedule } = data;
		return await jobQueue.removeRepeatable(`${serverId}-cleanup`, {
			pattern: cronSchedule,
		});
	}
	if (data.type === "schedule") {
		const { scheduleId, cronSchedule, timezone } = data;
		// scheduleJob registers schedules with `tz: timezone || "UTC"`; the
		// timezone is part of the repeat key, so it has to be supplied here or
		// the lookup will not match the registered job.
		const removedWithTz = await jobQueue.removeRepeatable(scheduleId, {
			pattern: cronSchedule,
			tz: timezone || "UTC",
		});
		if (removedWithTz) {
			return true;
		}
		// Fallback: keep the previous tz-less lookup so an entry that was
		// registered without a timezone can still be removed.
		return await jobQueue.removeRepeatable(scheduleId, {
			pattern: cronSchedule,
		});
	}
	if (data.type === "volume-backup") {
		const { volumeBackupId, cronSchedule } = data;
		return await jobQueue.removeRepeatable(volumeBackupId, {
			pattern: cronSchedule,
		});
	}
	return false;
};
/**
 * Looks up the repeatable job currently registered for `data`.
 *
 * The match is by job name only, using the same naming scheme as
 * `scheduleJob` (`backupId`, `${serverId}-cleanup`, `scheduleId`,
 * `volumeBackupId`); the cron pattern is not compared.
 *
 * @param data Job payload whose type and id determine the name to search for.
 * @returns The first repeatable job with that name, or `null` when none is
 *   found or the job type is not handled.
 */
export const getJobRepeatable = async (
	data: QueueJob,
): Promise<RepeatableJob | null> => {
	// Fetched up front, before the type dispatch, regardless of job type.
	const registered = await jobQueue.getRepeatableJobs();

	let resolvedName: string;
	switch (data.type) {
		case "backup":
			resolvedName = data.backupId;
			break;
		case "server":
			resolvedName = `${data.serverId}-cleanup`;
			break;
		case "schedule":
			resolvedName = data.scheduleId;
			break;
		case "volume-backup":
			resolvedName = data.volumeBackupId;
			break;
		default:
			return null;
	}

	const wantedName = resolvedName;
	return registered.find((entry) => entry.name === wantedName) ?? null;
};