Commit 002b845

feat(supervisor): verify warm-start delivery, cold-start silently lost dispatches (#3918)
### Problem

Firestarter's `didWarmStart: true` means the response was written to a long-poll socket, not that the runner received it. A silently dead poller (no FIN, e.g. a VM torn down mid-poll) leaves the dispatched run stuck in `PENDING_EXECUTING` until the run engine's heartbeat redrive, and each redrive burns a queue redelivery toward `TASK_RUN_DEQUEUED_MAX_RETRIES`.

### Change

After a warm-start hit, the supervisor retains the `DequeuedMessage` (TimerWheel, default 10s), then probes the existing `getLatestSnapshot` API. If the run is still on the exact dequeued snapshot, no runner ever acted, so it falls through to the regular cold-create path. Recovery: ~10s + cold start, no new APIs, no CLI changes.

- **Double-start safe**: `startRunAttempt` runs under a per-run lock and 409s stale snapshot ids, so a reviving runner and the fallback workload can't both execute; the loser exits before running anything.
- **Probe errors → do nothing**: healthy runners legitimately act late during platform brownouts (nested attempt-start retries), so falling back on uncertainty would stampede duplicates. The heartbeat redrive stays as the backstop (also covers supervisor restarts dropping timers).
- **Off by default**: `TRIGGER_WARM_START_VERIFY_ENABLED` (+ `TRIGGER_WARM_START_VERIFY_DELAY_MS`, 1–60s, default 10s). Disabled = complete no-op. Works for all workload managers (compute/k8s/docker) since it hooks the shared dequeue path.
- Emits `warmstart.verify` wide events (`outcome: delivered | fallback | probe_error`), making the silent-loss rate directly measurable.
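The probe decision described above can be sketched as a small pure function. This is a minimal illustration under assumptions, not the code from this commit: `ProbeResult`, `VerifyOutcome`, and `classifyProbe` are hypothetical names, and the result shape is only loosely modelled on the mocked `getLatestSnapshot` response in the test file.

```typescript
// Hypothetical probe result shape (assumption, not the real API type).
type ProbeResult =
  | { success: true; latestSnapshotId: string }
  | { success: false; error: string };

// Outcome values named in the commit message for `warmstart.verify` events.
type VerifyOutcome = "delivered" | "fallback" | "probe_error";

function classifyProbe(dequeuedSnapshotId: string, probe: ProbeResult): VerifyOutcome {
  if (!probe.success) {
    // Uncertain result: do nothing and leave recovery to the heartbeat redrive.
    return "probe_error";
  }
  // Still on the exact dequeued snapshot means no runner acted, so cold-start.
  return probe.latestSnapshotId === dequeuedSnapshotId ? "fallback" : "delivered";
}

// A run that never left its dequeued snapshot is classified as a fallback.
console.log(
  classifyProbe("snapshot_dequeued", { success: true, latestSnapshotId: "snapshot_dequeued" })
);
```

Only the `fallback` outcome would lead to a cold create; both other outcomes leave the run alone.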
1 parent 19c0763 commit 002b845

5 files changed

Lines changed: 423 additions & 55 deletions

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: supervisor
+type: feature
+---
+
+Verify warm-start dispatches were acted on and cold-start the run within seconds when a dispatch is silently lost (opt-in via TRIGGER_WARM_START_VERIFY_ENABLED).

apps/supervisor/src/env.ts

Lines changed: 10 additions & 0 deletions
@@ -79,6 +79,16 @@ const Env = z
     TRIGGER_CHECKPOINT_URL: z.string().optional(),
     TRIGGER_METADATA_URL: z.string().optional(),
 
+    // Warm-start delivery verification: after a warm-start hit, probe the
+    // platform and cold-start the run if no runner acted on the dispatch
+    TRIGGER_WARM_START_VERIFY_ENABLED: BoolEnv.default(false),
+    TRIGGER_WARM_START_VERIFY_DELAY_MS: z.coerce
+      .number()
+      .int()
+      .min(1_000)
+      .max(60_000)
+      .default(10_000),
+
     // Used by the resource monitor
     RESOURCE_MONITOR_ENABLED: BoolEnv.default(false),
     RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
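To make the bounds on `TRIGGER_WARM_START_VERIFY_DELAY_MS` concrete, here is a dependency-free sketch of what the schema above appears to enforce: an integer between 1,000 and 60,000 ms, defaulting to 10,000 when unset. `parseVerifyDelayMs` and the constants are hypothetical names, and the real validation is done by zod, so error handling and messages will differ.

```typescript
// Hypothetical stand-in for the zod schema; not part of the commit.
const MIN_DELAY_MS = 1_000;
const MAX_DELAY_MS = 60_000;
const DEFAULT_DELAY_MS = 10_000;

function parseVerifyDelayMs(raw: string | undefined): number {
  // Unset variable: use the default.
  if (raw === undefined) return DEFAULT_DELAY_MS;

  // Coerce the string to a number, then apply the integer and range checks.
  const value = Number(raw);
  if (!Number.isInteger(value) || value < MIN_DELAY_MS || value > MAX_DELAY_MS) {
    throw new Error(
      `TRIGGER_WARM_START_VERIFY_DELAY_MS must be an integer in [${MIN_DELAY_MS}, ${MAX_DELAY_MS}], got "${raw}"`
    );
  }
  return value;
}

console.log(parseVerifyDelayMs(undefined));
console.log(parseVerifyDelayMs("5000"));
```

Out-of-range values are rejected here rather than clamped, which matches how `.min()` and `.max()` fail validation.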

apps/supervisor/src/index.ts

Lines changed: 102 additions & 55 deletions
@@ -27,6 +27,10 @@ import { PodCleaner } from "./services/podCleaner.js";
 import { FailedPodHandler } from "./services/failedPodHandler.js";
 import { getWorkerToken } from "./workerToken.js";
 import { OtlpTraceService } from "./services/otlpTraceService.js";
+import {
+  WarmStartVerificationService,
+  type WarmStartTimings,
+} from "./services/warmStartVerificationService.js";
 import { extractTraceparent, getRestoreRunnerId } from "./util.js";
 import { Redis } from "ioredis";
 import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js";
@@ -63,6 +67,7 @@ class ManagedSupervisor {
   private readonly logger = new SimpleStructuredLogger("managed-supervisor");
   private readonly resourceMonitor: ResourceMonitor;
   private readonly checkpointClient?: CheckpointClient;
+  private readonly warmStartVerifier?: WarmStartVerificationService;
 
   private readonly podCleaner?: PodCleaner;
   private readonly failedPodHandler?: FailedPodHandler;
@@ -311,6 +316,19 @@ class ManagedSupervisor {
       });
     }
 
+    if (env.TRIGGER_WARM_START_VERIFY_ENABLED && this.warmStartUrl) {
+      this.logger.log("Warm-start delivery verification enabled", {
+        delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS,
+      });
+
+      this.warmStartVerifier = new WarmStartVerificationService({
+        workerClient: this.workerSession.httpClient,
+        delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS,
+        createWorkload: (message, timings) => this.createWorkload(message, timings),
+        wideEventOpts: this.wideEventOpts,
+      });
+    }
+
     this.workerSession.on("runNotification", async ({ time, run }) => {
       this.logger.verbose("runNotification", { time, run });

@@ -467,66 +485,24 @@ class ManagedSupervisor {
           if (didWarmStart) {
             setExtra(fromContext(), "path_taken", "warm_start");
             this.logger.debug("Warm start successful", { runId: message.run.id });
-            return;
-          }
-
-          setExtra(fromContext(), "path_taken", "cold_create");
-
-          const createStart = performance.now();
-          try {
-            if (!message.deployment.friendlyId) {
-              // mostly a type guard, deployments always exists for deployed environments
-              // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
-              throw new Error("Deployment is missing");
-            }
-
-            await this.workloadManager.create({
-              dequeuedAt: message.dequeuedAt,
+            // A hit only means the response was written to the long-poll
+            // socket, not that the runner received it. Schedule a delivery
+            // verification that cold-starts the run if nobody acts on it.
+            this.warmStartVerifier?.schedule(message, {
               dequeueResponseMs,
               pollingIntervalMs,
               warmStartCheckMs,
-              envId: message.environment.id,
-              envType: message.environment.type,
-              image: message.image,
-              machine: message.run.machine,
-              orgId: message.organization.id,
-              projectId: message.project.id,
-              deploymentFriendlyId: message.deployment.friendlyId,
-              deploymentVersion: message.backgroundWorker.version,
-              runId: message.run.id,
-              runFriendlyId: message.run.friendlyId,
-              version: message.version,
-              nextAttemptNumber: message.run.attemptNumber,
-              snapshotId: message.snapshot.id,
-              snapshotFriendlyId: message.snapshot.friendlyId,
-              placementTags: message.placementTags,
-              traceContext: message.run.traceContext,
-              annotations: message.run.annotations,
-              hasPrivateLink: message.organization.hasPrivateLink,
             });
-            recordPhaseSince("workload_create", createStart, undefined);
-            workloadCreateDuration.observe(
-              { backend: this.workloadManagerBackend, outcome: "success" },
-              (performance.now() - createStart) / 1000
-            );
-
-            // Disabled for now
-            // this.resourceMonitor.blockResources({
-            //   cpu: message.run.machine.cpu,
-            //   memory: message.run.machine.memory,
-            // });
-          } catch (error) {
-            recordPhaseSince(
-              "workload_create",
-              createStart,
-              error instanceof Error ? error : new Error(String(error))
-            );
-            workloadCreateDuration.observe(
-              { backend: this.workloadManagerBackend, outcome: "error" },
-              (performance.now() - createStart) / 1000
-            );
-            this.logger.error("Failed to create workload", { error });
+            return;
           }
+
+          setExtra(fromContext(), "path_taken", "cold_create");
+
+          await this.createWorkload(message, {
+            dequeueResponseMs,
+            pollingIntervalMs,
+            warmStartCheckMs,
+          });
         }
       );
     }
@@ -561,6 +537,8 @@ class ManagedSupervisor {
 
   async onRunConnected({ run }: { run: { friendlyId: string } }) {
     this.logger.debug("Run connected", { run });
+    // The dispatched run reached a runner on this node - no fallback needed.
+    this.warmStartVerifier?.cancel(run.friendlyId);
     this.workerSession.subscribeToRunNotifications([run.friendlyId]);
   }

@@ -569,6 +547,72 @@ class ManagedSupervisor {
     this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]);
   }
 
+  private async createWorkload(message: DequeuedMessage, timings: WarmStartTimings) {
+    const createStart = performance.now();
+    try {
+      if (!message.deployment.friendlyId) {
+        // mostly a type guard, deployments always exists for deployed environments
+        // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
+        throw new Error("Deployment is missing");
+      }
+
+      if (!message.image) {
+        // same type-guard situation as deployment above
+        throw new Error("Image is missing");
+      }
+
+      await this.workloadManager.create({
+        dequeuedAt: message.dequeuedAt,
+        dequeueResponseMs: timings.dequeueResponseMs,
+        pollingIntervalMs: timings.pollingIntervalMs,
+        warmStartCheckMs: timings.warmStartCheckMs,
+        envId: message.environment.id,
+        envType: message.environment.type,
+        image: message.image,
+        machine: message.run.machine,
+        orgId: message.organization.id,
+        projectId: message.project.id,
+        deploymentFriendlyId: message.deployment.friendlyId,
+        deploymentVersion: message.backgroundWorker.version,
+        runId: message.run.id,
+        runFriendlyId: message.run.friendlyId,
+        version: message.version,
+        nextAttemptNumber: message.run.attemptNumber,
+        snapshotId: message.snapshot.id,
+        snapshotFriendlyId: message.snapshot.friendlyId,
+        placementTags: message.placementTags,
+        traceContext: message.run.traceContext,
+        annotations: message.run.annotations,
+        hasPrivateLink: message.organization.hasPrivateLink,
+      });
+      recordPhaseSince("workload_create", createStart, undefined);
+      workloadCreateDuration.observe(
+        { backend: this.workloadManagerBackend, outcome: "success" },
+        (performance.now() - createStart) / 1000
+      );
+
+      // Disabled for now
+      // this.resourceMonitor.blockResources({
+      //   cpu: message.run.machine.cpu,
+      //   memory: message.run.machine.memory,
+      // });
+    } catch (error) {
+      recordPhaseSince(
+        "workload_create",
+        createStart,
+        error instanceof Error ? error : new Error(String(error))
+      );
+      workloadCreateDuration.observe(
+        { backend: this.workloadManagerBackend, outcome: "error" },
+        (performance.now() - createStart) / 1000
+      );
+      this.logger.error("Failed to create workload", {
+        runId: message.run.friendlyId,
+        error,
+      });
+    }
+  }
+
   private async tryWarmStart(
     dequeuedMessage: DequeuedMessage,
     traceparent: string | undefined
@@ -650,6 +694,9 @@ class ManagedSupervisor {
 
   async stop() {
     this.logger.log("Shutting down");
+    // Stop the verifier first: its timer can otherwise fire mid-shutdown and
+    // cold-create a workload on a node that is going down.
+    this.warmStartVerifier?.stop();
     await this.workloadServer.stop();
     await this.workerSession.stop();

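The three hooks added in this file (`schedule` on a warm-start hit, `cancel` in `onRunConnected`, `stop` first during shutdown) imply a small lifecycle for pending verifications. The sketch below illustrates that lifecycle only; it is not the service from this commit. It swaps in plain per-run `setTimeout` timers where the commit message mentions a TimerWheel, and `PendingVerifications` and its members are hypothetical names.

```typescript
// Illustrative only: setTimeout stands in for the TimerWheel, and all
// names in this class are hypothetical.
class PendingVerifications<T> {
  private readonly timers = new Map<string, ReturnType<typeof setTimeout>>();
  private readonly delayMs: number;
  private readonly onDue: (runId: string, payload: T) => void;

  constructor(delayMs: number, onDue: (runId: string, payload: T) => void) {
    this.delayMs = delayMs;
    this.onDue = onDue;
  }

  // Scheduling the same run again replaces the earlier pending check.
  schedule(runId: string, payload: T): void {
    this.cancel(runId);
    const timer = setTimeout(() => {
      this.timers.delete(runId);
      this.onDue(runId, payload);
    }, this.delayMs);
    this.timers.set(runId, timer);
  }

  // Returns true if a pending check existed and was removed.
  cancel(runId: string): boolean {
    const timer = this.timers.get(runId);
    if (timer === undefined) return false;
    clearTimeout(timer);
    this.timers.delete(runId);
    return true;
  }

  // Drop every pending check so nothing fires during shutdown.
  stop(): void {
    this.timers.forEach((timer) => clearTimeout(timer));
    this.timers.clear();
  }

  get size(): number {
    return this.timers.size;
  }
}
```

Keying by run id is what lets a run connecting cancel exactly one pending check, and clearing every timer in `stop()` mirrors the reason given in the diff for stopping the verifier before the other services.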
Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
+import { describe, expect, it, vi } from "vitest";
+import { setTimeout as sleep } from "node:timers/promises";
+import { WarmStartVerificationService } from "./warmStartVerificationService.js";
+import type { DequeuedMessage } from "@trigger.dev/core/v3";
+import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers";
+
+// The TimerWheel ticks every 100ms, so a 1000ms delay (the env minimum)
+// fires within ~1.1s.
+const DELAY_MS = 1_000;
+// Long enough that a pending verification would certainly have fired.
+const SETTLE_MS = 1_600;
+
+const DEQUEUED_SNAPSHOT_ID = "snapshot_dequeued";
+
+function makeMessage(runFriendlyId = "run_1"): DequeuedMessage {
+  return {
+    run: { friendlyId: runFriendlyId },
+    snapshot: { friendlyId: DEQUEUED_SNAPSHOT_ID },
+  } as unknown as DequeuedMessage;
+}
+
+function createService(opts: {
+  latestSnapshotId?: string;
+  probeError?: boolean;
+}) {
+  const getLatestSnapshot = vi.fn(async (_runId: string) =>
+    opts.probeError
+      ? { success: false as const, error: "connection refused" }
+      : {
+          success: true as const,
+          data: { execution: { snapshot: { friendlyId: opts.latestSnapshotId } } },
+        }
+  );
+
+  const createWorkload = vi.fn(async () => {});
+
+  const service = new WarmStartVerificationService({
+    workerClient: { getLatestSnapshot } as unknown as SupervisorHttpClient,
+    delayMs: DELAY_MS,
+    createWorkload,
+    wideEventOpts: { service: "supervisor-test", env: {}, enabled: false },
+  });
+
+  return { service, getLatestSnapshot, createWorkload };
+}
+
+describe("WarmStartVerificationService", () => {
+  it("falls back to a cold create when the snapshot is unchanged", async () => {
+    const { service, createWorkload } = createService({
+      latestSnapshotId: DEQUEUED_SNAPSHOT_ID,
+    });
+    try {
+      const message = makeMessage();
+      const timings = { warmStartCheckMs: 12 };
+      service.schedule(message, timings);
+
+      await vi.waitFor(() => expect(createWorkload).toHaveBeenCalledTimes(1), {
+        timeout: 3_000,
+      });
+      expect(createWorkload).toHaveBeenCalledWith(message, timings);
+    } finally {
+      service.stop();
+    }
+  });
+
+  it("does nothing when the snapshot has moved on (delivered)", async () => {
+    const { service, getLatestSnapshot, createWorkload } = createService({
+      latestSnapshotId: "snapshot_executing",
+    });
+    try {
+      service.schedule(makeMessage(), { warmStartCheckMs: 12 });
+
+      await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), {
+        timeout: 3_000,
+      });
+      await sleep(100);
+      expect(createWorkload).not.toHaveBeenCalled();
+    } finally {
+      service.stop();
+    }
+  });
+
+  it("never falls back when the probe errors", async () => {
+    const { service, getLatestSnapshot, createWorkload } = createService({ probeError: true });
+    try {
+      service.schedule(makeMessage(), { warmStartCheckMs: 12 });
+
+      await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), {
+        timeout: 3_000,
+      });
+      await sleep(100);
+      expect(createWorkload).not.toHaveBeenCalled();
+    } finally {
+      service.stop();
+    }
+  });
+
+  it("cancel before the delay prevents the probe entirely", async () => {
+    const { service, getLatestSnapshot, createWorkload } = createService({
+      latestSnapshotId: DEQUEUED_SNAPSHOT_ID,
+    });
+    try {
+      service.schedule(makeMessage(), { warmStartCheckMs: 12 });
+
+      expect(service.cancel("run_1")).toBe(true);
+
+      await sleep(SETTLE_MS);
+      expect(getLatestSnapshot).not.toHaveBeenCalled();
+      expect(createWorkload).not.toHaveBeenCalled();
+    } finally {
+      service.stop();
+    }
+  });
+
+  it("re-scheduling the same run replaces the pending verification", async () => {
+    const { service, getLatestSnapshot } = createService({
+      latestSnapshotId: "snapshot_executing",
+    });
+    try {
+      service.schedule(makeMessage(), { warmStartCheckMs: 1 });
+      service.schedule(makeMessage(), { warmStartCheckMs: 2 });
+
+      await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), {
+        timeout: 3_000,
+      });
+      await sleep(SETTLE_MS);
+      expect(getLatestSnapshot).toHaveBeenCalledTimes(1);
+    } finally {
+      service.stop();
+    }
+  });
+});
