mirror of
https://github.com/Dokploy/dokploy.git
synced 2026-06-15 20:25:23 +02:00
feat: add builds concurrency management for servers
- Introduced a new `BuildsConcurrency` component to manage the number of concurrent builds for both local and remote servers, gated by license validity. - Implemented backend logic to resolve effective builds concurrency based on server settings and organization licenses. - Added unit tests for concurrency resolution logic to ensure correct behavior under various licensing scenarios. - Updated database schema to include `buildsConcurrency` field for servers and web server settings. - Refactored deployment queue to support in-memory job processing with configurable concurrency limits. This feature enhances deployment flexibility and control for enterprise users.
This commit is contained in:
109
apps/dokploy/__test__/queues/concurrency.test.ts
Normal file
109
apps/dokploy/__test__/queues/concurrency.test.ts
Normal file
@@ -0,0 +1,109 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const hasValidLicense = vi.fn();
|
||||
const getWebServerSettings = vi.fn();
|
||||
const findFirstOrg = vi.fn();
|
||||
const findFirstServer = vi.fn();
|
||||
|
||||
vi.mock("@dokploy/server/db", () => ({
|
||||
db: {
|
||||
query: {
|
||||
organization: {
|
||||
findFirst: (...args: unknown[]) => findFirstOrg(...args),
|
||||
},
|
||||
server: {
|
||||
findFirst: (...args: unknown[]) => findFirstServer(...args),
|
||||
},
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("@dokploy/server/db/schema", () => ({
|
||||
organization: {},
|
||||
server: {},
|
||||
}));
|
||||
|
||||
vi.mock("@dokploy/server/services/proprietary/license-key", () => ({
|
||||
hasValidLicense: (...args: unknown[]) => hasValidLicense(...args),
|
||||
}));
|
||||
|
||||
vi.mock("@dokploy/server/services/web-server-settings", () => ({
|
||||
getWebServerSettings: (...args: unknown[]) => getWebServerSettings(...args),
|
||||
}));
|
||||
|
||||
vi.mock("drizzle-orm", () => ({ eq: vi.fn() }));
|
||||
|
||||
import { resolveBuildsConcurrency } from "../../server/queues/concurrency";
|
||||
import { LOCAL_PARTITION } from "../../server/queues/in-memory-queue";
|
||||
|
||||
describe("resolveBuildsConcurrency (enterprise gating)", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
findFirstOrg.mockResolvedValue({ id: "org-1" });
|
||||
});
|
||||
|
||||
describe("local web server partition", () => {
|
||||
it("returns the configured concurrency when licensed", async () => {
|
||||
getWebServerSettings.mockResolvedValue({ buildsConcurrency: 5 });
|
||||
hasValidLicense.mockResolvedValue(true);
|
||||
|
||||
await expect(resolveBuildsConcurrency(LOCAL_PARTITION)).resolves.toBe(5);
|
||||
});
|
||||
|
||||
it("clamps to 1 when there is no valid license", async () => {
|
||||
getWebServerSettings.mockResolvedValue({ buildsConcurrency: 10 });
|
||||
hasValidLicense.mockResolvedValue(false);
|
||||
|
||||
await expect(resolveBuildsConcurrency(LOCAL_PARTITION)).resolves.toBe(1);
|
||||
});
|
||||
|
||||
it("caps the configured value at 20 when licensed", async () => {
|
||||
getWebServerSettings.mockResolvedValue({ buildsConcurrency: 999 });
|
||||
hasValidLicense.mockResolvedValue(true);
|
||||
|
||||
await expect(resolveBuildsConcurrency(LOCAL_PARTITION)).resolves.toBe(20);
|
||||
});
|
||||
|
||||
it("defaults to 1 when settings are missing", async () => {
|
||||
getWebServerSettings.mockResolvedValue(undefined);
|
||||
hasValidLicense.mockResolvedValue(true);
|
||||
|
||||
await expect(resolveBuildsConcurrency(LOCAL_PARTITION)).resolves.toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("remote server partition", () => {
|
||||
it("returns the server concurrency when its org is licensed", async () => {
|
||||
findFirstServer.mockResolvedValue({
|
||||
buildsConcurrency: 4,
|
||||
organizationId: "org-1",
|
||||
});
|
||||
hasValidLicense.mockResolvedValue(true);
|
||||
|
||||
await expect(resolveBuildsConcurrency("server-1")).resolves.toBe(4);
|
||||
expect(hasValidLicense).toHaveBeenCalledWith("org-1");
|
||||
});
|
||||
|
||||
it("clamps to 1 when the server org is not licensed", async () => {
|
||||
findFirstServer.mockResolvedValue({
|
||||
buildsConcurrency: 8,
|
||||
organizationId: "org-1",
|
||||
});
|
||||
hasValidLicense.mockResolvedValue(false);
|
||||
|
||||
await expect(resolveBuildsConcurrency("server-1")).resolves.toBe(1);
|
||||
});
|
||||
|
||||
it("defaults to 1 for an unknown server", async () => {
|
||||
findFirstServer.mockResolvedValue(undefined);
|
||||
|
||||
await expect(resolveBuildsConcurrency("ghost")).resolves.toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to 1 if resolution throws", async () => {
|
||||
getWebServerSettings.mockRejectedValue(new Error("db down"));
|
||||
|
||||
await expect(resolveBuildsConcurrency(LOCAL_PARTITION)).resolves.toBe(1);
|
||||
});
|
||||
});
|
||||
337
apps/dokploy/__test__/queues/in-memory-queue.test.ts
Normal file
337
apps/dokploy/__test__/queues/in-memory-queue.test.ts
Normal file
@@ -0,0 +1,337 @@
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
getGroup,
|
||||
getPartition,
|
||||
InMemoryQueue,
|
||||
LOCAL_PARTITION,
|
||||
} from "../../server/queues/in-memory-queue";
|
||||
import type { DeploymentJob } from "../../server/queues/queue-types";
|
||||
|
||||
const appJob = (applicationId: string, serverId?: string): DeploymentJob => ({
|
||||
applicationId,
|
||||
titleLog: "deploy",
|
||||
descriptionLog: "",
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
serverId,
|
||||
});
|
||||
|
||||
const composeJob = (composeId: string, serverId?: string): DeploymentJob => ({
|
||||
composeId,
|
||||
titleLog: "deploy",
|
||||
descriptionLog: "",
|
||||
type: "deploy",
|
||||
applicationType: "compose",
|
||||
serverId,
|
||||
});
|
||||
|
||||
/** A controllable async task: resolves only when `release()` is called. */
|
||||
const deferred = () => {
|
||||
let resolve!: () => void;
|
||||
const promise = new Promise<void>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
return { promise, release: resolve };
|
||||
};
|
||||
|
||||
const flush = () => new Promise((r) => setTimeout(r, 0));
|
||||
|
||||
describe("getPartition / getGroup", () => {
|
||||
it("partitions by serverId, falling back to the local partition", () => {
|
||||
expect(getPartition(appJob("a"))).toBe(LOCAL_PARTITION);
|
||||
expect(getPartition(appJob("a", "server-1"))).toBe("server-1");
|
||||
});
|
||||
|
||||
it("groups applications and compose by their id", () => {
|
||||
expect(getGroup(appJob("a"))).toBe("application:a");
|
||||
expect(getGroup(composeJob("c"))).toBe("compose:c");
|
||||
});
|
||||
});
|
||||
|
||||
describe("InMemoryQueue concurrency", () => {
|
||||
let nowValue = 0;
|
||||
const now = () => ++nowValue;
|
||||
|
||||
beforeEach(() => {
|
||||
nowValue = 0;
|
||||
});
|
||||
|
||||
it("runs different applications concurrently up to the limit", async () => {
|
||||
const tasks = new Map<string, ReturnType<typeof deferred>>();
|
||||
const started: string[] = [];
|
||||
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 2, now });
|
||||
queue.process(async (job) => {
|
||||
const id = (job.data as any).applicationId;
|
||||
started.push(id);
|
||||
const d = deferred();
|
||||
tasks.set(id, d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("a"));
|
||||
await queue.add(appJob("b"));
|
||||
await queue.add(appJob("c"));
|
||||
await flush();
|
||||
|
||||
// Concurrency 2 -> only a and b start, c waits.
|
||||
expect(started).toEqual(["a", "b"]);
|
||||
|
||||
tasks.get("a")!.release();
|
||||
await flush();
|
||||
|
||||
// A slot freed -> c starts.
|
||||
expect(started).toEqual(["a", "b", "c"]);
|
||||
});
|
||||
|
||||
it("serializes jobs of the same application (per-group FIFO)", async () => {
|
||||
const tasks: Array<ReturnType<typeof deferred>> = [];
|
||||
const started: number[] = [];
|
||||
let counter = 0;
|
||||
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 5, now });
|
||||
queue.process(async () => {
|
||||
started.push(++counter);
|
||||
const d = deferred();
|
||||
tasks.push(d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
// Two deploys of the SAME app, even with concurrency 5.
|
||||
await queue.add(appJob("same"));
|
||||
await queue.add(appJob("same"));
|
||||
await flush();
|
||||
|
||||
// Only the first one runs; the second waits for the group to free.
|
||||
expect(started).toEqual([1]);
|
||||
|
||||
tasks[0]!.release();
|
||||
await flush();
|
||||
|
||||
expect(started).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("isolates concurrency per server partition", async () => {
|
||||
const started: string[] = [];
|
||||
const tasks = new Map<string, ReturnType<typeof deferred>>();
|
||||
|
||||
// server-1 allows 1, server-2 allows 1, but they are independent.
|
||||
const queue = new InMemoryQueue({
|
||||
resolveConcurrency: () => 1,
|
||||
now,
|
||||
});
|
||||
queue.process(async (job) => {
|
||||
const id = `${job.data.serverId}:${(job.data as any).applicationId}`;
|
||||
started.push(id);
|
||||
const d = deferred();
|
||||
tasks.set(id, d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("a", "server-1"));
|
||||
await queue.add(appJob("b", "server-2"));
|
||||
await flush();
|
||||
|
||||
// One per partition runs in parallel despite concurrency 1 each.
|
||||
expect(started.sort()).toEqual(["server-1:a", "server-2:b"]);
|
||||
});
|
||||
|
||||
it("honors a different concurrency per server", async () => {
|
||||
const started: string[] = [];
|
||||
const tasks = new Map<string, ReturnType<typeof deferred>>();
|
||||
|
||||
// server-fast allows 2, server-slow allows 1.
|
||||
const queue = new InMemoryQueue({
|
||||
resolveConcurrency: (partition) => (partition === "server-fast" ? 2 : 1),
|
||||
now,
|
||||
});
|
||||
queue.process(async (job) => {
|
||||
const id = `${job.data.serverId}:${(job.data as any).applicationId}`;
|
||||
started.push(id);
|
||||
const d = deferred();
|
||||
tasks.set(id, d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("a", "server-fast"));
|
||||
await queue.add(appJob("b", "server-fast"));
|
||||
await queue.add(appJob("c", "server-slow"));
|
||||
await queue.add(appJob("d", "server-slow"));
|
||||
await flush();
|
||||
|
||||
// server-fast runs 2 in parallel; server-slow only 1.
|
||||
expect(started.sort()).toEqual([
|
||||
"server-fast:a",
|
||||
"server-fast:b",
|
||||
"server-slow:c",
|
||||
]);
|
||||
|
||||
// Free a server-slow slot -> its queued app starts.
|
||||
tasks.get("server-slow:c")!.release();
|
||||
await flush();
|
||||
expect(started).toContain("server-slow:d");
|
||||
});
|
||||
|
||||
it("serializes the same app on a server even with spare concurrency", async () => {
|
||||
const started: number[] = [];
|
||||
const tasks: Array<ReturnType<typeof deferred>> = [];
|
||||
let counter = 0;
|
||||
|
||||
// Plenty of room (concurrency 2) but two deploys of the SAME app.
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 2, now });
|
||||
queue.process(async () => {
|
||||
started.push(++counter);
|
||||
const d = deferred();
|
||||
tasks.push(d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("app-x", "server-1"));
|
||||
await queue.add(appJob("app-x", "server-1"));
|
||||
await flush();
|
||||
|
||||
// Only one build of app-x runs despite 2 free slots.
|
||||
expect(started).toEqual([1]);
|
||||
|
||||
tasks[0]!.release();
|
||||
await flush();
|
||||
expect(started).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("clamps concurrency below 1 up to 1 (license-disabled behaviour)", async () => {
|
||||
const started: string[] = [];
|
||||
const tasks = new Map<string, ReturnType<typeof deferred>>();
|
||||
|
||||
// Simulate a non-licensed resolver returning 0 — must still run 1.
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 0, now });
|
||||
queue.process(async (job) => {
|
||||
const id = (job.data as any).applicationId;
|
||||
started.push(id);
|
||||
const d = deferred();
|
||||
tasks.set(id, d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("a"));
|
||||
await queue.add(appJob("b"));
|
||||
await flush();
|
||||
|
||||
expect(started).toEqual(["a"]);
|
||||
});
|
||||
|
||||
it("picks up concurrency changes between scheduling ticks", async () => {
|
||||
const started: string[] = [];
|
||||
const tasks = new Map<string, ReturnType<typeof deferred>>();
|
||||
let limit = 1;
|
||||
|
||||
const queue = new InMemoryQueue({
|
||||
resolveConcurrency: () => limit,
|
||||
now,
|
||||
});
|
||||
queue.process(async (job) => {
|
||||
const id = (job.data as any).applicationId;
|
||||
started.push(id);
|
||||
const d = deferred();
|
||||
tasks.set(id, d);
|
||||
await d.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("a"));
|
||||
await queue.add(appJob("b"));
|
||||
await flush();
|
||||
expect(started).toEqual(["a"]);
|
||||
|
||||
// Raise the limit (e.g. license activated) and release the running job
|
||||
// so a new tick observes the new concurrency.
|
||||
limit = 2;
|
||||
tasks.get("a")!.release();
|
||||
await flush();
|
||||
|
||||
expect(started).toContain("b");
|
||||
});
|
||||
});
|
||||
|
||||
describe("InMemoryQueue job management", () => {
|
||||
it("lists waiting jobs and removes them by predicate", async () => {
|
||||
const block = deferred();
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 1 });
|
||||
queue.process(async () => {
|
||||
await block.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("running"));
|
||||
await queue.add(appJob("waiting-1"));
|
||||
await queue.add(composeJob("waiting-2"));
|
||||
await flush();
|
||||
|
||||
const waiting = await queue.getJobs(["waiting"]);
|
||||
expect(waiting.map((j) => j.data)).toHaveLength(2);
|
||||
|
||||
const removed = queue.removeWaiting(
|
||||
(data) => (data as any).applicationId === "waiting-1",
|
||||
);
|
||||
expect(removed).toBe(1);
|
||||
|
||||
const after = await queue.getJobs(["waiting"]);
|
||||
expect(after).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("clears all waiting jobs", async () => {
|
||||
const block = deferred();
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 1 });
|
||||
queue.process(async () => {
|
||||
await block.promise;
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("running"));
|
||||
await queue.add(appJob("waiting-1"));
|
||||
await queue.add(appJob("waiting-2"));
|
||||
await flush();
|
||||
|
||||
expect(queue.clearWaiting()).toBe(2);
|
||||
expect(await queue.getJobs(["waiting"])).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("starts processing as soon as a processor is registered", async () => {
|
||||
const started: string[] = [];
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 5 });
|
||||
|
||||
// No processor yet -> jobs queue but do not run.
|
||||
await queue.add(appJob("a"));
|
||||
await flush();
|
||||
expect(started).toEqual([]);
|
||||
|
||||
// Registering the processor auto-starts the queue (no separate run()).
|
||||
queue.process(async (job) => {
|
||||
started.push((job.data as any).applicationId);
|
||||
});
|
||||
await flush();
|
||||
expect(started).toEqual(["a"]);
|
||||
});
|
||||
|
||||
it("continues scheduling after a job throws", async () => {
|
||||
const started: string[] = [];
|
||||
const queue = new InMemoryQueue({ resolveConcurrency: () => 1 });
|
||||
queue.process(async (job) => {
|
||||
const id = (job.data as any).applicationId;
|
||||
started.push(id);
|
||||
if (id === "a") throw new Error("boom");
|
||||
});
|
||||
await queue.run();
|
||||
|
||||
await queue.add(appJob("a"));
|
||||
await queue.add(appJob("b"));
|
||||
await flush();
|
||||
|
||||
expect(started).toEqual(["a", "b"]);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,106 @@
|
||||
import { useEffect, useState } from "react";
|
||||
import { toast } from "sonner";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Input } from "@/components/ui/input";
|
||||
import { api } from "@/utils/api";
|
||||
|
||||
const clamp = (value: number) => Math.min(20, Math.max(1, value));
|
||||
|
||||
interface Props {
|
||||
/**
|
||||
* When provided, configures concurrency for that remote server. When
|
||||
* omitted, configures the local Dokploy web server.
|
||||
*/
|
||||
serverId?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enterprise-only control to set the number of concurrent builds, either for a
|
||||
* remote server (`serverId` provided) or the local web server (omitted).
|
||||
* Hidden when the instance has no valid license.
|
||||
*/
|
||||
export const BuildsConcurrency = ({ serverId }: Props) => {
|
||||
const { data: isCloud } = api.settings.isCloud.useQuery();
|
||||
const { data: haveValidLicense } =
|
||||
api.licenseKey.haveValidLicenseKey.useQuery();
|
||||
|
||||
const serverQuery = api.server.one.useQuery(
|
||||
{ serverId: serverId ?? "" },
|
||||
{ enabled: !!serverId },
|
||||
);
|
||||
const webServerQuery = api.settings.getWebServerSettings.useQuery(undefined, {
|
||||
enabled: !serverId,
|
||||
});
|
||||
|
||||
const current = serverId
|
||||
? serverQuery.data?.buildsConcurrency
|
||||
: webServerQuery.data?.buildsConcurrency;
|
||||
const refetch = serverId ? serverQuery.refetch : webServerQuery.refetch;
|
||||
|
||||
const updateServer = api.server.updateBuildsConcurrency.useMutation();
|
||||
const updateWebServer = api.settings.updateBuildsConcurrency.useMutation();
|
||||
const isPending = serverId
|
||||
? updateServer.isPending
|
||||
: updateWebServer.isPending;
|
||||
|
||||
const [value, setValue] = useState("1");
|
||||
|
||||
useEffect(() => {
|
||||
if (current) {
|
||||
setValue(String(current));
|
||||
}
|
||||
}, [current]);
|
||||
|
||||
// Concurrent builds are a self-hosted enterprise feature; not shown in cloud.
|
||||
if (isCloud || !haveValidLicense) return null;
|
||||
|
||||
const handleSave = async () => {
|
||||
const parsed = clamp(Number.parseInt(value, 10) || 1);
|
||||
setValue(String(parsed));
|
||||
try {
|
||||
if (serverId) {
|
||||
await updateServer.mutateAsync({ serverId, buildsConcurrency: parsed });
|
||||
} else {
|
||||
await updateWebServer.mutateAsync({ buildsConcurrency: parsed });
|
||||
}
|
||||
await refetch();
|
||||
toast.success("Builds concurrency updated");
|
||||
} catch {
|
||||
toast.error("Error updating builds concurrency");
|
||||
}
|
||||
};
|
||||
|
||||
const hasChanges = Number(value) !== (current ?? 1);
|
||||
|
||||
return (
|
||||
<div className="flex flex-row items-center justify-between rounded-lg border p-3">
|
||||
<div className="space-y-0.5">
|
||||
<p className="text-sm font-medium">Concurrent Builds</p>
|
||||
<p className="text-sm text-muted-foreground">
|
||||
Maximum number of deployments that can build at the same time on
|
||||
{serverId ? " this server" : " the local Dokploy server"}. Builds of
|
||||
the same service are always serialized.
|
||||
</p>
|
||||
</div>
|
||||
<div className="flex items-center gap-2">
|
||||
<Input
|
||||
type="number"
|
||||
min={1}
|
||||
max={20}
|
||||
value={value}
|
||||
onChange={(e) => setValue(e.target.value)}
|
||||
className="w-20"
|
||||
/>
|
||||
<Button
|
||||
type="button"
|
||||
size="sm"
|
||||
onClick={handleSave}
|
||||
isLoading={isPending}
|
||||
disabled={!hasChanges}
|
||||
>
|
||||
Save
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
@@ -39,6 +39,7 @@ import {
|
||||
import { Switch } from "@/components/ui/switch";
|
||||
import { Textarea } from "@/components/ui/textarea";
|
||||
import { api } from "@/utils/api";
|
||||
import { BuildsConcurrency } from "./actions/builds-concurrency";
|
||||
|
||||
const Schema = z.object({
|
||||
name: z.string().min(1, {
|
||||
@@ -444,6 +445,7 @@ export const HandleServers = ({ serverId, asButton = false }: Props) => {
|
||||
</FormItem>
|
||||
)}
|
||||
/>
|
||||
{serverId && <BuildsConcurrency serverId={serverId} />}
|
||||
</form>
|
||||
|
||||
<DialogFooter>
|
||||
|
||||
2
apps/dokploy/drizzle/0172_quick_the_professor.sql
Normal file
2
apps/dokploy/drizzle/0172_quick_the_professor.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE "server" ADD COLUMN "buildsConcurrency" integer DEFAULT 1 NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE "webServerSettings" ADD COLUMN "buildsConcurrency" integer DEFAULT 1 NOT NULL;
|
||||
8458
apps/dokploy/drizzle/meta/0172_snapshot.json
Normal file
8458
apps/dokploy/drizzle/meta/0172_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1205,6 +1205,13 @@
|
||||
"when": 1780775037209,
|
||||
"tag": "0171_lucky_echo",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 172,
|
||||
"version": "7",
|
||||
"when": 1781045439162,
|
||||
"tag": "0172_quick_the_professor",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -3,6 +3,7 @@ import { createServerSideHelpers } from "@trpc/react-query/server";
|
||||
import type { GetServerSidePropsContext } from "next";
|
||||
import type { ReactElement } from "react";
|
||||
import superjson from "superjson";
|
||||
import { BuildsConcurrency } from "@/components/dashboard/settings/servers/actions/builds-concurrency";
|
||||
import { ToggleEnforceSSO } from "@/components/dashboard/settings/servers/actions/toggle-enforce-sso";
|
||||
import { ToggleRemoteServersOnly } from "@/components/dashboard/settings/servers/actions/toggle-remote-servers-only";
|
||||
import { DashboardLayout } from "@/components/layouts/dashboard-layout";
|
||||
@@ -78,6 +79,7 @@ const Page = ({ isCloud }: Props) => {
|
||||
<CardContent className="flex flex-col gap-4">
|
||||
<ToggleRemoteServersOnly />
|
||||
<ToggleEnforceSSO />
|
||||
<BuildsConcurrency />
|
||||
</CardContent>
|
||||
</EnterpriseFeatureGate>
|
||||
</div>
|
||||
|
||||
@@ -68,11 +68,9 @@ import {
|
||||
environments,
|
||||
projects,
|
||||
} from "@/server/db/schema";
|
||||
import { deploymentWorker } from "@/server/queues/deployments-queue";
|
||||
import type { DeploymentJob } from "@/server/queues/queue-types";
|
||||
import {
|
||||
cleanQueuesByApplication,
|
||||
getJobsByApplicationId,
|
||||
killDockerBuild,
|
||||
myQueue,
|
||||
} from "@/server/queues/queueSetup";
|
||||
@@ -242,12 +240,7 @@ export const applicationRouter = createTRPCRouter({
|
||||
.returning();
|
||||
|
||||
if (!IS_CLOUD) {
|
||||
const queueJobs = await getJobsByApplicationId(input.applicationId);
|
||||
for (const job of queueJobs) {
|
||||
if (job.id) {
|
||||
deploymentWorker.cancelJob(job.id, "User requested cancellation");
|
||||
}
|
||||
}
|
||||
await cleanQueuesByApplication(input.applicationId);
|
||||
}
|
||||
|
||||
const cleanupOperations = [
|
||||
@@ -339,10 +332,10 @@ export const applicationRouter = createTRPCRouter({
|
||||
type: "redeploy",
|
||||
applicationType: "application",
|
||||
server: !!application.serverId,
|
||||
serverId: application.serverId ?? undefined,
|
||||
};
|
||||
|
||||
if (IS_CLOUD && application.serverId) {
|
||||
jobData.serverId = application.serverId;
|
||||
deploy(jobData).catch((error) => {
|
||||
console.error("Background deployment failed:", error);
|
||||
});
|
||||
@@ -707,9 +700,9 @@ export const applicationRouter = createTRPCRouter({
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: !!application.serverId,
|
||||
serverId: application.serverId ?? undefined,
|
||||
};
|
||||
if (IS_CLOUD && application.serverId) {
|
||||
jobData.serverId = application.serverId;
|
||||
deploy(jobData).catch((error) => {
|
||||
console.error("Background deployment failed:", error);
|
||||
});
|
||||
@@ -826,9 +819,9 @@ export const applicationRouter = createTRPCRouter({
|
||||
type: "deploy",
|
||||
applicationType: "application",
|
||||
server: !!app.serverId,
|
||||
serverId: app.serverId ?? undefined,
|
||||
};
|
||||
if (IS_CLOUD && app.serverId) {
|
||||
jobData.serverId = app.serverId;
|
||||
deploy(jobData).catch((error) => {
|
||||
console.error("Background deployment failed:", error);
|
||||
});
|
||||
|
||||
@@ -68,11 +68,9 @@ import {
|
||||
environments,
|
||||
projects,
|
||||
} from "@/server/db/schema";
|
||||
import { deploymentWorker } from "@/server/queues/deployments-queue";
|
||||
import type { DeploymentJob } from "@/server/queues/queue-types";
|
||||
import {
|
||||
cleanQueuesByCompose,
|
||||
getJobsByComposeId,
|
||||
killDockerBuild,
|
||||
myQueue,
|
||||
} from "@/server/queues/queueSetup";
|
||||
@@ -252,12 +250,7 @@ export const composeRouter = createTRPCRouter({
|
||||
.returning();
|
||||
|
||||
if (!IS_CLOUD) {
|
||||
const queueJobs = await getJobsByComposeId(input.composeId);
|
||||
for (const job of queueJobs) {
|
||||
if (job.id) {
|
||||
deploymentWorker.cancelJob(job.id, "User requested cancellation");
|
||||
}
|
||||
}
|
||||
await cleanQueuesByCompose(input.composeId);
|
||||
}
|
||||
|
||||
const cleanupOperations = [
|
||||
@@ -430,10 +423,10 @@ export const composeRouter = createTRPCRouter({
|
||||
applicationType: "compose",
|
||||
descriptionLog: input.description || "",
|
||||
server: !!compose.serverId,
|
||||
serverId: compose.serverId ?? undefined,
|
||||
};
|
||||
|
||||
if (IS_CLOUD && compose.serverId) {
|
||||
jobData.serverId = compose.serverId;
|
||||
deploy(jobData).catch((error) => {
|
||||
console.error("Background deployment failed:", error);
|
||||
});
|
||||
@@ -479,9 +472,9 @@ export const composeRouter = createTRPCRouter({
|
||||
applicationType: "compose",
|
||||
descriptionLog: input.description || "",
|
||||
server: !!compose.serverId,
|
||||
serverId: compose.serverId ?? undefined,
|
||||
};
|
||||
if (IS_CLOUD && compose.serverId) {
|
||||
jobData.serverId = compose.serverId;
|
||||
deploy(jobData).catch((error) => {
|
||||
console.error("Background deployment failed:", error);
|
||||
});
|
||||
|
||||
@@ -86,10 +86,10 @@ export const previewDeploymentRouter = createTRPCRouter({
|
||||
applicationType: "application-preview",
|
||||
previewDeploymentId: input.previewDeploymentId,
|
||||
server: !!application.serverId,
|
||||
serverId: application.serverId ?? undefined,
|
||||
};
|
||||
|
||||
if (IS_CLOUD && application.serverId) {
|
||||
jobData.serverId = application.serverId;
|
||||
deploy(jobData).catch((error) => {
|
||||
console.error("Background deployment failed:", error);
|
||||
});
|
||||
|
||||
@@ -25,6 +25,7 @@ import { z } from "zod";
|
||||
import { updateServersBasedOnQuantity } from "@/pages/api/stripe/webhook";
|
||||
import {
|
||||
createTRPCRouter,
|
||||
enterpriseProcedure,
|
||||
protectedProcedure,
|
||||
withPermission,
|
||||
} from "@/server/api/trpc";
|
||||
@@ -34,6 +35,7 @@ import {
|
||||
apiFindOneServer,
|
||||
apiRemoveServer,
|
||||
apiUpdateServer,
|
||||
apiUpdateServerBuildsConcurrency,
|
||||
apiUpdateServerMonitoring,
|
||||
applications,
|
||||
compose,
|
||||
@@ -479,6 +481,20 @@ export const serverRouter = createTRPCRouter({
|
||||
throw error;
|
||||
}
|
||||
}),
|
||||
updateBuildsConcurrency: enterpriseProcedure
|
||||
.input(apiUpdateServerBuildsConcurrency)
|
||||
.mutation(async ({ input, ctx }) => {
|
||||
const currentServer = await findServerById(input.serverId);
|
||||
if (currentServer.organizationId !== ctx.session.activeOrganizationId) {
|
||||
throw new TRPCError({
|
||||
code: "UNAUTHORIZED",
|
||||
message: "You are not authorized to update this server",
|
||||
});
|
||||
}
|
||||
return await updateServerById(input.serverId, {
|
||||
buildsConcurrency: input.buildsConcurrency,
|
||||
});
|
||||
}),
|
||||
publicIp: protectedProcedure.query(async () => {
|
||||
if (IS_CLOUD) {
|
||||
return "";
|
||||
|
||||
@@ -67,6 +67,7 @@ import {
|
||||
apiServerSchema,
|
||||
apiTraefikConfig,
|
||||
apiUpdateDockerCleanup,
|
||||
apiUpdateWebServerBuildsConcurrency,
|
||||
projects,
|
||||
server,
|
||||
} from "@/server/db/schema";
|
||||
@@ -468,6 +469,28 @@ export const settingsRouter = createTRPCRouter({
|
||||
return true;
|
||||
}),
|
||||
|
||||
updateBuildsConcurrency: enterpriseProcedure
|
||||
.input(apiUpdateWebServerBuildsConcurrency)
|
||||
.mutation(async ({ input, ctx }) => {
|
||||
if (IS_CLOUD) {
|
||||
throw new TRPCError({
|
||||
code: "BAD_REQUEST",
|
||||
message: "This feature is only available for self-hosted instances",
|
||||
});
|
||||
}
|
||||
|
||||
await updateWebServerSettings({
|
||||
buildsConcurrency: input.buildsConcurrency,
|
||||
});
|
||||
|
||||
await audit(ctx, {
|
||||
action: "update",
|
||||
resourceType: "settings",
|
||||
resourceName: "builds-concurrency",
|
||||
});
|
||||
return true;
|
||||
}),
|
||||
|
||||
updateEnforceSSO: enterpriseProcedure
|
||||
.input(z.object({ enforceSSO: z.boolean() }))
|
||||
.mutation(async ({ input, ctx }) => {
|
||||
|
||||
66
apps/dokploy/server/queues/concurrency.ts
Normal file
66
apps/dokploy/server/queues/concurrency.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { db } from "@dokploy/server/db";
|
||||
import { organization, server } from "@dokploy/server/db/schema";
|
||||
import { hasValidLicense } from "@dokploy/server/services/proprietary/license-key";
|
||||
import { getWebServerSettings } from "@dokploy/server/services/web-server-settings";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { LOCAL_PARTITION } from "./in-memory-queue";
|
||||
|
||||
/**
|
||||
* Resolve the effective builds concurrency for a queue partition.
|
||||
*
|
||||
* Concurrent deployments (concurrency > 1) are an enterprise feature: without a
|
||||
* valid license the effective concurrency is always clamped to 1, so the
|
||||
* community experience is unchanged and an expired license degrades gracefully
|
||||
* back to sequential deployments instead of breaking anything.
|
||||
*
|
||||
* - `LOCAL_PARTITION` -> concurrency stored on the web server settings (the
|
||||
* local Dokploy web server), gated by the owner organization's license.
|
||||
* - any other partition -> concurrency stored on the matching `server` row,
|
||||
* gated by that server's organization license.
|
||||
*/
|
||||
export const resolveBuildsConcurrency = async (
|
||||
partition: string,
|
||||
): Promise<number> => {
|
||||
try {
|
||||
if (partition === LOCAL_PARTITION) {
|
||||
return await resolveLocalConcurrency();
|
||||
}
|
||||
return await resolveServerConcurrency(partition);
|
||||
} catch (error) {
|
||||
console.error(
|
||||
"Failed to resolve builds concurrency, defaulting to 1",
|
||||
error,
|
||||
);
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
|
||||
const clamp = (value: number, licensed: boolean): number => {
|
||||
if (!licensed) return 1;
|
||||
return Math.min(20, Math.max(1, value));
|
||||
};
|
||||
|
||||
const resolveLocalConcurrency = async (): Promise<number> => {
|
||||
const settings = await getWebServerSettings();
|
||||
const buildsConcurrency = settings?.buildsConcurrency ?? 1;
|
||||
|
||||
// Self-hosted is single-tenant; gate on any organization's license.
|
||||
const anyOrg = await db.query.organization.findFirst({
|
||||
columns: { id: true },
|
||||
});
|
||||
const licensed = anyOrg ? await hasValidLicense(anyOrg.id) : false;
|
||||
|
||||
return clamp(buildsConcurrency, licensed);
|
||||
};
|
||||
|
||||
const resolveServerConcurrency = async (serverId: string): Promise<number> => {
|
||||
const currentServer = await db.query.server.findFirst({
|
||||
where: eq(server.serverId, serverId),
|
||||
columns: { buildsConcurrency: true, organizationId: true },
|
||||
});
|
||||
|
||||
if (!currentServer) return 1;
|
||||
|
||||
const licensed = await hasValidLicense(currentServer.organizationId);
|
||||
return clamp(currentServer.buildsConcurrency, licensed);
|
||||
};
|
||||
@@ -2,7 +2,6 @@ import {
|
||||
deployApplication,
|
||||
deployCompose,
|
||||
deployPreviewApplication,
|
||||
IS_CLOUD,
|
||||
rebuildApplication,
|
||||
rebuildCompose,
|
||||
rebuildPreviewApplication,
|
||||
@@ -10,87 +9,69 @@ import {
|
||||
updateCompose,
|
||||
updatePreviewDeployment,
|
||||
} from "@dokploy/server";
|
||||
import { type Job, Worker } from "bullmq";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
import type { InMemoryJob } from "./in-memory-queue";
|
||||
|
||||
const createDeploymentWorker = () =>
|
||||
new Worker(
|
||||
"deployments",
|
||||
async (job: Job<DeploymentJob>) => {
|
||||
try {
|
||||
if (job.data.applicationType === "application") {
|
||||
await updateApplicationStatus(job.data.applicationId, "running");
|
||||
/**
|
||||
* Processes a single deployment job. Shared by the in-memory queue worker and
|
||||
* (in cloud) the direct background execution path.
|
||||
*/
|
||||
export const processDeploymentJob = async (job: InMemoryJob) => {
|
||||
try {
|
||||
if (job.data.applicationType === "application") {
|
||||
await updateApplicationStatus(job.data.applicationId, "running");
|
||||
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.data.applicationType === "compose") {
|
||||
await updateCompose(job.data.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
if (job.data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(job.data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Error", error);
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
},
|
||||
{
|
||||
autorun: false,
|
||||
connection: redisConfig,
|
||||
},
|
||||
);
|
||||
} else if (job.data.applicationType === "compose") {
|
||||
await updateCompose(job.data.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
if (job.data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else if (job.data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(job.data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
|
||||
/** No-op worker when Redis is disabled (e.g. IS_CLOUD). Avoids BullMQ connection errors. */
|
||||
const noopWorker = {
|
||||
run: () => Promise.resolve(),
|
||||
close: () => Promise.resolve(),
|
||||
cancelJob: () => Promise.resolve(),
|
||||
cancelAllJobs: () => Promise.resolve(),
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Error", error);
|
||||
}
|
||||
};
|
||||
|
||||
export const deploymentWorker = !IS_CLOUD
|
||||
? createDeploymentWorker()
|
||||
: (noopWorker as unknown as Worker<DeploymentJob>);
|
||||
|
||||
262
apps/dokploy/server/queues/in-memory-queue.ts
Normal file
262
apps/dokploy/server/queues/in-memory-queue.ts
Normal file
@@ -0,0 +1,262 @@
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
|
||||
/**
|
||||
* In-memory deployment queue for self-hosted instances.
|
||||
*
|
||||
* Replaces BullMQ/Redis for deployments. The model is per-group FIFO with a
|
||||
* configurable concurrency per partition (server):
|
||||
*
|
||||
* - Jobs are partitioned by `serverId` (the local web server uses the
|
||||
* `LOCAL_PARTITION` key). Each partition runs up to `concurrency` jobs at
|
||||
* the same time, so two different applications can build concurrently.
|
||||
* - Within a partition, jobs that belong to the same group (same application
|
||||
* or compose) never run in parallel — they are serialized FIFO. This avoids
|
||||
* two builds of the same service stepping on each other (same code dir,
|
||||
* same container name, etc).
|
||||
*
|
||||
* The concurrency is resolved lazily per partition through `resolveConcurrency`
|
||||
* so it can be gated by the enterprise license at run time (a non-licensed
|
||||
* instance always resolves to 1).
|
||||
*
|
||||
* The public surface (`add`, `getJobs`, `close`, `on`) mirrors the subset of
|
||||
* BullMQ used by the routers so it can be a drop-in replacement.
|
||||
*/
|
||||
|
||||
export const LOCAL_PARTITION = "__local__";
|
||||
|
||||
export type JobState = "waiting" | "active";
|
||||
|
||||
export interface InMemoryJob {
|
||||
id: string;
|
||||
name: string;
|
||||
data: DeploymentJob;
|
||||
timestamp: number;
|
||||
processedOn?: number;
|
||||
finishedOn?: number;
|
||||
failedReason?: string;
|
||||
getState: () => Promise<JobState>;
|
||||
remove: () => Promise<void>;
|
||||
}
|
||||
|
||||
type Processor = (job: InMemoryJob) => Promise<void>;
|
||||
|
||||
/** Resolve the partition key (serverId) a job belongs to. */
|
||||
export const getPartition = (data: DeploymentJob): string =>
|
||||
data.serverId ?? LOCAL_PARTITION;
|
||||
|
||||
/** Resolve the FIFO group a job belongs to (the service being deployed). */
|
||||
export const getGroup = (data: DeploymentJob): string => {
|
||||
if (data.applicationType === "compose") {
|
||||
return `compose:${data.composeId}`;
|
||||
}
|
||||
return `application:${data.applicationId}`;
|
||||
};
|
||||
|
||||
interface InternalJob {
|
||||
id: string;
|
||||
name: string;
|
||||
data: DeploymentJob;
|
||||
timestamp: number;
|
||||
processedOn?: number;
|
||||
finishedOn?: number;
|
||||
failedReason?: string;
|
||||
state: JobState;
|
||||
partition: string;
|
||||
group: string;
|
||||
}
|
||||
|
||||
interface Partition {
|
||||
waiting: InternalJob[];
|
||||
/** Groups currently running in this partition. */
|
||||
activeGroups: Set<string>;
|
||||
active: InternalJob[];
|
||||
}
|
||||
|
||||
export interface InMemoryQueueOptions {
|
||||
/**
|
||||
* Returns the max number of jobs that may run in parallel for a given
|
||||
* partition. Called on every scheduling tick so license/config changes are
|
||||
* picked up without restarting the queue. Must return a value >= 1.
|
||||
*/
|
||||
resolveConcurrency: (partition: string) => Promise<number> | number;
|
||||
/** Monotonic clock; injectable for tests. Defaults to Date.now. */
|
||||
now?: () => number;
|
||||
}
|
||||
|
||||
export class InMemoryQueue {
|
||||
private partitions = new Map<string, Partition>();
|
||||
private processor: Processor | null = null;
|
||||
private running = false;
|
||||
private seq = 0;
|
||||
private readonly resolveConcurrency: InMemoryQueueOptions["resolveConcurrency"];
|
||||
private readonly now: () => number;
|
||||
|
||||
constructor(options: InMemoryQueueOptions) {
|
||||
this.resolveConcurrency = options.resolveConcurrency;
|
||||
this.now = options.now ?? (() => Date.now());
|
||||
}
|
||||
|
||||
private getPartitionState(key: string): Partition {
|
||||
let partition = this.partitions.get(key);
|
||||
if (!partition) {
|
||||
partition = { waiting: [], activeGroups: new Set(), active: [] };
|
||||
this.partitions.set(key, partition);
|
||||
}
|
||||
return partition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register the worker that processes each job. Registering a processor also
|
||||
* starts the queue: in dev (tsx/Next) the module that calls `run()` and the
|
||||
* module that calls `add()` can resolve to different instances, so we must
|
||||
* not depend on a separate `run()` call to flip `running` on.
|
||||
*/
|
||||
process(processor: Processor) {
|
||||
this.processor = processor;
|
||||
this.running = true;
|
||||
this.schedule();
|
||||
}
|
||||
|
||||
run() {
|
||||
this.running = true;
|
||||
this.schedule();
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
async add(data: DeploymentJob): Promise<{ id: string }> {
|
||||
const id = `job-${++this.seq}`;
|
||||
const partitionKey = getPartition(data);
|
||||
const job: InternalJob = {
|
||||
id,
|
||||
name: "deployments",
|
||||
data,
|
||||
timestamp: this.now(),
|
||||
state: "waiting",
|
||||
partition: partitionKey,
|
||||
group: getGroup(data),
|
||||
};
|
||||
this.getPartitionState(partitionKey).waiting.push(job);
|
||||
this.schedule();
|
||||
return { id };
|
||||
}
|
||||
|
||||
private toPublic(job: InternalJob): InMemoryJob {
|
||||
return {
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
data: job.data,
|
||||
timestamp: job.timestamp,
|
||||
processedOn: job.processedOn,
|
||||
finishedOn: job.finishedOn,
|
||||
getState: () => Promise.resolve(job.state),
|
||||
remove: () => this.remove(job.id),
|
||||
};
|
||||
}
|
||||
|
||||
/** Snapshot of jobs in the requested states (defaults to waiting + active). */
|
||||
getJobs(states?: JobState[]): Promise<InMemoryJob[]> {
|
||||
const wantWaiting = !states || states.includes("waiting");
|
||||
const wantActive = !states || states.includes("active");
|
||||
const jobs: InMemoryJob[] = [];
|
||||
for (const partition of this.partitions.values()) {
|
||||
if (wantWaiting) {
|
||||
jobs.push(...partition.waiting.map((job) => this.toPublic(job)));
|
||||
}
|
||||
if (wantActive) {
|
||||
jobs.push(...partition.active.map((job) => this.toPublic(job)));
|
||||
}
|
||||
}
|
||||
return Promise.resolve(jobs);
|
||||
}
|
||||
|
||||
/** Remove a single waiting job by id. Active jobs cannot be removed. */
|
||||
remove(id: string): Promise<void> {
|
||||
for (const partition of this.partitions.values()) {
|
||||
const before = partition.waiting.length;
|
||||
partition.waiting = partition.waiting.filter((job) => job.id !== id);
|
||||
if (partition.waiting.length !== before) break;
|
||||
}
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
/** Remove waiting jobs matching a predicate. Active jobs are not affected. */
|
||||
removeWaiting(predicate: (data: DeploymentJob) => boolean): number {
|
||||
let removed = 0;
|
||||
for (const partition of this.partitions.values()) {
|
||||
partition.waiting = partition.waiting.filter((job) => {
|
||||
const match = predicate(job.data);
|
||||
if (match) removed++;
|
||||
return !match;
|
||||
});
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
/** Drop every waiting job across all partitions. */
|
||||
clearWaiting(): number {
|
||||
let removed = 0;
|
||||
for (const partition of this.partitions.values()) {
|
||||
removed += partition.waiting.length;
|
||||
partition.waiting = [];
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
on() {
|
||||
// No-op: kept for BullMQ API compatibility (error events, etc).
|
||||
}
|
||||
|
||||
close() {
|
||||
this.running = false;
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
private schedule() {
|
||||
if (!this.running || !this.processor) return;
|
||||
for (const key of this.partitions.keys()) {
|
||||
void this.drainPartition(key);
|
||||
}
|
||||
}
|
||||
|
||||
private async drainPartition(key: string) {
|
||||
const partition = this.partitions.get(key);
|
||||
if (!partition || !this.processor) return;
|
||||
|
||||
const concurrency = Math.max(1, await this.resolveConcurrency(key));
|
||||
|
||||
while (partition.active.length < concurrency) {
|
||||
// First waiting job whose group is not already running.
|
||||
const index = partition.waiting.findIndex(
|
||||
(job) => !partition.activeGroups.has(job.group),
|
||||
);
|
||||
if (index === -1) break;
|
||||
|
||||
const job = partition.waiting.splice(index, 1)[0];
|
||||
if (!job) break;
|
||||
job.state = "active";
|
||||
job.processedOn = this.now();
|
||||
partition.activeGroups.add(job.group);
|
||||
partition.active.push(job);
|
||||
|
||||
void this.runJob(job);
|
||||
}
|
||||
}
|
||||
|
||||
private async runJob(job: InternalJob) {
|
||||
try {
|
||||
await this.processor?.(this.toPublic(job));
|
||||
} catch (error) {
|
||||
job.failedReason = error instanceof Error ? error.message : String(error);
|
||||
console.error("In-memory deployment job failed", error);
|
||||
} finally {
|
||||
job.finishedOn = this.now();
|
||||
const partition = this.partitions.get(job.partition);
|
||||
if (partition) {
|
||||
partition.active = partition.active.filter((j) => j.id !== job.id);
|
||||
partition.activeGroups.delete(job.group);
|
||||
}
|
||||
// A slot (and possibly the group) freed up — try to schedule more.
|
||||
void this.drainPartition(job.partition);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,32 +3,89 @@ import {
|
||||
execAsync,
|
||||
execAsyncRemote,
|
||||
} from "@dokploy/server/utils/process/execAsync";
|
||||
import type { Job } from "bullmq";
|
||||
import { Queue } from "bullmq";
|
||||
import { deploymentWorker } from "./deployments-queue";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
import { resolveBuildsConcurrency } from "./concurrency";
|
||||
import { processDeploymentJob } from "./deployments-queue";
|
||||
import { type InMemoryJob, InMemoryQueue } from "./in-memory-queue";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
|
||||
/** No-op queue when Redis is disabled (e.g. IS_CLOUD). Avoids BullMQ connection errors. */
|
||||
const createNoopQueue = () => ({
|
||||
getJobs: () => Promise.resolve([] as Job[]),
|
||||
add: () =>
|
||||
Promise.resolve({ id: "noop", remove: () => Promise.resolve() } as Job),
|
||||
/**
|
||||
* Deployment queue.
|
||||
*
|
||||
* Self-hosted uses an in-memory, per-group FIFO queue with configurable
|
||||
* concurrency per server (enterprise-gated). Cloud does not use the queue at
|
||||
* all — deployments run directly in the background — so we expose a no-op.
|
||||
*/
|
||||
|
||||
interface DeploymentQueue {
|
||||
add: (
|
||||
name: string,
|
||||
data: DeploymentJob,
|
||||
opts?: Record<string, unknown>,
|
||||
) => Promise<{ id: string }>;
|
||||
getJobs: (states?: Array<"waiting" | "active">) => Promise<InMemoryJob[]>;
|
||||
close: () => Promise<void>;
|
||||
on: (...args: unknown[]) => void;
|
||||
run: () => Promise<void>;
|
||||
removeWaiting: (predicate: (data: DeploymentJob) => boolean) => number;
|
||||
clearWaiting: () => number;
|
||||
}
|
||||
|
||||
const createNoopQueue = (): DeploymentQueue => ({
|
||||
add: () => Promise.resolve({ id: "noop" }),
|
||||
getJobs: () => Promise.resolve([]),
|
||||
close: () => Promise.resolve(),
|
||||
on: () => {},
|
||||
run: () => Promise.resolve(),
|
||||
removeWaiting: () => 0,
|
||||
clearWaiting: () => 0,
|
||||
});
|
||||
|
||||
const myQueue = !IS_CLOUD
|
||||
? new Queue("deployments", { connection: redisConfig })
|
||||
: (createNoopQueue() as unknown as Queue);
|
||||
const createInMemoryQueue = (): DeploymentQueue => {
|
||||
const queue = new InMemoryQueue({
|
||||
resolveConcurrency: resolveBuildsConcurrency,
|
||||
});
|
||||
queue.process(processDeploymentJob);
|
||||
|
||||
return {
|
||||
add: (_name, data) => queue.add(data),
|
||||
getJobs: (states) => queue.getJobs(states),
|
||||
close: () => queue.close(),
|
||||
on: () => {},
|
||||
run: () => queue.run(),
|
||||
removeWaiting: (predicate) => queue.removeWaiting(predicate),
|
||||
clearWaiting: () => queue.clearWaiting(),
|
||||
};
|
||||
};
|
||||
|
||||
// Use a global singleton so the deployment queue is shared across every module
|
||||
// instance. In dev (tsx/Next) the same file can be evaluated more than once
|
||||
// (relative import in server.ts vs `@/` alias in the routers); without this the
|
||||
// worker and the `add()` calls would land on different queue instances.
|
||||
const globalForQueue = globalThis as unknown as {
|
||||
__dokployDeploymentQueue?: DeploymentQueue;
|
||||
};
|
||||
|
||||
if (!globalForQueue.__dokployDeploymentQueue) {
|
||||
globalForQueue.__dokployDeploymentQueue = !IS_CLOUD
|
||||
? createInMemoryQueue()
|
||||
: createNoopQueue();
|
||||
}
|
||||
|
||||
const myQueue: DeploymentQueue = globalForQueue.__dokployDeploymentQueue;
|
||||
|
||||
/** Start processing jobs. Called once on server startup (self-hosted). */
|
||||
export const startDeploymentWorker = () => myQueue.run();
|
||||
|
||||
export const getJobsByApplicationId = async (applicationId: string) => {
|
||||
const jobs = await myQueue.getJobs();
|
||||
return jobs.filter((job) => job?.data?.applicationId === applicationId);
|
||||
return jobs.filter(
|
||||
(job) => (job.data as any)?.applicationId === applicationId,
|
||||
);
|
||||
};
|
||||
|
||||
export const getJobsByComposeId = async (composeId: string) => {
|
||||
const jobs = await myQueue.getJobs();
|
||||
return jobs.filter((job) => job?.data?.composeId === composeId);
|
||||
return jobs.filter((job) => (job.data as any)?.composeId === composeId);
|
||||
};
|
||||
|
||||
if (!IS_CLOUD) {
|
||||
@@ -36,44 +93,33 @@ if (!IS_CLOUD) {
|
||||
myQueue.close();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
myQueue.on("error", (error) => {
|
||||
if ((error as any).code === "ECONNREFUSED") {
|
||||
console.error(
|
||||
"Make sure you have installed Redis and it is running.",
|
||||
error,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export const cleanQueuesByApplication = async (applicationId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
const removed = myQueue.removeWaiting(
|
||||
(data) => (data as any)?.applicationId === applicationId,
|
||||
);
|
||||
if (removed > 0) {
|
||||
console.log(
|
||||
`Removed ${removed} waiting job(s) for application ${applicationId}`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.applicationId === applicationId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for application ${applicationId}`);
|
||||
}
|
||||
export const cleanQueuesByCompose = async (composeId: string) => {
|
||||
const removed = myQueue.removeWaiting(
|
||||
(data) => (data as any)?.composeId === composeId,
|
||||
);
|
||||
if (removed > 0) {
|
||||
console.log(`Removed ${removed} waiting job(s) for compose ${composeId}`);
|
||||
}
|
||||
};
|
||||
|
||||
export const cleanAllDeploymentQueue = async () => {
|
||||
deploymentWorker.cancelAllJobs("User requested cancellation");
|
||||
myQueue.clearWaiting();
|
||||
return true;
|
||||
};
|
||||
|
||||
export const cleanQueuesByCompose = async (composeId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.composeId === composeId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for compose ${composeId}`);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export const killDockerBuild = async (
|
||||
type: "application" | "compose",
|
||||
serverId: string | null,
|
||||
|
||||
@@ -71,8 +71,8 @@ void app.prepare().then(async () => {
|
||||
|
||||
if (!IS_CLOUD) {
|
||||
console.log("Starting Deployment Worker");
|
||||
const { deploymentWorker } = await import("./queues/deployments-queue");
|
||||
await deploymentWorker.run();
|
||||
const { startDeploymentWorker } = await import("./queues/queueSetup");
|
||||
await startDeploymentWorker();
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("Main Server Error", e);
|
||||
|
||||
@@ -41,6 +41,7 @@ export const server = pgTable("server", {
|
||||
.notNull()
|
||||
.$defaultFn(() => generateAppName("server")),
|
||||
enableDockerCleanup: boolean("enableDockerCleanup").notNull().default(false),
|
||||
buildsConcurrency: integer("buildsConcurrency").notNull().default(1),
|
||||
createdAt: text("createdAt").notNull(),
|
||||
organizationId: text("organizationId")
|
||||
.notNull()
|
||||
@@ -182,6 +183,11 @@ export const apiUpdateServer = createSchema
|
||||
enableDockerCleanup: z.boolean().default(true),
|
||||
});
|
||||
|
||||
export const apiUpdateServerBuildsConcurrency = z.object({
|
||||
serverId: z.string().min(1),
|
||||
buildsConcurrency: z.number().int().min(1).max(20),
|
||||
});
|
||||
|
||||
export const apiUpdateServerMonitoring = createSchema
|
||||
.pick({
|
||||
serverId: true,
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
import { relations } from "drizzle-orm";
|
||||
import { boolean, jsonb, pgTable, text, timestamp } from "drizzle-orm/pg-core";
|
||||
import {
|
||||
boolean,
|
||||
integer,
|
||||
jsonb,
|
||||
pgTable,
|
||||
text,
|
||||
timestamp,
|
||||
} from "drizzle-orm/pg-core";
|
||||
import { createInsertSchema } from "drizzle-zod";
|
||||
import { nanoid } from "nanoid";
|
||||
import { z } from "zod";
|
||||
@@ -98,6 +105,8 @@ export const webServerSettings = pgTable("webServerSettings", {
|
||||
}),
|
||||
// Deployment Configuration (self-hosted only)
|
||||
remoteServersOnly: boolean("remoteServersOnly").notNull().default(false),
|
||||
// Concurrent builds on the local web server (enterprise-gated to > 1)
|
||||
buildsConcurrency: integer("buildsConcurrency").notNull().default(1),
|
||||
// Auth Configuration (self-hosted only)
|
||||
enforceSSO: boolean("enforceSSO").notNull().default(false),
|
||||
// Cache Cleanup Configuration
|
||||
@@ -161,6 +170,11 @@ export const apiUpdateWebServerSettings = createSchema.partial().extend({
|
||||
cleanupCacheOnCompose: z.boolean().optional(),
|
||||
remoteServersOnly: z.boolean().optional(),
|
||||
enforceSSO: z.boolean().optional(),
|
||||
buildsConcurrency: z.number().int().min(1).max(20).optional(),
|
||||
});
|
||||
|
||||
export const apiUpdateWebServerBuildsConcurrency = z.object({
|
||||
buildsConcurrency: z.number().int().min(1).max(20),
|
||||
});
|
||||
|
||||
export const apiAssignDomain = z
|
||||
|
||||
Reference in New Issue
Block a user