diff --git a/RUNNING_TESTS.md b/RUNNING_TESTS.md index f948127ba1..21a45046db 100644 --- a/RUNNING_TESTS.md +++ b/RUNNING_TESTS.md @@ -88,6 +88,18 @@ top-level directory of the source tree: -Dit.test=DeadLetterExchange ``` +* To run a single test class: + +``` +./mvnw verify -Dit.test=Confirm +``` + +* To run a specific test method within a test class: + +``` +./mvnw verify -Dit.test=Confirm#testBasicPublishAsync +``` + Test reports can be found in `target/failsafe-reports`. ## Running Against a Broker in a Docker Container @@ -95,7 +107,7 @@ Test reports can be found in `target/failsafe-reports`. Run the broker: ``` -docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq:3.8 +docker run --pull always --rm --tty --interactive --name rabbitmq --publish 5672:5672 rabbitmq:latest ``` Launch the tests: diff --git a/doc/publisher-confirmations/README.md b/doc/publisher-confirmations/README.md new file mode 100644 index 0000000000..837ffb5453 --- /dev/null +++ b/doc/publisher-confirmations/README.md @@ -0,0 +1,358 @@ +# ConfirmationChannel - Asynchronous Publisher Confirmations + +**Status:** Complete +**Date:** 2025-12-10 + +## Overview + +`ConfirmationChannel` provides asynchronous publisher confirmation tracking with a `CompletableFuture`-based API, optional rate limiting, and generic context parameter for message correlation. The implementation wraps existing `Channel` instances using listener-based integration, requiring no modifications to the core `ChannelN` class. + +## Motivation + +Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern, backpressure mechanism, or message correlation support. + +`ConfirmationChannel` addresses these limitations by providing: +- Automatic confirmation tracking via `CompletableFuture` API +- Generic context parameter for message correlation +- Optional rate limiting for backpressure control +- Clean separation from core `Channel` implementation + +## Architecture + +### Interface Hierarchy + +``` +Channel (existing interface) + ↑ + | +ConfirmationChannel (new interface) + ↑ + | +ConfirmationChannelN (new implementation) +``` + +### Key Components + +**ConfirmationChannel Interface** +- Extends `Channel` interface +- Adds `basicPublishAsync()` methods (with and without mandatory flag) +- Generic `` context parameter for correlation +- Returns `CompletableFuture` + +**ConfirmationChannelN Implementation** +- Wraps an existing `Channel` instance (composition, not inheritance) +- Maintains its own sequence number counter (`AtomicLong`) +- Registers return and confirm listeners on the wrapped channel +- Delegates all other `Channel` methods to the wrapped instance +- Throws `UnsupportedOperationException` for `basicPublish()` methods + +### Sequence Number Management + +**Independent Sequence Space:** +- `ConfirmationChannelN` maintains its own `AtomicLong nextSeqNo` +- No coordination with `ChannelN`'s sequence numbers +- Sequence numbers start at 1 and increment for each `basicPublishAsync()` call +- Sequence number added to message headers as `x-seq-no` + +**Why Independent?** +- `basicPublish()` is disallowed on `ConfirmationChannel` +- No risk of sequence number conflicts +- Simpler implementation - no need to access `ChannelN` internals +- Clean separation of concerns + +### Confirmation Tracking + +**State Management:** +```java +private final ConcurrentHashMap> confirmations; + +private static class ConfirmationEntry { + final CompletableFuture future; + final RateLimiter.Permit permit; + final T context; +} +``` + +**Lifecycle:** +1. `basicPublishAsync()` called +2. Acquire rate limiter permit (if configured) +3. Get next sequence number +4. Create `CompletableFuture` and `ConfirmationEntry` +5. Add `x-seq-no` header to message +6. Store entry in `confirmations` map +7. Call `delegate.basicPublish()` +8. Return future to caller + +**Completion Paths:** +- **Basic.Ack** → Complete future with context value, release permit +- **Basic.Nack** → Complete exceptionally with `PublishException`, release permit +- **Basic.Return** → Complete exceptionally with `PublishException`, release permit +- **Channel close** → Complete all pending futures exceptionally, release all permits + +### Listener Integration + +**Return Listener:** +```java +delegate.addReturnListener((replyCode, replyText, exchange, routingKey, props, body) -> { + long seqNo = extractSequenceNumber(props.getHeaders()); + ConfirmationEntry entry = confirmations.remove(seqNo); + if (entry != null) { + entry.future.completeExceptionally( + new PublishException(seqNo, true, exchange, routingKey, replyCode, replyText, entry.context) + ); + entry.releasePermit(); + } +}); +``` + +**Confirm Listeners:** +```java +delegate.addConfirmListener( + (seqNo, multiple) -> handleAck(seqNo, multiple), + (seqNo, multiple) -> handleNack(seqNo, multiple) +); +``` + +**Multiple Acknowledgments:** +When `multiple=true`, all sequence numbers ≤ `seqNo` are processed: +```java +for (Long seq : new ArrayList<>(confirmations.keySet())) { + if (seq <= seqNo) { + ConfirmationEntry entry = confirmations.remove(seq); + // Complete future and release permit + } +} +``` + +## API Design + +### Constructor + +```java +public ConfirmationChannelN(Channel delegate, RateLimiter rateLimiter) +``` + +**Parameters:** +- `delegate` - The underlying `Channel` instance (typically `ChannelN`) +- `rateLimiter` - Optional rate limiter for controlling publish concurrency (can be null) + +**Initialization:** +- Calls `delegate.confirmSelect()` to enable publisher confirmations +- Registers return and confirm listeners +- Initializes confirmation tracking map + +### basicPublishAsync Methods + +```java + CompletableFuture basicPublishAsync(String exchange, String routingKey, + AMQP.BasicProperties props, byte[] body, T context) + + CompletableFuture basicPublishAsync(String exchange, String routingKey, + boolean mandatory, + AMQP.BasicProperties props, byte[] body, T context) +``` + +**Context Parameter:** +- Generic type `` allows any user-defined correlation object +- Returned in the completed future on success +- Available in `PublishException.getContext()` on failure +- Can be null if correlation not needed + +**Return Value:** +- `CompletableFuture` that completes when broker confirms/rejects +- Completes successfully with context value on Basic.Ack +- Completes exceptionally with `PublishException` on Basic.Nack or Basic.Return + +### basicPublish Methods (Disallowed) + +All `basicPublish()` method overloads throw `UnsupportedOperationException`: + +```java +@Override +public void basicPublish(String exchange, String routingKey, + AMQP.BasicProperties props, byte[] body) { + throw new UnsupportedOperationException( + "basicPublish() is not supported on ConfirmationChannel. Use basicPublishAsync() instead." + ); +} +``` + +**Rationale:** +- Prevents mixing synchronous and asynchronous publish patterns +- Eliminates sequence number coordination complexity +- Clear API contract - this channel is for async confirmations only + +### Delegated Methods + +All other `Channel` methods are delegated to the wrapped instance: +- `basicConsume()`, `basicGet()`, `basicAck()`, etc. +- `exchangeDeclare()`, `queueDeclare()`, etc. +- `addReturnListener()`, `addConfirmListener()`, etc. +- `close()`, `abort()`, etc. + +## Rate Limiting + +**Optional Feature:** +- Pass `RateLimiter` to constructor to enable +- Limits concurrent in-flight messages +- Blocks in `basicPublishAsync()` until permit available +- Permits released when confirmation received (ack/nack/return) + +**Integration:** +```java +RateLimiter.Permit permit = null; +if (rateLimiter != null) { + permit = rateLimiter.acquire(); // May block +} +// ... publish message ... +// Store permit in ConfirmationEntry for later release +``` + +## Error Handling + +### PublishException + +Enhanced with context parameter: +```java +public class PublishException extends IOException { + private final Object context; // User-provided correlation object + + // Constructor for nacks (no routing details available) + public PublishException(long sequenceNumber, Object context) + + // Constructor for returns (full routing details) + public PublishException(long sequenceNumber, boolean isReturn, + String exchange, String routingKey, + Integer replyCode, String replyText, Object context) +} +``` + +### Exception Scenarios + +**Basic.Nack:** +- Broker rejected the message +- `isReturn() == false` +- Exchange, routingKey, replyCode, replyText are null +- Only sequence number and context available + +**Basic.Return:** +- Message unroutable (mandatory flag set) +- `isReturn() == true` +- Full routing details available +- Reply code indicates reason (NO_ROUTE, NO_CONSUMERS, etc.) + +**Channel Closed:** +- All pending futures completed with `AlreadyClosedException` +- All rate limiter permits released +- Confirmations map cleared + +**I/O Error:** +- Future completed with the I/O exception +- Rate limiter permit released +- Entry removed from confirmations map + +## Usage Examples + +### Basic Usage + +```java +Connection connection = factory.newConnection(); +Channel channel = connection.createChannel(); +ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, null); + +confirmChannel.basicPublishAsync("exchange", "routing.key", props, body, "msg-123") + .thenAccept(msgId -> System.out.println("Confirmed: " + msgId)) + .exceptionally(ex -> { + System.err.println("Failed: " + ex.getMessage()); + return null; + }); +``` + +### With Rate Limiting + +```java +RateLimiter rateLimiter = new ThrottlingRateLimiter(1000); // Max 1000 in-flight +ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, rateLimiter); + +for (int i = 0; i < 10000; i++) { + String msgId = "msg-" + i; + confirmChannel.basicPublishAsync("exchange", "key", props, body, msgId) + .thenAccept(id -> System.out.println("Confirmed: " + id)) + .exceptionally(ex -> { + if (ex.getCause() instanceof PublishException) { + PublishException pe = (PublishException) ex.getCause(); + System.err.println("Failed: " + pe.getContext()); + } + return null; + }); +} +``` + +### With Context Objects + +```java +class MessageContext { + final String orderId; + final Instant timestamp; + + MessageContext(String orderId) { + this.orderId = orderId; + this.timestamp = Instant.now(); + } +} + +MessageContext ctx = new MessageContext("order-12345"); +confirmChannel.basicPublishAsync("orders", "new", props, body, ctx) + .thenAccept(context -> { + Duration latency = Duration.between(context.timestamp, Instant.now()); + System.out.println("Order " + context.orderId + " confirmed in " + latency.toMillis() + "ms"); + }); +``` + +## Test Results + +- **Confirm tests:** 24/24 passing +- **ThrottlingRateLimiterTest:** 9/9 passing +- **Total:** 33/33 tests passing + +## Testing Strategy + +### Unit Tests +- Sequence number generation and tracking +- Confirmation entry lifecycle +- Rate limiter integration +- Exception handling + +### Integration Tests (Existing) +- All 25 tests in `Confirm.java` adapted to use `ConfirmationChannel` +- Basic.Ack handling (single and multiple) +- Basic.Nack handling (single and multiple) +- Basic.Return handling +- Context parameter correlation +- Channel close cleanup + +### Rate Limiter Tests (Existing) +- 9 tests in `ThrottlingRateLimiterTest.java` +- No changes needed (rate limiter is independent) + +## Trade-offs + +**Pros:** +- Clean architecture with clear boundaries +- No risk of breaking existing functionality +- Easy to understand and maintain +- Can evolve independently of `ChannelN` + +**Cons:** +- Requires wrapping a channel (extra object) +- Two ways to do publisher confirmations (`waitForConfirms()` vs `basicPublishAsync()`) +- Cannot mix `basicPublish()` and `basicPublishAsync()` on same channel +- Slightly more verbose setup code + +## Future Enhancements + +1. **Factory method on Connection** - `connection.createConfirmationChannel(rateLimiter)` +2. **Batch operations** - `basicPublishAsyncBatch()` for multiple messages +3. **Metrics integration** - Add metrics for `basicPublishAsync()` +4. **Observability** - Integration with observation collectors +5. **Alternative rate limiters** - Token bucket, sliding window, etc. diff --git a/src/main/java/com/rabbitmq/client/ConfirmationChannel.java b/src/main/java/com/rabbitmq/client/ConfirmationChannel.java new file mode 100644 index 0000000000..0736062317 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ConfirmationChannel.java @@ -0,0 +1,149 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +import com.rabbitmq.client.impl.ConfirmationChannelN; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** + * A channel that supports asynchronous publisher confirmations. + *

+ * This interface extends {@link Channel} to add the {@link #basicPublishAsync} method, + * which returns a {@link CompletableFuture} that completes when the broker confirms + * or rejects the published message. + *

+ * Publisher confirmations are automatically enabled when using this channel type. + * Messages are tracked using sequence numbers, and the future completes when: + *

    + *
  • The broker sends Basic.Ack (successful confirmation)
  • + *
  • The broker sends Basic.Nack (rejection) - future completes exceptionally with {@link PublishException}
  • + *
  • The broker returns the message via Basic.Return (unroutable) - future completes exceptionally with {@link PublishException}
  • + *
+ *

+ * Example usage: + *

{@code
+ * Channel channel = connection.createChannel();
+ * ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, rateLimiter);
+ *
+ * confirmChannel.basicPublishAsync("exchange", "routing.key", props, body, "msg-123")
+ *     .thenAccept(msgId -> System.out.println("Confirmed: " + msgId))
+ *     .exceptionally(ex -> {
+ *         if (ex.getCause() instanceof PublishException) {
+ *             PublishException pe = (PublishException) ex.getCause();
+ *             System.err.println("Failed: " + pe.getContext());
+ *         }
+ *         return null;
+ *     });
+ * }
+ * + * @see Channel + * @see PublishException + * @see com.rabbitmq.client.impl.ConfirmationChannelN + */ +public interface ConfirmationChannel extends Channel +{ + /** + * Creates a new ConfirmationChannel by wrapping an existing channel. + *

+ * This factory method enables asynchronous publisher confirmation tracking + * on any existing channel. The wrapped channel will have publisher confirmations + * automatically enabled via {@link Channel#confirmSelect()}. + *

+ * Example usage: + *

{@code
+     * Channel channel = connection.createChannel();
+     * RateLimiter limiter = new ThrottlingRateLimiter(100, 50);
+     * ConfirmationChannel confirmChannel = ConfirmationChannel.create(channel, limiter);
+     *
+     * confirmChannel.basicPublishAsync("exchange", "key", props, body, "msg-123")
+     *     .thenAccept(msgId -> System.out.println("Confirmed: " + msgId));
+     * }
+ * + * @param channel the channel to wrap + * @param rateLimiter optional rate limiter for controlling publish concurrency (null for unlimited) + * @return a new ConfirmationChannel instance + * @throws IOException if enabling publisher confirmations fails + * @see RateLimiter + * @see ThrottlingRateLimiter + */ + static ConfirmationChannel create(Channel channel, RateLimiter rateLimiter) throws IOException + { + return new ConfirmationChannelN(channel, rateLimiter); + } + + /** + * Asynchronously publish a message with publisher confirmation tracking. + *

+ * This method publishes a message and returns a {@link CompletableFuture} that completes + * when the broker confirms or rejects the message. The future's value is the context + * parameter provided, allowing correlation between publish requests and confirmations. + *

+ * The future completes: + *

    + *
  • Successfully with the context value when the broker sends Basic.Ack
  • + *
  • Exceptionally with {@link PublishException} when: + *
      + *
    • The broker sends Basic.Nack (message rejected)
    • + *
    • The broker returns the message via Basic.Return (unroutable with mandatory flag)
    • + *
    • An I/O error occurs during publish
    • + *
    • The channel is closed before confirmation
    • + *
    + *
  • + *
+ *

+ * Thread safety: This method is thread-safe and can be called concurrently. + *

+ * Rate limiting: If a {@link RateLimiter} is configured, this method will + * block until a permit is available before publishing. + * + * @param exchange the exchange to publish to + * @param routingKey the routing key + * @param props message properties (null for default) + * @param body message body + * @param context user-provided context object for correlation (can be null) + * @param the type of the context parameter + * @return a CompletableFuture that completes with the context value on success, + * or exceptionally with PublishException on failure + * @throws IllegalStateException if the channel is closed + */ + CompletableFuture basicPublishAsync(String exchange, String routingKey, + AMQP.BasicProperties props, byte[] body, T context); + + /** + * Asynchronously publish a message with mandatory flag and publisher confirmation tracking. + *

+ * This is equivalent to {@link #basicPublishAsync(String, String, AMQP.BasicProperties, byte[], Object)} + * but allows specifying the mandatory flag. When mandatory is true, the broker will return + * the message via Basic.Return if it cannot be routed to any queue, causing the future + * to complete exceptionally with {@link PublishException}. + * + * @param exchange the exchange to publish to + * @param routingKey the routing key + * @param mandatory true if the message must be routable to at least one queue + * @param props message properties (null for default) + * @param body message body + * @param context user-provided context object for correlation (can be null) + * @param the type of the context parameter + * @return a CompletableFuture that completes with the context value on success, + * or exceptionally with PublishException on failure + * @throws IllegalStateException if the channel is closed + */ + CompletableFuture basicPublishAsync(String exchange, String routingKey, + boolean mandatory, + AMQP.BasicProperties props, byte[] body, T context); +} diff --git a/src/main/java/com/rabbitmq/client/Connection.java b/src/main/java/com/rabbitmq/client/Connection.java index 131d456180..8d95156869 100644 --- a/src/main/java/com/rabbitmq/client/Connection.java +++ b/src/main/java/com/rabbitmq/client/Connection.java @@ -201,23 +201,23 @@ default Optional openChannel(int channelNumber) throws IOException { * Close this connection and all its channels * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code * and message 'OK'. - * + * * This method behaves in a similar way as {@link #close()}, with the only difference * that it waits with a provided timeout for all the close operations to * complete. When timeout is reached the socket is forced to close. - * + * * @param timeout timeout (in milliseconds) for completing all the close-related * operations, use -1 for infinity * @throws IOException if an I/O problem is encountered */ void close(int timeout) throws IOException; - + /** * Close this connection and all its channels. * * Waits with the given timeout for all the close operations to complete. * When timeout is reached the socket is forced to close. - * + * * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) * @param closeMessage a message indicating the reason for closing the connection * @param timeout timeout (in milliseconds) for completing all the close-related @@ -235,18 +235,18 @@ default Optional openChannel(int channelNumber) throws IOException { * Any encountered exceptions in the close operations are silently discarded. */ void abort(); - + /** * Abort this connection and all its channels. * * Forces the connection to close and waits for all the close operations to complete. * Any encountered exceptions in the close operations are silently discarded. - * + * * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) * @param closeMessage a message indicating the reason for closing the connection */ void abort(int closeCode, String closeMessage); - + /** * Abort this connection and all its channels * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code diff --git a/src/main/java/com/rabbitmq/client/PublishException.java b/src/main/java/com/rabbitmq/client/PublishException.java new file mode 100644 index 0000000000..aab9e40ef6 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/PublishException.java @@ -0,0 +1,153 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +import java.io.IOException; + +/** + * Exception thrown when a published message is nack'd or returned by the broker. + *

+ * This exception is thrown when publisher confirmation tracking is enabled and: + *

    + *
  • The broker sends Basic.Nack for a message (negative acknowledgment)
  • + *
  • The broker returns a message via Basic.Return (unroutable with mandatory flag)
  • + *
+ *

+ * Use {@link #isReturn()} to distinguish between these two cases: + *

    + *
  • Nack ({@code isReturn() == false}): The broker rejected the message. + * Additional fields (exchange, routingKey, replyCode, replyText) will be null.
  • + *
  • Return ({@code isReturn() == true}): The broker could not route the message. + * Additional fields contain routing information and the reason for the return.
  • + *
+ *

+ * Example handling: + *

{@code
+ * ConfirmationChannel channel = ConfirmationChannel.create(regularChannel, rateLimiter);
+ * channel.basicPublishAsync(exchange, routingKey, true, props, body, "msg-123")
+ *     .exceptionally(ex -> {
+ *         if (ex.getCause() instanceof PublishException) {
+ *             PublishException pe = (PublishException) ex.getCause();
+ *             String msgId = (String) pe.getContext();
+ *             if (pe.isReturn()) {
+ *                 System.err.println("Message " + msgId + " returned: " + pe.getReplyText());
+ *             } else {
+ *                 System.err.println("Message " + msgId + " nack'd");
+ *             }
+ *         }
+ *         return null;
+ *     });
+ * }
+ * + * @see ConfirmationChannel#basicPublishAsync(String, String, com.rabbitmq.client.AMQP.BasicProperties, byte[], Object) + * @see ConfirmationChannel + */ +public class PublishException extends IOException { + private final long sequenceNumber; + private final boolean isReturn; + private final String exchange; + private final String routingKey; + private final Integer replyCode; + private final String replyText; + private final Object context; + + /** + * Constructor for nack scenarios where routing details are not available. + *

+ * When the broker sends Basic.Nack, it only provides the sequence number. + * The exchange, routingKey, replyCode, and replyText fields will be null. + * + * @param sequenceNumber the publish sequence number + * @param context the user-provided context object + */ + public PublishException(long sequenceNumber, Object context) + { + this(sequenceNumber, false, null, null, null, null, context); + } + + public PublishException(long sequenceNumber, boolean isReturn, String exchange, String routingKey, + Integer replyCode, String replyText, Object context) { + super(buildMessage(sequenceNumber, isReturn, replyCode, replyText)); + this.sequenceNumber = sequenceNumber; + this.isReturn = isReturn; + this.exchange = exchange; + this.routingKey = routingKey; + this.replyCode = replyCode; + this.replyText = replyText; + this.context = context; + } + + private static String buildMessage(long sequenceNumber, boolean isReturn, Integer replyCode, String replyText) { + if (isReturn) { + return String.format("Message %d returned: %s (%d)", sequenceNumber, replyText, replyCode); + } else { + return String.format("Message %d nack'd", sequenceNumber); + } + } + + /** + * @return the publish sequence number of the failed message + */ + public long getSequenceNumber() { + return sequenceNumber; + } + + /** + * @return true if this exception was caused by Basic.Return (unroutable message), + * false if caused by Basic.Nack (broker rejection) + */ + public boolean isReturn() { + return isReturn; + } + + /** + * @return the exchange the message was published to (only available for returns, null for nacks) + */ + public String getExchange() { + return exchange; + } + + /** + * @return the routing key used (only available for returns, null for nacks) + */ + public String getRoutingKey() { + return routingKey; + } + + /** + * @return the reply code from the broker (only available for returns, null for nacks) + * @see com.rabbitmq.client.AMQP#NO_ROUTE + * @see com.rabbitmq.client.AMQP#NO_CONSUMERS + */ + public Integer getReplyCode() { + return replyCode; + } + + /** + * @return the reply text from the broker explaining why the message was returned + * (only available for returns, null for nacks) + */ + public String getReplyText() { + return replyText; + } + + /** + * @return the user-provided context object that was passed to basicPublishAsync, or null if none was provided + */ + public Object getContext() { + return context; + } +} diff --git a/src/main/java/com/rabbitmq/client/RateLimiter.java b/src/main/java/com/rabbitmq/client/RateLimiter.java new file mode 100644 index 0000000000..d751cc1699 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/RateLimiter.java @@ -0,0 +1,67 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +/** + * Interface for rate limiting publisher confirmations. + *

+ * Implementations control the rate of publish operations by acquiring permits + * before publishing and releasing them when confirmations are received. + *

+ * The library provides {@link ThrottlingRateLimiter} as a default implementation + * with progressive throttling behavior. Users can implement custom rate limiting + * strategies by implementing this interface. + * + * @see ThrottlingRateLimiter + */ +public interface RateLimiter +{ + /** + * Acquires a permit, blocking if necessary until one is available. + *

+ * Implementations may apply delays or other backpressure mechanisms + * before returning the permit. + * + * @return A permit that must be released when the operation completes + * @throws InterruptedException if the thread is interrupted while waiting + */ + Permit acquire() throws InterruptedException; + + /** + * Gets the maximum concurrency supported by this rate limiter. + *

+ * This is a hint for sizing internal data structures. Implementations + * that don't have a fixed capacity should return 0. + * + * @return the maximum number of concurrent operations, or 0 if unknown/unlimited + */ + default int getMaxConcurrency() + { + return 0; + } + + /** + * A permit that represents acquired access to a rate-limited resource. + * Must be released when the operation completes. + */ + interface Permit + { + /** + * Releases this permit, returning it to the rate limiter pool. + */ + void release(); + } +} diff --git a/src/main/java/com/rabbitmq/client/ThrottlingRateLimiter.java b/src/main/java/com/rabbitmq/client/ThrottlingRateLimiter.java new file mode 100644 index 0000000000..4773300d95 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ThrottlingRateLimiter.java @@ -0,0 +1,156 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A rate limiter that controls the rate of operations by limiting concurrency and applying delays + * when a specified threshold of concurrency usage is reached. + *

+ * The delay algorithm checks the current available permits. If the available permits are greater than or equal + * to the throttling threshold, no delay is applied. Otherwise, it calculates a delay based on the percentage + * of permits used, scaling it up to a maximum of 1000 milliseconds. + *

+ * This implementation matches the behavior of the .NET client's {@code ThrottlingRateLimiter}. + * + * @see RateLimiter + */ +public class ThrottlingRateLimiter implements RateLimiter +{ + + /** + * The default throttling percentage, which defines the threshold for applying throttling, set to 50%. + */ + public static final int DEFAULT_THROTTLING_PERCENTAGE = 50; + + private final int maxConcurrency; + private final int throttlingThreshold; + private final Semaphore semaphore; + private final AtomicInteger currentPermits; + + /** + * Initializes a new instance with the specified maximum number of concurrent calls + * and the default throttling percentage (50%). + * + * @param maxConcurrentCalls The maximum number of concurrent operations allowed + */ + public ThrottlingRateLimiter(int maxConcurrentCalls) { + this(maxConcurrentCalls, DEFAULT_THROTTLING_PERCENTAGE); + } + + /** + * Initializes a new instance with the specified maximum number of concurrent calls + * and throttling percentage. + * + * @param maxConcurrentCalls The maximum number of concurrent operations allowed + * @param throttlingPercentage The percentage of maxConcurrentCalls at which throttling is triggered + */ + public ThrottlingRateLimiter(int maxConcurrentCalls, int throttlingPercentage) { + if (maxConcurrentCalls <= 0) { + throw new IllegalArgumentException("maxConcurrentCalls must be positive"); + } + if (throttlingPercentage < 0 || throttlingPercentage > 100) { + throw new IllegalArgumentException("throttlingPercentage must be between 0 and 100"); + } + + this.maxConcurrency = maxConcurrentCalls; + this.throttlingThreshold = maxConcurrency * throttlingPercentage / 100; + this.semaphore = new Semaphore(maxConcurrentCalls, true); + this.currentPermits = new AtomicInteger(maxConcurrentCalls); + } + + /** + * Acquires a permit, blocking if necessary until one is available. + * Applies throttling delay if the number of available permits falls below the threshold. + * + * @return A permit that must be released when the operation completes + * @throws InterruptedException if the thread is interrupted while waiting + */ + public Permit acquire() throws InterruptedException { + int delay = calculateDelay(); + if (delay > 0) { + Thread.sleep(delay); + } + + semaphore.acquire(); + currentPermits.decrementAndGet(); + return new Permit(this); + } + + /** + * Releases a permit, returning it to the pool. + */ + private void release() { + currentPermits.incrementAndGet(); + semaphore.release(); + } + + /** + * Gets the current number of available permits. + * + * @return The number of permits currently available + */ + public int getAvailablePermits() { + return currentPermits.get(); + } + + /** + * Gets the maximum concurrency supported by this rate limiter. + * + * @return The maximum number of concurrent operations allowed + */ + @Override + public int getMaxConcurrency() { + return maxConcurrency; + } + + private int calculateDelay() { + int availablePermits = currentPermits.get(); + + if (availablePermits >= throttlingThreshold) { + // No delay - available permits exceed the threshold + return 0; + } + + double percentageUsed = 1.0 - (availablePermits / (double) maxConcurrency); + return (int)(percentageUsed * 1000); + } + + /** + * A permit that represents acquired access to a rate-limited resource. + * Must be released when the operation completes. + */ + public static class Permit implements RateLimiter.Permit { + private final ThrottlingRateLimiter limiter; + private boolean released = false; + + private Permit(ThrottlingRateLimiter limiter) { + this.limiter = limiter; + } + + /** + * Releases this permit, returning it to the rate limiter pool. + */ + public void release() { + if (!released) { + released = true; + limiter.release(); + } + } + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 4c08613394..3422fd803d 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -117,6 +117,7 @@ public ChannelN(AMQConnection connection, int channelNumber, * @param channelNumber The channel number to be associated with this channel * @param workService service for managing this channel's consumer callbacks * @param metricsCollector service for managing metrics + * @param observationCollector service for managing observations */ public ChannelN(AMQConnection connection, int channelNumber, ConsumerWorkService workService, diff --git a/src/main/java/com/rabbitmq/client/impl/ConfirmationChannelN.java b/src/main/java/com/rabbitmq/client/impl/ConfirmationChannelN.java new file mode 100644 index 0000000000..c7f1b977cd --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/ConfirmationChannelN.java @@ -0,0 +1,934 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl; + +import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP.BasicProperties; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implementation of {@link ConfirmationChannel} that wraps an existing {@link Channel} + * and provides asynchronous publisher confirmation tracking. + *

+ * This class maintains its own sequence number space independent of the wrapped channel. + * The {@link #basicPublish} methods are not supported and will throw + * {@link UnsupportedOperationException}. Use {@link #basicPublishAsync} instead. + */ +public class ConfirmationChannelN implements ConfirmationChannel +{ + private static final String SEQUENCE_NUMBER_HEADER = "x-seq-no"; + + private final Channel delegate; + private final RateLimiter rateLimiter; + private final AtomicLong nextSeqNo = new AtomicLong(1); + private final Map> confirmations; + + private final ShutdownListener shutdownListener; + private final ReturnListener returnListener; + private final ConfirmListener confirmListener; + + /** + * Creates a new ConfirmationChannelN wrapping the given channel. + * + * @param delegate the channel to wrap + * @param rateLimiter optional rate limiter for controlling publish concurrency (can be null) + * @throws IOException if enabling publisher confirmations fails + */ + public ConfirmationChannelN(Channel delegate, RateLimiter rateLimiter) throws IOException + { + if (delegate == null) + { + throw new IllegalArgumentException("delegate must be non-null"); + } + + this.delegate = delegate; + this.rateLimiter = rateLimiter; + + int initialCapacity = (rateLimiter != null) ? rateLimiter.getMaxConcurrency() : 16; + this.confirmations = new ConcurrentHashMap<>(initialCapacity > 0 ? initialCapacity : 16); + + // Enable publisher confirmations on the delegate channel + delegate.confirmSelect(); + + // Register listeners for confirmations and returns + // Store listener instances so we can remove them later + this.shutdownListener = this::handleShutdown; + this.returnListener = this::handleReturn; + this.confirmListener = delegate.addConfirmListener(this::handleAck, this::handleNack); + + delegate.addReturnListener(returnListener); + delegate.addShutdownListener(shutdownListener); + } + + @Override + public CompletableFuture basicPublishAsync(String exchange, String routingKey, + BasicProperties props, byte[] body, T context) + { + return basicPublishAsync(exchange, routingKey, false, props, body, context); + } + + @Override + public CompletableFuture basicPublishAsync(String exchange, String routingKey, + boolean mandatory, + BasicProperties props, byte[] body, T context) + { + CompletableFuture future = new CompletableFuture<>(); + RateLimiter.Permit permit = null; + long seqNo = 0; + + try + { + // Acquire rate limiter permit if configured + if (rateLimiter != null) + { + permit = rateLimiter.acquire(); + } + + // Get next sequence number + seqNo = nextSeqNo.getAndIncrement(); + + // Store confirmation entry + confirmations.put(seqNo, new ConfirmationEntry<>(future, permit, context)); + + // Add sequence number to message headers + if (props == null) + { + props = MessageProperties.MINIMAL_BASIC; + } + props = addSequenceNumberHeader(props, seqNo); + + // Publish to delegate channel + // Note: Metrics are collected by the delegate's MetricsCollector + delegate.basicPublish(exchange, routingKey, mandatory, props, body); + } + catch (IOException | AlreadyClosedException | InterruptedException e) + { + // Clean up on error + confirmations.remove(seqNo); + if (permit != null) + { + permit.release(); + } + future.completeExceptionally(e); + } + + return future; + } + + // Unsupported basicPublish methods - throw UnsupportedOperationException + + @Override + public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) + throws IOException + { + throw new UnsupportedOperationException( + "basicPublish() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + @Override + public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) + throws IOException + { + throw new UnsupportedOperationException( + "basicPublish() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + @Override + public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, + BasicProperties props, byte[] body) + throws IOException + { + throw new UnsupportedOperationException( + "basicPublish() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + // Delegated Channel methods + + @Override + public int getChannelNumber() + { + return delegate.getChannelNumber(); + } + + @Override + public Connection getConnection() + { + return delegate.getConnection(); + } + + @Override + public void close() throws IOException, TimeoutException + { + delegate.close(); + removeListeners(); + } + + @Override + public void close(int closeCode, String closeMessage) throws IOException, TimeoutException + { + delegate.close(closeCode, closeMessage); + removeListeners(); + } + + @Override + public void abort() + { + delegate.abort(); + removeListeners(); + } + + @Override + public void abort(int closeCode, String closeMessage) + { + delegate.abort(closeCode, closeMessage); + removeListeners(); + } + + @Override + public void addReturnListener(ReturnListener listener) + { + delegate.addReturnListener(listener); + } + + @Override + public ReturnListener addReturnListener(ReturnCallback returnCallback) + { + return delegate.addReturnListener(returnCallback); + } + + @Override + public boolean removeReturnListener(ReturnListener listener) + { + return delegate.removeReturnListener(listener); + } + + @Override + public void clearReturnListeners() + { + delegate.clearReturnListeners(); + } + + @Override + public void addConfirmListener(ConfirmListener listener) + { + delegate.addConfirmListener(listener); + } + + @Override + public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) + { + return delegate.addConfirmListener(ackCallback, nackCallback); + } + + @Override + public boolean removeConfirmListener(ConfirmListener listener) + { + return delegate.removeConfirmListener(listener); + } + + @Override + public void clearConfirmListeners() + { + delegate.clearConfirmListeners(); + } + + @Override + public Consumer getDefaultConsumer() + { + return delegate.getDefaultConsumer(); + } + + @Override + public void setDefaultConsumer(Consumer consumer) + { + delegate.setDefaultConsumer(consumer); + } + + @Override + public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException + { + delegate.basicQos(prefetchSize, prefetchCount, global); + } + + @Override + public void basicQos(int prefetchCount, boolean global) throws IOException + { + delegate.basicQos(prefetchCount, global); + } + + @Override + public void basicQos(int prefetchCount) throws IOException + { + delegate.basicQos(prefetchCount); + } + + @Override + public String basicConsume(String queue, Consumer callback) throws IOException + { + return delegate.basicConsume(queue, callback); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException + { + return delegate.basicConsume(queue, deliverCallback, cancelCallback); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, deliverCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, deliverCallback, cancelCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException + { + return delegate.basicConsume(queue, autoAck, callback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, deliverCallback, cancelCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, deliverCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, deliverCallback, cancelCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) throws IOException + { + return delegate.basicConsume(queue, autoAck, arguments, callback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, arguments, deliverCallback, cancelCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, arguments, deliverCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, arguments, deliverCallback, cancelCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, callback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, deliverCallback, cancelCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, deliverCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, deliverCallback, cancelCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, deliverCallback, cancelCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, deliverCallback, shutdownSignalCallback); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException + { + return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, deliverCallback, cancelCallback, shutdownSignalCallback); + } + + @Override + public void basicCancel(String consumerTag) throws IOException + { + delegate.basicCancel(consumerTag); + } + + @Override + public AMQP.Basic.RecoverOk basicRecover() throws IOException + { + return delegate.basicRecover(); + } + + @Override + public AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException + { + return delegate.basicRecover(requeue); + } + + @Override + public AMQP.Tx.SelectOk txSelect() throws IOException + { + return delegate.txSelect(); + } + + @Override + public AMQP.Tx.CommitOk txCommit() throws IOException + { + return delegate.txCommit(); + } + + @Override + public AMQP.Tx.RollbackOk txRollback() throws IOException + { + return delegate.txRollback(); + } + + @Override + public AMQP.Confirm.SelectOk confirmSelect() throws IOException + { + return delegate.confirmSelect(); + } + + @Override + public long getNextPublishSeqNo() + { + return delegate.getNextPublishSeqNo(); + } + + @Override + public boolean waitForConfirms() throws InterruptedException + { + throw new UnsupportedOperationException( + "waitForConfirms() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + @Override + public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException + { + throw new UnsupportedOperationException( + "waitForConfirms() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + @Override + public void waitForConfirmsOrDie() throws IOException, InterruptedException + { + throw new UnsupportedOperationException( + "waitForConfirmsOrDie() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + @Override + public void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException + { + throw new UnsupportedOperationException( + "waitForConfirmsOrDie() is not supported on ConfirmationChannel. Use basicPublishAsync() instead."); + } + + @Override + public void asyncRpc(com.rabbitmq.client.Method method) throws IOException + { + delegate.asyncRpc(method); + } + + @Override + public Command rpc(com.rabbitmq.client.Method method) throws IOException + { + return delegate.rpc(method); + } + + @Override + public long messageCount(String queue) throws IOException + { + return delegate.messageCount(queue); + } + + @Override + public long consumerCount(String queue) throws IOException + { + return delegate.consumerCount(queue); + } + + @Override + public CompletableFuture asyncCompletableRpc(com.rabbitmq.client.Method method) throws IOException + { + return delegate.asyncCompletableRpc(method); + } + + @Override + public void addShutdownListener(ShutdownListener listener) + { + delegate.addShutdownListener(listener); + } + + @Override + public void removeShutdownListener(ShutdownListener listener) + { + delegate.removeShutdownListener(listener); + } + + @Override + public ShutdownSignalException getCloseReason() + { + return delegate.getCloseReason(); + } + + @Override + public void notifyListeners() + { + delegate.notifyListeners(); + } + + @Override + public boolean isOpen() + { + return delegate.isOpen(); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException + { + return delegate.exchangeDeclare(exchange, type); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException + { + return delegate.exchangeDeclare(exchange, type); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException + { + return delegate.exchangeDeclare(exchange, type, durable); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException + { + return delegate.exchangeDeclare(exchange, type, durable); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map arguments) throws IOException + { + return delegate.exchangeDeclare(exchange, type, durable, autoDelete, arguments); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map arguments) throws IOException + { + return delegate.exchangeDeclare(exchange, type, durable, autoDelete, arguments); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException + { + return delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException + { + return delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments); + } + + @Override + public void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException + { + delegate.exchangeDeclareNoWait(exchange, type, durable, autoDelete, internal, arguments); + } + + @Override + public void exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException + { + delegate.exchangeDeclareNoWait(exchange, type, durable, autoDelete, internal, arguments); + } + + @Override + public AMQP.Exchange.DeclareOk exchangeDeclarePassive(String exchange) throws IOException + { + return delegate.exchangeDeclarePassive(exchange); + } + + @Override + public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException + { + return delegate.exchangeDelete(exchange, ifUnused); + } + + @Override + public void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException + { + delegate.exchangeDeleteNoWait(exchange, ifUnused); + } + + @Override + public AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException + { + return delegate.exchangeDelete(exchange); + } + + @Override + public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException + { + return delegate.exchangeBind(destination, source, routingKey); + } + + @Override + public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map arguments) throws IOException + { + return delegate.exchangeBind(destination, source, routingKey, arguments); + } + + @Override + public void exchangeBindNoWait(String destination, String source, String routingKey, Map arguments) throws IOException + { + delegate.exchangeBindNoWait(destination, source, routingKey, arguments); + } + + @Override + public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException + { + return delegate.exchangeUnbind(destination, source, routingKey); + } + + @Override + public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map arguments) throws IOException + { + return delegate.exchangeUnbind(destination, source, routingKey, arguments); + } + + @Override + public void exchangeUnbindNoWait(String destination, String source, String routingKey, Map arguments) throws IOException + { + delegate.exchangeUnbindNoWait(destination, source, routingKey, arguments); + } + + @Override + public AMQP.Queue.DeclareOk queueDeclare() throws IOException + { + return delegate.queueDeclare(); + } + + @Override + public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException + { + return delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments); + } + + @Override + public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException + { + delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments); + } + + @Override + public AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException + { + return delegate.queueDeclarePassive(queue); + } + + @Override + public AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException + { + return delegate.queueDelete(queue); + } + + @Override + public AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException + { + return delegate.queueDelete(queue, ifUnused, ifEmpty); + } + + @Override + public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException + { + delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty); + } + + @Override + public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException + { + return delegate.queueBind(queue, exchange, routingKey); + } + + @Override + public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map arguments) throws IOException + { + return delegate.queueBind(queue, exchange, routingKey, arguments); + } + + @Override + public void queueBindNoWait(String queue, String exchange, String routingKey, Map arguments) throws IOException + { + delegate.queueBindNoWait(queue, exchange, routingKey, arguments); + } + + @Override + public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException + { + return delegate.queueUnbind(queue, exchange, routingKey); + } + + @Override + public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map arguments) throws IOException + { + return delegate.queueUnbind(queue, exchange, routingKey, arguments); + } + + @Override + public AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException + { + return delegate.queuePurge(queue); + } + + @Override + public GetResponse basicGet(String queue, boolean autoAck) throws IOException + { + return delegate.basicGet(queue, autoAck); + } + + @Override + public void basicAck(long deliveryTag, boolean multiple) throws IOException + { + delegate.basicAck(deliveryTag, multiple); + } + + @Override + public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException + { + delegate.basicNack(deliveryTag, multiple, requeue); + } + + @Override + public void basicReject(long deliveryTag, boolean requeue) throws IOException + { + delegate.basicReject(deliveryTag, requeue); + } + + private void removeListeners() + { + delegate.removeShutdownListener(shutdownListener); + delegate.removeReturnListener(returnListener); + delegate.removeConfirmListener(confirmListener); + } + + private BasicProperties addSequenceNumberHeader(BasicProperties props, long seqNo) + { + Map headers = props.getHeaders(); + if (headers == null) + { + headers = new HashMap<>(); + } + else + { + headers = new HashMap<>(headers); + } + headers.put(SEQUENCE_NUMBER_HEADER, seqNo); + return props.builder().headers(headers).build(); + } + + private void handleAck(long seqNo, boolean multiple) + { + handleAckNack(seqNo, multiple, false); + } + + private void handleNack(long seqNo, boolean multiple) + { + handleAckNack(seqNo, multiple, true); + } + + private void handleAckNack(long seqNo, boolean multiple, boolean nack) + { + if (multiple) + { + for (Long seq : new ArrayList<>(confirmations.keySet())) + { + if (seq <= seqNo) + { + ConfirmationEntry entry = confirmations.remove(seq); + if (entry != null) + { + if (nack) + { + entry.completeExceptionally(seq); + } + else + { + entry.complete(); + } + entry.releasePermit(); + } + } + } + } + else + { + ConfirmationEntry entry = confirmations.remove(seqNo); + if (entry != null) + { + if (nack) + { + entry.completeExceptionally(seqNo); + } + else + { + entry.complete(); + } + entry.releasePermit(); + } + } + } + + private void handleReturn(int replyCode, String replyText, String exchange, + String routingKey, BasicProperties props, byte[] body) + { + Object seqNumObj = props.getHeaders().get(SEQUENCE_NUMBER_HEADER); + long seqNo = extractSequenceNumber(seqNumObj); + + ConfirmationEntry entry = confirmations.remove(seqNo); + if (entry != null) + { + entry.completeExceptionally(new PublishException(seqNo, true, + exchange, routingKey, replyCode, replyText, entry.context)); + entry.releasePermit(); + } + } + + private void handleShutdown(ShutdownSignalException cause) + { + AlreadyClosedException ex = new AlreadyClosedException(cause); + for (ConfirmationEntry entry : confirmations.values()) + { + entry.completeExceptionally(ex); + entry.releasePermit(); + } + confirmations.clear(); + } + + /** + * Extract sequence number from message header. + * NOTE: Since this library always writes the sequence number as a Long, + * the header value should always be a Long when read back from the broker. + * The additional type checks (Integer, String, byte[]) are defensive programming + * and should never be needed in practice. + */ + private long extractSequenceNumber(Object seqNumObj) + { + if (seqNumObj instanceof Long) + { + return (Long) seqNumObj; + } + else if (seqNumObj instanceof Integer) + { + return ((Integer) seqNumObj).longValue(); + } + else if (seqNumObj instanceof String) + { + return Long.parseLong((String) seqNumObj); + } + else if (seqNumObj instanceof byte[]) + { + return Long.parseLong(new String((byte[]) seqNumObj)); + } + return 0; + } + + private static class ConfirmationEntry + { + final CompletableFuture future; + final RateLimiter.Permit permit; + final T context; + + ConfirmationEntry(CompletableFuture future, RateLimiter.Permit permit, T context) + { + if (future == null) + { + throw new IllegalArgumentException("future must be non-null"); + } + this.future = future; + this.permit = permit; + this.context = context; + } + + void complete() + { + future.complete(context); + } + + void completeExceptionally(long seq) + { + PublishException ex = new PublishException(seq, this.context); + future.completeExceptionally(ex); + } + + void completeExceptionally(Exception e) + { + future.completeExceptionally(e); + } + + void releasePermit() + { + if (permit != null) + { + permit.release(); + } + } + } + +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java index 66242c3979..265deb0ee4 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java @@ -70,6 +70,7 @@ public RecoveryAwareChannelN(AMQConnection connection, int channelNumber, Consum * @param channelNumber The channel number to be associated with this channel * @param workService service for managing this channel's consumer callbacks * @param metricsCollector service for managing metrics + * @param observationCollector service for managing observations */ public RecoveryAwareChannelN(AMQConnection connection, int channelNumber, ConsumerWorkService workService, MetricsCollector metricsCollector, ObservationCollector observationCollector) { diff --git a/src/test/java/com/rabbitmq/client/test/ThrottlingRateLimiterTest.java b/src/test/java/com/rabbitmq/client/test/ThrottlingRateLimiterTest.java new file mode 100644 index 0000000000..2433738a30 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/ThrottlingRateLimiterTest.java @@ -0,0 +1,236 @@ +// Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.ThrottlingRateLimiter; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +public class ThrottlingRateLimiterTest +{ + + @Test public void testPermitAcquireAndRelease() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 50); + + assertEquals(10, limiter.getAvailablePermits()); + + ThrottlingRateLimiter.Permit permit = limiter.acquire(); + assertEquals(9, limiter.getAvailablePermits()); + + permit.release(); + assertEquals(10, limiter.getAvailablePermits()); + } + + @Test public void testMultipleAcquireAndRelease() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 50); + + List permits = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + permits.add(limiter.acquire()); + } + + assertEquals(5, limiter.getAvailablePermits()); + + for (ThrottlingRateLimiter.Permit p : permits) { + p.release(); + } + + assertEquals(10, limiter.getAvailablePermits()); + } + + @Test public void testThrottlingOccursAboveThreshold() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 50); + + // Hold 6 permits (60% used, above 50% threshold) + List permits = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + permits.add(limiter.acquire()); + } + + assertEquals(4, limiter.getAvailablePermits()); + + // Next acquire should throttle - verify with lenient timing + long start = System.currentTimeMillis(); + ThrottlingRateLimiter.Permit permit = limiter.acquire(); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(elapsed > 10, "Should apply throttling delay, got " + elapsed + "ms"); + assertEquals(3, limiter.getAvailablePermits()); + + permit.release(); + for (ThrottlingRateLimiter.Permit p : permits) { + p.release(); + } + } + + @Test public void testHighThrottlingNearCapacity() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 50); + + // Hold 9 permits (90% used) + List permits = new ArrayList<>(); + for (int i = 0; i < 9; i++) { + permits.add(limiter.acquire()); + } + + assertEquals(1, limiter.getAvailablePermits()); + + // Should have significant delay + long start = System.currentTimeMillis(); + ThrottlingRateLimiter.Permit permit = limiter.acquire(); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(elapsed > 500, "Should have significant delay near capacity, got " + elapsed + "ms"); + assertEquals(0, limiter.getAvailablePermits()); + + permit.release(); + for (ThrottlingRateLimiter.Permit p : permits) { + p.release(); + } + } + + @Test public void testThrottlingThresholds() throws InterruptedException + { + // 80% threshold - more permissive + ThrottlingRateLimiter limiter80 = new ThrottlingRateLimiter(10, 80); + + ThrottlingRateLimiter.Permit permit1 = limiter80.acquire(); + assertEquals(9, limiter80.getAvailablePermits()); + + ThrottlingRateLimiter.Permit permit2 = limiter80.acquire(); + assertEquals(8, limiter80.getAvailablePermits()); + + permit1.release(); + permit2.release(); + + // 20% threshold - more restrictive + ThrottlingRateLimiter limiter20 = new ThrottlingRateLimiter(10, 20); + + // Hold 9 permits (90% used, well above 20% threshold) + List permits = new ArrayList<>(); + for (int i = 0; i < 9; i++) { + permits.add(limiter20.acquire()); + } + + assertEquals(1, limiter20.getAvailablePermits()); + + // Should throttle + long start = System.currentTimeMillis(); + ThrottlingRateLimiter.Permit permit = limiter20.acquire(); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(elapsed > 10, "Should throttle above 20% threshold, got " + elapsed + "ms"); + + permit.release(); + for (ThrottlingRateLimiter.Permit p : permits) { + p.release(); + } + } + + @Test public void testConcurrentAcquireAndRelease() throws InterruptedException, ExecutionException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(50, 50); + ExecutorService executor = Executors.newFixedThreadPool(10); + AtomicInteger successCount = new AtomicInteger(0); + + List> futures = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + futures.add(executor.submit(() -> { + try { + ThrottlingRateLimiter.Permit permit = limiter.acquire(); + successCount.incrementAndGet(); + Thread.sleep(10); + permit.release(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + })); + } + + for (Future future : futures) { + future.get(); + } + + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + assertEquals(100, successCount.get()); + assertEquals(50, limiter.getAvailablePermits()); + } + + @Test public void testZeroThresholdBehavior() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 0); + + // 0% threshold means threshold = 0 + // Available = 10, threshold = 0, so 10 >= 0 = no throttle + ThrottlingRateLimiter.Permit permit = limiter.acquire(); + assertEquals(9, limiter.getAvailablePermits()); + permit.release(); + } + + @Test public void testHundredPercentThreshold() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 100); + + // 100% threshold means threshold = 10 + // Hold 9 permits: available = 1, threshold = 10, so 1 < 10 = throttle + List permits = new ArrayList<>(); + for (int i = 0; i < 9; i++) { + permits.add(limiter.acquire()); + } + + assertEquals(1, limiter.getAvailablePermits()); + + long start = System.currentTimeMillis(); + ThrottlingRateLimiter.Permit permit = limiter.acquire(); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(elapsed > 500, "Should throttle with 100% threshold near capacity, got " + elapsed + "ms"); + + permit.release(); + for (ThrottlingRateLimiter.Permit p : permits) { + p.release(); + } + } + + @Test public void testGetStatistics() throws InterruptedException + { + ThrottlingRateLimiter limiter = new ThrottlingRateLimiter(10, 50); + + assertEquals(10, limiter.getAvailablePermits()); + + ThrottlingRateLimiter.Permit p1 = limiter.acquire(); + assertEquals(9, limiter.getAvailablePermits()); + + ThrottlingRateLimiter.Permit p2 = limiter.acquire(); + assertEquals(8, limiter.getAvailablePermits()); + + p1.release(); + assertEquals(9, limiter.getAvailablePermits()); + + p2.release(); + assertEquals(10, limiter.getAvailablePermits()); + } +} diff --git a/src/test/java/com/rabbitmq/client/test/functional/Confirm.java b/src/test/java/com/rabbitmq/client/test/functional/Confirm.java index 0152ee0a45..97df4ce8a4 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Confirm.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Confirm.java @@ -23,11 +23,14 @@ import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.ConfirmationChannel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.MessageProperties; +import com.rabbitmq.client.PublishException; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.test.BrokerTestCase; @@ -316,4 +319,269 @@ protected void publish(String exchangeName, String queueName, : MessageProperties.BASIC, "nop".getBytes()); } + + /** + * Tests basic publisher confirmation tracking with context parameter. + * Verifies that futures complete successfully with their context values when messages are confirmed. + */ + @Test public void testBasicPublishAsync() throws Exception { + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, null); + String queue = confirmCh.queueDeclare().getQueue(); + + int messageCount = 100; + java.util.List> futures = new java.util.ArrayList<>(); + + for (int i = 0; i < messageCount; i++) { + futures.add(confirmCh.basicPublishAsync("", queue, null, ("msg" + i).getBytes(), i)); + } + + // Verify all futures complete with their context values + for (int i = 0; i < messageCount; i++) { + assertEquals(Integer.valueOf(i), futures.get(i).join()); + } + + assertEquals(messageCount, confirmCh.messageCount(queue)); + confirmCh.close(); + } + + /** + * Tests that unroutable messages with mandatory flag cause futures to complete exceptionally. + * Verifies PublishException contains correct return information (isReturn=true, replyCode=NO_ROUTE). + */ + @Test public void testBasicPublishAsyncWithReturn() throws Exception { + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, null); + + java.util.concurrent.CompletableFuture future = confirmCh.basicPublishAsync( + "", "nonexistent-queue", true, null, "test".getBytes(), null + ); + + try { + future.join(); + fail("Expected PublishException"); + } catch (java.util.concurrent.CompletionException e) { + assertTrue(e.getCause() instanceof PublishException); + PublishException pe = (PublishException) e.getCause(); + assertTrue(pe.isReturn()); + assertEquals(AMQP.NO_ROUTE, pe.getReplyCode().intValue()); + } + + confirmCh.close(); + } + + /** + * Tests rate limiting with ThrottlingRateLimiter. + * Verifies that messages are throttled to max 10 concurrent in-flight messages + * and all futures complete with correct context values. + */ + @Test public void testMaxOutstandingConfirms() throws Exception { + com.rabbitmq.client.ThrottlingRateLimiter limiter = + new com.rabbitmq.client.ThrottlingRateLimiter(10, 50); + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, limiter); + String queue = confirmCh.queueDeclare().getQueue(); + + java.util.concurrent.atomic.AtomicInteger completed = new java.util.concurrent.atomic.AtomicInteger(0); + java.util.List> futures = new java.util.ArrayList<>(); + + for (int i = 0; i < 50; i++) { + final int msgNum = i; + java.util.concurrent.CompletableFuture future = confirmCh.basicPublishAsync( + "", queue, null, ("msg" + i).getBytes(), i + ); + future.thenAccept(ctx -> { + assertEquals(Integer.valueOf(msgNum), ctx); + completed.incrementAndGet(); + }); + futures.add(future); + } + + java.util.concurrent.CompletableFuture.allOf(futures.toArray(new java.util.concurrent.CompletableFuture[0])).join(); + assertEquals(50, completed.get()); + confirmCh.close(); + } + + /** + * Tests that closing a channel with pending confirmations causes all futures to complete exceptionally. + * Verifies that AlreadyClosedException is thrown for in-flight messages when channel closes. + */ + @Test public void testBasicPublishAsyncChannelClose() throws Exception { + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, null); + String queue = confirmCh.queueDeclare().getQueue(); + + java.util.List> futures = new java.util.ArrayList<>(); + for (int i = 0; i < 10; i++) { + futures.add(confirmCh.basicPublishAsync("", queue, null, ("msg" + i).getBytes(), null)); + } + + confirmCh.close(); + + for (java.util.concurrent.CompletableFuture future : futures) { + try { + future.join(); + } catch (java.util.concurrent.CompletionException e) { + assertTrue(e.getCause() instanceof AlreadyClosedException); + } + } + } + + /** + * Tests that rate limiting introduces delay and all messages complete with correct context. + * Verifies ThrottlingRateLimiter limits concurrent in-flight messages and elapsed time is non-zero. + */ + @Test public void testBasicPublishAsyncWithThrottling() throws Exception { + com.rabbitmq.client.ThrottlingRateLimiter limiter = + new com.rabbitmq.client.ThrottlingRateLimiter(10, 50); + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, limiter); + String queue = confirmCh.queueDeclare().getQueue(); + + int messageCount = 50; + java.util.List> futures = new java.util.ArrayList<>(); + + long start = System.currentTimeMillis(); + for (int i = 0; i < messageCount; i++) { + String msgId = "msg-" + i; + futures.add(confirmCh.basicPublishAsync("", queue, null, ("message" + i).getBytes(), msgId)); + } + + // Verify all complete with correct context + for (int i = 0; i < messageCount; i++) { + assertEquals("msg-" + i, futures.get(i).join()); + } + + long elapsed = System.currentTimeMillis() - start; + + assertEquals(messageCount, confirmCh.messageCount(queue)); + assertTrue(elapsed > 0, "Throttling should introduce some delay"); + confirmCh.close(); + } + + /** + * Tests performance comparison between throttled and unlimited channels. + * Verifies both complete successfully and throttling introduces measurable delay. + */ + @Test public void testBasicPublishAsyncThrottlingVsUnlimited() throws Exception { + // Test with throttling (10 permits, 50% threshold) + com.rabbitmq.client.ThrottlingRateLimiter limiter = + new com.rabbitmq.client.ThrottlingRateLimiter(10, 50); + Channel throttlingCh = connection.createChannel(); + ConfirmationChannel throttlingConfirmCh = ConfirmationChannel.create(throttlingCh, limiter); + String queue1 = throttlingConfirmCh.queueDeclare().getQueue(); + + int messageCount = 4096; + java.util.List> futures = new java.util.ArrayList<>(); + + long start = System.currentTimeMillis(); + for (int i = 0; i < messageCount; i++) { + futures.add(throttlingConfirmCh.basicPublishAsync("", queue1, null, ("msg" + i).getBytes(), null)); + } + java.util.concurrent.CompletableFuture.allOf(futures.toArray(new java.util.concurrent.CompletableFuture[0])).join(); + long throttlingElapsed = System.currentTimeMillis() - start; + + assertEquals(messageCount, throttlingConfirmCh.messageCount(queue1)); + throttlingConfirmCh.close(); + + // Test with unlimited (no rate limiter) + Channel unlimitedCh = connection.createChannel(); + ConfirmationChannel unlimitedConfirmCh = ConfirmationChannel.create(unlimitedCh, null); + String queue2 = unlimitedConfirmCh.queueDeclare().getQueue(); + + futures.clear(); + start = System.currentTimeMillis(); + for (int i = 0; i < messageCount; i++) { + futures.add(unlimitedConfirmCh.basicPublishAsync("", queue2, null, ("msg" + i).getBytes(), null)); + } + java.util.concurrent.CompletableFuture.allOf(futures.toArray(new java.util.concurrent.CompletableFuture[0])).join(); + long unlimitedElapsed = System.currentTimeMillis() - start; + + assertEquals(messageCount, unlimitedConfirmCh.messageCount(queue2)); + unlimitedConfirmCh.close(); + + // Both should complete successfully + assertTrue(throttlingElapsed > 0); + assertTrue(unlimitedElapsed > 0); + } + + /** + * Tests that ConfirmationChannel works correctly without a rate limiter. + * Verifies all messages are confirmed when rateLimiter is null (unlimited concurrency). + */ + @Test public void testBasicPublishAsyncWithNullRateLimiter() throws Exception { + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, null); + String queue = confirmCh.queueDeclare().getQueue(); + + int messageCount = 100; + java.util.List> futures = new java.util.ArrayList<>(); + + for (int i = 0; i < messageCount; i++) { + futures.add(confirmCh.basicPublishAsync("", queue, null, ("msg" + i).getBytes(), null)); + } + + java.util.concurrent.CompletableFuture.allOf(futures.toArray(new java.util.concurrent.CompletableFuture[0])).join(); + + assertEquals(messageCount, confirmCh.messageCount(queue)); + confirmCh.close(); + } + + /** + * Tests context parameter correlation with String correlation IDs. + * Verifies that each future completes with its exact correlation ID for message tracking. + */ + @Test public void testBasicPublishAsyncWithContext() throws Exception { + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, null); + String queue = confirmCh.queueDeclare().getQueue(); + + int messageCount = 10; + java.util.Map> futuresByCorrelationId = new java.util.HashMap<>(); + + for (int i = 0; i < messageCount; i++) { + String correlationId = "msg-" + i; + java.util.concurrent.CompletableFuture future = confirmCh.basicPublishAsync( + "", queue, null, ("message" + i).getBytes(), correlationId + ); + futuresByCorrelationId.put(correlationId, future); + } + + // Verify all futures complete with their correlation IDs + for (java.util.Map.Entry> entry : futuresByCorrelationId.entrySet()) { + String expectedId = entry.getKey(); + String actualId = entry.getValue().join(); + assertEquals(expectedId, actualId); + } + + assertEquals(messageCount, confirmCh.messageCount(queue)); + confirmCh.close(); + } + + /** + * Tests that context parameter is available in PublishException for failed publishes. + * Verifies context is preserved when message is returned as unroutable. + */ + @Test public void testBasicPublishAsyncWithContextInException() throws Exception { + Channel ch = connection.createChannel(); + ConfirmationChannel confirmCh = ConfirmationChannel.create(ch, null); + + String messageId = "unroutable-msg-456"; + java.util.concurrent.CompletableFuture future = confirmCh.basicPublishAsync( + "", "nonexistent-queue", true, null, "test".getBytes(), messageId + ); + + try { + future.join(); + fail("Expected PublishException"); + } catch (java.util.concurrent.CompletionException e) { + assertTrue(e.getCause() instanceof PublishException); + PublishException pe = (PublishException) e.getCause(); + assertTrue(pe.isReturn()); + assertEquals(AMQP.NO_ROUTE, pe.getReplyCode().intValue()); + assertEquals(messageId, pe.getContext()); + } + + confirmCh.close(); + } }