diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f9e593345d85f..47768931fae13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2034,20 +2034,31 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa remoteAddress, redeliver.getConsumerId(), redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null); } - - CompletableFuture consumerFuture = consumers.get(redeliver.getConsumerId()); - - if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { - Consumer consumer = consumerFuture.getNow(null); - if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) { - consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList()); - } else { - if (redeliver.hasConsumerEpoch()) { - consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch()); + boolean hasConsumerEpoch = redeliver.hasConsumerEpoch(); + List messageIdsList = redeliver.getMessageIdsList(); + int messageIdsCount = redeliver.getMessageIdsCount(); + long consumerId = redeliver.getConsumerId(); + long consumerEpoch = redeliver.getConsumerEpoch(); + CompletableFuture consumerFuture = consumers.get(consumerId); + if (consumerFuture != null) { + consumerFuture.thenAccept((consumer) -> { + if (messageIdsCount > 0 && Subscription.isIndividualAckMode(consumer.subType())) { + consumer.redeliverUnacknowledgedMessages(messageIdsList); } else { - consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH); + if (hasConsumerEpoch) { + consumer.redeliverUnacknowledgedMessages(consumerEpoch); + } else { + consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH); + } } - } + }).exceptionally(e -> { + // if consumerFuture completed exceptionally, don't need to process this redeliver command + // because, consumer will reconnect + log.warn("[{}] ignore this redeliverUnacknowledged request from consumer {}, consumerEpoch {}", + remoteAddress, redeliver.getConsumerId(), + redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null, e); + return null; + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index b451a8ad5dc0d..f62c96c4d1feb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc } private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { + // broker side epoch is bigger than consumer epoch, so don't need to handle this redeliver request + if (consumerEpoch < consumer.getConsumerEpoch()) { + log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than " + + "consumer epoch [{}]", + name, consumer, consumer.getConsumerEpoch(), consumerEpoch); + return; + } - if (consumerEpoch > consumer.getConsumerEpoch()) { - if (log.isDebugEnabled()) { - log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]", - name, consumer, consumer.getConsumerEpoch(), consumerEpoch); - } - consumer.setConsumerEpoch(consumerEpoch); + if (log.isDebugEnabled()) { + log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]", + name, consumer, consumer.getConsumerEpoch(), consumerEpoch); } + consumer.setConsumerEpoch(consumerEpoch); + if (consumer != getActiveConsumer()) { log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", name, consumer); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 7269df3b6b8b2..be38f0e7bbb9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -322,6 +323,70 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ assertEquals(message.getValue(), test3); } + @Test + public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{ + final String topic = "testRedeliveryBrokerAbortSmallerEpoch"; + final String subName = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .subscribe()); + + consumer.redeliverUnacknowledgedMessages(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic( + "persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get(); + Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() + .getConsumers().get(0).getConsumerEpoch() == 1); + consumer.setConsumerEpoch(-1); + producer.send("Hello Pulsar!"); + + // ignore this redeliver request + consumer.redeliverUnacknowledgedMessages(); + consumer.receive(); + assertEquals(consumer.getConsumerEpoch(), 0); + assertEquals(persistentTopic.getSubscription(subName).getDispatcher() + .getConsumers().get(0).getConsumerEpoch(), 1); + } + + @Test + public void testRedeliveryCommandDontCheckClientConnectionState() throws Exception{ + final String topic = "testRedeliveryCommandDontCheckClientConnectionState"; + final String subName = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .subscribe()); + + assertEquals(consumer.getState(), HandlerState.State.Ready); + consumer.setState(HandlerState.State.Connecting); + producer.send("Hello Pulsar!"); + consumer.receive(); + consumer.redeliverUnacknowledgedMessages(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, + false).get().get(); + Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() + .getConsumers().get(0).getConsumerEpoch() == 1); + + // redeliver success, consumer also can receive message again + consumer.receive(); + } + @Test(dataProvider = "enableBatch") public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception { final String topic = "testRedeliveryAddEpochAndPermits"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 004adab56f529..808b04c777c47 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2104,8 +2104,10 @@ public void redeliverUnacknowledgedMessages() { incomingQueueLock.unlock(); } - // is channel is connected, we should send redeliver command to broker - if (cnx != null && isConnected(cnx)) { + // If a subscription command has been sent to the broker, it is necessary to allow the redelivery + // request to be sent to the broker without checking the connection state, as failing to do so would + // result in the client consumer epoch being bigger than the broker consumer epoch. + if (cnx != null) { cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages( consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise()); if (currentSize > 0) { @@ -2115,9 +2117,6 @@ public void redeliverUnacknowledgedMessages() { log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, consumerName, currentSize); } - } else { - log.warn("[{}] Send redeliver messages command but the client is reconnect or close, " - + "so don't need to send redeliver command to broker", this); } } }