Skip to content

Commit a1a460e

Browse files
committed
feat(webapp): stamp geo region on runs, keep worker_queue raw, test-safe registries
1 parent 697de03 commit a1a460e

26 files changed

Lines changed: 196 additions & 86 deletions

apps/webapp/app/entry.server.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ import { sessionsReplicationInstance } from "./services/sessionsReplicationInsta
4545
sessionsReplicationInstance;
4646
import { globalFlagsRegistry } from "./v3/globalFlagsRegistry.server";
4747
(globalThis as Record<string, unknown>).__globalFlagsRegistry = globalFlagsRegistry;
48+
import { workerRegionRegistry } from "./v3/workerRegions.server";
49+
(globalThis as Record<string, unknown>).__workerRegionRegistry = workerRegionRegistry;
4850

4951
const ABORT_DELAY = 30000;
5052

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import assertNever from "assert-never";
1616
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1717
import { $replica, prisma } from "~/db.server";
1818
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
19-
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
2019
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2120
import {
2221
findRunByIdWithMollifierFallback,
@@ -50,6 +49,7 @@ const commonRunSelect = {
5049
depth: true,
5150
scheduleId: true,
5251
workerQueue: true,
52+
region: true,
5353
lockedToVersion: {
5454
select: {
5555
version: true,
@@ -521,9 +521,8 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
521521
triggerFunction: resolveTriggerFunction(run),
522522
batchId: run.batch?.friendlyId,
523523
metadata,
524-
region: run.workerQueue
525-
? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? [])
526-
: undefined,
524+
region:
525+
run.region || run.workerQueue ? run.region ?? baseWorkerQueue(run.workerQueue) : undefined,
527526
};
528527
}
529528

@@ -687,6 +686,7 @@ export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
687686
// API response's `region` to undefined instead of advertising a
688687
// misleading "main" region for a not-yet-assigned buffered run).
689688
workerQueue: buffered.workerQueue ?? "",
689+
region: buffered.region ?? "",
690690
parentTaskRun: null,
691691
rootTaskRun: null,
692692
childRuns: [],

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
1212
import { getTaskIdentifiers } from "~/models/task.server";
1313
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
1414
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
15-
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
1615
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1716
import { ServiceValidationError } from "~/v3/services/baseService.server";
1817
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
@@ -261,9 +260,7 @@ export class NextRunListPresenter {
261260
name: run.queue.replace("task/", ""),
262261
type: run.queue.startsWith("task/") ? "task" : "custom",
263262
},
264-
region: run.workerQueue
265-
? regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? [])
266-
: undefined,
263+
region: run.region || baseWorkerQueue(run.workerQueue),
267264
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
268265
};
269266
}),

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import {
1616
} from "@trigger.dev/core/v3/serverOnly";
1717
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
1818
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
19-
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
2019
import { logger } from "~/services/logger.server";
2120
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
2221
import { machinePresetFromRun } from "~/v3/machinePresets.server";
@@ -304,7 +303,7 @@ export class SpanPresenter extends BasePresenter {
304303
location: true,
305304
},
306305
where: {
307-
masterQueue: regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []),
306+
masterQueue: run.region ?? baseWorkerQueue(run.workerQueue),
308307
},
309308
});
310309

@@ -514,6 +513,7 @@ export class SpanPresenter extends BasePresenter {
514513
},
515514
engine: true,
516515
workerQueue: true,
516+
region: true,
517517
error: true,
518518
output: true,
519519
outputType: true,

apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import {
1919
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
2020
import parseDuration from "parse-duration";
2121
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
22-
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
2322
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
2423
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
2524
import { ReplayRunData } from "~/v3/replayTask";
@@ -53,6 +52,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
5352
maxDurationInSeconds: true,
5453
machinePreset: true,
5554
workerQueue: true,
55+
region: true,
5656
ttl: true,
5757
idempotencyKey: true,
5858
runTags: true,
@@ -164,6 +164,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
164164
maxDurationInSeconds: buffered.maxDurationInSeconds ?? null,
165165
machinePreset: buffered.machinePreset ?? null,
166166
workerQueue: buffered.workerQueue ?? null,
167+
region: buffered.region ?? null,
167168
ttl: buffered.ttl ?? null,
168169
idempotencyKey: buffered.idempotencyKey ?? null,
169170
runTags: buffered.runTags,
@@ -214,7 +215,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
214215
region:
215216
environment.type === "DEVELOPMENT"
216217
? undefined
217-
: regionForQueue(baseWorkerQueue(run.workerQueue), workerRegionRegistry.current() ?? []),
218+
: run.region ?? baseWorkerQueue(run.workerQueue),
218219
regions: regionsResult.regions,
219220
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
220221
idempotencyKey: run.idempotencyKey,

apps/webapp/app/runEngine/concerns/computeMigration.server.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,24 +46,32 @@ export function isOrgMigrated({
4646

4747
type ResolveInput = MigrationDecisionInput & {
4848
baseWorkerQueue: string | undefined;
49+
baseEnableFastPath: boolean;
50+
region: string | undefined; // geo of the base queue (same whether migrated or not)
51+
backing: { workerQueue: string; enableFastPath: boolean } | undefined;
4952
envType: string;
50-
backing: string | undefined; // the compute backing for this queue's region, or undefined
5153
};
5254

5355
/**
54-
* Rewrite the resolved worker queue to its compute backing when the org is
55-
* migrated and the region has a backing. Same-geo swap (us-east-1 -> us-east-1-next):
56-
* any explicit placement is a geography preference, honored by staying in-region.
57-
* Applied after region resolution, mirroring the scheduled-split.
56+
* Produce the target descriptor `{ workerQueue, region, enableFastPath }` for a
57+
* run. When the org is migrated and the region has a compute backing, the queue
58+
* and fast-path setting come from the MICROVM backing group; `region` is the geo
59+
* either way. Same-geo swap (us-east-1 -> us-east-1-next): any explicit placement
60+
* is a geography preference, honored by staying in-region. Applied after region
61+
* resolution, mirroring the scheduled-split.
5862
*/
5963
export function resolveComputeMigration({
6064
baseWorkerQueue,
61-
envType,
65+
baseEnableFastPath,
66+
region,
6267
backing,
68+
envType,
6369
...decision
64-
}: ResolveInput): string | undefined {
65-
if (baseWorkerQueue === undefined) return baseWorkerQueue;
66-
if (envType === "DEVELOPMENT") return baseWorkerQueue;
67-
if (!isOrgMigrated(decision)) return baseWorkerQueue;
68-
return backing ?? baseWorkerQueue;
70+
}: ResolveInput): { workerQueue: string | undefined; region: string | undefined; enableFastPath: boolean } {
71+
const passthrough = { workerQueue: baseWorkerQueue, region, enableFastPath: baseEnableFastPath };
72+
if (baseWorkerQueue === undefined) return passthrough;
73+
if (envType === "DEVELOPMENT") return passthrough;
74+
if (!isOrgMigrated(decision)) return passthrough;
75+
if (!backing) return passthrough;
76+
return { workerQueue: backing.workerQueue, region, enableFastPath: backing.enableFastPath };
6977
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import {
3939
workerQueueForRun,
4040
} from "../concerns/workerQueueSplit.server";
4141
import { resolveComputeMigration } from "../concerns/computeMigration.server";
42-
import { workerRegionRegistry, backingForQueue } from "~/v3/workerRegions.server";
42+
import { workerRegionRegistry, backingForQueue, regionForQueue } from "~/v3/workerRegions.server";
4343
import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server";
4444
import {
4545
publishClaim as publishMollifierClaim,
@@ -372,14 +372,18 @@ export class RunEngineTriggerTaskService {
372372
await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
373373
}
374374
const workerGroups = workerRegionRegistry.current() ?? [];
375-
const migratedWorkerQueue = resolveComputeMigration({
375+
const region = baseWorkerQueue ? regionForQueue(baseWorkerQueue, workerGroups) : undefined;
376+
const backing = baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined;
377+
const migrated = resolveComputeMigration({
376378
baseWorkerQueue,
379+
baseEnableFastPath: enableFastPath,
380+
region,
381+
backing,
377382
planType,
378383
orgId: environment.organization.id,
379384
orgFeatureFlags: environment.organization.featureFlags as Record<string, unknown> | null,
380385
flags: globalFlagsRegistry.current(),
381386
envType: environment.type,
382-
backing: baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined,
383387
});
384388

385389
// Build annotations for this run
@@ -410,13 +414,13 @@ export class RunEngineTriggerTaskService {
410414
globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1",
411415
});
412416
const workerQueue =
413-
migratedWorkerQueue !== undefined
417+
migrated.workerQueue !== undefined
414418
? workerQueueForRun({
415-
workerQueue: migratedWorkerQueue,
419+
workerQueue: migrated.workerQueue,
416420
rootTriggerSource: annotations.rootTriggerSource,
417421
splitEnabled: scheduledQueueSplitEnabled,
418422
})
419-
: migratedWorkerQueue;
423+
: migrated.workerQueue;
420424

421425
try {
422426
return await this.traceEventConcern.traceRun(
@@ -515,7 +519,8 @@ export class RunEngineTriggerTaskService {
515519
queueName,
516520
lockedQueueId,
517521
workerQueue,
518-
enableFastPath,
522+
region: migrated.region,
523+
enableFastPath: migrated.enableFastPath,
519524
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
520525
delayUntil,
521526
ttl,
@@ -593,7 +598,8 @@ export class RunEngineTriggerTaskService {
593598
queueName,
594599
lockedQueueId,
595600
workerQueue,
596-
enableFastPath,
601+
region: migrated.region,
602+
enableFastPath: migrated.enableFastPath,
597603
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
598604
delayUntil,
599605
ttl,
@@ -742,6 +748,7 @@ export class RunEngineTriggerTaskService {
742748
queueName: string;
743749
lockedQueueId?: string;
744750
workerQueue?: string;
751+
region?: string;
745752
enableFastPath: boolean;
746753
lockedToBackgroundWorker?: { id: string; version: string; sdkVersion: string; cliVersion: string };
747754
delayUntil?: Date;
@@ -795,6 +802,7 @@ export class RunEngineTriggerTaskService {
795802
queue: args.queueName,
796803
lockedQueueId: args.lockedQueueId,
797804
workerQueue: args.workerQueue,
805+
region: args.region,
798806
enableFastPath: args.enableFastPath,
799807
isTest: args.body.options?.test ?? false,
800808
delayUntil: args.delayUntil,

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import pLimit from "p-limit";
3939
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
4040
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
4141
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
42-
import { regionForQueue, workerRegionRegistry } from "~/v3/workerRegions.server";
4342
import {
4443
isClickHouseJsonParseError,
4544
parseRowNumberFromError,
@@ -1123,7 +1122,8 @@ export class RunsReplicationService {
11231122
event === "delete" ? 1 : 0, // _is_deleted
11241123
run.concurrencyKey ?? "", // concurrency_key
11251124
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
1126-
regionForQueue(baseWorkerQueue(run.masterQueue ?? ""), workerRegionRegistry.current() ?? []), // worker_queue (geo region; strip `:scheduled` and hide the compute backing - customers query this column)
1125+
baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (raw - operators slice by this)
1126+
run.region ?? "", // region (geo for customers)
11271127
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
11281128
annotations?.triggerSource ?? "", // trigger_source
11291129
annotations?.rootTriggerSource ?? "", // root_trigger_source

apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {
204204
machinePreset: true,
205205
queue: true,
206206
workerQueue: true,
207+
region: true,
207208
annotations: true,
208209
},
209210
});
@@ -377,7 +378,9 @@ function applyRunFiltersToQueryBuilder<T>(
377378
}
378379

379380
if (options.regions && options.regions.length > 0) {
380-
queryBuilder.where("worker_queue IN {regions: Array(String)}", { regions: options.regions });
381+
queryBuilder.where("if(region != '', region, worker_queue) IN {regions: Array(String)}", {
382+
regions: options.regions,
383+
});
381384
}
382385

383386
if (options.machines && options.machines.length > 0) {

apps/webapp/app/services/runsRepository/runsRepository.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ export type ListedRun = Prisma.TaskRunGetPayload<{
108108
machinePreset: true;
109109
queue: true;
110110
workerQueue: true;
111+
region: true;
111112
annotations: true;
112113
};
113114
}>;

0 commit comments

Comments
 (0)