From 008f38bf85b99789d55edde8954b2b8d17b3ca48 Mon Sep 17 00:00:00 2001 From: wangkai Date: Mon, 21 Nov 2022 16:41:42 +0800 Subject: [PATCH 1/2] 1.[fix lite pull consumer api] Version below 4.6.0 does not support DefaultLitePullConsumer --- pom.xml | 2 +- .../consumer/DefaultLitePullConsumer.java | 560 +++++++ .../client/consumer/LitePullConsumer.java | 216 +++ .../consumer/DefaultLitePullConsumerImpl.java | 1322 +++++++++++++++++ .../flink/legacy/RocketMQSourceFunction.java | 31 +- .../enumerator/RocketMQSourceEnumerator.java | 7 +- .../reader/RocketMQPartitionSplitReader.java | 10 +- 7 files changed, 2127 insertions(+), 21 deletions(-) create mode 100644 src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java create mode 100644 src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java create mode 100644 src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java diff --git a/pom.xml b/pom.xml index 1b299484..0ed59983 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ 1.8 1.8 - 4.9.2 + 4.9.4 1.15.0 2.6 2.4.2 diff --git a/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java new file mode 100644 index 00000000..31a1833f --- /dev/null +++ b/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer { + + private final InternalLogger log = ClientLogger.getLog(); + + private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl; + + /** + * Consumers belonging to the same consumer group share a group id. The consumers in a group + * then divides the topic as fairly amongst themselves as possible by establishing that each + * queue is only consumed by a single consumer from the group. If all consumers are from the + * same group, it functions as a traditional message queue. Each message would be consumed by + * one consumer of the group only. When multiple consumer groups exist, the flow of the data + * consumption model aligns with the traditional publish-subscribe model. The messages are + * broadcast to all consumer groups. + */ + private String consumerGroup; + + /** + * Long polling mode, the Consumer connection max suspend time, it is not recommended to modify + */ + private long brokerSuspendMaxTimeMillis = 1000 * 20; + + /** + * Long polling mode, the Consumer connection timeout(must greater than + * brokerSuspendMaxTimeMillis), it is not recommended to modify + */ + private long consumerTimeoutMillisWhenSuspend = 1000 * 30; + + /** The socket timeout in milliseconds */ + private long consumerPullTimeoutMillis = 1000 * 10; + + /** Consumption pattern,default is clustering */ + private MessageModel messageModel = MessageModel.CLUSTERING; + /** Message queue listener */ + private MessageQueueListener messageQueueListener; + /** Offset Storage */ + private OffsetStore offsetStore; + + /** Queue allocation algorithm */ + private AllocateMessageQueueStrategy allocateMessageQueueStrategy = + new AllocateMessageQueueAveragely(); + /** Whether the unit of subscription group */ + private boolean unitMode = false; + + /** The flag for auto commit offset */ + private boolean autoCommit = true; + + /** Pull thread number */ + private int pullThreadNums = 20; + + /** Minimum commit offset interval time in milliseconds. */ + private static final long MIN_AUTOCOMMIT_INTERVAL_MILLIS = 1000; + + /** Maximum commit offset interval time in milliseconds. */ + private long autoCommitIntervalMillis = 5 * 1000; + + /** Maximum number of messages pulled each time. */ + private int pullBatchSize = 10; + + /** + * Flow control threshold for consume request, each consumer will cache at most 10000 consume + * requests by default. Consider the {@code pullBatchSize}, the instantaneous value may exceed + * the limit + */ + private long pullThresholdForAll = 10000; + + /** Consume max span offset. */ + private int consumeMaxSpan = 2000; + + /** + * Flow control threshold on queue level, each message queue will cache at most 1000 messages by + * default, Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit + */ + private int pullThresholdForQueue = 1000; + + /** + * Limit the cached message size on queue level, each message queue will cache at most 100 MiB + * messages by default, Consider the {@code pullBatchSize}, the instantaneous value may exceed + * the limit + * + *

The size of a message only measured by message body, so it's not accurate + */ + private int pullThresholdSizeForQueue = 100; + + /** The poll timeout in milliseconds */ + private long pollTimeoutMillis = 1000 * 5; + + /** Interval time in in milliseconds for checking changes in topic metadata. */ + private long topicMetadataCheckIntervalMillis = 30 * 1000; + + private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + + /** + * Backtracking consumption time with second precision. Time format is 20131223171201
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013 year
+ * Default backtracking consumption time Half an hour ago. + */ + private String consumeTimestamp = + UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30)); + + /** Interface of asynchronous transfer data */ + private TraceDispatcher traceDispatcher = null; + + /** The flag for message trace */ + private boolean enableMsgTrace = false; + + /** + * The name value of message trace topic.If you don't config,you can use the default trace topic + * name. + */ + private String customizedTraceTopic; + + /** Default constructor. */ + public DefaultLitePullConsumer() { + this(null, MixAll.DEFAULT_CONSUMER_GROUP, null); + } + + /** + * Constructor specifying consumer group. + * + * @param consumerGroup Consumer group. + */ + public DefaultLitePullConsumer(final String consumerGroup) { + this(null, consumerGroup, null); + } + + /** + * Constructor specifying RPC hook. + * + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultLitePullConsumer(RPCHook rpcHook) { + this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook); + } + + /** + * Constructor specifying consumer group, RPC hook + * + * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) { + this(null, consumerGroup, rpcHook); + } + + /** + * Constructor specifying namespace, consumer group and RPC hook. + * + * @param consumerGroup Consumer group. + * @param rpcHook RPC hook to execute before each remoting command. + */ + public DefaultLitePullConsumer( + final String namespace, final String consumerGroup, RPCHook rpcHook) { + this.namespace = namespace; + this.consumerGroup = consumerGroup; + this.enableStreamRequestType = true; + defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook); + } + + @Override + public void start() throws MQClientException { + setTraceDispatcher(); + setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup)); + this.defaultLitePullConsumerImpl.start(); + if (null != traceDispatcher) { + try { + traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel()); + } catch (MQClientException e) { + log.warn("trace dispatcher start failed ", e); + } + } + } + + @Override + public void shutdown() { + this.defaultLitePullConsumerImpl.shutdown(); + if (null != traceDispatcher) { + traceDispatcher.shutdown(); + } + } + + @Override + public boolean isRunning() { + return this.defaultLitePullConsumerImpl.isRunning(); + } + + @Override + public void subscribe(String topic, String subExpression) throws MQClientException { + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression); + } + + @Override + public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException { + this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector); + } + + @Override + public void unsubscribe(String topic) { + this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic)); + } + + @Override + public void assign(Collection messageQueues) { + defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues)); + } + + @Override + public List poll() { + return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis()); + } + + @Override + public List poll(long timeout) { + return defaultLitePullConsumerImpl.poll(timeout); + } + + @Override + public void seek(MessageQueue messageQueue, long offset) throws MQClientException { + this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset); + } + + @Override + public void pause(Collection messageQueues) { + this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues)); + } + + @Override + public void resume(Collection messageQueues) { + this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues)); + } + + @Override + public Collection fetchMessageQueues(String topic) throws MQClientException { + return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic)); + } + + @Override + public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) + throws MQClientException { + return this.defaultLitePullConsumerImpl.searchOffset( + queueWithNamespace(messageQueue), timestamp); + } + + @Override + public void registerTopicMessageQueueChangeListener( + String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) + throws MQClientException { + this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener( + withNamespace(topic), topicMessageQueueChangeListener); + } + + @Override + public void commitSync() { + this.defaultLitePullConsumerImpl.commitAll(); + } + + @Override + public void commit(final Set messageQueues, boolean persist) { + this.defaultLitePullConsumerImpl.commit(messageQueues, persist); + } + + @Override + public Long committed(MessageQueue messageQueue) throws MQClientException { + return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue)); + } + + @Override + public void setSubExpressionForAssign(final String topic, final String subExpresion) { + defaultLitePullConsumerImpl.setSubExpressionForAssign(withNamespace(topic), subExpresion); + } + + @Override + public void updateNameServerAddress(String nameServerAddress) { + this.defaultLitePullConsumerImpl.updateNameServerAddr(nameServerAddress); + } + + @Override + public void seekToBegin(MessageQueue messageQueue) throws MQClientException { + this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue)); + } + + @Override + public void seekToEnd(MessageQueue messageQueue) throws MQClientException { + this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue)); + } + + @Override + public boolean isAutoCommit() { + return autoCommit; + } + + @Override + public void setAutoCommit(boolean autoCommit) { + this.autoCommit = autoCommit; + } + + public boolean isConnectBrokerByUser() { + return this.defaultLitePullConsumerImpl.getPullAPIWrapper().isConnectBrokerByUser(); + } + + public void setConnectBrokerByUser(boolean connectBrokerByUser) { + this.defaultLitePullConsumerImpl + .getPullAPIWrapper() + .setConnectBrokerByUser(connectBrokerByUser); + } + + public long getDefaultBrokerId() { + return this.defaultLitePullConsumerImpl.getPullAPIWrapper().getDefaultBrokerId(); + } + + public void setDefaultBrokerId(long defaultBrokerId) { + this.defaultLitePullConsumerImpl.getPullAPIWrapper().setDefaultBrokerId(defaultBrokerId); + } + + public int getPullThreadNums() { + return pullThreadNums; + } + + public void setPullThreadNums(int pullThreadNums) { + this.pullThreadNums = pullThreadNums; + } + + public long getAutoCommitIntervalMillis() { + return autoCommitIntervalMillis; + } + + public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) { + if (autoCommitIntervalMillis >= MIN_AUTOCOMMIT_INTERVAL_MILLIS) { + this.autoCommitIntervalMillis = autoCommitIntervalMillis; + } + } + + public int getPullBatchSize() { + return pullBatchSize; + } + + public void setPullBatchSize(int pullBatchSize) { + this.pullBatchSize = pullBatchSize; + } + + public long getPullThresholdForAll() { + return pullThresholdForAll; + } + + public void setPullThresholdForAll(long pullThresholdForAll) { + this.pullThresholdForAll = pullThresholdForAll; + } + + public int getConsumeMaxSpan() { + return consumeMaxSpan; + } + + public void setConsumeMaxSpan(int consumeMaxSpan) { + this.consumeMaxSpan = consumeMaxSpan; + } + + public int getPullThresholdForQueue() { + return pullThresholdForQueue; + } + + public void setPullThresholdForQueue(int pullThresholdForQueue) { + this.pullThresholdForQueue = pullThresholdForQueue; + } + + public int getPullThresholdSizeForQueue() { + return pullThresholdSizeForQueue; + } + + public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) { + this.pullThresholdSizeForQueue = pullThresholdSizeForQueue; + } + + public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() { + return allocateMessageQueueStrategy; + } + + public void setAllocateMessageQueueStrategy( + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + } + + public long getBrokerSuspendMaxTimeMillis() { + return brokerSuspendMaxTimeMillis; + } + + public long getPollTimeoutMillis() { + return pollTimeoutMillis; + } + + public void setPollTimeoutMillis(long pollTimeoutMillis) { + this.pollTimeoutMillis = pollTimeoutMillis; + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public void setOffsetStore(OffsetStore offsetStore) { + this.offsetStore = offsetStore; + } + + @Override + public boolean isUnitMode() { + return unitMode; + } + + @Override + public void setUnitMode(boolean isUnitMode) { + this.unitMode = isUnitMode; + } + + public MessageModel getMessageModel() { + return messageModel; + } + + public void setMessageModel(MessageModel messageModel) { + this.messageModel = messageModel; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public MessageQueueListener getMessageQueueListener() { + return messageQueueListener; + } + + public void setMessageQueueListener(MessageQueueListener messageQueueListener) { + this.messageQueueListener = messageQueueListener; + } + + public long getConsumerPullTimeoutMillis() { + return consumerPullTimeoutMillis; + } + + public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) { + this.consumerPullTimeoutMillis = consumerPullTimeoutMillis; + } + + public long getConsumerTimeoutMillisWhenSuspend() { + return consumerTimeoutMillisWhenSuspend; + } + + public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) { + this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; + } + + public long getTopicMetadataCheckIntervalMillis() { + return topicMetadataCheckIntervalMillis; + } + + public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) { + this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET + && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET + && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) { + throw new RuntimeException("Invalid ConsumeFromWhere Value", null); + } + this.consumeFromWhere = consumeFromWhere; + } + + public String getConsumeTimestamp() { + return consumeTimestamp; + } + + public void setConsumeTimestamp(String consumeTimestamp) { + this.consumeTimestamp = consumeTimestamp; + } + + public TraceDispatcher getTraceDispatcher() { + return traceDispatcher; + } + + public void setCustomizedTraceTopic(String customizedTraceTopic) { + this.customizedTraceTopic = customizedTraceTopic; + } + + private void setTraceDispatcher() { + if (isEnableMsgTrace()) { + try { + AsyncTraceDispatcher traceDispatcher = + new AsyncTraceDispatcher( + consumerGroup, + TraceDispatcher.Type.CONSUME, + customizedTraceTopic, + null); + traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS()); + this.traceDispatcher = traceDispatcher; + this.defaultLitePullConsumerImpl.registerConsumeMessageHook( + new ConsumeMessageTraceHookImpl(traceDispatcher)); + } catch (Throwable e) { + log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); + } + } + } + + public String getCustomizedTraceTopic() { + return customizedTraceTopic; + } + + public boolean isEnableMsgTrace() { + return enableMsgTrace; + } + + public void setEnableMsgTrace(boolean enableMsgTrace) { + this.enableMsgTrace = enableMsgTrace; + } +} diff --git a/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java new file mode 100644 index 00000000..9b1faf8b --- /dev/null +++ b/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.consumer; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +public interface LitePullConsumer { + + /** Start the consumer */ + void start() throws MQClientException; + + /** Shutdown the consumer */ + void shutdown(); + + /** + * This consumer is still running + * + * @return true if consumer is still running + */ + boolean isRunning(); + + /** + * Subscribe some topic with subExpression + * + * @param subExpression subscription expression.it only support or operation such as "tag1 || + * tag2 || tag3"
+ * if null or * expression,meaning subscribe all + * @throws MQClientException if there is any client error. + */ + void subscribe(final String topic, final String subExpression) throws MQClientException; + + /** + * Subscribe some topic with selector. + * + * @param selector message selector({@link MessageSelector}), can be null. + * @throws MQClientException if there is any client error. + */ + void subscribe(final String topic, final MessageSelector selector) throws MQClientException; + + /** + * Unsubscribe consumption some topic + * + * @param topic Message topic that needs to be unsubscribe. + */ + void unsubscribe(final String topic); + + /** + * Manually assign a list of message queues to this consumer. This interface does not allow for + * incremental assignment and will replace the previous assignment (if there is one). + * + * @param messageQueues Message queues that needs to be assigned. + */ + void assign(Collection messageQueues); + + /** + * Fetch data for the topics or partitions specified using assign API + * + * @return list of message, can be null. + */ + List poll(); + + /** + * Fetch data for the topics or partitions specified using assign API + * + * @param timeout The amount time, in milliseconds, spent waiting in poll if data is not + * available. Must not be negative + * @return list of message, can be null. + */ + List poll(long timeout); + + /** + * Overrides the fetch offsets that the consumer will use on the next poll. If this API is + * invoked for the same message queue more than once, the latest offset will be used on the next + * poll(). Note that you may lose data if this API is arbitrarily used in the middle of + * consumption. + * + * @param messageQueue + * @param offset + */ + void seek(MessageQueue messageQueue, long offset) throws MQClientException; + + /** + * Set topic subExpression for assign mode. This interface does not allow be call after start(). + * Default value is * if not set. assignment and will replace the previous assignment (if there + * is one). + * + * @param subExpression subscription expression.it only support or operation such as "tag1 || + * tag2 || tag3"
+ * if * null or * expression,meaning subscribe all + */ + void setSubExpressionForAssign(final String topic, final String subExpression); + + /** + * Suspend pulling from the requested message queues. + * + *

Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop + * immediately until the messages of the requested message queues drain. + * + *

Note that this method does not affect message queue subscription. In particular, it does + * not cause a group rebalance. + * + * @param messageQueues Message queues that needs to be paused. + */ + void pause(Collection messageQueues); + + /** + * Resume specified message queues which have been paused with {@link #pause(Collection)}. + * + * @param messageQueues Message queues that needs to be resumed. + */ + void resume(Collection messageQueues); + + /** + * Whether to enable auto-commit consume offset. + * + * @return true if enable auto-commit, false if disable auto-commit. + */ + boolean isAutoCommit(); + + /** + * Set whether to enable auto-commit consume offset. + * + * @param autoCommit Whether to enable auto-commit. + */ + void setAutoCommit(boolean autoCommit); + + /** + * Get metadata about the message queues for a given topic. + * + * @param topic The topic that need to get metadata. + * @return collection of message queues + * @throws MQClientException if there is any client error. + */ + Collection fetchMessageQueues(String topic) throws MQClientException; + + /** + * Look up the offsets for the given message queue by timestamp. The returned offset for each + * message queue is the earliest offset whose timestamp is greater than or equal to the given + * timestamp in the corresponding message queue. + * + * @param messageQueue Message queues that needs to get offset by timestamp. + * @param timestamp + * @return offset + * @throws MQClientException if there is any client error. + */ + Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException; + + /** Manually commit consume offset. */ + void commitSync(); + + void commit(final Set messageQueues, boolean persist); + + /** + * Get the last committed offset for the given message queue. + * + * @param messageQueue + * @return offset, if offset equals -1 means no offset in broker. + * @throws MQClientException if there is any client error. + */ + Long committed(MessageQueue messageQueue) throws MQClientException; + + /** + * Register a callback for sensing topic metadata changes. + * + * @param topic The topic that need to monitor. + * @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link + * TopicMessageQueueChangeListener} + * @throws MQClientException if there is any client error. + */ + void registerTopicMessageQueueChangeListener( + String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener) + throws MQClientException; + + /** Update name server addresses. */ + void updateNameServerAddress(String nameServerAddress); + + /** + * Overrides the fetch offsets with the begin offset that the consumer will use on the next + * poll. If this API is invoked for the same message queue more than once, the latest offset + * will be used on the next poll(). Note that you may lose data if this API is arbitrarily used + * in the middle of consumption. + * + * @param messageQueue + */ + void seekToBegin(MessageQueue messageQueue) throws MQClientException; + + /** + * Overrides the fetch offsets with the end offset that the consumer will use on the next poll. + * If this API is invoked for the same message queue more than once, the latest offset will be + * used on the next poll(). Note that you may lose data if this API is arbitrarily used in the + * middle of consumption. + * + * @param messageQueue + */ + void seekToEnd(MessageQueue messageQueue) throws MQClientException; +} diff --git a/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java new file mode 100644 index 00000000..6f5e6030 --- /dev/null +++ b/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -0,0 +1,1322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.consumer; + +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.consumer.*; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore; +import org.apache.rocketmq.client.consumer.store.OffsetStore; +import org.apache.rocketmq.client.consumer.store.ReadOffsetType; +import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.hook.FilterMessageHook; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ServiceState; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.filter.ExpressionType; +import org.apache.rocketmq.common.filter.FilterAPI; +import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.NamespaceUtil; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.exception.RemotingException; + +import org.apache.commons.lang3.StringUtils; + +import java.util.*; +import java.util.concurrent.*; + +public class DefaultLitePullConsumerImpl implements MQConsumerInner { + + private final InternalLogger log = ClientLogger.getLog(); + + private final long consumerStartTimestamp = System.currentTimeMillis(); + + private final RPCHook rpcHook; + + private final ArrayList filterMessageHookList = + new ArrayList(); + + private volatile ServiceState serviceState = ServiceState.CREATE_JUST; + + protected MQClientInstance mQClientFactory; + + private PullAPIWrapper pullAPIWrapper; + + private OffsetStore offsetStore; + + private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this); + + private enum SubscriptionType { + NONE, + SUBSCRIBE, + ASSIGN + } + + private static final String NOT_RUNNING_EXCEPTION_MESSAGE = + "The consumer not running, please start it first."; + + private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = + "Subscribe and assign are mutually exclusive."; + /** the type of subscription */ + private SubscriptionType subscriptionType = SubscriptionType.NONE; + /** Delay some time when exception occur */ + private long pullTimeDelayMillsWhenException = 1000; + /** Flow control interval */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50; + /** Delay some time when suspend pull service */ + private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000; + + private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000; + + private ConcurrentHashMap topicToSubExpression = + new ConcurrentHashMap<>(); + + private DefaultLitePullConsumer defaultLitePullConsumer; + + private final ConcurrentMap taskTable = + new ConcurrentHashMap(); + + private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue(); + + private final BlockingQueue consumeRequestCache = + new LinkedBlockingQueue(); + + private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; + + private final ScheduledExecutorService scheduledExecutorService; + + private Map topicMessageQueueChangeListenerMap = + new HashMap(); + + private Map> messageQueuesForTopic = + new HashMap>(); + + private long consumeRequestFlowControlTimes = 0L; + + private long queueFlowControlTimes = 0L; + + private long queueMaxSpanFlowControlTimes = 0L; + + private long nextAutoCommitDeadline = -1L; + + private final MessageQueueLock messageQueueLock = new MessageQueueLock(); + + private final ArrayList consumeMessageHookList = + new ArrayList(); + + public DefaultLitePullConsumerImpl( + final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) { + this.defaultLitePullConsumer = defaultLitePullConsumer; + this.rpcHook = rpcHook; + this.scheduledThreadPoolExecutor = + new ScheduledThreadPoolExecutor( + this.defaultLitePullConsumer.getPullThreadNums(), + new ThreadFactoryImpl( + "PullMsgThread-" + + this.defaultLitePullConsumer.getConsumerGroup())); + this.scheduledExecutorService = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "MonitorMessageQueueChangeThread"); + } + }); + this.pullTimeDelayMillsWhenException = + defaultLitePullConsumer.getPullTimeDelayMillsWhenException(); + } + + public void registerConsumeMessageHook(final ConsumeMessageHook hook) { + this.consumeMessageHookList.add(hook); + log.info("register consumeMessageHook Hook, {}", hook.hookName()); + } + + public void executeHookBefore(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageBefore(context); + } catch (Throwable e) { + log.error( + "consumeMessageHook {} executeHookBefore exception", + hook.hookName(), + e); + } + } + } + } + + public void executeHookAfter(final ConsumeMessageContext context) { + if (!this.consumeMessageHookList.isEmpty()) { + for (ConsumeMessageHook hook : this.consumeMessageHookList) { + try { + hook.consumeMessageAfter(context); + } catch (Throwable e) { + log.error( + "consumeMessageHook {} executeHookAfter exception", hook.hookName(), e); + } + } + } + } + + private void checkServiceState() { + if (this.serviceState != ServiceState.RUNNING) { + throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE); + } + } + + public void updateNameServerAddr(String newAddresses) { + this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses); + } + + private synchronized void setSubscriptionType(SubscriptionType type) { + if (this.subscriptionType == SubscriptionType.NONE) { + this.subscriptionType = type; + } else if (this.subscriptionType != type) { + throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE); + } + } + + private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) { + this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue); + } + + private void updatePullTask(String topic, Set mqNewSet) { + Iterator> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + if (!mqNewSet.contains(next.getKey())) { + next.getValue().setCancelled(true); + it.remove(); + } + } + } + startPullTask(mqNewSet); + } + + class MessageQueueListenerImpl implements MessageQueueListener { + @Override + public void messageQueueChanged( + String topic, Set mqAll, Set mqDivided) { + MessageModel messageModel = defaultLitePullConsumer.getMessageModel(); + switch (messageModel) { + case BROADCASTING: + updateAssignedMessageQueue(topic, mqAll); + updatePullTask(topic, mqAll); + break; + case CLUSTERING: + updateAssignedMessageQueue(topic, mqDivided); + updatePullTask(topic, mqDivided); + break; + default: + break; + } + } + } + + public synchronized void shutdown() { + switch (this.serviceState) { + case CREATE_JUST: + break; + case RUNNING: + persistConsumerOffset(); + this.mQClientFactory.unregisterConsumer( + this.defaultLitePullConsumer.getConsumerGroup()); + scheduledThreadPoolExecutor.shutdown(); + scheduledExecutorService.shutdown(); + this.mQClientFactory.shutdown(); + this.serviceState = ServiceState.SHUTDOWN_ALREADY; + log.info( + "the consumer [{}] shutdown OK", + this.defaultLitePullConsumer.getConsumerGroup()); + break; + default: + break; + } + } + + public synchronized boolean isRunning() { + return this.serviceState == ServiceState.RUNNING; + } + + public synchronized void start() throws MQClientException { + switch (this.serviceState) { + case CREATE_JUST: + this.serviceState = ServiceState.START_FAILED; + + this.checkConfig(); + + if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) { + this.defaultLitePullConsumer.changeInstanceNameToPID(); + } + + initMQClientFactory(); + + initRebalanceImpl(); + + initPullAPIWrapper(); + + initOffsetStore(); + + mQClientFactory.start(); + + startScheduleTask(); + + this.serviceState = ServiceState.RUNNING; + + log.info( + "the consumer [{}] start OK", + this.defaultLitePullConsumer.getConsumerGroup()); + + operateAfterRunning(); + + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + throw new MQClientException( + "The PullConsumer service state not OK, maybe started once, " + + this.serviceState + + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), + null); + default: + break; + } + } + + private void initMQClientFactory() throws MQClientException { + this.mQClientFactory = + MQClientManager.getInstance() + .getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook); + boolean registerOK = + mQClientFactory.registerConsumer( + this.defaultLitePullConsumer.getConsumerGroup(), this); + if (!registerOK) { + this.serviceState = ServiceState.CREATE_JUST; + + throw new MQClientException( + "The consumer group[" + + this.defaultLitePullConsumer.getConsumerGroup() + + "] has been created before, specify another name please." + + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), + null); + } + } + + private void initRebalanceImpl() { + this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup()); + this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel()); + this.rebalanceImpl.setAllocateMessageQueueStrategy( + this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()); + this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); + } + + private void initPullAPIWrapper() { + this.pullAPIWrapper = + new PullAPIWrapper( + mQClientFactory, + this.defaultLitePullConsumer.getConsumerGroup(), + isUnitMode()); + this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); + } + + private void initOffsetStore() throws MQClientException { + if (this.defaultLitePullConsumer.getOffsetStore() != null) { + this.offsetStore = this.defaultLitePullConsumer.getOffsetStore(); + } else { + switch (this.defaultLitePullConsumer.getMessageModel()) { + case BROADCASTING: + this.offsetStore = + new LocalFileOffsetStore( + this.mQClientFactory, + this.defaultLitePullConsumer.getConsumerGroup()); + break; + case CLUSTERING: + this.offsetStore = + new RemoteBrokerOffsetStore( + this.mQClientFactory, + this.defaultLitePullConsumer.getConsumerGroup()); + break; + default: + break; + } + this.defaultLitePullConsumer.setOffsetStore(this.offsetStore); + } + this.offsetStore.load(); + } + + private void startScheduleTask() { + scheduledExecutorService.scheduleAtFixedRate( + new Runnable() { + @Override + public void run() { + try { + fetchTopicMessageQueuesAndCompare(); + } catch (Exception e) { + log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); + } + } + }, + 1000 * 10, + this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), + TimeUnit.MILLISECONDS); + } + + private void operateAfterRunning() throws MQClientException { + // If subscribe function invoke before start function, then update topic subscribe info + // after initialization. + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + // If assign function invoke before start function, then update pull task after + // initialization. + if (subscriptionType == SubscriptionType.ASSIGN) { + updateAssignPullTask(assignedMessageQueue.messageQueues()); + } + + for (String topic : topicMessageQueueChangeListenerMap.keySet()) { + Set messageQueues = fetchMessageQueues(topic); + messageQueuesForTopic.put(topic, messageQueues); + } + this.mQClientFactory.checkClientInBroker(); + } + + private void checkConfig() throws MQClientException { + // Check consumerGroup + Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup()); + + // Check consumerGroup name is not equal default consumer group name. + if (this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) { + throw new MQClientException( + "consumerGroup can not equal " + + MixAll.DEFAULT_CONSUMER_GROUP + + ", please specify another one." + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // Check messageModel is not null. + if (null == this.defaultLitePullConsumer.getMessageModel()) { + throw new MQClientException( + "messageModel is null" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + // Check allocateMessageQueueStrategy is not null + if (null == this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) { + throw new MQClientException( + "allocateMessageQueueStrategy is null" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + + if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() + < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) { + throw new MQClientException( + "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" + + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), + null); + } + } + + public PullAPIWrapper getPullAPIWrapper() { + return pullAPIWrapper; + } + + private void startPullTask(Collection mqSet) { + for (MessageQueue messageQueue : mqSet) { + if (!this.taskTable.containsKey(messageQueue)) { + PullTaskImpl pullTask = new PullTaskImpl(messageQueue); + this.taskTable.put(messageQueue, pullTask); + this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); + } + } + } + + private void updateAssignPullTask(Collection mqNewSet) { + Iterator> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (!mqNewSet.contains(next.getKey())) { + next.getValue().setCancelled(true); + it.remove(); + } + } + + startPullTask(mqNewSet); + } + + private void updateTopicSubscribeInfoWhenSubscriptionChanged() { + Map subTable = rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + for (final Map.Entry entry : subTable.entrySet()) { + final String topic = entry.getKey(); + this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); + } + } + } + + public synchronized void subscribe(String topic, String subExpression) + throws MQClientException { + try { + if (topic == null || "".equals(topic)) { + throw new IllegalArgumentException("Topic can not be null or empty."); + } + setSubscriptionType(SubscriptionType.SUBSCRIBE); + SubscriptionData subscriptionData = + FilterAPI.buildSubscriptionData(topic, subExpression); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); + if (serviceState == ServiceState.RUNNING) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + } catch (Exception e) { + throw new MQClientException("subscribe exception", e); + } + } + + public synchronized void subscribe(String topic, MessageSelector messageSelector) + throws MQClientException { + try { + if (topic == null || "".equals(topic)) { + throw new IllegalArgumentException("Topic can not be null or empty."); + } + setSubscriptionType(SubscriptionType.SUBSCRIBE); + if (messageSelector == null) { + subscribe(topic, SubscriptionData.SUB_ALL); + return; + } + SubscriptionData subscriptionData = + FilterAPI.build( + topic, + messageSelector.getExpression(), + messageSelector.getExpressionType()); + this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); + this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl()); + assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl); + if (serviceState == ServiceState.RUNNING) { + this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); + updateTopicSubscribeInfoWhenSubscriptionChanged(); + } + } catch (Exception e) { + throw new MQClientException("subscribe exception", e); + } + } + + public synchronized void unsubscribe(final String topic) { + this.rebalanceImpl.getSubscriptionInner().remove(topic); + removePullTaskCallback(topic); + assignedMessageQueue.removeAssignedMessageQueue(topic); + } + + public synchronized void assign(Collection messageQueues) { + if (messageQueues == null || messageQueues.isEmpty()) { + throw new IllegalArgumentException("Message queues can not be null or empty."); + } + setSubscriptionType(SubscriptionType.ASSIGN); + assignedMessageQueue.updateAssignedMessageQueue(messageQueues); + if (serviceState == ServiceState.RUNNING) { + updateAssignPullTask(messageQueues); + } + } + + private void maybeAutoCommit() { + long now = System.currentTimeMillis(); + if (now >= nextAutoCommitDeadline) { + commitAll(); + nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis(); + } + } + + public synchronized List poll(long timeout) { + try { + checkServiceState(); + if (timeout < 0) { + throw new IllegalArgumentException("Timeout must not be negative"); + } + + if (defaultLitePullConsumer.isAutoCommit()) { + maybeAutoCommit(); + } + long endTime = System.currentTimeMillis() + timeout; + + ConsumeRequest consumeRequest = + consumeRequestCache.poll( + endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + + if (endTime - System.currentTimeMillis() > 0) { + while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) { + consumeRequest = + consumeRequestCache.poll( + endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (endTime - System.currentTimeMillis() <= 0) { + break; + } + } + } + + if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) { + List messages = consumeRequest.getMessageExts(); + long offset = consumeRequest.getProcessQueue().removeMessage(messages); + assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + // If namespace not null , reset Topic without namespace. + this.resetTopic(messages); + return messages; + } + } catch (InterruptedException ignore) { + + } + + return Collections.emptyList(); + } + + public synchronized void setSubExpressionForAssign( + final String topic, final String subExpression) { + if (StringUtils.isBlank(subExpression)) { + throw new IllegalArgumentException("subExpression can not be null or empty."); + } + if (serviceState != ServiceState.CREATE_JUST) { + throw new IllegalStateException("setAssignTag only can be called before start."); + } + setSubscriptionType(SubscriptionType.ASSIGN); + topicToSubExpression.put(topic, subExpression); + } + + public void pause(Collection messageQueues) { + assignedMessageQueue.pause(messageQueues); + } + + public void resume(Collection messageQueues) { + assignedMessageQueue.resume(messageQueues); + } + + public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException { + if (!assignedMessageQueue.messageQueues().contains(messageQueue)) { + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + throw new MQClientException( + "The message queue is not in assigned list, may be rebalancing, message queue: " + + messageQueue, + null); + } else { + throw new MQClientException( + "The message queue is not in assigned list, message queue: " + messageQueue, + null); + } + } + long minOffset = minOffset(messageQueue); + long maxOffset = maxOffset(messageQueue); + if (offset < minOffset || offset > maxOffset) { + throw new MQClientException( + "Seek offset illegal, seek offset = " + + offset + + ", min offset = " + + minOffset + + ", max offset = " + + maxOffset, + null); + } + final Object objLock = messageQueueLock.fetchLockObject(messageQueue); + synchronized (objLock) { + clearMessageQueueInCache(messageQueue); + + PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue); + if (oldPullTaskImpl != null) { + oldPullTaskImpl.tryInterrupt(); + this.taskTable.remove(messageQueue); + } + assignedMessageQueue.setSeekOffset(messageQueue, offset); + if (!this.taskTable.containsKey(messageQueue)) { + PullTaskImpl pullTask = new PullTaskImpl(messageQueue); + this.taskTable.put(messageQueue, pullTask); + this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS); + } + } + } + + public void seekToBegin(MessageQueue messageQueue) throws MQClientException { + long begin = minOffset(messageQueue); + this.seek(messageQueue, begin); + } + + public void seekToEnd(MessageQueue messageQueue) throws MQClientException { + long end = maxOffset(messageQueue); + this.seek(messageQueue, end); + } + + private long maxOffset(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue); + } + + private long minOffset(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue); + } + + private void removePullTaskCallback(final String topic) { + removePullTask(topic); + } + + private void removePullTask(final String topic) { + Iterator> it = this.taskTable.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry next = it.next(); + if (next.getKey().getTopic().equals(topic)) { + next.getValue().setCancelled(true); + it.remove(); + } + } + } + + public synchronized void commitAll() { + for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { + try { + commit(messageQueue); + } catch (Exception e) { + log.error("An error occurred when update consume offset Automatically."); + } + } + } + + public synchronized void commit(final Set messageQueues, boolean persist) { + if (messageQueues == null || messageQueues.size() == 0) { + return; + } + + for (MessageQueue messageQueue : messageQueues) { + commit(messageQueue); + } + + if (persist) { + this.offsetStore.persistAll(messageQueues); + } + } + + private synchronized void commit(MessageQueue messageQueue) { + long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); + + if (consumerOffset != -1) { + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + if (processQueue != null && !processQueue.isDropped()) { + updateConsumeOffset(messageQueue, consumerOffset); + } + } else { + log.error("consumerOffset is -1 in messageQueue [" + messageQueue + "]."); + } + } + + private void updatePullOffset( + MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) { + if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) { + assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue); + } + } + + private void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + } catch (InterruptedException e) { + log.error("Submit consumeRequest error", e); + } + } + + private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue); + return offset; + } + + public long committed(MessageQueue messageQueue) throws MQClientException { + checkServiceState(); + long offset = + this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE); + if (offset == -2) { + throw new MQClientException("Fetch consume offset from broker exception", null); + } + return offset; + } + + private void clearMessageQueueInCache(MessageQueue messageQueue) { + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + if (processQueue != null) { + processQueue.clear(); + } + Iterator iter = consumeRequestCache.iterator(); + while (iter.hasNext()) { + if (iter.next().getMessageQueue().equals(messageQueue)) { + iter.remove(); + } + } + } + + private long nextPullOffset(MessageQueue messageQueue) throws MQClientException { + long offset = -1; + long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue); + if (seekOffset != -1) { + offset = seekOffset; + assignedMessageQueue.updateConsumeOffset(messageQueue, offset); + assignedMessageQueue.setSeekOffset(messageQueue, -1); + } else { + offset = assignedMessageQueue.getPullOffset(messageQueue); + if (offset == -1) { + offset = fetchConsumeOffset(messageQueue); + } + } + return offset; + } + + public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException { + checkServiceState(); + return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); + } + + public class PullTaskImpl implements Runnable { + private final MessageQueue messageQueue; + private volatile boolean cancelled = false; + private Thread currentThread; + + public PullTaskImpl(final MessageQueue messageQueue) { + this.messageQueue = messageQueue; + } + + public void tryInterrupt() { + setCancelled(true); + if (currentThread == null) { + return; + } + if (!currentThread.isInterrupted()) { + currentThread.interrupt(); + } + } + + @Override + public void run() { + + if (!this.isCancelled()) { + + this.currentThread = Thread.currentThread(); + + if (assignedMessageQueue.isPaused(messageQueue)) { + scheduledThreadPoolExecutor.schedule( + this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS); + log.debug("Message Queue: {} has been paused!", messageQueue); + return; + } + + ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); + + if (null == processQueue || processQueue.isDropped()) { + log.info( + "The message queue not be able to poll, because it's dropped. group={}, messageQueue={}", + defaultLitePullConsumer.getConsumerGroup(), + this.messageQueue); + return; + } + + if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() + > defaultLitePullConsumer.getPullThresholdForAll()) { + scheduledThreadPoolExecutor.schedule( + this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((consumeRequestFlowControlTimes++ % 1000) == 0) { + log.warn( + "The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", + consumeRequestCache.size(), + consumeRequestFlowControlTimes); + } + return; + } + + long cachedMessageCount = processQueue.getMsgCount().get(); + long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); + + if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) { + scheduledThreadPoolExecutor.schedule( + this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + defaultLitePullConsumer.getPullThresholdForQueue(), + processQueue.getMsgTreeMap().firstKey(), + processQueue.getMsgTreeMap().lastKey(), + cachedMessageCount, + cachedMessageSizeInMiB, + queueFlowControlTimes); + } + return; + } + + if (cachedMessageSizeInMiB + > defaultLitePullConsumer.getPullThresholdSizeForQueue()) { + scheduledThreadPoolExecutor.schedule( + this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueFlowControlTimes++ % 1000) == 0) { + log.warn( + "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}", + defaultLitePullConsumer.getPullThresholdSizeForQueue(), + processQueue.getMsgTreeMap().firstKey(), + processQueue.getMsgTreeMap().lastKey(), + cachedMessageCount, + cachedMessageSizeInMiB, + queueFlowControlTimes); + } + return; + } + + if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) { + scheduledThreadPoolExecutor.schedule( + this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS); + if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { + log.warn( + "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}", + processQueue.getMsgTreeMap().firstKey(), + processQueue.getMsgTreeMap().lastKey(), + processQueue.getMaxSpan(), + queueMaxSpanFlowControlTimes); + } + return; + } + + long offset = 0L; + try { + offset = nextPullOffset(messageQueue); + } catch (Exception e) { + log.error("Failed to get next pull offset", e); + scheduledThreadPoolExecutor.schedule( + this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS); + return; + } + + if (this.isCancelled() || processQueue.isDropped()) { + return; + } + long pullDelayTimeMills = 0; + try { + SubscriptionData subscriptionData; + String topic = this.messageQueue.getTopic(); + if (subscriptionType == SubscriptionType.SUBSCRIBE) { + subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic); + } else { + String subExpression4Assign = topicToSubExpression.get(topic); + subExpression4Assign = + subExpression4Assign == null + ? SubscriptionData.SUB_ALL + : subExpression4Assign; + subscriptionData = + FilterAPI.buildSubscriptionData(topic, subExpression4Assign); + } + + PullResult pullResult = + pull( + messageQueue, + subscriptionData, + offset, + defaultLitePullConsumer.getPullBatchSize()); + if (this.isCancelled() || processQueue.isDropped()) { + return; + } + switch (pullResult.getPullStatus()) { + case FOUND: + final Object objLock = messageQueueLock.fetchLockObject(messageQueue); + synchronized (objLock) { + if (pullResult.getMsgFoundList() != null + && !pullResult.getMsgFoundList().isEmpty() + && assignedMessageQueue.getSeekOffset(messageQueue) == -1) { + processQueue.putMessage(pullResult.getMsgFoundList()); + submitConsumeRequest( + new ConsumeRequest( + pullResult.getMsgFoundList(), + messageQueue, + processQueue)); + } + } + break; + case OFFSET_ILLEGAL: + log.warn("The pull request offset illegal, {}", pullResult.toString()); + break; + default: + break; + } + updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); + } catch (InterruptedException interruptedException) { + log.warn("Polling thread was interrupted.", interruptedException); + } catch (Throwable e) { + pullDelayTimeMills = pullTimeDelayMillsWhenException; + log.error("An error occurred in pull message process.", e); + } + + if (!this.isCancelled()) { + scheduledThreadPoolExecutor.schedule( + this, pullDelayTimeMills, TimeUnit.MILLISECONDS); + } else { + log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue); + } + } + } + + public boolean isCancelled() { + return cancelled; + } + + public void setCancelled(boolean cancelled) { + this.cancelled = cancelled; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + } + + private PullResult pull( + MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return pull( + mq, + subscriptionData, + offset, + maxNums, + this.defaultLitePullConsumer.getConsumerPullTimeoutMillis()); + } + + private PullResult pull( + MessageQueue mq, + SubscriptionData subscriptionData, + long offset, + int maxNums, + long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout); + } + + private PullResult pullSyncImpl( + MessageQueue mq, + SubscriptionData subscriptionData, + long offset, + int maxNums, + boolean block, + long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + + if (null == mq) { + throw new MQClientException("mq is null", null); + } + + if (offset < 0) { + throw new MQClientException("offset < 0", null); + } + + if (maxNums <= 0) { + throw new MQClientException("maxNums <= 0", null); + } + + int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true); + + long timeoutMillis = + block + ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() + : timeout; + + boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); + PullResult pullResult = + this.pullAPIWrapper.pullKernelImpl( + mq, + subscriptionData.getSubString(), + subscriptionData.getExpressionType(), + isTagType ? 0L : subscriptionData.getSubVersion(), + offset, + maxNums, + sysFlag, + 0, + this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(), + timeoutMillis, + CommunicationMode.SYNC, + null); + this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); + if (!this.consumeMessageHookList.isEmpty()) { + ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext(); + consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace()); + consumeMessageContext.setConsumerGroup(this.groupName()); + consumeMessageContext.setMq(mq); + consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); + consumeMessageContext.setSuccess(false); + this.executeHookBefore(consumeMessageContext); + consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); + consumeMessageContext.setSuccess(true); + this.executeHookAfter(consumeMessageContext); + } + return pullResult; + } + + private void resetTopic(List msgList) { + if (null == msgList || msgList.size() == 0) { + return; + } + + // If namespace not null , reset Topic without namespace. + for (MessageExt messageExt : msgList) { + if (null != this.defaultLitePullConsumer.getNamespace()) { + messageExt.setTopic( + NamespaceUtil.withoutNamespace( + messageExt.getTopic(), + this.defaultLitePullConsumer.getNamespace())); + } + } + } + + public void updateConsumeOffset(MessageQueue mq, long offset) { + checkServiceState(); + this.offsetStore.updateOffset(mq, offset, false); + } + + @Override + public String groupName() { + return this.defaultLitePullConsumer.getConsumerGroup(); + } + + @Override + public MessageModel messageModel() { + return this.defaultLitePullConsumer.getMessageModel(); + } + + @Override + public ConsumeType consumeType() { + return ConsumeType.CONSUME_ACTIVELY; + } + + @Override + public ConsumeFromWhere consumeFromWhere() { + return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; + } + + @Override + public Set subscriptions() { + Set subSet = new HashSet(); + + subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values()); + + return subSet; + } + + @Override + public void doRebalance() { + if (this.rebalanceImpl != null) { + this.rebalanceImpl.doRebalance(false); + } + } + + @Override + public void persistConsumerOffset() { + try { + checkServiceState(); + Set mqs = new HashSet(); + if (this.subscriptionType == SubscriptionType.SUBSCRIBE) { + Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + } else if (this.subscriptionType == SubscriptionType.ASSIGN) { + Set assignedMessageQueue = + this.assignedMessageQueue.getAssignedMessageQueues(); + mqs.addAll(assignedMessageQueue); + } + this.offsetStore.persistAll(mqs); + } catch (Exception e) { + log.error( + "Persist consumer offset error for group: {} ", + this.defaultLitePullConsumer.getConsumerGroup(), + e); + } + } + + @Override + public void updateTopicSubscribeInfo(String topic, Set info) { + Map subTable = this.rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info); + } + } + } + + @Override + public boolean isSubscribeTopicNeedUpdate(String topic) { + Map subTable = this.rebalanceImpl.getSubscriptionInner(); + if (subTable != null) { + if (subTable.containsKey(topic)) { + return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic); + } + } + + return false; + } + + @Override + public boolean isUnitMode() { + return this.defaultLitePullConsumer.isUnitMode(); + } + + @Override + public ConsumerRunningInfo consumerRunningInfo() { + ConsumerRunningInfo info = new ConsumerRunningInfo(); + + Properties prop = MixAll.object2Properties(this.defaultLitePullConsumer); + prop.put( + ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, + String.valueOf(this.consumerStartTimestamp)); + info.setProperties(prop); + + info.getSubscriptionSet().addAll(this.subscriptions()); + return info; + } + + private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway); + } + + public OffsetStore getOffsetStore() { + return offsetStore; + } + + public DefaultLitePullConsumer getDefaultLitePullConsumer() { + return defaultLitePullConsumer; + } + + public Set fetchMessageQueues(String topic) throws MQClientException { + checkServiceState(); + Set result = + this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic); + return parseMessageQueues(result); + } + + private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException { + for (Map.Entry entry : + topicMessageQueueChangeListenerMap.entrySet()) { + String topic = entry.getKey(); + TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue(); + Set oldMessageQueues = messageQueuesForTopic.get(topic); + Set newMessageQueues = fetchMessageQueues(topic); + boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues); + if (isChanged) { + messageQueuesForTopic.put(topic, newMessageQueues); + if (topicMessageQueueChangeListener != null) { + topicMessageQueueChangeListener.onChanged(topic, newMessageQueues); + } + } + } + } + + private boolean isSetEqual(Set set1, Set set2) { + if (set1 == null && set2 == null) { + return true; + } + + if (set1 == null + || set2 == null + || set1.size() != set2.size() + || set1.size() == 0 + || set2.size() == 0) { + return false; + } + + Iterator iter = set2.iterator(); + boolean isEqual = true; + while (iter.hasNext()) { + if (!set1.contains(iter.next())) { + isEqual = false; + } + } + return isEqual; + } + + public synchronized void registerTopicMessageQueueChangeListener( + String topic, TopicMessageQueueChangeListener listener) throws MQClientException { + if (topic == null || listener == null) { + throw new MQClientException("Topic or listener is null", null); + } + if (topicMessageQueueChangeListenerMap.containsKey(topic)) { + log.warn( + "Topic {} had been registered, new listener will overwrite the old one", topic); + } + topicMessageQueueChangeListenerMap.put(topic, listener); + if (this.serviceState == ServiceState.RUNNING) { + Set messageQueues = fetchMessageQueues(topic); + messageQueuesForTopic.put(topic, messageQueues); + } + } + + private Set parseMessageQueues(Set queueSet) { + Set resultQueues = new HashSet(); + for (MessageQueue messageQueue : queueSet) { + String userTopic = + NamespaceUtil.withoutNamespace( + messageQueue.getTopic(), this.defaultLitePullConsumer.getNamespace()); + resultQueues.add( + new MessageQueue( + userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId())); + } + return resultQueues; + } + + public class ConsumeRequest { + private final List messageExts; + private final MessageQueue messageQueue; + private final ProcessQueue processQueue; + + public ConsumeRequest( + final List messageExts, + final MessageQueue messageQueue, + final ProcessQueue processQueue) { + this.messageExts = messageExts; + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public List getMessageExts() { + return messageExts; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + } + + public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) { + this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException; + } +} diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java index 97037a11..8e59096d 100644 --- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.flink.legacy; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -161,7 +160,8 @@ public void open(Configuration parameters) throws Exception { Validate.notEmpty(topic, "Consumer topic can not be empty"); Validate.notEmpty(group, "Consumer group can not be empty"); - String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG); + String tag = + props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG); String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL); Validate.isTrue( !(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)), @@ -221,6 +221,15 @@ public void open(Configuration parameters) throws Exception { String.valueOf(indexOfThisSubTask), String.valueOf(System.nanoTime())); consumer.setInstanceName(instanceName); + + int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE); + consumer.setPullBatchSize(pullBatchSize); + + if (StringUtils.isEmpty(sql)) { + consumer.setSubExpressionForAssign(topic, tag); + } else { + consumer.setSubExpressionForAssign(topic, sql); + } consumer.start(); Counter outputCounter = @@ -250,6 +259,8 @@ public void open(Configuration parameters) throws Exception { RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask()); // If the job recovers from the state, the state has already contained the offsets of last // commit. + consumer.assign(totalQueues); + if (!restored) { initOffsets(messageQueues); } @@ -257,10 +268,6 @@ public void open(Configuration parameters) throws Exception { @Override public void run(SourceContext context) throws Exception { - String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL); - String tag = - props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG); - int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE); timer.scheduleAtFixedRate( () -> { // context.emitWatermark(waterMarkPerQueue.getCurrentWatermark()); @@ -269,12 +276,7 @@ public void run(SourceContext context) throws Exception { 5, 5, TimeUnit.SECONDS); - if (StringUtils.isEmpty(sql)) { - consumer.subscribe(topic, tag); - } else { - // pull with sql do not support block pull. - consumer.subscribe(topic, MessageSelector.bySql(sql)); - } + for (MessageQueue mq : messageQueues) { this.executor.execute( () -> @@ -283,8 +285,9 @@ public void run(SourceContext context) throws Exception { while (runningChecker.isRunning()) { try { Long offset = offsetTable.get(mq); - consumer.setPullBatchSize(pullBatchSize); - consumer.seek(mq, offset); + if (offset != null) { + consumer.seek(mq, offset); + } boolean found = false; List messages = consumer.poll( diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java index bf489bb1..6e275d34 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java +++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java @@ -236,6 +236,7 @@ private Set discoverAndInitializePartitionSplit() new HashSet<>(Collections.unmodifiableSet(discoveredPartitions)); Collection messageQueues = consumer.fetchMessageQueues(topic); + consumer.assign(messageQueues); Set result = new HashSet<>(); for (MessageQueue messageQueue : messageQueues) { Tuple3 topicPartition = @@ -340,10 +341,12 @@ private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException { switch (consumerOffsetMode) { case CONSUMER_OFFSET_EARLIEST: consumer.seekToBegin(mq); - return -1; + offset = consumer.committed(mq); + break; case CONSUMER_OFFSET_LATEST: consumer.seekToEnd(mq); - return -1; + offset = consumer.committed(mq); + break; case CONSUMER_OFFSET_TIMESTAMP: offset = consumer.offsetForTimestamp(mq, consumerOffsetTimestamp); break; diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java index 72fd96ef..bce7dd16 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java @@ -122,6 +122,7 @@ public RecordsWithSplitIds> fetch() throws IOException { Collection messageQueues; try { messageQueues = consumer.fetchMessageQueues(topic); + consumer.assign(messageQueues); } catch (MQClientException e) { LOG.error( String.format( @@ -330,12 +331,13 @@ private void initialRocketMQConsumer( topic, consumerGroup, "" + System.nanoTime())); - consumer.start(); - if (StringUtils.isNotEmpty(sql)) { - consumer.subscribe(topic, MessageSelector.bySql(sql)); + + if (StringUtils.isEmpty(sql)) { + consumer.setSubExpressionForAssign(topic, tag); } else { - consumer.subscribe(topic, tag); + consumer.setSubExpressionForAssign(topic, sql); } + consumer.start(); } catch (MQClientException e) { LOG.error("Failed to initial RocketMQ consumer.", e); consumer.shutdown(); From 75e991e6dbcc987f83887c7c9afdd76d7fc01909 Mon Sep 17 00:00:00 2001 From: wangkai Date: Tue, 22 Nov 2022 09:18:21 +0800 Subject: [PATCH 2/2] modify checkstyle --- .../flink/source/reader/RocketMQPartitionSplitReader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java index bce7dd16..8dc04001 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java @@ -21,7 +21,6 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; -import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue;