Skip to content

Commit debd380

Browse files
committed
fix(webapp): show scheduled runs under their correct region
A worker queue can carry a `:scheduled` suffix that routes scheduled-lineage runs onto their own list. That suffix is an internal routing detail, but it leaked anywhere the worker queue is read as a region, so scheduled runs showed under a phantom region in the dashboard, run details, and the API, and slipped through region filters. A `baseWorkerQueue` helper strips any `:<class>` suffix back to the base region (region names never contain a colon) and is applied at every region read site. The runs-replication writer stores the base region in ClickHouse so the region filter matches.
1 parent 8b85da1 commit debd380

9 files changed

Lines changed: 71 additions & 6 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Scheduled runs now show under their correct region in the dashboard, run details, and the API, and match region filters, instead of appearing under a separate region.

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa
1515
import assertNever from "assert-never";
1616
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
1717
import { $replica, prisma } from "~/db.server";
18+
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
1819
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1920
import {
2021
findRunByIdWithMollifierFallback,
@@ -519,7 +520,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
519520
triggerFunction: resolveTriggerFunction(run),
520521
batchId: run.batch?.friendlyId,
521522
metadata,
522-
region: run.workerQueue || undefined,
523+
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
523524
};
524525
}
525526

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
1111
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
1212
import { getTaskIdentifiers } from "~/models/task.server";
1313
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
14+
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
1415
import { machinePresetFromRun } from "~/v3/machinePresets.server";
1516
import { ServiceValidationError } from "~/v3/services/baseService.server";
1617
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
@@ -259,7 +260,7 @@ export class NextRunListPresenter {
259260
name: run.queue.replace("task/", ""),
260261
type: run.queue.startsWith("task/") ? "task" : "custom",
261262
},
262-
region: run.workerQueue ? run.workerQueue : undefined,
263+
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
263264
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
264265
};
265266
}),

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
getUserProvidedIdempotencyKey,
1616
} from "@trigger.dev/core/v3/serverOnly";
1717
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
18+
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
1819
import { logger } from "~/services/logger.server";
1920
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
2021
import { machinePresetFromRun } from "~/v3/machinePresets.server";
@@ -302,7 +303,7 @@ export class SpanPresenter extends BasePresenter {
302303
location: true,
303304
},
304305
where: {
305-
masterQueue: run.workerQueue,
306+
masterQueue: baseWorkerQueue(run.workerQueue),
306307
},
307308
});
308309

apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
type SyntheticReplayTaskRun,
1919
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
2020
import parseDuration from "parse-duration";
21+
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
2122
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
2223
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
2324
import { ReplayRunData } from "~/v3/replayTask";
@@ -209,7 +210,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
209210
maxAttempts: run.maxAttempts,
210211
maxDurationSeconds: run.maxDurationInSeconds,
211212
machinePreset: run.machinePreset,
212-
region: environment.type === "DEVELOPMENT" ? undefined : run.workerQueue,
213+
region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue),
213214
regions: regionsResult.regions,
214215
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
215216
idempotencyKey: run.idempotencyKey,

apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,20 @@ import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags";
1212
*/
1313
export const SCHEDULED_WORKER_QUEUE_SUFFIX = ":scheduled";
1414

