diff --git a/api/build.gradle b/api/build.gradle
index 156f1d022..b427c8c3a 100644
--- a/api/build.gradle
+++ b/api/build.gradle
@@ -58,6 +58,8 @@ dependencies {
     implementation libs.lucene.queryparser
     implementation libs.lucene.analysis.common
+    implementation libs.fastcsv
+
     implementation libs.opendatadiscovery.oddrn
     implementation(libs.opendatadiscovery.client) {
         exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
index c9f84365c..88a6a9ea7 100644
--- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
+++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
@@ -45,6 +45,16 @@ public class ClustersProperties {
 
   AdminClient adminClient = new AdminClient();
 
+  Csv csv = new Csv();
+
+  @Data
+  public static class Csv {
+    String lineDelimeter = "crlf";
+    char quoteCharacter = '"';
+    String quoteStrategy = "required";
+    char fieldSeparator = ',';
+  }
+
   @Data
   public static class AdminClient {
     Integer timeout;
diff --git a/api/src/main/java/io/kafbat/ui/controller/AbstractController.java b/api/src/main/java/io/kafbat/ui/controller/AbstractController.java
index be29d4a4b..605199f6c 100644
--- a/api/src/main/java/io/kafbat/ui/controller/AbstractController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/AbstractController.java
@@ -4,9 +4,16 @@
 import io.kafbat.ui.model.KafkaCluster;
 import io.kafbat.ui.model.rbac.AccessContext;
 import io.kafbat.ui.service.ClustersStorage;
+import io.kafbat.ui.service.CsvWriterService;
 import io.kafbat.ui.service.audit.AuditService;
 import io.kafbat.ui.service.rbac.AccessControlService;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
@@ -15,6 +22,7 @@ public abstract class AbstractController {
   protected ClustersStorage clustersStorage;
   protected AccessControlService accessControlService;
   protected AuditService auditService;
+  protected CsvWriterService csvWriterService;
 
   protected KafkaCluster getCluster(String name) {
     return clustersStorage.getClusterByName(name)
@@ -44,4 +52,22 @@ public void setAccessControlService(AccessControlService accessControlService) {
   public void setAuditService(AuditService auditService) {
     this.auditService = auditService;
   }
+
+  public <T extends Flux<R>, R> Mono<ResponseEntity<String>> responseToCsv(ResponseEntity<T> response) {
+    return responseToCsv(response, (t) -> t);
+  }
+
+  public <T, R> Mono<ResponseEntity<String>> responseToCsv(ResponseEntity<T> response, Function<T, Flux<R>> extract) {
+    if (response.getStatusCode().is2xxSuccessful()) {
+      return mapToCsv(extract.apply(response.getBody())).map(ResponseEntity::ok);
+    } else {
+      return Mono.just(ResponseEntity.status(response.getStatusCode()).body(
+          Optional.ofNullable(response.getBody()).map(Object::toString).orElse("")
+      ));
+    }
+  }
+
+  protected <T> Mono<String> mapToCsv(Flux<T> body) {
+    return csvWriterService.write(body);
+  }
 }
diff --git a/api/src/main/java/io/kafbat/ui/controller/AclsController.java b/api/src/main/java/io/kafbat/ui/controller/AclsController.java
index 190b1081c..5d146218a 100644
--- a/api/src/main/java/io/kafbat/ui/controller/AclsController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/AclsController.java
@@ -63,6 +63,8 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName,
   public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
                                                           KafkaAclResourceTypeDTO resourceTypeDto,
@@ -96,19 +98,14 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
   }
 
   @Override
-  public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
-    AccessContext context = AccessContext.builder()
-        .cluster(clusterName)
-        .aclActions(AclAction.VIEW)
-        .operationName("getAclAsCsv")
-        .build();
-
-    return validateAccess(context).then(
-        aclsService.getAclAsCsvString(getCluster(clusterName))
-            .map(ResponseEntity::ok)
-            .flatMap(Mono::just)
-            .doOnEach(sig -> audit(context, sig))
-    );
+  public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName,
+                                                  KafkaAclResourceTypeDTO resourceType,
+                                                  String resourceName,
+                                                  KafkaAclNamePatternTypeDTO namePatternType,
+                                                  String search, Boolean fts,
+                                                  ServerWebExchange exchange) {
+    return listAcls(clusterName, resourceType, resourceName, namePatternType, search, fts, exchange)
+        .flatMap(this::responseToCsv);
   }
 
   @Override
diff --git a/api/src/main/java/io/kafbat/ui/controller/BrokersController.java b/api/src/main/java/io/kafbat/ui/controller/BrokersController.java
index 9c76fdc5e..64786ceab 100644
--- a/api/src/main/java/io/kafbat/ui/controller/BrokersController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/BrokersController.java
@@ -46,6 +46,12 @@ public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
         .doOnEach(sig -> audit(context, sig));
   }
 
+  @Override
+  public Mono<ResponseEntity<String>> getBrokersCsv(String clusterName,
+                                                    ServerWebExchange exchange) {
+    return getBrokers(clusterName, exchange).flatMap(this::responseToCsv);
+  }
+
   @Override
   public Mono<ResponseEntity<BrokersMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
                                                                    ServerWebExchange exchange) {
diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java
index 9e954575d..0b0341522 100644
--- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java
@@ -22,6 +22,7 @@
 import io.kafbat.ui.service.mcp.McpTool;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.function.Supplier;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -95,6 +96,8 @@ public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clu
         .doOnEach(sig -> audit(context, sig));
   }
 
+
+
   @Override
   public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
                                                                              String topicName,
@@ -120,6 +123,8 @@ public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(Strin
         .doOnEach(sig -> audit(context, sig));
   }
 
+
+
   @Override
   public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage(
       String clusterName,
@@ -138,10 +143,43 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage
         .build();
 
     return validateAccess(context).then(
-        consumerGroupService.getConsumerGroupsPage(
+        consumerGroupService.getConsumerGroups(
+            getCluster(clusterName),
+            OptionalInt.of(
+                Optional.ofNullable(page).filter(i -> i > 0).orElse(1)
+            ),
+            OptionalInt.of(
+                Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize)
+            ),
+            search,
+            fts,
+            Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
+            Optional.ofNullable(sortOrderDto).orElse(SortOrderDTO.ASC)
+        )
+        .map(this::convertPage)
+        .map(ResponseEntity::ok)
+    ).doOnEach(sig -> audit(context, sig));
+  }
+
+
+  @Override
+  public Mono<ResponseEntity<String>> getConsumerGroupsCsv(String clusterName, Integer page,
+                                                           Integer perPage, String search,
+                                                           ConsumerGroupOrderingDTO orderBy,
+                                                           SortOrderDTO sortOrderDto, Boolean fts,
+                                                           ServerWebExchange exchange) {
+
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        // consumer group access validation is within the service
+        .operationName("getConsumerGroupsPage")
+        .build();
+
+    return validateAccess(context).then(
+        consumerGroupService.getConsumerGroups(
             getCluster(clusterName),
-            Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
-            Optional.ofNullable(perPage).filter(i -> i > 0).orElse(defaultConsumerGroupsPageSize),
+            OptionalInt.empty(),
+            OptionalInt.empty(),
             search,
             fts,
             Optional.ofNullable(orderBy).orElse(ConsumerGroupOrderingDTO.NAME),
@@ -149,6 +187,7 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage
         )
         .map(this::convertPage)
         .map(ResponseEntity::ok)
+        .flatMap(r -> responseToCsv(r, (g) -> Flux.fromIterable(g.getConsumerGroups())))
     ).doOnEach(sig -> audit(context, sig));
   }
 
@@ -194,7 +233,12 @@ public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName,
               );
             }
             Map<Integer, Long> offsets = reset.getPartitionsOffsets().stream()
-                .collect(toMap(PartitionOffsetDTO::getPartition, PartitionOffsetDTO::getOffset));
+                .collect(
+                    toMap(
+                        PartitionOffsetDTO::getPartition,
+                        d -> Optional.ofNullable(d.getOffset()).orElse(0L)
+                    )
+                );
             return offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
           default:
             return Mono.error(
diff --git a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
index 559d9ae6c..b271d19fd 100644
--- a/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java
@@ -17,6 +17,7 @@
 import io.kafbat.ui.model.NewConnectorDTO;
 import io.kafbat.ui.model.SortOrderDTO;
 import io.kafbat.ui.model.TaskDTO;
+import io.kafbat.ui.model.TopicsResponseDTO;
 import io.kafbat.ui.model.rbac.AccessContext;
 import io.kafbat.ui.model.rbac.permission.ConnectAction;
 import io.kafbat.ui.service.KafkaConnectService;
@@ -56,6 +57,13 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
     return Mono.just(ResponseEntity.ok(availableConnects));
   }
 
+  @Override
+  public Mono<ResponseEntity<String>> getConnectsCsv(String clusterName, Boolean withStats,
+                                                     ServerWebExchange exchange) {
+    return getConnects(clusterName, withStats, exchange)
+        .flatMap(this::responseToCsv);
+  }
+
   @Override
   public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
                                                           ServerWebExchange exchange) {
@@ -157,6 +165,15 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
         .doOnEach(sig -> audit(context, sig));
   }
 
+  @Override
+  public Mono<ResponseEntity<String>> getAllConnectorsCsv(String clusterName, String search,
+                                                          ConnectorColumnsToSortDTO orderBy,
+                                                          SortOrderDTO sortOrder, Boolean fts,
+                                                          ServerWebExchange exchange) {
+    return getAllConnectors(clusterName, search, orderBy, sortOrder, fts, exchange)
+        .flatMap(this::responseToCsv);
+  }
+
   @Override
   public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clusterName,
                                                                       String connectName,
diff --git a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java
index 51af387c6..7d440719d 100644
--- a/api/src/main/java/io/kafbat/ui/controller/TopicsController.java
+++ b/api/src/main/java/io/kafbat/ui/controller/TopicsController.java
@@ -29,7 +29,6 @@
 import io.kafbat.ui.model.TopicUpdateDTO;
 import io.kafbat.ui.model.TopicsResponseDTO;
 import io.kafbat.ui.model.rbac.AccessContext;
-import io.kafbat.ui.model.rbac.permission.ConnectAction;
 import io.kafbat.ui.service.KafkaConnectService;
 import io.kafbat.ui.service.TopicsService;
 import io.kafbat.ui.service.analyze.TopicAnalysisService;
@@ -186,7 +185,7 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
         .operationName("getTopics")
         .build();
 
-    return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal, fts)
+    return topicsService.getTopics(getCluster(clusterName), search, showInternal, fts)
         .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
         .flatMap(topics -> {
           int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
@@ -219,6 +218,28 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
         .doOnEach(sig -> audit(context, sig));
   }
 
+  @Override
+  public Mono<ResponseEntity<String>> getTopicsCsv(String clusterName, Boolean showInternal,
+                                                   String search, TopicColumnsToSortDTO orderBy,
+                                                   SortOrderDTO sortOrder, Boolean fts,
+                                                   ServerWebExchange exchange) {
+
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .operationName("getTopicsCsv")
+        .build();
+
+    ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
+    Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, ftsProperties.use(fts));
+
+    return topicsService
+        .getTopics(getCluster(clusterName), search, showInternal, fts)
+        .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
+        .map(topics -> topics.stream().sorted(comparatorForTopic).toList())
+        .flatMap(topics -> responseToCsv(ResponseEntity.ok(Flux.fromIterable(topics))))
+        .doOnEach(sig -> audit(context, sig));
+  }
+
   @Override
   public Mono<ResponseEntity<TopicDTO>> updateTopic(
       String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
index ce8f6a09c..9a4e56d6b 100644
--- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
+++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.ToIntFunction;
@@ -70,6 +71,31 @@ private Mono<List<InternalConsumerGroup>> getConsumerGroups(
     });
   }
 
+  public Mono<ConsumerGroupsPage> getConsumerGroups(
+      KafkaCluster cluster,
+      OptionalInt pageNum,
+      OptionalInt perPage,
+      @Nullable String search,
+      Boolean fts,
+      ConsumerGroupOrderingDTO orderBy,
+      SortOrderDTO sortOrderDto) {
+    return adminClientService.get(cluster).flatMap(ac ->
+        ac.listConsumerGroups()
+            .map(listing -> filterGroups(listing, search, fts))
+            .flatMapIterable(lst -> lst)
+            .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
+            .collectList()
+            .flatMap(allGroups ->
+                loadSortedDescriptions(ac, allGroups, pageNum, perPage, orderBy, sortOrderDto)
+                    .flatMap(descriptions -> getConsumerGroups(ac, descriptions)
+                        .map(page ->
+                            ConsumerGroupsPage.from(page, allGroups.size(), pageNum, perPage)
+                        )
+                    )
+            )
+    );
+  }
+
   public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
                                                                           String topic) {
     return adminClientService.get(cluster)
@@ -142,33 +168,20 @@ private boolean isConsumerGroupRelatesToTopic(String topic,
   }
 
   public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
+    public static ConsumerGroupsPage from(List<InternalConsumerGroup> groups,
+                                          int totalSize,
+                                          OptionalInt pageNum,
+                                          OptionalInt perPage) {
+      return new ConsumerGroupsPage(groups,
+          (totalSize / perPage.orElse(totalSize)) + (totalSize % perPage.orElse(totalSize) == 0 ? 0 : 1)
+      );
+    }
   }
 
   private record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) {
   }
 
-  public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
-      KafkaCluster cluster,
-      int pageNum,
-      int perPage,
-      @Nullable String search,
-      Boolean fts,
-      ConsumerGroupOrderingDTO orderBy,
-      SortOrderDTO sortOrderDto) {
-    return adminClientService.get(cluster).flatMap(ac ->
-        ac.listConsumerGroups()
-            .map(listing -> filterGroups(listing, search, fts)
-            )
-            .flatMapIterable(lst -> lst)
-            .filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
-            .collectList()
-            .flatMap(allGroups ->
-                loadSortedDescriptions(ac, allGroups, pageNum, perPage, orderBy, sortOrderDto)
-                    .flatMap(descriptions -> getConsumerGroups(ac, descriptions)
-                        .map(page -> new ConsumerGroupsPage(
-                            page,
-                            (allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
-  }
+
 
   private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing> groups, String search, Boolean useFts) {
@@ -180,8 +193,8 @@ private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing
 
   private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
                                                                       List<ConsumerGroupListing> groups,
-                                                                      int pageNum,
-                                                                      int perPage,
+                                                                      OptionalInt pageNum,
+                                                                      OptionalInt perPage,
                                                                       ConsumerGroupOrderingDTO orderBy,
                                                                       SortOrderDTO sortOrderDto) {
     return switch (orderBy) {
@@ -232,8 +245,8 @@ private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdmi
   private Mono<List<ConsumerGroupDescription>> loadDescriptionsByListings(ReactiveAdminClient ac,
                                                                           List<ConsumerGroupListing> listings,
                                                                           Comparator<ConsumerGroupListing> comparator,
-                                                                          int pageNum,
-                                                                          int perPage,
+                                                                          OptionalInt pageNum,
+                                                                          OptionalInt perPage,
                                                                           SortOrderDTO sortOrderDto) {
     List<String> sortedGroups = sortAndPaginate(listings, comparator, pageNum, perPage, sortOrderDto)
         .map(ConsumerGroupListing::groupId)
         .toList();
@@ -244,13 +257,19 @@ private Mono<List<ConsumerGroupDescription>> loadDescriptionsByListings(Reactive
 
   private Stream<ConsumerGroupListing> sortAndPaginate(Collection<ConsumerGroupListing> collection,
                                                        Comparator<ConsumerGroupListing> comparator,
-                                                       int pageNum,
-                                                       int perPage,
+                                                       OptionalInt pageNum,
+                                                       OptionalInt perPage,
                                                        SortOrderDTO sortOrderDto) {
-    return collection.stream()
-        .sorted(sortOrderDto == SortOrderDTO.ASC ? comparator : comparator.reversed())
-        .skip((long) (pageNum - 1) * perPage)
-        .limit(perPage);
+    Stream<ConsumerGroupListing> sorted = collection.stream()
+        .sorted(sortOrderDto == SortOrderDTO.ASC ? comparator : comparator.reversed());
+
+    if (pageNum.isPresent() && perPage.isPresent()) {
+      return sorted
+          .skip((long) (pageNum.getAsInt() - 1) * perPage.getAsInt())
+          .limit(perPage.getAsInt());
+    } else {
+      return sorted;
+    }
   }
 
   private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(
@@ -304,8 +323,8 @@ private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerG
       ReactiveAdminClient ac,
       List<ConsumerGroupListing> groups,
       Comparator<GroupWithDescr> comparator,
-      int pageNum,
-      int perPage,
+      OptionalInt pageNum,
+      OptionalInt perPage,
       SortOrderDTO sortOrderDto) {
 
     var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
diff --git a/api/src/main/java/io/kafbat/ui/service/CsvWriterService.java b/api/src/main/java/io/kafbat/ui/service/CsvWriterService.java
new file mode 100644
index 000000000..794455104
--- /dev/null
+++ b/api/src/main/java/io/kafbat/ui/service/CsvWriterService.java
@@ -0,0 +1,74 @@
+package io.kafbat.ui.service;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.siegmar.fastcsv.writer.CsvWriter;
+import de.siegmar.fastcsv.writer.LineDelimiter;
+import de.siegmar.fastcsv.writer.QuoteStrategies;
+import io.kafbat.ui.config.ClustersProperties;
+import java.io.StringWriter;
+import java.util.List;
+import java.util.Map;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class CsvWriterService {
+  private final ObjectMapper om;
+  private final ClustersProperties.Csv properties;
+
+  public CsvWriterService(ObjectMapper om,
+                          ClustersProperties properties) {
+    this.om = om;
+    this.properties = properties.getCsv();
+  }
+
+  private CsvWriter writer(StringWriter sw) {
+    return CsvWriter.builder()
+        .fieldSeparator(properties.getFieldSeparator())
+        .quoteCharacter(properties.getQuoteCharacter())
+        .quoteStrategy(QuoteStrategies.valueOf(properties.getQuoteStrategy().toUpperCase()))
+        .lineDelimiter(LineDelimiter.valueOf(properties.getLineDelimeter().toUpperCase()))
+        .build(sw);
+  }
+
+  public <T> Mono<String> write(Flux<T> items) {
+    return items.collectList().map(this::write);
+  }
+
+
+  public <T> String write(List<T> items) {
+    final StringWriter sw = new StringWriter();
+    final CsvWriter writer = writer(sw);
+
+    if (!items.isEmpty()) {
+      writer.writeRecord(mapHeader(items.getFirst()));
+      for (T item : items) {
+        writer.writeRecord(mapRecord(item));
+      }
+    }
+    return sw.toString();
+  }
+
+  private <T> List<String> mapHeader(T item) {
+    JsonNode jsonNode = om.valueToTree(item);
+    if (jsonNode.isObject()) {
+      return jsonNode.properties().stream()
+          .map(Map.Entry::getKey)
+          .toList();
+    } else {
+      return List.of(jsonNode.asText());
+    }
+  }
+
+  private <T> List<String> mapRecord(T item) {
+    JsonNode jsonNode = om.valueToTree(item);
+    if (jsonNode.isObject()) {
+      return jsonNode.properties().stream()
+          .map(Map.Entry::getValue)
+          .map(JsonNode::asText)
+          .toList();
+    } else {
+      return List.of(jsonNode.asText());
+    }
+  }
+}
diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java
index 4bb5a8fbf..d9e800b05 100644
--- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java
+++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java
@@ -467,8 +467,7 @@ public Mono<InternalTopic> cloneTopic(
     );
   }
 
-  public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster, String search, Boolean showInternal,
-                                                          Boolean fts) {
+  public Mono<List<InternalTopic>> getTopics(KafkaCluster cluster, String search, Boolean showInternal, Boolean fts) {
     Statistics stats = statisticsCache.get(cluster);
     ScrapedClusterState clusterState = stats.getClusterState();
     boolean useFts = clustersProperties.getFts().use(fts);
diff --git a/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java b/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java
index 9986d603e..a65f6eec8 100644
--- a/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java
+++ b/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java
@@ -86,12 +86,6 @@ private List<AclBinding> filter(List<AclBinding> acls, String principalSearch, B
     return filter.find(principalSearch);
   }
 
-  public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
-    return adminClientService.get(cluster)
-        .flatMap(c -> c.listAcls(ResourcePatternFilter.ANY))
-        .map(AclCsv::transformToCsvString);
-  }
-
   public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
     return adminClientService.get(cluster)
         .flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> {
diff --git a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java
index 0ffa5633f..8b858e6ba 100644
--- a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java
+++ b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java
@@ -110,9 +110,9 @@ private void init(Map<String, InternalTopic> topicsInCache) {
     when(reactiveAdminClient.listTopics(anyBoolean())).thenReturn(Mono.just(topicsInCache.keySet()));
     when(clustersStorage.getClusterByName(isA(String.class)))
         .thenReturn(Optional.of(kafkaCluster));
-    when(mockTopicsService.getTopicsForPagination(isA(KafkaCluster.class), any(), any(), any()))
+    when(mockTopicsService.getTopics(isA(KafkaCluster.class), any(), any(), any()))
         .thenAnswer(a ->
-            topicsService.getTopicsForPagination(
+            topicsService.getTopics(
                 a.getArgument(0),
                 a.getArgument(1),
                 a.getArgument(2),
diff --git a/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java b/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java
index 3c8334da5..198f03e9e 100644
--- a/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java
+++ b/api/src/test/java/io/kafbat/ui/service/mcp/McpSpecificationGeneratorTest.java
@@ -43,7 +43,7 @@ void testConvertController() {
     List<AsyncToolSpecification> specifications = MCP_SPECIFICATION_GENERATOR.convertTool(topicsController);
 
-    assertThat(specifications).hasSize(15);
+    assertThat(specifications).hasSize(16);
 
     List<McpSchema.Tool> tools = List.of(
         new McpSchema.Tool(
            "recreateTopic",
diff --git a/contract-typespec/api/acls.tsp b/contract-typespec/api/acls.tsp
index e3b8a41e8..74f7cffe9 100644
--- a/contract-typespec/api/acls.tsp
+++ b/contract-typespec/api/acls.tsp
@@ -24,7 +24,14 @@ interface AclApi {
   @summary("getAclAsCsv")
   @get
   @operationId("getAclAsCsv")
-  getAclAsCsv(@path clusterName: string): string;
+  getAclAsCsv(
+    @path clusterName: string,
+    @query resourceType?: KafkaAclResourceType,
+    @query resourceName?: string,
+    @query namePatternType?: KafkaAclNamePatternType,
+    @query search?: string,
+    @query fts?: boolean
+  ): CsvResponse;
 
   @route("/csv")
   @summary("syncAclsCsv")
diff --git a/contract-typespec/api/brokers.tsp b/contract-typespec/api/brokers.tsp
index 29a2ceca5..b19695fcb 100644
--- a/contract-typespec/api/brokers.tsp
+++ b/contract-typespec/api/brokers.tsp
@@ -13,6 +13,12 @@ interface BrokersApi {
   @summary("getBrokers")
   getBrokers(@path clusterName: string): Broker[];
 
+  @get
+  @operationId("getBrokersCsv")
+  @summary("getBrokersCsv")
+  @route("/csv")
+  getBrokersCsv(@path clusterName: string): CsvResponse;
+
   @get
   @route("/{id}/configs")
   @operationId("getBrokerConfig")
diff --git a/contract-typespec/api/consumer-groups.tsp b/contract-typespec/api/consumer-groups.tsp
index 8e13d588e..fb4860923 100644
--- a/contract-typespec/api/consumer-groups.tsp
+++ b/contract-typespec/api/consumer-groups.tsp
@@ -23,6 +23,20 @@ interface ConsumerGroupsApi {
     @query fts?: boolean
   ): ConsumerGroupsPageResponse;
 
+  @get
+  @route("/csv")
+  @operationId("getConsumerGroupsCsv")
+  @summary("getConsumerGroupsPage")
+  getConsumerGroupsCsv(
+    @path clusterName: string,
+    @query page?: int32,
+    @query perPage?: int32,
+    @query search?: string,
+    @query orderBy?: ConsumerGroupOrdering,
+    @query sortOrder?: SortOrder,
+    @query fts?: boolean
+  ): CsvResponse;
+
   @get
   @route("/{id}")
   @operationId("getConsumerGroup")
diff --git a/contract-typespec/api/kafka-connect.tsp b/contract-typespec/api/kafka-connect.tsp
index aa45b4aef..db5632068 100644
--- a/contract-typespec/api/kafka-connect.tsp
+++ b/contract-typespec/api/kafka-connect.tsp
@@ -14,6 +14,12 @@ interface ConnectInstancesApi {
   @summary("getConnects")
   getConnects(@path clusterName: string, @query withStats?: boolean): Connect[];
 
+  @get
+  @operationId("getConnectsCsv")
+  @summary("getConnects")
+  @route("/csv")
+  getConnectsCsv(@path clusterName: string, @query withStats?: boolean): CsvResponse;
+
   @get
   @route("/{connectName}/plugins")
   @summary("get connector plugins")
@@ -49,6 +55,18 @@ interface ConnectorsApi {
     @query sortOrder?: SortOrder,
     @query fts?: boolean
   ): FullConnectorInfo[];
+
+  @get
+  @operationId("getAllConnectorsCsv")
+  @summary("getAllConnectorsCsv")
+  @route("/csv")
+  getAllConnectorsCsv(
+    @path clusterName: string,
+    @query search?: string,
+    @query orderBy?: ConnectorColumnsToSort,
+    @query sortOrder?: SortOrder,
+    @query fts?: boolean
+  ): CsvResponse;
 }
 
 // /api/clusters/{clusterName}/connects/{connectName}/connectors
diff --git a/contract-typespec/api/responses.tsp b/contract-typespec/api/responses.tsp
index c784aff70..4befce8df 100644
--- a/contract-typespec/api/responses.tsp
+++ b/contract-typespec/api/responses.tsp
@@ -70,3 +70,8 @@ model FieldError {
   @doc("Field format violations description (ex. [\"size must be between 0 and 20\", \"must be a well-formed email address\"])")
   restrictions?: string[];
 }
+
+model CsvResponse is Response<200> {
+  @header contentType: "text/csv";
+  @body _: string;
+}
diff --git a/contract-typespec/api/topics.tsp b/contract-typespec/api/topics.tsp
index 27b13c57a..d61b8190f 100644
--- a/contract-typespec/api/topics.tsp
+++ b/contract-typespec/api/topics.tsp
@@ -24,6 +24,19 @@ interface TopicsApi {
     @query fts?: boolean
   ): TopicsResponse;
 
+  @get
+  @operationId("getTopicsCsv")
+  @summary("getTopics")
+  @route("csv")
+  getTopicsCsv(
+    @path clusterName: string,
+    @query showInternal?: boolean,
+    @query search?: string,
+    @query orderBy?: TopicColumnsToSort,
+    @query sortOrder?: SortOrder,
+    @query fts?: boolean
+  ): CsvResponse;
+
   @post
   @operationId("createTopic")
   @summary("createTopic")
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index df563b834..88b66b66b 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -135,3 +135,5 @@ snappy = {module = 'org.xerial.snappy:snappy-java', version = '1.1.10.7'}
 lucene = {module = 'org.apache.lucene:lucene-core', version.ref = 'lucene'}
 lucene-queryparser = {module = 'org.apache.lucene:lucene-queryparser', version.ref = 'lucene'}
 lucene-analysis-common = {module = 'org.apache.lucene:lucene-analysis-common', version.ref = 'lucene'}
+
+fastcsv = {module = 'de.siegmar:fastcsv', version = '4.1.0'}
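
---- Reviewer note (not part of the diff) ----

Every new /csv endpoint follows the same pattern: the existing list handler is invoked, and a 2xx body (a Flux of DTOs) is piped through AbstractController#responseToCsv into CsvWriterService, which serializes each item with Jackson and emits the top-level field names as the header row. The sketch below exercises CsvWriterService in isolation under the default csv settings from ClustersProperties (comma separator, double-quote character, "required" quote strategy, CRLF line delimiter). The TopicRow record, the class name, and the main method are hypothetical and exist only for illustration; it also assumes the service is registered as a bean elsewhere, since this diff does not show its wiring.

import com.fasterxml.jackson.databind.ObjectMapper;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.service.CsvWriterService;
import java.util.List;

// Illustrative usage sketch for CsvWriterService; not part of this change.
public class CsvWriterServiceSketch {

  // Hypothetical row type standing in for a generated DTO (TopicDTO, BrokerDTO, ...).
  record TopicRow(String name, int partitions) { }

  public static void main(String[] args) {
    // A default ClustersProperties instance carries the new csv block with its defaults.
    CsvWriterService csv = new CsvWriterService(new ObjectMapper(), new ClustersProperties());

    String out = csv.write(List.of(
        new TopicRow("orders", 12),
        new TopicRow("payments", 6)
    ));

    // Expected shape: a "name,partitions" header row followed by one line per item,
    // with column order taken from Jackson's serialization order of the DTO fields.
    System.out.println(out);
  }
}

The same mechanism explains the controller-side behaviour: responseToCsv only converts successful responses, so non-2xx results keep their status code and a stringified body instead of CSV.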