From 79af15a8732bb9b129440b9a3d74710d73e1d3e0 Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Mon, 6 Apr 2026 22:14:36 -0500 Subject: [PATCH 1/5] KAFKA-20389: Add 'internal' flag to ConfigKeyInfo --- .../kafka/connect/runtime/AbstractHerder.java | 3 ++- .../runtime/rest/entities/ConfigKeyInfo.java | 3 ++- .../ConnectorPluginsResourceTest.java | 23 +++++++++++++++---- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index d5d6edf703e40..59ddf400c0d3c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1013,7 +1013,8 @@ private static ConfigKeyInfo convertConfigKey(ConfigKey configKey, String prefix String width = configKey.width.name(); String displayName = configKey.displayName; List dependents = configKey.dependents; - return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents); + boolean internal = configKey.internalConfig; + return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents, internal); } private static ConfigValueInfo convertConfigValue(ConfigValue configValue, Type type) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java index 2d3a3f93be151..b44e39cf03672 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java @@ -31,6 +31,7 @@ public record ConfigKeyInfo( @JsonProperty("order_in_group") int orderInGroup, @JsonProperty("width") String width, @JsonProperty("display_name") String displayName, - @JsonProperty("dependents") List dependents + @JsonProperty("dependents") List dependents, + @JsonProperty("internal") boolean internal ) { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 19c9c9e006f98..a800fbef71542 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -88,6 +88,7 @@ import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -170,25 +171,25 @@ public class ConnectorPluginsResourceTest { List configs = new LinkedList<>(result.configs()); List partialConfigs = new LinkedList<>(partialResult.configs()); - ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, null, "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", List.of()); + ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, null, "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", List.of(), false); ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", List.of(), List.of(), true); ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); partialConfigs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", List.of()); + configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", List.of(), false); configValueInfo = new ConfigValueInfo("test.int.config", "1", List.of("1", "2", "3"), List.of(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); partialConfigs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", List.of()); + configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", List.of(), false); configValueInfo = new ConfigValueInfo("test.string.config.default", "", List.of(), List.of(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); partialConfigs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", List.of()); + configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", List.of(), false); configValueInfo = new ConfigValueInfo("test.list.config", "a,b", List.of("a", "b", "c"), List.of(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); @@ -455,6 +456,16 @@ public void testGetConnectorConfigDef() { Optional cki = connectorConfigDef.stream().filter(c -> c.name().equals(config)).findFirst(); assertTrue(cki.isPresent()); } + + Optional internalCki = connectorConfigDef.stream() + .filter(c -> c.name().equals(ConnectorPluginsResourceTestConnector.TEST_INTERNAL_CONFIG)).findFirst(); + assertTrue(internalCki.isPresent()); + assertTrue(internalCki.get().internal()); + + Optional nonInternalCki = connectorConfigDef.stream() + .filter(c -> c.name().equals(ConnectorPluginsResourceTestConnector.TEST_STRING_CONFIG)).findFirst(); + assertTrue(nonInternalCki.isPresent()); + assertFalse(nonInternalCki.get().internal()); } /* Name here needs to be unique as we are testing the aliasing mechanism */ @@ -464,13 +475,15 @@ public static class ConnectorPluginsResourceTestConnector extends SourceConnecto private static final String TEST_INT_CONFIG = "test.int.config"; private static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default"; private static final String TEST_LIST_CONFIG = "test.list.config"; + private static final String TEST_INTERNAL_CONFIG = "test.internal.config"; private static final String GROUP = "Test"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.") .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.", GROUP, 1, Width.MEDIUM, TEST_INT_CONFIG, new IntegerRecommender()) .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.") - .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender()); + .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender()) + .defineInternal(TEST_INTERNAL_CONFIG, Type.STRING, "", Importance.LOW); @Override public String version() { From 439d352affb122b1081bc073d04a8ecd20cafc41 Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Thu, 9 Apr 2026 06:33:25 -0500 Subject: [PATCH 2/5] Revert "KAFKA-20389: Add 'internal' flag to ConfigKeyInfo" This reverts commit 79af15a8732bb9b129440b9a3d74710d73e1d3e0. --- .../kafka/connect/runtime/AbstractHerder.java | 3 +-- .../runtime/rest/entities/ConfigKeyInfo.java | 3 +-- .../ConnectorPluginsResourceTest.java | 23 ++++--------------- 3 files changed, 7 insertions(+), 22 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 59ddf400c0d3c..d5d6edf703e40 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1013,8 +1013,7 @@ private static ConfigKeyInfo convertConfigKey(ConfigKey configKey, String prefix String width = configKey.width.name(); String displayName = configKey.displayName; List dependents = configKey.dependents; - boolean internal = configKey.internalConfig; - return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents, internal); + return new ConfigKeyInfo(name, typeName, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents); } private static ConfigValueInfo convertConfigValue(ConfigValue configValue, Type type) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java index b44e39cf03672..2d3a3f93be151 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java @@ -31,7 +31,6 @@ public record ConfigKeyInfo( @JsonProperty("order_in_group") int orderInGroup, @JsonProperty("width") String width, @JsonProperty("display_name") String displayName, - @JsonProperty("dependents") List dependents, - @JsonProperty("internal") boolean internal + @JsonProperty("dependents") List dependents ) { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index a800fbef71542..19c9c9e006f98 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -88,7 +88,6 @@ import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -171,25 +170,25 @@ public class ConnectorPluginsResourceTest { List configs = new LinkedList<>(result.configs()); List partialConfigs = new LinkedList<>(partialResult.configs()); - ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, null, "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", List.of(), false); + ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, null, "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", List.of()); ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", List.of(), List.of(), true); ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); partialConfigs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", List.of(), false); + configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, null, "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", List.of()); configValueInfo = new ConfigValueInfo("test.int.config", "1", List.of("1", "2", "3"), List.of(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); partialConfigs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", List.of(), false); + configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", List.of()); configValueInfo = new ConfigValueInfo("test.string.config.default", "", List.of(), List.of(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); partialConfigs.add(configInfo); - configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", List.of(), false); + configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, null, "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", List.of()); configValueInfo = new ConfigValueInfo("test.list.config", "a,b", List.of("a", "b", "c"), List.of(), true); configInfo = new ConfigInfo(configKeyInfo, configValueInfo); configs.add(configInfo); @@ -456,16 +455,6 @@ public void testGetConnectorConfigDef() { Optional cki = connectorConfigDef.stream().filter(c -> c.name().equals(config)).findFirst(); assertTrue(cki.isPresent()); } - - Optional internalCki = connectorConfigDef.stream() - .filter(c -> c.name().equals(ConnectorPluginsResourceTestConnector.TEST_INTERNAL_CONFIG)).findFirst(); - assertTrue(internalCki.isPresent()); - assertTrue(internalCki.get().internal()); - - Optional nonInternalCki = connectorConfigDef.stream() - .filter(c -> c.name().equals(ConnectorPluginsResourceTestConnector.TEST_STRING_CONFIG)).findFirst(); - assertTrue(nonInternalCki.isPresent()); - assertFalse(nonInternalCki.get().internal()); } /* Name here needs to be unique as we are testing the aliasing mechanism */ @@ -475,15 +464,13 @@ public static class ConnectorPluginsResourceTestConnector extends SourceConnecto private static final String TEST_INT_CONFIG = "test.int.config"; private static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default"; private static final String TEST_LIST_CONFIG = "test.list.config"; - private static final String TEST_INTERNAL_CONFIG = "test.internal.config"; private static final String GROUP = "Test"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.") .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.", GROUP, 1, Width.MEDIUM, TEST_INT_CONFIG, new IntegerRecommender()) .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.") - .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender()) - .defineInternal(TEST_INTERNAL_CONFIG, Type.STRING, "", Importance.LOW); + .define(TEST_LIST_CONFIG, Type.LIST, Importance.HIGH, "Test configuration for list type.", GROUP, 2, Width.LONG, TEST_LIST_CONFIG, new ListRecommender()); @Override public String version() { From 7117b4c9f4f79ec3186125f290717e3cd9b9facc Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Thu, 9 Apr 2026 08:12:24 -0500 Subject: [PATCH 3/5] filtering internal config from the response --- .../kafka/connect/runtime/AbstractHerder.java | 4 +- .../apache/kafka/connect/runtime/Herder.java | 4 +- .../connect/runtime/AbstractHerderTest.java | 17 +++++ .../SourceConnectorWithInternalConfig.java | 63 +++++++++++++++++++ 4 files changed, 85 insertions(+), 3 deletions(-) create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index d5d6edf703e40..8cf5838609a62 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1217,7 +1217,9 @@ public List connectorPluginConfig(String pluginName, VersionRange List results = new ArrayList<>(); for (ConfigKey configKey : configsMap.values()) { - results.add(AbstractHerder.convertConfigKey(configKey)); + if (!configKey.internalConfig) { + results.add(AbstractHerder.convertConfigKey(configKey)); + } } return results; } catch (ClassNotFoundException e) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index a1a1505c98320..b22ae4b587c1d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -317,9 +317,9 @@ default void validateConnectorConfig(Map connectorConfig, Callba /** - * Returns the configuration of a plugin + * Returns the configuration of a plugin. Internal configuration keys are omitted. * @param pluginName the name of the plugin - * @return the list of ConfigKeyInfo of the plugin + * @return the list of ConfigKeyInfo for non-internal keys of the plugin */ List connectorPluginConfig(String pluginName); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 02caa21812f6b..c2fd4ee18c51a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -1141,6 +1141,23 @@ public void testTransformationPluginConfig() throws ClassNotFoundException { ); } + @Test + public void testConnectorPluginConfigOmitsInternalKeys() throws Exception { + String pluginName = "source-with-internal"; + Class pluginClass = Class.forName("org.apache.kafka.connect.runtime.SourceConnectorWithInternalConfig"); + AbstractHerder herder = testHerder(); + + when(plugins.pluginClass(pluginName, null)).then(invocation -> pluginClass); + when(plugins.newPlugin(anyString(), any())).then(invocation -> pluginClass.getDeclaredConstructor().newInstance()); + when(herder.plugins()).thenReturn(plugins); + + List configs = herder.connectorPluginConfig(pluginName); + assertTrue(configs.stream().anyMatch(c -> c.name().equals("required"))); + // Same value as SourceConnectorWithInternalConfig.INTERNAL_ONLY_CONFIG_KEY (avoid type ref for checkstyle CDA) + assertFalse(configs.stream().anyMatch(c -> c.name().equals("internal.only.config"))); + verify(plugins).withClassLoader(pluginClass.getClassLoader()); + } + private void testConnectorPluginConfig( String pluginName, Supplier newPluginInstance, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java new file mode 100644 index 0000000000000..7dedd06644708 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.List; +import java.util.Map; + +/** Test-only source connector including one internal {@link org.apache.kafka.common.config.ConfigDef} key. */ +public final class SourceConnectorWithInternalConfig extends SourceConnector { + + public static final String INTERNAL_ONLY_CONFIG_KEY = "internal.only.config"; + + private static final ConfigDef CONFIG = new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs") + .defineInternal(INTERNAL_ONLY_CONFIG_KEY, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW); + + @Override + public String version() { + return "test"; + } + + @Override + public void start(Map props) { + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return List.of(); + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return CONFIG; + } +} From 407026c916627569e789df4c9399812cc4a83eff Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Thu, 9 Apr 2026 08:15:24 -0500 Subject: [PATCH 4/5] nit changes --- .../java/org/apache/kafka/connect/runtime/AbstractHerder.java | 1 + .../org/apache/kafka/connect/runtime/AbstractHerderTest.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 8cf5838609a62..37c6fb0879129 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1217,6 +1217,7 @@ public List connectorPluginConfig(String pluginName, VersionRange List results = new ArrayList<>(); for (ConfigKey configKey : configsMap.values()) { + // internal config keys should not be exposed if (!configKey.internalConfig) { results.add(AbstractHerder.convertConfigKey(configKey)); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index c2fd4ee18c51a..e80edbabda11e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -1153,7 +1153,7 @@ public void testConnectorPluginConfigOmitsInternalKeys() throws Exception { List configs = herder.connectorPluginConfig(pluginName); assertTrue(configs.stream().anyMatch(c -> c.name().equals("required"))); - // Same value as SourceConnectorWithInternalConfig.INTERNAL_ONLY_CONFIG_KEY (avoid type ref for checkstyle CDA) + // any internal config keys should not be exposed assertFalse(configs.stream().anyMatch(c -> c.name().equals("internal.only.config"))); verify(plugins).withClassLoader(pluginClass.getClassLoader()); } From b1c648137091d3daec9cee2c14f845628aae6728 Mon Sep 17 00:00:00 2001 From: Nilesh Kumar Date: Thu, 9 Apr 2026 08:25:29 -0500 Subject: [PATCH 5/5] nit comment addition --- .../main/java/org/apache/kafka/connect/runtime/Herder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index b22ae4b587c1d..30a0d72e6d6ef 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -317,9 +317,9 @@ default void validateConnectorConfig(Map connectorConfig, Callba /** - * Returns the configuration of a plugin. Internal configuration keys are omitted. + * Returns the non-internal configuration of a plugin. * @param pluginName the name of the plugin - * @return the list of ConfigKeyInfo for non-internal keys of the plugin + * @return a list of {@link ConfigKeyInfo}, one per non-internal config key of the plugin */ List connectorPluginConfig(String pluginName);