Skip to content

Commit f785dc5

Browse files
committed
Add automatic publisher confirmation tracking with throttling
Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern or backpressure mechanism. This change introduces automatic publisher confirmation tracking with a `CompletableFuture`-based API and progressive throttling, following the design of the .NET client's publisher confirmation implementation. When enabled via `ChannelOptions`, the library automatically tracks each message's confirmation status and completes futures when the broker sends Basic.Ack, Basic.Nack, or Basic.Return. New API methods: - `Channel.basicPublishAsync()` - Returns `CompletableFuture<Void>` that completes when broker confirms the message - `Connection.createChannel(ChannelOptions)` - Creates channel with publisher confirmation tracking enabled New classes: - `ChannelOptions` - Configuration for channel creation with tracking settings and optional `RateLimiter` for backpressure control - `PublishException` - Exception thrown when message is nack'd or returned, with full details including sequence number and routing information - `RateLimiter` - Interface for custom rate limiting strategies - `ThrottlingRateLimiter` - Progressive throttling implementation that applies delays proportional to capacity usage (0-1000ms) when available permits fall below a configurable threshold (default 50%) The implementation adds a sequence number header (`x-java-pub-seq-no`) to each tracked message, allowing correlation of Basic.Return responses with specific messages. The `ThrottlingRateLimiter` uses `Semaphore` for concurrency control and calculates delay as `percentageUsed * 1000ms`, matching the .NET client's algorithm exactly. Passing `null` for the rate limiter disables backpressure for unlimited outstanding confirmations. The feature is opt-in and maintains full backward compatibility. Existing `basicPublish()` and `waitForConfirms()` methods remain unchanged. Tests include 9 unit tests for `ThrottlingRateLimiter` and 23 integration tests for publisher confirmation tracking with various rate limiting scenarios.
1 parent 285b142 commit f785dc5

20 files changed

+2548
-14
lines changed

