diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index 194aab278c0e..33b7861f5c58 100644
--- a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -18,4 +18,12 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.spark.SparkConf
+
 class UpdateTableTest extends UpdateTableTestBase {}
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index 194aab278c0e..33b7861f5c58 100644
--- a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -18,4 +18,12 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.spark.SparkConf
+
 class UpdateTableTest extends UpdateTableTestBase {}
+
+class V2UpdateTableTest extends UpdateTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 6808e64c4550..7c9aaddc243a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -18,32 +18,14 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
-  /** Determines if DataSourceV2 delete is not supported for the given table. */
-  private def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
-    val baseTable = table.getTable
-    org.apache.spark.SPARK_VERSION < "3.5" ||
-    !baseTable.isInstanceOf[FileStoreTable] ||
-    !baseTable.primaryKeys().isEmpty ||
-    !table.useV2Write ||
-    table.coreOptions.deletionVectorsEnabled() ||
-    table.coreOptions.rowTrackingEnabled() ||
-    table.coreOptions.dataEvolutionEnabled() ||
-    OptimizeMetadataOnlyDeleteFromPaimonTable.isMetadataOnlyDelete(
-      baseTable.asInstanceOf[FileStoreTable],
-      condition)
-  }
-
   override val operation: RowLevelOp = Delete
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
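Note that the private shouldFallbackToV1Delete helper removed above is not dropped: the RowLevelHelper.scala hunk later in this patch reintroduces it as a shared, protected predicate so that the DELETE and UPDATE rules reuse the same V1-fallback gating. Below is a minimal sketch of how a rule consumes that predicate; it is not the actual body of PaimonDeleteTable.apply. The import locations for PaimonRelation, RowLevelOp, and Delete are assumed from their unqualified uses in this patch, and toV1Command is a hypothetical stub standing in for DeleteFromPaimonTableCommand.

```scala
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.{Delete, PaimonRelation, RowLevelHelper, RowLevelOp}

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object SketchDeleteRule extends Rule[LogicalPlan] with RowLevelHelper {

  override val operation: RowLevelOp = Delete

  // Hypothetical stand-in for constructing DeleteFromPaimonTableCommand.
  private def toV1Command(table: SparkTable, condition: Expression): LogicalPlan = ???

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    // Rewrite to the V1 command only when the shared predicate says the
    // DataSourceV2 path cannot handle this table and condition...
    case DeleteFromTable(PaimonRelation(table), condition)
        if shouldFallbackToV1Delete(table, condition) =>
      toV1Command(table, condition)
    // ...otherwise DeleteFromTable is left in place for Spark's V2 planning.
  }
}
```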
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index 63d19379cc4a..49be4cf9bcf4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -36,7 +36,8 @@ object PaimonUpdateTable
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperators {
-      case u @ UpdateTable(PaimonRelation(table), assignments, condition) if u.resolved =>
+      case u @ UpdateTable(PaimonRelation(table), assignments, condition)
+          if u.resolved && shouldFallbackToV1Update(table, u) =>
         checkPaimonTable(table.getTable)
 
         table.getTable match {
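With the added guard, this post-hoc rule rewrites UPDATE into the V1 command only when shouldFallbackToV1Update holds; otherwise the resolved UpdateTable node stays in the plan and Spark's DataSourceV2 row-level machinery takes over. A hedged usage sketch follows, mirroring how the new V2UpdateTableTest supplies the flag through Spark conf; the table name, schema, and the omitted Paimon catalog configuration are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// The tests set the flag on SparkConf at session construction; Paimon
// catalog configuration is omitted here for brevity.
val spark = SparkSession
  .builder()
  .master("local[2]")
  .config("spark.paimon.write.use-v2-write", "true")
  .getOrCreate()

// On an append-only (non-primary-key) FileStoreTable, this UPDATE is no
// longer intercepted by PaimonUpdateTable and takes the DataSourceV2 path.
spark.sql("UPDATE t SET name = 'c' WHERE id = 1")
```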
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RewritePaimonV2UpdateTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RewritePaimonV2UpdateTable.scala
new file mode 100644
index 000000000000..41f63fc51156
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RewritePaimonV2UpdateTable.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+
+object RewritePaimonV2UpdateTable extends Rule[LogicalPlan] with AssignmentAlignmentHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case u @ UpdateTable(aliasedTable, assignments, _) if u.resolved =>
+      u.copy(assignments = alignAssignments(aliasedTable.output, assignments))
+  }
+}
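The new rule aligns the SET clause of a resolved UpdateTable so that every output attribute carries an assignment, in table column order, before V2 planning sees it. The sketch below is a self-contained illustration of the alignment idea only; it is not Paimon's AssignmentAlignmentHelper, which works on catalyst expressions and also handles nested fields:

```scala
object AlignSketch {

  // Pad a sparse column-to-value map so that every output column receives a
  // value, in the table's column order; untouched columns map to themselves.
  def align(output: Seq[String], set: Map[String, String]): Seq[(String, String)] =
    output.map(col => col -> set.getOrElse(col, col))

  def main(args: Array[String]): Unit = {
    // UPDATE t SET name = 'x' on a table (id, name, dt) aligns to:
    // List((id,id), (name,'x'), (dt,dt))
    println(align(Seq("id", "name", "dt"), Map("name" -> "'x'")))
  }
}
```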
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index eecf0542e1d1..4fe2650b26e0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -18,11 +18,13 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.table.Table
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
+import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable}
 
 trait RowLevelHelper extends SQLConfHelper {
 
@@ -73,4 +75,30 @@ trait RowLevelHelper extends SQLConfHelper {
       case _ => false
     }
   }
+
+  /** Determines whether the DataSourceV2 write path is unsupported for the given table. */
+  protected def shouldFallbackToV1(table: SparkTable): Boolean = {
+    val baseTable = table.getTable
+    org.apache.spark.SPARK_VERSION < "3.5" ||
+    !baseTable.isInstanceOf[FileStoreTable] ||
+    !baseTable.primaryKeys().isEmpty ||
+    !table.useV2Write ||
+    table.coreOptions.deletionVectorsEnabled() ||
+    table.coreOptions.rowTrackingEnabled() ||
+    table.coreOptions.dataEvolutionEnabled()
+  }
+
+  /** Determines whether a DataSourceV2 DELETE is unsupported for the given table. */
+  protected def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
+    shouldFallbackToV1(table) ||
+    OptimizeMetadataOnlyDeleteFromPaimonTable.isMetadataOnlyDelete(
+      table.getTable.asInstanceOf[FileStoreTable],
+      condition)
+  }
+
+  /** Determines whether a DataSourceV2 UPDATE is unsupported for the given table. */
+  protected def shouldFallbackToV1Update(table: SparkTable, updateTable: UpdateTable): Boolean = {
+    shouldFallbackToV1(table) ||
+    !updateTable.rewritable
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 950b5797c7b4..82100da698be 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.extensions
 
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewritePaimonV2UpdateTable, RewriteUpsertTable}
 import org.apache.paimon.spark.catalyst.optimizer.{MergePaimonScalarSubqueries, OptimizeMetadataOnlyDeleteFromPaimonTable}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.commands.BucketExpression
@@ -51,6 +51,8 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
 
     extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
     extensions.injectPostHocResolutionRule(spark => PaimonMergeInto(spark))
 
+    extensions.injectPostHocResolutionRule(_ => RewritePaimonV2UpdateTable)
+
     // table function extensions
     PaimonTableValuedFunctions.supportedFnNames.foreach { fnName =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
index 589ba1745193..4171a0f5c166 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -108,6 +108,7 @@ case class PaimonBatchWrite(
 
   private def buildDeletedCommitMessage(
       deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
+    logInfo(s"[V2 Write] Building deleted commit message for ${deletedFiles.size} files")
     deletedFiles
       .groupBy(f => (f.partition, f.bucket))
       .map {
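The logInfo added to buildDeletedCommitMessage makes the V2 path's file replacements observable: when a V2 UPDATE or DELETE rewrites data files, the driver log should contain a line like the following (logger name and file count are illustrative):

```
INFO PaimonBatchWrite: [V2 Write] Building deleted commit message for 2 files
```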