Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ public void compactManifests() {
throw new UnsupportedOperationException();
}

/**
 * Commit properties are not supported by this commit implementation.
 *
 * @param properties ignored; the method never reads it
 * @throws UnsupportedOperationException always
 */
@Override
public FormatTableCommit withCommitProperties(Map<String, String> properties) {
throw new UnsupportedOperationException();
}

@Override
public TableCommit withMetricRegistry(MetricRegistry registry) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ public interface BatchTableCommit extends TableCommit {

/** Compact the manifest entries. Generates a snapshot with {@link CommitKind#COMPACT}. */
void compactManifests();

/**
 * Sets the key/value properties to associate with commits made through this instance.
 *
 * <p>Not every implementation supports this; an implementation may throw {@link
 * UnsupportedOperationException}.
 *
 * @param properties the commit properties
 * @return a {@link BatchTableCommit} on which further calls can be made
 */
BatchTableCommit withCommitProperties(Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {

InnerTableCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot);

/**
 * Narrows the return type of {@link BatchTableCommit#withCommitProperties(Map)} to {@link
 * InnerTableCommit}.
 *
 * @param properties the commit properties
 * @return an {@link InnerTableCommit} on which further calls can be made
 */
@Override
InnerTableCommit withCommitProperties(Map<String, String> properties);

@Override
InnerTableCommit withMetricRegistry(MetricRegistry registry);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class TableCommitImpl implements InnerTableCommit {
private final ThreadPoolExecutor fileCheckExecutor;

@Nullable private Map<String, String> overwritePartition = null;
@Nullable private Map<String, String> commitProperties = null;
private boolean batchCommitted = false;
private boolean expireForEmptyCommit = true;

Expand Down Expand Up @@ -170,6 +171,12 @@ public TableCommitImpl rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot)
return this;
}

/**
 * Stores the properties to attach to commits made by this instance.
 *
 * <p>The map is kept by reference (it is not copied). When it is non-null, each entry is added
 * to the {@code ManifestCommittable} built by {@code createManifestCommittable} via {@code
 * addProperty}.
 *
 * @param commitProperties the properties to attach
 * @return this instance
 */
@Override
public TableCommitImpl withCommitProperties(Map<String, String> commitProperties) {
this.commitProperties = commitProperties;
return this;
}

@Override
public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
commit.withMetrics(new CommitMetrics(registry, tableName));
Expand Down Expand Up @@ -227,6 +234,9 @@ private ManifestCommittable createManifestCommittable(
for (CommitMessage commitMessage : commitMessages) {
committable.addFileCommittable(commitMessage);
}
if (commitProperties != null) {
commitProperties.forEach(committable::addProperty);
}
return committable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.shim

import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.paimon.spark.write.SnapshotOperation

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
Expand Down Expand Up @@ -66,7 +67,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strate
query,
planLater(query),
newProps,
new CaseInsensitiveStringMap(writeOptions.asJava),
new CaseInsensitiveStringMap((writeOptions +
(SnapshotOperation.OPERATION_OPTION -> SnapshotOperation.CREATE_TABLE_AS_SELECT)).asJava),
ifNotExists
) :: Nil
case _ => Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.shim

import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.paimon.spark.write.SnapshotOperation

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedDBObjectName
Expand Down Expand Up @@ -69,7 +70,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
query,
planLater(query),
qualifiedSpec,
new CaseInsensitiveStringMap(writeOptions.asJava),
new CaseInsensitiveStringMap((writeOptions +
(SnapshotOperation.OPERATION_OPTION -> SnapshotOperation.CREATE_TABLE_AS_SELECT)).asJava),
ifNotExists
) :: Nil
case _ => Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.shim

import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.FormatTableCatalog
import org.apache.paimon.spark.write.SnapshotOperation

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
Expand Down Expand Up @@ -71,7 +72,8 @@ case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
analyzedQuery.get,
planLater(query),
qualifiedSpec,
new CaseInsensitiveStringMap(writeOptions.asJava),
new CaseInsensitiveStringMap((writeOptions +
(SnapshotOperation.OPERATION_OPTION -> SnapshotOperation.CREATE_TABLE_AS_SELECT)).asJava),
ifNotExists
) :: Nil
case _ => Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.shim

