Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
846c541
feat(webapp): add compute migration feature flags
nicktrn Jun 15, 2026
8b12326
feat(webapp): add deterministic org hashBucket for rollout
nicktrn Jun 15, 2026
eeca099
feat(webapp): add compute migration env config
nicktrn Jun 15, 2026
7e41492
feat(webapp): add compute migration resolver
nicktrn Jun 15, 2026
b36b83c
feat(webapp): add createReloadingRegistry helper
nicktrn Jun 15, 2026
7067ac7
feat(webapp): boot global flags registry
nicktrn Jun 15, 2026
266fc60
feat(webapp): route migrated orgs to the compute backing at trigger
nicktrn Jun 15, 2026
58d8fe5
feat(webapp): build compute template for migrated orgs
nicktrn Jun 15, 2026
c4e0dcf
chore(webapp): server-changes note for compute migration
nicktrn Jun 15, 2026
65ce342
test(webapp): move hashBucket test into test/ so vitest includes it
nicktrn Jun 15, 2026
4791b01
fix(webapp): hide compute backing on read surfaces and fix replay
nicktrn Jun 15, 2026
8060760
fix(webapp): store geo region in clickhouse to hide compute backing f…
nicktrn Jun 15, 2026
25e3fe1
fix(webapp): serialize registry reloads and clear readiness timeout
nicktrn Jun 15, 2026
18d5b11
fix(webapp): strict boolean kill switch, bound reload interval, cuid …
nicktrn Jun 15, 2026
aefdf3a
feat(database,webapp): add WorkerInstanceGroup.region + worker-region…
nicktrn Jun 15, 2026
53eaa7a
refactor(webapp): resolve region<->backing from WorkerInstanceGroup.r…
nicktrn Jun 15, 2026
222d653
feat(webapp): stamp geo region on runs, keep worker_queue raw, test-s…
nicktrn Jun 15, 2026
b75e18a
fix(webapp): fail-open entitlement lookup in migration mode, harden r…
nicktrn Jun 15, 2026
ea9a071
docs(webapp): trim server-changes note to one behavior-level line
nicktrn Jun 15, 2026
e3524e9
refactor(webapp): centralize display-region fallback in regionForDisplay
nicktrn Jun 15, 2026
126a01f
fix(webapp): look up run's actual worker group, show geo region in sp…
nicktrn Jun 15, 2026
188dac2
docs(webapp): trim hot-path comment, note region must stay whereTrans…
nicktrn Jun 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/plan-aware-compute-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Gradually roll out a new run execution backend to a configurable percentage of organizations.
4 changes: 4 additions & 0 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ import { registerRunChangeNotifierHandlers } from "./services/realtime/runChange
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
(globalThis as Record<string, unknown>).__sessionsReplicationInstance =
sessionsReplicationInstance;
import { globalFlagsRegistry } from "./v3/globalFlagsRegistry.server";
(globalThis as Record<string, unknown>).__globalFlagsRegistry = globalFlagsRegistry;
import { workerRegionRegistry } from "./v3/workerRegions.server";
(globalThis as Record<string, unknown>).__workerRegionRegistry = workerRegionRegistry;

