From 072c1f0dec96f2bfbab38e3cd5ac451a1665ae57 Mon Sep 17 00:00:00 2001 From: Hankunming <1109939087@qq.com> Date: Fri, 12 Apr 2024 16:34:32 +0800 Subject: [PATCH] fix: added queue's some messages may can not be consumed caused by offset init --- .../rocketmq/legacy/RocketMQSourceFunction.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index bedf97f8..11ae25d1 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -243,7 +243,7 @@ public void open(Configuration parameters) throws Exception { if (restored) { initOffsetTableFromRestoredOffsets(messageQueues); } else { - initOffsets(messageQueues); + initOffsets(messageQueues, false); } } @@ -370,10 +370,12 @@ private void awaitTermination() throws InterruptedException { * @param messageQueues * @throws MQClientException */ - private void initOffsets(List messageQueues) throws MQClientException { + private void initOffsets(List messageQueues, boolean added) throws MQClientException { + StartupMode startupMode = startMode; + if(added) startupMode = StartupMode.EARLIEST; for (MessageQueue mq : messageQueues) { long offset; - switch (startMode) { + switch (startupMode) { case LATEST: consumer.seekToEnd(mq); offset = consumer.committed(mq); @@ -554,7 +556,7 @@ public void initOffsetTableFromRestoredOffsets(List messageQueues) } if (extMessageQueue.size() != 0) { log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue); - initOffsets(extMessageQueue); + initOffsets(extMessageQueue, true); } log.info("init offset table [{}] from restoredOffsets successful.", offsetTable); }