From 846c5415ce67d9704964aec8ec9aa82ba4d675ab Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 16:57:25 +0100 Subject: [PATCH 01/22] feat(webapp): add compute migration feature flags --- apps/webapp/app/v3/featureFlags.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/webapp/app/v3/featureFlags.ts b/apps/webapp/app/v3/featureFlags.ts index 3066f2dda0..be6b962166 100644 --- a/apps/webapp/app/v3/featureFlags.ts +++ b/apps/webapp/app/v3/featureFlags.ts @@ -11,6 +11,9 @@ export const FEATURE_FLAG = { mollifierEnabled: "mollifierEnabled", workerQueueScheduledSplitEnabled: "workerQueueScheduledSplitEnabled", realtimeBackend: "realtimeBackend", + computeMigrationEnabled: "computeMigrationEnabled", + computeMigrationFreePercentage: "computeMigrationFreePercentage", + computeMigrationPaidPercentage: "computeMigrationPaidPercentage", } as const; export const FeatureFlagCatalog = { @@ -27,6 +30,9 @@ export const FeatureFlagCatalog = { // globally and per-org (org wins). Defaults to "electric" when unset. // "shadow" serves Electric but diffs the native path in the background. [FEATURE_FLAG.realtimeBackend]: z.enum(["electric", "native", "shadow"]), + [FEATURE_FLAG.computeMigrationEnabled]: z.coerce.boolean(), + [FEATURE_FLAG.computeMigrationFreePercentage]: z.coerce.number().int().min(0).max(100), + [FEATURE_FLAG.computeMigrationPaidPercentage]: z.coerce.number().int().min(0).max(100), }; export type FeatureFlagKey = keyof typeof FeatureFlagCatalog; From 8b12326617c32343b6947af84b32d24abf87185b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 16:59:02 +0100 Subject: [PATCH 02/22] feat(webapp): add deterministic org hashBucket for rollout --- apps/webapp/app/utils/computeBucket.test.ts | 28 +++++++++++++++++++++ apps/webapp/app/utils/computeBucket.ts | 15 +++++++++++ 2 files changed, 43 insertions(+) create mode 100644 apps/webapp/app/utils/computeBucket.test.ts create mode 100644 apps/webapp/app/utils/computeBucket.ts diff --git a/apps/webapp/app/utils/computeBucket.test.ts b/apps/webapp/app/utils/computeBucket.test.ts new file mode 100644 index 0000000000..e33578a594 --- /dev/null +++ b/apps/webapp/app/utils/computeBucket.test.ts @@ -0,0 +1,28 @@ +import { describe, it, expect } from "vitest"; +import { hashBucket } from "./computeBucket"; + +describe("hashBucket", () => { + it("returns a stable value in [0, 100) for the same id", () => { + const a = hashBucket("org_abc"); + const b = hashBucket("org_abc"); + expect(a).toBe(b); + expect(a).toBeGreaterThanOrEqual(0); + expect(a).toBeLessThan(100); + }); + + it("is nested: the set enrolled at 1% is a subset of the set at 5%", () => { + const ids = Array.from({ length: 5000 }, (_, i) => `org_${i}`); + const at1 = new Set(ids.filter((id) => hashBucket(id) < 1)); + const at5 = ids.filter((id) => hashBucket(id) < 5); + for (const id of at1) { + expect(at5).toContain(id); + } + }); + + it("distributes roughly uniformly", () => { + const ids = Array.from({ length: 10000 }, (_, i) => `org_${i}`); + const under10 = ids.filter((id) => hashBucket(id) < 10).length; + expect(under10).toBeGreaterThan(700); + expect(under10).toBeLessThan(1300); + }); +}); diff --git a/apps/webapp/app/utils/computeBucket.ts b/apps/webapp/app/utils/computeBucket.ts new file mode 100644 index 0000000000..7a948c57db --- /dev/null +++ b/apps/webapp/app/utils/computeBucket.ts @@ -0,0 +1,15 @@ +/** + * Deterministic 0-99 bucket for an org id, stable across processes and deploys. + * FNV-1a (non-crypto): we only need determinism + uniform spread, not collision + * resistance. Used for nested percentage rollout: `hashBucket(orgId) < percentage`. + * Ramping the percentage down keeps a strict subset (the low buckets), so an org + * never flaps in and out as the dial moves. + */ +export function hashBucket(orgId: string): number { + let hash = 0x811c9dc5; // FNV offset basis + for (let i = 0; i < orgId.length; i++) { + hash ^= orgId.charCodeAt(i); + hash = Math.imul(hash, 0x01000193) >>> 0; + } + return hash % 100; +} From eeca099aabf492ca245675b513ca7ef6611c1ba7 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:00:28 +0100 Subject: [PATCH 03/22] feat(webapp): add compute migration env config --- apps/webapp/app/env.server.ts | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index c12b29a608..1724512637 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -158,6 +158,16 @@ const EnvironmentSchema = z WORKER_SCHEMA: z.string().default("graphile_worker"), WORKER_CONCURRENCY: z.coerce.number().int().default(10), WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + // JSON map of container-region masterQueue -> compute-backing masterQueue. + // Absence of an entry means that region is never migrated (e.g. EU until it + // has a compute backing). Example: {"us-east-1":"us-east-1-next"} + COMPUTE_BACKING_MAP: z.string().default("{}"), + // How often each replica reloads the global flags snapshot from the DB. + // Sets kill/ramp propagation latency. + GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5000), + // Max time the first trigger blocks waiting for the initial flags load + // before falling back to defaults (off = container, the safe direction). + GLOBAL_FLAGS_READY_TIMEOUT_MS: z.coerce.number().int().default(5000), WORKER_ENABLED: z.string().default("true"), GRACEFUL_SHUTDOWN_TIMEOUT: z.coerce.number().int().default(60000), DISABLE_SSE: z.string().optional(), From 7e4149296443f3c7e1ce80ec50369c44181c2701 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:04:12 +0100 Subject: [PATCH 04/22] feat(webapp): add compute migration resolver --- .../concerns/computeMigration.server.ts | 90 ++++++++++++++ apps/webapp/test/computeMigration.test.ts | 110 ++++++++++++++++++ 2 files changed, 200 insertions(+) create mode 100644 apps/webapp/app/runEngine/concerns/computeMigration.server.ts create mode 100644 apps/webapp/test/computeMigration.test.ts diff --git a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts new file mode 100644 index 0000000000..e3a0cfb286 --- /dev/null +++ b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts @@ -0,0 +1,90 @@ +import { hashBucket } from "~/utils/computeBucket"; + +/** Subset of the global flags snapshot this resolver reads. */ +export type ComputeMigrationFlags = { + computeMigrationEnabled?: boolean; + computeMigrationFreePercentage?: number; + computeMigrationPaidPercentage?: number; +}; + +export type ComputeBackingMap = Record; + +/** + * Parse COMPUTE_BACKING_MAP (container-region masterQueue -> compute-backing + * masterQueue). Never throws: bad JSON or non-string values yield {} so a + * misconfigured env disables migration rather than breaking triggers. + */ +export function parseComputeBackingMap(raw: string): ComputeBackingMap { + try { + const parsed = JSON.parse(raw); + if (typeof parsed !== "object" || parsed === null) return {}; + const out: ComputeBackingMap = {}; + for (const [k, v] of Object.entries(parsed)) { + if (typeof v === "string") out[k] = v; + } + return out; + } catch { + return {}; + } +} + +type MigrationDecisionInput = { + planType: string | undefined; + orgId: string; + orgFeatureFlags: Record | null | undefined; + flags: ComputeMigrationFlags | undefined; +}; + +/** + * Whether this org should run on the compute backing. Shared by the trigger-time + * transform and the deploy-time template decision so a migrated org always gets a + * compute template. Precedence: per-org override (both directions) wins; otherwise + * global enable + the plan's percentage bucket. Enterprise and unknown plans are + * never enrolled by percentage (override only). The sole opt-out is the per-org + * `computeMigrationEnabled: false`. + */ +export function isOrgMigrated({ + planType, + orgId, + orgFeatureFlags, + flags, +}: MigrationDecisionInput): boolean { + const override = orgFeatureFlags?.["computeMigrationEnabled"]; + if (override === false) return false; + if (override === true) return true; + + if (!(flags?.computeMigrationEnabled ?? false)) return false; + + const pct = + planType === "free" + ? flags?.computeMigrationFreePercentage ?? 0 + : planType === "paid" + ? flags?.computeMigrationPaidPercentage ?? 0 + : 0; // enterprise / undefined + + return hashBucket(orgId) < pct; +} + +type ResolveInput = MigrationDecisionInput & { + baseWorkerQueue: string | undefined; + envType: string; + backingMap: ComputeBackingMap; +}; + +/** + * Rewrite the resolved worker queue to its compute backing when the org is + * migrated and the region has a backing. Same-geo swap (us-east-1 -> us-east-1-next): + * any explicit placement is a geography preference, honored by staying in-region. + * Applied after region resolution, mirroring the scheduled-split. + */ +export function resolveComputeMigration({ + baseWorkerQueue, + envType, + backingMap, + ...decision +}: ResolveInput): string | undefined { + if (baseWorkerQueue === undefined) return baseWorkerQueue; + if (envType === "DEVELOPMENT") return baseWorkerQueue; + if (!isOrgMigrated(decision)) return baseWorkerQueue; + return backingMap[baseWorkerQueue] ?? baseWorkerQueue; +} diff --git a/apps/webapp/test/computeMigration.test.ts b/apps/webapp/test/computeMigration.test.ts new file mode 100644 index 0000000000..2c9bd9dd41 --- /dev/null +++ b/apps/webapp/test/computeMigration.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect } from "vitest"; +import { + parseComputeBackingMap, + isOrgMigrated, + resolveComputeMigration, +} from "~/runEngine/concerns/computeMigration.server"; + +const BACKING = { "us-east-1": "us-east-1-next" }; + +describe("parseComputeBackingMap", () => { + it("parses valid JSON", () => { + expect(parseComputeBackingMap('{"us-east-1":"us-east-1-next"}')).toEqual(BACKING); + }); + it("returns {} on invalid JSON without throwing", () => { + expect(parseComputeBackingMap("not json")).toEqual({}); + }); + it("returns {} on non-string values", () => { + expect(parseComputeBackingMap('{"us-east-1":5}')).toEqual({}); + }); +}); + +describe("isOrgMigrated", () => { + const base = { + planType: "free" as string | undefined, + orgFeatureFlags: {} as Record, + flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100 }, + }; + + it("migrates a free org at 100%", () => { + expect(isOrgMigrated({ ...base, orgId: "org_x" })).toBe(true); + }); + it("does not migrate when globally disabled", () => { + expect( + isOrgMigrated({ ...base, orgId: "org_x", flags: { computeMigrationEnabled: false, computeMigrationFreePercentage: 100 } }) + ).toBe(false); + }); + it("per-org override false excludes even at 100%", () => { + expect( + isOrgMigrated({ ...base, orgId: "org_x", orgFeatureFlags: { computeMigrationEnabled: false } }) + ).toBe(false); + }); + it("per-org override true enrolls even when globally off", () => { + expect( + isOrgMigrated({ + ...base, + orgId: "org_x", + orgFeatureFlags: { computeMigrationEnabled: true }, + flags: { computeMigrationEnabled: false, computeMigrationFreePercentage: 0 }, + }) + ).toBe(true); + }); + it("paid uses the paid dial", () => { + expect( + isOrgMigrated({ + planType: "paid", + orgId: "org_x", + orgFeatureFlags: {}, + flags: { computeMigrationEnabled: true, computeMigrationPaidPercentage: 100 }, + }) + ).toBe(true); + }); + it("enterprise is never enrolled by percentage", () => { + expect( + isOrgMigrated({ + planType: "enterprise", + orgId: "org_x", + orgFeatureFlags: {}, + flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100, computeMigrationPaidPercentage: 100 }, + }) + ).toBe(false); + }); + it("undefined planType is not enrolled", () => { + expect( + isOrgMigrated({ planType: undefined, orgId: "org_x", orgFeatureFlags: {}, flags: { computeMigrationEnabled: true } }) + ).toBe(false); + }); +}); + +describe("resolveComputeMigration", () => { + const enrolled = { + planType: "free", + orgFeatureFlags: {}, + flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100 }, + envType: "PRODUCTION", + backingMap: BACKING, + }; + + it("swaps to the compute backing for an enrolled free org", () => { + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x" })) + .toBe("us-east-1-next"); + }); + it("leaves a region with no backing untouched (EU)", () => { + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "eu-central-1", orgId: "org_x" })) + .toBe("eu-central-1"); + }); + it("does not migrate DEVELOPMENT", () => { + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", envType: "DEVELOPMENT" }) + ).toBe("us-east-1"); + }); + it("leaves a non-enrolled org untouched", () => { + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", flags: { computeMigrationEnabled: false } }) + ).toBe("us-east-1"); + }); + it("undefined baseWorkerQueue passes through", () => { + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: undefined, orgId: "org_x" })) + .toBeUndefined(); + }); +}); From b36b83c9f51556f4ffd9f6fe48482e271e23cf1c Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:09:53 +0100 Subject: [PATCH 05/22] feat(webapp): add createReloadingRegistry helper --- .../app/utils/reloadingRegistry.server.ts | 120 ++++++++++++++++++ apps/webapp/test/reloadingRegistry.test.ts | 58 +++++++++ 2 files changed, 178 insertions(+) create mode 100644 apps/webapp/app/utils/reloadingRegistry.server.ts create mode 100644 apps/webapp/test/reloadingRegistry.test.ts diff --git a/apps/webapp/app/utils/reloadingRegistry.server.ts b/apps/webapp/app/utils/reloadingRegistry.server.ts new file mode 100644 index 0000000000..62a8d53376 --- /dev/null +++ b/apps/webapp/app/utils/reloadingRegistry.server.ts @@ -0,0 +1,120 @@ +import pRetry from "p-retry"; +import { Counter, Gauge } from "prom-client"; +import { metricsRegister } from "~/metrics.server"; +import { logger } from "~/services/logger.server"; +import { signalsEmitter } from "~/services/signals.server"; + +const loadFailures = new Counter({ + name: "reloading_registry_load_failures_total", + help: "Failed loads of a reloading registry", + labelNames: ["name"], + registers: [metricsRegister], +}); + +const lastSuccessfulLoadAt = new Gauge({ + name: "reloading_registry_last_successful_load_timestamp_seconds", + help: "Unix time of the last successful registry load (staleness signal)", + labelNames: ["name"], + registers: [metricsRegister], +}); + +export type ReloadingRegistry = { + isReady: Promise; + readonly isLoaded: boolean; + current(): T | undefined; + reload(): Promise; + waitUntilReady(timeoutMs: number): Promise; + stop(): void; +}; + +export type ReloadingRegistryOptions = { + /** Tag for metrics + logs. */ + name: string; + /** Loads the full snapshot from the source of truth. */ + load: () => Promise; + /** How often to reload after the first successful load. */ + intervalMs: number; + /** Startup retry config; defaults to forever with backoff. */ + retry?: { retries?: number }; +}; + +/** + * In-memory snapshot loaded at startup and refreshed on an interval. Reads are + * synchronous (`current()`); the first read should gate on `waitUntilReady` so a + * cold replica never serves a default over a real value. Mirrors the datastore / + * LLM-pricing registries. Interval-only: no pub/sub (a follow-up if sub-second + * propagation is ever needed). + */ +export function createReloadingRegistry(opts: ReloadingRegistryOptions): ReloadingRegistry { + let snapshot: T | undefined; + let loaded = false; + let resolveReady!: () => void; + const isReady = new Promise((resolve) => { + resolveReady = resolve; + }); + + async function doLoad() { + snapshot = await opts.load(); + lastSuccessfulLoadAt.set({ name: opts.name }, Date.now() / 1000); + if (!loaded) { + loaded = true; + resolveReady(); + } + } + + const startup = pRetry(() => doLoad(), { + forever: opts.retry?.retries === undefined, + retries: opts.retry?.retries, + minTimeout: 1_000, + maxTimeout: 60_000, + factor: 2, + onFailedAttempt: (error) => { + loadFailures.inc({ name: opts.name }); + logger.warn("[ReloadingRegistry] startup load failed, retrying", { + name: opts.name, + attemptNumber: error.attemptNumber, + retriesLeft: error.retriesLeft, + error: error.message, + }); + }, + }); + startup.catch((err) => { + logger.error("[ReloadingRegistry] startup load gave up", { + name: opts.name, + error: err instanceof Error ? err.message : String(err), + }); + }); + + const interval = setInterval(() => { + doLoad().catch((err) => { + loadFailures.inc({ name: opts.name }); + logger.warn("[ReloadingRegistry] reload failed", { + name: opts.name, + error: err instanceof Error ? err.message : String(err), + }); + }); + }, opts.intervalMs); + + function stop() { + clearInterval(interval); + } + signalsEmitter.on("SIGTERM", stop); + signalsEmitter.on("SIGINT", stop); + + return { + isReady, + get isLoaded() { + return loaded; + }, + current: () => snapshot, + reload: doLoad, + async waitUntilReady(timeoutMs: number) { + if (loaded || timeoutMs <= 0) return; + await Promise.race([ + isReady, + new Promise((resolve) => setTimeout(resolve, timeoutMs)), + ]); + }, + stop, + }; +} diff --git a/apps/webapp/test/reloadingRegistry.test.ts b/apps/webapp/test/reloadingRegistry.test.ts new file mode 100644 index 0000000000..7034c6f70d --- /dev/null +++ b/apps/webapp/test/reloadingRegistry.test.ts @@ -0,0 +1,58 @@ +import { describe, it, expect } from "vitest"; +import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; + +describe("createReloadingRegistry", () => { + it("current() is undefined before load, snapshot after isReady", async () => { + const reg = createReloadingRegistry({ + name: "test-a", + intervalMs: 10_000, + load: async () => ({ value: 42 }), + }); + expect(reg.current()).toBeUndefined(); + await reg.isReady; + expect(reg.isLoaded).toBe(true); + expect(reg.current()).toEqual({ value: 42 }); + reg.stop(); + }); + + it("waitUntilReady resolves once loaded", async () => { + const reg = createReloadingRegistry({ + name: "test-b", + intervalMs: 10_000, + load: async () => 1, + }); + await reg.waitUntilReady(1000); + expect(reg.current()).toBe(1); + reg.stop(); + }); + + it("waitUntilReady times out (and stays unloaded) when load never succeeds", async () => { + const reg = createReloadingRegistry({ + name: "test-c", + intervalMs: 10_000, + retry: { retries: 0 }, + load: async () => { + throw new Error("db down"); + }, + }); + await reg.waitUntilReady(50); + expect(reg.isLoaded).toBe(false); + expect(reg.current()).toBeUndefined(); + reg.stop(); + }); + + it("reload() picks up a changed value", async () => { + let v = 1; + const reg = createReloadingRegistry({ + name: "test-d", + intervalMs: 10_000, + load: async () => v, + }); + await reg.isReady; + expect(reg.current()).toBe(1); + v = 2; + await reg.reload(); + expect(reg.current()).toBe(2); + reg.stop(); + }); +}); From 7067ac74c629f9b94c562404e4ed6228e652badc Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:11:37 +0100 Subject: [PATCH 06/22] feat(webapp): boot global flags registry --- apps/webapp/app/entry.server.tsx | 2 ++ .../app/v3/globalFlagsRegistry.server.ts | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+) create mode 100644 apps/webapp/app/v3/globalFlagsRegistry.server.ts diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index 1282127cb2..cd6d45ccc9 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -43,6 +43,8 @@ import { registerRunChangeNotifierHandlers } from "./services/realtime/runChange import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server"; (globalThis as Record).__sessionsReplicationInstance = sessionsReplicationInstance; +import { globalFlagsRegistry } from "./v3/globalFlagsRegistry.server"; +(globalThis as Record).__globalFlagsRegistry = globalFlagsRegistry; const ABORT_DELAY = 30000; diff --git a/apps/webapp/app/v3/globalFlagsRegistry.server.ts b/apps/webapp/app/v3/globalFlagsRegistry.server.ts new file mode 100644 index 0000000000..d99c94317a --- /dev/null +++ b/apps/webapp/app/v3/globalFlagsRegistry.server.ts @@ -0,0 +1,19 @@ +import { singleton } from "~/utils/singleton"; +import { env } from "~/env.server"; +import { flags } from "~/v3/featureFlags.server"; +import type { FeatureFlagCatalog } from "~/v3/featureFlags"; +import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; + +/** + * In-memory snapshot of the global feature flags, refreshed every + * GLOBAL_FLAGS_RELOAD_INTERVAL_MS. `flags()` reads the DB-backed global values + * (no per-org overrides). Read synchronously on the trigger hot path; callers + * gate the first read on `waitUntilReady`. + */ +export const globalFlagsRegistry = singleton("globalFlagsRegistry", () => + createReloadingRegistry>({ + name: "global-flags", + intervalMs: env.GLOBAL_FLAGS_RELOAD_INTERVAL_MS, + load: () => flags(), + }) +); From 266fc607ce086659ef743d664b0986283f684d6a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:16:45 +0100 Subject: [PATCH 07/22] feat(webapp): route migrated orgs to the compute backing at trigger --- .../runEngine/services/triggerTask.server.ts | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 77057990db..84103edd4f 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -38,6 +38,11 @@ import { resolveScheduledQueueSplitEnabled, workerQueueForRun, } from "../concerns/workerQueueSplit.server"; +import { + parseComputeBackingMap, + resolveComputeMigration, +} from "../concerns/computeMigration.server"; +import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; import { publishClaim as publishMollifierClaim, releaseClaim as releaseMollifierClaim, @@ -65,6 +70,8 @@ import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server"; import { type MollifierBuffer } from "@trigger.dev/redis-worker"; import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server"; +const COMPUTE_BACKING_MAP = parseComputeBackingMap(env.COMPUTE_BACKING_MAP); + class NoopTriggerRacepointSystem implements TriggerRacepointSystem { async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise { return; @@ -358,6 +365,23 @@ export class RunEngineTriggerTaskService { const baseWorkerQueue = workerQueueResult?.masterQueue; const enableFastPath = workerQueueResult?.enableFastPath ?? false; + // Plan-aware compute migration: rewrite the resolved region to its + // compute backing for enrolled orgs. Reads the in-memory global-flags + // snapshot (no DB query). Gate the first read on the registry so a cold + // replica never serves a default over a real flag value. + if (!globalFlagsRegistry.isLoaded) { + await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + const migratedWorkerQueue = resolveComputeMigration({ + baseWorkerQueue, + planType, + orgId: environment.organization.id, + orgFeatureFlags: environment.organization.featureFlags as Record | null, + flags: globalFlagsRegistry.current(), + envType: environment.type, + backingMap: COMPUTE_BACKING_MAP, + }); + // Build annotations for this run const triggerSource = options.triggerSource ?? "api"; const triggerAction = options.triggerAction ?? "trigger"; @@ -386,13 +410,13 @@ export class RunEngineTriggerTaskService { globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1", }); const workerQueue = - baseWorkerQueue !== undefined + migratedWorkerQueue !== undefined ? workerQueueForRun({ - workerQueue: baseWorkerQueue, + workerQueue: migratedWorkerQueue, rootTriggerSource: annotations.rootTriggerSource, splitEnabled: scheduledQueueSplitEnabled, }) - : baseWorkerQueue; + : migratedWorkerQueue; try { return await this.traceEventConcern.traceRun( From 58d8fe519da69d8c8374ac9a855aaae50ada7315 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:20:09 +0100 Subject: [PATCH 08/22] feat(webapp): build compute template for migrated orgs --- .../computeTemplateCreation.server.ts | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts index c972952b47..6388dbb132 100644 --- a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts +++ b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts @@ -9,6 +9,9 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { ServiceValidationError } from "./baseService.server"; import { FailDeploymentService } from "./failDeployment.server"; import { resolveComputeAccess } from "../regionAccess.server"; +import { isOrgMigrated, parseComputeBackingMap } from "~/runEngine/concerns/computeMigration.server"; +import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; +import { getEntitlement } from "~/services/platform.v3.server"; type TemplateCreationMode = "required" | "shadow" | "skip"; @@ -145,10 +148,10 @@ export class ComputeTemplateCreationService { where: { id: projectId }, select: { defaultWorkerGroup: { - select: { workloadType: true }, + select: { workloadType: true, masterQueue: true }, }, organization: { - select: { featureFlags: true }, + select: { id: true, featureFlags: true }, }, }, }); @@ -161,6 +164,27 @@ export class ComputeTemplateCreationService { return "required"; } + // Migrated orgs route runs to the compute backing even though their stored + // default is still the container region, so they need a compute template too. + // shadow mode: never fail a deploy over a backing the org didn't opt into. + const backingMap = parseComputeBackingMap(env.COMPUTE_BACKING_MAP); + const defaultQueue = project.defaultWorkerGroup?.masterQueue; + if (defaultQueue && backingMap[defaultQueue]) { + const planType = (await getEntitlement(project.organization.id))?.plan?.type; + if (!globalFlagsRegistry.isLoaded) { + await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + const migrated = isOrgMigrated({ + planType, + orgId: project.organization.id, + orgFeatureFlags: project.organization.featureFlags as Record | null, + flags: globalFlagsRegistry.current(), + }); + if (migrated) { + return "shadow"; + } + } + const hasComputeAccess = await resolveComputeAccess(prisma, project.organization.featureFlags); if (hasComputeAccess) { From c4e0dcf734c19594a6537d9394999b6c4c27d461 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:20:25 +0100 Subject: [PATCH 09/22] chore(webapp): server-changes note for compute migration --- .server-changes/plan-aware-compute-migration.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 .server-changes/plan-aware-compute-migration.md diff --git a/.server-changes/plan-aware-compute-migration.md b/.server-changes/plan-aware-compute-migration.md new file mode 100644 index 0000000000..9fe0813896 --- /dev/null +++ b/.server-changes/plan-aware-compute-migration.md @@ -0,0 +1,10 @@ +--- +area: webapp +type: feature +--- + +Gradually route a configurable percentage of free (then paid) organizations onto +the compute backing at trigger time, with a per-org exclusion and an admin kill +switch. Controlled by the `computeMigrationEnabled`, `computeMigrationFreePercentage`, +and `computeMigrationPaidPercentage` feature flags and the `COMPUTE_BACKING_MAP` +env var. From 65ce3424f63054c439a4b27fb159640711f730cd Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 17:21:59 +0100 Subject: [PATCH 10/22] test(webapp): move hashBucket test into test/ so vitest includes it --- apps/webapp/{app/utils => test}/computeBucket.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename apps/webapp/{app/utils => test}/computeBucket.test.ts (94%) diff --git a/apps/webapp/app/utils/computeBucket.test.ts b/apps/webapp/test/computeBucket.test.ts similarity index 94% rename from apps/webapp/app/utils/computeBucket.test.ts rename to apps/webapp/test/computeBucket.test.ts index e33578a594..dc2d2e277f 100644 --- a/apps/webapp/app/utils/computeBucket.test.ts +++ b/apps/webapp/test/computeBucket.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import { hashBucket } from "./computeBucket"; +import { hashBucket } from "~/utils/computeBucket"; describe("hashBucket", () => { it("returns a stable value in [0, 100) for the same id", () => { From 4791b01e80d3f46cc6b1cc8b18a6b7db0d7a37ec Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:14:56 +0100 Subject: [PATCH 11/22] fix(webapp): hide compute backing on read surfaces and fix replay --- .../v3/ApiRetrieveRunPresenter.server.ts | 6 +++++- .../presenters/v3/NextRunListPresenter.server.ts | 6 +++++- .../app/presenters/v3/SpanPresenter.server.ts | 4 +++- .../routes/resources.taskruns.$runParam.replay.ts | 7 ++++++- .../runEngine/concerns/computeMigration.server.ts | 13 +++++++++++++ .../app/runEngine/services/triggerTask.server.ts | 10 +++------- apps/webapp/app/v3/computeBackingMap.server.ts | 5 +++++ .../webapp/app/v3/services/replayTaskRun.server.ts | 5 ++++- apps/webapp/test/computeMigration.test.ts | 14 ++++++++++++++ 9 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 apps/webapp/app/v3/computeBackingMap.server.ts diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 00290b3620..fa7bf0cfee 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -16,6 +16,8 @@ import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { findRunByIdWithMollifierFallback, @@ -520,7 +522,9 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, metadata, - region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined, + region: run.workerQueue + ? regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap) + : undefined, }; } diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index 8d1b8c1388..54f57a37c2 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -12,6 +12,8 @@ import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; @@ -260,7 +262,9 @@ export class NextRunListPresenter { name: run.queue.replace("task/", ""), type: run.queue.startsWith("task/") ? "task" : "custom", }, - region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined, + region: run.workerQueue + ? regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap) + : undefined, taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; }), diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index bd0ac5c540..be7405eabd 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -16,6 +16,8 @@ import { } from "@trigger.dev/core/v3/serverOnly"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { logger } from "~/services/logger.server"; import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; @@ -303,7 +305,7 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { - masterQueue: baseWorkerQueue(run.workerQueue), + masterQueue: regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap), }, }); diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 631bd5ece5..78372a2110 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -19,6 +19,8 @@ import { } from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; import { ReplayRunData } from "~/v3/replayTask"; @@ -210,7 +212,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxAttempts: run.maxAttempts, maxDurationSeconds: run.maxDurationInSeconds, machinePreset: run.machinePreset, - region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue), + region: + environment.type === "DEVELOPMENT" + ? undefined + : regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts index e3a0cfb286..eb874daec8 100644 --- a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts +++ b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts @@ -28,6 +28,19 @@ export function parseComputeBackingMap(raw: string): ComputeBackingMap { } } +/** + * Inverse of the backing map: given a worker queue, return the user-facing geo + * region. If the queue is a compute backing (a *value* in the map), return the + * region it backs; otherwise return the queue unchanged. Used to hide the + * backing on customer surfaces and to re-derive the region on replay. + */ +export function regionForBacking(queue: string, backingMap: ComputeBackingMap): string { + for (const [region, backing] of Object.entries(backingMap)) { + if (backing === queue) return region; + } + return queue; +} + type MigrationDecisionInput = { planType: string | undefined; orgId: string; diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 84103edd4f..5e91381e5b 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -38,10 +38,8 @@ import { resolveScheduledQueueSplitEnabled, workerQueueForRun, } from "../concerns/workerQueueSplit.server"; -import { - parseComputeBackingMap, - resolveComputeMigration, -} from "../concerns/computeMigration.server"; +import { resolveComputeMigration } from "../concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; import { publishClaim as publishMollifierClaim, @@ -70,8 +68,6 @@ import { mollifyTrigger } from "~/v3/mollifier/mollifierMollify.server"; import { type MollifierBuffer } from "@trigger.dev/redis-worker"; import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server"; -const COMPUTE_BACKING_MAP = parseComputeBackingMap(env.COMPUTE_BACKING_MAP); - class NoopTriggerRacepointSystem implements TriggerRacepointSystem { async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise { return; @@ -379,7 +375,7 @@ export class RunEngineTriggerTaskService { orgFeatureFlags: environment.organization.featureFlags as Record | null, flags: globalFlagsRegistry.current(), envType: environment.type, - backingMap: COMPUTE_BACKING_MAP, + backingMap: computeBackingMap, }); // Build annotations for this run diff --git a/apps/webapp/app/v3/computeBackingMap.server.ts b/apps/webapp/app/v3/computeBackingMap.server.ts new file mode 100644 index 0000000000..4a602f592e --- /dev/null +++ b/apps/webapp/app/v3/computeBackingMap.server.ts @@ -0,0 +1,5 @@ +import { env } from "~/env.server"; +import { parseComputeBackingMap } from "~/runEngine/concerns/computeMigration.server"; + +/** Parsed once: region -> compute-backing worker queue. Operator env, rarely changes. */ +export const computeBackingMap = parseComputeBackingMap(env.COMPUTE_BACKING_MAP); diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 7975694b5e..5da53106be 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -7,6 +7,8 @@ import { import { type TaskRun } from "@trigger.dev/database"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; @@ -68,7 +70,8 @@ export class ReplayTaskRunService extends BaseService { authenticatedEnvironment.type === "DEVELOPMENT"; const region = ignoreRegion ? undefined - : overrideOptions.region ?? baseWorkerQueue(existingTaskRun.workerQueue); + : overrideOptions.region ?? + regionForBacking(baseWorkerQueue(existingTaskRun.workerQueue), computeBackingMap); try { const taskQueue = await this._prisma.taskQueue.findFirst({ diff --git a/apps/webapp/test/computeMigration.test.ts b/apps/webapp/test/computeMigration.test.ts index 2c9bd9dd41..018627a132 100644 --- a/apps/webapp/test/computeMigration.test.ts +++ b/apps/webapp/test/computeMigration.test.ts @@ -3,6 +3,7 @@ import { parseComputeBackingMap, isOrgMigrated, resolveComputeMigration, + regionForBacking, } from "~/runEngine/concerns/computeMigration.server"; const BACKING = { "us-east-1": "us-east-1-next" }; @@ -108,3 +109,16 @@ describe("resolveComputeMigration", () => { .toBeUndefined(); }); }); + +describe("regionForBacking", () => { + it("maps a backing to its region", () => { + expect(regionForBacking("us-east-1-next", BACKING)).toBe("us-east-1"); + }); + it("passes a non-backing queue through unchanged", () => { + expect(regionForBacking("us-east-1", BACKING)).toBe("us-east-1"); + expect(regionForBacking("eu-central-1", BACKING)).toBe("eu-central-1"); + }); + it("passes through unchanged with an empty map", () => { + expect(regionForBacking("us-east-1-next", {})).toBe("us-east-1-next"); + }); +}); From 8060760be14b549961f70ba27ed72da59204430e Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:17:31 +0100 Subject: [PATCH 12/22] fix(webapp): store geo region in clickhouse to hide compute backing from query --- apps/webapp/app/services/runsReplicationService.server.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index d6055c21b1..651c7b5eb7 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -39,6 +39,8 @@ import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; +import { computeBackingMap } from "~/v3/computeBackingMap.server"; import { isClickHouseJsonParseError, parseRowNumberFromError, @@ -1122,7 +1124,7 @@ export class RunsReplicationService { event === "delete" ? 1 : 0, // _is_deleted run.concurrencyKey ?? "", // concurrency_key run.bulkActionGroupIds ?? [], // bulk_action_group_ids - baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`) + regionForBacking(baseWorkerQueue(run.masterQueue ?? ""), computeBackingMap), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column) run.maxDurationInSeconds ?? null, // max_duration_in_seconds annotations?.triggerSource ?? "", // trigger_source annotations?.rootTriggerSource ?? "", // root_trigger_source From 25e3fe1ea3a5a419a9fdbf98e99643a897a27419 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:20:39 +0100 Subject: [PATCH 13/22] fix(webapp): serialize registry reloads and clear readiness timeout --- .../app/utils/reloadingRegistry.server.ts | 21 +++++-- apps/webapp/test/reloadingRegistry.test.ts | 60 ++++++++++++++++++- 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/apps/webapp/app/utils/reloadingRegistry.server.ts b/apps/webapp/app/utils/reloadingRegistry.server.ts index 62a8d53376..6aa1388d6d 100644 --- a/apps/webapp/app/utils/reloadingRegistry.server.ts +++ b/apps/webapp/app/utils/reloadingRegistry.server.ts @@ -48,13 +48,17 @@ export type ReloadingRegistryOptions = { export function createReloadingRegistry(opts: ReloadingRegistryOptions): ReloadingRegistry { let snapshot: T | undefined; let loaded = false; + let loadSeq = 0; let resolveReady!: () => void; const isReady = new Promise((resolve) => { resolveReady = resolve; }); async function doLoad() { - snapshot = await opts.load(); + const seq = ++loadSeq; + const next = await opts.load(); + if (seq < loadSeq) return; // a newer load started while we were awaiting; don't clobber + snapshot = next; lastSuccessfulLoadAt.set({ name: opts.name }, Date.now() / 1000); if (!loaded) { loaded = true; @@ -110,10 +114,17 @@ export function createReloadingRegistry(opts: ReloadingRegistryOptions): R reload: doLoad, async waitUntilReady(timeoutMs: number) { if (loaded || timeoutMs <= 0) return; - await Promise.race([ - isReady, - new Promise((resolve) => setTimeout(resolve, timeoutMs)), - ]); + let timer: ReturnType | undefined; + try { + await Promise.race([ + isReady, + new Promise((resolve) => { + timer = setTimeout(resolve, timeoutMs); + }), + ]); + } finally { + if (timer) clearTimeout(timer); + } }, stop, }; diff --git a/apps/webapp/test/reloadingRegistry.test.ts b/apps/webapp/test/reloadingRegistry.test.ts index 7034c6f70d..8a11bed60d 100644 --- a/apps/webapp/test/reloadingRegistry.test.ts +++ b/apps/webapp/test/reloadingRegistry.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, vi } from "vitest"; import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; describe("createReloadingRegistry", () => { @@ -55,4 +55,62 @@ describe("createReloadingRegistry", () => { expect(reg.current()).toBe(2); reg.stop(); }); + + it("newer load wins even if an older load resolves later", async () => { + // load hands the test a deferred resolver per call so completion order is controllable. + const deferred: Array<(value: number) => void> = []; + const reg = createReloadingRegistry({ + name: "test-e", + intervalMs: 10_000, + load: () => + new Promise((resolve) => { + deferred.push(resolve); + }), + }); + + // deferred[0] is the startup load; let it complete with an initial value. + deferred[0](0); + await reg.isReady; + + // start two overlapping loads; don't await yet (deferred[1] older, deferred[2] newer) + const older = reg.reload(); + const newer = reg.reload(); + + // resolve the NEWER load first, then the OLDER load last + deferred[2](2); + deferred[1](1); + await Promise.all([older, newer]); + + // the older load completing last must NOT clobber the newer snapshot + expect(reg.current()).toBe(2); + reg.stop(); + }); + + it("waitUntilReady clears its timeout when ready wins", async () => { + const clearSpy = vi.spyOn(global, "clearTimeout"); + // load resolves only when the test releases it, so waitUntilReady runs the + // race while still unloaded (it would return early if already loaded) + let releaseLoad!: () => void; + const loadGate = new Promise((resolve) => { + releaseLoad = resolve; + }); + const reg = createReloadingRegistry({ + name: "test-f", + intervalMs: 10_000, + load: async () => { + await loadGate; + return 1; + }, + }); + + // long timeout so isReady is what actually wins the race + const waiting = reg.waitUntilReady(10_000); + releaseLoad(); + await reg.isReady; + await waiting; + + expect(clearSpy).toHaveBeenCalled(); + clearSpy.mockRestore(); + reg.stop(); + }); }); From 18d5b1179a11484176cfd974a643f86278e51f6b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:23:59 +0100 Subject: [PATCH 14/22] fix(webapp): strict boolean kill switch, bound reload interval, cuid bucket test --- apps/webapp/app/env.server.ts | 4 ++-- apps/webapp/app/v3/featureFlags.ts | 6 +++++- apps/webapp/test/computeBucket.test.ts | 18 ++++++++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 1724512637..856bb0f8e0 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -164,10 +164,10 @@ const EnvironmentSchema = z COMPUTE_BACKING_MAP: z.string().default("{}"), // How often each replica reloads the global flags snapshot from the DB. // Sets kill/ramp propagation latency. - GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5000), + GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().min(1000).default(5000), // Max time the first trigger blocks waiting for the initial flags load // before falling back to defaults (off = container, the safe direction). - GLOBAL_FLAGS_READY_TIMEOUT_MS: z.coerce.number().int().default(5000), + GLOBAL_FLAGS_READY_TIMEOUT_MS: z.coerce.number().int().min(0).default(5000), WORKER_ENABLED: z.string().default("true"), GRACEFUL_SHUTDOWN_TIMEOUT: z.coerce.number().int().default(60000), DISABLE_SSE: z.string().optional(), diff --git a/apps/webapp/app/v3/featureFlags.ts b/apps/webapp/app/v3/featureFlags.ts index be6b962166..18abaae67f 100644 --- a/apps/webapp/app/v3/featureFlags.ts +++ b/apps/webapp/app/v3/featureFlags.ts @@ -30,7 +30,11 @@ export const FeatureFlagCatalog = { // globally and per-org (org wins). Defaults to "electric" when unset. // "shadow" serves Electric but diffs the native path in the background. [FEATURE_FLAG.realtimeBackend]: z.enum(["electric", "native", "shadow"]), - [FEATURE_FLAG.computeMigrationEnabled]: z.coerce.boolean(), + // Strict z.boolean() (not z.coerce.boolean()): coercion turns the string "false" + // into true, which would silently flip this kill switch / per-org exclude the wrong + // way if written as a string via the admin PAT route. The admin toggle sends a real + // boolean, so this only rejects the dangerous stringified case. + [FEATURE_FLAG.computeMigrationEnabled]: z.boolean(), [FEATURE_FLAG.computeMigrationFreePercentage]: z.coerce.number().int().min(0).max(100), [FEATURE_FLAG.computeMigrationPaidPercentage]: z.coerce.number().int().min(0).max(100), }; diff --git a/apps/webapp/test/computeBucket.test.ts b/apps/webapp/test/computeBucket.test.ts index dc2d2e277f..a3d0c9c72b 100644 --- a/apps/webapp/test/computeBucket.test.ts +++ b/apps/webapp/test/computeBucket.test.ts @@ -1,4 +1,5 @@ import { describe, it, expect } from "vitest"; +import cuid from "cuid"; import { hashBucket } from "~/utils/computeBucket"; describe("hashBucket", () => { @@ -25,4 +26,21 @@ describe("hashBucket", () => { expect(under10).toBeGreaterThan(700); expect(under10).toBeLessThan(1300); }); + + // Org ids are `@default(cuid())` primary keys (e.g. "cjld2cjxh0000qzrmn831i7rn"), + // not the synthetic sequential ids above. cuids share a "c" prefix + timestamp/counter + // structure, so verify the hash still spreads *real-shaped* ids evenly across deciles + // (so a percentage dial maps to ~that fraction of actual orgs, not just of the id space). + it("distributes cuids evenly across all 10 deciles", () => { + const ids = Array.from({ length: 20000 }, () => cuid()); + const counts = new Array(10).fill(0); + for (const id of ids) { + counts[Math.floor(hashBucket(id) / 10)]++; + } + // Expected ~2000 per decile; allow a wide band so it isn't flaky. + for (const count of counts) { + expect(count).toBeGreaterThan(1700); + expect(count).toBeLessThan(2300); + } + }); }); From aefdf3acec853454884df725c4e645c9c5a84995 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 20:58:18 +0100 Subject: [PATCH 15/22] feat(database,webapp): add WorkerInstanceGroup.region + worker-region registry --- apps/webapp/app/v3/workerRegions.server.ts | 60 +++++++++++++++++++ apps/webapp/test/workerRegions.test.ts | 42 +++++++++++++ .../migration.sql | 2 + .../database/prisma/schema.prisma | 4 ++ 4 files changed, 108 insertions(+) create mode 100644 apps/webapp/app/v3/workerRegions.server.ts create mode 100644 apps/webapp/test/workerRegions.test.ts create mode 100644 internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql diff --git a/apps/webapp/app/v3/workerRegions.server.ts b/apps/webapp/app/v3/workerRegions.server.ts new file mode 100644 index 0000000000..3a64c52013 --- /dev/null +++ b/apps/webapp/app/v3/workerRegions.server.ts @@ -0,0 +1,60 @@ +import { type WorkloadType } from "@trigger.dev/database"; +import { prisma } from "~/db.server"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { createReloadingRegistry } from "~/utils/reloadingRegistry.server"; + +export type WorkerGroupRegionRow = { + masterQueue: string; + region: string | null; + workloadType: WorkloadType; + hidden: boolean; +}; + +/** + * Reverse map: a stored worker queue -> its user-facing geo region. A backing + * queue (e.g. "us-east-1-next") returns the region it backs ("us-east-1"); + * anything unknown or with no region set passes through unchanged (so container + * queues and not-yet-labelled groups behave exactly as before). + */ +export function regionForQueue(queue: string, groups: WorkerGroupRegionRow[]): string { + const self = groups.find((g) => g.masterQueue === queue); + return self?.region ?? queue; +} + +/** + * Forward map: the compute (MICROVM) backing queue for the region that `queue` + * belongs to, or undefined if the region has no compute backing. `queue` is the + * resolved (container) worker queue; we look up its region, then find a visible + * MICROVM group in the same region. + */ +export function backingForQueue(queue: string, groups: WorkerGroupRegionRow[]): string | undefined { + const self = groups.find((g) => g.masterQueue === queue); + const region = self?.region; + if (!region) return undefined; + const backing = groups.find( + (g) => + g.workloadType === "MICROVM" && + g.region === region && + !g.hidden && + g.masterQueue !== queue + ); + return backing?.masterQueue; +} + +/** + * In-memory snapshot of every worker group's (queue, region, type, hidden), + * refreshed on an interval. Read synchronously on the hot path; callers gate the + * first read on `waitUntilReady`. Replaces the COMPUTE_BACKING_MAP env var as the + * source of truth for region<->backing resolution. + */ +export const workerRegionRegistry = singleton("workerRegionRegistry", () => + createReloadingRegistry({ + name: "worker-region", + intervalMs: env.GLOBAL_FLAGS_RELOAD_INTERVAL_MS, + load: () => + prisma.workerInstanceGroup.findMany({ + select: { masterQueue: true, region: true, workloadType: true, hidden: true }, + }), + }) +); diff --git a/apps/webapp/test/workerRegions.test.ts b/apps/webapp/test/workerRegions.test.ts new file mode 100644 index 0000000000..6e80de6f2c --- /dev/null +++ b/apps/webapp/test/workerRegions.test.ts @@ -0,0 +1,42 @@ +import { describe, it, expect } from "vitest"; +import { regionForQueue, backingForQueue, type WorkerGroupRegionRow } from "~/v3/workerRegions.server"; + +const groups: WorkerGroupRegionRow[] = [ + { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false }, + { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: false }, + { masterQueue: "eu-central-1", region: "eu-central-1", workloadType: "CONTAINER", hidden: false }, +]; + +describe("regionForQueue", () => { + it("maps a backing queue to its region", () => { + expect(regionForQueue("us-east-1-next", groups)).toBe("us-east-1"); + }); + it("maps a container queue to its own region", () => { + expect(regionForQueue("us-east-1", groups)).toBe("us-east-1"); + }); + it("passes an unknown queue through unchanged", () => { + expect(regionForQueue("mystery", groups)).toBe("mystery"); + }); + it("passes through when a group has no region set", () => { + expect(regionForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false }])).toBe("x"); + }); +}); + +describe("backingForQueue", () => { + it("finds the MICROVM backing for a region with one", () => { + expect(backingForQueue("us-east-1", groups)).toBe("us-east-1-next"); + }); + it("returns undefined for a region with no compute backing (EU)", () => { + expect(backingForQueue("eu-central-1", groups)).toBeUndefined(); + }); + it("returns undefined when the queue's group has no region", () => { + expect(backingForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false }])).toBeUndefined(); + }); + it("ignores hidden MICROVM groups", () => { + const g: WorkerGroupRegionRow[] = [ + { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false }, + { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: true }, + ]; + expect(backingForQueue("us-east-1", g)).toBeUndefined(); + }); +}); diff --git a/internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql b/internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql new file mode 100644 index 0000000000..2ff2282554 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260615210000_add_worker_instance_group_region/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "WorkerInstanceGroup" ADD COLUMN IF NOT EXISTS "region" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 337a6059eb..89090ffc2d 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1532,6 +1532,10 @@ model WorkerInstanceGroup { workloadType WorkloadType @default(CONTAINER) + /// Geo region (e.g. "us-east-1"); container + MICROVM groups for one geo share it. + /// Set-once while the group has runs - changing it breaks region resolution for them. + region String? + /// When true, runs enqueued to this worker queue may skip the intermediate queue /// and be pushed directly to the worker queue when concurrency is available. enableFastPath Boolean @default(false) From 53eaa7a50d727fe5f2e2ac2fdf2f91f5f22d9989 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 21:05:12 +0100 Subject: [PATCH 16/22] refactor(webapp): resolve region<->backing from WorkerInstanceGroup.region, drop env map --- apps/webapp/app/env.server.ts | 4 -- .../v3/ApiRetrieveRunPresenter.server.ts | 5 +- .../v3/NextRunListPresenter.server.ts | 5 +- .../app/presenters/v3/SpanPresenter.server.ts | 5 +- .../resources.taskruns.$runParam.replay.ts | 5 +- .../concerns/computeMigration.server.ts | 40 ++------------- .../runEngine/services/triggerTask.server.ts | 8 ++- .../services/runsReplicationService.server.ts | 5 +- .../webapp/app/v3/computeBackingMap.server.ts | 5 -- .../computeTemplateCreation.server.ts | 9 ++-- .../app/v3/services/replayTaskRun.server.ts | 5 +- apps/webapp/app/v3/workerRegions.server.ts | 4 +- apps/webapp/test/computeMigration.test.ts | 50 +++---------------- 13 files changed, 35 insertions(+), 115 deletions(-) delete mode 100644 apps/webapp/app/v3/computeBackingMap.server.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 856bb0f8e0..d2021f4fe3 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -158,10 +158,6 @@ const EnvironmentSchema = z WORKER_SCHEMA: z.string().default("graphile_worker"), WORKER_CONCURRENCY: z.coerce.number().int().default(10), WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), - // JSON map of container-region masterQueue -> compute-backing masterQueue. - // Absence of an entry means that region is never migrated (e.g. EU until it - // has a compute backing). Example: {"us-east-1":"us-east-1-next"} - COMPUTE_BACKING_MAP: z.string().default("{}"), // How often each replica reloads the global flags snapshot from the DB. // Sets kill/ramp propagation latency. GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().min(1000).default(5000), diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index fa7bf0cfee..80ab96df2f 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -16,8 +16,7 @@ import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { findRunByIdWithMollifierFallback, @@ -523,7 +522,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V batchId: run.batch?.friendlyId, metadata, region: run.workerQueue - ? regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap) + ? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []) : undefined, }; } diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index 54f57a37c2..54da35c533 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -12,8 +12,7 @@ import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; @@ -263,7 +262,7 @@ export class NextRunListPresenter { type: run.queue.startsWith("task/") ? "task" : "custom", }, region: run.workerQueue - ? regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap) + ? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []) : undefined, taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index be7405eabd..a652940230 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -16,8 +16,7 @@ import { } from "@trigger.dev/core/v3/serverOnly"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { logger } from "~/services/logger.server"; import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; @@ -305,7 +304,7 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { - masterQueue: regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap), + masterQueue: regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []), }, }); diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 78372a2110..3511c3b29d 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -19,8 +19,7 @@ import { } from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; import { ReplayRunData } from "~/v3/replayTask"; @@ -215,7 +214,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { region: environment.type === "DEVELOPMENT" ? undefined - : regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap), + : regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts index eb874daec8..fb0193fc8c 100644 --- a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts +++ b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts @@ -7,40 +7,6 @@ export type ComputeMigrationFlags = { computeMigrationPaidPercentage?: number; }; -export type ComputeBackingMap = Record; - -/** - * Parse COMPUTE_BACKING_MAP (container-region masterQueue -> compute-backing - * masterQueue). Never throws: bad JSON or non-string values yield {} so a - * misconfigured env disables migration rather than breaking triggers. - */ -export function parseComputeBackingMap(raw: string): ComputeBackingMap { - try { - const parsed = JSON.parse(raw); - if (typeof parsed !== "object" || parsed === null) return {}; - const out: ComputeBackingMap = {}; - for (const [k, v] of Object.entries(parsed)) { - if (typeof v === "string") out[k] = v; - } - return out; - } catch { - return {}; - } -} - -/** - * Inverse of the backing map: given a worker queue, return the user-facing geo - * region. If the queue is a compute backing (a *value* in the map), return the - * region it backs; otherwise return the queue unchanged. Used to hide the - * backing on customer surfaces and to re-derive the region on replay. - */ -export function regionForBacking(queue: string, backingMap: ComputeBackingMap): string { - for (const [region, backing] of Object.entries(backingMap)) { - if (backing === queue) return region; - } - return queue; -} - type MigrationDecisionInput = { planType: string | undefined; orgId: string; @@ -81,7 +47,7 @@ export function isOrgMigrated({ type ResolveInput = MigrationDecisionInput & { baseWorkerQueue: string | undefined; envType: string; - backingMap: ComputeBackingMap; + backing: string | undefined; // the compute backing for this queue's region, or undefined }; /** @@ -93,11 +59,11 @@ type ResolveInput = MigrationDecisionInput & { export function resolveComputeMigration({ baseWorkerQueue, envType, - backingMap, + backing, ...decision }: ResolveInput): string | undefined { if (baseWorkerQueue === undefined) return baseWorkerQueue; if (envType === "DEVELOPMENT") return baseWorkerQueue; if (!isOrgMigrated(decision)) return baseWorkerQueue; - return backingMap[baseWorkerQueue] ?? baseWorkerQueue; + return backing ?? baseWorkerQueue; } diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 5e91381e5b..c5b513d0de 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -39,7 +39,7 @@ import { workerQueueForRun, } from "../concerns/workerQueueSplit.server"; import { resolveComputeMigration } from "../concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { workerRegionRegistry, backingForQueue } from "~/v3/workerRegions.server"; import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; import { publishClaim as publishMollifierClaim, @@ -368,6 +368,10 @@ export class RunEngineTriggerTaskService { if (!globalFlagsRegistry.isLoaded) { await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); } + if (!workerRegionRegistry.isLoaded) { + await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } + const workerGroups = workerRegionRegistry.current() ?? []; const migratedWorkerQueue = resolveComputeMigration({ baseWorkerQueue, planType, @@ -375,7 +379,7 @@ export class RunEngineTriggerTaskService { orgFeatureFlags: environment.organization.featureFlags as Record | null, flags: globalFlagsRegistry.current(), envType: environment.type, - backingMap: computeBackingMap, + backing: baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined, }); // Build annotations for this run diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 651c7b5eb7..b9f699d9c2 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -39,8 +39,7 @@ import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { isClickHouseJsonParseError, parseRowNumberFromError, @@ -1124,7 +1123,7 @@ export class RunsReplicationService { event === "delete" ? 1 : 0, // _is_deleted run.concurrencyKey ?? "", // concurrency_key run.bulkActionGroupIds ?? [], // bulk_action_group_ids - regionForBacking(baseWorkerQueue(run.masterQueue ?? ""), computeBackingMap), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column) + regionForQueue(baseWorkerQueue(run.masterQueue ?? ""), workerRegionRegistry.current() ?? []), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column) run.maxDurationInSeconds ?? null, // max_duration_in_seconds annotations?.triggerSource ?? "", // trigger_source annotations?.rootTriggerSource ?? "", // root_trigger_source diff --git a/apps/webapp/app/v3/computeBackingMap.server.ts b/apps/webapp/app/v3/computeBackingMap.server.ts deleted file mode 100644 index 4a602f592e..0000000000 --- a/apps/webapp/app/v3/computeBackingMap.server.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { env } from "~/env.server"; -import { parseComputeBackingMap } from "~/runEngine/concerns/computeMigration.server"; - -/** Parsed once: region -> compute-backing worker queue. Operator env, rarely changes. */ -export const computeBackingMap = parseComputeBackingMap(env.COMPUTE_BACKING_MAP); diff --git a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts index 6388dbb132..555f320410 100644 --- a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts +++ b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts @@ -9,7 +9,8 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { ServiceValidationError } from "./baseService.server"; import { FailDeploymentService } from "./failDeployment.server"; import { resolveComputeAccess } from "../regionAccess.server"; -import { isOrgMigrated, parseComputeBackingMap } from "~/runEngine/concerns/computeMigration.server"; +import { isOrgMigrated } from "~/runEngine/concerns/computeMigration.server"; +import { backingForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; import { getEntitlement } from "~/services/platform.v3.server"; @@ -167,9 +168,11 @@ export class ComputeTemplateCreationService { // Migrated orgs route runs to the compute backing even though their stored // default is still the container region, so they need a compute template too. // shadow mode: never fail a deploy over a backing the org didn't opt into. - const backingMap = parseComputeBackingMap(env.COMPUTE_BACKING_MAP); + if (!workerRegionRegistry.isLoaded) { + await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); + } const defaultQueue = project.defaultWorkerGroup?.masterQueue; - if (defaultQueue && backingMap[defaultQueue]) { + if (defaultQueue && backingForQueue(defaultQueue, workerRegionRegistry.current() ?? [])) { const planType = (await getEntitlement(project.organization.id))?.plan?.type; if (!globalFlagsRegistry.isLoaded) { await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 5da53106be..0884685092 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -7,8 +7,7 @@ import { import { type TaskRun } from "@trigger.dev/database"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForBacking } from "~/runEngine/concerns/computeMigration.server"; -import { computeBackingMap } from "~/v3/computeBackingMap.server"; +import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; @@ -71,7 +70,7 @@ export class ReplayTaskRunService extends BaseService { const region = ignoreRegion ? undefined : overrideOptions.region ?? - regionForBacking(baseWorkerQueue(existingTaskRun.workerQueue), computeBackingMap); + regionForQueue(baseWorkerQueue(existingTaskRun.workerQueue), workerRegionRegistry.current() ?? []); try { const taskQueue = await this._prisma.taskQueue.findFirst({ diff --git a/apps/webapp/app/v3/workerRegions.server.ts b/apps/webapp/app/v3/workerRegions.server.ts index 3a64c52013..327c977d2b 100644 --- a/apps/webapp/app/v3/workerRegions.server.ts +++ b/apps/webapp/app/v3/workerRegions.server.ts @@ -45,8 +45,8 @@ export function backingForQueue(queue: string, groups: WorkerGroupRegionRow[]): /** * In-memory snapshot of every worker group's (queue, region, type, hidden), * refreshed on an interval. Read synchronously on the hot path; callers gate the - * first read on `waitUntilReady`. Replaces the COMPUTE_BACKING_MAP env var as the - * source of truth for region<->backing resolution. + * first read on `waitUntilReady`. DB-backed source of truth for region<->backing + * resolution (replaces the old env-var backing map). */ export const workerRegionRegistry = singleton("workerRegionRegistry", () => createReloadingRegistry({ diff --git a/apps/webapp/test/computeMigration.test.ts b/apps/webapp/test/computeMigration.test.ts index 018627a132..eef7ac0907 100644 --- a/apps/webapp/test/computeMigration.test.ts +++ b/apps/webapp/test/computeMigration.test.ts @@ -1,25 +1,9 @@ import { describe, it, expect } from "vitest"; import { - parseComputeBackingMap, isOrgMigrated, resolveComputeMigration, - regionForBacking, } from "~/runEngine/concerns/computeMigration.server"; -const BACKING = { "us-east-1": "us-east-1-next" }; - -describe("parseComputeBackingMap", () => { - it("parses valid JSON", () => { - expect(parseComputeBackingMap('{"us-east-1":"us-east-1-next"}')).toEqual(BACKING); - }); - it("returns {} on invalid JSON without throwing", () => { - expect(parseComputeBackingMap("not json")).toEqual({}); - }); - it("returns {} on non-string values", () => { - expect(parseComputeBackingMap('{"us-east-1":5}')).toEqual({}); - }); -}); - describe("isOrgMigrated", () => { const base = { planType: "free" as string | undefined, @@ -83,42 +67,20 @@ describe("resolveComputeMigration", () => { orgFeatureFlags: {}, flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100 }, envType: "PRODUCTION", - backingMap: BACKING, }; - it("swaps to the compute backing for an enrolled free org", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x" })) - .toBe("us-east-1-next"); + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing: "us-east-1-next" })).toBe("us-east-1-next"); }); - it("leaves a region with no backing untouched (EU)", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "eu-central-1", orgId: "org_x" })) - .toBe("eu-central-1"); + it("leaves the queue unchanged when there is no backing for the region (EU)", () => { + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "eu-central-1", orgId: "org_x", backing: undefined })).toBe("eu-central-1"); }); it("does not migrate DEVELOPMENT", () => { - expect( - resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", envType: "DEVELOPMENT" }) - ).toBe("us-east-1"); + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing: "us-east-1-next", envType: "DEVELOPMENT" })).toBe("us-east-1"); }); it("leaves a non-enrolled org untouched", () => { - expect( - resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", flags: { computeMigrationEnabled: false } }) - ).toBe("us-east-1"); + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing: "us-east-1-next", flags: { computeMigrationEnabled: false } })).toBe("us-east-1"); }); it("undefined baseWorkerQueue passes through", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: undefined, orgId: "org_x" })) - .toBeUndefined(); - }); -}); - -describe("regionForBacking", () => { - it("maps a backing to its region", () => { - expect(regionForBacking("us-east-1-next", BACKING)).toBe("us-east-1"); - }); - it("passes a non-backing queue through unchanged", () => { - expect(regionForBacking("us-east-1", BACKING)).toBe("us-east-1"); - expect(regionForBacking("eu-central-1", BACKING)).toBe("eu-central-1"); - }); - it("passes through unchanged with an empty map", () => { - expect(regionForBacking("us-east-1-next", {})).toBe("us-east-1-next"); + expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: undefined, orgId: "org_x", backing: "us-east-1-next" })).toBeUndefined(); }); }); From 222d653a78bea364ac26785ad6089e4070bf5e8a Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 22:36:36 +0100 Subject: [PATCH 17/22] feat(webapp): stamp geo region on runs, keep worker_queue raw, test-safe registries --- apps/webapp/app/entry.server.tsx | 2 + .../v3/ApiRetrieveRunPresenter.server.ts | 8 +-- .../v3/NextRunListPresenter.server.ts | 5 +- .../app/presenters/v3/SpanPresenter.server.ts | 4 +- .../resources.taskruns.$runParam.replay.ts | 5 +- .../concerns/computeMigration.server.ts | 30 +++++--- .../runEngine/services/triggerTask.server.ts | 24 ++++--- .../services/runsReplicationService.server.ts | 4 +- .../clickhouseRunsRepository.server.ts | 5 +- .../runsRepository/runsRepository.server.ts | 1 + .../app/utils/reloadingRegistry.server.ts | 70 +++++++++++-------- .../app/v3/globalFlagsRegistry.server.ts | 1 + .../app/v3/mollifier/readFallback.server.ts | 2 + apps/webapp/app/v3/querySchemas.ts | 4 +- .../app/v3/services/replayTaskRun.server.ts | 4 +- apps/webapp/app/v3/workerRegions.server.ts | 21 ++++-- apps/webapp/test/computeMigration.test.ts | 30 ++++++-- apps/webapp/test/reloadingRegistry.test.ts | 19 +++++ .../test/runsReplicationService.part1.test.ts | 5 ++ apps/webapp/test/workerRegions.test.ts | 16 ++--- .../schema/032_add_task_runs_v2_region.sql | 7 ++ internal-packages/clickhouse/src/taskRuns.ts | 4 ++ .../migration.sql | 2 + .../database/prisma/schema.prisma | 6 +- .../run-engine/src/engine/index.ts | 2 + .../run-engine/src/engine/types.ts | 1 + 26 files changed, 196 insertions(+), 86 deletions(-) create mode 100644 internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql create mode 100644 internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql diff --git a/apps/webapp/app/entry.server.tsx b/apps/webapp/app/entry.server.tsx index cd6d45ccc9..72191c1d0c 100644 --- a/apps/webapp/app/entry.server.tsx +++ b/apps/webapp/app/entry.server.tsx @@ -45,6 +45,8 @@ import { sessionsReplicationInstance } from "./services/sessionsReplicationInsta sessionsReplicationInstance; import { globalFlagsRegistry } from "./v3/globalFlagsRegistry.server"; (globalThis as Record).__globalFlagsRegistry = globalFlagsRegistry; +import { workerRegionRegistry } from "./v3/workerRegions.server"; +(globalThis as Record).__workerRegionRegistry = workerRegionRegistry; const ABORT_DELAY = 30000; diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index 80ab96df2f..ddfba79940 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -16,7 +16,6 @@ import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { findRunByIdWithMollifierFallback, @@ -50,6 +49,7 @@ const commonRunSelect = { depth: true, scheduleId: true, workerQueue: true, + region: true, lockedToVersion: { select: { version: true, @@ -521,9 +521,8 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, metadata, - region: run.workerQueue - ? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []) - : undefined, + region: + run.region || run.workerQueue ? run.region ?? baseWorkerQueue(run.workerQueue) : undefined, }; } @@ -687,6 +686,7 @@ export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun { // API response's `region` to undefined instead of advertising a // misleading "main" region for a not-yet-assigned buffered run). workerQueue: buffered.workerQueue ?? "", + region: buffered.region ?? "", parentTaskRun: null, rootTaskRun: null, childRuns: [], diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index 54da35c533..a95153d3e1 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -12,7 +12,6 @@ import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; @@ -261,9 +260,7 @@ export class NextRunListPresenter { name: run.queue.replace("task/", ""), type: run.queue.startsWith("task/") ? "task" : "custom", }, - region: run.workerQueue - ? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []) - : undefined, + region: run.region || baseWorkerQueue(run.workerQueue), taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; }), diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a652940230..a992db5577 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -16,7 +16,6 @@ import { } from "@trigger.dev/core/v3/serverOnly"; import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { logger } from "~/services/logger.server"; import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; @@ -304,7 +303,7 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { - masterQueue: regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []), + masterQueue: run.region ?? baseWorkerQueue(run.workerQueue), }, }); @@ -514,6 +513,7 @@ export class SpanPresenter extends BasePresenter { }, engine: true, workerQueue: true, + region: true, error: true, output: true, outputType: true, diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 3511c3b29d..fa16a163d5 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -19,7 +19,6 @@ import { } from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; import { ReplayRunData } from "~/v3/replayTask"; @@ -53,6 +52,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxDurationInSeconds: true, machinePreset: true, workerQueue: true, + region: true, ttl: true, idempotencyKey: true, runTags: true, @@ -164,6 +164,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { maxDurationInSeconds: buffered.maxDurationInSeconds ?? null, machinePreset: buffered.machinePreset ?? null, workerQueue: buffered.workerQueue ?? null, + region: buffered.region ?? null, ttl: buffered.ttl ?? null, idempotencyKey: buffered.idempotencyKey ?? null, runTags: buffered.runTags, @@ -214,7 +215,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { region: environment.type === "DEVELOPMENT" ? undefined - : regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []), + : run.region ?? baseWorkerQueue(run.workerQueue), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts index fb0193fc8c..c885a80356 100644 --- a/apps/webapp/app/runEngine/concerns/computeMigration.server.ts +++ b/apps/webapp/app/runEngine/concerns/computeMigration.server.ts @@ -46,24 +46,32 @@ export function isOrgMigrated({ type ResolveInput = MigrationDecisionInput & { baseWorkerQueue: string | undefined; + baseEnableFastPath: boolean; + region: string | undefined; // geo of the base queue (same whether migrated or not) + backing: { workerQueue: string; enableFastPath: boolean } | undefined; envType: string; - backing: string | undefined; // the compute backing for this queue's region, or undefined }; /** - * Rewrite the resolved worker queue to its compute backing when the org is - * migrated and the region has a backing. Same-geo swap (us-east-1 -> us-east-1-next): - * any explicit placement is a geography preference, honored by staying in-region. - * Applied after region resolution, mirroring the scheduled-split. + * Produce the target descriptor `{ workerQueue, region, enableFastPath }` for a + * run. When the org is migrated and the region has a compute backing, the queue + * and fast-path setting come from the MICROVM backing group; `region` is the geo + * either way. Same-geo swap (us-east-1 -> us-east-1-next): any explicit placement + * is a geography preference, honored by staying in-region. Applied after region + * resolution, mirroring the scheduled-split. */ export function resolveComputeMigration({ baseWorkerQueue, - envType, + baseEnableFastPath, + region, backing, + envType, ...decision -}: ResolveInput): string | undefined { - if (baseWorkerQueue === undefined) return baseWorkerQueue; - if (envType === "DEVELOPMENT") return baseWorkerQueue; - if (!isOrgMigrated(decision)) return baseWorkerQueue; - return backing ?? baseWorkerQueue; +}: ResolveInput): { workerQueue: string | undefined; region: string | undefined; enableFastPath: boolean } { + const passthrough = { workerQueue: baseWorkerQueue, region, enableFastPath: baseEnableFastPath }; + if (baseWorkerQueue === undefined) return passthrough; + if (envType === "DEVELOPMENT") return passthrough; + if (!isOrgMigrated(decision)) return passthrough; + if (!backing) return passthrough; + return { workerQueue: backing.workerQueue, region, enableFastPath: backing.enableFastPath }; } diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index c5b513d0de..0e819fba49 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -39,7 +39,7 @@ import { workerQueueForRun, } from "../concerns/workerQueueSplit.server"; import { resolveComputeMigration } from "../concerns/computeMigration.server"; -import { workerRegionRegistry, backingForQueue } from "~/v3/workerRegions.server"; +import { workerRegionRegistry, backingForQueue, regionForQueue } from "~/v3/workerRegions.server"; import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server"; import { publishClaim as publishMollifierClaim, @@ -372,14 +372,18 @@ export class RunEngineTriggerTaskService { await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); } const workerGroups = workerRegionRegistry.current() ?? []; - const migratedWorkerQueue = resolveComputeMigration({ + const region = baseWorkerQueue ? regionForQueue(baseWorkerQueue, workerGroups) : undefined; + const backing = baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined; + const migrated = resolveComputeMigration({ baseWorkerQueue, + baseEnableFastPath: enableFastPath, + region, + backing, planType, orgId: environment.organization.id, orgFeatureFlags: environment.organization.featureFlags as Record | null, flags: globalFlagsRegistry.current(), envType: environment.type, - backing: baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined, }); // Build annotations for this run @@ -410,13 +414,13 @@ export class RunEngineTriggerTaskService { globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1", }); const workerQueue = - migratedWorkerQueue !== undefined + migrated.workerQueue !== undefined ? workerQueueForRun({ - workerQueue: migratedWorkerQueue, + workerQueue: migrated.workerQueue, rootTriggerSource: annotations.rootTriggerSource, splitEnabled: scheduledQueueSplitEnabled, }) - : migratedWorkerQueue; + : migrated.workerQueue; try { return await this.traceEventConcern.traceRun( @@ -515,7 +519,8 @@ export class RunEngineTriggerTaskService { queueName, lockedQueueId, workerQueue, - enableFastPath, + region: migrated.region, + enableFastPath: migrated.enableFastPath, lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined, delayUntil, ttl, @@ -593,7 +598,8 @@ export class RunEngineTriggerTaskService { queueName, lockedQueueId, workerQueue, - enableFastPath, + region: migrated.region, + enableFastPath: migrated.enableFastPath, lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined, delayUntil, ttl, @@ -742,6 +748,7 @@ export class RunEngineTriggerTaskService { queueName: string; lockedQueueId?: string; workerQueue?: string; + region?: string; enableFastPath: boolean; lockedToBackgroundWorker?: { id: string; version: string; sdkVersion: string; cliVersion: string }; delayUntil?: Date; @@ -795,6 +802,7 @@ export class RunEngineTriggerTaskService { queue: args.queueName, lockedQueueId: args.lockedQueueId, workerQueue: args.workerQueue, + region: args.region, enableFastPath: args.enableFastPath, isTest: args.body.options?.test ?? false, delayUntil: args.delayUntil, diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index b9f699d9c2..06223c488f 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -39,7 +39,6 @@ import pLimit from "p-limit"; import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings"; import { calculateErrorFingerprint } from "~/utils/errorFingerprinting"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { isClickHouseJsonParseError, parseRowNumberFromError, @@ -1123,7 +1122,8 @@ export class RunsReplicationService { event === "delete" ? 1 : 0, // _is_deleted run.concurrencyKey ?? "", // concurrency_key run.bulkActionGroupIds ?? [], // bulk_action_group_ids - regionForQueue(baseWorkerQueue(run.masterQueue ?? ""), workerRegionRegistry.current() ?? []), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column) + baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (raw - operators slice by this) + run.region ?? "", // region (geo for customers) run.maxDurationInSeconds ?? null, // max_duration_in_seconds annotations?.triggerSource ?? "", // trigger_source annotations?.rootTriggerSource ?? "", // root_trigger_source diff --git a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts index 304777b39e..88e792b4a4 100644 --- a/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts @@ -204,6 +204,7 @@ export class ClickHouseRunsRepository implements IRunsRepository { machinePreset: true, queue: true, workerQueue: true, + region: true, annotations: true, }, }); @@ -377,7 +378,9 @@ function applyRunFiltersToQueryBuilder( } if (options.regions && options.regions.length > 0) { - queryBuilder.where("worker_queue IN {regions: Array(String)}", { regions: options.regions }); + queryBuilder.where("if(region != '', region, worker_queue) IN {regions: Array(String)}", { + regions: options.regions, + }); } if (options.machines && options.machines.length > 0) { diff --git a/apps/webapp/app/services/runsRepository/runsRepository.server.ts b/apps/webapp/app/services/runsRepository/runsRepository.server.ts index 74963bc3ff..d40e5cf8c0 100644 --- a/apps/webapp/app/services/runsRepository/runsRepository.server.ts +++ b/apps/webapp/app/services/runsRepository/runsRepository.server.ts @@ -108,6 +108,7 @@ export type ListedRun = Prisma.TaskRunGetPayload<{ machinePreset: true; queue: true; workerQueue: true; + region: true; annotations: true; }; }>; diff --git a/apps/webapp/app/utils/reloadingRegistry.server.ts b/apps/webapp/app/utils/reloadingRegistry.server.ts index 6aa1388d6d..431557f60b 100644 --- a/apps/webapp/app/utils/reloadingRegistry.server.ts +++ b/apps/webapp/app/utils/reloadingRegistry.server.ts @@ -36,6 +36,8 @@ export type ReloadingRegistryOptions = { intervalMs: number; /** Startup retry config; defaults to forever with backoff. */ retry?: { retries?: number }; + /** Start the background load + interval at construction. Default true; set false to keep inert (e.g. tests). */ + autoStart?: boolean; }; /** @@ -48,6 +50,7 @@ export type ReloadingRegistryOptions = { export function createReloadingRegistry(opts: ReloadingRegistryOptions): ReloadingRegistry { let snapshot: T | undefined; let loaded = false; + let started = false; let loadSeq = 0; let resolveReady!: () => void; const isReady = new Promise((resolve) => { @@ -66,41 +69,50 @@ export function createReloadingRegistry(opts: ReloadingRegistryOptions): R } } - const startup = pRetry(() => doLoad(), { - forever: opts.retry?.retries === undefined, - retries: opts.retry?.retries, - minTimeout: 1_000, - maxTimeout: 60_000, - factor: 2, - onFailedAttempt: (error) => { - loadFailures.inc({ name: opts.name }); - logger.warn("[ReloadingRegistry] startup load failed, retrying", { - name: opts.name, - attemptNumber: error.attemptNumber, - retriesLeft: error.retriesLeft, - error: error.message, - }); - }, - }); - startup.catch((err) => { - logger.error("[ReloadingRegistry] startup load gave up", { - name: opts.name, - error: err instanceof Error ? err.message : String(err), - }); - }); + let interval: ReturnType | undefined; - const interval = setInterval(() => { - doLoad().catch((err) => { - loadFailures.inc({ name: opts.name }); - logger.warn("[ReloadingRegistry] reload failed", { + if (opts.autoStart !== false) { + started = true; + + const startup = pRetry(() => doLoad(), { + forever: opts.retry?.retries === undefined, + retries: opts.retry?.retries, + minTimeout: 1_000, + maxTimeout: 60_000, + factor: 2, + onFailedAttempt: (error) => { + loadFailures.inc({ name: opts.name }); + logger.warn("[ReloadingRegistry] startup load failed, retrying", { + name: opts.name, + attemptNumber: error.attemptNumber, + retriesLeft: error.retriesLeft, + error: error.message, + }); + }, + }); + startup.catch((err) => { + logger.error("[ReloadingRegistry] startup load gave up", { name: opts.name, error: err instanceof Error ? err.message : String(err), }); }); - }, opts.intervalMs); + + interval = setInterval(() => { + doLoad().catch((err) => { + loadFailures.inc({ name: opts.name }); + logger.warn("[ReloadingRegistry] reload failed", { + name: opts.name, + error: err instanceof Error ? err.message : String(err), + }); + }); + }, opts.intervalMs); + interval.unref(); // never keep the process alive; SIGTERM still clears it + } else { + resolveReady(); // inert: any direct `await isReady` resolves immediately + } function stop() { - clearInterval(interval); + if (interval) clearInterval(interval); } signalsEmitter.on("SIGTERM", stop); signalsEmitter.on("SIGINT", stop); @@ -113,7 +125,7 @@ export function createReloadingRegistry(opts: ReloadingRegistryOptions): R current: () => snapshot, reload: doLoad, async waitUntilReady(timeoutMs: number) { - if (loaded || timeoutMs <= 0) return; + if (!started || loaded || timeoutMs <= 0) return; let timer: ReturnType | undefined; try { await Promise.race([ diff --git a/apps/webapp/app/v3/globalFlagsRegistry.server.ts b/apps/webapp/app/v3/globalFlagsRegistry.server.ts index d99c94317a..0fd4a0cb50 100644 --- a/apps/webapp/app/v3/globalFlagsRegistry.server.ts +++ b/apps/webapp/app/v3/globalFlagsRegistry.server.ts @@ -14,6 +14,7 @@ export const globalFlagsRegistry = singleton("globalFlagsRegistry", () => createReloadingRegistry>({ name: "global-flags", intervalMs: env.GLOBAL_FLAGS_RELOAD_INTERVAL_MS, + autoStart: process.env.NODE_ENV !== "test", // only auto-poll outside tests load: () => flags(), }) ); diff --git a/apps/webapp/app/v3/mollifier/readFallback.server.ts b/apps/webapp/app/v3/mollifier/readFallback.server.ts index 21dd6c2395..d684741219 100644 --- a/apps/webapp/app/v3/mollifier/readFallback.server.ts +++ b/apps/webapp/app/v3/mollifier/readFallback.server.ts @@ -85,6 +85,7 @@ export type SyntheticRun = { runtimeEnvironmentId: string | undefined; engine: "V2"; workerQueue: string | undefined; + region: string | undefined; queue: string | undefined; concurrencyKey: string | undefined; machinePreset: string | undefined; @@ -222,6 +223,7 @@ export async function findRunByIdWithMollifierFallback( asString(environment?.id) ?? entry.envId, engine: "V2", workerQueue: asString(snapshot.workerQueue), + region: asString(snapshot.region), queue: asString(snapshot.queue), concurrencyKey: asString(snapshot.concurrencyKey), machinePreset: asString(snapshot.machine), diff --git a/apps/webapp/app/v3/querySchemas.ts b/apps/webapp/app/v3/querySchemas.ts index 947e6e8e46..776cb5f01a 100644 --- a/apps/webapp/app/v3/querySchemas.ts +++ b/apps/webapp/app/v3/querySchemas.ts @@ -192,12 +192,12 @@ export const runsSchema: TableSchema = { }, region: { name: "region", - clickhouseName: "worker_queue", + clickhouseName: "region", ...column("String", { description: "Region", example: "us-east-1", }), - expression: "if(startsWith(worker_queue, 'cm'), NULL, worker_queue)", + expression: "multiIf(region != '', region, startsWith(worker_queue, 'cm'), NULL, worker_queue)", }, // Timing diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 0884685092..11ba42b558 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -7,7 +7,6 @@ import { import { type TaskRun } from "@trigger.dev/database"; import { findEnvironmentById } from "~/models/runtimeEnvironment.server"; import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; -import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server"; import { logger } from "~/services/logger.server"; import { BaseService } from "./baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; @@ -70,7 +69,8 @@ export class ReplayTaskRunService extends BaseService { const region = ignoreRegion ? undefined : overrideOptions.region ?? - regionForQueue(baseWorkerQueue(existingTaskRun.workerQueue), workerRegionRegistry.current() ?? []); + existingTaskRun.region ?? + baseWorkerQueue(existingTaskRun.workerQueue); try { const taskQueue = await this._prisma.taskQueue.findFirst({ diff --git a/apps/webapp/app/v3/workerRegions.server.ts b/apps/webapp/app/v3/workerRegions.server.ts index 327c977d2b..e925af11af 100644 --- a/apps/webapp/app/v3/workerRegions.server.ts +++ b/apps/webapp/app/v3/workerRegions.server.ts @@ -9,6 +9,7 @@ export type WorkerGroupRegionRow = { region: string | null; workloadType: WorkloadType; hidden: boolean; + enableFastPath: boolean; }; /** @@ -26,9 +27,13 @@ export function regionForQueue(queue: string, groups: WorkerGroupRegionRow[]): s * Forward map: the compute (MICROVM) backing queue for the region that `queue` * belongs to, or undefined if the region has no compute backing. `queue` is the * resolved (container) worker queue; we look up its region, then find a visible - * MICROVM group in the same region. + * MICROVM group in the same region. Returns the backing group's queue and its + * `enableFastPath` so the caller adopts the backing's fast-path setting. */ -export function backingForQueue(queue: string, groups: WorkerGroupRegionRow[]): string | undefined { +export function backingForQueue( + queue: string, + groups: WorkerGroupRegionRow[] +): { workerQueue: string; enableFastPath: boolean } | undefined { const self = groups.find((g) => g.masterQueue === queue); const region = self?.region; if (!region) return undefined; @@ -39,7 +44,8 @@ export function backingForQueue(queue: string, groups: WorkerGroupRegionRow[]): !g.hidden && g.masterQueue !== queue ); - return backing?.masterQueue; + if (!backing) return undefined; + return { workerQueue: backing.masterQueue, enableFastPath: backing.enableFastPath }; } /** @@ -52,9 +58,16 @@ export const workerRegionRegistry = singleton("workerRegionRegistry", () => createReloadingRegistry({ name: "worker-region", intervalMs: env.GLOBAL_FLAGS_RELOAD_INTERVAL_MS, + autoStart: process.env.NODE_ENV !== "test", // only auto-poll outside tests load: () => prisma.workerInstanceGroup.findMany({ - select: { masterQueue: true, region: true, workloadType: true, hidden: true }, + select: { + masterQueue: true, + region: true, + workloadType: true, + hidden: true, + enableFastPath: true, + }, }), }) ); diff --git a/apps/webapp/test/computeMigration.test.ts b/apps/webapp/test/computeMigration.test.ts index eef7ac0907..0c914f2e2a 100644 --- a/apps/webapp/test/computeMigration.test.ts +++ b/apps/webapp/test/computeMigration.test.ts @@ -67,20 +67,40 @@ describe("resolveComputeMigration", () => { orgFeatureFlags: {}, flags: { computeMigrationEnabled: true, computeMigrationFreePercentage: 100 }, envType: "PRODUCTION", + baseEnableFastPath: false, + region: "us-east-1", }; + const backing = { workerQueue: "us-east-1-next", enableFastPath: true }; + it("swaps to the compute backing for an enrolled free org", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing: "us-east-1-next" })).toBe("us-east-1-next"); + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing }) + ).toEqual({ workerQueue: "us-east-1-next", region: "us-east-1", enableFastPath: true }); }); it("leaves the queue unchanged when there is no backing for the region (EU)", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "eu-central-1", orgId: "org_x", backing: undefined })).toBe("eu-central-1"); + expect( + resolveComputeMigration({ + ...enrolled, + baseWorkerQueue: "eu-central-1", + region: "eu-central-1", + orgId: "org_x", + backing: undefined, + }) + ).toEqual({ workerQueue: "eu-central-1", region: "eu-central-1", enableFastPath: false }); }); it("does not migrate DEVELOPMENT", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing: "us-east-1-next", envType: "DEVELOPMENT" })).toBe("us-east-1"); + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing, envType: "DEVELOPMENT" }) + ).toEqual({ workerQueue: "us-east-1", region: "us-east-1", enableFastPath: false }); }); it("leaves a non-enrolled org untouched", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing: "us-east-1-next", flags: { computeMigrationEnabled: false } })).toBe("us-east-1"); + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: "us-east-1", orgId: "org_x", backing, flags: { computeMigrationEnabled: false } }) + ).toEqual({ workerQueue: "us-east-1", region: "us-east-1", enableFastPath: false }); }); it("undefined baseWorkerQueue passes through", () => { - expect(resolveComputeMigration({ ...enrolled, baseWorkerQueue: undefined, orgId: "org_x", backing: "us-east-1-next" })).toBeUndefined(); + expect( + resolveComputeMigration({ ...enrolled, baseWorkerQueue: undefined, region: undefined, orgId: "org_x", backing }) + ).toEqual({ workerQueue: undefined, region: undefined, enableFastPath: false }); }); }); diff --git a/apps/webapp/test/reloadingRegistry.test.ts b/apps/webapp/test/reloadingRegistry.test.ts index 8a11bed60d..4b01f34a01 100644 --- a/apps/webapp/test/reloadingRegistry.test.ts +++ b/apps/webapp/test/reloadingRegistry.test.ts @@ -86,6 +86,25 @@ describe("createReloadingRegistry", () => { reg.stop(); }); + it("autoStart:false stays inert and non-blocking", async () => { + let loadCalls = 0; + const reg = createReloadingRegistry({ + name: "test-inert", + intervalMs: 10_000, + autoStart: false, + load: async () => { + loadCalls++; + return 1; + }, + }); + expect(reg.isLoaded).toBe(false); + expect(reg.current()).toBeUndefined(); + await reg.waitUntilReady(10_000); // must resolve ~immediately, not wait 10s + expect(reg.isLoaded).toBe(false); + expect(loadCalls).toBe(0); // never hit the DB/load + reg.stop(); + }); + it("waitUntilReady clears its timeout when ready wins", async () => { const clearSpy = vi.spyOn(global, "clearTimeout"); // load resolves only when the test releases it, so waitUntilReady runs the diff --git a/apps/webapp/test/runsReplicationService.part1.test.ts b/apps/webapp/test/runsReplicationService.part1.test.ts index 5a085944a6..9fe4402b54 100644 --- a/apps/webapp/test/runsReplicationService.part1.test.ts +++ b/apps/webapp/test/runsReplicationService.part1.test.ts @@ -82,6 +82,8 @@ describe("RunsReplicationService (part 1/7)", () => { traceId: "1234", spanId: "1234", queue: "test", + workerQueue: "us-east-1-next", + region: "us-east-1", runtimeEnvironmentId: runtimeEnvironment.id, projectId: project.id, organizationId: organization.id, @@ -121,6 +123,9 @@ describe("RunsReplicationService (part 1/7)", () => { trigger_source: "api", root_trigger_source: "dashboard", is_warm_start: 1, + // worker_queue stays the raw backing (operators); region is the geo (customers) + worker_queue: "us-east-1-next", + region: "us-east-1", }) ); diff --git a/apps/webapp/test/workerRegions.test.ts b/apps/webapp/test/workerRegions.test.ts index 6e80de6f2c..7befdfee84 100644 --- a/apps/webapp/test/workerRegions.test.ts +++ b/apps/webapp/test/workerRegions.test.ts @@ -2,9 +2,9 @@ import { describe, it, expect } from "vitest"; import { regionForQueue, backingForQueue, type WorkerGroupRegionRow } from "~/v3/workerRegions.server"; const groups: WorkerGroupRegionRow[] = [ - { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false }, - { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: false }, - { masterQueue: "eu-central-1", region: "eu-central-1", workloadType: "CONTAINER", hidden: false }, + { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false, enableFastPath: false }, + { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: false, enableFastPath: true }, + { masterQueue: "eu-central-1", region: "eu-central-1", workloadType: "CONTAINER", hidden: false, enableFastPath: false }, ]; describe("regionForQueue", () => { @@ -18,24 +18,24 @@ describe("regionForQueue", () => { expect(regionForQueue("mystery", groups)).toBe("mystery"); }); it("passes through when a group has no region set", () => { - expect(regionForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false }])).toBe("x"); + expect(regionForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false, enableFastPath: false }])).toBe("x"); }); }); describe("backingForQueue", () => { it("finds the MICROVM backing for a region with one", () => { - expect(backingForQueue("us-east-1", groups)).toBe("us-east-1-next"); + expect(backingForQueue("us-east-1", groups)).toEqual({ workerQueue: "us-east-1-next", enableFastPath: true }); }); it("returns undefined for a region with no compute backing (EU)", () => { expect(backingForQueue("eu-central-1", groups)).toBeUndefined(); }); it("returns undefined when the queue's group has no region", () => { - expect(backingForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false }])).toBeUndefined(); + expect(backingForQueue("x", [{ masterQueue: "x", region: null, workloadType: "CONTAINER", hidden: false, enableFastPath: false }])).toBeUndefined(); }); it("ignores hidden MICROVM groups", () => { const g: WorkerGroupRegionRow[] = [ - { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false }, - { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: true }, + { masterQueue: "us-east-1", region: "us-east-1", workloadType: "CONTAINER", hidden: false, enableFastPath: false }, + { masterQueue: "us-east-1-next", region: "us-east-1", workloadType: "MICROVM", hidden: true, enableFastPath: true }, ]; expect(backingForQueue("us-east-1", g)).toBeUndefined(); }); diff --git a/internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql b/internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql new file mode 100644 index 0000000000..ed55cada85 --- /dev/null +++ b/internal-packages/clickhouse/schema/032_add_task_runs_v2_region.sql @@ -0,0 +1,7 @@ +-- +goose Up +ALTER TABLE trigger_dev.task_runs_v2 +ADD COLUMN IF NOT EXISTS region String DEFAULT ''; + +-- +goose Down +ALTER TABLE trigger_dev.task_runs_v2 +DROP COLUMN IF EXISTS region; diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 2201f4baf6..633f6e668b 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -48,6 +48,7 @@ export const TaskRunV2 = z.object({ concurrency_key: z.string().default(""), bulk_action_group_ids: z.array(z.string()).default([]), worker_queue: z.string().default(""), + region: z.string().default(""), max_duration_in_seconds: z.number().int().nullish(), trigger_source: z.string().default(""), root_trigger_source: z.string().default(""), @@ -108,6 +109,7 @@ export const TASK_RUN_COLUMNS = [ "concurrency_key", "bulk_action_group_ids", "worker_queue", + "region", "max_duration_in_seconds", "trigger_source", "root_trigger_source", @@ -175,6 +177,7 @@ export type TaskRunFieldTypes = { concurrency_key: string; bulk_action_group_ids: string[]; worker_queue: string; + region: string; max_duration_in_seconds: number | null; trigger_source: string; root_trigger_source: string; @@ -313,6 +316,7 @@ export type TaskRunInsertArray = [ concurrency_key: string, bulk_action_group_ids: string[], worker_queue: string, + region: string, max_duration_in_seconds: number | null, trigger_source: string, root_trigger_source: string, diff --git a/internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql b/internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql new file mode 100644 index 0000000000..e30bb37484 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260615220000_add_task_run_region/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN IF NOT EXISTS "region" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 89090ffc2d..2aeb27e303 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -915,6 +915,9 @@ model TaskRun { /// The main queue that this run is part of workerQueue String @default("main") @map("masterQueue") + /// User-facing geo region, stamped at trigger; workerQueue is where it actually ran. + region String? + /// @deprecated secondaryMasterQueue String? @@ -1532,8 +1535,7 @@ model WorkerInstanceGroup { workloadType WorkloadType @default(CONTAINER) - /// Geo region (e.g. "us-east-1"); container + MICROVM groups for one geo share it. - /// Set-once while the group has runs - changing it breaks region resolution for them. + /// Geo region; container + MICROVM groups for one geo share it. Set-once once it has runs. region String? /// When true, runs enqueued to this worker queue may skip the intermediate queue diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 2b434a86ee..84941560a5 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -689,6 +689,7 @@ export class RunEngine { cliVersion, concurrencyKey, workerQueue, + region, enableFastPath, queue, lockedQueueId, @@ -857,6 +858,7 @@ export class RunEngine { queue, lockedQueueId, workerQueue, + region, isTest, delayUntil, queuedAt, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 0077478318..98c722e8c7 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -247,6 +247,7 @@ export type TriggerParams = { cliVersion?: string; concurrencyKey?: string; workerQueue?: string; + region?: string; /** When true, the run queue may push directly to the worker queue if concurrency is available. * Gated per WorkerInstanceGroup (production) or always true (development). */ enableFastPath?: boolean; From b75e18ad6b150d620d3e99152968b3571c1a0d67 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 23:06:57 +0100 Subject: [PATCH 18/22] fix(webapp): fail-open entitlement lookup in migration mode, harden region fallback --- .../v3/ApiRetrieveRunPresenter.server.ts | 2 +- .../app/presenters/v3/SpanPresenter.server.ts | 2 +- .../resources.taskruns.$runParam.replay.ts | 2 +- .../computeTemplateCreation.server.ts | 22 +++++++++++++++---- .../app/v3/services/replayTaskRun.server.ts | 4 ++-- .../clickhouse/src/taskRuns.test.ts | 14 +++++++++--- 6 files changed, 34 insertions(+), 12 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index ddfba79940..ba5e8e919e 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -522,7 +522,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V batchId: run.batch?.friendlyId, metadata, region: - run.region || run.workerQueue ? run.region ?? baseWorkerQueue(run.workerQueue) : undefined, + run.region || run.workerQueue ? run.region || baseWorkerQueue(run.workerQueue) : undefined, }; } diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a992db5577..898b3bec60 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -303,7 +303,7 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { - masterQueue: run.region ?? baseWorkerQueue(run.workerQueue), + masterQueue: run.region || baseWorkerQueue(run.workerQueue), }, }); diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index fa16a163d5..6c29ac55d4 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -215,7 +215,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { region: environment.type === "DEVELOPMENT" ? undefined - : run.region ?? baseWorkerQueue(run.workerQueue), + : run.region || baseWorkerQueue(run.workerQueue), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts index 555f320410..16968b01d4 100644 --- a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts +++ b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts @@ -173,16 +173,30 @@ export class ComputeTemplateCreationService { } const defaultQueue = project.defaultWorkerGroup?.masterQueue; if (defaultQueue && backingForQueue(defaultQueue, workerRegionRegistry.current() ?? [])) { - const planType = (await getEntitlement(project.organization.id))?.plan?.type; if (!globalFlagsRegistry.isLoaded) { await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); } - const migrated = isOrgMigrated({ - planType, + const decision = { orgId: project.organization.id, orgFeatureFlags: project.organization.featureFlags as Record | null, flags: globalFlagsRegistry.current(), - }); + }; + // Per-org override needs no plan; only the percentage path does. So skip the + // external entitlement lookup unless it could matter, and degrade gracefully + // if it throws - a shadow-template check must never fail a deploy. + let migrated = isOrgMigrated({ ...decision, planType: undefined }); + if (!migrated && (decision.flags?.computeMigrationEnabled ?? false)) { + let planType: string | undefined; + try { + planType = (await getEntitlement(project.organization.id))?.plan?.type; + } catch (error) { + logger.warn("compute migration: entitlement lookup failed; skipping shadow template", { + organizationId: project.organization.id, + error: error instanceof Error ? error.message : String(error), + }); + } + migrated = isOrgMigrated({ ...decision, planType }); + } if (migrated) { return "shadow"; } diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index 11ba42b558..ed2292e32b 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -68,8 +68,8 @@ export class ReplayTaskRunService extends BaseService { authenticatedEnvironment.type === "DEVELOPMENT"; const region = ignoreRegion ? undefined - : overrideOptions.region ?? - existingTaskRun.region ?? + : overrideOptions.region || + existingTaskRun.region || baseWorkerQueue(existingTaskRun.workerQueue); try { diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index 76c2267b07..72eab055ee 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -83,6 +83,7 @@ describe("Task Runs V2", () => { "concurrency_key_1234", // concurrency_key ["bulk_action_group_id_1234", "bulk_action_group_id_1235"], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -215,6 +216,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -270,6 +272,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -372,6 +375,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -482,6 +486,7 @@ describe("Task Runs V2", () => { "", // concurrency_key [], // bulk_action_group_ids "", // worker_queue + "", // region null, // max_duration_in_seconds "", // trigger_source "", // root_trigger_source @@ -536,7 +541,8 @@ describe("Task Runs V2", () => { 0, "", [], - "", + "", // worker_queue + "", // region null, "", "", @@ -597,7 +603,8 @@ describe("Task Runs V2", () => { 0, "", [], - "", + "", // worker_queue + "", // region null, "", "", @@ -652,7 +659,8 @@ describe("Task Runs V2", () => { 0, "", [], - "", + "", // worker_queue + "", // region null, "", "", From ea9a07155a85cd6eda936535e65d59426561fed9 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 23:15:19 +0100 Subject: [PATCH 19/22] docs(webapp): trim server-changes note to one behavior-level line --- .server-changes/plan-aware-compute-migration.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/.server-changes/plan-aware-compute-migration.md b/.server-changes/plan-aware-compute-migration.md index 9fe0813896..bc42ccc647 100644 --- a/.server-changes/plan-aware-compute-migration.md +++ b/.server-changes/plan-aware-compute-migration.md @@ -3,8 +3,4 @@ area: webapp type: feature --- -Gradually route a configurable percentage of free (then paid) organizations onto -the compute backing at trigger time, with a per-org exclusion and an admin kill -switch. Controlled by the `computeMigrationEnabled`, `computeMigrationFreePercentage`, -and `computeMigrationPaidPercentage` feature flags and the `COMPUTE_BACKING_MAP` -env var. +Gradually roll out a new run execution backend to a configurable percentage of organizations. From e3524e9c91acbd00975f5ea898cc9541221ec358 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 23:24:37 +0100 Subject: [PATCH 20/22] refactor(webapp): centralize display-region fallback in regionForDisplay --- .../presenters/v3/ApiRetrieveRunPresenter.server.ts | 5 ++--- .../presenters/v3/NextRunListPresenter.server.ts | 4 ++-- .../routes/resources.taskruns.$runParam.replay.ts | 4 ++-- .../runEngine/concerns/workerQueueSplit.server.ts | 13 +++++++++++++ 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index ba5e8e919e..fec8dabdb0 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -15,7 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa import assertNever from "assert-never"; import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions"; import { $replica, prisma } from "~/db.server"; -import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { findRunByIdWithMollifierFallback, @@ -521,8 +521,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, metadata, - region: - run.region || run.workerQueue ? run.region || baseWorkerQueue(run.workerQueue) : undefined, + region: regionForDisplay(run.region, run.workerQueue), }; } diff --git a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts index a95153d3e1..3594aa71ce 100644 --- a/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts @@ -11,7 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters"; import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server"; import { getTaskIdentifiers } from "~/models/task.server"; import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; -import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server"; import { machinePresetFromRun } from "~/v3/machinePresets.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; @@ -260,7 +260,7 @@ export class NextRunListPresenter { name: run.queue.replace("task/", ""), type: run.queue.startsWith("task/") ? "task" : "custom", }, - region: run.region || baseWorkerQueue(run.workerQueue), + region: regionForDisplay(run.region, run.workerQueue), taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD", }; }), diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 6c29ac55d4..03bfdaccc6 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -18,7 +18,7 @@ import { type SyntheticReplayTaskRun, } from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; -import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server"; +import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; import { ReplayRunData } from "~/v3/replayTask"; @@ -215,7 +215,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { region: environment.type === "DEVELOPMENT" ? undefined - : run.region || baseWorkerQueue(run.workerQueue), + : regionForDisplay(run.region, run.workerQueue), regions: regionsResult.regions, ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined, idempotencyKey: run.idempotencyKey, diff --git a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts index af0d4a819a..a2bf5df87b 100644 --- a/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts +++ b/apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts @@ -33,6 +33,19 @@ export function baseWorkerQueue(workerQueue: string | null | undefined): string return colon === -1 ? workerQueue : workerQueue.slice(0, colon); } +/** + * User-facing region for read surfaces: the explicit geo region if set, else the + * region derived from the worker queue, else undefined. Use everywhere a run's + * region is displayed so an empty queue never surfaces as `""` and all surfaces + * agree. Not for query keys — those want the raw worker queue, not this fallback. + */ +export function regionForDisplay( + region: string | null | undefined, + workerQueue: string | null | undefined +): string | undefined { + return region || (workerQueue ? baseWorkerQueue(workerQueue) : undefined); +} + /** `TriggerSource` value used for runs originating from a schedule. */ const SCHEDULE_TRIGGER_SOURCE = "schedule"; From 126a01f69269537b7adf8dd557d87c28830eac9c Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Mon, 15 Jun 2026 23:32:50 +0100 Subject: [PATCH 21/22] fix(webapp): look up run's actual worker group, show geo region in span detail --- apps/webapp/app/presenters/v3/SpanPresenter.server.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 898b3bec60..98ee75cda3 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -303,11 +303,17 @@ export class SpanPresenter extends BasePresenter { location: true, }, where: { - masterQueue: run.region || baseWorkerQueue(run.workerQueue), + // masterQueue is unique and IS the run's backing queue, so this finds + // the group the run actually ran on. + masterQueue: baseWorkerQueue(run.workerQueue), }, }); - region = workerGroup ?? null; + // Show the stamped geo region as the name so a migrated run never reveals + // its compute backing; fall back to the group name for unstamped runs. + region = workerGroup + ? { name: run.region ?? workerGroup.name, location: workerGroup.location } + : null; } // Only AGENT-tagged runs (chat.agent and friends) can be session-bound, From 188dac278feb1b3d28fadb3a89bb24b25314e2a1 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Tue, 16 Jun 2026 00:17:12 +0100 Subject: [PATCH 22/22] docs(webapp): trim hot-path comment, note region must stay whereTransform-free --- apps/webapp/app/runEngine/services/triggerTask.server.ts | 8 ++++---- apps/webapp/app/v3/querySchemas.ts | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 0e819fba49..1c2936e89a 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -361,10 +361,10 @@ export class RunEngineTriggerTaskService { const baseWorkerQueue = workerQueueResult?.masterQueue; const enableFastPath = workerQueueResult?.enableFastPath ?? false; - // Plan-aware compute migration: rewrite the resolved region to its - // compute backing for enrolled orgs. Reads the in-memory global-flags - // snapshot (no DB query). Gate the first read on the registry so a cold - // replica never serves a default over a real flag value. + // Rewrite the region to its compute backing for migration-enrolled orgs, + // from the in-memory flag snapshot (no DB query). The isLoaded gates only + // block during cold start so the first request can't serve a default over + // a real flag; once warm they're a synchronous no-op. if (!globalFlagsRegistry.isLoaded) { await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS); } diff --git a/apps/webapp/app/v3/querySchemas.ts b/apps/webapp/app/v3/querySchemas.ts index 776cb5f01a..304664800e 100644 --- a/apps/webapp/app/v3/querySchemas.ts +++ b/apps/webapp/app/v3/querySchemas.ts @@ -197,6 +197,7 @@ export const runsSchema: TableSchema = { description: "Region", example: "us-east-1", }), + // No whereTransform: the expression drives WHERE too, so pre-region rows still match. expression: "multiIf(region != '', region, startsWith(worker_queue, 'cm'), NULL, worker_queue)", },