Skip to content

Commit c64f7ba

Browse files
committed
Add automatic publisher confirmation tracking with throttling and context
Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern, backpressure mechanism, or message correlation support. This change introduces automatic publisher confirmation tracking with a `CompletableFuture`-based API, progressive throttling, and generic context parameter for message correlation, following the design of the .NET client's publisher confirmation implementation. When enabled via `ChannelOptions`, the library automatically tracks each message's confirmation status and completes futures when the broker sends Basic.Ack, Basic.Nack, or Basic.Return. New API methods: - `Channel.basicPublishAsync()` - Returns `CompletableFuture<T>` that completes with user-provided context when broker confirms the message. The generic context parameter (such as a correlation ID) eliminates the need for separate tracking structures. - `Connection.createChannel(ChannelOptions)` - Creates channel with publisher confirmation tracking enabled New classes: - `ChannelOptions` - Configuration with builder pattern for channel creation with tracking settings and optional `RateLimiter` for backpressure control - `PublishException` - Exception thrown when message is nack'd or returned, with full details including sequence number, routing information, and user-provided context - `RateLimiter` - Interface for custom rate limiting strategies - `ThrottlingRateLimiter` - Progressive throttling implementation that applies delays proportional to capacity usage (0-1000ms) when available permits fall below a configurable threshold (default 50%) The implementation adds a sequence number header (`x-seq-no`) to each tracked message, allowing correlation of Basic.Return responses with specific messages. The `ThrottlingRateLimiter` uses `Semaphore` for concurrency control and calculates delay as `percentageUsed * 1000ms`, matching the .NET client's algorithm exactly. Passing `null` for the rate limiter disables backpressure for unlimited outstanding confirmations. Publisher confirmation state is stored in a single `ConcurrentHashMap` containing `ConfirmationEntry` objects that hold the future, rate limiter permit, and user context. The map is sized based on the rate limiter's capacity hint for optimal memory usage. The feature is opt-in and maintains full backward compatibility. Existing `basicPublish()` and `waitForConfirms()` methods remain unchanged. Tests include 9 unit tests for `ThrottlingRateLimiter` and 25 integration tests for publisher confirmation tracking with context parameter verification and various rate limiting scenarios.
1 parent 285b142 commit c64f7ba

21 files changed

+2845
-14
lines changed

