From 19aeb503d8284860f21f1a2b1fdbadcbb25fe63c Mon Sep 17 00:00:00 2001 From: Benjamin Thuillier Date: Wed, 27 May 2026 14:51:38 +0200 Subject: [PATCH 1/2] add new command for connector, and regenerate console.json --- cmd/multiple_flags.go | 24 +- cmd/multiple_flags_test.go | 31 +- cmd/run.go | 8 +- pkg/client/console_client_run_test.go | 159 ++++++ pkg/client/console_client_test.go | 46 ++ pkg/schema/default_schema/console.json | 568 +++++++++++++++++++++- pkg/schema/openapi_parser.go | 13 +- pkg/schema/openapi_parser_console_test.go | 73 +++ pkg/schema/testdata/connector_run.yaml | 152 ++++++ 9 files changed, 1064 insertions(+), 10 deletions(-) create mode 100644 pkg/client/console_client_run_test.go create mode 100644 pkg/schema/testdata/connector_run.yaml diff --git a/cmd/multiple_flags.go b/cmd/multiple_flags.go index d35541a..06a59e1 100644 --- a/cmd/multiple_flags.go +++ b/cmd/multiple_flags.go @@ -1,6 +1,8 @@ package cmd import ( + "encoding/json" + "fmt" "strconv" "github.com/conduktor/ctl/internal/utils" @@ -31,6 +33,10 @@ func NewMultipleFlags(command *cobra.Command, flagParams map[string]schema.FlagP isFlagSet = true defaultValue := 0 result[key] = command.Flags().Int(flag.FlagName, defaultValue, usage) + } else if flag.Type == "json" { + isFlagSet = true + defaultValue := "" + result[key] = command.Flags().String(flag.FlagName, defaultValue, "JSON value") } else if utils.CdkDebug() { println("Unknown flag type: " + flag.Type) } @@ -46,14 +52,26 @@ func NewMultipleFlags(command *cobra.Command, flagParams map[string]schema.FlagP } } -func (m *MultipleFlags) ExtractFlagValueForBodyParam() map[string]interface{} { +func (m *MultipleFlags) ExtractFlagValueForBodyParam() (map[string]interface{}, error) { bodyParams := make(map[string]interface{}) for key, value := range m.result { if value != nil && m.flagSetByUser(key) { - bodyParams[key] = value + if m.flagParams[key].Type == "json" { + str, ok := value.(*string) + if !ok { + panic("Expected json flag " + key + " to be a *string") + } + var decoded interface{} + if err := json.Unmarshal([]byte(*str), &decoded); err != nil { + return nil, fmt.Errorf("invalid JSON for flag --%s: %w", m.flagParams[key].FlagName, err) + } + bodyParams[key] = decoded + } else { + bodyParams[key] = value + } } } - return bodyParams + return bodyParams, nil } func (m *MultipleFlags) ExtractFlagValueForQueryParam() map[string]string { diff --git a/cmd/multiple_flags_test.go b/cmd/multiple_flags_test.go index 37cc5f7..f129890 100644 --- a/cmd/multiple_flags_test.go +++ b/cmd/multiple_flags_test.go @@ -68,6 +68,9 @@ func TestExtractFlagValueForBodyParam(t *testing.T) { "intParam": {FlagName: "intParam", Type: "integer"}, "notSetInt": {FlagName: "notSetInt", Type: "integer"}, "zeroParam": {FlagName: "zeroParam", Type: "integer"}, + "jsonArray": {FlagName: "jsonArray", Type: "json"}, + "jsonObject": {FlagName: "jsonObject", Type: "json"}, + "notSetJson": {FlagName: "notSetJson", Type: "json"}, }) multipleFlags.result = map[string]interface{}{ "stringParam": func() *string { s := "test"; return &s }(), @@ -79,6 +82,9 @@ func TestExtractFlagValueForBodyParam(t *testing.T) { "intParam": func() *int { i := 123; return &i }(), "notSetInt": func() *int { i := 0; return &i }(), "zeroParam": func() *int { i := 0; return &i }(), + "jsonArray": func() *string { s := `[{"partition":1,"offset":42}]`; return &s }(), + "jsonObject": func() *string { s := `{"key":"value"}`; return &s }(), + "notSetJson": func() *string { s := ""; return &s }(), } expected := map[string]interface{}{ @@ -88,6 +94,8 @@ func TestExtractFlagValueForBodyParam(t *testing.T) { "boolParamFalse": func() *bool { b := false; return &b }(), "intParam": func() *int { i := 123; return &i }(), "zeroParam": func() *int { i := 0; return &i }(), + "jsonArray": []interface{}{map[string]interface{}{"partition": float64(1), "offset": float64(42)}}, + "jsonObject": map[string]interface{}{"key": "value"}, } command.Flags().Lookup("stringParam").Changed = true @@ -96,9 +104,30 @@ func TestExtractFlagValueForBodyParam(t *testing.T) { command.Flags().Lookup("intParam").Changed = true command.Flags().Lookup("zeroParam").Changed = true command.Flags().Lookup("emptyString").Changed = true - result := multipleFlags.ExtractFlagValueForBodyParam() + command.Flags().Lookup("jsonArray").Changed = true + command.Flags().Lookup("jsonObject").Changed = true + result, err := multipleFlags.ExtractFlagValueForBodyParam() + if err != nil { + t.Fatalf("unexpected error: %s", err) + } if !reflect.DeepEqual(result, expected) { t.Error(spew.Printf("got %v, want %v", result, expected)) } } + +func TestExtractFlagValueForBodyParamInvalidJSON(t *testing.T) { + command := &cobra.Command{} + multipleFlags := NewMultipleFlags(command, map[string]schema.FlagParameterOption{ + "jsonParam": {FlagName: "jsonParam", Type: "json"}, + }) + multipleFlags.result = map[string]interface{}{ + "jsonParam": func() *string { s := "not-json"; return &s }(), + } + command.Flags().Lookup("jsonParam").Changed = true + + _, err := multipleFlags.ExtractFlagValueForBodyParam() + if err == nil { + t.Error("expected an error for invalid JSON, got nil") + } +} diff --git a/cmd/run.go b/cmd/run.go index 160de42..2328424 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -62,13 +62,15 @@ func initRun(runs schema.RunCatalog) { Run: func(cmd *cobra.Command, args []string) { pathValues := make([]string, len(pathFlagValues)) queryParams := multipleFlagsForQuery.ExtractFlagValueForQueryParam() - body := multipleFlagsForBody.ExtractFlagValueForBodyParam() + body, err := multipleFlagsForBody.ExtractFlagValueForBodyParam() + if err != nil { + fmt.Fprintf(os.Stderr, "%s\n", err) + return + } for i, v := range pathFlagValues { pathValues[i] = *v } - var err error - if len(bodyFlags) == 0 { body = nil } diff --git a/pkg/client/console_client_run_test.go b/pkg/client/console_client_run_test.go new file mode 100644 index 0000000..ccc91b0 --- /dev/null +++ b/pkg/client/console_client_run_test.go @@ -0,0 +1,159 @@ +package client + +import ( + "strings" + "testing" + + "github.com/conduktor/ctl/pkg/schema" + "github.com/jarcoal/httpmock" +) + +func makeMockedClient(t *testing.T) *Client { + t.Helper() + client, err := Make(APIParameter{ + APIKey: "aToken", + BaseURL: "http://baseUrl", + }) + if err != nil { + t.Fatalf("failed creating client: %s", err) + } + client.setAuthMethodFromEnvIfNeeded() + httpmock.ActivateNonDefault(client.client.GetClient()) + return client +} + +var connectorOffsetsRun = schema.Run{ + BackendType: schema.CONSOLE, + Name: "connectorAlterOffsets", + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", + PathParameter: []string{"cluster", "connectCluster", "connector-name"}, + QueryParameter: map[string]schema.FlagParameterOption{}, + BodyFields: map[string]schema.FlagParameterOption{ + "offsets": {FlagName: "offsets", Type: "json"}, + }, + Method: "PATCH", +} + +func connectorStopRun() schema.Run { + return schema.Run{ + BackendType: schema.CONSOLE, + Name: "connectorStop", + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/stop", + PathParameter: []string{"cluster", "connectCluster", "connector-name"}, + Method: "PUT", + } +} + +func TestRunConnectorStop(t *testing.T) { + defer httpmock.Reset() + client := makeMockedClient(t) + + httpmock.RegisterMatcherResponderWithQuery( + "PUT", + "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/stop", + nil, + httpmock.HeaderIs("Authorization", "Bearer aToken"), + httpmock.NewStringResponder(204, ""), + ) + + _, err := client.Run(connectorStopRun(), []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, nil) + if err != nil { + t.Errorf("expected no error, got: %s", err) + } +} + +func TestRunConnectorGetOffsets(t *testing.T) { + defer httpmock.Reset() + client := makeMockedClient(t) + + run := schema.Run{ + BackendType: schema.CONSOLE, + Name: "connectorGetOffsets", + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", + PathParameter: []string{"cluster", "connectCluster", "connector-name"}, + Method: "GET", + } + httpmock.RegisterResponder( + "GET", + "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/offsets", + httpmock.NewStringResponder(200, `{"offsets":[]}`), + ) + + body, err := client.Run(run, []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, nil) + if err != nil { + t.Errorf("expected no error, got: %s", err) + } + if string(body) != `{"offsets":[]}` { + t.Errorf("unexpected body: %s", body) + } +} + +func TestRunConnectorResetOffsets(t *testing.T) { + defer httpmock.Reset() + client := makeMockedClient(t) + + run := schema.Run{ + BackendType: schema.CONSOLE, + Name: "connectorResetOffsets", + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", + PathParameter: []string{"cluster", "connectCluster", "connector-name"}, + Method: "DELETE", + } + httpmock.RegisterResponder( + "DELETE", + "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/offsets", + httpmock.NewStringResponder(200, ""), + ) + + _, err := client.Run(run, []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, nil) + if err != nil { + t.Errorf("expected no error, got: %s", err) + } +} + +func TestRunConnectorAlterOffsetsSendsJSONBody(t *testing.T) { + defer httpmock.Reset() + client := makeMockedClient(t) + + // The decoded JSON value the CLI would pass after parsing --offsets. + body := map[string]interface{}{ + "offsets": []interface{}{ + map[string]interface{}{"partition": map[string]interface{}{"kafka_topic": "t"}, "offset": map[string]interface{}{"kafka_offset": 42}}, + }, + } + + httpmock.RegisterMatcherResponderWithQuery( + "PATCH", + "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/offsets", + nil, + httpmock.BodyContainsString(`"offsets"`). + And(httpmock.BodyContainsString(`"kafka_topic":"t"`)), + httpmock.NewStringResponder(200, ""), + ) + + _, err := client.Run(connectorOffsetsRun, []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, body) + if err != nil { + t.Errorf("expected no error, got: %s", err) + } +} + +// The API returns 400 when the connector is not STOPPED; the CLI must surface +// the API's message rather than a generic failure. +func TestRunConnectorAlterOffsetsNotStoppedError(t *testing.T) { + defer httpmock.Reset() + client := makeMockedClient(t) + + httpmock.RegisterResponder( + "PATCH", + "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/offsets", + httpmock.NewStringResponder(400, `{"title":"The request is invalid (e.g. the connector is not in STOPPED state)"}`), + ) + + _, err := client.Run(connectorOffsetsRun, []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, map[string]interface{}{"offsets": []interface{}{}}) + if err == nil { + t.Fatal("expected an error, got nil") + } + if !strings.Contains(err.Error(), "not in STOPPED state") { + t.Errorf("expected error to surface the API message, got: %s", err) + } +} diff --git a/pkg/client/console_client_test.go b/pkg/client/console_client_test.go index 2c1c835..fc7fbb1 100644 --- a/pkg/client/console_client_test.go +++ b/pkg/client/console_client_test.go @@ -78,6 +78,52 @@ func TestApplyShouldWork(t *testing.T) { } } +// initialState lives in the Connector spec and is sent through unchanged when +// applying a connector resource: the CLI needs no special handling for it. +func TestApplyConnectorForwardsInitialState(t *testing.T) { + defer httpmock.Reset() + baseURL := "http://baseUrl" + apiKey := "aToken" + client, err := Make(APIParameter{ + APIKey: apiKey, + BaseURL: baseURL, + }) + if err != nil { + panic(err) + } + client.setAuthMethodFromEnvIfNeeded() + httpmock.ActivateNonDefault(client.client.GetClient()) + responder := httpmock.NewStringResponder(200, `{"upsertResult": "Created"}`) + + connectorJSON := []byte(`{"apiVersion":"v2","kind":"Connector","metadata":{"name":"my-connector","cluster":"local","connectCluster":"my-connect"},"spec":{"config":{},"initialState":"STOPPED"}}`) + connector := resource.Resource{ + Json: connectorJSON, + Kind: "Connector", + Name: "my-connector", + Version: "v2", + Metadata: map[string]interface{}{ + "cluster": "local", + "connectCluster": "my-connect", + }, + } + + httpmock.RegisterMatcherResponderWithQuery( + "PUT", + "http://baseUrl/api/public/kafka/v2/cluster/local/connect/my-connect/connector", + nil, + httpmock.BodyContainsString(`"initialState":"STOPPED"`), + responder, + ) + + body, err := client.Apply(&connector, false, false) + if err != nil { + t.Error(err) + } + if body.UpsertResult != "Created" { + t.Errorf("Bad result expected Created got: %s", body) + } +} + func TestApplyShouldWorkWithExternalAuthMode(t *testing.T) { defer httpmock.Reset() baseURL := "http://baseUrl" diff --git a/pkg/schema/default_schema/console.json b/pkg/schema/default_schema/console.json index 3463b04..69dbe95 100644 --- a/pkg/schema/default_schema/console.json +++ b/pkg/schema/default_schema/console.json @@ -1 +1,567 @@ -{"Kind":{"Alert":{"Versions":{"2":{"ListPath":"/public/monitoring/v2/cluster/{cluster}/alert","Name":"Alert","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{"alertType":{"FlagName":"alert-type","Required":false,"Type":"string"},"connect":{"FlagName":"connect","Required":false,"Type":"string"},"connector":{"FlagName":"connector","Required":false,"Type":"string"},"consumerGroup":{"FlagName":"consumer-group","Required":false,"Type":"string"},"topic":{"FlagName":"topic","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v2\nkind: Alert\nmetadata:\n name: alert\n cluster: cluster\nspec:\n connectName: connect\n connectorName: connector\n threshold: 1\n operator: GreaterThan\n metric: FailedTaskCount\n type: KafkaConnectAlert\n","Order":19},"3":{"ListPath":"/public/monitoring/v3/alert","Name":"Alert","ParentPathParam":[],"ParentQueryParam":["appInstance","group","user"],"ListQueryParameter":{"alertType":{"FlagName":"alert-type","Required":false,"Type":"string"},"appInstance":{"FlagName":"application-instance","Required":false,"Type":"string"},"cluster":{"FlagName":"cluster","Required":false,"Type":"string"},"connect":{"FlagName":"connect","Required":false,"Type":"string"},"connector":{"FlagName":"connector","Required":false,"Type":"string"},"consumerGroup":{"FlagName":"consumer-group","Required":false,"Type":"string"},"group":{"FlagName":"group","Required":false,"Type":"string"},"topic":{"FlagName":"topic","Required":false,"Type":"string"},"user":{"FlagName":"user","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v3\nkind: Alert\nmetadata:\n name: alert\n appInstance: my-app\nspec:\n cluster: cluster\n connectName: connect\n connectorName: connector\n threshold: 1\n operator: GreaterThan\n metric: FailedTaskCount\n destination:\n channel: test\n type: Slack\n type: KafkaConnectAlert\n","Order":19}}},"Application":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/application","Name":"Application","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v1\nkind: Application\nmetadata:\n name: my-application\n labels:\n business-unit: delivery\nspec:\n title: My Application\n owner: me\n policyRef:\n - my-policy\n","Order":7}}},"ApplicationGroup":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/application-group","Name":"ApplicationGroup","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{"application":{"FlagName":"application","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v1\nkind: ApplicationGroup\nmetadata:\n application: clickstream-app\n name: clickstream-support\nspec:\n displayName: Support Clickstream\n description: |-\n Members of the Support Group are allowed:\n Read access on all the resources\n Can restart owned connectors\n Can reset offsets\n permissions:\n - appInstance: clickstream-app-dev\n patternType: LITERAL\n name: '*'\n permissions:\n - topicConsume\n - topicViewConfig\n resourceType: TOPIC\n members:\n - user1@company.org\n - user2@company.org\n externalGroups:\n - support\n externalGroupRegex:\n - support*\n","Order":10}}},"ApplicationInstance":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/application-instance","Name":"ApplicationInstance","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{"application":{"FlagName":"application","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v1\nkind: ApplicationInstance\nmetadata:\n name: my-app-instance-prod\n application: my-app\n labels:\n env: prod\n team: my-team\nspec:\n cluster: prod-cluster\n topicPolicyRef:\n - my-topic-policy\n policyRef:\n - my-policy\n resources:\n - type: TOPIC\n name: my-topic\n patternType: LITERAL\n - type: CONSUMER_GROUP\n name: my-consumer-group\n patternType: LITERAL\n serviceAccount: my-service-account\n defaultCatalogVisibility: PUBLIC\n","Order":8}}},"ApplicationInstancePermission":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/application-instance-permission","Name":"ApplicationInstancePermission","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{"filterByApplication":{"FlagName":"application","Required":false,"Type":"string"},"filterByApplicationInstance":{"FlagName":"application-instance","Required":false,"Type":"string"},"filterByGrantedTo":{"FlagName":"granted-to","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v1\nkind: ApplicationInstancePermission\nmetadata:\n application: test\n appInstance: test\n name: test\nspec:\n resource:\n type: TOPIC\n name: test\n patternType: LITERAL\n userPermission: READ\n serviceAccountPermission: WRITE\n grantedTo: test\n","Order":9}}},"Connector":{"Versions":{"2":{"ListPath":"/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector","Name":"Connector","ParentPathParam":["cluster","connectCluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: Connector\nmetadata:\n name: my-connector\n cluster: my-cluster\n connectCluster: my-connect\n autoRestart:\n enabled: true\n frequencySeconds: 600\n description: My connector\nspec:\n config: {}\n","Order":14}}},"DataQualityPolicy":{"Versions":{"1":{"ListPath":"/public/data-quality/v1/data-quality-policy","Name":"DataQualityPolicy","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{"includeMetrics":{"FlagName":"include-metrics","Required":false,"Type":"boolean"},"metricsFrom":{"FlagName":"metrics-from","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v1\nkind: DataQualityPolicy\nmetadata:\n name: my-policy\n group: my-group\nspec:\n displayName: My policy\n description: My policy description\n rules:\n - my-rule\n targets:\n - cluster: my-cluster1\n topic: topic\n patternType: LITERAL\n - cluster: my-cluster2\n topic: topic-*\n patternType: PREFIXED\n actions:\n report:\n enabled: true\n block:\n enabled: true\n","Order":18}}},"DataQualityRule":{"Versions":{"1":{"ListPath":"/public/data-quality/v1/data-quality-rule","Name":"DataQualityRule","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v1\nkind: DataQualityRule\nmetadata:\n name: my-rule\nspec:\n celExpression: value.user == 'admin'\n displayName: Admin only\n description: Allow only admin\n type: Cel\n","Order":17}}},"Group":{"Versions":{"2":{"ListPath":"/public/iam/v2/group","Name":"Group","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: Group\nmetadata:\n name: group\nspec:\n displayName: test\n description: description\n externalGroups:\n - test\n externalGroupRegex:\n - tes*\n members:\n - user@conduktor.io\n membersFromExternalGroups: []\n permissions:\n - resourceType: TOPIC\n cluster: '*'\n name: test\n patternType: LITERAL\n permissions:\n - topicConsume\n - resourceType: TOPIC\n cluster: '*'\n name: test2\n patternType: PREFIXED\n permissions:\n - topicConsume\n","Order":1}}},"IndexedTopic":{"Versions":{"1":{"ListPath":"/public/sql/v1/cluster/{cluster}/indexed_topic","Name":"IndexedTopic","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v1\nkind: IndexedTopic\nmetadata:\n cluster: my-cluster\n name: my-topic\nspec:\n retentionTimeInSecond: 3600\n","Order":15}}},"KafkaCluster":{"Versions":{"2":{"ListPath":"/public/console/v2/kafka-cluster","Name":"KafkaCluster","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: KafkaCluster\nmetadata:\n name: my-kafka-cluster\n labels:\n env: prod\nspec:\n displayName: yo\n bootstrapServers: localhost:9092\n properties:\n sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\n color: '#FF0000'\n icon: icon\n schemaRegistry:\n url: https://my-schema-registry:8081\n security:\n username: admin\n password: admin-secret\n type: BasicAuth\n ignoreUntrustedCertificate: false\n type: ConfluentLike\n","Order":2}}},"KafkaConnectCluster":{"Versions":{"2":{"ListPath":"/public/console/v2/cluster/{cluster}/kafka-connect","Name":"KafkaConnectCluster","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: KafkaConnectCluster\nmetadata:\n name: connect-1\n cluster: my-cloud\n labels:\n user-labels: I am a user label\nspec:\n displayName: My kafka connect\n urls: http://localhost:8083\n headers:\n a: b\n c: d\n ignoreUntrustedCertificate: true\n security:\n username: user\n password: password\n type: BasicAuth\n","Order":3}}},"KsqlDBCluster":{"Versions":{"2":{"ListPath":"/public/console/v2/cluster/{cluster}/ksqldb","Name":"KsqlDBCluster","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: KsqlDBCluster\nmetadata:\n name: connect-1\n cluster: my-cloud\nspec:\n displayName: My kafka connect\n url: http://localhost:8083\n headers:\n a: b\n c: d\n ignoreUntrustedCertificate: true\n security:\n username: user\n password: password\n type: BasicAuth\n","Order":4}}},"PartnerZone":{"Versions":{"2":{"ListPath":"/public/console/v2/partner-zone","Name":"PartnerZone","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: PartnerZone\nmetadata:\n name: john-partner-zone\n labels:\n project: projectA\nspec:\n cluster: my-cluster\n displayName: John's partner zone\n description: for john\n url: http://conduktor.io\n authenticationMode:\n serviceAccount: johndoe\n type: PLAIN\n topics:\n - name: topic-a\n backingTopic: kafka-topic-a\n permission: WRITE\n partner:\n name: John Doe\n role: Data analyst\n email: johndoe@company.io\n phone: 07827 837 177\n trafficControlPolicies:\n maxProduceRate: 1000000\n maxConsumeRate: 2000000\n limitCommitOffset: 30\n headers:\n addOnProduce:\n - key: X-My-Header\n value: my-x-value\n overrideIfExists: false\n - key: Y-My-Header\n value: my-y-value\n overrideIfExists: true\n removeOnConsume:\n - keyRegex: my_org_prefix.*\n","Order":16}}},"ResourcePolicy":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/resource-policy","Name":"ResourcePolicy","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{"application":{"FlagName":"application","Required":false,"Type":"string"},"application-instance":{"FlagName":"application-instance","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v1\nkind: ResourcePolicy\nmetadata:\n name: example\n labels:\n business-unit: delivery\nspec:\n targetKind: Connector\n description: A policy for a Connect to check that we always set the key 'a' to 1 and the key 'name' to 'example'\n rules:\n - condition: spec.a == 1\n errorMessage: a should be 1\n - condition: metadata.name == 'example'\n errorMessage: name should be example\n","Order":6}}},"ServiceAccount":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/cluster/{cluster}/service-account","Name":"ServiceAccount","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v1\nkind: ServiceAccount\nmetadata:\n appInstance: my-app-instance-dev\n cluster: my-kafka-cluster\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: dev\n user-labels: I am a user label\n name: sa-clicko-dev\nspec:\n authorization:\n acls:\n - type: TOPIC\n name: click.\n patternType: PREFIXED\n operations:\n - Write\n host: '*'\n permission: Allow\n type: KAFKA_ACL\n","Order":11}}},"Subject":{"Versions":{"2":{"ListPath":"/public/kafka/v2/cluster/{cluster}/subject","Name":"Subject","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: Subject\nmetadata:\n name: my-subject\n cluster: my-cluster\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: staging\nspec:\n format: AVRO\n compatibility: BACKWARD_TRANSITIVE\n schema: '{\"type\": \"long\"}'\n","Order":13}}},"Topic":{"Versions":{"2":{"ListPath":"/public/kafka/v2/cluster/{cluster}/topic","Name":"Topic","ParentPathParam":["cluster"],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: Topic\nmetadata:\n name: my-topic\n cluster: my-cluster\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: staging\n user-labels: I am a user label\n catalogVisibility: PUBLIC\n descriptionIsEditable: true\n description: This is a topic description\n sqlStorage:\n retentionTimeInSecond: 42\nspec:\n partitions: 1\n replicationFactor: 1\n configs:\n cleanup.policy: delete\n retention.ms: '86400000'\n","Order":12}}},"TopicPolicy":{"Versions":{"1":{"ListPath":"/public/self-serve/v1/topic-policy","Name":"TopicPolicy","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{"app-instance":{"FlagName":"application-instance","Required":false,"Type":"string"}},"ApplyExample":"apiVersion: v1\nkind: TopicPolicy\nmetadata:\n name: my-app-instance-prod\nspec:\n policies:\n my-policy:\n constraint: OneOf\n optional: true\n values:\n - value1\n - value2\n","Order":5}}},"User":{"Versions":{"2":{"ListPath":"/public/iam/v2/user","Name":"User","ParentPathParam":[],"ParentQueryParam":null,"ListQueryParameter":{},"ApplyExample":"apiVersion: v2\nkind: User\nmetadata:\n name: user@conduktor.io\nspec:\n firstName: description\n lastName: test\n permissions: []\n","Order":0}}}},"Run":{"partnerZoneGenerateCredentials":{"Path":"/public/partner-zone/v2/{partner-zone-name}/generate-credentials","Name":"partnerZoneGenerateCredentials","Doc":"generate a token for a partner zone service account","QueryParameter":{},"PathParameter":["partner-zone-name"],"BodyFields":{},"Method":"POST"},"whoami":{"Path":"/token/v1/whoami","Name":"whoami","Doc":"Return information about the credentials configured in the CLI","QueryParameter":{},"PathParameter":[],"BodyFields":{},"Method":"GET"}}} \ No newline at end of file +{ + "Kind": { + "Alert": { + "Versions": { + "2": { + "ListPath": "/public/monitoring/v2/cluster/{cluster}/alert", + "Name": "Alert", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": { + "alertType": { + "FlagName": "alert-type", + "Required": false, + "Type": "string" + }, + "connect": { + "FlagName": "connect", + "Required": false, + "Type": "string" + }, + "connector": { + "FlagName": "connector", + "Required": false, + "Type": "string" + }, + "consumerGroup": { + "FlagName": "consumer-group", + "Required": false, + "Type": "string" + }, + "topic": { + "FlagName": "topic", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v2\nkind: Alert\nmetadata:\n cluster: cluster\n name: alert\nspec:\n connectName: connect\n connectorName: connector\n metric: FailedTaskCount\n operator: GreaterThan\n threshold: 1\n type: KafkaConnectAlert\n", + "Order": 21 + }, + "3": { + "ListPath": "/public/monitoring/v3/alert", + "Name": "Alert", + "ParentPathParam": [], + "ParentQueryParam": [ + "appInstance", + "group", + "user" + ], + "ListQueryParameter": { + "alertType": { + "FlagName": "alert-type", + "Required": false, + "Type": "string" + }, + "appInstance": { + "FlagName": "application-instance", + "Required": false, + "Type": "string" + }, + "cluster": { + "FlagName": "cluster", + "Required": false, + "Type": "string" + }, + "connect": { + "FlagName": "connect", + "Required": false, + "Type": "string" + }, + "connector": { + "FlagName": "connector", + "Required": false, + "Type": "string" + }, + "consumerGroup": { + "FlagName": "consumer-group", + "Required": false, + "Type": "string" + }, + "group": { + "FlagName": "group", + "Required": false, + "Type": "string" + }, + "topic": { + "FlagName": "topic", + "Required": false, + "Type": "string" + }, + "user": { + "FlagName": "user", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v3\nkind: Alert\nmetadata:\n appInstance: my-app\n name: alert\nspec:\n cluster: cluster\n connectName: connect\n connectorName: connector\n destination:\n channel: test\n type: Slack\n metric: FailedTaskCount\n operator: GreaterThan\n threshold: 1\n type: KafkaConnectAlert\n", + "Order": 21 + } + } + }, + "Application": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/application", + "Name": "Application", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v1\nkind: Application\nmetadata:\n labels:\n business-unit: delivery\n name: my-application\nspec:\n owner: me\n policyRef:\n - my-policy\n title: My Application\n", + "Order": 7 + } + } + }, + "ApplicationGroup": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/application-group", + "Name": "ApplicationGroup", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "application": { + "FlagName": "application", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v1\nkind: ApplicationGroup\nmetadata:\n application: clickstream-app\n name: clickstream-support\nspec:\n description: |-\n Members of the Support Group are allowed:\n Read access on all the resources\n Can restart owned connectors\n Can reset offsets\n displayName: Support Clickstream\n externalGroupRegex:\n - support*\n externalGroups:\n - support\n instancePermissions:\n - appInstance: clickstream-app-dev\n permissions:\n - instanceViewConfig\n members:\n - user1@company.org\n - user2@company.org\n permissions:\n - appInstance: clickstream-app-dev\n name: '*'\n patternType: LITERAL\n permissions:\n - topicConsume\n - topicViewConfig\n resourceType: TOPIC\n", + "Order": 10 + } + } + }, + "ApplicationInstance": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/application-instance", + "Name": "ApplicationInstance", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "application": { + "FlagName": "application", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v1\nkind: ApplicationInstance\nmetadata:\n application: my-app\n labels:\n env: prod\n team: my-team\n name: my-app-instance-prod\nspec:\n cluster: prod-cluster\n defaultCatalogVisibility: PUBLIC\n policyRef:\n - my-policy\n resources:\n - name: my-topic\n patternType: LITERAL\n type: TOPIC\n - name: my-consumer-group\n patternType: LITERAL\n type: CONSUMER_GROUP\n serviceAccount: my-service-account\n topicPolicyRef:\n - my-topic-policy\n", + "Order": 8 + } + } + }, + "ApplicationInstancePermission": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/application-instance-permission", + "Name": "ApplicationInstancePermission", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "filterByApplication": { + "FlagName": "application", + "Required": false, + "Type": "string" + }, + "filterByApplicationInstance": { + "FlagName": "application-instance", + "Required": false, + "Type": "string" + }, + "filterByGrantedTo": { + "FlagName": "granted-to", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v1\nkind: ApplicationInstancePermission\nmetadata:\n appInstance: test\n application: test\n name: test\nspec:\n grantedTo: test\n resource:\n name: test\n patternType: LITERAL\n type: TOPIC\n serviceAccountPermission: WRITE\n userPermission: READ\n", + "Order": 9 + } + } + }, + "Connector": { + "Versions": { + "2": { + "ListPath": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector", + "Name": "Connector", + "ParentPathParam": [ + "cluster", + "connectCluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: Connector\nmetadata:\n autoRestart:\n enabled: true\n frequencySeconds: 600\n cluster: my-cluster\n connectCluster: my-connect\n description: My connector\n name: my-connector\nspec:\n config: {}\n", + "Order": 15 + } + } + }, + "ConnectorTemplate": { + "Versions": { + "2": { + "ListPath": "/public/console/v2/connector-template", + "Name": "ConnectorTemplate", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: ConnectorTemplate\nmetadata:\n labels:\n category: sink\n name: s3-sink-standard\nspec:\n defaults:\n metadata:\n labels:\n category: sink\n name: s3-sink-${dept}\n spec:\n class: io.confluent.connect.s3.S3SinkConnector\n config:\n flush.size: \"10000\"\n format.class: io.confluent.connect.s3.format.avro.AvroFormat\n rotate.interval.ms: \"60000\"\n storage.class: io.confluent.connect.s3.storage.S3Storage\n tasks.max: \"2\"\n description: Standard S3 sink connector with flush every 10,000 records or 60 seconds.\n displayName: S3 Sink (Standard)\n", + "Order": 23 + } + } + }, + "DataQualityPolicy": { + "Versions": { + "1": { + "ListPath": "/public/data-quality/v1/data-quality-policy", + "Name": "DataQualityPolicy", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "descSort": { + "FlagName": "desc-sort", + "Required": false, + "Type": "boolean" + }, + "includeMetrics": { + "FlagName": "include-metrics", + "Required": false, + "Type": "boolean" + }, + "metricsFrom": { + "FlagName": "metrics-from", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v1\nkind: DataQualityPolicy\nmetadata:\n group: my-group\n name: my-policy\nspec:\n actions:\n block:\n enabled: true\n dlq:\n dlqTopic: my-dlq-topic\n enabled: true\n mark:\n enabled: true\n description: My policy description\n displayName: My policy\n rules:\n - my-rule\n targets:\n - cluster: my-cluster1\n patternType: LITERAL\n topic: topic\n - cluster: my-cluster2\n patternType: PREFIXED\n topic: topic-*\n", + "Order": 19 + } + } + }, + "DataQualityRule": { + "Versions": { + "1": { + "ListPath": "/public/data-quality/v1/data-quality-rule", + "Name": "DataQualityRule", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "descSort": { + "FlagName": "desc-sort", + "Required": false, + "Type": "boolean" + } + }, + "ApplyExample": "apiVersion: v1\nkind: DataQualityRule\nmetadata:\n name: my-rule\nspec:\n celExpression: value.user == 'admin'\n description: Allow only admin\n displayName: Admin only\n type: Cel\n", + "Order": 18 + } + } + }, + "GlueSchema": { + "Versions": { + "2": { + "ListPath": "/public/kafka/v2/cluster/{cluster}/glue-schemas", + "Name": "GlueSchema", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: GlueSchema\nmetadata:\n cluster: my-cluster\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: staging\n name: my-schema\nspec:\n compatibility: BACKWARD_ALL\n format: AVRO\n schema: '{\"type\": \"long\"}'\n", + "Order": 14 + } + } + }, + "Group": { + "Versions": { + "2": { + "ListPath": "/public/iam/v2/group", + "Name": "Group", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: Group\nmetadata:\n name: group\nspec:\n description: description\n displayName: test\n externalGroupRegex:\n - tes*\n externalGroups:\n - test\n members:\n - user@conduktor.io\n membersFromExternalGroups: []\n permissions:\n - cluster: '*'\n name: test\n patternType: LITERAL\n permissions:\n - topicConsume\n resourceType: TOPIC\n - cluster: '*'\n name: test2\n patternType: PREFIXED\n permissions:\n - topicConsume\n resourceType: TOPIC\n", + "Order": 1 + } + } + }, + "IndexedTopic": { + "Versions": { + "1": { + "ListPath": "/public/sql/v1/cluster/{cluster}/indexed_topic", + "Name": "IndexedTopic", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v1\nkind: IndexedTopic\nmetadata:\n cluster: my-cluster\n name: my-topic\nspec:\n retentionTimeInSecond: 3600\n", + "Order": 16 + } + } + }, + "Integration": { + "Versions": { + "3": { + "ListPath": "/public/monitoring/v3/integration", + "Name": "Integration", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v3\nkind: Integration\nmetadata:\n name: Slack\nspec:\n config:\n token: slack-token\n type: Slack\n", + "Order": 20 + } + } + }, + "KafkaCluster": { + "Versions": { + "2": { + "ListPath": "/public/console/v2/kafka-cluster", + "Name": "KafkaCluster", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: KafkaCluster\nmetadata:\n labels:\n env: prod\n name: my-kafka-cluster\nspec:\n bootstrapServers: localhost:9092\n color: '#FF0000'\n displayName: yo\n icon: icon\n properties:\n sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";\n schemaRegistry:\n ignoreUntrustedCertificate: false\n security:\n password: admin-secret\n type: BasicAuth\n username: admin\n type: ConfluentLike\n url: https://my-schema-registry:8081\n", + "Order": 2 + } + } + }, + "KafkaConnectCluster": { + "Versions": { + "2": { + "ListPath": "/public/console/v2/cluster/{cluster}/kafka-connect", + "Name": "KafkaConnectCluster", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: KafkaConnectCluster\nmetadata:\n cluster: my-cloud\n labels:\n user-labels: I am a user label\n name: connect-1\nspec:\n displayName: My kafka connect\n headers:\n a: b\n c: d\n ignoreUntrustedCertificate: true\n security:\n password: password\n type: BasicAuth\n username: user\n urls: http://localhost:8083\n", + "Order": 3 + } + } + }, + "KsqlDBCluster": { + "Versions": { + "2": { + "ListPath": "/public/console/v2/cluster/{cluster}/ksqldb", + "Name": "KsqlDBCluster", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: KsqlDBCluster\nmetadata:\n cluster: my-cloud\n name: connect-1\nspec:\n displayName: My kafka connect\n headers:\n a: b\n c: d\n ignoreUntrustedCertificate: true\n security:\n password: password\n type: BasicAuth\n username: user\n url: http://localhost:8083\n", + "Order": 4 + } + } + }, + "PartnerZone": { + "Versions": { + "2": { + "ListPath": "/public/console/v2/partner-zone", + "Name": "PartnerZone", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: PartnerZone\nmetadata:\n labels:\n project: projectA\n name: john-partner-zone\nspec:\n authenticationMode:\n serviceAccount: johndoe\n type: PLAIN\n cluster: my-gateway\n description: for john\n displayName: John's partner zone\n headers:\n addOnProduce:\n - key: X-My-Header\n overrideIfExists: false\n value: my-x-value\n - key: Y-My-Header\n overrideIfExists: true\n value: my-y-value\n removeOnConsume:\n - keyRegex: my_org_prefix.*\n partner:\n email: johndoe@company.io\n name: John Doe\n phone: 07827 837 177\n role: Data analyst\n topics:\n - backingTopic: kafka-topic-a\n name: topic-a\n permission: WRITE\n trafficControlPolicies:\n limitCommitOffset: 30\n maxConsumeRate: 2000000\n maxProduceRate: 1000000\n underlyingCluster: my-cluster\n url: http://conduktor.io\n vclusterName: johns-vcluster\n", + "Order": 17 + } + } + }, + "ResourcePolicy": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/resource-policy", + "Name": "ResourcePolicy", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "application": { + "FlagName": "application", + "Required": false, + "Type": "string" + }, + "application-instance": { + "FlagName": "application-instance", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v1\nkind: ResourcePolicy\nmetadata:\n labels:\n business-unit: delivery\n name: example\nspec:\n description: A policy for a Connect to check that we always set the key 'a' to 1 and the key 'name' to 'example'\n rules:\n - condition: spec.a == 1\n errorMessage: a should be 1\n - condition: metadata.name == 'example'\n errorMessage: name should be example\n targetKind: Connector\n", + "Order": 6 + } + } + }, + "ServiceAccount": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/cluster/{cluster}/service-account", + "Name": "ServiceAccount", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v1\nkind: ServiceAccount\nmetadata:\n appInstance: my-app-instance-dev\n cluster: my-kafka-cluster\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: dev\n user-labels: I am a user label\n name: sa-clicko-dev\nspec:\n authorization:\n acls:\n - host: '*'\n name: click.\n operations:\n - Write\n patternType: PREFIXED\n permission: Allow\n type: TOPIC\n type: KAFKA_ACL\n schemaRegistryAuthorization:\n acls:\n - name: click.\n operations:\n - Read\n - Write\n patternType: PREFIXED\n", + "Order": 11 + } + } + }, + "Subject": { + "Versions": { + "2": { + "ListPath": "/public/kafka/v2/cluster/{cluster}/subject", + "Name": "Subject", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: Subject\nmetadata:\n cluster: my-cluster\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: staging\n name: my-subject\nspec:\n compatibility: BACKWARD_TRANSITIVE\n format: AVRO\n schema: '{\"type\": \"long\"}'\n", + "Order": 13 + } + } + }, + "Topic": { + "Versions": { + "2": { + "ListPath": "/public/kafka/v2/cluster/{cluster}/topic", + "Name": "Topic", + "ParentPathParam": [ + "cluster" + ], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: Topic\nmetadata:\n catalogVisibility: PUBLIC\n cluster: my-cluster\n description: This is a topic description\n descriptionIsEditable: true\n labels:\n conduktor.io/application: application-a\n conduktor.io/application-instance: staging\n user-labels: I am a user label\n name: my-topic\n sqlStorage:\n retentionTimeInSecond: 42\nspec:\n configs:\n cleanup.policy: delete\n retention.ms: \"86400000\"\n partitions: 1\n replicationFactor: 1\n", + "Order": 12 + } + } + }, + "TopicPolicy": { + "Versions": { + "1": { + "ListPath": "/public/self-serve/v1/topic-policy", + "Name": "TopicPolicy", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": { + "app-instance": { + "FlagName": "application-instance", + "Required": false, + "Type": "string" + } + }, + "ApplyExample": "apiVersion: v1\nkind: TopicPolicy\nmetadata:\n name: my-app-instance-prod\nspec:\n policies:\n my-policy:\n constraint: OneOf\n optional: true\n values:\n - value1\n - value2\n", + "Order": 5 + } + } + }, + "TopicTemplate": { + "Versions": { + "2": { + "ListPath": "/public/console/v2/topic-template", + "Name": "TopicTemplate", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: TopicTemplate\nmetadata:\n labels:\n category: template\n name: high-partition-topic\nspec:\n defaults:\n metadata:\n labels:\n throughput: high\n name: my-topic-${department}\n spec:\n configs:\n cleanup.policy: delete\n min.insync.replicas: \"2\"\n retention.ms: \"604800000\"\n partitions: 24\n replicationFactor: 3\n description: Optimised for high-throughput workloads. 24 partitions, 3× replication, 7-day retention.\n displayName: High Partition Topic\n", + "Order": 22 + } + } + }, + "User": { + "Versions": { + "2": { + "ListPath": "/public/iam/v2/user", + "Name": "User", + "ParentPathParam": [], + "ParentQueryParam": null, + "ListQueryParameter": {}, + "ApplyExample": "apiVersion: v2\nkind: User\nmetadata:\n name: user@conduktor.io\nspec:\n firstName: description\n lastName: test\n permissions: []\n", + "Order": 0 + } + } + } + }, + "Run": { + "connectorAlterOffsets": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", + "Name": "connectorAlterOffsets", + "Doc": "Alter the offsets of a stopped connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": { + "offsets": { + "FlagName": "offsets", + "Required": false, + "Type": "json" + } + }, + "Method": "PATCH" + }, + "connectorGetOffsets": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", + "Name": "connectorGetOffsets", + "Doc": "Get the offsets of a connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": {}, + "Method": "GET" + }, + "connectorResetOffsets": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", + "Name": "connectorResetOffsets", + "Doc": "Reset all offsets of a stopped connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": {}, + "Method": "DELETE" + }, + "connectorStop": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/stop", + "Name": "connectorStop", + "Doc": "Stop a connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": {}, + "Method": "PUT" + }, + "partnerZoneGenerateCredentials": { + "Path": "/public/partner-zone/v2/{partner-zone-name}/generate-credentials", + "Name": "partnerZoneGenerateCredentials", + "Doc": "generate a token for a partner zone service account", + "QueryParameter": {}, + "PathParameter": [ + "partner-zone-name" + ], + "BodyFields": {}, + "Method": "POST" + }, + "whoami": { + "Path": "/token/v1/whoami", + "Name": "whoami", + "Doc": "Return information about the credentials configured in the CLI", + "QueryParameter": {}, + "PathParameter": [], + "BodyFields": {}, + "Method": "GET" + } + } +} \ No newline at end of file diff --git a/pkg/schema/openapi_parser.go b/pkg/schema/openapi_parser.go index b1c42b5..23a7d81 100644 --- a/pkg/schema/openapi_parser.go +++ b/pkg/schema/openapi_parser.go @@ -84,6 +84,10 @@ func (s *OpenAPIParser) getRuns(backendType BackendType) (RunCatalog, error) { if err != nil { return nil, err } + err = handleExecuteOperation(backendType, path.Key(), path.Value().Patch, resty.MethodPatch, result) + if err != nil { + return nil, err + } } return result, nil } @@ -147,9 +151,14 @@ func computeBodyFields(body *v3high.RequestBody) map[string]FlagParameterOption for propertiesPair := bodySchema.Properties.First(); propertiesPair != nil; propertiesPair = propertiesPair.Next() { key := propertiesPair.Key() value := propertiesPair.Value() - if value != nil && value.Schema() != nil { + if value != nil && value.Schema() != nil && len(value.Schema().Type) > 0 { valueType := value.Schema().Type[0] - if valueType == "string" || valueType == "boolean" || valueType == "integer" { + // Scalars map to a typed flag; arrays/objects are passed as a + // JSON-encoded string flag that we decode back into the body. + if valueType == "array" || valueType == "object" { + valueType = "json" + } + if valueType == "string" || valueType == "boolean" || valueType == "integer" || valueType == "json" { result[key] = FlagParameterOption{ FlagName: computeFlagName(key), Type: valueType, diff --git a/pkg/schema/openapi_parser_console_test.go b/pkg/schema/openapi_parser_console_test.go index 12ba953..f208ebd 100644 --- a/pkg/schema/openapi_parser_console_test.go +++ b/pkg/schema/openapi_parser_console_test.go @@ -460,3 +460,76 @@ func TestGetExecutes(t *testing.T) { } }) } + +func TestGetConnectorRuns(t *testing.T) { + t.Run("parses connector stop / offsets runs, including the PATCH with a JSON array body", func(t *testing.T) { + schemaContent, err := os.ReadFile("testdata/connector_run.yaml") + if err != nil { + t.Fatalf("failed reading file: %s", err) + } + + schema, err := NewOpenAPIParser(schemaContent) + if err != nil { + t.Fatalf("failed creating new schema: %s", err) + } + + result, err := schema.getRuns(CONSOLE) + if err != nil { + t.Fatalf("failed getting runs: %s", err) + } + + offsetsPath := "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets" + connectorPathParams := []string{"cluster", "connectCluster", "connector-name"} + expected := RunCatalog{ + "connectorStop": Run{ + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/stop", + Name: "connectorStop", + Doc: "Stop a connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{}, + Method: "PUT", + BackendType: CONSOLE, + }, + "connectorGetOffsets": Run{ + Path: offsetsPath, + Name: "connectorGetOffsets", + Doc: "Get the offsets of a connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{}, + Method: "GET", + BackendType: CONSOLE, + }, + "connectorResetOffsets": Run{ + Path: offsetsPath, + Name: "connectorResetOffsets", + Doc: "Reset all offsets of a stopped connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{}, + Method: "DELETE", + BackendType: CONSOLE, + }, + "connectorAlterOffsets": Run{ + Path: offsetsPath, + Name: "connectorAlterOffsets", + Doc: "Alter the offsets of a stopped connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{ + "offsets": { + FlagName: "offsets", + Required: false, + Type: "json", + }, + }, + Method: "PATCH", + BackendType: CONSOLE, + }, + } + if !reflect.DeepEqual(result, expected) { + t.Error(spew.Printf("got %v, want %v", result, expected)) + } + }) +} diff --git a/pkg/schema/testdata/connector_run.yaml b/pkg/schema/testdata/connector_run.yaml new file mode 100644 index 0000000..155d8f3 --- /dev/null +++ b/pkg/schema/testdata/connector_run.yaml @@ -0,0 +1,152 @@ +openapi: 3.0.3 +info: + title: Conduktor Public API + version: 0.0.1-SNAPSHOT +paths: + /public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/stop: + put: + tags: + - connector + description: Stop a connector (Kafka Connect 3.5.0+, KIP-875) + operationId: stop-connector + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + responses: + '204': + description: '' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorStop + x-cdk-run-doc: Stop a connector + /public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets: + get: + tags: + - connector + description: Get the source or sink offsets of a connector + operationId: get-connector-offsets + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + responses: + '200': + description: '' + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorOffsets' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorGetOffsets + x-cdk-run-doc: Get the offsets of a connector + delete: + tags: + - connector + description: Reset all offsets of a connector. The connector must be in STOPPED state + operationId: reset-connector-offsets + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + responses: + '200': + description: '' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorResetOffsets + x-cdk-run-doc: Reset all offsets of a stopped connector + patch: + tags: + - connector + description: Alter the offsets of a connector. The connector must be in STOPPED state + operationId: alter-connector-offsets + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorOffsets' + required: true + responses: + '200': + description: '' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorAlterOffsets + x-cdk-run-doc: Alter the offsets of a stopped connector +components: + schemas: + ConnectorOffsetEntry: + title: ConnectorOffsetEntry + type: object + required: + - partition + - offset + properties: + partition: {} + offset: {} + ConnectorOffsets: + title: ConnectorOffsets + type: object + properties: + offsets: + type: array + items: + $ref: '#/components/schemas/ConnectorOffsetEntry' From 081baaf1c1c5904ea3889238d7c8e83df34fa480 Mon Sep 17 00:00:00 2001 From: Benjamin Thuillier Date: Wed, 27 May 2026 16:32:06 +0200 Subject: [PATCH 2/2] add new verb --- pkg/client/console_client_run_test.go | 60 +++++++++------ pkg/schema/default_schema/console.json | 39 ++++++++++ pkg/schema/openapi_parser_console_test.go | 30 ++++++++ pkg/schema/testdata/connector_run.yaml | 90 +++++++++++++++++++++++ 4 files changed, 195 insertions(+), 24 deletions(-) diff --git a/pkg/client/console_client_run_test.go b/pkg/client/console_client_run_test.go index ccc91b0..f49b280 100644 --- a/pkg/client/console_client_run_test.go +++ b/pkg/client/console_client_run_test.go @@ -34,31 +34,43 @@ var connectorOffsetsRun = schema.Run{ Method: "PATCH", } -func connectorStopRun() schema.Run { - return schema.Run{ - BackendType: schema.CONSOLE, - Name: "connectorStop", - Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/stop", - PathParameter: []string{"cluster", "connectCluster", "connector-name"}, - Method: "PUT", +// connectorStop / pause / resume / restart are all empty-body PUTs that differ +// only by the trailing path segment. +func TestRunConnectorPutVerbs(t *testing.T) { + verbs := []struct { + name string + segment string + }{ + {"connectorStop", "stop"}, + {"connectorPause", "pause"}, + {"connectorResume", "resume"}, + {"connectorRestart", "restart"}, } -} - -func TestRunConnectorStop(t *testing.T) { - defer httpmock.Reset() - client := makeMockedClient(t) - - httpmock.RegisterMatcherResponderWithQuery( - "PUT", - "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/stop", - nil, - httpmock.HeaderIs("Authorization", "Bearer aToken"), - httpmock.NewStringResponder(204, ""), - ) - - _, err := client.Run(connectorStopRun(), []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, nil) - if err != nil { - t.Errorf("expected no error, got: %s", err) + for _, verb := range verbs { + t.Run(verb.name, func(t *testing.T) { + defer httpmock.Reset() + client := makeMockedClient(t) + + run := schema.Run{ + BackendType: schema.CONSOLE, + Name: verb.name, + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/" + verb.segment, + PathParameter: []string{"cluster", "connectCluster", "connector-name"}, + Method: "PUT", + } + httpmock.RegisterMatcherResponderWithQuery( + "PUT", + "http://baseUrl/api/public/kafka/v2/cluster/my-cluster/connect/my-connect/connector/my-connector/"+verb.segment, + nil, + httpmock.HeaderIs("Authorization", "Bearer aToken"), + httpmock.NewStringResponder(204, ""), + ) + + _, err := client.Run(run, []string{"my-cluster", "my-connect", "my-connector"}, map[string]string{}, nil) + if err != nil { + t.Errorf("expected no error, got: %s", err) + } + }) } } diff --git a/pkg/schema/default_schema/console.json b/pkg/schema/default_schema/console.json index 69dbe95..8885d77 100644 --- a/pkg/schema/default_schema/console.json +++ b/pkg/schema/default_schema/console.json @@ -517,6 +517,19 @@ "BodyFields": {}, "Method": "GET" }, + "connectorPause": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/pause", + "Name": "connectorPause", + "Doc": "Pause a connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": {}, + "Method": "PUT" + }, "connectorResetOffsets": { "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets", "Name": "connectorResetOffsets", @@ -530,6 +543,32 @@ "BodyFields": {}, "Method": "DELETE" }, + "connectorRestart": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/restart", + "Name": "connectorRestart", + "Doc": "Restart a connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": {}, + "Method": "PUT" + }, + "connectorResume": { + "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/resume", + "Name": "connectorResume", + "Doc": "Resume a paused connector", + "QueryParameter": {}, + "PathParameter": [ + "cluster", + "connectCluster", + "connector-name" + ], + "BodyFields": {}, + "Method": "PUT" + }, "connectorStop": { "Path": "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/stop", "Name": "connectorStop", diff --git a/pkg/schema/openapi_parser_console_test.go b/pkg/schema/openapi_parser_console_test.go index f208ebd..c30b062 100644 --- a/pkg/schema/openapi_parser_console_test.go +++ b/pkg/schema/openapi_parser_console_test.go @@ -491,6 +491,36 @@ func TestGetConnectorRuns(t *testing.T) { Method: "PUT", BackendType: CONSOLE, }, + "connectorPause": Run{ + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/pause", + Name: "connectorPause", + Doc: "Pause a connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{}, + Method: "PUT", + BackendType: CONSOLE, + }, + "connectorResume": Run{ + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/resume", + Name: "connectorResume", + Doc: "Resume a paused connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{}, + Method: "PUT", + BackendType: CONSOLE, + }, + "connectorRestart": Run{ + Path: "/public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/restart", + Name: "connectorRestart", + Doc: "Restart a connector", + QueryParameter: map[string]FlagParameterOption{}, + PathParameter: connectorPathParams, + BodyFields: map[string]FlagParameterOption{}, + Method: "PUT", + BackendType: CONSOLE, + }, "connectorGetOffsets": Run{ Path: offsetsPath, Name: "connectorGetOffsets", diff --git a/pkg/schema/testdata/connector_run.yaml b/pkg/schema/testdata/connector_run.yaml index 155d8f3..8bcd035 100644 --- a/pkg/schema/testdata/connector_run.yaml +++ b/pkg/schema/testdata/connector_run.yaml @@ -33,6 +33,96 @@ paths: x-cdk-run-version: '1' x-cdk-run-name: connectorStop x-cdk-run-doc: Stop a connector + /public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/pause: + put: + tags: + - connector + description: Pause a connector + operationId: pause-connector + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + responses: + '204': + description: '' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorPause + x-cdk-run-doc: Pause a connector + /public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/resume: + put: + tags: + - connector + description: Resume a paused connector + operationId: resume-connector + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + responses: + '204': + description: '' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorResume + x-cdk-run-doc: Resume a paused connector + /public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/restart: + put: + tags: + - connector + description: Restart a connector + operationId: restart-connector + parameters: + - name: cluster + in: path + required: true + schema: + type: string + - name: connectCluster + in: path + required: true + schema: + type: string + - name: connector-name + in: path + required: true + schema: + type: string + responses: + '204': + description: '' + security: + - httpAuth: [] + x-cdk-run-version: '1' + x-cdk-run-name: connectorRestart + x-cdk-run-doc: Restart a connector /public/kafka/v2/cluster/{cluster}/connect/{connectCluster}/connector/{connector-name}/offsets: get: tags: