Skip to content

Commit 697de03

Browse files
committed
refactor(webapp): resolve region<->backing from WorkerInstanceGroup.region, drop env map
1 parent 7d1d8fb commit 697de03

13 files changed

Lines changed: 35 additions & 115 deletions

apps/webapp/app/env.server.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,10 +158,6 @@ const EnvironmentSchema = z
158158
WORKER_SCHEMA: z.string().default("graphile_worker"),
159159
WORKER_CONCURRENCY: z.coerce.number().int().default(10),
160160
WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
161-
// JSON map of container-region masterQueue -> compute-backing masterQueue.
162-
// Absence of an entry means that region is never migrated (e.g. EU until it
163-
// has a compute backing). Example: {"us-east-1":"us-east-1-next"}
164-
COMPUTE_BACKING_MAP: z.string().default("{}"),
165161
// How often each replica reloads the global flags snapshot from the DB.
166162
// Sets kill/ramp propagation latency.
167163
GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().min(1000).default(5000),

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import assertNever from "assert-never";
1616
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1717
import { $replica, prisma } from "~/db.server";
1818
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
19-
import { regionForBacking } from "~/runEngine/concerns/computeMigration.server";
20-
import { computeBackingMap } from "~/v3/computeBackingMap.server";
19+
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
2120
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2221
import {
2322
findRunByIdWithMollifierFallback,
@@ -523,7 +522,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
523522
batchId: run.batch?.friendlyId,
524523
metadata,
525524
region: run.workerQueue
526-
? regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap)
525+
? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? [])
527526
: undefined,
528527
};
529528
}

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
1212
import { getTaskIdentifiers } from "~/models/task.server";
1313
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
1414
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
15-
import { regionForBacking } from "~/runEngine/concerns/computeMigration.server";
16-
import { computeBackingMap } from "~/v3/computeBackingMap.server";
15+
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
1716
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1817
import { ServiceValidationError } from "~/v3/services/baseService.server";
1918
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
@@ -263,7 +262,7 @@ export class NextRunListPresenter {
263262
type: run.queue.startsWith("task/") ? "task" : "custom",
264263
},
265264
region: run.workerQueue
266-
? regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap)
265+
? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? [])
267266
: undefined,
268267
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
269268
};

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import {
1616
} from "@trigger.dev/core/v3/serverOnly";
1717
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
1818
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
19-
import { regionForBacking } from "~/runEngine/concerns/computeMigration.server";
20-
import { computeBackingMap } from "~/v3/computeBackingMap.server";
19+
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
2120
import { logger } from "~/services/logger.server";
2221
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
2322
import { machinePresetFromRun } from "~/v3/machinePresets.server";
@@ -305,7 +304,7 @@ export class SpanPresenter extends BasePresenter {
305304
location: true,
306305
},
307306
where: {
308-
masterQueue: regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap),
307+
masterQueue: regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []),
309308
},
310309
});
311310

apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ import {
1919
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
2020
import parseDuration from "parse-duration";
2121
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
22-
import { regionForBacking } from "~/runEngine/concerns/computeMigration.server";
23-
import { computeBackingMap } from "~/v3/computeBackingMap.server";
22+
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
2423
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
2524
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
2625
import { ReplayRunData } from "~/v3/replayTask";
@@ -215,7 +214,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
215214
region:
216215
environment.type === "DEVELOPMENT"
217216
? undefined
218-
: regionForBacking(baseWorkerQueue(run.workerQueue), computeBackingMap),
217+
: regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []),
219218
regions: regionsResult.regions,
220219
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
221220
idempotencyKey: run.idempotencyKey,

apps/webapp/app/runEngine/concerns/computeMigration.server.ts

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,6 @@ export type ComputeMigrationFlags = {
77
computeMigrationPaidPercentage?: number;
88
};
99

