
Commit 67fa428

Update streaming consumer to commit state (#179)

* Commit Consumer State
* Code cleanup
* Update changelog

1 parent bef8270 commit 67fa428
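
For readers new to Kafka, "committing consumer state" means writing the consumer group's current offsets back to Kafka, so a consumer restarted with the same group id resumes from where the previous run stopped instead of falling back to its configured starting position. The sketch below illustrates the idea against the plain Apache Kafka client; the broker address, group id, topic name, and callback logging are illustrative placeholders, not values taken from this commit.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public final class CommitStateSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");             // illustrative group id
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // commit explicitly below
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));     // illustrative topic
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (final ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + ":" + record.offset() + " " + record.value());
                }

                // Persist the group's position without blocking the poll loop;
                // a restarted consumer with the same group id resumes from these offsets.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Offset commit failed: " + exception.getMessage());
                    }
                });
            }
        }
    }

The diff in this commit uses the no-argument commitAsync() form, which behaves the same way but discards any commit error instead of passing it to a callback.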

2 files changed: 8 additions & 5 deletions


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 #### Bug Fixes
 - [ISSUE-175](https://github.com/SourceLabOrg/kafka-webview/issues/175) Update multi-threaded consumers with unique consumerId [PR](https://github.com/SourceLabOrg/kafka-webview/pull/176).
 - [PR-178](https://github.com/SourceLabOrg/kafka-webview/pull/178) @[lucrito](https://github.com/lucrito) fixed shebang in start.sh script. Thanks!
+- [PR-179](https://github.com/SourceLabOrg/kafka-webview/pull/179) Streaming consumer now persists consumer state.

 ## 2.3.0 (06/19/2019)
 #### New Features

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/SocketKafkaConsumer.java

Lines changed: 7 additions & 5 deletions
@@ -61,7 +61,7 @@ public class SocketKafkaConsumer implements Runnable {
     // get a large flood of messages, which maybe is a good thing? Or bad? Unsure.
     private static final int maxQueueCapacity = 25;

-    private final KafkaConsumer kafkaConsumer;
+    private final KafkaConsumer<?,?> kafkaConsumer;
     private final ClientConfig clientConfig;
     private final Duration pollTimeoutDuration = Duration.ofMillis(POLL_TIMEOUT_MS);
     private final BlockingQueue<KafkaResult> outputQueue;
@@ -74,7 +74,7 @@ public class SocketKafkaConsumer implements Runnable {
      * @param clientConfig The client's configuration.
      */
     public SocketKafkaConsumer(
-        final KafkaConsumer kafkaConsumer,
+        final KafkaConsumer<?,?> kafkaConsumer,
         final ClientConfig clientConfig) {

         this.kafkaConsumer = kafkaConsumer;
@@ -109,7 +109,7 @@ public void run() {

         do {
             // Start trying to consume messages from kafka
-            final ConsumerRecords consumerRecords = kafkaConsumer.poll(pollTimeoutDuration);
+            final ConsumerRecords<?,?> consumerRecords = kafkaConsumer.poll(pollTimeoutDuration);

             // If no records found
             if (consumerRecords.isEmpty()) {
@@ -121,7 +121,7 @@ public void run() {
             }

             // Push messages onto output queue
-            for (final ConsumerRecord consumerRecord : (Iterable<ConsumerRecord>) consumerRecords) {
+            for (final ConsumerRecord consumerRecord : consumerRecords) {
                 // Translate record
                 final KafkaResult kafkaResult = new KafkaResult(
                     consumerRecord.partition(),
@@ -141,6 +141,9 @@ public void run() {
                 }
             }

+            // Commit state async.
+            kafkaConsumer.commitAsync();
+
             // Sleep for a bit
             sleep(DWELL_TIME_MS);
         }
@@ -231,7 +234,6 @@ private void seekToTimestamp(final long timestamp) {
     /**
      * Seek to the specified offsets.
      * @param partitionOffsetMap Map of PartitionId to Offset to seek to.
-     * @return ConsumerState representing the consumer's positions.
     */
     private void seek(final Map<TopicPartition, Long> partitionOffsetMap) {
         for (final Map.Entry<TopicPartition, Long> entry: partitionOffsetMap.entrySet()) {
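
Taken together, the patched run() loop follows the standard poll, translate, commitAsync, sleep pattern. The condensed sketch below mirrors that shape with the wildcard-typed consumer introduced by this change; the stop flag, queue payload, and timeouts are simplified stand-ins, not the actual SocketKafkaConsumer implementation.

    import java.time.Duration;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    final class PollLoopSketch implements Runnable {
        // Wildcard generics instead of raw types, as in this change.
        private final KafkaConsumer<?, ?> kafkaConsumer;
        // Bounded queue so a slow downstream reader applies back-pressure to the poll loop.
        private final BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(25);
        private volatile boolean requestStop = false;   // illustrative stop flag

        PollLoopSketch(final KafkaConsumer<?, ?> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
        }

        @Override
        public void run() {
            do {
                // Poll a batch of records from the assigned partitions.
                final ConsumerRecords<?, ?> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(100));

                // ConsumerRecords is Iterable, so no raw-type cast is needed in the loop header.
                for (final ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
                    try {
                        // Stand-in for translating the record into a KafkaResult.
                        outputQueue.put(consumerRecord.partition() + ":" + consumerRecord.offset());
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                        requestStop = true;
                    }
                }

                // Commit the group's offsets asynchronously, the step added by this commit,
                // so the consumer's position survives a restart.
                kafkaConsumer.commitAsync();

                // Brief dwell between polls, mirroring the original loop's sleep.
                try {
                    Thread.sleep(300L);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    requestStop = true;
                }
            }
            while (!requestStop);

            kafkaConsumer.close();
        }
    }

Using commitAsync() rather than commitSync() keeps the streaming loop responsive: the commit request is sent in the background and the next poll proceeds immediately.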
