diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 7de1695af0be..7d0f051756a9 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -18,9 +18,11 @@ package org.apache.paimon.spark;
 
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Blob;
 import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
@@ -29,6 +31,8 @@ import org.apache.paimon.data.variant.Variant;
 import org.apache.paimon.spark.util.shim.TypeUtils$;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.UriReader;
+import org.apache.paimon.utils.UriReaderFactory;
 
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -41,43 +45,43 @@ import org.apache.spark.sql.types.TimestampNTZType;
 import org.apache.spark.sql.types.TimestampType;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.util.HashMap;
 import java.util.Map;
 
-/** Wrapper to fetch value from the spark internal row. */
+/**
+ * An {@link InternalRow} that wraps spark {@link org.apache.spark.sql.catalyst.InternalRow} for
+ * v2 write.
+ */
 public class SparkInternalRowWrapper implements InternalRow, Serializable {
 
-    private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
-    private final int length;
-    private final int rowKindIdx;
     private final StructType tableSchema;
-    private int[] fieldIndexMap = null;
+    private final int length;
+    private final boolean blobAsDescriptor;
+    @Nullable private final UriReaderFactory uriReaderFactory;
+    @Nullable private final int[] fieldIndexMap;
 
-    public SparkInternalRowWrapper(
-            org.apache.spark.sql.catalyst.InternalRow internalRow,
-            int rowKindIdx,
-            StructType tableSchema,
-            int length) {
-        this.internalRow = internalRow;
-        this.rowKindIdx = rowKindIdx;
-        this.length = length;
-        this.tableSchema = tableSchema;
-    }
+    private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
 
-    public SparkInternalRowWrapper(int rowKindIdx, StructType tableSchema, int length) {
-        this.rowKindIdx = rowKindIdx;
-        this.length = length;
-        this.tableSchema = tableSchema;
+    public SparkInternalRowWrapper(StructType tableSchema, int length) {
+        this(tableSchema, length, null, false, null);
     }
 
     public SparkInternalRowWrapper(
-            int rowKindIdx, StructType tableSchema, StructType dataSchema, int length) {
-        this.rowKindIdx = rowKindIdx;
-        this.length = length;
+            StructType tableSchema,
+            int length,
+            StructType dataSchema,
+            boolean blobAsDescriptor,
+            CatalogContext catalogContext) {
         this.tableSchema = tableSchema;
-        this.fieldIndexMap = buildFieldIndexMap(tableSchema, dataSchema);
+        this.length = length;
+        this.fieldIndexMap =
+                dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
+        this.blobAsDescriptor = blobAsDescriptor;
+        this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
     }
 
     public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -128,12 +132,6 @@ public int getFieldCount() {
 
     @Override
     public RowKind getRowKind() {
-        if (rowKindIdx != -1) {
-            int actualPos = getActualFieldPosition(rowKindIdx);
-            if (actualPos != -1) {
-                return RowKind.fromByteValue(internalRow.getByte(actualPos));
-            }
-        }
         return RowKind.INSERT;
     }
 
@@ -244,7 +242,13 @@ public Variant getVariant(int pos) {
 
     @Override
     public Blob getBlob(int pos) {
-        return new BlobData(internalRow.getBinary(pos));
+        if (blobAsDescriptor) {
+            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos));
+            UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
+            return Blob.fromDescriptor(uriReader, blobDescriptor);
+        } else {
+            return new BlobData(internalRow.getBinary(pos));
+        }
     }
 
     @Override
@@ -276,10 +280,8 @@ public InternalRow getRow(int pos, int numFields) {
             return null;
         }
         return new SparkInternalRowWrapper(
-                internalRow.getStruct(actualPos, numFields),
-                -1,
-                (StructType) tableSchema.fields()[actualPos].dataType(),
-                numFields);
+                        (StructType) tableSchema.fields()[actualPos].dataType(), numFields)
+                .replace(internalRow.getStruct(actualPos, numFields));
     }
 
     private static Timestamp convertToTimestamp(DataType dataType, long micros) {
@@ -434,8 +436,8 @@ public InternalMap getMap(int pos) {
 
         @Override
         public InternalRow getRow(int pos, int numFields) {
-            return new SparkInternalRowWrapper(
-                    arrayData.getStruct(pos, numFields), -1, (StructType) elementType, numFields);
+            return new SparkInternalRowWrapper((StructType) elementType, numFields)
+                    .replace(arrayData.getStruct(pos, numFields));
         }
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index d06574376883..9ac1e5999422 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -55,7 +55,7 @@ import scala.collection.JavaConverters;
 
-/** A {@link InternalRow} wraps spark {@link Row}. */
+/** An {@link InternalRow} that wraps spark {@link Row} for v1 write. */
 public class SparkRow implements InternalRow, Serializable {
 
     private final RowType type;
@@ -78,7 +78,7 @@ public SparkRow(
         this.row = row;
         this.rowKind = rowkind;
         this.blobAsDescriptor = blobAsDescriptor;
-        this.uriReaderFactory = new UriReaderFactory(catalogContext);
+        this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index a0d97c68d48b..acdd40e9b244 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap,
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
-import org.apache.paimon.spark.function.{DescriptorToStringFunction, DescriptorToStringUnbound, PathToDescriptorFunction, PathToDescriptorUnbound}
+import org.apache.paimon.spark.function.{DescriptorToStringUnbound, PathToDescriptorUnbound}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.types.{ArrayType, DataType => PaimonDataType, LocalZonedTimestampType, MapType, RowType, TimestampType}
 import org.apache.paimon.utils.ProjectedRow
@@ -90,7 +90,7 @@ class BucketFunction(NAME: String, bucketFunctionType: BucketFunctionType) exten
     val serializer = new InternalRowSerializer(bucketKeyRowType)
     val mapping = (1 to bucketKeyRowType.getFieldCount).toArray
     val reusedRow =
-      new SparkInternalRowWrapper(-1, inputType, inputType.fields.length)
+      new SparkInternalRowWrapper(inputType, inputType.fields.length)
     val bucketFunc: bucket.BucketFunction =
       bucket.BucketFunction.create(bucketFunctionType, bucketKeyRowType)
     new ScalarFunction[Int]() {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index fb53fafa259b..717dc92b23f4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -145,7 +145,7 @@ private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, writeS
   private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
     val numFields = writeSchema.fields.length
     record => {
-      new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+      new SparkInternalRowWrapper(writeSchema, numFields).replace(record)
     }
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
index cfb6c04ee361..603ec7ecff1c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataWrite.scala
@@ -117,7 +117,7 @@ abstract class abstractInnerTableDataWrite[T] extends InnerTableDataWrite[T] wit
   /** For batch write, batchId is None, for streaming write, batchId is the current batch id (>= 0).
     */
   val batchId: Option[Long]
 
-  private val needFullCompaction: Boolean = {
+  private lazy val needFullCompaction: Boolean = {
     fullCompactionDeltaCommits match {
       case Some(deltaCommits) if deltaCommits > 0 =>
         batchId match {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 589ba1745193..1b58483e69ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -54,11 +54,15 @@ case class PaimonBatchWrite(
   }
 
   override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
-    val fullCompactionDeltaCommits: Option[Int] =
-      Option.apply(coreOptions.fullCompactionDeltaCommits())
-    (_: Int, _: Long) => {
-      PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema, fullCompactionDeltaCommits)
-    }
+    (_: Int, _: Long) =>
+      {
+        PaimonV2DataWriter(
+          batchWriteBuilder,
+          writeSchema,
+          dataSchema,
+          coreOptions,
+          table.catalogEnvironment().catalogContext())
+      }
   }
 
   override def useCommitCoordinator(): Boolean = false
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index d5291fe5d8f9..fbd166a18312 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -18,6 +18,8 @@ package org.apache.paimon.spark.write
 
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.catalog.CatalogContext
 import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, TableWriteImpl}
@@ -32,15 +34,19 @@ case class PaimonV2DataWriter(
     writeBuilder: BatchWriteBuilder,
     writeSchema: StructType,
     dataSchema: StructType,
-    fullCompactionDeltaCommits: Option[Int],
+    coreOptions: CoreOptions,
+    catalogContext: CatalogContext,
     batchId: Option[Long] = None)
   extends abstractInnerTableDataWrite[InternalRow]
   with InnerTableV2DataWrite {
 
   private val ioManager = SparkUtils.createIOManager()
-
   private val metricRegistry = SparkMetricRegistry()
 
+  val fullCompactionDeltaCommits: Option[Int] =
+    Option.apply(coreOptions.fullCompactionDeltaCommits())
+  val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()
+
   val write: TableWriteImpl[InternalRow] = {
     writeBuilder
       .newWrite()
@@ -51,7 +57,12 @@ case class PaimonV2DataWriter(
 
   private val rowConverter: InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
-    val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, dataSchema, numFields)
+    val reusableWrapper = new SparkInternalRowWrapper(
+      writeSchema,
+      numFields,
+      dataSchema,
+      blobAsDescriptor,
+      catalogContext)
     record => reusableWrapper.replace(record)
   }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 4bc124661271..5d8e1ef9fb8b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -19,14 +19,14 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.catalog.CatalogContext
-import org.apache.paimon.data.Blob
-import org.apache.paimon.data.BlobDescriptor
+import org.apache.paimon.data.{Blob, BlobDescriptor}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.fs.local.LocalFileIO
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.utils.UriReaderFactory
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 
 import java.util
@@ -36,6 +36,10 @@ class BlobTestBase extends PaimonSparkTestBase {
 
   private val RANDOM = new Random
 
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
+  }
+
   test("Blob: test basic") {
     withTable("t") {
       sql(
@@ -216,7 +220,7 @@ class BlobTestBase extends PaimonSparkTestBase {
 
   def bytesToHex(bytes: Array[Byte]): String = {
     val hexChars = new Array[Char](bytes.length * 2)
-    for (j <- 0 until bytes.length) {
+    for (j <- bytes.indices) {
       val v = bytes(j) & 0xff
       hexChars(j * 2) = HEX_ARRAY(v >>> 4)
       hexChars(j * 2 + 1) = HEX_ARRAY(v & 0x0f)
@@ -224,3 +228,9 @@ class BlobTestBase extends PaimonSparkTestBase {
     new String(hexChars)
   }
 }
+
+class BlobTestWithV2Write extends BlobTestBase {
+  override def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
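Reviewer note (illustrative sketch, not part of the patch): with this change the v2 write path mirrors the v1 SparkRow behavior — a UriReaderFactory is only constructed when blob-as-descriptor mode is enabled, and SparkInternalRowWrapper resolves descriptors lazily in getBlob. The sketch below shows the wiring a v2 writer is expected to perform, under the assumption that `table`, `writeSchema` and `dataSchema` are supplied by the caller and that FileStoreTable exposes a `coreOptions()` accessor (an assumption, not something this patch adds):

  // Sketch only: mirrors the rowConverter wiring in PaimonV2DataWriter from this patch.
  import org.apache.paimon.spark.SparkInternalRowWrapper
  import org.apache.paimon.table.FileStoreTable
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType

  def buildRowConverter(
      table: FileStoreTable,
      writeSchema: StructType,
      dataSchema: StructType): InternalRow => SparkInternalRowWrapper = {
    val coreOptions = table.coreOptions() // assumed accessor on FileStoreTable
    val catalogContext = table.catalogEnvironment().catalogContext()
    // Reusable wrapper: table schema, field count, data schema for index mapping,
    // the blob-as-descriptor flag, and the catalog context used to build a UriReaderFactory.
    val reusableWrapper = new SparkInternalRowWrapper(
      writeSchema,
      writeSchema.fields.length,
      dataSchema,
      coreOptions.blobAsDescriptor(),
      catalogContext)
    // replace() swaps in each incoming Spark row, so one wrapper instance serves the whole task.
    record => reusableWrapper.replace(record)
  }

Note that after this patch the wrapper always reports RowKind.INSERT; the rowKindIdx-based lookup has been removed from the v2 path.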