Skip to content

Commit a4e7443

Browse files
committed
Introduce LakeTieringDecoratorPlugin to make it pluggable
1 parent efcc2f9 commit a4e7443

9 files changed

Lines changed: 406 additions & 15 deletions

File tree

fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTiering.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2828

2929
import java.util.Map;
30+
import java.util.ServiceLoader;
3031

3132
import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
3233
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
@@ -39,10 +40,10 @@
3940
* <p>This class is responsible for parsing configuration parameters, initializing the Flink
4041
* execution environment, and coordinating the construction of the tiering pipeline.
4142
*
42-
* <p>Design Motivation: By decoupling the logic from {@link FlussLakeTieringEntrypoint} into this
43-
* class, extensibility is significantly improved. Developers can now extend this class to customize
44-
* configuration extraction (e.g., injecting internal security tokens) without duplicating the core
45-
* entrypoint boilerplate.
43+
* <p>Extensibility: Customization of Flink execution environment and configurations is supported
44+
* through the {@link LakeTieringDecoratorPlugin} SPI mechanism. Different environments (e.g.,
45+
* internal vs. public cloud) can provide their own decorator implementations without touching core
46+
* code, ensuring better maintainability, testability, and forward compatibility.
4647
*/
4748
public class FlussLakeTiering {
4849

@@ -51,17 +52,17 @@ public class FlussLakeTiering {
5152

5253
protected final StreamExecutionEnvironment execEnv;
5354
protected final String dataLake;
54-
protected final Map<String, String> flussConfigMap;
55-
protected final Map<String, String> lakeConfigMap;
56-
protected final Map<String, String> lakeTieringConfigMap;
55+
protected final Configuration flussConfig;
56+
protected final Configuration lakeConfig;
57+
protected final Configuration lakeTieringConfig;
5758

5859
public FlussLakeTiering(String[] args) {
5960
// parse params
6061
final MultipleParameterToolAdapter params = MultipleParameterToolAdapter.fromArgs(args);
6162
Map<String, String> paramsMap = params.toMap();
6263

6364
// extract fluss config
64-
flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
65+
Map<String, String> flussConfigMap = extractAndRemovePrefix(paramsMap, FLUSS_CONF_PREFIX);
6566
// we need to get bootstrap.servers
6667
String bootstrapServers = flussConfigMap.get(ConfigOptions.BOOTSTRAP_SERVERS.key());
6768
if (bootstrapServers == null) {
@@ -70,6 +71,7 @@ public FlussLakeTiering(String[] args) {
7071
"The bootstrap server to fluss is not configured, please configure %s",
7172
FLUSS_CONF_PREFIX + ConfigOptions.BOOTSTRAP_SERVERS.key()));
7273
}
74+
this.flussConfig = Configuration.fromMap(flussConfigMap);
7375

7476
dataLake = paramsMap.get(ConfigOptions.DATALAKE_FORMAT.key());
7577
if (dataLake == null) {
@@ -78,12 +80,15 @@ public FlussLakeTiering(String[] args) {
7880
}
7981

8082
// extract lake config
81-
lakeConfigMap =
83+
Map<String, String> lakeConfigMap =
8284
extractAndRemovePrefix(
8385
paramsMap, String.format("%s%s.", DATA_LAKE_CONFIG_PREFIX, dataLake));
86+
this.lakeConfig = Configuration.fromMap(lakeConfigMap);
8487

8588
// extract tiering service config
86-
lakeTieringConfigMap = extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
89+
Map<String, String> lakeTieringConfigMap =
90+
extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
91+
this.lakeTieringConfig = Configuration.fromMap(lakeTieringConfigMap);
8792

8893
// now, we must use full restart strategy if any task is failed,
8994
// since committer is stateless, if tiering committer is failover, committer
@@ -97,18 +102,45 @@ public FlussLakeTiering(String[] args) {
97102
}
98103

99104
protected void run() throws Exception {
105+
// Load and apply all available decorator plugins
106+
loadAndApplyDecoratorPlugins();
107+
100108
// build and run lake tiering job
101109
JobClient jobClient =
102110
LakeTieringJobBuilder.newBuilder(
103-
execEnv,
104-
Configuration.fromMap(flussConfigMap),
105-
Configuration.fromMap(lakeConfigMap),
106-
Configuration.fromMap(lakeTieringConfigMap),
107-
dataLake)
111+
execEnv, flussConfig, lakeConfig, lakeTieringConfig, dataLake)
108112
.build();
109113

110114
System.out.printf(
111115
"Starting data tiering service from Fluss to %s, jobId is %s.....%n",
112116
dataLake, jobClient.getJobID());
113117
}
118+
119+
/**
120+
* Loads all available {@link LakeTieringDecoratorPlugin} implementations and applies their
121+
* decorators in sequence.
122+
*
123+
* <p>All available plugins will be loaded and their decorators will be called in the order they
124+
* are discovered by the ServiceLoader. This allows multiple decorators to be applied
125+
* sequentially, where each decorator can further customize the Flink execution environment and
126+
* configurations.
127+
*/
128+
protected void loadAndApplyDecoratorPlugins() {
129+
ServiceLoader<LakeTieringDecoratorPlugin> serviceLoader =
130+
ServiceLoader.load(
131+
LakeTieringDecoratorPlugin.class,
132+
LakeTieringDecoratorPlugin.class.getClassLoader());
133+
serviceLoader.stream()
134+
.map(ServiceLoader.Provider::get)
135+
.forEach(
136+
plugin -> {
137+
String identifier = plugin.identifier();
138+
System.out.printf(
139+
"Applying LakeTieringDecoratorPlugin with identifier: %s%n",
140+
identifier);
141+
LakeTieringDecorator decorator = plugin.createLakeTieringDecorator();
142+
decorator.decorate(
143+
execEnv, flussConfig, lakeConfig, lakeTieringConfig, dataLake);
144+
});
145+
}
114146
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.tiering;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.config.Configuration;
22+
23+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
24+
25+
/**
26+
* Interface for customizing Flink execution environment and configurations for lake tiering jobs.
27+
*
28+
* <p>Implementations of this interface can customize the Flink execution environment and
29+
* configurations as needed for specific deployment environments (e.g., injecting internal security
30+
* tokens, setting environment-specific configurations).
31+
*
32+
* @since 0.9
33+
*/
34+
@PublicEvolving
35+
public interface LakeTieringDecorator {
36+
37+
/**
38+
* Customizes the Flink execution environment and configurations for the lake tiering job.
39+
*
40+
* <p>This method is called before building the tiering job, allowing implementations to modify
41+
* the Flink execution environment or any of the provided configurations as needed.
42+
*
43+
* @param env the Flink StreamExecutionEnvironment to customize
44+
* @param flussConfig the Fluss configuration (may be modified)
45+
* @param dataLakeConfig the data lake configuration (may be modified)
46+
* @param lakeTieringConfig the lake tiering configuration (may be modified)
47+
* @param dataLakeFormat the data lake format identifier (e.g., "paimon", "iceberg")
48+
*/
49+
void decorate(
50+
StreamExecutionEnvironment env,
51+
Configuration flussConfig,
52+
Configuration dataLakeConfig,
53+
Configuration lakeTieringConfig,
54+
String dataLakeFormat);
55+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.tiering;
19+
20+
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.plugin.Plugin;
22+
23+
/**
24+
* A Plugin to create instances of {@link LakeTieringDecorator}.
25+
*
26+
* <p>This plugin mechanism allows different environments (e.g., internal vs. public cloud) to
27+
* provide their own decorator implementations for customizing Flink execution environment and
28+
* configurations without touching core code, ensuring better maintainability, testability, and
29+
* forward compatibility.
30+
*
31+
* <p>Multiple different plugin implementations can be loaded and applied simultaneously. However,
32+
* the loading order of plugins is not guaranteed and may vary between runs. Plugin implementations
33+
* should not depend on the loading order and should be designed to work correctly regardless of
34+
* when they are applied relative to other plugins.
35+
*
36+
* @since 0.9
37+
*/
38+
@PublicEvolving
39+
public interface LakeTieringDecoratorPlugin extends Plugin {
40+
41+
/**
42+
* Returns a unique identifier among {@link LakeTieringDecoratorPlugin} implementations.
43+
*
44+
* @return the identifier
45+
*/
46+
String identifier();
47+
48+
/**
49+
* Creates a new instance of {@link LakeTieringDecorator}.
50+
*
51+
* @return the lake tiering decorator instance
52+
*/
53+
LakeTieringDecorator createLakeTieringDecorator();
54+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.tiering;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
/** Test for {@link FlussLakeTiering}. */
25+
class FlussLakeTieringTest {
26+
27+
@Test
28+
void testMultipleDecoratorsApplied() {
29+
String[] args = {
30+
"--fluss.bootstrap.servers",
31+
"localhost:9123",
32+
"--datalake.paimon.metastore",
33+
"rest",
34+
"--datalake.paimon.warehous",
35+
"fluss_test",
36+
"--datalake.format",
37+
"paimon",
38+
};
39+
FlussLakeTiering tiering = new FlussLakeTiering(args);
40+
41+
// Apply decorators - method now uses internal Configuration fields
42+
tiering.loadAndApplyDecoratorPlugins();
43+
44+
// Verify that TestLakeTieringDecorator modified the configurations
45+
assertThat(tiering.flussConfig.getRawValue(TestLakeTieringDecorator.TEST_FLUSS_CONFIG_KEY))
46+
.isPresent()
47+
.contains(TestLakeTieringDecorator.TEST_FLUSS_CONFIG_VALUE);
48+
assertThat(
49+
tiering.lakeConfig.getRawValue(
50+
TestLakeTieringDecorator.TEST_DATA_LAKE_CONFIG_KEY))
51+
.isPresent()
52+
.contains(TestLakeTieringDecorator.TEST_DATA_LAKE_CONFIG_VALUE);
53+
assertThat(
54+
tiering.lakeTieringConfig.getRawValue(
55+
TestLakeTieringDecorator.TEST_LAKE_TIERING_CONFIG_KEY))
56+
.isPresent()
57+
.contains(TestLakeTieringDecorator.TEST_LAKE_TIERING_CONFIG_VALUE);
58+
59+
// Verify that TestLakeTieringDecorator2 also modified the configurations
60+
assertThat(tiering.flussConfig.getRawValue(TestLakeTieringDecorator2.TEST_FLUSS_CONFIG_KEY))
61+
.isPresent()
62+
.contains(TestLakeTieringDecorator2.TEST_FLUSS_CONFIG_VALUE);
63+
assertThat(
64+
tiering.lakeConfig.getRawValue(
65+
TestLakeTieringDecorator2.TEST_DATA_LAKE_CONFIG_KEY))
66+
.isPresent()
67+
.contains(TestLakeTieringDecorator2.TEST_DATA_LAKE_CONFIG_VALUE);
68+
assertThat(
69+
tiering.lakeTieringConfig.getRawValue(
70+
TestLakeTieringDecorator2.TEST_LAKE_TIERING_CONFIG_KEY))
71+
.isPresent()
72+
.contains(TestLakeTieringDecorator2.TEST_LAKE_TIERING_CONFIG_VALUE);
73+
}
74+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.flink.tiering;
19+
20+
import org.apache.fluss.config.Configuration;
21+
22+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
23+
24+
/** Test implementation of {@link LakeTieringDecorator}. */
25+
class TestLakeTieringDecorator implements LakeTieringDecorator {
26+
27+
static final String TEST_FLUSS_CONFIG_KEY = "test.decorator1.fluss.config";
28+
static final String TEST_FLUSS_CONFIG_VALUE = "decorator1-value";
29+
static final String TEST_DATA_LAKE_CONFIG_KEY = "test.decorator1.datalake.config";
30+
static final String TEST_DATA_LAKE_CONFIG_VALUE = "decorator1-datalake-value";
31+
static final String TEST_LAKE_TIERING_CONFIG_KEY = "test.decorator1.tiering.config";
32+
static final String TEST_LAKE_TIERING_CONFIG_VALUE = "decorator1-tiering-value";
33+
34+
@Override
35+
public void decorate(
36+
StreamExecutionEnvironment env,
37+
Configuration flussConfig,
38+
Configuration dataLakeConfig,
39+
Configuration lakeTieringConfig,
40+
String dataLakeFormat) {
41+
// Modify configurations to verify decorator is called
42+
flussConfig.setString(TEST_FLUSS_CONFIG_KEY, TEST_FLUSS_CONFIG_VALUE);
43+
dataLakeConfig.setString(TEST_DATA_LAKE_CONFIG_KEY, TEST_DATA_LAKE_CONFIG_VALUE);
44+
lakeTieringConfig.setString(TEST_LAKE_TIERING_CONFIG_KEY, TEST_LAKE_TIERING_CONFIG_VALUE);
45+
}
46+
}

0 commit comments

Comments
 (0)