Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/enqueue-fast-path.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Reduce run start latency by skipping the intermediate queue when concurrency is available. This optimization is rolled out per-region and enabled automatically for development environments.
9 changes: 6 additions & 3 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ export class DefaultQueueManager implements QueueManager {
async getWorkerQueue(
environment: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined> {
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined> {
if (environment.type === "DEVELOPMENT") {
return environment.id;
return { masterQueue: environment.id, enableFastPath: true };
}

const workerGroupService = new WorkerGroupService({
Expand All @@ -279,7 +279,10 @@ export class DefaultQueueManager implements QueueManager {
throw new ServiceValidationError("No worker group found");
}

return workerGroup.masterQueue;
return {
masterQueue: workerGroup.masterQueue,
enableFastPath: workerGroup.enableFastPath,
};
}
}

Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,12 @@ export class RunEngineTriggerTaskService {

const depth = parentRun ? parentRun.depth + 1 : 0;

const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
const workerQueueResult = await this.queueConcern.getWorkerQueue(
environment,
body.options?.region
);
const workerQueue = workerQueueResult?.masterQueue;
const enableFastPath = workerQueueResult?.enableFastPath ?? false;

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
Expand Down Expand Up @@ -344,6 +349,7 @@ export class RunEngineTriggerTaskService {
queue: queueName,
lockedQueueId,
workerQueue,
enableFastPath,
isTest: body.options?.test ?? false,
delayUntil,
queuedAt: delayUntil ? undefined : new Date(),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export interface QueueManager {
getWorkerQueue(
env: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined>;
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined>;
}

export interface PayloadProcessor {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."WorkerInstanceGroup" ADD COLUMN "enableFastPath" BOOLEAN NOT NULL DEFAULT false;
4 changes: 4 additions & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,10 @@ model WorkerInstanceGroup {

workloadType WorkloadType @default(CONTAINER)

/// When true, runs enqueued to this worker queue may skip the intermediate queue
/// and be pushed directly to the worker queue when concurrency is available.
enableFastPath Boolean @default(false)

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ export class RunEngine {
cliVersion,
concurrencyKey,
workerQueue,
enableFastPath,
queue,
lockedQueueId,
isTest,
Expand Down Expand Up @@ -799,6 +800,7 @@ export class RunEngine {
tx: prisma,
skipRunLock: true,
includeTtl: true,
enableFastPath,
});
} catch (enqueueError) {
this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class EnqueueSystem {
runnerId,
skipRunLock,
includeTtl = false,
enableFastPath = false,
}: {
run: TaskRun;
env: MinimalAuthenticatedEnvironment;
Expand All @@ -57,6 +58,8 @@ export class EnqueueSystem {
skipRunLock?: boolean;
/** When true, include TTL in the queued message (only for first enqueue from trigger). Default false. */
includeTtl?: boolean;
/** When true, allow the queue to push directly to worker queue if concurrency is available. */
enableFastPath?: boolean;
}) {
const prisma = tx ?? this.$.prisma;

Expand Down Expand Up @@ -98,6 +101,7 @@ export class EnqueueSystem {
await this.$.runQueue.enqueueMessage({
env,
workerQueue,
enableFastPath,
message: {
runId: run.id,
taskIdentifier: run.taskIdentifier,
Expand Down
3 changes: 3 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ export type TriggerParams = {
cliVersion?: string;
concurrencyKey?: string;
workerQueue?: string;
/** When true, the run queue may push directly to the worker queue if concurrency is available.
* Gated per WorkerInstanceGroup (production) or always true (development). */
enableFastPath?: boolean;
queue: string;
lockedQueueId?: string;
isTest: boolean;
Expand Down
Loading
Loading