-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstreaming.go
More file actions
342 lines (290 loc) · 8.43 KB
/
streaming.go
File metadata and controls
342 lines (290 loc) · 8.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
package opik
import (
"context"
"strings"
"sync"
"time"
)
// StreamChunk represents a chunk of streaming data, plus the bookkeeping
// fields that the accumulator and span wrappers in this file read and write.
type StreamChunk struct {
// Content is the chunk's text; StreamAccumulator.AddChunk appends it to the
// accumulated content.
Content string
// Index is set by StreamingSpan.AddChunk to the number of chunks already
// accumulated when the chunk was created.
Index int
// IsFirst is set by StreamingSpan.AddChunk when no chunk had been
// accumulated yet.
IsFirst bool
// IsLast is set to true by the WithChunkFinishReason option.
IsLast bool
// Timestamp is the chunk's time; StreamingSpan.AddChunk fills it with
// time.Now(), and the accumulator uses it for its first/last chunk times.
Timestamp time.Time
// TokenCount is added to the accumulator's running token total.
TokenCount int
// FinishReason, when non-empty, overwrites the accumulator's finish reason.
FinishReason string
// Metadata holds arbitrary key/value pairs that the accumulator merges into
// its own metadata map (later values overwrite earlier ones per key).
Metadata map[string]any
}
// StreamAccumulator accumulates streaming chunks into a complete response.
// Every method defined on it in this file takes mu before touching the other
// fields. It holds a sync.Mutex and a strings.Builder by value, so it should
// be shared by pointer rather than copied.
type StreamAccumulator struct {
// mu guards all fields below.
mu sync.Mutex
// chunks holds every chunk passed to AddChunk, in arrival order.
chunks []StreamChunk
// content is the concatenation of all chunk contents.
content strings.Builder
// totalTokens is the sum of the chunks' TokenCount values.
totalTokens int
// firstChunk is the Timestamp of the first chunk added.
firstChunk time.Time
// lastChunk is the Timestamp of the most recently added chunk.
lastChunk time.Time
// finishReason is the last non-empty FinishReason seen.
finishReason string
// metadata is the merge of all chunk Metadata maps; allocated by
// NewStreamAccumulator.
metadata map[string]any
}
// NewStreamAccumulator returns an empty accumulator whose metadata map is
// already allocated, so chunks carrying metadata can be added immediately.
func NewStreamAccumulator() *StreamAccumulator {
	acc := new(StreamAccumulator)
	acc.metadata = map[string]any{}
	return acc
}
// AddChunk records chunk in the accumulator.
//
// Under the accumulator's lock it:
//   - remembers chunk.Timestamp as the first-chunk time if no chunk has been
//     added yet, and always as the last-chunk time;
//   - appends the chunk to the chunk list and its Content to the content
//     buffer;
//   - adds chunk.TokenCount to the token total;
//   - overwrites the finish reason when chunk.FinishReason is non-empty;
//   - merges chunk.Metadata into the accumulated metadata, later values
//     replacing earlier ones for the same key.
//
// The metadata map is allocated on demand, so a zero-value StreamAccumulator
// (one not built by NewStreamAccumulator) accepts chunks with metadata
// instead of panicking on a nil-map write.
func (a *StreamAccumulator) AddChunk(chunk StreamChunk) {
	a.mu.Lock()
	defer a.mu.Unlock()

	if len(a.chunks) == 0 {
		a.firstChunk = chunk.Timestamp
	}
	a.lastChunk = chunk.Timestamp

	a.chunks = append(a.chunks, chunk)
	a.content.WriteString(chunk.Content)
	a.totalTokens += chunk.TokenCount

	if chunk.FinishReason != "" {
		a.finishReason = chunk.FinishReason
	}

	// Writing to a nil map panics; allocate lazily for zero-value accumulators.
	if a.metadata == nil && len(chunk.Metadata) > 0 {
		a.metadata = make(map[string]any, len(chunk.Metadata))
	}
	for k, v := range chunk.Metadata {
		a.metadata[k] = v
	}
}
// Content returns the concatenation of every chunk's content added so far,
// read under the accumulator's lock.
func (a *StreamAccumulator) Content() string {
	a.mu.Lock()
	text := a.content.String()
	a.mu.Unlock()
	return text
}
// TotalTokens returns the sum of the token counts of all chunks added so far,
// read under the accumulator's lock.
func (a *StreamAccumulator) TotalTokens() int {
	a.mu.Lock()
	tokens := a.totalTokens
	a.mu.Unlock()
	return tokens
}
// ChunkCount returns how many chunks have been added so far, read under the
// accumulator's lock.
func (a *StreamAccumulator) ChunkCount() int {
	a.mu.Lock()
	count := len(a.chunks)
	a.mu.Unlock()
	return count
}
// Duration returns the time elapsed between the first and the last recorded
// chunk timestamps. It returns 0 when the first-chunk timestamp is the zero
// time (for example, before any chunk has been added).
func (a *StreamAccumulator) Duration() time.Duration {
	a.mu.Lock()
	first, last := a.firstChunk, a.lastChunk
	a.mu.Unlock()

	if first.IsZero() {
		return 0
	}
	return last.Sub(first)
}
// TimeToFirstChunk returns how long after streamStart the first chunk was
// recorded. The caller supplies streamStart; the accumulator does not track
// it. It returns 0 when the first-chunk timestamp is the zero time (for
// example, before any chunk has been added).
func (a *StreamAccumulator) TimeToFirstChunk(streamStart time.Time) time.Duration {
	a.mu.Lock()
	first := a.firstChunk
	a.mu.Unlock()

	if first.IsZero() {
		return 0
	}
	return first.Sub(streamStart)
}
// FinishReason returns the most recent non-empty finish reason recorded from
// a chunk, or "" if none has been seen. The value is read under the lock.
func (a *StreamAccumulator) FinishReason() string {
	a.mu.Lock()
	reason := a.finishReason
	a.mu.Unlock()
	return reason
}
// Metadata returns a shallow copy of the accumulated metadata. The returned
// map is always non-nil and can be modified by the caller without affecting
// the accumulator; the values themselves are not deep-copied.
func (a *StreamAccumulator) Metadata() map[string]any {
	a.mu.Lock()
	defer a.mu.Unlock()

	snapshot := make(map[string]any, len(a.metadata))
	for key := range a.metadata {
		snapshot[key] = a.metadata[key]
	}
	return snapshot
}
// ToOutput summarises the accumulator as a map suitable for use as a span
// output. The map carries the accumulated text ("content"), the number of
// chunks ("chunk_count"), the token total ("total_tokens") and the recorded
// finish reason ("finish_reason"), all read under a single lock acquisition.
func (a *StreamAccumulator) ToOutput() map[string]any {
	a.mu.Lock()
	defer a.mu.Unlock()

	out := make(map[string]any, 4)
	out["content"] = a.content.String()
	out["chunk_count"] = len(a.chunks)
	out["total_tokens"] = a.totalTokens
	out["finish_reason"] = a.finishReason
	return out
}
// StreamingSpan wraps a span for streaming operations: chunks are collected
// in an accumulator and End closes the span with the accumulated output and
// streaming metadata.
type StreamingSpan struct {
// span is the wrapped span; End calls its End method.
span *Span
// accumulator collects every chunk passed to AddChunk.
accumulator *StreamAccumulator
// startTime is set to time.Now() by NewStreamingSpan and is the reference
// point End uses for the time-to-first-chunk value.
startTime time.Time
// onChunk, when non-nil, is invoked by AddChunk after the chunk has been
// accumulated. It is set via OnChunk.
onChunk func(chunk StreamChunk)
}
// NewStreamingSpan wraps span in a StreamingSpan with a fresh accumulator.
// The wrapper's start time is taken as time.Now() at the moment of this call.
func NewStreamingSpan(span *Span) *StreamingSpan {
	ss := new(StreamingSpan)
	ss.span = span
	ss.accumulator = NewStreamAccumulator()
	ss.startTime = time.Now()
	return ss
}
// OnChunk registers fn as the callback AddChunk invokes for each chunk after
// it has been accumulated. It replaces any previously registered callback;
// passing nil disables the callback. The field is assigned without locking.
func (s *StreamingSpan) OnChunk(fn func(chunk StreamChunk)) {
s.onChunk = fn
}
// AddChunk builds a StreamChunk from content, records it in the accumulator
// and then invokes the OnChunk callback, if one is registered.
//
// The chunk's Index is the number of chunks accumulated before this call,
// IsFirst is true when that number is zero, Timestamp is time.Now(), and
// Metadata starts as an empty map. opts are then applied in order and may
// override any of these fields; nil options are skipped.
//
// The chunk count is read once, so Index and IsFirst always agree with each
// other. The read and the subsequent accumulation are still two separate
// steps, so concurrent AddChunk calls on the same span may observe the same
// Index.
func (s *StreamingSpan) AddChunk(content string, opts ...StreamChunkOption) {
	n := s.accumulator.ChunkCount()
	chunk := StreamChunk{
		Content:   content,
		Index:     n,
		IsFirst:   n == 0,
		Timestamp: time.Now(),
		Metadata:  make(map[string]any),
	}

	for _, opt := range opts {
		if opt == nil {
			// Calling a nil option would panic; treat it as "no option".
			continue
		}
		opt(&chunk)
	}

	s.accumulator.AddChunk(chunk)

	if s.onChunk != nil {
		s.onChunk(chunk)
	}
}
// StreamChunkOption configures a stream chunk. StreamingSpan.AddChunk applies
// options in the order given, after it has filled in the chunk's default
// fields, so an option may override them.
type StreamChunkOption func(*StreamChunk)
// WithChunkTokenCount returns an option that sets the chunk's TokenCount to
// count.
func WithChunkTokenCount(count int) StreamChunkOption {
	setTokens := func(c *StreamChunk) { c.TokenCount = count }
	return setTokens
}
// WithChunkFinishReason returns an option that sets the chunk's FinishReason
// to reason and marks the chunk as the last one (IsLast = true). IsLast is
// set regardless of whether reason is empty.
func WithChunkFinishReason(reason string) StreamChunkOption {
	return func(c *StreamChunk) {
		c.IsLast, c.FinishReason = true, reason
	}
}
// WithChunkMetadata returns an option that stores value under key in the
// chunk's Metadata, replacing any existing entry for that key.
//
// The Metadata map is allocated if it is nil, so the option can be applied to
// any StreamChunk, not only to chunks whose map was pre-allocated by
// StreamingSpan.AddChunk.
func WithChunkMetadata(key string, value any) StreamChunkOption {
	return func(c *StreamChunk) {
		if c.Metadata == nil {
			// Assigning into a nil map panics.
			c.Metadata = make(map[string]any)
		}
		c.Metadata[key] = value
	}
}
// End ends the wrapped span, attaching what was accumulated while streaming.
//
// Two options are prepended to the caller's opts: the accumulator's ToOutput
// map as the span output, and a metadata map with "streaming" (always true),
// "chunk_count", "time_to_first_chunk" (milliseconds between the wrapper's
// start time and the first chunk), "stream_duration_ms" and "total_tokens".
// Caller-supplied opts come after these two. The result of the wrapped span's
// End is returned unchanged.
func (s *StreamingSpan) End(ctx context.Context, opts ...SpanOption) error {
	acc := s.accumulator

	// Each accessor takes the accumulator lock on its own, so these values
	// are read one at a time rather than as a single snapshot.
	streamMeta := make(map[string]any, 5)
	streamMeta["streaming"] = true
	streamMeta["chunk_count"] = acc.ChunkCount()
	streamMeta["time_to_first_chunk"] = acc.TimeToFirstChunk(s.startTime).Milliseconds()
	streamMeta["stream_duration_ms"] = acc.Duration().Milliseconds()
	streamMeta["total_tokens"] = acc.TotalTokens()

	endOpts := make([]SpanOption, 0, len(opts)+2)
	endOpts = append(endOpts, WithSpanOutput(acc.ToOutput()), WithSpanMetadata(streamMeta))
	endOpts = append(endOpts, opts...)

	return s.span.End(ctx, endOpts...)
}
// Span returns the underlying span that this StreamingSpan wraps (the same
// pointer that was passed to NewStreamingSpan).
func (s *StreamingSpan) Span() *Span {
return s.span
}
// Accumulator returns the stream accumulator that collects this span's
// chunks. The pointer is returned directly, not a copy.
func (s *StreamingSpan) Accumulator() *StreamAccumulator {
return s.accumulator
}
// ID returns the span ID, as reported by the wrapped span's ID method.
func (s *StreamingSpan) ID() string {
return s.span.ID()
}
// TraceID returns the trace ID, as reported by the wrapped span's TraceID
// method.
func (s *StreamingSpan) TraceID() string {
return s.span.TraceID()
}
// StartStreamingSpan starts a span named name via StartSpan and wraps it in a
// StreamingSpan.
//
// A metadata option containing {"streaming": true} is placed ahead of the
// caller's opts. If StartSpan fails, the original ctx, a nil span and the
// error are returned; otherwise the context returned by StartSpan is passed
// back together with the new wrapper.
func StartStreamingSpan(ctx context.Context, name string, opts ...SpanOption) (context.Context, *StreamingSpan, error) {
	spanOpts := make([]SpanOption, 0, len(opts)+1)
	spanOpts = append(spanOpts, WithSpanMetadata(map[string]any{"streaming": true}))
	spanOpts = append(spanOpts, opts...)

	spanCtx, span, err := StartSpan(ctx, name, spanOpts...)
	if err != nil {
		return ctx, nil, err
	}
	return spanCtx, NewStreamingSpan(span), nil
}
// StreamHandler is a generic interface for processing streams. In this file
// it is implemented by TracingStreamHandler and BufferingStreamHandler.
type StreamHandler interface {
// HandleChunk processes a chunk of data and reports any processing error.
HandleChunk(chunk StreamChunk) error
// Finalize is called when the stream ends and reports any completion error.
Finalize() error
}
// TracingStreamHandler wraps a StreamHandler to add tracing: every chunk is
// recorded in a StreamingSpan's accumulator before being forwarded to the
// wrapped handler.
type TracingStreamHandler struct {
// inner is the wrapped handler that HandleChunk and Finalize delegate to.
inner StreamHandler
// streamingSpan holds the accumulator that records each chunk; it is
// exposed through the StreamingSpan method.
streamingSpan *StreamingSpan
}
// NewTracingStreamHandler returns a handler that forwards to inner while
// recording chunks against span, which is wrapped in a new StreamingSpan.
func NewTracingStreamHandler(inner StreamHandler, span *Span) *TracingStreamHandler {
	h := new(TracingStreamHandler)
	h.inner = inner
	h.streamingSpan = NewStreamingSpan(span)
	return h
}
// HandleChunk records chunk in the streaming span's accumulator and then
// forwards it to the wrapped handler, returning that handler's error. The
// chunk is recorded exactly as given, and it is recorded even if the wrapped
// handler then fails.
func (h *TracingStreamHandler) HandleChunk(chunk StreamChunk) error {
	acc := h.streamingSpan.accumulator
	acc.AddChunk(chunk)
	return h.inner.HandleChunk(chunk)
}
// Finalize finalizes the wrapped handler and returns its error. It does not
// end the span: the caller is expected to obtain the span via StreamingSpan
// and call End on it.
func (h *TracingStreamHandler) Finalize() error {
return h.inner.Finalize()
}
// StreamingSpan returns the streaming span that records this handler's
// chunks, so the caller can end it once the stream is complete.
func (h *TracingStreamHandler) StreamingSpan() *StreamingSpan {
return h.streamingSpan
}
// BufferingStreamHandler buffers chunks and calls a callback with complete
// content when the stream is finalized.
type BufferingStreamHandler struct {
// accumulator collects every chunk passed to HandleChunk.
accumulator *StreamAccumulator
// onComplete, when non-nil, is called by Finalize with the buffered content.
onComplete func(content string) error
// onChunk, when non-nil, is called by HandleChunk after the chunk has been
// buffered. It is set via OnChunk.
onChunk func(chunk StreamChunk) error
}
// NewBufferingStreamHandler returns a handler with a fresh accumulator whose
// Finalize passes the buffered content to onComplete. onComplete may be nil,
// in which case Finalize does nothing. No per-chunk callback is set.
func NewBufferingStreamHandler(onComplete func(content string) error) *BufferingStreamHandler {
	h := &BufferingStreamHandler{onComplete: onComplete}
	h.accumulator = NewStreamAccumulator()
	return h
}
// OnChunk registers fn as the callback HandleChunk invokes for each chunk
// after it has been buffered; fn's error becomes HandleChunk's result. It
// replaces any previously registered callback, and nil disables it.
func (h *BufferingStreamHandler) OnChunk(fn func(chunk StreamChunk) error) {
h.onChunk = fn
}
// HandleChunk buffers chunk in the accumulator and then, if a per-chunk
// callback is registered, invokes it and returns its error. Without a
// callback it returns nil. The chunk stays buffered even when the callback
// fails.
func (h *BufferingStreamHandler) HandleChunk(chunk StreamChunk) error {
	h.accumulator.AddChunk(chunk)
	if h.onChunk == nil {
		return nil
	}
	return h.onChunk(chunk)
}
// Finalize passes the buffered content to the completion callback and returns
// the callback's error. When no completion callback was supplied it returns
// nil without reading the buffer.
func (h *BufferingStreamHandler) Finalize() error {
	if h.onComplete == nil {
		return nil
	}
	return h.onComplete(h.accumulator.Content())
}
// Content returns the content buffered so far, as reported by the
// accumulator. It can be called before Finalize to inspect partial output.
func (h *BufferingStreamHandler) Content() string {
return h.accumulator.Content()
}
// Accumulator returns the stream accumulator that buffers this handler's
// chunks. The pointer is returned directly, not a copy.
func (h *BufferingStreamHandler) Accumulator() *StreamAccumulator {
return h.accumulator
}