const ABORT_DELAY = 30000;

Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ const EnvironmentSchema = z
WORKER_SCHEMA: z.string().default("graphile_worker"),
WORKER_CONCURRENCY: z.coerce.number().int().default(10),
WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
// How often each replica reloads the global flags snapshot from the DB.
// Sets kill/ramp propagation latency.
GLOBAL_FLAGS_RELOAD_INTERVAL_MS: z.coerce.number().int().min(1000).default(5000),
// Max time the first trigger blocks waiting for the initial flags load
// before falling back to defaults (off = container, the safe direction).
GLOBAL_FLAGS_READY_TIMEOUT_MS: z.coerce.number().int().min(0).default(5000),
WORKER_ENABLED: z.string().default("true"),
GRACEFUL_SHUTDOWN_TIMEOUT: z.coerce.number().int().default(60000),
DISABLE_SSE: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
Expand Down Expand Up @@ -49,6 +49,7 @@ const commonRunSelect = {
depth: true,
scheduleId: true,
workerQueue: true,
region: true,
lockedToVersion: {
select: {
version: true,
Expand Down Expand Up @@ -520,7 +521,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
triggerFunction: resolveTriggerFunction(run),
batchId: run.batch?.friendlyId,
metadata,
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
region: regionForDisplay(run.region, run.workerQueue),
};
}

Expand Down Expand Up @@ -684,6 +685,7 @@ export function synthesiseFoundRunFromBuffer(buffered: SyntheticRun): FoundRun {
// API response's `region` to undefined instead of advertising a
// misleading "main" region for a not-yet-assigned buffered run).
workerQueue: buffered.workerQueue ?? "",
region: buffered.region ?? "",
parentTaskRun: null,
rootTaskRun: null,
childRuns: [],
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { getTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Expand Down Expand Up @@ -260,7 +260,7 @@ export class NextRunListPresenter {
name: run.queue.replace("task/", ""),
type: run.queue.startsWith("task/") ? "task" : "custom",
},
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
region: regionForDisplay(run.region, run.workerQueue),
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
};
}),
Expand Down
9 changes: 8 additions & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,17 @@ export class SpanPresenter extends BasePresenter {
location: true,
},
where: {
// masterQueue is unique and IS the run's backing queue, so this finds
// the group the run actually ran on.
masterQueue: baseWorkerQueue(run.workerQueue),
},
});

region = workerGroup ?? null;
// Show the stamped geo region as the name so a migrated run never reveals
// its compute backing; fall back to the group name for unstamped runs.
region = workerGroup
? { name: run.region ?? workerGroup.name, location: workerGroup.location }
: null;
}

// Only AGENT-tagged runs (chat.agent and friends) can be session-bound,
Expand Down Expand Up @@ -513,6 +519,7 @@ export class SpanPresenter extends BasePresenter {
},
engine: true,
workerQueue: true,
region: true,
error: true,
output: true,
outputType: true,
Expand Down
9 changes: 7 additions & 2 deletions apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
type SyntheticReplayTaskRun,
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
import parseDuration from "parse-duration";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
import { ReplayRunData } from "~/v3/replayTask";
Expand Down Expand Up @@ -52,6 +52,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
maxDurationInSeconds: true,
machinePreset: true,
workerQueue: true,
region: true,
ttl: true,
idempotencyKey: true,
runTags: true,
Expand Down Expand Up @@ -163,6 +164,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
maxDurationInSeconds: buffered.maxDurationInSeconds ?? null,
machinePreset: buffered.machinePreset ?? null,
workerQueue: buffered.workerQueue ?? null,
region: buffered.region ?? null,
ttl: buffered.ttl ?? null,
idempotencyKey: buffered.idempotencyKey ?? null,
runTags: buffered.runTags,
Expand Down Expand Up @@ -210,7 +212,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
maxAttempts: run.maxAttempts,
maxDurationSeconds: run.maxDurationInSeconds,
machinePreset: run.machinePreset,
region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue),
region:
environment.type === "DEVELOPMENT"
? undefined
: regionForDisplay(run.region, run.workerQueue),
regions: regionsResult.regions,
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
idempotencyKey: run.idempotencyKey,
Expand Down
77 changes: 77 additions & 0 deletions apps/webapp/app/runEngine/concerns/computeMigration.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { hashBucket } from "~/utils/computeBucket";

/** Subset of the global flags snapshot this resolver reads. */
export type ComputeMigrationFlags = {
computeMigrationEnabled?: boolean;
computeMigrationFreePercentage?: number;
computeMigrationPaidPercentage?: number;
};

type MigrationDecisionInput = {
planType: string | undefined;
orgId: string;
orgFeatureFlags: Record<string, unknown> | null | undefined;
flags: ComputeMigrationFlags | undefined;
};

/**
* Whether this org should run on the compute backing. Shared by the trigger-time
* transform and the deploy-time template decision so a migrated org always gets a
* compute template. Precedence: per-org override (both directions) wins; otherwise
* global enable + the plan's percentage bucket. Enterprise and unknown plans are
* never enrolled by percentage (override only). The sole opt-out is the per-org
* `computeMigrationEnabled: false`.
*/
export function isOrgMigrated({
planType,
orgId,
orgFeatureFlags,
flags,
}: MigrationDecisionInput): boolean {
const override = orgFeatureFlags?.["computeMigrationEnabled"];
if (override === false) return false;
if (override === true) return true;

if (!(flags?.computeMigrationEnabled ?? false)) return false;

const pct =
planType === "free"
? flags?.computeMigrationFreePercentage ?? 0
: planType === "paid"
? flags?.computeMigrationPaidPercentage ?? 0
: 0; // enterprise / undefined

return hashBucket(orgId) < pct;
}

