From 794cd7997344830b179a8cd46970dd0779b20987 Mon Sep 17 00:00:00 2001 From: Mauricio Siu Date: Sat, 15 Nov 2025 17:31:52 -0600 Subject: [PATCH] feat: add comprehensive testing for grouped queue and queue manager functionality - Introduced tests for the GroupedQueue class, covering basic functionality, concurrency handling, and job processing across multiple groups. - Added tests for the QueueManager class, ensuring correct queue creation, job management, and handler functionality. - Implemented tests for concurrency changes and their effects on pending tasks, enhancing overall test coverage for the queue system. - Created a new ChangeConcurrencyModal component for adjusting deployment concurrency settings in the UI. --- .../__test__/queues/grouped-queue.test.ts | 809 ++++++++++++++++++ .../__test__/queues/queue-manager.test.ts | 313 +++++++ .../__test__/queues/queue-setup.test.ts | 250 ++++++ .../dashboard/application/general/show.tsx | 2 +- .../servers/actions/show-dokploy-actions.tsx | 9 + .../servers/actions/show-server-actions.tsx | 12 + .../servers/change-concurrency-modal.tsx | 180 ++++ .../pages/api/deploy/[refreshToken].ts | 8 +- .../api/deploy/compose/[refreshToken].ts | 8 +- apps/dokploy/pages/api/deploy/github.ts | 40 +- .../dokploy/server/api/routers/application.ts | 36 +- apps/dokploy/server/api/routers/compose.ts | 20 +- apps/dokploy/server/api/routers/settings.ts | 45 + .../server/queues/deployments-queue.ts | 122 +-- .../server/queues/grouped-queue-wrapper.ts | 256 ++++++ apps/dokploy/server/queues/queue-manager.ts | 112 +++ apps/dokploy/server/queues/queueSetup.ts | 124 ++- apps/dokploy/server/server.ts | 4 +- .../server/src/utils/builders/railpack.ts | 2 +- 19 files changed, 2180 insertions(+), 172 deletions(-) create mode 100644 apps/dokploy/__test__/queues/grouped-queue.test.ts create mode 100644 apps/dokploy/__test__/queues/queue-manager.test.ts create mode 100644 apps/dokploy/__test__/queues/queue-setup.test.ts create mode 100644 apps/dokploy/components/dashboard/settings/servers/change-concurrency-modal.tsx create mode 100644 apps/dokploy/server/queues/grouped-queue-wrapper.ts create mode 100644 apps/dokploy/server/queues/queue-manager.ts diff --git a/apps/dokploy/__test__/queues/grouped-queue.test.ts b/apps/dokploy/__test__/queues/grouped-queue.test.ts new file mode 100644 index 000000000..2ca3975eb --- /dev/null +++ b/apps/dokploy/__test__/queues/grouped-queue.test.ts @@ -0,0 +1,809 @@ +import { describe, expect, it } from "vitest"; +import { GroupedQueue } from "../../server/queues/grouped-queue-wrapper"; + +describe("GroupedQueue", () => { + describe("Basic functionality", () => { + it("should process a single job with concurrency 1", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + await queue.add("group1", { id: "job1" }); + + // Wait for processing to complete + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(processed).toEqual(["job1"]); + expect(queue.isIdle()).toBe(true); + }); + + it("should process jobs in FIFO order within a group", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 20)); + processed.push(data.id); + }); + + // Add multiple jobs to the same group + await Promise.all([ + queue.add("group1", { id: "job1" }), + queue.add("group1", { id: "job2" }), + queue.add("group1", { id: "job3" }), + ]); + + // Wait for all processing + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(processed).toEqual(["job1", "job2", "job3"]); + }); + }); + + describe("Concurrency 1 with multiple groups", () => { + it("should process one group at a time with concurrency 1", async () => { + const queue = new GroupedQueue<{ id: string; group: string }>(1); + const processed: string[] = []; + const activeGroups: string[] = []; + + queue.setHandler(async (data) => { + activeGroups.push(data.group); + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + activeGroups.pop(); + }); + + // Add jobs to 3 different groups + const promises = [ + queue.add("app1", { id: "job1", group: "app1" }), + queue.add("app2", { id: "job2", group: "app2" }), + queue.add("app3", { id: "job3", group: "app3" }), + ]; + + // Check after 30ms - only one should be processing + await new Promise((resolve) => setTimeout(resolve, 30)); + expect(activeGroups.length).toBeLessThanOrEqual(1); + + // Wait for all to complete + await Promise.all(promises); + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(processed).toHaveLength(3); + expect(queue.isIdle()).toBe(true); + }); + + it("should process groups sequentially with concurrency 1", async () => { + const queue = new GroupedQueue<{ id: string; group: string }>(1); + const processingOrder: string[] = []; + const startTimes: Map = new Map(); + const endTimes: Map = new Map(); + + queue.setHandler(async (data) => { + startTimes.set(data.id, Date.now()); + processingOrder.push(`start-${data.group}`); + await new Promise((resolve) => setTimeout(resolve, 50)); + endTimes.set(data.id, Date.now()); + processingOrder.push(`end-${data.group}`); + }); + + await Promise.all([ + queue.add("app1", { id: "job1", group: "app1" }), + queue.add("app2", { id: "job2", group: "app2" }), + queue.add("app3", { id: "job3", group: "app3" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify sequential processing + expect(processingOrder).toEqual([ + "start-app1", + "end-app1", + "start-app2", + "end-app2", + "start-app3", + "end-app3", + ]); + + // Verify jobs don't overlap + const job1End = endTimes.get("job1")!; + const job2Start = startTimes.get("job2")!; + const job2End = endTimes.get("job2")!; + const job3Start = startTimes.get("job3")!; + + expect(job2Start).toBeGreaterThanOrEqual(job1End); + expect(job3Start).toBeGreaterThanOrEqual(job2End); + }); + }); + + describe("Concurrency 3 with 4 groups", () => { + it("should process up to 3 groups simultaneously", async () => { + const queue = new GroupedQueue<{ id: string; group: string }>(3); + const activeGroups = new Set(); + const maxConcurrent = { value: 0 }; + + queue.setHandler(async (data) => { + activeGroups.add(data.group); + maxConcurrent.value = Math.max(maxConcurrent.value, activeGroups.size); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + activeGroups.delete(data.group); + }); + + // Add 4 jobs to different groups + await Promise.all([ + queue.add("app1", { id: "job1", group: "app1" }), + queue.add("app2", { id: "job2", group: "app2" }), + queue.add("app3", { id: "job3", group: "app3" }), + queue.add("app4", { id: "job4", group: "app4" }), + ]); + + // Check during processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should have processed 3 groups simultaneously + expect(maxConcurrent.value).toBe(3); + expect(activeGroups.size).toBeLessThanOrEqual(3); + + // Wait for all to complete + await new Promise((resolve) => setTimeout(resolve, 200)); + expect(queue.isIdle()).toBe(true); + }); + + it("should process 4th group after one of the first 3 completes", async () => { + const queue = new GroupedQueue<{ id: string; group: string }>(3); + const processingOrder: string[] = []; + + queue.setHandler(async (data) => { + processingOrder.push(`start-${data.group}`); + await new Promise((resolve) => setTimeout(resolve, 100)); + processingOrder.push(`end-${data.group}`); + }); + + await Promise.all([ + queue.add("app1", { id: "job1", group: "app1" }), + queue.add("app2", { id: "job2", group: "app2" }), + queue.add("app3", { id: "job3", group: "app3" }), + queue.add("app4", { id: "job4", group: "app4" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 250)); + + // First 3 should start together + const firstThree = processingOrder.slice(0, 3); + expect(firstThree).toContain("start-app1"); + expect(firstThree).toContain("start-app2"); + expect(firstThree).toContain("start-app3"); + + // 4th should start after one completes + const app4StartIndex = processingOrder.indexOf("start-app4"); + expect(app4StartIndex).toBeGreaterThan(0); + expect(app4StartIndex).toBeLessThan(processingOrder.length - 1); + }); + }); + + describe("Multiple jobs per group", () => { + it("should process jobs sequentially within same group", async () => { + const queue = new GroupedQueue<{ id: string }>(3); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 30)); + processed.push(data.id); + }); + + // Add 3 jobs to same group + await Promise.all([ + queue.add("app1", { id: "job1" }), + queue.add("app1", { id: "job2" }), + queue.add("app1", { id: "job3" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Should process in order + expect(processed).toEqual(["job1", "job2", "job3"]); + }); + + it("should process multiple groups with multiple jobs each", async () => { + const queue = new GroupedQueue<{ id: string; group: string }>(2); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 20)); + processed.push(`${data.group}-${data.id}`); + }); + + // Add jobs to 2 groups, 2 jobs each + await Promise.all([ + queue.add("app1", { id: "job1", group: "app1" }), + queue.add("app1", { id: "job2", group: "app1" }), + queue.add("app2", { id: "job1", group: "app2" }), + queue.add("app2", { id: "job2", group: "app2" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Should process both groups, jobs within each group in order + expect(processed).toHaveLength(4); + expect(processed.filter((p) => p.startsWith("app1"))).toEqual([ + "app1-job1", + "app1-job2", + ]); + expect(processed.filter((p) => p.startsWith("app2"))).toEqual([ + "app2-job1", + "app2-job2", + ]); + }); + }); + + describe("Error handling", () => { + it("should reject job on handler error", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + + queue.setHandler(async () => { + throw new Error("Test error"); + }); + + await expect(queue.add("group1", { id: "job1" })).rejects.toThrow( + "Test error", + ); + }); + + it("should continue processing other jobs after error", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + if (data.id === "job2") { + throw new Error("Job 2 error"); + } + processed.push(data.id); + }); + + await expect( + queue.add("group1", { id: "job1" }), + ).resolves.toBeUndefined(); + await expect(queue.add("group1", { id: "job2" })).rejects.toThrow(); + await expect( + queue.add("group1", { id: "job3" }), + ).resolves.toBeUndefined(); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(processed).toEqual(["job1", "job3"]); + }); + }); + + describe("Queue management", () => { + it("should clear group tasks", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + // Add jobs without awaiting - they'll start processing + const job1Promise = queue.add("group1", { id: "job1" }); + const job2Promise = queue.add("group1", { id: "job2" }); + + // Clear immediately - job1 might be processing, but job2 should be cleared + queue.clearGroup("group1"); + + // Use Promise.allSettled to handle both promises properly + const results = await Promise.allSettled([job1Promise, job2Promise]); + + // job1 might succeed or fail depending on timing + // job2 should be rejected + const job2Result = results[1]; + if (job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe("Queue cleared"); + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Job1 might have processed, but job2 should not + expect(processed.length).toBeLessThanOrEqual(1); + }); + + it("should return correct group length", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + + queue.setHandler(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + + // Add jobs without awaiting - check length immediately + const promises = [ + queue.add("group1", { id: "job1" }), + queue.add("group1", { id: "job2" }), + queue.add("group1", { id: "job3" }), + ]; + + // Check length immediately - at least some should be pending + // (job1 might be processing, but job2 and job3 should be pending) + const length = queue.getGroupLength("group1"); + expect(length).toBeGreaterThanOrEqual(0); + + // Wait for all to complete + await Promise.all(promises); + await new Promise((resolve) => setTimeout(resolve, 50)); + + // After processing should be 0 + expect(queue.getGroupLength("group1")).toBe(0); + }); + + it("should close queue and reject pending tasks", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + + queue.setHandler(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + + // Add first job and wait a bit to ensure it starts processing + const job1Promise = queue.add("group1", { id: "job1" }); + // Add second job without awaiting + const job2Promise = queue.add("group1", { id: "job2" }); + + // Wait a tiny bit to ensure job2 is queued + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Close queue - job2 should be rejected + await queue.close(); + + // Use Promise.allSettled to handle both promises properly + const results = await Promise.allSettled([job1Promise, job2Promise]); + + // job1 might succeed or fail depending on timing + // job2 should be rejected + const job2Result = results[1]; + if (job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe("Queue closed"); + } + }); + }); + + describe("Concurrency edge cases", () => { + it("should handle concurrency 1 with 1 app correctly", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + await queue.add("app1", { id: "job1" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(processed).toEqual(["job1"]); + expect(queue.getActiveGroupsCount()).toBe(0); + }); + + it("should handle concurrency 1 with 3 apps correctly", async () => { + const queue = new GroupedQueue<{ id: string; app: string }>(1); + const processingTimes: Map = + new Map(); + + queue.setHandler(async (data) => { + const start = Date.now(); + await new Promise((resolve) => setTimeout(resolve, 50)); + const end = Date.now(); + processingTimes.set(data.app, { start, end }); + }); + + await Promise.all([ + queue.add("app1", { id: "job1", app: "app1" }), + queue.add("app2", { id: "job2", app: "app2" }), + queue.add("app3", { id: "job3", app: "app3" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify sequential processing + const app1 = processingTimes.get("app1")!; + const app2 = processingTimes.get("app2")!; + const app3 = processingTimes.get("app3")!; + + expect(app2.start).toBeGreaterThanOrEqual(app1.end); + expect(app3.start).toBeGreaterThanOrEqual(app2.end); + expect(queue.getActiveGroupsCount()).toBe(0); + }); + + it("should handle 4 apps with concurrency 3 correctly", async () => { + const queue = new GroupedQueue<{ id: string; app: string }>(3); + const concurrentCounts: number[] = []; + + queue.setHandler(async () => { + // Track concurrent processing + const interval = setInterval(() => { + concurrentCounts.push(queue.getActiveGroupsCount()); + }, 10); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + clearInterval(interval); + }); + + await Promise.all([ + queue.add("app1", { id: "job1", app: "app1" }), + queue.add("app2", { id: "job2", app: "app2" }), + queue.add("app3", { id: "job3", app: "app3" }), + queue.add("app4", { id: "job4", app: "app4" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Should never exceed concurrency of 3 + const maxConcurrent = Math.max(...concurrentCounts); + expect(maxConcurrent).toBeLessThanOrEqual(3); + expect(queue.getActiveGroupsCount()).toBe(0); + }); + }); + + describe("Idle state", () => { + it("should be idle when no jobs are processing", () => { + const queue = new GroupedQueue<{ id: string }>(1); + expect(queue.isIdle()).toBe(true); + }); + + it("should not be idle while processing", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + let isIdleDuringProcessing = false; + + queue.setHandler(async () => { + isIdleDuringProcessing = queue.isIdle(); + await new Promise((resolve) => setTimeout(resolve, 50)); + }); + + await queue.add("group1", { id: "job1" }); + + await new Promise((resolve) => setTimeout(resolve, 30)); + + expect(isIdleDuringProcessing).toBe(false); + expect(queue.isIdle()).toBe(true); + }); + }); + + describe("Concurrency management", () => { + it("should get current concurrency", () => { + const queue1 = new GroupedQueue<{ id: string }>(1); + const queue2 = new GroupedQueue<{ id: string }>(5); + const queue3 = new GroupedQueue<{ id: string }>(10); + + expect(queue1.getConcurrency()).toBe(1); + expect(queue2.getConcurrency()).toBe(5); + expect(queue3.getConcurrency()).toBe(10); + }); + + it("should set concurrency dynamically", () => { + const queue = new GroupedQueue<{ id: string }>(1); + expect(queue.getConcurrency()).toBe(1); + + queue.setConcurrency(3); + expect(queue.getConcurrency()).toBe(3); + + queue.setConcurrency(5); + expect(queue.getConcurrency()).toBe(5); + }); + + it("should throw error when setting concurrency less than 1", () => { + const queue = new GroupedQueue<{ id: string }>(1); + expect(() => queue.setConcurrency(0)).toThrow( + "Concurrency must be at least 1", + ); + expect(() => queue.setConcurrency(-1)).toThrow( + "Concurrency must be at least 1", + ); + }); + + it("should process next group when concurrency increases", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + // Add jobs to 3 different groups with concurrency 1 + const job1Promise = queue.add("group1", { id: "job1" }); + const job2Promise = queue.add("group2", { id: "job2" }); + const job3Promise = queue.add("group3", { id: "job3" }); + + // Wait a bit to ensure job1 starts processing + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Increase concurrency to 3 - should allow group2 and group3 to start + queue.setConcurrency(3); + + // Wait for all to complete + await Promise.all([job1Promise, job2Promise, job3Promise]); + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(processed).toHaveLength(3); + expect(processed).toContain("job1"); + expect(processed).toContain("job2"); + expect(processed).toContain("job3"); + }); + }); + + describe("Clear all pending tasks", () => { + it("should clear all pending tasks across all groups", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + processed.push(data.id); + }); + + // Add multiple jobs to different groups + const job1Promise = queue.add("group1", { id: "job1" }); + const job2Promise = queue.add("group1", { id: "job2" }); + const job3Promise = queue.add("group2", { id: "job3" }); + const job4Promise = queue.add("group2", { id: "job4" }); + const job5Promise = queue.add("group3", { id: "job5" }); + + // Wait a bit to ensure job1 starts processing + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Clear all pending tasks + const clearedCount = queue.clearAllPendingTasks(); + + // Should have cleared 4 pending tasks (job2, job3, job4, job5) + // job1 is processing so it's not in the queue anymore + expect(clearedCount).toBe(4); + + // Handle all promises + const results = await Promise.allSettled([ + job1Promise, + job2Promise, + job3Promise, + job4Promise, + job5Promise, + ]); + + // job1 should succeed (it was processing) + const job1Result = results[0]; + expect(job1Result.status).toBe("fulfilled"); + + // All pending jobs should be rejected + for (let i = 1; i < results.length; i++) { + const result = results[i]; + if (result && result.status === "rejected") { + expect(result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + } + + // Wait for job1 to complete + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Only job1 should have processed + expect(processed).toHaveLength(1); + expect(processed).toContain("job1"); + }); + + it("should not clear tasks that are currently processing", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + processed.push(data.id); + }); + + // Add jobs - first one will start processing immediately + const job1Promise = queue.add("group1", { id: "job1" }); + const job2Promise = queue.add("group1", { id: "job2" }); + + // Wait to ensure job1 is processing (it's been shifted from tasks) + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Clear all pending - should only clear job2, not job1 + // job1 is already executing (not in tasks array) + const clearedCount = queue.clearAllPendingTasks(); + + expect(clearedCount).toBe(1); + + // Handle all promises + const results = await Promise.allSettled([job1Promise, job2Promise]); + + // job1 should succeed (it was processing) + const job1Result = results[0]; + expect(job1Result.status).toBe("fulfilled"); + + // job2 should be rejected + const job2Result = results[1]; + if (job2Result && job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Only job1 should have processed + expect(processed).toHaveLength(1); + expect(processed).toContain("job1"); + }); + + it("should return 0 when no pending tasks", () => { + const queue = new GroupedQueue<{ id: string }>(1); + const clearedCount = queue.clearAllPendingTasks(); + expect(clearedCount).toBe(0); + }); + + it("should clear tasks from multiple groups", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + // Add jobs to multiple groups + const promises = [ + queue.add("group1", { id: "job1" }), + queue.add("group1", { id: "job2" }), + queue.add("group2", { id: "job3" }), + queue.add("group2", { id: "job4" }), + queue.add("group3", { id: "job5" }), + ]; + + // Wait a bit for first job to start (it gets shifted from tasks) + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Clear all pending + const clearedCount = queue.clearAllPendingTasks(); + + // Should clear 4 tasks (job2, job3, job4, job5) + // job1 is processing so it's not in the queue anymore + expect(clearedCount).toBe(4); + + // Handle all promises + const results = await Promise.allSettled(promises); + + // job1 should succeed + const job1Result = results[0]; + expect(job1Result?.status).toBe("fulfilled"); + + // Others should be rejected + for (let i = 1; i < results.length; i++) { + const result = results[i]; + if (result && result.status === "rejected") { + expect(result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Only first job should process + expect(processed.length).toBeLessThanOrEqual(1); + }); + }); + + describe("Concurrency change with pending tasks", () => { + it("should clear pending tasks when concurrency changes", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + // Add jobs with concurrency 1 + const job1Promise = queue.add("group1", { id: "job1" }); + const job2Promise = queue.add("group1", { id: "job2" }); + const job3Promise = queue.add("group2", { id: "job3" }); + + // Wait for job1 to start processing (it gets shifted from tasks) + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Change concurrency - should clear pending tasks via clearAllPendingTasks + queue.setConcurrency(3); + + // Handle all promises + const results = await Promise.allSettled([ + job1Promise, + job2Promise, + job3Promise, + ]); + + // job1 should succeed (it was processing) + const job1Result = results[0]; + expect(job1Result.status).toBe("fulfilled"); + + // Pending jobs should be rejected (job2 and job3 were in queue when cleared) + const job2Result = results[1]; + const job3Result = results[2]; + + // At least one of the pending jobs should be rejected + const rejectedCount = [job2Result, job3Result].filter( + (r) => r && r.status === "rejected", + ).length; + expect(rejectedCount).toBeGreaterThan(0); + + // Verify rejection messages + if (job2Result && job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + if (job3Result && job3Result.status === "rejected") { + expect(job3Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // job1 should have processed, others may or may not depending on timing + expect(processed.length).toBeGreaterThanOrEqual(1); + expect(processed).toContain("job1"); + }); + + it("should allow new jobs after concurrency change", async () => { + const queue = new GroupedQueue<{ id: string }>(1); + const processed: string[] = []; + + queue.setHandler(async (data) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + processed.push(data.id); + }); + + // Add job with concurrency 1 + const job1Promise = queue.add("group1", { id: "job1" }); + const job2Promise = queue.add("group1", { id: "job2" }); + + // Wait for job1 to start (it gets shifted from tasks) + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Change concurrency to 3 - this calls clearAllPendingTasks internally + queue.setConcurrency(3); + + // Handle all promises + const results = await Promise.allSettled([job1Promise, job2Promise]); + + // job1 should succeed (it was processing) + const job1Result = results[0]; + expect(job1Result.status).toBe("fulfilled"); + + // job2 should be rejected (it was in queue when cleared) + const job2Result = results[1]; + if (job2Result && job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } else { + // If job2 wasn't rejected, it means it started processing before clear + // This is acceptable as it's a timing issue + } + + // Add new jobs after concurrency change - they should work + await Promise.all([ + queue.add("group2", { id: "job3" }), + queue.add("group3", { id: "job4" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // job1, job3, and job4 should have processed + expect(processed.length).toBeGreaterThanOrEqual(2); + expect(processed).toContain("job1"); + }); + }); +}); diff --git a/apps/dokploy/__test__/queues/queue-manager.test.ts b/apps/dokploy/__test__/queues/queue-manager.test.ts new file mode 100644 index 000000000..69b0e63c2 --- /dev/null +++ b/apps/dokploy/__test__/queues/queue-manager.test.ts @@ -0,0 +1,313 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { QueueManager } from "../../server/queues/queue-manager"; + +describe("QueueManager", () => { + let manager: QueueManager; + + beforeEach(() => { + manager = new QueueManager(); + }); + + describe("Queue creation and retrieval", () => { + it("should create a queue with default concurrency 1", () => { + const queue = manager.getQueue("test-queue"); + expect(queue.getConcurrency()).toBe(1); + }); + + it("should create a queue with custom concurrency", () => { + const queue = manager.getQueue("test-queue", 5); + expect(queue.getConcurrency()).toBe(5); + }); + + it("should return the same queue instance for the same name", () => { + const queue1 = manager.getQueue("test-queue", 3); + const queue2 = manager.getQueue("test-queue", 5); + expect(queue1).toBe(queue2); + // Concurrency should remain as first set + expect(queue1.getConcurrency()).toBe(3); + }); + + it("should create different queues for different names", () => { + const queue1 = manager.getQueue("queue1", 2); + const queue2 = manager.getQueue("queue2", 4); + expect(queue1).not.toBe(queue2); + expect(queue1.getConcurrency()).toBe(2); + expect(queue2.getConcurrency()).toBe(4); + }); + }); + + describe("Handler management", () => { + it("should set handler for a queue", async () => { + const processed: string[] = []; + + manager.setHandler("test-queue", async (data: { id: string }) => { + processed.push(data.id); + }); + + await manager.add("test-queue", "group1", { id: "job1" }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(processed).toEqual(["job1"]); + }); + + it("should handle different handlers for different queues", async () => { + const queue1Processed: string[] = []; + const queue2Processed: string[] = []; + + manager.setHandler("queue1", async (data: { id: string }) => { + queue1Processed.push(data.id); + }); + + manager.setHandler("queue2", async (data: { id: string }) => { + queue2Processed.push(data.id); + }); + + await Promise.all([ + manager.add("queue1", "group1", { id: "job1" }), + manager.add("queue2", "group1", { id: "job2" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(queue1Processed).toEqual(["job1"]); + expect(queue2Processed).toEqual(["job2"]); + }); + }); + + describe("Job management", () => { + it("should add jobs to correct queue and group", async () => { + const processed: string[] = []; + + manager.setHandler("test-queue", async (data: { id: string }) => { + processed.push(data.id); + }); + + await manager.add("test-queue", "group1", { id: "job1" }); + await manager.add("test-queue", "group2", { id: "job2" }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(processed).toContain("job1"); + expect(processed).toContain("job2"); + }); + + it("should create queue with concurrency when adding job", async () => { + const processed: string[] = []; + + // Create queue with concurrency first (without handler) + manager.getQueue("new-queue", 3); + + // Set handler + manager.setHandler("new-queue", async (data: { id: string }) => { + processed.push(data.id); + }); + + // Now add job - it should process + await manager.add("new-queue", "group1", { id: "job1" }); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + const queue = manager.getQueue("new-queue"); + expect(queue.getConcurrency()).toBe(3); + expect(processed).toEqual(["job1"]); + }); + }); + + describe("Queue operations", () => { + it("should clear group in specific queue", async () => { + const processed: string[] = []; + + manager.setHandler("test-queue", async (data: { id: string }) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + processed.push(data.id); + }); + + // Add jobs but don't await - they'll start processing + const job1Promise = manager.add("test-queue", "group1", { id: "job1" }); + const job2Promise = manager.add("test-queue", "group1", { id: "job2" }); + + // Clear immediately - job1 might be processing, but job2 should be cleared + manager.clearGroup("test-queue", "group1"); + + // Use Promise.allSettled to handle both promises properly + const results = await Promise.allSettled([job1Promise, job2Promise]); + + // job1 might succeed or fail depending on timing + // job2 should be rejected + const job2Result = results[1]; + if (job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe("Queue cleared"); + } + + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Job1 might have processed, but job2 should not + expect(processed.length).toBeLessThanOrEqual(1); + }); + + it("should get group length for specific queue", async () => { + manager.setHandler("test-queue", async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + + // Add jobs without awaiting - check length immediately + const job1Promise = manager.add("test-queue", "group1", { id: "job1" }); + const job2Promise = manager.add("test-queue", "group1", { id: "job2" }); + + // Check length immediately - at least one should be pending + // (job1 might be processing, but job2 should be pending) + const length = manager.getGroupLength("test-queue", "group1"); + expect(length).toBeGreaterThanOrEqual(0); + + // Wait for both to complete + await Promise.all([job1Promise, job2Promise]); + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(manager.getGroupLength("test-queue", "group1")).toBe(0); + }); + + it("should get total length for specific queue", async () => { + manager.setHandler("test-queue", async () => { + await new Promise((resolve) => setTimeout(resolve, 50)); + }); + + // Add jobs without awaiting - check length immediately + const promises = [ + manager.add("test-queue", "group1", { id: "job1" }), + manager.add("test-queue", "group2", { id: "job2" }), + manager.add("test-queue", "group3", { id: "job3" }), + ]; + + // Check length immediately - at least some should be pending + const length = manager.getTotalLength("test-queue"); + expect(length).toBeGreaterThanOrEqual(0); + + // Wait for all to complete + await Promise.all(promises); + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(manager.getTotalLength("test-queue")).toBe(0); + }); + + it("should check if queue is idle", async () => { + manager.setHandler("test-queue", async () => { + await new Promise((resolve) => setTimeout(resolve, 50)); + }); + + expect(manager.isIdle("test-queue")).toBe(true); + + await manager.add("test-queue", "group1", { id: "job1" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(manager.isIdle("test-queue")).toBe(true); + }); + }); + + describe("Queue lifecycle", () => { + it("should close a specific queue", async () => { + manager.setHandler("test-queue", async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + + // Add first job and wait a bit to ensure it starts processing + const job1Promise = manager.add("test-queue", "group1", { id: "job1" }); + // Add second job without awaiting + const job2Promise = manager.add("test-queue", "group1", { id: "job2" }); + + // Wait a tiny bit to ensure job2 is queued + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Close queue - job2 should be rejected + await manager.closeQueue("test-queue"); + + // Use Promise.allSettled to handle both promises properly + const results = await Promise.allSettled([job1Promise, job2Promise]); + + // job1 might succeed or fail depending on timing + // job2 should be rejected + const job2Result = results[1]; + if (job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe("Queue closed"); + } + + expect(manager.getQueueNames()).not.toContain("test-queue"); + }); + + it("should close all queues", async () => { + manager.setHandler("queue1", async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + manager.setHandler("queue2", async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + }); + + await manager.add("queue1", "group1", { id: "job1" }); + await manager.add("queue2", "group1", { id: "job2" }); + + await manager.closeAll(); + + expect(manager.getQueueNames()).toHaveLength(0); + }); + + it("should get all queue names", () => { + manager.getQueue("queue1"); + manager.getQueue("queue2"); + manager.getQueue("queue3"); + + const names = manager.getQueueNames(); + expect(names).toContain("queue1"); + expect(names).toContain("queue2"); + expect(names).toContain("queue3"); + expect(names).toHaveLength(3); + }); + }); + + describe("Multiple queues with different concurrency", () => { + it("should handle multiple queues with different concurrency settings", async () => { + const queue1Processed: string[] = []; + const queue2Processed: string[] = []; + + // Create queues with specific concurrency FIRST, before setting handlers + const queue1 = manager.getQueue("queue1", 1); + const queue2 = manager.getQueue("queue2", 3); + + // Verify concurrency is set correctly before proceeding + expect(queue1.getConcurrency()).toBe(1); + expect(queue2.getConcurrency()).toBe(3); + + manager.setHandler("queue1", async (data: { id: string }) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + queue1Processed.push(data.id); + }); + + manager.setHandler("queue2", async (data: { id: string }) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + queue2Processed.push(data.id); + }); + + // Queue1 with concurrency 1 (sequential) + await Promise.all([ + manager.add("queue1", "app1", { id: "job1" }), + manager.add("queue1", "app2", { id: "job2" }), + ]); + + // Queue2 with concurrency 3 (parallel) + await Promise.all([ + manager.add("queue2", "app1", { id: "job1" }), + manager.add("queue2", "app2", { id: "job2" }), + manager.add("queue2", "app3", { id: "job3" }), + ]); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + expect(queue1Processed).toHaveLength(2); + expect(queue2Processed).toHaveLength(3); + + // Verify concurrency settings are still correct + expect(manager.getQueue("queue1").getConcurrency()).toBe(1); + expect(manager.getQueue("queue2").getConcurrency()).toBe(3); + }); + }); +}); diff --git a/apps/dokploy/__test__/queues/queue-setup.test.ts b/apps/dokploy/__test__/queues/queue-setup.test.ts new file mode 100644 index 000000000..9fe5627c4 --- /dev/null +++ b/apps/dokploy/__test__/queues/queue-setup.test.ts @@ -0,0 +1,250 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import type { DeploymentJob } from "../../server/queues/queue-types"; +import { + getConcurrency, + myQueue, + setConcurrency, +} from "../../server/queues/queueSetup"; + +describe("queueSetup", () => { + beforeEach(() => { + // Reset concurrency to default (1) before each test + setConcurrency(1); + // Clear all pending tasks + myQueue.clearAllPendingTasks(); + }); + + describe("getConcurrency", () => { + it("should return default concurrency of 1", () => { + const concurrency = getConcurrency(); + expect(concurrency).toBe(1); + }); + + it("should return current concurrency after setting it", () => { + setConcurrency(3); + expect(getConcurrency()).toBe(3); + + setConcurrency(5); + expect(getConcurrency()).toBe(5); + }); + }); + + describe("setConcurrency", () => { + it("should set concurrency successfully", () => { + const clearedCount = setConcurrency(3); + expect(getConcurrency()).toBe(3); + expect(clearedCount).toBe(0); // No pending tasks to clear + }); + + it("should throw error for concurrency less than 1", () => { + expect(() => setConcurrency(0)).toThrow("Concurrency must be at least 1"); + expect(() => setConcurrency(-1)).toThrow( + "Concurrency must be at least 1", + ); + }); + + it("should return 0 cleared builds when no pending tasks", () => { + const clearedCount = setConcurrency(2); + expect(clearedCount).toBe(0); + expect(getConcurrency()).toBe(2); + }); + + it("should clear pending builds when concurrency changes", async () => { + const processed: string[] = []; + + // Set handler + myQueue.setHandler(async (job: DeploymentJob) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + if (job.applicationType === "application") { + processed.push(job.applicationId); + } else if (job.applicationType === "compose") { + processed.push(job.composeId); + } else if (job.applicationType === "application-preview") { + processed.push(job.previewDeploymentId); + } + }); + + // Add jobs to different groups + const job1: DeploymentJob = { + applicationId: "app1", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + const job2: DeploymentJob = { + applicationId: "app2", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + const job3: DeploymentJob = { + applicationId: "app3", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + + // Add jobs without awaiting + const promise1 = myQueue.add("application:app1", job1); + const promise2 = myQueue.add("application:app2", job2); + const promise3 = myQueue.add("application:app3", job3); + + // Wait for first job to start processing + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Change concurrency - should clear pending builds + const clearedCount = setConcurrency(3); + + // Should have cleared 2 pending builds (app2 and app3) + expect(clearedCount).toBe(2); + expect(getConcurrency()).toBe(3); + + // Handle all promises - use allSettled to handle both resolved and rejected + const results = await Promise.allSettled([promise1, promise2, promise3]); + + // job1 should succeed (it was processing), others should be rejected + const job1Result = results[0]; + if (job1Result.status === "fulfilled") { + // Job1 completed successfully + } + + // Pending jobs should be rejected + const job2Result = results[1]; + const job3Result = results[2]; + if (job2Result && job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + if (job3Result && job3Result.status === "rejected") { + expect(job3Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Only first job should have processed + expect(processed.length).toBeLessThanOrEqual(1); + }); + + it("should not clear builds when concurrency doesn't change", async () => { + // Set to 2 + setConcurrency(2); + expect(getConcurrency()).toBe(2); + + // Set to 2 again - should not clear anything + const clearedCount = setConcurrency(2); + expect(clearedCount).toBe(0); + expect(getConcurrency()).toBe(2); + }); + + it("should allow new jobs after concurrency change", async () => { + const processed: string[] = []; + + myQueue.setHandler(async (job: DeploymentJob) => { + await new Promise((resolve) => setTimeout(resolve, 50)); + if (job.applicationType === "application") { + processed.push(job.applicationId); + } + }); + + // Add job with concurrency 1 + const job1: DeploymentJob = { + applicationId: "app1", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + const job2: DeploymentJob = { + applicationId: "app2", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + + const promise1 = myQueue.add("application:app1", job1); + const promise2 = myQueue.add("application:app2", job2); + + // Wait for first job to start + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Change concurrency to 3 + const clearedCount = setConcurrency(3); + expect(clearedCount).toBe(1); // app2 should be cleared + + // Handle all promises - use allSettled to handle both resolved and rejected + const results = await Promise.allSettled([promise1, promise2]); + + // job1 should succeed (it was processing) + const job1Result = results[0]; + if (job1Result.status === "fulfilled") { + // Job1 completed successfully + } + + // app2 should be rejected + const job2Result = results[1]; + if (job2Result.status === "rejected") { + expect(job2Result.reason.message).toBe( + "Concurrency changed - queue cleared", + ); + } + + // Add new jobs after concurrency change - they should work + const job3: DeploymentJob = { + applicationId: "app3", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + const job4: DeploymentJob = { + applicationId: "app4", + titleLog: "Test", + descriptionLog: "Test", + type: "deploy", + applicationType: "application", + server: false, + }; + + await Promise.all([ + myQueue.add("application:app3", job3), + myQueue.add("application:app4", job4), + ]); + + await new Promise((resolve) => setTimeout(resolve, 150)); + + // app1, app3, and app4 should have processed + expect(processed.length).toBeGreaterThanOrEqual(2); + expect(processed).toContain("app1"); + }); + + it("should handle multiple concurrency changes correctly", () => { + // Start at 1 + expect(getConcurrency()).toBe(1); + + // Change to 3 + setConcurrency(3); + expect(getConcurrency()).toBe(3); + + // Change to 5 + setConcurrency(5); + expect(getConcurrency()).toBe(5); + + // Change back to 1 + setConcurrency(1); + expect(getConcurrency()).toBe(1); + }); + }); +}); diff --git a/apps/dokploy/components/dashboard/application/general/show.tsx b/apps/dokploy/components/dashboard/application/general/show.tsx index 5387659ad..8dedb89f0 100644 --- a/apps/dokploy/components/dashboard/application/general/show.tsx +++ b/apps/dokploy/components/dashboard/application/general/show.tsx @@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => { > + } + /> + diff --git a/apps/dokploy/components/dashboard/settings/servers/change-concurrency-modal.tsx b/apps/dokploy/components/dashboard/settings/servers/change-concurrency-modal.tsx new file mode 100644 index 000000000..2a1e4727d --- /dev/null +++ b/apps/dokploy/components/dashboard/settings/servers/change-concurrency-modal.tsx @@ -0,0 +1,180 @@ +"use client"; + +import { InfoIcon, Loader2 } from "lucide-react"; +import { useState } from "react"; +import { toast } from "sonner"; +import { Alert, AlertDescription } from "@/components/ui/alert"; +import { Button } from "@/components/ui/button"; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, + DialogTrigger, +} from "@/components/ui/dialog"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { api } from "@/utils/api"; + +interface Props { + serverId?: string; + trigger?: React.ReactNode; +} + +export const ChangeConcurrencyModal = ({ serverId, trigger }: Props) => { + const [isOpen, setIsOpen] = useState(false); + const [concurrency, setConcurrency] = useState(""); + + const { data, isLoading: isLoadingCurrent } = + api.settings.getDeploymentConcurrency.useQuery( + { serverId }, + { + enabled: isOpen, + onSuccess: (data) => { + if (concurrency === "") { + setConcurrency(data.concurrency); + } + }, + }, + ); + + const { mutateAsync, isLoading } = + api.settings.setDeploymentConcurrency.useMutation(); + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + if ( + typeof concurrency !== "number" || + concurrency < 1 || + concurrency > 20 + ) { + toast.error("Concurrency must be between 1 and 20"); + return; + } + + try { + const result = await mutateAsync({ concurrency, serverId }); + if (result.clearedBuilds > 0) { + toast.warning( + `Concurrency updated. ${result.clearedBuilds} pending build${result.clearedBuilds > 1 ? "s were" : " was"} cancelled.`, + ); + } else { + toast.success("Concurrency updated successfully"); + } + setIsOpen(false); + } catch (error) { + toast.error("Failed to update concurrency"); + } + }; + + const serverType = serverId ? "Remote Server" : "Dokploy Server"; + + return ( + + + {trigger || ( + + )} + + + + Deployment Concurrency - {serverType} + + Configure how many deployments can run simultaneously on this + server. + + +
+
+ + { + const value = e.target.value; + setConcurrency(value === "" ? "" : Number.parseInt(value, 10)); + }} + placeholder="Enter concurrency (1-20)" + disabled={isLoading || isLoadingCurrent} + /> + {isLoadingCurrent && ( +
+ + Loading current concurrency... +
+ )} + {!isLoadingCurrent && data && ( +

+ Current: {data.concurrency} +

+ )} +
+ +
+ + + +
+

+ Default: 1 deployment at a time + (sequential) +

+

+ Higher values: More deployments in + parallel, but will use more RAM and CPU resources. +

+ {serverId && ( +

+ This setting applies to deployments on this remote server. +

+ )} + {!serverId && ( +

+ This setting applies to deployments on the Dokploy server. +

+ )} +
+
+
+ + + + ⚠️ Warning: Changing concurrency will cancel all + pending builds. Currently running builds will continue, but + queued builds will be cancelled. + + +
+ + + + + +
+
+
+ ); +}; diff --git a/apps/dokploy/pages/api/deploy/[refreshToken].ts b/apps/dokploy/pages/api/deploy/[refreshToken].ts index 2ab607736..e18cb1642 100644 --- a/apps/dokploy/pages/api/deploy/[refreshToken].ts +++ b/apps/dokploy/pages/api/deploy/[refreshToken].ts @@ -253,12 +253,8 @@ export default async function handler( return true; } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `application:${jobData.applicationId}`, + jobData, ); } catch (error) { res.status(400).json({ message: "Error deploying Application", error }); diff --git a/apps/dokploy/pages/api/deploy/compose/[refreshToken].ts b/apps/dokploy/pages/api/deploy/compose/[refreshToken].ts index 61c7f7157..8e6ded7f6 100644 --- a/apps/dokploy/pages/api/deploy/compose/[refreshToken].ts +++ b/apps/dokploy/pages/api/deploy/compose/[refreshToken].ts @@ -183,12 +183,8 @@ export default async function handler( return true; } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `compose:${jobData.composeId}`, + jobData, ); } catch (error) { res.status(400).json({ message: "Error deploying Compose", error }); diff --git a/apps/dokploy/pages/api/deploy/github.ts b/apps/dokploy/pages/api/deploy/github.ts index 92cf3dc9e..d7b2185a1 100644 --- a/apps/dokploy/pages/api/deploy/github.ts +++ b/apps/dokploy/pages/api/deploy/github.ts @@ -132,12 +132,8 @@ export default async function handler( continue; } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `application:${jobData.applicationId}`, + jobData, ); } @@ -170,12 +166,8 @@ export default async function handler( } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `compose:${jobData.composeId}`, + jobData, ); } @@ -250,12 +242,8 @@ export default async function handler( continue; } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `application:${jobData.applicationId}`, + jobData, ); } @@ -296,12 +284,8 @@ export default async function handler( } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `compose:${jobData.composeId}`, + jobData, ); } @@ -495,12 +479,8 @@ export default async function handler( continue; } await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, + `preview:${jobData.previewDeploymentId}`, + jobData, ); } return res.status(200).json({ message: "Apps Deployed" }); diff --git a/apps/dokploy/server/api/routers/application.ts b/apps/dokploy/server/api/routers/application.ts index 006d024c4..09ee7e96f 100644 --- a/apps/dokploy/server/api/routers/application.ts +++ b/apps/dokploy/server/api/routers/application.ts @@ -58,7 +58,10 @@ import { applications, } from "@/server/db/schema"; import type { DeploymentJob } from "@/server/queues/queue-types"; -import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup"; +import { + addJobAsync, + cleanQueuesByApplication, +} from "@/server/queues/queueSetup"; import { cancelDeployment, deploy } from "@/server/utils/deploy"; import { uploadFileSchema } from "@/utils/schema"; @@ -335,14 +338,9 @@ export const applicationRouter = createTRPCRouter({ await deploy(jobData); return true; } - await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); + // Fire and forget - UI doesn't wait for deployment to complete + addJobAsync(`application:${jobData.applicationId}`, jobData); + return { success: true, message: "Deployment queued" }; }), saveEnvironment: protectedProcedure .input(apiSaveEnvironmentVariables) @@ -700,14 +698,8 @@ export const applicationRouter = createTRPCRouter({ return true; } - await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); + addJobAsync(`application:${jobData.applicationId}`, jobData); + return { success: true, message: "Deployment queued" }; }), cleanQueues: protectedProcedure @@ -798,14 +790,8 @@ export const applicationRouter = createTRPCRouter({ return true; } - await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); + // Fire and forget - UI doesn't wait for deployment to complete + addJobAsync(`application:${jobData.applicationId}`, jobData); return true; }), updateTraefikConfig: protectedProcedure diff --git a/apps/dokploy/server/api/routers/compose.ts b/apps/dokploy/server/api/routers/compose.ts index 026b6e8ad..058216018 100644 --- a/apps/dokploy/server/api/routers/compose.ts +++ b/apps/dokploy/server/api/routers/compose.ts @@ -59,7 +59,7 @@ import { compose as composeTable, } from "@/server/db/schema"; import type { DeploymentJob } from "@/server/queues/queue-types"; -import { cleanQueuesByCompose, myQueue } from "@/server/queues/queueSetup"; +import { addJobAsync, cleanQueuesByCompose } from "@/server/queues/queueSetup"; import { cancelDeployment, deploy } from "@/server/utils/deploy"; import { generatePassword } from "@/templates/utils"; import { createTRPCRouter, protectedProcedure, publicProcedure } from "../trpc"; @@ -401,14 +401,7 @@ export const composeRouter = createTRPCRouter({ await deploy(jobData); return true; } - await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); + addJobAsync(`compose:${jobData.composeId}`, jobData); return { success: true, message: "Deployment queued" }; }), redeploy: protectedProcedure @@ -437,14 +430,7 @@ export const composeRouter = createTRPCRouter({ await deploy(jobData); return true; } - await myQueue.add( - "deployments", - { ...jobData }, - { - removeOnComplete: true, - removeOnFail: true, - }, - ); + addJobAsync(`compose:${jobData.composeId}`, jobData); return { success: true, message: "Redeployment queued" }; }), stop: protectedProcedure diff --git a/apps/dokploy/server/api/routers/settings.ts b/apps/dokploy/server/api/routers/settings.ts index b4968c260..2e35fd6a4 100644 --- a/apps/dokploy/server/api/routers/settings.ts +++ b/apps/dokploy/server/api/routers/settings.ts @@ -862,4 +862,49 @@ export const settingsRouter = createTRPCRouter({ const ips = process.env.DOKPLOY_CLOUD_IPS?.split(","); return ips; }), + getDeploymentConcurrency: adminProcedure + .input( + z.object({ + serverId: z.string().optional(), + }), + ) + .query(async ({ input }) => { + // For now, remote servers use the same queue as dokploy server + // In the future, we could implement per-server queues + const { getConcurrency } = await import("@/server/queues/queueSetup"); + return { + concurrency: getConcurrency(), + serverId: input.serverId, + }; + }), + setDeploymentConcurrency: adminProcedure + .input( + z.object({ + concurrency: z.number().int().min(1).max(20), + serverId: z.string().optional(), + }), + ) + .mutation(async ({ input }) => { + // For now, remote servers use the same queue as dokploy server + // In the future, we could implement per-server queues + const { setConcurrency, getConcurrency } = await import( + "@/server/queues/queueSetup" + ); + const currentConcurrency = getConcurrency(); + const clearedCount = setConcurrency(input.concurrency); + const serverType = input.serverId ? "remote server" : "Dokploy server"; + + let message = `${serverType} deployment concurrency updated from ${currentConcurrency} to ${input.concurrency}. Changes take effect immediately.`; + if (clearedCount > 0) { + message += ` ${clearedCount} pending build${clearedCount > 1 ? "s were" : " was"} cancelled due to concurrency change.`; + } + + return { + success: true, + message, + concurrency: input.concurrency, + serverId: input.serverId, + clearedBuilds: clearedCount, + }; + }), }); diff --git a/apps/dokploy/server/queues/deployments-queue.ts b/apps/dokploy/server/queues/deployments-queue.ts index 4c117e7e3..ddf29050f 100644 --- a/apps/dokploy/server/queues/deployments-queue.ts +++ b/apps/dokploy/server/queues/deployments-queue.ts @@ -8,67 +8,77 @@ import { updateCompose, updatePreviewDeployment, } from "@dokploy/server"; -import { type Job, Worker } from "bullmq"; import type { DeploymentJob } from "./queue-types"; -import { redisConfig } from "./redis-connection"; +import { myQueue } from "./queueSetup"; -export const deploymentWorker = new Worker( - "deployments", - async (job: Job) => { - try { - if (job.data.applicationType === "application") { - await updateApplicationStatus(job.data.applicationId, "running"); +// Set the handler for processing deployment jobs +console.log("Setting deployment queue handler"); +myQueue.setHandler(async (job: DeploymentJob) => { + const jobId = + job.applicationType === "application" + ? job.applicationId + : job.applicationType === "compose" + ? job.composeId + : job.previewDeploymentId; + console.log("Handler called with job:", job.applicationType, jobId); + try { + if (job.applicationType === "application") { + await updateApplicationStatus(job.applicationId, "running"); - if (job.data.type === "redeploy") { - await rebuildApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } else if (job.data.type === "deploy") { - await deployApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } - } else if (job.data.applicationType === "compose") { - await updateCompose(job.data.composeId, { - composeStatus: "running", + if (job.type === "redeploy") { + await rebuildApplication({ + applicationId: job.applicationId, + titleLog: job.titleLog, + descriptionLog: job.descriptionLog, }); - if (job.data.type === "deploy") { - await deployCompose({ - composeId: job.data.composeId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } else if (job.data.type === "redeploy") { - await rebuildCompose({ - composeId: job.data.composeId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - }); - } - } else if (job.data.applicationType === "application-preview") { - await updatePreviewDeployment(job.data.previewDeploymentId, { - previewStatus: "running", + } else if (job.type === "deploy") { + await deployApplication({ + applicationId: job.applicationId, + titleLog: job.titleLog, + descriptionLog: job.descriptionLog, + }); + } + } else if (job.applicationType === "compose") { + await updateCompose(job.composeId, { + composeStatus: "running", + }); + if (job.type === "deploy") { + await deployCompose({ + composeId: job.composeId, + titleLog: job.titleLog, + descriptionLog: job.descriptionLog, + }); + } else if (job.type === "redeploy") { + await rebuildCompose({ + composeId: job.composeId, + titleLog: job.titleLog, + descriptionLog: job.descriptionLog, + }); + } + } else if (job.applicationType === "application-preview") { + await updatePreviewDeployment(job.previewDeploymentId, { + previewStatus: "running", + }); + + if (job.type === "deploy") { + await deployPreviewApplication({ + applicationId: job.applicationId, + titleLog: job.titleLog, + descriptionLog: job.descriptionLog, + previewDeploymentId: job.previewDeploymentId, }); - - if (job.data.type === "deploy") { - await deployPreviewApplication({ - applicationId: job.data.applicationId, - titleLog: job.data.titleLog, - descriptionLog: job.data.descriptionLog, - previewDeploymentId: job.data.previewDeploymentId, - }); - } } - } catch (error) { - console.log("Error", error); } + } catch (error) { + console.log("Error processing deployment job", error); + throw error; // Re-throw to let the queue handle retries if needed + } +}); + +// Export for compatibility (no longer needed but kept for imports) +export const deploymentWorker = { + run: () => { + // Queue starts processing automatically when jobs are added + console.log("Deployment queue handler initialized"); }, - { - autorun: false, - connection: redisConfig, - }, -); +}; diff --git a/apps/dokploy/server/queues/grouped-queue-wrapper.ts b/apps/dokploy/server/queues/grouped-queue-wrapper.ts new file mode 100644 index 000000000..8b067c90b --- /dev/null +++ b/apps/dokploy/server/queues/grouped-queue-wrapper.ts @@ -0,0 +1,256 @@ +/** + * In-memory grouped queue implementation + * Each group processes one job at a time (FIFO per group) + * Multiple groups can process in parallel + */ + +type Task = { + data: T; + resolve: () => void; + reject: (error: Error) => void; +}; + +type GroupQueue = { + tasks: Task[]; + processing: boolean; +}; + +export class GroupedQueue { + private groups: Map> = new Map(); + private handler?: (data: T) => Promise; + private concurrency: number; + private activeGroups: Set = new Set(); + + constructor(concurrency = 4) { + this.concurrency = concurrency; + } + + /** + * Set the handler function that processes each job + */ + setHandler(handler: (data: T) => Promise) { + this.handler = handler; + } + + /** + * Add a job to a group queue + */ + async add(groupId: string, data: T): Promise { + if (process.env.NODE_ENV !== "test") { + console.log( + `Adding job to group ${groupId}, handler set: ${!!this.handler}`, + ); + } + return new Promise((resolve, reject) => { + if (!this.groups.has(groupId)) { + this.groups.set(groupId, { + tasks: [], + processing: false, + }); + } + + const group = this.groups.get(groupId)!; + group.tasks.push({ + data, + resolve, + reject, + }); + + // Start processing if not already processing and under concurrency limit + if (!group.processing && this.activeGroups.size < this.concurrency) { + this.processGroup(groupId); + } + }); + } + + /** + * Process jobs in a group queue + */ + private async processGroup(groupId: string): Promise { + const group = this.groups.get(groupId); + if (!group || group.processing) { + return; + } + + // Wait for handler to be set if not available + if (!this.handler) { + if (process.env.NODE_ENV !== "test") { + console.log(`Handler not set yet for group ${groupId}, waiting...`); + } + // Retry after a short delay + setTimeout(() => { + if (this.handler && group.tasks.length > 0) { + this.processGroup(groupId); + } + }, 100); + return; + } + + // Check concurrency limit + if (this.activeGroups.size >= this.concurrency) { + return; + } + + group.processing = true; + this.activeGroups.add(groupId); + if (process.env.NODE_ENV !== "test") { + console.log(`Processing group ${groupId}, tasks: ${group.tasks.length}`); + } + + while (group.tasks.length > 0) { + const task = group.tasks.shift()!; + + try { + if (process.env.NODE_ENV !== "test") { + console.log(`Executing handler for group ${groupId}`); + } + await this.handler!(task.data); + task.resolve(); + if (process.env.NODE_ENV !== "test") { + console.log(`Handler completed for group ${groupId}`); + } + } catch (error) { + if (process.env.NODE_ENV !== "test") { + console.error(`Handler error for group ${groupId}:`, error); + } + task.reject(error instanceof Error ? error : new Error(String(error))); + } + } + + group.processing = false; + this.activeGroups.delete(groupId); + + // Try to process another group if there are waiting groups + this.processNextGroup(); + } + + /** + * Process the next available group + */ + private processNextGroup(): void { + if (this.activeGroups.size >= this.concurrency) { + return; + } + + // Find a group with pending tasks that's not currently processing + for (const [groupId, group] of this.groups.entries()) { + if ( + !group.processing && + group.tasks.length > 0 && + !this.activeGroups.has(groupId) + ) { + this.processGroup(groupId); + break; + } + } + } + + /** + * Remove all tasks for a specific group + */ + clearGroup(groupId: string): void { + const group = this.groups.get(groupId); + if (group) { + // Reject all pending tasks + for (const task of group.tasks) { + task.reject(new Error("Queue cleared")); + } + group.tasks = []; + } + } + + /** + * Clear all pending tasks across all groups + * This is useful when changing concurrency settings + * Note: This only clears tasks in the queue, not the currently executing task + */ + clearAllPendingTasks(): number { + let clearedCount = 0; + for (const [groupId, group] of this.groups.entries()) { + // Clear all pending tasks in the queue + // The currently executing task is not in group.tasks (it was already shifted) + if (group.tasks.length > 0) { + clearedCount += group.tasks.length; + for (const task of group.tasks) { + task.reject(new Error("Concurrency changed - queue cleared")); + } + group.tasks = []; + } + } + return clearedCount; + } + + /** + * Get the number of pending tasks for a group + */ + getGroupLength(groupId: string): number { + return this.groups.get(groupId)?.tasks.length ?? 0; + } + + /** + * Get total number of pending tasks across all groups + */ + getTotalLength(): number { + let total = 0; + for (const group of this.groups.values()) { + total += group.tasks.length; + } + return total; + } + + /** + * Check if queue is idle (no active processing) + */ + isIdle(): boolean { + return this.activeGroups.size === 0; + } + + /** + * Get the number of active groups (for testing) + */ + getActiveGroupsCount(): number { + return this.activeGroups.size; + } + + /** + * Get the concurrency limit + */ + getConcurrency(): number { + return this.concurrency; + } + + /** + * Set the concurrency limit dynamically + * This allows changing concurrency without recreating the queue + * WARNING: This will clear all pending tasks when concurrency changes + */ + setConcurrency(concurrency: number): void { + if (concurrency < 1) { + throw new Error("Concurrency must be at least 1"); + } + const concurrencyChanged = this.concurrency !== concurrency; + this.concurrency = concurrency; + + // If concurrency changed, clear all pending tasks + if (concurrencyChanged) { + this.clearAllPendingTasks(); + } + + // Process next group if we now have capacity + this.processNextGroup(); + } + + /** + * Close the queue and reject all pending tasks + */ + async close(): Promise { + for (const [groupId, group] of this.groups.entries()) { + for (const task of group.tasks) { + task.reject(new Error("Queue closed")); + } + group.tasks = []; + } + this.groups.clear(); + this.activeGroups.clear(); + } +} diff --git a/apps/dokploy/server/queues/queue-manager.ts b/apps/dokploy/server/queues/queue-manager.ts new file mode 100644 index 000000000..14b489ac1 --- /dev/null +++ b/apps/dokploy/server/queues/queue-manager.ts @@ -0,0 +1,112 @@ +/** + * Queue Manager - Manages multiple dynamic queues + * Each queue can have its own concurrency configuration + */ + +import { GroupedQueue } from "./grouped-queue-wrapper"; + +export class QueueManager { + private queues: Map> = new Map(); + + /** + * Get or create a queue with the specified name and concurrency + * Note: If queue already exists, concurrency parameter is ignored + */ + getQueue(name: string, concurrency = 1): GroupedQueue { + if (!this.queues.has(name)) { + this.queues.set(name, new GroupedQueue(concurrency)); + } + return this.queues.get(name) as GroupedQueue; + } + + /** + * Set handler for a specific queue + */ + setHandler(queueName: string, handler: (data: T) => Promise): void { + const queue = this.getQueue(queueName); + queue.setHandler(handler); + } + + /** + * Add a job to a specific queue and group + * If concurrency is provided and queue doesn't exist, creates it with that concurrency + */ + async add( + queueName: string, + groupId: string, + data: T, + concurrency?: number, + ): Promise { + // If concurrency is provided and queue doesn't exist, create with that concurrency + if (concurrency !== undefined && !this.queues.has(queueName)) { + this.queues.set(queueName, new GroupedQueue(concurrency)); + } + const queue = this.getQueue(queueName); + return queue.add(groupId, data); + } + + /** + * Clear all tasks for a specific group in a queue + */ + clearGroup(queueName: string, groupId: string): void { + const queue = this.queues.get(queueName); + if (queue) { + queue.clearGroup(groupId); + } + } + + /** + * Get the number of pending tasks for a group in a queue + */ + getGroupLength(queueName: string, groupId: string): number { + const queue = this.queues.get(queueName); + return queue ? queue.getGroupLength(groupId) : 0; + } + + /** + * Get total number of pending tasks across all groups in a queue + */ + getTotalLength(queueName: string): number { + const queue = this.queues.get(queueName); + return queue ? queue.getTotalLength() : 0; + } + + /** + * Check if a queue is idle + */ + isIdle(queueName: string): boolean { + const queue = this.queues.get(queueName); + return queue ? queue.isIdle() : true; + } + + /** + * Close a specific queue + */ + async closeQueue(queueName: string): Promise { + const queue = this.queues.get(queueName); + if (queue) { + await queue.close(); + this.queues.delete(queueName); + } + } + + /** + * Close all queues + */ + async closeAll(): Promise { + const promises = Array.from(this.queues.keys()).map((name) => + this.closeQueue(name), + ); + await Promise.all(promises); + } + + /** + * Get all queue names + */ + getQueueNames(): string[] { + return Array.from(this.queues.keys()); + } +} + +// Singleton instance +export const queueManager = new QueueManager(); diff --git a/apps/dokploy/server/queues/queueSetup.ts b/apps/dokploy/server/queues/queueSetup.ts index 1577273c8..837ca4e41 100644 --- a/apps/dokploy/server/queues/queueSetup.ts +++ b/apps/dokploy/server/queues/queueSetup.ts @@ -1,44 +1,110 @@ -import { Queue } from "bullmq"; -import { redisConfig } from "./redis-connection"; +import { GroupedQueue } from "./grouped-queue-wrapper"; +import type { DeploymentJob } from "./queue-types"; -const myQueue = new Queue("deployments", { - connection: redisConfig, -}); +// In-memory grouped queue: processes one job per group at a time +// Multiple groups can process in parallel (up to concurrency limit) +// Concurrency can be configured via DEPLOYMENT_QUEUE_CONCURRENCY env var (default: 1) +// or dynamically via setConcurrency() function +let DEPLOYMENT_CONCURRENCY = Number.parseInt( + process.env.DEPLOYMENT_QUEUE_CONCURRENCY || "1", + 10, +); -process.on("SIGTERM", () => { - myQueue.close(); +// Validate concurrency is at least 1 +if (DEPLOYMENT_CONCURRENCY < 1) { + DEPLOYMENT_CONCURRENCY = 1; +} + +const myQueue = new GroupedQueue(DEPLOYMENT_CONCURRENCY); + +// Initialize handler when this module is imported +// Use dynamic import to avoid circular dependency +// The handler will be set when deployments-queue.ts is imported +let handlerInitialized = false; +const initializeHandler = async () => { + if (!handlerInitialized) { + handlerInitialized = true; + // This will set the handler + await import("./deployments-queue"); + } +}; + +// Initialize handler immediately (non-blocking) +void initializeHandler(); + +process.on("SIGTERM", async () => { + await myQueue.close(); process.exit(0); }); -myQueue.on("error", (error) => { - if ((error as any).code === "ECONNREFUSED") { - console.error( - "Make sure you have installed Redis and it is running.", - error, - ); - } -}); - export const cleanQueuesByApplication = async (applicationId: string) => { - const jobs = await myQueue.getJobs(["waiting", "delayed"]); - - for (const job of jobs) { - if (job?.data?.applicationId === applicationId) { - await job.remove(); - console.log(`Removed job ${job.id} for application ${applicationId}`); - } - } + const groupId = `application:${applicationId}`; + myQueue.clearGroup(groupId); + console.log(`Cleared queue for application ${applicationId}`); }; export const cleanQueuesByCompose = async (composeId: string) => { - const jobs = await myQueue.getJobs(["waiting", "delayed"]); + const groupId = `compose:${composeId}`; + myQueue.clearGroup(groupId); + console.log(`Cleared queue for compose ${composeId}`); +}; - for (const job of jobs) { - if (job?.data?.composeId === composeId) { - await job.remove(); - console.log(`Removed job ${job.id} for compose ${composeId}`); +/** + * Add a job to the queue without awaiting (fire-and-forget) + * This allows the API to return immediately while the job processes in the background + * Errors are logged but don't block the response + */ +export const addJobAsync = (groupId: string, data: DeploymentJob): void => { + // Fire and forget - don't await, but handle errors + myQueue.add(groupId, data).catch((error) => { + console.error(`Failed to queue job for group ${groupId}:`, error); + }); +}; + +/** + * Get the current deployment queue concurrency + */ +export const getConcurrency = (): number => { + return myQueue.getConcurrency(); +}; + +/** + * Set the deployment queue concurrency dynamically + * This updates the queue's concurrency setting immediately + * WARNING: This will clear all pending builds when concurrency changes + * @returns The number of pending builds that were cleared + */ +export const setConcurrency = (concurrency: number): number => { + if (concurrency < 1) { + throw new Error("Concurrency must be at least 1"); + } + + const currentConcurrency = myQueue.getConcurrency(); + const concurrencyChanged = currentConcurrency !== concurrency; + + // Get count of pending tasks before clearing (setConcurrency will clear them) + let clearedCount = 0; + if (concurrencyChanged) { + // Get the count before setConcurrency clears them + clearedCount = myQueue.getTotalLength(); + if (process.env.NODE_ENV !== "test") { + console.log( + `Concurrency changing from ${currentConcurrency} to ${concurrency}. Will clear ${clearedCount} pending builds.`, + ); } } + + // Update the stored concurrency value + DEPLOYMENT_CONCURRENCY = concurrency; + + // Update the queue's concurrency dynamically (this will clear pending tasks) + myQueue.setConcurrency(concurrency); + + if (process.env.NODE_ENV !== "test") { + console.log(`Deployment queue concurrency updated to ${concurrency}`); + } + + return clearedCount; }; export { myQueue }; diff --git a/apps/dokploy/server/server.ts b/apps/dokploy/server/server.ts index eaf562695..57ba4e5e8 100644 --- a/apps/dokploy/server/server.ts +++ b/apps/dokploy/server/server.ts @@ -4,11 +4,11 @@ import { createDefaultServerTraefikConfig, createDefaultTraefikConfig, IS_CLOUD, + initCancelDeployments, initCronJobs, initializeNetwork, initSchedules, initVolumeBackupsCronJobs, - initCancelDeployments, sendDokployRestartNotifications, setupDirectories, } from "@dokploy/server"; @@ -66,6 +66,8 @@ void app.prepare().then(async () => { console.log(`Server Started on: http://${HOST}:${PORT}`); if (!IS_CLOUD) { console.log("Starting Deployment Worker"); + // Import the handler module to ensure it's initialized + await import("./queues/deployments-queue"); const { deploymentWorker } = await import("./queues/deployments-queue"); await deploymentWorker.run(); } diff --git a/packages/server/src/utils/builders/railpack.ts b/packages/server/src/utils/builders/railpack.ts index 305ff20e8..1b8d2edfb 100644 --- a/packages/server/src/utils/builders/railpack.ts +++ b/packages/server/src/utils/builders/railpack.ts @@ -97,7 +97,7 @@ docker ${buildArgs.join(" ")} || { exit 1; } echo "✅ Railpack build completed." ; -docker buildx rm builder-containerd +docker buildx rm builder-containerd || true `; return bashCommand;