From 72c0fd2775e23d1009e4a15f4519506ce42c45b4 Mon Sep 17 00:00:00 2001 From: Sirn Thanabulpong Date: Wed, 20 May 2026 12:54:38 +0900 Subject: [PATCH 1/2] fix(dispatcher): move enforceContextLimit before concurrency acquire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-dispatch context limit enforcement now runs before the concurrency slot is acquired. Previously, if enforceContextLimit threw a ContextLengthExceededError, the acquired slot was never released — leaking exactly one slot per oversized request until the provider deadlocked at maxConcurrency. --- .../__tests__/dispatcher-concurrency.test.ts | 44 +++++++++++++++++++ packages/backend/src/services/dispatcher.ts | 23 +++++----- 2 files changed, 56 insertions(+), 11 deletions(-) diff --git a/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts b/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts index 0eb1a7e0..bb91a1fb 100644 --- a/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts +++ b/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts @@ -317,4 +317,48 @@ describe('Dispatcher concurrency slot release', () => { // Verify no slots are leaked after a normal non-streaming dispatch expect(tracker.getProviderCount('p1')).toBe(0); }); + + test('enforceContextLimit does not leak concurrency slot', async () => { + setConfigForTesting({ + ...makeConfig(2), + models: { + 'test-alias': { + selector: 'in_order', + target_groups: [ + { + name: 'default', + selector: 'in_order', + targets: [{ provider: 'p1', model: 'model-a', enabled: true }], + }, + ], + }, + // model-a config with enforce_limits enabled and a tiny context window + 'model-a': { + enforce_limits: true, + context_length: 100, + }, + }, + } as any); + + const tracker = ConcurrencyTracker.getInstance(); + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + + const dispatcher = new Dispatcher(); + try { + // Request with messages that exceed the tiny 100-token context limit + await dispatcher.dispatch({ + model: 'test-alias', + messages: [{ role: 'user', content: 'x'.repeat(1000) }], + incomingApiType: 'chat', + stream: false, + }); + } catch (e: any) { + // Error is expected (may be wrapped by buildAllTargetsFailedError) + expect(e).toBeDefined(); + } + + // The slot must NOT be leaked — enforceContextLimit ran before acquire() + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + expect(tracker.getProviderCount('p1')).toBe(0); + }); }); diff --git a/packages/backend/src/services/dispatcher.ts b/packages/backend/src/services/dispatcher.ts index 408279c3..4312eb76 100644 --- a/packages/backend/src/services/dispatcher.ts +++ b/packages/backend/src/services/dispatcher.ts @@ -357,6 +357,18 @@ export class Dispatcher { continue; } + // Pre-dispatch context limit enforcement (opt-in per alias). Runs on + // the finalized per-target request — after any vision fallthrough has + // expanded the prompt and after cooldown has selected a live target — + // so we reject oversized prompts locally with a 400 instead of + // burning an upstream round trip on a guaranteed failure. Checked + // BEFORE acquiring a concurrency slot so that a thrown + // ContextLengthExceededError (a client-side problem; failing over to + // another target won't help) never leaks an acquired slot. + if (aliasConfig?.enforce_limits && route.canonicalModel) { + enforceContextLimit(currentRequest, aliasConfig, route.canonicalModel); + } + // Acquire concurrency slot before upstream request const acquired = ConcurrencyTracker.getInstance().acquire(route.provider, route.model); if (!acquired) { @@ -372,17 +384,6 @@ export class Dispatcher { continue; } - // Pre-dispatch context limit enforcement (opt-in per alias). Runs on - // the finalized per-target request — after any vision fallthrough has - // expanded the prompt and after cooldown has selected a live target — - // so we reject oversized prompts locally with a 400 instead of - // burning an upstream round trip on a guaranteed failure. A thrown - // ContextLengthExceededError escapes the loop (it's a client-side - // problem; failing over to another target won't help). - if (aliasConfig?.enforce_limits && route.canonicalModel) { - enforceContextLimit(currentRequest, aliasConfig, route.canonicalModel); - } - attemptedProviders.push(`${route.provider}/${route.model}`); let released = false; From f481101729174899be50a986cc4fca67796eb9a5 Mon Sep 17 00:00:00 2001 From: Sirn Thanabulpong Date: Wed, 20 May 2026 18:18:24 +0900 Subject: [PATCH 2/2] fix(dispatcher): release concurrency slot on TTFB stall timeout When the TTFB stall detection aborts the fetch request, the catch block either continues the failover loop or throws the stall error without calling doRelease(). This leaks the concurrency slot permanently. Each stalled request increments the count by one, eventually deadlocking the provider at maxConcurrency. --- .../__tests__/dispatcher-concurrency.test.ts | 39 +++++++++++++++++++ packages/backend/src/services/dispatcher.ts | 3 +- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts b/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts index bb91a1fb..fa02ca22 100644 --- a/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts +++ b/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts @@ -361,4 +361,43 @@ describe('Dispatcher concurrency slot release', () => { expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); expect(tracker.getProviderCount('p1')).toBe(0); }); + + test('TTFB stall timeout releases concurrency slot', async () => { + // Enable stall detection with a very short TTFB timeout so the + // fetch itself gets aborted before the provider responds. + setConfigForTesting({ + ...makeConfig(2), + stall: { ttfbSeconds: 0.05, ttfbBytes: 100 }, + } as any); + + // Simulate a slow provider that respects abort signals. + // The stall timeout will abort before this resolves. + fetchMock.mockImplementation(async (_url: string, opts: any) => { + return new Promise((_resolve, reject) => { + const timer = setTimeout(() => { + _resolve(streamingResponse()); + }, 500); + opts?.signal?.addEventListener('abort', () => { + clearTimeout(timer); + reject(new DOMException('The operation was aborted', 'AbortError')); + }); + }); + }); + + const tracker = ConcurrencyTracker.getInstance(); + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + + const dispatcher = new Dispatcher(); + try { + await dispatcher.dispatch(makeChatRequest(true)); + } catch { + // Expected — stall timeout error propagates + } + + // The slot must be released after the TTFB stall abort + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + expect(tracker.getProviderCount('p1')).toBe(0); + expect(tracker.getTargetCount('p2', 'model-b')).toBe(0); + expect(tracker.getProviderCount('p2')).toBe(0); + }); }); diff --git a/packages/backend/src/services/dispatcher.ts b/packages/backend/src/services/dispatcher.ts index 4312eb76..21125b4c 100644 --- a/packages/backend/src/services/dispatcher.ts +++ b/packages/backend/src/services/dispatcher.ts @@ -642,9 +642,10 @@ export class Dispatcher { logger.info( `TTFB stall: fetch timed out after ${ttfbMs}ms for ${route.provider}/${route.model}, retrying with next provider` ); + doRelease(); continue; } - + doRelease(); throw stallError; } throw fetchError;