15+
/**
16+
* Recover the base region a worker queue belongs to by stripping any split
17+
* suffix (e.g. `us-nyc-3:scheduled` -> `us-nyc-3`). Region/masterQueue names are
18+
* either `<name>` or `<projectId>-<name>` and never contain a colon, so the
19+
* region is everything before the first `:`. Use this wherever a worker queue is
20+
* read as a region — for display, filtering, or as a region override — so
21+
* scheduled-split runs group under their real region instead of a phantom one.
22+
* Idempotent; returns the input unchanged when there's no suffix.
23+
*/
24+
export function baseWorkerQueue(workerQueue: string): string {
25+
const colon = workerQueue.indexOf(":");
26+
return colon === -1 ? workerQueue : workerQueue.slice(0, colon);
27+
}
28+
1529
/** `TriggerSource` value used for runs originating from a schedule. */
1630
const SCHEDULE_TRIGGER_SOURCE = "schedule";
1731

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import EventEmitter from "node:events";
3838
import pLimit from "p-limit";
3939
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
4040
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
41+
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
4142
import {
4243
isClickHouseJsonParseError,
4344
parseRowNumberFromError,
@@ -1121,7 +1122,7 @@ export class RunsReplicationService {
11211122
event === "delete" ? 1 : 0, // _is_deleted
11221123
run.concurrencyKey ?? "", // concurrency_key
11231124
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
1124-
run.masterQueue ?? "", // worker_queue
1125+
baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`)
11251126
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
11261127
annotations?.triggerSource ?? "", // trigger_source
11271128
annotations?.rootTriggerSource ?? "", // root_trigger_source

apps/webapp/app/v3/services/replayTaskRun.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "@trigger.dev/core/v3";
77
import { type TaskRun } from "@trigger.dev/database";
88
import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
9+
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
910
import { logger } from "~/services/logger.server";
1011
import { BaseService } from "./baseService.server";
1112
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
@@ -65,7 +66,9 @@ export class ReplayTaskRunService extends BaseService {
6566
existingTaskRun.engine === "V1" ||
6667
existingEnvironment.type === "DEVELOPMENT" ||
6768
authenticatedEnvironment.type === "DEVELOPMENT";
68-
const region = ignoreRegion ? undefined : overrideOptions.region ?? existingTaskRun.workerQueue;
69+
const region = ignoreRegion
70+
? undefined
71+
: overrideOptions.region ?? baseWorkerQueue(existingTaskRun.workerQueue);
6972

7073
try {
7174
const taskQueue = await this._prisma.taskQueue.findFirst({

apps/webapp/test/workerQueueSplit.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, expect, it } from "vitest";
22
import {
3+
baseWorkerQueue,
34
resolveScheduledQueueSplitEnabled,
45
workerQueueForRun,
56
workerQueueForClass,
@@ -98,6 +99,42 @@ describe("workerQueueForRun", () => {
9899
});
99100
});
100101

102+
describe("baseWorkerQueue", () => {
103+
const region = "us-nyc-3";
104+
const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;
105+
106+
it("strips the scheduled split suffix back to the base region", () => {
107+
expect(baseWorkerQueue(scheduled)).toBe(region);
108+
});
109+
110+
it("leaves a base region untouched (idempotent)", () => {
111+
expect(baseWorkerQueue(region)).toBe(region);
112+
expect(baseWorkerQueue(baseWorkerQueue(scheduled))).toBe(region);
113+
});
114+
115+
it("strips any future `:<class>` suffix, not just `:scheduled`", () => {
116+
expect(baseWorkerQueue("us-nyc-3:priority")).toBe(region);
117+
expect(baseWorkerQueue("us-nyc-3:a:b")).toBe(region);
118+
});
119+
120+
it("handles the project-scoped masterQueue shape (`<projectId>-<name>`)", () => {
121+
expect(baseWorkerQueue("proj_abc123-main:scheduled")).toBe("proj_abc123-main");
122+
});
123+
124+
it("returns an empty string unchanged", () => {
125+
expect(baseWorkerQueue("")).toBe("");
126+
});
127+
128+
it("round-trips with workerQueueForRun: the split queue strips back to the region it came from", () => {
129+
const enqueued = workerQueueForRun({
130+
workerQueue: region,
131+
rootTriggerSource: "schedule",
132+
splitEnabled: true,
133+
});
134+
expect(baseWorkerQueue(enqueued)).toBe(region);
135+
});
136+
});
137+
101138
describe("workerQueueForClass", () => {
102139
const region = "us-nyc-3";
103140
const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;

0 commit comments

Comments
 (0)