From 08c9113405fa3d2c28ef7a5981c43b15ba39bbe8 Mon Sep 17 00:00:00 2001 From: Mauricio Siu Date: Tue, 3 Mar 2026 01:04:26 -0600 Subject: [PATCH] feat: implement deployment jobs API and enhance queue management Added a new endpoint to fetch deployment jobs for a server, integrating with the Inngest API to retrieve job details. Updated the queue management system to support centralized job retrieval for cloud environments, improving the deployment monitoring experience. Enhanced the UI to include action buttons for job cancellation and improved error handling for job fetching. --- apps/api/src/index.ts | 25 +- apps/api/src/service.ts | 240 ++++++++++++++++++ .../deployments/show-queue-table.tsx | 70 ++++- apps/dokploy/pages/dashboard/deployments.tsx | 25 +- apps/dokploy/server/api/routers/deployment.ts | 72 ++++-- apps/dokploy/server/utils/deploy.ts | 31 +++ packages/server/src/services/deployment.ts | 35 +++ 7 files changed, 472 insertions(+), 26 deletions(-) create mode 100644 apps/api/src/service.ts diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 8ddb56dec..0bb6e1401 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -10,6 +10,7 @@ import { type DeployJob, deployJobSchema, } from "./schema.js"; +import { fetchDeploymentJobs } from "./service.js"; import { deploy } from "./utils.js"; const app = new Hono(); @@ -118,7 +119,6 @@ app.post("/deploy", zValidator("json", deployJobSchema), async (c) => { 200, ); } catch (error) { - console.log("error", error); logger.error("Failed to send deployment event", error); return c.json( { @@ -176,6 +176,29 @@ app.get("/health", async (c) => { return c.json({ status: "ok" }); }); +// List deployment jobs (Inngest runs) for a server - same shape as BullMQ queue for the UI +app.get("/jobs", async (c) => { + const serverId = c.req.query("serverId"); + if (!serverId) { + return c.json({ message: "serverId is required" }, 400); + } + + try { + const rows = await fetchDeploymentJobs(serverId); + return c.json(rows); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes("INNGEST_BASE_URL")) { + return c.json( + { message: "INNGEST_BASE_URL is required to list deployment jobs" }, + 503, + ); + } + logger.error("Failed to fetch jobs from Inngest", { serverId, error }); + return c.json([], 200); + } +}); + // Serve Inngest functions endpoint app.on( ["GET", "POST", "PUT"], diff --git a/apps/api/src/service.ts b/apps/api/src/service.ts new file mode 100644 index 000000000..2890ecf6b --- /dev/null +++ b/apps/api/src/service.ts @@ -0,0 +1,240 @@ +import { logger } from "./logger"; + +const baseUrl = process.env.INNGEST_BASE_URL ?? ""; +const signingKey = process.env.INNGEST_SIGNING_KEY ?? ""; + +/** Event shape from GET /v1/events (https://api.inngest.com/v1/events) */ +type InngestEventRow = { + internal_id?: string; + accountID?: string; + environmentID?: string; + source?: string; + sourceID?: string | null; + /** RFC3339 timestamp – API uses receivedAt, dev server may use received_at */ + receivedAt?: string; + received_at?: string; + id: string; + name: string; + data: Record; + user?: unknown; + ts: number; + v?: string | null; + metadata?: { + fetchedAt: string; + cachedUntil: string | null; + }; +}; + +/** Run shape from GET /v1/events/{eventId}/runs – the actual job execution */ +type InngestRun = { + run_id: string; + event_id: string; + status: string; // "Running" | "Completed" | "Failed" | "Cancelled" | "Queued"? + run_started_at?: string; + ended_at?: string | null; + output?: unknown; + // dev server / API may use different casing + run_started_at_ms?: number; +}; + +function getEventReceivedAt(ev: InngestEventRow): string | undefined { + return ev.receivedAt ?? ev.received_at; +} + +/** Map Inngest run status to BullMQ-style state for the UI */ +function runStatusToState( + status: string, +): "pending" | "active" | "completed" | "failed" | "cancelled" { + const s = status.toLowerCase(); + if (s === "running") return "active"; + if (s === "completed") return "completed"; + if (s === "failed") return "failed"; + if (s === "cancelled") return "cancelled"; + if (s === "queued") return "pending"; + return "pending"; +} + +export const fetchInngestEvents = async () => { + const maxEvents = MAX_EVENTS; + const all: InngestEventRow[] = []; + let cursor: string | undefined; + + do { + const params = new URLSearchParams({ limit: "100" }); + if (cursor) { + params.set("cursor", cursor); + } + + const res = await fetch(`${baseUrl}/v1/events?${params}`, { + headers: { + Authorization: `Bearer ${signingKey}`, + "Content-Type": "application/json", + }, + }); + + if (!res.ok) { + logger.warn("Inngest API error", { + status: res.status, + body: await res.text(), + }); + break; + } + + const body = (await res.json()) as { + data?: InngestEventRow[]; + cursor?: string; + nextCursor?: string; + }; + const data = Array.isArray(body.data) ? body.data : []; + all.push(...data); + + // Next page: API may return cursor/nextCursor, or use last event's internal_id (per API docs) + const nextCursor = + body.cursor ?? body.nextCursor ?? data[data.length - 1]?.internal_id; + const hasMore = data.length === 100 && nextCursor && all.length < maxEvents; + cursor = hasMore ? nextCursor : undefined; + } while (cursor); + + return all.slice(0, maxEvents); +}; + +/** Fetch runs for a single event (GET /v1/events/{eventId}/runs) – runs are the actual jobs */ +export const fetchInngestRunsForEvent = async ( + eventId: string, +): Promise => { + const res = await fetch( + `${baseUrl}/v1/events/${encodeURIComponent(eventId)}/runs`, + { + headers: { + Authorization: `Bearer ${signingKey}`, + "Content-Type": "application/json", + }, + }, + ); + if (!res.ok) { + logger.warn("Inngest runs API error", { + eventId, + status: res.status, + body: await res.text(), + }); + return []; + } + const body = (await res.json()) as { data?: InngestRun[] }; + return Array.isArray(body.data) ? body.data : []; +}; + +/** One row for the queue UI (BullMQ-compatible shape) */ +export type DeploymentJobRow = { + id: string; + name: string; + data: Record; + timestamp: number; + processedOn?: number; + finishedOn?: number; + failedReason?: string; + state: string; +}; + +/** Build queue rows from events + their runs (one row per run, or pending if no run yet) */ +function buildDeploymentRowsFromRuns( + events: InngestEventRow[], + runsByEventId: Map, + serverId: string, +): DeploymentJobRow[] { + const requested = events.filter( + (e) => + e.name === "deployment/requested" && + (e.data as Record)?.serverId === serverId, + ); + const rows: DeploymentJobRow[] = []; + + for (const ev of requested) { + const data = (ev.data ?? {}) as Record; + const runs = runsByEventId.get(ev.id) ?? []; + + if (runs.length === 0) { + // Queued: event received but no run yet + rows.push({ + id: ev.id, + name: ev.name, + data, + timestamp: ev.ts, + processedOn: ev.ts, + finishedOn: undefined, + failedReason: undefined, + state: "pending", + }); + continue; + } + + for (const run of runs) { + const state = runStatusToState(run.status); + const runStartedMs = + run.run_started_at_ms ?? + (run.run_started_at ? new Date(run.run_started_at).getTime() : ev.ts); + const endedMs = run.ended_at + ? new Date(run.ended_at).getTime() + : undefined; + const failedReason = + state === "failed" && + run.output && + typeof run.output === "object" && + "error" in run.output + ? String((run.output as { error?: unknown }).error) + : undefined; + + rows.push({ + id: run.run_id, + name: ev.name, + data, + timestamp: runStartedMs, + processedOn: runStartedMs, + finishedOn: + state === "completed" || state === "failed" || state === "cancelled" + ? endedMs + : undefined, + failedReason, + state, + }); + } + } + + return rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); +} + +/** Fetch deployment jobs for a server: events → runs → rows (correct model: runs = jobs) */ +export const fetchDeploymentJobs = async ( + serverId: string, +): Promise => { + if (!signingKey) { + logger.warn("INNGEST_SIGNING_KEY not set, returning empty jobs list"); + return []; + } + if (!baseUrl) { + throw new Error("INNGEST_BASE_URL is required to list deployment jobs"); + } + + const events = await fetchInngestEvents(); + + const requestedForServer = events.filter( + (e) => + e.name === "deployment/requested" && + (e.data as Record)?.serverId === serverId, + ); + // Limit to avoid too many run fetches + const toFetch = requestedForServer.slice(0, 50); + const runsByEventId = new Map(); + + await Promise.all( + toFetch.map(async (ev) => { + const runs = await fetchInngestRunsForEvent(ev.id); + runsByEventId.set(ev.id, runs); + }), + ); + + return buildDeploymentRowsFromRuns(events, runsByEventId, serverId); +}; + +const DEFAULT_MAX_EVENTS = 500; + +const MAX_EVENTS = DEFAULT_MAX_EVENTS; diff --git a/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx b/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx index ad8f3d551..c9f6d9dde 100644 --- a/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx +++ b/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx @@ -1,8 +1,10 @@ "use client"; import type { inferRouterOutputs } from "@trpc/server"; -import { ListTodo, Loader2 } from "lucide-react"; +import Link from "next/link"; +import { ArrowRight, ListTodo, Loader2, XCircle } from "lucide-react"; import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; import { Table, TableBody, @@ -27,11 +29,13 @@ const stateVariants: Record< | "green" | "red" > = { + pending: "secondary", waiting: "secondary", active: "yellow", delayed: "outline", completed: "green", failed: "destructive", + cancelled: "outline", paused: "outline", }; @@ -62,11 +66,22 @@ function getJobLabel(row: QueueRow): string { } export function ShowQueueTable(props: { embedded?: boolean }) { - const { embedded = false } = props; + const { embedded: _embedded = false } = props; const { data: queueList, isLoading } = api.deployment.queueList.useQuery( undefined, { refetchInterval: 3000 }, ); + const { data: isCloud } = api.settings.isCloud.useQuery(); + const utils = api.useUtils(); + const { mutateAsync: cancelApplicationDeployment, isPending: isCancellingApp } = + api.application.cancelDeployment.useMutation({ + onSuccess: () => void utils.deployment.queueList.invalidate(), + }); + const { mutateAsync: cancelComposeDeployment, isPending: isCancellingCompose } = + api.compose.cancelDeployment.useMutation({ + onSuccess: () => void utils.deployment.queueList.invalidate(), + }); + const isCancelling = isCancellingApp || isCancellingCompose; return (
@@ -88,6 +103,7 @@ export function ShowQueueTable(props: { embedded?: boolean }) { Processed Finished Error + Actions @@ -95,6 +111,8 @@ export function ShowQueueTable(props: { embedded?: boolean }) { queueList.map((row) => { const d = row.data as Record; const appType = d?.applicationType as string | undefined; + const pathInfo = row.servicePath; + const hasLink = pathInfo?.href != null; return ( @@ -121,12 +139,58 @@ export function ShowQueueTable(props: { embedded?: boolean }) { {row.failedReason ?? "—"} + +
+ {hasLink ? ( + + ) : ( + + )} + {isCloud && + row.state === "active" && + (d?.applicationId != null || d?.composeId != null) && ( + + )} +
+
); }) ) : ( - +

Queue is empty

diff --git a/apps/dokploy/pages/dashboard/deployments.tsx b/apps/dokploy/pages/dashboard/deployments.tsx index 9d2ce4e53..744301abf 100644 --- a/apps/dokploy/pages/dashboard/deployments.tsx +++ b/apps/dokploy/pages/dashboard/deployments.tsx @@ -1,6 +1,7 @@ import { validateRequest } from "@dokploy/server/lib/auth"; import { Rocket } from "lucide-react"; import type { GetServerSidePropsContext } from "next"; +import { useRouter } from "next/router"; import type { ReactElement } from "react"; import { ShowDeploymentsTable } from "@/components/dashboard/deployments/show-deployments-table"; import { ShowQueueTable } from "@/components/dashboard/deployments/show-queue-table"; @@ -13,7 +14,29 @@ import { } from "@/components/ui/card"; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +const TAB_VALUES = ["deployments", "queue"] as const; +type TabValue = (typeof TAB_VALUES)[number]; + +function isValidTab(t: string): t is TabValue { + return TAB_VALUES.includes(t as TabValue); +} + function DeploymentsPage() { + const router = useRouter(); + const tab = + router.query.tab && isValidTab(router.query.tab as string) + ? (router.query.tab as TabValue) + : "deployments"; + + const setTab = (value: string) => { + if (!isValidTab(value)) return; + router.replace( + { pathname: "/dashboard/deployments", query: { tab: value } }, + undefined, + { shallow: true }, + ); + }; + return (
@@ -30,7 +53,7 @@ function DeploymentsPage() {
- + Deployments Queue diff --git a/apps/dokploy/server/api/routers/deployment.ts b/apps/dokploy/server/api/routers/deployment.ts index 5b1765bdb..bcf4ddbf7 100644 --- a/apps/dokploy/server/api/routers/deployment.ts +++ b/apps/dokploy/server/api/routers/deployment.ts @@ -10,7 +10,9 @@ import { findDeploymentById, findMemberById, findServerById, + IS_CLOUD, removeDeployment, + resolveServicePath, updateDeploymentStatus, } from "@dokploy/server"; import { db } from "@dokploy/server/db"; @@ -23,8 +25,13 @@ import { apiFindAllByServer, apiFindAllByType, deployments, + server, } from "@/server/db/schema"; import { myQueue } from "@/server/queues/queueSetup"; +import { + fetchDeployApiJobs, + type QueueJobRow, +} from "@/server/utils/deploy"; import { createTRPCRouter, protectedProcedure } from "../trpc"; export const deploymentRouter = createTRPCRouter({ @@ -83,26 +90,51 @@ export const deploymentRouter = createTRPCRouter({ return findAllDeploymentsCentralized(orgId, accessedServices); }), - queueList: protectedProcedure.query(async () => { - const jobs = await myQueue.getJobs(); - const rows = await Promise.all( - jobs.map(async (job) => { - const state = await job.getState(); - console.log(job.data); - return { - id: job.id, - name: job.name ?? undefined, - data: job.data as Record, - timestamp: job.timestamp, - processedOn: job.processedOn, - finishedOn: job.finishedOn, - failedReason: job.failedReason ?? undefined, - state, - }; - }), + queueList: protectedProcedure.query(async ({ ctx }) => { + const orgId = ctx.session.activeOrganizationId; + let rows: QueueJobRow[]; + + if (IS_CLOUD) { + const servers = await db.query.server.findMany({ + where: eq(server.organizationId, orgId), + columns: { serverId: true }, + }); + rows = []; + for (const { serverId } of servers) { + const serverRows = await fetchDeployApiJobs(serverId); + rows.push(...serverRows); + } + rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); + } else { + const jobs = await myQueue.getJobs(); + const jobRows = await Promise.all( + jobs.map(async (job) => { + const state = await job.getState(); + return { + id: String(job.id), + name: job.name ?? undefined, + data: job.data as Record, + timestamp: job.timestamp, + processedOn: job.processedOn, + finishedOn: job.finishedOn, + failedReason: job.failedReason ?? undefined, + state, + }; + }), + ); + jobRows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); + rows = jobRows; + } + + return Promise.all( + rows.map(async (row) => ({ + ...row, + servicePath: await resolveServicePath( + orgId, + (row.data ?? {}) as Record, + ), + })), ); - rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); - return rows; }), allByType: protectedProcedure @@ -115,10 +147,8 @@ export const deploymentRouter = createTRPCRouter({ rollback: true, }, }); - return deploymentsList; }), - killProcess: protectedProcedure .input( z.object({ diff --git a/apps/dokploy/server/utils/deploy.ts b/apps/dokploy/server/utils/deploy.ts index f4591e3b3..bb429002a 100644 --- a/apps/dokploy/server/utils/deploy.ts +++ b/apps/dokploy/server/utils/deploy.ts @@ -50,3 +50,34 @@ export const cancelDeployment = async (cancelData: CancelDeploymentData) => { throw error; } }; + +export type QueueJobRow = { + id: string; + name?: string; + data: Record; + timestamp?: number; + processedOn?: number; + finishedOn?: number; + failedReason?: string; + state: string; +}; + +export const fetchDeployApiJobs = async ( + serverId: string, +): Promise => { + try { + const res = await fetch( + `${process.env.SERVER_URL}/jobs?serverId=${encodeURIComponent(serverId)}`, + { + headers: { + "Content-Type": "application/json", + "X-API-Key": process.env.API_KEY || "NO-DEFINED", + }, + }, + ); + if (!res.ok) return []; + return (await res.json()) as QueueJobRow[]; + } catch { + return []; + } +}; diff --git a/packages/server/src/services/deployment.ts b/packages/server/src/services/deployment.ts index 5d7a36f15..dbb632bf7 100644 --- a/packages/server/src/services/deployment.ts +++ b/packages/server/src/services/deployment.ts @@ -42,6 +42,41 @@ import { findScheduleById } from "./schedule"; import { findServerById, type Server } from "./server"; import { findVolumeBackupById } from "./volume-backups"; +export type ServicePath = { href: string | null; label: string }; + +export async function resolveServicePath( + orgId: string, + data: Record, +): Promise { + try { + const applicationId = data?.applicationId as string | undefined; + const composeId = data?.composeId as string | undefined; + if (applicationId) { + const app = await findApplicationById(applicationId); + if (app.environment.project.organizationId !== orgId) { + return { href: null, label: "Application" }; + } + return { + href: `/dashboard/project/${app.environment.project.projectId}/environment/${app.environment.environmentId}/services/application/${app.applicationId}`, + label: "Application", + }; + } + if (composeId) { + const comp = await findComposeById(composeId); + if (comp.environment.project.organizationId !== orgId) { + return { href: null, label: "Compose" }; + } + return { + href: `/dashboard/project/${comp.environment.project.projectId}/environment/${comp.environment.environmentId}/services/compose/${comp.composeId}`, + label: "Compose", + }; + } + } catch { + // not found or unauthorized + } + return { href: null, label: "—" }; +} + export type Deployment = typeof deployments.$inferSelect; export const findDeploymentById = async (deploymentId: string) => {