Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
}
c.targetManager = pusher.NewTargetManager(c.Log, client)
})
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup)
p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup, c.Concurrency)
cwd := &cwDest{
pusher: p,
retryer: logThrottleRetryer,
Expand Down
20 changes: 19 additions & 1 deletion plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ type logEventBatch struct {
doneCallbacks []func()
// Callbacks specifically for updating state
stateCallbacks []func()
batchers map[string]*state.RangeQueueBatcher
// Callbacks to execute when batch fails (for circuit breaker notification)
failCallbacks []func()
batchers map[string]*state.RangeQueueBatcher

// Retry metadata
retryCountShort int // Number of retries using short delay strategy
Expand Down Expand Up @@ -182,6 +184,13 @@ func (b *logEventBatch) addStateCallback(callback func()) {
}
}

// addFailCallback registers callback to be invoked if the batch fails.
// A nil callback is silently ignored.
func (b *logEventBatch) addFailCallback(callback func()) {
	if callback == nil {
		return
	}
	b.failCallbacks = append(b.failCallbacks, callback)
}

// done runs all registered callbacks, including both success callbacks and state callbacks.
func (b *logEventBatch) done() {
b.updateState()
Expand All @@ -203,6 +212,15 @@ func (b *logEventBatch) updateState() {
}
}

// fail runs the registered fail callbacks in reverse registration order,
// notifying upstream components (e.g. for circuit breaker notification)
// that the batch failed.
func (b *logEventBatch) fail() {
	cbs := b.failCallbacks
	for i := len(cbs); i > 0; i-- {
		cbs[i-1]()
	}
}

// build creates a cloudwatchlogs.PutLogEventsInput from the batch. The log events in the batch must be in
// chronological order by their timestamp.
func (b *logEventBatch) build() *cloudwatchlogs.PutLogEventsInput {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestSenderPool(t *testing.T) {
logger := testutil.NewNopLogger()
mockService := new(mockLogsService)
mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)
s := newSender(logger, mockService, nil, time.Second)
s := newSender(logger, mockService, nil, time.Second, false)
p := NewWorkerPool(12)
sp := newSenderPool(p, s)

Expand Down
7 changes: 5 additions & 2 deletions plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ func NewPusher(
flushTimeout time.Duration,
retryDuration time.Duration,
wg *sync.WaitGroup,
concurrency int,
) *Pusher {
s := createSender(logger, service, targetManager, workerPool, retryDuration)
concurrencyEnabled := concurrency > 1
s := createSender(logger, service, targetManager, workerPool, retryDuration, concurrencyEnabled)
q := newQueue(logger, target, flushTimeout, entityProvider, s, wg)
targetManager.PutRetentionPolicy(target)
return &Pusher{
Expand All @@ -60,8 +62,9 @@ func createSender(
targetManager TargetManager,
workerPool WorkerPool,
retryDuration time.Duration,
concurrencyEnabled bool,
) Sender {
s := newSender(logger, service, targetManager, retryDuration)
s := newSender(logger, service, targetManager, retryDuration, concurrencyEnabled)
if workerPool == nil {
return s
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pushe
time.Second,
time.Minute,
wg,
1, // concurrency
)

assert.NotNil(t, pusher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func testPreparationWithLogger(
) (*queue, Sender) {
t.Helper()
tm := NewTargetManager(logger, service)
s := newSender(logger, service, tm, retryDuration)
s := newSender(logger, service, tm, retryDuration, false)
q := newQueue(
logger,
Target{"G", "S", util.StandardLogGroupClass, retention},
Expand Down
38 changes: 26 additions & 12 deletions plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ type Sender interface {
}

type sender struct {
service cloudWatchLogsService
retryDuration atomic.Value
targetManager TargetManager
logger telegraf.Logger
stopCh chan struct{}
stopped bool
service cloudWatchLogsService
retryDuration atomic.Value
targetManager TargetManager
logger telegraf.Logger
stopCh chan struct{}
stopped bool
concurrencyEnabled bool
}

var _ (Sender) = (*sender)(nil)
Expand All @@ -45,13 +46,15 @@ func newSender(
service cloudWatchLogsService,
targetManager TargetManager,
retryDuration time.Duration,
concurrencyEnabled bool,
) Sender {
s := &sender{
logger: logger,
service: service,
targetManager: targetManager,
stopCh: make(chan struct{}),
stopped: false,
logger: logger,
service: service,
targetManager: targetManager,
stopCh: make(chan struct{}),
stopped: false,
concurrencyEnabled: concurrencyEnabled,
}
s.retryDuration.Store(retryDuration)
return s
Expand Down Expand Up @@ -121,7 +124,13 @@ func (s *sender) Send(batch *logEventBatch) {
return
}

// Calculate wait time until next retry
// If concurrency enabled, notify failure (will handle RetryHeap push) and return
// Otherwise, continue with existing busy-wait retry behavior
if s.isConcurrencyEnabled() {
batch.fail()
}

// Calculate wait time until next retry (synchronous mode)
wait := time.Until(batch.nextRetryTime)
if wait < 0 {
wait = 0
Expand Down Expand Up @@ -156,3 +165,8 @@ func (s *sender) SetRetryDuration(retryDuration time.Duration) {
func (s *sender) RetryDuration() time.Duration {
return s.retryDuration.Load().(time.Duration)
}

// isConcurrencyEnabled reports whether concurrency mode is enabled for this
// sender. The flag is set once at construction (NewPusher passes
// concurrency > 1) and is read-only afterwards, so no synchronization is
// needed here.
func (s *sender) isConcurrencyEnabled() bool {
	return s.concurrencyEnabled
}
52 changes: 43 additions & 9 deletions plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
"github.com/aws/amazon-cloudwatch-agent/tool/util"
)

type mockLogsService struct {
Expand Down Expand Up @@ -80,7 +81,7 @@ func TestSender(t *testing.T) {
mockManager := new(mockTargetManager)
mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand All @@ -103,7 +104,7 @@ func TestSender(t *testing.T) {
mockManager := new(mockTargetManager)
mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{RejectedLogEventsInfo: rejectedInfo}, nil).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand All @@ -122,7 +123,7 @@ func TestSender(t *testing.T) {
mockManager.On("InitTarget", mock.Anything).Return(nil).Once()
mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand All @@ -149,7 +150,7 @@ func TestSender(t *testing.T) {
mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.InvalidParameterException{}).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand Down Expand Up @@ -177,7 +178,7 @@ func TestSender(t *testing.T) {
mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.DataAlreadyAcceptedException{}).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand Down Expand Up @@ -205,7 +206,7 @@ func TestSender(t *testing.T) {
mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, errors.New("test")).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand All @@ -225,7 +226,7 @@ func TestSender(t *testing.T) {
mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, nil).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)
s.Send(batch)
s.Stop()

Expand All @@ -251,7 +252,7 @@ func TestSender(t *testing.T) {
mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once()

s := newSender(logger, mockService, mockManager, 100*time.Millisecond)
s := newSender(logger, mockService, mockManager, 100*time.Millisecond, false)
s.Send(batch)
s.Stop()

Expand Down Expand Up @@ -279,7 +280,7 @@ func TestSender(t *testing.T) {
mockService.On("PutLogEvents", mock.Anything).
Return(&cloudwatchlogs.PutLogEventsOutput{}, awserr.New("SomeAWSError", "Some AWS error", nil)).Once()

s := newSender(logger, mockService, mockManager, time.Second)
s := newSender(logger, mockService, mockManager, time.Second, false)

go func() {
time.Sleep(50 * time.Millisecond)
Expand All @@ -292,4 +293,37 @@ func TestSender(t *testing.T) {
assert.True(t, stateCallbackCalled, "State callback was not called when stop was requested")
assert.False(t, doneCallbackCalled, "Done callback should not be called when stop was requested")
})

// Verifies that when the sender is constructed with concurrency enabled,
// a failed PutLogEvents call triggers the batch's registered fail
// callbacks (the hook used for circuit-breaker notification).
t.Run("ConcurrencyEnabled/CallsFailCallback", func(t *testing.T) {
	logger := testutil.NewNopLogger()
	batch := newLogEventBatch(Target{"G", "S", util.StandardLogGroupClass, -1}, nil)
	batch.append(newLogEvent(time.Now(), "Test message", nil))

	// Initialize batch for retry logic
	batch.initializeStartTime()

	mockService := new(mockLogsService)
	mockManager := new(mockTargetManager)
	// Single PutLogEvents call fails; presumably ServiceUnavailableException
	// is treated as retryable by Send — confirm against sender.go.
	mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, &cloudwatchlogs.ServiceUnavailableException{}).Once()

	// Enable concurrency with 1 hour retry duration
	s := newSender(logger, mockService, mockManager, time.Hour, true)

	// Track if fail callback was called
	failCalled := false
	batch.addFailCallback(func() {
		failCalled = true
	})

	// Stop the sender shortly after Send begins so the test does not block
	// on the deliberately long (1 hour) retry duration.
	go func() {
		time.Sleep(50 * time.Millisecond)
		s.Stop()
	}()

	s.Send(batch)

	// Should call fail callback when concurrency is enabled
	assert.True(t, failCalled, "fail callback should be called when concurrency is enabled")
	mockService.AssertExpectations(t)
})
}