@@ -124,6 +124,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
.comment("test table")
.distributedBy(3, "id")
.property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
.property(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION, 2)
.customProperty("connector", "fluss")
.build();

@@ -774,6 +775,14 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(clientConf);
Admin admin = conn.getAdmin()) {
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
TableDescriptor expectedDescriptor =
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
Map<String, String> expectedProperties =
new HashMap<>(expectedDescriptor.getProperties());
expectedProperties.put(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), "2");
expectedDescriptor = expectedDescriptor.withProperties(expectedProperties);

assertThat(tableInfo.toTableDescriptor()).isEqualTo(expectedDescriptor);
assertThat(tableInfo.toTableDescriptor())
.isEqualTo(
DEFAULT_TABLE_DESCRIPTOR
@@ -67,8 +67,10 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -103,6 +105,9 @@ void testGetDescriptor() throws Exception {
DATA1_TABLE_DESCRIPTOR_PK
.withReplicationFactor(3)
.withDataLakeFormat(DataLakeFormat.PAIMON);
Map<String, String> expectedProperties = new HashMap<>(expected.getProperties());
expectedProperties.put(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), "2");
expected = expected.withProperties(expectedProperties);
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected);
}

@@ -1414,6 +1414,14 @@ public class ConfigOptions {
.withDescription(
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");

public static final ConfigOption<Integer> TABLE_DATALAKE_STORAGE_VERSION =
key("table.datalake.storage-version")
.intType()
.noDefaultValue()
.withDescription(
"The storage version of the datalake table. This option is automatically set by the Fluss server "
+ "and cannot be set by clients. Version 2 indicates a clean schema without system columns in the datalake.");

public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngineType.class)
@@ -106,6 +106,11 @@ public boolean isDataLakeAutoExpireSnapshot() {
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
}

/** Gets the data lake storage version of the table. Returns empty if not set (legacy table). */
public Optional<Integer> getDataLakeStorageVersion() {
return config.getOptional(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION);
}
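
Callers are expected to treat an empty result as the legacy layout. A hedged sketch of the intended branching, mirroring how PaimonLakeWriter below consumes this getter (the tableConfig variable is assumed to be in scope):

    // Empty version: legacy table whose paimon schema still carries the
    // __bucket/__offset/__timestamp system columns; version >= 2: clean schema.
    boolean lakeSchemaHasSystemColumns =
            !tableConfig.getDataLakeStorageVersion().isPresent();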

/** Gets the optional merge engine type of the table. */
public Optional<MergeEngineType> getMergeEngineType() {
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
@@ -35,6 +35,8 @@
@PublicEvolving
public interface LakeCatalog extends AutoCloseable {

Integer CURRENT_LAKE_STORAGE_VERSION = 2;

/**
* Create a new table in lake.
*
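The diff does not show where the server stamps this constant into table properties, so the following is only a hypothetical sketch of that call site, reusing the TableDescriptor withProperties API exercised by the tests above:

    // Hypothetical server-side stamping at lake table creation time.
    Map<String, String> props = new HashMap<>(descriptor.getProperties());
    props.put(
            ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(),
            String.valueOf(LakeCatalog.CURRENT_LAKE_STORAGE_VERSION));
    descriptor = descriptor.withProperties(props);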
@@ -323,15 +323,23 @@ public boolean isBounded() {
case TIMESTAMP:
offsetsInitializer =
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
if (hasPrimaryKey()) {
// Currently, for primary key tables, we do not consider lake data
// when reading from a given timestamp. This is because we will need
// to read the change log of the primary key table.
// TODO: consider supporting it using paimon change log data?
enableLakeSource = false;
} else {
if (enableLakeSource) {
enableLakeSource = pushTimeStampFilterToLakeSource(lakeSource);
if (lakeSource != null) {
if (!tableOptions.containsKey(
ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key())) {
enableLakeSource = handleLegacyV1LakeTable();
} else {
int lakeStorageVersion =
Integer.parseInt(
tableOptions.get(
ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION
.key()));
if (lakeStorageVersion < 2) {
throw new IllegalArgumentException(
"Unsupported lake storage version: " + lakeStorageVersion);
} else {
// TODO: support mapping to partition in a follow-up issue
enableLakeSource = false;
}
}
}
break;
@@ -384,6 +392,18 @@ public boolean isBounded() {
}
}

private boolean handleLegacyV1LakeTable() {
if (hasPrimaryKey()) {
// Currently, for primary key tables, we do not consider lake data
// when reading from a given timestamp. This is because we will need
// to read the change log of the primary key table.
// TODO: consider supporting it using paimon change log data?
return false;
} else {
return pushTimeStampFilterToLakeSource(lakeSource);
}
}
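
The version gate in the TIMESTAMP branch above condenses to a small predicate. A hypothetical extraction for clarity (not part of the diff):

    // Decides whether the lake source can serve a timestamp-based read.
    private boolean canUseLakeSourceForTimestampRead(Map<String, String> tableOptions) {
        String raw = tableOptions.get(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key());
        if (raw == null) {
            // legacy v1 table: fall back to the primary-key / filter-pushdown logic
            return handleLegacyV1LakeTable();
        }
        if (Integer.parseInt(raw) < 2) {
            throw new IllegalArgumentException("Unsupported lake storage version: " + raw);
        }
        // v2 tables: mapping lake data to partitions is not supported yet
        return false;
    }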

private boolean pushTimeStampFilterToLakeSource(LakeSource<?> lakeSource) {
// will push the timestamp filter down to the lake source;
// the lake table has three additional system columns: __bucket, __offset, __timestamp
@@ -634,6 +634,7 @@ void testTableWithExpression() throws Exception {
Map<String, String> expectedTableProperties = new HashMap<>();
expectedTableProperties.put("table.datalake.format", "paimon");
expectedTableProperties.put("table.replication.factor", "1");
expectedTableProperties.put("table.datalake.storage-version", "2");
assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);

Map<String, String> expectedCustomProperties = new HashMap<>();
@@ -896,6 +897,7 @@ protected static void assertOptionsEqual(
Map<String, String> actualOptions, Map<String, String> expectedOptions) {
actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
actualOptions.remove(ConfigOptions.TABLE_REPLICATION_FACTOR.key());
actualOptions.remove(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key());
assertThat(actualOptions).isEqualTo(expectedOptions);
}
}
@@ -26,6 +26,7 @@

import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION;
import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
import static org.assertj.core.api.Assertions.assertThat;

@@ -140,6 +141,7 @@ private static void assertOptionsEqual(
actualOptions.remove(TABLE_REPLICATION_FACTOR.key());
// Remove datalake format (auto-added when datalake is enabled in Fluss cluster)
actualOptions.remove(TABLE_DATALAKE_FORMAT.key());
actualOptions.remove(TABLE_DATALAKE_STORAGE_VERSION.key());
assertThat(actualOptions).isEqualTo(expectedOptions);
}
}
@@ -112,7 +112,9 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
throws TableNotExistException {
try {
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
Table paimonTable = getTable(tablePath);
List<SchemaChange> paimonSchemaChanges =
toPaimonSchemaChanges(paimonTable, tableChanges);

// Compare current Paimon table schema with expected target schema before altering
if (shouldAlterTable(tablePath, tableChanges)) {
@@ -157,6 +159,14 @@ private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableCha
}
}

private Table getTable(TablePath tablePath) throws TableNotExistException {
try {
return paimonCatalog.getTable(toPaimon(tablePath));
} catch (Catalog.TableNotExistException e) {
throw new TableNotExistException("Table " + tablePath + " does not exist.");
}
}

private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
String columnName = addColumn.getName();

@@ -94,10 +94,12 @@ private ReadBuilder applyProject(
int timestampFieldPos = paimonFullRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);

int[] paimonProject =
IntStream.concat(
IntStream.of(projectIds),
IntStream.of(offsetFieldPos, timestampFieldPos))
.toArray();
hasSystemColumn(paimonFullRowType)
? IntStream.concat(
IntStream.of(projectIds),
IntStream.of(offsetFieldPos, timestampFieldPos))
.toArray()
: projectIds;

return readBuilder.withProjection(paimonProject);
}
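
As a worked example of the two projection arms: for a legacy schema (id, name, __bucket, __offset, __timestamp) with projectIds = {0, 1}, the offset and timestamp positions are 3 and 4, so the legacy arm produces {0, 1, 3, 4}; a clean version-2 schema passes {0, 1} through unchanged. A minimal sketch of the legacy concatenation under those assumed positions:

    // Legacy arm: business columns plus the __offset (3) and __timestamp (4) columns.
    int[] projectIds = {0, 1};
    int[] paimonProject =
            IntStream.concat(IntStream.of(projectIds), IntStream.of(3, 4)).toArray();
    // paimonProject == {0, 1, 3, 4}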
@@ -110,17 +112,15 @@ public static class PaimonRowAsFlussRecordIterator implements CloseableIterator<
private final ProjectedRow projectedRow;
private final PaimonRowAsFlussRow paimonRowAsFlussRow;

private final int logOffsetColIndex;
private final int timestampColIndex;

public PaimonRowAsFlussRecordIterator(
org.apache.paimon.utils.CloseableIterator<InternalRow> paimonRowIterator,
RowType paimonRowType) {
this.paimonRowIterator = paimonRowIterator;
this.logOffsetColIndex = paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME);
this.timestampColIndex = paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);

int[] project = IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray();
int[] project =
hasSystemColumn(paimonRowType)
? IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray()
: IntStream.range(0, paimonRowType.getFieldCount()).toArray();
projectedRow = ProjectedRow.from(project);
paimonRowAsFlussRow = new PaimonRowAsFlussRow();
}
@@ -143,14 +143,21 @@ public boolean hasNext() {
public LogRecord next() {
InternalRow paimonRow = paimonRowIterator.next();
ChangeType changeType = toChangeType(paimonRow.getRowKind());
long offset = paimonRow.getLong(logOffsetColIndex);
long timestamp = paimonRow.getTimestamp(timestampColIndex, 6).getMillisecond();

return new GenericRecord(
offset,
timestamp,
-1L,
-1L,
changeType,
projectedRow.replaceRow(paimonRowAsFlussRow.replaceRow(paimonRow)));
}
}

// legacy tables still carry the system columns in the paimon schema
private static boolean hasSystemColumn(RowType paimonRowType) {
return paimonRowType
.getFields()
.get(paimonRowType.getFieldCount() - 1)
.name()
.equals(TIMESTAMP_COLUMN_NAME);
}
}
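
A brief sketch of the heuristic on both layouts, assuming paimon's RowType builder and DataTypes factory:

    // Legacy layout: trailing __timestamp column => hasSystemColumn returns true.
    RowType legacy =
            RowType.builder()
                    .field("id", DataTypes.INT())
                    .field("__bucket", DataTypes.INT())
                    .field("__offset", DataTypes.BIGINT())
                    .field("__timestamp", DataTypes.TIMESTAMP(6))
                    .build();
    // Clean version-2 layout: no trailing system columns => returns false.
    RowType clean = RowType.builder().field("id", DataTypes.INT()).build();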
@@ -31,6 +31,7 @@
/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {

private final boolean paimonIncludingSystemColumns;
private final int bucket;
private LogRecord logRecord;
private int originRowFieldCount;
@@ -39,15 +40,25 @@ public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
private final int offsetFieldIndex;
private final int timestampFieldIndex;

public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
public FlussRecordAsPaimonRow(
int bucket, RowType tableRowType, boolean paimonIncludingSystemColumns) {
super(tableRowType);
this.bucket = bucket;
this.businessFieldCount = tableRowType.getFieldCount() - SYSTEM_COLUMNS.size();
this.businessFieldCount =
tableRowType.getFieldCount()
- (paimonIncludingSystemColumns ? SYSTEM_COLUMNS.size() : 0);
this.paimonIncludingSystemColumns = paimonIncludingSystemColumns;

// only valid when the paimon schema includes system columns
this.bucketFieldIndex = businessFieldCount;
this.offsetFieldIndex = businessFieldCount + 1;
this.timestampFieldIndex = businessFieldCount + 2;
}

public FlussRecordAsPaimonRow(int bucket, RowType tableRowType) {
this(bucket, tableRowType, false);
}
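
The two constructors give tiering writers an explicit switch between layouts. A short usage sketch (the bucket and row type values are illustrative):

    // Legacy table: paimon schema still ends with the three system columns.
    FlussRecordAsPaimonRow legacyRow = new FlussRecordAsPaimonRow(0, paimonRowType, true);
    // Clean version-2 table: business columns only.
    FlussRecordAsPaimonRow cleanRow = new FlussRecordAsPaimonRow(0, paimonRowType, false);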

public void setFlussRecord(LogRecord logRecord) {
this.logRecord = logRecord;
this.internalRow = logRecord.getRow();
@@ -86,13 +97,21 @@ public boolean isNullAt(int pos) {
// Padding NULL for missing business fields when Paimon schema is wider than Fluss
return true;
}
// is the last three system fields: bucket, offset, timestamp which are never null
return false;

if (paimonIncludingSystemColumns && pos < businessFieldCount + SYSTEM_COLUMNS.size()) {
// these are the last three system fields (bucket, offset, timestamp), which are never null
return false;
}

// shouldn't happen in normal cases.
throw new IllegalStateException(
String.format(
"Field %s is NULL because Paimon schema is wider than Fluss record.", pos));
}

@Override
public int getInt(int pos) {
if (pos == bucketFieldIndex) {
if (pos == bucketFieldIndex && paimonIncludingSystemColumns) {
// bucket system column
return bucket;
}
@@ -107,12 +126,14 @@ public int getInt(int pos) {

@Override
public long getLong(int pos) {
if (pos == offsetFieldIndex) {
// offset system column
return logRecord.logOffset();
} else if (pos == timestampFieldIndex) {
// timestamp system column
return logRecord.timestamp();
if (paimonIncludingSystemColumns) {
if (pos == offsetFieldIndex) {
// offset system column
return logRecord.logOffset();
} else if (pos == timestampFieldIndex) {
// timestamp system column
return logRecord.timestamp();
}
}
if (pos >= originRowFieldCount) {
throw new IllegalStateException(
@@ -126,8 +147,8 @@ public long getLong(int pos) {

@Override
public Timestamp getTimestamp(int pos, int precision) {
// it's timestamp system column
if (pos == timestampFieldIndex) {
if (paimonIncludingSystemColumns && pos == timestampFieldIndex) {
// it's timestamp system column
return Timestamp.fromEpochMillis(logRecord.timestamp());
}
if (pos >= originRowFieldCount) {
@@ -53,18 +53,29 @@ public PaimonLakeWriter(

List<String> partitionKeys = fileStoreTable.partitionKeys();

// currently, when the storage version is not present, the corresponding
// paimon table must include system columns
boolean paimonIncludeSystemColumns =
!writerInitContext
.tableInfo()
.getTableConfig()
.getDataLakeStorageVersion()
.isPresent();

this.recordWriter =
fileStoreTable.primaryKeys().isEmpty()
? new AppendOnlyWriter(
fileStoreTable,
writerInitContext.tableBucket(),
writerInitContext.partition(),
partitionKeys)
partitionKeys,
paimonIncludeSystemColumns)
: new MergeTreeWriter(
fileStoreTable,
writerInitContext.tableBucket(),
writerInitContext.partition(),
partitionKeys);
partitionKeys,
paimonIncludeSystemColumns);
}

@Override
@@ -46,7 +46,8 @@ public RecordWriter(
RowType tableRowType,
TableBucket tableBucket,
@Nullable String partition,
List<String> partitionKeys) {
List<String> partitionKeys,
boolean paimonIncludingSystemColumns) {
this.tableWrite = tableWrite;
this.tableRowType = tableRowType;
this.bucket = tableBucket.getBucket();
@@ -56,7 +57,8 @@
this.partition = BinaryRow.EMPTY_ROW;
}
this.flussRecordAsPaimonRow =
new FlussRecordAsPaimonRow(tableBucket.getBucket(), tableRowType);
new FlussRecordAsPaimonRow(
tableBucket.getBucket(), tableRowType, paimonIncludingSystemColumns);
}

public abstract void write(LogRecord record) throws Exception;