Skip to content

Commit f10a940

Browse files
Refactor SchemaUpdate to delegate schema changes to Schema.Builder
1 parent 864bbe6 commit f10a940

2 files changed

Lines changed: 39 additions & 54 deletions

File tree

  • fluss-common/src/main/java/org/apache/fluss/metadata
  • fluss-server/src/main/java/org/apache/fluss/server/coordinator

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -285,18 +285,21 @@ private Builder() {
285285

286286
/** Adopts all members from the given schema. */
287287
public Builder fromSchema(Schema schema) {
288-
columns.addAll(schema.columns);
289-
if (schema.primaryKey != null) {
290-
primaryKeyNamed(schema.primaryKey.constraintName, schema.primaryKey.columnNames);
291-
}
292-
if (schema.autoIncrementColumnNames != null
293-
&& !schema.autoIncrementColumnNames.isEmpty()) {
294-
checkState(
295-
schema.autoIncrementColumnNames.size() == 1,
296-
"Multiple auto increment columns are not supported yet.");
297-
enableAutoIncrement(schema.autoIncrementColumnNames.get(0));
298-
}
299-
this.highestFieldId = new AtomicInteger(schema.highestFieldId);
288+
// 1. Clear current builder state
289+
this.columns.clear();
290+
this.autoIncrementColumnNames.clear();
291+
this.primaryKey = null;
292+
293+
// 2. Adopt columns while preserving their original IDs
294+
this.fromColumns(schema.getColumns());
295+
296+
// 3. Sync the highest field ID counter to prevent ID conflicts
297+
this.highestFieldId.set(schema.getHighestFieldId());
298+
299+
// 4. Copy the metadata members
300+
this.autoIncrementColumnNames.addAll(schema.getAutoIncrementColumnNames());
301+
schema.getPrimaryKey().ifPresent(pk -> this.primaryKey = pk);
302+
300303
return this;
301304
}
302305

@@ -546,6 +549,13 @@ public Builder enableAutoIncrement(String columnName) {
546549
return this;
547550
}
548551

552+
/** Returns the column with the given name, if it exists. */
553+
public Optional<Column> getColumn(String columnName) {
554+
return columns.stream()
555+
.filter(column -> column.getName().equals(columnName))
556+
.findFirst();
557+
}
558+
549559
/** Returns an instance of an {@link Schema}. */
550560
public Schema build() {
551561
return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/SchemaUpdate.java

Lines changed: 17 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,13 @@
2121
import org.apache.fluss.metadata.Schema;
2222
import org.apache.fluss.metadata.TableChange;
2323
import org.apache.fluss.metadata.TableInfo;
24-
import org.apache.fluss.types.DataType;
25-
import org.apache.fluss.types.ReassignFieldId;
2624

27-
import java.util.ArrayList;
28-
import java.util.HashMap;
2925
import java.util.List;
30-
import java.util.Map;
3126
import java.util.Objects;
32-
import java.util.concurrent.atomic.AtomicInteger;
3327

3428
/** Schema update. */
3529
public class SchemaUpdate {
3630

37-
/** Apply schema changes to the given table info and return the updated schema. */
3831
public static Schema applySchemaChanges(TableInfo tableInfo, List<TableChange> changes) {
3932
SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo);
4033
for (TableChange change : changes) {
@@ -43,36 +36,16 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List<TableChange> c
4336
return schemaUpdate.getSchema();
4437
}
4538

46-
private final List<Schema.Column> columns;
47-
private final AtomicInteger highestFieldId;
48-
private final List<String> primaryKeys;
49-
private final Map<String, Schema.Column> existedColumns;
50-
private final List<String> autoIncrementColumns;
39+
// Now we only maintain the Builder
40+
private final Schema.Builder builder;
5141

5242
public SchemaUpdate(TableInfo tableInfo) {
53-
this.columns = new ArrayList<>();
54-
this.existedColumns = new HashMap<>();
55-
this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId());
56-
this.primaryKeys = tableInfo.getPrimaryKeys();
57-
this.autoIncrementColumns = tableInfo.getSchema().getAutoIncrementColumnNames();
58-
this.columns.addAll(tableInfo.getSchema().getColumns());
59-
for (Schema.Column column : columns) {
60-
existedColumns.put(column.getName(), column);
61-
}
43+
// Initialize builder from the current table schema
44+
this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema());
6245
}
6346

6447
public Schema getSchema() {
65-
Schema.Builder builder =
66-
Schema.newBuilder()
67-
.fromColumns(columns)
68-
.highestFieldId((short) highestFieldId.get());
69-
if (!primaryKeys.isEmpty()) {
70-
builder.primaryKey(primaryKeys);
71-
}
72-
for (String autoIncrementColumn : autoIncrementColumns) {
73-
builder.enableAutoIncrement(autoIncrementColumn);
74-
}
75-
48+
// Validation and building are now delegated
7649
return builder.build();
7750
}
7851

@@ -91,9 +64,10 @@ public SchemaUpdate applySchemaChange(TableChange columnChange) {
9164
}
9265

9366
private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
94-
Schema.Column existingColumn = existedColumns.get(addColumn.getName());
67+
// Use the builder to check if column exists
68+
Schema.Column existingColumn = builder.getColumn(addColumn.getName()).orElse(null);
69+
9570
if (existingColumn != null) {
96-
// Allow idempotent retries: if column name/type/comment match existing, treat as no-op
9771
if (!existingColumn.getDataType().equals(addColumn.getDataType())
9872
|| !Objects.equals(
9973
existingColumn.getComment().orElse(null), addColumn.getComment())) {
@@ -103,8 +77,7 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
10377
return this;
10478
}
10579

106-
TableChange.ColumnPosition position = addColumn.getPosition();
107-
if (position != TableChange.ColumnPosition.last()) {
80+
if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {
10881
throw new IllegalArgumentException("Only support addColumn column at last now.");
10982
}
11083

@@ -113,13 +86,15 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
11386
"Column " + addColumn.getName() + " must be nullable.");
11487
}
11588

116-
int columnId = highestFieldId.incrementAndGet();
117-
DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), highestFieldId);
89+
// Delegate the actual addition to the builder
90+
builder.column(addColumn.getName(), addColumn.getDataType());
91+
92+
// Fixed: Use null check for the String comment
93+
String comment = addColumn.getComment();
94+
if (comment != null) {
95+
builder.withComment(comment);
96+
}
11897

119-
Schema.Column newColumn =
120-
new Schema.Column(addColumn.getName(), dataType, addColumn.getComment(), columnId);
121-
columns.add(newColumn);
122-
existedColumns.put(newColumn.getName(), newColumn);
12398
return this;
12499
}
125100

0 commit comments

Comments
 (0)