Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/heavy-bats-poke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

SDK to support dynamic, load-based idle process scaling
59 changes: 10 additions & 49 deletions agents/src/ipc/proc_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ function createMockExecutor() {
userArguments: {},
runningJob: undefined,
status: JobStatus.RUNNING,
start: vi.fn(async () => {}),
join: vi.fn(async () => {}),
initialize: vi.fn(async () => {}),
close: vi.fn(async () => {}),
launchJob: vi.fn(async () => {}),
start: vi.fn(async () => { }),
join: vi.fn(async () => { }),
initialize: vi.fn(async () => { }),
close: vi.fn(async () => { }),
launchJob: vi.fn(async () => { }),
};
return executor;
}
Expand All @@ -31,51 +31,16 @@ async function flushMicrotasks(ticks = 10): Promise<void> {
}

describe('ProcPool warmed process lock handling', () => {
it('releases lock token from the dequeued warmed process entry', async (): Promise<
Throws<void, Error>
> => {
const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0);
const unlock = vi.fn();
const executor = createMockExecutor();
const jobInfo = {
acceptArguments: { name: 'n', identity: 'i', metadata: '' },
job: { id: 'job-id' },
url: 'wss://example.com',
token: 'token',
workerId: 'worker-id',
} as unknown as RunningJobInfo;

await pool.warmedProcQueue.put({ proc: executor, unlock });
await pool.launchJob(jobInfo);

expect(unlock).toHaveBeenCalledTimes(1);
expect(executor.launchJob).toHaveBeenCalledWith(jobInfo);
});

it('releases queued lock tokens during close', async () => {
const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0);
const unlock = vi.fn();
const executor = createMockExecutor();

await pool.warmedProcQueue.put({ proc: executor, unlock });
pool.started = true;
await pool.close();

expect(unlock).toHaveBeenCalledTimes(1);
expect(executor.close).toHaveBeenCalledTimes(1);
});

