Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/opencode/src/config/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ export const Info = Schema.Struct({
description:
"Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted.",
}),
maxConcurrency: Schema.optional(PositiveInt).annotate({
description:
"Maximum number of concurrent requests to this provider. Set to 1 to force sequential requests. Omit to disable the limit.",
}),
}),
[Schema.Record(Schema.String, Schema.Any)],
),
Expand Down
36 changes: 36 additions & 0 deletions packages/opencode/src/provider/concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Context, Effect, Layer, Scope, Semaphore } from "effect"
import type { ProviderID } from "./schema"

export interface Interface {
readonly acquire: (
providerID: ProviderID,
maxConcurrency: number | undefined,
) => Effect.Effect<void, never, Scope.Scope>
}

export class Service extends Context.Service<Service, Interface>()("@opencode/ProviderConcurrency") {}

export const defaultLayer = Layer.effect(
Service,
Effect.gen(function* () {
const semaphores = new Map<ProviderID, Semaphore.Semaphore>()

const get = (providerID: ProviderID, maxConcurrency: number) => {
const existing = semaphores.get(providerID)
if (existing) return existing
const next = Semaphore.makeUnsafe(maxConcurrency)
semaphores.set(providerID, next)
return next
}

const acquire: Interface["acquire"] = (providerID, maxConcurrency) => {
if (maxConcurrency === undefined) return Effect.void
const sem = get(providerID, maxConcurrency)
return Effect.asVoid(Effect.acquireRelease(sem.take(1), () => sem.release(1)))
}

return Service.of({ acquire })
}),
)

export * as ProviderConcurrency from "./concurrency"
15 changes: 13 additions & 2 deletions packages/opencode/src/session/llm.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Provider } from "@/provider/provider"
import { ProviderConcurrency } from "@/provider/concurrency"
import * as Log from "@opencode-ai/core/util/log"
import { Context, Effect, Layer, Record } from "effect"
import * as Stream from "effect/Stream"
Expand Down Expand Up @@ -62,7 +63,13 @@ export class Service extends Context.Service<Service, Interface>()("@opencode/LL
const live: Layer.Layer<
Service,
never,
Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service | RuntimeFlags.Service
| Auth.Service
| Config.Service
| Provider.Service
| Plugin.Service
| Permission.Service
| RuntimeFlags.Service
| ProviderConcurrency.Service
> = Layer.effect(
Service,
Effect.gen(function* () {
Expand All @@ -72,6 +79,7 @@ const live: Layer.Layer<
const plugin = yield* Plugin.Service
const perm = yield* Permission.Service
const flags = yield* RuntimeFlags.Service
const providerConcurrency = yield* ProviderConcurrency.Service

const run = Effect.fn("LLM.run")(function* (input: StreamRequest) {
const l = log
Expand Down Expand Up @@ -413,6 +421,9 @@ const live: Layer.Layer<
(ctrl) => Effect.sync(() => ctrl.abort()),
)

const item = yield* provider.getProvider(input.model.providerID)
yield* providerConcurrency.acquire(input.model.providerID, item.options?.maxConcurrency)

const result = yield* run({ ...input, abort: ctrl.signal })

return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? e : new Error(String(e))))
Expand All @@ -424,7 +435,7 @@ const live: Layer.Layer<
}),
)

export const layer = live.pipe(Layer.provide(Permission.defaultLayer))
export const layer = live.pipe(Layer.provide(Permission.defaultLayer), Layer.provide(ProviderConcurrency.defaultLayer))

export const defaultLayer = Layer.suspend(() =>
layer.pipe(
Expand Down
108 changes: 108 additions & 0 deletions packages/opencode/test/provider/concurrency.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { describe, expect } from "bun:test"
import { Effect } from "effect"
import { ProviderConcurrency } from "@/provider/concurrency"
import { ProviderID } from "@/provider/schema"
import { testEffect } from "../lib/effect"

const it = testEffect(ProviderConcurrency.defaultLayer)

describe("provider.concurrency", () => {
it.live("serializes acquires for the same provider", () =>
Effect.gen(function* () {
const svc = yield* ProviderConcurrency.Service
const pid = ProviderID.make("featherless")

let active = 0
let max = 0

const work = Effect.scoped(
Effect.gen(function* () {
yield* svc.acquire(pid, 1)
active++
max = Math.max(max, active)
yield* Effect.sleep("20 millis")
active--
}),
)

yield* Effect.all([work, work, work], { concurrency: "unbounded" })

expect(max).toBe(1)
}),
)

it.live("isolates different providers", () =>
Effect.gen(function* () {
const svc = yield* ProviderConcurrency.Service
const a = ProviderID.make("a")
const b = ProviderID.make("b")

let active = 0
let max = 0

const work = (pid: ProviderID) =>
Effect.scoped(
Effect.gen(function* () {
yield* svc.acquire(pid, 1)
active++
max = Math.max(max, active)
yield* Effect.sleep("20 millis")
active--
}),
)

yield* Effect.all([work(a), work(b)], { concurrency: "unbounded" })

expect(max).toBe(2)
}),
)

it.live("undefined maxConcurrency does not block", () =>
Effect.gen(function* () {
const svc = yield* ProviderConcurrency.Service
const pid = ProviderID.make("nolimit")

let active = 0
let max = 0

const work = Effect.scoped(
Effect.gen(function* () {
yield* svc.acquire(pid, undefined)
active++
max = Math.max(max, active)
yield* Effect.sleep("20 millis")
active--
}),
)

yield* Effect.all([work, work, work], { concurrency: "unbounded" })

expect(max).toBe(3)
}),
)

it.live("releases the permit when the scope closes", () =>
Effect.gen(function* () {
const svc = yield* ProviderConcurrency.Service
const pid = ProviderID.make("seq")

const order: string[] = []

const work = (name: string) =>
Effect.scoped(
Effect.gen(function* () {
yield* svc.acquire(pid, 1)
order.push(`${name}:start`)
yield* Effect.sleep("10 millis")
order.push(`${name}:end`)
}),
)

yield* Effect.all([work("A"), work("B")], { concurrency: "unbounded" })

const firstStart = order[0].split(":")[0]
const firstEnd = order[1].split(":")[0]
expect(firstStart).toBe(firstEnd)
}),
)
})
Loading