28 commits
6b231dc
introduce retry metadata to batch struct
agarakan Dec 30, 2025
8521373
Remove unused reset method
agarakan Dec 30, 2025
7244af8
add unit tests for retryMetadata
agarakan Dec 30, 2025
d2f21e1
fix lint
agarakan Dec 30, 2025
66186a7
Introduce retryHeap and retryHeapProcessor
agarakan Dec 30, 2025
83224b4
Exchange pushCh for semaphore to enforce heap size and blocking
agarakan Dec 30, 2025
7cfc794
Add conditional logic to sender to call batch.Fail() during concurrency
agarakan Dec 30, 2025
b4ffd7a
Add unit tests
agarakan Dec 30, 2025
0e4b0bc
Instantiate RetryHeap and RetryHeapProcessor if concurrency enabled
agarakan Dec 30, 2025
9c1332a
Add unit tests for retryheap instantiation
agarakan Dec 30, 2025
dddb691
Update sender to reference retryHeap to call push on fail
agarakan Dec 30, 2025
02bc5c6
Add unit tests for sender logic
agarakan Dec 31, 2025
ef7d627
Implement halt on target logic
agarakan Dec 31, 2025
309f904
lint
agarakan Dec 31, 2025
d0727b6
Merge branch 'enable-multithreaded-logging-by-default' into sender-bl…
the-mann Feb 9, 2026
800c06b
Merge branch 'enable-multithreaded-logging-by-default' into sender-bl…
the-mann Feb 9, 2026
d9296a6
lint
the-mann Feb 9, 2026
a5621fc
Merge remote-tracking branch 'origin/sender-block-on-failure' into se…
the-mann Feb 9, 2026
7051a0c
fix tests
the-mann Feb 9, 2026
fd185db
Fix race condition in RetryHeap Stop and Push methods
the-mann Feb 9, 2026
d79ae7f
Add failing test for circuit breaker resume on batch expiry
the-mann Feb 9, 2026
de410f1
lx
the-mann Feb 9, 2026
28ba902
test(pusher): Add automated recovery tests for poison pill
the-mann Feb 10, 2026
11b1d26
Add test filtering to integration test workflows
the-mann Feb 11, 2026
78c947b
Merge remote-tracking branch 'origin/main' into sender-block-on-failure
the-mann Feb 11, 2026
60b6f49
Remove test filtering feature (moved to separate PR)
the-mann Feb 11, 2026
1b1973b
Trigger PR diff refresh
the-mann Feb 11, 2026
8a4960f
Merge remote-tracking branch 'origin/enable-multithreaded-logging-by-…
the-mann Feb 11, 2026
8 changes: 8 additions & 0 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -117,6 +117,14 @@ func (c *CloudWatchLogs) Close() error {
c.workerPool.Stop()
}

if c.retryHeapProcessor != nil {
c.retryHeapProcessor.Stop()
}

if c.retryHeap != nil {
c.retryHeap.Stop()
}

return nil
}
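
Close() stops the retryHeapProcessor before the retryHeap, so the drain loop is gone before the heap itself shuts down. Commit fd185db above fixes a race between RetryHeap's Stop and Push; that code is outside this diff, so the following is only a minimal, self-contained sketch of one race-safe shape (the names and the boolean-flag approach are assumptions, not the PR's implementation):

package main

import (
	"fmt"
	"sync"
)

// retryHeapSketch is a hypothetical stand-in for the PR's RetryHeap; only the
// Stop/Push interaction is modeled.
type retryHeapSketch struct {
	mu      sync.Mutex
	stopped bool
	batches []string
}

// Push checks the stopped flag under the same mutex Stop takes, so a Push
// racing a shutdown can never touch torn-down state.
func (h *retryHeapSketch) Push(batch string) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.stopped {
		return false
	}
	h.batches = append(h.batches, batch)
	return true
}

// Stop only flips the flag, so calling it repeatedly from Close() is harmless.
func (h *retryHeapSketch) Stop() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.stopped = true
}

func main() {
	h := &retryHeapSketch{}
	fmt.Println(h.Push("batch-1")) // true
	h.Stop()
	fmt.Println(h.Push("batch-2")) // false: heap already stopped
}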

4 changes: 4 additions & 0 deletions plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
@@ -109,6 +109,7 @@ type logEventBatch struct {
retryCountLong int // Number of retries using long delay strategy
startTime time.Time // Time of first request (for max retry duration calculation)
nextRetryTime time.Time // When this batch should be retried next
lastError error // Last error encountered
}

func newLogEventBatch(target Target, entityProvider logs.LogEntityProvider) *logEventBatch {
@@ -261,6 +262,9 @@ func (b *logEventBatch) initializeStartTime() {
// updateRetryMetadata updates the retry metadata after a failed send attempt.
// It increments the appropriate retry counter based on the error type and calculates the next retry time.
func (b *logEventBatch) updateRetryMetadata(err error) {
// Store the error
b.lastError = err

// Determine retry strategy and increment counter
var wait time.Duration
if chooseRetryWaitStrategy(err) == retryLong {
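The hunk cuts off before the backoff computation. A self-contained sketch of the overall shape, assuming capped exponential backoff; none of the constants or helper names below are the PR's actual values:

package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-ins for the PR's batch fields and strategy chooser; the real code is
// in batch.go and is only partially visible above.
type batchSketch struct {
	retryCountShort int
	retryCountLong  int
	nextRetryTime   time.Time
	lastError       error
}

var errServiceUnavailable = errors.New("service unavailable")

// isLongRetry stands in for chooseRetryWaitStrategy(err) == retryLong.
func isLongRetry(err error) bool { return errors.Is(err, errServiceUnavailable) }

// updateRetryMetadata mirrors the visible shape: record the error, bump the
// counter that matches the strategy, and derive nextRetryTime with capped
// exponential backoff. The base delays and the cap are assumptions.
func (b *batchSketch) updateRetryMetadata(err error) {
	b.lastError = err
	base, count := 200*time.Millisecond, &b.retryCountShort
	if isLongRetry(err) {
		base, count = 5*time.Second, &b.retryCountLong
	}
	*count++
	exp := *count
	if exp > 6 {
		exp = 6 // cap the exponent so waits stay bounded
	}
	b.nextRetryTime = time.Now().Add(base * time.Duration(1<<exp))
}

func main() {
	b := &batchSketch{}
	b.updateRetryMetadata(errServiceUnavailable)
	fmt.Println(b.retryCountLong, b.lastError, b.nextRetryTime.After(time.Now())) // 1 service unavailable true
}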
plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go
@@ -422,6 +422,7 @@ func TestBatchRetryMetadata(t *testing.T) {
batch.updateRetryMetadata(err)
assert.Equal(t, 1, batch.retryCountShort)
assert.Equal(t, 0, batch.retryCountLong)
assert.Equal(t, err, batch.lastError)
assert.False(t, batch.nextRetryTime.IsZero())

// Test isReadyForRetry - should be false immediately after retry metadata update
New file (101 additions & 0 deletions):
@@ -0,0 +1,101 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package pusher

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
)

// TestCircuitBreakerBlocksTargetAfterFailure verifies that when a batch fails
// for a target, the circuit breaker prevents additional batches from that target
// from being sent until the failing batch is retried successfully.
//
// Without a circuit breaker, a problematic target continues producing new batches
// that flood the SenderQueue/WorkerPool, starving healthy targets.
func TestCircuitBreakerBlocksTargetAfterFailure(t *testing.T) {
logger := testutil.NewNopLogger()

failingTarget := Target{Group: "failing-group", Stream: "stream"}
healthyTarget := Target{Group: "healthy-group", Stream: "stream"}

var failingTargetSendCount atomic.Int32
var healthyTargetSendCount atomic.Int32

service := &stubLogsService{
ple: func(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
if *input.LogGroupName == failingTarget.Group {
failingTargetSendCount.Add(1)
return nil, &cloudwatchlogs.ServiceUnavailableException{}
}
healthyTargetSendCount.Add(1)
return &cloudwatchlogs.PutLogEventsOutput{}, nil
},
cls: func(_ *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) {
return &cloudwatchlogs.CreateLogStreamOutput{}, nil
},
clg: func(_ *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) {
return &cloudwatchlogs.CreateLogGroupOutput{}, nil
},
dlg: func(_ *cloudwatchlogs.DescribeLogGroupsInput) (*cloudwatchlogs.DescribeLogGroupsOutput, error) {
return &cloudwatchlogs.DescribeLogGroupsOutput{}, nil
},
}

concurrency := 5
workerPool := NewWorkerPool(concurrency)
retryHeap := NewRetryHeap(concurrency, logger)
defer workerPool.Stop()
defer retryHeap.Stop()

tm := NewTargetManager(logger, service)

var wg sync.WaitGroup
flushTimeout := 50 * time.Millisecond
retryDuration := time.Hour

failingPusher := NewPusher(logger, failingTarget, service, tm, nil, workerPool, flushTimeout, retryDuration, &wg, retryHeap)
healthyPusher := NewPusher(logger, healthyTarget, service, tm, nil, workerPool, flushTimeout, retryDuration, &wg, retryHeap)
defer failingPusher.Stop()
defer healthyPusher.Stop()

now := time.Now()

// Send events to both targets. The failing target will fail on PutLogEvents,
// and the circuit breaker should block it from sending more batches.
for i := 0; i < 10; i++ {
failingPusher.AddEvent(newStubLogEvent("fail", now))
healthyPusher.AddEvent(newStubLogEvent("ok", now))
}

// Wait for flushes to occur
time.Sleep(500 * time.Millisecond)

// Send more events - the failing target should be blocked by circuit breaker
for i := 0; i < 10; i++ {
failingPusher.AddEvent(newStubLogEvent("fail-more", now))
healthyPusher.AddEvent(newStubLogEvent("ok-more", now))
}

time.Sleep(500 * time.Millisecond)

// Circuit breaker assertion: after the first failure, the failing target should
// NOT have sent additional batches. Only 1 send attempt should have been made
// before the circuit breaker blocks it.
assert.LessOrEqual(t, failingTargetSendCount.Load(), int32(1),
"Circuit breaker should block failing target from sending more than 1 batch, "+
"but %d batches were sent. Without a circuit breaker, the failing target "+
"continues flooding the worker pool with bad requests.", failingTargetSendCount.Load())

// Healthy target should continue sending successfully
assert.Greater(t, healthyTargetSendCount.Load(), int32(0),
"Healthy target should continue sending while failing target is blocked")
}
26 changes: 12 additions & 14 deletions plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
@@ -135,22 +135,20 @@ func TestSenderPool(t *testing.T) {
assert.Equal(t, int32(200), completed.Load())
}

-func TestSenderPoolRetryHeap(t *testing.T) {
-	assert.NotPanics(t, func() {
-		logger := testutil.NewNopLogger()
-		mockService := new(mockLogsService)
-		mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)
+func TestSenderPoolRetryHeap(_ *testing.T) {
+	logger := testutil.NewNopLogger()
+	mockService := new(mockLogsService)
+	mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)

-		// Create RetryHeap
-		retryHeap := NewRetryHeap(10, logger)
-		defer retryHeap.Stop()
+	// Create RetryHeap
+	retryHeap := NewRetryHeap(10, logger)
+	defer retryHeap.Stop()

-		s := newSender(logger, mockService, nil, time.Second, retryHeap)
-		p := NewWorkerPool(12)
-		defer p.Stop()
+	s := newSender(logger, mockService, nil, time.Second, retryHeap)
+	p := NewWorkerPool(12)
+	defer p.Stop()

-		sp := newSenderPool(p, s)
+	sp := newSenderPool(p, s)

-		sp.Stop()
-	})
+	sp.Stop()
 }
43 changes: 43 additions & 0 deletions plugins/outputs/cloudwatchlogs/internal/pusher/queue.go
@@ -42,6 +42,10 @@ type queue struct {
initNonBlockingChOnce sync.Once
startNonBlockCh chan struct{}
wg *sync.WaitGroup

// Circuit breaker halt/resume functionality
haltCond *sync.Cond
halted bool
}

var _ (Queue) = (*queue)(nil)
@@ -67,6 +71,8 @@ func newQueue(
stopCh: make(chan struct{}),
startNonBlockCh: make(chan struct{}),
wg: wg,
haltCond: sync.NewCond(&sync.Mutex{}),
halted: false,
}
q.flushTimeout.Store(flushTimeout)
q.wg.Add(1)
@@ -175,6 +181,11 @@ func (q *queue) merge(mergeChan chan logs.LogEvent) {
func (q *queue) send() {
if len(q.batch.events) > 0 {
q.batch.addDoneCallback(q.onSuccessCallback(q.batch.bufferedSize))
q.batch.addFailCallback(q.onFailCallback())

// Wait if halted (circuit breaker)
q.waitIfHalted()

q.sender.Send(q.batch)
q.batch = newLogEventBatch(q.target, q.entityProvider)
}
@@ -183,6 +194,7 @@
// onSuccessCallback returns a callback function to be executed after a successful send.
func (q *queue) onSuccessCallback(bufferedSize int) func() {
return func() {
q.resume() // Resume queue on success

Review thread on q.resume():

Contributor:
Say a bad batch from a target caused this to halt. Now that bad batch is retried for 14 days and eventually dropped - but this never gets resumed in that case, right? So this target is blocked forever in that scenario?

agarakan (Contributor, Author), Jan 20, 2026:
Yes, valid. The design details a resume on batch expiry to avoid this; I missed including that here, added now.

q.lastSentTime.Store(time.Now())
go q.addStats("rawSize", float64(bufferedSize))
q.resetFlushTimer()
@@ -245,3 +257,34 @@
}
return true
}

// waitIfHalted blocks until the queue is unhalted (circuit breaker functionality)
func (q *queue) waitIfHalted() {
q.haltCond.L.Lock()
for q.halted {
q.haltCond.Wait()
}
q.haltCond.L.Unlock()
}

// halt stops the queue from sending batches (called on failure)
func (q *queue) halt() {
q.haltCond.L.Lock()
q.halted = true
q.haltCond.L.Unlock()
}

// resume allows the queue to send batches again (called on success)
func (q *queue) resume() {
q.haltCond.L.Lock()
q.halted = false
q.haltCond.Broadcast()
q.haltCond.L.Unlock()
}

// onFailCallback returns a callback function to be executed after a failed send
func (q *queue) onFailCallback() func() {
return func() {
q.halt()
}
}
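
halt, resume, and waitIfHalted together form a standard condition-variable gate: halt closes it, resume reopens it and broadcasts to every blocked sender, and waitIfHalted loops on the predicate so a spurious wakeup cannot slip a batch through while the queue is still halted. The same pattern in a runnable standalone form (names here are illustrative, not the PR's):

package main

import (
	"fmt"
	"sync"
	"time"
)

// gate reproduces the queue's circuit-breaker primitive in isolation:
// waiters block while halted; resume flips the flag and broadcasts.
type gate struct {
	cond   *sync.Cond
	halted bool
}

func newGate() *gate { return &gate{cond: sync.NewCond(&sync.Mutex{})} }

func (g *gate) waitIfHalted() {
	g.cond.L.Lock()
	for g.halted { // loop guards against spurious wakeups
		g.cond.Wait()
	}
	g.cond.L.Unlock()
}

func (g *gate) halt() {
	g.cond.L.Lock()
	g.halted = true
	g.cond.L.Unlock()
}

func (g *gate) resume() {
	g.cond.L.Lock()
	g.halted = false
	g.cond.Broadcast() // wake every blocked sender, not just one
	g.cond.L.Unlock()
}

func main() {
	g := newGate()
	g.halt()
	go func() {
		time.Sleep(100 * time.Millisecond)
		g.resume() // stands in for a successful send's done callback
	}()
	g.waitIfHalted() // blocks until resume
	fmt.Println("sender unblocked")
}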
67 changes: 67 additions & 0 deletions plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
@@ -759,6 +759,8 @@ func TestQueueCallbackRegistration(t *testing.T) {
flushTimer: time.NewTimer(10 * time.Millisecond),
startNonBlockCh: make(chan struct{}),
wg: &wg,
haltCond: sync.NewCond(&sync.Mutex{}),
halted: false,
}
q.flushTimeout.Store(10 * time.Millisecond)

@@ -801,6 +803,8 @@
flushTimer: time.NewTimer(10 * time.Millisecond),
startNonBlockCh: make(chan struct{}),
wg: &wg,
haltCond: sync.NewCond(&sync.Mutex{}),
halted: false,
}
q.flushTimeout.Store(10 * time.Millisecond)

@@ -814,3 +818,66 @@
mockSender.AssertExpectations(t)
})
}

func TestQueueHaltResume(t *testing.T) {
logger := testutil.NewNopLogger()

var sendCount atomic.Int32
mockSender := &mockSender{}
mockSender.On("Send", mock.Anything).Run(func(args mock.Arguments) {
sendCount.Add(1)
batch := args.Get(0).(*logEventBatch)
// Simulate failure on first call, success on second
if sendCount.Load() == 1 {
batch.fail() // This should halt the queue
} else {
batch.done() // This should resume the queue
}
}).Return()

var wg sync.WaitGroup
q := newQueue(logger, Target{"G", "S", util.StandardLogGroupClass, -1}, 10*time.Millisecond, nil, mockSender, &wg)
defer q.Stop()

// Add first event - should trigger send and halt
q.AddEvent(newStubLogEvent("first message", time.Now()))

// Wait a bit for the first send to complete and halt
time.Sleep(50 * time.Millisecond)

// Add second event - should be queued but not sent due to halt
q.AddEvent(newStubLogEvent("second message", time.Now()))

// Verify only one send happened (queue is halted)
assert.Equal(t, int32(1), sendCount.Load(), "Should have only one send due to halt")

// Trigger flush to force send of second batch - this should block until resumed
done := make(chan bool)
go func() {
time.Sleep(100 * time.Millisecond) // Wait a bit
// Manually resume by running the queue's success callback on a dummy batch,
// mirroring what a real successful send does. The callback must be wired to
// the queue; an unwired empty callback would resume nothing and leave the
// queue halted.
dummyBatch := newLogEventBatch(Target{"G", "S", util.StandardLogGroupClass, -1}, nil)
dummyBatch.addDoneCallback(q.onSuccessCallback(dummyBatch.bufferedSize))
dummyBatch.done()
done <- true
}()

// This should eventually complete when the queue is resumed
select {
case <-done:
// Success - the resume worked
case <-time.After(5 * time.Second):
t.Fatal("Test timed out - queue may be permanently halted")
}

mockSender.AssertExpectations(t)
}

// TestQueueResumeOnBatchExpiry verifies that when a batch expires after 14 days of retrying,
// the circuit breaker resumes the queue to allow new batches to be processed.
// This prevents the target from being permanently blocked when a bad batch is eventually dropped.
//
// Scenario from PR comment: "Say a bad batch from a target caused this to halt. Now that bad batch
// is re-tried for 14 days and eventually dropped - but this never gets resumed in that case right?
// So this target is blocked forever in that scenario?"
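
The body of this test is truncated in this capture. Per the review thread above, the mechanism it exercises is that a batch dropped at the 14-day expiry must still fire its done callbacks so the queue's resume() runs. A self-contained sketch of that idea, with hypothetical names rather than the PR's actual expiry code:

package main

import (
	"fmt"
	"time"
)

// expiringBatchSketch is a hypothetical stand-in for the PR's batch type;
// only startTime and the done callbacks matter for the expiry path.
type expiringBatchSketch struct {
	startTime     time.Time
	doneCallbacks []func()
}

func (b *expiringBatchSketch) done() {
	for _, cb := range b.doneCallbacks {
		cb()
	}
}

// dropIfExpired sketches the fix discussed in the review thread: once a batch
// has been retried past retryDuration (14 days in production), drop it and run
// its done callbacks so a halted queue resumes instead of blocking forever.
func dropIfExpired(b *expiringBatchSketch, retryDuration time.Duration) bool {
	if time.Since(b.startTime) < retryDuration {
		return false
	}
	b.done()
	return true
}

func main() {
	resumed := false
	b := &expiringBatchSketch{startTime: time.Now().Add(-15 * 24 * time.Hour)}
	b.doneCallbacks = append(b.doneCallbacks, func() { resumed = true }) // queue.resume() stand-in
	fmt.Println(dropIfExpired(b, 14*24*time.Hour), resumed) // true true
}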