Skip to content

Commit 9cb6fd1

Browse files
authored
fix(webapp): idempotent DeploymentBackgroundWorker creation (#3772)
Make `POST /api/v1/deployments/:deploymentId/background-workers` idempotent so client-side retries no longer collide on the `BackgroundWorker` `(project, env, version)` unique index. Helps make deployments more resilient against the class of indexing failures that surfaces in the dashboard as "Indexing timed out", e.g. during transient database issues.
1 parent c043c4a commit 9cb6fd1

6 files changed

Lines changed: 405 additions & 52 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Make `POST /api/v1/deployments/:deploymentId/background-workers` idempotent (for sequential requests only) so client-side retries no longer collide on the `BackgroundWorker` `(project, env, version)` unique index. Helps make deployments more resilient against the class of indexing failures that surfaces in the dashboard as "Indexing timed out", e.g. during transient database issues.

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -34,31 +34,8 @@ import { tryCatch } from "@trigger.dev/core/v3";
3434
import { engine } from "../runEngine.server";
3535
import { scheduleEngine } from "../scheduleEngine.server";
3636

37-
/**
38-
* Strip BackgroundWorkerMetadata down to the slice that's actually read after
39-
* storage. Everything else is duplicated to dedicated columns/tables
40-
* (BackgroundWorker.{contentHash,cliVersion,sdkVersion,runtime,runtimeVersion},
41-
* BackgroundWorkerTask, BackgroundWorkerFile, TaskQueue, Prompt). Today the
42-
* only post-write reader is changeCurrentDeployment.server.ts, which feeds
43-
* tasks[].schedule into syncDeclarativeSchedules. packageVersion, contentHash,
44-
* and tasks[].filePath are kept solely to satisfy BackgroundWorkerMetadata's
45-
* required fields when the column is parsed back.
46-
*/
47-
export function stripBackgroundWorkerMetadataForStorage(
48-
metadata: BackgroundWorkerMetadata
49-
): Prisma.InputJsonValue {
50-
return {
51-
packageVersion: metadata.packageVersion,
52-
contentHash: metadata.contentHash,
53-
tasks: metadata.tasks
54-
.filter((t) => t.schedule)
55-
.map((t) => ({
56-
id: t.id,
57-
filePath: t.filePath,
58-
schedule: t.schedule,
59-
})),
60-
};
61-
}
37+
import { stripBackgroundWorkerMetadataForStorage } from "./stripBackgroundWorkerMetadataForStorage.server";
38+
export { stripBackgroundWorkerMetadataForStorage };
6239

6340
export class CreateBackgroundWorkerService extends BaseService {
6441
private readonly _taskMetaCache: TaskMetadataCache;
@@ -356,8 +333,13 @@ async function createWorkerTask(
356333
prisma: PrismaClientOrTransaction,
357334
tasksToBackgroundFiles?: Map<string, string>
358335
): Promise<TaskMetadataEntry | null> {
336+
// Hoisted so the P2002 catch branch can return the same entry shape.
337+
let queue: TaskQueue | undefined;
338+
let resolvedTriggerSource: "SCHEDULED" | "AGENT" | "STANDARD" | undefined;
339+
let resolvedTtl: string | null | undefined;
340+
359341
try {
360-
let queue = queues.find((queue) => queue.name === task.queue?.name);
342+
queue = queues.find((queue) => queue.name === task.queue?.name);
361343

362344
if (!queue) {
363345
// Create a TaskQueue
@@ -374,14 +356,14 @@ async function createWorkerTask(
374356
);
375357
}
376358

377-
const resolvedTriggerSource =
359+
resolvedTriggerSource =
378360
task.triggerSource === "schedule"
379361
? ("SCHEDULED" as const)
380362
: task.triggerSource === "agent"
381363
? ("AGENT" as const)
382364
: ("STANDARD" as const);
383365

384-
const resolvedTtl =
366+
resolvedTtl =
385367
typeof task.ttl === "number" ? stringifyDuration(task.ttl) ?? null : task.ttl ?? null;
386368

387369
await prisma.backgroundWorkerTask.create({
@@ -418,10 +400,26 @@ async function createWorkerTask(
418400
if (error instanceof Prisma.PrismaClientKnownRequestError) {
419401
// The error code for unique constraint violation in Prisma is P2002
420402
if (error.code === "P2002") {
421-
logger.warn("Task already exists", {
403+
// Retry landing after the first attempt's row was already written.
404+
const existing = await prisma.backgroundWorkerTask.findFirst({
405+
where: { workerId: worker.id, slug: task.id },
406+
select: { id: true },
407+
});
408+
409+
logger.warn("Attempted to recreate background worker task", {
422410
task,
423411
worker,
424412
});
413+
414+
if (existing && queue && resolvedTriggerSource && resolvedTtl !== undefined) {
415+
return {
416+
slug: task.id,
417+
ttl: resolvedTtl,
418+
triggerSource: resolvedTriggerSource,
419+
queueId: queue.id,
420+
queueName: queue.name,
421+
};
422+
}
425423
} else {
426424
logger.error("Prisma Error creating background worker task", {
427425
error: {

apps/webapp/app/v3/services/createDeploymentBackgroundWorkerV4.server.ts

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
import { CreateBackgroundWorkerRequestBody, logger, tryCatch } from "@trigger.dev/core/v3";
2-
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
3-
import type { BackgroundWorker, PrismaClientOrTransaction, WorkerDeployment } from "@trigger.dev/database";
2+
import type {
3+
BackgroundWorker,
4+
PrismaClientOrTransaction,
5+
WorkerDeployment,
6+
} from "@trigger.dev/database";
47
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
58
import { type TaskMetadataCache } from "~/services/taskMetadataCache.server";
69
import { taskMetadataCacheInstance } from "~/services/taskMetadataCacheInstance.server";
710
import { BaseService, ServiceValidationError } from "./baseService.server";
811
import {
912
createBackgroundFiles,
1013
createWorkerResources,
11-
stripBackgroundWorkerMetadataForStorage,
1214
syncDeclarativeSchedules,
1315
} from "./createBackgroundWorker.server";
16+
import { findOrCreateBackgroundWorker } from "./createDeploymentBackgroundWorkerV4/findOrCreateBackgroundWorker.server";
1417
import { TimeoutDeploymentService } from "./timeoutDeployment.server";
1518
import { env } from "~/env.server";
1619

@@ -50,6 +53,11 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
5053
});
5154

5255
if (!deployment) {
56+
logger.warn("createDeploymentBackgroundWorker: deployment not found", {
57+
deploymentId,
58+
environmentId: environment.id,
59+
projectId: environment.projectId,
60+
});
5361
return;
5462
}
5563

@@ -69,26 +77,44 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
6977
}
7078
}
7179

80+
// Late-retry idempotency: if a worker was registered by a prior fully-
81+
// successful attempt and the deployment already moved past BUILDING, return
82+
// that worker so the CLI can finalize instead of seeing a 5xx.
83+
if (deployment.workerId) {
84+
const linkedWorker = await this._prisma.backgroundWorker.findFirst({
85+
where: { id: deployment.workerId },
86+
});
87+
if (linkedWorker) {
88+
return linkedWorker;
89+
}
90+
}
91+
7292
if (deployment.status !== "BUILDING") {
93+
logger.warn("createDeploymentBackgroundWorker: deployment not in BUILDING state", {
94+
deploymentId,
95+
deploymentStatus: deployment.status,
96+
environmentId: environment.id,
97+
projectId: environment.projectId,
98+
});
7399
return;
74100
}
75101

76-
const backgroundWorker = await this._prisma.backgroundWorker.create({
77-
data: {
78-
...BackgroundWorkerId.generate(),
79-
version: deployment.version,
80-
runtimeEnvironmentId: environment.id,
81-
projectId: environment.projectId,
82-
metadata: stripBackgroundWorkerMetadataForStorage(body.metadata),
83-
contentHash: body.metadata.contentHash,
84-
cliVersion: body.metadata.cliPackageVersion,
85-
sdkVersion: body.metadata.packageVersion,
86-
supportsLazyAttempts: body.supportsLazyAttempts,
87-
engine: body.engine,
88-
runtime: body.metadata.runtime,
89-
runtimeVersion: body.metadata.runtimeVersion,
90-
},
91-
});
102+
const [findOrCreateError, backgroundWorker] = await tryCatch(
103+
findOrCreateBackgroundWorker(environment, deployment, body, this._prisma)
104+
);
105+
106+
if (findOrCreateError) {
107+
// Definitive failures (e.g. contentHash drift) surface as
108+
// `ServiceValidationError` — fail the deployment so the operator sees it
109+
// immediately instead of waiting 8 minutes for the timeout. Transient
110+
// races throw a plain `Error` and propagate as 5xx without failing.
111+
if (findOrCreateError instanceof ServiceValidationError) {
112+
// `#failBackgroundWorkerDeployment` already throws its argument; the
113+
// outer `throw` covers the non-SVE branch.
114+
await this.#failBackgroundWorkerDeployment(deployment, findOrCreateError);
115+
}
116+
throw findOrCreateError;
117+
}
92118

93119
//upgrade the project to engine "V2" if it's not already
94120
if (environment.project.engine === "V1" && body.engine === "V2") {
@@ -188,10 +214,11 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
188214
throw serviceError;
189215
}
190216

191-
// Link the deployment with the background worker
192-
await this._prisma.workerDeployment.update({
217+
// Guarded BUILDING → DEPLOYING transition. `updateMany` for optimistic concurrency control
218+
const { count: updatedCount } = await this._prisma.workerDeployment.updateMany({
193219
where: {
194220
id: deployment.id,
221+
status: "BUILDING",
195222
},
196223
data: {
197224
status: "DEPLOYING",
@@ -203,6 +230,18 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
203230
},
204231
});
205232

233+
if (updatedCount === 0) {
234+
logger.warn(
235+
"createDeploymentBackgroundWorker: deployment no longer in BUILDING state, skipping DEPLOYING transition",
236+
{
237+
deploymentId,
238+
environmentId: environment.id,
239+
projectId: environment.projectId,
240+
}
241+
);
242+
return backgroundWorker;
243+
}
244+
206245
await TimeoutDeploymentService.enqueue(
207246
deployment.id,
208247
"DEPLOYING",
@@ -215,9 +254,14 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
215254
}
216255

217256
async #failBackgroundWorkerDeployment(deployment: WorkerDeployment, error: Error) {
218-
await this._prisma.workerDeployment.update({
257+
// Guarded BUILDING → FAILED transition, symmetric with the BUILDING → DEPLOYING
258+
// transition in `call()`. With idempotent retries, two attempts can run side-by-side;
259+
// without the predicate, one attempt's failure could downgrade the deployment after
260+
// the other already flipped it to DEPLOYING, leaving it stuck in FAILED with a worker.
261+
const { count: updatedCount } = await this._prisma.workerDeployment.updateMany({
219262
where: {
220263
id: deployment.id,
264+
status: "BUILDING",
221265
},
222266
data: {
223267
status: "FAILED",
@@ -229,7 +273,20 @@ export class CreateDeploymentBackgroundWorkerServiceV4 extends BaseService {
229273
},
230274
});
231275

232-
await TimeoutDeploymentService.dequeue(deployment.id, this._prisma);
276+
if (updatedCount === 0) {
277+
logger.warn(
278+
"failBackgroundWorkerDeployment: deployment moved out of BUILDING during call, skipping FAILED transition",
279+
{
280+
deploymentId: deployment.id,
281+
originalError: error.message,
282+
}
283+
);
284+
} else {
285+
// Only dequeue the timeout if we actually flipped to FAILED — otherwise a
286+
// sibling attempt may have just enqueued it as part of a successful
287+
// BUILDING → DEPLOYING transition.
288+
await TimeoutDeploymentService.dequeue(deployment.id, this._prisma);
289+
}
233290

234291
throw error;
235292
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import type { CreateBackgroundWorkerRequestBody } from "@trigger.dev/core/v3";
2+
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
3+
import {
4+
isUniqueConstraintError,
5+
type BackgroundWorker,
6+
type PrismaClientOrTransaction,
7+
type WorkerDeployment,
8+
} from "@trigger.dev/database";
9+
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
10+
import { ServiceValidationError } from "../common.server";
11+
import { stripBackgroundWorkerMetadataForStorage } from "../stripBackgroundWorkerMetadataForStorage.server";
12+
13+
/**
14+
* Idempotent on `(project, environment, version)` for sequential calls, not concurrent calls.
15+
*
16+
* Failure shapes the caller distinguishes:
17+
* - `ServiceValidationError` (409): definitive — contentHash drift means a different build
18+
* is being pushed under the same deployment version, so the caller should fail the
19+
* deployment instead of waiting for a timeout.
20+
* - Plain `Error`: transient — two attempts raced the `create()` call and the loser caught
21+
* the unique-index violation. The caller should propagate this as 5xx so the CLI's
22+
* retry/backoff hits findFirst on the next attempt and returns the winner's row.
23+
*/
24+
export async function findOrCreateBackgroundWorker(
25+
environment: AuthenticatedEnvironment,
26+
deployment: WorkerDeployment,
27+
body: CreateBackgroundWorkerRequestBody,
28+
prisma: PrismaClientOrTransaction
29+
): Promise<BackgroundWorker> {
30+
const existing = await prisma.backgroundWorker.findFirst({
31+
where: {
32+
projectId: environment.projectId,
33+
runtimeEnvironmentId: environment.id,
34+
version: deployment.version,
35+
},
36+
});
37+
38+
if (existing && existing.contentHash === body.metadata.contentHash) {
39+
return existing;
40+
}
41+
42+
if (existing) {
43+
throw new ServiceValidationError(
44+
"A background worker for this deployment version already exists with a different content hash",
45+
409
46+
);
47+
}
48+
49+
try {
50+
return await prisma.backgroundWorker.create({
51+
data: {
52+
...BackgroundWorkerId.generate(),
53+
version: deployment.version,
54+
runtimeEnvironmentId: environment.id,
55+
projectId: environment.projectId,
56+
metadata: stripBackgroundWorkerMetadataForStorage(body.metadata),
57+
contentHash: body.metadata.contentHash,
58+
cliVersion: body.metadata.cliPackageVersion,
59+
sdkVersion: body.metadata.packageVersion,
60+
supportsLazyAttempts: body.supportsLazyAttempts,
61+
engine: body.engine,
62+
runtime: body.metadata.runtime,
63+
runtimeVersion: body.metadata.runtimeVersion,
64+
},
65+
});
66+
} catch (error) {
67+
// Concurrent attempts raced past `findFirst` and both reached `create`. Surface
68+
// a clear, non-Prisma error so the 5xx the caller returns isn't an opaque
69+
// P2002 — the CLI's retry will then hit `findFirst` and find the winner's row.
70+
// Intentionally NOT a ServiceValidationError so the caller doesn't fail-deploy
71+
// on a transient race.
72+
if (
73+
isUniqueConstraintError(error, ["projectId", "runtimeEnvironmentId", "version"])
74+
) {
75+
throw new Error(
76+
"Concurrent background worker registration detected for this deployment version; please retry"
77+
);
78+
}
79+
throw error;
80+
}
81+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { BackgroundWorkerMetadata } from "@trigger.dev/core/v3";
2+
import { Prisma } from "@trigger.dev/database";
3+
4+
/**
5+
* Strip BackgroundWorkerMetadata down to the slice that's actually read after
6+
* storage. Everything else is duplicated to dedicated columns/tables
7+
* (BackgroundWorker.{contentHash,cliVersion,sdkVersion,runtime,runtimeVersion},
8+
* BackgroundWorkerTask, BackgroundWorkerFile, TaskQueue, Prompt). Today the
9+
* only post-write reader is changeCurrentDeployment.server.ts, which feeds
10+
* tasks[].schedule into syncDeclarativeSchedules. packageVersion, contentHash,
11+
* and tasks[].filePath are kept solely to satisfy BackgroundWorkerMetadata's
12+
* required fields when the column is parsed back.
13+
*/
14+
export function stripBackgroundWorkerMetadataForStorage(
15+
metadata: BackgroundWorkerMetadata
16+
): Prisma.InputJsonValue {
17+
return {
18+
packageVersion: metadata.packageVersion,
19+
contentHash: metadata.contentHash,
20+
tasks: metadata.tasks
21+
.filter((t) => t.schedule)
22+
.map((t) => ({
23+
id: t.id,
24+
filePath: t.filePath,
25+
schedule: t.schedule,
26+
})),
27+
};
28+
}

0 commit comments

Comments
 (0)