type ResolveInput = MigrationDecisionInput & {
baseWorkerQueue: string | undefined;
baseEnableFastPath: boolean;
region: string | undefined; // geo of the base queue (same whether migrated or not)
backing: { workerQueue: string; enableFastPath: boolean } | undefined;
envType: string;
};

/**
* Produce the target descriptor `{ workerQueue, region, enableFastPath }` for a
* run. When the org is migrated and the region has a compute backing, the queue
* and fast-path setting come from the MICROVM backing group; `region` is the geo
* either way. Same-geo swap (us-east-1 -> us-east-1-next): any explicit placement
* is a geography preference, honored by staying in-region. Applied after region
* resolution, mirroring the scheduled-split.
*/
export function resolveComputeMigration({
baseWorkerQueue,
baseEnableFastPath,
region,
backing,
envType,
...decision
}: ResolveInput): { workerQueue: string | undefined; region: string | undefined; enableFastPath: boolean } {
const passthrough = { workerQueue: baseWorkerQueue, region, enableFastPath: baseEnableFastPath };
if (baseWorkerQueue === undefined) return passthrough;
if (envType === "DEVELOPMENT") return passthrough;
if (!isOrgMigrated(decision)) return passthrough;
if (!backing) return passthrough;
return { workerQueue: backing.workerQueue, region, enableFastPath: backing.enableFastPath };
}
13 changes: 13 additions & 0 deletions apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,19 @@ export function baseWorkerQueue(workerQueue: string | null | undefined): string
return colon === -1 ? workerQueue : workerQueue.slice(0, colon);
}

/**
* User-facing region for read surfaces: the explicit geo region if set, else the
* region derived from the worker queue, else undefined. Use everywhere a run's
* region is displayed so an empty queue never surfaces as `""` and all surfaces
* agree. Not for query keys — those want the raw worker queue, not this fallback.
*/
export function regionForDisplay(
region: string | null | undefined,
workerQueue: string | null | undefined
): string | undefined {
return region || (workerQueue ? baseWorkerQueue(workerQueue) : undefined);
}

/** `TriggerSource` value used for runs originating from a schedule. */
const SCHEDULE_TRIGGER_SOURCE = "schedule";

