Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -220,26 +220,21 @@ public void validatePipePluginUsageByTopic(String pipePluginName) throws Subscri
}
}

public void validatePipePluginUsageByTopicInternal(String pipePluginName)
private void validatePipePluginUsageByTopicInternal(String pipePluginName)
throws SubscriptionException {
acquireReadLock();
try {
topicMetaKeeper
.getAllTopicMeta()
.forEach(
meta -> {
if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
final String exceptionMessage =
String.format(
"PipePlugin '%s' is already used by Topic '%s' as a processor.",
pipePluginName, meta.getTopicName());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
});
} finally {
releaseReadLock();
}
topicMetaKeeper
.getAllTopicMeta()
.forEach(
meta -> {
if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
final String exceptionMessage =
String.format(
"PipePlugin '%s' is already used by Topic '%s' as a processor.",
pipePluginName, meta.getTopicName());
LOGGER.warn(exceptionMessage);
throw new SubscriptionException(exceptionMessage);
}
});
}

public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException {
Expand Down Expand Up @@ -326,21 +321,25 @@ public TSStatus createTopic(CreateTopicPlan plan) {
public TSStatus alterTopic(AlterTopicPlan plan) {
acquireWriteLock();
try {
topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
return alterTopicInternal(plan);
} finally {
releaseWriteLock();
}
}

public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan plan) {
private TSStatus alterTopicInternal(final AlterTopicPlan plan) {
topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

public TSStatus alterMultipleTopics(final AlterMultipleTopicsPlan plan) {
acquireWriteLock();
try {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setSubStatus(new ArrayList<>());
for (AlterTopicPlan subPlan : plan.getSubPlans()) {
TSStatus innerStatus = alterTopic(subPlan);
for (final AlterTopicPlan subPlan : plan.getSubPlans()) {
final TSStatus innerStatus = alterTopicInternal(subPlan);
status.getSubStatus().add(innerStatus);
if (innerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private int getPrefetchingQueueCountInternal() {
*/
private static class Cache<T> {

private T value;
private volatile T value;
private volatile boolean valid = false;
private final Supplier<T> supplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ private void handleSingleConsumerGroupMetaChangesInternal(

// if consumer group meta does not exist on local agent
if (Objects.isNull(metaInAgent)) {
consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId, metaFromCoordinator);
SubscriptionAgent.broker().createBrokerIfNotExist(consumerGroupId);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ public boolean isTopicExisted(final String topicName) {
public String getTopicFormat(final String topicName) {
acquireReadLock();
try {
return topicMetaKeeper
.getTopicMeta(topicName)
.getConfig()
.getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE);
return topicMetaKeeper.containsTopicMeta(topicName)
? topicMetaKeeper
.getTopicMeta(topicName)
.getConfig()
.getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE)
: null;
} finally {
releaseReadLock();
}
Expand All @@ -161,10 +163,12 @@ public String getTopicFormat(final String topicName) {
public String getTopicMode(final String topicName) {
acquireReadLock();
try {
return topicMetaKeeper
.getTopicMeta(topicName)
.getConfig()
.getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE);
return topicMetaKeeper.containsTopicMeta(topicName)
? topicMetaKeeper
.getTopicMeta(topicName)
.getConfig()
.getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE)
: null;
} finally {
releaseReadLock();
}
Expand All @@ -174,6 +178,7 @@ public Map<String, TopicConfig> getTopicConfigs(final Set<String> topicNames) {
acquireReadLock();
try {
return topicNames.stream()
.filter(topicMetaKeeper::containsTopicMeta)
.collect(
Collectors.toMap(
topicName -> topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -120,11 +121,13 @@ public void addConsumer(final ConsumerMeta consumerMeta) {

public void removeConsumer(final String consumerId) {
consumerIdToConsumerMeta.remove(consumerId);
for (final Map.Entry<String, Set<String>> entry :
topicNameToSubscribedConsumerIdSet.entrySet()) {
final Iterator<Map.Entry<String, Set<String>>> iterator =
topicNameToSubscribedConsumerIdSet.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<String, Set<String>> entry = iterator.next();
entry.getValue().remove(consumerId);
if (entry.getValue().isEmpty()) {
topicNameToSubscribedConsumerIdSet.remove(entry.getKey());
iterator.remove();
}
}
}
Expand Down
Loading