Skip to content

Commit 0fbc551

Browse files
refactor thread model
1 parent 6f98bd6 commit 0fbc551

File tree

18 files changed

+1617
-344
lines changed

18 files changed

+1617
-344
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
public class PipeSubtaskExecutorManager {
3434
private final PipeProcessorSubtaskExecutor processorExecutor;
3535
private final Supplier<PipeSinkSubtaskExecutor> connectorExecutorSupplier;
36-
private final SubscriptionSubtaskExecutor subscriptionExecutor;
36+
private volatile SubscriptionSubtaskExecutor subscriptionExecutor;
3737

3838
public PipeProcessorSubtaskExecutor getProcessorExecutor() {
3939
return processorExecutor;
@@ -49,6 +49,7 @@ public IoTConsensusV2SubtaskExecutor getConsensusExecutor() {
4949
}
5050

5151
public SubscriptionSubtaskExecutor getSubscriptionExecutor() {
52+
ensureSubscriptionExecutors();
5253
return subscriptionExecutor;
5354
}
5455

@@ -57,15 +58,28 @@ public SubscriptionSubtaskExecutor getSubscriptionExecutor() {
5758
private PipeSubtaskExecutorManager() {
5859
processorExecutor = new PipeProcessorSubtaskExecutor();
5960
connectorExecutorSupplier = PipeSinkSubtaskExecutor::new;
60-
subscriptionExecutor =
61-
SubscriptionConfig.getInstance().getSubscriptionEnabled()
62-
? new SubscriptionSubtaskExecutor()
63-
: null;
61+
ensureSubscriptionExecutors();
6462
// IoTV2 uses global singleton executor pool.
6563
IoTV2GlobalComponentContainer.getInstance()
6664
.setConsensusExecutor(new IoTConsensusV2SubtaskExecutor());
6765
}
6866

67+
public synchronized void ensureSubscriptionExecutors() {
68+
if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
69+
return;
70+
}
71+
if (subscriptionExecutor == null || subscriptionExecutor.isShutdown()) {
72+
subscriptionExecutor = new SubscriptionSubtaskExecutor();
73+
}
74+
}
75+
76+
public synchronized void shutdownSubscriptionExecutors() {
77+
if (subscriptionExecutor != null) {
78+
subscriptionExecutor.shutdown();
79+
subscriptionExecutor = null;
80+
}
81+
}
82+
6983
private static class PipeTaskExecutorHolder {
7084
private static PipeSubtaskExecutorManager instance = null;
7185
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.subscription.broker.consensus.ConsensusSubscriptionSetupHandler;
3030
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
3131
import org.apache.iotdb.db.subscription.resource.SubscriptionDataNodeResourceManager;
32+
import org.apache.iotdb.db.subscription.task.execution.ConsensusSubscriptionPrefetchExecutorManager;
3233
import org.apache.iotdb.db.subscription.task.subtask.SubscriptionSinkSubtask;
3334
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
3435
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
@@ -229,6 +230,7 @@ public void seek(
229230
final ConsensusSubscriptionBroker consensusBroker =
230231
consumerGroupIdToConsensusBroker.get(consumerGroupId);
231232
if (Objects.nonNull(consensusBroker) && consensusBroker.hasQueue(topicName)) {
233+
ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seek");
232234
if (seekType != PipeSubscribeSeekReq.SEEK_TO_BEGINNING
233235
&& seekType != PipeSubscribeSeekReq.SEEK_TO_END) {
234236
final String errorMessage =
@@ -261,6 +263,7 @@ public void seekToTopicProgress(
261263
final ConsensusSubscriptionBroker consensusBroker =
262264
consumerGroupIdToConsensusBroker.get(consumerGroupId);
263265
if (Objects.nonNull(consensusBroker) && consensusBroker.hasQueue(topicName)) {
266+
ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seek(topicProgress)");
264267
consensusBroker.seek(topicName, topicProgress);
265268
return;
266269
}
@@ -283,6 +286,7 @@ public void seekAfterTopicProgress(
283286
final ConsensusSubscriptionBroker consensusBroker =
284287
consumerGroupIdToConsensusBroker.get(consumerGroupId);
285288
if (Objects.nonNull(consensusBroker) && consensusBroker.hasQueue(topicName)) {
289+
ensureConsensusSeekRuntimeAvailable(consumerGroupId, topicName, "seekAfter(topicProgress)");
286290
consensusBroker.seekAfter(topicName, topicProgress);
287291
return;
288292
}
@@ -296,6 +300,20 @@ public void seekAfterTopicProgress(
296300
throw new SubscriptionException(errorMessage);
297301
}
298302

303+
private void ensureConsensusSeekRuntimeAvailable(
304+
final String consumerGroupId, final String topicName, final String operation) {
305+
if (!ConsensusSubscriptionPrefetchExecutorManager.getInstance().isStarted()
306+
|| SubscriptionAgent.runtime().isShutdown()) {
307+
final String errorMessage =
308+
String.format(
309+
"Subscription: consensus %s is unavailable because subscription runtime is stopped, "
310+
+ "consumerGroup=%s, topic=%s",
311+
operation, consumerGroupId, topicName);
312+
LOGGER.warn(errorMessage);
313+
throw new SubscriptionException(errorMessage);
314+
}
315+
}
316+
299317
public boolean isCommitContextOutdated(final SubscriptionCommitContext commitContext) {
300318
final String consumerGroupId = commitContext.getConsumerGroupId();
301319
final String topicName = commitContext.getTopicName();
@@ -533,6 +551,12 @@ public void applyRuntimeStateForRegion(
533551
}
534552
}
535553

554+
public void abortConsensusPendingSeeksForRuntimeStop() {
555+
for (final ConsensusSubscriptionBroker broker : consumerGroupIdToConsensusBroker.values()) {
556+
broker.abortPendingSeeksForRuntimeStop();
557+
}
558+
}
559+
536560
public void updateCompletedTopicNames(final String consumerGroupId, final String topicName) {
537561
final SubscriptionBroker pipeBroker = consumerGroupIdToPipeBroker.get(consumerGroupId);
538562
if (Objects.isNull(pipeBroker)) {
@@ -584,12 +608,10 @@ public void removePrefetchingQueue(final String consumerGroupId, final String to
584608
}
585609

586610
public boolean executePrefetch(final String consumerGroupId, final String topicName) {
587-
// Try consensus broker first
588-
final ConsensusSubscriptionBroker consensusBroker =
589-
consumerGroupIdToConsensusBroker.get(consumerGroupId);
590-
if (Objects.nonNull(consensusBroker) && consensusBroker.hasQueue(topicName)) {
591-
return consensusBroker.executePrefetch(topicName);
611+
if (ConsensusSubscriptionSetupHandler.isConsensusBasedTopic(topicName)) {
612+
return false;
592613
}
614+
593615
// Fall back to pipe broker
594616
final SubscriptionBroker pipeBroker = consumerGroupIdToPipeBroker.get(consumerGroupId);
595617
if (Objects.isNull(pipeBroker)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.service.IService;
2424
import org.apache.iotdb.commons.service.ServiceType;
2525
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
26+
import org.apache.iotdb.db.subscription.task.execution.ConsensusSubscriptionPrefetchExecutorManager;
2627

2728
import java.util.concurrent.atomic.AtomicBoolean;
2829

@@ -67,6 +68,7 @@ public void start() throws StartupException {
6768
}
6869

6970
SubscriptionConfig.getInstance().printAllConfigs();
71+
ConsensusSubscriptionPrefetchExecutorManager.getInstance().start();
7072

7173
SubscriptionAgentLauncher.launchSubscriptionTopicAgent();
7274
SubscriptionAgentLauncher.launchSubscriptionConsumerAgent();
@@ -80,8 +82,9 @@ public void stop() {
8082
return;
8183
}
8284
isShutdown.set(true);
83-
84-
// let PipeDataNodeRuntimeAgent to drop all related pipe tasks
85+
SubscriptionAgent.broker().abortConsensusPendingSeeksForRuntimeStop();
86+
ConsensusSubscriptionPrefetchExecutorManager.getInstance().stop();
87+
SubscriptionAgent.broker().abortConsensusPendingSeeksForRuntimeStop();
8588
}
8689

8790
public boolean isShutdown() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/ConsensusSubscriptionBroker.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -380,18 +380,10 @@ private void seekQueueToRegionProgress(
380380

381381
@Override
382382
public boolean executePrefetch(final String topicName) {
383-
final List<ConsensusPrefetchingQueue> queues =
384-
topicNameToConsensusPrefetchingQueues.get(topicName);
385-
if (Objects.isNull(queues) || queues.isEmpty()) {
386-
return false;
387-
}
388-
boolean anyPrefetched = false;
389-
for (final ConsensusPrefetchingQueue q : queues) {
390-
if (!q.isClosed() && q.executePrefetch()) {
391-
anyPrefetched = true;
392-
}
393-
}
394-
return anyPrefetched;
383+
// Consensus prefetch is fully driven by queue-local wakeup sources and the dedicated delayed
384+
// scheduler. This interface remains only to satisfy the shared broker contract used by
385+
// pipe-based subscription.
386+
return false;
395387
}
396388

397389
@Override
@@ -721,6 +713,17 @@ public void applyRuntimeStateForRegion(
721713
}
722714
}
723715

716+
public void abortPendingSeeksForRuntimeStop() {
717+
for (final List<ConsensusPrefetchingQueue> queues :
718+
topicNameToConsensusPrefetchingQueues.values()) {
719+
for (final ConsensusPrefetchingQueue q : queues) {
720+
if (!q.isClosed()) {
721+
q.abortPendingSeekForRuntimeStop();
722+
}
723+
}
724+
}
725+
}
726+
724727
@Override
725728
public void removeQueue(final String topicName) {
726729
final List<ConsensusPrefetchingQueue> queues =

0 commit comments

Comments
 (0)