Skip to content

Commit 3db9375

Browse files
authored
[ISSUE-175] Make consumerId unique per thread (#176)
* [ISSUE-175] Make consumerId unique per thread * Resolve failing tests, fix poor naming * Update API test * Update release version, Update Changelog * fix confusing labels
1 parent 1f18abb commit 3db9375

File tree

15 files changed

+75
-50
lines changed

15 files changed

+75
-50
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
The format is based on [Keep a Changelog](http://keepachangelog.com/)
33
and this project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## 2.3.1 (UNRELEASED)
6+
#### Bug Fixes
7+
- [ISSUE-175](https://github.com/SourceLabOrg/kafka-webview/issues/175) Update multi-threaded consumers with unique consumerId [PR](https://github.com/SourceLabOrg/kafka-webview/pull/176).
8+
59
## 2.3.0 (06/19/2019)
610
#### New Features
711
- [ISSUE-166](https://github.com/SourceLabOrg/kafka-webview/issues/166) Add groupSearchFilter property to specify the filter used to list LDAP group membership. Thanks for the contribution @[BlueIcarus](https://github.com/BlueIcarus)!

dev-cluster/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
<parent>
66
<artifactId>kafka-webview</artifactId>
77
<groupId>org.sourcelab</groupId>
8-
<version>2.3.0</version>
8+
<version>2.3.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111

1212
<artifactId>dev-cluster</artifactId>
13-
<version>2.3.0</version>
13+
<version>2.3.1</version>
1414

1515
<!-- Require Maven 3.3.9 -->
1616
<prerequisites>

kafka-webview-plugin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>org.sourcelab</groupId>
77
<artifactId>kafka-webview</artifactId>
8-
<version>2.3.0</version>
8+
<version>2.3.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<artifactId>kafka-webview-plugin</artifactId>

kafka-webview-ui/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
<parent>
66
<artifactId>kafka-webview</artifactId>
77
<groupId>org.sourcelab</groupId>
8-
<version>2.3.0</version>
8+
<version>2.3.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<artifactId>kafka-webview-ui</artifactId>
12-
<version>2.3.0</version>
12+
<version>2.3.1</version>
1313

1414
<!-- Module Description and Ownership -->
1515
<name>Kafka WebView UI</name>

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/controller/cluster/ClusterController.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,10 @@ public String readTopic(
168168
/**
169169
* GET Displays info about a specific consumers group in a cluster.
170170
*/
171-
@RequestMapping(path = "/{clusterId}/consumer/{consumerId:.+}", method = RequestMethod.GET)
171+
@RequestMapping(path = "/{clusterId}/consumer/{groupId:.+}", method = RequestMethod.GET)
172172
public String readConsumer(
173173
@PathVariable final Long clusterId,
174-
@PathVariable final String consumerId,
174+
@PathVariable final String groupId,
175175
final Model model,
176176
final RedirectAttributes redirectAttributes) {
177177

@@ -182,12 +182,12 @@ public String readConsumer(
182182
return "redirect:/";
183183
}
184184
model.addAttribute("cluster", cluster);
185-
model.addAttribute("consumerId", consumerId);
185+
model.addAttribute("groupId", groupId);
186186

187187
// Setup breadcrumbs
188188
setupBreadCrumbs(model)
189189
.addCrumb(cluster.getName(), "/cluster/" + clusterId)
190-
.addCrumb("Consumer " + consumerId, null);
190+
.addCrumb("Consumer Group " + groupId, null);
191191

192192

193193
// Display template

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaClientConfigUtil.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,17 +86,22 @@ private Map<String, Object> applyCommonSettings(
8686
final String consumerId,
8787
final Map<String, Object> config
8888
) {
89-
// Generate consumerId with our configured static prefix.
90-
final String prefixedConsumerId = consumerIdPrefix.concat("-").concat(consumerId);
89+
// Generate groupId with our configured static prefix.
90+
final String prefixedGroupId = consumerIdPrefix.concat("-").concat(consumerId);
91+
92+
// Generate consumerId, which should be unique per user and thread.
93+
final String prefixedConsumerId = prefixedGroupId.concat("-") + Thread.currentThread().getId();
9194

9295
// Set common config items
9396
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterConfig.getConnectString());
9497
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
9598

96-
// ClientId and ConsumerGroupId are intended to be unique for each user session.
99+
// groupId is intended to be unique for each user session.
100+
// clientId is intended to be unique per user session and thread.
97101
// See Issue-57 https://github.com/SourceLabOrg/kafka-webview/issues/57#issuecomment-363508531
102+
// See Issue-175 https://github.com/SourceLabOrg/kafka-webview/issues/175
98103
config.put(ConsumerConfig.CLIENT_ID_CONFIG, prefixedConsumerId);
99-
config.put(ConsumerConfig.GROUP_ID_CONFIG, prefixedConsumerId);
104+
config.put(ConsumerConfig.GROUP_ID_CONFIG, prefixedGroupId);
100105

101106
// Optionally configure SSL
102107
applySslSettings(clusterConfig, config);

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/KafkaOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ public List<ConsumerGroupDetails> getConsumerGroupDetails(final List<String> con
440440
});
441441

442442
// Sort list by consumer group id
443-
consumerGroupDetails.sort(Comparator.comparing(ConsumerGroupDetails::getConsumerId));
443+
consumerGroupDetails.sort(Comparator.comparing(ConsumerGroupDetails::getGroupId));
444444

445445
// Return immutable list.
446446
return Collections.unmodifiableList(consumerGroupDetails);

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/ParallelWebKafkaConsumer.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public ParallelWebKafkaConsumer(
8888

8989
@Override
9090
public KafkaResults consumePerPartition() {
91-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
91+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
9292
final List<TopicPartition> allTopicPartitions = getAllPartitions(kafkaConsumer);
9393

9494
// To preserve order
@@ -131,7 +131,7 @@ public KafkaResults consumePerPartition() {
131131

132132
@Override
133133
public ConsumerState seek(final Map<Integer, Long> partitionOffsetMap) {
134-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
134+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
135135
for (final Map.Entry<Integer, Long> entry : partitionOffsetMap.entrySet()) {
136136
kafkaConsumer.seek(
137137
new TopicPartition(clientConfig.getTopicConfig().getTopicName(), entry.getKey()),
@@ -145,7 +145,7 @@ public ConsumerState seek(final Map<Integer, Long> partitionOffsetMap) {
145145

146146
@Override
147147
public ConsumerState seek(final long timestamp) {
148-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
148+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
149149
// Find offsets for timestamp
150150
final Map<TopicPartition, Long> timestampMap = new HashMap<>();
151151
for (final TopicPartition topicPartition : getAllPartitions(kafkaConsumer)) {
@@ -173,7 +173,7 @@ public void close() {
173173

174174
@Override
175175
public void previous() {
176-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
176+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
177177
// Get all available partitions
178178
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);
179179

@@ -203,7 +203,7 @@ public void previous() {
203203

204204
@Override
205205
public void next() {
206-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
206+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
207207
// Get all available partitions
208208
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);
209209

@@ -231,7 +231,7 @@ public void next() {
231231

232232
@Override
233233
public ConsumerState toHead() {
234-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
234+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
235235
// Get all available partitions
236236
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);
237237

@@ -253,7 +253,7 @@ public ConsumerState toHead() {
253253

254254
@Override
255255
public ConsumerState toTail() {
256-
try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
256+
try (final KafkaConsumer kafkaConsumer = createNewConsumerAndSubscribeAllPartitions()) {
257257
// Get all available partitions
258258
final List<TopicPartition> topicPartitions = getAllPartitions(kafkaConsumer);
259259

@@ -274,7 +274,20 @@ public ConsumerState toTail() {
274274
}
275275
}
276276

277+
/**
278+
* Creates a new consumer, but does NOT subscribe to any partitions.
279+
* Will be required to subscribe to the partitions manually you require.
280+
* @return KafkaConsumer
281+
*/
277282
private KafkaConsumer createNewConsumer() {
283+
return kafkaConsumerFactory.createConsumer(clientConfig);
284+
}
285+
286+
/**
287+
* Creates a new consumer AND subscribes to all partitions.
288+
* @return KafkaConsumer
289+
*/
290+
private KafkaConsumer createNewConsumerAndSubscribeAllPartitions() {
278291
return kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig);
279292
}
280293

kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/dto/ConsumerGroupDetails.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* Details about a specific consumer group.
3535
*/
3636
public class ConsumerGroupDetails {
37-
private final String consumerId;
37+
private final String groupId;
3838
private final boolean isSimple;
3939
private final String partitionAssignor;
4040
private final String state;
@@ -43,30 +43,30 @@ public class ConsumerGroupDetails {
4343

4444
/**
4545
* Constructor.
46-
* @param consumerId consumer group id.
46+
* @param groupId consumer group id.
4747
* @param isSimple if its a simple consumer group
4848
* @param partitionAssignor How partitions are assigned
4949
* @param state state of consumer
5050
* @param members members in the group.
5151
* @param coordinator node that is acting as the coordinator for this group.
5252
*/
5353
public ConsumerGroupDetails(
54-
final String consumerId,
54+
final String groupId,
5555
final boolean isSimple,
5656
final String partitionAssignor,
5757
final String state,
5858
final List<Member> members,
5959
final NodeDetails coordinator) {
60-
this.consumerId = consumerId;
60+
this.groupId = groupId;
6161
this.isSimple = isSimple;
6262
this.partitionAssignor = partitionAssignor;
6363
this.members = Collections.unmodifiableList(new ArrayList<>(members));
6464
this.state = state;
6565
this.coordinator = coordinator;
6666
}
6767

68-
public String getConsumerId() {
69-
return consumerId;
68+
public String getGroupId() {
69+
return groupId;
7070
}
7171

7272
public boolean isSimple() {
@@ -142,7 +142,7 @@ public String toString() {
142142
@Override
143143
public String toString() {
144144
return "ConsumerGroupDetails{"
145-
+ "consumerId='" + consumerId + '\''
145+
+ "groupId='" + groupId + '\''
146146
+ ", isSimple=" + isSimple
147147
+ ", partitionAssignor='" + partitionAssignor + '\''
148148
+ ", state='" + state + '\''

kafka-webview-ui/src/main/resources/templates/cluster/read.html

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@
154154

155155
jQuery.each(ClusterInfo.cachedConsumers, function(index, result) {
156156
// Filter
157-
if (!SearchTools.doesMatchText(searchStr, result.consumerId)) {
157+
if (!SearchTools.doesMatchText(searchStr, result.groupId)) {
158158
// Skip
159159
return;
160160
}
@@ -175,8 +175,8 @@
175175

176176
// Generate html from template
177177
var properties = {
178-
id: result.consumerId,
179-
encodedId: encodeURIComponent(result.consumerId),
178+
id: result.groupId,
179+
encodedId: encodeURIComponent(result.groupId),
180180
topic: uniqueTopics,
181181
state: result.state,
182182
numberOfMembers: result.members.length,
@@ -214,11 +214,11 @@
214214
ClusterInfo.handleAllConsumers(results);
215215
});
216216
},
217-
removeConsumer: function(consumerId) {
217+
removeConsumer: function(groupId) {
218218
if (!confirm('Are you sure you want to remove this consumer group?')) {
219219
return;
220220
}
221-
ApiClient.removeConsumer(ClusterInfo.clusterId, consumerId, function(result) {
221+
ApiClient.removeConsumer(ClusterInfo.clusterId, groupId, function(result) {
222222
UITools.showSuccess('Successfully remove consumer group.');
223223

224224
// Display success

0 commit comments

Comments
 (0)