Skip to content

Conversation

@agarakan
Copy link
Contributor

@agarakan agarakan commented Dec 30, 2025

Description of the issue

Instantiate RetryHeap and RetryHeapProcessor if concurrency enabled, and push retryable batches to RetryHeap from Sender

Description of changes

Instantiates RetryHeap and RetryHeapProcessor at plugin level if concurrency is enabled
This will ensure a single RetryHeap and Processor across all Targets.
The RetryHeapProcessor should not take reference to a SenderPool dedicated to a given pusher, so it instead creates its own instance in order to push ready batches to the SenderQueue (tasks() channel)
Sender will push batch to retryheap on failure after recalculating retry metadata.

License

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Tests

All tests are passing

make lint
make fmt
make fmt-sh
make test

Requirements

Before commiting your code, please do the following steps.

  1. Run make fmt and make fmt-sh
  2. Run make lint

Integration Tests

To run integration tests against this PR, add the ready for testing label.

@agarakan agarakan requested a review from a team as a code owner December 30, 2025 23:24
sky333999
sky333999 previously approved these changes Dec 31, 2025
// Create processor's own sender and senderPool
// Note: Pass nil for retryHeap to prevent infinite retry loops -
// batches from RetryHeap that fail again use synchronous retry behavior
sender := newSender(logger, service, targetManager, maxRetryDuration, true, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the last arg here i.e. the retryHeap is set to nil, doesnt that defeat the entire purpose? Since this sender that is for the retry batches will now follow the old sync flow and block thus eventually causing all workers to block?

I was expecting the batches pop'ed from the retryHeap to also go back again onto the retryHeap.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this separately, we will need to define the sender to invoke sender.send().

@sky333999 sky333999 dismissed their stale review December 31, 2025 23:41

Pending clarification on sender used in RetryHeapProcessor.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants