Compare commits

..

18 Commits

Author SHA1 Message Date
Mauricio Siu
2da2b2dd39 refactor(queues): migrate from BullMQ to p-limit for deployment management
This commit introduces a new queue system using p-limit, addressing resource issues and improving job cancellation capabilities. Key changes include:
- Removal of Redis dependency, allowing for in-memory queue management.
- Implementation of per-server queues with ordered processing based on server concurrency settings.
- Addition of helper functions for job management and status retrieval, ensuring backward compatibility with existing API endpoints.
- Updates to database schema to support server concurrency settings.

The legacy BullMQ code has been retained for compatibility but is no longer in active use.
2025-08-29 00:08:33 -06:00
Mauricio Siu
7273c636a0 Merge pull request #2461 from Dokploy/fix/re-apply-database-migration-fix
Reapply "refactor: update database connection handling and remove unu…
2025-08-28 19:21:28 -06:00
Mauricio Siu
d6a0585bae chore(package): update dokploy version to v0.25.0 2025-08-28 19:03:37 -06:00
Mauricio Siu
935d1686f2 chore: add new branch for database migration fix in Dokploy workflow 2025-08-28 19:02:21 -06:00
Mauricio Siu
349248105a Merge pull request #2482 from Dokploy/2470-post-rediscreate-returns-true-instead-of-the-redis-payload
fix(redis): return newRedis object instead of true in redis router
2025-08-28 18:43:04 -06:00
Mauricio Siu
d922568510 fix(redis): return newRedis object instead of true in redis router 2025-08-28 18:42:21 -06:00
Mauricio Siu
44ae4df151 fix(settings): change user subscription query to protected procedure 2025-08-28 18:27:47 -06:00
Mauricio Siu
77fdda4c09 Merge pull request #2481 from Dokploy/feat/allow-chatwoot-on-paid-users
feat(settings): add user subscription check to dashboard layout
2025-08-28 18:27:05 -06:00
Mauricio Siu
8a1e36cc3b feat(settings): add user subscription check to dashboard layout 2025-08-28 18:26:05 -06:00
Mauricio Siu
1635bab44f Reapply "refactor: update database connection handling and remove unused migra…"
This reverts commit 17f333ac2a.
2025-08-24 23:49:48 -06:00
Mauricio Siu
4a52459015 Merge pull request #2460 from Dokploy/revert-2459-2234-database-migration-fails-with-password-authentication-failed-when-using-a-custom-postgres_password
Revert "refactor: update database connection handling and remove unused migra…"
2025-08-24 23:44:23 -06:00
Mauricio Siu
17f333ac2a Revert "refactor: update database connection handling and remove unused migra…" 2025-08-24 23:44:00 -06:00
Mauricio Siu
d770307d64 Merge pull request #2459 from Dokploy/2234-database-migration-fails-with-password-authentication-failed-when-using-a-custom-postgres_password
refactor: update database connection handling and remove unused migra…
2025-08-24 23:43:52 -06:00
Mauricio Siu
aa434cbdea feat(db): add database connection setup using drizzle-orm for PostgreSQL 2025-08-24 16:25:04 -06:00
Mauricio Siu
c42054b965 feat(migration): implement database migration functionality using drizzle-orm 2025-08-24 16:22:42 -06:00
Mauricio Siu
03588bf375 chore: remove console.log statement from esbuild configuration 2025-08-24 16:21:01 -06:00
Mauricio Siu
8c420ff4f5 refactor: update package.json to use TypeScript source files instead of compiled JavaScript 2025-08-24 16:20:32 -06:00
Mauricio Siu
cbf6f95891 refactor: update database connection handling and remove unused migration and seed files 2025-08-24 16:19:33 -06:00
27 changed files with 7254 additions and 450 deletions

View File

@@ -2,7 +2,7 @@ name: Dokploy Docker Build
on:
push:
branches: [main, canary]
branches: [main, canary, "fix/re-apply-database-migration-fix"]
workflow_dispatch:
env:

View File

