From 08590d85f537a5ae7ed0866200e93687ecb7a4df Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Tue, 20 Jan 2026 12:18:52 +0000 Subject: [PATCH 1/3] Refactor SchemaUpdate to delegate schema changes to Schema.Builder --- .../org/apache/fluss/metadata/Schema.java | 34 +++++++---- .../server/coordinator/SchemaUpdate.java | 58 ++++++------------- 2 files changed, 39 insertions(+), 53 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index e2960dcc7c..2bbe538116 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -285,18 +285,21 @@ private Builder() { /** Adopts all members from the given schema. */ public Builder fromSchema(Schema schema) { - columns.addAll(schema.columns); - if (schema.primaryKey != null) { - primaryKeyNamed(schema.primaryKey.constraintName, schema.primaryKey.columnNames); - } - if (schema.autoIncrementColumnNames != null - && !schema.autoIncrementColumnNames.isEmpty()) { - checkState( - schema.autoIncrementColumnNames.size() == 1, - "Multiple auto increment columns are not supported yet."); - enableAutoIncrement(schema.autoIncrementColumnNames.get(0)); - } - this.highestFieldId = new AtomicInteger(schema.highestFieldId); + // Check that the builder is empty before adopting from an existing schema + checkState( + columns.isEmpty() && autoIncrementColumnNames.isEmpty() && primaryKey == null, + "Schema.Builder#fromSchema should be the first API to be called on the builder."); + + // Adopt columns while preserving their original IDs + this.fromColumns(schema.getColumns()); + + // Sync the highest field ID counter to prevent ID conflicts + this.highestFieldId.set(schema.getHighestFieldId()); + + // Copy the metadata members + this.autoIncrementColumnNames.addAll(schema.getAutoIncrementColumnNames()); + schema.getPrimaryKey().ifPresent(pk -> this.primaryKey = pk); + return this; } @@ -546,6 +549,13 @@ public Builder enableAutoIncrement(String columnName) { return this; } + /** Returns the column with the given name, if it exists. */ + public Optional getColumn(String columnName) { + return columns.stream() + .filter(column -> column.getName().equals(columnName)) + .findFirst(); + } + /** Returns an instance of an {@link Schema}. */ public Schema build() { return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java index e3527cb886..011871a0ba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java @@ -21,15 +21,9 @@ import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableChange; import org.apache.fluss.metadata.TableInfo; -import org.apache.fluss.types.DataType; -import org.apache.fluss.types.ReassignFieldId; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; /** Schema update. */ public class SchemaUpdate { @@ -43,36 +37,16 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List c return schemaUpdate.getSchema(); } - private final List columns; - private final AtomicInteger highestFieldId; - private final List primaryKeys; - private final Map existedColumns; - private final List autoIncrementColumns; + // Now we only maintain the Builder + private final Schema.Builder builder; public SchemaUpdate(TableInfo tableInfo) { - this.columns = new ArrayList<>(); - this.existedColumns = new HashMap<>(); - this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId()); - this.primaryKeys = tableInfo.getPrimaryKeys(); - this.autoIncrementColumns = tableInfo.getSchema().getAutoIncrementColumnNames(); - this.columns.addAll(tableInfo.getSchema().getColumns()); - for (Schema.Column column : columns) { - existedColumns.put(column.getName(), column); - } + // Initialize builder from the current table schema + this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema()); } public Schema getSchema() { - Schema.Builder builder = - Schema.newBuilder() - .fromColumns(columns) - .highestFieldId((short) highestFieldId.get()); - if (!primaryKeys.isEmpty()) { - builder.primaryKey(primaryKeys); - } - for (String autoIncrementColumn : autoIncrementColumns) { - builder.enableAutoIncrement(autoIncrementColumn); - } - + // Validation and building are now delegated return builder.build(); } @@ -91,9 +65,10 @@ public SchemaUpdate applySchemaChange(TableChange columnChange) { } private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { - Schema.Column existingColumn = existedColumns.get(addColumn.getName()); + // Use the builder to check if column exists + Schema.Column existingColumn = builder.getColumn(addColumn.getName()).orElse(null); + if (existingColumn != null) { - // Allow idempotent retries: if column name/type/comment match existing, treat as no-op if (!existingColumn.getDataType().equals(addColumn.getDataType()) || !Objects.equals( existingColumn.getComment().orElse(null), addColumn.getComment())) { @@ -103,8 +78,7 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { return this; } - TableChange.ColumnPosition position = addColumn.getPosition(); - if (position != TableChange.ColumnPosition.last()) { + if (addColumn.getPosition() != TableChange.ColumnPosition.last()) { throw new IllegalArgumentException("Only support addColumn column at last now."); } @@ -113,13 +87,15 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) { "Column " + addColumn.getName() + " must be nullable."); } - int columnId = highestFieldId.incrementAndGet(); - DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), highestFieldId); + // Delegate the actual addition to the builder + builder.column(addColumn.getName(), addColumn.getDataType()); + + // Fixed: Use null check for the String comment + String comment = addColumn.getComment(); + if (comment != null) { + builder.withComment(comment); + } - Schema.Column newColumn = - new Schema.Column(addColumn.getName(), dataType, addColumn.getComment(), columnId); - columns.add(newColumn); - existedColumns.put(newColumn.getName(), newColumn); return this; } From 5806e6309208d4f92f2ff6a9db6d8f3ead5cb00c Mon Sep 17 00:00:00 2001 From: Prajwal-banakar Date: Wed, 21 Jan 2026 17:46:01 +0000 Subject: [PATCH 2/3] Rerunning CI to bypass flaky test --- .../java/org/apache/fluss/metadata/Schema.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 2bbe538116..2cd3d8a5df 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -285,10 +285,6 @@ private Builder() { /** Adopts all members from the given schema. */ public Builder fromSchema(Schema schema) { - // Check that the builder is empty before adopting from an existing schema - checkState( - columns.isEmpty() && autoIncrementColumnNames.isEmpty() && primaryKey == null, - "Schema.Builder#fromSchema should be the first API to be called on the builder."); // Adopt columns while preserving their original IDs this.fromColumns(schema.getColumns()); @@ -330,16 +326,25 @@ public Builder fromSchema(Schema schema) { * some not set) */ public Builder fromColumns(List inputColumns) { + boolean nonSetColumnId = inputColumns.stream() .noneMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID); boolean allSetColumnId = inputColumns.stream() .allMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID); + // REFINED CHECK: + // Only throw if we are adopting a full schema (allSetColumnId) + // AND the builder already has columns assigned. + // We use !columns.isEmpty() as the primary signal of a "dirty" builder. + if (allSetColumnId && !inputColumns.isEmpty() && !this.columns.isEmpty()) { + throw new IllegalStateException( + "Schema.Builder#fromColumns (or fromSchema) should be the first API to be called on the builder when adopting columns with IDs."); + } + checkState( nonSetColumnId || allSetColumnId, "All columns must have columnId or none of them must have columnId."); - if (allSetColumnId) { columns.addAll(inputColumns); List allFieldIds = collectAllFieldIds(inputColumns); From cba6acbb74e7de719b959e33c33098cbe7744bbf Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Fri, 23 Jan 2026 15:29:13 +0800 Subject: [PATCH 3/3] Enforce empty state requirement for Schema.Builder#fromSchema method --- .../src/main/java/org/apache/fluss/metadata/Schema.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java index 2cd3d8a5df..92b675c428 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java +++ b/fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java @@ -285,6 +285,10 @@ private Builder() { /** Adopts all members from the given schema. */ public Builder fromSchema(Schema schema) { + // Check that the builder is empty before adopting from an existing schema + checkState( + columns.isEmpty() && autoIncrementColumnNames.isEmpty() && primaryKey == null, + "Schema.Builder#fromSchema should be the first API to be called on the builder."); // Adopt columns while preserving their original IDs this.fromColumns(schema.getColumns());