From 7c5b0c162168505cce827f1d79cf56bbdb49c5c8 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 29 Mar 2026 18:44:35 +0100 Subject: [PATCH 1/2] [spark] Add union read for lake-enabled log tables --- .../org/apache/fluss/spark/SparkTable.scala | 15 +- .../spark/read/FlussInputPartition.scala | 18 + .../fluss/spark/read/FlussLakeBatch.scala | 309 ++++++++++++++++++ .../spark/read/FlussLakePartitionReader.scala | 83 +++++ .../FlussLakePartitionReaderFactory.scala | 55 ++++ .../spark/read/FlussLakeSourceUtils.scala | 49 +++ .../apache/fluss/spark/read/FlussScan.scala | 24 ++ .../fluss/spark/read/FlussScanBuilder.scala | 13 + fluss-spark/fluss-spark-ut/pom.xml | 7 + .../SparkLakeIcebergLogTableReadTest.scala | 45 +++ .../lake/SparkLakeLogTableReadTestBase.scala | 255 +++++++++++++++ .../SparkLakePaimonLogTableReadTest.scala | 45 +++ 12 files changed, 916 insertions(+), 2 deletions(-) create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala index 144db03aea..c2ad05081c 100644 --- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala @@ -21,7 +21,7 @@ import org.apache.fluss.client.admin.Admin import org.apache.fluss.config.{Configuration => FlussConfiguration} import org.apache.fluss.metadata.{TableInfo, TablePath} import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement} -import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussUpsertScanBuilder} +import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussLakeAppendScanBuilder, FlussUpsertScanBuilder} import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder} import org.apache.spark.sql.catalyst.SQLConfHelper @@ -61,8 +61,19 @@ class SparkTable( override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { populateSparkConf(flussConfig) + val isDataLakeEnabled = tableInfo.getTableConfig.isDataLakeEnabled + val startupMode = options + .getOrDefault( + SparkFlussConf.SCAN_START_UP_MODE.key(), + flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE)) + .toUpperCase + val isFullMode = startupMode == SparkFlussConf.StartUpMode.FULL.toString if (tableInfo.getPrimaryKeys.isEmpty) { - new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig) + if (isDataLakeEnabled && isFullMode) { + new FlussLakeAppendScanBuilder(tablePath, tableInfo, options, flussConfig) + } else { + new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig) + } } else { new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig) } diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala index 397b624306..ccf89cc87c 100644 --- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala @@ -45,6 +45,24 @@ case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long } } +/** + * Represents an input partition for reading data from a single lake split. Each lake split maps to + * one Spark task, enabling parallel lake reads across splits. + * + * @param tableBucket + * the table bucket this split belongs to + * @param lakeSplitBytes + * serialized lake split data + */ +case class FlussLakeInputPartition(tableBucket: TableBucket, lakeSplitBytes: Array[Byte]) + extends FlussInputPartition { + override def toString: String = { + s"FlussLakeInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," + + s" partitionId=${tableBucket.getPartitionId}," + + s" splitSize=${lakeSplitBytes.length}}" + } +} + /** * Represents an input partition for reading data from a primary key table bucket. This partition * includes snapshot information for hybrid snapshot-log reading. diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala new file mode 100644 index 0000000000..cc4e2ee927 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeBatch.scala @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read + +import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer} +import org.apache.fluss.client.table.scanner.log.LogScanner +import org.apache.fluss.config.Configuration +import org.apache.fluss.exception.LakeTableSnapshotNotExistException +import org.apache.fluss.lake.serializer.SimpleVersionedSerializer +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.{ResolvedPartitionSpec, TableBucket, TableInfo, TablePath} +import org.apache.fluss.utils.ExceptionUtils + +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +/** Batch for reading lake-enabled log table (append-only table with datalake). */ +class FlussLakeAppendBatch( + tablePath: TablePath, + tableInfo: TableInfo, + readSchema: StructType, + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) { + + // Required by FlussBatch but unused — lake snapshot determines start offsets. 
+ override val startOffsetsInitializer: OffsetsInitializer = OffsetsInitializer.earliest() + + override val stoppingOffsetsInitializer: OffsetsInitializer = { + FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, flussConfig) + } + + private lazy val planned: (Array[InputPartition], Boolean) = doPlan() + + override def planInputPartitions(): Array[InputPartition] = planned._1 + + override def createReaderFactory(): PartitionReaderFactory = { + if (planned._2) { + new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig) + } else { + new FlussLakeAppendPartitionReaderFactory( + tableInfo.getProperties.toMap, + tablePath, + tableInfo.getRowType, + projection, + flussConfig) + } + } + + private def doPlan(): (Array[InputPartition], Boolean) = { + val lakeSnapshot = + try { + admin.getReadableLakeSnapshot(tablePath).get() + } catch { + case e: Exception => + if ( + ExceptionUtils + .stripExecutionException(e) + .isInstanceOf[LakeTableSnapshotNotExistException] + ) { + return (planFallbackPartitions(), true) + } + throw e + } + + val lakeSource = FlussLakeSourceUtils.createLakeSource(tableInfo.getProperties.toMap, tablePath) + lakeSource.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + + val lakeSplits = lakeSource + .createPlanner(new LakeSource.PlannerContext { + override def snapshotId(): Long = lakeSnapshot.getSnapshotId + }) + .plan() + + val splitSerializer = lakeSource.getSplitSerializer + val tableBucketsOffset = lakeSnapshot.getTableBucketsOffset + val buckets = (0 until tableInfo.getNumBuckets).toSeq + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + + val partitions = if (tableInfo.isPartitioned) { + planPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + buckets, + bucketOffsetsRetriever) + } else { + planNonPartitionedTable( + lakeSplits.asScala, + splitSerializer, + tableBucketsOffset, + buckets, + bucketOffsetsRetriever) + } + + (partitions, 
false) + } + + private def planNonPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + buckets: Seq[Int], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val result = mutable.ArrayBuffer.empty[InputPartition] + val tableId = tableInfo.getTableId + + addLakePartitions(result, lakeSplits, splitSerializer, tableId, partitionId = null) + + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, null, buckets, bucketOffsetsRetriever) + buckets.foreach { + bucketId => + val tableBucket = new TableBucket(tableId, bucketId) + addLogTailPartition(result, tableBucket, tableBucketsOffset, stoppingOffsets(bucketId)) + } + + result.toArray + } + + private def planPartitionedTable( + lakeSplits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + buckets: Seq[Int], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Array[InputPartition] = { + val result = mutable.ArrayBuffer.empty[InputPartition] + val tableId = tableInfo.getTableId + + val flussPartitionIdByName = mutable.LinkedHashMap.empty[String, Long] + partitionInfos.asScala.foreach { + pi => flussPartitionIdByName(pi.getPartitionName) = pi.getPartitionId + } + + val lakeSplitsByPartition = groupLakeSplitsByPartition(lakeSplits) + var lakeSplitPartitionId = -1L + + lakeSplitsByPartition.foreach { + case (partitionName, splits) => + flussPartitionIdByName.remove(partitionName) match { + case Some(partitionId) => + // Partition in both lake and Fluss — lake splits + log tail + addLakePartitions(result, splits, splitSerializer, tableId, partitionId) + + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + bucketOffsetsRetriever) + buckets.foreach { + bucketId => + val tableBucket = new TableBucket(tableId, partitionId, 
bucketId) + addLogTailPartition( + result, + tableBucket, + tableBucketsOffset, + stoppingOffsets(bucketId)) + } + + case None => + // Partition only in lake (expired in Fluss) — lake splits only + val pid = lakeSplitPartitionId + lakeSplitPartitionId -= 1 + addLakePartitions(result, splits, splitSerializer, tableId, pid) + } + } + + // Partitions only in Fluss (not yet tiered) — log from earliest + flussPartitionIdByName.foreach { + case (partitionName, partitionId) => + val stoppingOffsets = getBucketOffsets( + stoppingOffsetsInitializer, + partitionName, + buckets, + bucketOffsetsRetriever) + buckets.foreach { + bucketId => + val stoppingOffset = stoppingOffsets(bucketId) + if (stoppingOffset > 0) { + val tableBucket = new TableBucket(tableId, partitionId, bucketId) + result += FlussAppendInputPartition( + tableBucket, + LogScanner.EARLIEST_OFFSET, + stoppingOffset) + } + } + } + + result.toArray + } + + private def groupLakeSplitsByPartition( + lakeSplits: Seq[LakeSplit]): mutable.LinkedHashMap[String, mutable.ArrayBuffer[LakeSplit]] = { + val grouped = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[LakeSplit]] + lakeSplits.foreach { + split => + val partitionName = if (split.partition() == null || split.partition().isEmpty) { + "" + } else { + split.partition().asScala.mkString(ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR) + } + grouped.getOrElseUpdate(partitionName, mutable.ArrayBuffer.empty) += split + } + grouped + } + + private def addLakePartitions( + result: mutable.ArrayBuffer[InputPartition], + splits: Seq[LakeSplit], + splitSerializer: SimpleVersionedSerializer[LakeSplit], + tableId: Long, + partitionId: java.lang.Long): Unit = { + splits.foreach { + split => + val tableBucket = if (partitionId != null) { + new TableBucket(tableId, partitionId, split.bucket()) + } else { + new TableBucket(tableId, split.bucket()) + } + result += FlussLakeInputPartition(tableBucket, splitSerializer.serialize(split)) + } + } + + private def 
addLogTailPartition( + result: mutable.ArrayBuffer[InputPartition], + tableBucket: TableBucket, + tableBucketsOffset: java.util.Map[TableBucket, java.lang.Long], + stoppingOffset: Long): Unit = { + val snapshotLogOffset = tableBucketsOffset.get(tableBucket) + if (snapshotLogOffset != null) { + if (snapshotLogOffset.longValue() < stoppingOffset) { + result += FlussAppendInputPartition( + tableBucket, + snapshotLogOffset.longValue(), + stoppingOffset) + } + } else if (stoppingOffset > 0) { + result += FlussAppendInputPartition(tableBucket, LogScanner.EARLIEST_OFFSET, stoppingOffset) + } + } + + private def getBucketOffsets( + initializer: OffsetsInitializer, + partitionName: String, + buckets: Seq[Int], + bucketOffsetsRetriever: BucketOffsetsRetrieverImpl): Map[Int, Long] = { + initializer + .getBucketOffsets(partitionName, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetriever) + .asScala + .map(e => (e._1.intValue(), Long2long(e._2))) + .toMap + } + + private def planFallbackPartitions(): Array[InputPartition] = { + val fallbackStartInit = FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig) + val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath) + val buckets = (0 until tableInfo.getNumBuckets).toSeq + val tableId = tableInfo.getTableId + + def createPartitions( + partitionId: Option[Long], + partitionName: String): Array[InputPartition] = { + val startOffsets = + getBucketOffsets(fallbackStartInit, partitionName, buckets, bucketOffsetsRetriever) + val stoppingOffsets = + getBucketOffsets(stoppingOffsetsInitializer, partitionName, buckets, bucketOffsetsRetriever) + + buckets.map { + bucketId => + val tableBucket = partitionId match { + case Some(pid) => new TableBucket(tableId, pid, bucketId) + case None => new TableBucket(tableId, bucketId) + } + FlussAppendInputPartition( + tableBucket, + startOffsets(bucketId), + stoppingOffsets(bucketId) + ): InputPartition + }.toArray + } + + if (tableInfo.isPartitioned) { + 
partitionInfos.asScala.flatMap { + pi => createPartitions(Some(pi.getPartitionId), pi.getPartitionName) + }.toArray + } else { + createPartitions(None, null) + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala new file mode 100644 index 0000000000..9c0031409b --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReader.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read + +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.record.LogRecord +import org.apache.fluss.spark.row.DataConverter +import org.apache.fluss.types.RowType +import org.apache.fluss.utils.CloseableIterator + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.PartitionReader + +/** Partition reader that reads data from a single lake split via lake storage (no Fluss connection). 
*/ +class FlussLakePartitionReader( + tablePath: TablePath, + rowType: RowType, + partition: FlussLakeInputPartition, + lakeSource: LakeSource[LakeSplit]) + extends PartitionReader[InternalRow] + with Logging { + + private var currentRow: InternalRow = _ + private var closed = false + private var recordIterator: CloseableIterator[LogRecord] = _ + + initialize() + + private def initialize(): Unit = { + logInfo(s"Reading lake split for table $tablePath bucket=${partition.tableBucket.getBucket}") + + val splitSerializer = lakeSource.getSplitSerializer + val split = splitSerializer.deserialize(splitSerializer.getVersion, partition.lakeSplitBytes) + + recordIterator = lakeSource + .createRecordReader(new LakeSource.ReaderContext[LakeSplit] { + override def lakeSplit(): LakeSplit = split + }) + .read() + } + + override def next(): Boolean = { + if (closed || recordIterator == null) { + return false + } + + if (recordIterator.hasNext) { + val logRecord = recordIterator.next() + currentRow = DataConverter.toSparkInternalRow(logRecord.getRow, rowType) + true + } else { + false + } + } + + override def get(): InternalRow = currentRow + + override def close(): Unit = { + if (!closed) { + closed = true + if (recordIterator != null) { + recordIterator.close() + } + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala new file mode 100644 index 0000000000..31c4255bd3 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakePartitionReaderFactory.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read + +import org.apache.fluss.config.Configuration +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.types.RowType + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} + +import java.util + +/** Factory for lake-enabled log table reads. Dispatches to lake or log reader per partition type. 
*/ +class FlussLakeAppendPartitionReaderFactory( + tableProperties: util.Map[String, String], + tablePath: TablePath, + rowType: RowType, + projection: Array[Int], + flussConfig: Configuration) + extends PartitionReaderFactory { + + @transient private lazy val lakeSource: LakeSource[LakeSplit] = { + val source = FlussLakeSourceUtils.createLakeSource(tableProperties, tablePath) + source.withProject(FlussLakeSourceUtils.lakeProjection(projection)) + source + } + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + partition match { + case lake: FlussLakeInputPartition => + new FlussLakePartitionReader(tablePath, rowType, lake, lakeSource) + case log: FlussAppendInputPartition => + new FlussAppendPartitionReader(tablePath, projection, log, flussConfig) + case _ => + throw new IllegalArgumentException(s"Unexpected partition type: ${partition.getClass}") + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala new file mode 100644 index 0000000000..41958c3442 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussLakeSourceUtils.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.read + +import org.apache.fluss.config.{ConfigOptions, Configuration} +import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp +import org.apache.fluss.lake.source.{LakeSource, LakeSplit} +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.utils.PropertiesUtils + +import java.util + +/** Shared utilities for creating lake sources and projections. */ +object FlussLakeSourceUtils { + + def createLakeSource( + tableProperties: util.Map[String, String], + tablePath: TablePath): LakeSource[LakeSplit] = { + val tableConfig = Configuration.fromMap(tableProperties) + val datalakeFormat = tableConfig.get(ConfigOptions.TABLE_DATALAKE_FORMAT) + val dataLakePrefix = "table.datalake." + datalakeFormat + "." 
+ + val catalogProperties = PropertiesUtils.extractAndRemovePrefix(tableProperties, dataLakePrefix) + val lakeConfig = Configuration.fromMap(catalogProperties) + val lakeStoragePlugin = + LakeStoragePluginSetUp.fromDataLakeFormat(datalakeFormat.toString, null) + val lakeStorage = lakeStoragePlugin.createLakeStorage(lakeConfig) + lakeStorage.createLakeSource(tablePath).asInstanceOf[LakeSource[LakeSplit]] + } + + def lakeProjection(projection: Array[Int]): Array[Array[Int]] = { + projection.map(i => Array(i)) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala index a543961272..d4e14bd479 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala @@ -61,6 +61,30 @@ case class FlussAppendScan( } } +/** Fluss Lake Append Scan. */ +case class FlussLakeAppendScan( + tablePath: TablePath, + tableInfo: TableInfo, + requiredSchema: Option[StructType], + options: CaseInsensitiveStringMap, + flussConfig: Configuration) + extends FlussScan { + + override def toBatch: Batch = { + new FlussLakeAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig) + } + + override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { + new FlussAppendMicroBatchStream( + tablePath, + tableInfo, + readSchema, + options, + flussConfig, + checkpointLocation) + } +} + /** Fluss Upsert Scan. 
*/ case class FlussUpsertScan( tablePath: TablePath, diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala index cd3e6768f4..9dd49f4df6 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala @@ -47,6 +47,19 @@ class FlussAppendScanBuilder( } } +/** Fluss Lake Append Scan Builder. */ +class FlussLakeAppendScanBuilder( + tablePath: TablePath, + tableInfo: TableInfo, + options: CaseInsensitiveStringMap, + flussConfig: FlussConfiguration) + extends FlussScanBuilder { + + override def build(): Scan = { + FlussLakeAppendScan(tablePath, tableInfo, requiredSchema, options, flussConfig) + } +} + /** Fluss Upsert Scan Builder. */ class FlussUpsertScanBuilder( tablePath: TablePath, diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml index a6d663a0ed..1469659609 100644 --- a/fluss-spark/fluss-spark-ut/pom.xml +++ b/fluss-spark/fluss-spark-ut/pom.xml @@ -95,6 +95,13 @@ test + + org.apache.fluss + fluss-lake-iceberg + ${project.version} + test + + org.apache.curator diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala new file mode 100644 index 0000000000..3801fdda04 --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.lake + +import org.apache.fluss.config.Configuration +import org.apache.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory +import org.apache.fluss.lake.writer.LakeTieringFactory +import org.apache.fluss.metadata.DataLakeFormat + +import java.nio.file.Files + +class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase { + + override protected def flussConf: Configuration = { + val conf = super.flussConf + conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) + conf.setString("datalake.iceberg.type", "hadoop") + warehousePath = + Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString + conf.setString("datalake.iceberg.warehouse", warehousePath) + conf + } + + override protected def createLakeTieringFactory(): LakeTieringFactory[_, _] = { + val icebergConfig = new Configuration() + icebergConfig.setString("type", "hadoop") + icebergConfig.setString("warehouse", warehousePath) + new IcebergLakeTieringFactory(icebergConfig) + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala new file mode 100644 index 0000000000..abbc43243d --- /dev/null +++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeLogTableReadTestBase.scala @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.lake + +import org.apache.fluss.config.{ConfigOptions, Configuration} +import org.apache.fluss.lake.committer.{CommitterInitContext, LakeCommitter} +import org.apache.fluss.lake.writer.{LakeTieringFactory, WriterInitContext} +import org.apache.fluss.metadata.{TableBucket, TableInfo, TablePath} +import org.apache.fluss.rpc.messages.{CommitLakeTableSnapshotRequest, PbLakeTableOffsetForBucket, PbLakeTableSnapshotInfo} +import org.apache.fluss.spark.FlussSparkTestBase +import org.apache.fluss.spark.SparkConnectorOptions.BUCKET_NUMBER + +import org.apache.spark.sql.Row + +import java.time.Duration +import java.util.Collections + +import scala.collection.JavaConverters._ + +/** + * Base class for lake-enabled log table read tests. Subclasses provide the lake format config and + * tiering factory. 
+ */
+abstract class SparkLakeLogTableReadTestBase extends FlussSparkTestBase {
+
+  // Filesystem path of the lake warehouse; assigned by the concrete subclass in flussConf.
+  protected var warehousePath: String = _
+
+  /** Concrete lake format (e.g. Paimon) supplies its tiering factory here. */
+  protected def createLakeTieringFactory(): LakeTieringFactory[_, _]
+
+  /** Runs `f` and always drops the given tables afterwards, even when `f` fails. */
+  override protected def withTable(tableNames: String*)(f: => Unit): Unit = {
+    try {
+      f
+    } finally {
+      tableNames.foreach(t => sql(s"DROP TABLE IF EXISTS $DEFAULT_DATABASE.$t"))
+    }
+  }
+
+  /**
+   * Tiers the first `expectedRecordCount` log records of bucket 0 of the given table into the
+   * lake and reports the resulting lake snapshot to the Fluss coordinator.
+   *
+   * @param tp table path of the lake-enabled log table
+   * @param ti table info (used for the table id and the tiering writer context)
+   * @param expectedRecordCount number of records that must be present in the log
+   * @return the log end offset (exclusive) covered by the tiered snapshot
+   */
+  private def tierToLake(tp: TablePath, ti: TableInfo, expectedRecordCount: Int): Long = {
+    val tableId = ti.getTableId
+
+    val scanRecords = readLogRecords(tp, expectedRecordCount)
+    val logEndOffset = scanRecords.asScala.map(_.logOffset()).max + 1
+
+    val factory = createLakeTieringFactory().asInstanceOf[LakeTieringFactory[Any, Any]]
+    val bucket = new TableBucket(tableId, null, 0)
+    val writeResult = writeToLake(factory, tp, ti, bucket, scanRecords)
+    val snapshotId = commitToLake(factory, tp, ti, writeResult)
+    reportSnapshotToCoordinator(tableId, snapshotId, logEndOffset)
+
+    // NOTE(review): fixed wait for the coordinator to propagate the committed lake snapshot
+    // into table metadata before the read tests run; replace with polling once a
+    // metadata-visibility hook is available.
+    Thread.sleep(2000)
+
+    logEndOffset
+  }
+
+  /**
+   * Polls bucket 0 of the table's log from the beginning until `expected` records arrive or
+   * 30 seconds elapse. Scanner and table are closed even when the record-count assertion fails.
+   */
+  private def readLogRecords(
+      tp: TablePath,
+      expected: Int): java.util.ArrayList[org.apache.fluss.client.table.scanner.ScanRecord] = {
+    val table = loadFlussTable(tp)
+    try {
+      val logScanner = table.newScan().createLogScanner()
+      try {
+        logScanner.subscribeFromBeginning(0)
+        val records =
+          new java.util.ArrayList[org.apache.fluss.client.table.scanner.ScanRecord]()
+        val deadline = System.currentTimeMillis() + 30000
+        while (records.size() < expected && System.currentTimeMillis() < deadline) {
+          val batch = logScanner.poll(Duration.ofSeconds(1))
+          batch.iterator().asScala.foreach(r => records.add(r))
+        }
+        assert(
+          records.size() == expected,
+          s"Expected $expected scan records, got ${records.size()}")
+        records
+      } finally {
+        // Always release the scanner; the previous implementation leaked it when the
+        // assertion above (or any later tiering step) threw.
+        logScanner.close()
+      }
+    } finally {
+      table.close()
+    }
+  }
+
+  /** Writes the records through the lake tiering writer; the writer is closed on all paths. */
+  private def writeToLake(
+      factory: LakeTieringFactory[Any, Any],
+      tp: TablePath,
+      ti: TableInfo,
+      bucket: TableBucket,
+      records: java.util.List[org.apache.fluss.client.table.scanner.ScanRecord]): Any = {
+    val lakeWriter = factory.createLakeWriter(new WriterInitContext {
+      override def tablePath(): TablePath = tp
+      override def tableBucket(): TableBucket = bucket
+      override def partition(): String = null
+      override def tableInfo(): TableInfo = ti
+    })
+    try {
+      records.asScala.foreach(lakeWriter.write)
+      lakeWriter.complete()
+    } finally {
+      lakeWriter.close()
+    }
+  }
+
+  /** Commits the write result to the lake and returns the committed snapshot id. */
+  private def commitToLake(
+      factory: LakeTieringFactory[Any, Any],
+      tp: TablePath,
+      ti: TableInfo,
+      writeResult: Any): Long = {
+    val lakeCommitter = factory.createLakeCommitter(new CommitterInitContext {
+      override def tablePath(): TablePath = tp
+      override def tableInfo(): TableInfo = ti
+      override def lakeTieringConfig(): Configuration = new Configuration()
+      override def flussClientConfig(): Configuration = new Configuration()
+    })
+    try {
+      val committable = lakeCommitter.toCommittable(Collections.singletonList(writeResult))
+      lakeCommitter.commit(committable, Collections.emptyMap()).getCommittedSnapshotId
+    } finally {
+      lakeCommitter.close()
+    }
+  }
+
+  /** Tells the Fluss coordinator that the lake snapshot covers bucket 0 up to `logEndOffset`. */
+  private def reportSnapshotToCoordinator(
+      tableId: Long,
+      snapshotId: Long,
+      logEndOffset: Long): Unit = {
+    val coordinatorGateway = flussServer.newCoordinatorClient()
+    val request = new CommitLakeTableSnapshotRequest()
+    val tableReq: PbLakeTableSnapshotInfo = request.addTablesReq()
+    tableReq.setTableId(tableId)
+    tableReq.setSnapshotId(snapshotId)
+    val bucketReq: PbLakeTableOffsetForBucket = tableReq.addBucketsReq()
+    bucketReq.setBucketId(0)
+    bucketReq.setLogEndOffset(logEndOffset)
+    bucketReq.setMaxTimestamp(System.currentTimeMillis())
+    coordinatorGateway.commitLakeTableSnapshot(request).get()
+  }
+
+  test("Spark Lake Read: log table falls back when no lake snapshot") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (id INT, name STRING)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, "hello"), (2, "world"), (3, "fluss")
+             |""".stripMargin)
+
+      // No snapshot was tiered, so the scan must fall back to a pure log read.
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        Row(1, "hello") :: Row(2, "world") :: Row(3, "fluss") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t ORDER BY name"),
+        Row("fluss") :: Row("hello") :: Row("world") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table lake-only (all data in lake, no log tail)") {
+    withTable("t_lake_only") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_lake_only (id INT, name STRING)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_lake_only VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      val tablePath = createTablePath("t_lake_only")
+      val table = loadFlussTable(tablePath)
+      val tableInfo = table.getTableInfo
+      table.close()
+
+      // Every written record is tiered, so the read should be served from the lake alone.
+      tierToLake(tablePath, tableInfo, expectedRecordCount = 3)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_lake_only ORDER BY id"),
+        Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t_lake_only ORDER BY name"),
+        Row("alpha") :: Row("beta") :: Row("gamma") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: log table union read (lake + log tail)") {
+    withTable("t_union") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_union (id INT, name STRING)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      val tablePath = createTablePath("t_union")
+      val table = loadFlussTable(tablePath)
+      val tableInfo = table.getTableInfo
+      table.close()
+
+      tierToLake(tablePath, tableInfo, expectedRecordCount = 3)
+
+      // Written after tiering: these rows exist only in the log tail, so a correct
+      // result requires the union of lake data and log data.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_union VALUES
+             |(4, "delta"), (5, "epsilon")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t_union ORDER BY id"),
+        Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") ::
+          Row(4, "delta") :: Row(5, "epsilon") :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT name FROM $DEFAULT_DATABASE.t_union ORDER BY name"),
+        Row("alpha") :: Row("beta") :: Row("delta") ::
+          Row("epsilon") :: Row("gamma") :: Nil
+      )
+    }
+  }
+
+  test("Spark Lake Read: non-FULL startup mode skips lake path") {
+    withTable("t_earliest") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t_earliest (id INT, name STRING)
+             | TBLPROPERTIES (
+             | '${ConfigOptions.TABLE_DATALAKE_ENABLED.key()}' = true,
+             | '${BUCKET_NUMBER.key()}' = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t_earliest VALUES
+             |(1, "alpha"), (2, "beta"), (3, "gamma")
+             |""".stripMargin)
+
+      val tablePath = createTablePath("t_earliest")
+      val table = loadFlussTable(tablePath)
+      val tableInfo = table.getTableInfo
+      table.close()
+
+      tierToLake(tablePath, tableInfo, expectedRecordCount = 3)
+
+      // Capture the pre-test mode so we restore the suite's actual state afterwards,
+      // instead of hard-coding "full" (the option may have been unset or different).
+      val previousMode = spark.conf.getOption("spark.sql.fluss.scan.startup.mode")
+      try {
+        spark.conf.set("spark.sql.fluss.scan.startup.mode", "earliest")
+
+        checkAnswer(
+          sql(s"SELECT * FROM $DEFAULT_DATABASE.t_earliest ORDER BY id"),
+          Row(1, "alpha") :: Row(2, "beta") :: Row(3, "gamma") :: Nil
+        )
+      } finally {
+        previousMode match {
+          case Some(mode) => spark.conf.set("spark.sql.fluss.scan.startup.mode", mode)
+          case None => spark.conf.unset("spark.sql.fluss.scan.startup.mode")
+        }
+      }
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
new file mode 100644
index 0000000000..1d94bb1260
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakePaimonLogTableReadTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.lake
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory
+import org.apache.fluss.lake.writer.LakeTieringFactory
+import org.apache.fluss.metadata.DataLakeFormat
+
+import java.nio.file.Files
+
+/** Lake union-read tests for log tables backed by a Paimon filesystem warehouse. */
+class SparkLakePaimonLogTableReadTest extends SparkLakeLogTableReadTestBase {
+
+  /**
+   * Extends the base Fluss configuration with Paimon as the data-lake format.
+   *
+   * The warehouse temp directory is created at most once and cached in `warehousePath`:
+   * the previous implementation created a fresh directory on every call, so a second
+   * invocation would silently point the cluster and the tiering factory at different
+   * warehouses.
+   */
+  override protected def flussConf: Configuration = {
+    val conf = super.flussConf
+    conf.setString("datalake.format", DataLakeFormat.PAIMON.toString)
+    conf.setString("datalake.paimon.metastore", "filesystem")
+    conf.setString("datalake.paimon.cache-enabled", "false")
+    if (warehousePath == null) {
+      warehousePath =
+        Files.createTempDirectory("fluss-testing-lake-read").resolve("warehouse").toString
+    }
+    conf.setString("datalake.paimon.warehouse", warehousePath)
+    conf
+  }
+
+  /** Builds a Paimon tiering factory pointing at the same warehouse as the cluster. */
+  override protected def createLakeTieringFactory(): LakeTieringFactory[_, _] = {
+    val paimonConfig = new Configuration()
+    paimonConfig.setString("warehouse", warehousePath)
+    new PaimonLakeTieringFactory(paimonConfig)
+  }
+}
From e39b4b1bc3fced313334c3e084be4dc5224343ed Mon Sep 17 00:00:00 2001
From: Anton Borisov
Date: Mon, 30 Mar 2026 01:48:34 +0100
Subject: [PATCH 2/2] remove iceberg test for now, shading problem

---
 fluss-spark/fluss-spark-ut/pom.xml            |  7 ---
 .../SparkLakeIcebergLogTableReadTest.scala    | 45 -------------------
 2 files changed, 52 deletions(-)
 delete mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala

diff --git a/fluss-spark/fluss-spark-ut/pom.xml b/fluss-spark/fluss-spark-ut/pom.xml
index 1469659609..a6d663a0ed 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -95,13 +95,6 @@
             <scope>test</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.fluss</groupId>
-            <artifactId>fluss-lake-iceberg</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.curator</groupId>
 
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala deleted file mode 100644 index 3801fdda04..0000000000 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/lake/SparkLakeIcebergLogTableReadTest.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.fluss.spark.lake - -import org.apache.fluss.config.Configuration -import org.apache.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory -import org.apache.fluss.lake.writer.LakeTieringFactory -import org.apache.fluss.metadata.DataLakeFormat - -import java.nio.file.Files - -class SparkLakeIcebergLogTableReadTest extends SparkLakeLogTableReadTestBase { - - override protected def flussConf: Configuration = { - val conf = super.flussConf - conf.setString("datalake.format", DataLakeFormat.ICEBERG.toString) - conf.setString("datalake.iceberg.type", "hadoop") - warehousePath = - Files.createTempDirectory("fluss-testing-iceberg-lake-read").resolve("warehouse").toString - conf.setString("datalake.iceberg.warehouse", warehousePath) - conf - } - - override protected def createLakeTieringFactory(): LakeTieringFactory[_, _] = { - val icebergConfig = new Configuration() - icebergConfig.setString("type", "hadoop") - icebergConfig.setString("warehouse", warehousePath) - new IcebergLakeTieringFactory(icebergConfig) - } -}