diff --git a/.server-changes/enqueue-fast-path.md b/.server-changes/enqueue-fast-path.md new file mode 100644 index 00000000000..65ff0dbaca8 --- /dev/null +++ b/.server-changes/enqueue-fast-path.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Reduce run start latency by skipping the intermediate queue when concurrency is available. This optimization is rolled out per-region and enabled automatically for development environments. diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 12b0b29c5ff..07f3cb0fc73 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -254,9 +254,9 @@ export class DefaultQueueManager implements QueueManager { async getWorkerQueue( environment: AuthenticatedEnvironment, regionOverride?: string - ): Promise { + ): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined> { if (environment.type === "DEVELOPMENT") { - return environment.id; + return { masterQueue: environment.id, enableFastPath: true }; } const workerGroupService = new WorkerGroupService({ @@ -279,7 +279,10 @@ export class DefaultQueueManager implements QueueManager { throw new ServiceValidationError("No worker group found"); } - return workerGroup.masterQueue; + return { + masterQueue: workerGroup.masterQueue, + enableFastPath: workerGroup.enableFastPath, + }; } } diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 1fe3b839e39..f7c0252fa33 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -290,7 +290,12 @@ export class RunEngineTriggerTaskService { const depth = parentRun ? parentRun.depth + 1 : 0; - const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); + const workerQueueResult = await this.queueConcern.getWorkerQueue( + environment, + body.options?.region + ); + const workerQueue = workerQueueResult?.masterQueue; + const enableFastPath = workerQueueResult?.enableFastPath ?? false; // Build annotations for this run const triggerSource = options.triggerSource ?? "api"; @@ -344,6 +349,7 @@ export class RunEngineTriggerTaskService { queue: queueName, lockedQueueId, workerQueue, + enableFastPath, isTest: body.options?.test ?? false, delayUntil, queuedAt: delayUntil ? undefined : new Date(), diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index 3fc8d8034b7..91a3da2bbba 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -70,7 +70,7 @@ export interface QueueManager { getWorkerQueue( env: AuthenticatedEnvironment, regionOverride?: string - ): Promise; + ): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined>; } export interface PayloadProcessor { diff --git a/internal-packages/database/prisma/migrations/20260330210247_add_enable_fast_path_to_worker_instance_group/migration.sql b/internal-packages/database/prisma/migrations/20260330210247_add_enable_fast_path_to_worker_instance_group/migration.sql new file mode 100644 index 00000000000..b7ca1576264 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260330210247_add_enable_fast_path_to_worker_instance_group/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "public"."WorkerInstanceGroup" ADD COLUMN "enableFastPath" BOOLEAN NOT NULL DEFAULT false; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index 9b2388f87b1..033a550de90 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1331,6 +1331,10 @@ model WorkerInstanceGroup { workloadType WorkloadType @default(CONTAINER) + /// When true, runs enqueued to this worker queue may skip the intermediate queue + /// and be pushed directly to the worker queue when concurrency is available. + enableFastPath Boolean @default(false) + createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index e459dd85697..a1446d54b20 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -462,6 +462,7 @@ export class RunEngine { cliVersion, concurrencyKey, workerQueue, + enableFastPath, queue, lockedQueueId, isTest, @@ -799,6 +800,7 @@ export class RunEngine { tx: prisma, skipRunLock: true, includeTtl: true, + enableFastPath, }); } catch (enqueueError) { this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", { diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 9856fa855fc..d899aa7a6f3 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -36,6 +36,7 @@ export class EnqueueSystem { runnerId, skipRunLock, includeTtl = false, + enableFastPath = false, }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; @@ -57,6 +58,8 @@ export class EnqueueSystem { skipRunLock?: boolean; /** When true, include TTL in the queued message (only for first enqueue from trigger). Default false. */ includeTtl?: boolean; + /** When true, allow the queue to push directly to worker queue if concurrency is available. */ + enableFastPath?: boolean; }) { const prisma = tx ?? this.$.prisma; @@ -98,6 +101,7 @@ export class EnqueueSystem { await this.$.runQueue.enqueueMessage({ env, workerQueue, + enableFastPath, message: { runId: run.id, taskIdentifier: run.taskIdentifier, diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index f30a0fb3796..6ecb726e3af 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -181,6 +181,9 @@ export type TriggerParams = { cliVersion?: string; concurrencyKey?: string; workerQueue?: string; + /** When true, the run queue may push directly to the worker queue if concurrency is available. + * Gated per WorkerInstanceGroup (production) or always true (development). */ + enableFastPath?: boolean; queue: string; lockedQueueId?: string; isTest: boolean; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 9088099ef1a..de0df73ad05 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -618,16 +618,40 @@ export class RunQueue { ); } + /** + * Enqueue a run message for execution. + * + * The Lua script atomically decides between two paths: + * + * **Fast path** (when `enableFastPath` is true and conditions are met): + * Skips the queue sorted set and pushes directly to the worker queue (Redis list). + * This eliminates the ~500ms debounce delay from the processQueueForWorkerQueue job. + * Conditions: no available messages in the queue, env concurrency available, queue concurrency available. + * + * **Slow path** (default, or when fast-path conditions aren't met): + * Adds to the queue sorted set, then a debounced processQueueForWorkerQueue job moves it + * to the worker queue after checking concurrency limits. + * + * Both paths are atomic within their respective Lua scripts. The fast path claims concurrency + * slots (SADD to currentConcurrency sets) identically to how the dequeue Lua does, so + * ack/nack/release operations work the same regardless of which path was taken. + * + * @param enableFastPath - Gated per WorkerInstanceGroup. Dev environments always true. + * Production regions opt in via the `enableFastPath` flag on WorkerInstanceGroup. + */ public async enqueueMessage({ env, message, workerQueue, skipDequeueProcessing = false, + enableFastPath = false, }: { env: MinimalAuthenticatedEnvironment; message: InputPayload; workerQueue: string; skipDequeueProcessing?: boolean; + /** When true, the Lua script may push directly to the worker queue if concurrency is available. */ + enableFastPath?: boolean; }) { return await this.#trace( "enqueueMessage", @@ -656,8 +680,28 @@ export class RunQueue { attempt: 0, }; - if (!skipDequeueProcessing) { - // This will move the message to the worker queue so it can be dequeued + // Pass TTL info to enqueue so it can be added atomically + const ttlInfo = + message.ttlExpiresAt && this.options.ttlSystem + ? { + ttlExpiresAt: message.ttlExpiresAt, + ttlQueueKey: this.keys.ttlQueueKeyForShard(this.#getTtlShardForQueue(queueKey)), + ttlMember: `${queueKey}|${message.runId}|${message.orgId}`, + } + : undefined; + + // Enqueue the message. The Lua script atomically decides whether to fast-path + // (push directly to worker queue) or slow-path (add to queue sorted set). + const fastPathTaken = await this.#callEnqueueMessage( + messagePayload, + ttlInfo, + enableFastPath + ); + + span.setAttribute("fastPath", fastPathTaken); + + if (!fastPathTaken && !skipDequeueProcessing) { + // Slow path: schedule the dequeue job to move the message from queue to worker queue await this.worker.enqueueOnce({ id: dedupQueueKey, // dedupe by environment and base queue (CK wildcard for CK queues) job: "processQueueForWorkerQueue", @@ -670,18 +714,6 @@ export class RunQueue { availableAt: new Date(Date.now() + (this.options.processWorkerQueueDebounceMs ?? 500)), // 500ms from now }); } - - // Pass TTL info to enqueue so it can be added atomically - const ttlInfo = - message.ttlExpiresAt && this.options.ttlSystem - ? { - ttlExpiresAt: message.ttlExpiresAt, - ttlQueueKey: this.keys.ttlQueueKeyForShard(this.#getTtlShardForQueue(queueKey)), - ttlMember: `${queueKey}|${message.runId}|${message.orgId}`, - } - : undefined; - - await this.#callEnqueueMessage(messagePayload, ttlInfo); }, { kind: SpanKind.PRODUCER, @@ -1711,14 +1743,38 @@ export class RunQueue { }); } + /** + * Calls the appropriate Lua enqueue script variant (plain, TTL, CK, or TTL+CK). + * + * Each variant receives the same fast-path keys/args in addition to its own specific keys. + * The Lua script returns 1 (fast path taken) or 0 (slow path taken). + * + * All four variants share this KEYS layout: + * KEYS[1..N] = variant-specific keys (queue, message, concurrency, env, master, TTL, CK index) + * KEYS[N+1] = workerQueueKey (fast-path: RPUSH target) + * KEYS[N+2] = queueConcurrencyLimitKey + * KEYS[N+3] = envConcurrencyLimitKey + * KEYS[N+4] = envConcurrencyLimitBurstFactorKey + * + * And this ARGV layout (appended after variant-specific args): + * messageKeyValue (fast-path: value to RPUSH into worker queue) + * defaultEnvConcurrencyLimit (fallback if no Redis key set) + * defaultEnvConcurrencyBurstFactor + * currentTime (ms timestamp for ZRANGEBYSCORE availability check) + * enableFastPath ('1' or '0') + * + * @returns true if the fast path was taken (message pushed directly to worker queue) + */ async #callEnqueueMessage( message: OutputPayloadV2, ttlInfo?: { ttlExpiresAt: number; ttlQueueKey: string; ttlMember: string; - } - ) { + }, + enableFastPath: boolean = false + ): Promise { + // --- Slow-path keys (used by all variants) --- const queueKey = message.queue; const messageKey = this.keys.messageKey(message.orgId, message.runId); const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue); @@ -1731,10 +1787,25 @@ export class RunQueue { this.shardCount ); + // --- Fast-path keys (appended to each variant's KEYS) --- + const workerQueueKey = this.keys.workerQueueKey(message.workerQueue); + const queueConcurrencyLimitKey = this.keys.queueConcurrencyLimitKeyFromQueue(message.queue); + const envConcurrencyLimitKey = this.keys.envConcurrencyLimitKeyFromQueue(message.queue); + const envConcurrencyLimitBurstFactorKey = + this.keys.envConcurrencyLimitBurstFactorKeyFromQueue(message.queue); + // The value stored in the worker queue list — used to look up the message payload on dequeue + const messageKeyValue = messageKey; + const queueName = message.queue; const messageId = message.runId; const messageData = JSON.stringify(message); const messageScore = String(message.timestamp); + const currentTime = String(Date.now()); + const enableFastPathArg = enableFastPath ? "1" : "0"; + const defaultEnvConcurrencyLimit = String(this.options.defaultEnvConcurrency); + const defaultEnvConcurrencyBurstFactor = String( + this.options.defaultEnvConcurrencyBurstFactor ?? 1.0 + ); this.logger.debug("Calling enqueueMessage", { queueKey, @@ -1749,17 +1820,21 @@ export class RunQueue { messageData, messageScore, masterQueueKey, + enableFastPath, ttlInfo, service: this.name, }); + let result: number; + // Use CK-aware enqueue for messages with concurrency keys if (message.concurrencyKey) { const ckIndexKey = this.keys.ckIndexKeyFromQueue(message.queue); const ckWildcardName = this.keys.toCkWildcard(message.queue); if (ttlInfo) { - await this.redis.enqueueMessageWithTtlCk( + result = await this.redis.enqueueMessageWithTtlCk( + // keys masterQueueKey, queueKey, messageKey, @@ -1770,16 +1845,27 @@ export class RunQueue { envQueueKey, ttlInfo.ttlQueueKey, ckIndexKey, + workerQueueKey, + queueConcurrencyLimitKey, + envConcurrencyLimitKey, + envConcurrencyLimitBurstFactorKey, + // args queueName, messageId, messageData, messageScore, ttlInfo.ttlMember, String(ttlInfo.ttlExpiresAt), - ckWildcardName + ckWildcardName, + messageKeyValue, + defaultEnvConcurrencyLimit, + defaultEnvConcurrencyBurstFactor, + currentTime, + enableFastPathArg ); } else { - await this.redis.enqueueMessageCk( + result = await this.redis.enqueueMessageCk( + // keys masterQueueKey, queueKey, messageKey, @@ -1789,16 +1875,27 @@ export class RunQueue { envCurrentDequeuedKey, envQueueKey, ckIndexKey, + workerQueueKey, + queueConcurrencyLimitKey, + envConcurrencyLimitKey, + envConcurrencyLimitBurstFactorKey, + // args queueName, messageId, messageData, messageScore, - ckWildcardName + ckWildcardName, + messageKeyValue, + defaultEnvConcurrencyLimit, + defaultEnvConcurrencyBurstFactor, + currentTime, + enableFastPathArg ); } } else if (ttlInfo) { // Use the TTL-aware enqueue that atomically adds to both queues - await this.redis.enqueueMessageWithTtl( + result = await this.redis.enqueueMessageWithTtl( + // keys masterQueueKey, queueKey, messageKey, @@ -1808,15 +1905,26 @@ export class RunQueue { envCurrentDequeuedKey, envQueueKey, ttlInfo.ttlQueueKey, + workerQueueKey, + queueConcurrencyLimitKey, + envConcurrencyLimitKey, + envConcurrencyLimitBurstFactorKey, + // args queueName, messageId, messageData, messageScore, ttlInfo.ttlMember, - String(ttlInfo.ttlExpiresAt) + String(ttlInfo.ttlExpiresAt), + messageKeyValue, + defaultEnvConcurrencyLimit, + defaultEnvConcurrencyBurstFactor, + currentTime, + enableFastPathArg ); } else { - await this.redis.enqueueMessage( + result = await this.redis.enqueueMessage( + // keys masterQueueKey, queueKey, messageKey, @@ -1825,12 +1933,24 @@ export class RunQueue { queueCurrentDequeuedKey, envCurrentDequeuedKey, envQueueKey, + workerQueueKey, + queueConcurrencyLimitKey, + envConcurrencyLimitKey, + envConcurrencyLimitBurstFactorKey, + // args queueName, messageId, messageData, - messageScore + messageScore, + messageKeyValue, + defaultEnvConcurrencyLimit, + defaultEnvConcurrencyBurstFactor, + currentTime, + enableFastPathArg ); } + + return result === 1; } async #callDequeueMessagesFromQueue({ @@ -2794,8 +2914,24 @@ end `, }); + // Enqueue message with optional fast path. + // + // Returns 1 (fast path: message pushed directly to worker queue) or 0 (slow path: message + // added to queue sorted set, needs processQueueForWorkerQueue to move it later). + // + // Fast-path conditions (all must be true): + // 1. enableFastPath == '1' (gated per WorkerInstanceGroup) + // 2. No available messages in the queue (ZRANGEBYSCORE finds nothing with score <= now) + // 3. Env concurrency has capacity (SCARD < limit * burstFactor) + // 4. Queue concurrency has capacity (SCARD < min(queueLimit, envLimit)) + // + // The fast path performs the same concurrency bookkeeping (SADD to currentConcurrency sets) + // as dequeueMessagesFromQueue, so ack/nack/release work identically for both paths. + // + // When enableFastPath == '0', the script skips the fast-path check entirely and behaves + // identically to the pre-fast-path version (with the addition of returning 0). this.redis.defineCommand("enqueueMessage", { - numberOfKeys: 8, + numberOfKeys: 12, lua: ` local masterQueueKey = KEYS[1] local queueKey = KEYS[2] @@ -2805,12 +2941,51 @@ local envCurrentConcurrencyKey = KEYS[5] local queueCurrentDequeuedKey = KEYS[6] local envCurrentDequeuedKey = KEYS[7] local envQueueKey = KEYS[8] +-- Fast-path keys (KEYS 9-12) +local workerQueueKey = KEYS[9] +local queueConcurrencyLimitKey = KEYS[10] +local envConcurrencyLimitKey = KEYS[11] +local envConcurrencyLimitBurstFactorKey = KEYS[12] local queueName = ARGV[1] local messageId = ARGV[2] local messageData = ARGV[3] local messageScore = ARGV[4] +-- Fast-path args (ARGV 5-9) +local messageKeyValue = ARGV[5] +local defaultEnvConcurrencyLimit = ARGV[6] +local defaultEnvConcurrencyBurstFactor = ARGV[7] +local currentTime = ARGV[8] +local enableFastPath = ARGV[9] + +-- Fast path: check if we can skip the queue and go directly to worker queue +if enableFastPath == '1' then + local available = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'LIMIT', 0, 1) + if #available == 0 then + local envCurrent = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') + local envLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) + local envBurstFactor = tonumber(redis.call('GET', envConcurrencyLimitBurstFactorKey) or defaultEnvConcurrencyBurstFactor) + local envLimitWithBurst = math.floor(envLimit * envBurstFactor) + + if envCurrent < envLimitWithBurst then + local queueCurrent = tonumber(redis.call('SCARD', queueCurrentConcurrencyKey) or '0') + local queueLimit = math.min( + tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), + envLimit + ) + + if queueCurrent < queueLimit then + redis.call('SET', messageKey, messageData) + redis.call('SADD', queueCurrentConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + redis.call('RPUSH', workerQueueKey, messageKeyValue) + return 1 + end + end + end +end +-- Slow path: normal enqueue -- Write the message to the message key redis.call('SET', messageKey, messageData) @@ -2834,12 +3009,17 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) redis.call('SREM', queueCurrentDequeuedKey, messageId) redis.call('SREM', envCurrentDequeuedKey, messageId) + +return 0 `, }); - // Enqueue with TTL tracking - atomically adds to both normal queue and TTL sorted set + // Enqueue with TTL tracking. Same fast-path logic as enqueueMessage. + // On fast path, the TTL sorted set is intentionally skipped — the expireRun worker job + // (scheduled independently before enqueue) handles TTL expiry. This mirrors what + // dequeueMessagesFromQueue does: it removes from the TTL set when dequeuing. this.redis.defineCommand("enqueueMessageWithTtl", { - numberOfKeys: 9, + numberOfKeys: 13, lua: ` local masterQueueKey = KEYS[1] local queueKey = KEYS[2] @@ -2850,6 +3030,11 @@ local queueCurrentDequeuedKey = KEYS[6] local envCurrentDequeuedKey = KEYS[7] local envQueueKey = KEYS[8] local ttlQueueKey = KEYS[9] +-- Fast-path keys (KEYS 10-13) +local workerQueueKey = KEYS[10] +local queueConcurrencyLimitKey = KEYS[11] +local envConcurrencyLimitKey = KEYS[12] +local envConcurrencyLimitBurstFactorKey = KEYS[13] local queueName = ARGV[1] local messageId = ARGV[2] @@ -2857,7 +3042,42 @@ local messageData = ARGV[3] local messageScore = ARGV[4] local ttlMember = ARGV[5] local ttlScore = ARGV[6] +-- Fast-path args (ARGV 7-11) +local messageKeyValue = ARGV[7] +local defaultEnvConcurrencyLimit = ARGV[8] +local defaultEnvConcurrencyBurstFactor = ARGV[9] +local currentTime = ARGV[10] +local enableFastPath = ARGV[11] + +-- Fast path: check if we can skip the queue and go directly to worker queue +if enableFastPath == '1' then + local available = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'LIMIT', 0, 1) + if #available == 0 then + local envCurrent = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') + local envLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) + local envBurstFactor = tonumber(redis.call('GET', envConcurrencyLimitBurstFactorKey) or defaultEnvConcurrencyBurstFactor) + local envLimitWithBurst = math.floor(envLimit * envBurstFactor) + + if envCurrent < envLimitWithBurst then + local queueCurrent = tonumber(redis.call('SCARD', queueCurrentConcurrencyKey) or '0') + local queueLimit = math.min( + tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), + envLimit + ) + + if queueCurrent < queueLimit then + redis.call('SET', messageKey, messageData) + redis.call('SADD', queueCurrentConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + redis.call('RPUSH', workerQueueKey, messageKeyValue) + -- Skip TTL sorted set: the expireRun worker job handles TTL expiry independently + return 1 + end + end + end +end +-- Slow path: normal enqueue -- Write the message to the message key redis.call('SET', messageKey, messageData) @@ -2884,12 +3104,16 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) redis.call('SREM', queueCurrentDequeuedKey, messageId) redis.call('SREM', envCurrentDequeuedKey, messageId) + +return 0 `, }); - // CK-aware enqueue: adds to CK index + master queue with :ck:* member + // CK-aware enqueue: adds to CK index + master queue with :ck:* member. + // Same fast-path logic as enqueueMessage. For CK queues, the fast-path checks the + // per-CK sub-queue for available messages and the per-CK concurrency limit. this.redis.defineCommand("enqueueMessageCk", { - numberOfKeys: 9, + numberOfKeys: 13, lua: ` local masterQueueKey = KEYS[1] local queueKey = KEYS[2] @@ -2900,13 +3124,53 @@ local queueCurrentDequeuedKey = KEYS[6] local envCurrentDequeuedKey = KEYS[7] local envQueueKey = KEYS[8] local ckIndexKey = KEYS[9] +-- Fast-path keys (KEYS 10-13) +local workerQueueKey = KEYS[10] +local queueConcurrencyLimitKey = KEYS[11] +local envConcurrencyLimitKey = KEYS[12] +local envConcurrencyLimitBurstFactorKey = KEYS[13] local queueName = ARGV[1] local messageId = ARGV[2] local messageData = ARGV[3] local messageScore = ARGV[4] local ckWildcardName = ARGV[5] +-- Fast-path args (ARGV 6-10) +local messageKeyValue = ARGV[6] +local defaultEnvConcurrencyLimit = ARGV[7] +local defaultEnvConcurrencyBurstFactor = ARGV[8] +local currentTime = ARGV[9] +local enableFastPath = ARGV[10] + +-- Fast path: check if we can skip the queue and go directly to worker queue +if enableFastPath == '1' then + local available = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'LIMIT', 0, 1) + if #available == 0 then + local envCurrent = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') + local envLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) + local envBurstFactor = tonumber(redis.call('GET', envConcurrencyLimitBurstFactorKey) or defaultEnvConcurrencyBurstFactor) + local envLimitWithBurst = math.floor(envLimit * envBurstFactor) + + if envCurrent < envLimitWithBurst then + -- For CK queues, check per-CK concurrency (same key as queue concurrency) + local queueCurrent = tonumber(redis.call('SCARD', queueCurrentConcurrencyKey) or '0') + local queueLimit = math.min( + tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), + envLimit + ) + + if queueCurrent < queueLimit then + redis.call('SET', messageKey, messageData) + redis.call('SADD', queueCurrentConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + redis.call('RPUSH', workerQueueKey, messageKeyValue) + return 1 + end + end + end +end +-- Slow path: normal enqueue -- Write the message to the message key redis.call('SET', messageKey, messageData) @@ -2936,12 +3200,15 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) redis.call('SREM', queueCurrentDequeuedKey, messageId) redis.call('SREM', envCurrentDequeuedKey, messageId) + +return 0 `, }); - // CK-aware enqueue with TTL tracking + // CK-aware enqueue with TTL tracking. Combines CK and TTL behavior with fast path. + // On fast path: skips both the CK sub-queue and TTL sorted set. this.redis.defineCommand("enqueueMessageWithTtlCk", { - numberOfKeys: 10, + numberOfKeys: 14, lua: ` local masterQueueKey = KEYS[1] local queueKey = KEYS[2] @@ -2953,6 +3220,11 @@ local envCurrentDequeuedKey = KEYS[7] local envQueueKey = KEYS[8] local ttlQueueKey = KEYS[9] local ckIndexKey = KEYS[10] +-- Fast-path keys (KEYS 11-14) +local workerQueueKey = KEYS[11] +local queueConcurrencyLimitKey = KEYS[12] +local envConcurrencyLimitKey = KEYS[13] +local envConcurrencyLimitBurstFactorKey = KEYS[14] local queueName = ARGV[1] local messageId = ARGV[2] @@ -2961,7 +3233,42 @@ local messageScore = ARGV[4] local ttlMember = ARGV[5] local ttlScore = ARGV[6] local ckWildcardName = ARGV[7] +-- Fast-path args (ARGV 8-12) +local messageKeyValue = ARGV[8] +local defaultEnvConcurrencyLimit = ARGV[9] +local defaultEnvConcurrencyBurstFactor = ARGV[10] +local currentTime = ARGV[11] +local enableFastPath = ARGV[12] + +-- Fast path: check if we can skip the queue and go directly to worker queue +if enableFastPath == '1' then + local available = redis.call('ZRANGEBYSCORE', queueKey, '-inf', currentTime, 'LIMIT', 0, 1) + if #available == 0 then + local envCurrent = tonumber(redis.call('SCARD', envCurrentConcurrencyKey) or '0') + local envLimit = tonumber(redis.call('GET', envConcurrencyLimitKey) or defaultEnvConcurrencyLimit) + local envBurstFactor = tonumber(redis.call('GET', envConcurrencyLimitBurstFactorKey) or defaultEnvConcurrencyBurstFactor) + local envLimitWithBurst = math.floor(envLimit * envBurstFactor) + + if envCurrent < envLimitWithBurst then + local queueCurrent = tonumber(redis.call('SCARD', queueCurrentConcurrencyKey) or '0') + local queueLimit = math.min( + tonumber(redis.call('GET', queueConcurrencyLimitKey) or '1000000'), + envLimit + ) + + if queueCurrent < queueLimit then + redis.call('SET', messageKey, messageData) + redis.call('SADD', queueCurrentConcurrencyKey, messageId) + redis.call('SADD', envCurrentConcurrencyKey, messageId) + redis.call('RPUSH', workerQueueKey, messageKeyValue) + -- Skip TTL sorted set: the expireRun worker job handles TTL expiry independently + return 1 + end + end + end +end +-- Slow path: normal enqueue -- Write the message to the message key redis.call('SET', messageKey, messageData) @@ -2994,6 +3301,8 @@ redis.call('SREM', queueCurrentConcurrencyKey, messageId) redis.call('SREM', envCurrentConcurrencyKey, messageId) redis.call('SREM', queueCurrentDequeuedKey, messageId) redis.call('SREM', envCurrentDequeuedKey, messageId) + +return 0 `, }); @@ -3870,13 +4179,22 @@ declare module "@internal/redis" { queueCurrentDequeuedKey: string, envCurrentDequeuedKey: string, envQueueKey: string, + workerQueueKey: string, + queueConcurrencyLimitKey: string, + envConcurrencyLimitKey: string, + envConcurrencyLimitBurstFactorKey: string, //args queueName: string, messageId: string, messageData: string, messageScore: string, - callback?: Callback - ): Result; + messageKeyValue: string, + defaultEnvConcurrencyLimit: string, + defaultEnvConcurrencyBurstFactor: string, + currentTime: string, + enableFastPath: string, + callback?: Callback + ): Result; enqueueMessageWithTtl( //keys @@ -3889,6 +4207,10 @@ declare module "@internal/redis" { envCurrentDequeuedKey: string, envQueueKey: string, ttlQueueKey: string, + workerQueueKey: string, + queueConcurrencyLimitKey: string, + envConcurrencyLimitKey: string, + envConcurrencyLimitBurstFactorKey: string, //args queueName: string, messageId: string, @@ -3896,8 +4218,13 @@ declare module "@internal/redis" { messageScore: string, ttlMember: string, ttlScore: string, - callback?: Callback - ): Result; + messageKeyValue: string, + defaultEnvConcurrencyLimit: string, + defaultEnvConcurrencyBurstFactor: string, + currentTime: string, + enableFastPath: string, + callback?: Callback + ): Result; expireTtlRuns( //keys @@ -4057,14 +4384,23 @@ declare module "@internal/redis" { envCurrentDequeuedKey: string, envQueueKey: string, ckIndexKey: string, + workerQueueKey: string, + queueConcurrencyLimitKey: string, + envConcurrencyLimitKey: string, + envConcurrencyLimitBurstFactorKey: string, //args queueName: string, messageId: string, messageData: string, messageScore: string, ckWildcardName: string, - callback?: Callback - ): Result; + messageKeyValue: string, + defaultEnvConcurrencyLimit: string, + defaultEnvConcurrencyBurstFactor: string, + currentTime: string, + enableFastPath: string, + callback?: Callback + ): Result; enqueueMessageWithTtlCk( //keys @@ -4078,6 +4414,10 @@ declare module "@internal/redis" { envQueueKey: string, ttlQueueKey: string, ckIndexKey: string, + workerQueueKey: string, + queueConcurrencyLimitKey: string, + envConcurrencyLimitKey: string, + envConcurrencyLimitBurstFactorKey: string, //args queueName: string, messageId: string, @@ -4086,8 +4426,13 @@ declare module "@internal/redis" { ttlMember: string, ttlScore: string, ckWildcardName: string, - callback?: Callback - ): Result; + messageKeyValue: string, + defaultEnvConcurrencyLimit: string, + defaultEnvConcurrencyBurstFactor: string, + currentTime: string, + enableFastPath: string, + callback?: Callback + ): Result; dequeueMessagesFromCkQueue( //keys diff --git a/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts b/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts index bf4ed87f295..8ce2d68d5de 100644 --- a/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/enqueueMessage.test.ts @@ -1,7 +1,7 @@ import { assertNonNullable, redisTest } from "@internal/testcontainers"; import { trace } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; -import { describe } from "node:test"; +import { describe } from "vitest"; import { setTimeout } from "node:timers/promises"; import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js"; import { RunQueue } from "../index.js"; @@ -39,7 +39,7 @@ const messageDev: InputPayload = { taskIdentifier: "task/my-task", orgId: "o1234", projectId: "p1234", - environmentId: "e4321", + environmentId: "e1234", environmentType: "DEVELOPMENT", queue: "task/my-task", timestamp: Date.now(), @@ -48,24 +48,28 @@ const messageDev: InputPayload = { vi.setConfig({ testTimeout: 60_000 }); -describe("RunQueue.enqueueMessage", () => { - redisTest("should add the message to the queue", async ({ redisContainer }) => { - const queue = new RunQueue({ - ...testOptions, - queueSelectionStrategy: new FairQueueSelectionStrategy({ - redis: { - keyPrefix: "runqueue:test:", - host: redisContainer.getHost(), - port: redisContainer.getPort(), - }, - keys: testOptions.keys, - }), +function createQueue(redisContainer: { getHost: () => string; getPort: () => number }, prefix = "runqueue:test:") { + return new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ redis: { - keyPrefix: "runqueue:test:", + keyPrefix: prefix, host: redisContainer.getHost(), port: redisContainer.getPort(), }, - }); + keys: testOptions.keys, + }), + redis: { + keyPrefix: prefix, + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); +} + +describe("RunQueue.enqueueMessage", () => { + redisTest("should add the message to the queue", async ({ redisContainer }) => { + const queue = createQueue(redisContainer); try { //initial queue length @@ -127,3 +131,326 @@ describe("RunQueue.enqueueMessage", () => { } }); }); + +describe("RunQueue.enqueueMessage fast path", () => { + redisTest("should fast-path to worker queue when queue is empty and concurrency available", async ({ redisContainer }) => { + const queue = createQueue(redisContainer, "runqueue:fp1:"); + + try { + // Set concurrency limits + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + // Enqueue with fast path enabled + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // Queue sorted set should be empty (fast path skips it) + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue); + expect(queueLength).toBe(0); + + // Queue concurrency should be claimed (operational concurrency) + const queueConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrency).toBe(1); + + // Message should be directly in worker queue - dequeue it + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_12345", + authenticatedEnvDev.id, + { blockingPop: false } + ); + assertNonNullable(dequeued); + expect(dequeued.messageId).toEqual(messageDev.runId); + expect(dequeued.message.version).toEqual("2"); + } finally { + await queue.quit(); + } + }); + + redisTest("should take slow path when enableFastPath is false", async ({ redisContainer }) => { + const queue = createQueue(redisContainer, "runqueue:fp2:"); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: authenticatedEnvDev.id, + enableFastPath: false, + }); + + // Message should be in the queue sorted set (slow path) + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue); + expect(queueLength).toBe(1); + + // No concurrency claimed yet + const queueConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrency).toBe(0); + } finally { + await queue.quit(); + } + }); + + redisTest("should take slow path when queue has available messages", async ({ redisContainer }) => { + const queue = createQueue(redisContainer, "runqueue:fp3:"); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + // Enqueue a first message (slow path to populate the queue) + const message1: InputPayload = { + ...messageDev, + runId: "r1111", + timestamp: Date.now() - 1000, // in the past, so it's "available" + }; + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: message1, + workerQueue: authenticatedEnvDev.id, + enableFastPath: false, + }); + + // Now enqueue a second message with fast path + const message2: InputPayload = { + ...messageDev, + runId: "r2222", + timestamp: Date.now(), + }; + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: message2, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // Both messages should be in the queue sorted set (slow path for both) + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue); + expect(queueLength).toBe(2); + } finally { + await queue.quit(); + } + }); + + redisTest("should fast-path when queue only has future-scored messages", async ({ redisContainer }) => { + const queue = createQueue(redisContainer, "runqueue:fp4:"); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + // Enqueue a message with a future timestamp (simulating a nacked retry) + const futureMessage: InputPayload = { + ...messageDev, + runId: "r_future", + timestamp: Date.now() + 60_000, // 60 seconds in the future + }; + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: futureMessage, + workerQueue: authenticatedEnvDev.id, + enableFastPath: false, + }); + + // Queue has 1 message but it's not available (future score) + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue); + expect(queueLength).toBe(1); + + // Now enqueue a new message with fast path + const newMessage: InputPayload = { + ...messageDev, + runId: "r_new", + timestamp: Date.now(), + }; + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: newMessage, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // The future message stays in queue, new message went to worker queue + const queueLength2 = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue); + expect(queueLength2).toBe(1); // Only the future message + + // Queue concurrency claimed for the fast-pathed message + const queueConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrency).toBe(1); + + // Can dequeue the fast-pathed message from worker queue + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_12345", + authenticatedEnvDev.id, + { blockingPop: false } + ); + assertNonNullable(dequeued); + expect(dequeued.messageId).toEqual("r_new"); + } finally { + await queue.quit(); + } + }); + + redisTest("should take slow path when env concurrency is full", async ({ redisContainer }) => { + // Use a low concurrency limit + const lowConcurrencyEnv = { + ...authenticatedEnvDev, + maximumConcurrencyLimit: 1, + concurrencyLimitBurstFactor: new Decimal(1.0), + }; + + const queue = createQueue(redisContainer, "runqueue:fp5:"); + + try { + await queue.updateEnvConcurrencyLimits(lowConcurrencyEnv); + + // First message takes fast path + const message1: InputPayload = { + ...messageDev, + runId: "r_first", + timestamp: Date.now(), + }; + await queue.enqueueMessage({ + env: lowConcurrencyEnv, + message: message1, + workerQueue: lowConcurrencyEnv.id, + enableFastPath: true, + }); + + // Queue concurrency is now 1 (fast path claimed it) + const queueConcurrency = await queue.currentConcurrencyOfQueue( + lowConcurrencyEnv, + messageDev.queue + ); + expect(queueConcurrency).toBe(1); + + // Second message should take slow path (env concurrency full) + const message2: InputPayload = { + ...messageDev, + runId: "r_second", + timestamp: Date.now(), + }; + await queue.enqueueMessage({ + env: lowConcurrencyEnv, + message: message2, + workerQueue: lowConcurrencyEnv.id, + enableFastPath: true, + }); + + // Second message should be in queue sorted set + const queueLength = await queue.lengthOfQueue(lowConcurrencyEnv, messageDev.queue); + expect(queueLength).toBe(1); + + // Queue concurrency unchanged (still 1 from first message) + const queueConcurrency2 = await queue.currentConcurrencyOfQueue( + lowConcurrencyEnv, + messageDev.queue + ); + expect(queueConcurrency2).toBe(1); + } finally { + await queue.quit(); + } + }); + + redisTest("fast-path message can be acknowledged correctly", async ({ redisContainer }) => { + const queue = createQueue(redisContainer, "runqueue:fp6:"); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // Verify fast path was taken + const queueConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrency).toBe(1); + + // Dequeue from worker queue + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_12345", + authenticatedEnvDev.id, + { blockingPop: false } + ); + assertNonNullable(dequeued); + + // Acknowledge the message + await queue.acknowledgeMessage(messageDev.orgId, dequeued.messageId); + + // Queue concurrency should be released + const queueConcurrencyAfter = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrencyAfter).toBe(0); + } finally { + await queue.quit(); + } + }); + + redisTest("fast-path message can be nacked and re-enqueued", async ({ redisContainer }) => { + const queue = createQueue(redisContainer, "runqueue:fp7:"); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // Verify fast path was taken + const queueConcurrency = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrency).toBe(1); + + // Dequeue from worker queue + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_12345", + authenticatedEnvDev.id, + { blockingPop: false } + ); + assertNonNullable(dequeued); + + // Nack the message (re-enqueue it) + await queue.nackMessage({ + orgId: messageDev.orgId, + messageId: dequeued.messageId, + retryAt: Date.now() + 1000, + }); + + // Queue concurrency should be released + const queueConcurrencyAfter = await queue.currentConcurrencyOfQueue( + authenticatedEnvDev, + messageDev.queue + ); + expect(queueConcurrencyAfter).toBe(0); + + // Message should now be in the queue sorted set + const queueLength = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue); + expect(queueLength).toBe(1); + } finally { + await queue.quit(); + } + }); +});