Skip to content

Commit c17cd9f

Browse files
committed
Add automatic publisher confirmation tracking with throttling
Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern or backpressure mechanism. This change introduces automatic publisher confirmation tracking with a `CompletableFuture`-based API and progressive throttling, following the design of the .NET client's publisher confirmation implementation. When enabled via `ChannelOptions`, the library automatically tracks each message's confirmation status and completes futures when the broker sends Basic.Ack, Basic.Nack, or Basic.Return. New API methods: - `Channel.basicPublishAsync()` - Returns `CompletableFuture<Void>` that completes when broker confirms the message - `Connection.createChannel(ChannelOptions)` - Creates channel with publisher confirmation tracking enabled New classes: - `ChannelOptions` - Configuration for channel creation with tracking settings and optional `RateLimiter` for backpressure control - `PublishException` - Exception thrown when message is nack'd or returned, with full details including sequence number and routing information - `RateLimiter` - Interface for custom rate limiting strategies - `ThrottlingRateLimiter` - Progressive throttling implementation that applies delays proportional to capacity usage (0-1000ms) when available permits fall below a configurable threshold (default 50%) The implementation adds a sequence number header (`x-java-pub-seq-no`) to each tracked message, allowing correlation of Basic.Return responses with specific messages. The `ThrottlingRateLimiter` uses `Semaphore` for concurrency control and calculates delay as `percentageUsed * 1000ms`, matching the .NET client's algorithm exactly. Passing `null` for the rate limiter disables backpressure for unlimited outstanding confirmations. The feature is opt-in and maintains full backward compatibility. Existing `basicPublish()` and `waitForConfirms()` methods remain unchanged. Tests include 9 unit tests for `ThrottlingRateLimiter` and 23 integration tests for publisher confirmation tracking with various rate limiting scenarios.
1 parent 285b142 commit c17cd9f

20 files changed

+2669
-14
lines changed

