28 changes: 27 additions & 1 deletion docs/docs/spark-writes.md
@@ -29,6 +29,7 @@ Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog impleme
| Feature support | Spark | Notes |
|--------------------------------------------------|---------|-----------------------------------------------------------------------------|
| [SQL insert into](#insert-into) | ✔️ | ⚠ Requires `spark.sql.storeAssignmentPolicy=ANSI` (default since Spark 3.0) |
| [SQL scoped replace](#insert-into--replace-using) | ✔️ | ⚠ Requires Iceberg Spark extensions and Spark 4.1 or higher |
| [SQL merge into](#merge-into) | ✔️ | ⚠ Requires Iceberg Spark extensions |
| [SQL insert overwrite](#insert-overwrite) | ✔️ | ⚠ Requires `spark.sql.storeAssignmentPolicy=ANSI` (default since Spark 3.0) |
| [SQL delete from](#delete-from) | ✔️ | ⚠ Row-level delete requires Iceberg Spark extensions |
@@ -40,7 +41,7 @@ Iceberg uses Apache Spark's DataSourceV2 API for data source and catalog impleme

## Writing with SQL

Spark supports SQL `INSERT INTO`, `MERGE INTO`, and `INSERT OVERWRITE`, as well as the new `DataFrameWriterV2` API.
Spark supports SQL `INSERT INTO`, `INSERT INTO ... REPLACE USING`, `MERGE INTO`, and `INSERT OVERWRITE`, as well as the new `DataFrameWriterV2` API.

### `INSERT INTO`

@@ -53,6 +54,31 @@ INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...
```

### `INSERT INTO ... REPLACE USING`

Iceberg supports scoped replacement writes when the Iceberg Spark extensions are enabled on Spark 4.1 or higher. A scoped replace deletes existing target rows whose scope-column values match those of any row produced by the source query, then inserts all rows from the source query in the same commit.

```sql
INSERT INTO prod.db.table
REPLACE USING (scope_col_1, scope_col_2)
SELECT ...
```

The columns listed in `REPLACE USING` define the replacement scope. For each distinct tuple of scope values produced by the source query, matching target rows are removed using null-safe equality, and the full source query output is appended.

For example, this query replaces all existing rows for categories present in `prod.db.staged_rows` and keeps rows for other categories:

```sql
INSERT INTO prod.db.sample
REPLACE USING (category)
SELECT id, data, category, ts
FROM prod.db.staged_rows
```
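
The scoped-replace semantics described above can be modeled in a few lines of plain Python. This is only an illustrative sketch, not how Iceberg or Spark implements the operation: `scoped_replace` and the sample rows are hypothetical, rows are represented as dictionaries, and Python's treatment of `None == None` as true stands in for null-safe equality.

```python
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def scoped_replace(target: List[Row], source: List[Row], scope_cols: Sequence[str]) -> List[Row]:
    """Hypothetical in-memory model of INSERT INTO ... REPLACE USING."""
    # Distinct scope tuples produced by the source. Tuples containing None
    # compare equal to each other, which mimics null-safe equality.
    scopes = {tuple(row[col] for col in scope_cols) for row in source}
    # Keep only target rows whose scope tuple does not appear in the source.
    kept = [row for row in target if tuple(row[col] for col in scope_cols) not in scopes]
    # Append the full source output.
    return kept + list(source)


# Made-up sample data for illustration only.
target = [
    {"id": 1, "data": "a", "category": "x"},
    {"id": 2, "data": "b", "category": "y"},
    {"id": 3, "data": "c", "category": None},
]
source = [
    {"id": 10, "data": "new", "category": "x"},
    {"id": 11, "data": "new", "category": None},
]

result = scoped_replace(target, source, ["category"])
print([row["id"] for row in result])  # [2, 10, 11]
```

In this model, the existing row with a null `category` is replaced because the source also contains a null `category`, while the row for category `y` is left untouched.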

`REPLACE USING` is useful when incoming data contains complete replacement slices for one or more logical groups, such as tenants, departments, dates, or regions. Unlike `INSERT OVERWRITE`, the replacement scope is determined by column values in the source data and does not depend on the table's partition spec.

The source query must produce the same number of columns as the target table, using the same assignment rules as `INSERT INTO`. Each `REPLACE USING` column must exist in both the target table and the source query output.
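
The two requirements above can be expressed as a small validation routine. The following Python sketch is illustrative only: `validate_scoped_replace` and its error messages are hypothetical, and it compares column names by exact string match, whereas Spark resolves names according to its own case-sensitivity settings.

```python
from typing import List, Sequence


def validate_scoped_replace(
    target_cols: Sequence[str],
    source_cols: Sequence[str],
    scope_cols: Sequence[str],
) -> List[str]:
    """Hypothetical check of the column requirements; returns a list of problems."""
    errors: List[str] = []
    # The source must produce as many columns as the target table has.
    if len(source_cols) != len(target_cols):
        errors.append(
            f"source has {len(source_cols)} columns, target has {len(target_cols)}"
        )
    # Every scope column must exist on both sides.
    for col in scope_cols:
        if col not in target_cols:
            errors.append(f"scope column '{col}' not found in target table")
        if col not in source_cols:
            errors.append(f"scope column '{col}' not found in source query output")
    return errors


# Made-up column lists for illustration only.
ok = validate_scoped_replace(
    ["id", "data", "category", "ts"], ["id", "data", "category", "ts"], ["category"]
)
bad = validate_scoped_replace(["id", "data", "category", "ts"], ["id", "data"], ["category"])
print(ok)
print(bad)
```

Returning a list of problems rather than raising on the first one is a design choice of this sketch; it lets a caller report every violation at once.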

### `MERGE INTO`

Spark supports `MERGE INTO` queries that can express row-level updates.
@@ -131,6 +131,13 @@ singleOrder
: order EOF
;

// Parses only the command head of `INSERT INTO t REPLACE USING (cols) <query>`.
// The query tail remains Spark SQL and is delegated to Spark's parser until this syntax
// can be represented directly in Spark's `insertInto` grammar.
singleScopedReplaceHead
: INSERT INTO TABLE? multipartIdentifier REPLACE USING '(' fieldList ')' EOF
;

order
: fields+=orderField (',' fields+=orderField)*
| '(' fields+=orderField (',' fields+=orderField)* ')'
@@ -211,6 +218,7 @@ nonReserved
| DISTRIBUTED | LOCALLY | MINUTES | MONTHS | UNORDERED | REPLACE | RETAIN | VERSION | WITH | IDENTIFIER_KW | FIELDS | SET | SNAPSHOT | SNAPSHOTS
| TAG | TRUE | FALSE
| MAP
| INSERT | INTO | USING
;

snapshotId
@@ -243,6 +251,8 @@ FIELDS: 'FIELDS';
FIRST: 'FIRST';
HOURS: 'HOURS';
IF : 'IF';
INSERT: 'INSERT';
INTO: 'INTO';
LAST: 'LAST';
LOCALLY: 'LOCALLY';
MINUTES: 'MINUTES';
@@ -264,6 +274,7 @@ SNAPSHOTS: 'SNAPSHOTS';
TABLE: 'TABLE';
TAG: 'TAG';
UNORDERED: 'UNORDERED';
USING: 'USING';
VERSION: 'VERSION';
WITH: 'WITH';
WRITE: 'WRITE';
@@ -22,6 +22,7 @@ import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.CheckViews
import org.apache.spark.sql.catalyst.analysis.ResolveBranch
import org.apache.spark.sql.catalyst.analysis.ResolveViews
import org.apache.spark.sql.catalyst.analysis.RewriteScopedReplace
import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -35,6 +36,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
// analyzer extensions
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectPostHocResolutionRule { spark => ResolveBranch(spark) }
extensions.injectPostHocResolutionRule { _ => RewriteScopedReplace }
extensions.injectCheckRule(_ => CheckViews)

// optimizer extensions