diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index e2960dcc7c..92b675c428 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -285,18 +285,21 @@ private Builder() { /** Adopts all members from the given schema. */ public Builder fromSchema(Schema schema) { - columns.addAll(schema.columns); - if (schema.primaryKey != null) { - primaryKeyNamed(schema.primaryKey.constraintName, schema.primaryKey.columnNames); - } - if (schema.autoIncrementColumnNames != null - && !schema.autoIncrementColumnNames.isEmpty()) { - checkState( - schema.autoIncrementColumnNames.size() == 1, - "Multiple auto increment columns are not supported yet."); - enableAutoIncrement(schema.autoIncrementColumnNames.get(0)); - } - this.highestFieldId = new AtomicInteger(schema.highestFieldId); + // Check that the builder is empty before adopting from an existing schema + checkState( + columns.isEmpty() && autoIncrementColumnNames.isEmpty() && primaryKey == null, + "Schema.Builder#fromSchema should be the first API to be called on the builder."); + + // Adopt columns while preserving their original IDs + this.fromColumns(schema.getColumns()); + + // Sync the highest field ID counter to prevent ID conflicts + this.highestFieldId.set(schema.getHighestFieldId()); + + // Copy the metadata members + this.autoIncrementColumnNames.addAll(schema.getAutoIncrementColumnNames()); + schema.getPrimaryKey().ifPresent(pk -> this.primaryKey = pk); + return this; } @@ -327,16 +330,25 @@ public Builder fromSchema(Schema schema) { * some not set) */ public Builder fromColumns(List inputColumns) { + boolean nonSetColumnId = inputColumns.stream() .noneMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID); boolean allSetColumnId = inputColumns.stream() .allMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID); + // REFINED CHECK: + // Only throw if we are adopting a full schema (allSetColumnId) + // AND the builder already has columns assigned. + // We use !columns.isEmpty() as the primary signal of a "dirty" builder. + if (allSetColumnId && !inputColumns.isEmpty() && !this.columns.isEmpty()) { + throw new IllegalStateException( + "Schema.Builder#fromColumns (or fromSchema) should be the first API to be called on the builder when adopting columns with IDs."); + } + checkState( nonSetColumnId || allSetColumnId, "All columns must have columnId or none of them must have columnId."); - if (allSetColumnId) { columns.addAll(inputColumns); List allFieldIds = collectAllFieldIds(inputColumns); @@ -546,6 +558,13 @@ public Builder enableAutoIncrement(String columnName) { return this; } + /** Returns the column with the given name, if it exists. */ + public Optional getColumn(String columnName) { + return columns.stream() + .filter(column -> column.getName().equals(columnName)) + .findFirst(); + } + /** Returns an instance of an {@link Schema}. */ public Schema build() { return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index e3527cb886..011871a0ba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -21,15 +21,9 @@ import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableInfo; -import org.apache.fluss.types.DataType; -import org.apache.fluss.types.ReassignFieldId; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; /** Schema update. */ public class SchemaUpdate { @@ -43,36 +37,16 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List c return schemaUpdate.getSchema(); } - private final List columns; - private final AtomicInteger highestFieldId; - private final List primaryKeys; - private final Map existedColumns; - private final List autoIncrementColumns; + // Now we only maintain the Builder + private final Schema.Builder builder; public SchemaUpdate(TableInfo tableInfo) { - this.columns = new ArrayList<>(); - this.existedColumns = new HashMap<>(); - this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId()); - this.primaryKeys = tableInfo.getPrimaryKeys(); - this.autoIncrementColumns = tableInfo.getSchema().getAutoIncrementColumnNames(); - this.columns.addAll(tableInfo.getSchema().getColumns()); - for (Schema.Column column : columns) { - existedColumns.put(column.getName(), column); - } + // Initialize builder from the current table schema + this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema()); } public Schema getSchema() { - Schema.Builder builder = - Schema.newBuilder() - .fromColumns(columns) - .highestFieldId((short) highestFieldId.get()); - if (!primaryKeys.isEmpty()) { - builder.primaryKey(primaryKeys); - } - for (String autoIncrementColumn : autoIncrementColumns) { - builder.enableAutoIncrement(autoIncrementColumn); - } - + // Validation and building are now delegated return builder.build(); } @@ -91,9 +65,10 @@ public SchemaUpdate applySchemaChange(TableChange columnChange) { } private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { - Schema.Column existingColumn = existedColumns.get(addColumn.getName()); + // Use the builder to check if column exists + Schema.Column existingColumn = builder.getColumn(addColumn.getName()).orElse(null); + if (existingColumn != null) { - // Allow idempotent retries: if column name/type/comment match existing, treat as no-op if (!existingColumn.getDataType().equals(addColumn.getDataType()) || !Objects.equals( existingColumn.getComment().orElse(null), addColumn.getComment())) { @@ -103,8 +78,7 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { return this; } - TableChange.ColumnPosition position = addColumn.getPosition(); - if (position != TableChange.ColumnPosition.last()) { + if (addColumn.getPosition() != TableChange.ColumnPosition.last()) { throw new IllegalArgumentException("Only support addColumn column at last now."); } @@ -113,13 +87,15 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { "Column " + addColumn.getName() + " must be nullable."); } - int columnId = highestFieldId.incrementAndGet(); - DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), highestFieldId); + // Delegate the actual addition to the builder + builder.column(addColumn.getName(), addColumn.getDataType()); + + // Fixed: Use null check for the String comment + String comment = addColumn.getComment(); + if (comment != null) { + builder.withComment(comment); + } - Schema.Column newColumn = - new Schema.Column(addColumn.getName(), dataType, addColumn.getComment(), columnId); - columns.add(newColumn); - existedColumns.put(newColumn.getName(), newColumn); return this; }