diff --git a/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts b/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts index 0eb1a7e0..fa02ca22 100644 --- a/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts +++ b/packages/backend/src/services/__tests__/dispatcher-concurrency.test.ts @@ -317,4 +317,87 @@ describe('Dispatcher concurrency slot release', () => { // Verify no slots are leaked after a normal non-streaming dispatch expect(tracker.getProviderCount('p1')).toBe(0); }); + + test('enforceContextLimit does not leak concurrency slot', async () => { + setConfigForTesting({ + ...makeConfig(2), + models: { + 'test-alias': { + selector: 'in_order', + target_groups: [ + { + name: 'default', + selector: 'in_order', + targets: [{ provider: 'p1', model: 'model-a', enabled: true }], + }, + ], + }, + // model-a config with enforce_limits enabled and a tiny context window + 'model-a': { + enforce_limits: true, + context_length: 100, + }, + }, + } as any); + + const tracker = ConcurrencyTracker.getInstance(); + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + + const dispatcher = new Dispatcher(); + try { + // Request with messages that exceed the tiny 100-token context limit + await dispatcher.dispatch({ + model: 'test-alias', + messages: [{ role: 'user', content: 'x'.repeat(1000) }], + incomingApiType: 'chat', + stream: false, + }); + } catch (e: any) { + // Error is expected (may be wrapped by buildAllTargetsFailedError) + expect(e).toBeDefined(); + } + + // The slot must NOT be leaked — enforceContextLimit ran before acquire() + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + expect(tracker.getProviderCount('p1')).toBe(0); + }); + + test('TTFB stall timeout releases concurrency slot', async () => { + // Enable stall detection with a very short TTFB timeout so the + // fetch itself gets aborted before the provider responds. + setConfigForTesting({ + ...makeConfig(2), + stall: { ttfbSeconds: 0.05, ttfbBytes: 100 }, + } as any); + + // Simulate a slow provider that respects abort signals. + // The stall timeout will abort before this resolves. + fetchMock.mockImplementation(async (_url: string, opts: any) => { + return new Promise((_resolve, reject) => { + const timer = setTimeout(() => { + _resolve(streamingResponse()); + }, 500); + opts?.signal?.addEventListener('abort', () => { + clearTimeout(timer); + reject(new DOMException('The operation was aborted', 'AbortError')); + }); + }); + }); + + const tracker = ConcurrencyTracker.getInstance(); + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + + const dispatcher = new Dispatcher(); + try { + await dispatcher.dispatch(makeChatRequest(true)); + } catch { + // Expected — stall timeout error propagates + } + + // The slot must be released after the TTFB stall abort + expect(tracker.getTargetCount('p1', 'model-a')).toBe(0); + expect(tracker.getProviderCount('p1')).toBe(0); + expect(tracker.getTargetCount('p2', 'model-b')).toBe(0); + expect(tracker.getProviderCount('p2')).toBe(0); + }); }); diff --git a/packages/backend/src/services/dispatcher.ts b/packages/backend/src/services/dispatcher.ts index 408279c3..21125b4c 100644 --- a/packages/backend/src/services/dispatcher.ts +++ b/packages/backend/src/services/dispatcher.ts @@ -357,6 +357,18 @@ export class Dispatcher { continue; } + // Pre-dispatch context limit enforcement (opt-in per alias). Runs on + // the finalized per-target request — after any vision fallthrough has + // expanded the prompt and after cooldown has selected a live target — + // so we reject oversized prompts locally with a 400 instead of + // burning an upstream round trip on a guaranteed failure. Checked + // BEFORE acquiring a concurrency slot so that a thrown + // ContextLengthExceededError (a client-side problem; failing over to + // another target won't help) never leaks an acquired slot. + if (aliasConfig?.enforce_limits && route.canonicalModel) { + enforceContextLimit(currentRequest, aliasConfig, route.canonicalModel); + } + // Acquire concurrency slot before upstream request const acquired = ConcurrencyTracker.getInstance().acquire(route.provider, route.model); if (!acquired) { @@ -372,17 +384,6 @@ export class Dispatcher { continue; } - // Pre-dispatch context limit enforcement (opt-in per alias). Runs on - // the finalized per-target request — after any vision fallthrough has - // expanded the prompt and after cooldown has selected a live target — - // so we reject oversized prompts locally with a 400 instead of - // burning an upstream round trip on a guaranteed failure. A thrown - // ContextLengthExceededError escapes the loop (it's a client-side - // problem; failing over to another target won't help). - if (aliasConfig?.enforce_limits && route.canonicalModel) { - enforceContextLimit(currentRequest, aliasConfig, route.canonicalModel); - } - attemptedProviders.push(`${route.provider}/${route.model}`); let released = false; @@ -641,9 +642,10 @@ export class Dispatcher { logger.info( `TTFB stall: fetch timed out after ${ttfbMs}ms for ${route.provider}/${route.model}, retrying with next provider` ); + doRelease(); continue; } - + doRelease(); throw stallError; } throw fetchError;