diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 9e954575d..a72724a1d 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -12,6 +12,7 @@ import io.kafbat.ui.model.ConsumerGroupDetailsDTO; import io.kafbat.ui.model.ConsumerGroupOffsetsResetDTO; import io.kafbat.ui.model.ConsumerGroupOrderingDTO; +import io.kafbat.ui.model.ConsumerGroupsLagResponseDTO; import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO; import io.kafbat.ui.model.PartitionOffsetDTO; import io.kafbat.ui.model.SortOrderDTO; @@ -20,9 +21,12 @@ import io.kafbat.ui.service.ConsumerGroupService; import io.kafbat.ui.service.OffsetsResetService; import io.kafbat.ui.service.mcp.McpTool; +import java.time.Instant; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -32,6 +36,7 @@ import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; @RestController @RequiredArgsConstructor @@ -95,6 +100,39 @@ public Mono> getConsumerGroup(String clu .doOnEach(sig -> audit(context, sig)); } + @Override + public Mono> getConsumerGroupsLag(String clusterName, + List groupNames, + Long lastUpdate, + ServerWebExchange exchange) { + + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("getConsumerGroupsLag") + .build(); + + Mono> result = + consumerGroupService.getConsumerGroupsLag(getCluster(clusterName), groupNames, Optional.ofNullable(lastUpdate)) + .flatMap(t -> + Flux.fromIterable(t.getT1().entrySet()) + .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.getKey(), clusterName)) + .collectList() + .map(l -> l.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .map(l -> Tuples.of(t.getT2(), l)) + ) + .map(t -> + new ConsumerGroupsLagResponseDTO( + t.getT1().orElse(0L), t.getT2() + ) + ) + .map(ResponseEntity::ok) + .switchIfEmpty(Mono.just(ResponseEntity.notFound().build())); + + return validateAccess(context) + .then(result) + .doOnEach(sig -> audit(context, sig)); + } + @Override public Mono>> getTopicConsumerGroups(String clusterName, String topicName, diff --git a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java index 800eab757..9ed3dede5 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java @@ -1,16 +1,25 @@ package io.kafbat.ui.mapper; +import io.kafbat.ui.api.model.ConsumerGroupLag; +import io.kafbat.ui.api.model.ConsumerGroupState; import io.kafbat.ui.model.BrokerDTO; import io.kafbat.ui.model.ConsumerGroupDTO; import io.kafbat.ui.model.ConsumerGroupDetailsDTO; +import io.kafbat.ui.model.ConsumerGroupLagDTO; import io.kafbat.ui.model.ConsumerGroupStateDTO; import io.kafbat.ui.model.ConsumerGroupTopicPartitionDTO; import io.kafbat.ui.model.InternalConsumerGroup; import io.kafbat.ui.model.InternalTopicConsumerGroup; +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; diff --git a/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java b/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java index 1f5246666..3954691f8 100644 --- a/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java +++ b/api/src/main/java/io/kafbat/ui/model/InternalConsumerGroup.java @@ -1,5 +1,7 @@ package io.kafbat.ui.model; +import static io.kafbat.ui.util.ConsumerGroupUtil.calculateConsumerLag; + import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -56,22 +58,6 @@ public static InternalConsumerGroup create( return builder.build(); } - private static Long calculateConsumerLag(Map offsets, Map endOffsets) { - Long consumerLag = null; - // consumerLag should be undefined if no committed offsets found for topic - if (!offsets.isEmpty()) { - consumerLag = offsets.entrySet().stream() - .mapToLong(e -> - Optional.ofNullable(endOffsets) - .map(o -> o.get(e.getKey())) - .map(o -> o - e.getValue()) - .orElse(0L) - ).sum(); - } - - return consumerLag; - } - private static Integer calculateTopicNum(Map offsets, Collection members) { return (int) Stream.concat( diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index ce8f6a09c..9dead32fb 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -1,9 +1,13 @@ package io.kafbat.ui.service; +import static io.kafbat.ui.util.ConsumerGroupUtil.calculateConsumerLag; +import static io.kafbat.ui.util.ConsumerGroupUtil.calculateLag; + import com.google.common.collect.Streams; import com.google.common.collect.Table; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.emitter.EnhancedConsumer; +import io.kafbat.ui.model.ConsumerGroupLagDTO; import io.kafbat.ui.model.ConsumerGroupOrderingDTO; import io.kafbat.ui.model.InternalConsumerGroup; import io.kafbat.ui.model.InternalTopicConsumerGroup; @@ -16,6 +20,7 @@ import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.ApplicationMetrics; import io.kafbat.ui.util.KafkaClientSslPropertiesUtil; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -38,6 +43,8 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; @Service @RequiredArgsConstructor @@ -141,6 +148,86 @@ private boolean isConsumerGroupRelatesToTopic(String topic, return hasActiveMembersForTopic || hasCommittedOffsets; } + public Mono, Optional>> getConsumerGroupsLag( + KafkaCluster cluster, Collection groupNames, Optional lastUpdate) { + Statistics statistics = statisticsCache.get(cluster); + + Map endOffsets = statistics.getClusterState().getTopicStates().entrySet().stream() + .flatMap(e -> e.getValue().endOffsets().entrySet().stream().map(p -> + Map.entry(new TopicPartition(e.getKey(), p.getKey()), p.getValue())) + ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (statistics.getStatus().equals(ServerStatusDTO.ONLINE)) { + boolean select = lastUpdate + .map(t -> statistics.getClusterState().getScrapeFinishedAt().isAfter(Instant.ofEpochMilli(t))) + .orElse(true); + + if (select) { + Map consumerGroupsStates = + statistics.getClusterState().getConsumerGroupsStates(); + + return Mono.just( + Tuples.of( + groupNames.stream() + .map(g -> Optional.ofNullable(consumerGroupsStates.get(g))) + .filter(Optional::isPresent) + .map(Optional::get) + .map(g -> Map.entry(g.group(), buildConsumerGroup(g, endOffsets))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), + Optional.of(statistics.getClusterState().getScrapeFinishedAt().toEpochMilli()) + ) + ); + } + + } + + return Mono.just(Tuples.of(Map.of(), lastUpdate)); + } + + private ConsumerGroupLagDTO buildConsumerGroup( + ScrapedClusterState.ConsumerGroupState state, + Map endOffsets + ) { + var commitedTopicPartitions = Stream.concat( + state.description().members().stream() + .flatMap(m -> + m.assignment().topicPartitions().stream() + .map(t -> Map.entry(t, Optional.empty())) + ), + state.committedOffsets().entrySet().stream() + .map(o -> Map.entry(o.getKey(), Optional.ofNullable(o.getValue()))) + ).collect( + Collectors.groupingBy( + Map.Entry::getKey, + Collectors.mapping(Map.Entry::getValue, + Collectors.>reducing( + Optional.empty(), + (a, b) -> Stream.of(a, b) + .flatMap(Optional::stream) + .max(Long::compare) + ) + ) + ) + ); + + Map topicsLags = commitedTopicPartitions.entrySet().stream() + .map(e -> + Map.entry( + e.getKey(), + calculateLag(e.getValue(), Optional.ofNullable(endOffsets.get(e.getKey()))).orElse(0L) + ) + ).collect( + Collectors.groupingBy( + (e) -> e.getKey().topic(), + Collectors.reducing(0L, Map.Entry::getValue, Long::sum) + ) + ); + + long lag = topicsLags.values().stream().mapToLong(v -> v).sum(); + + return new ConsumerGroupLagDTO(lag, topicsLags); + } + public record ConsumerGroupsPage(List consumerGroups, int totalPages) { } diff --git a/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java b/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java new file mode 100644 index 000000000..472f6f03a --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/ConsumerGroupUtil.java @@ -0,0 +1,35 @@ +package io.kafbat.ui.util; + +import java.util.Map; +import java.util.Optional; +import org.apache.kafka.common.TopicPartition; + +public class ConsumerGroupUtil { + private ConsumerGroupUtil() { + } + + public static Long calculateConsumerLag(Map offsets, + Map endOffsets) { + Long consumerLag = null; + // consumerLag should be undefined if no committed offsets found for topic + if (!offsets.isEmpty()) { + consumerLag = offsets.entrySet().stream() + .mapToLong(e -> + calculateLag( + Optional.ofNullable(e.getValue()), + Optional.ofNullable(endOffsets.get(e.getKey())) + ).orElse(0L) + ).sum(); + } + + return consumerLag; + } + + public static Optional calculateLag(Optional commitedOffset, Optional endOffset) { + Optional consumerLag = Optional.empty(); + if (endOffset.isPresent()) { + consumerLag = commitedOffset.map(o -> endOffset.get() - o); + } + return consumerLag; + } +} diff --git a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java index ac633f908..69b63d175 100644 --- a/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ConsumerGroupServiceTest.java @@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableTable; import io.kafbat.ui.config.ClustersProperties; +import io.kafbat.ui.model.ConsumerGroupLagDTO; import io.kafbat.ui.model.InternalTopicConsumerGroup; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.Metrics; @@ -16,7 +17,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.clients.admin.ConsumerGroupDescription; @@ -26,8 +29,13 @@ import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mapstruct.Mapping; import org.mockito.Mockito; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; class ConsumerGroupServiceTest { @@ -180,10 +188,20 @@ private ScrapedClusterState.ConsumerGroupState generate( List topicPartitions, Map lastOffsets, ConsumerGroupState state + ) { + return generate(topicPartitions, lastOffsets, state, 1); + } + + + private ScrapedClusterState.ConsumerGroupState generate( + List topicPartitions, + Map lastOffsets, + ConsumerGroupState state, + long lagPerPartition ) { final String name = UUID.randomUUID().toString(); Map commited = topicPartitions.stream() - .map(tp -> Map.entry(tp, lastOffsets.get(tp) - 1)) + .map(tp -> Map.entry(tp, lastOffsets.get(tp) - lagPerPartition)) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue @@ -209,4 +227,141 @@ private ScrapedClusterState.ConsumerGroupState generate( ), commited ); } + + @ParameterizedTest + @MethodSource("consumerGroupsLags") + void calculateConsumerGroupsLags(String topic, + Map endOffsets, + long lagPerPartition, + long expectedLag, + Optional scapedInstant, + Optional queryInstant, + boolean expectedResult) { + // given + ClustersProperties.Cluster clusterProperties = new ClustersProperties.Cluster(); + clusterProperties.setName("test"); + + ClustersProperties clustersProperties = new ClustersProperties(); + clustersProperties.getClusters().add(clusterProperties); + + AdminClientService admin = Mockito.mock(AdminClientService.class); + + KafkaCluster cluster = KafkaCluster.builder() + .name("test") + .originalProperties(clusterProperties) + .build(); + + Map consumers = + Stream.generate(() -> generate( + List.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1) + ), + endOffsets, + ConsumerGroupState.DEAD, + lagPerPartition + )).limit(10).collect(Collectors.toMap( + ScrapedClusterState.ConsumerGroupState::group, + s -> s + )); + + ScrapedClusterState state = ScrapedClusterState.builder() + .scrapeFinishedAt(scapedInstant.orElse(Instant.now())) + .nodesStates(Map.of()) + .topicStates(Map.of( + topic, + new ScrapedClusterState.TopicState( + topic, null, List.of(), Map.of(), + endOffsets.entrySet().stream() + .filter(t -> t.getKey().topic().equals(topic)) + .collect( + Collectors.toMap( + e -> e.getKey().partition(), + Map.Entry::getValue + ) + ), null, null + ) + )) + .consumerGroupsStates(consumers) + .build(); + + Statistics statistics = Statistics.builder() + .status(ServerStatusDTO.ONLINE) + .version("Unknown") + .features(List.of()) + .clusterDescription(ReactiveAdminClient.ClusterDescription.empty()) + .metrics(Metrics.empty()) + .clusterState(state) + .build(); + + StatisticsCache cache = Mockito.mock(StatisticsCache.class); + Mockito.when(cache.get(cluster)).thenReturn(statistics); + + AccessControlService acl = Mockito.mock(AccessControlService.class); + ConsumerGroupService consumerGroupService = + new ConsumerGroupService(admin, acl, clustersProperties, cache); + + Tuple2, Optional> result = + consumerGroupService.getConsumerGroupsLag( + cluster, consumers.keySet(), queryInstant.map(Instant::toEpochMilli) + ).block(); + + assertThat(result).isNotNull(); + if (expectedResult) { + assertThat(result.getT1()).hasSize(consumers.size()); + assertThat(result.getT2()).isPresent(); + assertThat(result.getT2()).get().isEqualTo(state.getScrapeFinishedAt().toEpochMilli()); + + for (ConsumerGroupLagDTO dto : result.getT1().values()) { + assertThat(dto.getLag()).isEqualTo(expectedLag); + assertThat(dto.getTopics()).size().isEqualTo(1); + assertThat(dto.getTopics().get(topic)).isEqualTo(expectedLag); + } + } else { + assertThat(result.getT1()).isEmpty(); + assertThat(result.getT2()).isPresent(); + assertThat(result.getT2()).isEqualTo(queryInstant.map(Instant::toEpochMilli)); + } + } + + public static Stream consumerGroupsLags() { + String topic = "topic_" + UUID.randomUUID(); + String anotherTopic = "another_topic_" + UUID.randomUUID(); + return Stream.of( + Arguments.of( + topic, + Map.of( + new TopicPartition(topic, 0), 100L, + new TopicPartition(topic, 1), 100L, + new TopicPartition(anotherTopic, 0), 50L, + new TopicPartition(anotherTopic, 1), 50L + ), + 10L, + 20L, + Optional.empty(), Optional.empty(), true + ), + Arguments.of( + topic, + Map.of( + new TopicPartition(topic, 0), 100L, + new TopicPartition(topic, 1), 100L + ), + 0L, + 0L, + Optional.empty(), Optional.empty(), true + ), + Arguments.of( + topic, + Map.of( + new TopicPartition(topic, 0), 100L, + new TopicPartition(topic, 1), 100L + ), + 0L, + 0L, + Optional.of(Instant.now().minusSeconds(10)), + Optional.of(Instant.now()), + false + ) + ); + } } diff --git a/contract-typespec/api/consumer-groups.tsp b/contract-typespec/api/consumer-groups.tsp index 8e13d588e..9c10462ff 100644 --- a/contract-typespec/api/consumer-groups.tsp +++ b/contract-typespec/api/consumer-groups.tsp @@ -23,6 +23,16 @@ interface ConsumerGroupsApi { @query fts?: boolean ): ConsumerGroupsPageResponse; + @get + @route("/lag") + @operationId("getConsumerGroupsLag") + @summary("getConsumerGroupsLag") + getConsumerGroupsLag( + @path clusterName: string, + @query ids: Array, + @query lastUpdate?: int64, + ): ConsumerGroupsLagResponse; + @get @route("/{id}") @operationId("getConsumerGroup") @@ -124,6 +134,16 @@ model ConsumerGroupOffsetsReset { partitionsOffsets?: PartitionOffset[]; } +model ConsumerGroupsLagResponse { + updateTimestamp: int64; + consumerGroups: Record; +} + +model ConsumerGroupLag { + lag: int64; + topics: Record; +} + enum ConsumerGroupOffsetsResetType { EARLIEST, LATEST,