diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java index ffd83eddd227..7143d7d5fdc2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowRangeIndex.java @@ -43,8 +43,12 @@ private RowRangeIndex(List ranges) { } public static RowRangeIndex create(List ranges) { + return create(ranges, true); + } + + public static RowRangeIndex create(List ranges, boolean mergeAdjacent) { checkArgument(ranges != null, "Ranges cannot be null"); - return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, true)); + return new RowRangeIndex(Range.sortAndMergeOverlap(ranges, mergeAdjacent)); } public List ranges() { @@ -63,6 +67,13 @@ public boolean contains(Range range) { && ends[candidate] >= range.to; } + public boolean containsExactly(Range range) { + int candidate = lowerBound(starts, range.from); + return candidate < starts.length + && starts[candidate] == range.from + && ends[candidate] == range.to; + } + public List intersectedRanges(long start, long end) { int left = lowerBound(ends, start); if (left >= ranges.size()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java index 86f555748f4b..30f214afcf9f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFallbackRecordReader.java @@ -56,6 +56,7 @@ public class BlobFallbackRecordReader implements RecordReader { private final List> groupReaders = new ArrayList<>(); private final int blobIndex; + private final int fieldCount; private boolean returned; BlobFallbackRecordReader( @@ -65,6 +66,7 @@ public class BlobFallbackRecordReader implements RecordReader { RowType readRowType, int blobIndex) { this.blobIndex = blobIndex; + this.fieldCount = readRowType.getFieldCount(); checkArgument(!files.isEmpty(), "Blob bunch should not be empty."); long firstRowId = Long.MAX_VALUE; @@ -172,8 +174,7 @@ public InternalRow next() throws IOException { } } if (result == null) { - throw new IllegalStateException( - "Invalid state: all blob files at the same row id store a placeholder, it's a bug."); + result = nullBlobRow(); } return result; } @@ -187,6 +188,10 @@ public void releaseBatch() { }; } + private InternalRow nullBlobRow() { + return new GenericRow(fieldCount); + } + private boolean isPlaceHolder(InternalRow row) { return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == BlobPlaceholder.INSTANCE; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index ec5829e81c08..39bbca3338da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -633,26 +633,23 @@ Optional checkRowIdExistence( return Optional.empty(); } - Set existingIndex = new HashSet<>(); - for (SimpleFileEntry base : baseEntries) { - if (base.firstRowId() != null) { - existingIndex.add( - new FileRowIdKey( - base.partition(), - base.bucket(), - base.firstRowId(), - base.rowCount())); - } - } + List existingRanges = + baseEntries.stream() + .filter( + base -> + base.firstRowId() != null + && !dedicatedStorageFile(base.fileName())) + .map(SimpleFileEntry::nonNullRowIdRange) + .collect(Collectors.toList()); + RowRangeIndex existingIndex = RowRangeIndex.create(existingRanges, false); for (SimpleFileEntry entry : filesToCheck) { - FileRowIdKey key = - new FileRowIdKey( - entry.partition(), - entry.bucket(), - entry.firstRowId(), - entry.rowCount()); - if (!existingIndex.contains(key)) { + Range rowRange = entry.nonNullRowIdRange(); + boolean exists = + dedicatedStorageFile(entry.fileName()) + ? existingIndex.contains(rowRange) + : existingIndex.containsExactly(rowRange); + if (!exists) { return Optional.of( new RuntimeException( String.format( @@ -670,40 +667,6 @@ Optional checkRowIdExistence( return Optional.empty(); } - private static class FileRowIdKey { - private final BinaryRow partition; - private final int bucket; - private final long firstRowId; - private final long rowCount; - - FileRowIdKey(BinaryRow partition, int bucket, long firstRowId, long rowCount) { - this.partition = partition; - this.bucket = bucket; - this.firstRowId = firstRowId; - this.rowCount = rowCount; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FileRowIdKey that = (FileRowIdKey) o; - return bucket == that.bucket - && firstRowId == that.firstRowId - && rowCount == that.rowCount - && Objects.equals(partition, that.partition); - } - - @Override - public int hashCode() { - return Objects.hash(partition, bucket, firstRowId, rowCount); - } - } - private static boolean dedicatedStorageFile(String fileName) { return isBlobFile(fileName) || isVectorStoreFile(fileName); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java index dcea0a6ff130..596dbcd955fa 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/BlobFallbackRecordReaderTest.java @@ -45,7 +45,6 @@ import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link BlobFallbackRecordReader}. */ public class BlobFallbackRecordReaderTest { @@ -134,18 +133,19 @@ public void testBlobFallbackRecordReaderDerivesRowIdBoundsFromFiles() throws Exc } @Test - public void testBlobFallbackRecordReaderThrowsIfAllRowsArePlaceholders() { + public void testBlobFallbackRecordReaderReturnsNullIfAllRowsArePlaceholders() throws Exception { DataFileMeta newFile = blobFile("new-placeholder-file", 0, 1, 2); DataFileMeta oldFile = blobFile("old-placeholder-file", 0, 1, 1); - assertThatThrownBy( - () -> - readFallback( - Arrays.asList(newFile, oldFile), - null, - placeholderRows(newFile, 0, oldFile, 0))) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("all blob files at the same row id store a placeholder"); + ReadResult rows = + readFallback( + Arrays.asList(newFile, oldFile), + null, + placeholderRows(newFile, 0, oldFile, 0)); + + assertThat(rows.rowIds).isEmpty(); + assertThat(rows.nullBlobRowCount).isEqualTo(1); + assertThat(rows.placeholderRowCount).isEqualTo(0); } @Test @@ -363,6 +363,7 @@ private static class ReadResult { final List sequenceNumbers = new ArrayList<>(); final List batchSizes = new ArrayList<>(); int placeholderRowCount; + int nullBlobRowCount; static ReadResult read(RecordReader reader) throws Exception { try { @@ -385,7 +386,9 @@ static ReadResult read(RecordReader reader) throws Exception { } private void add(InternalRow row) { - if (row.getBlob(BLOB_INDEX) == BlobPlaceholder.INSTANCE) { + if (row.isNullAt(BLOB_INDEX)) { + nullBlobRowCount++; + } else if (row.getBlob(BLOB_INDEX) == BlobPlaceholder.INSTANCE) { placeholderRowCount++; } else { rowIds.add(row.getLong(1)); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java index 20be3b2c6ee1..c3371e1c19b9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java @@ -454,6 +454,85 @@ void testCheckRowIdExistenceBaseFileRewritten() { assertThat(result.get().getMessage()).contains("Row ID existence conflict"); } + @Test + void testCheckRowIdExistenceNormalFileRejectsAdjacentDataFiles() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L)); + baseEntries.add(createFileEntryWithRowId("f2", ADD, 2L, 2L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 4L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 4L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceDedicatedFileCoveredByDataFiles() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 4L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1.blob", ADD, 0L, 2L)); + + assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 4L)).isEmpty(); + } + + @Test + void testCheckRowIdExistenceDedicatedFileRejectsAdjacentDataFiles() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L)); + baseEntries.add(createFileEntryWithRowId("f2", ADD, 2L, 2L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1.blob", ADD, 0L, 4L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 4L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceDedicatedFileRejectsRangeNotCoveredByOneDataFile() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 2L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1.blob", ADD, 0L, 3L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 3L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + + @Test + void testCheckRowIdExistenceDedicatedFileIgnoresBaseDedicatedFiles() { + ConflictDetection detection = createConflictDetection(); + + List baseEntries = new ArrayList<>(); + baseEntries.add(createFileEntryWithRowId("old.blob", ADD, 0L, 2L)); + + List deltaEntries = new ArrayList<>(); + deltaEntries.add(createFileEntryWithRowId("p1.blob", ADD, 0L, 2L)); + + Optional result = + detection.checkRowIdExistence(baseEntries, deltaEntries, 2L); + assertThat(result).isPresent(); + assertThat(result.get().getMessage()).contains("Row ID existence conflict"); + } + @Test void testCheckRowIdExistenceSkipsNewlyAppendedFiles() { ConflictDetection detection = createConflictDetection(); diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala new file mode 100644 index 000000000000..b190abbc912c --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class BlobUpdateTest extends BlobUpdateTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala new file mode 100644 index 000000000000..b190abbc912c --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class BlobUpdateTest extends BlobUpdateTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala new file mode 100644 index 000000000000..b190abbc912c --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class BlobUpdateTest extends BlobUpdateTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala new file mode 100644 index 000000000000..b190abbc912c --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class BlobUpdateTest extends BlobUpdateTestBase {} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index ad6f5b95011a..3975ffdf843b 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -34,20 +34,22 @@ import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl} import org.apache.paimon.table.source.DataSplit import org.apache.paimon.table.source.snapshot.SnapshotReader +import org.apache.paimon.types.DataTypeRoot.BLOB import org.apache.paimon.types.RowType +import org.apache.paimon.types.VectorType.isVectorStoreFile import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, ExprId, Literal, PythonUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or, PythonUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.paimon.shims.SparkShimLoader -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{BooleanType, StructType} import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ @@ -91,9 +93,8 @@ case class MergeIntoPaimonDataEvolutionTable( action match { case updateAction: UpdateAction => for (assignment <- updateAction.assignments) { - if (!assignment.key.equals(assignment.value)) { - val key = assignment.key.asInstanceOf[AttributeReference] - columns ++= Seq(key) + if (isModifiedAssignment(assignment)) { + columns += assignmentKeyAttribute(assignment) } } } @@ -160,7 +161,12 @@ case class MergeIntoPaimonDataEvolutionTable( val firstRowIds: immutable.IndexedSeq[Long] = tableSplits .flatMap(_.dataFiles().asScala) - .filter(file => file.firstRowId() != null && !isBlobFile(file.fileName())) + .filter { + file => + file.firstRowId() != null && + !isBlobFile(file.fileName()) && + !isVectorStoreFile(file.fileName()) + } .map(file => file.firstRowId().asInstanceOf[Long]) .distinct .sorted @@ -330,11 +336,11 @@ case class MergeIntoPaimonDataEvolutionTable( touchedFileTargetRelation: DataSourceV2Relation, firstRowIds: immutable.IndexedSeq[Long], persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { - val mergeFields = extractFields(matchedCondition) + val conditionFields = extractFields(matchedCondition) val allFields = mutable.SortedSet.empty[AttributeReference]( (o1, o2) => { o1.toString().compareTo(o2.toString()) - }) ++ mergeFields + }) ++ conditionFields val updateColumnsSorted = updateColumns.toSeq.sortBy( s => targetTable.output.map(x => x.toString()).indexOf(s.toString())) @@ -347,8 +353,39 @@ case class MergeIntoPaimonDataEvolutionTable( .map { case (_, attrs) => attrs.head } .toSeq - val assignments = metadataColumns.map(column => Assignment(column, column)) - val output = updateColumnsSorted ++ metadataColumns + // Find raw blob update columns and avoid reading them from target table + val blobInlineFields = table.coreOptions().blobInlineField().asScala.toSet + val rawBlobFieldNames = table + .rowType() + .getFields + .asScala + .filter( + field => + field.`type`().is(BLOB) && + !blobInlineFields.exists(inlineField => resolver(inlineField, field.name()))) + .map(_.name()) + .toSet + + def isRawBlobUpdateColumn(attr: AttributeReference): Boolean = { + rawBlobFieldNames.exists(rawBlobFieldName => resolver(rawBlobFieldName, attr.name)) + } + + // The final output is composed by updated columns, metadata columns and blob marker columns. + // Marker columns are used to mark whether a blob field should be written with placeholder + val rawBlobUpdateColumns = updateColumnsSorted.filter(isRawBlobUpdateColumn) + val rawBlobMarkerNames = + rawBlobMarkerNamesAvoiding( + rawBlobUpdateColumns.size, + updateColumnsSorted.map(_.name) ++ sourceTable.output.map(_.name)) + val rawBlobMarkerNamesByColumn = rawBlobUpdateColumns + .zip(rawBlobMarkerNames) + .map { case (attr, markerName) => attr.name -> markerName } + .toMap + val rawBlobMarkerAttributes = rawBlobUpdateColumns.map( + attr => + AttributeReference(rawBlobMarkerNamesByColumn(attr.name), BooleanType, nullable = false)()) + val mergeOutput = updateColumnsSorted ++ metadataColumns ++ rawBlobMarkerAttributes + val realUpdateActions = matchedActions .map(s => s.asInstanceOf[UpdateAction]) .map( @@ -356,12 +393,88 @@ case class MergeIntoPaimonDataEvolutionTable( UpdateAction.apply( update.condition, update.assignments.filter( - a => - updateColumnsSorted.contains( - a.key.asInstanceOf[AttributeReference])) ++ assignments)) + a => updateColumnsSorted.contains(assignmentKeyAttribute(a))))) + // All fields are composed by: + // 1. Match condition fields + // 2. For each update action, the condition fields and the assignment value fields + // 3. All updated fields exclude raw blob fields for (action <- realUpdateActions) { - allFields ++= action.references.flatMap(r => extractFields(r)).seq + action.condition.foreach(condition => allFields ++= extractFields(condition)) + for (assignment <- action.assignments) { + if (isModifiedAssignment(assignment)) { + allFields ++= extractFields(assignment.value) + } + } + } + allFields ++= updateColumnsSorted.filterNot(isRawBlobUpdateColumn) + + def modifiedRawBlobNames(action: UpdateAction): Set[String] = { + action.assignments.flatMap { + assignment => + if (isModifiedAssignment(assignment)) { + val key = assignmentKeyAttribute(assignment) + rawBlobUpdateColumns.find(_.sameRef(key)).map(_.name) + } else { + None + } + }.toSet + } + + def assignmentValue(action: UpdateAction, attr: AttributeReference): Expression = { + action.assignments + .find(assignment => assignmentKeyAttribute(assignment).sameRef(attr)) + .map(_.value) + .getOrElse(attr) + } + + // the output projection for update from source table + def updateOutput(action: UpdateAction, rawBlobModified: Set[String]): Seq[Expression] = { + val updatedColumns = updateColumnsSorted.map { + attr => + if ( + rawBlobUpdateColumns.exists(_.sameRef(attr)) && !rawBlobModified.contains(attr.name) + ) { + Literal(null, attr.dataType) + } else { + assignmentValue(action, attr) + } + } + val metadata = metadataColumns.map(attr => assignmentValue(action, attr)) + val markers = rawBlobUpdateColumns.map { + attr => + if (rawBlobModified.contains(attr.name)) { + FalseLiteral + } else { + TrueLiteral + } + } + updatedColumns ++ metadata ++ markers + } + + // the output projection for target table copy + def copyOutput: Seq[Expression] = { + val copiedColumns = updateColumnsSorted.map { + attr => + if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { + Literal(null, attr.dataType) + } else { + attr + } + } + copiedColumns ++ metadataColumns ++ rawBlobUpdateColumns.map(_ => TrueLiteral) + } + + def reorderPartialWriteColumns(dataset: Dataset[Row]): Dataset[Row] = { + if (rawBlobMarkerAttributes.isEmpty) { + dataset + } else { + val columns = + updateColumnsSorted.map(attr => quotedColumn(attr.name)) ++ + Seq(quotedColumn(ROW_ID_NAME), quotedColumn(FIRST_ROW_ID_NAME)) ++ + rawBlobMarkerAttributes.map(attr => quotedColumn(attr.name)) + dataset.select(columns: _*) + } } val toWrite = if (isSelfMergeOnRowId) { @@ -400,6 +513,8 @@ case class MergeIntoPaimonDataEvolutionTable( } } + val rawBlobModifiedByAction = realUpdateActions.map(modifiedRawBlobNames) + val rewrittenUpdateActions: Seq[UpdateAction] = realUpdateActions.map { ua => val newCond = ua.condition.map(c => rewriteSourceToTarget(c, sourceToTarget)) @@ -413,29 +528,32 @@ case class MergeIntoPaimonDataEvolutionTable( isSourceRowPresent = TrueLiteral, isTargetRowPresent = TrueLiteral, matchedInstructions = rewrittenUpdateActions - .map( - action => { + .zip(rawBlobModifiedByAction) + .map { + case (action, rawBlobModified) => SparkShimLoader.shim .mergeRowsKeepUpdate( action.condition.getOrElse(TrueLiteral), - action.assignments.map(a => a.value)) + updateOutput(action, rawBlobModified)) .asInstanceOf[MergeRows.Instruction] - }) ++ Seq( + } ++ Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]), notMatchedInstructions = Nil, notMatchedBySourceInstructions = Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]), checkCardinality = false, - output = output, + output = mergeOutput, child = readPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) - assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) + val withFirstRowId = reorderPartialWriteColumns( + addFirstRowId(sparkSession, mergeRows, firstRowIds)) + assert( + withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2 + rawBlobUpdateColumns.size) withFirstRowId } else { val allReadFieldsOnTarget = allFields.filter( @@ -468,30 +586,35 @@ case class MergeIntoPaimonDataEvolutionTable( SparkShimLoader.shim .mergeRowsKeepUpdate( action.condition.getOrElse(TrueLiteral), - action.assignments.map(a => a.value)) + updateOutput(action, modifiedRawBlobNames(action))) .asInstanceOf[MergeRows.Instruction] }) ++ Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]), notMatchedInstructions = Nil, notMatchedBySourceInstructions = Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]).toSeq, checkCardinality = false, - output = output, + output = mergeOutput, child = joinPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) - assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) + val withFirstRowId = reorderPartialWriteColumns( + addFirstRowId(sparkSession, mergeRows, firstRowIds)) + assert( + withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2 + rawBlobUpdateColumns.size) withFirstRowId .repartition(col(FIRST_ROW_ID_NAME)) .sortWithinPartitions(FIRST_ROW_ID_NAME, ROW_ID_NAME) } val writer = DataEvolutionPaimonWriter(table, dataSplits) - writer.writePartialFields(toWrite, updateColumnsSorted.map(_.name)) + writer.writePartialFields( + toWrite, + updateColumnsSorted.map(_.name), + rawBlobUpdateColumns.map(attr => attr.name -> rawBlobMarkerNamesByColumn(attr.name)).toMap) } private def insertActionInvoke( @@ -695,6 +818,52 @@ object MergeIntoPaimonDataEvolutionTable { final private val ROW_FROM_TARGET = "__row_from_target" final private val ROW_ID_NAME = "_ROW_ID" final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID"; + final private val RAW_BLOB_PLACEHOLDER_MARKER_PREFIX = "__paimon_raw_blob_placeholder_" + + private[commands] def isModifiedAssignment(assignment: Assignment): Boolean = { + !sameAttributeReference(assignment.key, assignment.value) + } + + private[commands] def assignmentKeyAttribute(assignment: Assignment): AttributeReference = { + assignment.key match { + case key: AttributeReference => key + case other => + throw new UnsupportedOperationException( + s"Unsupported update assignment key: $other. Only top-level attributes are supported.") + } + } + + private[commands] def rawBlobMarkerName(index: Int): String = { + RAW_BLOB_PLACEHOLDER_MARKER_PREFIX + index + } + + private[commands] def rawBlobMarkerNamesAvoiding( + count: Int, + reservedNames: Seq[String]): Seq[String] = { + var nextIndex = 0 + (0 until count).map { + _ => + var markerName = rawBlobMarkerName(nextIndex) + while (reservedNames.exists(reservedName => resolver(reservedName, markerName))) { + nextIndex += 1 + markerName = rawBlobMarkerName(nextIndex) + } + nextIndex += 1 + markerName + } + } + + private def quotedColumn(name: String) = { + col("`" + name.replace("`", "``") + "`") + } + + private def sameAttributeReference(left: Expression, right: Expression): Boolean = { + (left, right) match { + case (leftAttr: AttributeReference, rightAttr: AttributeReference) => + leftAttr.sameRef(rightAttr) + case _ => false + } + } private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = { if (indexed.isEmpty) { diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala new file mode 100644 index 000000000000..b190abbc912c --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class BlobUpdateTest extends BlobUpdateTestBase {} diff --git a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala new file mode 100644 index 000000000000..b190abbc912c --- /dev/null +++ b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class BlobUpdateTest extends BlobUpdateTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index c49368d1bf38..1c93bdcb754d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -30,6 +30,7 @@ import org.apache.paimon.types.VectorType.isVectorStoreFile import org.apache.paimon.utils.SerializationUtils import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver import java.util.Collections @@ -43,23 +44,42 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se override val table: FileStoreTable = paimonTable.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")) - def writePartialFields(data: DataFrame, columnNames: Seq[String]): Seq[CommitMessage] = { + def writePartialFields( + data: DataFrame, + columnNames: Seq[String], + rawBlobPlaceholderMarkerColumns: Map[String, String] = Map.empty): Seq[CommitMessage] = { val sparkSession = data.sparkSession import sparkSession.implicits._ - assert(data.columns.length == columnNames.size + 2) + assert(data.columns.length == columnNames.size + 2 + rawBlobPlaceholderMarkerColumns.size) val writeType = table.rowType().project(columnNames.asJava) val options = new CoreOptions(table.schema().options()) - val updatableBlobFields = options.updatableBlobFields() - val hasRawDataBlob = writeType.getFields.asScala.exists( - f => f.`type`().is(BLOB) && !updatableBlobFields.contains(f.name())) - if (hasRawDataBlob) { - throw new UnsupportedOperationException( - "DataEvolution does not support writing partial columns with raw-data BLOB type. " + - "Only descriptor-based BLOB columns (configured via '" + - CoreOptions.BLOB_DESCRIPTOR_FIELD.key() + "' or '" + - CoreOptions.BLOB_VIEW_FIELD.key() + "' or '" + - CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key() + "') can be updated.") + val blobInlineFields = options.blobInlineField().asScala.toSeq + // Maps from blob field index to corresponding marker column index + val rawBlobPlaceholderMarkerIndexes = writeType.getFields.asScala.flatMap { + field => + if ( + field.`type`().is(BLOB) && + !blobInlineFields.exists(inlineField => resolver(inlineField, field.name())) + ) { + val markerColumn = rawBlobPlaceholderMarkerColumns.getOrElse( + field.name(), + throw new UnsupportedOperationException( + "DataEvolution raw-data BLOB partial writes require an internal placeholder marker " + + s"for column ${field.name()}.") + ) + Some(writeType.getFieldIndex(field.name()) -> data.schema.fieldIndex(markerColumn)) + } else { + None + } + }.toMap + val unusedMarkerColumns = + rawBlobPlaceholderMarkerColumns.keySet -- rawBlobPlaceholderMarkerIndexes.keys.map( + index => writeType.getFields.get(index).name()) + if (unusedMarkerColumns.nonEmpty) { + throw new IllegalArgumentException( + "Raw BLOB placeholder markers do not match partial write columns: " + + unusedMarkerColumns.toSeq.sorted.mkString(", ")) } val firstRowIdToPartitionMap = new mutable.HashMap[Long, (Array[Byte], Long)] @@ -91,7 +111,8 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se writeBuilder, writeType, firstRowIdToPartitionMapBroadcast.value, - catalogContextForBlobDescriptor) + catalogContextForBlobDescriptor, + rawBlobPlaceholderMarkerIndexes) try { iter.foreach(row => write.write(row)) Iterator.apply(write.commit) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index cd1b000a361f..a287b60399ec 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -34,6 +34,7 @@ import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl} import org.apache.paimon.table.source.DataSplit import org.apache.paimon.table.source.snapshot.SnapshotReader +import org.apache.paimon.types.DataTypeRoot.BLOB import org.apache.paimon.types.RowType import org.apache.paimon.types.VectorType.isVectorStoreFile @@ -48,7 +49,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.paimon.shims.SparkShimLoader -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{BooleanType, StructType} import scala.collection.{immutable, mutable} import scala.collection.JavaConverters._ @@ -334,11 +335,11 @@ case class MergeIntoPaimonDataEvolutionTable( touchedFileTargetRelation: DataSourceV2Relation, firstRowIds: immutable.IndexedSeq[Long], persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { - val mergeFields = extractFields(matchedCondition) + val conditionFields = extractFields(matchedCondition) val allFields = mutable.SortedSet.empty[AttributeReference]( (o1, o2) => { o1.toString().compareTo(o2.toString()) - }) ++ mergeFields + }) ++ conditionFields val updateColumnsSorted = updateColumns.toSeq.sortBy( s => targetTable.output.map(x => x.toString()).indexOf(s.toString())) @@ -351,8 +352,39 @@ case class MergeIntoPaimonDataEvolutionTable( .map { case (_, attrs) => attrs.head } .toSeq - val assignments = metadataColumns.map(column => Assignment(column, column)) - val output = updateColumnsSorted ++ metadataColumns + // Find raw blob update columns and avoid reading them from target table + val blobInlineFields = table.coreOptions().blobInlineField().asScala.toSet + val rawBlobFieldNames = table + .rowType() + .getFields + .asScala + .filter( + field => + field.`type`().is(BLOB) && + !blobInlineFields.exists(inlineField => resolver(inlineField, field.name()))) + .map(_.name()) + .toSet + + def isRawBlobUpdateColumn(attr: AttributeReference): Boolean = { + rawBlobFieldNames.exists(rawBlobFieldName => resolver(rawBlobFieldName, attr.name)) + } + + // The final output is composed by updated columns, metadata columns and blob marker columns. + // Marker columns are used to mark whether a blob field should be written with placeholder + val rawBlobUpdateColumns = updateColumnsSorted.filter(isRawBlobUpdateColumn) + val rawBlobMarkerNames = + rawBlobMarkerNamesAvoiding( + rawBlobUpdateColumns.size, + updateColumnsSorted.map(_.name) ++ sourceTable.output.map(_.name)) + val rawBlobMarkerNamesByColumn = rawBlobUpdateColumns + .zip(rawBlobMarkerNames) + .map { case (attr, markerName) => attr.name -> markerName } + .toMap + val rawBlobMarkerAttributes = rawBlobUpdateColumns.map( + attr => + AttributeReference(rawBlobMarkerNamesByColumn(attr.name), BooleanType, nullable = false)()) + val mergeOutput = updateColumnsSorted ++ metadataColumns ++ rawBlobMarkerAttributes + val realUpdateActions = matchedActions .map(s => s.asInstanceOf[UpdateAction]) .map( @@ -360,10 +392,88 @@ case class MergeIntoPaimonDataEvolutionTable( UpdateAction.apply( update.condition, update.assignments.filter( - a => updateColumnsSorted.contains(assignmentKeyAttribute(a))) ++ assignments)) + a => updateColumnsSorted.contains(assignmentKeyAttribute(a))))) + // All fields are composed by: + // 1. Match condition fields + // 2. For each update action, the condition fields and the assignment value fields + // 3. All updated fields exclude raw blob fields for (action <- realUpdateActions) { - allFields ++= action.references.flatMap(r => extractFields(r)).seq + action.condition.foreach(condition => allFields ++= extractFields(condition)) + for (assignment <- action.assignments) { + if (isModifiedAssignment(assignment)) { + allFields ++= extractFields(assignment.value) + } + } + } + allFields ++= updateColumnsSorted.filterNot(isRawBlobUpdateColumn) + + def modifiedRawBlobNames(action: UpdateAction): Set[String] = { + action.assignments.flatMap { + assignment => + if (isModifiedAssignment(assignment)) { + val key = assignmentKeyAttribute(assignment) + rawBlobUpdateColumns.find(_.sameRef(key)).map(_.name) + } else { + None + } + }.toSet + } + + def assignmentValue(action: UpdateAction, attr: AttributeReference): Expression = { + action.assignments + .find(assignment => assignmentKeyAttribute(assignment).sameRef(attr)) + .map(_.value) + .getOrElse(attr) + } + + // the output projection for update from source table + def updateOutput(action: UpdateAction, rawBlobModified: Set[String]): Seq[Expression] = { + val updatedColumns = updateColumnsSorted.map { + attr => + if ( + rawBlobUpdateColumns.exists(_.sameRef(attr)) && !rawBlobModified.contains(attr.name) + ) { + Literal(null, attr.dataType) + } else { + assignmentValue(action, attr) + } + } + val metadata = metadataColumns.map(attr => assignmentValue(action, attr)) + val markers = rawBlobUpdateColumns.map { + attr => + if (rawBlobModified.contains(attr.name)) { + FalseLiteral + } else { + TrueLiteral + } + } + updatedColumns ++ metadata ++ markers + } + + // the output projection for target table copy + def copyOutput: Seq[Expression] = { + val copiedColumns = updateColumnsSorted.map { + attr => + if (rawBlobUpdateColumns.exists(_.sameRef(attr))) { + Literal(null, attr.dataType) + } else { + attr + } + } + copiedColumns ++ metadataColumns ++ rawBlobUpdateColumns.map(_ => TrueLiteral) + } + + def reorderPartialWriteColumns(dataset: Dataset[Row]): Dataset[Row] = { + if (rawBlobMarkerAttributes.isEmpty) { + dataset + } else { + val columns = + updateColumnsSorted.map(attr => quotedColumn(attr.name)) ++ + Seq(quotedColumn(ROW_ID_NAME), quotedColumn(FIRST_ROW_ID_NAME)) ++ + rawBlobMarkerAttributes.map(attr => quotedColumn(attr.name)) + dataset.select(columns: _*) + } } val toWrite = if (isSelfMergeOnRowId) { @@ -402,6 +512,8 @@ case class MergeIntoPaimonDataEvolutionTable( } } + val rawBlobModifiedByAction = realUpdateActions.map(modifiedRawBlobNames) + val rewrittenUpdateActions: Seq[UpdateAction] = realUpdateActions.map { ua => val newCond = ua.condition.map(c => rewriteSourceToTarget(c, sourceToTarget)) @@ -415,29 +527,32 @@ case class MergeIntoPaimonDataEvolutionTable( isSourceRowPresent = TrueLiteral, isTargetRowPresent = TrueLiteral, matchedInstructions = rewrittenUpdateActions - .map( - action => { + .zip(rawBlobModifiedByAction) + .map { + case (action, rawBlobModified) => SparkShimLoader.shim .mergeRowsKeepUpdate( action.condition.getOrElse(TrueLiteral), - action.assignments.map(a => a.value)) + updateOutput(action, rawBlobModified)) .asInstanceOf[MergeRows.Instruction] - }) ++ Seq( + } ++ Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]), notMatchedInstructions = Nil, notMatchedBySourceInstructions = Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]), checkCardinality = false, - output = output, + output = mergeOutput, child = readPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) - assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) + val withFirstRowId = reorderPartialWriteColumns( + addFirstRowId(sparkSession, mergeRows, firstRowIds)) + assert( + withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2 + rawBlobUpdateColumns.size) withFirstRowId } else { val allReadFieldsOnTarget = allFields.filter( @@ -470,30 +585,35 @@ case class MergeIntoPaimonDataEvolutionTable( SparkShimLoader.shim .mergeRowsKeepUpdate( action.condition.getOrElse(TrueLiteral), - action.assignments.map(a => a.value)) + updateOutput(action, modifiedRawBlobNames(action))) .asInstanceOf[MergeRows.Instruction] }) ++ Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]), notMatchedInstructions = Nil, notMatchedBySourceInstructions = Seq( SparkShimLoader.shim - .mergeRowsKeepCopy(TrueLiteral, output) + .mergeRowsKeepCopy(TrueLiteral, copyOutput) .asInstanceOf[MergeRows.Instruction]).toSeq, checkCardinality = false, - output = output, + output = mergeOutput, child = joinPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) - assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) + val withFirstRowId = reorderPartialWriteColumns( + addFirstRowId(sparkSession, mergeRows, firstRowIds)) + assert( + withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2 + rawBlobUpdateColumns.size) withFirstRowId .repartition(col(FIRST_ROW_ID_NAME)) .sortWithinPartitions(FIRST_ROW_ID_NAME, ROW_ID_NAME) } val writer = DataEvolutionPaimonWriter(table, dataSplits) - writer.writePartialFields(toWrite, updateColumnsSorted.map(_.name)) + writer.writePartialFields( + toWrite, + updateColumnsSorted.map(_.name), + rawBlobUpdateColumns.map(attr => attr.name -> rawBlobMarkerNamesByColumn(attr.name)).toMap) } private def insertActionInvoke( @@ -697,6 +817,7 @@ object MergeIntoPaimonDataEvolutionTable { final private val ROW_FROM_TARGET = "__row_from_target" final private val ROW_ID_NAME = "_ROW_ID" final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID"; + final private val RAW_BLOB_PLACEHOLDER_MARKER_PREFIX = "__paimon_raw_blob_placeholder_" private[commands] def isModifiedAssignment(assignment: Assignment): Boolean = { !sameAttributeReference(assignment.key, assignment.value) @@ -711,6 +832,30 @@ object MergeIntoPaimonDataEvolutionTable { } } + private[commands] def rawBlobMarkerName(index: Int): String = { + RAW_BLOB_PLACEHOLDER_MARKER_PREFIX + index + } + + private[commands] def rawBlobMarkerNamesAvoiding( + count: Int, + reservedNames: Seq[String]): Seq[String] = { + var nextIndex = 0 + (0 until count).map { + _ => + var markerName = rawBlobMarkerName(nextIndex) + while (reservedNames.exists(reservedName => resolver(reservedName, markerName))) { + nextIndex += 1 + markerName = rawBlobMarkerName(nextIndex) + } + nextIndex += 1 + markerName + } + } + + private def quotedColumn(name: String) = { + col("`" + name.replace("`", "``") + "`") + } + private def sameAttributeReference(left: Expression, right: Expression): Boolean = { (left, right) match { case (leftAttr: AttributeReference, rightAttr: AttributeReference) => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala index ba2e84ef8a44..50f2f691b04c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala @@ -18,15 +18,18 @@ package org.apache.paimon.spark.write +import org.apache.paimon.casting.FallbackMappingRow import org.apache.paimon.catalog.CatalogContext -import org.apache.paimon.data.{BinaryRow, InternalRow} +import org.apache.paimon.data.{BinaryRow, BlobPlaceholder, GenericRow, InternalRow} import org.apache.paimon.disk.IOManager +import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.operation.AbstractFileStoreWrite import org.apache.paimon.spark.SparkUtils import org.apache.paimon.spark.util.SparkRowUtils import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, TableWriteImpl} import org.apache.paimon.types.RowType +import org.apache.paimon.types.VectorType.isVectorStoreFile import org.apache.paimon.utils.RecordWriter import org.apache.paimon.utils.SerializationUtils @@ -34,6 +37,7 @@ import org.apache.spark.sql.Row import java.util.Collections +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -41,7 +45,8 @@ case class DataEvolutionTableDataWrite( writeBuilder: BatchWriteBuilder, writeType: RowType, firstRowIdToPartitionMap: mutable.HashMap[Long, (Array[Byte], Long)], - catalogContext: CatalogContext) + catalogContext: CatalogContext, + rawBlobPlaceholderMarkerIndexes: Map[Int, Int]) extends InnerTableV1DataWrite { private var currentWriter: PerFileWriter = _ @@ -53,6 +58,18 @@ case class DataEvolutionTableDataWrite( private val toPaimonRow = { SparkRowUtils.toPaimonRow(writeType, -1, catalogContext) } + private val rawBlobFallbackFields = rawBlobPlaceholderMarkerIndexes.toSeq.sortBy(_._1).toArray + private val rawBlobFallbackMappings = { + val mappings = Array.fill(writeType.getFieldCount)(-1) + rawBlobFallbackFields.zipWithIndex.foreach { + case ((fieldIndex, _), fallbackIndex) => + mappings(fieldIndex) = fallbackIndex + } + mappings + } + private val rawBlobFallbackMarkerIndexes = rawBlobFallbackFields.map(_._2) + private val rawBlobFallbackRow = new GenericRow(rawBlobFallbackMarkerIndexes.length) + private val rawBlobFallbackMappingRow = new FallbackMappingRow(rawBlobFallbackMappings) def write(row: Row): Unit = { val firstRowId = row.getLong(firstRowIdIndex) @@ -62,7 +79,24 @@ case class DataEvolutionTableDataWrite( newCurrentWriter(firstRowId) } - currentWriter.write(toPaimonRow(row), rowId) + val paimonRow = toPaimonRow(row) + currentWriter.write( + if (rawBlobPlaceholderMarkerIndexes.isEmpty) { + paimonRow + } else { + rawBlobFallbackMappingRow.replace(paimonRow, rawBlobPlaceholderFallbackRow(row)) + }, + rowId) + } + + private def rawBlobPlaceholderFallbackRow(row: Row): InternalRow = { + rawBlobFallbackMarkerIndexes.zipWithIndex.foreach { + case (markerIndex, fallbackIndex) => + rawBlobFallbackRow.setField( + fallbackIndex, + if (row.getBoolean(markerIndex)) BlobPlaceholder.INSTANCE else null) + } + rawBlobFallbackRow } private def newCurrentWriter(firstRowId: Long): Unit = { @@ -89,7 +123,7 @@ case class DataEvolutionTableDataWrite( private def finishCurrentWriter(): Unit = { if (currentWriter != null) { - commitMessages.append(currentWriter.finish()) + commitMessages ++= currentWriter.finish() } currentWriter = null } @@ -126,28 +160,66 @@ case class DataEvolutionTableDataWrite( recordWriter.write(row) } - def finish(): CommitMessageImpl = { + def finish(): Seq[CommitMessageImpl] = { try { assert( numRecords == numWritten, s"Number of written records $numWritten does not match expected number $numRecords for first row ID $firstRowId.") val result = recordWriter.prepareCommit(false) val dataFiles = result.newFilesIncrement().newFiles() - assert(dataFiles.size() == 1, "This is a bug, PerFileWriter could only produce one file") - val dataFileMeta = dataFiles.get(0).assignFirstRowId(firstRowId) - new CommitMessageImpl( - partition, - 0, - null, - new DataIncrement( - java.util.Arrays.asList(dataFileMeta), - Collections.emptyList(), - Collections.emptyList()), - CompactIncrement.emptyIncrement() - ) + val dataFileMetas = assignFirstRowIds(dataFiles.asScala.toSeq) + Seq( + new CommitMessageImpl( + partition, + 0, + null, + new DataIncrement( + dataFileMetas.asJava, + Collections.emptyList(), + Collections.emptyList()), + CompactIncrement.emptyIncrement() + )) } finally { recordWriter.close() } } + + private def assignFirstRowIds(dataFiles: Seq[org.apache.paimon.io.DataFileMeta]) + : Seq[org.apache.paimon.io.DataFileMeta] = { + val assigned = ListBuffer[org.apache.paimon.io.DataFileMeta]() + val blobFieldStarts = mutable.HashMap[String, Long]() + var normalFileCount = 0 + var normalFileStart = firstRowId + var vectorStoreStart = firstRowId + + dataFiles.foreach { + file => + if (isBlobFile(file.fileName())) { + val blobFieldName = file.writeCols().get(0) + val blobStart = blobFieldStarts.getOrElse(blobFieldName, firstRowId) + assigned += file.assignFirstRowId(blobStart) + blobFieldStarts.update(blobFieldName, blobStart + file.rowCount()) + } else if (isVectorStoreFile(file.fileName())) { + assigned += file.assignFirstRowId(vectorStoreStart) + vectorStoreStart += file.rowCount() + } else { + normalFileCount += 1 + assigned += file.assignFirstRowId(normalFileStart) + normalFileStart += file.rowCount() + } + } + + // Raw blob/vector-store only partial writes may produce no normal file. If a normal file is + // produced, row-id assignment assumes there is at most one in this target row range. + // DedicatedFormatRollingFileWriter validates dedicated file row counts when a normal file + // exists. + if (normalFileCount > 1) { + throw new IllegalStateException( + s"This is a bug: DataEvolution partial write should produce at most one normal file, " + + s"but produced $normalFileCount files. Files: ${dataFiles.mkString(", ")}") + } + + assigned.toSeq + } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 9c912e027343..2d24b1435bb9 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -430,56 +430,6 @@ class BlobTestBase extends PaimonSparkTestBase { } } - test("Blob: merge-into rejects updating raw-data BLOB column") { - withTable("s", "t") { - sql("CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + - "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')") - sql("INSERT INTO t VALUES (1, 'name1', X'48656C6C6F')") - - sql("CREATE TABLE s (id INT, picture BINARY)") - sql("INSERT INTO s VALUES (1, X'4E4557')") - - val e = intercept[UnsupportedOperationException] { - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.picture = s.picture - |""".stripMargin) - } - assert(e.getMessage.contains("raw-data BLOB")) - } - } - - test("Blob: merge-into updates non-blob column on raw blob table with split blob files") { - withTable("s", "t") { - sql( - "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + - "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + - "'blob-field'='picture', 'blob.target-file-size'='1 b')") - sql( - "INSERT INTO t VALUES " + - "(1, 'name1', X'48656C6C6F'), " + - "(2, 'name2', X'5945'), " + - "(3, 'name3', X'414243')") - - sql("CREATE TABLE s (id INT, name STRING)") - sql("INSERT INTO s VALUES (1, 'updated_name1')") - - sql(""" - |MERGE INTO t - |USING s - |ON t.id = s.id - |WHEN MATCHED THEN UPDATE SET t.name = s.name - |""".stripMargin) - - checkAnswer( - sql("SELECT id, name FROM t ORDER BY id"), - Seq(Row(1, "updated_name1"), Row(2, "name2"), Row(3, "name3")) - ) - } - } - test("Blob: self merge reads raw blob column to update non-blob column") { withTable("t") { sql( diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTestBase.scala new file mode 100644 index 000000000000..a696cec7b52d --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobUpdateTestBase.scala @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row + +class BlobUpdateTestBase extends PaimonSparkTestBase { + + override def sparkConf: SparkConf = { + super.sparkConf.set("spark.paimon.write.use-v2-write", "false") + } + + test("Blob: merge-into updates raw-data BLOB column") { + withTable("s", "t") { + sql("CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')") + sql( + "INSERT INTO t VALUES " + + "(1, 'name1', X'48656C6C6F'), " + + "(2, 'name2', X'5945'), " + + "(3, 'name3', X'414243')") + + sql("CREATE TABLE s (id INT, picture BINARY)") + sql("INSERT INTO s VALUES (1, X'4E4557')") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.picture = s.picture + |""".stripMargin) + + checkAnswer( + sql("SELECT id, picture FROM t ORDER BY id"), + Seq( + Row(1, Array[Byte](78, 69, 87)), + Row(2, Array[Byte](89, 69)), + Row(3, Array[Byte](65, 66, 67))) + ) + } + } + + test("Blob: merge-into updates raw-data BLOB column to null") { + withTable("s", "s2", "t") { + sql("CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')") + sql( + "INSERT INTO t VALUES " + + "(1, 'name1', X'48656C6C6F'), " + + "(2, 'name2', X'5945')") + + sql("CREATE TABLE s (id INT, picture BINARY)") + sql("INSERT INTO s VALUES (1, CAST(NULL AS BINARY))") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.picture = s.picture + |""".stripMargin) + + checkAnswer( + sql("SELECT id, picture FROM t ORDER BY id"), + Seq(Row(1, null), Row(2, Array[Byte](89, 69))) + ) + + sql("CREATE TABLE s2 (id INT, picture BINARY)") + sql("INSERT INTO s2 VALUES (2, X'4E4557')") + sql(""" + |MERGE INTO t + |USING s2 + |ON t.id = s2.id + |WHEN MATCHED THEN UPDATE SET t.picture = s2.picture + |""".stripMargin) + + checkAnswer( + sql("SELECT id, picture FROM t ORDER BY id"), + Seq(Row(1, null), Row(2, Array[Byte](78, 69, 87))) + ) + } + } + + test("Blob: merge-into raw-data BLOB marker name does not collide with target column") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, `__paimon_raw_blob_placeholder_0` STRING, picture BINARY) " + + "TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + + "'blob-field'='picture')") + sql("INSERT INTO t VALUES (1, 'old_marker_name', X'01'), (2, 'kept', X'02')") + + sql("CREATE TABLE s (id INT, `__paimon_raw_blob_placeholder_0` STRING, picture BINARY)") + sql("INSERT INTO s VALUES (1, 'new_marker_name', X'4E4557')") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET + | t.`__paimon_raw_blob_placeholder_0` = s.`__paimon_raw_blob_placeholder_0`, + | t.picture = s.picture + |""".stripMargin) + + checkAnswer( + sql("SELECT id, `__paimon_raw_blob_placeholder_0`, picture FROM t ORDER BY id"), + Seq(Row(1, "new_marker_name", Array[Byte](78, 69, 87)), Row(2, "kept", Array[Byte](2))) + ) + } + } + + test("Blob: self merge updates raw-data BLOB column") { + withTable("t") { + sql("CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')") + sql( + "INSERT INTO t VALUES " + + "(1, 'name1', X'48656C6C6F'), " + + "(2, 'name2', X'5945'), " + + "(3, 'name3', X'414243')") + + sql(""" + |MERGE INTO t + |USING t AS source + |ON t._ROW_ID = source._ROW_ID + |WHEN MATCHED AND source.id = 1 THEN + | UPDATE SET t.picture = unhex(concat(hex(source.picture), '01')) + |""".stripMargin) + + checkAnswer( + sql("SELECT id, picture FROM t ORDER BY id"), + Seq( + Row(1, Array[Byte](72, 101, 108, 108, 111, 1)), + Row(2, Array[Byte](89, 69)), + Row(3, Array[Byte](65, 66, 67))) + ) + } + } + + test("Blob: merge-into updates multiple raw-data BLOB columns with split blob files") { + withTable("s", "t") { + def bytesHex(value: Int, length: Int): String = { + Seq.fill(length)(f"$value%02X").mkString + } + + def bytes(value: Int, length: Int): Array[Byte] = { + Array.fill[Byte](length)(value.toByte) + } + + def blobFileRanges(sequenceFilter: String): Map[String, Seq[(Long, Long)]] = { + val blobFiles = sql( + "SELECT first_row_id, record_count, write_cols FROM `t$files` " + + s"WHERE file_path LIKE '%.blob' AND $sequenceFilter") + .collect() + blobFiles + .groupBy(row => row.getSeq[String](2).head) + .map { + case (field, rows) => + field -> rows + .map(row => row.getLong(0) -> row.getLong(1)) + .sortBy(_._1) + .toSeq + } + } + + sql( + "CREATE TABLE t (id INT, pic1 BINARY, pic2 BINARY) TBLPROPERTIES (" + + "'row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + + "'blob-field'='pic1,pic2', 'blob.target-file-size'='30 b')") + sql( + "INSERT INTO t " + + "SELECT /*+ REPARTITION(1) */ id, pic1, pic2 FROM VALUES " + + s"(1, X'${bytesHex(1, 20)}', X'${bytesHex(31, 20)}'), " + + s"(2, X'${bytesHex(2, 20)}', X'${bytesHex(32, 20)}'), " + + s"(3, X'${bytesHex(3, 20)}', X'${bytesHex(33, 20)}'), " + + s"(4, X'${bytesHex(4, 20)}', X'${bytesHex(34, 20)}') " + + "AS v(id, pic1, pic2)") + + val oldRanges = blobFileRanges("max_sequence_number = 1") + val oneBlobPerFile = Seq(0L -> 1L, 1L -> 1L, 2L -> 1L, 3L -> 1L) + assert(oldRanges == Map("pic1" -> oneBlobPerFile, "pic2" -> oneBlobPerFile)) + val dataFileRanges = sql( + "SELECT first_row_id, record_count FROM `t$files` " + + "WHERE file_path NOT LIKE '%.blob' AND max_sequence_number = 1") + .collect() + .map(row => row.getLong(0) -> row.getLong(1)) + .sortBy(_._1) + .toSeq + assert(dataFileRanges == Seq(0L -> 4L)) + + sql("CREATE TABLE s (id INT, pic1 BINARY, pic2 BINARY)") + sql( + "INSERT INTO s VALUES " + + s"(1, X'${bytesHex(11, 1)}', X'${bytesHex(41, 20)}'), " + + s"(2, X'${bytesHex(12, 1)}', X'${bytesHex(42, 20)}'), " + + s"(3, X'${bytesHex(13, 1)}', X'${bytesHex(43, 20)}'), " + + s"(4, X'${bytesHex(14, 1)}', X'${bytesHex(44, 20)}')") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.pic1 = s.pic1, t.pic2 = s.pic2 + |""".stripMargin) + + checkAnswer( + sql("SELECT id, pic1, pic2 FROM t ORDER BY id"), + Seq( + Row(1, bytes(11, 1), bytes(41, 20)), + Row(2, bytes(12, 1), bytes(42, 20)), + Row(3, bytes(13, 1), bytes(43, 20)), + Row(4, bytes(14, 1), bytes(44, 20))) + ) + + val updatedRanges = blobFileRanges("max_sequence_number > 1") + assert(updatedRanges("pic1") == Seq(0L -> 2L, 2L -> 2L)) + assert(updatedRanges("pic2") == oneBlobPerFile) + assert(updatedRanges("pic1") != oldRanges("pic1")) + assert(updatedRanges("pic1") != updatedRanges("pic2")) + } + } + + test("Blob: merge-into updates non-blob column on raw blob table with split blob files") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, name STRING, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + + "'blob-field'='picture', 'blob.target-file-size'='1 b')") + sql( + "INSERT INTO t VALUES " + + "(1, 'name1', X'48656C6C6F'), " + + "(2, 'name2', X'5945'), " + + "(3, 'name3', X'414243')") + + sql("CREATE TABLE s (id INT, name STRING)") + sql("INSERT INTO s VALUES (1, 'updated_name1')") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN UPDATE SET t.name = s.name + |""".stripMargin) + + checkAnswer( + sql("SELECT id, name, picture FROM t ORDER BY id"), + Seq( + Row(1, "updated_name1", Array[Byte](72, 101, 108, 108, 111)), + Row(2, "name2", Array[Byte](89, 69)), + Row(3, "name3", Array[Byte](65, 66, 67))) + ) + } + } + + test("Blob: merge-into matched actions update different non-blob columns") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, name STRING, label STRING, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + + "'blob-field'='picture')") + sql( + "INSERT INTO t VALUES " + + "(1, 'name1', 'label1', X'01'), " + + "(2, 'name2', 'label2', X'02'), " + + "(3, 'name3', 'label3', X'03')") + + sql("CREATE TABLE s (id INT, action INT, new_name STRING, new_label STRING)") + sql( + "INSERT INTO s VALUES " + + "(1, 1, 'updated_name1', 'unused_label1'), " + + "(2, 2, 'unused_name2', 'updated_label2')") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED AND s.action = 1 THEN UPDATE SET t.name = s.new_name + |WHEN MATCHED AND s.action = 2 THEN UPDATE SET t.label = s.new_label + |""".stripMargin) + + checkAnswer( + sql("SELECT id, name, label, picture FROM t ORDER BY id"), + Seq( + Row(1, "updated_name1", "label1", Array[Byte](1)), + Row(2, "name2", "updated_label2", Array[Byte](2)), + Row(3, "name3", "label3", Array[Byte](3))) + ) + } + } + +} + +class BlobUpdateTestWithV2Write extends BlobUpdateTestBase { + override def sparkConf: SparkConf = { + super.sparkConf.set("spark.paimon.write.use-v2-write", "true") + } +}