Skip to content

Commit c96a2f8

Browse files
fix(providers): record hosted-stream failures provider-agnostically at the chokepoint (covers gemini)
1 parent ed64de9 commit c96a2f8

3 files changed

Lines changed: 72 additions & 46 deletions

File tree

apps/sim/providers/index.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
uploadLargeFilesToProvider,
1111
} from '@/providers/file-attachments.server'
1212
import { getProviderExecutor } from '@/providers/registry'
13+
import { recordHostedStreamFailure } from '@/providers/streaming-execution'
1314
import type { ProviderId, ProviderRequest, ProviderResponse } from '@/providers/types'
1415
import {
1516
calculateCost,
@@ -229,14 +230,17 @@ export async function executeProviderRequest(
229230
if (isBYOK) {
230231
zeroCostForBYOK(response)
231232
} else if (hostedKeyEnvVar) {
232-
// Hosted key used: record usage now; cost is settled on stream drain via
233-
// settleStreamingLlmCost (from createStreamingExecution, or the provider's
234-
// bespoke stream finalizer for gemini).
235-
hostedKeyMetrics.recordUsed({
236-
provider: providerId,
237-
tool: response.execution.output?.model ?? request.model,
238-
key: hostedKeyEnvVar,
239-
})
233+
// Hosted key used: record usage at dispatch. Cost is settled per-provider on
234+
// successful stream drain (createStreamingExecution, or gemini's bespoke
235+
// finalizer); a mid-stream error records a failure here — provider-agnostic,
236+
// so it covers gemini and any non-wrapper stream too.
237+
const model = response.execution.output?.model ?? request.model
238+
hostedKeyMetrics.recordUsed({ provider: providerId, tool: model, key: hostedKeyEnvVar })
239+
response.stream = recordHostedStreamFailure(
240+
response.stream,
241+
{ provider: providerId, envVar: hostedKeyEnvVar },
242+
model
243+
)
240244
}
241245
return response
242246
}

apps/sim/providers/streaming-execution.test.ts

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ vi.mock('@/lib/api-key/hosted-cost', () => ({
2020
}))
2121

2222
import type { NormalizedBlockOutput } from '@/executor/types'
23-
import { createStreamingExecution } from '@/providers/streaming-execution'
23+
import {
24+
createStreamingExecution,
25+
recordHostedStreamFailure,
26+
} from '@/providers/streaming-execution'
2427

2528
/**
2629
* Builds a fake stream factory mirroring the providers' `createReadableStreamFrom*`
@@ -90,27 +93,19 @@ describe('createStreamingExecution', () => {
9093
expect(mockRecordFailed).not.toHaveBeenCalled()
9194
})
9295

93-
it('records a hosted-key failure (not cost) when the stream errors mid-drain', async () => {
96+
it('recordHostedStreamFailure records a failure (not cost) when the stream errors', async () => {
9497
mockRecordCostCharged.mockClear()
9598
mockRecordFailed.mockClear()
9699
const boom = new Error('upstream 500')
97-
const sourceStream = new ReadableStream({
98-
pull: (c) => c.error(boom),
99-
})
100+
const sourceStream = new ReadableStream({ pull: (c) => c.error(boom) })
100101

101-
const result = createStreamingExecution({
102-
model: 'test-model',
103-
providerStartTime,
104-
providerStartTimeISO,
105-
timing: { kind: 'simple', segmentName: 'test-model' },
106-
initialTokens: { input: 0, output: 0, total: 0 },
107-
initialCost: { input: 0, output: 0, total: 0 },
108-
hostedKey: { provider: 'openai', envVar: 'OPENAI_API_KEY_1' },
109-
cached: false,
110-
createStream: () => sourceStream,
111-
})
102+
const wrapped = recordHostedStreamFailure(
103+
sourceStream,
104+
{ provider: 'openai', envVar: 'OPENAI_API_KEY_1' },
105+
'test-model'
106+
)
112107

113-
const reader = result.stream.getReader()
108+
const reader = wrapped.getReader()
114109
await expect(reader.read()).rejects.toThrow('upstream 500')
115110

116111
// Failure recorded once; no cost charged for a failed stream.
@@ -124,6 +119,20 @@ describe('createStreamingExecution', () => {
124119
expect(mockRecordCostCharged).not.toHaveBeenCalled()
125120
})
126121

122+
it('recordHostedStreamFailure does not record a failure when the stream completes', async () => {
123+
mockRecordFailed.mockClear()
124+
const wrapped = recordHostedStreamFailure(
125+
new ReadableStream({ start: (c) => c.close() }),
126+
{ provider: 'openai', envVar: 'OPENAI_API_KEY_1' },
127+
'test-model'
128+
)
129+
const reader = wrapped.getReader()
130+
while (!(await reader.read()).done) {
131+
// drain
132+
}
133+
expect(mockRecordFailed).not.toHaveBeenCalled()
134+
})
135+
127136
it('assembles the simple (no-tools) shape and finalizes timing on drain', () => {
128137
const drainTime = 5_000
129138
vi.spyOn(Date, 'now').mockReturnValue(drainTime)

apps/sim/providers/streaming-execution.ts

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@ import type { TimeSegment } from '@/providers/types'
66
import { calculateCost } from '@/providers/utils'
77

88
/**
9-
* Passthrough of `source` that runs exactly one terminal callback: `onDrain`
10-
* when it completes normally (cost is settled then), or `onError` when a read
11-
* errors mid-stream (records the hosted-key failure). A client `cancel` is not a
12-
* key failure, so it runs neither. Lets hosted-key cost/failure metrics stay
13-
* symmetric regardless of whether the provider's drain callback finalized timing.
9+
* Passthrough of `source` that runs at most one terminal callback: `onDrain` when
10+
* it completes normally, or `onError` when a read errors mid-stream. A client
11+
* `cancel` runs neither (an abort is not a key failure).
1412
*/
15-
function settleHostedCostOnStreamDrain(
13+
function tapStreamTermination(
1614
source: ReadableStream,
17-
onDrain: () => void,
18-
onError: (error: unknown) => void
15+
callbacks: { onDrain?: () => void; onError?: (error: unknown) => void }
1916
): ReadableStream {
2017
const reader = source.getReader()
2118
let finished = false
@@ -26,7 +23,7 @@ function settleHostedCostOnStreamDrain(
2623
if (done) {
2724
if (!finished) {
2825
finished = true
29-
onDrain()
26+
callbacks.onDrain?.()
3027
}
3128
controller.close()
3229
return
@@ -35,7 +32,7 @@ function settleHostedCostOnStreamDrain(
3532
} catch (error) {
3633
if (!finished) {
3734
finished = true
38-
onError(error)
35+
callbacks.onError?.(error)
3936
}
4037
controller.error(error)
4138
}
@@ -46,6 +43,29 @@ function settleHostedCostOnStreamDrain(
4643
})
4744
}
4845

46+
/**
47+
* Wrap a hosted-key streaming response so a mid-stream read error records a
48+
* hosted-key failure metric. Applied provider-agnostically at the chokepoint
49+
* (`executeProviderRequest`) so it covers every provider — including ones that
50+
* build streams bespoke (gemini) and don't go through {@link createStreamingExecution}.
51+
* Cost on success is settled per-provider; this only handles the failure leg.
52+
*/
53+
export function recordHostedStreamFailure(
54+
source: ReadableStream,
55+
hostedKey: { provider: string; envVar: string },
56+
model: string
57+
): ReadableStream {
58+
return tapStreamTermination(source, {
59+
onError: (error) =>
60+
hostedKeyMetrics.recordFailed({
61+
provider: hostedKey.provider,
62+
tool: model,
63+
key: hostedKey.envVar,
64+
reason: classifyHostedKeyFailure(error),
65+
}),
66+
})
67+
}
68+
4969
/**
5070
* Settle the authoritative streaming LLM cost onto `output.cost` from its final
5171
* tokens (the single cost seam shared with the non-streaming path), and — on the
@@ -253,18 +273,11 @@ export function createStreamingExecution(
253273
// returned stream and settle once the source completes (final tokens are set by
254274
// the provider's drain callback before the stream closes). Recomputes the
255275
// authoritative cost with the multiplier and emits the cost metric exactly once.
276+
// Failure on error is handled provider-agnostically in executeProviderRequest.
256277
const stream = hostedKey
257-
? settleHostedCostOnStreamDrain(
258-
baseStream,
259-
() => settleStreamingLlmCost(output, model, hostedKey, cached ?? false),
260-
(error) =>
261-
hostedKeyMetrics.recordFailed({
262-
provider: hostedKey.provider,
263-
tool: model,
264-
key: hostedKey.envVar,
265-
reason: classifyHostedKeyFailure(error),
266-
})
267-
)
278+
? tapStreamTermination(baseStream, {
279+
onDrain: () => settleStreamingLlmCost(output, model, hostedKey, cached ?? false),
280+
})
268281
: baseStream
269282

270283
return {

0 commit comments

Comments
 (0)