@@ -18,4 +18,12 @@

package org.apache.paimon.spark.sql

import org.apache.spark.SparkConf

class UpdateTableTest extends UpdateTableTestBase {}
Contributor comment: Add super.sparkConf.set("spark.paimon.write.use-v2-write", "false") here


class V2UpdateTableTest extends UpdateTableTestBase {
  override protected def sparkConf: SparkConf = {
    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
  }
}
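
Applied, the reviewer's suggestion would look like the following sketch, mirroring the V2 variant above and assuming the same sparkConf hook on UpdateTableTestBase:

class UpdateTableTest extends UpdateTableTestBase {
  override protected def sparkConf: SparkConf = {
    // Per the review comment: pin the V1 write path explicitly
    // instead of relying on the default value of the flag.
    super.sparkConf.set("spark.paimon.write.use-v2-write", "false")
  }
}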
@@ -18,4 +18,12 @@

package org.apache.paimon.spark.sql

import org.apache.spark.SparkConf

class UpdateTableTest extends UpdateTableTestBase {}

class V2UpdateTableTest extends UpdateTableTestBase {
  override protected def sparkConf: SparkConf = {
    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
  }
}
@@ -18,32 +18,14 @@

package org.apache.paimon.spark.catalyst.analysis

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {

  /** Determines if DataSourceV2 delete is not supported for the given table. */
  private def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
    val baseTable = table.getTable
    org.apache.spark.SPARK_VERSION < "3.5" ||
    !baseTable.isInstanceOf[FileStoreTable] ||
    !baseTable.primaryKeys().isEmpty ||
    !table.useV2Write ||
    table.coreOptions.deletionVectorsEnabled() ||
    table.coreOptions.rowTrackingEnabled() ||
    table.coreOptions.dataEvolutionEnabled() ||
    OptimizeMetadataOnlyDeleteFromPaimonTable.isMetadataOnlyDelete(
      baseTable.asInstanceOf[FileStoreTable],
      condition)
  }

  override val operation: RowLevelOp = Delete

  override def apply(plan: LogicalPlan): LogicalPlan = {
@@ -36,7 +36,8 @@ object PaimonUpdateTable

  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.resolveOperators {
-     case u @ UpdateTable(PaimonRelation(table), assignments, condition) if u.resolved =>
+     case u @ UpdateTable(PaimonRelation(table), assignments, condition)
+         if u.resolved && shouldFallbackToV1Update(table, u) =>
        checkPaimonTable(table.getTable)

        table.getTable match {
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object RewritePaimonV2UpdateTable extends Rule[LogicalPlan] with AssignmentAlignmentHelper {

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case u @ UpdateTable(aliasedTable, assignments, _) if u.resolved =>
      u.copy(assignments = alignAssignments(aliasedTable.output, assignments))
  }
}
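
For context, a toy sketch of the idea behind assignment alignment; this is not Paimon's AssignmentAlignmentHelper (its implementation is outside this diff), just an illustration: a partial SET list is expanded so that every output column receives an assignment, with untouched columns assigned to themselves.

object AlignmentSketch {
  // Hypothetical, simplified model: columns are plain names and expressions
  // are strings; the real helper works on Catalyst expressions and also
  // recurses into nested struct fields.
  def alignAssignments(
      output: Seq[String],
      assignments: Map[String, String]): Seq[(String, String)] =
    output.map(col => col -> assignments.getOrElse(col, col))

  def main(args: Array[String]): Unit = {
    // UPDATE t SET b = b + 1 on a table with columns a, b, c
    println(alignAssignments(Seq("a", "b", "c"), Map("b" -> "b + 1")))
    // List((a,a), (b,b + 1), (c,c))
  }
}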
@@ -18,11 +18,13 @@

package org.apache.paimon.spark.catalyst.analysis

- import org.apache.paimon.table.Table
+ import org.apache.paimon.spark.SparkTable
+ import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
+ import org.apache.paimon.table.{FileStoreTable, Table}

  import org.apache.spark.sql.catalyst.SQLConfHelper
  import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
- import org.apache.spark.sql.catalyst.plans.logical.Assignment
+ import org.apache.spark.sql.catalyst.plans.logical.{Assignment, LogicalPlan, UpdateTable}

trait RowLevelHelper extends SQLConfHelper {

@@ -73,4 +75,30 @@ trait RowLevelHelper extends SQLConfHelper {
      case _ => false
    }
  }

  /** Determines if DataSourceV2 is not supported for the given table. */
  protected def shouldFallbackToV1(table: SparkTable): Boolean = {
    val baseTable = table.getTable
    org.apache.spark.SPARK_VERSION < "3.5" ||
    !baseTable.isInstanceOf[FileStoreTable] ||
    !baseTable.primaryKeys().isEmpty ||
    !table.useV2Write ||
    table.coreOptions.deletionVectorsEnabled() ||
    table.coreOptions.rowTrackingEnabled() ||
    table.coreOptions.dataEvolutionEnabled()
  }

  /** Determines if DataSourceV2 delete is not supported for the given table. */
  protected def shouldFallbackToV1Delete(table: SparkTable, condition: Expression): Boolean = {
    shouldFallbackToV1(table) ||
    OptimizeMetadataOnlyDeleteFromPaimonTable.isMetadataOnlyDelete(
      table.getTable.asInstanceOf[FileStoreTable],
      condition)
  }

  /** Determines if DataSourceV2 update is not supported for the given table. */
  protected def shouldFallbackToV1Update(table: SparkTable, updateTable: UpdateTable): Boolean = {
    shouldFallbackToV1(table) ||
    !updateTable.rewritable
  }
}
@@ -18,7 +18,7 @@

package org.apache.paimon.spark.extensions

- import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
+ import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewritePaimonV2UpdateTable, RewriteUpsertTable}
import org.apache.paimon.spark.catalyst.optimizer.{MergePaimonScalarSubqueries, OptimizeMetadataOnlyDeleteFromPaimonTable}
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.commands.BucketExpression
@@ -51,6 +51,8 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
    extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
    extensions.injectPostHocResolutionRule(spark => PaimonMergeInto(spark))

    extensions.injectPostHocResolutionRule(_ => RewritePaimonV2UpdateTable)

    // table function extensions
    PaimonTableValuedFunctions.supportedFnNames.foreach {
      fnName =>
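
As a usage note, these rules only fire when the extensions class is registered with the session. A minimal sketch, assuming the standard spark.sql.extensions mechanism:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  // Register Paimon's analyzer/optimizer rules, including the
  // post-hoc resolution rules injected above.
  .config(
    "spark.sql.extensions",
    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .getOrCreate()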
@@ -108,6 +108,7 @@ case class PaimonBatchWrite(

  private def buildDeletedCommitMessage(
      deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
    logInfo(s"[V2 Write] Building deleted commit message for ${deletedFiles.size} files")
    deletedFiles
      .groupBy(f => (f.partition, f.bucket))
      .map {