From 7d2aac16f9b833f99e4dc6ea232161ad92a514c6 Mon Sep 17 00:00:00 2001
From: Tomi Jokinen
Date: Sat, 16 May 2026 12:53:24 -0600
Subject: [PATCH] add maxConcurrency option for providers

Add an optional positive-integer `maxConcurrency` setting to the provider
options schema. When it is set, an LLM request in session/llm.ts acquires
one permit from a per-provider semaphore before the request runs, and the
permit is released when the enclosing scope closes. When it is omitted,
no limit is applied.

The semaphore for a provider is created on the first acquire and reused
afterwards, so a different maxConcurrency passed later for the same
provider does not resize it.
---
 packages/opencode/src/config/provider.ts      |   4 +
 packages/opencode/src/provider/concurrency.ts |  36 ++++++
 packages/opencode/src/session/llm.ts          |  15 ++-
 .../test/provider/concurrency.test.ts         | 108 ++++++++++++++++++
 4 files changed, 161 insertions(+), 2 deletions(-)
 create mode 100644 packages/opencode/src/provider/concurrency.ts
 create mode 100644 packages/opencode/test/provider/concurrency.test.ts

diff --git a/packages/opencode/src/config/provider.ts b/packages/opencode/src/config/provider.ts
index 5635512cedf9..05f976be6486 100644
--- a/packages/opencode/src/config/provider.ts
+++ b/packages/opencode/src/config/provider.ts
@@ -100,6 +100,10 @@ export const Info = Schema.Struct({
           description:
             "Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted.",
         }),
+        maxConcurrency: Schema.optional(PositiveInt).annotate({
+          description:
+            "Maximum number of concurrent requests to this provider. Set to 1 to force sequential requests. Omit to disable the limit.",
+        }),
       }),
       [Schema.Record(Schema.String, Schema.Any)],
     ),
diff --git a/packages/opencode/src/provider/concurrency.ts b/packages/opencode/src/provider/concurrency.ts
new file mode 100644
index 000000000000..67c5772fc782
--- /dev/null
+++ b/packages/opencode/src/provider/concurrency.ts
@@ -0,0 +1,36 @@
+import { Context, Effect, Layer, Scope, Semaphore } from "effect"
+import type { ProviderID } from "./schema"
+
+export interface Interface {
+  readonly acquire: (
+    providerID: ProviderID,
+    maxConcurrency: number | undefined,
+  ) => Effect.Effect<void, never, Scope.Scope>
+}
+
+export class Service extends Context.Service<Service, Interface>()("@opencode/ProviderConcurrency") {}
+
+export const defaultLayer = Layer.effect(
+  Service,
+  Effect.gen(function* () {
+    const semaphores = new Map<ProviderID, Semaphore.Semaphore>()
+
+    const get = (providerID: ProviderID, maxConcurrency: number) => {
+      const existing = semaphores.get(providerID)
+      if (existing) return existing
+      const next = Semaphore.makeUnsafe(maxConcurrency)
+      semaphores.set(providerID, next)
+      return next
+    }
+
+    const acquire: Interface["acquire"] = (providerID, maxConcurrency) => {
+      if (maxConcurrency === undefined) return Effect.void
+      const sem = get(providerID, maxConcurrency)
+      return Effect.asVoid(Effect.acquireRelease(sem.take(1), () => sem.release(1)))
+    }
+
+    return Service.of({ acquire })
+  }),
+)
+
+export * as ProviderConcurrency from "./concurrency"
diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts
index 0cf3a2398f9b..ad05ee80b19f 100644
--- a/packages/opencode/src/session/llm.ts
+++ b/packages/opencode/src/session/llm.ts
@@ -1,4 +1,5 @@
 import { Provider } from "@/provider/provider"
+import { ProviderConcurrency } from "@/provider/concurrency"
 import * as Log from "@opencode-ai/core/util/log"
 import { Context, Effect, Layer, Record } from "effect"
 import * as Stream from "effect/Stream"
@@ -62,7 +63,13 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/LL
 const live: Layer.Layer<
   Service,
   never,
