Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,87 @@ describe('Dispatcher concurrency slot release', () => {
// Verify no slots are leaked after a normal non-streaming dispatch
expect(tracker.getProviderCount('p1')).toBe(0);
});

test('enforceContextLimit does not leak concurrency slot', async () => {
  // Regression test: a request rejected by the pre-dispatch context-limit
  // check must leave the ConcurrencyTracker counters for its target at zero.
  setConfigForTesting({
    ...makeConfig(2),
    models: {
      'test-alias': {
        selector: 'in_order',
        target_groups: [
          {
            name: 'default',
            selector: 'in_order',
            targets: [{ provider: 'p1', model: 'model-a', enabled: true }],
          },
        ],
      },
      // model-a config with enforce_limits enabled and a tiny context window.
      // NOTE(review): the dispatcher gates the check on
      // `aliasConfig?.enforce_limits`; confirm that aliasConfig resolves to
      // this entry (and not to 'test-alias') for the request below.
      'model-a': {
        enforce_limits: true,
        context_length: 100,
      },
    },
  } as any);

  const tracker = ConcurrencyTracker.getInstance();
  // Precondition: nothing is holding a slot before the dispatch starts.
  expect(tracker.getTargetCount('p1', 'model-a')).toBe(0);

  const dispatcher = new Dispatcher();
  let dispatchError: unknown;
  try {
    // Request with messages that exceed the tiny 100-token context limit
    await dispatcher.dispatch({
      model: 'test-alias',
      messages: [{ role: 'user', content: 'x'.repeat(1000) }],
      incomingApiType: 'chat',
      stream: false,
    });
  } catch (e: unknown) {
    // Error is expected (may be wrapped by buildAllTargetsFailedError)
    dispatchError = e;
  }

  // Assert outside the catch block: if dispatch() resolved normally the
  // rejection path was never exercised and the slot checks below would pass
  // trivially, so the test must fail instead of silently succeeding.
  expect(dispatchError).toBeDefined();

  // The slot must NOT be leaked — enforceContextLimit ran before acquire()
  expect(tracker.getTargetCount('p1', 'model-a')).toBe(0);
  expect(tracker.getProviderCount('p1')).toBe(0);
});

test('TTFB stall timeout releases concurrency slot', async () => {
  // Turn stall detection on with a very small TTFB budget so the fetch is
  // aborted long before the fake provider would answer.
  setConfigForTesting({
    ...makeConfig(2),
    stall: { ttfbSeconds: 0.05, ttfbBytes: 100 },
  } as any);

  // Fake provider: answers only after `responseDelayMs`, but rejects with an
  // AbortError as soon as the request's abort signal fires.
  const responseDelayMs = 500;
  fetchMock.mockImplementation(async (_url: string, opts: any) => {
    return new Promise((resolve, reject) => {
      const pendingResponse = setTimeout(
        () => resolve(streamingResponse()),
        responseDelayMs
      );
      opts?.signal?.addEventListener('abort', () => {
        // Cancel the delayed response so it cannot settle after the abort.
        clearTimeout(pendingResponse);
        reject(new DOMException('The operation was aborted', 'AbortError'));
      });
    });
  });

  const slots = ConcurrencyTracker.getInstance();
  // Precondition: no slot is held for the first target.
  expect(slots.getTargetCount('p1', 'model-a')).toBe(0);

  const dispatcher = new Dispatcher();
  try {
    await dispatcher.dispatch(makeChatRequest(true));
  } catch {
    // Expected — stall timeout error propagates
  }

  // After the TTFB stall abort, every target and provider counter must be
  // back at zero.
  const targets: Array<[string, string]> = [
    ['p1', 'model-a'],
    ['p2', 'model-b'],
  ];
  for (const [provider, model] of targets) {
    expect(slots.getTargetCount(provider, model)).toBe(0);
    expect(slots.getProviderCount(provider)).toBe(0);
  }
});
});
26 changes: 14 additions & 12 deletions packages/backend/src/services/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,18 @@ export class Dispatcher {
continue;
}

// Pre-dispatch context limit enforcement (opt-in per alias). Runs on
// the finalized per-target request — after any vision fallthrough has
// expanded the prompt and after cooldown has selected a live target —
// so we reject oversized prompts locally with a 400 instead of
// burning an upstream round trip on a guaranteed failure. Checked
// BEFORE acquiring a concurrency slot so that a thrown
// ContextLengthExceededError (a client-side problem; failing over to
// another target won't help) never leaks an acquired slot.
if (aliasConfig?.enforce_limits && route.canonicalModel) {
enforceContextLimit(currentRequest, aliasConfig, route.canonicalModel);
}

// Acquire concurrency slot before upstream request
const acquired = ConcurrencyTracker.getInstance().acquire(route.provider, route.model);
if (!acquired) {
Expand All @@ -372,17 +384,6 @@ export class Dispatcher {
continue;
}

// Pre-dispatch context limit enforcement (opt-in per alias). Runs on
// the finalized per-target request — after any vision fallthrough has
// expanded the prompt and after cooldown has selected a live target —
// so we reject oversized prompts locally with a 400 instead of
// burning an upstream round trip on a guaranteed failure. A thrown
// ContextLengthExceededError escapes the loop (it's a client-side
// problem; failing over to another target won't help).
if (aliasConfig?.enforce_limits && route.canonicalModel) {
enforceContextLimit(currentRequest, aliasConfig, route.canonicalModel);
}

attemptedProviders.push(`${route.provider}/${route.model}`);

let released = false;
Expand Down Expand Up @@ -641,9 +642,10 @@ export class Dispatcher {
logger.info(
`TTFB stall: fetch timed out after ${ttfbMs}ms for ${route.provider}/${route.model}, retrying with next provider`
);
doRelease();
continue;
}

doRelease();
throw stallError;
}
throw fetchError;
Expand Down