mirror of
https://github.com/Dokploy/dokploy.git
synced 2026-06-18 13:45:23 +02:00
Compare commits
18 Commits
2451-githu
...
feat/concu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2da2b2dd39 | ||
|
|
7273c636a0 | ||
|
|
d6a0585bae | ||
|
|
935d1686f2 | ||
|
|
349248105a | ||
|
|
d922568510 | ||
|
|
44ae4df151 | ||
|
|
77fdda4c09 | ||
|
|
8a1e36cc3b | ||
|
|
1635bab44f | ||
|
|
4a52459015 | ||
|
|
17f333ac2a | ||
|
|
d770307d64 | ||
|
|
aa434cbdea | ||
|
|
c42054b965 | ||
|
|
03588bf375 | ||
|
|
8c420ff4f5 | ||
|
|
cbf6f95891 |
2
.github/workflows/dokploy.yml
vendored
2
.github/workflows/dokploy.yml
vendored
@@ -2,7 +2,7 @@ name: Dokploy Docker Build
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [main, canary]
|
||||
branches: [main, canary, "fix/re-apply-database-migration-fix"]
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
|
||||
@@ -79,7 +79,7 @@ export const ShowGeneralApplication = ({ applicationId }: Props) => {
|
||||
>
|
||||
<Button
|
||||
variant="default"
|
||||
isLoading={data?.applicationStatus === "running"}
|
||||
// isLoading={data?.applicationStatus === "running"}
|
||||
className="flex items-center gap-1.5 group focus-visible:ring-2 focus-visible:ring-offset-2"
|
||||
>
|
||||
<Tooltip>
|
||||
|
||||
@@ -11,11 +11,20 @@ interface Props {
|
||||
export const DashboardLayout = ({ children }: Props) => {
|
||||
const { data: haveRootAccess } = api.user.haveRootAccess.useQuery();
|
||||
const { data: isCloud } = api.settings.isCloud.useQuery();
|
||||
const { data: isUserSubscribed } = api.settings.isUserSubscribed.useQuery(
|
||||
undefined,
|
||||
{
|
||||
enabled: isCloud === true,
|
||||
refetchOnWindowFocus: false,
|
||||
refetchOnMount: false,
|
||||
refetchOnReconnect: false,
|
||||
},
|
||||
);
|
||||
|
||||
return (
|
||||
<>
|
||||
<Page>{children}</Page>
|
||||
{isCloud === true && (
|
||||
{isCloud === true && isUserSubscribed === true && (
|
||||
<ChatwootWidget websiteToken="USCpQRKzHvFMssf3p6Eacae5" />
|
||||
)}
|
||||
|
||||
|
||||
2
apps/dokploy/drizzle/0107_calm_power_pack.sql
Normal file
2
apps/dokploy/drizzle/0107_calm_power_pack.sql
Normal file
@@ -0,0 +1,2 @@
|
||||
ALTER TABLE "user_temp" ADD COLUMN "serverConcurrency" integer DEFAULT 1 NOT NULL;--> statement-breakpoint
|
||||
ALTER TABLE "server" ADD COLUMN "concurrency" integer DEFAULT 1 NOT NULL;
|
||||
6438
apps/dokploy/drizzle/meta/0107_snapshot.json
Normal file
6438
apps/dokploy/drizzle/meta/0107_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -750,6 +750,13 @@
|
||||
"when": 1754912062243,
|
||||
"tag": "0106_purple_maggott",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 107,
|
||||
"version": "7",
|
||||
"when": 1756436825081,
|
||||
"tag": "0107_calm_power_pack",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -7,6 +7,10 @@ function prepareDefine(config: DotenvParseOutput | undefined) {
|
||||
const define = {};
|
||||
// @ts-ignore
|
||||
for (const [key, value] of Object.entries(config)) {
|
||||
// Skip DATABASE_URL to allow runtime environment variable override
|
||||
if (key === "DATABASE_URL") {
|
||||
continue;
|
||||
}
|
||||
// @ts-ignore
|
||||
define[`process.env.${key}`] = JSON.stringify(value);
|
||||
}
|
||||
@@ -14,6 +18,7 @@ function prepareDefine(config: DotenvParseOutput | undefined) {
|
||||
}
|
||||
|
||||
const define = prepareDefine(result.parsed);
|
||||
|
||||
try {
|
||||
esbuild
|
||||
.build({
|
||||
|
||||
@@ -1,149 +0,0 @@
|
||||
// import { drizzle } from "drizzle-orm/postgres-js";
|
||||
// import { nanoid } from "nanoid";
|
||||
// import postgres from "postgres";
|
||||
// import * as schema from "./server/db/schema";
|
||||
|
||||
// const connectionString = process.env.DATABASE_URL!;
|
||||
|
||||
// const sql = postgres(connectionString, { max: 1 });
|
||||
// const db = drizzle(sql, {
|
||||
// schema,
|
||||
// });
|
||||
|
||||
// await db
|
||||
// .transaction(async (db) => {
|
||||
// const admins = await db.query.admins.findMany({
|
||||
// with: {
|
||||
// auth: true,
|
||||
// users: {
|
||||
// with: {
|
||||
// auth: true,
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// });
|
||||
// for (const admin of admins) {
|
||||
// const user = await db
|
||||
// .insert(schema.users_temp)
|
||||
// .values({
|
||||
// id: admin.adminId,
|
||||
// email: admin.auth.email,
|
||||
// token: admin.auth.token || "",
|
||||
// emailVerified: true,
|
||||
// updatedAt: new Date(),
|
||||
// role: "admin",
|
||||
// serverIp: admin.serverIp,
|
||||
// image: admin.auth.image,
|
||||
// certificateType: admin.certificateType,
|
||||
// host: admin.host,
|
||||
// letsEncryptEmail: admin.letsEncryptEmail,
|
||||
// sshPrivateKey: admin.sshPrivateKey,
|
||||
// enableDockerCleanup: admin.enableDockerCleanup,
|
||||
// enableLogRotation: admin.enableLogRotation,
|
||||
// enablePaidFeatures: admin.enablePaidFeatures,
|
||||
// metricsConfig: admin.metricsConfig,
|
||||
// cleanupCacheApplications: admin.cleanupCacheApplications,
|
||||
// cleanupCacheOnPreviews: admin.cleanupCacheOnPreviews,
|
||||
// cleanupCacheOnCompose: admin.cleanupCacheOnCompose,
|
||||
// stripeCustomerId: admin.stripeCustomerId,
|
||||
// stripeSubscriptionId: admin.stripeSubscriptionId,
|
||||
// serversQuantity: admin.serversQuantity,
|
||||
// })
|
||||
// .returning()
|
||||
// .then((user) => user[0]);
|
||||
|
||||
// await db.insert(schema.account).values({
|
||||
// providerId: "credential",
|
||||
// userId: user?.id || "",
|
||||
// password: admin.auth.password,
|
||||
// is2FAEnabled: admin.auth.is2FAEnabled || false,
|
||||
// createdAt: new Date(admin.auth.createdAt) || new Date(),
|
||||
// updatedAt: new Date(admin.auth.createdAt) || new Date(),
|
||||
// });
|
||||
|
||||
// const organization = await db
|
||||
// .insert(schema.organization)
|
||||
// .values({
|
||||
// name: "My Organization",
|
||||
// slug: nanoid(),
|
||||
// ownerId: user?.id || "",
|
||||
// createdAt: new Date(admin.createdAt) || new Date(),
|
||||
// })
|
||||
// .returning()
|
||||
// .then((organization) => organization[0]);
|
||||
|
||||
// for (const member of admin.users) {
|
||||
// const userTemp = await db
|
||||
// .insert(schema.users_temp)
|
||||
// .values({
|
||||
// id: member.userId,
|
||||
// email: member.auth.email,
|
||||
// token: member.token || "",
|
||||
// emailVerified: true,
|
||||
// updatedAt: new Date(admin.createdAt) || new Date(),
|
||||
// role: "user",
|
||||
// image: member.auth.image,
|
||||
// createdAt: admin.createdAt,
|
||||
// canAccessToAPI: member.canAccessToAPI || false,
|
||||
// canAccessToDocker: member.canAccessToDocker || false,
|
||||
// canAccessToGitProviders: member.canAccessToGitProviders || false,
|
||||
// canAccessToSSHKeys: member.canAccessToSSHKeys || false,
|
||||
// canAccessToTraefikFiles: member.canAccessToTraefikFiles || false,
|
||||
// canCreateProjects: member.canCreateProjects || false,
|
||||
// canCreateServices: member.canCreateServices || false,
|
||||
// canDeleteProjects: member.canDeleteProjects || false,
|
||||
// canDeleteServices: member.canDeleteServices || false,
|
||||
// accessedProjects: member.accessedProjects || [],
|
||||
// accessedServices: member.accessedServices || [],
|
||||
// })
|
||||
// .returning()
|
||||
// .then((userTemp) => userTemp[0]);
|
||||
|
||||
// await db.insert(schema.account).values({
|
||||
// providerId: "credential",
|
||||
// userId: member?.userId || "",
|
||||
// password: member.auth.password,
|
||||
// is2FAEnabled: member.auth.is2FAEnabled || false,
|
||||
// createdAt: new Date(member.auth.createdAt) || new Date(),
|
||||
// updatedAt: new Date(member.auth.createdAt) || new Date(),
|
||||
// });
|
||||
|
||||
// await db.insert(schema.member).values({
|
||||
// organizationId: organization?.id || "",
|
||||
// userId: userTemp?.id || "",
|
||||
// role: "admin",
|
||||
// createdAt: new Date(member.createdAt) || new Date(),
|
||||
// });
|
||||
// }
|
||||
// }
|
||||
// })
|
||||
// .then(() => {
|
||||
// console.log("Migration finished");
|
||||
// })
|
||||
// .catch((error) => {
|
||||
// console.error(error);
|
||||
// });
|
||||
|
||||
// await db
|
||||
// .transaction(async (db) => {
|
||||
// const projects = await db.query.projects.findMany({
|
||||
// with: {
|
||||
// user: {
|
||||
// with: {
|
||||
// organizations: true,
|
||||
// },
|
||||
// },
|
||||
// },
|
||||
// });
|
||||
// for (const project of projects) {
|
||||
// const _user = await db.update(schema.projects).set({
|
||||
// organizationId: project.user.organizations[0]?.id || "",
|
||||
// });
|
||||
// }
|
||||
// })
|
||||
// .then(() => {
|
||||
// console.log("Migration finished");
|
||||
// })
|
||||
// .catch((error) => {
|
||||
// console.error(error);
|
||||
// });
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "dokploy",
|
||||
"version": "v0.24.12",
|
||||
"version": "v0.25.0",
|
||||
"private": true,
|
||||
"license": "Apache-2.0",
|
||||
"type": "module",
|
||||
@@ -97,7 +97,6 @@
|
||||
"better-auth": "v1.2.8-beta.7",
|
||||
"bl": "6.0.11",
|
||||
"boxen": "^7.1.1",
|
||||
"bullmq": "5.4.2",
|
||||
"class-variance-authority": "^0.7.1",
|
||||
"clsx": "^2.1.1",
|
||||
"cmdk": "^0.2.1",
|
||||
@@ -126,6 +125,7 @@
|
||||
"nodemailer": "6.9.14",
|
||||
"octokit": "3.1.2",
|
||||
"otpauth": "^9.4.0",
|
||||
"p-limit": "^7.1.1",
|
||||
"pino": "9.4.0",
|
||||
"pino-pretty": "11.2.2",
|
||||
"postgres": "3.4.4",
|
||||
|
||||
@@ -54,7 +54,11 @@ import {
|
||||
applications,
|
||||
} from "@/server/db/schema";
|
||||
import type { DeploymentJob } from "@/server/queues/queue-types";
|
||||
import { cleanQueuesByApplication, myQueue } from "@/server/queues/queueSetup";
|
||||
import {
|
||||
addJobWithUserContext,
|
||||
cleanQueuesByApplication,
|
||||
myQueue,
|
||||
} from "@/server/queues/queueSetup";
|
||||
import { deploy } from "@/server/utils/deploy";
|
||||
import { uploadFileSchema } from "@/utils/schema";
|
||||
|
||||
@@ -668,14 +672,7 @@ export const applicationRouter = createTRPCRouter({
|
||||
|
||||
return true;
|
||||
}
|
||||
await myQueue.add(
|
||||
"deployments",
|
||||
{ ...jobData },
|
||||
{
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
},
|
||||
);
|
||||
await addJobWithUserContext({ ...jobData }, ctx.user.id);
|
||||
}),
|
||||
|
||||
cleanQueues: protectedProcedure
|
||||
|
||||
@@ -80,7 +80,7 @@ export const redisRouter = createTRPCRouter({
|
||||
type: "volume",
|
||||
});
|
||||
|
||||
return true;
|
||||
return newRedis;
|
||||
} catch (error) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ import {
|
||||
} from "@dokploy/server";
|
||||
import { generateOpenApiDocument } from "@dokploy/trpc-openapi";
|
||||
import { TRPCError } from "@trpc/server";
|
||||
import { sql } from "drizzle-orm";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import { dump, load } from "js-yaml";
|
||||
import { scheduledJobs, scheduleJob } from "node-schedule";
|
||||
import { z } from "zod";
|
||||
@@ -60,6 +60,8 @@ import {
|
||||
apiServerSchema,
|
||||
apiTraefikConfig,
|
||||
apiUpdateDockerCleanup,
|
||||
projects,
|
||||
server,
|
||||
} from "@/server/db/schema";
|
||||
import { removeJob, schedule } from "@/server/utils/backup";
|
||||
import packageInfo from "../../../package.json";
|
||||
@@ -706,6 +708,18 @@ export const settingsRouter = createTRPCRouter({
|
||||
isCloud: publicProcedure.query(async () => {
|
||||
return IS_CLOUD;
|
||||
}),
|
||||
isUserSubscribed: protectedProcedure.query(async ({ ctx }) => {
|
||||
const haveServers = await db.query.server.findMany({
|
||||
where: eq(server.organizationId, ctx.session?.activeOrganizationId || ""),
|
||||
});
|
||||
const haveProjects = await db.query.projects.findMany({
|
||||
where: eq(
|
||||
projects.organizationId,
|
||||
ctx.session?.activeOrganizationId || "",
|
||||
),
|
||||
});
|
||||
return haveServers.length > 0 || haveProjects.length > 0;
|
||||
}),
|
||||
health: publicProcedure.query(async () => {
|
||||
if (IS_CLOUD) {
|
||||
try {
|
||||
|
||||
@@ -6,14 +6,18 @@ declare global {
|
||||
var db: PostgresJsDatabase<typeof schema> | undefined;
|
||||
}
|
||||
|
||||
const dbUrl =
|
||||
process.env.DATABASE_URL ||
|
||||
"postgres://dokploy:amukds4wi9001583845717ad2@dokploy-postgres:5432/dokploy";
|
||||
|
||||
export let db: PostgresJsDatabase<typeof schema>;
|
||||
if (process.env.NODE_ENV === "production") {
|
||||
db = drizzle(postgres(process.env.DATABASE_URL!), {
|
||||
db = drizzle(postgres(dbUrl!), {
|
||||
schema,
|
||||
});
|
||||
} else {
|
||||
if (!global.db)
|
||||
global.db = drizzle(postgres(process.env.DATABASE_URL!), {
|
||||
global.db = drizzle(postgres(dbUrl!), {
|
||||
schema,
|
||||
});
|
||||
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
import { drizzle } from "drizzle-orm/postgres-js";
|
||||
import postgres from "postgres";
|
||||
|
||||
const connectionString = process.env.DATABASE_URL!;
|
||||
|
||||
const pg = postgres(connectionString, { max: 1 });
|
||||
const _db = drizzle(pg);
|
||||
|
||||
async function seed() {
|
||||
console.log("> Seed:", process.env.DATABASE_PATH, "\n");
|
||||
|
||||
// const authenticationR = await db
|
||||
// .insert(users)
|
||||
// .values([
|
||||
// {
|
||||
// email: "user1@hotmail.com",
|
||||
// password: password("12345671"),
|
||||
// },
|
||||
// ])
|
||||
// .onConflictDoNothing()
|
||||
// .returning();
|
||||
|
||||
// console.log("\nSemillas Update:", authenticationR.length);
|
||||
}
|
||||
|
||||
seed().catch((e) => {
|
||||
console.error(e);
|
||||
process.exit(1);
|
||||
});
|
||||
104
apps/dokploy/server/queues/README.md
Normal file
104
apps/dokploy/server/queues/README.md
Normal file
@@ -0,0 +1,104 @@
|
||||
# Queue System Migration - BullMQ to p-limit
|
||||
|
||||
This directory contains the new queue system that replaces BullMQ with [p-limit](https://github.com/sindresorhus/p-limit) for deployment queues.
|
||||
|
||||
## Why the Migration?
|
||||
|
||||
- **Resource Issues**: Users experienced freezing during builds due to resource constraints
|
||||
- **Cancellation Problems**: BullMQ workers couldn't be properly canceled when Docker processes restart
|
||||
- **Retry Loops**: Unwanted automatic retries when processes are killed
|
||||
|
||||
## New Architecture
|
||||
|
||||
### Key Features
|
||||
|
||||
1. **Per-Server Queues**: Deployments are grouped by server (local "dokploy-server" or remote servers)
|
||||
2. **Ordered Processing**: Within each server, deployments are processed based on server concurrency settings
|
||||
3. **Global User Concurrency**: User's `serverConcurrency` controls total deployments across all servers
|
||||
4. **Proper Cancellation**: Jobs can be canceled using AbortController
|
||||
5. **No Redis Dependency**: In-memory queues eliminate Redis dependency issues
|
||||
|
||||
### Files
|
||||
|
||||
- `service-queue.ts` - New p-limit based queue implementation
|
||||
- `queueSetup.ts` - Compatibility layer for existing code
|
||||
- `deployments-queue.ts` - Legacy compatibility exports
|
||||
- `queue-types.ts` - Shared type definitions
|
||||
|
||||
## Usage Examples
|
||||
|
||||
```typescript
|
||||
import { addJobWithUserContext, cancelDeploymentJobs, getDeploymentQueueStatus } from './queueSetup';
|
||||
|
||||
// Add a deployment job with user context (recommended for API routes)
|
||||
const result = await addJobWithUserContext({
|
||||
applicationType: 'application',
|
||||
applicationId: '123',
|
||||
type: 'deploy',
|
||||
titleLog: 'Deploying app',
|
||||
descriptionLog: 'Starting deployment',
|
||||
serverId: 'server-456' // Optional - for remote deployments
|
||||
}, 'user-id-789'); // User ID for concurrency settings
|
||||
|
||||
// Cancel jobs for a service
|
||||
const cancelled = cancelDeploymentJobs('app-123');
|
||||
|
||||
// Get queue status
|
||||
const status = getDeploymentQueueStatus('app-123');
|
||||
```
|
||||
|
||||
### Database-Driven Concurrency
|
||||
|
||||
The system now automatically reads concurrency settings from the database:
|
||||
|
||||
1. **Global User Concurrency**: From `users_temp.serverConcurrency` field
|
||||
- Controls the **TOTAL** number of deployments that can run simultaneously for a user
|
||||
- Example: If `serverConcurrency = 1`, only 1 deployment across ALL services at a time
|
||||
- Example: If `serverConcurrency = 3`, maximum 3 deployments can run simultaneously across all services
|
||||
|
||||
2. **Server Concurrency**: From `server.concurrency` field
|
||||
- Controls how many deployments can run simultaneously **on a specific server**
|
||||
- Only applies when deploying to remote servers (`serverId` is present)
|
||||
- Example: Server A can handle 2 concurrent deployments, Server B can handle 1
|
||||
|
||||
### Concurrency Hierarchy
|
||||
|
||||
```
|
||||
User Global Limit (users_temp.serverConcurrency)
|
||||
├── dokploy-server (local deployments)
|
||||
│ ├── App A deployment
|
||||
│ ├── App B deployment
|
||||
│ └── Compose C deployment
|
||||
├── remote-server-1 (server.concurrency = 2)
|
||||
│ ├── App D deployment
|
||||
│ └── App E deployment
|
||||
└── remote-server-2 (server.concurrency = 1)
|
||||
└── App F deployment
|
||||
```
|
||||
|
||||
**Example Scenarios:**
|
||||
|
||||
- **User has `serverConcurrency = 1`**: Only 1 deployment total across ALL servers
|
||||
- **User has `serverConcurrency = 3`**: Maximum 3 deployments simultaneously across all servers
|
||||
- **Local server**: All local apps/compose share the "dokploy-server" queue
|
||||
- **Remote server with `concurrency = 2`**: That server can handle up to 2 concurrent deployments
|
||||
- **Queue grouping**: `app-123` and `app-456` on same server share the same queue
|
||||
|
||||
## Configuration
|
||||
|
||||
- **Global Concurrency**: Set how many services can deploy simultaneously
|
||||
- **Service Concurrency**: Each service processes 1 deployment at a time (FIFO)
|
||||
|
||||
```typescript
|
||||
import { setGlobalConcurrency } from './service-queue';
|
||||
|
||||
// Allow 5 services to deploy simultaneously
|
||||
setGlobalConcurrency(5);
|
||||
```
|
||||
|
||||
## Migration Notes
|
||||
|
||||
- The schedules app still uses BullMQ for cron/repeatable jobs (different use case)
|
||||
- All existing API endpoints work unchanged due to compatibility layer
|
||||
- No breaking changes to existing functionality
|
||||
- Improved resource usage and cancellation capabilities
|
||||
@@ -1,122 +1,58 @@
|
||||
import {
|
||||
deployApplication,
|
||||
deployCompose,
|
||||
deployPreviewApplication,
|
||||
deployRemoteApplication,
|
||||
deployRemoteCompose,
|
||||
deployRemotePreviewApplication,
|
||||
rebuildApplication,
|
||||
rebuildCompose,
|
||||
rebuildRemoteApplication,
|
||||
rebuildRemoteCompose,
|
||||
updateApplicationStatus,
|
||||
updateCompose,
|
||||
updatePreviewDeployment,
|
||||
} from "@dokploy/server";
|
||||
import { type Job, Worker } from "bullmq";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
// This file is kept for backward compatibility but now uses the new service-queue system
|
||||
// The actual queue logic has been moved to service-queue.ts using p-limit
|
||||
|
||||
export const deploymentWorker = new Worker(
|
||||
"deployments",
|
||||
async (job: Job<DeploymentJob>) => {
|
||||
try {
|
||||
if (job.data.applicationType === "application") {
|
||||
await updateApplicationStatus(job.data.applicationId, "running");
|
||||
import { serviceQueueManager } from "./service-queue";
|
||||
|
||||
if (job.data.server) {
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildRemoteApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployRemoteApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (job.data.applicationType === "compose") {
|
||||
await updateCompose(job.data.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
|
||||
if (job.data.server) {
|
||||
if (job.data.type === "redeploy") {
|
||||
await rebuildRemoteCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "deploy") {
|
||||
await deployRemoteCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (job.data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
} else if (job.data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: job.data.composeId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (job.data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(job.data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
if (job.data.server) {
|
||||
if (job.data.type === "deploy") {
|
||||
await deployRemotePreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (job.data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: job.data.applicationId,
|
||||
titleLog: job.data.titleLog,
|
||||
descriptionLog: job.data.descriptionLog,
|
||||
previewDeploymentId: job.data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Error", error);
|
||||
}
|
||||
// Legacy compatibility - this is no longer used but kept to avoid breaking imports
|
||||
export const deploymentWorker = {
|
||||
run: async () => {
|
||||
console.log(
|
||||
"Legacy deploymentWorker.run() called - now using service-queue system",
|
||||
);
|
||||
// The service queue manager starts automatically, no need to do anything
|
||||
return Promise.resolve();
|
||||
},
|
||||
{
|
||||
autorun: false,
|
||||
connection: redisConfig,
|
||||
close: async () => {
|
||||
console.log("Legacy deploymentWorker.close() called");
|
||||
return Promise.resolve();
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
// Legacy exports for backward compatibility
|
||||
export const getWorkersMap = () => {
|
||||
console.warn(
|
||||
"getWorkersMap() is deprecated - use serviceQueueManager instead",
|
||||
);
|
||||
return {};
|
||||
};
|
||||
|
||||
export const getWorker = (_serverId?: string) => {
|
||||
console.warn("getWorker() is deprecated - use serviceQueueManager instead");
|
||||
return undefined;
|
||||
};
|
||||
|
||||
export const createDeploymentWorker = (defaultConcurrency = 1) => {
|
||||
console.warn(
|
||||
"createDeploymentWorker() is deprecated - use serviceQueueManager instead",
|
||||
);
|
||||
serviceQueueManager.setGlobalConcurrency(defaultConcurrency);
|
||||
return deploymentWorker;
|
||||
};
|
||||
|
||||
export const createServerDeploymentWorker = (
|
||||
_serverId: string,
|
||||
_concurrency = 1,
|
||||
) => {
|
||||
console.warn(
|
||||
"createServerDeploymentWorker() is deprecated - use serviceQueueManager instead",
|
||||
);
|
||||
// The new system automatically creates queues per service, no need for explicit worker creation
|
||||
return deploymentWorker;
|
||||
};
|
||||
|
||||
export const removeServerDeploymentWorker = (serverId: string) => {
|
||||
console.warn(
|
||||
"removeServerDeploymentWorker() is deprecated - use removeServiceQueue instead",
|
||||
);
|
||||
serviceQueueManager.removeServiceQueue(serverId);
|
||||
};
|
||||
|
||||
@@ -1,44 +1,101 @@
|
||||
import { Queue } from "bullmq";
|
||||
import { redisConfig } from "./redis-connection";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
import {
|
||||
addDeploymentJob,
|
||||
cancelDeploymentJobs,
|
||||
getDeploymentQueueStatus,
|
||||
setGlobalConcurrency,
|
||||
} from "./service-queue";
|
||||
|
||||
const myQueue = new Queue("deployments", {
|
||||
connection: redisConfig,
|
||||
});
|
||||
// Default queue name for local deployments
|
||||
export const DEFAULT_QUEUE = "default";
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
myQueue.close();
|
||||
process.exit(0);
|
||||
});
|
||||
// Initialize with default concurrency of 3 services
|
||||
setGlobalConcurrency(3);
|
||||
|
||||
myQueue.on("error", (error) => {
|
||||
if ((error as any).code === "ECONNREFUSED") {
|
||||
console.error(
|
||||
"Make sure you have installed Redis and it is running.",
|
||||
error,
|
||||
);
|
||||
// Helper function to determine service ID from job data
|
||||
// Groups deployments by SERVER, not by individual application/compose
|
||||
const getServiceId = (jobData: DeploymentJob): string => {
|
||||
// If it has a serverId, group by that server
|
||||
if (jobData.serverId) {
|
||||
return jobData.serverId;
|
||||
}
|
||||
});
|
||||
|
||||
// For local deployments (no serverId), group all under the main Dokploy server
|
||||
return "dokploy-server";
|
||||
};
|
||||
|
||||
// Compatibility functions to replace BullMQ usage
|
||||
export const myQueue = {
|
||||
add: async (
|
||||
_name: string,
|
||||
jobData: DeploymentJob,
|
||||
_options?: any,
|
||||
userId?: string,
|
||||
) => {
|
||||
const serviceId = getServiceId(jobData);
|
||||
const jobId = await addDeploymentJob(serviceId, jobData, userId);
|
||||
console.log(`Added deployment job ${jobId} to service ${serviceId}`);
|
||||
return { id: jobId };
|
||||
},
|
||||
|
||||
close: () => {
|
||||
console.log("Service queue manager shutdown initiated");
|
||||
return Promise.resolve();
|
||||
},
|
||||
};
|
||||
|
||||
export const cleanQueuesByApplication = async (applicationId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
// Cancel jobs for this specific application across all servers
|
||||
let totalCancelled = 0;
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.applicationId === applicationId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for application ${applicationId}`);
|
||||
}
|
||||
}
|
||||
// Check the local Dokploy server
|
||||
const localCancelled = cancelDeploymentJobs(
|
||||
"dokploy-server",
|
||||
applicationId,
|
||||
undefined,
|
||||
);
|
||||
totalCancelled += localCancelled;
|
||||
|
||||
// TODO: Also check remote servers if we need to track which servers have this application
|
||||
// For now, we only clean from the local server queue
|
||||
|
||||
console.log(
|
||||
`Cancelled ${totalCancelled} jobs for application ${applicationId}`,
|
||||
);
|
||||
return totalCancelled;
|
||||
};
|
||||
|
||||
export const cleanQueuesByCompose = async (composeId: string) => {
|
||||
const jobs = await myQueue.getJobs(["waiting", "delayed"]);
|
||||
// Cancel jobs for this specific compose across all servers
|
||||
let totalCancelled = 0;
|
||||
|
||||
for (const job of jobs) {
|
||||
if (job?.data?.composeId === composeId) {
|
||||
await job.remove();
|
||||
console.log(`Removed job ${job.id} for compose ${composeId}`);
|
||||
}
|
||||
}
|
||||
// Check the local Dokploy server
|
||||
const localCancelled = cancelDeploymentJobs(
|
||||
"dokploy-server",
|
||||
undefined,
|
||||
composeId,
|
||||
);
|
||||
totalCancelled += localCancelled;
|
||||
|
||||
// TODO: Also check remote servers if we need to track which servers have this compose
|
||||
// For now, we only clean from the local server queue
|
||||
|
||||
console.log(`Cancelled ${totalCancelled} jobs for compose ${composeId}`);
|
||||
return totalCancelled;
|
||||
};
|
||||
|
||||
export { myQueue };
|
||||
// Export queue status for monitoring
|
||||
export const getQueueStatus = getDeploymentQueueStatus;
|
||||
|
||||
// New function to add jobs with user context (for API routes)
|
||||
export const addJobWithUserContext = async (
|
||||
jobData: DeploymentJob,
|
||||
userId?: string,
|
||||
): Promise<{ id: string }> => {
|
||||
const serviceId = getServiceId(jobData);
|
||||
const jobId = await addDeploymentJob(serviceId, jobData, userId);
|
||||
console.log(
|
||||
`Added deployment job ${jobId} to service ${serviceId} with user context ${userId || "none"}`,
|
||||
);
|
||||
return { id: jobId };
|
||||
};
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
import type { ConnectionOptions } from "bullmq";
|
||||
|
||||
export const redisConfig: ConnectionOptions = {
|
||||
host:
|
||||
process.env.NODE_ENV === "production"
|
||||
? process.env.REDIS_HOST || "dokploy-redis"
|
||||
: "127.0.0.1",
|
||||
};
|
||||
500
apps/dokploy/server/queues/service-queue.ts
Normal file
500
apps/dokploy/server/queues/service-queue.ts
Normal file
@@ -0,0 +1,500 @@
|
||||
import {
|
||||
deployApplication,
|
||||
deployCompose,
|
||||
deployPreviewApplication,
|
||||
deployRemoteApplication,
|
||||
deployRemoteCompose,
|
||||
deployRemotePreviewApplication,
|
||||
findServerById,
|
||||
rebuildApplication,
|
||||
rebuildCompose,
|
||||
rebuildRemoteApplication,
|
||||
rebuildRemoteCompose,
|
||||
updateApplicationStatus,
|
||||
updateCompose,
|
||||
updatePreviewDeployment,
|
||||
} from "@dokploy/server";
|
||||
import { db } from "@dokploy/server/db";
|
||||
import { users_temp } from "@dokploy/server/db/schema";
|
||||
import { eq } from "drizzle-orm";
|
||||
import pLimit from "p-limit";
|
||||
import type { DeploymentJob } from "./queue-types";
|
||||
|
||||
// Types for our p-limit based queue system
|
||||
export interface QueueJob {
|
||||
id: string;
|
||||
data: DeploymentJob;
|
||||
createdAt: Date;
|
||||
status: "waiting" | "processing" | "completed" | "failed" | "cancelled";
|
||||
abortController: AbortController;
|
||||
promise?: Promise<void>;
|
||||
}
|
||||
|
||||
export interface ServiceQueue {
|
||||
serviceId: string;
|
||||
jobs: QueueJob[];
|
||||
limit: ReturnType<typeof pLimit>; // p-limit instance with concurrency 1
|
||||
}
|
||||
|
||||
// Global queue management using p-limit
|
||||
class ServiceQueueManager {
|
||||
private queues: Map<string, ServiceQueue> = new Map();
|
||||
private globalLimit: ReturnType<typeof pLimit>;
|
||||
private isShuttingDown = false;
|
||||
|
||||
constructor(globalConcurrency = 3) {
|
||||
// Global limit controls how many services can deploy simultaneously
|
||||
this.globalLimit = pLimit(globalConcurrency);
|
||||
this.setupShutdownHandlers();
|
||||
}
|
||||
|
||||
// Set global concurrency (how many services can deploy simultaneously)
|
||||
setGlobalConcurrency(concurrency: number) {
|
||||
this.globalLimit = pLimit(concurrency);
|
||||
}
|
||||
|
||||
// Get concurrency settings from database
|
||||
private async getConcurrencySettings(jobData: DeploymentJob): Promise<{
|
||||
serviceConcurrency: number;
|
||||
}> {
|
||||
try {
|
||||
// Default: Each service processes 1 deployment at a time (FIFO within service)
|
||||
let serviceConcurrency = 1;
|
||||
|
||||
// If it's a server deployment, get server-specific concurrency
|
||||
// This controls how many deployments can run simultaneously ON THAT SERVER
|
||||
if (jobData.serverId) {
|
||||
try {
|
||||
const serverData = await findServerById(jobData.serverId);
|
||||
serviceConcurrency = serverData.concurrency || 1;
|
||||
console.log(
|
||||
`Server ${jobData.serverId} can handle ${serviceConcurrency} concurrent deployments`,
|
||||
);
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
`Could not get server concurrency for ${jobData.serverId}, using default: 1`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
serviceConcurrency,
|
||||
};
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
"Error getting concurrency settings, using defaults:",
|
||||
error,
|
||||
);
|
||||
return {
|
||||
serviceConcurrency: 1,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Get or create a queue for a service with dynamic concurrency
|
||||
private async getOrCreateQueue(
|
||||
serviceId: string,
|
||||
jobData?: DeploymentJob,
|
||||
): Promise<ServiceQueue> {
|
||||
if (!this.queues.has(serviceId)) {
|
||||
let serviceConcurrency = 1; // Default
|
||||
|
||||
// Get concurrency from database if we have job data
|
||||
if (jobData) {
|
||||
const settings = await this.getConcurrencySettings(jobData);
|
||||
serviceConcurrency = settings.serviceConcurrency;
|
||||
}
|
||||
|
||||
this.queues.set(serviceId, {
|
||||
serviceId,
|
||||
jobs: [],
|
||||
// Service concurrency from database or default to 1
|
||||
limit: pLimit(serviceConcurrency),
|
||||
});
|
||||
|
||||
console.log(
|
||||
`Created queue for service ${serviceId} with concurrency: ${serviceConcurrency}`,
|
||||
);
|
||||
}
|
||||
return this.queues.get(serviceId)!;
|
||||
}
|
||||
|
||||
// Add a job to a service queue
|
||||
async addJob(
|
||||
serviceId: string,
|
||||
jobData: DeploymentJob,
|
||||
userId?: string,
|
||||
): Promise<string> {
|
||||
if (this.isShuttingDown) {
|
||||
throw new Error("Queue manager is shutting down");
|
||||
}
|
||||
|
||||
// Update global concurrency based on user settings if provided
|
||||
// This controls the TOTAL number of deployments across ALL services for this user
|
||||
if (userId) {
|
||||
try {
|
||||
const userData = await db.query.users_temp.findFirst({
|
||||
where: eq(users_temp.id, userId),
|
||||
});
|
||||
|
||||
if (userData?.serverConcurrency) {
|
||||
// This is GLOBAL concurrency - total deployments across all services
|
||||
this.globalLimit = pLimit(userData.serverConcurrency);
|
||||
console.log(
|
||||
`Set GLOBAL concurrency to ${userData.serverConcurrency} deployments total for user ${userId}`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
`Could not get user concurrency settings for ${userId}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const queue = await this.getOrCreateQueue(serviceId, jobData);
|
||||
const jobId = `${serviceId}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
|
||||
|
||||
const job: QueueJob = {
|
||||
id: jobId,
|
||||
data: jobData,
|
||||
createdAt: new Date(),
|
||||
status: "waiting",
|
||||
abortController: new AbortController(),
|
||||
};
|
||||
|
||||
queue.jobs.push(job);
|
||||
console.log(
|
||||
`Added job ${jobId} to service ${serviceId} queue. Queue length: ${queue.jobs.length}`,
|
||||
);
|
||||
|
||||
// Start processing the job using p-limit
|
||||
this.processJob(queue, job);
|
||||
|
||||
return jobId;
|
||||
}
|
||||
|
||||
// Process a job using both global and service-level p-limit
|
||||
private processJob(queue: ServiceQueue, job: QueueJob) {
|
||||
// Use global limit to control cross-service concurrency
|
||||
job.promise = this.globalLimit(() =>
|
||||
// Use service limit to ensure ordered processing within service
|
||||
queue.limit(async () => {
|
||||
if (job.status === "cancelled" || this.isShuttingDown) {
|
||||
return;
|
||||
}
|
||||
|
||||
job.status = "processing";
|
||||
console.log(`Processing job ${job.id} for service ${queue.serviceId}`);
|
||||
|
||||
try {
|
||||
await this.executeJob(job);
|
||||
job.status = "completed";
|
||||
console.log(`Completed job ${job.id} for service ${queue.serviceId}`);
|
||||
} catch (error) {
|
||||
if (job.abortController.signal.aborted) {
|
||||
job.status = "cancelled";
|
||||
console.log(
|
||||
`Job ${job.id} was cancelled for service ${queue.serviceId}`,
|
||||
);
|
||||
} else {
|
||||
job.status = "failed";
|
||||
console.error(
|
||||
`Job ${job.id} failed for service ${queue.serviceId}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
// Clean up completed/failed jobs after a delay
|
||||
setTimeout(() => {
|
||||
queue.jobs = queue.jobs.filter((j) => j.id !== job.id);
|
||||
}, 5000);
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// Remove/cancel jobs for a specific service
|
||||
cancelJobsByService(
|
||||
serviceId: string,
|
||||
applicationId?: string,
|
||||
composeId?: string,
|
||||
): number {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (!queue) return 0;
|
||||
|
||||
let cancelledCount = 0;
|
||||
|
||||
// Cancel waiting and processing jobs
|
||||
for (const job of queue.jobs) {
|
||||
if (job.status === "waiting" || job.status === "processing") {
|
||||
// Check if this job matches the filter criteria
|
||||
const matchesApplication = applicationId
|
||||
? (job.data.applicationType === "application" ||
|
||||
job.data.applicationType === "application-preview") &&
|
||||
job.data.applicationId === applicationId
|
||||
: true;
|
||||
const matchesCompose = composeId
|
||||
? job.data.applicationType === "compose" &&
|
||||
job.data.composeId === composeId
|
||||
: true;
|
||||
|
||||
if (matchesApplication && matchesCompose) {
|
||||
job.status = "cancelled";
|
||||
job.abortController.abort();
|
||||
cancelledCount++;
|
||||
console.log(`Cancelled job ${job.id} for service ${serviceId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove cancelled jobs from queue immediately
|
||||
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
|
||||
|
||||
return cancelledCount;
|
||||
}
|
||||
|
||||
// Get queue status for a service
|
||||
getQueueStatus(serviceId: string) {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (!queue) return null;
|
||||
|
||||
return {
|
||||
serviceId,
|
||||
totalJobs: queue.jobs.length,
|
||||
waitingJobs: queue.jobs.filter((j) => j.status === "waiting").length,
|
||||
processingJobs: queue.jobs.filter((j) => j.status === "processing")
|
||||
.length,
|
||||
completedJobs: queue.jobs.filter((j) => j.status === "completed").length,
|
||||
failedJobs: queue.jobs.filter((j) => j.status === "failed").length,
|
||||
// p-limit queue status
|
||||
activeCount: queue.limit.activeCount,
|
||||
pendingCount: queue.limit.pendingCount,
|
||||
};
|
||||
}
|
||||
|
||||
// Get all queues status
|
||||
getAllQueuesStatus() {
|
||||
const status: Record<string, any> = {};
|
||||
for (const [serviceId] of this.queues) {
|
||||
status[serviceId] = this.getQueueStatus(serviceId);
|
||||
}
|
||||
status.global = {
|
||||
activeCount: this.globalLimit.activeCount,
|
||||
pendingCount: this.globalLimit.pendingCount,
|
||||
concurrency: this.globalLimit.concurrency,
|
||||
};
|
||||
return status;
|
||||
}
|
||||
|
||||
// Clear pending jobs from a service queue using p-limit's clearQueue
|
||||
clearServiceQueue(serviceId: string) {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (queue) {
|
||||
// Cancel all waiting jobs
|
||||
for (const job of queue.jobs) {
|
||||
if (job.status === "waiting") {
|
||||
job.status = "cancelled";
|
||||
job.abortController.abort();
|
||||
}
|
||||
}
|
||||
|
||||
// Clear p-limit's internal queue
|
||||
queue.limit.clearQueue();
|
||||
|
||||
// Remove cancelled jobs
|
||||
queue.jobs = queue.jobs.filter((job) => job.status !== "cancelled");
|
||||
|
||||
console.log(`Cleared service queue for ${serviceId}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async executeJob(job: QueueJob): Promise<void> {
|
||||
const { data } = job;
|
||||
|
||||
// Check if job was cancelled before execution
|
||||
if (job.abortController.signal.aborted) {
|
||||
throw new Error("Job was cancelled");
|
||||
}
|
||||
|
||||
try {
|
||||
if (data.applicationType === "application") {
|
||||
await updateApplicationStatus(data.applicationId, "running");
|
||||
|
||||
if (data.server) {
|
||||
if (data.type === "redeploy") {
|
||||
await rebuildRemoteApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "deploy") {
|
||||
await deployRemoteApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (data.type === "redeploy") {
|
||||
await rebuildApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "deploy") {
|
||||
await deployApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (data.applicationType === "compose") {
|
||||
await updateCompose(data.composeId, {
|
||||
composeStatus: "running",
|
||||
});
|
||||
|
||||
if (data.server) {
|
||||
if (data.type === "redeploy") {
|
||||
await rebuildRemoteCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "deploy") {
|
||||
await deployRemoteCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (data.type === "deploy") {
|
||||
await deployCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
} else if (data.type === "redeploy") {
|
||||
await rebuildCompose({
|
||||
composeId: data.composeId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if (data.applicationType === "application-preview") {
|
||||
await updatePreviewDeployment(data.previewDeploymentId, {
|
||||
previewStatus: "running",
|
||||
});
|
||||
if (data.server) {
|
||||
if (data.type === "deploy") {
|
||||
await deployRemotePreviewApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
previewDeploymentId: data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
if (data.type === "deploy") {
|
||||
await deployPreviewApplication({
|
||||
applicationId: data.applicationId,
|
||||
titleLog: data.titleLog,
|
||||
descriptionLog: data.descriptionLog,
|
||||
previewDeploymentId: data.previewDeploymentId,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.log("Deployment Error", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private setupShutdownHandlers() {
|
||||
const gracefulShutdown = async () => {
|
||||
console.log("Shutting down service queue manager...");
|
||||
this.isShuttingDown = true;
|
||||
|
||||
// Cancel all jobs
|
||||
for (const queue of this.queues.values()) {
|
||||
for (const job of queue.jobs) {
|
||||
job.abortController.abort();
|
||||
}
|
||||
// Clear p-limit queues
|
||||
queue.limit.clearQueue();
|
||||
}
|
||||
|
||||
// Clear global queue
|
||||
this.globalLimit.clearQueue();
|
||||
|
||||
// Wait a bit for jobs to finish cancelling
|
||||
await new Promise((resolve) => setTimeout(resolve, 2000));
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on("SIGTERM", gracefulShutdown);
|
||||
process.on("SIGINT", gracefulShutdown);
|
||||
}
|
||||
|
||||
// Remove a specific service queue entirely
|
||||
removeServiceQueue(serviceId: string) {
|
||||
const queue = this.queues.get(serviceId);
|
||||
if (queue) {
|
||||
// Cancel all jobs in the queue
|
||||
for (const job of queue.jobs) {
|
||||
job.abortController.abort();
|
||||
}
|
||||
// Clear p-limit queue
|
||||
queue.limit.clearQueue();
|
||||
this.queues.delete(serviceId);
|
||||
console.log(`Removed service queue for ${serviceId}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Global instance
|
||||
export const serviceQueueManager = new ServiceQueueManager();
|
||||
|
||||
// Helper functions to maintain compatibility with existing code
|
||||
export const addDeploymentJob = async (
|
||||
serviceId: string,
|
||||
jobData: DeploymentJob,
|
||||
userId?: string,
|
||||
): Promise<string> => {
|
||||
return await serviceQueueManager.addJob(serviceId, jobData, userId);
|
||||
};
|
||||
|
||||
export const cancelDeploymentJobs = (
|
||||
serviceId: string,
|
||||
applicationId?: string,
|
||||
composeId?: string,
|
||||
): number => {
|
||||
return serviceQueueManager.cancelJobsByService(
|
||||
serviceId,
|
||||
applicationId,
|
||||
composeId,
|
||||
);
|
||||
};
|
||||
|
||||
export const getDeploymentQueueStatus = (serviceId?: string) => {
|
||||
if (serviceId) {
|
||||
return serviceQueueManager.getQueueStatus(serviceId);
|
||||
}
|
||||
return serviceQueueManager.getAllQueuesStatus();
|
||||
};
|
||||
|
||||
export const setGlobalConcurrency = (concurrency: number) => {
|
||||
serviceQueueManager.setGlobalConcurrency(concurrency);
|
||||
};
|
||||
|
||||
export const removeServiceQueue = (serviceId: string) => {
|
||||
serviceQueueManager.removeServiceQueue(serviceId);
|
||||
};
|
||||
|
||||
export const clearServiceQueue = (serviceId: string) => {
|
||||
serviceQueueManager.clearServiceQueue(serviceId);
|
||||
};
|
||||
@@ -111,4 +111,4 @@
|
||||
"node": "^20.16.0",
|
||||
"pnpm": ">=9.12.0"
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
import { defineConfig } from "drizzle-kit";
|
||||
|
||||
export default defineConfig({
|
||||
schema: "./server/db/schema/index.ts",
|
||||
dialect: "postgresql",
|
||||
dbCredentials: {
|
||||
url: process.env.DATABASE_URL!,
|
||||
},
|
||||
out: "drizzle",
|
||||
migrations: {
|
||||
table: "migrations",
|
||||
schema: "public",
|
||||
},
|
||||
});
|
||||
@@ -1,21 +0,0 @@
|
||||
// import { drizzle } from "drizzle-orm/postgres-js";
|
||||
// import { migrate } from "drizzle-orm/postgres-js/migrator";
|
||||
// import postgres from "postgres";
|
||||
|
||||
// const connectionString = process.env.DATABASE_URL!;
|
||||
|
||||
// const sql = postgres(connectionString, { max: 1 });
|
||||
// const db = drizzle(sql);
|
||||
|
||||
// export const migration = async () =>
|
||||
// await migrate(db, { migrationsFolder: "drizzle" })
|
||||
// .then(() => {
|
||||
// console.log("Migration complete");
|
||||
// sql.end();
|
||||
// })
|
||||
// .catch((error) => {
|
||||
// console.log("Migration failed", error);
|
||||
// })
|
||||
// .finally(() => {
|
||||
// sql.end();
|
||||
// });
|
||||
@@ -1,23 +0,0 @@
|
||||
import { sql } from "drizzle-orm";
|
||||
// Credits to Louistiti from Drizzle Discord: https://discord.com/channels/1043890932593987624/1130802621750448160/1143083373535973406
|
||||
import { drizzle } from "drizzle-orm/postgres-js";
|
||||
import postgres from "postgres";
|
||||
|
||||
const connectionString = process.env.DATABASE_URL!;
|
||||
|
||||
const pg = postgres(connectionString, { max: 1 });
|
||||
const db = drizzle(pg);
|
||||
|
||||
const clearDb = async (): Promise<void> => {
|
||||
try {
|
||||
const tablesQuery = sql<string>`DROP SCHEMA public CASCADE; CREATE SCHEMA public; DROP schema drizzle CASCADE;`;
|
||||
const tables = await db.execute(tablesQuery);
|
||||
console.log(tables);
|
||||
await pg.end();
|
||||
} catch (error) {
|
||||
console.error("Error cleaning database", error);
|
||||
} finally {
|
||||
}
|
||||
};
|
||||
|
||||
clearDb();
|
||||
@@ -48,6 +48,7 @@ export const server = pgTable("server", {
|
||||
sshKeyId: text("sshKeyId").references(() => sshKeys.sshKeyId, {
|
||||
onDelete: "set null",
|
||||
}),
|
||||
concurrency: integer("concurrency").notNull().default(1),
|
||||
metricsConfig: jsonb("metricsConfig")
|
||||
.$type<{
|
||||
server: {
|
||||
|
||||
@@ -62,6 +62,7 @@ export const users_temp = pgTable("user_temp", {
|
||||
// Metrics
|
||||
enablePaidFeatures: boolean("enablePaidFeatures").notNull().default(false),
|
||||
allowImpersonation: boolean("allowImpersonation").notNull().default(false),
|
||||
serverConcurrency: integer("serverConcurrency").notNull().default(1),
|
||||
metricsConfig: jsonb("metricsConfig")
|
||||
.$type<{
|
||||
server: {
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
// import bc from "bcrypt";
|
||||
// import { drizzle } from "drizzle-orm/postgres-js";
|
||||
// import postgres from "postgres";
|
||||
// import { users } from "./schema";
|
||||
|
||||
// const connectionString = process.env.DATABASE_URL!;
|
||||
|
||||
// const pg = postgres(connectionString, { max: 1 });
|
||||
// const db = drizzle(pg);
|
||||
|
||||
// function password(txt: string) {
|
||||
// return bc.hashSync(txt, 10);
|
||||
// }
|
||||
|
||||
// async function seed() {
|
||||
// console.log("> Seed:", process.env.DATABASE_PATH, "\n");
|
||||
|
||||
// // const authenticationR = await db
|
||||
// // .insert(users)
|
||||
// // .values([
|
||||
// // {
|
||||
// // email: "user1@hotmail.com",
|
||||
// // password: password("12345671"),
|
||||
// // },
|
||||
// // ])
|
||||
// // .onConflictDoNothing()
|
||||
// // .returning();
|
||||
|
||||
// // console.log("\nSemillas Update:", authenticationR.length);
|
||||
// }
|
||||
|
||||
// seed().catch((e) => {
|
||||
// console.error(e);
|
||||
// process.exit(1);
|
||||
// });
|
||||
14
pnpm-lock.yaml
generated
14
pnpm-lock.yaml
generated
@@ -280,9 +280,6 @@ importers:
|
||||
boxen:
|
||||
specifier: ^7.1.1
|
||||
version: 7.1.1
|
||||
bullmq:
|
||||
specifier: 5.4.2
|
||||
version: 5.4.2
|
||||
class-variance-authority:
|
||||
specifier: ^0.7.1
|
||||
version: 0.7.1
|
||||
@@ -367,6 +364,9 @@ importers:
|
||||
otpauth:
|
||||
specifier: ^9.4.0
|
||||
version: 9.4.0
|
||||
p-limit:
|
||||
specifier: ^7.1.1
|
||||
version: 7.1.1
|
||||
pino:
|
||||
specifier: 9.4.0
|
||||
version: 9.4.0
|
||||
@@ -6412,6 +6412,10 @@ packages:
|
||||
resolution: {integrity: sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
p-limit@7.1.1:
|
||||
resolution: {integrity: sha512-i8PyM2JnsNChVSYWLr2BAjNoLi0BAYC+wecOnZnVV+YSNJkzP7cWmvI34dk0WArWfH9KwBHNoZI3P3MppImlIA==}
|
||||
engines: {node: '>=20'}
|
||||
|
||||
p-locate@4.1.0:
|
||||
resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==}
|
||||
engines: {node: '>=8'}
|
||||
@@ -13869,6 +13873,10 @@ snapshots:
|
||||
dependencies:
|
||||
yocto-queue: 1.2.1
|
||||
|
||||
p-limit@7.1.1:
|
||||
dependencies:
|
||||
yocto-queue: 1.2.1
|
||||
|
||||
p-locate@4.1.0:
|
||||
dependencies:
|
||||
p-limit: 2.3.0
|
||||
|
||||
Reference in New Issue
Block a user