@@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => {
>
<Button
variant="default"
isLoading={data?.applicationStatus === "running"}
// isLoading={data?.applicationStatus === "running"}
className="flex items-center gap-1.5 group focus-visible:ring-2 focus-visible:ring-offset-2"
>
<Tooltip>

View File

@@ -11,11 +11,20 @@ interface Props {
export const DashboardLayout = ({ children }: Props) => {
const { data: haveRootAccess } = api.user.haveRootAccess.useQuery();
const { data: isCloud } = api.settings.isCloud.useQuery();
const { data: isUserSubscribed } = api.settings.isUserSubscribed.useQuery(
undefined,
{
enabled: isCloud === true,
refetchOnWindowFocus: false,
refetchOnMount: false,
refetchOnReconnect: false,
},
);
return (
<>
<Page>{children}</Page>
{isCloud === true && (
{isCloud === true && isUserSubscribed === true && (
<ChatwootWidget websiteToken="USCpQRKzHvFMssf3p6Eacae5" />
)}

View File

@@ -0,0 +1,2 @@
ALTER TABLE "user_temp" ADD COLUMN "serverConcurrency" integer DEFAULT 1 NOT NULL;--> statement-breakpoint
ALTER TABLE "server" ADD COLUMN "concurrency" integer DEFAULT 1 NOT NULL;

File diff suppressed because it is too large Load Diff

View File

@@ -750,6 +750,13 @@
"when": 1754912062243,
"tag": "0106_purple_maggott",
"breakpoints": true
},
{
"idx": 107,
"version": "7",
"when": 1756436825081,
"tag": "0107_calm_power_pack",
"breakpoints": true
}
]
}

View File

@@ -7,6 +7,10 @@ function prepareDefine(config: DotenvParseOutput | undefined) {
const define = {};
// @ts-ignore
for (const [key, value] of Object.entries(config)) {
// Skip DATABASE_URL to allow runtime environment variable override
if (key === "DATABASE_URL") {
continue;
}
// @ts-ignore
define[`process.env.${key}`] = JSON.stringify(value);
}
@@ -14,6 +18,7 @@ function prepareDefine(config: DotenvParseOutput | undefined) {
}
const define = prepareDefine(result.parsed);
try {
esbuild
.build({

View File

@@ -1,149 +0,0 @@
// import { drizzle } from "drizzle-orm/postgres-js";
// import { nanoid } from "nanoid";
// import postgres from "postgres";
// import * as schema from "./server/db/schema";
// const connectionString = process.env.DATABASE_URL!;
// const sql = postgres(connectionString, { max: 1 });
// const db = drizzle(sql, {
// schema,
// });
// await db
// .transaction(async (db) => {
// const admins = await db.query.admins.findMany({
// with: {
// auth: true,
// users: {
// with: {
// auth: true,
// },
// },
// },
// });
// for (const admin of admins) {
// const user = await db
// .insert(schema.users_temp)
// .values({
// id: admin.adminId,
// email: admin.auth.email,
// token: admin.auth.token || "",
// emailVerified: true,
// updatedAt: new Date(),
// role: "admin",
// serverIp: admin.serverIp,
// image: admin.auth.image,
// certificateType: admin.certificateType,
// host: admin.host,
// letsEncryptEmail: admin.letsEncryptEmail,
// sshPrivateKey: admin.sshPrivateKey,
// enableDockerCleanup: admin.enableDockerCleanup,
// enableLogRotation: admin.enableLogRotation,
// enablePaidFeatures: admin.enablePaidFeatures,
// metricsConfig: admin.metricsConfig,
// cleanupCacheApplications: admin.cleanupCacheApplications,
// cleanupCacheOnPreviews: admin.cleanupCacheOnPreviews,
// cleanupCacheOnCompose: admin.cleanupCacheOnCompose,
// stripeCustomerId: admin.stripeCustomerId,
// stripeSubscriptionId: admin.stripeSubscriptionId,
// serversQuantity: admin.serversQuantity,
// })
// .returning()
// .then((user) => user[0]);
// await db.insert(schema.account).values({
// providerId: "credential",
// userId: user?.id || "",
// password: admin.auth.password,
// is2FAEnabled: admin.auth.is2FAEnabled || false,
// createdAt: new Date(admin.auth.createdAt) || new Date(),
// updatedAt: new Date(admin.auth.createdAt) || new Date(),
// });
// const organization = await db
// .insert(schema.organization)
// .values({
// name: "My Organization",
// slug: nanoid(),
// ownerId: user?.id || "",
// createdAt: new Date(admin.createdAt) || new Date(),
// })
// .returning()
// .then((organization) => organization[0]);
// for (const member of admin.users) {
// const userTemp = await db
// .insert(schema.users_temp)
// .values({
// id: member.userId,
// email: member.auth.email,
// token: member.token || "",
// emailVerified: true,
// updatedAt: new Date(admin.createdAt) || new Date(),
// role: "user",
// image: member.auth.image,
// createdAt: admin.createdAt,
// canAccessToAPI: member.canAccessToAPI || false,
// canAccessToDocker: member.canAccessToDocker || false,
// canAccessToGitProviders: member.canAccessToGitProviders || false,
// canAccessToSSHKeys: member.canAccessToSSHKeys || false,
// canAccessToTraefikFiles: member.canAccessToTraefikFiles || false,
// canCreateProjects: member.canCreateProjects || false,
// canCreateServices: member.canCreateServices || false,
// canDeleteProjects: member.canDeleteProjects || false,
// canDeleteServices: member.canDeleteServices || false,
// accessedProjects: member.accessedProjects || [],
// accessedServices: member.accessedServices || [],
// })
// .returning()
// .then((userTemp) => userTemp[0]);
// await db.insert(schema.account).values({
// providerId: "credential",
// userId: member?.userId || "",
// password: member.auth.password,
// is2FAEnabled: member.auth.is2FAEnabled || false,
// createdAt: new Date(member.auth.createdAt) || new Date(),
// updatedAt: new Date(member.auth.createdAt) || new Date(),
// });
// await db.insert(schema.member).values({
// organizationId: organization?.id || "",
// userId: userTemp?.id || "",
// role: "admin",
// createdAt: new Date(member.createdAt) || new Date(),
// });
// }
// }
// })
// .then(() => {
// console.log("Migration finished");
// })
// .catch((error) => {
// console.error(error);
// });
// await db
// .transaction(async (db) => {
// const projects = await db.query.projects.findMany({
// with: {
// user: {
// with: {
// organizations: true,
// },
// },
// },
// });
// for (const project of projects) {
// const _user = await db.update(schema.projects).set({
// organizationId: project.user.organizations[0]?.id || "",
// });
// }
// })
// .then(() => {
// console.log("Migration finished");
// })
// .catch((error) => {
// console.error(error);
// });

View File

@@ -1,6 +1,6 @@
{
"name": "dokploy",
"version": "v0.24.12",
"version": "v0.25.0",
"private": true,
"license": "Apache-2.0",
"type": "module",
@@ -97,7 +97,6 @@
"better-auth": "v1.2.8-beta.7",
"bl": "6.0.11",
"boxen": "^7.1.1",
"bullmq": "5.4.2",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^0.2.1",
@@ -126,6 +125,7 @@
"nodemailer": "6.9.14",
"octokit": "3.1.2",
"otpauth": "^9.4.0",
"p-limit": "^7.1.1",
"pino": "9.4.0",
"pino-pretty": "11.2.2",
"postgres": "3.4.4",

View File

@@ -54,7 +54,11 @@ import {
applications,
} from "@/server/db/schema";
import type { DeploymentJob } from "@/server/queues/queue-types";
import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup";
import {
addJobWithUserContext,
cleanQueuesByApplication,
myQueue,
} from "@/server/queues/queueSetup";
import { deploy } from "@/server/utils/deploy";
import { uploadFileSchema } from "@/utils/schema";
@@ -668,14 +672,7 @@ export const applicationRouter = createTRPCRouter({
return true;
}
await myQueue.add(
"deployments",
{ ...jobData },
{
removeOnComplete: true,
removeOnFail: true,
},
);
await addJobWithUserContext({ ...jobData }, ctx.user.id);
}),
cleanQueues: protectedProcedure

View File

@@ -80,7 +80,7 @@ export const redisRouter = createTRPCRouter({
type: "volume",
});
return true;
return newRedis;
} catch (error) {
throw error;
}

View File

@@ -45,7 +45,7 @@ import {
} from "@dokploy/server";
import { generateOpenApiDocument } from "@dokploy/trpc-openapi";
import { TRPCError } from "@trpc/server";
import { sql } from "drizzle-orm";
import { eq, sql } from "drizzle-orm";
import { dump, load } from "js-yaml";
import { scheduledJobs, scheduleJob } from "node-schedule";
import { z } from "zod";
@@ -60,6 +60,8 @@ import {
apiServerSchema,
apiTraefikConfig,
apiUpdateDockerCleanup,
projects,
server,
} from "@/server/db/schema";
import { removeJob, schedule } from "@/server/utils/backup";
import packageInfo from "../../../package.json";
@@ -706,6 +708,18 @@ export const settingsRouter = createTRPCRouter({
isCloud: publicProcedure.query(async () => {
return IS_CLOUD;
}),
isUserSubscribed: protectedProcedure.query(async ({ ctx }) => {
const haveServers = await db.query.server.findMany({
where: eq(server.organizationId, ctx.session?.activeOrganizationId || ""),
});
const haveProjects = await db.query.projects.findMany({
where: eq(
projects.organizationId,
ctx.session?.activeOrganizationId || "",
),
});
return haveServers.length > 0 || haveProjects.length > 0;
}),
health: publicProcedure.query(async () => {
if (IS_CLOUD) {
try {

View File

@@ -6,14 +6,18 @@ declare global {
var db: PostgresJsDatabase<typeof schema> | undefined;
}
const dbUrl =
process.env.DATABASE_URL ||
"postgres://dokploy:amukds4wi9001583845717ad2@dokploy-postgres:5432/dokploy";
export let db: PostgresJsDatabase<typeof schema>;
if (process.env.NODE_ENV === "production") {
db = drizzle(postgres(process.env.DATABASE_URL!), {
db = drizzle(postgres(dbUrl!), {
schema,
});
} else {
if (!global.db)
global.db = drizzle(postgres(process.env.DATABASE_URL!), {
global.db = drizzle(postgres(dbUrl!), {
schema,
});

View File

@@ -1,29 +0,0 @@
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
const connectionString = process.env.DATABASE_URL!;
const pg = postgres(connectionString, { max: 1 });
const _db = drizzle(pg);
async function seed() {
console.log("> Seed:", process.env.DATABASE_PATH, "\n");
// const authenticationR = await db
// .insert(users)
// .values([
// {
// email: "user1@hotmail.com",
// password: password("12345671"),
// },
// ])
// .onConflictDoNothing()
// .returning();
// console.log("\nSemillas Update:", authenticationR.length);
}
seed().catch((e) => {
console.error(e);
process.exit(1);
});

View File

@@ -0,0 +1,104 @@
# Queue System Migration - BullMQ to p-limit
This directory contains the new queue system that replaces BullMQ with [p-limit](https://github.com/sindresorhus/p-limit) for deployment queues.
## Why the Migration?
- **Resource Issues**: Users experienced freezing during builds due to resource constraints
- **Cancellation Problems**: BullMQ workers couldn't be properly canceled when Docker processes restart
- **Retry Loops**: Unwanted automatic retries when processes are killed
## New Architecture
### Key Features
1. **Per-Server Queues**: Deployments are grouped by server (local "dokploy-server" or remote servers)
2. **Ordered Processing**: Within each server, deployments are processed based on server concurrency settings
3. **Global User Concurrency**: User's `serverConcurrency` controls total deployments across all servers
4. **Proper Cancellation**: Jobs can be canceled using AbortController
5. **No Redis Dependency**: In-memory queues eliminate Redis dependency issues
### Files
- `service-queue.ts` - New p-limit based queue implementation
- `queueSetup.ts` - Compatibility layer for existing code
- `deployments-queue.ts` - Legacy compatibility exports
- `queue-types.ts` - Shared type definitions
## Usage Examples
```typescript
import { addJobWithUserContext, cancelDeploymentJobs, getDeploymentQueueStatus } from './queueSetup';
// Add a deployment job with user context (recommended for API routes)
const result = await addJobWithUserContext({
applicationType: 'application',
applicationId: '123',
type: 'deploy',
titleLog: 'Deploying app',
descriptionLog: 'Starting deployment',
serverId: 'server-456' // Optional - for remote deployments
}, 'user-id-789'); // User ID for concurrency settings
// Cancel jobs for a service
const cancelled = cancelDeploymentJobs('app-123');
// Get queue status
const status = getDeploymentQueueStatus('app-123');
```
### Database-Driven Concurrency
The system now automatically reads concurrency settings from the database:
1. **Global User Concurrency**: From `users_temp.serverConcurrency` field
- Controls the **TOTAL** number of deployments that can run simultaneously for a user
- Example: If `serverConcurrency = 1`, only 1 deployment across ALL services at a time
- Example: If `serverConcurrency = 3`, maximum 3 deployments can run simultaneously across all services
2. **Server Concurrency**: From `server.concurrency` field
- Controls how many deployments can run simultaneously **on a specific server**
- Only applies when deploying to remote servers (`serverId` is present)
- Example: Server A can handle 2 concurrent deployments, Server B can handle 1
### Concurrency Hierarchy
```
User Global Limit (users_temp.serverConcurrency)
├── dokploy-server (local deployments)
│ ├── App A deployment
│ ├── App B deployment
│ └── Compose C deployment
├── remote-server-1 (server.concurrency = 2)
│ ├── App D deployment
│ └── App E deployment
└── remote-server-2 (server.concurrency = 1)
└── App F deployment
```
**Example Scenarios:**
- **User has `serverConcurrency = 1`**: Only 1 deployment total across ALL servers
- **User has `serverConcurrency = 3`**: Maximum 3 deployments simultaneously across all servers
- **Local server**: All local apps/compose share the "dokploy-server" queue
- **Remote server with `concurrency = 2`**: That server can handle up to 2 concurrent deployments
- **Queue grouping**: `app-123` and `app-456` on same server share the same queue
## Configuration
- **Global Concurrency**: Set how many services can deploy simultaneously
- **Service Concurrency**: Each service processes 1 deployment at a time (FIFO)
```typescript
import { setGlobalConcurrency } from './service-queue';
// Allow 5 services to deploy simultaneously
setGlobalConcurrency(5);
```
## Migration Notes
- The schedules app still uses BullMQ for cron/repeatable jobs (different use case)
- All existing API endpoints work unchanged due to compatibility layer
- No breaking changes to existing functionality
- Improved resource usage and cancellation capabilities

View File

@@ -1,122 +1,58 @@
import {
deployApplication,
deployCompose,
deployPreviewApplication,
deployRemoteApplication,
deployRemoteCompose,
deployRemotePreviewApplication,
rebuildApplication,
rebuildCompose,
rebuildRemoteApplication,
rebuildRemoteCompose,
updateApplicationStatus,
updateCompose,
updatePreviewDeployment,
} from "@dokploy/server";
import { type Job, Worker } from "bullmq";
import type { DeploymentJob } from "./queue-types";
import { redisConfig } from "./redis-connection";
// This file is kept for backward compatibility but now uses the new service-queue system
// The actual queue logic has been moved to service-queue.ts using p-limit
export const deploymentWorker = new Worker(
"deployments",
async (job: Job<DeploymentJob>) => {
try {
if (job.data.applicationType === "application") {
await updateApplicationStatus(job.data.applicationId, "running");
import { serviceQueueManager } from "./service-queue";
if (job.data.server) {
if (job.data.type === "redeploy") {
await rebuildRemoteApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployRemoteApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
} else {
if (job.data.type === "redeploy") {
await rebuildApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
}
} else if (job.data.applicationType === "compose") {
await updateCompose(job.data.composeId, {
composeStatus: "running",
});
if (job.data.server) {
if (job.data.type === "redeploy") {
await rebuildRemoteCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "deploy") {
await deployRemoteCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
} else {
if (job.data.type === "deploy") {
await deployCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
} else if (job.data.type === "redeploy") {
await rebuildCompose({
composeId: job.data.composeId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
});
}
}
} else if (job.data.applicationType === "application-preview") {
await updatePreviewDeployment(job.data.previewDeploymentId, {
previewStatus: "running",
});
if (job.data.server) {
if (job.data.type === "deploy") {
await deployRemotePreviewApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
previewDeploymentId: job.data.previewDeploymentId,
});
}
} else {
if (job.data.type === "deploy") {
await deployPreviewApplication({
applicationId: job.data.applicationId,
titleLog: job.data.titleLog,
descriptionLog: job.data.descriptionLog,
previewDeploymentId: job.data.previewDeploymentId,
});
}
}
}
} catch (error) {
console.log("Error", error);
}
// Legacy compatibility - this is no longer used but kept to avoid breaking imports
export const deploymentWorker = {
run: async () => {
console.log(
"Legacy deploymentWorker.run() called - now using service-queue system",
);
// The service queue manager starts automatically, no need to do anything
return Promise.resolve();
},
{
autorun: false,
connection: redisConfig,
close: async () => {
console.log("Legacy deploymentWorker.close() called");
return Promise.resolve();
},
);
};
// Legacy exports for backward compatibility
export const getWorkersMap = () => {
console.warn(
"getWorkersMap() is deprecated - use serviceQueueManager instead",
);
return {};
};
export const getWorker = (_serverId?: string) => {
console.warn("getWorker() is deprecated - use serviceQueueManager instead");
return undefined;
};
export const createDeploymentWorker = (defaultConcurrency = 1) => {
console.warn(
"createDeploymentWorker() is deprecated - use serviceQueueManager instead",
);
serviceQueueManager.setGlobalConcurrency(defaultConcurrency);
return deploymentWorker;
};
export const createServerDeploymentWorker = (
_serverId: string,
_concurrency = 1,
) => {
console.warn(
"createServerDeploymentWorker() is deprecated - use serviceQueueManager instead",
);
// The new system automatically creates queues per service, no need for explicit worker creation
return deploymentWorker;
};
export const removeServerDeploymentWorker = (serverId: string) => {
console.warn(
"removeServerDeploymentWorker() is deprecated - use removeServiceQueue instead",
);
serviceQueueManager.removeServiceQueue(serverId);
};

View File

@@ -1,44 +1,101 @@
import { Queue } from "bullmq";
import { redisConfig } from "./redis-connection";
import type { DeploymentJob } from "./queue-types";
import {
addDeploymentJob,
cancelDeploymentJobs,
getDeploymentQueueStatus,
setGlobalConcurrency,
} from "./service-queue";
const myQueue = new Queue("deployments", {
connection: redisConfig,
});
// Default queue name for local deployments
export const DEFAULT_QUEUE = "default";
process.on("SIGTERM", () => {
myQueue.close();
process.exit(0);
});
// Initialize with default concurrency of 3 services
setGlobalConcurrency(3);
myQueue.on("error", (error) => {
if ((error as any).code === "ECONNREFUSED") {
console.error(
"Make sure you have installed Redis and it is running.",
error,
);
// Helper function to determine service ID from job data
// Groups deployments by SERVER, not by individual application/compose
const getServiceId = (jobData: DeploymentJob): string => {
// If it has a serverId, group by that server
if (jobData.serverId) {
return jobData.serverId;
}
});
// For local deployments (no serverId), group all under the main Dokploy server
return "dokploy-server";
};
// Compatibility functions to replace BullMQ usage
export const myQueue = {
add: async (
_name: string,
jobData: DeploymentJob,
_options?: any,
userId?: string,
) => {
const serviceId = getServiceId(jobData);
const jobId = await addDeploymentJob(serviceId, jobData, userId);
console.log(`Added deployment job ${jobId} to service ${serviceId}`);
return { id: jobId };
},
close: () => {
console.log("Service queue manager shutdown initiated");
return Promise.resolve();
},
};
export const cleanQueuesByApplication = async (applicationId: string) => {
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
// Cancel jobs for this specific application across all servers
let totalCancelled = 0;
for (const job of jobs) {
if (job?.data?.applicationId === applicationId) {
await job.remove();
console.log(`Removed job ${job.id} for application ${applicationId}`);
}
}
// Check the local Dokploy server
const localCancelled = cancelDeploymentJobs(
"dokploy-server",
applicationId,
undefined,
);
totalCancelled += localCancelled;
// TODO: Also check remote servers if we need to track which servers have this application
// For now, we only clean from the local server queue
console.log(
`Cancelled ${totalCancelled} jobs for application ${applicationId}`,
);
return totalCancelled;
};
export const cleanQueuesByCompose = async (composeId: string) => {
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
// Cancel jobs for this specific compose across all servers
let totalCancelled = 0;
for (const job of jobs) {
if (job?.data?.composeId === composeId) {
await job.remove();
console.log(`Removed job ${job.id} for compose ${composeId}`);
}
}
// Check the local Dokploy server
const localCancelled = cancelDeploymentJobs(
"dokploy-server",
undefined,
composeId,
);
totalCancelled += localCancelled;
// TODO: Also check remote servers if we need to track which servers have this compose
// For now, we only clean from the local server queue
console.log(`Cancelled ${totalCancelled} jobs for compose ${composeId}`);
return totalCancelled;
};
export { myQueue };
// Export queue status for monitoring
export const getQueueStatus = getDeploymentQueueStatus;
// New function to add jobs with user context (for API routes)
export const addJobWithUserContext = async (
jobData: DeploymentJob,
userId?: string,
): Promise<{ id: string }> => {
const serviceId = getServiceId(jobData);
const jobId = await addDeploymentJob(serviceId, jobData, userId);
console.log(
`Added deployment job ${jobId} to service ${serviceId} with user context ${userId || "none"}`,
);
return { id: jobId };
};

View File

@@ -1,8 +0,0 @@
import type { ConnectionOptions } from "bullmq";
export const redisConfig: ConnectionOptions = {
host:
process.env.NODE_ENV === "production"
? process.env.REDIS_HOST || "dokploy-redis"
: "127.0.0.1",
};

View File

@@ -0,0 +1,500 @@
import {
deployApplication,
deployCompose,
deployPreviewApplication,
deployRemoteApplication,
deployRemoteCompose,
deployRemotePreviewApplication,
findServerById,
rebuildApplication,
rebuildCompose,
rebuildRemoteApplication,
rebuildRemoteCompose,
updateApplicationStatus,
updateCompose,
updatePreviewDeployment,
} from "@dokploy/server";
import { db } from "@dokploy/server/db";
import { users_temp } from "@dokploy/server/db/schema";
import { eq } from "drizzle-orm";
import pLimit from "p-limit";
import type { DeploymentJob } from "./queue-types";
// Types for our p-limit based queue system
export interface QueueJob {
id: string;
data: DeploymentJob;
createdAt: Date;
status: "waiting" | "processing" | "completed" | "failed" | "cancelled";
abortController: AbortController;
promise?: Promise<void>;
}
export interface ServiceQueue {
serviceId: string;
jobs: QueueJob[];
limit: ReturnType<typeof pLimit>; // p-limit instance with concurrency 1
}
// Global queue management using p-limit
class ServiceQueueManager {
private queues: Map<string, ServiceQueue> = new Map();
private globalLimit: ReturnType<typeof pLimit>;
private isShuttingDown = false;
constructor(globalConcurrency = 3) {
// Global limit controls how many services can deploy simultaneously
this.globalLimit = pLimit(globalConcurrency);
this.setupShutdownHandlers();
}
// Set global concurrency (how many services can deploy simultaneously)
setGlobalConcurrency(concurrency: number) {
this.globalLimit = pLimit(concurrency);
}
// Get concurrency settings from database
private async getConcurrencySettings(jobData: DeploymentJob): Promise<{
serviceConcurrency: number;
}> {
try {
// Default: Each service processes 1 deployment at a time (FIFO within service)
let serviceConcurrency = 1;
// If it's a server deployment, get server-specific concurrency
// This controls how many deployments can run simultaneously ON THAT SERVER
if (jobData.serverId) {
try {
const serverData = await findServerById(jobData.serverId);
serviceConcurrency = serverData.concurrency || 1;
console.log(
`Server ${jobData.serverId} can handle ${serviceConcurrency} concurrent deployments`,
);
} catch (error) {
console.warn(
`Could not get server concurrency for ${jobData.serverId}, using default: 1`,
);
}
}
return {
serviceConcurrency,
};
} catch (error) {
console.warn(
"Error getting concurrency settings, using defaults:",
error,
);
return {
serviceConcurrency: 1,
};
}
}
// Get or create a queue for a service with dynamic concurrency
private async getOrCreateQueue(
serviceId: string,
jobData?: DeploymentJob,
): Promise<ServiceQueue> {
if (!this.queues.has(serviceId)) {
let serviceConcurrency = 1; // Default
// Get concurrency from database if we have job data
if (jobData) {
const settings = await this.getConcurrencySettings(jobData);
serviceConcurrency = settings.serviceConcurrency;
}
this.queues.set(serviceId, {
serviceId,
jobs: [],
// Service concurrency from database or default to 1
limit: pLimit(serviceConcurrency),
});
console.log(
`Created queue for service ${serviceId} with concurrency: ${serviceConcurrency}`,
);
}
return this.queues.get(serviceId)!;
}
// Add a job to a service queue
async addJob(
serviceId: string,
jobData: DeploymentJob,
userId?: string,
): Promise<string> {
if (this.isShuttingDown) {
throw new Error("Queue manager is shutting down");
}
// Update global concurrency based on user settings if provided
// This controls the TOTAL number of deployments across ALL services for this user
if (userId) {
try {
const userData = await db.query.users_temp.findFirst({
where: eq(users_temp.id, userId),
});
if (userData?.serverConcurrency) {
// This is GLOBAL concurrency - total deployments across all services
this.globalLimit = pLimit(userData.serverConcurrency);
console.log(
`Set GLOBAL concurrency to ${userData.serverConcurrency} deployments total for user ${userId}`,
);
}
} catch (error) {
console.warn(
`Could not get user concurrency settings for ${userId}:`,
error,
);
}
}
const queue = await this.getOrCreateQueue(serviceId, jobData);
const jobId = `${serviceId}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const job: QueueJob = {
id: jobId,
data: jobData,
createdAt: new Date(),
status: "waiting",
abortController: new AbortController(),
};
queue.jobs.push(job);
console.log(
`Added job ${jobId} to service ${serviceId} queue. Queue length: ${queue.jobs.length}`,
);
// Start processing the job using p-limit
this.processJob(queue, job);
return jobId;
}
// Process a job using both global and service-level p-limit
private processJob(queue: ServiceQueue, job: QueueJob) {
// Use global limit to control cross-service concurrency
job.promise = this.globalLimit(() =>
// Use service limit to ensure ordered processing within service
queue.limit(async () => {
if (job.status === "cancelled" || this.isShuttingDown) {
return;
}
job.status = "processing";
console.log(`Processing job ${job.id} for service ${queue.serviceId}`);
try {
await this.executeJob(job);
job.status = "completed";
console.log(`Completed job ${job.id} for service ${queue.serviceId}`);
} catch (error) {
if (job.abortController.signal.aborted) {
job.status = "cancelled";
console.log(
`Job ${job.id} was cancelled for service ${queue.serviceId}`,
);
} else {
job.status = "failed";
console.error(
`Job ${job.id} failed for service ${queue.serviceId}:`,
error,
);
}
} finally {
// Clean up completed/failed jobs after a delay
setTimeout(() => {
queue.jobs = queue.jobs.filter((j) => j.id !== job.id);
}, 5000);
}
}),
);
}
// Remove/cancel jobs for a specific service
cancelJobsByService(
serviceId: string,
applicationId?: string,
composeId?: string,
): number {
const queue = this.queues.get(serviceId);
if (!queue) return 0;
let cancelledCount = 0;
// Cancel waiting and processing jobs
for (const job of queue.jobs) {
if (job.status === "waiting" || job.status === "processing") {
// Check if this job matches the filter criteria
const matchesApplication = applicationId
? (job.data.applicationType === "application" ||
job.data.applicationType === "application-preview") &&
job.data.applicationId === applicationId
: true;
const matchesCompose = composeId
? job.data.applicationType === "compose" &&
job.data.composeId === composeId
: true;
if (matchesApplication && matchesCompose) {
job.status = "cancelled";
job.abortController.abort();
cancelledCount++;
console.log(`Cancelled job ${job.id} for service ${serviceId}`);
}
}
}
// Remove cancelled jobs from queue immediately
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
return cancelledCount;
}
// Get queue status for a service
getQueueStatus(serviceId: string) {
const queue = this.queues.get(serviceId);
if (!queue) return null;
return {
serviceId,
totalJobs: queue.jobs.length,
waitingJobs: queue.jobs.filter((j) => j.status === "waiting").length,
processingJobs: queue.jobs.filter((j) => j.status === "processing")
.length,
completedJobs: queue.jobs.filter((j) => j.status === "completed").length,
failedJobs: queue.jobs.filter((j) => j.status === "failed").length,
// p-limit queue status
activeCount: queue.limit.activeCount,
pendingCount: queue.limit.pendingCount,
};
}
// Get all queues status
getAllQueuesStatus() {
const status: Record<string, any> = {};
for (const [serviceId] of this.queues) {
status[serviceId] = this.getQueueStatus(serviceId);
}
status.global = {
activeCount: this.globalLimit.activeCount,
pendingCount: this.globalLimit.pendingCount,
concurrency: this.globalLimit.concurrency,
};
return status;
}
// Clear pending jobs from a service queue using p-limit's clearQueue
clearServiceQueue(serviceId: string) {
const queue = this.queues.get(serviceId);
if (queue) {
// Cancel all waiting jobs
for (const job of queue.jobs) {
if (job.status === "waiting") {
job.status = "cancelled";
job.abortController.abort();
}
}
// Clear p-limit's internal queue
queue.limit.clearQueue();
// Remove cancelled jobs
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
console.log(`Cleared service queue for ${serviceId}`);
}
}
private async executeJob(job: QueueJob): Promise<void> {
const { data } = job;
// Check if job was cancelled before execution
if (job.abortController.signal.aborted) {
throw new Error("Job was cancelled");
}
try {
if (data.applicationType === "application") {
await updateApplicationStatus(data.applicationId, "running");
if (data.server) {
if (data.type === "redeploy") {
await rebuildRemoteApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "deploy") {
await deployRemoteApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
} else {
if (data.type === "redeploy") {
await rebuildApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "deploy") {
await deployApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
}
} else if (data.applicationType === "compose") {
await updateCompose(data.composeId, {
composeStatus: "running",
});
if (data.server) {
if (data.type === "redeploy") {
await rebuildRemoteCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "deploy") {
await deployRemoteCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
} else {
if (data.type === "deploy") {
await deployCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
} else if (data.type === "redeploy") {
await rebuildCompose({
composeId: data.composeId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
});
}
}
} else if (data.applicationType === "application-preview") {
await updatePreviewDeployment(data.previewDeploymentId, {
previewStatus: "running",
});
if (data.server) {
if (data.type === "deploy") {
await deployRemotePreviewApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
previewDeploymentId: data.previewDeploymentId,
});
}
} else {
if (data.type === "deploy") {
await deployPreviewApplication({
applicationId: data.applicationId,
titleLog: data.titleLog,
descriptionLog: data.descriptionLog,
previewDeploymentId: data.previewDeploymentId,
});
}
}
}
} catch (error) {
console.log("Deployment Error", error);
throw error;
}
}
private setupShutdownHandlers() {
const gracefulShutdown = async () => {
console.log("Shutting down service queue manager...");
this.isShuttingDown = true;
// Cancel all jobs
for (const queue of this.queues.values()) {
for (const job of queue.jobs) {
job.abortController.abort();
}
// Clear p-limit queues
queue.limit.clearQueue();
}
// Clear global queue
this.globalLimit.clearQueue();
// Wait a bit for jobs to finish cancelling
await new Promise((resolve) => setTimeout(resolve, 2000));
process.exit(0);
};
process.on("SIGTERM", gracefulShutdown);
process.on("SIGINT", gracefulShutdown);
}
// Remove a specific service queue entirely
removeServiceQueue(serviceId: string) {
const queue = this.queues.get(serviceId);
if (queue) {
// Cancel all jobs in the queue
for (const job of queue.jobs) {
job.abortController.abort();
}
// Clear p-limit queue
queue.limit.clearQueue();
this.queues.delete(serviceId);
console.log(`Removed service queue for ${serviceId}`);
}
}
}
// Global instance
export const serviceQueueManager = new ServiceQueueManager();
// Helper functions to maintain compatibility with existing code
export const addDeploymentJob = async (
serviceId: string,
jobData: DeploymentJob,
userId?: string,
): Promise<string> => {
return await serviceQueueManager.addJob(serviceId, jobData, userId);
};
export const cancelDeploymentJobs = (
serviceId: string,
applicationId?: string,
composeId?: string,
): number => {
return serviceQueueManager.cancelJobsByService(
serviceId,
applicationId,
composeId,
);
};
export const getDeploymentQueueStatus = (serviceId?: string) => {
if (serviceId) {
return serviceQueueManager.getQueueStatus(serviceId);
}
return serviceQueueManager.getAllQueuesStatus();
};
export const setGlobalConcurrency = (concurrency: number) => {
serviceQueueManager.setGlobalConcurrency(concurrency);
};
export const removeServiceQueue = (serviceId: string) => {
serviceQueueManager.removeServiceQueue(serviceId);
};
export const clearServiceQueue = (serviceId: string) => {
serviceQueueManager.clearServiceQueue(serviceId);
};

View File

@@ -111,4 +111,4 @@
"node": "^20.16.0",
"pnpm": ">=9.12.0"
}
}
}

View File

@@ -1,14 +0,0 @@
import { defineConfig } from "drizzle-kit";
export default defineConfig({
schema: "./server/db/schema/index.ts",
dialect: "postgresql",
dbCredentials: {
url: process.env.DATABASE_URL!,
},
out: "drizzle",
migrations: {
table: "migrations",
schema: "public",
},
});

View File

@@ -1,21 +0,0 @@
// import { drizzle } from "drizzle-orm/postgres-js";
// import { migrate } from "drizzle-orm/postgres-js/migrator";
// import postgres from "postgres";
// const connectionString = process.env.DATABASE_URL!;
// const sql = postgres(connectionString, { max: 1 });
// const db = drizzle(sql);
// export const migration = async () =>
// await migrate(db, { migrationsFolder: "drizzle" })
// .then(() => {
// console.log("Migration complete");
// sql.end();
// })
// .catch((error) => {
// console.log("Migration failed", error);
// })
// .finally(() => {
// sql.end();
// });

View File

@@ -1,23 +0,0 @@
import { sql } from "drizzle-orm";
// Credits to Louistiti from Drizzle Discord: https://discord.com/channels/1043890932593987624/1130802621750448160/1143083373535973406
import { drizzle } from "drizzle-orm/postgres-js";
import postgres from "postgres";
const connectionString = process.env.DATABASE_URL!;
const pg = postgres(connectionString, { max: 1 });
const db = drizzle(pg);
const clearDb = async (): Promise<void> => {
try {
const tablesQuery = sql<string>`DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP schema drizzle CASCADE;`;
const tables = await db.execute(tablesQuery);
console.log(tables);
await pg.end();
} catch (error) {
console.error("Error cleaning database", error);
} finally {
}
};
clearDb();

View File

@@ -48,6 +48,7 @@ export const server = pgTable("server", {
sshKeyId: text("sshKeyId").references(() => sshKeys.sshKeyId, {
onDelete: "set null",
}),
concurrency: integer("concurrency").notNull().default(1),
metricsConfig: jsonb("metricsConfig")
.$type<{
server: {

View File

@@ -62,6 +62,7 @@ export const users_temp = pgTable("user_temp", {
// Metrics
enablePaidFeatures: boolean("enablePaidFeatures").notNull().default(false),
allowImpersonation: boolean("allowImpersonation").notNull().default(false),
serverConcurrency: integer("serverConcurrency").notNull().default(1),
metricsConfig: jsonb("metricsConfig")
.$type<{
server: {

View File

@@ -1,35 +0,0 @@
// import bc from "bcrypt";
// import { drizzle } from "drizzle-orm/postgres-js";
// import postgres from "postgres";
// import { users } from "./schema";
// const connectionString = process.env.DATABASE_URL!;
// const pg = postgres(connectionString, { max: 1 });
// const db = drizzle(pg);
// function password(txt: string) {
// return bc.hashSync(txt, 10);
// }
// async function seed() {
// console.log("> Seed:", process.env.DATABASE_PATH, "\n");
// // const authenticationR = await db
// // .insert(users)
// // .values([
// // {
// // email: "user1@hotmail.com",
// // password: password("12345671"),
// // },
// // ])
// // .onConflictDoNothing()
// // .returning();
// // console.log("\nSemillas Update:", authenticationR.length);
// }
// seed().catch((e) => {
// console.error(e);
// process.exit(1);
// });

14
pnpm-lock.yaml generated
View File

@@ -280,9 +280,6 @@ importers:
boxen:
specifier: ^7.1.1
version: 7.1.1
bullmq:
specifier: 5.4.2
version: 5.4.2
class-variance-authority:
specifier: ^0.7.1
version: 0.7.1
@@ -367,6 +364,9 @@ importers:
otpauth:
specifier: ^9.4.0
version: 9.4.0
p-limit:
specifier: ^7.1.1
version: 7.1.1
pino:
specifier: 9.4.0
version: 9.4.0
@@ -6412,6 +6412,10 @@ packages:
resolution: {integrity: sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==}
engines: {node: '>=18'}
p-limit@7.1.1:
resolution: {integrity: sha512-i8PyM2JnsNChVSYWLr2BAjNoLi0BAYC+wecOnZnVV+YSNJkzP7cWmvI34dk0WArWfH9KwBHNoZI3P3MppImlIA==}
engines: {node: '>=20'}
p-locate@4.1.0:
resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
engines: {node: '>=8'}
@@ -13869,6 +13873,10 @@ snapshots:
dependencies:
yocto-queue: 1.2.1
p-limit@7.1.1:
dependencies:
yocto-queue: 1.2.1
p-locate@4.1.0:
dependencies:
p-limit: 2.3.0