10-
export type ComputeBackingMap = Record<string, string>;
11-
12-
/**
13-
* Parse COMPUTE_BACKING_MAP (container-region masterQueue -> compute-backing
14-
* masterQueue). Never throws: bad JSON or non-string values yield {} so a
15-
* misconfigured env disables migration rather than breaking triggers.
16-
*/
17-
export function parseComputeBackingMap(raw: string): ComputeBackingMap {
18-
try {
19-
const parsed = JSON.parse(raw);
20-
if (typeof parsed !== "object" || parsed === null) return {};
21-
const out: ComputeBackingMap = {};
22-
for (const [k, v] of Object.entries(parsed)) {
23-
if (typeof v === "string") out[k] = v;
24-
}
25-
return out;
26-
} catch {
27-
return {};
28-
}
29-
}
30-
31-
/**
32-
* Inverse of the backing map: given a worker queue, return the user-facing geo
33-
* region. If the queue is a compute backing (a *value* in the map), return the
34-
* region it backs; otherwise return the queue unchanged. Used to hide the
35-
* backing on customer surfaces and to re-derive the region on replay.
36-
*/
37-
export function regionForBacking(queue: string, backingMap: ComputeBackingMap): string {
38-
for (const [region, backing] of Object.entries(backingMap)) {
39-
if (backing === queue) return region;
40-
}
41-
return queue;
42-
}
43-
4410
type MigrationDecisionInput = {
4511
planType: string | undefined;
4612
orgId: string;
@@ -81,7 +47,7 @@ export function isOrgMigrated({
8147
type ResolveInput = MigrationDecisionInput & {
8248
baseWorkerQueue: string | undefined;
8349
envType: string;
84-
backingMap: ComputeBackingMap;
50+
backing: string | undefined; // the compute backing for this queue's region, or undefined
8551
};
8652

8753
/**
@@ -93,11 +59,11 @@ type ResolveInput = MigrationDecisionInput & {
9359
export function resolveComputeMigration({
9460
baseWorkerQueue,
9561
envType,
96-
backingMap,
62+
backing,
9763
...decision
9864
}: ResolveInput): string | undefined {
9965
if (baseWorkerQueue === undefined) return baseWorkerQueue;
10066
if (envType === "DEVELOPMENT") return baseWorkerQueue;
10167
if (!isOrgMigrated(decision)) return baseWorkerQueue;
102-
return backingMap[baseWorkerQueue] ?? baseWorkerQueue;
68+
return backing ?? baseWorkerQueue;
10369
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import {
3939
workerQueueForRun,
4040
} from "../concerns/workerQueueSplit.server";
4141
import { resolveComputeMigration } from "../concerns/computeMigration.server";
42-
import { computeBackingMap } from "~/v3/computeBackingMap.server";
42+
import { workerRegionRegistry, backingForQueue } from "~/v3/workerRegions.server";
4343
import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server";
4444
import {
4545
publishClaim as publishMollifierClaim,
@@ -368,14 +368,18 @@ export class RunEngineTriggerTaskService {
368368
if (!globalFlagsRegistry.isLoaded) {
369369
await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
370370
}
371+
if (!workerRegionRegistry.isLoaded) {
372+
await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
373+
}
374+
const workerGroups = workerRegionRegistry.current() ?? [];
371375
const migratedWorkerQueue = resolveComputeMigration({
372376
baseWorkerQueue,
373377
planType,
374378
orgId: environment.organization.id,
375379
orgFeatureFlags: environment.organization.featureFlags as Record<string, unknown> | null,
376380
flags: globalFlagsRegistry.current(),
377381
envType: environment.type,
378-
backingMap: computeBackingMap,
382+
backing: baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined,
379383
});
380384

381385
// Build annotations for this run

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ import pLimit from "p-limit";
3939
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
4040
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
4141
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
42-
import { regionForBacking } from "~/runEngine/concerns/computeMigration.server";
43-
import { computeBackingMap } from "~/v3/computeBackingMap.server";
42+
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
4443
import {
4544
isClickHouseJsonParseError,
4645
parseRowNumberFromError,
@@ -1124,7 +1123,7 @@ export class RunsReplicationService {
11241123
event === "delete" ? 1 : 0, // _is_deleted
11251124
run.concurrencyKey ?? "", // concurrency_key
11261125
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
1127-
regionForBacking(baseWorkerQueue(run.masterQueue ?? ""), computeBackingMap), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column)
1126+
regionForQueue(baseWorkerQueue(run.masterQueue ?? ""), workerRegionRegistry.current() ?? []), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column)
11281127
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
11291128
annotations?.triggerSource ?? "", // trigger_source
11301129
annotations?.rootTriggerSource ?? "", // root_trigger_source

apps/webapp/app/v3/computeBackingMap.server.ts

Lines changed: 0 additions & 5 deletions
This file was deleted.

apps/webapp/app/v3/services/computeTemplateCreation.server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
99
import { ServiceValidationError } from "./baseService.server";
1010
import { FailDeploymentService } from "./failDeployment.server";
1111
import { resolveComputeAccess } from "../regionAccess.server";
12-
import { isOrgMigrated, parseComputeBackingMap } from "~/runEngine/concerns/computeMigration.server";
12+
import { isOrgMigrated } from "~/runEngine/concerns/computeMigration.server";
13+
import { backingForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
1314
import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server";
1415
import { getEntitlement } from "~/services/platform.v3.server";
1516

@@ -167,9 +168,11 @@ export class ComputeTemplateCreationService {
167168
// Migrated orgs route runs to the compute backing even though their stored
168169
// default is still the container region, so they need a compute template too.
169170
// shadow mode: never fail a deploy over a backing the org didn't opt into.
170-
const backingMap = parseComputeBackingMap(env.COMPUTE_BACKING_MAP);
171+
if (!workerRegionRegistry.isLoaded) {
172+
await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
173+
}
171174
const defaultQueue = project.defaultWorkerGroup?.masterQueue;
172-
if (defaultQueue && backingMap[defaultQueue]) {
175+
if (defaultQueue && backingForQueue(defaultQueue, workerRegionRegistry.current() ?? [])) {
173176
const planType = (await getEntitlement(project.organization.id))?.plan?.type;
174177
if (!globalFlagsRegistry.isLoaded) {
175178
await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);

0 commit comments

Comments
 (0)