Skip to content

Commit 44c3aca

Browse files
committed
Add automatic publisher confirmation tracking with throttling
Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern or backpressure mechanism. This change introduces automatic publisher confirmation tracking with a `CompletableFuture`-based API and progressive throttling, following the design of the .NET client's publisher confirmation implementation. When enabled via `ChannelOptions`, the library automatically tracks each message's confirmation status and completes futures when the broker sends Basic.Ack, Basic.Nack, or Basic.Return. New API methods: - `Channel.basicPublishAsync()` - Returns `CompletableFuture<Void>` that completes when broker confirms the message - `Connection.createChannel(ChannelOptions)` - Creates channel with publisher confirmation tracking enabled New classes: - `ChannelOptions` - Configuration for channel creation with tracking settings and optional `RateLimiter` for backpressure control - `PublishException` - Exception thrown when message is nack'd or returned, with full details including sequence number and routing information - `RateLimiter` - Interface for custom rate limiting strategies - `ThrottlingRateLimiter` - Progressive throttling implementation that applies delays proportional to capacity usage (0-1000ms) when available permits fall below a configurable threshold (default 50%) The implementation adds a sequence number header (`x-java-pub-seq-no`) to each tracked message, allowing correlation of Basic.Return responses with specific messages. The `ThrottlingRateLimiter` uses `Semaphore` for concurrency control and calculates delay as `percentageUsed * 1000ms`, matching the .NET client's algorithm exactly. Passing `null` for the rate limiter disables backpressure for unlimited outstanding confirmations. The feature is opt-in and maintains full backward compatibility. Existing `basicPublish()` and `waitForConfirms()` methods remain unchanged. Tests include 9 unit tests for `ThrottlingRateLimiter` and 23 integration tests for publisher confirmation tracking with various rate limiting scenarios.
1 parent 285b142 commit 44c3aca

18 files changed

+2220
-14
lines changed

RUNNING_TESTS.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,26 @@ top-level directory of the source tree:
8888
-Dit.test=DeadLetterExchange
8989
```
9090

91+
* To run a single test class:
92+
93+
```
94+
./mvnw verify -Dit.test=Confirm
95+
```
96+
97+
* To run a specific test method within a test class:
98+
99+
```
100+
./mvnw verify -Dit.test=Confirm#testBasicPublishAsync
101+
```
102+
91103
Test reports can be found in `target/failsafe-reports`.
92104

93105
## Running Against a Broker in a Docker Container
94106

95107
Run the broker:
96108

97109
```
98-
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
110+
docker run --pull always --rm --tty --interactive --name rabbitmq --publish 5672:5672 rabbitmq:latest
99111
```
100112

101113
Launch the tests:

0 commit comments

Comments
 (0)