Skip to content

Commit 97d2f72

Browse files
authored
feat(supervisor): schedule-tree node affinity (#3271)
Scheduled runs create predictable hourly spikes that compete with on-demand runs for node capacity. Runs triggered "on-demand" via the SDK, API, or dashboard are more sensitive to cold start latency since users are typically waiting on the result. When a burst of scheduled runs lands at the top of the hour, it can saturate the shared pool's resources, causing contention that affects cold starts across the board. The idea in this change is to absorb these periodic spikes in a dedicated pool without affecting the cold starts of on-demand runs. Scheduled runs are inherently less sensitive to cold starts. ### Changes in this PR Follows up on run annotations (#3241), which made trigger origin available on every run in the tree. This PR exposes annotations at dequeue time to the supervisor. This enables scheduling decisions based on trigger source. The affinities are soft preferences at schedule time, so runs fall back gracefully if the target pool is out of capacity.
1 parent 1a6481a commit 97d2f72

File tree

7 files changed

+128
-34
lines changed

7 files changed

+128
-34
lines changed

apps/supervisor/src/env.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,24 @@ const Env = z.object({
110110

111111
KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB
112112
KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods
113-
KUBERNETES_LARGE_MACHINE_POOL_LABEL: z.string().optional(), // if set, large-* presets affinity for machinepool=<value>
113+
// Large machine affinity settings - large-* presets prefer a dedicated pool
114+
KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false),
115+
KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"),
116+
KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"),
117+
KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100),
114118

115119
// Project affinity settings - pods from the same project prefer the same node
116120
KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false),
117121
KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50),
118122
KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"),
119123

124+
// Schedule affinity settings - runs from schedule trees prefer a dedicated pool
125+
KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false),
126+
KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"),
127+
KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"),
128+
KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80),
129+
KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20),
130+
120131
// Placement tags settings
121132
PLACEMENT_TAGS_ENABLED: BoolEnv.default(false),
122133
PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ class ManagedSupervisor {
267267
snapshotId: message.snapshot.id,
268268
snapshotFriendlyId: message.snapshot.friendlyId,
269269
placementTags: message.placementTags,
270+
annotations: message.run.annotations,
270271
});
271272

272273
// Disabled for now

apps/supervisor/src/workloadManager/kubernetes.ts

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
120120
},
121121
spec: {
122122
...this.addPlacementTags(this.#defaultPodSpec, opts.placementTags),
123-
affinity: this.#getAffinity(opts.machine, opts.projectId),
123+
affinity: this.#getAffinity(opts),
124124
terminationGracePeriodSeconds: 60 * 60,
125125
containers: [
126126
{
@@ -335,13 +335,22 @@ export class KubernetesWorkloadManager implements WorkloadManager {
335335
};
336336
}
337337

338+
#isScheduledRun(opts: WorkloadManagerCreateOptions): boolean {
339+
return opts.annotations?.rootTriggerSource === "schedule";
340+
}
341+
338342
#getSharedLabels(opts: WorkloadManagerCreateOptions): Record<string, string> {
339343
return {
340344
env: opts.envId,
341345
envtype: this.#envTypeToLabelValue(opts.envType),
342346
org: opts.orgId,
343347
project: opts.projectId,
344348
machine: opts.machine.name,
349+
// We intentionally use a boolean label rather than exposing the full trigger source
350+
// (e.g. sdk, api, cli, mcp, schedule) to keep label cardinality low in metrics.
351+
// The schedule vs non-schedule distinction is all we need for the current metrics
352+
// and pool-level scheduling decisions; finer-grained source breakdowns live in run annotations.
353+
scheduled: String(this.#isScheduledRun(opts)),
345354
};
346355
}
347356

@@ -390,22 +399,43 @@ export class KubernetesWorkloadManager implements WorkloadManager {
390399
return preset.name.startsWith("large-");
391400
}
392401

393-
#getAffinity(preset: MachinePreset, projectId: string): k8s.V1Affinity | undefined {
394-
const nodeAffinity = this.#getNodeAffinityRules(preset);
395-
const podAffinity = this.#getProjectPodAffinity(projectId);
396-
397-
if (!nodeAffinity && !podAffinity) {
402+
#getAffinity(opts: WorkloadManagerCreateOptions): k8s.V1Affinity | undefined {
403+
const largeNodeAffinity = this.#getNodeAffinityRules(opts.machine);
404+
const scheduleNodeAffinity = this.#getScheduleNodeAffinityRules(this.#isScheduledRun(opts));
405+
const podAffinity = this.#getProjectPodAffinity(opts.projectId);
406+
407+
// Merge node affinity rules from multiple sources
408+
const preferred = [
409+
...(largeNodeAffinity?.preferredDuringSchedulingIgnoredDuringExecution ?? []),
410+
...(scheduleNodeAffinity?.preferredDuringSchedulingIgnoredDuringExecution ?? []),
411+
];
412+
// Only large machine affinity produces hard requirements (non-large runs must stay off the large pool).
413+
// Schedule affinity is soft both ways.
414+
const required = [
415+
...(largeNodeAffinity?.requiredDuringSchedulingIgnoredDuringExecution?.nodeSelectorTerms ?? []),
416+
];
417+
418+
const hasNodeAffinity = preferred.length > 0 || required.length > 0;
419+
420+
if (!hasNodeAffinity && !podAffinity) {
398421
return undefined;
399422
}
400423

401424
return {
402-
...(nodeAffinity && { nodeAffinity }),
425+
...(hasNodeAffinity && {
426+
nodeAffinity: {
427+
...(preferred.length > 0 && { preferredDuringSchedulingIgnoredDuringExecution: preferred }),
428+
...(required.length > 0 && {
429+
requiredDuringSchedulingIgnoredDuringExecution: { nodeSelectorTerms: required },
430+
}),
431+
},
432+
}),
403433
...(podAffinity && { podAffinity }),
404434
};
405435
}
406436

407437
#getNodeAffinityRules(preset: MachinePreset): k8s.V1NodeAffinity | undefined {
408-
if (!env.KUBERNETES_LARGE_MACHINE_POOL_LABEL) {
438+
if (!env.KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED) {
409439
return undefined;
410440
}
411441

@@ -414,13 +444,13 @@ export class KubernetesWorkloadManager implements WorkloadManager {
414444
return {
415445
preferredDuringSchedulingIgnoredDuringExecution: [
416446
{
417-
weight: 100,
447+
weight: env.KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT,
418448
preference: {
419449
matchExpressions: [
420450
{
421-
key: "node.cluster.x-k8s.io/machinepool",
451+
key: env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY,
422452
operator: "In",
423-
values: [env.KUBERNETES_LARGE_MACHINE_POOL_LABEL],
453+
values: [env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE],
424454
},
425455
],
426456
},
@@ -436,9 +466,9 @@ export class KubernetesWorkloadManager implements WorkloadManager {
436466
{
437467
matchExpressions: [
438468
{
439-
key: "node.cluster.x-k8s.io/machinepool",
469+
key: env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY,
440470
operator: "NotIn",
441-
values: [env.KUBERNETES_LARGE_MACHINE_POOL_LABEL],
471+
values: [env.KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE],
442472
},
443473
],
444474
},
@@ -447,6 +477,50 @@ export class KubernetesWorkloadManager implements WorkloadManager {
447477
};
448478
}
449479

480+
#getScheduleNodeAffinityRules(isScheduledRun: boolean): k8s.V1NodeAffinity | undefined {
481+
if (!env.KUBERNETES_SCHEDULE_AFFINITY_ENABLED || !env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE) {
482+
return undefined;
483+
}
484+
485+
if (isScheduledRun) {
486+
// soft preference for the schedule pool
487+
return {
488+
preferredDuringSchedulingIgnoredDuringExecution: [
489+
{
490+
weight: env.KUBERNETES_SCHEDULE_AFFINITY_WEIGHT,
491+
preference: {
492+
matchExpressions: [
493+
{
494+
key: env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY,
495+
operator: "In",
496+
values: [env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE],
497+
},
498+
],
499+
},
500+
},
501+
],
502+
};
503+
}
504+
505+
// soft anti-affinity: non-schedule runs prefer to avoid the schedule pool
506+
return {
507+
preferredDuringSchedulingIgnoredDuringExecution: [
508+
{
509+
weight: env.KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT,
510+
preference: {
511+
matchExpressions: [
512+
{
513+
key: env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY,
514+
operator: "NotIn",
515+
values: [env.KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE],
516+
},
517+
],
518+
},
519+
},
520+
],
521+
};
522+
}
523+
450524
#getProjectPodAffinity(projectId: string): k8s.V1PodAffinity | undefined {
451525
if (!env.KUBERNETES_PROJECT_AFFINITY_ENABLED) {
452526
return undefined;

apps/supervisor/src/workloadManager/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
1+
import type { EnvironmentType, MachinePreset, PlacementTag, RunAnnotations } from "@trigger.dev/core/v3";
22

33
export interface WorkloadManagerOptions {
44
workloadApiProtocol: "http" | "https";
@@ -35,4 +35,5 @@ export interface WorkloadManagerCreateOptions {
3535
runFriendlyId: string;
3636
snapshotId: string;
3737
snapshotFriendlyId: string;
38+
annotations?: RunAnnotations;
3839
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { BillingCache } from "../billingCache.js";
22
import { startSpan } from "@internal/tracing";
33
import { assertExhaustive, tryCatch } from "@trigger.dev/core";
4-
import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3";
4+
import { DequeuedMessage, RetryOptions, RunAnnotations } from "@trigger.dev/core/v3";
55
import { placementTag } from "@trigger.dev/core/v3/serverOnly";
66
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
77
import {
@@ -575,6 +575,7 @@ export class DequeueSystem {
575575
// Keeping this for backwards compatibility, but really this should be called workerQueue
576576
masterQueue: lockedTaskRun.workerQueue,
577577
traceContext: lockedTaskRun.traceContext as Record<string, unknown>,
578+
annotations: RunAnnotations.safeParse(lockedTaskRun.annotations).data,
578579
},
579580
environment: {
580581
id: lockedTaskRun.runtimeEnvironment.id,

packages/core/src/v3/schemas/api.ts

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -540,24 +540,8 @@ export const DeploymentTriggeredVia = z
540540

541541
export type DeploymentTriggeredVia = z.infer<typeof DeploymentTriggeredVia>;
542542

543-
export const TriggerSource = z
544-
.enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"])
545-
.or(anyString);
546-
547-
export type TriggerSource = z.infer<typeof TriggerSource>;
548-
549-
export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString);
550-
551-
export type TriggerAction = z.infer<typeof TriggerAction>;
552-
553-
export const RunAnnotations = z.object({
554-
triggerSource: TriggerSource,
555-
triggerAction: TriggerAction,
556-
rootTriggerSource: TriggerSource,
557-
rootScheduleId: z.string().optional(),
558-
});
559-
560-
export type RunAnnotations = z.infer<typeof RunAnnotations>;
543+
// TriggerSource, TriggerAction, and RunAnnotations are defined in runEngine.ts
544+
// They are re-exported through the schemas barrel (index.ts)
561545

562546
export const UpsertBranchRequestBody = z.object({
563547
git: GitMeta.optional(),

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,27 @@ import { Enum, MachinePreset, RuntimeEnvironmentType, TaskRunExecution } from ".
33
import { EnvironmentType } from "./schemas.js";
44
import type * as DB_TYPES from "@trigger.dev/database";
55

6+
const anyString = z.custom<string & {}>((v) => typeof v === "string");
7+
8+
export const TriggerSource = z
9+
.enum(["sdk", "api", "dashboard", "cli", "mcp", "schedule"])
10+
.or(anyString);
11+
12+
export type TriggerSource = z.infer<typeof TriggerSource>;
13+
14+
export const TriggerAction = z.enum(["trigger", "replay", "test"]).or(anyString);
15+
16+
export type TriggerAction = z.infer<typeof TriggerAction>;
17+
18+
export const RunAnnotations = z.object({
19+
triggerSource: TriggerSource,
20+
triggerAction: TriggerAction,
21+
rootTriggerSource: TriggerSource,
22+
rootScheduleId: z.string().optional(),
23+
});
24+
25+
export type RunAnnotations = z.infer<typeof RunAnnotations>;
26+
627
export const TaskRunExecutionStatus = {
728
RUN_CREATED: "RUN_CREATED",
829
QUEUED: "QUEUED",
@@ -259,6 +280,7 @@ export const DequeuedMessage = z.object({
259280
attemptNumber: z.number(),
260281
masterQueue: z.string(),
261282
traceContext: z.record(z.unknown()),
283+
annotations: RunAnnotations.optional(),
262284
}),
263285
environment: z.object({
264286
id: z.string(),

0 commit comments

Comments
 (0)