Thread-safe, generic streaming for async producer-consumer patterns (internal/utils/stream/).
Related Documentation:
- HTTP Requests - HTTP streams using
RequestAndParseStream - Generic Types - Stream type safety patterns
- Cache Operations - Pub/sub streaming patterns
The Stream[T] type provides buffered, type-safe communication between goroutines with backpressure control.
// Create with buffer size (max items before blocking)
s := stream.NewStream[T](128)
defer s.Close()// Non-blocking write (errors if full)
err := s.Write(data)
// Blocking write (waits for space)
s.WriteBlocking(data)
// Propagate errors
s.WriteError(err)
// Signal completion
s.Close()// Standard iteration pattern
for s.Next() { // Blocks waiting for data/close
data, err := s.Read()
if err != nil {
if err == stream.ErrEmpty {
continue
}
// Handle actual error
break
}
// Process data
}// Cleanup on close
s.OnClose(func() {
// Release resources
})
// Pre-close operations
s.BeforeClose(func() {
// Finalize state
})// Add validation/transformation
s.Filter(func(data T) error {
if !isValid(data) {
return errors.New("invalid data")
}
return nil
})// Process all items
err := s.Async(func(data T) {
processItem(data)
})s.IsClosed() // Check if closed
s.Size() // Current buffer sizefunc handlePluginResponse(pluginOutput <-chan Chunk) *stream.Stream[Chunk] {
response := stream.NewStream[Chunk](128)
go func() {
defer response.Close()
for chunk := range pluginOutput {
if err := response.Write(chunk); err != nil {
response.WriteError(err)
return
}
}
}()
return response
}func handleSSE(ctx *gin.Context, dataStream *stream.Stream[[]byte]) {
ctx.Header("Content-Type", "text/event-stream")
for dataStream.Next() {
data, err := dataStream.Read()
if err != nil {
ctx.SSEvent("error", err.Error())
return
}
ctx.SSEvent("data", string(data))
ctx.Writer.Flush()
}
}func processWithValidation(input *stream.Stream[Data]) *stream.Stream[Result] {
output := stream.NewStream[Result](64)
go func() {
defer output.Close()
for input.Next() {
data, err := input.Read()
if err != nil {
output.WriteError(err)
return
}
result, err := validate(data)
if err != nil {
output.WriteError(err)
return
}
output.Write(result)
}
}()
return output
}files := make(map[string]*bytes.Buffer)
for response.Next() {
chunk, _ := response.Read()
if chunk.Type == "blob_chunk" {
id := chunk.ID
if chunk.End {
// Complete file
completeFile := files[id].Bytes()
processFile(completeFile)
delete(files, id)
} else {
// Accumulate chunks
if files[id] == nil {
files[id] = bytes.NewBuffer(nil)
}
files[id].Write(chunk.Data)
}
}
}- Uses
dequefor efficient FIFO operations - Thread-safe with mutex protection
- Signal channel for blocking consumers
- Atomic operations for close state
- Condition variable for blocking writers