File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 44 "context"
55 "errors"
66 "math"
7+ "sync"
78 "time"
89)
910
@@ -31,6 +32,9 @@ type ReadSourceChannel struct {
3132
3233 // Flag that the channel is already running
3334 started bool
35+
36+ // WaitGroup to wait for the goroutine to finish on Stop/Close
37+ wg sync.WaitGroup
3438}
3539
3640func NewReadSourceChannel (sourceReader SourceReader ) * ReadSourceChannel {
@@ -52,7 +56,10 @@ func (c *ReadSourceChannel) Start(ctx context.Context) {
5256 ctx , cancel := context .WithCancel (ctx )
5357 c .cancel = cancel
5458
59+ c .wg .Add (1 )
5560 go func () {
61+ defer c .wg .Done ()
62+
5663 // Track if we've received end of input
5764 atEOI := false
5865
@@ -115,13 +122,15 @@ func (c *ReadSourceChannel) Stop() {
115122 // Cancel the context to stop the loop
116123 if c .cancel != nil {
117124 c .cancel ()
125+ c .wg .Wait ()
118126 c .cancel = nil
119127 }
120128}
121129
122130func (c * ReadSourceChannel ) Close () {
123131 if c .cancel != nil {
124132 c .cancel ()
133+ c .wg .Wait ()
125134 }
126135
127136 close (c .C )
You can’t perform that action at this time.
0 commit comments