From 210a483ae6af0cd41d7317e15c96e881637f7cdf Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 2 Apr 2026 09:41:04 +0800 Subject: [PATCH] Subscription: Fixed multiple problems (#17418) --- .../subscription/SubscriptionInfo.java | 51 +++++++++---------- .../agent/SubscriptionBrokerAgent.java | 2 +- .../agent/SubscriptionConsumerAgent.java | 1 - .../agent/SubscriptionTopicAgent.java | 21 +++++--- .../meta/consumer/ConsumerGroupMeta.java | 9 ++-- 5 files changed, 45 insertions(+), 39 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index 9a1c6acc72a27..a2758906f5bf4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -220,26 +220,21 @@ public void validatePipePluginUsageByTopic(String pipePluginName) throws Subscri } } - public void validatePipePluginUsageByTopicInternal(String pipePluginName) + private void validatePipePluginUsageByTopicInternal(String pipePluginName) throws SubscriptionException { - acquireReadLock(); - try { - topicMetaKeeper - .getAllTopicMeta() - .forEach( - meta -> { - if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) { - final String exceptionMessage = - String.format( - "PipePlugin '%s' is already used by Topic '%s' as a processor.", - pipePluginName, meta.getTopicName()); - LOGGER.warn(exceptionMessage); - throw new SubscriptionException(exceptionMessage); - } - }); - } finally { - releaseReadLock(); - } + topicMetaKeeper + .getAllTopicMeta() + .forEach( + meta -> { + if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) { + final String exceptionMessage = + String.format( + "PipePlugin '%s' is already used by Topic '%s' as a processor.", + pipePluginName, meta.getTopicName()); + LOGGER.warn(exceptionMessage); + throw new SubscriptionException(exceptionMessage); + } + }); } public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException { @@ -326,21 +321,25 @@ public TSStatus createTopic(CreateTopicPlan plan) { public TSStatus alterTopic(AlterTopicPlan plan) { acquireWriteLock(); try { - topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName()); - topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta()); - return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + return alterTopicInternal(plan); } finally { releaseWriteLock(); } } - public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan plan) { + private TSStatus alterTopicInternal(final AlterTopicPlan plan) { + topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName()); + topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } + + public TSStatus alterMultipleTopics(final AlterMultipleTopicsPlan plan) { acquireWriteLock(); try { - TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); status.setSubStatus(new ArrayList<>()); - for (AlterTopicPlan subPlan : plan.getSubPlans()) { - TSStatus innerStatus = alterTopic(subPlan); + for (final AlterTopicPlan subPlan : plan.getSubPlans()) { + final TSStatus innerStatus = alterTopicInternal(subPlan); status.getSubStatus().add(innerStatus); if (innerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index 510f8559bc147..00007f921b260 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -279,7 +279,7 @@ private int getPrefetchingQueueCountInternal() { */ private static class Cache { - private T value; + private volatile T value; private volatile boolean valid = false; private final Supplier supplier; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java index fee23cf6af4cb..4ee6b191a2478 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java @@ -96,7 +96,6 @@ private void handleSingleConsumerGroupMetaChangesInternal( // if consumer group meta does not exist on local agent if (Objects.isNull(metaInAgent)) { - consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId); consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId, metaFromCoordinator); SubscriptionAgent.broker().createBrokerIfNotExist(consumerGroupId); return; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java index 4c2bf5d02176a..37cdaa72690be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java @@ -149,10 +149,12 @@ public boolean isTopicExisted(final String topicName) { public String getTopicFormat(final String topicName) { acquireReadLock(); try { - return topicMetaKeeper - .getTopicMeta(topicName) - .getConfig() - .getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE); + return topicMetaKeeper.containsTopicMeta(topicName) + ? topicMetaKeeper + .getTopicMeta(topicName) + .getConfig() + .getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE) + : null; } finally { releaseReadLock(); } @@ -161,10 +163,12 @@ public String getTopicFormat(final String topicName) { public String getTopicMode(final String topicName) { acquireReadLock(); try { - return topicMetaKeeper - .getTopicMeta(topicName) - .getConfig() - .getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE); + return topicMetaKeeper.containsTopicMeta(topicName) + ? topicMetaKeeper + .getTopicMeta(topicName) + .getConfig() + .getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE) + : null; } finally { releaseReadLock(); } @@ -174,6 +178,7 @@ public Map getTopicConfigs(final Set topicNames) { acquireReadLock(); try { return topicNames.stream() + .filter(topicMetaKeeper::containsTopicMeta) .collect( Collectors.toMap( topicName -> topicName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java index f8e486537f6fa..b316c5f155de6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java @@ -33,6 +33,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -120,11 +121,13 @@ public void addConsumer(final ConsumerMeta consumerMeta) { public void removeConsumer(final String consumerId) { consumerIdToConsumerMeta.remove(consumerId); - for (final Map.Entry> entry : - topicNameToSubscribedConsumerIdSet.entrySet()) { + final Iterator>> iterator = + topicNameToSubscribedConsumerIdSet.entrySet().iterator(); + while (iterator.hasNext()) { + final Map.Entry> entry = iterator.next(); entry.getValue().remove(consumerId); if (entry.getValue().isEmpty()) { - topicNameToSubscribedConsumerIdSet.remove(entry.getKey()); + iterator.remove(); } } }