diff --git a/apps/api/src/index.ts b/apps/api/src/index.ts index 8ddb56dec..0bb6e1401 100644 --- a/apps/api/src/index.ts +++ b/apps/api/src/index.ts @@ -10,6 +10,7 @@ import { type DeployJob, deployJobSchema, } from "./schema.js"; +import { fetchDeploymentJobs } from "./service.js"; import { deploy } from "./utils.js"; const app = new Hono(); @@ -118,7 +119,6 @@ app.post("/deploy", zValidator("json", deployJobSchema), async (c) => { 200, ); } catch (error) { - console.log("error", error); logger.error("Failed to send deployment event", error); return c.json( { @@ -176,6 +176,29 @@ app.get("/health", async (c) => { return c.json({ status: "ok" }); }); +// List deployment jobs (Inngest runs) for a server - same shape as BullMQ queue for the UI +app.get("/jobs", async (c) => { + const serverId = c.req.query("serverId"); + if (!serverId) { + return c.json({ message: "serverId is required" }, 400); + } + + try { + const rows = await fetchDeploymentJobs(serverId); + return c.json(rows); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (message.includes("INNGEST_BASE_URL")) { + return c.json( + { message: "INNGEST_BASE_URL is required to list deployment jobs" }, + 503, + ); + } + logger.error("Failed to fetch jobs from Inngest", { serverId, error }); + return c.json([], 200); + } +}); + // Serve Inngest functions endpoint app.on( ["GET", "POST", "PUT"], diff --git a/apps/api/src/service.ts b/apps/api/src/service.ts new file mode 100644 index 000000000..2890ecf6b --- /dev/null +++ b/apps/api/src/service.ts @@ -0,0 +1,240 @@ +import { logger } from "./logger"; + +const baseUrl = process.env.INNGEST_BASE_URL ?? ""; +const signingKey = process.env.INNGEST_SIGNING_KEY ?? ""; + +/** Event shape from GET /v1/events (https://api.inngest.com/v1/events) */ +type InngestEventRow = { + internal_id?: string; + accountID?: string; + environmentID?: string; + source?: string; + sourceID?: string | null; + /** RFC3339 timestamp – API uses receivedAt, dev server may use received_at */ + receivedAt?: string; + received_at?: string; + id: string; + name: string; + data: Record; + user?: unknown; + ts: number; + v?: string | null; + metadata?: { + fetchedAt: string; + cachedUntil: string | null; + }; +}; + +/** Run shape from GET /v1/events/{eventId}/runs – the actual job execution */ +type InngestRun = { + run_id: string; + event_id: string; + status: string; // "Running" | "Completed" | "Failed" | "Cancelled" | "Queued"? + run_started_at?: string; + ended_at?: string | null; + output?: unknown; + // dev server / API may use different casing + run_started_at_ms?: number; +}; + +function getEventReceivedAt(ev: InngestEventRow): string | undefined { + return ev.receivedAt ?? ev.received_at; +} + +/** Map Inngest run status to BullMQ-style state for the UI */ +function runStatusToState( + status: string, +): "pending" | "active" | "completed" | "failed" | "cancelled" { + const s = status.toLowerCase(); + if (s === "running") return "active"; + if (s === "completed") return "completed"; + if (s === "failed") return "failed"; + if (s === "cancelled") return "cancelled"; + if (s === "queued") return "pending"; + return "pending"; +} + +export const fetchInngestEvents = async () => { + const maxEvents = MAX_EVENTS; + const all: InngestEventRow[] = []; + let cursor: string | undefined; + + do { + const params = new URLSearchParams({ limit: "100" }); + if (cursor) { + params.set("cursor", cursor); + } + + const res = await fetch(`${baseUrl}/v1/events?${params}`, { + headers: { + Authorization: `Bearer ${signingKey}`, + "Content-Type": "application/json", + }, + }); + + if (!res.ok) { + logger.warn("Inngest API error", { + status: res.status, + body: await res.text(), + }); + break; + } + + const body = (await res.json()) as { + data?: InngestEventRow[]; + cursor?: string; + nextCursor?: string; + }; + const data = Array.isArray(body.data) ? body.data : []; + all.push(...data); + + // Next page: API may return cursor/nextCursor, or use last event's internal_id (per API docs) + const nextCursor = + body.cursor ?? body.nextCursor ?? data[data.length - 1]?.internal_id; + const hasMore = data.length === 100 && nextCursor && all.length < maxEvents; + cursor = hasMore ? nextCursor : undefined; + } while (cursor); + + return all.slice(0, maxEvents); +}; + +/** Fetch runs for a single event (GET /v1/events/{eventId}/runs) – runs are the actual jobs */ +export const fetchInngestRunsForEvent = async ( + eventId: string, +): Promise => { + const res = await fetch( + `${baseUrl}/v1/events/${encodeURIComponent(eventId)}/runs`, + { + headers: { + Authorization: `Bearer ${signingKey}`, + "Content-Type": "application/json", + }, + }, + ); + if (!res.ok) { + logger.warn("Inngest runs API error", { + eventId, + status: res.status, + body: await res.text(), + }); + return []; + } + const body = (await res.json()) as { data?: InngestRun[] }; + return Array.isArray(body.data) ? body.data : []; +}; + +/** One row for the queue UI (BullMQ-compatible shape) */ +export type DeploymentJobRow = { + id: string; + name: string; + data: Record; + timestamp: number; + processedOn?: number; + finishedOn?: number; + failedReason?: string; + state: string; +}; + +/** Build queue rows from events + their runs (one row per run, or pending if no run yet) */ +function buildDeploymentRowsFromRuns( + events: InngestEventRow[], + runsByEventId: Map, + serverId: string, +): DeploymentJobRow[] { + const requested = events.filter( + (e) => + e.name === "deployment/requested" && + (e.data as Record)?.serverId === serverId, + ); + const rows: DeploymentJobRow[] = []; + + for (const ev of requested) { + const data = (ev.data ?? {}) as Record; + const runs = runsByEventId.get(ev.id) ?? []; + + if (runs.length === 0) { + // Queued: event received but no run yet + rows.push({ + id: ev.id, + name: ev.name, + data, + timestamp: ev.ts, + processedOn: ev.ts, + finishedOn: undefined, + failedReason: undefined, + state: "pending", + }); + continue; + } + + for (const run of runs) { + const state = runStatusToState(run.status); + const runStartedMs = + run.run_started_at_ms ?? + (run.run_started_at ? new Date(run.run_started_at).getTime() : ev.ts); + const endedMs = run.ended_at + ? new Date(run.ended_at).getTime() + : undefined; + const failedReason = + state === "failed" && + run.output && + typeof run.output === "object" && + "error" in run.output + ? String((run.output as { error?: unknown }).error) + : undefined; + + rows.push({ + id: run.run_id, + name: ev.name, + data, + timestamp: runStartedMs, + processedOn: runStartedMs, + finishedOn: + state === "completed" || state === "failed" || state === "cancelled" + ? endedMs + : undefined, + failedReason, + state, + }); + } + } + + return rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); +} + +/** Fetch deployment jobs for a server: events → runs → rows (correct model: runs = jobs) */ +export const fetchDeploymentJobs = async ( + serverId: string, +): Promise => { + if (!signingKey) { + logger.warn("INNGEST_SIGNING_KEY not set, returning empty jobs list"); + return []; + } + if (!baseUrl) { + throw new Error("INNGEST_BASE_URL is required to list deployment jobs"); + } + + const events = await fetchInngestEvents(); + + const requestedForServer = events.filter( + (e) => + e.name === "deployment/requested" && + (e.data as Record)?.serverId === serverId, + ); + // Limit to avoid too many run fetches + const toFetch = requestedForServer.slice(0, 50); + const runsByEventId = new Map(); + + await Promise.all( + toFetch.map(async (ev) => { + const runs = await fetchInngestRunsForEvent(ev.id); + runsByEventId.set(ev.id, runs); + }), + ); + + return buildDeploymentRowsFromRuns(events, runsByEventId, serverId); +}; + +const DEFAULT_MAX_EVENTS = 500; + +const MAX_EVENTS = DEFAULT_MAX_EVENTS; diff --git a/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx b/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx index ad8f3d551..c9f6d9dde 100644 --- a/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx +++ b/apps/dokploy/components/dashboard/deployments/show-queue-table.tsx @@ -1,8 +1,10 @@ "use client"; import type { inferRouterOutputs } from "@trpc/server"; -import { ListTodo, Loader2 } from "lucide-react"; +import Link from "next/link"; +import { ArrowRight, ListTodo, Loader2, XCircle } from "lucide-react"; import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; import { Table, TableBody, @@ -27,11 +29,13 @@ const stateVariants: Record< | "green" | "red" > = { + pending: "secondary", waiting: "secondary", active: "yellow", delayed: "outline", completed: "green", failed: "destructive", + cancelled: "outline", paused: "outline", }; @@ -62,11 +66,22 @@ function getJobLabel(row: QueueRow): string { } export function ShowQueueTable(props: { embedded?: boolean }) { - const { embedded = false } = props; + const { embedded: _embedded = false } = props; const { data: queueList, isLoading } = api.deployment.queueList.useQuery( undefined, { refetchInterval: 3000 }, ); + const { data: isCloud } = api.settings.isCloud.useQuery(); + const utils = api.useUtils(); + const { mutateAsync: cancelApplicationDeployment, isPending: isCancellingApp } = + api.application.cancelDeployment.useMutation({ + onSuccess: () => void utils.deployment.queueList.invalidate(), + }); + const { mutateAsync: cancelComposeDeployment, isPending: isCancellingCompose } = + api.compose.cancelDeployment.useMutation({ + onSuccess: () => void utils.deployment.queueList.invalidate(), + }); + const isCancelling = isCancellingApp || isCancellingCompose; return (
@@ -88,6 +103,7 @@ export function ShowQueueTable(props: { embedded?: boolean }) { Processed Finished Error + Actions @@ -95,6 +111,8 @@ export function ShowQueueTable(props: { embedded?: boolean }) { queueList.map((row) => { const d = row.data as Record; const appType = d?.applicationType as string | undefined; + const pathInfo = row.servicePath; + const hasLink = pathInfo?.href != null; return ( @@ -121,12 +139,58 @@ export function ShowQueueTable(props: { embedded?: boolean }) { {row.failedReason ?? "—"} + +
+ {hasLink ? ( + + ) : ( + + )} + {isCloud && + row.state === "active" && + (d?.applicationId != null || d?.composeId != null) && ( + + )} +
+
); }) ) : ( - +

Queue is empty

diff --git a/apps/dokploy/pages/dashboard/deployments.tsx b/apps/dokploy/pages/dashboard/deployments.tsx index 9d2ce4e53..744301abf 100644 --- a/apps/dokploy/pages/dashboard/deployments.tsx +++ b/apps/dokploy/pages/dashboard/deployments.tsx @@ -1,6 +1,7 @@ import { validateRequest } from "@dokploy/server/lib/auth"; import { Rocket } from "lucide-react"; import type { GetServerSidePropsContext } from "next"; +import { useRouter } from "next/router"; import type { ReactElement } from "react"; import { ShowDeploymentsTable } from "@/components/dashboard/deployments/show-deployments-table"; import { ShowQueueTable } from "@/components/dashboard/deployments/show-queue-table"; @@ -13,7 +14,29 @@ import { } from "@/components/ui/card"; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +const TAB_VALUES = ["deployments", "queue"] as const; +type TabValue = (typeof TAB_VALUES)[number]; + +function isValidTab(t: string): t is TabValue { + return TAB_VALUES.includes(t as TabValue); +} + function DeploymentsPage() { + const router = useRouter(); + const tab = + router.query.tab && isValidTab(router.query.tab as string) + ? (router.query.tab as TabValue) + : "deployments"; + + const setTab = (value: string) => { + if (!isValidTab(value)) return; + router.replace( + { pathname: "/dashboard/deployments", query: { tab: value } }, + undefined, + { shallow: true }, + ); + }; + return (
@@ -30,7 +53,7 @@ function DeploymentsPage() {
- + Deployments Queue diff --git a/apps/dokploy/server/api/routers/deployment.ts b/apps/dokploy/server/api/routers/deployment.ts index 5b1765bdb..bcf4ddbf7 100644 --- a/apps/dokploy/server/api/routers/deployment.ts +++ b/apps/dokploy/server/api/routers/deployment.ts @@ -10,7 +10,9 @@ import { findDeploymentById, findMemberById, findServerById, + IS_CLOUD, removeDeployment, + resolveServicePath, updateDeploymentStatus, } from "@dokploy/server"; import { db } from "@dokploy/server/db"; @@ -23,8 +25,13 @@ import { apiFindAllByServer, apiFindAllByType, deployments, + server, } from "@/server/db/schema"; import { myQueue } from "@/server/queues/queueSetup"; +import { + fetchDeployApiJobs, + type QueueJobRow, +} from "@/server/utils/deploy"; import { createTRPCRouter, protectedProcedure } from "../trpc"; export const deploymentRouter = createTRPCRouter({ @@ -83,26 +90,51 @@ export const deploymentRouter = createTRPCRouter({ return findAllDeploymentsCentralized(orgId, accessedServices); }), - queueList: protectedProcedure.query(async () => { - const jobs = await myQueue.getJobs(); - const rows = await Promise.all( - jobs.map(async (job) => { - const state = await job.getState(); - console.log(job.data); - return { - id: job.id, - name: job.name ?? undefined, - data: job.data as Record, - timestamp: job.timestamp, - processedOn: job.processedOn, - finishedOn: job.finishedOn, - failedReason: job.failedReason ?? undefined, - state, - }; - }), + queueList: protectedProcedure.query(async ({ ctx }) => { + const orgId = ctx.session.activeOrganizationId; + let rows: QueueJobRow[]; + + if (IS_CLOUD) { + const servers = await db.query.server.findMany({ + where: eq(server.organizationId, orgId), + columns: { serverId: true }, + }); + rows = []; + for (const { serverId } of servers) { + const serverRows = await fetchDeployApiJobs(serverId); + rows.push(...serverRows); + } + rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); + } else { + const jobs = await myQueue.getJobs(); + const jobRows = await Promise.all( + jobs.map(async (job) => { + const state = await job.getState(); + return { + id: String(job.id), + name: job.name ?? undefined, + data: job.data as Record, + timestamp: job.timestamp, + processedOn: job.processedOn, + finishedOn: job.finishedOn, + failedReason: job.failedReason ?? undefined, + state, + }; + }), + ); + jobRows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); + rows = jobRows; + } + + return Promise.all( + rows.map(async (row) => ({ + ...row, + servicePath: await resolveServicePath( + orgId, + (row.data ?? {}) as Record, + ), + })), ); - rows.sort((a, b) => (b.timestamp ?? 0) - (a.timestamp ?? 0)); - return rows; }), allByType: protectedProcedure @@ -115,10 +147,8 @@ export const deploymentRouter = createTRPCRouter({ rollback: true, }, }); - return deploymentsList; }), - killProcess: protectedProcedure .input( z.object({ diff --git a/apps/dokploy/server/utils/deploy.ts b/apps/dokploy/server/utils/deploy.ts index f4591e3b3..bb429002a 100644 --- a/apps/dokploy/server/utils/deploy.ts +++ b/apps/dokploy/server/utils/deploy.ts @@ -50,3 +50,34 @@ export const cancelDeployment = async (cancelData: CancelDeploymentData) => { throw error; } }; + +export type QueueJobRow = { + id: string; + name?: string; + data: Record; + timestamp?: number; + processedOn?: number; + finishedOn?: number; + failedReason?: string; + state: string; +}; + +export const fetchDeployApiJobs = async ( + serverId: string, +): Promise => { + try { + const res = await fetch( + `${process.env.SERVER_URL}/jobs?serverId=${encodeURIComponent(serverId)}`, + { + headers: { + "Content-Type": "application/json", + "X-API-Key": process.env.API_KEY || "NO-DEFINED", + }, + }, + ); + if (!res.ok) return []; + return (await res.json()) as QueueJobRow[]; + } catch { + return []; + } +}; diff --git a/packages/server/src/services/deployment.ts b/packages/server/src/services/deployment.ts index 5d7a36f15..dbb632bf7 100644 --- a/packages/server/src/services/deployment.ts +++ b/packages/server/src/services/deployment.ts @@ -42,6 +42,41 @@ import { findScheduleById } from "./schedule"; import { findServerById, type Server } from "./server"; import { findVolumeBackupById } from "./volume-backups"; +export type ServicePath = { href: string | null; label: string }; + +export async function resolveServicePath( + orgId: string, + data: Record, +): Promise { + try { + const applicationId = data?.applicationId as string | undefined; + const composeId = data?.composeId as string | undefined; + if (applicationId) { + const app = await findApplicationById(applicationId); + if (app.environment.project.organizationId !== orgId) { + return { href: null, label: "Application" }; + } + return { + href: `/dashboard/project/${app.environment.project.projectId}/environment/${app.environment.environmentId}/services/application/${app.applicationId}`, + label: "Application", + }; + } + if (composeId) { + const comp = await findComposeById(composeId); + if (comp.environment.project.organizationId !== orgId) { + return { href: null, label: "Compose" }; + } + return { + href: `/dashboard/project/${comp.environment.project.projectId}/environment/${comp.environment.environmentId}/services/compose/${comp.composeId}`, + label: "Compose", + }; + } + } catch { + // not found or unauthorized + } + return { href: null, label: "—" }; +} + export type Deployment = typeof deployments.$inferSelect; export const findDeploymentById = async (deploymentId: string) => {