From 744ebab15a2220d5449f20445ebd8058eeb8bcbb Mon Sep 17 00:00:00 2001 From: Mauricio Siu Date: Tue, 10 Feb 2026 03:11:33 -0600 Subject: [PATCH] refactor(deployments): enhance deployment worker and queue handling for cloud environments - Refactored the deployment worker to create a no-op worker when Redis is disabled (e.g., IS_CLOUD), preventing BullMQ connection errors. - Updated queue initialization to use a no-op queue in cloud environments, ensuring compatibility and stability. - Improved error handling and logging for job processing in the deployment worker. --- .../server/queues/deployments-queue.ts | 136 ++++++++++-------- apps/dokploy/server/queues/queueSetup.ts | 41 ++++-- 2 files changed, 102 insertions(+), 75 deletions(-) diff --git a/apps/dokploy/server/queues/deployments-queue.ts b/apps/dokploy/server/queues/deployments-queue.ts index 0474b63e2..e92f4c192 100644 --- a/apps/dokploy/server/queues/deployments-queue.ts +++ b/apps/dokploy/server/queues/deployments-queue.ts @@ -2,6 +2,7 @@ import { deployApplication, deployCompose, deployPreviewApplication, + IS_CLOUD, rebuildApplication, rebuildCompose, rebuildPreviewApplication, @@ -13,70 +14,83 @@ import { type Job, Worker } from "bullmq"; import type { DeploymentJob } from "./queue-types"; import { redisConfig } from "./redis-connection"; -export const deploymentWorker = new Worker( - "deployments", - async (job: Job) => { - try { - if (job.data.applicationType === "application") { - await updateApplicationStatus(job.data.applicationId, "running"); +const createDeploymentWorker = () => + new Worker( + "deployments", + async (job: Job) => { + try { + if (job.data.applicationType === "application") { + await updateApplicationStatus(job.data.applicationId, "running"); - if (job.data.type === "redeploy") { - await rebuildApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, + if (job.data.type === "redeploy") { + await rebuildApplication({ + applicationId: job.data.applicationId, + titleLog: job.data.titleLog, + descriptionLog: job.data.descriptionLog, + }); + } else if (job.data.type === "deploy") { + await deployApplication({ + applicationId: job.data.applicationId, + titleLog: job.data.titleLog, + descriptionLog: job.data.descriptionLog, + }); + } + } else if (job.data.applicationType === "compose") { + await updateCompose(job.data.composeId, { + composeStatus: "running", }); - } else if (job.data.type === "deploy") { - await deployApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, + if (job.data.type === "deploy") { + await deployCompose({ + composeId: job.data.composeId, + titleLog: job.data.titleLog, + descriptionLog: job.data.descriptionLog, + }); + } else if (job.data.type === "redeploy") { + await rebuildCompose({ + composeId: job.data.composeId, + titleLog: job.data.titleLog, + descriptionLog: job.data.descriptionLog, + }); + } + } else if (job.data.applicationType === "application-preview") { + await updatePreviewDeployment(job.data.previewDeploymentId, { + previewStatus: "running", }); - } - } else if (job.data.applicationType === "compose") { - await updateCompose(job.data.composeId, { - composeStatus: "running", - }); - if (job.data.type === "deploy") { - await deployCompose({ - composeId: job.data.composeId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } else if (job.data.type === "redeploy") { - await rebuildCompose({ - composeId: job.data.composeId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } - } else if (job.data.applicationType === "application-preview") { - await updatePreviewDeployment(job.data.previewDeploymentId, { - previewStatus: "running", - }); - if (job.data.type === "redeploy") { - await rebuildPreviewApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - previewDeploymentId: job.data.previewDeploymentId, - }); - } else if (job.data.type === "deploy") { - await deployPreviewApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - previewDeploymentId: job.data.previewDeploymentId, - }); + if (job.data.type === "redeploy") { + await rebuildPreviewApplication({ + applicationId: job.data.applicationId, + titleLog: job.data.titleLog, + descriptionLog: job.data.descriptionLog, + previewDeploymentId: job.data.previewDeploymentId, + }); + } else if (job.data.type === "deploy") { + await deployPreviewApplication({ + applicationId: job.data.applicationId, + titleLog: job.data.titleLog, + descriptionLog: job.data.descriptionLog, + previewDeploymentId: job.data.previewDeploymentId, + }); + } } + } catch (error) { + console.log("Error", error); } - } catch (error) { - console.log("Error", error); - } - }, - { - autorun: false, - connection: redisConfig, - }, -); + }, + { + autorun: false, + connection: redisConfig, + }, + ); + +/** No-op worker when Redis is disabled (e.g. IS_CLOUD). Avoids BullMQ connection errors. */ +const noopWorker = { + run: () => Promise.resolve(), + close: () => Promise.resolve(), + cancelJob: () => Promise.resolve(), + cancelAllJobs: () => Promise.resolve(), +}; + +export const deploymentWorker = !IS_CLOUD + ? createDeploymentWorker() + : (noopWorker as unknown as Worker); diff --git a/apps/dokploy/server/queues/queueSetup.ts b/apps/dokploy/server/queues/queueSetup.ts index 3900a1a77..75d59e079 100644 --- a/apps/dokploy/server/queues/queueSetup.ts +++ b/apps/dokploy/server/queues/queueSetup.ts @@ -1,15 +1,26 @@ +import { IS_CLOUD } from "@dokploy/server"; import { execAsync, execAsyncRemote, } from "@dokploy/server/utils/process/execAsync"; +import type { Job } from "bullmq"; import { Queue } from "bullmq"; import { deploymentWorker } from "./deployments-queue"; import { redisConfig } from "./redis-connection"; -const myQueue = new Queue("deployments", { - connection: redisConfig, +/** No-op queue when Redis is disabled (e.g. IS_CLOUD). Avoids BullMQ connection errors. */ +const createNoopQueue = () => ({ + getJobs: () => Promise.resolve([] as Job[]), + add: () => + Promise.resolve({ id: "noop", remove: () => Promise.resolve() } as Job), + close: () => Promise.resolve(), + on: () => {}, }); +const myQueue = !IS_CLOUD + ? new Queue("deployments", { connection: redisConfig }) + : (createNoopQueue() as unknown as Queue); + export const getJobsByApplicationId = async (applicationId: string) => { const jobs = await myQueue.getJobs(); return jobs.filter((job) => job?.data?.applicationId === applicationId); @@ -20,19 +31,21 @@ export const getJobsByComposeId = async (composeId: string) => { return jobs.filter((job) => job?.data?.composeId === composeId); }; -process.on("SIGTERM", () => { - myQueue.close(); - process.exit(0); -}); +if (!IS_CLOUD) { + process.on("SIGTERM", () => { + myQueue.close(); + process.exit(0); + }); -myQueue.on("error", (error) => { - if ((error as any).code === "ECONNREFUSED") { - console.error( - "Make sure you have installed Redis and it is running.", - error, - ); - } -}); + myQueue.on("error", (error) => { + if ((error as any).code === "ECONNREFUSED") { + console.error( + "Make sure you have installed Redis and it is running.", + error, + ); + } + }); +} export const cleanQueuesByApplication = async (applicationId: string) => { const jobs = await myQueue.getJobs(["waiting", "delayed"]);