Skip to content

Commit 8d6b768

Browse files
fix(providers): settle hosted-key streaming cost on stream drain, not finalizeTiming
1 parent 5f9046e commit 8d6b768

2 files changed

Lines changed: 105 additions & 10 deletions

File tree

apps/sim/providers/streaming-execution.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,17 @@
22
* @vitest-environment node
33
*/
44
import { describe, expect, it, vi } from 'vitest'
5+
6+
const { mockRecordCostCharged } = vi.hoisted(() => ({ mockRecordCostCharged: vi.fn() }))
7+
8+
vi.mock('@/providers/utils', () => ({
9+
calculateCost: vi.fn(() => ({ input: 1, output: 2, total: 3, pricing: {} })),
10+
}))
11+
vi.mock('@/lib/core/config/env-flags', () => ({ getCostMultiplier: () => 1 }))
12+
vi.mock('@/lib/monitoring/metrics', () => ({
13+
hostedKeyMetrics: { recordCostCharged: mockRecordCostCharged },
14+
}))
15+
516
import type { NormalizedBlockOutput } from '@/executor/types'
617
import { createStreamingExecution } from '@/providers/streaming-execution'
718

@@ -27,6 +38,51 @@ describe('createStreamingExecution', () => {
2738
const providerStartTime = 1_000
2839
const providerStartTimeISO = new Date(providerStartTime).toISOString()
2940

41+
it('settles hosted-key cost on stream drain even when finalizeTiming is never called (post-tool path)', async () => {
  mockRecordCostCharged.mockClear()

  // Stand-in for a provider stream that is already exhausted: it closes as soon as it starts.
  const exhaustedSource = new ReadableStream({
    start(controller) {
      controller.close()
    },
  })

  const { stream, execution } = createStreamingExecution({
    model: 'test-model',
    providerStartTime,
    providerStartTimeISO,
    timing: {
      kind: 'accumulated',
      modelTime: 1,
      toolsTime: 0,
      firstResponseTime: 1,
      iterations: 1,
      timeSegments: [],
    },
    initialTokens: { input: 0, output: 0, total: 0 },
    initialCost: { input: 0, output: 0, total: 0 },
    hostedKey: { provider: 'openai', envVar: 'OPENAI_API_KEY_1' },
    cached: false,
    // Mimics the post-tool streaming path: the final token counts are written
    // onto the output, but finalizeTiming is never invoked.
    createStream: (context) => {
      context.output.tokens = { input: 100, output: 50, total: 150 }
      return exhaustedSource
    },
  })

  // Immediately after construction no cost metric has been recorded yet.
  expect(mockRecordCostCharged).not.toHaveBeenCalled()

  // Read the returned stream to completion, discarding any chunks.
  const reader = stream.getReader()
  let chunk = await reader.read()
  while (!chunk.done) {
    chunk = await reader.read()
  }

  // After the drain, the output cost equals what the mocked calculateCost
  // returns, and the hosted-key cost metric was emitted exactly once.
  const expectedCost = { input: 1, output: 2, total: 3, pricing: {} }
  expect(execution.output.cost).toEqual(expectedCost)
  expect(mockRecordCostCharged).toHaveBeenCalledTimes(1)
  expect(mockRecordCostCharged).toHaveBeenCalledWith(3, {
    provider: 'openai',
    tool: 'test-model',
  })
})
85+
3086
it('assembles the simple (no-tools) shape and finalizes timing on drain', () => {
3187
const drainTime = 5_000
3288
vi.spyOn(Date, 'now').mockReturnValue(drainTime)

apps/sim/providers/streaming-execution.ts

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,41 @@ import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types
44
import type { TimeSegment } from '@/providers/types'
55
import { calculateCost } from '@/providers/utils'
66

7+
/**
 * Passthrough of `source` that invokes `onDrain` exactly once, after the source
 * completes naturally (not on error or cancel). Lets the hosted-key cost settle
 * when final tokens are known, regardless of whether the provider's drain
 * callback finalized timing.
 *
 * @param source - Stream to forward chunk-for-chunk. It is locked by this
 *   wrapper for the lifetime of the returned stream.
 * @param onDrain - Callback run once, right before the returned stream closes.
 *   If it throws, the returned stream is errored with the thrown value.
 * @returns A stream yielding the same chunks as `source`. Source errors are
 *   propagated to the consumer, and cancelling the returned stream cancels
 *   `source` with the same reason.
 */
function settleHostedCostOnStreamDrain(
  source: ReadableStream,
  onDrain: () => void
): ReadableStream {
  const reader = source.getReader()
  let settled = false
  // Set when the consumer cancels. Cancelling the source resolves any in-flight
  // `reader.read()` with `done: true`, which is indistinguishable from a natural
  // end of stream unless the cancellation is recorded first.
  let cancelled = false
  return new ReadableStream({
    async pull(controller) {
      try {
        const { done, value } = await reader.read()
        if (done) {
          // The source was closed by our own cancel(), not by completing: do not
          // settle, and do not touch the (already closed) controller.
          if (cancelled) {
            return
          }
          if (!settled) {
            settled = true
            onDrain()
          }
          controller.close()
          return
        }
        controller.enqueue(value)
      } catch (error) {
        controller.error(error)
      }
    },
    cancel(reason) {
      // Record the cancellation before forwarding it, so a pending read that
      // resolves as a side effect of the cancel is not mistaken for a drain.
      cancelled = true
      return reader.cancel(reason)
    },
  })
}
41+
742
/**
843
* Settle the authoritative streaming LLM cost onto `output.cost` from its final
944
* tokens (the single cost seam shared with the non-streaming path), and — on the
@@ -200,19 +235,23 @@ export function createStreamingExecution(
200235
}
201236

202237
const timingKind = timing.kind
203-
const stream = createStream({
238+
const baseStream = createStream({
204239
output,
205-
finalizeTiming: () => {
206-
// Hosted-key path: the wrapper owns the authoritative cost — recompute from
207-
// the now-final tokens (with the cost multiplier the per-provider streaming
208-
// paths omit) and emit the hosted-key cost metric exactly once on drain.
209-
if (hostedKey) {
210-
settleStreamingLlmCost(output, model, hostedKey, cached ?? false)
211-
}
212-
finalizeTiming(output, providerStartTime, timingKind)
213-
},
240+
finalizeTiming: () => finalizeTiming(output, providerStartTime, timingKind),
214241
})
215242

243+
// Settle hosted-key cost on actual stream drain. This must NOT hang off the
244+
// provider's `finalizeTiming` call — the post-tool streaming paths
245+
// (`createStream: ({ output }) => …`) never invoke it — so instead we wrap the
246+
// returned stream and settle once the source completes (final tokens are set by
247+
// the provider's drain callback before the stream closes). Recomputes the
248+
// authoritative cost with the multiplier and emits the cost metric exactly once.
249+
const stream = hostedKey
250+
? settleHostedCostOnStreamDrain(baseStream, () =>
251+
settleStreamingLlmCost(output, model, hostedKey, cached ?? false)
252+
)
253+
: baseStream
254+
216255
return {
217256
stream,
218257
execution: {

0 commit comments

Comments
 (0)