diff --git a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go index 770ef5e3f9..ddc4bed453 100644 --- a/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go +++ b/plugins/outputs/cloudwatchlogs/cloudwatchlogs.go @@ -153,7 +153,7 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest { } c.targetManager = pusher.NewTargetManager(c.Log, client) }) - p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup) + p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup, c.Concurrency) cwd := &cwDest{ pusher: p, retryer: logThrottleRetryer, diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go index a0013e785e..3c83be15a0 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/batch.go @@ -100,7 +100,9 @@ type logEventBatch struct { doneCallbacks []func() // Callbacks specifically for updating state stateCallbacks []func() - batchers map[string]*state.RangeQueueBatcher + // Callbacks to execute when batch fails (for circuit breaker notification) + failCallbacks []func() + batchers map[string]*state.RangeQueueBatcher // Retry metadata retryCountShort int // Number of retries using short delay strategy @@ -182,6 +184,13 @@ func (b *logEventBatch) addStateCallback(callback func()) { } } +// addFailCallback appends the callback to the end of the registered fail callbacks; a nil callback is ignored. +func (b *logEventBatch) addFailCallback(callback func()) { + if callback != nil { + b.failCallbacks = append(b.failCallbacks, callback) + } +} + // done runs all registered callbacks, including both success callbacks and state callbacks. 
func (b *logEventBatch) done() { b.updateState() @@ -203,6 +212,15 @@ func (b *logEventBatch) updateState() { } } +// fail runs the registered fail callbacks in reverse registration order to notify upstream components of batch failure. +// This is used for circuit breaker notification; the callbacks are not cleared, so every call re-runs all of them. +func (b *logEventBatch) fail() { + for i := len(b.failCallbacks) - 1; i >= 0; i-- { + callback := b.failCallbacks[i] + callback() + } +} + // build creates a cloudwatchlogs.PutLogEventsInput from the batch. The log events in the batch must be in // chronological order by their timestamp. func (b *logEventBatch) build() *cloudwatchlogs.PutLogEventsInput { diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go index d9f3860967..94fe4b6713 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go @@ -107,7 +107,7 @@ func TestSenderPool(t *testing.T) { logger := testutil.NewNopLogger() mockService := new(mockLogsService) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) - s := newSender(logger, mockService, nil, time.Second) + s := newSender(logger, mockService, nil, time.Second, false) p := NewWorkerPool(12) sp := newSenderPool(p, s) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go index 57256ae033..e833868931 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go @@ -34,8 +34,10 @@ func NewPusher( flushTimeout time.Duration, retryDuration time.Duration, wg *sync.WaitGroup, + concurrency int, ) *Pusher { - s := createSender(logger, service, targetManager, workerPool, retryDuration) + concurrencyEnabled := concurrency > 1 + s := createSender(logger, service, targetManager, workerPool, retryDuration, concurrencyEnabled) q := newQueue(logger, target, 
flushTimeout, entityProvider, s, wg) targetManager.PutRetentionPolicy(target) return &Pusher{ @@ -60,8 +62,9 @@ func createSender( targetManager TargetManager, workerPool WorkerPool, retryDuration time.Duration, + concurrencyEnabled bool, ) Sender { - s := newSender(logger, service, targetManager, retryDuration) + s := newSender(logger, service, targetManager, retryDuration, concurrencyEnabled) if workerPool == nil { return s } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go index 6d63e3c4ff..ce575c213f 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go @@ -113,6 +113,7 @@ func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pushe time.Second, time.Minute, wg, + 1, // concurrency ) assert.NotNil(t, pusher) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go index b5fc04d02e..dab9865131 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go @@ -712,7 +712,7 @@ func testPreparationWithLogger( ) (*queue, Sender) { t.Helper() tm := NewTargetManager(logger, service) - s := newSender(logger, service, tm, retryDuration) + s := newSender(logger, service, tm, retryDuration, false) q := newQueue( logger, Target{"G", "S", util.StandardLogGroupClass, retention}, diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go index 3df074a0fb..31a3b8be29 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender.go @@ -30,12 +30,13 @@ type Sender interface { } type sender struct { - service cloudWatchLogsService - retryDuration atomic.Value - targetManager TargetManager - 
logger telegraf.Logger - stopCh chan struct{} - stopped bool + service cloudWatchLogsService + retryDuration atomic.Value + targetManager TargetManager + logger telegraf.Logger + stopCh chan struct{} + stopped bool + concurrencyEnabled bool } var _ (Sender) = (*sender)(nil) @@ -45,13 +46,15 @@ func newSender( service cloudWatchLogsService, targetManager TargetManager, retryDuration time.Duration, + concurrencyEnabled bool, ) Sender { s := &sender{ - logger: logger, - service: service, - targetManager: targetManager, - stopCh: make(chan struct{}), - stopped: false, + logger: logger, + service: service, + targetManager: targetManager, + stopCh: make(chan struct{}), + stopped: false, + concurrencyEnabled: concurrencyEnabled, } s.retryDuration.Store(retryDuration) return s @@ -121,7 +124,13 @@ func (s *sender) Send(batch *logEventBatch) { return } - // Calculate wait time until next retry + // If concurrency is enabled, notify the batch's registered fail callbacks of the failed attempt. + // Send does not return here: both modes continue with the existing busy-wait retry behavior + if s.isConcurrencyEnabled() { + batch.fail() + } + + // Calculate wait time until next retry (applies to both modes) wait := time.Until(batch.nextRetryTime) if wait < 0 { wait = 0 @@ -156,3 +165,8 @@ func (s *sender) SetRetryDuration(retryDuration time.Duration) { func (s *sender) RetryDuration() time.Duration { return s.retryDuration.Load().(time.Duration) } + +// isConcurrencyEnabled returns whether concurrency mode is enabled for this sender. 
+func (s *sender) isConcurrencyEnabled() bool { + return s.concurrencyEnabled +} diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go index 3b469350ef..450e63006a 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go @@ -15,6 +15,7 @@ import ( "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" "github.com/aws/amazon-cloudwatch-agent/tool/testutil" + "github.com/aws/amazon-cloudwatch-agent/tool/util" ) type mockLogsService struct { @@ -80,7 +81,7 @@ func TestSender(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -103,7 +104,7 @@ func TestSender(t *testing.T) { mockManager := new(mockTargetManager) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{RejectedLogEventsInfo: rejectedInfo}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -122,7 +123,7 @@ func TestSender(t *testing.T) { mockManager.On("InitTarget", mock.Anything).Return(nil).Once() mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -149,7 +150,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). 
Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.InvalidParameterException{}).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -177,7 +178,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.DataAlreadyAcceptedException{}).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -205,7 +206,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, errors.New("test")).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -225,7 +226,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) s.Send(batch) s.Stop() @@ -251,7 +252,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, 100*time.Millisecond) + s := newSender(logger, mockService, mockManager, 100*time.Millisecond, false) s.Send(batch) s.Stop() @@ -279,7 +280,7 @@ func TestSender(t *testing.T) { mockService.On("PutLogEvents", mock.Anything). 
Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once() - s := newSender(logger, mockService, mockManager, time.Second) + s := newSender(logger, mockService, mockManager, time.Second, false) go func() { time.Sleep(50 * time.Millisecond) @@ -292,4 +293,37 @@ func TestSender(t *testing.T) { assert.True(t, stateCallbackCalled, "State callback was not called when stop was requested") assert.False(t, doneCallbackCalled, "Done callback should not be called when stop was requested") }) + + t.Run("ConcurrencyEnabled/CallsFailCallback", func(t *testing.T) { + logger := testutil.NewNopLogger() + batch := newLogEventBatch(Target{"G", "S", util.StandardLogGroupClass, -1}, nil) + batch.append(newLogEvent(time.Now(), "Test message", nil)) + + // Initialize batch for retry logic + batch.initializeStartTime() + + mockService := new(mockLogsService) + mockManager := new(mockTargetManager) + mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.ServiceUnavailableException{}).Once() + + // Enable concurrency with 1 hour retry duration + s := newSender(logger, mockService, mockManager, time.Hour, true) + + // Track if fail callback was called + failCalled := false + batch.addFailCallback(func() { + failCalled = true + }) + + go func() { + time.Sleep(50 * time.Millisecond) + s.Stop() + }() + + s.Send(batch) + + // Should call fail callback when concurrency is enabled + assert.True(t, failCalled, "fail callback should be called when concurrency is enabled") + mockService.AssertExpectations(t) + }) }