Skip to content

Commit ed64de9

Browse files
fix(providers): record hosted-key failure when a hosted stream errors mid-drain
1 parent 8d6b768 commit ed64de9

2 files changed

Lines changed: 68 additions & 12 deletions

File tree

apps/sim/providers/streaming-execution.test.ts

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,20 @@
33
*/
44
import { describe, expect, it, vi } from 'vitest'
55

6-
const { mockRecordCostCharged } = vi.hoisted(() => ({ mockRecordCostCharged: vi.fn() }))
6+
const { mockRecordCostCharged, mockRecordFailed } = vi.hoisted(() => ({
7+
mockRecordCostCharged: vi.fn(),
8+
mockRecordFailed: vi.fn(),
9+
}))
710

811
vi.mock('@/providers/utils', () => ({
912
calculateCost: vi.fn(() => ({ input: 1, output: 2, total: 3, pricing: {} })),
1013
}))
1114
vi.mock('@/lib/core/config/env-flags', () => ({ getCostMultiplier: () => 1 }))
1215
vi.mock('@/lib/monitoring/metrics', () => ({
13-
hostedKeyMetrics: { recordCostCharged: mockRecordCostCharged },
16+
hostedKeyMetrics: { recordCostCharged: mockRecordCostCharged, recordFailed: mockRecordFailed },
17+
}))
18+
vi.mock('@/lib/api-key/hosted-cost', () => ({
19+
classifyHostedKeyFailure: () => 'other',
1420
}))
1521

1622
import type { NormalizedBlockOutput } from '@/executor/types'
@@ -81,6 +87,41 @@ describe('createStreamingExecution', () => {
8187
provider: 'openai',
8288
tool: 'test-model',
8389
})
90+
expect(mockRecordFailed).not.toHaveBeenCalled()
91+
})
92+
93+
it('records a hosted-key failure (not cost) when the stream errors mid-drain', async () => {
94+
mockRecordCostCharged.mockClear()
95+
mockRecordFailed.mockClear()
96+
const boom = new Error('upstream 500')
97+
const sourceStream = new ReadableStream({
98+
pull: (c) => c.error(boom),
99+
})
100+
101+
const result = createStreamingExecution({
102+
model: 'test-model',
103+
providerStartTime,
104+
providerStartTimeISO,
105+
timing: { kind: 'simple', segmentName: 'test-model' },
106+
initialTokens: { input: 0, output: 0, total: 0 },
107+
initialCost: { input: 0, output: 0, total: 0 },
108+
hostedKey: { provider: 'openai', envVar: 'OPENAI_API_KEY_1' },
109+
cached: false,
110+
createStream: () => sourceStream,
111+
})
112+
113+
const reader = result.stream.getReader()
114+
await expect(reader.read()).rejects.toThrow('upstream 500')
115+
116+
// Failure recorded once; no cost charged for a failed stream.
117+
expect(mockRecordFailed).toHaveBeenCalledTimes(1)
118+
expect(mockRecordFailed).toHaveBeenCalledWith({
119+
provider: 'openai',
120+
tool: 'test-model',
121+
key: 'OPENAI_API_KEY_1',
122+
reason: 'other',
123+
})
124+
expect(mockRecordCostCharged).not.toHaveBeenCalled()
84125
})
85126

86127
it('assembles the simple (no-tools) shape and finalizes timing on drain', () => {

apps/sim/providers/streaming-execution.ts

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,42 @@
1+
import { classifyHostedKeyFailure } from '@/lib/api-key/hosted-cost'
12
import { getCostMultiplier } from '@/lib/core/config/env-flags'
23
import { hostedKeyMetrics } from '@/lib/monitoring/metrics'
34
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
45
import type { TimeSegment } from '@/providers/types'
56
import { calculateCost } from '@/providers/utils'
67

78
/**
8-
* Passthrough of `source` that invokes `onDrain` exactly once, after the source
9-
* completes naturally (not on error or cancel). Lets the hosted-key cost settle
10-
* when final tokens are known, regardless of whether the provider's drain
11-
* callback finalized timing.
9+
* Passthrough of `source` that runs exactly one terminal callback: `onDrain`
10+
* when it completes normally (cost is settled then), or `onError` when a read
11+
* errors mid-stream (records the hosted-key failure). A client `cancel` is not a
12+
* key failure, so it runs neither. Lets hosted-key cost/failure metrics stay
13+
* symmetric regardless of whether the provider's drain callback finalized timing.
1214
*/
1315
function settleHostedCostOnStreamDrain(
1416
source: ReadableStream,
15-
onDrain: () => void
17+
onDrain: () => void,
18+
onError: (error: unknown) => void
1619
): ReadableStream {
1720
const reader = source.getReader()
18-
let settled = false
21+
let finished = false
1922
return new ReadableStream({
2023
async pull(controller) {
2124
try {
2225
const { done, value } = await reader.read()
2326
if (done) {
24-
if (!settled) {
25-
settled = true
27+
if (!finished) {
28+
finished = true
2629
onDrain()
2730
}
2831
controller.close()
2932
return
3033
}
3134
controller.enqueue(value)
3235
} catch (error) {
36+
if (!finished) {
37+
finished = true
38+
onError(error)
39+
}
3340
controller.error(error)
3441
}
3542
},
@@ -247,8 +254,16 @@ export function createStreamingExecution(
247254
// the provider's drain callback before the stream closes). Recomputes the
248255
// authoritative cost with the multiplier and emits the cost metric exactly once.
249256
const stream = hostedKey
250-
? settleHostedCostOnStreamDrain(baseStream, () =>
251-
settleStreamingLlmCost(output, model, hostedKey, cached ?? false)
257+
? settleHostedCostOnStreamDrain(
258+
baseStream,
259+
() => settleStreamingLlmCost(output, model, hostedKey, cached ?? false),
260+
(error) =>
261+
hostedKeyMetrics.recordFailed({
262+
provider: hostedKey.provider,
263+
tool: model,
264+
key: hostedKey.envVar,
265+
reason: classifyHostedKeyFailure(error),
266+
})
252267
)
253268
: baseStream
254269

0 commit comments

Comments
 (0)