Merged
40 changes: 31 additions & 9 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
@@ -69,14 +69,16 @@ type CloudWatchLogs struct {

Log telegraf.Logger `toml:"-"`

-    pusherWaitGroup sync.WaitGroup
-    cwDests         sync.Map
-    workerPool      pusher.WorkerPool
-    targetManager   pusher.TargetManager
-    once            sync.Once
-    middleware      awsmiddleware.Middleware
-    configurer      *awsmiddleware.Configurer
-    configurerOnce  sync.Once
+    pusherWaitGroup    sync.WaitGroup
+    cwDests            sync.Map
+    workerPool         pusher.WorkerPool
+    retryHeap          pusher.RetryHeap
+    retryHeapProcessor *pusher.RetryHeapProcessor
+    targetManager      pusher.TargetManager
+    once               sync.Once
+    middleware         awsmiddleware.Middleware
+    configurer         *awsmiddleware.Configurer
+    configurerOnce     sync.Once
}

var _ logs.LogBackend = (*CloudWatchLogs)(nil)
@@ -87,6 +89,16 @@ func (c *CloudWatchLogs) Connect() error {
}

func (c *CloudWatchLogs) Close() error {
+    // Stop components in a specific order to prevent race conditions:
+    // 1. RetryHeap - stop accepting new batches first
+    // 2. Pushers - stop all active pushers (queues/senders)
+    // 3. Wait for pushers to complete
+    // 4. RetryHeapProcessor - stop retry processing and wait for WorkerPool usage to complete
+    // 5. WorkerPool - finally stop the worker threads
+
+    if c.retryHeap != nil {
+        c.retryHeap.Stop()
+    }

c.cwDests.Range(func(_, value interface{}) bool {
if d, ok := value.(*cwDest); ok {
@@ -97,6 +109,10 @@ func (c *CloudWatchLogs) Close() error {

c.pusherWaitGroup.Wait()

+    if c.retryHeapProcessor != nil {
+        c.retryHeapProcessor.Stop()
+    }

if c.workerPool != nil {
c.workerPool.Stop()
}
@@ -150,10 +166,16 @@ func (c *CloudWatchLogs) getDest(t pusher.Target, logSrc logs.LogSrc) *cwDest {
c.once.Do(func() {
if c.Concurrency > 1 {
c.workerPool = pusher.NewWorkerPool(c.Concurrency)
+            c.retryHeap = pusher.NewRetryHeap(c.Concurrency, c.Log)
+
+            retryHeapProcessorRetryer := retryer.NewLogThrottleRetryer(c.Log)
+            retryHeapProcessorClient := c.createClient(retryHeapProcessorRetryer)
+            c.retryHeapProcessor = pusher.NewRetryHeapProcessor(c.retryHeap, c.workerPool, retryHeapProcessorClient, c.targetManager, c.Log, maxRetryTimeout, retryHeapProcessorRetryer)
+            c.retryHeapProcessor.Start()
}
c.targetManager = pusher.NewTargetManager(c.Log, client)
})
-    p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup, c.Concurrency)
+    p := pusher.NewPusher(c.Log, t, client, c.targetManager, logSrc, c.workerPool, c.ForceFlushInterval.Duration, maxRetryTimeout, &c.pusherWaitGroup, c.retryHeap)
cwd := &cwDest{
pusher: p,
retryer: logThrottleRetryer,
37 changes: 37 additions & 0 deletions plugins/outputs/cloudwatchlogs/cloudwatchlogs_test.go
@@ -8,6 +8,7 @@ import (
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/aws/amazon-cloudwatch-agent/logs"
@@ -100,3 +101,39 @@ func TestDuplicateDestination(t *testing.T) {
// Then the destination for cloudwatchlogs endpoint would be the same
require.Equal(t, d1, d2)
}

+func TestRetryHeapCreation(t *testing.T) {
+    t.Run("ConcurrencyEnabled", func(t *testing.T) {
+        c := &CloudWatchLogs{
+            Log:         testutil.Logger{Name: "test"},
+            AccessKey:   "access_key",
+            SecretKey:   "secret_key",
+            Concurrency: 2, // > 1 enables concurrency
+            cwDests:     sync.Map{},
+        }
+
+        c.CreateDest("FILENAME", "", -1, util.StandardLogGroupClass, nil)
+
+        // Should create the RetryHeap and its processor
+        assert.NotNil(t, c.retryHeap)
+        assert.NotNil(t, c.retryHeapProcessor)
+        assert.NotNil(t, c.workerPool)
+    })
+
+    t.Run("ConcurrencyDisabled", func(t *testing.T) {
+        c := &CloudWatchLogs{
+            Log:         testutil.Logger{Name: "test"},
+            AccessKey:   "access_key",
+            SecretKey:   "secret_key",
+            Concurrency: 1, // <= 1 disables concurrency
+            cwDests:     sync.Map{},
+        }
+
+        c.CreateDest("FILENAME", "", -1, util.StandardLogGroupClass, nil)
+
+        // Should not create the RetryHeap or its processor
+        assert.Nil(t, c.retryHeap)
+        assert.Nil(t, c.retryHeapProcessor)
+        assert.Nil(t, c.workerPool)
+    })
+}
22 changes: 21 additions & 1 deletion plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
@@ -107,7 +107,7 @@ func TestSenderPool(t *testing.T) {
logger := testutil.NewNopLogger()
mockService := new(mockLogsService)
mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)
-    s := newSender(logger, mockService, nil, time.Second, false)
+    s := newSender(logger, mockService, nil, time.Second, nil)
p := NewWorkerPool(12)
sp := newSenderPool(p, s)

@@ -134,3 +134,23 @@
s.Stop()
assert.Equal(t, int32(200), completed.Load())
}

+func TestSenderPoolRetryHeap(t *testing.T) {
+    assert.NotPanics(t, func() {
+        logger := testutil.NewNopLogger()
+        mockService := new(mockLogsService)
+        mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)
+
+        // Create the RetryHeap
+        retryHeap := NewRetryHeap(10, logger)
+        defer retryHeap.Stop()
+
+        s := newSender(logger, mockService, nil, time.Second, retryHeap)
+        p := NewWorkerPool(12)
+        defer p.Stop()
+
+        sp := newSenderPool(p, s)
+
+        sp.Stop()
+    })
+}
10 changes: 5 additions & 5 deletions plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
@@ -34,10 +34,10 @@ func NewPusher(
flushTimeout time.Duration,
retryDuration time.Duration,
wg *sync.WaitGroup,
-    concurrency int,
+    retryHeap RetryHeap,
) *Pusher {
-    concurrencyEnabled := concurrency > 1
-    s := createSender(logger, service, targetManager, workerPool, retryDuration, concurrencyEnabled)
+    s := createSender(logger, service, targetManager, workerPool, retryDuration, retryHeap)

q := newQueue(logger, target, flushTimeout, entityProvider, s, wg)
targetManager.PutRetentionPolicy(target)
return &Pusher{
@@ -62,9 +62,9 @@ func createSender(
targetManager TargetManager,
workerPool WorkerPool,
retryDuration time.Duration,
-    concurrencyEnabled bool,
+    retryHeap RetryHeap,
) Sender {
-    s := newSender(logger, service, targetManager, retryDuration, concurrencyEnabled)
+    s := newSender(logger, service, targetManager, retryDuration, retryHeap)
if workerPool == nil {
return s
}
38 changes: 37 additions & 1 deletion plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
@@ -113,7 +113,7 @@ func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pusher {
time.Second,
time.Minute,
wg,
-        1,   // concurrency
+        nil, // retryHeap
)

assert.NotNil(t, pusher)
@@ -125,3 +125,39 @@ func setupPusher(t *testing.T, workerPool WorkerPool, wg *sync.WaitGroup) *Pusher {
mockManager.AssertCalled(t, "PutRetentionPolicy", target)
return pusher
}

+func TestPusherRetryHeap(t *testing.T) {
+    logger := testutil.NewNopLogger()
+    target := Target{Group: "G", Stream: "S"}
+    service := &stubLogsService{}
+    mockManager := new(mockTargetManager)
+    mockManager.On("PutRetentionPolicy", target).Return()
+
+    workerPool := NewWorkerPool(2)
+    defer workerPool.Stop()
+
+    retryHeap := NewRetryHeap(10, logger)
+    defer retryHeap.Stop()
+
+    var wg sync.WaitGroup
+    pusher := NewPusher(
+        logger,
+        target,
+        service,
+        mockManager,
+        nil,
+        workerPool,
+        time.Second,
+        time.Minute,
+        &wg,
+        retryHeap,
+    )
+
+    assert.NotNil(t, pusher)
+    assert.Equal(t, target, pusher.Target)
+
+    // Verify the pusher carries the RetryHeap when concurrency is enabled
+    // (the RetryHeap is now passed to the underlying sender, not the senderPool).
+
+    mockManager.AssertCalled(t, "PutRetentionPolicy", target)
+}
@@ -712,7 +712,7 @@ func testPreparationWithLogger(
) (*queue, Sender) {
t.Helper()
tm := NewTargetManager(logger, service)
-    s := newSender(logger, service, tm, retryDuration, false)
+    s := newSender(logger, service, tm, retryDuration, nil)
q := newQueue(
logger,
Target{"G", "S", util.StandardLogGroupClass, retention},
40 changes: 33 additions & 7 deletions plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
@@ -5,11 +5,12 @@ package pusher

import (
"container/heap"
"errors"
"sync"
"time"

"github.com/influxdata/telegraf"

"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
)

// retryHeapImpl implements heap.Interface for logEventBatch sorted by nextRetryTime
Expand Down Expand Up @@ -40,7 +41,7 @@ func (h *retryHeapImpl) Pop() interface{} {

// RetryHeap manages failed batches during their retry wait periods
type RetryHeap interface {
-    Push(batch *logEventBatch) error
+    Push(batch *logEventBatch)
PopReady() []*logEventBatch
Size() int
Stop()
@@ -52,34 +53,38 @@
semaphore chan struct{} // Size enforcer
stopCh chan struct{}
maxSize int
+    stopped   bool
+    logger    telegraf.Logger
}

var _ RetryHeap = (*retryHeap)(nil)

// NewRetryHeap creates a new retry heap with the specified maximum size
-func NewRetryHeap(maxSize int) RetryHeap {
+func NewRetryHeap(maxSize int, logger telegraf.Logger) RetryHeap {
rh := &retryHeap{
heap: make(retryHeapImpl, 0, maxSize),
maxSize: maxSize,
semaphore: make(chan struct{}, maxSize), // Semaphore for size enforcement
stopCh: make(chan struct{}),
+        logger:    logger,
}
heap.Init(&rh.heap)
return rh
}

// Push adds a batch to the heap, blocking if full
-func (rh *retryHeap) Push(batch *logEventBatch) error {
+func (rh *retryHeap) Push(batch *logEventBatch) {
// Acquire semaphore slot (blocks if at maxSize capacity)
select {
case rh.semaphore <- struct{}{}:
// add batch to heap with mutex protection
rh.mutex.Lock()
heap.Push(&rh.heap, batch)
rh.mutex.Unlock()
-        return nil
case <-rh.stopCh:
-        return errors.New("retry heap stopped")
+        // RetryHeap is stopped; drop the batch
+        rh.logger.Errorf("Stop requested while batch for %v/%v awaited PutLogEvents retry; request dropped.", batch.Group, batch.Stream)
+        batch.updateState()
}
}
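
The Push/Stop interplay above is a counting-semaphore pattern: a buffered channel bounds how many batches may wait for retry, and a closed stop channel unblocks a stalled producer so it drops its item instead of hanging. A minimal, self-contained sketch of that pattern follows; it is not from this PR. boundedHeap and its methods are illustrative stand-ins, the real code also guards the heap with a mutex, and the semaphore release shown in pop mirrors what PopReady presumably does (that side is not visible in this diff).

```go
package main

import "fmt"

// boundedHeap is a stand-in for retryHeap: the buffered channel acts as a
// counting semaphore that caps the queue size.
type boundedHeap struct {
	items     []int
	semaphore chan struct{} // one slot per queued item
	stopCh    chan struct{}
}

func newBoundedHeap(maxSize int) *boundedHeap {
	return &boundedHeap{
		semaphore: make(chan struct{}, maxSize),
		stopCh:    make(chan struct{}),
	}
}

// push blocks while the heap is full; it reports false if stopped instead.
func (b *boundedHeap) push(v int) bool {
	select {
	case b.semaphore <- struct{}{}: // blocks once maxSize items are queued
		b.items = append(b.items, v)
		return true
	case <-b.stopCh: // a closed stopCh unblocks a waiting push; the item is dropped
		return false
	}
}

// pop releases a semaphore slot so a blocked push can proceed.
func (b *boundedHeap) pop() (int, bool) {
	if len(b.items) == 0 {
		return 0, false
	}
	v := b.items[0]
	b.items = b.items[1:]
	<-b.semaphore
	return v, true
}

func main() {
	b := newBoundedHeap(2)
	fmt.Println(b.push(1), b.push(2)) // true true; a third push would block here
	close(b.stopCh)                   // simulate Stop()
	fmt.Println(b.push(3))            // false: the heap is full and stopped
}
```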

Expand Down Expand Up @@ -111,24 +116,36 @@ func (rh *retryHeap) Size() int {

// Stop stops the retry heap
func (rh *retryHeap) Stop() {
+    if rh.stopped {
+        return
+    }
close(rh.stopCh)
+    rh.stopped = true
}

// RetryHeapProcessor manages the retry heap and moves ready batches back to sender queue
type RetryHeapProcessor struct {
retryHeap RetryHeap
senderPool Sender
+    retryer *retryer.LogThrottleRetryer
stopCh chan struct{}
logger telegraf.Logger
stopped bool
maxRetryDuration time.Duration
+    wg sync.WaitGroup
}

// NewRetryHeapProcessor creates a new retry heap processor
-func NewRetryHeapProcessor(retryHeap RetryHeap, senderPool Sender, logger telegraf.Logger, maxRetryDuration time.Duration) *RetryHeapProcessor {
+func NewRetryHeapProcessor(retryHeap RetryHeap, workerPool WorkerPool, service cloudWatchLogsService, targetManager TargetManager, logger telegraf.Logger, maxRetryDuration time.Duration, retryer *retryer.LogThrottleRetryer) *RetryHeapProcessor {
+    // Create the processor's own sender and senderPool.
+    // Pass retryHeap so failed batches go back to the RetryHeap instead of blocking on a synchronous retry.
+    sender := newSender(logger, service, targetManager, maxRetryDuration, retryHeap)
+    senderPool := newSenderPool(workerPool, sender)
Comment on lines +140 to +143

Reviewer (Contributor): nit: Could this be done outside of the RetryHeapProcessor constructor? Could just pass in a sender and have cloudwatchlogs.go create it. That way the constructor doesn't need to know about the worker pool, service, target manager, or logger. Could even export the createSender function in pusher.go and have cloudwatchlogs.go call that.

Author (agarakan, Jan 26, 2026): I will come back to this in a separate PR since it doesn't affect functionality but does require refactoring the RetryHeapProcessor constructor. Currently it seems like per-component dependencies are all created within the component that uses them.
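
For reference, a rough sketch of the constructor the reviewer describes, written as if inside the pusher package so the unexported Sender helpers are reachable. The constructor name and the exported CreateSender/NewSenderPool mentioned in the comments are hypothetical, and none of this is in the PR; it only illustrates how the caller could assemble the dependencies.

```go
package pusher

import (
	"time"

	"github.com/influxdata/telegraf"

	"github.com/aws/amazon-cloudwatch-agent/internal/retryer"
)

// NewRetryHeapProcessorWithSender is a hypothetical constructor matching the
// reviewer's suggestion: the caller builds the sender, e.g. via an exported
// createSender, so this function no longer needs the worker pool, service, or
// target manager. A call site in cloudwatchlogs.go might then read (assumed names):
//
//	sender := pusher.CreateSender(c.Log, client, c.targetManager, maxRetryTimeout, c.retryHeap)
//	senderPool := pusher.NewSenderPool(c.workerPool, sender)
//	c.retryHeapProcessor = pusher.NewRetryHeapProcessorWithSender(c.retryHeap, senderPool, c.Log, maxRetryTimeout, r)
func NewRetryHeapProcessorWithSender(
	retryHeap RetryHeap,
	senderPool Sender,
	logger telegraf.Logger,
	maxRetryDuration time.Duration,
	r *retryer.LogThrottleRetryer,
) *RetryHeapProcessor {
	// Only wire the fields together; all dependencies arrive pre-built.
	return &RetryHeapProcessor{
		retryHeap:        retryHeap,
		senderPool:       senderPool,
		retryer:          r,
		stopCh:           make(chan struct{}),
		logger:           logger,
		maxRetryDuration: maxRetryDuration,
	}
}
```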


return &RetryHeapProcessor{
retryHeap: retryHeap,
senderPool: senderPool,
+        retryer: retryer,
stopCh: make(chan struct{}),
logger: logger,
stopped: false,
@@ -138,6 +155,7 @@ func NewRetryHeapProcessor(retryHeap RetryHeap, senderPool Sender, logger telegraf.Logger, maxRetryDuration time.Duration) *RetryHeapProcessor {

// Start begins processing the retry heap every 100ms
func (p *RetryHeapProcessor) Start() {
+    p.wg.Add(1)
go p.processLoop()
}

@@ -150,12 +168,16 @@ func (p *RetryHeapProcessor) Stop() {
// Process any remaining batches before stopping
p.processReadyMessages()

+    p.retryer.Stop()
p.senderPool.Stop()
close(p.stopCh)
+    p.wg.Wait()
p.stopped = true
}

// processLoop runs the main processing loop
func (p *RetryHeapProcessor) processLoop() {
+    defer p.wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

@@ -171,6 +193,10 @@

// processReadyMessages checks the heap for ready batches and moves them back to sender queue
func (p *RetryHeapProcessor) processReadyMessages() {
+    if p.stopped {
+        return
+    }

readyBatches := p.retryHeap.PopReady()

for _, batch := range readyBatches {