diff --git a/fluss-flink/fluss-flink-tiering/src/README.md b/fluss-flink/fluss-flink-tiering/src/README.md index 2de96debcf..e3880d7061 100644 --- a/fluss-flink/fluss-flink-tiering/src/README.md +++ b/fluss-flink/fluss-flink-tiering/src/README.md @@ -18,7 +18,9 @@ # Fluss Flink Tiering -This module contains one class FlussLakeTiering. +This module provides the infrastructure for tiering Fluss data to lake formats (e.g., Apache Paimon), +consisting of FlussLakeTiering which encapsulates the core configuration and job graph logic, +and FlussLakeTieringEntrypoint which serves as the official Flink job main class and entrypoint. The reason for extracting it as a separate module is that: When executing the Flink jar job, a jar must be specified. If a `fluss-flink.jar` is specified, it may cause various classloader issues, as there are also `fluss-flink.jar` diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java new file mode 100644 index 0000000000..340be397c6 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter; + +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Map; +import java.util.ServiceLoader; + +import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; +import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; +import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix; +import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; + +/** + * The entrypoint logic for building and launching a Fluss-to-Lake (e.g., Paimon) data tiering job. + * + *

This class is responsible for parsing configuration parameters, initializing the Flink + * execution environment, and coordinating the construction of the tiering pipeline. + * + *