RUNNING_TESTS.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,26 @@ top-level directory of the source tree:
8888
-Dit.test=DeadLetterExchange
8989
```
9090

91+
* To run a single test class:
92+
93+
```
94+
./mvnw verify -Dit.test=Confirm
95+
```
96+
97+
* To run a specific test method within a test class:
98+
99+
```
100+
./mvnw verify -Dit.test=Confirm#testBasicPublishAsync
101+
```
102+
91103
Test reports can be found in `target/failsafe-reports`.
92104

93105
## Running Against a Broker in a Docker Container
94106

95107
Run the broker:
96108

97109
```
98-
docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8
110+
docker run --pull always --rm --tty --interactive --name rabbitmq --publish 5672:5672 rabbitmq:latest
99111
```
100112

101113
Launch the tests:

doc/STATUS.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Publisher Confirmation Tracking - Current Status
2+
3+
**Last Updated:** 2025-12-09
4+
**Branch:** `lukebakken/publisher-confirm-tracking`
5+
6+
## Completed Work
7+
8+
### Core Feature (2025-12-08)
9+
✅ Automatic publisher confirmation tracking with `CompletableFuture` API
10+
`ChannelOptions` for configuration
11+
`PublishException` for error handling
12+
✅ Sequence number header correlation
13+
✅ All 23 Confirm integration tests passing
14+
15+
### Rate Limiting Enhancement (2025-12-08 to 2025-12-09)
16+
`RateLimiter` interface for extensibility
17+
`ThrottlingRateLimiter` matching .NET client algorithm
18+
✅ 9 unit tests (CI-friendly, behavior-focused)
19+
✅ Builder pattern for `ChannelOptions`
20+
✅ Full integration with permit tracking
21+
✅ Merged and pushed
22+
23+
## Current State
24+
25+
**Files:**
26+
- `src/main/java/com/rabbitmq/client/RateLimiter.java` - Interface
27+
- `src/main/java/com/rabbitmq/client/ThrottlingRateLimiter.java` - Implementation
28+
- `src/main/java/com/rabbitmq/client/ChannelOptions.java` - Builder pattern
29+
- `src/main/java/com/rabbitmq/client/PublishException.java` - Exception
30+
- `src/main/java/com/rabbitmq/client/impl/ChannelN.java` - Integration
31+
- `src/test/java/com/rabbitmq/client/test/ThrottlingRateLimiterTest.java` - 9 tests
32+
- `src/test/java/com/rabbitmq/client/test/functional/Confirm.java` - 23 tests
33+
34+
**Test Results:**
35+
- ThrottlingRateLimiter: 9/9 passing
36+
- Confirm: 23/23 passing
37+
- Full suite: Not yet run
38+
39+
## Next Steps
40+
41+
### Immediate (In Progress)
42+
🚧 **PublisherConfirmationState refactoring**
43+
- Extract confirmation tracking logic from `ChannelN`
44+
- Improve code organization and testability
45+
- See [publisher-confirmation-state-refactoring.md](./publisher-confirmation-state-refactoring.md)
46+
47+
### Pending
48+
- Documentation updates (`publisher-confirms-async.md`)
49+
- Full test suite validation (1088+ tests)
50+
- Recovery scenario testing
51+
- Performance benchmarking
52+
53+
## API Summary
54+
55+
```java
56+
// Create channel with throttling
57+
ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(100, 50);
58+
ChannelOptions options = ChannelOptions.builder()
59+
.publisherConfirmations(true)
60+
.publisherConfirmationTracking(true)
61+
.rateLimiter(limiter)
62+
.build();
63+
Channel channel = connection.createChannel(options);
64+
65+
// Publish with automatic tracking
66+
CompletableFuture<Void> future = channel.basicPublishAsync(
67+
"", queueName, null, body
68+
);
69+
70+
// Handle result
71+
future.thenRun(() -> System.out.println("Confirmed"))
72+
.exceptionally(ex -> {
73+
if (ex.getCause() instanceof PublishException) {
74+
PublishException pe = (PublishException) ex.getCause();
75+
// Handle nack or return
76+
}
77+
return null;
78+
});
79+
```
80+
81+
## Parity with .NET Client
82+
83+
| Feature | .NET | Java | Status |
84+
|---------|------|------|--------|
85+
| Async API | `Task` | `CompletableFuture` | ✅ Complete |
86+
| Rate Limiter Interface | `RateLimiter` | `RateLimiter` | ✅ Complete |
87+
| Throttling Implementation | `ThrottlingRateLimiter` | `ThrottlingRateLimiter` | ✅ Complete |
88+
| Algorithm | Progressive delay | Progressive delay | ✅ Identical |
89+
| Builder Pattern | Constructor with defaults | Builder | ✅ Complete |
90+
| Extensibility | Custom rate limiters | Custom rate limiters | ✅ Complete |
91+
92+
## Documentation
93+
94+
- [Publisher Confirms Async](./publisher-confirms-async.md) - Main feature documentation
95+
- [Throttling Rate Limiter](./throttling-rate-limiter.md) - Rate limiting details
96+
- [PublisherConfirmationState Refactoring](./publisher-confirmation-state-refactoring.md) - Next steps
97+
- [Implementation Summary](../../workplace/.../gh-1824/implementation-summary.md) - Complete history
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# PublisherConfirmationState Refactoring Plan
2+
3+
**Date:** 2025-12-09
4+
**Status:** 🚧 Planning
5+
**Related:** [Issue #1824](https://github.com/rabbitmq/rabbitmq-java-client/issues/1824)
6+
7+
## Motivation
8+
9+
`ChannelN.java` is a large, complex class (73,902 bytes) that handles multiple concerns.
10+
Publisher confirmation tracking logic is scattered throughout the class, making it
11+
difficult to understand, test, and maintain.
12+
13+
**Current state:**
14+
- Fields: `publisherConfirmationTrackingEnabled`, `rateLimiter`, `confirmsFutures`, `confirmsPermits`, `unconfirmedSet`, `nextPublishSeqNo`, `onlyAcksReceived`
15+
- Logic spread across: constructors, `basicPublishAsync()`, `handleAckNack()`, `callReturnListeners()`, `processShutdownSignal()`
16+
- ~200+ lines of confirmation-related code mixed with other channel concerns
17+
18+
## Goal
19+
20+
Extract publisher confirmation tracking into a cohesive `PublisherConfirmationState` class that:
21+
- Encapsulates all confirmation-related state and logic
22+
- Provides clear API for `ChannelN` to interact with
23+
- Improves testability (can unit test in isolation)
24+
- Reduces complexity of `ChannelN`
25+
- Maintains all existing functionality and tests
26+
27+
## Open Questions
28+
29+
### 1. Scope & Responsibilities
30+
- ✅ Track futures and permits
31+
- ✅ Manage sequence numbers
32+
- ✅ Handle rate limiter interaction
33+
- ❓ Include `confirmSelectActivated` flag?
34+
- ❓ Include `onlyAcksReceived` flag (used by `waitForConfirms()`)?
35+
- ❓ Include `unconfirmedSet` (also used by `waitForConfirms()`)?
36+
37+
### 2. Lifecycle & Ownership
38+
- When to create: constructor or lazy initialization?
39+
- Null when disabled or always present?
40+
- Shutdown handling approach?
41+
42+
### 3. API Design
43+
Proposed interface:
44+
```java
45+
class PublisherConfirmationState {
46+
PublishInfo startPublish(RateLimiter.Permit permit, BasicProperties props);
47+
void handleAck(long seqNo, boolean multiple);
48+
void handleNack(long seqNo, boolean multiple);
49+
void handleReturn(long seqNo, String exchange, String routingKey, int replyCode, String replyText);
50+
void shutdown(Exception cause);
51+
}
52+
```
53+
54+
### 4. Thread Safety
55+
- Self-contained synchronization?
56+
- Reuse existing locks?
57+
58+
### 5. Testing Strategy
59+
- Unit tests for `PublisherConfirmationState`?
60+
- Rely on existing integration tests?
61+
62+
### 6. Backward Compatibility
63+
- `waitForConfirms()` methods depend on `unconfirmedSet` and `onlyAcksReceived`
64+
- Keep in `ChannelN` or move to state class?
65+
66+
## Benefits
67+
68+
**Code organization:**
69+
- Single responsibility for confirmation tracking
70+
- Easier to understand and modify
71+
- Clear separation of concerns
72+
73+
**Testability:**
74+
- Can unit test confirmation logic in isolation
75+
- Easier to test edge cases
76+
- Reduced test setup complexity
77+
78+
**Maintainability:**
79+
- Changes to confirmation tracking isolated to one class
80+
- Reduced risk of breaking unrelated channel functionality
81+
- Clearer code review scope
82+
83+
## Implementation Approach
84+
85+
1. Create `PublisherConfirmationState` class
86+
2. Move fields and logic incrementally
87+
3. Update `ChannelN` to delegate to state class
88+
4. Verify all tests still pass
89+
5. Add unit tests for state class (optional)
90+
91+
## Timeline
92+
93+
- **2025-12-09:** Planning and design
94+
- **Next:** Implementation
95+
- **Target:** Complete refactoring while maintaining all tests passing
96+
97+
## References
98+
99+
- [ChannelN.java](../src/main/java/com/rabbitmq/client/impl/ChannelN.java)
100+
- [Publisher Confirms Documentation](./publisher-confirms-async.md)
101+
- [Throttling Rate Limiter](./throttling-rate-limiter.md)

0 commit comments

Comments
 (0)