Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 153 additions & 2 deletions src/loaders/utils/loaderUtils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@ import {
import { TEST_CCLOUD_FLINK_COMPUTE_POOL } from "../../../tests/unit/testResources/flinkComputePool";
import { TEST_CCLOUD_ORGANIZATION_ID } from "../../../tests/unit/testResources/organization";
import { createResponseError, createTestTopicData } from "../../../tests/unit/testUtils";
import { TopicV3Api } from "../../clients/kafkaRest";
import type { TopicData } from "../../clients/kafkaRest/models";
import { ConsumerGroupV3Api, TopicV3Api } from "../../clients/kafkaRest";
import type {
ConsumerData,
ConsumerDataList,
ConsumerGroupData,
ConsumerGroupDataList,
Relationship,
TopicData,
} from "../../clients/kafkaRest/models";
import type {
GetSchemaByVersionRequest,
Schema as ResponseSchema,
Expand All @@ -31,6 +38,9 @@ export const topicsResponseData: TopicData[] = [
createTestTopicData(TEST_LOCAL_KAFKA_CLUSTER.id, "topic3", ["READ", "WRITE"]),
createTestTopicData(TEST_LOCAL_KAFKA_CLUSTER.id, "topic4", ["READ", "WRITE"]),
];
// required for ConsumerGroupData/ConsumerGroupDataList but will be unused in the associated tests
const testMetadata = { self: "", resource_name: null };
const testRelationship: Relationship = { related: "" };

describe("loaderUtils.ts", () => {
let sandbox: sinon.SinonSandbox;
Expand Down Expand Up @@ -326,6 +336,147 @@ describe("loaderUtils.ts", () => {
});
});

describe("fetchConsumerGroups()", () => {
let stubbedSidecar: sinon.SinonStubbedInstance<sidecar.SidecarHandle>;
let stubbedClient: sinon.SinonStubbedInstance<ConsumerGroupV3Api>;

beforeEach(() => {
stubbedSidecar = getSidecarStub(sandbox);
stubbedClient = sandbox.createStubInstance(ConsumerGroupV3Api);
stubbedSidecar.getConsumerGroupV3Api.returns(stubbedClient);
});

it("should return a ConsumerGroupData array from listKafkaConsumerGroups", async () => {
const testGroups: ConsumerGroupData[] = [
{
kind: "KafkaConsumerGroup",
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: "group-1",
is_simple: false,
partition_assignor: "range",
state: "Stable",
metadata: testMetadata,
coordinator: testRelationship,
lag_summary: testRelationship,
},
{
kind: "KafkaConsumerGroup",
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: "group-2",
is_simple: false,
partition_assignor: "range",
state: "Empty",
metadata: testMetadata,
coordinator: testRelationship,
lag_summary: testRelationship,
},
];
const testResponse: ConsumerGroupDataList = {
kind: "KafkaConsumerGroupList",
data: testGroups,
metadata: testMetadata,
};
stubbedClient.listKafkaConsumerGroups.resolves(testResponse);

const result = await loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER);

assert.strictEqual(result.length, 2);
assert.deepStrictEqual(result[0], testGroups[0]);
assert.deepStrictEqual(result[1], testGroups[1]);
sinon.assert.calledOnceWithExactly(stubbedClient.listKafkaConsumerGroups, {
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
});
});

it("should return an empty array when no consumer groups exist", async () => {
stubbedClient.listKafkaConsumerGroups.resolves({
kind: "KafkaConsumerGroupList",
data: [],
metadata: testMetadata,
});

const result = await loaderUtils.fetchConsumerGroups(TEST_LOCAL_KAFKA_CLUSTER);

assert.strictEqual(result.length, 0);
});
});

describe("fetchConsumerGroupMembers()", () => {
let stubbedSidecar: sinon.SinonStubbedInstance<sidecar.SidecarHandle>;
let stubbedClient: sinon.SinonStubbedInstance<ConsumerGroupV3Api>;

const testGroupId = "test-group";

beforeEach(() => {
stubbedSidecar = getSidecarStub(sandbox);
stubbedClient = sandbox.createStubInstance(ConsumerGroupV3Api);
stubbedSidecar.getConsumerGroupV3Api.returns(stubbedClient);
});

it("should return a ConsumerData array from listKafkaConsumers", async () => {
const testConsumers: ConsumerData[] = [
{
kind: "KafkaConsumer",
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: testGroupId,
consumer_id: "consumer-1",
client_id: "client-1",
instance_id: "instance-1",
metadata: testMetadata,
assignments: testRelationship,
},
{
kind: "KafkaConsumer",
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: testGroupId,
consumer_id: "consumer-2",
client_id: "client-2",
instance_id: "instance-2",
metadata: testMetadata,
assignments: testRelationship,
},
];
const testResponse: ConsumerDataList = {
kind: "KafkaConsumerList",
data: testConsumers,
metadata: testMetadata,
};
stubbedClient.listKafkaConsumers.resolves(testResponse);

const result = await loaderUtils.fetchConsumerGroupMembers(
TEST_LOCAL_KAFKA_CLUSTER,
testGroupId,
);

assert.strictEqual(result.length, 2);
assert.deepStrictEqual(result[0], testConsumers[0]);
assert.deepStrictEqual(result[1], testConsumers[1]);
sinon.assert.calledOnceWithExactly(stubbedClient.listKafkaConsumers, {
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: testGroupId,
});
});

it("should return an empty array when a consumer group has no (active) consumers/members", async () => {
stubbedClient.listKafkaConsumers.resolves({
kind: "KafkaConsumerList",
metadata: testMetadata,
data: [],
});

const result = await loaderUtils.fetchConsumerGroupMembers(
TEST_LOCAL_KAFKA_CLUSTER,
testGroupId,
);

assert.strictEqual(result.length, 0);
sinon.assert.calledOnceWithExactly(stubbedClient.listKafkaConsumers, {
cluster_id: TEST_LOCAL_KAFKA_CLUSTER.id,
consumer_group_id: testGroupId,
});
});
});

describe("generateFlinkStatementKey()", () => {
const mainStatementParams: IFlinkStatementSubmitParameters = {
statement: "SHOW USER FUNCTIONS",
Expand Down
58 changes: 57 additions & 1 deletion src/loaders/utils/loaderUtils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { createHash } from "crypto";
import { toKafkaTopicOperations } from "../../authz/types";
import type { TopicData, TopicDataList, TopicV3Api } from "../../clients/kafkaRest";
import type {
ConsumerData,
ConsumerDataList,
ConsumerGroupData,
ConsumerGroupDataList,
ConsumerGroupV3Api,
TopicData,
TopicDataList,
TopicV3Api,
} from "../../clients/kafkaRest";
import type { Schema as ResponseSchema, SubjectsV1Api } from "../../clients/schemaRegistryRest";
import { isResponseError } from "../../errors";
import type { IFlinkStatementSubmitParameters } from "../../flinkSql/statementUtils";
Expand Down Expand Up @@ -82,6 +91,53 @@ export async function fetchTopics(cluster: KafkaCluster): Promise<TopicData[]> {
return topicsResp.data;
}

/** Fetch all consumer groups for a given Kafka cluster. */
export async function fetchConsumerGroups(cluster: KafkaCluster): Promise<ConsumerGroupData[]> {
logger.debug(
`fetching consumer groups for ${cluster.connectionType} Kafka cluster ${cluster.id}`,
);

const sidecar = await getSidecar();
const client: ConsumerGroupV3Api = sidecar.getConsumerGroupV3Api(
cluster.id,
cluster.connectionId,
);
const response: ConsumerGroupDataList = await client.listKafkaConsumerGroups({
cluster_id: cluster.id,
});
const consumerGroups: ConsumerGroupData[] = response.data;

logger.debug(
`fetched ${consumerGroups.length} consumer group(s) for ${cluster.connectionType} Kafka cluster ${cluster.id}`,
);
return consumerGroups;
}

/** Fetch the members (consumers) of a given Kafka cluster's consumer group ID. */
export async function fetchConsumerGroupMembers(
cluster: KafkaCluster,
consumerGroupId: string,
): Promise<ConsumerData[]> {
logger.debug(
`fetching members for consumer group ${consumerGroupId} in ${cluster.connectionType} Kafka cluster ${cluster.id}`,
);

const sidecar = await getSidecar();
const client: ConsumerGroupV3Api = sidecar.getConsumerGroupV3Api(
cluster.id,
cluster.connectionId,
);
const response: ConsumerDataList = await client.listKafkaConsumers({
cluster_id: cluster.id,
consumer_group_id: consumerGroupId,
});

logger.debug(
`fetched ${response.data.length} member(s) for consumer group ${consumerGroupId} in ${cluster.connectionType} Kafka cluster ${cluster.id}`,
);
return response.data;
}

/**
* Convert an array of {@link TopicData} to an array of {@link KafkaTopic}
* and set whether or not each topic has a matching schema by subject.
Expand Down
17 changes: 17 additions & 0 deletions src/sidecar/sidecarHandle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
import {
ConfigsV3Api,
Configuration as KafkaRestConfiguration,
ConsumerGroupV3Api,
PartitionV3Api,
RecordsV3Api,
TopicV3Api,
Expand Down Expand Up @@ -269,6 +270,22 @@ export class SidecarHandle {
return new TopicV3Api(config);
}

/**
* Creates and returns a (Kafka v3 REST OpenAPI spec) {@link ConsumerGroupV3Api} client instance
* with a preconfigured {@link KafkaRestConfiguration}.
*/
public getConsumerGroupV3Api(clusterId: string, connectionId: ConnectionId): ConsumerGroupV3Api {
const config = new KafkaRestConfiguration({
...this.defaultClientConfigParams,
headers: {
...this.defaultClientConfigParams.headers,
[CLUSTER_ID_HEADER]: clusterId,
[SIDECAR_CONNECTION_ID_HEADER]: connectionId,
},
});
return new ConsumerGroupV3Api(config);
}

public getConfigsV3Api(clusterId: string, connectionId: ConnectionId): ConfigsV3Api {
const config = new KafkaRestConfiguration({
...this.defaultClientConfigParams,
Expand Down