diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index d5d6edf703e40..37c6fb0879129 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1217,7 +1217,10 @@ public List connectorPluginConfig(String pluginName, VersionRange List results = new ArrayList<>(); for (ConfigKey configKey : configsMap.values()) { - results.add(AbstractHerder.convertConfigKey(configKey)); + // internal config keys should not be exposed + if (!configKey.internalConfig) { + results.add(AbstractHerder.convertConfigKey(configKey)); + } } return results; } catch (ClassNotFoundException e) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index a1a1505c98320..30a0d72e6d6ef 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -317,9 +317,9 @@ default void validateConnectorConfig(Map connectorConfig, Callba /** - * Returns the configuration of a plugin + * Returns the non-internal configuration of a plugin. * @param pluginName the name of the plugin - * @return the list of ConfigKeyInfo of the plugin + * @return a list of {@link ConfigKeyInfo}, one per non-internal config key of the plugin */ List connectorPluginConfig(String pluginName); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 02caa21812f6b..e80edbabda11e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -1141,6 +1141,23 @@ public void testTransformationPluginConfig() throws ClassNotFoundException { ); } + @Test + public void testConnectorPluginConfigOmitsInternalKeys() throws Exception { + String pluginName = "source-with-internal"; + Class pluginClass = Class.forName("org.apache.kafka.connect.runtime.SourceConnectorWithInternalConfig"); + AbstractHerder herder = testHerder(); + + when(plugins.pluginClass(pluginName, null)).then(invocation -> pluginClass); + when(plugins.newPlugin(anyString(), any())).then(invocation -> pluginClass.getDeclaredConstructor().newInstance()); + when(herder.plugins()).thenReturn(plugins); + + List configs = herder.connectorPluginConfig(pluginName); + assertTrue(configs.stream().anyMatch(c -> c.name().equals("required"))); + // any internal config keys should not be exposed + assertFalse(configs.stream().anyMatch(c -> c.name().equals("internal.only.config"))); + verify(plugins).withClassLoader(pluginClass.getClassLoader()); + } + private void testConnectorPluginConfig( String pluginName, Supplier newPluginInstance, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java new file mode 100644 index 0000000000000..7dedd06644708 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorWithInternalConfig.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.List; +import java.util.Map; + +/** Test-only source connector including one internal {@link org.apache.kafka.common.config.ConfigDef} key. */ +public final class SourceConnectorWithInternalConfig extends SourceConnector { + + public static final String INTERNAL_ONLY_CONFIG_KEY = "internal.only.config"; + + private static final ConfigDef CONFIG = new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs") + .defineInternal(INTERNAL_ONLY_CONFIG_KEY, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW); + + @Override + public String version() { + return "test"; + } + + @Override + public void start(Map props) { + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return List.of(); + } + + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return CONFIG; + } +}