Skip to content

Commit 4745754

Browse files
d-csclaude
andauthored
feat(webapp,run-engine): mollifier drainer replay + stale sweep + cancelled-run engine API (#3754)
## Summary The replay side of the mollifier: - `DrainerHandler`: reads buffered snapshots and replays them through `engine.trigger` to materialise PG rows. - `RunEngine.createCancelledRun`: new public method the handler uses to write CANCELED rows directly from snapshots (bypass queue + waitpoint, emit `runCancelled`). Tolerates the cjson empty-table tags edge case found during validation. - Drainer fairness: org → env rotation so a heavy env doesn't starve light ones in the same org. - Stale-entry sweep + telemetry + alertable gauge so a stuck/offline drainer surfaces in alerts. Both the drainer and sweep default-off; nothing fires unless flagged on (`TRIGGER_MOLLIFIER_DRAINER_ENABLED`, `TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED`). Stacked on the trigger-time decisions PR. ## Test plan - [x] \`pnpm run typecheck --filter webapp\` passes - [x] \`pnpm run test --filter webapp test/mollifierDrainerHandler.test.ts\` passes - [x] \`pnpm run test --filter webapp test/mollifierStaleSweep.test.ts\` passes - [x] \`pnpm run test --filter @internal/run-engine src/engine/tests/createCancelledRun.test.ts\` passes - [x] \`pnpm run test --filter @trigger.dev/redis-worker packages/redis-worker/src/mollifier/drainer.test.ts\` passes --- ## Ship-gate follow-up fix **Drainer writes SYSTEM_FAILURE on max-attempts exhaustion.** Adds an `onTerminalFailure` callback on `MollifierDrainerOptions` so the customer's run lands a SYSTEM_FAILURE PG row even when the drainer exhausts `MAX_ATTEMPTS` on a retryable PG error (previously `buffer.fail()` was called with no row written → silent data loss). The callback runs before `buffer.fail()` on every terminal path (non-retryable AND max-attempts-exhausted), and re-throwing a retryable error from the callback causes the drainer to requeue rather than fail. Bumps `@trigger.dev/redis-worker` to a **minor** changeset (additive option + new exported types). Includes 5 unit tests covering both terminal causes plus the requeue-on-retryable-callback-failure path and no-callback back-compat. --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 20a676d commit 4745754

21 files changed

Lines changed: 3670 additions & 52 deletions
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/redis-worker": minor
3+
---
4+
5+
Add `onTerminalFailure` callback to `MollifierDrainerOptions` so the customer's run lands a SYSTEM_FAILURE PG row even when the drainer exhausts `maxAttempts` on a retryable PG error. Previously, retryable-error exhaustion called `buffer.fail()` directly, which atomically marks FAILED + DELs the entry hash with no PG write — silent data loss when PG was unreachable across the full retry budget. The callback fires before `buffer.fail()` on any terminal path (`cause: "non-retryable"` or `"max-attempts-exhausted"`); throwing a retryable error from the callback causes the drainer to requeue rather than fail.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Mollifier drainer replay: replay buffered entries into `engine.trigger`, stale-entry sweep, a drainer-health gauge, and run-engine cancelled/failed run APIs. Known limitation: stale-sweep runs per-webapp instance, so stale-entry counter metrics multiply by N webapps in HA until a distributed lease lands as follow-up.

apps/webapp/app/entry.server.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { renderToPipeableStream } from "react-dom/server";
99
import { PassThrough } from "stream";
1010
import * as Worker from "~/services/worker.server";
1111
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
12+
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
1213
import { bootstrap } from "./bootstrap";
1314
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1415
import {
@@ -228,6 +229,7 @@ Worker.init().catch((error) => {
228229
});
229230

230231
initMollifierDrainerWorker();
232+
initMollifierStaleSweepWorker();
231233

232234
bootstrap().catch((error) => {
233235
logError(error);

apps/webapp/app/env.server.ts

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,13 +1063,16 @@ const EnvironmentSchema = z
10631063
// Separate switch for the drainer (consumer side) so it can be split
10641064
// off onto a dedicated worker service. Unset → inherits
10651065
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
1066-
// flip two switches. In multi-replica deployments, set this to "0"
1067-
// explicitly on every replica except the one dedicated drainer
1068-
// service — otherwise every replica's polling loop races for the
1069-
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
1070-
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
1071-
// no-op because the gate-side singleton refuses to construct a
1072-
// buffer when the system is off.
1066+
// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
1067+
// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
1068+
// can win any given entry — but inefficient: polling load (SMEMBERS +
1069+
// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
1070+
// is per-process so engine load also multiplies. Splitting the drainer
1071+
// onto a dedicated worker keeps that traffic off the request-serving
1072+
// replicas. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill switch;
1073+
// setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
1074+
// no-op because the gate-side singleton refuses to construct a buffer
1075+
// when the system is off.
10731076
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
10741077
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
10751078
TRIGGER_MOLLIFIER_REDIS_HOST: z
@@ -1098,6 +1101,32 @@ const EnvironmentSchema = z
10981101
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
10991102
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
11001103
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
1104+
// Periodic sweep that scans buffer queue LISTs for entries whose
1105+
// dwell exceeds the stale threshold. Independent of the drainer —
1106+
// its job is exactly to make a stuck/offline drainer visible to
1107+
// ops. Defaults: explicitly opt-in (a separate kill switch from
1108+
// the mollifier itself), run every 5 minutes, alert on anything
1109+
// that's been dwelling for 5+ minutes (matches the sweep interval
1110+
// — "anything still here when we check" is the simplest threshold
1111+
// that converges).
1112+
//
1113+
// The sweep was previously defaulting to inherit
1114+
// `TRIGGER_MOLLIFIER_ENABLED`, which meant any deployment already
1115+
// running with the mollifier on would auto-start the sweep worker
1116+
// on upgrade — turning on new background load with no explicit
1117+
// rollout step. Hard-defaulting to "0" preserves the intent of
1118+
// exposing the sweep as a separate switch.
1119+
TRIGGER_MOLLIFIER_STALE_SWEEP_ENABLED: z.string().default("0"),
1120+
TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS: z.coerce
1121+
.number()
1122+
.int()
1123+
.positive()
1124+
.default(5 * 60_000),
1125+
TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS: z.coerce
1126+
.number()
1127+
.int()
1128+
.positive()
1129+
.default(5 * 60_000),
11011130

11021131
BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
11031132
.number()

apps/webapp/app/runEngine/services/triggerFailedTask.server.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database";
66
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
77
import { logger } from "~/services/logger.server";
88
import { getEventRepository } from "~/v3/eventRepository/index.server";
9+
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
910
import { DefaultQueueManager } from "../concerns/queues.server";
1011
import type { TriggerTaskRequest } from "../types";
1112

@@ -176,6 +177,14 @@ export class TriggerFailedTaskService {
176177
event.setAttribute("runId", failedRunFriendlyId);
177178
event.failWithError(taskRunError);
178179

180+
// `emitRunFailedEvent: false` because this call site owns the
181+
// trace-event lifecycle via the outer `traceEvent({
182+
// incomplete: false, isError: true })`. Letting the engine
183+
// emit `runFailed` here would race the
184+
// `completeFailedRunEvent` listener against the outer trace
185+
// event's own completion write for the same (traceId, spanId).
186+
// We re-trigger the alerts side directly after the trace
187+
// event closes, below.
179188
return await this.engine.createFailedTaskRun({
180189
friendlyId: failedRunFriendlyId,
181190
environment: {
@@ -200,12 +209,30 @@ export class TriggerFailedTaskService {
200209
spanId: event.spanId,
201210
traceContext: traceContext as Record<string, unknown>,
202211
taskEventStore: store,
212+
emitRunFailedEvent: false,
203213
...(queueName !== undefined && { queue: queueName }),
204214
...(lockedQueueId !== undefined && { lockedQueueId }),
205215
});
206216
}
207217
);
208218

219+
// Alerts side of `runFailed` — the engine emit was suppressed
220+
// above so the trace-event completion isn't double-written; we
221+
// still need the alert pipeline to fire so customers' ERROR
222+
// channels see the failure. Best-effort: a failed enqueue logs
223+
// but doesn't block returning the friendlyId, mirroring the
224+
// engine handler's behaviour at runEngineHandlers.server.ts:81.
225+
try {
226+
await PerformTaskRunAlertsService.enqueue(failedRun.id);
227+
} catch (alertsError) {
228+
logger.warn("TriggerFailedTaskService: alert enqueue failed", {
229+
taskId: request.taskId,
230+
friendlyId: failedRun.friendlyId,
231+
error:
232+
alertsError instanceof Error ? alertsError.message : String(alertsError),
233+
});
234+
}
235+
209236
return failedRun.friendlyId;
210237
} catch (createError) {
211238
const createErrorMsg =
@@ -264,7 +291,7 @@ export class TriggerFailedTaskService {
264291
}
265292
}
266293

267-
await this.engine.createFailedTaskRun({
294+
const failedRun = await this.engine.createFailedTaskRun({
268295
friendlyId: failedRunFriendlyId,
269296
environment: {
270297
id: opts.environmentId,
@@ -286,8 +313,32 @@ export class TriggerFailedTaskService {
286313
depth,
287314
resumeParentOnCompletion: opts.resumeParentOnCompletion,
288315
batch: opts.batch,
316+
// Suppress the engine's `runFailed` bus emit — the listener
317+
// (`runEngineHandlers.server.ts` `runFailed`) calls
318+
// `completeFailedRunEvent`, which writes a ClickHouse trace event
319+
// row keyed on (traceId, spanId). This caller has no trace
320+
// context (the method name is literally `callWithoutTraceEvents`)
321+
// so the emit would write a row with empty traceId/spanId —
322+
// orphan event in the store. We still want alert coverage,
323+
// though, so enqueue directly below.
324+
emitRunFailedEvent: false,
289325
});
290326

327+
// Alerts side of `runFailed` — the engine emit was suppressed
328+
// above so we don't create an orphan trace event; enqueue the
329+
// alert directly so customers' ERROR channels still see the
330+
// failure. Best-effort, mirroring the `call()` path.
331+
try {
332+
await PerformTaskRunAlertsService.enqueue(failedRun.id);
333+
} catch (alertsError) {
334+
logger.warn("TriggerFailedTaskService.callWithoutTraceEvents: alert enqueue failed", {
335+
taskId: opts.taskId,
336+
friendlyId: failedRun.friendlyId,
337+
error:
338+
alertsError instanceof Error ? alertsError.message : String(alertsError),
339+
});
340+
}
341+
291342
return failedRunFriendlyId;
292343
} catch (createError) {
293344
logger.error("TriggerFailedTaskService: failed to create pre-failed TaskRun (no trace)", {

apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
import { createHash } from "node:crypto";
2-
import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker";
1+
import { MollifierDrainer } from "@trigger.dev/redis-worker";
2+
import { prisma } from "~/db.server";
33
import { env } from "~/env.server";
4+
import { engine as runEngine } from "~/v3/runEngine.server";
45
import { logger } from "~/services/logger.server";
56
import { singleton } from "~/utils/singleton";
67
import { getMollifierBuffer } from "./mollifierBuffer.server";
7-
import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server";
8+
import {
9+
createDrainerHandler,
10+
createDrainerTerminalFailureHandler,
11+
isRetryablePgError,
12+
} from "./mollifierDrainerHandler.server";
13+
import type { MollifierSnapshot } from "./mollifierSnapshot.server";
814

915
// Distinct error class for the deterministic "fail loud at boot" throws
1016
// below. The bootstrap in `mollifierDrainerWorker.server.ts` catches
@@ -25,7 +31,7 @@ export class MollifierConfigurationError extends Error {
2531
}
2632
}
2733

28-
function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
34+
function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
2935
const buffer = getMollifierBuffer();
3036
if (!buffer) {
3137
// Unreachable in normal config: getMollifierDrainer() gates on the
@@ -68,40 +74,14 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
6874
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
6975
});
7076

71-
// No-op ack handler: the trigger has ALREADY been written to Postgres
72-
// via engine.trigger (dual-write at the call site). Popping + acking
73-
// here proves the dequeue mechanism works end-to-end without duplicating
74-
// the work. A later change replaces this with an engine.trigger replay
75-
// that performs the actual Postgres write.
76-
const drainer = new MollifierDrainer<BufferedTriggerPayload>({
77+
const drainer = new MollifierDrainer<MollifierSnapshot>({
7778
buffer,
78-
handler: async (input) => {
79-
// Hash the (re-serialised, canonical) payload on the drain side rather
80-
// than on the trigger hot path. Burst-time CPU stays with engine.trigger;
81-
// the drainer is the natural place for the audit-equivalence checksum.
82-
// Re-serialisation is identity for the BufferedTriggerPayload shape
83-
// (only strings/numbers/plain objects), so this hash matches what the
84-
// call site wrote into Redis.
85-
const reserialised = serialiseSnapshot(input.payload);
86-
const payloadHash = createHash("sha256").update(reserialised).digest("hex");
87-
logger.info("mollifier.drained", {
88-
runId: input.runId,
89-
envId: input.envId,
90-
orgId: input.orgId,
91-
taskId: input.payload.taskId,
92-
attempts: input.attempts,
93-
ageMs: Date.now() - input.createdAt.getTime(),
94-
payloadBytes: reserialised.length,
95-
payloadHash,
96-
});
97-
},
79+
handler: createDrainerHandler({ engine: runEngine, prisma }),
80+
onTerminalFailure: createDrainerTerminalFailureHandler({ engine: runEngine, prisma }),
9881
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
9982
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
10083
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
101-
// A no-op handler shouldn't throw, but if something does (e.g. an
102-
// unexpected deserialise failure), don't loop — let it FAIL terminally
103-
// so the entry is observable in metrics.
104-
isRetryable: () => false,
84+
isRetryable: isRetryablePgError,
10585
});
10686

10787
return drainer;
@@ -114,7 +94,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
11494
// handler registration, leaving a narrow window where a SIGTERM landing
11595
// between `start()` and `process.once("SIGTERM", ...)` would skip the
11696
// graceful stop. The split is intentional.
117-
export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
97+
export function getMollifierDrainer(): MollifierDrainer<MollifierSnapshot> | null {
11898
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
11999
return singleton("mollifierDrainer", initializeMollifierDrainer);
120100
}

0 commit comments

Comments
 (0)