it('releases both init and proc locks when closed before proc starts', async () => {
it('releases init lock when closed before proc starts', async () => {
const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0);
const initUnlock = vi.fn();
const procUnlock = vi.fn();
pool.closed = true;
pool.initMutex.lock = vi.fn(async () => initUnlock);

await pool.procWatchTask(procUnlock);
await pool.procWatchTask();

expect(initUnlock).toHaveBeenCalledTimes(1);
expect(procUnlock).toHaveBeenCalledTimes(1);
});

it('releases initMutex after warming so concurrent procWatchTasks can initialise', async (): Promise<
Expand All @@ -86,9 +51,8 @@ describe('ProcPool warmed process lock handling', () => {
// the pool to effective concurrency 1 regardless of numIdleProcesses.
const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0);
const initUnlock = vi.fn();
const procUnlock = vi.fn();

let joinResolve: () => void = () => {};
let joinResolve: () => void = () => { };
const joinPromise = new Promise<void>((resolve) => {
joinResolve = resolve;
});
Expand All @@ -106,7 +70,7 @@ describe('ProcPool warmed process lock handling', () => {
pool.initMutex.lock = vi.fn(async () => initUnlock);

try {
const watchPromise = pool.procWatchTask(procUnlock);
const watchPromise = pool.procWatchTask();
await flushMicrotasks();

// initMutex released while proc.join() is still pending.
Expand All @@ -119,7 +83,6 @@ describe('ProcPool warmed process lock handling', () => {

// finally block must not double-release.
expect(initUnlock).toHaveBeenCalledTimes(1);
expect(procUnlock).not.toHaveBeenCalled();
} finally {
jobProcExecutorSpy.mockRestore();
}
Expand All @@ -130,7 +93,6 @@ describe('ProcPool warmed process lock handling', () => {
> => {
const pool = new ProcPool('agent', 1, 1000, 1000, undefined, 0, 0);
const initUnlock = vi.fn();
const procUnlock = vi.fn();

const mockProc: JobExecutor = {
...createMockExecutor(),
Expand All @@ -148,10 +110,9 @@ describe('ProcPool warmed process lock handling', () => {
pool.initMutex.lock = vi.fn(async () => initUnlock);

try {
await pool.procWatchTask(procUnlock);
await pool.procWatchTask();

expect(initUnlock).toHaveBeenCalledTimes(1);
expect(procUnlock).toHaveBeenCalledTimes(1);
expect(pool.warmedProcQueue.items.length).toBe(0);
} finally {
jobProcExecutorSpy.mockRestore();
Expand Down
112 changes: 57 additions & 55 deletions agents/src/ipc/proc_pool.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { MultiMutex, Mutex } from '@livekit/mutex';
import { Mutex } from '@livekit/mutex';
import { type Throws, ThrowsPromise } from '@livekit/throws-transformer/throws';
import type { RunningJobInfo } from '../job.js';
import { Queue } from '../utils.js';
Expand All @@ -14,14 +14,17 @@ export class ProcPool {
initializeTimeout: number;
closeTimeout: number;
executors: JobExecutor[] = [];
tasks: Promise<void>[] = [];
spawnTasks: Set<Promise<void>> = new Set();
started = false;
closed = false;
controller = new AbortController();
initMutex = new Mutex();
procMutex?: MultiMutex;
// Keep each lock token paired with its warmed process so MultiMutex slots are always released correctly.
warmedProcQueue = new Queue<{ proc: JobExecutor; unlock: () => void }>();

targetIdleProcesses: number;
defaultNumIdleProcesses: number;
jobsWaitingForProcess = 0;

warmedProcQueue = new Queue<JobExecutor>();
inferenceExecutor?: InferenceExecutor;
memoryWarnMB: number;
memoryLimitMB: number;
Expand All @@ -36,9 +39,8 @@ export class ProcPool {
memoryLimitMB: number,
) {
this.agent = agent;
if (numIdleProcesses > 0) {
this.procMutex = new MultiMutex(numIdleProcesses);
}
this.targetIdleProcesses = numIdleProcesses;
this.defaultNumIdleProcesses = numIdleProcesses;
this.initializeTimeout = initializeTimeout;
this.closeTimeout = closeTimeout;
this.inferenceExecutor = inferenceExecutor;
Expand All @@ -54,33 +56,30 @@ export class ProcPool {
return this.executors.find((x) => x.runningJob && x.runningJob.job.id === id) || null;
}

setTargetIdleProcesses(num: number) {
this.targetIdleProcesses = num;
}

async launchJob(info: RunningJobInfo): Promise<Throws<void, Error>> {
let proc: JobExecutor;
if (this.procMutex) {
const entry = await this.warmedProcQueue.get();
proc = entry.proc;
// Release exactly the slot that produced this warmed process.
entry.unlock();
} else {
proc = new JobProcExecutor(
this.agent,
this.inferenceExecutor,
this.initializeTimeout,
this.closeTimeout,
this.memoryWarnMB,
this.memoryLimitMB,
2500,
60000,
500,
);
this.executors.push(proc);
await proc.start();
await proc.initialize();
this.jobsWaitingForProcess++;
try {
if (
this.warmedProcQueue.items.length === 0 &&
this.spawnTasks.size < this.jobsWaitingForProcess
) {
const task = this.procWatchTask();
this.spawnTasks.add(task);
task.finally(() => this.spawnTasks.delete(task));
}
proc = await this.warmedProcQueue.get();
} finally {
this.jobsWaitingForProcess--;
}
await proc.launchJob(info);
}

async procWatchTask(procUnlock: () => void) {
async procWatchTask() {
const proc = new JobProcExecutor(
this.agent,
this.inferenceExecutor,
Expand All @@ -98,7 +97,6 @@ export class ProcPool {

const unlock = await this.initMutex.lock();
let initReleased = false;
let procUnlockTransferred = false;
try {
if (this.closed) {
return;
Expand All @@ -107,24 +105,18 @@ export class ProcPool {
await proc.start();
try {
await proc.initialize();
await this.warmedProcQueue.put({ proc, unlock: procUnlock });
procUnlockTransferred = true;
// Release initMutex after enqueue — holding it through join() serialises
// the pool to concurrency 1 since child procs are one-shot.
await this.warmedProcQueue.put(proc);
unlock();
initReleased = true;
} catch {
// Initialization failed before enqueue, so release the acquired slot immediately.
// Initialization failed before enqueue
}

await proc.join();
} finally {
if (!initReleased) {
unlock();
}
if (!procUnlockTransferred) {
procUnlock();
}
}
} finally {
const procIndex = this.executors.indexOf(proc);
Expand All @@ -146,20 +138,31 @@ export class ProcPool {
}

async run(signal: AbortSignal) {
if (this.procMutex) {
while (!signal.aborted) {
const procUnlock = await this.procMutex.lock();
const task = this.procWatchTask(procUnlock);
this.tasks.push(task);
task.finally(() => {
const taskIndex = this.tasks.indexOf(task);
if (taskIndex !== -1) {
this.tasks.splice(taskIndex, 1);
} else {
throw new Error(`task ${task} not found in tasks`);
}
});
while (!signal.aborted) {
const currentPending = this.warmedProcQueue.items.length + this.spawnTasks.size;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 spawnTasks overcounts by including tasks for processes already running jobs, preventing idle pool replenishment

In proc_pool.ts:142, currentPending is computed as warmedProcQueue.items.length + spawnTasks.size. However, spawnTasks includes tasks for processes that have already been consumed from the queue by launchJob() and are currently running jobs (still awaiting proc.join() at proc_pool.ts:115). This causes double-counting: idle processes appear both in items.length and spawnTasks, and running processes appear in spawnTasks despite no longer being idle. As a result, toSpawn = target - currentPending is systematically too low, and the pool never spawns replacement idle processes after jobs consume them.

Concrete scenario showing the bug

With numIdleProcesses=4, after 4 processes are warmed: queue=4, spawnTasks=4 (waiting on join). After 4 jobs consume all processes: queue=0, spawnTasks=4. A 5th job calls launchJob() where spawnTasks.size (4) < jobsWaitingForProcess (1) is false, so no demand spawn occurs. The run() loop computes currentPending=4, target=4, toSpawn=0. The 5th job blocks on warmedProcQueue.get() indefinitely until a prior job finishes — a regression from the old MultiMutex approach where entry.unlock() immediately freed a slot.

Prompt for agents
The root cause is that spawnTasks includes tasks for processes currently running jobs (they stay in spawnTasks until proc.join() resolves), so spawnTasks.size conflates initializing processes, idle processes, and running-job processes.

In the old code, the MultiMutex slot was released via entry.unlock() the moment a process was consumed from the queue, immediately allowing a replacement to be spawned. The new code needs an equivalent mechanism.

Possible approaches:
1. Track a separate counter or set for processes that are actively running jobs (post-consumption). Subtract those from currentPending or exclude them from spawnTasks.
2. Remove the spawn task from spawnTasks when its process is consumed from warmedProcQueue, and track the running-job lifecycle separately.
3. Use a different signal (like the queue's consumption event) to trigger replacement spawning, rather than relying on polling with spawnTasks.size.

The fix should also update the demand-driven check in launchJob() (line 69) which has the same overcounting issue: spawnTasks.size < this.jobsWaitingForProcess should compare against tasks that haven't yet produced a consumable process, not all spawn tasks.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

const target = Math.max(
Math.min(this.targetIdleProcesses, this.defaultNumIdleProcesses),
this.jobsWaitingForProcess,
);
const toSpawn = target - currentPending;

for (let i = 0; i < toSpawn; i++) {
const task = this.procWatchTask();
this.spawnTasks.add(task);
task.finally(() => this.spawnTasks.delete(task));
}

await new Promise<void>((resolve) => {
const timeout = setTimeout(resolve, 100);
signal.addEventListener(
'abort',
() => {
clearTimeout(timeout);
resolve();
},
{ once: true },
);
});
}
}

Expand All @@ -169,11 +172,10 @@ export class ProcPool {
}
this.closed = true;
this.controller.abort();
this.warmedProcQueue.items.forEach((e) => {
e.unlock();
e.proc.close();
this.warmedProcQueue.items.forEach((proc) => {
proc.close();
});
this.executors.forEach((e) => e.close());
await ThrowsPromise.allSettled(this.tasks);
await ThrowsPromise.allSettled(Array.from(this.spawnTasks));
}
}
19 changes: 19 additions & 0 deletions agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,25 @@ export class AgentServer {
const currentlyAvailable = !isFull;
currentStatus = currentlyAvailable ? WorkerStatus.WS_AVAILABLE : WorkerStatus.WS_FULL;

if (isFull) {
this.#procPool.setTargetIdleProcesses(this.#opts.numIdleProcesses);
Comment on lines +686 to +687
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 When worker is at full load, target idle processes is set to maximum instead of zero

In worker.ts:686-687, when isFull is true (worker load >= threshold), the code sets targetIdleProcesses to this.#opts.numIdleProcesses (the configured maximum). This is inverted — when the system is overloaded, idle processes should be reduced to 0, not maximized. The else branch correctly computes a lower target as load increases (proportional to remaining capacity), but the isFull branch jumps to max, defeating the purpose of load-based scaling and causing the pool to spawn/maintain idle processes even when the worker is at full capacity, wasting resources and potentially worsening the overload.

Suggested change
if (isFull) {
this.#procPool.setTargetIdleProcesses(this.#opts.numIdleProcesses);
if (isFull) {
this.#procPool.setTargetIdleProcesses(0);
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

} else {
const activeJobs = this.activeJobs.length;
if (activeJobs > 0) {
const jobLoad = currentLoad / activeJobs;
if (jobLoad > 0) {
const availableLoad = Math.max(this.#opts.loadThreshold - currentLoad, 0.0);
const availableJob = Math.min(
Math.ceil(availableLoad / jobLoad),
this.#opts.numIdleProcesses,
);
this.#procPool.setTargetIdleProcesses(availableJob);
}
} else {
this.#procPool.setTargetIdleProcesses(this.#opts.numIdleProcesses);
}
}

if (oldStatus != currentStatus) {
const extra = { load: currentLoad, loadThreshold: this.#opts.loadThreshold };
if (isFull) {
Expand Down