import org.apache.paimon.spark.catalog.SparkBaseCatalog
import org.apache.paimon.spark.write.SnapshotOperation

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
Expand Down Expand Up @@ -62,7 +63,11 @@ case class PaimonReplaceTableAsSelectStrategy(spark: SparkSession)
val (tableOptions, writeOptions) =
splitTableAndWriteOptions(options)
val qualifiedSpec = qualifyTableSpec(tableSpec, tableOptions)
val writeOpts = new CaseInsensitiveStringMap(writeOptions.asJava)
val operation =
if (orCreate) SnapshotOperation.CREATE_OR_REPLACE_TABLE_AS_SELECT
else SnapshotOperation.REPLACE_TABLE_AS_SELECT
val writeOpts = new CaseInsensitiveStringMap(
(writeOptions + (SnapshotOperation.OPERATION_OPTION -> operation)).asJava)
val pinnedQuery =
pinSnapshotInQuery(catalog, ident, analyzedQuery.get)
if (canAtomicReplace(catalog, ident, qualifiedSpec, parts)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
Expand Down Expand Up @@ -229,7 +230,9 @@ case class MergeIntoPaimonDataEvolutionTable(
if (plan.snapshotId() != null) {
writer.rowIdCheckConflict(plan.snapshotId())
}
writer.commit(updateCommit ++ insertCommit)
writer.commit(
updateCommit ++ insertCommit,
SnapshotOperation.asProperties(SnapshotOperation.MERGE))
} finally {
if (persistSourceDss.isDefined) {
persistSourceDss.get.unpersist(blocking = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.paimon.spark.catalyst.analysis.{PaimonRelation, PaimonUpdateAc
import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.types.RowKind
Expand Down Expand Up @@ -78,7 +79,7 @@ case class MergeIntoPaimonTable(
} else {
performMergeForNonPkTable(sparkSession)
}
writer.commit(commitMessages)
writer.commit(commitMessages, SnapshotOperation.asProperties(SnapshotOperation.MERGE))
Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,15 @@ class PaimonBatchWrite(
writeSchema: StructType,
dataSchema: StructType,
overwritePartitions: Option[Map[String, String]],
copyOnWriteScan: Option[PaimonCopyOnWriteScan])
extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)
copyOnWriteScan: Option[PaimonCopyOnWriteScan],
operationType: Option[String] = None)
extends PaimonBatchWriteBase(
table,
writeSchema,
dataSchema,
overwritePartitions,
copyOnWriteScan,
operationType)
with BatchWrite
with Serializable {

Expand All @@ -57,6 +64,13 @@ object PaimonBatchWrite {
writeSchema: StructType,
dataSchema: StructType,
overwritePartitions: Option[Map[String, String]],
copyOnWriteScan: Option[PaimonCopyOnWriteScan]): PaimonBatchWrite =
new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)
copyOnWriteScan: Option[PaimonCopyOnWriteScan],
operationType: Option[String] = None): PaimonBatchWrite =
new PaimonBatchWrite(
table,
writeSchema,
dataSchema,
overwritePartitions,
copyOnWriteScan,
operationType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,15 @@ class Spark4Shim extends SparkShim {
writeSchema: StructType,
dataSchema: StructType,
overwritePartitions: Option[Map[String, String]],
copyOnWriteScan: Option[PaimonCopyOnWriteScan]): BatchWrite =
new PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)
copyOnWriteScan: Option[PaimonCopyOnWriteScan],
operationType: Option[String]): BatchWrite =
new PaimonBatchWrite(
table,
writeSchema,
dataSchema,
overwritePartitions,
copyOnWriteScan,
operationType)

override def createFormatTableBatchWrite(
table: FormatTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands

import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
import org.apache.paimon.table.sink.CommitMessage
Expand All @@ -46,7 +47,7 @@ case class DeleteFromPaimonTableCommand(
} else {
performNonPrimaryKeyDelete(sparkSession)
}
writer.commit(commitMessages)
writer.commit(commitMessages, SnapshotOperation.asProperties(SnapshotOperation.DELETE))
Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.paimon.spark.catalyst.analysis.PaimonUpdateTable.toColumn
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.util.ScanPlanHelper.createNewScanPlan
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
Expand Down Expand Up @@ -233,7 +234,9 @@ case class MergeIntoPaimonDataEvolutionTable(
if (plan.snapshotId() != null) {
writer.rowIdCheckConflict(plan.snapshotId())
}
writer.commit(updateCommit ++ insertCommit)
writer.commit(
updateCommit ++ insertCommit,
SnapshotOperation.asProperties(SnapshotOperation.MERGE))
} finally {
if (persistSourceDss.isDefined) {
persistSourceDss.get.unpersist(blocking = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.paimon.spark.catalyst.analysis.{PaimonRelation, PaimonUpdateAc
import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.types.RowKind
Expand Down Expand Up @@ -78,7 +79,7 @@ case class MergeIntoPaimonTable(
} else {
performMergeForNonPkTable(sparkSession)
}
writer.commit(commitMessages)
writer.commit(commitMessages, SnapshotOperation.asProperties(SnapshotOperation.MERGE))
Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ case class PaimonSparkWriter(
}

/** Commits the given messages without attaching any extra commit properties. */
def commit(commitMessages: Seq[CommitMessage]): Unit =
  commit(commitMessages, Map.empty[String, String])

def commit(commitMessages: Seq[CommitMessage], commitProperties: Map[String, String]): Unit = {
val finalWriteBuilder = if (postponeBatchWriteFixedBucket) {
writeBuilder
.asInstanceOf[BatchWriteBuilderImpl]
Expand All @@ -424,6 +428,9 @@ case class PaimonSparkWriter(
writeBuilder
}
val tableCommit = finalWriteBuilder.newCommit()
if (commitProperties.nonEmpty) {
tableCommit.withCommitProperties(commitProperties.asJava)
}
try {
tableCommit.commit(commitMessages.toList.asJava)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands

import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.table.source.DataSplit
Expand Down Expand Up @@ -52,7 +53,7 @@ case class UpdatePaimonTableCommand(
} else {
performUpdateForNonPkTable(sparkSession)
}
writer.commit(commitMessages)
writer.commit(commitMessages, SnapshotOperation.asProperties(SnapshotOperation.UPDATE))

Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.paimon.options.Options
import org.apache.paimon.spark._
import org.apache.paimon.spark.catalyst.analysis.ReplacePaimonFunctions
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.write.SnapshotOperation
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -60,8 +61,17 @@ case class WriteIntoPaimonTable(
if (overwritePartition != null) {
writer.writeBuilder.withOverwrite(overwritePartition.asJava)
}
// CTAS/RTAS inject the operation via this write option; plain INSERT/OVERWRITE derive it
// from the save mode.
val operation = Option(options.get(SnapshotOperation.OPERATION_OPTION)).getOrElse {
if (overwritePartition != null || dynamicPartitionOverwriteMode) {
SnapshotOperation.OVERWRITE
} else {
SnapshotOperation.WRITE
}
}
val commitMessages = writer.write(replacedData)
writer.commit(commitMessages)
writer.commit(commitMessages, SnapshotOperation.asProperties(operation))

Seq.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera
/**
 * Creates the write builder for this copy-on-write operation.
 *
 * The builder is a [[PaimonV2WriteBuilder]] created from the write schema and options in `info`,
 * tagged with the name of the row-level command and configured to overwrite the files of the
 * copy-on-write scan. The scan must already be defined when this is called (asserted).
 *
 * @param info the logical write info supplying the options and the schema
 * @return the result of `overwriteFiles` on the configured builder
 */
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
  val writeOptions = Options.fromMap(info.options)
  val writeBuilder = new PaimonV2WriteBuilder(table, info.schema(), writeOptions)
  // The command name (DELETE / UPDATE / MERGE) is what gets recorded into the snapshot properties.
  writeBuilder.withOperationType(command().toString)
  assert(copyOnWriteScan.isDefined)
  writeBuilder.overwriteFiles(copyOnWriteScan.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, ROW_ID, SEQUENCE_NUMBER}
import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, InnerTableCommit}

import org.apache.spark.sql.PaimonSparkSession
import org.apache.spark.sql.connector.write.{DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}
Expand Down Expand Up @@ -55,7 +55,8 @@ abstract class PaimonBatchWriteBase(
val writeSchema: StructType,
val dataSchema: StructType,
val overwritePartitions: Option[Map[String, String]],
val copyOnWriteScan: Option[PaimonCopyOnWriteScan])
val copyOnWriteScan: Option[PaimonCopyOnWriteScan],
operationType: Option[String] = None)
extends WriteHelper
with Serializable {

Expand Down Expand Up @@ -114,6 +115,12 @@ abstract class PaimonBatchWriteBase(
logInfo(s"Committing to table ${table.name()}")
val batchTableCommit = batchWriteBuilder.newCommit()
batchTableCommit.withMetricRegistry(metricRegistry)
// Record the operation type; INSERT/INSERT OVERWRITE have none and fall back to WRITE/OVERWRITE.
val operation = operationType.getOrElse(
if (overwritePartitions.isDefined) SnapshotOperation.OVERWRITE else SnapshotOperation.WRITE)
batchTableCommit
.withCommitProperties(
java.util.Collections.singletonMap(SnapshotOperation.OPERATION_PROPERTY, operation))
val addCommitMessage = WriteTaskResult.merge(messages)
val deletedCommitMessage = copyOnWriteScan match {
case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles)
Expand Down
Loading