Skip to content

Commit 625b9fd

Browse files
committed
Fix rocketmq healthCheck re-entry when initialization exceeds curl timeout
Set consumerStarted flag at the start of the init block to prevent concurrent healthCheck requests from re-entering. The initialization (producer start + send + consumer start) can exceed curl's 3s timeout, causing the health check to retry and re-enter the block repeatedly.
1 parent 809fef3 commit 625b9fd

File tree

2 files changed

+54
-38
lines changed
  • test/plugin/scenarios
    • rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller
    • rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller

2 files changed

+54
-38
lines changed

test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,18 +79,26 @@ public String testcase() {
7979
@ResponseBody
8080
public String healthCheck() throws Exception {
8181
if (!consumerStarted) {
82-
System.setProperty(MixAll.ROCKETMQ_HOME_ENV, this.getClass().getResource("/").getPath());
83-
messageService.updateNormalTopic(NORMAL_TOPIC);
84-
messageService.updateNormalTopic(ASYNC_PRODUCER_TOPIC);
85-
messageService.updateNormalTopic(ASYNC_CONSUMER_TOPIC);
86-
// Start push consumer early so it has time to complete rebalance
87-
messageService.pushConsumes(
88-
Collections.singletonList(NORMAL_TOPIC),
89-
Collections.singletonList(TAG_NOMARL),
90-
GROUP
91-
);
92-
final Producer producer = ProducerSingleton.getInstance(endpoints, NORMAL_TOPIC);
82+
// Set flag early to prevent re-entry from concurrent healthCheck
83+
// requests (each curl has a 3s timeout, and initialization may
84+
// take longer than that).
9385
consumerStarted = true;
86+
try {
87+
System.setProperty(MixAll.ROCKETMQ_HOME_ENV, this.getClass().getResource("/").getPath());
88+
messageService.updateNormalTopic(NORMAL_TOPIC);
89+
messageService.updateNormalTopic(ASYNC_PRODUCER_TOPIC);
90+
messageService.updateNormalTopic(ASYNC_CONSUMER_TOPIC);
91+
// Start push consumer early so it has time to complete rebalance
92+
messageService.pushConsumes(
93+
Collections.singletonList(NORMAL_TOPIC),
94+
Collections.singletonList(TAG_NOMARL),
95+
GROUP
96+
);
97+
final Producer producer = ProducerSingleton.getInstance(endpoints, NORMAL_TOPIC);
98+
} catch (Exception e) {
99+
consumerStarted = false;
100+
throw e;
101+
}
94102
}
95103

96104
// Send probe messages until the consumer receives one, confirming

test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -77,34 +77,42 @@ public String testcase() {
7777
@ResponseBody
7878
public String healthCheck() throws Exception {
7979
if (!consumerStarted) {
80-
// Send a probe message to ensure the topic exists before consumer starts.
81-
// Without this, the consumer's rebalance finds no queues and it would need
82-
// another full rebalance cycle (~20s) after the topic is eventually created.
83-
DefaultMQProducer probeProducer = new DefaultMQProducer("healthCheck_please_rename_unique_group_name");
84-
probeProducer.setNamesrvAddr(namerServer);
85-
probeProducer.start();
86-
Message probeMsg = new Message("TopicTest", "probe".getBytes(StandardCharsets.UTF_8));
87-
probeProducer.send(probeMsg);
88-
probeProducer.shutdown();
89-
System.out.printf("HealthCheck: Topic created via probe message.%n");
90-
91-
// Start consumer after topic exists. Use CONSUME_FROM_FIRST_OFFSET so
92-
// the consumer picks up the probe message once rebalance completes.
93-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
94-
consumer.setNamesrvAddr(namerServer);
95-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
96-
consumer.subscribe("TopicTest", "*");
97-
consumer.registerMessageListener(new MessageListenerConcurrently() {
98-
@Override
99-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
100-
consumerReady = true;
101-
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
102-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
103-
}
104-
});
105-
consumer.start();
80+
// Set flag early to prevent re-entry from concurrent healthCheck
81+
// requests (each curl has a 3s timeout, and initialization may
82+
// take longer than that).
10683
consumerStarted = true;
107-
System.out.printf("Consumer Started.%n");
84+
try {
85+
// Send a probe message to ensure the topic exists before consumer starts.
86+
// Without this, the consumer's rebalance finds no queues and it would need
87+
// another full rebalance cycle (~20s) after the topic is eventually created.
88+
DefaultMQProducer probeProducer = new DefaultMQProducer("healthCheck_please_rename_unique_group_name");
89+
probeProducer.setNamesrvAddr(namerServer);
90+
probeProducer.start();
91+
Message probeMsg = new Message("TopicTest", "probe".getBytes(StandardCharsets.UTF_8));
92+
probeProducer.send(probeMsg);
93+
probeProducer.shutdown();
94+
System.out.printf("HealthCheck: Topic created via probe message.%n");
95+
96+
// Start consumer after topic exists. Use CONSUME_FROM_FIRST_OFFSET so
97+
// the consumer picks up the probe message once rebalance completes.
98+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
99+
consumer.setNamesrvAddr(namerServer);
100+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
101+
consumer.subscribe("TopicTest", "*");
102+
consumer.registerMessageListener(new MessageListenerConcurrently() {
103+
@Override
104+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
105+
consumerReady = true;
106+
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
107+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
108+
}
109+
});
110+
consumer.start();
111+
System.out.printf("Consumer Started.%n");
112+
} catch (Exception e) {
113+
consumerStarted = false;
114+
throw e;
115+
}
108116
}
109117

110118
// Wait until consumer has received the probe message, confirming

0 commit comments

Comments
 (0)