46 changes: 46 additions & 0 deletions plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
@@ -101,6 +101,12 @@ type logEventBatch struct {
// Callbacks specifically for updating state
stateCallbacks []func()
batchers map[string]*state.RangeQueueBatcher

// Retry metadata
retryCountShort int // Number of retries using short delay strategy
retryCountLong int // Number of retries using long delay strategy
startTime time.Time // Time of first request (for max retry duration calculation)
nextRetryTime time.Time // When this batch should be retried next
}

func newLogEventBatch(target Target, entityProvider logs.LogEntityProvider) *logEventBatch {
@@ -226,3 +232,43 @@ func (t byTimestamp) Swap(i, j int) {
func (t byTimestamp) Less(i, j int) bool {
return *t[i].Timestamp < *t[j].Timestamp
}

// initializeStartTime sets the start time if not already set.
func (b *logEventBatch) initializeStartTime() {
if b.startTime.IsZero() {
b.startTime = time.Now()
}
}

// updateRetryMetadata updates the retry metadata after a failed send attempt.
// It increments the appropriate retry counter based on the error type and calculates the next retry time.
func (b *logEventBatch) updateRetryMetadata(err error) {
// Determine retry strategy and increment counter
var wait time.Duration
if chooseRetryWaitStrategy(err) == retryLong {
wait = retryWaitLong(b.retryCountLong)
b.retryCountLong++
} else {
wait = retryWaitShort(b.retryCountShort)
b.retryCountShort++
}

// Calculate next retry time
b.nextRetryTime = time.Now().Add(wait)
}

// isExpired checks if the batch has exceeded the maximum retry duration.
func (b *logEventBatch) isExpired(maxRetryDuration time.Duration) bool {
if b.startTime.IsZero() {
return false
}
return time.Since(b.startTime) > maxRetryDuration
}

// isReadyForRetry checks if enough time has passed since the last failure to retry this batch.
func (b *logEventBatch) isReadyForRetry() bool {
if b.nextRetryTime.IsZero() {
return true
}
return time.Now().After(b.nextRetryTime)
}
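
Taken together, a caller can drive these helpers in a simple loop. The following is a minimal sketch, not code from this PR: it assumes it lives in the pusher package (so the `time` import and `logEventBatch` type are available), and the `send` callback stands in for the PutLogEvents call.

func sendWithRetries(b *logEventBatch, send func() error, maxRetryDuration time.Duration) {
	b.initializeStartTime() // record the time of the first attempt
	for {
		err := send()
		if err == nil {
			return // delivered
		}
		b.updateRetryMetadata(err) // bump the short or long counter and set nextRetryTime
		if b.isExpired(maxRetryDuration) {
			return // exceeded the maximum retry duration; drop the batch
		}
		for !b.isReadyForRetry() {
			time.Sleep(time.Until(b.nextRetryTime)) // wait out the backoff
		}
	}
}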
31 changes: 31 additions & 0 deletions plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go
@@ -404,3 +404,34 @@ func TestValidateAndTruncateMessage(t *testing.T) {
})
}
}
func TestBatchRetryMetadata(t *testing.T) {
target := Target{Group: "test-group", Stream: "test-stream"}
batch := newLogEventBatch(target, nil)

// Test initial state
assert.True(t, batch.startTime.IsZero())
assert.True(t, batch.isReadyForRetry())
assert.False(t, batch.isExpired(time.Hour))

// Test initializeStartTime
batch.initializeStartTime()
assert.False(t, batch.startTime.IsZero())

// Test updateRetryMetadata
err := assert.AnError
batch.updateRetryMetadata(err)
assert.Equal(t, 1, batch.retryCountShort)
assert.Equal(t, 0, batch.retryCountLong)
assert.False(t, batch.nextRetryTime.IsZero())

// Test isReadyForRetry - should be false immediately after retry metadata update
assert.False(t, batch.isReadyForRetry())

// Test isReadyForRetry - should be true after nextRetryTime passes
batch.nextRetryTime = time.Now().Add(-1 * time.Second) // Set to past time
assert.True(t, batch.isReadyForRetry())

// Test isExpired
batch.startTime = time.Now().Add(-25 * time.Hour)
assert.True(t, batch.isExpired(24*time.Hour))
}
35 changes: 18 additions & 17 deletions plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
@@ -63,11 +63,11 @@ func (s *sender) Send(batch *logEventBatch) {
if len(batch.events) == 0 {
return
}

// Initialize start time before build()
batch.initializeStartTime()
input := batch.build()
Contributor comment: Once we start to use the retry heap, we're going to have to think about storing the built input on the batch.
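As a rough illustration of that suggestion (not part of this PR), caching the built request on the batch could look like the sketch below. The builtInput field and buildOnce helper are hypothetical names, and it assumes build() keeps returning a *cloudwatchlogs.PutLogEventsInput as it does today.

// Hypothetical sketch of the reviewer's suggestion: cache the built request on
// the batch so a retry heap can re-send it without rebuilding.
//
// Assumed extra field on logEventBatch:
//	builtInput *cloudwatchlogs.PutLogEventsInput
func (b *logEventBatch) buildOnce() *cloudwatchlogs.PutLogEventsInput {
	if b.builtInput == nil {
		b.builtInput = b.build()
	}
	return b.builtInput
}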

startTime := time.Now()

retryCountShort := 0
retryCountLong := 0
for {
output, err := s.service.PutLogEvents(input)
if err == nil {
@@ -84,7 +84,7 @@
}
}
batch.done()
s.logger.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(batch.events), batch.Group, batch.Stream, batch.bufferedSize/1024, time.Since(startTime))
s.logger.Debugf("Pusher published %v log events to group: %v stream: %v with size %v KB in %v.", len(batch.events), batch.Group, batch.Stream, batch.bufferedSize/1024, time.Since(batch.startTime))
return
}

@@ -110,27 +110,28 @@ func (s *sender) Send(batch *logEventBatch) {
s.logger.Errorf("Aws error received when sending logs to %v/%v: %v", batch.Group, batch.Stream, awsErr)
}

// retry wait strategy depends on the type of error returned
var wait time.Duration
if chooseRetryWaitStrategy(err) == retryLong {
wait = retryWaitLong(retryCountLong)
retryCountLong++
} else {
wait = retryWaitShort(retryCountShort)
retryCountShort++
}
// Update retry metadata in the batch
batch.updateRetryMetadata(err)

if time.Since(startTime)+wait > s.RetryDuration() {
s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
// Check if retry would exceed max duration
totalRetries := batch.retryCountShort + batch.retryCountLong - 1
if batch.nextRetryTime.After(batch.startTime.Add(s.RetryDuration())) {
s.logger.Errorf("All %v retries to %v/%v failed for PutLogEvents, request dropped.", totalRetries, batch.Group, batch.Stream)
batch.updateState()
return
}

s.logger.Warnf("Retried %v time, going to sleep %v before retrying.", retryCountShort+retryCountLong-1, wait)
// Calculate wait time until next retry
wait := time.Until(batch.nextRetryTime)
if wait < 0 {
wait = 0
}

s.logger.Warnf("Retried %v time, going to sleep %v before retrying.", totalRetries, wait)

select {
case <-s.stopCh:
s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", retryCountShort+retryCountLong-1, batch.Group, batch.Stream)
s.logger.Errorf("Stop requested after %v retries to %v/%v failed for PutLogEvents, request dropped.", totalRetries, batch.Group, batch.Stream)
batch.updateState()
return
case <-time.After(wait):