Skip to content

Commit 16bc1ed

Browse files
wu-shengclaude
andcommitted
Fix intermittent rocketmq-5-grpc-scenario test failure
simpleConsumeAsync() was fire-and-forget: it kicked off a CompletableFuture chain (receiveAsync -> ackAsync) but returned immediately without waiting for completion. The test thread would exit and the HTTP response would be sent before consumer segments were created, causing the mock collector to sometimes have fewer than the expected 4 segments. Changed from whenCompleteAsync (fire-and-forget) to thenComposeAsync with .join(), so the method blocks until all async ack callbacks have completed before returning. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b4b20a3 commit 16bc1ed

File tree

1 file changed

+6
-9
lines changed
  • test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller

1 file changed

+6
-9
lines changed

test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/MessageService.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,26 +203,23 @@ public void simpleConsumeAsync(String topic,
203203
maxMessageNum,
204204
invisibleDuration
205205
);
206-
future0.whenCompleteAsync((messages, throwable) -> {
207-
if (null != throwable) {
208-
log.error("Failed to receive message from remote", throwable);
209-
return;
210-
}
206+
future0.thenComposeAsync(messages -> {
211207
log.info("Received {} message(s)", messages.size());
212208
final Map<MessageView, CompletableFuture<Void>> map =
213209
messages.stream().collect(Collectors.toMap(message -> message, consumer::ackAsync));
214-
for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
210+
List<CompletableFuture<Void>> ackFutures = map.entrySet().stream().map(entry -> {
215211
final MessageId messageId = entry.getKey().getMessageId();
216212
final CompletableFuture<Void> future = entry.getValue();
217-
future.whenCompleteAsync((v, t) -> {
213+
return future.whenCompleteAsync((v, t) -> {
218214
if (null != t) {
219215
log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
220216
return;
221217
}
222218
log.info("Message is acknowledged successfully, messageId={}", messageId);
223219
}, ackCallbackExecutor);
224-
}
225-
}, receiveCallbackExecutor);
220+
}).collect(Collectors.toList());
221+
return CompletableFuture.allOf(ackFutures.toArray(new CompletableFuture[0]));
222+
}, receiveCallbackExecutor).join();
226223
} catch (Exception e) {
227224
log.error("consumer start error", e);
228225
}

0 commit comments

Comments
 (0)