From 42ca84946fe0942104fcaa3b2d46b499e99e7ab8 Mon Sep 17 00:00:00 2001 From: Krishna Shukla Date: Fri, 15 May 2026 16:20:21 +0530 Subject: [PATCH 1/2] feat: port dynamic load-based idle process scaling from Python SDK --- agents/src/ipc/proc_pool.test.ts | 59 +++------------- agents/src/ipc/proc_pool.ts | 112 ++++++++++++++++--------------- agents/src/worker.ts | 19 ++++++ 3 files changed, 86 insertions(+), 104 deletions(-) diff --git a/agents/src/ipc/proc_pool.test.ts b/agents/src/ipc/proc_pool.test.ts index 491335b44..4f1dd6549 100644 --- a/agents/src/ipc/proc_pool.test.ts +++ b/agents/src/ipc/proc_pool.test.ts @@ -14,11 +14,11 @@ function createMockExecutor() { userArguments: {}, runningJob: undefined, status: JobStatus.RUNNING, - start: vi.fn(async () => {}), - join: vi.fn(async () => {}), - initialize: vi.fn(async () => {}), - close: vi.fn(async () => {}), - launchJob: vi.fn(async () => {}), + start: vi.fn(async () => { }), + join: vi.fn(async () => { }), + initialize: vi.fn(async () => { }), + close: vi.fn(async () => { }), + launchJob: vi.fn(async () => { }), }; return executor; } @@ -31,51 +31,16 @@ async function flushMicrotasks(ticks = 10): Promise { } describe('ProcPool warmed process lock handling', () => { - it('releases lock token from the dequeued warmed process entry', async (): Promise< - Throws - > => { - const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0); - const unlock = vi.fn(); - const executor = createMockExecutor(); - const jobInfo = { - acceptArguments: { name: 'n', identity: 'i', metadata: '' }, - job: { id: 'job-id' }, - url: 'wss://example.com', - token: 'token', - workerId: 'worker-id', - } as unknown as RunningJobInfo; - - await pool.warmedProcQueue.put({ proc: executor, unlock }); - await pool.launchJob(jobInfo); - - expect(unlock).toHaveBeenCalledTimes(1); - expect(executor.launchJob).toHaveBeenCalledWith(jobInfo); - }); - - it('releases queued lock tokens during close', async () => { - const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0); - const unlock = vi.fn(); - const executor = createMockExecutor(); - - await pool.warmedProcQueue.put({ proc: executor, unlock }); - pool.started = true; - await pool.close(); - - expect(unlock).toHaveBeenCalledTimes(1); - expect(executor.close).toHaveBeenCalledTimes(1); - }); - it('releases both init and proc locks when closed before proc starts', async () => { + it('releases init lock when closed before proc starts', async () => { const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0); const initUnlock = vi.fn(); - const procUnlock = vi.fn(); pool.closed = true; pool.initMutex.lock = vi.fn(async () => initUnlock); - await pool.procWatchTask(procUnlock); + await pool.procWatchTask(); expect(initUnlock).toHaveBeenCalledTimes(1); - expect(procUnlock).toHaveBeenCalledTimes(1); }); it('releases initMutex after warming so concurrent procWatchTasks can initialise', async (): Promise< @@ -86,9 +51,8 @@ describe('ProcPool warmed process lock handling', () => { // the pool to effective concurrency 1 regardless of numIdleProcesses. const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0); const initUnlock = vi.fn(); - const procUnlock = vi.fn(); - let joinResolve: () => void = () => {}; + let joinResolve: () => void = () => { }; const joinPromise = new Promise((resolve) => { joinResolve = resolve; }); @@ -106,7 +70,7 @@ describe('ProcPool warmed process lock handling', () => { pool.initMutex.lock = vi.fn(async () => initUnlock); try { - const watchPromise = pool.procWatchTask(procUnlock); + const watchPromise = pool.procWatchTask(); await flushMicrotasks(); // initMutex released while proc.join() is still pending. @@ -119,7 +83,6 @@ describe('ProcPool warmed process lock handling', () => { // finally block must not double-release. expect(initUnlock).toHaveBeenCalledTimes(1); - expect(procUnlock).not.toHaveBeenCalled(); } finally { jobProcExecutorSpy.mockRestore(); } @@ -130,7 +93,6 @@ describe('ProcPool warmed process lock handling', () => { > => { const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0); const initUnlock = vi.fn(); - const procUnlock = vi.fn(); const mockProc: JobExecutor = { ...createMockExecutor(), @@ -148,10 +110,9 @@ describe('ProcPool warmed process lock handling', () => { pool.initMutex.lock = vi.fn(async () => initUnlock); try { - await pool.procWatchTask(procUnlock); + await pool.procWatchTask(); expect(initUnlock).toHaveBeenCalledTimes(1); - expect(procUnlock).toHaveBeenCalledTimes(1); expect(pool.warmedProcQueue.items.length).toBe(0); } finally { jobProcExecutorSpy.mockRestore(); diff --git a/agents/src/ipc/proc_pool.ts b/agents/src/ipc/proc_pool.ts index 3dffacbe5..79cc7798d 100644 --- a/agents/src/ipc/proc_pool.ts +++ b/agents/src/ipc/proc_pool.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { MultiMutex, Mutex } from '@livekit/mutex'; +import { Mutex } from '@livekit/mutex'; import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws'; import type { RunningJobInfo } from '../job.js'; import { Queue } from '../utils.js'; @@ -14,14 +14,17 @@ export class ProcPool { initializeTimeout: number; closeTimeout: number; executors: JobExecutor[] = []; - tasks: Promise[] = []; + spawnTasks: Set> = new Set(); started = false; closed = false; controller = new AbortController(); initMutex = new Mutex(); - procMutex?: MultiMutex; - // Keep each lock token paired with its warmed process so MultiMutex slots are always released correctly. - warmedProcQueue = new Queue<{ proc: JobExecutor; unlock: () => void }>(); + + targetIdleProcesses: number; + defaultNumIdleProcesses: number; + jobsWaitingForProcess = 0; + + warmedProcQueue = new Queue(); inferenceExecutor?: InferenceExecutor; memoryWarnMB: number; memoryLimitMB: number; @@ -36,9 +39,8 @@ export class ProcPool { memoryLimitMB: number, ) { this.agent = agent; - if (numIdleProcesses > 0) { - this.procMutex = new MultiMutex(numIdleProcesses); - } + this.targetIdleProcesses = numIdleProcesses; + this.defaultNumIdleProcesses = numIdleProcesses; this.initializeTimeout = initializeTimeout; this.closeTimeout = closeTimeout; this.inferenceExecutor = inferenceExecutor; @@ -54,33 +56,30 @@ export class ProcPool { return this.executors.find((x) => x.runningJob && x.runningJob.job.id === id) || null; } + setTargetIdleProcesses(num: number) { + this.targetIdleProcesses = num; + } + async launchJob(info: RunningJobInfo): Promise> { let proc: JobExecutor; - if (this.procMutex) { - const entry = await this.warmedProcQueue.get(); - proc = entry.proc; - // Release exactly the slot that produced this warmed process. - entry.unlock(); - } else { - proc = new JobProcExecutor( - this.agent, - this.inferenceExecutor, - this.initializeTimeout, - this.closeTimeout, - this.memoryWarnMB, - this.memoryLimitMB, - 2500, - 60000, - 500, - ); - this.executors.push(proc); - await proc.start(); - await proc.initialize(); + this.jobsWaitingForProcess++; + try { + if ( + this.warmedProcQueue.items.length === 0 && + this.spawnTasks.size < this.jobsWaitingForProcess + ) { + const task = this.procWatchTask(); + this.spawnTasks.add(task); + task.finally(() => this.spawnTasks.delete(task)); + } + proc = await this.warmedProcQueue.get(); + } finally { + this.jobsWaitingForProcess--; } await proc.launchJob(info); } - async procWatchTask(procUnlock: () => void) { + async procWatchTask() { const proc = new JobProcExecutor( this.agent, this.inferenceExecutor, @@ -98,7 +97,6 @@ export class ProcPool { const unlock = await this.initMutex.lock(); let initReleased = false; - let procUnlockTransferred = false; try { if (this.closed) { return; @@ -107,14 +105,11 @@ export class ProcPool { await proc.start(); try { await proc.initialize(); - await this.warmedProcQueue.put({ proc, unlock: procUnlock }); - procUnlockTransferred = true; - // Release initMutex after enqueue — holding it through join() serialises - // the pool to concurrency 1 since child procs are one-shot. + await this.warmedProcQueue.put(proc); unlock(); initReleased = true; } catch { - // Initialization failed before enqueue, so release the acquired slot immediately. + // Initialization failed before enqueue } await proc.join(); @@ -122,9 +117,6 @@ export class ProcPool { if (!initReleased) { unlock(); } - if (!procUnlockTransferred) { - procUnlock(); - } } } finally { const procIndex = this.executors.indexOf(proc); @@ -146,20 +138,31 @@ export class ProcPool { } async run(signal: AbortSignal) { - if (this.procMutex) { - while (!signal.aborted) { - const procUnlock = await this.procMutex.lock(); - const task = this.procWatchTask(procUnlock); - this.tasks.push(task); - task.finally(() => { - const taskIndex = this.tasks.indexOf(task); - if (taskIndex !== -1) { - this.tasks.splice(taskIndex, 1); - } else { - throw new Error(`task ${task} not found in tasks`); - } - }); + while (!signal.aborted) { + const currentPending = this.warmedProcQueue.items.length + this.spawnTasks.size; + const target = Math.max( + Math.min(this.targetIdleProcesses, this.defaultNumIdleProcesses), + this.jobsWaitingForProcess, + ); + const toSpawn = target - currentPending; + + for (let i = 0; i < toSpawn; i++) { + const task = this.procWatchTask(); + this.spawnTasks.add(task); + task.finally(() => this.spawnTasks.delete(task)); } + + await new Promise((resolve) => { + const timeout = setTimeout(resolve, 100); + signal.addEventListener( + 'abort', + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); } } @@ -169,11 +172,10 @@ export class ProcPool { } this.closed = true; this.controller.abort(); - this.warmedProcQueue.items.forEach((e) => { - e.unlock(); - e.proc.close(); + this.warmedProcQueue.items.forEach((proc) => { + proc.close(); }); this.executors.forEach((e) => e.close()); - await ThrowsPromise.allSettled(this.tasks); + await ThrowsPromise.allSettled(Array.from(this.spawnTasks)); } } diff --git a/agents/src/worker.ts b/agents/src/worker.ts index b966e885d..daea37183 100644 --- a/agents/src/worker.ts +++ b/agents/src/worker.ts @@ -683,6 +683,25 @@ export class AgentServer { const currentlyAvailable = !isFull; currentStatus = currentlyAvailable ? WorkerStatus.WS_AVAILABLE : WorkerStatus.WS_FULL; + if (isFull) { + this.#procPool.setTargetIdleProcesses(this.#opts.numIdleProcesses); + } else { + const activeJobs = this.activeJobs.length; + if (activeJobs > 0) { + const jobLoad = currentLoad / activeJobs; + if (jobLoad > 0) { + const availableLoad = Math.max(this.#opts.loadThreshold - currentLoad, 0.0); + const availableJob = Math.min( + Math.ceil(availableLoad / jobLoad), + this.#opts.numIdleProcesses, + ); + this.#procPool.setTargetIdleProcesses(availableJob); + } + } else { + this.#procPool.setTargetIdleProcesses(this.#opts.numIdleProcesses); + } + } + if (oldStatus != currentStatus) { const extra = { load: currentLoad, loadThreshold: this.#opts.loadThreshold }; if (isFull) { From 89d525fc714073b1a9f06613624c9a757662866e Mon Sep 17 00:00:00 2001 From: Krishna Shukla Date: Fri, 15 May 2026 17:54:28 +0530 Subject: [PATCH 2/2] added chageset --- .changeset/heavy-bats-poke.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/heavy-bats-poke.md diff --git a/.changeset/heavy-bats-poke.md b/.changeset/heavy-bats-poke.md new file mode 100644 index 000000000..ac6808c3c --- /dev/null +++ b/.changeset/heavy-bats-poke.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +SDK to support dynamic, load-based idle process scaling