From b893a363dd607b04b6346ab784913f615b2ae3f3 Mon Sep 17 00:00:00 2001 From: German Osin Date: Mon, 1 Dec 2025 16:21:33 +0100 Subject: [PATCH 1/4] BE: #1374 lag endpoints --- .../controller/ConsumerGroupsController.java | 30 +++++++++ .../kafbat/ui/mapper/ConsumerGroupMapper.java | 19 ++++++ .../ui/model/InternalConsumerGroup.java | 18 +---- .../ui/service/ConsumerGroupService.java | 65 +++++++++++++++++++ .../io/kafbat/ui/util/ConsumerGroupUtil.java | 27 ++++++++ contract-typespec/api/consumer-groups.tsp | 20 ++++++ 6 files changed, 163 insertions(+), 16 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 9e954575d..332be2de2 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -12,6 +12,7 @@ import io.kafbat.ui.model.ConsumerGroupDetailsDTO; import io.kafbat.ui.model.ConsumerGroupOffsetsResetDTO; import io.kafbat.ui.model.ConsumerGroupOrderingDTO; +import io.kafbat.ui.model.ConsumerGroupsLagResponseDTO; import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO; import io.kafbat.ui.model.PartitionOffsetDTO; import io.kafbat.ui.model.SortOrderDTO; @@ -20,9 +21,12 @@ import io.kafbat.ui.service.ConsumerGroupService; import io.kafbat.ui.service.OffsetsResetService; import io.kafbat.ui.service.mcp.McpTool; +import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -95,6 +99,32 @@ public Mono> getConsumerGroup(String clu .doOnEach(sig -> audit(context, sig)); } + @Override + public Mono> getConsumerGroupsLag(String clusterName, + List groupNames, + Long lastUpdate, + ServerWebExchange exchange) { + + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("getConsumerGroupsLag") + .build(); + + Mono> result = + consumerGroupService.getConsumerGroupsLag(getCluster(clusterName), groupNames, Optional.ofNullable(lastUpdate)) + .flatMapMany(m -> Flux.fromIterable(m.entrySet())) + .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.getKey(), clusterName)) + .collectList() + .map(l -> l.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .map(m -> new ConsumerGroupsLagResponseDTO(Instant.now().toEpochMilli(), m)) + .map(ResponseEntity::ok) + .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); + + return validateAccess(context) + .then(result) + .doOnEach(sig -> audit(context, sig)); + } + @Override public Mono>> getTopicConsumerGroups(String clusterName, String topicName, diff --git a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java index 800eab757..a4c2df95c 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java @@ -1,16 +1,25 @@ package io.kafbat.ui.mapper; +import io.kafbat.ui.api.model.ConsumerGroupLag; +import io.kafbat.ui.api.model.ConsumerGroupState; import io.kafbat.ui.model.BrokerDTO; import io.kafbat.ui.model.ConsumerGroupDTO; import io.kafbat.ui.model.ConsumerGroupDetailsDTO; +import io.kafbat.ui.model.ConsumerGroupLagDTO; import io.kafbat.ui.model.ConsumerGroupStateDTO; import io.kafbat.ui.model.ConsumerGroupTopicPartitionDTO; import io.kafbat.ui.model.InternalConsumerGroup; import io.kafbat.ui.model.InternalTopicConsumerGroup; +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -76,6 +85,16 @@ public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) { return details; } + public static ConsumerGroupLagDTO toDto(ScrapedClusterState.ConsumerGroupState state) { + + Set topicPartitions = Stream.concat( + state.description().members().stream() + .flatMap(m -> m.assignment().topicPartitions().stream()), + state.committedOffsets().keySet().stream() + ).collect(Collectors.toSet()); + + } + private static T convertToConsumerGroup( InternalConsumerGroup c, T consumerGroup) { consumerGroup.setGroupId(c.getGroupId()); diff --git a/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java b/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java index 1f5246666..3954691f8 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java @@ -1,5 +1,7 @@ package io.kafbat.ui.model; +import static io.kafbat.ui.util.ConsumerGroupUtil.calculateConsumerLag; + import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -56,22 +58,6 @@ public static InternalConsumerGroup create( return builder.build(); } - private static Long calculateConsumerLag(Map offsets, Map endOffsets) { - Long consumerLag = null; - // consumerLag should be undefined if no committed offsets found for topic - if (!offsets.isEmpty()) { - consumerLag = offsets.entrySet().stream() - .mapToLong(e -> - Optional.ofNullable(endOffsets) - .map(o -> o.get(e.getKey())) - .map(o -> o - e.getValue()) - .orElse(0L) - ).sum(); - } - - return consumerLag; - } - private static Integer calculateTopicNum(Map offsets, Collection members) { return (int) Stream.concat( diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index ce8f6a09c..5d4d39b1e 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -2,8 +2,10 @@ import com.google.common.collect.Streams; import com.google.common.collect.Table; +import io.kafbat.ui.api.model.ConsumerGroupLag; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.emitter.EnhancedConsumer; +import io.kafbat.ui.model.ConsumerGroupLagDTO; import io.kafbat.ui.model.ConsumerGroupOrderingDTO; import io.kafbat.ui.model.InternalConsumerGroup; import io.kafbat.ui.model.InternalTopicConsumerGroup; @@ -16,6 +18,7 @@ import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.ApplicationMetrics; import io.kafbat.ui.util.KafkaClientSslPropertiesUtil; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -141,6 +144,68 @@ private boolean isConsumerGroupRelatesToTopic(String topic, return hasActiveMembersForTopic || hasCommittedOffsets; } + public Mono> getConsumerGroupsLag(KafkaCluster cluster, List groupNames, Optional lastUpdate) { + Statistics statistics = statisticsCache.get(cluster); + + if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) { + boolean select = lastUpdate + .map(t -> statistics.getClusterState().getScrapeFinishedAt().isAfter(Instant.ofEpochMilli(t))) + .orElse(true); + + if (select) { + Map consumerGroupsStates = + statistics.getClusterState().getConsumerGroupsStates(); + + return Mono.just(groupNames.stream() + .map(g -> Optional.ofNullable(consumerGroupsStates.get(g))) + .filter(Optional::isPresent) + .map(Optional::get) + .map(g -> Map.entry(g.group(), buildConsumerGroup(g))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); + } + + } + + return Mono.just(Map.of()); + } + + private ConsumerGroupLagDTO buildConsumerGroup(ScrapedClusterState.ConsumerGroupState state) { + List>> topicPartitions = Stream.concat( + state.description().members().stream() + .flatMap(m -> + m.assignment().topicPartitions().stream() + .map(t -> Map.entry(t, Optional.empty())) + ), + state.committedOffsets().entrySet().stream() + .map(o -> Map.entry(o.getKey(), Optional.ofNullable(o.getValue()))) + ).collect( + Collectors.groupingBy( + Map.Entry::getKey, + Collectors.>, Map.Entry>>reducing( + Map.entry(new TopicPartition("", 0), Optional.empty()), + e -> e, + (a, b) -> { + if (a.getValue().isPresent() && b.getValue().isEmpty()) { + return a; + } else if (a.getValue().isEmpty() && b.getValue().isPresent()) { + return b; + } else if ((a.getValue().isPresent() && b.getValue().isPresent())) { + Long aOffset = a.getValue().get(); + Long bOffset = b.getValue().get(); + if (aOffset.compareTo(bOffset) > 0) { + return a; + } else { + return b; + } + } + return a; + } + ) + ) + ); + } + public record ConsumerGroupsPage(List consumerGroups, int totalPages) { } diff --git a/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java b/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java new file mode 100644 index 000000000..0d8ae8850 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java @@ -0,0 +1,27 @@ +package io.kafbat.ui.util; + +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.common.TopicPartition; + +public class ConsumerGroupUtil { + private ConsumerGroupUtil() { + } + + public static Long calculateConsumerLag(Map offsets, + Map endOffsets) { + Long consumerLag = null; + // consumerLag should be undefined if no committed offsets found for topic + if (!offsets.isEmpty()) { + consumerLag = offsets.entrySet().stream() + .mapToLong(e -> + Optional.ofNullable(endOffsets) + .map(o -> o.get(e.getKey())) + .map(o -> o - e.getValue()) + .orElse(0L) + ).sum(); + } + + return consumerLag; + } +} diff --git a/contract-typespec/api/consumer-groups.tsp b/contract-typespec/api/consumer-groups.tsp index 8e13d588e..9c10462ff 100644 --- a/contract-typespec/api/consumer-groups.tsp +++ b/contract-typespec/api/consumer-groups.tsp @@ -23,6 +23,16 @@ interface ConsumerGroupsApi { @query fts?: boolean ): ConsumerGroupsPageResponse; + @get + @route("/lag") + @operationId("getConsumerGroupsLag") + @summary("getConsumerGroupsLag") + getConsumerGroupsLag( + @path clusterName: string, + @query ids: Array, + @query lastUpdate?: int64, + ): ConsumerGroupsLagResponse; + @get @route("/{id}") @operationId("getConsumerGroup") @@ -124,6 +134,16 @@ model ConsumerGroupOffsetsReset { partitionsOffsets?: PartitionOffset[]; } +model ConsumerGroupsLagResponse { + updateTimestamp: int64; + consumerGroups: Record; +} + +model ConsumerGroupLag { + lag: int64; + topics: Record; +} + enum ConsumerGroupOffsetsResetType { EARLIEST, LATEST, From 49903f8dd8a5025c5addcc17605f57264e65c063 Mon Sep 17 00:00:00 2001 From: German Osin Date: Sat, 6 Dec 2025 13:57:41 +0100 Subject: [PATCH 2/4] BE: Consumers lag endpoints --- .../controller/ConsumerGroupsController.java | 18 +- .../kafbat/ui/mapper/ConsumerGroupMapper.java | 10 -- .../ui/service/ConsumerGroupService.java | 87 ++++++---- .../ui/service/ConsumerGroupServiceTest.java | 157 +++++++++++++++++- 4 files changed, 225 insertions(+), 47 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 332be2de2..a72724a1d 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -36,6 +36,7 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; @RestController @RequiredArgsConstructor @@ -112,11 +113,18 @@ public Mono> getConsumerGroupsLag(S Mono> result = consumerGroupService.getConsumerGroupsLag(getCluster(clusterName), groupNames, Optional.ofNullable(lastUpdate)) - .flatMapMany(m -> Flux.fromIterable(m.entrySet())) - .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.getKey(), clusterName)) - .collectList() - .map(l -> l.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) - .map(m -> new ConsumerGroupsLagResponseDTO(Instant.now().toEpochMilli(), m)) + .flatMap(t -> + Flux.fromIterable(t.getT1().entrySet()) + .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.getKey(), clusterName)) + .collectList() + .map(l -> l.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .map(l -> Tuples.of(t.getT2(), l)) + ) + .map(t -> + new ConsumerGroupsLagResponseDTO( + t.getT1().orElse(0L), t.getT2() + ) + ) .map(ResponseEntity::ok) .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); diff --git a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java index a4c2df95c..9ed3dede5 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java @@ -85,16 +85,6 @@ public static ConsumerGroupDetailsDTO toDetailsDto(InternalConsumerGroup g) { return details; } - public static ConsumerGroupLagDTO toDto(ScrapedClusterState.ConsumerGroupState state) { - - Set topicPartitions = Stream.concat( - state.description().members().stream() - .flatMap(m -> m.assignment().topicPartitions().stream()), - state.committedOffsets().keySet().stream() - ).collect(Collectors.toSet()); - - } - private static T convertToConsumerGroup( InternalConsumerGroup c, T consumerGroup) { consumerGroup.setGroupId(c.getGroupId()); diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 5d4d39b1e..6e2c67aa4 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -2,7 +2,6 @@ import com.google.common.collect.Streams; import com.google.common.collect.Table; -import io.kafbat.ui.api.model.ConsumerGroupLag; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.emitter.EnhancedConsumer; import io.kafbat.ui.model.ConsumerGroupLagDTO; @@ -41,6 +40,8 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; @Service @RequiredArgsConstructor @@ -144,9 +145,15 @@ private boolean isConsumerGroupRelatesToTopic(String topic, return hasActiveMembersForTopic || hasCommittedOffsets; } - public Mono> getConsumerGroupsLag(KafkaCluster cluster, List groupNames, Optional lastUpdate) { + public Mono, Optional>> getConsumerGroupsLag( + KafkaCluster cluster, Collection groupNames, Optional lastUpdate) { Statistics statistics = statisticsCache.get(cluster); + Map endOffsets = statistics.getClusterState().getTopicStates().entrySet().stream() + .flatMap(e -> e.getValue().endOffsets().entrySet().stream().map(p -> + Map.entry(new TopicPartition(e.getKey(), p.getKey()), p.getValue())) + ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) { boolean select = lastUpdate .map(t -> statistics.getClusterState().getScrapeFinishedAt().isAfter(Instant.ofEpochMilli(t))) @@ -156,54 +163,72 @@ public Mono> getConsumerGroupsLag(KafkaCluster Map consumerGroupsStates = statistics.getClusterState().getConsumerGroupsStates(); - return Mono.just(groupNames.stream() - .map(g -> Optional.ofNullable(consumerGroupsStates.get(g))) - .filter(Optional::isPresent) - .map(Optional::get) - .map(g -> Map.entry(g.group(), buildConsumerGroup(g))) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + return Mono.just( + Tuples.of( + groupNames.stream() + .map(g -> Optional.ofNullable(consumerGroupsStates.get(g))) + .filter(Optional::isPresent) + .map(Optional::get) + .map(g -> Map.entry(g.group(), buildConsumerGroup(g, endOffsets))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + Optional.of(statistics.getClusterState().getScrapeFinishedAt().toEpochMilli()) + ) ); } } - return Mono.just(Map.of()); + return Mono.just(Tuples.of(Map.of(), lastUpdate)); } - private ConsumerGroupLagDTO buildConsumerGroup(ScrapedClusterState.ConsumerGroupState state) { - List>> topicPartitions = Stream.concat( + private ConsumerGroupLagDTO buildConsumerGroup( + ScrapedClusterState.ConsumerGroupState state, + Map endOffsets + ) { + var topicPartitions = Stream.concat( state.description().members().stream() .flatMap(m -> m.assignment().topicPartitions().stream() - .map(t -> Map.entry(t, Optional.empty())) + .map(t -> Map.entry(t, Optional.empty())) ), state.committedOffsets().entrySet().stream() .map(o -> Map.entry(o.getKey(), Optional.ofNullable(o.getValue()))) ).collect( Collectors.groupingBy( Map.Entry::getKey, - Collectors.>, Map.Entry>>reducing( - Map.entry(new TopicPartition("", 0), Optional.empty()), - e -> e, - (a, b) -> { - if (a.getValue().isPresent() && b.getValue().isEmpty()) { - return a; - } else if (a.getValue().isEmpty() && b.getValue().isPresent()) { - return b; - } else if ((a.getValue().isPresent() && b.getValue().isPresent())) { - Long aOffset = a.getValue().get(); - Long bOffset = b.getValue().get(); - if (aOffset.compareTo(bOffset) > 0) { - return a; - } else { - return b; - } - } - return a; - } + Collectors.mapping(Map.Entry::getValue, + Collectors.>reducing( + Optional.empty(), + (a, b) -> Stream.of(a, b) + .flatMap(Optional::stream) + .max(Long::compare) + ) ) ) ); + + Map tpOffsets = new HashMap<>(); + + for (Map.Entry> entry : topicPartitions.entrySet()) { + Optional maybeOffset = Optional.ofNullable(endOffsets.get(entry.getKey())); + tpOffsets.put( + entry.getKey(), + maybeOffset.map(offset -> + entry.getValue().map(o -> offset - o).orElse(offset) + ).orElse(0L) + ); + } + + Map topicsLags = tpOffsets.entrySet().stream().collect( + Collectors.groupingBy( + (e) -> e.getKey().topic(), + Collectors.reducing(0L, Map.Entry::getValue, Long::sum) + ) + ); + + long lag = topicsLags.values().stream().mapToLong(v -> v).sum(); + + return new ConsumerGroupLagDTO(lag, topicsLags); } public record ConsumerGroupsPage(List consumerGroups, int totalPages) { diff --git a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java index ac633f908..84208fb2f 100644 --- a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java @@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableTable; import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.model.ConsumerGroupLagDTO; import io.kafbat.ui.model.InternalTopicConsumerGroup; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.Metrics; @@ -16,7 +17,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.admin.ConsumerGroupDescription; @@ -26,8 +29,13 @@ import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mapstruct.Mapping; import org.mockito.Mockito; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; class ConsumerGroupServiceTest { @@ -180,10 +188,20 @@ private ScrapedClusterState.ConsumerGroupState generate( List topicPartitions, Map lastOffsets, ConsumerGroupState state + ) { + return generate(topicPartitions, lastOffsets, state, 1); + } + + + private ScrapedClusterState.ConsumerGroupState generate( + List topicPartitions, + Map lastOffsets, + ConsumerGroupState state, + long lagPerPartition ) { final String name = UUID.randomUUID().toString(); Map commited = topicPartitions.stream() - .map(tp -> Map.entry(tp, lastOffsets.get(tp) - 1)) + .map(tp -> Map.entry(tp, lastOffsets.get(tp) - lagPerPartition)) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue @@ -209,4 +227,141 @@ private ScrapedClusterState.ConsumerGroupState generate( ), commited ); } + + @ParameterizedTest + @MethodSource("consumerGroupsLags") + void calculateConsumerGroupsLags(String topic, + Map endOffsets, + long lagPerPartition, + long expectedLag, + Optional scapedInstant, + Optional queryInstant, + boolean expectedResult) { + // given + ClustersProperties.Cluster clusterProperties = new ClustersProperties.Cluster(); + clusterProperties.setName("test"); + + ClustersProperties clustersProperties = new ClustersProperties(); + clustersProperties.getClusters().add(clusterProperties); + + AdminClientService admin = Mockito.mock(AdminClientService.class); + + KafkaCluster cluster = KafkaCluster.builder() + .name("test") + .originalProperties(clusterProperties) + .build(); + + Map consumers = + Stream.generate(() -> generate( + List.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1) + ), + endOffsets, + ConsumerGroupState.DEAD, + lagPerPartition + )).limit(10).collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + s -> s + )); + + ScrapedClusterState state = ScrapedClusterState.builder() + .scrapeFinishedAt(scapedInstant.orElse(Instant.now())) + .nodesStates(Map.of()) + .topicStates(Map.of( + topic, + new ScrapedClusterState.TopicState( + topic, null, List.of(), Map.of(), + endOffsets.entrySet().stream() + .filter(t -> t.getKey().topic().equals(topic)) + .collect( + Collectors.toMap( + e -> e.getKey().partition(), + Map.Entry::getValue + ) + ), null, null + ) + )) + .consumerGroupsStates(consumers) + .build(); + + Statistics statistics = Statistics.builder() + .status(ServerStatusDTO.ONLINE) + .version("Unknown") + .features(List.of()) + .clusterDescription(ReactiveAdminClient.ClusterDescription.empty()) + .metrics(Metrics.empty()) + .clusterState(state) + .build(); + + StatisticsCache cache = Mockito.mock(StatisticsCache.class); + Mockito.when(cache.get(cluster)).thenReturn(statistics); + + AccessControlService acl = Mockito.mock(AccessControlService.class); + ConsumerGroupService consumerGroupService = + new ConsumerGroupService(admin, acl, clustersProperties, cache); + + Tuple2, Optional> result = + consumerGroupService.getConsumerGroupsLag( + cluster, consumers.keySet(), queryInstant.map(Instant::toEpochMilli) + ).block(); + + assertThat(result).isNotNull(); + if (expectedResult) { + assertThat(result.getT1()).hasSize(consumers.size()); + assertThat(result.getT2()).isPresent(); + assertThat(result.getT2()).get().isEqualTo(state.getScrapeFinishedAt().toEpochMilli()); + + for (ConsumerGroupLagDTO dto : result.getT1().values()) { + assertThat(dto.getLag()).isEqualTo(expectedLag); + assertThat(dto.getTopics()).size().isEqualTo(1); + assertThat(dto.getTopics().get(topic)).isEqualTo(expectedLag); + } + } else { + assertThat(result.getT1()).isEmpty(); + assertThat(result.getT2()).isPresent(); + assertThat(result.getT2()).isEqualTo(queryInstant.map(Instant::toEpochMilli)); + } + } + + public static Stream consumerGroupsLags() { + String topic = "topic_"+ UUID.randomUUID(); + String anotherTopic = "another_topic_"+ UUID.randomUUID(); + return Stream.of( + Arguments.of( + topic, + Map.of( + new TopicPartition(topic, 0), 100L, + new TopicPartition(topic, 1), 100L, + new TopicPartition(anotherTopic, 0), 50L, + new TopicPartition(anotherTopic, 1), 50L + ), + 10L, + 20L, + Optional.empty(), Optional.empty(), true + ), + Arguments.of( + topic, + Map.of( + new TopicPartition(topic, 0), 100L, + new TopicPartition(topic, 1), 100L + ), + 0L, + 0L, + Optional.empty(), Optional.empty(), true + ), + Arguments.of( + topic, + Map.of( + new TopicPartition(topic, 0), 100L, + new TopicPartition(topic, 1), 100L + ), + 0L, + 0L, + Optional.of(Instant.now().minusSeconds(10)), + Optional.of(Instant.now()), + false + ) + ); + } } From afd6ba598ad2e167cc4445896a1a4439b6c05dd8 Mon Sep 17 00:00:00 2001 From: German Osin Date: Sat, 6 Dec 2025 14:11:36 +0100 Subject: [PATCH 3/4] Small refactoring --- .../ui/service/ConsumerGroupService.java | 35 +++++++++---------- .../io/kafbat/ui/util/ConsumerGroupUtil.java | 16 ++++++--- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 6e2c67aa4..9dead32fb 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -1,5 +1,8 @@ package io.kafbat.ui.service; +import static io.kafbat.ui.util.ConsumerGroupUtil.calculateConsumerLag; +import static io.kafbat.ui.util.ConsumerGroupUtil.calculateLag; + import com.google.common.collect.Streams; import com.google.common.collect.Table; import io.kafbat.ui.config.ClustersProperties; @@ -185,7 +188,7 @@ private ConsumerGroupLagDTO buildConsumerGroup( ScrapedClusterState.ConsumerGroupState state, Map endOffsets ) { - var topicPartitions = Stream.concat( + var commitedTopicPartitions = Stream.concat( state.description().members().stream() .flatMap(m -> m.assignment().topicPartitions().stream() @@ -207,24 +210,18 @@ private ConsumerGroupLagDTO buildConsumerGroup( ) ); - Map tpOffsets = new HashMap<>(); - - for (Map.Entry> entry : topicPartitions.entrySet()) { - Optional maybeOffset = Optional.ofNullable(endOffsets.get(entry.getKey())); - tpOffsets.put( - entry.getKey(), - maybeOffset.map(offset -> - entry.getValue().map(o -> offset - o).orElse(offset) - ).orElse(0L) - ); - } - - Map topicsLags = tpOffsets.entrySet().stream().collect( - Collectors.groupingBy( - (e) -> e.getKey().topic(), - Collectors.reducing(0L, Map.Entry::getValue, Long::sum) - ) - ); + Map topicsLags = commitedTopicPartitions.entrySet().stream() + .map(e -> + Map.entry( + e.getKey(), + calculateLag(e.getValue(), Optional.ofNullable(endOffsets.get(e.getKey()))).orElse(0L) + ) + ).collect( + Collectors.groupingBy( + (e) -> e.getKey().topic(), + Collectors.reducing(0L, Map.Entry::getValue, Long::sum) + ) + ); long lag = topicsLags.values().stream().mapToLong(v -> v).sum(); diff --git a/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java b/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java index 0d8ae8850..472f6f03a 100644 --- a/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java +++ b/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java @@ -15,13 +15,21 @@ public static Long calculateConsumerLag(Map offsets, if (!offsets.isEmpty()) { consumerLag = offsets.entrySet().stream() .mapToLong(e -> - Optional.ofNullable(endOffsets) - .map(o -> o.get(e.getKey())) - .map(o -> o - e.getValue()) - .orElse(0L) + calculateLag( + Optional.ofNullable(e.getValue()), + Optional.ofNullable(endOffsets.get(e.getKey())) + ).orElse(0L) ).sum(); } return consumerLag; } + + public static Optional calculateLag(Optional commitedOffset, Optional endOffset) { + Optional consumerLag = Optional.empty(); + if (endOffset.isPresent()) { + consumerLag = commitedOffset.map(o -> endOffset.get() - o); + } + return consumerLag; + } } From 30422a9ee850baa9c5b73c64c078a15ebe9e5ee9 Mon Sep 17 00:00:00 2001 From: German Osin Date: Sat, 6 Dec 2025 14:13:16 +0100 Subject: [PATCH 4/4] Fixed checkstyle --- .../java/io/kafbat/ui/service/ConsumerGroupServiceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java index 84208fb2f..69b63d175 100644 --- a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java @@ -325,8 +325,8 @@ void calculateConsumerGroupsLags(String topic, } public static Stream consumerGroupsLags() { - String topic = "topic_"+ UUID.randomUUID(); - String anotherTopic = "another_topic_"+ UUID.randomUUID(); + String topic = "topic_" + UUID.randomUUID(); + String anotherTopic = "another_topic_" + UUID.randomUUID(); return Stream.of( Arguments.of( topic,