diff --git a/packages/opencode/src/config/provider.ts b/packages/opencode/src/config/provider.ts index 5635512cedf9..8b4cbfb6518a 100644 --- a/packages/opencode/src/config/provider.ts +++ b/packages/opencode/src/config/provider.ts @@ -66,6 +66,14 @@ export const Model = Schema.Struct({ ), ).annotate({ description: "Variant-specific configuration" }), ), + maxConcurrency: Schema.optional(PositiveInt).annotate({ + description: + "Maximum number of concurrent requests for this specific model. When set, uses an independent semaphore and overrides the provider-level maxConcurrency for this model.", + }), + concurrencyCost: Schema.optional(PositiveInt).annotate({ + description: + "Number of concurrency permits this model consumes per request against the provider-level maxConcurrency budget. Defaults to 1. Useful for providers like Featherless where larger models cost more units.", + }), }) export const Info = Schema.Struct({ @@ -100,6 +108,10 @@ export const Info = Schema.Struct({ description: "Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted.", }), + maxConcurrency: Schema.optional(PositiveInt).annotate({ + description: + "Maximum number of concurrent requests to this provider. Set to 1 to force sequential requests. Omit to disable the limit.", + }), }), [Schema.Record(Schema.String, Schema.Any)], ), diff --git a/packages/opencode/src/provider/concurrency.ts b/packages/opencode/src/provider/concurrency.ts new file mode 100644 index 000000000000..b0bbd37f2e86 --- /dev/null +++ b/packages/opencode/src/provider/concurrency.ts @@ -0,0 +1,76 @@ +import { Context, Effect, Layer, Scope, Semaphore } from "effect" +import * as Log from "@opencode-ai/core/util/log" +import type { ProviderID } from "./schema" + +const log = Log.create({ service: "provider.concurrency" }) + +export interface AcquireOptions { + readonly providerID: ProviderID + readonly modelID: string + readonly providerMaxConcurrency: number | undefined + readonly modelMaxConcurrency: number | undefined + readonly concurrencyCost: number | undefined +} + +export interface Interface { + readonly acquire: (options: AcquireOptions) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/ProviderConcurrency") {} + +export const defaultLayer = Layer.effect( + Service, + Effect.gen(function* () { + const semaphores = new Map() + const warnedOverBudget = new Set() + + const get = (key: string, capacity: number) => { + const existing = semaphores.get(key) + if (existing) return existing + const next = Semaphore.makeUnsafe(capacity) + semaphores.set(key, next) + return next + } + + const acquire: Interface["acquire"] = ({ + providerID, + modelID, + providerMaxConcurrency, + modelMaxConcurrency, + concurrencyCost, + }) => { + const steps: Effect.Effect[] = [] + // Model permit first: it's cheap (always 1) and held against a smaller queue, + // so we don't hog provider permits while waiting on a per-model cap. + if (modelMaxConcurrency !== undefined) { + const sem = get(`${providerID}:${modelID}`, modelMaxConcurrency) + steps.push(Effect.asVoid(Effect.acquireRelease(sem.take(1), () => sem.release(1)))) + } + if (providerMaxConcurrency !== undefined) { + const requestedCost = concurrencyCost ?? 1 + const cost = Math.min(requestedCost, providerMaxConcurrency) + // Clamp rather than block forever; warn once so the misconfig is visible. + if (requestedCost > providerMaxConcurrency) { + const key = `${providerID}:${modelID}` + if (!warnedOverBudget.has(key)) { + warnedOverBudget.add(key) + log.warn("model concurrencyCost exceeds provider maxConcurrency; clamping to budget", { + providerID, + modelID, + concurrencyCost: requestedCost, + providerMaxConcurrency, + }) + } + } + const sem = get(providerID, providerMaxConcurrency) + steps.push(Effect.asVoid(Effect.acquireRelease(sem.take(cost), () => sem.release(cost)))) + } + if (steps.length === 0) return Effect.void + return Effect.all(steps, { discard: true }) + } + + return Service.of({ acquire }) + }), +) + +export * as ProviderConcurrency from "./concurrency" diff --git a/packages/opencode/src/provider/provider.ts b/packages/opencode/src/provider/provider.ts index 2a778fd64456..4cbaeec83ab3 100644 --- a/packages/opencode/src/provider/provider.ts +++ b/packages/opencode/src/provider/provider.ts @@ -921,6 +921,8 @@ export const Model = Schema.Struct({ headers: Schema.Record(Schema.String, Schema.String), release_date: Schema.String, variants: optionalOmitUndefined(Schema.Record(Schema.String, Schema.Record(Schema.String, Schema.Any))), + maxConcurrency: optionalOmitUndefined(Schema.Number), + concurrencyCost: optionalOmitUndefined(Schema.Number), }).annotate({ identifier: "Model" }) export type Model = Types.DeepMutable> @@ -1338,6 +1340,8 @@ export const layer = Layer.effect( family: model.family ?? existingModel?.family ?? "", release_date: model.release_date ?? existingModel?.release_date ?? "", variants: {}, + maxConcurrency: model.maxConcurrency ?? existingModel?.maxConcurrency, + concurrencyCost: model.concurrencyCost ?? existingModel?.concurrencyCost, } const merged = mergeDeep(ProviderTransform.variants(parsedModel), model.variants ?? {}) parsedModel.variants = mapValues( diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index a98daecedc50..a902fcdd5291 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -1,4 +1,5 @@ import { Provider } from "@/provider/provider" +import { ProviderConcurrency } from "@/provider/concurrency" import * as Log from "@opencode-ai/core/util/log" import { Context, Effect, Layer, Record } from "effect" import * as Stream from "effect/Stream" @@ -71,6 +72,7 @@ const live: Layer.Layer< | Permission.Service | LLMClientService | RuntimeFlags.Service + | ProviderConcurrency.Service > = Layer.effect( Service, Effect.gen(function* () { @@ -81,6 +83,7 @@ const live: Layer.Layer< const perm = yield* Permission.Service const llmClient = yield* LLMClient.Service const flags = yield* RuntimeFlags.Service + const providerConcurrency = yield* ProviderConcurrency.Service const run = Effect.fn("LLM.run")(function* (input: StreamRequest) { const l = log @@ -477,6 +480,15 @@ const live: Layer.Layer< (ctrl) => Effect.sync(() => ctrl.abort()), ) + const item = yield* provider.getProvider(input.model.providerID) + yield* providerConcurrency.acquire({ + providerID: input.model.providerID, + modelID: input.model.id, + providerMaxConcurrency: item.options?.maxConcurrency, + modelMaxConcurrency: input.model.maxConcurrency, + concurrencyCost: input.model.concurrencyCost, + }) + const result = yield* run({ ...input, abort: ctrl.signal }) if (result.type === "native") return result.stream @@ -496,7 +508,7 @@ const live: Layer.Layer< }), ) -export const layer = live.pipe(Layer.provide(Permission.defaultLayer)) +export const layer = live.pipe(Layer.provide(Permission.defaultLayer), Layer.provide(ProviderConcurrency.defaultLayer)) export const defaultLayer = Layer.suspend(() => layer.pipe( diff --git a/packages/opencode/test/provider/concurrency.test.ts b/packages/opencode/test/provider/concurrency.test.ts new file mode 100644 index 000000000000..5d817db38a37 --- /dev/null +++ b/packages/opencode/test/provider/concurrency.test.ts @@ -0,0 +1,258 @@ +import { describe, expect } from "bun:test" +import { Effect } from "effect" +import { ProviderConcurrency } from "@/provider/concurrency" +import { ProviderID } from "@/provider/schema" +import { testEffect } from "../lib/effect" + +const it = testEffect(ProviderConcurrency.defaultLayer) + +const opts = (overrides: { + providerID: ProviderID + modelID?: string + providerMaxConcurrency?: number + modelMaxConcurrency?: number + concurrencyCost?: number +}) => ({ + providerID: overrides.providerID, + modelID: overrides.modelID ?? "m", + providerMaxConcurrency: overrides.providerMaxConcurrency, + modelMaxConcurrency: overrides.modelMaxConcurrency, + concurrencyCost: overrides.concurrencyCost, +}) + +describe("provider.concurrency", () => { + it.live("serializes acquires for the same provider", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("featherless") + + let active = 0 + let max = 0 + + const work = Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid, providerMaxConcurrency: 1 })) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work, work, work], { concurrency: "unbounded" }) + + expect(max).toBe(1) + }), + ) + + it.live("isolates different providers", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const a = ProviderID.make("a") + const b = ProviderID.make("b") + + let active = 0 + let max = 0 + + const work = (pid: ProviderID) => + Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid, providerMaxConcurrency: 1 })) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work(a), work(b)], { concurrency: "unbounded" }) + + expect(max).toBe(2) + }), + ) + + it.live("no limits configured does not block", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("nolimit") + + let active = 0 + let max = 0 + + const work = Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid })) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work, work, work], { concurrency: "unbounded" }) + + expect(max).toBe(3) + }), + ) + + it.live("releases the permit when the scope closes", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("seq") + + const order: string[] = [] + + const work = (name: string) => + Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid, providerMaxConcurrency: 1 })) + order.push(`${name}:start`) + yield* Effect.sleep("10 millis") + order.push(`${name}:end`) + }), + ) + + yield* Effect.all([work("A"), work("B")], { concurrency: "unbounded" }) + + const firstStart = order[0].split(":")[0] + const firstEnd = order[1].split(":")[0] + expect(firstStart).toBe(firstEnd) + }), + ) + + it.live("weighted cost limits concurrency against the provider budget", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("featherless-weighted") + + let active = 0 + let max = 0 + + const work = Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid, providerMaxConcurrency: 4, concurrencyCost: 4 })) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work, work, work], { concurrency: "unbounded" }) + + expect(max).toBe(1) + }), + ) + + it.live("models with different costs share the provider budget", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("featherless-mixed") + + let active = 0 + let max = 0 + + const work = (modelID: string, cost: number) => + Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid, modelID, providerMaxConcurrency: 4, concurrencyCost: cost })) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work("big", 4), work("small", 1), work("small", 1)], { concurrency: "unbounded" }) + + expect(max).toBe(2) + }), + ) + + it.live("per-model maxConcurrency uses an independent semaphore per model", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("glm") + + let active = 0 + let max = 0 + + const work = (modelID: string) => + Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire(opts({ providerID: pid, modelID, modelMaxConcurrency: 1 })) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work("glm-air"), work("glm-plus")], { concurrency: "unbounded" }) + + expect(max).toBe(2) + }), + ) + + it.live("model and provider caps both apply when both are configured", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("stacked") + + let active = 0 + let max = 0 + + // Provider budget allows 4 in parallel, but per-model cap is 1: the tighter cap wins. + const work = Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire( + opts({ providerID: pid, modelID: "m", providerMaxConcurrency: 4, modelMaxConcurrency: 1 }), + ) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work, work, work], { concurrency: "unbounded" }) + + expect(max).toBe(1) + }), + ) + + it.live("stacked caps still count weighted cost against the provider budget", () => + Effect.gen(function* () { + const svc = yield* ProviderConcurrency.Service + const pid = ProviderID.make("stacked-weighted") + + let active = 0 + let max = 0 + + // Two different models, each capped at 1 individually, but each costs 4 against a 4-unit provider budget. + // Provider budget should still serialize them across models. + const work = (modelID: string) => + Effect.scoped( + Effect.gen(function* () { + yield* svc.acquire( + opts({ + providerID: pid, + modelID, + providerMaxConcurrency: 4, + modelMaxConcurrency: 1, + concurrencyCost: 4, + }), + ) + active++ + max = Math.max(max, active) + yield* Effect.sleep("20 millis") + active-- + }), + ) + + yield* Effect.all([work("kimi"), work("deepseek")], { concurrency: "unbounded" }) + + expect(max).toBe(1) + }), + ) +})