Skip to content

Commit b4633f6

Browse files
committed
fix comments
1 parent 100999f commit b4633f6

4 files changed

Lines changed: 87 additions & 12 deletions

File tree

fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.config.cluster.AlterConfig;
2323
import org.apache.fluss.config.cluster.AlterConfigOpType;
2424
import org.apache.fluss.exception.InvalidAlterTableException;
25+
import org.apache.fluss.exception.InvalidConfigException;
2526
import org.apache.fluss.metadata.DataLakeFormat;
2627
import org.apache.fluss.metadata.DatabaseDescriptor;
2728
import org.apache.fluss.metadata.Schema;
@@ -182,6 +183,50 @@ void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws Exception {
182183
assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
183184
}
184185

186+
@Test
187+
void testCannotEnableTableWhenTableFormatDiffersFromClusterFormat() throws Exception {
188+
String databaseName = "test_db";
189+
String tableName = "test_table_format_mismatch";
190+
TablePath tablePath = TablePath.of(databaseName, tableName);
191+
192+
admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
193+
admin.alterClusterConfigs(
194+
Collections.singletonList(
195+
new AlterConfig(
196+
DATALAKE_FORMAT.key(), null, AlterConfigOpType.SET)))
197+
.get();
198+
199+
TableDescriptor tableDescriptor =
200+
TableDescriptor.builder()
201+
.schema(
202+
Schema.newBuilder()
203+
.column("c1", DataTypes.INT())
204+
.column("c2", DataTypes.STRING())
205+
.build())
206+
.distributedBy(3, "c1")
207+
.property(ConfigOptions.TABLE_DATALAKE_FORMAT, DataLakeFormat.ICEBERG)
208+
.build();
209+
admin.createTable(tablePath, tableDescriptor, false).get();
210+
211+
admin.alterClusterConfigs(
212+
Arrays.asList(
213+
new AlterConfig(
214+
DATALAKE_FORMAT.key(),
215+
DataLakeFormat.PAIMON.toString(),
216+
AlterConfigOpType.SET),
217+
new AlterConfig(
218+
DATALAKE_ENABLED.key(), "true", AlterConfigOpType.SET)))
219+
.get();
220+
221+
List<TableChange> enableDatalakeChange =
222+
Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), "true"));
223+
assertThatThrownBy(() -> admin.alterTable(tablePath, enableDatalakeChange, false).get())
224+
.cause()
225+
.isInstanceOf(InvalidConfigException.class)
226+
.hasMessageContaining("'table.datalake.format' ('iceberg')")
227+
.hasMessageContaining("cluster 'datalake.format' ('paimon')");
228+
}
229+
185230
@Test
186231
void testEnableTableAfterClusterEnablesDataLake() throws Exception {
187232
String databaseName = "test_db";

fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,15 @@ public void validate(Configuration newConfig) throws ConfigException {
6767
? newConfig.get(DATALAKE_FORMAT)
6868
: currentConfiguration.get(DATALAKE_FORMAT);
6969

70+
// TODO: validate(...) only sees the merged effective cluster config, so it cannot
71+
// detect the case where a user enables datalake.enabled and unsets
72+
// datalake.format in the same dynamic config change. This may leave the cluster
73+
// with datalake.enabled set but no datalake.format. Fixing this likely requires
74+
// extending the validate/reconfigure framework to expose the incremental change
75+
// set, rather than only the merged result. We accept this for now because
76+
// table-level enablement is still validated, and enabling datalake for a table
77+
// will fail if datalake.format is not configured.
7078
boolean explicitDataLakeEnabled = newConfig.getOptional(DATALAKE_ENABLED).orElse(false);
71-
72-
Optional<DataLakeFormat> newDataLakeFormat = newConfig.getOptional(DATALAKE_FORMAT);
73-
Optional<DataLakeFormat> effectiveDataLakeFormat =
74-
newDataLakeFormat.isPresent()
75-
? newDataLakeFormat
76-
: currentConfiguration.getOptional(DATALAKE_FORMAT);
77-
7879
if (explicitDataLakeEnabled && newDatalakeFormat != null) {
7980
throw new ConfigException(
8081
String.format(
@@ -88,7 +89,7 @@ public void validate(Configuration newConfig) throws ConfigException {
8889
return;
8990
}
9091

91-
String datalakePrefix = "datalake." + effectiveDataLakeFormat.get() + ".";
92+
String datalakePrefix = "datalake." + newDatalakeFormat + ".";
9293
Map<String, String> configMap = newConfig.toMap();
9394
configMap.forEach(
9495
(key, value) -> {
@@ -99,7 +100,7 @@ public void validate(Configuration newConfig) throws ConfigException {
99100
throw new ConfigException(
100101
String.format(
101102
"Invalid configuration '%s' for '%s' datalake format",
102-
key, effectiveDataLakeFormat.get()));
103+
key, newDatalakeFormat));
103104
}
104105
});
105106
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,10 @@ public long createTable(
367367
boolean ignoreIfExists)
368368
throws TableAlreadyExistException, DatabaseNotExistException {
369369
// validate table properties before creating table
370-
validateTableDescriptor(tableToCreate, maxBucketNum);
370+
validateTableDescriptor(
371+
tableToCreate,
372+
maxBucketNum,
373+
lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat());
371374

372375
if (!databaseExists(tablePath.getDatabaseName())) {
373376
throw new DatabaseNotExistException(
@@ -514,7 +517,10 @@ public void alterTableProperties(
514517

515518
if (newDescriptor != null) {
516519
// reuse the same validate logic with the createTable() method
517-
validateTableDescriptor(newDescriptor, maxBucketNum);
520+
validateTableDescriptor(
521+
newDescriptor,
522+
maxBucketNum,
523+
lakeCatalogDynamicLoader.getLakeCatalogContainer().getDataLakeFormat());
518524
// pre alter table properties, e.g. create lake table in lake storage if it's to
519525
// enable datalake for the table
520526
preAlterTableProperties(

fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.exception.TooManyBucketsException;
2929
import org.apache.fluss.metadata.AggFunction;
3030
import org.apache.fluss.metadata.ChangelogImage;
31+
import org.apache.fluss.metadata.DataLakeFormat;
3132
import org.apache.fluss.metadata.DeleteBehavior;
3233
import org.apache.fluss.metadata.KvFormat;
3334
import org.apache.fluss.metadata.LogFormat;
@@ -41,6 +42,8 @@
4142
import org.apache.fluss.utils.AutoPartitionStrategy;
4243
import org.apache.fluss.utils.StringUtils;
4344

45+
import javax.annotation.Nullable;
46+
4447
import java.util.Arrays;
4548
import java.util.Collections;
4649
import java.util.EnumSet;
@@ -79,7 +82,10 @@ public class TableDescriptorValidation {
7982
Arrays.asList(DataTypeRoot.ARRAY, DataTypeRoot.MAP, DataTypeRoot.ROW);
8083

8184
/** Validate table descriptor to create is valid and contain all necessary information. */
82-
public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) {
85+
public static void validateTableDescriptor(
86+
TableDescriptor tableDescriptor,
87+
int maxBucketNum,
88+
@Nullable DataLakeFormat clusterDataLakeFormat) {
8389
Schema schema = tableDescriptor.getSchema();
8490
boolean hasPrimaryKey = schema.getPrimaryKey().isPresent();
8591
Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties());
@@ -118,6 +124,23 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
118124
checkTieredLog(tableConf);
119125
checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema.getRowType());
120126
checkSystemColumns(schema.getRowType());
127+
checkTableLakeFormatMatchesCluster(tableConf, clusterDataLakeFormat);
128+
}
129+
130+
private static void checkTableLakeFormatMatchesCluster(
131+
Configuration tableConf, @Nullable DataLakeFormat clusterDataLakeFormat) {
132+
Optional<DataLakeFormat> tableDataLakeFormat =
133+
tableConf.getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT);
134+
if (tableDataLakeFormat.isPresent() && tableDataLakeFormat.get() != clusterDataLakeFormat) {
135+
throw new InvalidConfigException(
136+
String.format(
137+
"'%s' ('%s') must match cluster '%s' ('%s') when '%s' is enabled.",
138+
ConfigOptions.TABLE_DATALAKE_FORMAT.key(),
139+
tableDataLakeFormat.get(),
140+
ConfigOptions.DATALAKE_FORMAT.key(),
141+
clusterDataLakeFormat,
142+
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
143+
}
121144
}
122145

123146
public static void validateAlterTableProperties(

0 commit comments

Comments
 (0)