From a74539b54d8ec58fee355fc5e25a47d2d8207513 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Tue, 9 Jun 2026 15:54:45 +0800 Subject: [PATCH] [spark] Support merge delete for data evolution tables --- .../DataEvolutionDeleteRewriter.java | 316 ++++++++++++ .../MergeIntoPaimonDataEvolutionTable.scala | 480 ++++++++++++++++-- .../commands/DataEvolutionPaimonWriter.scala | 63 ++- .../MergeIntoPaimonDataEvolutionTable.scala | 455 +++++++++++++++-- .../paimon/spark/sql/BlobTestBase.scala | 31 ++ .../spark/sql/RowTrackingTestBase.scala | 51 ++ 6 files changed, 1288 insertions(+), 108 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionDeleteRewriter.java diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionDeleteRewriter.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionDeleteRewriter.java new file mode 100644 index 000000000000..7ebe62afbb8a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionDeleteRewriter.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append.dataevolution; + +import org.apache.paimon.AppendOnlyFileStore; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatDiscover; +import org.apache.paimon.format.blob.BlobFileFormat; +import org.apache.paimon.globalindex.IndexedSplit; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.FileWriter; +import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.io.RowDataFileWriter; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.statistics.NoneSimpleColStatsCollector; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.Range; +import org.apache.paimon.utils.StatsCollectorFactories; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.LongFunction; + +import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; +import static org.apache.paimon.types.VectorType.isVectorStoreFile; + +/** Rewrites data-evolution files by removing deleted row id ranges. */ +public class DataEvolutionDeleteRewriter { + + private final FileStoreTable table; + private final CoreOptions options; + private final AppendOnlyFileStore store; + private final FileStorePathFactory pathFactory; + private final FileFormatDiscover formatDiscover; + private final StatsCollectorFactories statsCollectorFactories; + private final LongFunction schemaFetcher; + private final Map schemaCache; + + public DataEvolutionDeleteRewriter(FileStoreTable table) { + this.table = table; + this.options = table.coreOptions(); + this.store = (AppendOnlyFileStore) table.store(); + this.pathFactory = table.store().pathFactory(); + this.formatDiscover = FileFormatDiscover.of(options); + this.statsCollectorFactories = new StatsCollectorFactories(options); + this.schemaCache = new LinkedHashMap<>(); + this.schemaFetcher = + schemaId -> schemaCache.computeIfAbsent(schemaId, table.schemaManager()::schema); + } + + public List rewrite(List dataSplits, List deleteRanges) + throws IOException { + if (deleteRanges.isEmpty()) { + return Collections.emptyList(); + } + + Map plans = new LinkedHashMap<>(); + for (DataSplit dataSplit : dataSplits) { + SplitKey key = + new SplitKey( + dataSplit.partition(), dataSplit.bucket(), dataSplit.totalBuckets()); + RewritePlan plan = plans.computeIfAbsent(key, k -> new RewritePlan()); + for (DataFileMeta file : dataSplit.dataFiles()) { + List retainedRanges = retainedRanges(file, deleteRanges); + if (isFullRange(file, retainedRanges)) { + continue; + } + if (isVectorStoreFile(file.fileName())) { + throw new UnsupportedOperationException( + "DataEvolution MergeInto DELETE does not support vector-store files."); + } + + plan.compactBefore.add(file); + for (Range retainedRange : retainedRanges) { + plan.compactAfter.add( + rewriteFile( + dataSplit.partition(), + dataSplit.bucket(), + file, + retainedRange)); + } + } + } + + List result = new ArrayList<>(); + for (Map.Entry entry : plans.entrySet()) { + RewritePlan plan = entry.getValue(); + if (!plan.compactBefore.isEmpty()) { + SplitKey key = entry.getKey(); + result.add( + new CommitMessageImpl( + key.partition, + key.bucket, + key.totalBuckets, + DataIncrement.emptyIncrement(), + new CompactIncrement( + plan.compactBefore, + plan.compactAfter, + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList()))); + } + } + return result; + } + + public static List retainedRanges(DataFileMeta file, List deleteRanges) { + List retained = new ArrayList<>(); + retained.add(file.nonNullRowIdRange()); + + for (Range deleteRange : deleteRanges) { + List next = new ArrayList<>(); + for (Range range : retained) { + Range intersection = Range.intersection(range, deleteRange); + if (intersection == null) { + next.add(range); + continue; + } + if (range.from < intersection.from) { + next.add(new Range(range.from, intersection.from - 1)); + } + if (intersection.to < range.to) { + next.add(new Range(intersection.to + 1, range.to)); + } + } + retained = next; + if (retained.isEmpty()) { + break; + } + } + + return retained; + } + + private boolean isFullRange(DataFileMeta file, List retainedRanges) { + return retainedRanges.size() == 1 && retainedRanges.get(0).equals(file.nonNullRowIdRange()); + } + + private DataFileMeta rewriteFile( + BinaryRow partition, int bucket, DataFileMeta file, Range retainedRange) + throws IOException { + RowType writeType = + schemaFetcher.apply(file.schemaId()).project(file.writeCols()).logicalRowType(); + DataSplit split = + DataSplit.builder() + .withPartition(partition) + .withBucket(bucket) + .withDataFiles(Collections.singletonList(file)) + .withBucketPath(pathFactory.bucketPath(partition, bucket).toString()) + .rawConvertible(false) + .build(); + + RecordReader reader = + store.newDataEvolutionRead() + .withReadType(writeType) + .createReader( + new IndexedSplit( + split, Collections.singletonList(retainedRange), null)); + FileWriter writer = + createFileWriter(partition, bucket, file, writeType); + + try { + reader.forEachRemaining( + row -> { + try { + writer.write(row); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + writer.close(); + return writer.result() + .assignFirstRowId(retainedRange.from) + .assignSequenceNumber(file.minSequenceNumber(), file.maxSequenceNumber()); + } catch (UncheckedIOException e) { + writer.abort(); + throw e.getCause(); + } catch (IOException e) { + writer.abort(); + throw e; + } catch (RuntimeException e) { + writer.abort(); + throw e; + } + } + + private FileWriter createFileWriter( + BinaryRow partition, int bucket, DataFileMeta file, RowType writeType) { + DataFilePathFactory filePathFactory = + pathFactory.createDataFilePathFactory(partition, bucket); + if (isBlobFile(file.fileName())) { + return new RowDataFileWriter( + table.fileIO(), + RollingFileWriter.createFileWriterContext( + new BlobFileFormat(), + writeType, + noneStatsCollectors(writeType), + "none"), + filePathFactory.newBlobPath(), + writeType, + file.schemaId(), + () -> new LongCounter(0), + new FileIndexOptions(), + FileSource.COMPACT, + false, + options.statsDenseStore(), + filePathFactory.isExternalPath(), + file.writeCols()); + } + + String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName()); + FileFormat fileFormat = formatDiscover.discover(formatIdentifier); + return new RowDataFileWriter( + table.fileIO(), + RollingFileWriter.createFileWriterContext( + fileFormat, + writeType, + statsCollectorFactories.statsCollectors(writeType.getFieldNames()), + options.fileCompression()), + filePathFactory.newPath(), + writeType, + file.schemaId(), + () -> new LongCounter(0), + new FileIndexOptions(), + FileSource.COMPACT, + false, + options.statsDenseStore(), + filePathFactory.isExternalPath(), + file.writeCols()); + } + + private SimpleColStatsCollector.Factory[] noneStatsCollectors(RowType writeType) { + SimpleColStatsCollector.Factory[] factories = + new SimpleColStatsCollector.Factory[writeType.getFieldCount()]; + for (int i = 0; i < factories.length; i++) { + factories[i] = NoneSimpleColStatsCollector::new; + } + return factories; + } + + private static class RewritePlan { + + private final List compactBefore = new ArrayList<>(); + private final List compactAfter = new ArrayList<>(); + } + + private static class SplitKey { + + private final BinaryRow partition; + private final int bucket; + private final Integer totalBuckets; + + private SplitKey(BinaryRow partition, int bucket, Integer totalBuckets) { + this.partition = partition; + this.bucket = bucket; + this.totalBuckets = totalBuckets; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof SplitKey)) { + return false; + } + SplitKey that = (SplitKey) obj; + return bucket == that.bucket + && Objects.equals(partition, that.partition) + && Objects.equals(totalBuckets, that.totalBuckets); + } + + @Override + public int hashCode() { + return Objects.hash(partition, bucket, totalBuckets); + } + } +} diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index ad6f5b95011a..1d3078c9ed50 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -19,9 +19,10 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction +import org.apache.paimon.append.dataevolution.DataEvolutionDeleteRewriter import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile -import org.apache.paimon.io.{CompactIncrement, DataIncrement} +import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.PaimonRelation @@ -35,14 +36,16 @@ import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl} import org.apache.paimon.table.source.DataSplit import org.apache.paimon.table.source.snapshot.SnapshotReader import org.apache.paimon.types.RowType +import org.apache.paimon.types.VectorType.isVectorStoreFile +import org.apache.paimon.utils.Range import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, ExprId, Literal, PythonUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, Coalesce, EqualTo, Expression, ExprId, Literal, Not, Or, PythonUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} -import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} +import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} @@ -74,8 +77,13 @@ case class MergeIntoPaimonDataEvolutionTable( notMatchedBySourceActions.isEmpty, "notMatchedBySourceActions is not supported in MergeIntoPaimonDataEvolutionTable.") assert( - matchedActions.forall(x => x.isInstanceOf[UpdateAction]), - "Only SET clause is supported in MergeIntoPaimonDataEvolutionTable for SQL: WHEN MATCHED.") + matchedActions.forall { + case _: UpdateAction | _: DeleteAction => true + case _ => false + }, + "Only SET and DELETE clauses are supported in MergeIntoPaimonDataEvolutionTable for SQL: " + + "WHEN MATCHED." + ) assert( notMatchedActions.forall(x => x.isInstanceOf[InsertAction]), "Only INSERT clause is supported in MergeIntoPaimonDataEvolutionTable for SQL: WHEN NOT MATCHED." @@ -85,17 +93,19 @@ case class MergeIntoPaimonDataEvolutionTable( override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable] + private lazy val updateActions: Seq[UpdateAction] = + matchedActions.collect { case action: UpdateAction => action } + + private lazy val deleteActions: Seq[DeleteAction] = + matchedActions.collect { case action: DeleteAction => action } + private val updateColumns: Set[AttributeReference] = { val columns = mutable.Set[AttributeReference]() - for (action <- matchedActions) { - action match { - case updateAction: UpdateAction => - for (assignment <- updateAction.assignments) { - if (!assignment.key.equals(assignment.value)) { - val key = assignment.key.asInstanceOf[AttributeReference] - columns ++= Seq(key) - } - } + for (updateAction <- updateActions) { + for (assignment <- updateAction.assignments) { + if (isModifiedAssignment(assignment)) { + columns += assignmentKeyAttribute(assignment) + } } } columns.toSet @@ -116,7 +126,9 @@ case class MergeIntoPaimonDataEvolutionTable( * without any extra shuffle, join, or sort. */ private lazy val isSelfMergeOnRowId: Boolean = { - if (!isPaimonTable(sourceTable)) { + if (deleteActions.nonEmpty) { + false + } else if (!isPaimonTable(sourceTable)) { false } else if (!targetRelation.name.equals(PaimonRelation.getPaimonRelation(sourceTable).name)) { false @@ -158,13 +170,7 @@ case class MergeIntoPaimonDataEvolutionTable( .map(_.asInstanceOf[DataSplit]) .toSeq - val firstRowIds: immutable.IndexedSeq[Long] = tableSplits - .flatMap(_.dataFiles().asScala) - .filter(file => file.firstRowId() != null && !isBlobFile(file.fileName())) - .map(file => file.firstRowId().asInstanceOf[Long]) - .distinct - .sorted - .toIndexedSeq + val firstRowIds: immutable.IndexedSeq[Long] = normalFirstRowIds(tableSplits) val firstRowIdToBlobFirstRowIds: Map[Long, List[Long]] = { val map = new mutable.HashMap[Long, List[Long]]() @@ -207,29 +213,83 @@ case class MergeIntoPaimonDataEvolutionTable( val touchedFileTargetRelation = createNewScanPlan(dataSplits, targetRelation) - // step 2: invoke update action - val updateCommit = - if (matchedActions.nonEmpty) { - val updateResult = - updateActionInvoke( - dataSplits, - sparkSession, - touchedFileTargetRelation, - firstRowIds, - persistSourceDss) - checkUpdateResult(updateResult) - } else Nil + // step 2: invoke delete action + val (deleteRanges, deleteCommit) = + if (deleteActions.nonEmpty) { + val ranges = + deleteActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) + val commit = + if (ranges.nonEmpty) { + val deleteResult = + new DataEvolutionDeleteRewriter(table).rewrite(dataSplits.asJava, ranges.asJava) + checkDeleteResult(deleteResult.asScala.toSeq) + } else { + Nil + } + (ranges, commit) + } else { + (Nil, Nil) + } + + val hasUpdateAction = updateActions.nonEmpty && updateColumns.nonEmpty + val commitDeleteFirst = deleteCommit.nonEmpty && hasUpdateAction - // step 3: invoke insert action + // INSERT must be planned against the original target files to preserve MERGE NOT MATCHED + // semantics when a matched DELETE removes a target row. val insertCommit = if (notMatchedActions.nonEmpty) insertActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) else Nil - if (plan.snapshotId() != null) { - writer.rowIdCheckConflict(plan.snapshotId()) + val (updateDataSplits, updateTargetRelation, updateFirstRowIds, updateDeleteRanges) = + if (commitDeleteFirst) { + if (plan.snapshotId() != null) { + writer.rowIdCheckConflict(plan.snapshotId()) + } + writer.commit(deleteCommit) + val postDeleteSplits = dataSplitsAfterDelete(dataSplits, deleteCommit) + ( + postDeleteSplits, + createNewScanPlan(postDeleteSplits, targetRelation), + normalFirstRowIds(postDeleteSplits), + Nil) + } else { + (dataSplits, touchedFileTargetRelation, firstRowIds, deleteRanges) + } + + // step 3: invoke update action + val updateCommit = + if (hasUpdateAction) { + val updateResult = updateActionInvoke( + updateDataSplits, + sparkSession, + updateTargetRelation, + updateFirstRowIds, + updateDeleteRanges, + persistSourceDss) + checkUpdateResult(updateResult) + } else Nil + + // step 4: commit remaining actions + val remainingCommit = + (if (commitDeleteFirst) Nil else deleteCommit) ++ updateCommit ++ insertCommit + val remainingWriter = + if (commitDeleteFirst) { + val nextWriter = PaimonSparkWriter(table) + val postDeletePlan = table.newSnapshotReader().read() + if (postDeletePlan.snapshotId() != null) { + nextWriter.rowIdCheckConflict(postDeletePlan.snapshotId()) + } + nextWriter + } else { + if (plan.snapshotId() != null) { + writer.rowIdCheckConflict(plan.snapshotId()) + } + writer + } + if (remainingCommit.nonEmpty) { + remainingWriter.commit(remainingCommit) } - writer.commit(updateCommit ++ insertCommit) } finally { if (persistSourceDss.isDefined) { persistSourceDss.get.unpersist(blocking = false) @@ -329,6 +389,7 @@ case class MergeIntoPaimonDataEvolutionTable( sparkSession: SparkSession, touchedFileTargetRelation: DataSourceV2Relation, firstRowIds: immutable.IndexedSeq[Long], + deleteRanges: Seq[Range], persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { val mergeFields = extractFields(matchedCondition) val allFields = mutable.SortedSet.empty[AttributeReference]( @@ -338,6 +399,21 @@ case class MergeIntoPaimonDataEvolutionTable( val updateColumnsSorted = updateColumns.toSeq.sortBy( s => targetTable.output.map(x => x.toString()).indexOf(s.toString())) + val retainedFileRanges = + if (deleteRanges.nonEmpty) { + retainedNormalFileRanges(dataSplits, deleteRanges) + } else { + Nil + } + if (deleteRanges.nonEmpty && retainedFileRanges.isEmpty) { + return Nil + } + val writeFirstRowIds = + if (deleteRanges.nonEmpty) { + retainedFileRanges.map(_.firstRowId).distinct.sorted.toIndexedSeq + } else { + firstRowIds + } // Different Spark versions might produce duplicate attributes between `output` and // `metadataOutput`, so manually deduplicate by `exprId`. @@ -349,16 +425,13 @@ case class MergeIntoPaimonDataEvolutionTable( val assignments = metadataColumns.map(column => Assignment(column, column)) val output = updateColumnsSorted ++ metadataColumns - val realUpdateActions = matchedActions - .map(s => s.asInstanceOf[UpdateAction]) + val realUpdateActions = updateActions .map( update => UpdateAction.apply( update.condition, update.assignments.filter( - a => - updateColumnsSorted.contains( - a.key.asInstanceOf[AttributeReference])) ++ assignments)) + a => updateColumnsSorted.contains(assignmentKeyAttribute(a))) ++ assignments)) for (action <- realUpdateActions) { allFields ++= action.references.flatMap(r => extractFields(r)).seq @@ -434,7 +507,8 @@ case class MergeIntoPaimonDataEvolutionTable( child = readPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) + val withFirstRowId = + filterDeletedRows(addFirstRowId(sparkSession, mergeRows, writeFirstRowIds), deleteRanges) assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) withFirstRowId } else { @@ -483,7 +557,8 @@ case class MergeIntoPaimonDataEvolutionTable( output = output, child = joinPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) + val withFirstRowId = + filterDeletedRows(addFirstRowId(sparkSession, mergeRows, writeFirstRowIds), deleteRanges) assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) withFirstRowId .repartition(col(FIRST_ROW_ID_NAME)) @@ -491,7 +566,14 @@ case class MergeIntoPaimonDataEvolutionTable( } val writer = DataEvolutionPaimonWriter(table, dataSplits) - writer.writePartialFields(toWrite, updateColumnsSorted.map(_.name)) + if (deleteRanges.nonEmpty) { + writer.writePartialFieldsForRanges( + toWrite, + updateColumnsSorted.map(_.name), + retainedFileRanges) + } else { + writer.writePartialFields(toWrite, updateColumnsSorted.map(_.name)) + } } private def insertActionInvoke( @@ -539,6 +621,62 @@ case class MergeIntoPaimonDataEvolutionTable( writer.write(toWrite) } + private def deleteActionInvoke( + sparkSession: SparkSession, + touchedFileTargetRelation: DataSourceV2Relation, + persistSourceDss: Option[Dataset[Row]]): Seq[Range] = { + val mergeFields = extractFields(matchedCondition) + val allFields = mutable.SortedSet.empty[AttributeReference]( + (o1, o2) => { + o1.toString().compareTo(o2.toString()) + }) ++ mergeFields + + matchedActions.flatMap(_.condition).foreach(condition => allFields ++= extractFields(condition)) + + // Different Spark versions might produce duplicate attributes between `output` and + // `metadataOutput`, so manually deduplicate by `exprId`. + val metadataColumns = (targetRelation.output ++ targetRelation.metadataOutput) + .filter(attr => attr.name.equals(ROW_ID_NAME)) + .groupBy(_.exprId) + .map { case (_, attrs) => attrs.head } + .toSeq + + val allReadFieldsOnTarget = allFields.filter( + field => + targetTable.output.exists(attr => attr.exprId.equals(field.exprId))) ++ metadataColumns + val allReadFieldsOnSource = + allFields.filter(field => sourceTable.output.exists(attr => attr.exprId.equals(field.exprId))) + + val targetReadPlan = + touchedFileTargetRelation.copy(output = allReadFieldsOnTarget.toSeq) + val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) + val sourceReadPlan = Project(allReadFieldsOnSource.toSeq, sourceChild) + + val joinPlan = + Join(targetReadPlan, sourceReadPlan, Inner, Some(matchedCondition), JoinHint.NONE) + + var previousMatched: Expression = FalseLiteral + val deleteConditions = matchedActions.flatMap { + action => + val currentMatched = nullAsFalse(action.condition.getOrElse(TrueLiteral)) + val effectiveCondition = And(currentMatched, Not(previousMatched)) + previousMatched = Or(previousMatched, currentMatched) + action match { + case _: DeleteAction => Some(effectiveCondition) + case _ => None + } + } + + if (deleteConditions.isEmpty) { + Nil + } else { + val rowIdAttr = joinPlan.output.find(_.exprId.equals(metadataColumns.head.exprId)).get + val deletePlan = + Project(Seq(rowIdAttr), Filter(deleteConditions.reduce(Or), joinPlan)) + collectRowIdRanges(createDataset(sparkSession, deletePlan), sparkSession) + } + } + /** * Attempts to identify a direct mapping from sourceTable's attribute to the target table's * `_ROW_ID`. @@ -636,6 +774,52 @@ case class MergeIntoPaimonDataEvolutionTable( } } + private def checkDeleteResult(deleteCommit: Seq[CommitMessage]): Seq[CommitMessage] = { + val affectedParts: Set[BinaryRow] = deleteCommit.map(_.partition()).toSet + + val latestSnapshot = table.latestSnapshot() + if (!latestSnapshot.isPresent) { + return deleteCommit + } + + val filter: org.apache.paimon.utils.Filter[IndexManifestEntry] = + (entry: IndexManifestEntry) => { + entry.indexFile().globalIndexMeta() != null && affectedParts.contains(entry.partition()) + } + + val affectedIndexEntries = table + .store() + .newIndexFileHandler() + .scan(latestSnapshot.get(), filter) + .asScala + + if (affectedIndexEntries.isEmpty) { + deleteCommit + } else { + table.coreOptions().globalIndexColumnUpdateAction() match { + case GlobalIndexColumnUpdateAction.THROW_ERROR => + val affectedPartitions = affectedParts.map(_.toString).toSeq.sorted + throw new RuntimeException( + s"""MergeInto: delete rows affect partitions with global indexes, not supported now. + |Affected partitions: ${affectedPartitions.mkString("[", ", ", "]")} + |""".stripMargin) + case GlobalIndexColumnUpdateAction.DROP_PARTITION_INDEX => + val grouped = affectedIndexEntries.groupBy(_.partition()) + val deleteCommitMessages = ArrayBuffer.empty[CommitMessage] + grouped.foreach { + case (part, entries) => + deleteCommitMessages += new CommitMessageImpl( + part, + 0, + null, + DataIncrement.deleteIndexIncrement(entries.map(_.indexFile()).asJava), + CompactIncrement.emptyIncrement()) + } + deleteCommit ++ deleteCommitMessages + } + } + } + private def findRelatedFirstRowIds( dataset: Dataset[Row], sparkSession: SparkSession, @@ -659,6 +843,167 @@ case class MergeIntoPaimonDataEvolutionTable( .collect() } + private def collectRowIdRanges(dataset: Dataset[Row], sparkSession: SparkSession): Seq[Range] = { + import sparkSession.implicits._ + if (dataset.columns.isEmpty) { + return Nil + } + + val rowIdColumn = dataset.columns.head + val ranges = dataset + .select(col(rowIdColumn)) + .distinct() + .sort(col(rowIdColumn)) + .as[Long] + .mapPartitions(MergeIntoPaimonDataEvolutionTable.rowIdsToRanges) + .collect() + .toSeq + .map { case (from, to) => new Range(from, to) } + + Range.sortAndMergeOverlap(ranges.asJava, true).asScala.toSeq + } + + private def normalFirstRowIds(dataSplits: Seq[DataSplit]): immutable.IndexedSeq[Long] = { + dataSplits + .flatMap(_.dataFiles().asScala) + .filter( + file => + file.firstRowId() != null && + !isBlobFile(file.fileName()) && + !isVectorStoreFile(file.fileName())) + .map(file => file.firstRowId().asInstanceOf[Long]) + .distinct + .sorted + .toIndexedSeq + } + + private def dataSplitsAfterDelete( + dataSplits: Seq[DataSplit], + deleteCommit: Seq[CommitMessage]): Seq[DataSplit] = { + val replacements = new mutable.HashMap[String, Seq[DataFileMeta]] + deleteCommit.foreach { + message => + val compactIncrement = message.asInstanceOf[CommitMessageImpl].compactIncrement() + val compactBefore = compactIncrement.compactBefore().asScala + val compactAfter = compactIncrement.compactAfter().asScala + compactBefore.foreach { + before => + replacements.put( + before.fileName(), + compactAfter + .filter( + after => + Range + .intersection(before.nonNullRowIdRange(), after.nonNullRowIdRange()) != null) + .toSeq) + } + } + + dataSplits.map { + split => + val files = split.dataFiles().asScala + val deletionFiles = split.deletionFiles().orElse(null) + val newFiles = ArrayBuffer.empty[DataFileMeta] + val newDeletionFiles = + if (deletionFiles == null) { + null + } else { + new java.util.ArrayList[org.apache.paimon.table.source.DeletionFile]() + } + + files.zipWithIndex.foreach { + case (file, index) => + replacements.get(file.fileName()) match { + case Some(afterFiles) => + afterFiles.foreach { + afterFile => + newFiles += afterFile + if (newDeletionFiles != null) { + newDeletionFiles.add(null) + } + } + case None => + newFiles += file + if (newDeletionFiles != null) { + newDeletionFiles.add(deletionFiles.get(index)) + } + } + } + + val builder = DataSplit + .builder() + .withSnapshot(split.snapshotId()) + .withPartition(split.partition()) + .withBucket(split.bucket()) + .withBucketPath(split.bucketPath()) + .withTotalBuckets(split.totalBuckets()) + .withDataFiles(newFiles.asJava) + .isStreaming(split.isStreaming()) + .rawConvertible(false) + if (newDeletionFiles != null) { + builder.withDataDeletionFiles(newDeletionFiles) + } + builder.build() + } + } + + private def filterDeletedRows(data: Dataset[Row], deleteRanges: Seq[Range]): Dataset[Row] = { + if (deleteRanges.isEmpty) { + data + } else { + val ranges = deleteRanges.map(range => (range.from, range.to)).toArray + val keepRow = udf { + rowId: Long => + var low = 0 + var high = ranges.length - 1 + var found = false + while (low <= high && !found) { + val mid = (low + high) >>> 1 + val (from, to) = ranges(mid) + if (rowId < from) { + high = mid - 1 + } else if (rowId > to) { + low = mid + 1 + } else { + found = true + } + } + !found + } + data.filter(keepRow(col(ROW_ID_NAME))) + } + } + + private def retainedNormalFileRanges( + dataSplits: Seq[DataSplit], + deleteRanges: Seq[Range]): Seq[DataEvolutionPaimonWriter.FileRange] = { + dataSplits.flatMap { + split => + split + .dataFiles() + .asScala + .filter( + file => + file.firstRowId() != null && + !isBlobFile(file.fileName()) && + !isVectorStoreFile(file.fileName())) + .flatMap { + file => + DataEvolutionDeleteRewriter + .retainedRanges(file, deleteRanges.asJava) + .asScala + .map( + range => + DataEvolutionPaimonWriter + .FileRange(split.partition(), range.from, range.count())) + } + } + } + + private def nullAsFalse(condition: Expression): Expression = { + Coalesce(Seq(condition, FalseLiteral)) + } + private def extractFields(expression: Expression): Seq[AttributeReference] = { val fields = new ListBuffer[AttributeReference]() @@ -696,6 +1041,27 @@ object MergeIntoPaimonDataEvolutionTable { final private val ROW_ID_NAME = "_ROW_ID" final private val FIRST_ROW_ID_NAME = "_FIRST_ROW_ID"; + private[commands] def isModifiedAssignment(assignment: Assignment): Boolean = { + !sameAttributeReference(assignment.key, assignment.value) + } + + private[commands] def assignmentKeyAttribute(assignment: Assignment): AttributeReference = { + assignment.key match { + case key: AttributeReference => key + case other => + throw new UnsupportedOperationException( + s"Unsupported update assignment key: $other. Only top-level attributes are supported.") + } + } + + private def sameAttributeReference(left: Expression, right: Expression): Boolean = { + (left, right) match { + case (leftAttr: AttributeReference, rightAttr: AttributeReference) => + leftAttr.sameRef(rightAttr) + case _ => false + } + } + private def floorBinarySearch(indexed: immutable.IndexedSeq[Long], value: Long): Long = { if (indexed.isEmpty) { throw new IllegalArgumentException("The input sorted sequence is empty.") @@ -712,4 +1078,26 @@ object MergeIntoPaimonDataEvolutionTable { } } } + + private[commands] def rowIdsToRanges(rowIds: Iterator[Long]): Iterator[(Long, Long)] = { + val ranges = ArrayBuffer.empty[(Long, Long)] + if (!rowIds.hasNext) { + return ranges.iterator + } + + var start = rowIds.next() + var end = start + while (rowIds.hasNext) { + val rowId = rowIds.next() + if (rowId == end + 1) { + end = rowId + } else { + ranges += ((start, end)) + start = rowId + end = rowId + } + } + ranges += ((start, end)) + ranges.iterator + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala index c49368d1bf38..11b6a66d9719 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala @@ -19,6 +19,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions +import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile import org.apache.paimon.spark.write.{DataEvolutionTableDataWrite, WriteHelper, WriteTaskResult} import org.apache.paimon.table.FileStoreTable @@ -39,11 +40,51 @@ import scala.collection.mutable case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Seq[DataSplit]) extends WriteHelper { + import DataEvolutionPaimonWriter._ + // File rolling will never be performed override val table: FileStoreTable = paimonTable.copy(Collections.singletonMap(CoreOptions.TARGET_FILE_SIZE.key(), "99999 G")) def writePartialFields(data: DataFrame, columnNames: Seq[String]): Seq[CommitMessage] = { + val firstRowIdToPartitionMap = new mutable.HashMap[Long, (Array[Byte], Long)] + dataSplits.foreach( + split => + split + .dataFiles() + .asScala + .filter(file => !isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) + .foreach( + file => + firstRowIdToPartitionMap + .put( + file.firstRowId(), + // BinaryRow stores data in transient memory segments and relies on Java + // serialization hooks to restore them. Store bytes in Spark closures and + // broadcasts so Kryo does not serialize BinaryRow internals directly. + (SerializationUtils.serializeBinaryRow(split.partition()), file.rowCount()) + ))) + writePartialFields(data, columnNames, firstRowIdToPartitionMap) + } + + def writePartialFieldsForRanges( + data: DataFrame, + columnNames: Seq[String], + fileRanges: Seq[FileRange]): Seq[CommitMessage] = { + val firstRowIdToPartitionMap = new mutable.HashMap[Long, (Array[Byte], Long)] + fileRanges.foreach { + range => + firstRowIdToPartitionMap.put( + range.firstRowId, + (SerializationUtils.serializeBinaryRow(range.partition), range.rowCount)) + } + writePartialFields(data, columnNames, firstRowIdToPartitionMap) + } + + private def writePartialFields( + data: DataFrame, + columnNames: Seq[String], + firstRowIdToPartitionMap: mutable.HashMap[Long, (Array[Byte], Long)]): Seq[CommitMessage] = { val sparkSession = data.sparkSession import sparkSession.implicits._ assert(data.columns.length == columnNames.size + 2) @@ -62,23 +103,6 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se CoreOptions.BLOB_EXTERNAL_STORAGE_FIELD.key() + "') can be updated.") } - val firstRowIdToPartitionMap = new mutable.HashMap[Long, (Array[Byte], Long)] - dataSplits.foreach( - split => - split - .dataFiles() - .asScala - .filter(file => !isBlobFile(file.fileName()) && !isVectorStoreFile(file.fileName())) - .foreach( - file => - firstRowIdToPartitionMap - .put( - file.firstRowId(), - // BinaryRow stores data in transient memory segments and relies on Java - // serialization hooks to restore them. Store bytes in Spark closures and - // broadcasts so Kryo does not serialize BinaryRow internals directly. - (SerializationUtils.serializeBinaryRow(split.partition()), file.rowCount()) - ))) val firstRowIdToPartitionMapBroadcast = sparkSession.sparkContext.broadcast(firstRowIdToPartitionMap) val writeBuilder = table.newBatchWriteBuilder() @@ -103,3 +127,8 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se WriteTaskResult.merge(written.collect()) } } + +object DataEvolutionPaimonWriter { + + case class FileRange(partition: BinaryRow, firstRowId: Long, rowCount: Long) +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index cd1b000a361f..1d3078c9ed50 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -19,9 +19,10 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction +import org.apache.paimon.append.dataevolution.DataEvolutionDeleteRewriter import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile -import org.apache.paimon.io.{CompactIncrement, DataIncrement} +import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.catalyst.analysis.PaimonRelation @@ -36,14 +37,15 @@ import org.apache.paimon.table.source.DataSplit import org.apache.paimon.table.source.snapshot.SnapshotReader import org.apache.paimon.types.RowType import org.apache.paimon.types.VectorType.isVectorStoreFile +import org.apache.paimon.utils.Range import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.sql.PaimonUtils._ import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolver -import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, EqualTo, Expression, ExprId, Literal, Or, PythonUDF, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, AttributeReference, Coalesce, EqualTo, Expression, ExprId, Literal, Not, Or, PythonUDF, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} -import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter} +import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftOuter} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.functions.{col, udf} @@ -75,8 +77,13 @@ case class MergeIntoPaimonDataEvolutionTable( notMatchedBySourceActions.isEmpty, "notMatchedBySourceActions is not supported in MergeIntoPaimonDataEvolutionTable.") assert( - matchedActions.forall(x => x.isInstanceOf[UpdateAction]), - "Only SET clause is supported in MergeIntoPaimonDataEvolutionTable for SQL: WHEN MATCHED.") + matchedActions.forall { + case _: UpdateAction | _: DeleteAction => true + case _ => false + }, + "Only SET and DELETE clauses are supported in MergeIntoPaimonDataEvolutionTable for SQL: " + + "WHEN MATCHED." + ) assert( notMatchedActions.forall(x => x.isInstanceOf[InsertAction]), "Only INSERT clause is supported in MergeIntoPaimonDataEvolutionTable for SQL: WHEN NOT MATCHED." @@ -86,16 +93,19 @@ case class MergeIntoPaimonDataEvolutionTable( override val table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable] + private lazy val updateActions: Seq[UpdateAction] = + matchedActions.collect { case action: UpdateAction => action } + + private lazy val deleteActions: Seq[DeleteAction] = + matchedActions.collect { case action: DeleteAction => action } + private val updateColumns: Set[AttributeReference] = { val columns = mutable.Set[AttributeReference]() - for (action <- matchedActions) { - action match { - case updateAction: UpdateAction => - for (assignment <- updateAction.assignments) { - if (isModifiedAssignment(assignment)) { - columns += assignmentKeyAttribute(assignment) - } - } + for (updateAction <- updateActions) { + for (assignment <- updateAction.assignments) { + if (isModifiedAssignment(assignment)) { + columns += assignmentKeyAttribute(assignment) + } } } columns.toSet @@ -116,7 +126,9 @@ case class MergeIntoPaimonDataEvolutionTable( * without any extra shuffle, join, or sort. */ private lazy val isSelfMergeOnRowId: Boolean = { - if (!isPaimonTable(sourceTable)) { + if (deleteActions.nonEmpty) { + false + } else if (!isPaimonTable(sourceTable)) { false } else if (!targetRelation.name.equals(PaimonRelation.getPaimonRelation(sourceTable).name)) { false @@ -158,18 +170,7 @@ case class MergeIntoPaimonDataEvolutionTable( .map(_.asInstanceOf[DataSplit]) .toSeq - val firstRowIds: immutable.IndexedSeq[Long] = tableSplits - .flatMap(_.dataFiles().asScala) - .filter { - file => - file.firstRowId() != null && - !isBlobFile(file.fileName()) && - !isVectorStoreFile(file.fileName()) - } - .map(file => file.firstRowId().asInstanceOf[Long]) - .distinct - .sorted - .toIndexedSeq + val firstRowIds: immutable.IndexedSeq[Long] = normalFirstRowIds(tableSplits) val firstRowIdToBlobFirstRowIds: Map[Long, List[Long]] = { val map = new mutable.HashMap[Long, List[Long]]() @@ -212,28 +213,83 @@ case class MergeIntoPaimonDataEvolutionTable( val touchedFileTargetRelation = createNewScanPlan(dataSplits, targetRelation) - // step 2: invoke update action + // step 2: invoke delete action + val (deleteRanges, deleteCommit) = + if (deleteActions.nonEmpty) { + val ranges = + deleteActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) + val commit = + if (ranges.nonEmpty) { + val deleteResult = + new DataEvolutionDeleteRewriter(table).rewrite(dataSplits.asJava, ranges.asJava) + checkDeleteResult(deleteResult.asScala.toSeq) + } else { + Nil + } + (ranges, commit) + } else { + (Nil, Nil) + } + + val hasUpdateAction = updateActions.nonEmpty && updateColumns.nonEmpty + val commitDeleteFirst = deleteCommit.nonEmpty && hasUpdateAction + + // INSERT must be planned against the original target files to preserve MERGE NOT MATCHED + // semantics when a matched DELETE removes a target row. + val insertCommit = + if (notMatchedActions.nonEmpty) + insertActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) + else Nil + + val (updateDataSplits, updateTargetRelation, updateFirstRowIds, updateDeleteRanges) = + if (commitDeleteFirst) { + if (plan.snapshotId() != null) { + writer.rowIdCheckConflict(plan.snapshotId()) + } + writer.commit(deleteCommit) + val postDeleteSplits = dataSplitsAfterDelete(dataSplits, deleteCommit) + ( + postDeleteSplits, + createNewScanPlan(postDeleteSplits, targetRelation), + normalFirstRowIds(postDeleteSplits), + Nil) + } else { + (dataSplits, touchedFileTargetRelation, firstRowIds, deleteRanges) + } + + // step 3: invoke update action val updateCommit = - if (matchedActions.nonEmpty) { + if (hasUpdateAction) { val updateResult = updateActionInvoke( - dataSplits, + updateDataSplits, sparkSession, - touchedFileTargetRelation, - firstRowIds, + updateTargetRelation, + updateFirstRowIds, + updateDeleteRanges, persistSourceDss) checkUpdateResult(updateResult) } else Nil - // step 3: invoke insert action - val insertCommit = - if (notMatchedActions.nonEmpty) - insertActionInvoke(sparkSession, touchedFileTargetRelation, persistSourceDss) - else Nil - - if (plan.snapshotId() != null) { - writer.rowIdCheckConflict(plan.snapshotId()) + // step 4: commit remaining actions + val remainingCommit = + (if (commitDeleteFirst) Nil else deleteCommit) ++ updateCommit ++ insertCommit + val remainingWriter = + if (commitDeleteFirst) { + val nextWriter = PaimonSparkWriter(table) + val postDeletePlan = table.newSnapshotReader().read() + if (postDeletePlan.snapshotId() != null) { + nextWriter.rowIdCheckConflict(postDeletePlan.snapshotId()) + } + nextWriter + } else { + if (plan.snapshotId() != null) { + writer.rowIdCheckConflict(plan.snapshotId()) + } + writer + } + if (remainingCommit.nonEmpty) { + remainingWriter.commit(remainingCommit) } - writer.commit(updateCommit ++ insertCommit) } finally { if (persistSourceDss.isDefined) { persistSourceDss.get.unpersist(blocking = false) @@ -333,6 +389,7 @@ case class MergeIntoPaimonDataEvolutionTable( sparkSession: SparkSession, touchedFileTargetRelation: DataSourceV2Relation, firstRowIds: immutable.IndexedSeq[Long], + deleteRanges: Seq[Range], persistSourceDss: Option[Dataset[Row]]): Seq[CommitMessage] = { val mergeFields = extractFields(matchedCondition) val allFields = mutable.SortedSet.empty[AttributeReference]( @@ -342,6 +399,21 @@ case class MergeIntoPaimonDataEvolutionTable( val updateColumnsSorted = updateColumns.toSeq.sortBy( s => targetTable.output.map(x => x.toString()).indexOf(s.toString())) + val retainedFileRanges = + if (deleteRanges.nonEmpty) { + retainedNormalFileRanges(dataSplits, deleteRanges) + } else { + Nil + } + if (deleteRanges.nonEmpty && retainedFileRanges.isEmpty) { + return Nil + } + val writeFirstRowIds = + if (deleteRanges.nonEmpty) { + retainedFileRanges.map(_.firstRowId).distinct.sorted.toIndexedSeq + } else { + firstRowIds + } // Different Spark versions might produce duplicate attributes between `output` and // `metadataOutput`, so manually deduplicate by `exprId`. @@ -353,8 +425,7 @@ case class MergeIntoPaimonDataEvolutionTable( val assignments = metadataColumns.map(column => Assignment(column, column)) val output = updateColumnsSorted ++ metadataColumns - val realUpdateActions = matchedActions - .map(s => s.asInstanceOf[UpdateAction]) + val realUpdateActions = updateActions .map( update => UpdateAction.apply( @@ -436,7 +507,8 @@ case class MergeIntoPaimonDataEvolutionTable( child = readPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) + val withFirstRowId = + filterDeletedRows(addFirstRowId(sparkSession, mergeRows, writeFirstRowIds), deleteRanges) assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) withFirstRowId } else { @@ -485,7 +557,8 @@ case class MergeIntoPaimonDataEvolutionTable( output = output, child = joinPlan ) - val withFirstRowId = addFirstRowId(sparkSession, mergeRows, firstRowIds) + val withFirstRowId = + filterDeletedRows(addFirstRowId(sparkSession, mergeRows, writeFirstRowIds), deleteRanges) assert(withFirstRowId.schema.fields.length == updateColumnsSorted.size + 2) withFirstRowId .repartition(col(FIRST_ROW_ID_NAME)) @@ -493,7 +566,14 @@ case class MergeIntoPaimonDataEvolutionTable( } val writer = DataEvolutionPaimonWriter(table, dataSplits) - writer.writePartialFields(toWrite, updateColumnsSorted.map(_.name)) + if (deleteRanges.nonEmpty) { + writer.writePartialFieldsForRanges( + toWrite, + updateColumnsSorted.map(_.name), + retainedFileRanges) + } else { + writer.writePartialFields(toWrite, updateColumnsSorted.map(_.name)) + } } private def insertActionInvoke( @@ -541,6 +621,62 @@ case class MergeIntoPaimonDataEvolutionTable( writer.write(toWrite) } + private def deleteActionInvoke( + sparkSession: SparkSession, + touchedFileTargetRelation: DataSourceV2Relation, + persistSourceDss: Option[Dataset[Row]]): Seq[Range] = { + val mergeFields = extractFields(matchedCondition) + val allFields = mutable.SortedSet.empty[AttributeReference]( + (o1, o2) => { + o1.toString().compareTo(o2.toString()) + }) ++ mergeFields + + matchedActions.flatMap(_.condition).foreach(condition => allFields ++= extractFields(condition)) + + // Different Spark versions might produce duplicate attributes between `output` and + // `metadataOutput`, so manually deduplicate by `exprId`. + val metadataColumns = (targetRelation.output ++ targetRelation.metadataOutput) + .filter(attr => attr.name.equals(ROW_ID_NAME)) + .groupBy(_.exprId) + .map { case (_, attrs) => attrs.head } + .toSeq + + val allReadFieldsOnTarget = allFields.filter( + field => + targetTable.output.exists(attr => attr.exprId.equals(field.exprId))) ++ metadataColumns + val allReadFieldsOnSource = + allFields.filter(field => sourceTable.output.exists(attr => attr.exprId.equals(field.exprId))) + + val targetReadPlan = + touchedFileTargetRelation.copy(output = allReadFieldsOnTarget.toSeq) + val sourceChild = persistSourceDss.map(_.queryExecution.logical).getOrElse(sourceTable) + val sourceReadPlan = Project(allReadFieldsOnSource.toSeq, sourceChild) + + val joinPlan = + Join(targetReadPlan, sourceReadPlan, Inner, Some(matchedCondition), JoinHint.NONE) + + var previousMatched: Expression = FalseLiteral + val deleteConditions = matchedActions.flatMap { + action => + val currentMatched = nullAsFalse(action.condition.getOrElse(TrueLiteral)) + val effectiveCondition = And(currentMatched, Not(previousMatched)) + previousMatched = Or(previousMatched, currentMatched) + action match { + case _: DeleteAction => Some(effectiveCondition) + case _ => None + } + } + + if (deleteConditions.isEmpty) { + Nil + } else { + val rowIdAttr = joinPlan.output.find(_.exprId.equals(metadataColumns.head.exprId)).get + val deletePlan = + Project(Seq(rowIdAttr), Filter(deleteConditions.reduce(Or), joinPlan)) + collectRowIdRanges(createDataset(sparkSession, deletePlan), sparkSession) + } + } + /** * Attempts to identify a direct mapping from sourceTable's attribute to the target table's * `_ROW_ID`. @@ -638,6 +774,52 @@ case class MergeIntoPaimonDataEvolutionTable( } } + private def checkDeleteResult(deleteCommit: Seq[CommitMessage]): Seq[CommitMessage] = { + val affectedParts: Set[BinaryRow] = deleteCommit.map(_.partition()).toSet + + val latestSnapshot = table.latestSnapshot() + if (!latestSnapshot.isPresent) { + return deleteCommit + } + + val filter: org.apache.paimon.utils.Filter[IndexManifestEntry] = + (entry: IndexManifestEntry) => { + entry.indexFile().globalIndexMeta() != null && affectedParts.contains(entry.partition()) + } + + val affectedIndexEntries = table + .store() + .newIndexFileHandler() + .scan(latestSnapshot.get(), filter) + .asScala + + if (affectedIndexEntries.isEmpty) { + deleteCommit + } else { + table.coreOptions().globalIndexColumnUpdateAction() match { + case GlobalIndexColumnUpdateAction.THROW_ERROR => + val affectedPartitions = affectedParts.map(_.toString).toSeq.sorted + throw new RuntimeException( + s"""MergeInto: delete rows affect partitions with global indexes, not supported now. + |Affected partitions: ${affectedPartitions.mkString("[", ", ", "]")} + |""".stripMargin) + case GlobalIndexColumnUpdateAction.DROP_PARTITION_INDEX => + val grouped = affectedIndexEntries.groupBy(_.partition()) + val deleteCommitMessages = ArrayBuffer.empty[CommitMessage] + grouped.foreach { + case (part, entries) => + deleteCommitMessages += new CommitMessageImpl( + part, + 0, + null, + DataIncrement.deleteIndexIncrement(entries.map(_.indexFile()).asJava), + CompactIncrement.emptyIncrement()) + } + deleteCommit ++ deleteCommitMessages + } + } + } + private def findRelatedFirstRowIds( dataset: Dataset[Row], sparkSession: SparkSession, @@ -661,6 +843,167 @@ case class MergeIntoPaimonDataEvolutionTable( .collect() } + private def collectRowIdRanges(dataset: Dataset[Row], sparkSession: SparkSession): Seq[Range] = { + import sparkSession.implicits._ + if (dataset.columns.isEmpty) { + return Nil + } + + val rowIdColumn = dataset.columns.head + val ranges = dataset + .select(col(rowIdColumn)) + .distinct() + .sort(col(rowIdColumn)) + .as[Long] + .mapPartitions(MergeIntoPaimonDataEvolutionTable.rowIdsToRanges) + .collect() + .toSeq + .map { case (from, to) => new Range(from, to) } + + Range.sortAndMergeOverlap(ranges.asJava, true).asScala.toSeq + } + + private def normalFirstRowIds(dataSplits: Seq[DataSplit]): immutable.IndexedSeq[Long] = { + dataSplits + .flatMap(_.dataFiles().asScala) + .filter( + file => + file.firstRowId() != null && + !isBlobFile(file.fileName()) && + !isVectorStoreFile(file.fileName())) + .map(file => file.firstRowId().asInstanceOf[Long]) + .distinct + .sorted + .toIndexedSeq + } + + private def dataSplitsAfterDelete( + dataSplits: Seq[DataSplit], + deleteCommit: Seq[CommitMessage]): Seq[DataSplit] = { + val replacements = new mutable.HashMap[String, Seq[DataFileMeta]] + deleteCommit.foreach { + message => + val compactIncrement = message.asInstanceOf[CommitMessageImpl].compactIncrement() + val compactBefore = compactIncrement.compactBefore().asScala + val compactAfter = compactIncrement.compactAfter().asScala + compactBefore.foreach { + before => + replacements.put( + before.fileName(), + compactAfter + .filter( + after => + Range + .intersection(before.nonNullRowIdRange(), after.nonNullRowIdRange()) != null) + .toSeq) + } + } + + dataSplits.map { + split => + val files = split.dataFiles().asScala + val deletionFiles = split.deletionFiles().orElse(null) + val newFiles = ArrayBuffer.empty[DataFileMeta] + val newDeletionFiles = + if (deletionFiles == null) { + null + } else { + new java.util.ArrayList[org.apache.paimon.table.source.DeletionFile]() + } + + files.zipWithIndex.foreach { + case (file, index) => + replacements.get(file.fileName()) match { + case Some(afterFiles) => + afterFiles.foreach { + afterFile => + newFiles += afterFile + if (newDeletionFiles != null) { + newDeletionFiles.add(null) + } + } + case None => + newFiles += file + if (newDeletionFiles != null) { + newDeletionFiles.add(deletionFiles.get(index)) + } + } + } + + val builder = DataSplit + .builder() + .withSnapshot(split.snapshotId()) + .withPartition(split.partition()) + .withBucket(split.bucket()) + .withBucketPath(split.bucketPath()) + .withTotalBuckets(split.totalBuckets()) + .withDataFiles(newFiles.asJava) + .isStreaming(split.isStreaming()) + .rawConvertible(false) + if (newDeletionFiles != null) { + builder.withDataDeletionFiles(newDeletionFiles) + } + builder.build() + } + } + + private def filterDeletedRows(data: Dataset[Row], deleteRanges: Seq[Range]): Dataset[Row] = { + if (deleteRanges.isEmpty) { + data + } else { + val ranges = deleteRanges.map(range => (range.from, range.to)).toArray + val keepRow = udf { + rowId: Long => + var low = 0 + var high = ranges.length - 1 + var found = false + while (low <= high && !found) { + val mid = (low + high) >>> 1 + val (from, to) = ranges(mid) + if (rowId < from) { + high = mid - 1 + } else if (rowId > to) { + low = mid + 1 + } else { + found = true + } + } + !found + } + data.filter(keepRow(col(ROW_ID_NAME))) + } + } + + private def retainedNormalFileRanges( + dataSplits: Seq[DataSplit], + deleteRanges: Seq[Range]): Seq[DataEvolutionPaimonWriter.FileRange] = { + dataSplits.flatMap { + split => + split + .dataFiles() + .asScala + .filter( + file => + file.firstRowId() != null && + !isBlobFile(file.fileName()) && + !isVectorStoreFile(file.fileName())) + .flatMap { + file => + DataEvolutionDeleteRewriter + .retainedRanges(file, deleteRanges.asJava) + .asScala + .map( + range => + DataEvolutionPaimonWriter + .FileRange(split.partition(), range.from, range.count())) + } + } + } + + private def nullAsFalse(condition: Expression): Expression = { + Coalesce(Seq(condition, FalseLiteral)) + } + private def extractFields(expression: Expression): Seq[AttributeReference] = { val fields = new ListBuffer[AttributeReference]() @@ -735,4 +1078,26 @@ object MergeIntoPaimonDataEvolutionTable { } } } + + private[commands] def rowIdsToRanges(rowIds: Iterator[Long]): Iterator[(Long, Long)] = { + val ranges = ArrayBuffer.empty[(Long, Long)] + if (!rowIds.hasNext) { + return ranges.iterator + } + + var start = rowIds.next() + var end = start + while (rowIds.hasNext) { + val rowId = rowIds.next() + if (rowId == end + 1) { + end = rowId + } else { + ranges += ((start, end)) + start = rowId + end = rowId + } + } + ranges += ((start, end)) + ranges.iterator + } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala index 9c912e027343..f4622c9c9c57 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala @@ -543,6 +543,37 @@ class BlobTestBase extends PaimonSparkTestBase { } } + test("Blob: merge-into deletes rows from data evolution table") { + withTable("s", "t") { + sql( + "CREATE TABLE t (id INT, picture BINARY) TBLPROPERTIES " + + "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', " + + "'blob-field'='picture')") + sql( + "INSERT INTO t VALUES " + + "(1, X'01'), (2, X'02'), (3, X'03'), (4, X'04'), (5, X'05')") + + sql("CREATE TABLE s (id INT)") + sql("INSERT INTO s VALUES (3)") + + sql(""" + |MERGE INTO t + |USING s + |ON t.id = s.id + |WHEN MATCHED THEN DELETE + |""".stripMargin) + + checkAnswer( + sql("SELECT id, picture FROM t ORDER BY id"), + Seq( + Row(1, Array[Byte](1)), + Row(2, Array[Byte](2)), + Row(4, Array[Byte](4)), + Row(5, Array[Byte](5))) + ) + } + } + test("Blob: merge-into updates descriptor blob column with external storage end-to-end") { withTable("s", "t") { val externalStoragePath = tempDBDir.getCanonicalPath + "/external-storage-blob-merge-path" diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 1da318e2825d..cf18a8372c48 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -649,6 +649,57 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase with AdaptiveSpar } } + test("Data Evolution: merge into table with data-evolution delete") { + withTable("source", "target") { + sql("CREATE TABLE source (id INT, b INT)") + sql("INSERT INTO source VALUES (2, 200), (4, 400), (6, 600)") + + sql( + "CREATE TABLE target (id INT, b INT, c STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql( + "INSERT INTO target VALUES (1, 10, 'c1'), (2, 20, 'c2'), (3, 30, 'c3'), (4, 40, 'c4'), (5, 50, 'c5')") + + sql(""" + |MERGE INTO target + |USING source + |ON target.id = source.id + |WHEN MATCHED AND source.id IN (2, 4) THEN DELETE + |WHEN NOT MATCHED THEN INSERT (id, b, c) VALUES (id, b, 'new') + |""".stripMargin) + + checkAnswer( + sql("SELECT id, b, c FROM target ORDER BY id"), + Seq(Row(1, 10, "c1"), Row(3, 30, "c3"), Row(5, 50, "c5"), Row(6, 600, "new")) + ) + } + } + + test("Data Evolution: merge into table with data-evolution update and delete") { + withTable("source", "target") { + sql("CREATE TABLE source (id INT, b INT)") + sql("INSERT INTO source VALUES (2, 200), (3, 300), (4, 400)") + + sql( + "CREATE TABLE target (id INT, b INT, c INT) TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')") + sql( + "INSERT INTO target VALUES (1, 10, 10), (2, 20, 20), (3, 30, 30), (4, 40, 40), (5, 50, 50)") + + sql(""" + |MERGE INTO target + |USING source + |ON target.id = source.id + |WHEN MATCHED AND source.id = 2 THEN UPDATE SET b = source.b + |WHEN MATCHED AND source.id = 3 THEN DELETE + |WHEN MATCHED THEN UPDATE SET c = source.b + |""".stripMargin) + + checkAnswer( + sql("SELECT id, b, c FROM target ORDER BY id"), + Seq(Row(1, 10, 10), Row(2, 200, 20), Row(4, 40, 400), Row(5, 50, 50)) + ) + } + } + Seq(false, true).foreach { filePruning => test(s"Data Evolution: merge into file pruning: $filePruning") {