Expand Down
42 changes: 37 additions & 5 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ import {
resolveScheduledQueueSplitEnabled,
workerQueueForRun,
} from "../concerns/workerQueueSplit.server";
import { resolveComputeMigration } from "../concerns/computeMigration.server";
import { workerRegionRegistry, backingForQueue, regionForQueue } from "~/v3/workerRegions.server";
import { globalFlagsRegistry } from "~/v3/globalFlagsRegistry.server";
import {
publishClaim as publishMollifierClaim,
releaseClaim as releaseMollifierClaim,
Expand Down Expand Up @@ -358,6 +361,31 @@ export class RunEngineTriggerTaskService {
const baseWorkerQueue = workerQueueResult?.masterQueue;
const enableFastPath = workerQueueResult?.enableFastPath ?? false;
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

// Rewrite the region to its compute backing for migration-enrolled orgs,
// from the in-memory flag snapshot (no DB query). The isLoaded gates only
// block during cold start so the first request can't serve a default over
// a real flag; once warm they're a synchronous no-op.
if (!globalFlagsRegistry.isLoaded) {
await globalFlagsRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
}
if (!workerRegionRegistry.isLoaded) {
await workerRegionRegistry.waitUntilReady(env.GLOBAL_FLAGS_READY_TIMEOUT_MS);
}
const workerGroups = workerRegionRegistry.current() ?? [];
const region = baseWorkerQueue ? regionForQueue(baseWorkerQueue, workerGroups) : undefined;
const backing = baseWorkerQueue ? backingForQueue(baseWorkerQueue, workerGroups) : undefined;
const migrated = resolveComputeMigration({
baseWorkerQueue,
baseEnableFastPath: enableFastPath,
region,
backing,
planType,
orgId: environment.organization.id,
orgFeatureFlags: environment.organization.featureFlags as Record<string, unknown> | null,
flags: globalFlagsRegistry.current(),
envType: environment.type,
});

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
const triggerAction = options.triggerAction ?? "trigger";
Expand Down Expand Up @@ -386,13 +414,13 @@ export class RunEngineTriggerTaskService {
globalDefault: env.TRIGGER_WORKER_QUEUE_SCHEDULED_SPLIT_ENABLED === "1",
});
const workerQueue =
baseWorkerQueue !== undefined
migrated.workerQueue !== undefined
? workerQueueForRun({
workerQueue: baseWorkerQueue,
workerQueue: migrated.workerQueue,
rootTriggerSource: annotations.rootTriggerSource,
splitEnabled: scheduledQueueSplitEnabled,
})
: baseWorkerQueue;
: migrated.workerQueue;

try {
return await this.traceEventConcern.traceRun(
Expand Down Expand Up @@ -491,7 +519,8 @@ export class RunEngineTriggerTaskService {
queueName,
lockedQueueId,
workerQueue,
enableFastPath,
region: migrated.region,
enableFastPath: migrated.enableFastPath,
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
delayUntil,
ttl,
Expand Down Expand Up @@ -569,7 +598,8 @@ export class RunEngineTriggerTaskService {
queueName,
lockedQueueId,
workerQueue,
enableFastPath,
region: migrated.region,
enableFastPath: migrated.enableFastPath,
lockedToBackgroundWorker: lockedToBackgroundWorker ?? undefined,
delayUntil,
ttl,
Expand Down Expand Up @@ -718,6 +748,7 @@ export class RunEngineTriggerTaskService {
queueName: string;
lockedQueueId?: string;
workerQueue?: string;
region?: string;
enableFastPath: boolean;
lockedToBackgroundWorker?: { id: string; version: string; sdkVersion: string; cliVersion: string };
delayUntil?: Date;
Expand Down Expand Up @@ -771,6 +802,7 @@ export class RunEngineTriggerTaskService {
queue: args.queueName,
lockedQueueId: args.lockedQueueId,
workerQueue: args.workerQueue,
region: args.region,
enableFastPath: args.enableFastPath,
isTest: args.body.options?.test ?? false,
delayUntil: args.delayUntil,
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,8 @@ export class RunsReplicationService {
event === "delete" ? 1 : 0, // _is_deleted
run.concurrencyKey ?? "", // concurrency_key
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`)
baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (raw - operators slice by this)
run.region ?? "", // region (geo for customers)
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
annotations?.triggerSource ?? "", // trigger_source
annotations?.rootTriggerSource ?? "", // root_trigger_source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {
machinePreset: true,
queue: true,
workerQueue: true,
region: true,
annotations: true,
},
});
Expand Down Expand Up @@ -377,7 +378,9 @@ function applyRunFiltersToQueryBuilder<T>(
}

if (options.regions && options.regions.length > 0) {
queryBuilder.where("worker_queue IN {regions: Array(String)}", { regions: options.regions });
queryBuilder.where("if(region != '', region, worker_queue) IN {regions: Array(String)}", {
regions: options.regions,
});
}

if (options.machines && options.machines.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export type ListedRun = Prisma.TaskRunGetPayload<{
machinePreset: true;
queue: true;
workerQueue: true;
region: true;
annotations: true;
};
}>;
Expand Down
15 changes: 15 additions & 0 deletions apps/webapp/app/utils/computeBucket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* Deterministic 0-99 bucket for an org id, stable across processes and deploys.
* FNV-1a (non-crypto): we only need determinism + uniform spread, not collision
* resistance. Used for nested percentage rollout: `hashBucket(orgId) < percentage`.
* Ramping the percentage down keeps a strict subset (the low buckets), so an org
* never flaps in and out as the dial moves.
*/
export function hashBucket(orgId: string): number {
let hash = 0x811c9dc5; // FNV offset basis
for (let i = 0; i < orgId.length; i++) {
hash ^= orgId.charCodeAt(i);
hash = Math.imul(hash, 0x01000193) >>> 0;
}
return hash % 100;
}
Loading