diff --git a/pom.xml b/pom.xml
index 1b299484..0ed59983 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
1.8
1.8
- 4.9.2
+ 4.9.4
1.15.0
2.6
2.4.2
diff --git a/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
new file mode 100644
index 00000000..31a1833f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
+import org.apache.rocketmq.client.trace.TraceDispatcher;
+import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
+
+ private final InternalLogger log = ClientLogger.getLog();
+
+ private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
+
+ /**
+ * Consumers belonging to the same consumer group share a group id. The consumers in a group
+ * then divides the topic as fairly amongst themselves as possible by establishing that each
+ * queue is only consumed by a single consumer from the group. If all consumers are from the
+ * same group, it functions as a traditional message queue. Each message would be consumed by
+ * one consumer of the group only. When multiple consumer groups exist, the flow of the data
+ * consumption model aligns with the traditional publish-subscribe model. The messages are
+ * broadcast to all consumer groups.
+ */
+ private String consumerGroup;
+
+ /**
+ * Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
+ */
+ private long brokerSuspendMaxTimeMillis = 1000 * 20;
+
+ /**
+ * Long polling mode, the Consumer connection timeout(must greater than
+ * brokerSuspendMaxTimeMillis), it is not recommended to modify
+ */
+ private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
+
+ /** The socket timeout in milliseconds */
+ private long consumerPullTimeoutMillis = 1000 * 10;
+
+ /** Consumption pattern,default is clustering */
+ private MessageModel messageModel = MessageModel.CLUSTERING;
+ /** Message queue listener */
+ private MessageQueueListener messageQueueListener;
+ /** Offset Storage */
+ private OffsetStore offsetStore;
+
+ /** Queue allocation algorithm */
+ private AllocateMessageQueueStrategy allocateMessageQueueStrategy =
+ new AllocateMessageQueueAveragely();
+ /** Whether the unit of subscription group */
+ private boolean unitMode = false;
+
+ /** The flag for auto commit offset */
+ private boolean autoCommit = true;
+
+ /** Pull thread number */
+ private int pullThreadNums = 20;
+
+ /** Minimum commit offset interval time in milliseconds. */
+ private static final long MIN_AUTOCOMMIT_INTERVAL_MILLIS = 1000;
+
+ /** Maximum commit offset interval time in milliseconds. */
+ private long autoCommitIntervalMillis = 5 * 1000;
+
+ /** Maximum number of messages pulled each time. */
+ private int pullBatchSize = 10;
+
+ /**
+ * Flow control threshold for consume request, each consumer will cache at most 10000 consume
+ * requests by default. Consider the {@code pullBatchSize}, the instantaneous value may exceed
+ * the limit
+ */
+ private long pullThresholdForAll = 10000;
+
+ /** Consume max span offset. */
+ private int consumeMaxSpan = 2000;
+
+ /**
+ * Flow control threshold on queue level, each message queue will cache at most 1000 messages by
+ * default, Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+ */
+ private int pullThresholdForQueue = 1000;
+
+ /**
+ * Limit the cached message size on queue level, each message queue will cache at most 100 MiB
+ * messages by default, Consider the {@code pullBatchSize}, the instantaneous value may exceed
+ * the limit
+ *
+ *
The size of a message only measured by message body, so it's not accurate
+ */
+ private int pullThresholdSizeForQueue = 100;
+
+ /** The poll timeout in milliseconds */
+ private long pollTimeoutMillis = 1000 * 5;
+
+ /** Interval time in in milliseconds for checking changes in topic metadata. */
+ private long topicMetadataCheckIntervalMillis = 30 * 1000;
+
+ private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+
+ /**
+ * Backtracking consumption time with second precision. Time format is 20131223171201
+ * Implying Seventeen twelve and 01 seconds on December 23, 2013 year
+ * Default backtracking consumption time Half an hour ago.
+ */
+ private String consumeTimestamp =
+ UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
+
+ /** Interface of asynchronous transfer data */
+ private TraceDispatcher traceDispatcher = null;
+
+ /** The flag for message trace */
+ private boolean enableMsgTrace = false;
+
+ /**
+ * The name value of message trace topic.If you don't config,you can use the default trace topic
+ * name.
+ */
+ private String customizedTraceTopic;
+
+ /** Default constructor. */
+ public DefaultLitePullConsumer() {
+ this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
+ }
+
+ /**
+ * Constructor specifying consumer group.
+ *
+ * @param consumerGroup Consumer group.
+ */
+ public DefaultLitePullConsumer(final String consumerGroup) {
+ this(null, consumerGroup, null);
+ }
+
+ /**
+ * Constructor specifying RPC hook.
+ *
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
+ public DefaultLitePullConsumer(RPCHook rpcHook) {
+ this(null, MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
+ }
+
+ /**
+ * Constructor specifying consumer group, RPC hook
+ *
+ * @param consumerGroup Consumer group.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
+ public DefaultLitePullConsumer(final String consumerGroup, RPCHook rpcHook) {
+ this(null, consumerGroup, rpcHook);
+ }
+
+ /**
+ * Constructor specifying namespace, consumer group and RPC hook.
+ *
+ * @param consumerGroup Consumer group.
+ * @param rpcHook RPC hook to execute before each remoting command.
+ */
+ public DefaultLitePullConsumer(
+ final String namespace, final String consumerGroup, RPCHook rpcHook) {
+ this.namespace = namespace;
+ this.consumerGroup = consumerGroup;
+ this.enableStreamRequestType = true;
+ defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, rpcHook);
+ }
+
+ @Override
+ public void start() throws MQClientException {
+ setTraceDispatcher();
+ setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
+ this.defaultLitePullConsumerImpl.start();
+ if (null != traceDispatcher) {
+ try {
+ traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
+ } catch (MQClientException e) {
+ log.warn("trace dispatcher start failed ", e);
+ }
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ this.defaultLitePullConsumerImpl.shutdown();
+ if (null != traceDispatcher) {
+ traceDispatcher.shutdown();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.defaultLitePullConsumerImpl.isRunning();
+ }
+
+ @Override
+ public void subscribe(String topic, String subExpression) throws MQClientException {
+ this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
+ }
+
+ @Override
+ public void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
+ this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), messageSelector);
+ }
+
+ @Override
+ public void unsubscribe(String topic) {
+ this.defaultLitePullConsumerImpl.unsubscribe(withNamespace(topic));
+ }
+
+ @Override
+ public void assign(Collection messageQueues) {
+ defaultLitePullConsumerImpl.assign(queuesWithNamespace(messageQueues));
+ }
+
+ @Override
+ public List poll() {
+ return defaultLitePullConsumerImpl.poll(this.getPollTimeoutMillis());
+ }
+
+ @Override
+ public List poll(long timeout) {
+ return defaultLitePullConsumerImpl.poll(timeout);
+ }
+
+ @Override
+ public void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+ this.defaultLitePullConsumerImpl.seek(queueWithNamespace(messageQueue), offset);
+ }
+
+ @Override
+ public void pause(Collection messageQueues) {
+ this.defaultLitePullConsumerImpl.pause(queuesWithNamespace(messageQueues));
+ }
+
+ @Override
+ public void resume(Collection messageQueues) {
+ this.defaultLitePullConsumerImpl.resume(queuesWithNamespace(messageQueues));
+ }
+
+ @Override
+ public Collection fetchMessageQueues(String topic) throws MQClientException {
+ return this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
+ }
+
+ @Override
+ public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp)
+ throws MQClientException {
+ return this.defaultLitePullConsumerImpl.searchOffset(
+ queueWithNamespace(messageQueue), timestamp);
+ }
+
+ @Override
+ public void registerTopicMessageQueueChangeListener(
+ String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener)
+ throws MQClientException {
+ this.defaultLitePullConsumerImpl.registerTopicMessageQueueChangeListener(
+ withNamespace(topic), topicMessageQueueChangeListener);
+ }
+
+ @Override
+ public void commitSync() {
+ this.defaultLitePullConsumerImpl.commitAll();
+ }
+
+ @Override
+ public void commit(final Set messageQueues, boolean persist) {
+ this.defaultLitePullConsumerImpl.commit(messageQueues, persist);
+ }
+
+ @Override
+ public Long committed(MessageQueue messageQueue) throws MQClientException {
+ return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
+ }
+
+ @Override
+ public void setSubExpressionForAssign(final String topic, final String subExpresion) {
+ defaultLitePullConsumerImpl.setSubExpressionForAssign(withNamespace(topic), subExpresion);
+ }
+
+ @Override
+ public void updateNameServerAddress(String nameServerAddress) {
+ this.defaultLitePullConsumerImpl.updateNameServerAddr(nameServerAddress);
+ }
+
+ @Override
+ public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
+ this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
+ }
+
+ @Override
+ public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
+ this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
+ }
+
+ @Override
+ public boolean isAutoCommit() {
+ return autoCommit;
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) {
+ this.autoCommit = autoCommit;
+ }
+
+ public boolean isConnectBrokerByUser() {
+ return this.defaultLitePullConsumerImpl.getPullAPIWrapper().isConnectBrokerByUser();
+ }
+
+ public void setConnectBrokerByUser(boolean connectBrokerByUser) {
+ this.defaultLitePullConsumerImpl
+ .getPullAPIWrapper()
+ .setConnectBrokerByUser(connectBrokerByUser);
+ }
+
+ public long getDefaultBrokerId() {
+ return this.defaultLitePullConsumerImpl.getPullAPIWrapper().getDefaultBrokerId();
+ }
+
+ public void setDefaultBrokerId(long defaultBrokerId) {
+ this.defaultLitePullConsumerImpl.getPullAPIWrapper().setDefaultBrokerId(defaultBrokerId);
+ }
+
+ public int getPullThreadNums() {
+ return pullThreadNums;
+ }
+
+ public void setPullThreadNums(int pullThreadNums) {
+ this.pullThreadNums = pullThreadNums;
+ }
+
+ public long getAutoCommitIntervalMillis() {
+ return autoCommitIntervalMillis;
+ }
+
+ public void setAutoCommitIntervalMillis(long autoCommitIntervalMillis) {
+ if (autoCommitIntervalMillis >= MIN_AUTOCOMMIT_INTERVAL_MILLIS) {
+ this.autoCommitIntervalMillis = autoCommitIntervalMillis;
+ }
+ }
+
+ public int getPullBatchSize() {
+ return pullBatchSize;
+ }
+
+ public void setPullBatchSize(int pullBatchSize) {
+ this.pullBatchSize = pullBatchSize;
+ }
+
+ public long getPullThresholdForAll() {
+ return pullThresholdForAll;
+ }
+
+ public void setPullThresholdForAll(long pullThresholdForAll) {
+ this.pullThresholdForAll = pullThresholdForAll;
+ }
+
+ public int getConsumeMaxSpan() {
+ return consumeMaxSpan;
+ }
+
+ public void setConsumeMaxSpan(int consumeMaxSpan) {
+ this.consumeMaxSpan = consumeMaxSpan;
+ }
+
+ public int getPullThresholdForQueue() {
+ return pullThresholdForQueue;
+ }
+
+ public void setPullThresholdForQueue(int pullThresholdForQueue) {
+ this.pullThresholdForQueue = pullThresholdForQueue;
+ }
+
+ public int getPullThresholdSizeForQueue() {
+ return pullThresholdSizeForQueue;
+ }
+
+ public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
+ this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+ }
+
+ public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+ return allocateMessageQueueStrategy;
+ }
+
+ public void setAllocateMessageQueueStrategy(
+ AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+ this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+ }
+
+ public long getBrokerSuspendMaxTimeMillis() {
+ return brokerSuspendMaxTimeMillis;
+ }
+
+ public long getPollTimeoutMillis() {
+ return pollTimeoutMillis;
+ }
+
+ public void setPollTimeoutMillis(long pollTimeoutMillis) {
+ this.pollTimeoutMillis = pollTimeoutMillis;
+ }
+
+ public OffsetStore getOffsetStore() {
+ return offsetStore;
+ }
+
+ public void setOffsetStore(OffsetStore offsetStore) {
+ this.offsetStore = offsetStore;
+ }
+
+ @Override
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+ @Override
+ public void setUnitMode(boolean isUnitMode) {
+ this.unitMode = isUnitMode;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public MessageQueueListener getMessageQueueListener() {
+ return messageQueueListener;
+ }
+
+ public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
+ this.messageQueueListener = messageQueueListener;
+ }
+
+ public long getConsumerPullTimeoutMillis() {
+ return consumerPullTimeoutMillis;
+ }
+
+ public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
+ this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
+ }
+
+ public long getConsumerTimeoutMillisWhenSuspend() {
+ return consumerTimeoutMillisWhenSuspend;
+ }
+
+ public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
+ this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
+ }
+
+ public long getTopicMetadataCheckIntervalMillis() {
+ return topicMetadataCheckIntervalMillis;
+ }
+
+ public void setTopicMetadataCheckIntervalMillis(long topicMetadataCheckIntervalMillis) {
+ this.topicMetadataCheckIntervalMillis = topicMetadataCheckIntervalMillis;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ if (consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
+ && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
+ && consumeFromWhere != ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
+ throw new RuntimeException("Invalid ConsumeFromWhere Value", null);
+ }
+ this.consumeFromWhere = consumeFromWhere;
+ }
+
+ public String getConsumeTimestamp() {
+ return consumeTimestamp;
+ }
+
+ public void setConsumeTimestamp(String consumeTimestamp) {
+ this.consumeTimestamp = consumeTimestamp;
+ }
+
+ public TraceDispatcher getTraceDispatcher() {
+ return traceDispatcher;
+ }
+
+ public void setCustomizedTraceTopic(String customizedTraceTopic) {
+ this.customizedTraceTopic = customizedTraceTopic;
+ }
+
+ private void setTraceDispatcher() {
+ if (isEnableMsgTrace()) {
+ try {
+ AsyncTraceDispatcher traceDispatcher =
+ new AsyncTraceDispatcher(
+ consumerGroup,
+ TraceDispatcher.Type.CONSUME,
+ customizedTraceTopic,
+ null);
+ traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
+ this.traceDispatcher = traceDispatcher;
+ this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
+ new ConsumeMessageTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ }
+ }
+ }
+
+ public String getCustomizedTraceTopic() {
+ return customizedTraceTopic;
+ }
+
+ public boolean isEnableMsgTrace() {
+ return enableMsgTrace;
+ }
+
+ public void setEnableMsgTrace(boolean enableMsgTrace) {
+ this.enableMsgTrace = enableMsgTrace;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
new file mode 100644
index 00000000..9b1faf8b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public interface LitePullConsumer {
+
+ /** Start the consumer */
+ void start() throws MQClientException;
+
+ /** Shutdown the consumer */
+ void shutdown();
+
+ /**
+ * This consumer is still running
+ *
+ * @return true if consumer is still running
+ */
+ boolean isRunning();
+
+ /**
+ * Subscribe some topic with subExpression
+ *
+ * @param subExpression subscription expression.it only support or operation such as "tag1 ||
+ * tag2 || tag3"
+ * if null or * expression,meaning subscribe all
+ * @throws MQClientException if there is any client error.
+ */
+ void subscribe(final String topic, final String subExpression) throws MQClientException;
+
+ /**
+ * Subscribe some topic with selector.
+ *
+ * @param selector message selector({@link MessageSelector}), can be null.
+ * @throws MQClientException if there is any client error.
+ */
+ void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
+
+ /**
+ * Unsubscribe consumption some topic
+ *
+ * @param topic Message topic that needs to be unsubscribe.
+ */
+ void unsubscribe(final String topic);
+
+ /**
+ * Manually assign a list of message queues to this consumer. This interface does not allow for
+ * incremental assignment and will replace the previous assignment (if there is one).
+ *
+ * @param messageQueues Message queues that needs to be assigned.
+ */
+ void assign(Collection messageQueues);
+
+ /**
+ * Fetch data for the topics or partitions specified using assign API
+ *
+ * @return list of message, can be null.
+ */
+ List poll();
+
+ /**
+ * Fetch data for the topics or partitions specified using assign API
+ *
+ * @param timeout The amount time, in milliseconds, spent waiting in poll if data is not
+ * available. Must not be negative
+ * @return list of message, can be null.
+ */
+ List poll(long timeout);
+
+ /**
+ * Overrides the fetch offsets that the consumer will use on the next poll. If this API is
+ * invoked for the same message queue more than once, the latest offset will be used on the next
+ * poll(). Note that you may lose data if this API is arbitrarily used in the middle of
+ * consumption.
+ *
+ * @param messageQueue
+ * @param offset
+ */
+ void seek(MessageQueue messageQueue, long offset) throws MQClientException;
+
+ /**
+ * Set topic subExpression for assign mode. This interface does not allow be call after start().
+ * Default value is * if not set. assignment and will replace the previous assignment (if there
+ * is one).
+ *
+ * @param subExpression subscription expression.it only support or operation such as "tag1 ||
+ * tag2 || tag3"
+ * if * null or * expression,meaning subscribe all
+ */
+ void setSubExpressionForAssign(final String topic, final String subExpression);
+
+ /**
+ * Suspend pulling from the requested message queues.
+ *
+ * Because of the implementation of pre-pull, fetch data in {@link #poll()} will not stop
+ * immediately until the messages of the requested message queues drain.
+ *
+ *
Note that this method does not affect message queue subscription. In particular, it does
+ * not cause a group rebalance.
+ *
+ * @param messageQueues Message queues that needs to be paused.
+ */
+ void pause(Collection messageQueues);
+
+ /**
+ * Resume specified message queues which have been paused with {@link #pause(Collection)}.
+ *
+ * @param messageQueues Message queues that needs to be resumed.
+ */
+ void resume(Collection messageQueues);
+
+ /**
+ * Whether to enable auto-commit consume offset.
+ *
+ * @return true if enable auto-commit, false if disable auto-commit.
+ */
+ boolean isAutoCommit();
+
+ /**
+ * Set whether to enable auto-commit consume offset.
+ *
+ * @param autoCommit Whether to enable auto-commit.
+ */
+ void setAutoCommit(boolean autoCommit);
+
+ /**
+ * Get metadata about the message queues for a given topic.
+ *
+ * @param topic The topic that need to get metadata.
+ * @return collection of message queues
+ * @throws MQClientException if there is any client error.
+ */
+ Collection fetchMessageQueues(String topic) throws MQClientException;
+
+ /**
+ * Look up the offsets for the given message queue by timestamp. The returned offset for each
+ * message queue is the earliest offset whose timestamp is greater than or equal to the given
+ * timestamp in the corresponding message queue.
+ *
+ * @param messageQueue Message queues that needs to get offset by timestamp.
+ * @param timestamp
+ * @return offset
+ * @throws MQClientException if there is any client error.
+ */
+ Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) throws MQClientException;
+
+ /** Manually commit consume offset. */
+ void commitSync();
+
+ void commit(final Set messageQueues, boolean persist);
+
+ /**
+ * Get the last committed offset for the given message queue.
+ *
+ * @param messageQueue
+ * @return offset, if offset equals -1 means no offset in broker.
+ * @throws MQClientException if there is any client error.
+ */
+ Long committed(MessageQueue messageQueue) throws MQClientException;
+
+ /**
+ * Register a callback for sensing topic metadata changes.
+ *
+ * @param topic The topic that need to monitor.
+ * @param topicMessageQueueChangeListener Callback when topic metadata changes, refer {@link
+ * TopicMessageQueueChangeListener}
+ * @throws MQClientException if there is any client error.
+ */
+ void registerTopicMessageQueueChangeListener(
+ String topic, TopicMessageQueueChangeListener topicMessageQueueChangeListener)
+ throws MQClientException;
+
+ /** Update name server addresses. */
+ void updateNameServerAddress(String nameServerAddress);
+
+ /**
+ * Overrides the fetch offsets with the begin offset that the consumer will use on the next
+ * poll. If this API is invoked for the same message queue more than once, the latest offset
+ * will be used on the next poll(). Note that you may lose data if this API is arbitrarily used
+ * in the middle of consumption.
+ *
+ * @param messageQueue
+ */
+ void seekToBegin(MessageQueue messageQueue) throws MQClientException;
+
+ /**
+ * Overrides the fetch offsets with the end offset that the consumer will use on the next poll.
+ * If this API is invoked for the same message queue more than once, the latest offset will be
+ * used on the next poll(). Note that you may lose data if this API is arbitrarily used in the
+ * middle of consumption.
+ *
+ * @param messageQueue
+ */
+ void seekToEnd(MessageQueue messageQueue) throws MQClientException;
+}
diff --git a/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
new file mode 100644
index 00000000..6f5e6030
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.consumer.*;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.hook.FilterMessageHook;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class DefaultLitePullConsumerImpl implements MQConsumerInner {
+
+ private final InternalLogger log = ClientLogger.getLog();
+
+ private final long consumerStartTimestamp = System.currentTimeMillis();
+
+ private final RPCHook rpcHook;
+
+ private final ArrayList filterMessageHookList =
+ new ArrayList();
+
+ private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
+
+ protected MQClientInstance mQClientFactory;
+
+ private PullAPIWrapper pullAPIWrapper;
+
+ private OffsetStore offsetStore;
+
+ private RebalanceImpl rebalanceImpl = new RebalanceLitePullImpl(this);
+
+ private enum SubscriptionType {
+ NONE,
+ SUBSCRIBE,
+ ASSIGN
+ }
+
+ private static final String NOT_RUNNING_EXCEPTION_MESSAGE =
+ "The consumer not running, please start it first.";
+
+ private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE =
+ "Subscribe and assign are mutually exclusive.";
+ /** the type of subscription */
+ private SubscriptionType subscriptionType = SubscriptionType.NONE;
+ /** Delay some time when exception occur */
+ private long pullTimeDelayMillsWhenException = 1000;
+ /** Flow control interval */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+ /** Delay some time when suspend pull service */
+ private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
+
+ private static final long PULL_TIME_DELAY_MILLS_ON_EXCEPTION = 3 * 1000;
+
+ private ConcurrentHashMap topicToSubExpression =
+ new ConcurrentHashMap<>();
+
+ private DefaultLitePullConsumer defaultLitePullConsumer;
+
+ private final ConcurrentMap taskTable =
+ new ConcurrentHashMap();
+
+ private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue();
+
+ private final BlockingQueue consumeRequestCache =
+ new LinkedBlockingQueue();
+
+ private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
+ private Map topicMessageQueueChangeListenerMap =
+ new HashMap();
+
+ private Map> messageQueuesForTopic =
+ new HashMap>();
+
+ private long consumeRequestFlowControlTimes = 0L;
+
+ private long queueFlowControlTimes = 0L;
+
+ private long queueMaxSpanFlowControlTimes = 0L;
+
+ private long nextAutoCommitDeadline = -1L;
+
+ private final MessageQueueLock messageQueueLock = new MessageQueueLock();
+
+ private final ArrayList consumeMessageHookList =
+ new ArrayList();
+
+ public DefaultLitePullConsumerImpl(
+ final DefaultLitePullConsumer defaultLitePullConsumer, final RPCHook rpcHook) {
+ this.defaultLitePullConsumer = defaultLitePullConsumer;
+ this.rpcHook = rpcHook;
+ this.scheduledThreadPoolExecutor =
+ new ScheduledThreadPoolExecutor(
+ this.defaultLitePullConsumer.getPullThreadNums(),
+ new ThreadFactoryImpl(
+ "PullMsgThread-"
+ + this.defaultLitePullConsumer.getConsumerGroup()));
+ this.scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "MonitorMessageQueueChangeThread");
+ }
+ });
+ this.pullTimeDelayMillsWhenException =
+ defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
+ }
+
+ public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+ this.consumeMessageHookList.add(hook);
+ log.info("register consumeMessageHook Hook, {}", hook.hookName());
+ }
+
+ public void executeHookBefore(final ConsumeMessageContext context) {
+ if (!this.consumeMessageHookList.isEmpty()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageBefore(context);
+ } catch (Throwable e) {
+ log.error(
+ "consumeMessageHook {} executeHookBefore exception",
+ hook.hookName(),
+ e);
+ }
+ }
+ }
+ }
+
+ public void executeHookAfter(final ConsumeMessageContext context) {
+ if (!this.consumeMessageHookList.isEmpty()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageAfter(context);
+ } catch (Throwable e) {
+ log.error(
+ "consumeMessageHook {} executeHookAfter exception", hook.hookName(), e);
+ }
+ }
+ }
+ }
+
+ private void checkServiceState() {
+ if (this.serviceState != ServiceState.RUNNING) {
+ throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
+ }
+ }
+
+ public void updateNameServerAddr(String newAddresses) {
+ this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
+ }
+
+ private synchronized void setSubscriptionType(SubscriptionType type) {
+ if (this.subscriptionType == SubscriptionType.NONE) {
+ this.subscriptionType = type;
+ } else if (this.subscriptionType != type) {
+ throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
+ }
+ }
+
+ private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) {
+ this.assignedMessageQueue.updateAssignedMessageQueue(topic, assignedMessageQueue);
+ }
+
+ private void updatePullTask(String topic, Set mqNewSet) {
+ Iterator> it = this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry next = it.next();
+ if (next.getKey().getTopic().equals(topic)) {
+ if (!mqNewSet.contains(next.getKey())) {
+ next.getValue().setCancelled(true);
+ it.remove();
+ }
+ }
+ }
+ startPullTask(mqNewSet);
+ }
+
+ class MessageQueueListenerImpl implements MessageQueueListener {
+ @Override
+ public void messageQueueChanged(
+ String topic, Set mqAll, Set mqDivided) {
+ MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
+ switch (messageModel) {
+ case BROADCASTING:
+ updateAssignedMessageQueue(topic, mqAll);
+ updatePullTask(topic, mqAll);
+ break;
+ case CLUSTERING:
+ updateAssignedMessageQueue(topic, mqDivided);
+ updatePullTask(topic, mqDivided);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ public synchronized void shutdown() {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ break;
+ case RUNNING:
+ persistConsumerOffset();
+ this.mQClientFactory.unregisterConsumer(
+ this.defaultLitePullConsumer.getConsumerGroup());
+ scheduledThreadPoolExecutor.shutdown();
+ scheduledExecutorService.shutdown();
+ this.mQClientFactory.shutdown();
+ this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+ log.info(
+ "the consumer [{}] shutdown OK",
+ this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ default:
+ break;
+ }
+ }
+
+ public synchronized boolean isRunning() {
+ return this.serviceState == ServiceState.RUNNING;
+ }
+
+ public synchronized void start() throws MQClientException {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ this.serviceState = ServiceState.START_FAILED;
+
+ this.checkConfig();
+
+ if (this.defaultLitePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
+ this.defaultLitePullConsumer.changeInstanceNameToPID();
+ }
+
+ initMQClientFactory();
+
+ initRebalanceImpl();
+
+ initPullAPIWrapper();
+
+ initOffsetStore();
+
+ mQClientFactory.start();
+
+ startScheduleTask();
+
+ this.serviceState = ServiceState.RUNNING;
+
+ log.info(
+ "the consumer [{}] start OK",
+ this.defaultLitePullConsumer.getConsumerGroup());
+
+ operateAfterRunning();
+
+ break;
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ throw new MQClientException(
+ "The PullConsumer service state not OK, maybe started once, "
+ + this.serviceState
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
+ default:
+ break;
+ }
+ }
+
+ private void initMQClientFactory() throws MQClientException {
+ this.mQClientFactory =
+ MQClientManager.getInstance()
+ .getOrCreateMQClientInstance(this.defaultLitePullConsumer, this.rpcHook);
+ boolean registerOK =
+ mQClientFactory.registerConsumer(
+ this.defaultLitePullConsumer.getConsumerGroup(), this);
+ if (!registerOK) {
+ this.serviceState = ServiceState.CREATE_JUST;
+
+ throw new MQClientException(
+ "The consumer group["
+ + this.defaultLitePullConsumer.getConsumerGroup()
+ + "] has been created before, specify another name please."
+ + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
+ }
+ }
+
+ private void initRebalanceImpl() {
+ this.rebalanceImpl.setConsumerGroup(this.defaultLitePullConsumer.getConsumerGroup());
+ this.rebalanceImpl.setMessageModel(this.defaultLitePullConsumer.getMessageModel());
+ this.rebalanceImpl.setAllocateMessageQueueStrategy(
+ this.defaultLitePullConsumer.getAllocateMessageQueueStrategy());
+ this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
+ }
+
+ private void initPullAPIWrapper() {
+ this.pullAPIWrapper =
+ new PullAPIWrapper(
+ mQClientFactory,
+ this.defaultLitePullConsumer.getConsumerGroup(),
+ isUnitMode());
+ this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
+ }
+
+ private void initOffsetStore() throws MQClientException {
+ if (this.defaultLitePullConsumer.getOffsetStore() != null) {
+ this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
+ } else {
+ switch (this.defaultLitePullConsumer.getMessageModel()) {
+ case BROADCASTING:
+ this.offsetStore =
+ new LocalFileOffsetStore(
+ this.mQClientFactory,
+ this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ case CLUSTERING:
+ this.offsetStore =
+ new RemoteBrokerOffsetStore(
+ this.mQClientFactory,
+ this.defaultLitePullConsumer.getConsumerGroup());
+ break;
+ default:
+ break;
+ }
+ this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
+ }
+ this.offsetStore.load();
+ }
+
+ private void startScheduleTask() {
+ scheduledExecutorService.scheduleAtFixedRate(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fetchTopicMessageQueuesAndCompare();
+ } catch (Exception e) {
+ log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
+ }
+ }
+ },
+ 1000 * 10,
+ this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ private void operateAfterRunning() throws MQClientException {
+ // If subscribe function invoke before start function, then update topic subscribe info
+ // after initialization.
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ // If assign function invoke before start function, then update pull task after
+ // initialization.
+ if (subscriptionType == SubscriptionType.ASSIGN) {
+ updateAssignPullTask(assignedMessageQueue.messageQueues());
+ }
+
+ for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
+ Set messageQueues = fetchMessageQueues(topic);
+ messageQueuesForTopic.put(topic, messageQueues);
+ }
+ this.mQClientFactory.checkClientInBroker();
+ }
+
+ private void checkConfig() throws MQClientException {
+ // Check consumerGroup
+ Validators.checkGroup(this.defaultLitePullConsumer.getConsumerGroup());
+
+ // Check consumerGroup name is not equal default consumer group name.
+ if (this.defaultLitePullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
+ throw new MQClientException(
+ "consumerGroup can not equal "
+ + MixAll.DEFAULT_CONSUMER_GROUP
+ + ", please specify another one."
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // Check messageModel is not null.
+ if (null == this.defaultLitePullConsumer.getMessageModel()) {
+ throw new MQClientException(
+ "messageModel is null" + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ // Check allocateMessageQueueStrategy is not null
+ if (null == this.defaultLitePullConsumer.getAllocateMessageQueueStrategy()) {
+ throw new MQClientException(
+ "allocateMessageQueueStrategy is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ if (this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend()
+ < this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis()) {
+ throw new MQClientException(
+ "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
+ public PullAPIWrapper getPullAPIWrapper() {
+ return pullAPIWrapper;
+ }
+
+ private void startPullTask(Collection mqSet) {
+ for (MessageQueue messageQueue : mqSet) {
+ if (!this.taskTable.containsKey(messageQueue)) {
+ PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
+ this.taskTable.put(messageQueue, pullTask);
+ this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ private void updateAssignPullTask(Collection mqNewSet) {
+ Iterator> it = this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry next = it.next();
+ if (!mqNewSet.contains(next.getKey())) {
+ next.getValue().setCancelled(true);
+ it.remove();
+ }
+ }
+
+ startPullTask(mqNewSet);
+ }
+
+ private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
+ Map subTable = rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) {
+ for (final Map.Entry entry : subTable.entrySet()) {
+ final String topic = entry.getKey();
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+ }
+ }
+ }
+
+ public synchronized void subscribe(String topic, String subExpression)
+ throws MQClientException {
+ try {
+ if (topic == null || "".equals(topic)) {
+ throw new IllegalArgumentException("Topic can not be null or empty.");
+ }
+ setSubscriptionType(SubscriptionType.SUBSCRIBE);
+ SubscriptionData subscriptionData =
+ FilterAPI.buildSubscriptionData(topic, subExpression);
+ this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+ this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+ assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+ if (serviceState == ServiceState.RUNNING) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscribe exception", e);
+ }
+ }
+
+ public synchronized void subscribe(String topic, MessageSelector messageSelector)
+ throws MQClientException {
+ try {
+ if (topic == null || "".equals(topic)) {
+ throw new IllegalArgumentException("Topic can not be null or empty.");
+ }
+ setSubscriptionType(SubscriptionType.SUBSCRIBE);
+ if (messageSelector == null) {
+ subscribe(topic, SubscriptionData.SUB_ALL);
+ return;
+ }
+ SubscriptionData subscriptionData =
+ FilterAPI.build(
+ topic,
+ messageSelector.getExpression(),
+ messageSelector.getExpressionType());
+ this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+ this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
+ assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
+ if (serviceState == ServiceState.RUNNING) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ updateTopicSubscribeInfoWhenSubscriptionChanged();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscribe exception", e);
+ }
+ }
+
+ public synchronized void unsubscribe(final String topic) {
+ this.rebalanceImpl.getSubscriptionInner().remove(topic);
+ removePullTaskCallback(topic);
+ assignedMessageQueue.removeAssignedMessageQueue(topic);
+ }
+
+ public synchronized void assign(Collection messageQueues) {
+ if (messageQueues == null || messageQueues.isEmpty()) {
+ throw new IllegalArgumentException("Message queues can not be null or empty.");
+ }
+ setSubscriptionType(SubscriptionType.ASSIGN);
+ assignedMessageQueue.updateAssignedMessageQueue(messageQueues);
+ if (serviceState == ServiceState.RUNNING) {
+ updateAssignPullTask(messageQueues);
+ }
+ }
+
+ private void maybeAutoCommit() {
+ long now = System.currentTimeMillis();
+ if (now >= nextAutoCommitDeadline) {
+ commitAll();
+ nextAutoCommitDeadline = now + defaultLitePullConsumer.getAutoCommitIntervalMillis();
+ }
+ }
+
+ public synchronized List poll(long timeout) {
+ try {
+ checkServiceState();
+ if (timeout < 0) {
+ throw new IllegalArgumentException("Timeout must not be negative");
+ }
+
+ if (defaultLitePullConsumer.isAutoCommit()) {
+ maybeAutoCommit();
+ }
+ long endTime = System.currentTimeMillis() + timeout;
+
+ ConsumeRequest consumeRequest =
+ consumeRequestCache.poll(
+ endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+
+ if (endTime - System.currentTimeMillis() > 0) {
+ while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
+ consumeRequest =
+ consumeRequestCache.poll(
+ endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ if (endTime - System.currentTimeMillis() <= 0) {
+ break;
+ }
+ }
+ }
+
+ if (consumeRequest != null && !consumeRequest.getProcessQueue().isDropped()) {
+ List messages = consumeRequest.getMessageExts();
+ long offset = consumeRequest.getProcessQueue().removeMessage(messages);
+ assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+ // If namespace not null , reset Topic without namespace.
+ this.resetTopic(messages);
+ return messages;
+ }
+ } catch (InterruptedException ignore) {
+
+ }
+
+ return Collections.emptyList();
+ }
+
+ public synchronized void setSubExpressionForAssign(
+ final String topic, final String subExpression) {
+ if (StringUtils.isBlank(subExpression)) {
+ throw new IllegalArgumentException("subExpression can not be null or empty.");
+ }
+ if (serviceState != ServiceState.CREATE_JUST) {
+ throw new IllegalStateException("setAssignTag only can be called before start.");
+ }
+ setSubscriptionType(SubscriptionType.ASSIGN);
+ topicToSubExpression.put(topic, subExpression);
+ }
+
+ public void pause(Collection messageQueues) {
+ assignedMessageQueue.pause(messageQueues);
+ }
+
+ public void resume(Collection messageQueues) {
+ assignedMessageQueue.resume(messageQueues);
+ }
+
+ public synchronized void seek(MessageQueue messageQueue, long offset) throws MQClientException {
+ if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ throw new MQClientException(
+ "The message queue is not in assigned list, may be rebalancing, message queue: "
+ + messageQueue,
+ null);
+ } else {
+ throw new MQClientException(
+ "The message queue is not in assigned list, message queue: " + messageQueue,
+ null);
+ }
+ }
+ long minOffset = minOffset(messageQueue);
+ long maxOffset = maxOffset(messageQueue);
+ if (offset < minOffset || offset > maxOffset) {
+ throw new MQClientException(
+ "Seek offset illegal, seek offset = "
+ + offset
+ + ", min offset = "
+ + minOffset
+ + ", max offset = "
+ + maxOffset,
+ null);
+ }
+ final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
+ synchronized (objLock) {
+ clearMessageQueueInCache(messageQueue);
+
+ PullTaskImpl oldPullTaskImpl = this.taskTable.get(messageQueue);
+ if (oldPullTaskImpl != null) {
+ oldPullTaskImpl.tryInterrupt();
+ this.taskTable.remove(messageQueue);
+ }
+ assignedMessageQueue.setSeekOffset(messageQueue, offset);
+ if (!this.taskTable.containsKey(messageQueue)) {
+ PullTaskImpl pullTask = new PullTaskImpl(messageQueue);
+ this.taskTable.put(messageQueue, pullTask);
+ this.scheduledThreadPoolExecutor.schedule(pullTask, 0, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
+ long begin = minOffset(messageQueue);
+ this.seek(messageQueue, begin);
+ }
+
+ public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
+ long end = maxOffset(messageQueue);
+ this.seek(messageQueue, end);
+ }
+
+ private long maxOffset(MessageQueue messageQueue) throws MQClientException {
+ checkServiceState();
+ return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
+ }
+
+ private long minOffset(MessageQueue messageQueue) throws MQClientException {
+ checkServiceState();
+ return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
+ }
+
+ private void removePullTaskCallback(final String topic) {
+ removePullTask(topic);
+ }
+
+ private void removePullTask(final String topic) {
+ Iterator> it = this.taskTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry next = it.next();
+ if (next.getKey().getTopic().equals(topic)) {
+ next.getValue().setCancelled(true);
+ it.remove();
+ }
+ }
+ }
+
+ public synchronized void commitAll() {
+ for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
+ try {
+ commit(messageQueue);
+ } catch (Exception e) {
+ log.error("An error occurred when update consume offset Automatically.");
+ }
+ }
+ }
+
+ public synchronized void commit(final Set messageQueues, boolean persist) {
+ if (messageQueues == null || messageQueues.size() == 0) {
+ return;
+ }
+
+ for (MessageQueue messageQueue : messageQueues) {
+ commit(messageQueue);
+ }
+
+ if (persist) {
+ this.offsetStore.persistAll(messageQueues);
+ }
+ }
+
+ private synchronized void commit(MessageQueue messageQueue) {
+ long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
+
+ if (consumerOffset != -1) {
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null && !processQueue.isDropped()) {
+ updateConsumeOffset(messageQueue, consumerOffset);
+ }
+ } else {
+ log.error("consumerOffset is -1 in messageQueue [" + messageQueue + "].");
+ }
+ }
+
+ private void updatePullOffset(
+ MessageQueue messageQueue, long nextPullOffset, ProcessQueue processQueue) {
+ if (assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
+ assignedMessageQueue.updatePullOffset(messageQueue, nextPullOffset, processQueue);
+ }
+ }
+
+ private void submitConsumeRequest(ConsumeRequest consumeRequest) {
+ try {
+ consumeRequestCache.put(consumeRequest);
+ } catch (InterruptedException e) {
+ log.error("Submit consumeRequest error", e);
+ }
+ }
+
+ private long fetchConsumeOffset(MessageQueue messageQueue) throws MQClientException {
+ checkServiceState();
+ long offset = this.rebalanceImpl.computePullFromWhereWithException(messageQueue);
+ return offset;
+ }
+
+ public long committed(MessageQueue messageQueue) throws MQClientException {
+ checkServiceState();
+ long offset =
+ this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
+ if (offset == -2) {
+ throw new MQClientException("Fetch consume offset from broker exception", null);
+ }
+ return offset;
+ }
+
+ private void clearMessageQueueInCache(MessageQueue messageQueue) {
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+ if (processQueue != null) {
+ processQueue.clear();
+ }
+ Iterator iter = consumeRequestCache.iterator();
+ while (iter.hasNext()) {
+ if (iter.next().getMessageQueue().equals(messageQueue)) {
+ iter.remove();
+ }
+ }
+ }
+
+ private long nextPullOffset(MessageQueue messageQueue) throws MQClientException {
+ long offset = -1;
+ long seekOffset = assignedMessageQueue.getSeekOffset(messageQueue);
+ if (seekOffset != -1) {
+ offset = seekOffset;
+ assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
+ assignedMessageQueue.setSeekOffset(messageQueue, -1);
+ } else {
+ offset = assignedMessageQueue.getPullOffset(messageQueue);
+ if (offset == -1) {
+ offset = fetchConsumeOffset(messageQueue);
+ }
+ }
+ return offset;
+ }
+
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ checkServiceState();
+ return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
+ }
+
+ public class PullTaskImpl implements Runnable {
+ private final MessageQueue messageQueue;
+ private volatile boolean cancelled = false;
+ private Thread currentThread;
+
+ public PullTaskImpl(final MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+ public void tryInterrupt() {
+ setCancelled(true);
+ if (currentThread == null) {
+ return;
+ }
+ if (!currentThread.isInterrupted()) {
+ currentThread.interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+
+ if (!this.isCancelled()) {
+
+ this.currentThread = Thread.currentThread();
+
+ if (assignedMessageQueue.isPaused(messageQueue)) {
+ scheduledThreadPoolExecutor.schedule(
+ this, PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
+ log.debug("Message Queue: {} has been paused!", messageQueue);
+ return;
+ }
+
+ ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
+
+ if (null == processQueue || processQueue.isDropped()) {
+ log.info(
+ "The message queue not be able to poll, because it's dropped. group={}, messageQueue={}",
+ defaultLitePullConsumer.getConsumerGroup(),
+ this.messageQueue);
+ return;
+ }
+
+ if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize()
+ > defaultLitePullConsumer.getPullThresholdForAll()) {
+ scheduledThreadPoolExecutor.schedule(
+ this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}",
+ consumeRequestCache.size(),
+ consumeRequestFlowControlTimes);
+ }
+ return;
+ }
+
+ long cachedMessageCount = processQueue.getMsgCount().get();
+ long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
+
+ if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
+ scheduledThreadPoolExecutor.schedule(
+ this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+ defaultLitePullConsumer.getPullThresholdForQueue(),
+ processQueue.getMsgTreeMap().firstKey(),
+ processQueue.getMsgTreeMap().lastKey(),
+ cachedMessageCount,
+ cachedMessageSizeInMiB,
+ queueFlowControlTimes);
+ }
+ return;
+ }
+
+ if (cachedMessageSizeInMiB
+ > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
+ scheduledThreadPoolExecutor.schedule(
+ this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
+ defaultLitePullConsumer.getPullThresholdSizeForQueue(),
+ processQueue.getMsgTreeMap().firstKey(),
+ processQueue.getMsgTreeMap().lastKey(),
+ cachedMessageCount,
+ cachedMessageSizeInMiB,
+ queueFlowControlTimes);
+ }
+ return;
+ }
+
+ if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
+ scheduledThreadPoolExecutor.schedule(
+ this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+ if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(),
+ processQueue.getMsgTreeMap().lastKey(),
+ processQueue.getMaxSpan(),
+ queueMaxSpanFlowControlTimes);
+ }
+ return;
+ }
+
+ long offset = 0L;
+ try {
+ offset = nextPullOffset(messageQueue);
+ } catch (Exception e) {
+ log.error("Failed to get next pull offset", e);
+ scheduledThreadPoolExecutor.schedule(
+ this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
+ return;
+ }
+
+ if (this.isCancelled() || processQueue.isDropped()) {
+ return;
+ }
+ long pullDelayTimeMills = 0;
+ try {
+ SubscriptionData subscriptionData;
+ String topic = this.messageQueue.getTopic();
+ if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+ subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
+ } else {
+ String subExpression4Assign = topicToSubExpression.get(topic);
+ subExpression4Assign =
+ subExpression4Assign == null
+ ? SubscriptionData.SUB_ALL
+ : subExpression4Assign;
+ subscriptionData =
+ FilterAPI.buildSubscriptionData(topic, subExpression4Assign);
+ }
+
+ PullResult pullResult =
+ pull(
+ messageQueue,
+ subscriptionData,
+ offset,
+ defaultLitePullConsumer.getPullBatchSize());
+ if (this.isCancelled() || processQueue.isDropped()) {
+ return;
+ }
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
+ synchronized (objLock) {
+ if (pullResult.getMsgFoundList() != null
+ && !pullResult.getMsgFoundList().isEmpty()
+ && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
+ processQueue.putMessage(pullResult.getMsgFoundList());
+ submitConsumeRequest(
+ new ConsumeRequest(
+ pullResult.getMsgFoundList(),
+ messageQueue,
+ processQueue));
+ }
+ }
+ break;
+ case OFFSET_ILLEGAL:
+ log.warn("The pull request offset illegal, {}", pullResult.toString());
+ break;
+ default:
+ break;
+ }
+ updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
+ } catch (InterruptedException interruptedException) {
+ log.warn("Polling thread was interrupted.", interruptedException);
+ } catch (Throwable e) {
+ pullDelayTimeMills = pullTimeDelayMillsWhenException;
+ log.error("An error occurred in pull message process.", e);
+ }
+
+ if (!this.isCancelled()) {
+ scheduledThreadPoolExecutor.schedule(
+ this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
+ } else {
+ log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
+ }
+ }
+ }
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ public void setCancelled(boolean cancelled) {
+ this.cancelled = cancelled;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+ }
+
+ private PullResult pull(
+ MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return pull(
+ mq,
+ subscriptionData,
+ offset,
+ maxNums,
+ this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
+ }
+
+ private PullResult pull(
+ MessageQueue mq,
+ SubscriptionData subscriptionData,
+ long offset,
+ int maxNums,
+ long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
+ }
+
+ private PullResult pullSyncImpl(
+ MessageQueue mq,
+ SubscriptionData subscriptionData,
+ long offset,
+ int maxNums,
+ boolean block,
+ long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+
+ if (null == mq) {
+ throw new MQClientException("mq is null", null);
+ }
+
+ if (offset < 0) {
+ throw new MQClientException("offset < 0", null);
+ }
+
+ if (maxNums <= 0) {
+ throw new MQClientException("maxNums <= 0", null);
+ }
+
+ int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);
+
+ long timeoutMillis =
+ block
+ ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend()
+ : timeout;
+
+ boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
+ PullResult pullResult =
+ this.pullAPIWrapper.pullKernelImpl(
+ mq,
+ subscriptionData.getSubString(),
+ subscriptionData.getExpressionType(),
+ isTagType ? 0L : subscriptionData.getSubVersion(),
+ offset,
+ maxNums,
+ sysFlag,
+ 0,
+ this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(),
+ timeoutMillis,
+ CommunicationMode.SYNC,
+ null);
+ this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
+ if (!this.consumeMessageHookList.isEmpty()) {
+ ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
+ consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
+ consumeMessageContext.setConsumerGroup(this.groupName());
+ consumeMessageContext.setMq(mq);
+ consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
+ consumeMessageContext.setSuccess(false);
+ this.executeHookBefore(consumeMessageContext);
+ consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+ consumeMessageContext.setSuccess(true);
+ this.executeHookAfter(consumeMessageContext);
+ }
+ return pullResult;
+ }
+
+ private void resetTopic(List msgList) {
+ if (null == msgList || msgList.size() == 0) {
+ return;
+ }
+
+ // If namespace not null , reset Topic without namespace.
+ for (MessageExt messageExt : msgList) {
+ if (null != this.defaultLitePullConsumer.getNamespace()) {
+ messageExt.setTopic(
+ NamespaceUtil.withoutNamespace(
+ messageExt.getTopic(),
+ this.defaultLitePullConsumer.getNamespace()));
+ }
+ }
+ }
+
+ public void updateConsumeOffset(MessageQueue mq, long offset) {
+ checkServiceState();
+ this.offsetStore.updateOffset(mq, offset, false);
+ }
+
+ @Override
+ public String groupName() {
+ return this.defaultLitePullConsumer.getConsumerGroup();
+ }
+
+ @Override
+ public MessageModel messageModel() {
+ return this.defaultLitePullConsumer.getMessageModel();
+ }
+
+ @Override
+ public ConsumeType consumeType() {
+ return ConsumeType.CONSUME_ACTIVELY;
+ }
+
+ @Override
+ public ConsumeFromWhere consumeFromWhere() {
+ return ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
+ }
+
+ @Override
+ public Set subscriptions() {
+ Set subSet = new HashSet();
+
+ subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
+
+ return subSet;
+ }
+
+ @Override
+ public void doRebalance() {
+ if (this.rebalanceImpl != null) {
+ this.rebalanceImpl.doRebalance(false);
+ }
+ }
+
+ @Override
+ public void persistConsumerOffset() {
+ try {
+ checkServiceState();
+ Set mqs = new HashSet();
+ if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
+ Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
+ mqs.addAll(allocateMq);
+ } else if (this.subscriptionType == SubscriptionType.ASSIGN) {
+ Set assignedMessageQueue =
+ this.assignedMessageQueue.getAssignedMessageQueues();
+ mqs.addAll(assignedMessageQueue);
+ }
+ this.offsetStore.persistAll(mqs);
+ } catch (Exception e) {
+ log.error(
+ "Persist consumer offset error for group: {} ",
+ this.defaultLitePullConsumer.getConsumerGroup(),
+ e);
+ }
+ }
+
+ @Override
+ public void updateTopicSubscribeInfo(String topic, Set info) {
+ Map subTable = this.rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) {
+ if (subTable.containsKey(topic)) {
+ this.rebalanceImpl.getTopicSubscribeInfoTable().put(topic, info);
+ }
+ }
+ }
+
+ @Override
+ public boolean isSubscribeTopicNeedUpdate(String topic) {
+ Map subTable = this.rebalanceImpl.getSubscriptionInner();
+ if (subTable != null) {
+ if (subTable.containsKey(topic)) {
+ return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(topic);
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean isUnitMode() {
+ return this.defaultLitePullConsumer.isUnitMode();
+ }
+
+ @Override
+ public ConsumerRunningInfo consumerRunningInfo() {
+ ConsumerRunningInfo info = new ConsumerRunningInfo();
+
+ Properties prop = MixAll.object2Properties(this.defaultLitePullConsumer);
+ prop.put(
+ ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP,
+ String.valueOf(this.consumerStartTimestamp));
+ info.setProperties(prop);
+
+ info.getSubscriptionSet().addAll(this.subscriptions());
+ return info;
+ }
+
+ private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
+ }
+
+ public OffsetStore getOffsetStore() {
+ return offsetStore;
+ }
+
+ public DefaultLitePullConsumer getDefaultLitePullConsumer() {
+ return defaultLitePullConsumer;
+ }
+
+ public Set fetchMessageQueues(String topic) throws MQClientException {
+ checkServiceState();
+ Set result =
+ this.mQClientFactory.getMQAdminImpl().fetchSubscribeMessageQueues(topic);
+ return parseMessageQueues(result);
+ }
+
+ private synchronized void fetchTopicMessageQueuesAndCompare() throws MQClientException {
+ for (Map.Entry entry :
+ topicMessageQueueChangeListenerMap.entrySet()) {
+ String topic = entry.getKey();
+ TopicMessageQueueChangeListener topicMessageQueueChangeListener = entry.getValue();
+ Set oldMessageQueues = messageQueuesForTopic.get(topic);
+ Set newMessageQueues = fetchMessageQueues(topic);
+ boolean isChanged = !isSetEqual(newMessageQueues, oldMessageQueues);
+ if (isChanged) {
+ messageQueuesForTopic.put(topic, newMessageQueues);
+ if (topicMessageQueueChangeListener != null) {
+ topicMessageQueueChangeListener.onChanged(topic, newMessageQueues);
+ }
+ }
+ }
+ }
+
+ private boolean isSetEqual(Set set1, Set set2) {
+ if (set1 == null && set2 == null) {
+ return true;
+ }
+
+ if (set1 == null
+ || set2 == null
+ || set1.size() != set2.size()
+ || set1.size() == 0
+ || set2.size() == 0) {
+ return false;
+ }
+
+ Iterator iter = set2.iterator();
+ boolean isEqual = true;
+ while (iter.hasNext()) {
+ if (!set1.contains(iter.next())) {
+ isEqual = false;
+ }
+ }
+ return isEqual;
+ }
+
+ public synchronized void registerTopicMessageQueueChangeListener(
+ String topic, TopicMessageQueueChangeListener listener) throws MQClientException {
+ if (topic == null || listener == null) {
+ throw new MQClientException("Topic or listener is null", null);
+ }
+ if (topicMessageQueueChangeListenerMap.containsKey(topic)) {
+ log.warn(
+ "Topic {} had been registered, new listener will overwrite the old one", topic);
+ }
+ topicMessageQueueChangeListenerMap.put(topic, listener);
+ if (this.serviceState == ServiceState.RUNNING) {
+ Set messageQueues = fetchMessageQueues(topic);
+ messageQueuesForTopic.put(topic, messageQueues);
+ }
+ }
+
+ private Set parseMessageQueues(Set queueSet) {
+ Set resultQueues = new HashSet();
+ for (MessageQueue messageQueue : queueSet) {
+ String userTopic =
+ NamespaceUtil.withoutNamespace(
+ messageQueue.getTopic(), this.defaultLitePullConsumer.getNamespace());
+ resultQueues.add(
+ new MessageQueue(
+ userTopic, messageQueue.getBrokerName(), messageQueue.getQueueId()));
+ }
+ return resultQueues;
+ }
+
+ public class ConsumeRequest {
+ private final List messageExts;
+ private final MessageQueue messageQueue;
+ private final ProcessQueue processQueue;
+
+ public ConsumeRequest(
+ final List messageExts,
+ final MessageQueue messageQueue,
+ final ProcessQueue processQueue) {
+ this.messageExts = messageExts;
+ this.messageQueue = messageQueue;
+ this.processQueue = processQueue;
+ }
+
+ public List getMessageExts() {
+ return messageExts;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ public ProcessQueue getProcessQueue() {
+ return processQueue;
+ }
+ }
+
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index 97037a11..8e59096d 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.flink.legacy;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -161,7 +160,8 @@ public void open(Configuration parameters) throws Exception {
Validate.notEmpty(topic, "Consumer topic can not be empty");
Validate.notEmpty(group, "Consumer group can not be empty");
- String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG);
+ String tag =
+ props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
Validate.isTrue(
!(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
@@ -221,6 +221,15 @@ public void open(Configuration parameters) throws Exception {
String.valueOf(indexOfThisSubTask),
String.valueOf(System.nanoTime()));
consumer.setInstanceName(instanceName);
+
+ int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
+ consumer.setPullBatchSize(pullBatchSize);
+
+ if (StringUtils.isEmpty(sql)) {
+ consumer.setSubExpressionForAssign(topic, tag);
+ } else {
+ consumer.setSubExpressionForAssign(topic, sql);
+ }
consumer.start();
Counter outputCounter =
@@ -250,6 +259,8 @@ public void open(Configuration parameters) throws Exception {
RocketMQUtils.allocate(totalQueues, taskNumber, ctx.getIndexOfThisSubtask());
// If the job recovers from the state, the state has already contained the offsets of last
// commit.
+ consumer.assign(totalQueues);
+
if (!restored) {
initOffsets(messageQueues);
}
@@ -257,10 +268,6 @@ public void open(Configuration parameters) throws Exception {
@Override
public void run(SourceContext context) throws Exception {
- String sql = props.getProperty(RocketMQConfig.CONSUMER_SQL);
- String tag =
- props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
- int pullBatchSize = getInteger(props, CONSUMER_BATCH_SIZE, DEFAULT_CONSUMER_BATCH_SIZE);
timer.scheduleAtFixedRate(
() -> {
// context.emitWatermark(waterMarkPerQueue.getCurrentWatermark());
@@ -269,12 +276,7 @@ public void run(SourceContext context) throws Exception {
5,
5,
TimeUnit.SECONDS);
- if (StringUtils.isEmpty(sql)) {
- consumer.subscribe(topic, tag);
- } else {
- // pull with sql do not support block pull.
- consumer.subscribe(topic, MessageSelector.bySql(sql));
- }
+
for (MessageQueue mq : messageQueues) {
this.executor.execute(
() ->
@@ -283,8 +285,9 @@ public void run(SourceContext context) throws Exception {
while (runningChecker.isRunning()) {
try {
Long offset = offsetTable.get(mq);
- consumer.setPullBatchSize(pullBatchSize);
- consumer.seek(mq, offset);
+ if (offset != null) {
+ consumer.seek(mq, offset);
+ }
boolean found = false;
List messages =
consumer.poll(
diff --git a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
index bf489bb1..6e275d34 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/enumerator/RocketMQSourceEnumerator.java
@@ -236,6 +236,7 @@ private Set discoverAndInitializePartitionSplit()
new HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
Collection messageQueues = consumer.fetchMessageQueues(topic);
+ consumer.assign(messageQueues);
Set result = new HashSet<>();
for (MessageQueue messageQueue : messageQueues) {
Tuple3 topicPartition =
@@ -340,10 +341,12 @@ private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException {
switch (consumerOffsetMode) {
case CONSUMER_OFFSET_EARLIEST:
consumer.seekToBegin(mq);
- return -1;
+ offset = consumer.committed(mq);
+ break;
case CONSUMER_OFFSET_LATEST:
consumer.seekToEnd(mq);
- return -1;
+ offset = consumer.committed(mq);
+ break;
case CONSUMER_OFFSET_TIMESTAMP:
offset = consumer.offsetForTimestamp(mq, consumerOffsetTimestamp);
break;
diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
index 72fd96ef..8dc04001 100644
--- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
+++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java
@@ -21,7 +21,6 @@
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -122,6 +121,7 @@ public RecordsWithSplitIds> fetch() throws IOException {
Collection messageQueues;
try {
messageQueues = consumer.fetchMessageQueues(topic);
+ consumer.assign(messageQueues);
} catch (MQClientException e) {
LOG.error(
String.format(
@@ -330,12 +330,13 @@ private void initialRocketMQConsumer(
topic,
consumerGroup,
"" + System.nanoTime()));
- consumer.start();
- if (StringUtils.isNotEmpty(sql)) {
- consumer.subscribe(topic, MessageSelector.bySql(sql));
+
+ if (StringUtils.isEmpty(sql)) {
+ consumer.setSubExpressionForAssign(topic, tag);
} else {
- consumer.subscribe(topic, tag);
+ consumer.setSubExpressionForAssign(topic, sql);
}
+ consumer.start();
} catch (MQClientException e) {
LOG.error("Failed to initial RocketMQ consumer.", e);
consumer.shutdown();