From ea973f24e07549d176d3138bd88ada1682c58e1c Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 22 Jan 2026 11:03:03 -0500 Subject: [PATCH 1/3] add `getConsumerGroupV3Api` sidecar handle method --- src/sidecar/sidecarHandle.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/sidecar/sidecarHandle.ts b/src/sidecar/sidecarHandle.ts index 2fbafa1480..18f3e3afa5 100644 --- a/src/sidecar/sidecarHandle.ts +++ b/src/sidecar/sidecarHandle.ts @@ -21,6 +21,7 @@ import { import { ConfigsV3Api, Configuration as KafkaRestConfiguration, + ConsumerGroupV3Api, PartitionV3Api, RecordsV3Api, TopicV3Api, @@ -269,6 +270,22 @@ export class SidecarHandle { return new TopicV3Api(config); } + /** + * Creates and returns a (Kafka v3 REST OpenAPI spec) {@link ConsumerGroupV3Api} client instance + * with a preconfigured {@link KafkaRestConfiguration}. + */ + public getConsumerGroupV3Api(clusterId: string, connectionId: ConnectionId): ConsumerGroupV3Api { + const config = new KafkaRestConfiguration({ + ...this.defaultClientConfigParams, + headers: { + ...this.defaultClientConfigParams.headers, + [CLUSTER_ID_HEADER]: clusterId, + [SIDECAR_CONNECTION_ID_HEADER]: connectionId, + }, + }); + return new ConsumerGroupV3Api(config); + } + public getConfigsV3Api(clusterId: string, connectionId: ConnectionId): ConfigsV3Api { const config = new KafkaRestConfiguration({ ...this.defaultClientConfigParams, From 9aa94ba926cc045110742d259f0057de7050de6d Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 22 Jan 2026 11:04:46 -0500 Subject: [PATCH 2/3] add fetch functions for consumer groups and members/consumers --- src/loaders/utils/loaderUtils.ts | 58 +++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/src/loaders/utils/loaderUtils.ts b/src/loaders/utils/loaderUtils.ts index a09fbeedfd..9fb070dda9 100644 --- a/src/loaders/utils/loaderUtils.ts +++ b/src/loaders/utils/loaderUtils.ts @@ -1,6 +1,15 @@ import { createHash } from "crypto"; import { toKafkaTopicOperations } from "../../authz/types"; -import type { TopicData, TopicDataList, TopicV3Api } from "../../clients/kafkaRest"; +import type { + ConsumerData, + ConsumerDataList, + ConsumerGroupData, + ConsumerGroupDataList, + ConsumerGroupV3Api, + TopicData, + TopicDataList, + TopicV3Api, +} from "../../clients/kafkaRest"; import type { Schema as ResponseSchema, SubjectsV1Api } from "../../clients/schemaRegistryRest"; import { isResponseError } from "../../errors"; import type { IFlinkStatementSubmitParameters } from "../../flinkSql/statementUtils"; @@ -82,6 +91,53 @@ export async function fetchTopics(cluster: KafkaCluster): Promise { return topicsResp.data; } +/** Fetch all consumer groups for a given Kafka cluster. */ +export async function fetchConsumerGroups(cluster: KafkaCluster): Promise { + logger.debug( + `fetching consumer groups for ${cluster.connectionType} Kafka cluster ${cluster.id}`, + ); + + const sidecar = await getSidecar(); + const client: ConsumerGroupV3Api = sidecar.getConsumerGroupV3Api( + cluster.id, + cluster.connectionId, + ); + const response: ConsumerGroupDataList = await client.listKafkaConsumerGroups({ + cluster_id: cluster.id, + }); + const consumerGroups: ConsumerGroupData[] = response.data; + + logger.debug( + `fetched ${consumerGroups.length} consumer group(s) for ${cluster.connectionType} Kafka cluster ${cluster.id}`, + ); + return consumerGroups; +} + +/** Fetch the members (consumers) of a given Kafka cluster's consumer group ID. */ +export async function fetchConsumerGroupMembers( + cluster: KafkaCluster, + consumerGroupId: string, +): Promise { + logger.debug( + `fetching members for consumer group ${consumerGroupId} in ${cluster.connectionType} Kafka cluster ${cluster.id}`, + ); + + const sidecar = await getSidecar(); + const client: ConsumerGroupV3Api = sidecar.getConsumerGroupV3Api( + cluster.id, + cluster.connectionId, + ); + const response: ConsumerDataList = await client.listKafkaConsumers({ + cluster_id: cluster.id, + consumer_group_id: consumerGroupId, + }); + + logger.debug( + `fetched ${response.data.length} member(s) for consumer group ${consumerGroupId} in ${cluster.connectionType} Kafka cluster ${cluster.id}`, + ); + return response.data; +} + /** * Convert an array of {@link TopicData} to an array of {@link KafkaTopic} * and set whether or not each topic has a matching schema by subject. From eefd75169519d2402da05fb2e6076d49596f3346 Mon Sep 17 00:00:00 2001 From: Dave Shoup Date: Thu, 22 Jan 2026 16:09:24 -0500 Subject: [PATCH 3/3] add basic tests for fetchConsumerGroups and fetchConsumerGroupMemebers --- src/loaders/utils/loaderUtils.test.ts | 155 +++++++++++++++++++++++++- 1 file changed, 153 insertions(+), 2 deletions(-) diff --git a/src/loaders/utils/loaderUtils.test.ts b/src/loaders/utils/loaderUtils.test.ts index bd5f4ad4cd..21b7465c55 100644 --- a/src/loaders/utils/loaderUtils.test.ts +++ b/src/loaders/utils/loaderUtils.test.ts @@ -10,8 +10,15 @@ import { import { TEST_CCLOUD_FLINK_COMPUTE_POOL } from "../../../tests/unit/testResources/flinkComputePool"; import { TEST_CCLOUD_ORGANIZATION_ID } from "../../../tests/unit/testResources/organization"; import { createResponseError, createTestTopicData } from "../../../tests/unit/testUtils"; -import { TopicV3Api } from "../../clients/kafkaRest"; -import type { TopicData } from "../../clients/kafkaRest/models"; +import { ConsumerGroupV3Api, TopicV3Api } from "../../clients/kafkaRest"; +import type { + ConsumerData, + ConsumerDataList, + ConsumerGroupData, + ConsumerGroupDataList, + Relationship, + TopicData, +} from "../../clients/kafkaRest/models"; import type { GetSchemaByVersionRequest, Schema as ResponseSchema, @@ -31,6 +38,9 @@ export const topicsResponseData: TopicData[] = [ createTestTopicData(TEST_LOCAL_KAFKA_CLUSTER.id, "topic3", ["READ", "WRITE"]), createTestTopicData(TEST_LOCAL_KAFKA_CLUSTER.id, "topic4", ["READ", "WRITE"]), ]; +// required for ConsumerGroupData/ConsumerGroupDataList but will be unused in the associated tests +const testMetadata = { self: "", resource_name: null }; +const testRelationship: Relationship = { related: "" }; describe("loaderUtils.ts", () => { let sandbox: sinon.SinonSandbox; @@ -326,6 +336,147 @@ describe("loaderUtils.ts", () => { }); }); + describe("fetchConsumerGroups()", () => { + let stubbedSidecar: sinon.SinonStubbedInstance; + let stubbedClient: sinon.SinonStubbedInstance; + + beforeEach(() => { + stubbedSidecar = getSidecarStub(sandbox); + stubbedClient = sandbox.createStubInstance(ConsumerGroupV3Api); + stubbedSidecar.getConsumerGroupV3Api.returns(stubbedClient); + }); + + it("should return a ConsumerGroupData array from listKafkaConsumerGroups", async () => { + const testGroups: ConsumerGroupData[] = [ + { + kind: "KafkaConsumerGroup", + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: "group-1", + is_simple: false, + partition_assignor: "range", + state: "Stable", + metadata: testMetadata, + coordinator: testRelationship, + lag_summary: testRelationship, + }, + { + kind: "KafkaConsumerGroup", + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: "group-2", + is_simple: false, + partition_assignor: "range", + state: "Empty", + metadata: testMetadata, + coordinator: testRelationship, + lag_summary: testRelationship, + }, + ]; + const testResponse: ConsumerGroupDataList = { + kind: "KafkaConsumerGroupList", + data: testGroups, + metadata: testMetadata, + }; + stubbedClient.listKafkaConsumerGroups.resolves(testResponse); + + const result = await loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER); + + assert.strictEqual(result.length, 2); + assert.deepStrictEqual(result[0], testGroups[0]); + assert.deepStrictEqual(result[1], testGroups[1]); + sinon.assert.calledOnceWithExactly(stubbedClient.listKafkaConsumerGroups, { + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + }); + }); + + it("should return an empty array when no consumer groups exist", async () => { + stubbedClient.listKafkaConsumerGroups.resolves({ + kind: "KafkaConsumerGroupList", + data: [], + metadata: testMetadata, + }); + + const result = await loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER); + + assert.strictEqual(result.length, 0); + }); + }); + + describe("fetchConsumerGroupMembers()", () => { + let stubbedSidecar: sinon.SinonStubbedInstance; + let stubbedClient: sinon.SinonStubbedInstance; + + const testGroupId = "test-group"; + + beforeEach(() => { + stubbedSidecar = getSidecarStub(sandbox); + stubbedClient = sandbox.createStubInstance(ConsumerGroupV3Api); + stubbedSidecar.getConsumerGroupV3Api.returns(stubbedClient); + }); + + it("should return a ConsumerData array from listKafkaConsumers", async () => { + const testConsumers: ConsumerData[] = [ + { + kind: "KafkaConsumer", + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: testGroupId, + consumer_id: "consumer-1", + client_id: "client-1", + instance_id: "instance-1", + metadata: testMetadata, + assignments: testRelationship, + }, + { + kind: "KafkaConsumer", + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: testGroupId, + consumer_id: "consumer-2", + client_id: "client-2", + instance_id: "instance-2", + metadata: testMetadata, + assignments: testRelationship, + }, + ]; + const testResponse: ConsumerDataList = { + kind: "KafkaConsumerList", + data: testConsumers, + metadata: testMetadata, + }; + stubbedClient.listKafkaConsumers.resolves(testResponse); + + const result = await loaderUtils.fetchConsumerGroupMembers( + TEST_LOCAL_KAFKA_CLUSTER, + testGroupId, + ); + + assert.strictEqual(result.length, 2); + assert.deepStrictEqual(result[0], testConsumers[0]); + assert.deepStrictEqual(result[1], testConsumers[1]); + sinon.assert.calledOnceWithExactly(stubbedClient.listKafkaConsumers, { + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: testGroupId, + }); + }); + + it("should return an empty array when a consumer group has no (active) consumers/members", async () => { + stubbedClient.listKafkaConsumers.resolves({ + kind: "KafkaConsumerList", + metadata: testMetadata, + data: [], + }); + + const result = await loaderUtils.fetchConsumerGroupMembers( + TEST_LOCAL_KAFKA_CLUSTER, + testGroupId, + ); + + assert.strictEqual(result.length, 0); + sinon.assert.calledOnceWithExactly(stubbedClient.listKafkaConsumers, { + cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id, + consumer_group_id: testGroupId, + }); + }); + }); + describe("generateFlinkStatementKey()", () => { const mainStatementParams: IFlinkStatementSubmitParameters = { statement: "SHOW USER FUNCTIONS",