Skip to content

Commit 3e7e1a8

Browse files
committed
Replace probe-based consumer readiness with 30s timer gate
The probe-based approach (waiting for consumer to receive a message) works locally but fails in CI where the PushConsumer never receives probe messages within the health check timeout. Switch to a simple 30-second timer gate after consumer startup, which reliably gives enough time for rebalance to complete in both local and CI environments.
1 parent 625b9fd commit 3e7e1a8

File tree

3 files changed

+13
-18
lines changed

3 files changed

+13
-18
lines changed

test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public class CaseController {
4949
private MessageService messageService;
5050

5151
private volatile boolean consumerStarted = false;
52+
private volatile long consumerStartTime = 0;
5253

5354
@RequestMapping("/rocketmq-5-grpc-scenario")
5455
@ResponseBody
@@ -95,20 +96,17 @@ public String healthCheck() throws Exception {
9596
GROUP
9697
);
9798
final Producer producer = ProducerSingleton.getInstance(endpoints, NORMAL_TOPIC);
99+
consumerStartTime = System.currentTimeMillis();
98100
} catch (Exception e) {
99101
consumerStarted = false;
100102
throw e;
101103
}
102104
}
103105

104-
// Send probe messages until the consumer receives one, confirming
105-
// that rebalance is complete and it can receive messages.
106-
if (!MessageService.PUSH_CONSUMER_READY) {
107-
try {
108-
messageService.sendNormalMessage(NORMAL_TOPIC, TAG_NOMARL, GROUP);
109-
} catch (Exception e) {
110-
log.error("Probe message failed", e);
111-
}
106+
// PushConsumer needs time for initial rebalance with the broker.
107+
// Return non-200 to force health check retries (3s each), giving
108+
// the consumer enough time before the entry service is called.
109+
if (System.currentTimeMillis() - consumerStartTime < 30000) {
112110
throw new RuntimeException("Consumer rebalance in progress");
113111
}
114112

test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,10 @@ public void updateNormalTopic(String topic) {
240240
MQAdminStartup.main(subArgs);
241241
}
242242

243-
public static volatile boolean PUSH_CONSUMER_READY = false;
244-
245243
public static class MyConsumer implements MessageListener {
246244

247245
@Override
248246
public ConsumeResult consume(MessageView messageView) {
249-
PUSH_CONSUMER_READY = true;
250247
log.info("Consume message successfully, messageId={},messageBody={}", messageView.getMessageId(),
251248
messageView.getBody().toString()
252249
);

test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class CaseController {
4949
private String namerServer;
5050

5151
private volatile boolean consumerStarted = false;
52-
private volatile boolean consumerReady = false;
52+
private volatile long consumerStartTime = 0;
5353

5454
@RequestMapping("/rocketmq-scenario")
5555
@ResponseBody
@@ -93,31 +93,31 @@ public String healthCheck() throws Exception {
9393
probeProducer.shutdown();
9494
System.out.printf("HealthCheck: Topic created via probe message.%n");
9595

96-
// Start consumer after topic exists. Use CONSUME_FROM_FIRST_OFFSET so
97-
// the consumer picks up the probe message once rebalance completes.
96+
// Start consumer after topic exists so rebalance finds queues immediately.
9897
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
9998
consumer.setNamesrvAddr(namerServer);
10099
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
101100
consumer.subscribe("TopicTest", "*");
102101
consumer.registerMessageListener(new MessageListenerConcurrently() {
103102
@Override
104103
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
105-
consumerReady = true;
106104
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
107105
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
108106
}
109107
});
110108
consumer.start();
109+
consumerStartTime = System.currentTimeMillis();
111110
System.out.printf("Consumer Started.%n");
112111
} catch (Exception e) {
113112
consumerStarted = false;
114113
throw e;
115114
}
116115
}
117116

118-
// Wait until consumer has received the probe message, confirming
119-
// that rebalance is complete and it can receive messages.
120-
if (!consumerReady) {
117+
// PushConsumer needs time for initial rebalance with the broker.
118+
// Return non-200 to force health check retries (3s each), giving
119+
// the consumer enough time before the entry service is called.
120+
if (System.currentTimeMillis() - consumerStartTime < 30000) {
121121
throw new RuntimeException("Consumer rebalance in progress");
122122
}
123123

0 commit comments

Comments
 (0)