RUNNING_TESTS.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,26 @@ top-level directory of the source tree:
8888
-Dit.test=DeadLetterExchange
8989
```
9090

91+
* To run a single test class:
92+
93+
```
94+
./mvnw verify -Dit.test=Confirm
95+
```
96+
97+
* To run a specific test method within a test class:
98+
99+
```
100+
./mvnw verify -Dit.test=Confirm#testBasicPublishAsync
101+
```
102+
91103
Test reports can be found in `target/failsafe-reports`.
92104

93105
## Running Against a Broker in a Docker Container
94106

95107
Run the broker:
96108

97109
```
98-
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
110+
docker run --pull always --rm --tty --interactive --name rabbitmq --publish 5672:5672 rabbitmq:latest
99111
```
100112

101113
Launch the tests:

doc/STATUS.md

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Publisher Confirmation Tracking - Current Status
2+
3+
**Last Updated:** 2025-12-09
4+
**Branch:** `lukebakken/publisher-confirm-tracking`
5+
6+
## ⚠️ Important Note
7+
8+
**This is a new feature with NO backward compatibility requirements.**
9+
10+
All APIs are being developed from scratch and can be changed freely without
11+
concern for breaking existing code. This allows us to design the cleanest,
12+
most intuitive API possible without legacy constraints.
13+
14+
## Completed Work
15+
16+
### Core Feature (2025-12-08)
17+
✅ Automatic publisher confirmation tracking with `CompletableFuture` API
18+
✅ Generic context parameter for message correlation
19+
`ChannelOptions` with builder pattern for configuration
20+
`PublishException` with context for error handling
21+
✅ Sequence number header correlation (`x-seq-no`)
22+
✅ All 25 Confirm integration tests passing
23+
24+
### Rate Limiting Enhancement (2025-12-08 to 2025-12-09)
25+
`RateLimiter` interface for extensibility
26+
`ThrottlingRateLimiter` matching .NET client algorithm
27+
✅ 9 unit tests (CI-friendly, behavior-focused)
28+
✅ Builder pattern for `ChannelOptions`
29+
✅ Full integration with permit tracking
30+
✅ Merged and pushed
31+
32+
### API Improvements (2025-12-09)
33+
✅ Generic context parameter in `basicPublishAsync()`
34+
✅ Context flows through `CompletableFuture<T>` for message correlation
35+
✅ Context available in `PublishException` for error handling
36+
✅ Eliminates need for separate tracking structures
37+
✅ Single `ConcurrentHashMap` for confirmations (futures + permits + context)
38+
✅ Map sized based on `RateLimiter.getMaxConcurrency()` hint
39+
✅ Sequence number header changed to `x-seq-no`
40+
41+
## Current State
42+
43+
**Files:**
44+
- `src/main/java/com/rabbitmq/client/RateLimiter.java` - Interface
45+
- `src/main/java/com/rabbitmq/client/ThrottlingRateLimiter.java` - Implementation
46+
- `src/main/java/com/rabbitmq/client/ChannelOptions.java` - Builder pattern
47+
- `src/main/java/com/rabbitmq/client/PublishException.java` - Exception with context
48+
- `src/main/java/com/rabbitmq/client/impl/ChannelN.java` - Integration
49+
- `src/test/java/com/rabbitmq/client/test/ThrottlingRateLimiterTest.java` - 9 tests
50+
- `src/test/java/com/rabbitmq/client/test/functional/Confirm.java` - 25 tests
51+
52+
**Test Results:**
53+
- ThrottlingRateLimiter: 9/9 passing
54+
- Confirm: 25/25 passing (20 original + 5 new)
55+
- Full suite: Not yet run
56+
57+
## Next Steps
58+
59+
### Immediate (In Progress)
60+
🚧 **PublisherConfirmationState refactoring**
61+
- Extract confirmation tracking logic from `ChannelN`
62+
- Improve code organization and testability
63+
- See [publisher-confirmation-state-refactoring.md](./publisher-confirmation-state-refactoring.md)
64+
65+
### Pending
66+
- Documentation updates (`publisher-confirms-async.md`)
67+
- Full test suite validation (1088+ tests)
68+
- Recovery scenario testing
69+
- Performance benchmarking
70+
71+
## API Summary
72+
73+
```java
74+
// Create channel with throttling
75+
ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(100, 50);
76+
ChannelOptions options = ChannelOptions.builder()
77+
.publisherConfirmations(true)
78+
.publisherConfirmationTracking(true)
79+
.rateLimiter(limiter)
80+
.build();
81+
Channel channel = connection.createChannel(options);
82+
83+
// Publish with context (such as correlation ID)
84+
String messageId = "msg-123";
85+
CompletableFuture<String> future = channel.basicPublishAsync(
86+
"", queueName, null, body, messageId
87+
);
88+
89+
// Handle result with context
90+
future.thenAccept(ctx -> System.out.println("Confirmed: " + ctx))
91+
.exceptionally(ex -> {
92+
if (ex.getCause() instanceof PublishException) {
93+
PublishException pe = (PublishException) ex.getCause();
94+
String msgId = (String) pe.getContext();
95+
System.err.println("Message " + msgId + " failed: " + pe.getMessage());
96+
}
97+
return null;
98+
});
99+
```
100+
101+
## Parity with .NET Client
102+
103+
| Feature | .NET | Java | Status |
104+
|---------|------|------|--------|
105+
| Async API | `Task` | `CompletableFuture` | ✅ Complete |
106+
| Rate Limiter Interface | `RateLimiter` | `RateLimiter` | ✅ Complete |
107+
| Throttling Implementation | `ThrottlingRateLimiter` | `ThrottlingRateLimiter` | ✅ Complete |
108+
| Algorithm | Progressive delay | Progressive delay | ✅ Identical |
109+
| Builder Pattern | Constructor with defaults | Builder | ✅ Complete |
110+
| Extensibility | Custom rate limiters | Custom rate limiters | ✅ Complete |
111+
112+
## Documentation
113+
114+
- [Publisher Confirms Async](./publisher-confirms-async.md) - Main feature documentation
115+
- [Throttling Rate Limiter](./throttling-rate-limiter.md) - Rate limiting details
116+
- [PublisherConfirmationState Refactoring](./publisher-confirmation-state-refactoring.md) - Next steps
117+
- [Implementation Summary](../../workplace/.../gh-1824/implementation-summary.md) - Complete history
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# PublisherConfirmationState Refactoring Plan
2+
3+
**Date:** 2025-12-09
4+
**Status:** 🚧 Planning
5+
**Related:** [Issue #1824](https://github.com/rabbitmq/rabbitmq-java-client/issues/1824)
6+
7+
## Motivation
8+
9+
`ChannelN.java` is a large, complex class (73,902 bytes) that handles multiple concerns.
10+
Publisher confirmation tracking logic is scattered throughout the class, making it
11+
difficult to understand, test, and maintain.
12+
13+
**Current state:**
14+
- Fields: `publisherConfirmationTrackingEnabled`, `rateLimiter`, `confirmsFutures`, `confirmsPermits`, `unconfirmedSet`, `nextPublishSeqNo`, `onlyAcksReceived`
15+
- Logic spread across: constructors, `basicPublishAsync()`, `handleAckNack()`, `callReturnListeners()`, `processShutdownSignal()`
16+
- ~200+ lines of confirmation-related code mixed with other channel concerns
17+
18+
## Goal
19+
20+
Extract publisher confirmation tracking into a cohesive `PublisherConfirmationState` class that:
21+
- Encapsulates all confirmation-related state and logic
22+
- Provides clear API for `ChannelN` to interact with
23+
- Improves testability (can unit test in isolation)
24+
- Reduces complexity of `ChannelN`
25+
- Maintains all existing functionality and tests
26+
27+
## Open Questions
28+
29+
### 1. Scope & Responsibilities
30+
- ✅ Track futures and permits
31+
- ✅ Manage sequence numbers
32+
- ✅ Handle rate limiter interaction
33+
- ❓ Include `confirmSelectActivated` flag?
34+
- ❓ Include `onlyAcksReceived` flag (used by `waitForConfirms()`)?
35+
- ❓ Include `unconfirmedSet` (also used by `waitForConfirms()`)?
36+
37+
### 2. Lifecycle & Ownership
38+
- When to create: constructor or lazy initialization?
39+
- Null when disabled or always present?
40+
- Shutdown handling approach?
41+
42+
### 3. API Design
43+
Proposed interface:
44+
```java
45+
class PublisherConfirmationState {
46+
PublishInfo startPublish(RateLimiter.Permit permit, BasicProperties props);
47+
void handleAck(long seqNo, boolean multiple);
48+
void handleNack(long seqNo, boolean multiple);
49+
void handleReturn(long seqNo, String exchange, String routingKey, int replyCode, String replyText);
50+
void shutdown(Exception cause);
51+
}
52+
```
53+
54+
### 4. Thread Safety
55+
- Self-contained synchronization?
56+
- Reuse existing locks?
57+
58+
### 5. Testing Strategy
59+
- Unit tests for `PublisherConfirmationState`?
60+
- Rely on existing integration tests?
61+
62+
### 6. Backward Compatibility
63+
- `waitForConfirms()` methods depend on `unconfirmedSet` and `onlyAcksReceived`
64+
- Keep in `ChannelN` or move to state class?
65+
66+
## Benefits
67+
68+
**Code organization:**
69+
- Single responsibility for confirmation tracking
70+
- Easier to understand and modify
71+
- Clear separation of concerns
72+
73+
**Testability:**
74+
- Can unit test confirmation logic in isolation
75+
- Easier to test edge cases
76+
- Reduced test setup complexity
77+
78+
**Maintainability:**
79+
- Changes to confirmation tracking isolated to one class
80+
- Reduced risk of breaking unrelated channel functionality
81+
- Clearer code review scope
82+
83+
## Implementation Approach
84+
85+
1. Create `PublisherConfirmationState` class
86+
2. Move fields and logic incrementally
87+
3. Update `ChannelN` to delegate to state class
88+
4. Verify all tests still pass
89+
5. Add unit tests for state class (optional)
90+
91+
## Timeline
92+
93+
- **2025-12-09:** Planning and design
94+
- **Next:** Implementation
95+
- **Target:** Complete refactoring while maintaining all tests passing
96+
97+
## References
98+
99+
- [ChannelN.java](../src/main/java/com/rabbitmq/client/impl/ChannelN.java)
100+
- [Publisher Confirms Documentation](./publisher-confirms-async.md)
101+
- [Throttling Rate Limiter](./throttling-rate-limiter.md)

0 commit comments

Comments
 (0)