Commit 89d3983

[ISSUE-184] Update consumer details UI to support consumers subscribed to multiple topics (#192)
* Refactor backend consumer code to support multiple topics
* Update UI to allow selecting which topic to view graphs/metrics for
* Code cleanup
* Rework test cases and return values
* Gracefully handle when a consumer group has no topics subscribed
* Resolve test failure
1 parent 56e9ff7 commit 89d3983

File tree

15 files changed: +846 −246 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 ## 2.4.0 (07/02/2019)
 #### New Features
 - [PR-180](https://github.com/SourceLabOrg/kafka-webview/issues/180) Consumer Group page now shows average rate of consumption per partition.
+- [ISSUE-184](https://github.com/SourceLabOrg/kafka-webview/issues/184) Cluster Kafka Consumer View for multiple topics.
 
 #### Bug Fixes
 - [ISSUE-175](https://github.com/SourceLabOrg/kafka-webview/issues/175) Update multi-threaded consumers with unique consumerId [PR](https://github.com/SourceLabOrg/kafka-webview/pull/176).

dev-cluster/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -52,7 +52,7 @@
         <dependency>
             <groupId>com.salesforce.kafka.test</groupId>
             <artifactId>kafka-junit-core</artifactId>
-            <version>3.1.1</version>
+            <version>3.1.2</version>
         </dependency>
 
         <!-- Include Kafka 1.1.x -->

dev-cluster/src/main/java/org/sourcelab/kafka/devcluster/DevCluster.java

Lines changed: 37 additions & 25 deletions
@@ -47,7 +47,12 @@
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -146,15 +151,11 @@ public static void main(final String[] args) throws Exception {
         if (topicNames != null && topicNames.length > 0) {
             final KafkaTestUtils testUtils = new KafkaTestUtils(kafkaTestCluster);
             if (cmd.hasOption("consumer")) {
-                for (final String topicName : topicNames) {
-                    runEndlessConsumer(topicName, testUtils);
-                }
+                runEndlessConsumer(Arrays.asList(topicNames), testUtils);
             }
 
             if (cmd.hasOption("producer")) {
-                for (final String topicName : topicNames) {
-                    runEndlessProducer(topicName, testUtils);
-                }
+                runEndlessProducer(Arrays.asList(topicNames), testUtils);
             }
         }
 
@@ -237,46 +238,57 @@ private static CommandLine parseArguments(final String[] args) throws ParseException {
 
     /**
      * Fire up a new thread running an endless producer script into the given topic and partitions.
-     * @param topicName Name of the topic to produce records into.
+     * @param topicNames Names of the topics to produce records into.
      * @param utils KafkaUtils instance.
      */
     private static void runEndlessProducer(
-        final String topicName,
+        final Collection<String> topicNames,
         final KafkaTestUtils utils
     ) {
         final Thread producerThread = new Thread(() -> {
-            // Determine how many partitions there are
-            final TopicDescription topicDescription = utils.describeTopic(topicName);
+            final Map<String, TopicDescription> topicDescriptions = new HashMap<>();
+
+            // Gather details about topics
+            for (final String topicName : topicNames) {
+                // Determine how many partitions there are
+                topicDescriptions.put(topicName, utils.describeTopic(topicName));
+            }
 
             do {
                 // Publish some data into that topic for each partition.
-                for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
-                    utils.produceRecords(1000, topicName, partitionInfo.partition());
+                for (final Map.Entry<String, TopicDescription> entry : topicDescriptions.entrySet()) {
+                    final String topicName = entry.getKey();
+                    final TopicDescription topicDescription = entry.getValue();
+
+                    for (final TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
+                        utils.produceRecords(1000, topicName, partitionInfo.partition());
+                        try {
+                            Thread.sleep(1000L);
+                        } catch (InterruptedException e) {
+                            return;
+                        }
+                    }
                     try {
-                        Thread.sleep(1000L);
+                        Thread.sleep(5000L);
                     } catch (InterruptedException e) {
                         return;
                     }
                 }
-                try {
-                    Thread.sleep(5000L);
-                } catch (InterruptedException e) {
-                    return;
-                }
+
             } while (true);
         });
 
-        logger.info("Starting endless producer for topic {}", topicName);
-        producerThread.setName("Endless producer for topic " + topicName);
+        logger.info("Starting endless producer for topics {}", topicNames);
+        producerThread.setName("Endless producer for topics " + topicNames);
         producerThread.start();
     }
 
     /**
      * Fire up a new thread running an endless consumer script that reads from the given topics.
-     * @param topicName Topic to consume from.
+     * @param topicNames Topics to consume from.
      * @param utils KafkaUtils instance.
      */
-    private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
+    private static void runEndlessConsumer(final Collection<String> topicNames, final KafkaTestUtils utils) {
         final Thread consumerThread = new Thread(() -> {
             // Start a consumer
             final Properties properties = new Properties();
@@ -286,7 +298,7 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
             try (final KafkaConsumer<String, String> consumer
                 = utils.getKafkaConsumer(StringDeserializer.class, StringDeserializer.class, properties)) {
 
-                consumer.subscribe(Collections.singleton(topicName));
+                consumer.subscribe(topicNames);
                 do {
                     final ConsumerRecords<String, String> records = consumer.poll(1000);
                     consumer.commitSync();
@@ -305,8 +317,8 @@ private static void runEndlessConsumer(final String topicName, final KafkaTestUtils utils) {
             }
         });
 
-        logger.info("Starting endless consumer for topic {}", topicName);
-        consumerThread.setName("Endless consumer for topic " + topicName);
+        logger.info("Starting endless consumer for topics {}", topicNames);
+        consumerThread.setName("Endless consumer for topics " + topicNames);
         consumerThread.start();
     }
 }
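The enabler for this refactor is that Kafka's consumer API natively accepts a collection of topics, so the old one-thread-per-topic loop collapses into a single subscription. A minimal standalone sketch of the same pattern outside the test-cluster harness (the broker address, topic names, and group id below are placeholders, not values from this commit):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class MultiTopicConsumerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "dev-consumer-group");      // placeholder group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // One consumer instance, one group id, any number of topics.
            consumer.subscribe(Arrays.asList("TopicA", "TopicB"));

            while (true) {
                // poll(long) matches the signature used in the diff above.
                final ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (final ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d @ offset %d%n",
                        record.topic(), record.partition(), record.offset());
                }
                consumer.commitSync();
            }
        }
    }
}
```

A single group id spanning several topics is exactly the situation the consumer-details UI now has to render, which is why the backend DTO changes below stop assuming one topic per group.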

kafka-webview-ui/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -218,7 +218,7 @@
         <dependency>
             <groupId>com.salesforce.kafka.test</groupId>
             <artifactId>kafka-junit4</artifactId>
-            <version>3.1.1</version>
+            <version>3.1.2</version>
             <scope>test</scope>
         </dependency>
         <dependency>

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/api/ApiController.java

Lines changed: 2 additions & 1 deletion
@@ -554,7 +554,8 @@ public ConsumerGroupOffsets getConsumerOffsets(
         final Cluster cluster = retrieveClusterById(id);
 
         try (final KafkaOperations operations = createOperationsClient(cluster)) {
-            return operations.getConsumerGroupOffsets(consumerGroupId);
+            final ConsumerGroupOffsets offsets = operations.getConsumerGroupOffsets(consumerGroupId);
+            return offsets;
         } catch (final Exception exception) {
             throw new ApiException("ClusterNodes", exception);
         }

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java

Lines changed: 30 additions & 24 deletions
@@ -50,6 +50,7 @@
 import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupIdentifier;
 import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsets;
 import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupOffsetsWithTailPositions;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerGroupTopicOffsets;
 import org.sourcelab.kafka.webview.ui.manager.kafka.dto.CreateTopic;
 import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeDetails;
 import org.sourcelab.kafka.webview.ui.manager.kafka.dto.NodeList;
@@ -455,27 +456,26 @@ public List<ConsumerGroupDetails> getConsumerGroupDetails(final List<String> con
      */
     public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId) {
         try {
+            // Create new builder
+            final ConsumerGroupOffsets.Builder builder = ConsumerGroupOffsets.newBuilder()
+                .withConsumerId(consumerGroupId);
+
             // Make request
             final ListConsumerGroupOffsetsResult results = adminClient.listConsumerGroupOffsets(consumerGroupId);
 
-            final List<PartitionOffset> offsetList = new ArrayList<>();
-            String topic = null;
-
             // Iterate over results
             final Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> partitionsToOffsets = results
                 .partitionsToOffsetAndMetadata()
                 .get();
 
+            // Loop over results
             for (final Map.Entry<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> entry : partitionsToOffsets.entrySet()) {
-                offsetList.add(new PartitionOffset(
-                    entry.getKey().partition(), entry.getValue().offset()
-                ));
-                if (topic == null) {
-                    topic = entry.getKey().topic();
-                }
+                builder.withOffset(
+                    entry.getKey().topic(), entry.getKey().partition(), entry.getValue().offset()
+                );
             }
 
-            return new ConsumerGroupOffsets(consumerGroupId, topic, offsetList);
+            return builder.build();
         } catch (final InterruptedException | ExecutionException e) {
             throw new RuntimeException(e.getMessage(), e);
         }
@@ -490,24 +490,30 @@ public ConsumerGroupOffsets getConsumerGroupOffsets(final String consumerGroupId
      */
     public ConsumerGroupOffsetsWithTailPositions getConsumerGroupOffsetsWithTailOffsets(final String consumerGroupId) {
         final ConsumerGroupOffsets consumerGroupOffsets = getConsumerGroupOffsets(consumerGroupId);
-        final TailOffsets tailOffsets = getTailOffsets(consumerGroupOffsets.getTopic(), consumerGroupOffsets.getPartitions());
 
-        final List<PartitionOffsetWithTailPosition> offsetsWithPartitions = new ArrayList<>();
+        // Create builder
+        final ConsumerGroupOffsetsWithTailPositions.Builder builder = ConsumerGroupOffsetsWithTailPositions.newBuilder()
+            .withConsumerId(consumerGroupId)
+            .withTimestamp(System.currentTimeMillis());
 
-        for (final PartitionOffset entry : tailOffsets.getOffsets()) {
-            offsetsWithPartitions.add(new PartitionOffsetWithTailPosition(
-                entry.getPartition(),
-                consumerGroupOffsets.getOffsetForPartition(entry.getPartition()),
-                entry.getOffset()
-            ));
+        // Loop over each topic
+        for (final String topicName : consumerGroupOffsets.getTopicNames()) {
+            final ConsumerGroupTopicOffsets topicOffsets = consumerGroupOffsets.getOffsetsForTopic(topicName);
+
+            // Retrieve tail offsets
+            final TailOffsets tailOffsets = getTailOffsets(topicName, topicOffsets.getPartitions());
+
+            // Build offsets with partitions
+            for (final PartitionOffset entry : tailOffsets.getOffsets()) {
+                builder.withOffsetsForTopic(topicName, new PartitionOffsetWithTailPosition(
+                    entry.getPartition(),
+                    topicOffsets.getOffsetForPartition(entry.getPartition()),
+                    entry.getOffset()
+                ));
+            }
         }
 
-        return new ConsumerGroupOffsetsWithTailPositions(
-            consumerGroupId,
-            consumerGroupOffsets.getTopic(),
-            offsetsWithPartitions,
-            System.currentTimeMillis()
-        );
+        return builder.build();
     }
 
     /**