From c24bb3b529076df158b1c1c7c22aeccd24ef9b87 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Thu, 6 Apr 2023 19:38:07 +0800 Subject: [PATCH 1/5] [fix][client] Fix client redeliver epoch bigger than broker consumer epoch --- .../pulsar/broker/service/ServerCnx.java | 35 ++++++---- ...sistentDispatcherSingleActiveConsumer.java | 17 +++-- .../client/impl/MessageRedeliveryTest.java | 65 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 9 ++- 4 files changed, 103 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4fc79a124acd8..760f564460af8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1765,20 +1765,31 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa remoteAddress, redeliver.getConsumerId(), redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null); } - - CompletableFuture consumerFuture = consumers.get(redeliver.getConsumerId()); - - if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { - Consumer consumer = consumerFuture.getNow(null); - if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) { - consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList()); - } else { - if (redeliver.hasConsumerEpoch()) { - consumer.redeliverUnacknowledgedMessages(redeliver.getConsumerEpoch()); + boolean hasConsumerEpoch = redeliver.hasConsumerEpoch(); + List messageIdsList = redeliver.getMessageIdsList(); + int messageIdsCount = redeliver.getMessageIdsCount(); + long consumerId = redeliver.getConsumerId(); + long consumerEpoch = redeliver.getConsumerEpoch(); + CompletableFuture consumerFuture = consumers.get(consumerId); + if (consumerFuture != null) { + consumerFuture.thenAccept((consumer) -> { + if (messageIdsCount > 0 && Subscription.isIndividualAckMode(consumer.subType())) { + consumer.redeliverUnacknowledgedMessages(messageIdsList); } else { - consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH); + if (hasConsumerEpoch) { + consumer.redeliverUnacknowledgedMessages(consumerEpoch); + } else { + consumer.redeliverUnacknowledgedMessages(DEFAULT_CONSUMER_EPOCH); + } } - } + }).exceptionally(e -> { + // if consumerFuture completed exceptionally, don't need to process this redeliver command + // because, consumer will reconnect + log.warn("[{}] ignore this redeliverUnacknowledged request from consumer {}, consumerEpoch {}", + remoteAddress, redeliver.getConsumerId(), + redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null, e); + return null; + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 03dce58af3a92..1f2c9fb49e93f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -278,15 +278,20 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc } private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { + // redeliver epoch is bigger than current consumer epoch, so don't need to handle this redeliver request + if (consumerEpoch < consumer.getConsumerEpoch()) { + log.warn("[{}-{}] Update epoch, old epoch [{}] bigger than new epoch [{}]", + name, consumer, consumer.getConsumerEpoch(), consumerEpoch); + return; + } - if (consumerEpoch > consumer.getConsumerEpoch()) { - if (log.isDebugEnabled()) { - log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]", - name, consumer, consumer.getConsumerEpoch(), consumerEpoch); - } - consumer.setConsumerEpoch(consumerEpoch); + if (log.isDebugEnabled()) { + log.debug("[{}-{}] Update epoch, old epoch [{}] bigger than new epoch [{}]", + name, consumer, consumer.getConsumerEpoch(), consumerEpoch); } + consumer.setConsumerEpoch(consumerEpoch); + if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) { log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", name, consumer); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 29b06f68b64eb..cc93c5cf12ae7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -317,6 +318,70 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{ assertEquals(message.getValue(), test3); } + @Test + public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{ + final String topic = "testRedeliveryBrokerAbortSmallerEpoch"; + final String subName = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .subscribe()); + + consumer.redeliverUnacknowledgedMessages(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic( + "persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get(); + Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() + .getConsumers().get(0).getConsumerEpoch() == 1); + consumer.setConsumerEpoch(0); + producer.send("Hello Pulsar!"); + + // ignore this redeliver request + consumer.redeliverUnacknowledgedMessages(); + consumer.receive(); + assertEquals(persistentTopic.getSubscription(subName).getDispatcher() + .getConsumers().get(0).getConsumerEpoch(), 1); + } + + @Test + public void testRedeliveryCommandDontCheckClientConnectionState() throws Exception{ + final String topic = "testRedeliveryCommandDontCheckClientConnectionState"; + final String subName = "my-sub"; + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + @Cleanup + ConsumerImpl consumer = ((ConsumerImpl) pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .subscribe()); + + assertEquals(consumer.getState(), HandlerState.State.Ready); + consumer.setState(HandlerState.State.Connecting); + producer.send("Hello Pulsar!"); + consumer.receive(); + consumer.redeliverUnacknowledgedMessages(); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic( + "persistent://public/default/testRedeliveryCommandDontCheckClientConnectionState", + false).get().get(); + Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() + .getConsumers().get(0).getConsumerEpoch() == 1); + + // redeliver success, consumer also can receive message again + consumer.receive(); + } + @Test(dataProvider = "enableBatch") public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception { final String topic = "testRedeliveryAddEpochAndPermits"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index beaa34bf20520..758e6912e68c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1907,8 +1907,10 @@ public void redeliverUnacknowledgedMessages() { incomingQueueLock.unlock(); } - // is channel is connected, we should send redeliver command to broker - if (cnx != null && isConnected(cnx)) { + // If a subscription command has been sent to the broker, it is necessary to allow the redelivery + // request to be sent to the broker without checking the connection state, as failing to do so would + // result in the client consumer epoch being bigger than the broker consumer epoch. + if (cnx != null) { cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages( consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise()); if (currentSize > 0) { @@ -1918,9 +1920,6 @@ public void redeliverUnacknowledgedMessages() { log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic, consumerName, currentSize); } - } else { - log.warn("[{}] Send redeliver messages command but the client is reconnect or close, " - + "so don't need to send redeliver command to broker", this); } } } From 5673942a9563a9d766b262af776b4aebcbe934a0 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Thu, 6 Apr 2023 23:13:30 +0800 Subject: [PATCH 2/5] fix log --- .../persistent/PersistentDispatcherSingleActiveConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 1f2c9fb49e93f..a12c1061eb8ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -286,7 +286,7 @@ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consu } if (log.isDebugEnabled()) { - log.debug("[{}-{}] Update epoch, old epoch [{}] bigger than new epoch [{}]", + log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]", name, consumer, consumer.getConsumerEpoch(), consumerEpoch); } From ef862ae2bb5d69cb2aaec13d63e5a044fea10e66 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 14 Oct 2024 09:51:42 +0300 Subject: [PATCH 3/5] Fix import --- .../org/apache/pulsar/client/impl/MessageRedeliveryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 297cbc5b820df..c0953e19b68c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -50,9 +50,9 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; From 6fb95e40549b8c060c7235ef5b61e52c5bf01c22 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 14 Oct 2024 10:11:38 +0300 Subject: [PATCH 4/5] Improve log message --- .../persistent/PersistentDispatcherSingleActiveConsumer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 2d0af9f271c60..f34f91220ec59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -279,9 +279,10 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc } private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { - // redeliver epoch is bigger than current consumer epoch, so don't need to handle this redeliver request + // broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request if (consumerEpoch < consumer.getConsumerEpoch()) { - log.warn("[{}-{}] Update epoch, old epoch [{}] bigger than new epoch [{}]", + log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than " + + "consumer epoch [{}]", name, consumer, consumer.getConsumerEpoch(), consumerEpoch); return; } From ddc7def5e9519ca87fc4230d758dd5ec064caea6 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Wed, 20 Nov 2024 17:28:38 +0800 Subject: [PATCH 5/5] fix some test --- .../PersistentDispatcherSingleActiveConsumer.java | 2 +- .../apache/pulsar/client/impl/MessageRedeliveryTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index f34f91220ec59..f62c96c4d1feb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -279,7 +279,7 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc } private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) { - // broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request + // broker side epoch is bigger than consumer epoch, so don't need to handle this redeliver request if (consumerEpoch < consumer.getConsumerEpoch()) { log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than " + "consumer epoch [{}]", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index c0953e19b68c0..be38f0e7bbb9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -345,12 +345,13 @@ public void testRedeliveryBrokerIgnoreSmallerEpoch() throws Exception{ "persistent://public/default/testRedeliveryBrokerAbortSmallerEpoch", false).get().get(); Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() .getConsumers().get(0).getConsumerEpoch() == 1); - consumer.setConsumerEpoch(0); + consumer.setConsumerEpoch(-1); producer.send("Hello Pulsar!"); // ignore this redeliver request consumer.redeliverUnacknowledgedMessages(); consumer.receive(); + assertEquals(consumer.getConsumerEpoch(), 0); assertEquals(persistentTopic.getSubscription(subName).getDispatcher() .getConsumers().get(0).getConsumerEpoch(), 1); } @@ -377,8 +378,7 @@ public void testRedeliveryCommandDontCheckClientConnectionState() throws Excepti producer.send("Hello Pulsar!"); consumer.receive(); consumer.redeliverUnacknowledgedMessages(); - PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic( - "persistent://public/default/testRedeliveryCommandDontCheckClientConnectionState", + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get(); Awaitility.await().until(() -> persistentTopic.getSubscription(subName).getDispatcher() .getConsumers().get(0).getConsumerEpoch() == 1);