Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,18 +285,21 @@ private Builder() {

/** Adopts all members from the given schema. */
public Builder fromSchema(Schema schema) {
columns.addAll(schema.columns);
if (schema.primaryKey != null) {
primaryKeyNamed(schema.primaryKey.constraintName, schema.primaryKey.columnNames);
}
if (schema.autoIncrementColumnNames != null
&& !schema.autoIncrementColumnNames.isEmpty()) {
checkState(
schema.autoIncrementColumnNames.size() == 1,
"Multiple auto increment columns are not supported yet.");
enableAutoIncrement(schema.autoIncrementColumnNames.get(0));
}
this.highestFieldId = new AtomicInteger(schema.highestFieldId);
// Check that the builder is empty before adopting from an existing schema
checkState(
columns.isEmpty() && autoIncrementColumnNames.isEmpty() && primaryKey == null,
"Schema.Builder#fromSchema should be the first API to be called on the builder.");

// Adopt columns while preserving their original IDs
this.fromColumns(schema.getColumns());

// Sync the highest field ID counter to prevent ID conflicts
this.highestFieldId.set(schema.getHighestFieldId());

// Copy the metadata members
this.autoIncrementColumnNames.addAll(schema.getAutoIncrementColumnNames());
schema.getPrimaryKey().ifPresent(pk -> this.primaryKey = pk);

return this;
}

Expand Down Expand Up @@ -327,16 +330,25 @@ public Builder fromSchema(Schema schema) {
* some not set)
*/
public Builder fromColumns(List<Column> inputColumns) {

boolean nonSetColumnId =
inputColumns.stream()
.noneMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID);
boolean allSetColumnId =
inputColumns.stream()
.allMatch(column -> column.columnId != Column.UNKNOWN_COLUMN_ID);
// REFINED CHECK:
// Only throw if we are adopting a full schema (allSetColumnId)
// AND the builder already has columns assigned.
// We use !columns.isEmpty() as the primary signal of a "dirty" builder.
if (allSetColumnId && !inputColumns.isEmpty() && !this.columns.isEmpty()) {
throw new IllegalStateException(
"Schema.Builder#fromColumns (or fromSchema) should be the first API to be called on the builder when adopting columns with IDs.");
}

checkState(
nonSetColumnId || allSetColumnId,
"All columns must have columnId or none of them must have columnId.");

if (allSetColumnId) {
columns.addAll(inputColumns);
List<Integer> allFieldIds = collectAllFieldIds(inputColumns);
Expand Down Expand Up @@ -546,6 +558,13 @@ public Builder enableAutoIncrement(String columnName) {
return this;
}

/** Returns the column with the given name, if it exists. */
public Optional<Column> getColumn(String columnName) {
return columns.stream()
.filter(column -> column.getName().equals(columnName))
.findFirst();
}

/** Returns an instance of an {@link Schema}. */
public Schema build() {
return new Schema(columns, primaryKey, highestFieldId.get(), autoIncrementColumnNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.ReassignFieldId;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/** Schema update. */
public class SchemaUpdate {
Expand All @@ -43,36 +37,16 @@ public static Schema applySchemaChanges(TableInfo tableInfo, List<TableChange> c
return schemaUpdate.getSchema();
}

private final List<Schema.Column> columns;
private final AtomicInteger highestFieldId;
private final List<String> primaryKeys;
private final Map<String, Schema.Column> existedColumns;
private final List<String> autoIncrementColumns;
// Now we only maintain the Builder
private final Schema.Builder builder;

public SchemaUpdate(TableInfo tableInfo) {
this.columns = new ArrayList<>();
this.existedColumns = new HashMap<>();
this.highestFieldId = new AtomicInteger(tableInfo.getSchema().getHighestFieldId());
this.primaryKeys = tableInfo.getPrimaryKeys();
this.autoIncrementColumns = tableInfo.getSchema().getAutoIncrementColumnNames();
this.columns.addAll(tableInfo.getSchema().getColumns());
for (Schema.Column column : columns) {
existedColumns.put(column.getName(), column);
}
// Initialize builder from the current table schema
this.builder = Schema.newBuilder().fromSchema(tableInfo.getSchema());
}

public Schema getSchema() {
Schema.Builder builder =
Schema.newBuilder()
.fromColumns(columns)
.highestFieldId((short) highestFieldId.get());
if (!primaryKeys.isEmpty()) {
builder.primaryKey(primaryKeys);
}
for (String autoIncrementColumn : autoIncrementColumns) {
builder.enableAutoIncrement(autoIncrementColumn);
}

// Validation and building are now delegated
return builder.build();
}

Expand All @@ -91,9 +65,10 @@ public SchemaUpdate applySchemaChange(TableChange columnChange) {
}

private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
Schema.Column existingColumn = existedColumns.get(addColumn.getName());
// Use the builder to check if column exists
Schema.Column existingColumn = builder.getColumn(addColumn.getName()).orElse(null);

if (existingColumn != null) {
// Allow idempotent retries: if column name/type/comment match existing, treat as no-op
if (!existingColumn.getDataType().equals(addColumn.getDataType())
|| !Objects.equals(
existingColumn.getComment().orElse(null), addColumn.getComment())) {
Expand All @@ -103,8 +78,7 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
return this;
}

TableChange.ColumnPosition position = addColumn.getPosition();
if (position != TableChange.ColumnPosition.last()) {
if (addColumn.getPosition() != TableChange.ColumnPosition.last()) {
throw new IllegalArgumentException("Only support addColumn column at last now.");
}

Expand All @@ -113,13 +87,15 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
"Column " + addColumn.getName() + " must be nullable.");
}

int columnId = highestFieldId.incrementAndGet();
DataType dataType = ReassignFieldId.reassign(addColumn.getDataType(), highestFieldId);
// Delegate the actual addition to the builder
builder.column(addColumn.getName(), addColumn.getDataType());

// Fixed: Use null check for the String comment
String comment = addColumn.getComment();
if (comment != null) {
builder.withComment(comment);
}

Schema.Column newColumn =
new Schema.Column(addColumn.getName(), dataType, addColumn.getComment(), columnId);
columns.add(newColumn);
existedColumns.put(newColumn.getName(), newColumn);
return this;
}

Expand Down