Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ func StopTimer(timer *time.Timer) {
}
}

// flushCh returns all data currently queued in channel
func flushCh(ch <-chan []byte) []byte {
res := []byte{}
for {
select {
case readData, ok := <-ch:
if !ok {
return res
}
res = append(res, readData...)
default:
return res
}
}
}

// GenericReadX reads from readCh till expr matched, exceeded time or read more than size.
// Returns error if nothing was read during readTimeout or ctx was Done
// readSize - maximum read size
Expand All @@ -251,6 +267,7 @@ func GenericReadX(ctx context.Context, inBuffer []byte, readCh chan []byte, read
select {
case <-ctx.Done():
StopTimer(maxDurationTimeout)
buffer = append(buffer, flushCh(readCh)...)
return nil, buffer, buffer[len(inBuffer):], multierr.Combine(ctx.Err(), ThrowReadTimeoutException(GetLastBytes(buffer, readSize)))
default:
}
Expand Down Expand Up @@ -282,6 +299,7 @@ func GenericReadX(ctx context.Context, inBuffer []byte, readCh chan []byte, read
case <-ctx.Done():
StopTimer(readIterTimeout)
StopTimer(maxDurationTimeout)
buffer = append(buffer, flushCh(readCh)...)
return nil, buffer, buffer[len(inBuffer):], multierr.Combine(ctx.Err(), ThrowReadTimeoutException(GetLastBytes(buffer, readSize)))
case readData, ok := <-readCh:
StopTimer(readIterTimeout)
Expand Down Expand Up @@ -312,6 +330,7 @@ func GenericReadX(ctx context.Context, inBuffer []byte, readCh chan []byte, read
return NewReadXRes(Timeout, buffer, nil, []byte{}), buffer, buffer[len(inBuffer):], nil
case <-readIterTimeout.C:
StopTimer(maxDurationTimeout)
buffer = append(buffer, flushCh(readCh)...)
return nil, buffer, buffer[len(inBuffer):], ThrowReadTimeoutException(GetLastBytes(buffer, readSize))
}
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/streamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package streamer

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/annetutil/gnetcli/pkg/expr"
)
Expand Down Expand Up @@ -83,6 +85,23 @@ func setupChan(data []byte) chan []byte {
return ch
}

func TestGenericReadXCtxDoneFlushChannel(t *testing.T) {
ch := make(chan []byte, 1)
ch <- []byte("pending in channel")

ctx, cancel := context.WithCancel(context.Background())
cancel()

pat := expr.NewSimpleExpr().FromPattern("never-matches")
_, _, _, err := GenericReadX(ctx, nil, ch, 4096, time.Second, pat, 0, 0)
require.Error(t, err)

var rt *ReadTimeoutException
require.True(t, errors.As(err, &rt))
require.Equal(t, "pending in channel", string(rt.LastRead))
require.Empty(t, ch, "channel should be drained")
}

func TestGenericSplitBytes(t *testing.T) {
a, b := splitBytes([]byte("1234"), 2)
assert.Equal(t, []byte("12"), a)
Expand Down
Loading