-  Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service | RuntimeFlags.Service
+  | Auth.Service
+  | Config.Service
+  | Provider.Service
+  | Plugin.Service
+  | Permission.Service
+  | RuntimeFlags.Service
+  | ProviderConcurrency.Service
 > = Layer.effect(
   Service,
   Effect.gen(function* () {
@@ -72,6 +79,7 @@ const live: Layer.Layer<
     const plugin = yield* Plugin.Service
     const perm = yield* Permission.Service
     const flags = yield* RuntimeFlags.Service
+    const providerConcurrency = yield* ProviderConcurrency.Service
 
     const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
       const l = log
@@ -413,6 +421,9 @@ const live: Layer.Layer<
               (ctrl) => Effect.sync(() => ctrl.abort()),
             )
 
+            const item = yield* provider.getProvider(input.model.providerID)
+            yield* providerConcurrency.acquire(input.model.providerID, item.options?.maxConcurrency)
+
             const result = yield* run({ ...input, abort: ctrl.signal })
 
             return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
@@ -424,7 +435,7 @@ const live: Layer.Layer<
   }),
 )
 
-export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
+export const layer = live.pipe(Layer.provide(Permission.defaultLayer), Layer.provide(ProviderConcurrency.defaultLayer))
 
 export const defaultLayer = Layer.suspend(() =>
   layer.pipe(
diff --git a/packages/opencode/test/provider/concurrency.test.ts b/packages/opencode/test/provider/concurrency.test.ts
new file mode 100644
index 000000000000..1e91cef90a17
--- /dev/null
+++ b/packages/opencode/test/provider/concurrency.test.ts
@@ -0,0 +1,108 @@
+import { describe, expect } from "bun:test"
+import { Effect } from "effect"
+import { ProviderConcurrency } from "@/provider/concurrency"
+import { ProviderID } from "@/provider/schema"
+import { testEffect } from "../lib/effect"
+
+const it = testEffect(ProviderConcurrency.defaultLayer)
+
+describe("provider.concurrency", () => {
+  it.live("serializes acquires for the same provider", () =>
+    Effect.gen(function* () {
+      const svc = yield* ProviderConcurrency.Service
+      const pid = ProviderID.make("featherless")
+
+      let active = 0
+      let max = 0
+
+      const work = Effect.scoped(
+        Effect.gen(function* () {
+          yield* svc.acquire(pid, 1)
+          active++
+          max = Math.max(max, active)
+          yield* Effect.sleep("20 millis")
+          active--
+        }),
+      )
+
+      yield* Effect.all([work, work, work], { concurrency: "unbounded" })
+
+      expect(max).toBe(1)
+    }),
+  )
+
+  it.live("isolates different providers", () =>
+    Effect.gen(function* () {
+      const svc = yield* ProviderConcurrency.Service
+      const a = ProviderID.make("a")
+      const b = ProviderID.make("b")
+
+      let active = 0
+      let max = 0
+
+      const work = (pid: ProviderID) =>
+        Effect.scoped(
+          Effect.gen(function* () {
+            yield* svc.acquire(pid, 1)
+            active++
+            max = Math.max(max, active)
+            yield* Effect.sleep("20 millis")
+            active--
+          }),
+        )
+
+      yield* Effect.all([work(a), work(b)], { concurrency: "unbounded" })
+
+      expect(max).toBe(2)
+    }),
+  )
+
+  it.live("undefined maxConcurrency does not block", () =>
+    Effect.gen(function* () {
+      const svc = yield* ProviderConcurrency.Service
+      const pid = ProviderID.make("nolimit")
+
+      let active = 0
+      let max = 0
+
+      const work = Effect.scoped(
+        Effect.gen(function* () {
+          yield* svc.acquire(pid, undefined)
+          active++
+          max = Math.max(max, active)
+          yield* Effect.sleep("20 millis")
+          active--
+        }),
+      )
+
+      yield* Effect.all([work, work, work], { concurrency: "unbounded" })
+
+      expect(max).toBe(3)
+    }),
+  )
+
+  it.live("releases the permit when the scope closes", () =>
+    Effect.gen(function* () {
+      const svc = yield* ProviderConcurrency.Service
+      const pid = ProviderID.make("seq")
+
+      const order: string[] = []
+
+      const work = (name: string) =>
+        Effect.scoped(
+          Effect.gen(function* () {
+            yield* svc.acquire(pid, 1)
+            order.push(`${name}:start`)
+            yield* Effect.sleep("10 millis")
+            order.push(`${name}:end`)
+          }),
+        )
+
+      yield* Effect.all([work("A"), work("B")], { concurrency: "unbounded" })
+
+      const firstStart = order[0].split(":")[0]
+      const firstEnd = order[1].split(":")[0]
+      expect(firstStart).toBe(firstEnd)
+    }),
+  )
+})