RUNNING_TESTS.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,26 @@ top-level directory of the source tree:
8888
-Dit.test=DeadLetterExchange
8989
```
9090

91+
* To run a single test class:
92+
93+
```
94+
./mvnw verify -Dit.test=Confirm
95+
```
96+
97+
* To run a specific test method within a test class:
98+
99+
```
100+
./mvnw verify -Dit.test=Confirm#testBasicPublishAsync
101+
```
102+
91103
Test reports can be found in `target/failsafe-reports`.
92104

93105
## Running Against a Broker in a Docker Container
94106

95107
Run the broker:
96108

97109
```
98-
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
110+
docker run --pull always --rm --tty --interactive --name rabbitmq --publish 5672:5672 rabbitmq:latest
99111
```
100112

101113
Launch the tests:

doc/STATUS.md

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
# Publisher Confirmation Tracking - Current Status
2+
3+
**Last Updated:** 2025-12-09
4+
**Branch:** `lukebakken/publisher-confirm-tracking`
5+
6+
## ⚠️ Important Note
7+
8+
**This is a new feature with NO backward compatibility requirements.**
9+
10+
All APIs are being developed from scratch and can be changed freely without
11+
concern for breaking existing code. This allows us to design the cleanest,
12+
most intuitive API possible without legacy constraints.
13+
14+
## Completed Work
15+
16+
### Core Feature (2025-12-08)
17+
✅ Automatic publisher confirmation tracking with `CompletableFuture` API
18+
✅ Generic context parameter for message correlation
19+
`ChannelOptions` with builder pattern for configuration
20+
`PublishException` with context for error handling
21+
✅ Sequence number header correlation (`x-seq-no`)
22+
✅ All 25 Confirm integration tests passing
23+
24+
### Rate Limiting Enhancement (2025-12-08 to 2025-12-09)
25+
`RateLimiter` interface for extensibility
26+
`ThrottlingRateLimiter` matching .NET client algorithm
27+
✅ 9 unit tests (CI-friendly, behavior-focused)
28+
✅ Builder pattern for `ChannelOptions`
29+
✅ Full integration with permit tracking
30+
✅ Merged and pushed
31+
32+
### API Improvements (2025-12-09)
33+
✅ Generic context parameter in `basicPublishAsync()`
34+
✅ Context flows through `CompletableFuture<T>` for message correlation
35+
✅ Context available in `PublishException` for error handling
36+
✅ Eliminates need for separate tracking structures
37+
✅ Single `ConcurrentHashMap` for confirmations (futures + permits + context)
38+
✅ Map sized based on `RateLimiter.getMaxConcurrency()` hint
39+
✅ Sequence number header changed to `x-seq-no`
40+
41+
## Current State
42+
43+
**Files:**
44+
- `src/main/java/com/rabbitmq/client/RateLimiter.java` - Interface
45+
- `src/main/java/com/rabbitmq/client/ThrottlingRateLimiter.java` - Implementation
46+
- `src/main/java/com/rabbitmq/client/ChannelOptions.java` - Builder pattern
47+
- `src/main/java/com/rabbitmq/client/PublishException.java` - Exception with context
48+
- `src/main/java/com/rabbitmq/client/impl/ChannelN.java` - Integration
49+
- `src/test/java/com/rabbitmq/client/test/ThrottlingRateLimiterTest.java` - 9 tests
50+
- `src/test/java/com/rabbitmq/client/test/functional/Confirm.java` - 25 tests
51+
52+
**Test Results:**
53+
- ThrottlingRateLimiter: 9/9 passing
54+
- Confirm: 25/25 passing (20 original + 5 new)
55+
- Full suite: Not yet run
56+
57+
## Next Steps
58+
59+
### Immediate (In Progress)
60+
🚧 **PublisherConfirmationState refactoring**
61+
- Extract confirmation tracking logic from `ChannelN`
62+
- Improve code organization and testability
63+
- See [publisher-confirmation-state-refactoring.md](./publisher-confirmation-state-refactoring.md)
64+
65+
### .NET Client Port
66+
🚧 **Context parameter for .NET client**
67+
- Partial implementation complete with backward compatibility
68+
- Under review for storage strategy decision
69+
- See `rabbitmq-dotnet-client/doc/context-parameter-implementation.md`
70+
71+
### Pending
72+
- Documentation updates (`publisher-confirms-async.md`)
73+
- Full test suite validation (1088+ tests)
74+
- Recovery scenario testing
75+
- Performance benchmarking
76+
77+
## API Summary
78+
79+
```java
80+
// Create channel with throttling
81+
ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(100, 50);
82+
ChannelOptions options = ChannelOptions.builder()
83+
.publisherConfirmations(true)
84+
.publisherConfirmationTracking(true)
85+
.rateLimiter(limiter)
86+
.build();
87+
Channel channel = connection.createChannel(options);
88+
89+
// Publish with context (such as correlation ID)
90+
String messageId = "msg-123";
91+
CompletableFuture<String> future = channel.basicPublishAsync(
92+
"", queueName, null, body, messageId
93+
);
94+
95+
// Handle result with context
96+
future.thenAccept(ctx -> System.out.println("Confirmed: " + ctx))
97+
.exceptionally(ex -> {
98+
if (ex.getCause() instanceof PublishException) {
99+
PublishException pe = (PublishException) ex.getCause();
100+
String msgId = (String) pe.getContext();
101+
System.err.println("Message " + msgId + " failed: " + pe.getMessage());
102+
}
103+
return null;
104+
});
105+
```
106+
107+
## Parity with .NET Client
108+
109+
| Feature | .NET | Java | Status |
110+
|---------|------|------|--------|
111+
| Async API | `Task` | `CompletableFuture` | ✅ Complete |
112+
| Rate Limiter Interface | `RateLimiter` | `RateLimiter` | ✅ Complete |
113+
| Throttling Implementation | `ThrottlingRateLimiter` | `ThrottlingRateLimiter` | ✅ Complete |
114+
| Algorithm | Progressive delay | Progressive delay | ✅ Identical |
115+
| Builder Pattern | Constructor with defaults | Builder | ✅ Complete |
116+
| Extensibility | Custom rate limiters | Custom rate limiters | ✅ Complete |
117+
118+
## Documentation
119+
120+
- [Publisher Confirms Async](./publisher-confirms-async.md) - Main feature documentation
121+
- [Throttling Rate Limiter](./throttling-rate-limiter.md) - Rate limiting details
122+
- [PublisherConfirmationState Refactoring](./publisher-confirmation-state-refactoring.md) - Next steps
123+
- [Implementation Summary](../../workplace/.../gh-1824/implementation-summary.md) - Complete history

doc/context-parameter.md

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# Context Parameter for Message Correlation
2+
3+
**Date:** 2025-12-09
4+
**Status:** ✅ Complete
5+
6+
## Problem
7+
8+
When publishing messages asynchronously, applications need to track which message was confirmed or failed. Without built-in correlation, applications must maintain separate data structures:
9+
10+
```java
11+
// ❌ Without context - requires separate tracking
12+
Map<String, CompletableFuture<Void>> tracking = new ConcurrentHashMap<>();
13+
14+
String msgId = generateMessageId();
15+
CompletableFuture<Void> future = channel.basicPublishAsync(...);
16+
tracking.put(msgId, future);
17+
18+
future.whenComplete((v, ex) -> {
19+
tracking.remove(msgId); // Manual cleanup
20+
if (ex != null) {
21+
handleFailure(msgId); // Must capture msgId
22+
}
23+
});
24+
```
25+
26+
**Issues:**
27+
- Separate data structure needed
28+
- Manual correlation and cleanup
29+
- Closure captures can be error-prone
30+
- Race conditions if not careful
31+
32+
## Solution
33+
34+
Add a generic context parameter to `basicPublishAsync()` that flows through the `CompletableFuture`:
35+
36+
```java
37+
<T> CompletableFuture<T> basicPublishAsync(String exchange, String routingKey,
38+
BasicProperties props, byte[] body, T context);
39+
```
40+
41+
## Usage
42+
43+
### Success Case
44+
45+
```java
46+
String messageId = "order-12345";
47+
channel.basicPublishAsync("", queue, null, body, messageId)
48+
.thenAccept(ctx -> {
49+
System.out.println("Confirmed: " + ctx);
50+
// ctx is "order-12345"
51+
});
52+
```
53+
54+
### Error Case
55+
56+
```java
57+
String messageId = "order-12345";
58+
channel.basicPublishAsync("", queue, true, null, body, messageId)
59+
.exceptionally(ex -> {
60+
if (ex.getCause() instanceof PublishException) {
61+
PublishException pe = (PublishException) ex.getCause();
62+
String msgId = (String) pe.getContext();
63+
System.err.println("Message " + msgId + " failed");
64+
}
65+
return null;
66+
});
67+
```
68+
69+
### Batch Publishing Pattern
70+
71+
```java
72+
Map<String, CompletableFuture<String>> outstandingPublishes = new ConcurrentHashMap<>();
73+
74+
for (Message msg : messages) {
75+
String msgId = msg.getId();
76+
CompletableFuture<String> future = channel.basicPublishAsync(
77+
"", queue, null, msg.getBody(), msgId
78+
);
79+
80+
future.whenComplete((ctx, ex) -> {
81+
outstandingPublishes.remove(ctx);
82+
if (ex != null) {
83+
handleFailure(ctx, ex);
84+
} else {
85+
handleSuccess(ctx);
86+
}
87+
});
88+
89+
outstandingPublishes.put(msgId, future);
90+
}
91+
92+
// Wait for all
93+
CompletableFuture.allOf(outstandingPublishes.values().toArray(new CompletableFuture[0])).join();
94+
```
95+
96+
## Implementation Details
97+
98+
### API Changes
99+
100+
**Channel interface:**
101+
```java
102+
<T> CompletableFuture<T> basicPublishAsync(..., T context);
103+
```
104+
105+
**PublishException:**
106+
```java
107+
public class PublishException extends IOException {
108+
private final Object context;
109+
110+
public Object getContext() { return context; }
111+
}
112+
```
113+
114+
**ChannelN:**
115+
```java
116+
private static class ConfirmationEntry<T> {
117+
final CompletableFuture<T> future;
118+
final RateLimiter.Permit permit;
119+
final T context;
120+
}
121+
```
122+
123+
### Type Safety
124+
125+
The context type is preserved through the `CompletableFuture<T>`:
126+
- Compile-time type checking
127+
- No casting needed in success case
128+
- Casting needed in exception case (Java limitation)
129+
130+
### Null Context
131+
132+
Passing `null` is valid when correlation is not needed:
133+
```java
134+
channel.basicPublishAsync("", queue, null, body, null)
135+
.thenRun(() -> System.out.println("Confirmed"));
136+
```
137+
138+
## Benefits
139+
140+
**No separate tracking needed** - Context flows through the future
141+
**Type-safe** - Compiler enforces context type
142+
**Composable** - Works with all `CompletableFuture` operations
143+
**Available in errors** - Context accessible in `PublishException`
144+
**Flexible** - Can be correlation ID, message object, or any type
145+
146+
## Testing
147+
148+
**Tests added:**
149+
- `testBasicPublishAsync` - Verifies 100 messages with `Integer` context
150+
- `testMaxOutstandingConfirms` - Verifies context with throttling
151+
- `testBasicPublishAsyncWithThrottling` - Verifies 50 messages with `String` context
152+
- `testBasicPublishAsyncWithContext` - Demonstrates correlation ID pattern
153+
- `testBasicPublishAsyncWithContextInException` - Verifies context in `PublishException`
154+
155+
**Result:** 25/25 Confirm tests passing
156+
157+
## Comparison with Other Approaches
158+
159+
| Approach | Pros | Cons |
160+
|----------|------|------|
161+
| **Context parameter** | Type-safe, composable, no separate tracking | Requires passing parameter |
162+
| Separate Map | Familiar pattern | Manual correlation, race conditions |
163+
| Message properties | No API change | Limited to String, pollutes headers |
164+
| Wrapper class | Encapsulation | Extra object allocation, less composable |
165+
166+
## References
167+
168+
- [Channel.java](../src/main/java/com/rabbitmq/client/Channel.java) - API definition
169+
- [PublishException.java](../src/main/java/com/rabbitmq/client/PublishException.java) - Exception with context
170+
- [Confirm.java](../src/test/java/com/rabbitmq/client/test/functional/Confirm.java) - Integration tests

0 commit comments

Comments
 (0)