Skip to content

Commit 75bfa23

Browse files
authored
Subscription: Fixed multiple problems (#17418)
1 parent 2b7b6cb commit 75bfa23

File tree

6 files changed

+46
-41
lines changed

6 files changed

+46
-41
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -220,26 +220,21 @@ public void validatePipePluginUsageByTopic(String pipePluginName) throws Subscri
220220
}
221221
}
222222

223-
public void validatePipePluginUsageByTopicInternal(String pipePluginName)
223+
private void validatePipePluginUsageByTopicInternal(String pipePluginName)
224224
throws SubscriptionException {
225-
acquireReadLock();
226-
try {
227-
topicMetaKeeper
228-
.getAllTopicMeta()
229-
.forEach(
230-
meta -> {
231-
if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
232-
final String exceptionMessage =
233-
String.format(
234-
"PipePlugin '%s' is already used by Topic '%s' as a processor.",
235-
pipePluginName, meta.getTopicName());
236-
LOGGER.warn(exceptionMessage);
237-
throw new SubscriptionException(exceptionMessage);
238-
}
239-
});
240-
} finally {
241-
releaseReadLock();
242-
}
225+
topicMetaKeeper
226+
.getAllTopicMeta()
227+
.forEach(
228+
meta -> {
229+
if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
230+
final String exceptionMessage =
231+
String.format(
232+
"PipePlugin '%s' is already used by Topic '%s' as a processor.",
233+
pipePluginName, meta.getTopicName());
234+
LOGGER.warn(exceptionMessage);
235+
throw new SubscriptionException(exceptionMessage);
236+
}
237+
});
243238
}
244239

245240
public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException {
@@ -335,21 +330,25 @@ public TSStatus createTopic(CreateTopicPlan plan) {
335330
public TSStatus alterTopic(AlterTopicPlan plan) {
336331
acquireWriteLock();
337332
try {
338-
topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
339-
topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
340-
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
333+
return alterTopicInternal(plan);
341334
} finally {
342335
releaseWriteLock();
343336
}
344337
}
345338

346-
public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan plan) {
339+
private TSStatus alterTopicInternal(final AlterTopicPlan plan) {
340+
topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
341+
topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
342+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
343+
}
344+
345+
public TSStatus alterMultipleTopics(final AlterMultipleTopicsPlan plan) {
347346
acquireWriteLock();
348347
try {
349-
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
348+
final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
350349
status.setSubStatus(new ArrayList<>());
351-
for (AlterTopicPlan subPlan : plan.getSubPlans()) {
352-
TSStatus innerStatus = alterTopic(subPlan);
350+
for (final AlterTopicPlan subPlan : plan.getSubPlans()) {
351+
final TSStatus innerStatus = alterTopicInternal(subPlan);
353352
status.getSubStatus().add(innerStatus);
354353
if (innerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
355354
status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,9 @@ public DeletionResource(
7070
}
7171

7272
public synchronized void decreaseReference() {
73-
if (pipeTaskReferenceCount.get() == 1) {
73+
if (pipeTaskReferenceCount.decrementAndGet() == 0) {
7474
removeSelf();
7575
}
76-
pipeTaskReferenceCount.decrementAndGet();
7776
}
7877

7978
public void removeSelf() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private int getPrefetchingQueueCountInternal() {
279279
*/
280280
private static class Cache<T> {
281281

282-
private T value;
282+
private volatile T value;
283283
private volatile boolean valid = false;
284284
private final Supplier<T> supplier;
285285

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ private void handleSingleConsumerGroupMetaChangesInternal(
9696

9797
// if consumer group meta does not exist on local agent
9898
if (Objects.isNull(metaInAgent)) {
99-
consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
10099
consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId, metaFromCoordinator);
101100
SubscriptionAgent.broker().createBrokerIfNotExist(consumerGroupId);
102101
return;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,12 @@ public boolean isTopicExisted(final String topicName) {
149149
public String getTopicFormat(final String topicName) {
150150
acquireReadLock();
151151
try {
152-
return topicMetaKeeper
153-
.getTopicMeta(topicName)
154-
.getConfig()
155-
.getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE);
152+
return topicMetaKeeper.containsTopicMeta(topicName)
153+
? topicMetaKeeper
154+
.getTopicMeta(topicName)
155+
.getConfig()
156+
.getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE)
157+
: null;
156158
} finally {
157159
releaseReadLock();
158160
}
@@ -161,10 +163,12 @@ public String getTopicFormat(final String topicName) {
161163
public String getTopicMode(final String topicName) {
162164
acquireReadLock();
163165
try {
164-
return topicMetaKeeper
165-
.getTopicMeta(topicName)
166-
.getConfig()
167-
.getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE);
166+
return topicMetaKeeper.containsTopicMeta(topicName)
167+
? topicMetaKeeper
168+
.getTopicMeta(topicName)
169+
.getConfig()
170+
.getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE)
171+
: null;
168172
} finally {
169173
releaseReadLock();
170174
}
@@ -174,6 +178,7 @@ public Map<String, TopicConfig> getTopicConfigs(final Set<String> topicNames) {
174178
acquireReadLock();
175179
try {
176180
return topicNames.stream()
181+
.filter(topicMetaKeeper::containsTopicMeta)
177182
.collect(
178183
Collectors.toMap(
179184
topicName -> topicName,

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.nio.ByteBuffer;
3535
import java.util.Collections;
3636
import java.util.HashSet;
37+
import java.util.Iterator;
3738
import java.util.Map;
3839
import java.util.Objects;
3940
import java.util.Optional;
@@ -146,11 +147,13 @@ public void addConsumer(final ConsumerMeta consumerMeta) {
146147

147148
public void removeConsumer(final String consumerId) {
148149
consumerIdToConsumerMeta.remove(consumerId);
149-
for (final Map.Entry<String, Set<String>> entry :
150-
topicNameToSubscribedConsumerIdSet.entrySet()) {
150+
final Iterator<Map.Entry<String, Set<String>>> iterator =
151+
topicNameToSubscribedConsumerIdSet.entrySet().iterator();
152+
while (iterator.hasNext()) {
153+
final Map.Entry<String, Set<String>> entry = iterator.next();
151154
entry.getValue().remove(consumerId);
152155
if (entry.getValue().isEmpty()) {
153-
topicNameToSubscribedConsumerIdSet.remove(entry.getKey());
156+
iterator.remove();
154157
}
155158
}
156159
}

0 commit comments

Comments
 (0)