Extensibility: Customization of Flink execution environment and configurations is supported + * through the {@link LakeTieringDecoratorPlugin} SPI mechanism. Different environments (e.g., + * internal vs. public cloud) can provide their own decorator implementations. + */ +public class FlussLakeTiering { + + private static final String FLUSS_CONF_PREFIX = "fluss."; + private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering."; + + protected final StreamExecutionEnvironment execEnv; + protected final String dataLake; + protected final Configuration flussConfig; + protected final Configuration lakeConfig; + protected final Configuration lakeTieringConfig; + + public FlussLakeTiering(String[] args) { + // parse params + final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args); + Map paramsMap = params.toMap(); + + // extract fluss config + Map flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX); + // we need to get bootstrap.servers + String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key()); + if (bootstrapServers == null) { + throw new IllegalArgumentException( + String.format( + "The bootstrap server to fluss is not configured, please configure %s", + FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key())); + } + this.flussConfig = Configuration.fromMap(flussConfigMap); + + dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key()); + if (dataLake == null) { + throw new IllegalArgumentException( + ConfigOptions.DATALAKE_FORMAT.key() + " is not configured"); + } + + // extract lake config + Map lakeConfigMap = + extractAndRemovePrefix( + paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake)); + this.lakeConfig = Configuration.fromMap(lakeConfigMap); + + // extract tiering service config + Map lakeTieringConfigMap = + extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX); + this.lakeTieringConfig = Configuration.fromMap(lakeTieringConfigMap); + + // now, we must use full restart strategy if any task is failed, + // since committer is stateless, if tiering committer is failover, committer + // will lost the collected committable, and will never collect all committable to do commit + // todo: support region failover + org.apache.flink.configuration.Configuration flinkConfig = + new org.apache.flink.configuration.Configuration(); + flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME); + + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); + } + + protected void run() throws Exception { + // Load and apply all available decorator plugins + loadAndApplyDecoratorPlugins(); + + // build and run lake tiering job + JobClient jobClient = + LakeTieringJobBuilder.newBuilder( + execEnv, flussConfig, lakeConfig, lakeTieringConfig, dataLake) + .build(); + + System.out.printf( + "Starting data tiering service from Fluss to %s, jobId is %s.....%n", + dataLake, jobClient.getJobID()); + } + + /** + * Loads all available {@link LakeTieringDecoratorPlugin} implementations and applies their + * decorators in sequence. + * + *

All available plugins will be loaded and their decorators will be called in the order they + * are discovered by the ServiceLoader. This allows multiple decorators to be applied + * sequentially, where each decorator can further customize the Flink execution environment and + * configurations. + */ + protected void loadAndApplyDecoratorPlugins() { + ServiceLoader serviceLoader = + ServiceLoader.load( + LakeTieringDecoratorPlugin.class, + LakeTieringDecoratorPlugin.class.getClassLoader()); + for (LakeTieringDecoratorPlugin plugin : serviceLoader) { + String identifier = plugin.identifier(); + System.out.printf( + "Applying LakeTieringDecoratorPlugin with identifier: %s%n", identifier); + LakeTieringDecorator decorator = plugin.createLakeTieringDecorator(); + decorator.decorate(execEnv, flussConfig, lakeConfig, lakeTieringConfig, dataLake); + } + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java index a2ce6ce1af..04ce86af14 100644 --- a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java +++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java @@ -17,84 +17,10 @@ package org.apache.fluss.flink.tiering; -import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.config.Configuration; -import org.apache.fluss.flink.adapter.MultipleParameterToolAdapter; - -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -import java.util.Map; - -import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME; -import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX; -import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix; -import static org.apache.fluss.utils.PropertiesUtils.extractPrefix; - /** The entrypoint for Flink to tier fluss data to lake format like paimon. */ public class FlussLakeTieringEntrypoint { - private static final String FLUSS_CONF_PREFIX = "fluss."; - private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering."; - public static void main(String[] args) throws Exception { - - // parse params - final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args); - Map paramsMap = params.toMap(); - - // extract fluss config - Map flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX); - // we need to get bootstrap.servers - String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key()); - if (bootstrapServers == null) { - throw new IllegalArgumentException( - String.format( - "The bootstrap server to fluss is not configured, please configure %s", - FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key())); - } - flussConfigMap.put(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers); - - String dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key()); - if (dataLake == null) { - throw new IllegalArgumentException( - ConfigOptions.DATALAKE_FORMAT.key() + " is not configured"); - } - - // extract lake config - Map lakeConfigMap = - extractAndRemovePrefix( - paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake)); - - // extract tiering service config - Map lakeTieringConfigMap = - extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX); - - // now, we must use full restart strategy if any task is failed, - // since committer is stateless, if tiering committer is failover, committer - // will lost the collected committable, and will never collect all committable to do commit - // todo: support region failover - org.apache.flink.configuration.Configuration flinkConfig = - new org.apache.flink.configuration.Configuration(); - flinkConfig.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, FULL_RESTART_STRATEGY_NAME); - - // build tiering source - final StreamExecutionEnvironment execEnv = - StreamExecutionEnvironment.getExecutionEnvironment(flinkConfig); - - // build lake tiering job - JobClient jobClient = - LakeTieringJobBuilder.newBuilder( - execEnv, - Configuration.fromMap(flussConfigMap), - Configuration.fromMap(lakeConfigMap), - Configuration.fromMap(lakeTieringConfigMap), - dataLake) - .build(); - - System.out.printf( - "Starting data tiering service from Fluss to %s, jobId is %s.....%n", - dataLake, jobClient.getJobID()); + new FlussLakeTiering(args).run(); } } diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/LakeTieringDecorator.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/LakeTieringDecorator.java new file mode 100644 index 0000000000..c47d2e1e03 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/LakeTieringDecorator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.config.Configuration; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Interface for customizing Flink execution environment and configurations for lake tiering jobs. + * + *

Implementations of this interface can customize the Flink execution environment and + * configurations as needed for specific deployment environments (e.g., injecting internal security + * tokens, setting environment-specific configurations). + * + * @since 0.9 + */ +@PublicEvolving +public interface LakeTieringDecorator { + + /** + * Customizes the Flink execution environment and configurations for the lake tiering job. + * + *

This method is called before building the tiering job, allowing implementations to modify + * the Flink execution environment or any of the provided configurations as needed. + * + * @param env the Flink StreamExecutionEnvironment to customize + * @param flussConfig the Fluss configuration (may be modified) + * @param dataLakeConfig the data lake configuration (may be modified) + * @param lakeTieringConfig the lake tiering configuration (may be modified) + * @param dataLakeFormat the data lake format identifier (e.g., "paimon", "iceberg") + */ + void decorate( + StreamExecutionEnvironment env, + Configuration flussConfig, + Configuration dataLakeConfig, + Configuration lakeTieringConfig, + String dataLakeFormat); +} diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/LakeTieringDecoratorPlugin.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/LakeTieringDecoratorPlugin.java new file mode 100644 index 0000000000..59a1626ead --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/LakeTieringDecoratorPlugin.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.plugin.Plugin; + +/** + * A Plugin to create instances of {@link LakeTieringDecorator}. + * + *

This plugin mechanism allows different environments (e.g., internal vs. public cloud) to + * provide their own decorator implementations for customizing Flink execution environment. + * + *

Multiple different plugin implementations can be loaded and applied simultaneously. However, + * the loading order of plugins is not guaranteed and may vary between runs. Plugin implementations + * should not depend on the loading order and should be designed to work correctly regardless of + * when they are applied relative to other plugins. + * + * @since 0.9 + */ +@PublicEvolving +public interface LakeTieringDecoratorPlugin extends Plugin { + + /** + * Returns a unique identifier among {@link LakeTieringDecoratorPlugin} implementations. + * + * @return the identifier + */ + String identifier(); + + /** + * Creates a new instance of {@link LakeTieringDecorator}. + * + * @return the lake tiering decorator instance + */ + LakeTieringDecorator createLakeTieringDecorator(); +} diff --git a/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/FlussLakeTieringTest.java b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/FlussLakeTieringTest.java new file mode 100644 index 0000000000..ee69004287 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/FlussLakeTieringTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FlussLakeTiering}. */ +class FlussLakeTieringTest { + + @Test + void testMultipleDecoratorsApplied() { + String[] args = { + "--fluss.bootstrap.servers", + "localhost:9123", + "--datalake.paimon.metastore", + "rest", + "--datalake.paimon.warehous", + "fluss_test", + "--datalake.format", + "paimon", + }; + FlussLakeTiering tiering = new FlussLakeTiering(args); + + // Apply decorators - method now uses internal Configuration fields + tiering.loadAndApplyDecoratorPlugins(); + + // Verify that TestLakeTieringDecorator modified the configurations + assertThat(tiering.flussConfig.getRawValue(TestLakeTieringDecorator.TEST_FLUSS_CONFIG_KEY)) + .isPresent() + .contains(TestLakeTieringDecorator.TEST_FLUSS_CONFIG_VALUE); + assertThat( + tiering.lakeConfig.getRawValue( + TestLakeTieringDecorator.TEST_DATA_LAKE_CONFIG_KEY)) + .isPresent() + .contains(TestLakeTieringDecorator.TEST_DATA_LAKE_CONFIG_VALUE); + assertThat( + tiering.lakeTieringConfig.getRawValue( + TestLakeTieringDecorator.TEST_LAKE_TIERING_CONFIG_KEY)) + .isPresent() + .contains(TestLakeTieringDecorator.TEST_LAKE_TIERING_CONFIG_VALUE); + + // Verify that TestLakeTieringDecorator2 also modified the configurations + assertThat(tiering.flussConfig.getRawValue(TestLakeTieringDecorator2.TEST_FLUSS_CONFIG_KEY)) + .isPresent() + .contains(TestLakeTieringDecorator2.TEST_FLUSS_CONFIG_VALUE); + assertThat( + tiering.lakeConfig.getRawValue( + TestLakeTieringDecorator2.TEST_DATA_LAKE_CONFIG_KEY)) + .isPresent() + .contains(TestLakeTieringDecorator2.TEST_DATA_LAKE_CONFIG_VALUE); + assertThat( + tiering.lakeTieringConfig.getRawValue( + TestLakeTieringDecorator2.TEST_LAKE_TIERING_CONFIG_KEY)) + .isPresent() + .contains(TestLakeTieringDecorator2.TEST_LAKE_TIERING_CONFIG_VALUE); + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecorator.java b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecorator.java new file mode 100644 index 0000000000..023d717439 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecorator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.config.Configuration; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** Test implementation of {@link LakeTieringDecorator}. */ +class TestLakeTieringDecorator implements LakeTieringDecorator { + + static final String TEST_FLUSS_CONFIG_KEY = "test.decorator1.fluss.config"; + static final String TEST_FLUSS_CONFIG_VALUE = "decorator1-value"; + static final String TEST_DATA_LAKE_CONFIG_KEY = "test.decorator1.datalake.config"; + static final String TEST_DATA_LAKE_CONFIG_VALUE = "decorator1-datalake-value"; + static final String TEST_LAKE_TIERING_CONFIG_KEY = "test.decorator1.tiering.config"; + static final String TEST_LAKE_TIERING_CONFIG_VALUE = "decorator1-tiering-value"; + + @Override + public void decorate( + StreamExecutionEnvironment env, + Configuration flussConfig, + Configuration dataLakeConfig, + Configuration lakeTieringConfig, + String dataLakeFormat) { + // Modify configurations to verify decorator is called + flussConfig.setString(TEST_FLUSS_CONFIG_KEY, TEST_FLUSS_CONFIG_VALUE); + dataLakeConfig.setString(TEST_DATA_LAKE_CONFIG_KEY, TEST_DATA_LAKE_CONFIG_VALUE); + lakeTieringConfig.setString(TEST_LAKE_TIERING_CONFIG_KEY, TEST_LAKE_TIERING_CONFIG_VALUE); + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecorator2.java b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecorator2.java new file mode 100644 index 0000000000..cd3860a432 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecorator2.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +import org.apache.fluss.config.Configuration; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** Test implementation of {@link LakeTieringDecorator} for testing multiple decorators. */ +class TestLakeTieringDecorator2 implements LakeTieringDecorator { + + static final String TEST_FLUSS_CONFIG_KEY = "test.decorator2.fluss.config"; + static final String TEST_FLUSS_CONFIG_VALUE = "decorator2-value"; + static final String TEST_DATA_LAKE_CONFIG_KEY = "test.decorator2.datalake.config"; + static final String TEST_DATA_LAKE_CONFIG_VALUE = "decorator2-datalake-value"; + static final String TEST_LAKE_TIERING_CONFIG_KEY = "test.decorator2.tiering.config"; + static final String TEST_LAKE_TIERING_CONFIG_VALUE = "decorator2-tiering-value"; + + @Override + public void decorate( + StreamExecutionEnvironment env, + Configuration flussConfig, + Configuration dataLakeConfig, + Configuration lakeTieringConfig, + String dataLakeFormat) { + // Modify configurations to verify decorator is called + flussConfig.setString(TEST_FLUSS_CONFIG_KEY, TEST_FLUSS_CONFIG_VALUE); + dataLakeConfig.setString(TEST_DATA_LAKE_CONFIG_KEY, TEST_DATA_LAKE_CONFIG_VALUE); + lakeTieringConfig.setString(TEST_LAKE_TIERING_CONFIG_KEY, TEST_LAKE_TIERING_CONFIG_VALUE); + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecoratorPlugin.java b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecoratorPlugin.java new file mode 100644 index 0000000000..7ae610223a --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecoratorPlugin.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** Test implementation of {@link LakeTieringDecoratorPlugin}. */ +public class TestLakeTieringDecoratorPlugin implements LakeTieringDecoratorPlugin { + + @Override + public String identifier() { + return "test-decorator-1"; + } + + @Override + public LakeTieringDecorator createLakeTieringDecorator() { + return new TestLakeTieringDecorator(); + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecoratorPlugin2.java b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecoratorPlugin2.java new file mode 100644 index 0000000000..ab579e7989 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/test/java/org/apache/fluss/flink/tiering/TestLakeTieringDecoratorPlugin2.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.tiering; + +/** Test implementation of {@link LakeTieringDecoratorPlugin} for testing multiple plugins. */ +public class TestLakeTieringDecoratorPlugin2 implements LakeTieringDecoratorPlugin { + + @Override + public String identifier() { + return "test-decorator-2"; + } + + @Override + public LakeTieringDecorator createLakeTieringDecorator() { + return new TestLakeTieringDecorator2(); + } +} diff --git a/fluss-flink/fluss-flink-tiering/src/test/resources/META-INF/services/org.apache.fluss.flink.tiering.LakeTieringDecoratorPlugin b/fluss-flink/fluss-flink-tiering/src/test/resources/META-INF/services/org.apache.fluss.flink.tiering.LakeTieringDecoratorPlugin new file mode 100644 index 0000000000..71338224b7 --- /dev/null +++ b/fluss-flink/fluss-flink-tiering/src/test/resources/META-INF/services/org.apache.fluss.flink.tiering.LakeTieringDecoratorPlugin @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.fluss.flink.tiering.TestLakeTieringDecoratorPlugin +org.apache.fluss.flink.tiering.TestLakeTieringDecoratorPlugin2 diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 16e942860e..2e7d79c504 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -465,6 +465,7 @@ org.apache.fluss.flink.tiering.LakeTieringJobBuilder org.apache.fluss.flink.tiering.FlussLakeTieringEntrypoint + org.apache.fluss.flink.tiering.FlussLakeTiering org.apache.flink.table.catalog.*