Spark 4.1: Add scoped replacement writes (INSERT INTO ... REPLACE USING) #16795

Open: liucao-dd wants to merge 4 commits into apache:main from liucao-dd:scoped-replace-spark-4.1

liucao-dd commented Jun 12, 2026

Part 1 of #16785.

What

This adds scoped replacement writes to the Iceberg Spark 4.1 extensions:

INSERT INTO target_table
REPLACE USING (scope_col_1, scope_col_2)
<query>

The command treats the source as the complete replacement for the scopes it touches. It reads the distinct REPLACE USING scope tuples from the source, deletes target rows whose scope shows up in the source, appends every source row once, and commits the delete and append as a single Iceberg snapshot. Scopes that the source never mentions are left alone. This is the "full group refresh" operation from #16785: atomic, with no two-commit DELETE + INSERT and no stretched MERGE INTO ... WHEN NOT MATCHED BY SOURCE.
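
To make the contract concrete, here is a minimal in-memory sketch of the semantics described above. It is a model only, not the Spark or Iceberg code path: rows are plain dicts, None stands in for SQL NULL, and the names replace_using and scope_key are hypothetical. Scope matching is null-safe, as the test description in this PR states.

```python
Row = dict  # column name -> value; None models SQL NULL


def scope_key(row: Row, scope_cols: list) -> tuple:
    # Python tuples treat None == None as equal, which models null-safe equality.
    return tuple(row[c] for c in scope_cols)


def replace_using(target: list, source: list, scope_cols: list) -> list:
    # 1. Read the distinct scope tuples from the source.
    touched = {scope_key(r, scope_cols) for r in source}
    # 2. Drop target rows whose scope shows up in the source.
    kept = [r for r in target if scope_key(r, scope_cols) not in touched]
    # 3. Append every source row once, duplicates included.
    return kept + list(source)


target = [
    {"region": "eu", "day": 1, "v": 10},
    {"region": "eu", "day": 2, "v": 20},
    {"region": "us", "day": 1, "v": 30},
]
source = [
    {"region": "eu", "day": 1, "v": 11},
    {"region": "eu", "day": 1, "v": 12},  # duplicate scope key, still appended
]
result = replace_using(target, source, ["region", "day"])
```

In this example only the (eu, 1) scope is replaced; the (eu, 2) and (us, 1) rows are never mentioned by the source and stay as they were. An empty source leaves the target unchanged.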

Scope of this PR

This PR covers part of #16785. Two things are left for later on purpose:

  • Only the column-list form (REPLACE USING (cols)) is implemented. The arbitrary-condition form (REPLACE ON <expr>) from the issue is not here yet.
  • There is no native Spark grammar. Spark's insertInto grammar does not accept REPLACE USING, and the Iceberg extension grammar cannot own an arbitrary trailing Spark query. So this PR uses an in-place text split as a stopgap: the Iceberg ANTLR grammar parses only the command head (singleScopedReplaceHead: INSERT INTO t REPLACE USING (cols)), and the remaining query tail goes to the wrapped Spark parser. Detection is anchored to the statement head with literals and comments masked, so a REPLACE USING ( later in the query body (say, a join alias) does not trigger the path. The hack is only here to demonstrate the semantics. The end state I want is to propose REPLACE USING / REPLACE ON for Spark's insertInto grammar and build the logical plan from the native parse tree instead. There is a TODO to that effect in IcebergSparkSqlExtensionsParser.
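
The head-anchored split can be illustrated with a small standalone sketch. This is not the code in IcebergSparkSqlExtensionsParser; it is a simplified Python model under stated assumptions: the function names and the HEAD regex are hypothetical, only single-quoted and double-quoted literals, `--` line comments, and nested `/* */` block comments are masked, and backtick-quoted identifiers containing spaces are not handled. Masking preserves string length, so offsets found in the masked text can be used to cut the original text.

```python
import re

# Hypothetical head pattern: matched only at the start of the masked statement.
HEAD = re.compile(
    r"\s*INSERT\s+INTO\s+(?P<table>[\w.`]+)\s+REPLACE\s+USING\s*\((?P<cols>[^()]*)\)",
    re.IGNORECASE,
)


def mask_literals_and_comments(sql: str) -> str:
    """Blank out string literals and comments, keeping length and newlines."""
    out = list(sql)
    i, n = 0, len(sql)
    while i < n:
        two = sql[i:i + 2]
        if sql[i] in "'\"":
            quote, j = sql[i], i + 1
            while j < n and sql[j] != quote:
                j += 2 if sql[j] == "\\" else 1  # skip backslash-escaped chars
            end = min(j + 1, n)  # include the closing quote if present
        elif two == "--":
            j = sql.find("\n", i)
            end = n if j == -1 else j
        elif two == "/*":
            depth, j = 1, i + 2
            while j < n and depth > 0:  # track nesting of block comments
                if sql[j:j + 2] == "/*":
                    depth, j = depth + 1, j + 2
                elif sql[j:j + 2] == "*/":
                    depth, j = depth - 1, j + 2
                else:
                    j += 1
            end = j
        else:
            i += 1
            continue
        for k in range(i, end):
            if out[k] != "\n":
                out[k] = " "
        i = end
    return "".join(out)


def split_scoped_replace(sql: str):
    """Return (table, scope_cols, query_tail), or None for an ordinary statement."""
    match = HEAD.match(mask_literals_and_comments(sql))
    if match is None:
        return None
    cols = [c.strip() for c in match.group("cols").split(",")]
    return match.group("table"), cols, sql[match.end():].strip()
```

Because `HEAD.match` only matches at the start of the statement, a `REPLACE USING (` that appears later in the query body, inside a literal, or inside a comment returns None, and the statement would go to the wrapped Spark parser unchanged.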

How it works

  • A new logical node, ReplaceScopedData(table, scopeColumns, source).
  • RewriteScopedReplace lowers it through Iceberg's existing row-level write path. It requests the operation as MERGE so the table configuration decides between copy-on-write and merge-on-read (and deletion vectors):
    • COW → ReplaceData
    • MOR → WriteDelta
  • It reuses the standard row-level group filtering and runtime filtering for performance. Note that there is no planning-time static pruning yet.
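
The difference between the two lowerings can be modeled in a few lines. The sketch below is an in-memory illustration, not the RewriteScopedReplace rule itself: the function names are hypothetical, the mode strings are assumed to mirror the table's merge-mode setting, and rows are plain dicts. The row tags (WRITE_WITH_METADATA, WRITE, DELETE, INSERT) follow the commit messages in this PR.

```python
def scope_key(row: dict, scope_cols: list) -> tuple:
    # None == None inside tuples models null-safe scope equality.
    return tuple(row[c] for c in scope_cols)


def cow_rows(target: list, source: list, scope_cols: list) -> list:
    """Copy-on-write: rewrite files with carried-over rows plus all source rows."""
    touched = {scope_key(r, scope_cols) for r in source}
    # Left anti join: target rows that match no source scope are carried over.
    carryover = [("WRITE_WITH_METADATA", r) for r in target
                 if scope_key(r, scope_cols) not in touched]
    inserts = [("WRITE", r) for r in source]
    return carryover + inserts


def mor_rows(target: list, source: list, scope_cols: list) -> list:
    """Merge-on-read: emit a delta of deletes and inserts, no carryover."""
    touched = {scope_key(r, scope_cols) for r in source}
    # Left semi join: each matching target row is deleted exactly once,
    # even when several source rows share its scope.
    deletes = [("DELETE", r) for r in target
               if scope_key(r, scope_cols) in touched]
    inserts = [("INSERT", r) for r in source]
    return deletes + inserts


def lower(mode: str, target: list, source: list, scope_cols: list):
    # Assumed mode strings; the real choice is made from table configuration.
    if mode == "copy-on-write":
        return "ReplaceData", cow_rows(target, source, scope_cols)
    if mode == "merge-on-read":
        return "WriteDelta", mor_rows(target, source, scope_cols)
    raise ValueError(f"unknown write mode: {mode}")
```

Both branches append every source row; they differ only in how the replaced target rows disappear: by being left out of the rewritten files (COW) or by explicit delete records (MOR).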

Tests

  • TestReplaceScopedData covers the shared semantics: scope deletion, full source append, untouched scopes, empty source, duplicate scope keys, null scope values, and multi-column scopes.
  • TestCopyOnWriteReplaceScopedData and TestMergeOnReadReplaceScopedData exercise both write modes.

Docs

  • docs/docs/spark-writes.md documents the supported REPLACE USING form and notes that REPLACE ON and the native grammar are future work.

Follow-ups

  1. Propose native Spark grammar for REPLACE USING / REPLACE ON and remove the text-split workaround.
  2. Implement the REPLACE ON <boolean_expr> form.
  3. When the set of distinct scope values in the source is small or known, pre-evaluate them and synthesize a static scope IN (...) predicate to feed Iceberg's static filter.

CTCC1 added 4 commits June 12, 2026 13:17
Add the parser front-end and logical node for scoped-replace, a data-dependent overwrite with Delta replaceUsing semantics: delete every target row whose scope columns match the source, then append all source rows in a single snapshot.

The Iceberg extension grammar is DDL-shaped and cannot tokenize an arbitrary trailing query, so the statement is split at the text level. The head, INSERT INTO t REPLACE USING (cols), is parsed by a new grammar rule, and the trailing query is delegated to Spark's parser. String literals and comments are masked before locating the scope list so the split and routing heuristic cannot be fooled by their contents.

Scoped-replace command detection is anchored to the INSERT head, so ordinary query bodies remain delegated to Spark even when they contain replaceUsing-like text, comments, or a join alias named replace followed by USING.

This commit only produces the ReplaceScopedData logical node; the rewrite that lowers it into the row-level write path is added separately so copy-on-write, merge-on-read, and deletion-vector selection can be reviewed with the write logic.
Add RewriteScopedReplace, a post-hoc resolution rule that lowers the ReplaceScopedData node into Spark's row-level write path, and register it in IcebergSparkSessionExtensions. The command is requested as MERGE so write mode is selected through the same table config path as MERGE.

For copy-on-write tables it emits a ReplaceData whose replacement query is a union of two branches over a CTE-shared source. The carryover branch scans target rows that match no source scope with a left anti join and tags them WRITE_WITH_METADATA so grouping metadata is preserved. The insert branch aligns and casts every source row to the target schema, then tags it WRITE. Sharing the source through a CTE makes a deterministic source inline and a non-deterministic source evaluate once, so the anti join, inserts, and runtime group filter observe the same rows.

For merge-on-read tables it emits a WriteDelta. The delta plan shares the same source CTE, emits one DELETE row for each target row whose scope matches the source with a left semi join, and emits one INSERT row for every source row. This preserves scoped-replace semantics when duplicate source rows share a scope: target rows are deleted once, while all source rows are appended. Iceberg still chooses deletion vectors or position deletes according to table format and write configuration.

Scoped-replace sources are aligned through Spark's table output resolver so store-assignment rules match INSERT semantics. Nondeterministic scoped-replace sources are not used as row-level operation conditions because Spark evaluates those conditions outside the write query and requires them to be deterministic. In that case the rewrite falls back to an unconditional operation, preserving correctness while making the full-table rewrite tradeoff visible in logs.

The row, metadata, and delta projections are built directly because the union top nodes are not introspectable by Spark's framework helpers.
Add Spark extension coverage for INSERT INTO ... REPLACE USING under both copy-on-write and merge-on-read row-level write modes.

The shared suite verifies the core scoped-replace contract: target rows are deleted by null-safe scope equality, non-matching rows are retained, all source rows are appended, and duplicate source scopes delete target rows only once. It also covers new source scopes, mixed matching and new scopes, empty sources, multi-column scopes, null scope values, nondeterministic sources, and Spark store-assignment behavior.

The parser coverage verifies that regular INSERT queries with replaceUsing-like text in string literals, comments, nested block comments, and query bodies remain ordinary INSERT statements. It also verifies that comments in the command head do not prevent scoped-replace routing.

The COW and MOR subclasses pin the write-mode selection through table properties and assert the resulting snapshots use rewritten data files or row deltas respectively, including deletion-vector summaries on v3 tables.
Document the supported REPLACE USING form for Spark writes. The docs describe the scoped replacement semantics, show the column-list SQL shape, and call out that REPLACE ON and native Spark grammar support are future work.
liucao-dd force-pushed the scoped-replace-spark-4.1 branch from 84b702c to 8f5d729 on June 12, 2026 20:19