Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions crates/integrations/datafusion/src/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use datafusion::sql::sqlparser::ast::{
use futures::TryStreamExt;

use paimon::spec::{datums_to_binary_row, extract_datum_from_arrow, CoreOptions};
use paimon::table::{CopyOnWriteMergeWriter, DataSplitBuilder, Table};
use paimon::table::{CopyOnWriteMergeWriter, DataSplitBuilder, Table, WriteBuilder};

use crate::error::to_datafusion_error;
use crate::sql_context::SQLContext;
Expand Down Expand Up @@ -361,6 +361,7 @@ async fn execute_cow_merge_once(
let mut temp_tracker = TempTableTracker::new(ctx);
let (has_target_data, cow_table_name) =
register_cow_target_table(ctx, table, &writer, &mut temp_tracker).await?;
let wb = table.new_write_builder();

let merge_ctx = CowMergeContext {
source_ref: &source_ref,
Expand All @@ -376,6 +377,7 @@ async fn execute_cow_merge_once(
ctx,
&clauses,
&mut writer,
&wb,
table,
&merge_ctx,
&mut temp_tracker,
Expand All @@ -391,8 +393,7 @@ async fn execute_cow_merge_once(
all_messages.extend(insert_messages);

if !all_messages.is_empty() {
let commit = table.new_write_builder().new_commit();
commit
wb.new_commit()
.commit(all_messages)
.await
.map_err(to_datafusion_error)?;
Expand All @@ -418,6 +419,7 @@ async fn execute_cow_merge_inner(
ctx: &SQLContext,
clauses: &CowMergeClauses,
writer: &mut CopyOnWriteMergeWriter,
wb: &WriteBuilder<'_>,
table: &Table,
merge_ctx: &CowMergeContext<'_>,
temp_tracker: &mut TempTableTracker<'_>,
Expand Down Expand Up @@ -596,10 +598,7 @@ async fn execute_cow_merge_inner(

let insert_count: usize = insert_batches.iter().map(|b| b.num_rows()).sum();
if insert_count > 0 {
let mut table_write = table
.new_write_builder()
.new_write()
.map_err(to_datafusion_error)?;
let mut table_write = wb.new_write().map_err(to_datafusion_error)?;
for batch in &insert_batches {
table_write
.write_arrow_batch(batch)
Expand Down
15 changes: 15 additions & 0 deletions crates/integrations/datafusion/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ pub async fn dml_count(sql_context: &SQLContext, sql_str: &str) -> u64 {
.value(0)
}

#[allow(dead_code)]
pub async fn assert_sql_error(sql_context: &SQLContext, sql: &str, expected_substring: &str) {
let err_msg = match sql_context.sql(sql).await {
Ok(df) => match df.collect().await {
Ok(_) => panic!("Expected error containing '{expected_substring}', but got Ok"),
Err(err) => err.to_string(),
},
Err(err) => err.to_string(),
};
assert!(
err_msg.contains(expected_substring),
"Error message '{err_msg}' does not contain '{expected_substring}'"
);
}

/// Collect (i32, i32, String) rows from batches, sorted by (col0, col1).
#[allow(dead_code)]
pub fn collect_int_int_str(batches: &[RecordBatch]) -> Vec<(i32, i32, String)> {
Expand Down
73 changes: 72 additions & 1 deletion crates/integrations/datafusion/tests/delete_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ mod common;

use paimon_datafusion::SQLContext;

use common::{create_sql_context, create_test_env, dml_count, exec, query_int_str_int};
use common::{
assert_sql_error, create_sql_context, create_test_env, dml_count, exec, query_int_str_int,
};

// ======================= Helpers =======================

Expand Down Expand Up @@ -449,3 +451,72 @@ async fn test_delete_multiple_rows_from_single_commit() {

assert_eq!(query(&sql_context).await, vec![(1, "a".into(), 10)]);
}

// ======================= Unsupported cases =======================

#[tokio::test]
async fn test_delete_rejects_primary_key_table() {
let (tmp, catalog) = create_test_env();
let sql_context = create_sql_context(catalog).await;
sql_context
.sql("CREATE SCHEMA paimon.test_db")
.await
.unwrap();
sql_context
.sql(
"CREATE TABLE paimon.test_db.pk_t (\
id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)\
) WITH ('bucket' = '1')",
)
.await
.unwrap();

assert_sql_error(
&sql_context,
"DELETE FROM paimon.test_db.pk_t WHERE id = 1",
"DELETE on primary-key tables is not yet supported",
)
.await;
drop(tmp);
}

#[tokio::test]
async fn test_delete_rejects_data_evolution_table() {
let (tmp, catalog) = create_test_env();
let sql_context = create_sql_context(catalog).await;
sql_context
.sql("CREATE SCHEMA paimon.test_db")
.await
.unwrap();
sql_context
.sql(
"CREATE TABLE paimon.test_db.de_t (\
id INT NOT NULL, name VARCHAR\
) WITH (\
'row-tracking.enabled' = 'true',\
'data-evolution.enabled' = 'true'\
)",
)
.await
.unwrap();

assert_sql_error(
&sql_context,
"DELETE FROM paimon.test_db.de_t WHERE id = 1",
"DELETE on data-evolution tables is not yet supported",
)
.await;
drop(tmp);
}

#[tokio::test]
async fn test_delete_rejects_table_alias() {
let (_tmp, sql_context) = setup().await;

assert_sql_error(
&sql_context,
"DELETE FROM paimon.test_db.t AS target WHERE id = 1",
"Table alias 'target' in DELETE is not yet supported",
)
.await;
}
33 changes: 33 additions & 0 deletions crates/integrations/datafusion/tests/merge_into_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,36 @@ async fn test_rejects_table_with_primary_keys() {
)
.await;
}

#[tokio::test]
async fn test_rejects_primary_key_table_without_data_evolution() {
let (_tmp, catalog) = create_test_env();
let sql_context = create_sql_context(catalog.clone()).await;

sql_context
.sql("CREATE SCHEMA paimon.test_db")
.await
.unwrap();
sql_context
.sql(
"CREATE TABLE paimon.test_db.pk_plain_target (\
id INT NOT NULL, name STRING, PRIMARY KEY (id)\
) WITH ('bucket' = '1')",
)
.await
.unwrap();

register_source(
&sql_context,
"CREATE TEMPORARY TABLE paimon.test_db.src_pk_plain AS SELECT * FROM (VALUES (1, 'ALICE')) AS t(id, name)",
)
.await;

assert_merge_error(
&sql_context,
"MERGE INTO paimon.test_db.pk_plain_target t USING paimon.test_db.src_pk_plain s ON t.id = s.id \
WHEN MATCHED THEN UPDATE SET name = s.name",
"primary-key tables without data-evolution",
)
.await;
}
74 changes: 73 additions & 1 deletion crates/integrations/datafusion/tests/update_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ mod common;

use paimon_datafusion::SQLContext;

use common::{create_sql_context, create_test_env, dml_count, exec, query_int_str_int};
use common::{
assert_sql_error, create_sql_context, create_test_env, dml_count, exec, query_int_str_int,
};

// ======================= Helpers =======================

Expand Down Expand Up @@ -475,3 +477,73 @@ async fn test_update_empty_table() {
assert_eq!(cnt, 0);
drop(tmp);
}

// ======================= Unsupported cases =======================

#[tokio::test]
async fn test_update_rejects_primary_key_table_without_data_evolution() {
let (tmp, catalog) = create_test_env();
let sql_context = create_sql_context(catalog).await;
sql_context
.sql("CREATE SCHEMA paimon.test_db")
.await
.unwrap();
sql_context
.sql(
"CREATE TABLE paimon.test_db.pk_t (\
id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)\
) WITH ('bucket' = '1')",
)
.await
.unwrap();

assert_sql_error(
&sql_context,
"UPDATE paimon.test_db.pk_t SET name = 'x' WHERE id = 1",
"primary-key tables without data-evolution",
)
.await;
drop(tmp);
}

#[tokio::test]
async fn test_update_rejects_primary_key_table_with_data_evolution() {
let (tmp, catalog) = create_test_env();
let sql_context = create_sql_context(catalog).await;
sql_context
.sql("CREATE SCHEMA paimon.test_db")
.await
.unwrap();
sql_context
.sql(
"CREATE TABLE paimon.test_db.pk_de_t (\
id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)\
) WITH (\
'bucket' = '1',\
'row-tracking.enabled' = 'true',\
'data-evolution.enabled' = 'true'\
)",
)
.await
.unwrap();

assert_sql_error(
&sql_context,
"UPDATE paimon.test_db.pk_de_t SET name = 'x' WHERE id = 1",
"does not support primary keys",
)
.await;
drop(tmp);
}

#[tokio::test]
async fn test_update_rejects_table_alias() {
let (_tmp, sql_context) = setup().await;

assert_sql_error(
&sql_context,
"UPDATE paimon.test_db.t AS target SET name = 'x' WHERE id = 1",
"Table alias 'target' in UPDATE is not yet supported",
)
.await;
}
21 changes: 19 additions & 2 deletions docs/src/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,21 @@ ALTER TABLE IF EXISTS paimon.my_db.users ADD COLUMN age INT;

## DML

The table type determines which row-level DML operations are supported:

| Operation | Append-only table | Primary-key table | Data-evolution row-tracking table (no primary key) |
|---|---|---|---|
| `INSERT INTO` | Supported | Supported | Supported |
| `INSERT OVERWRITE` | Supported | Supported | Supported |
| `INSERT OVERWRITE ... PARTITION` | Supported for partitioned tables | Supported for partitioned tables | Supported for partitioned tables |
| `TRUNCATE TABLE` | Supported | Supported | Supported |
| `ALTER TABLE ... DROP PARTITION` | Supported for partitioned tables | Supported for partitioned tables | Supported for partitioned tables |
| `UPDATE` | Supported via Copy-on-Write | Not supported | Supported via row-id update |
| `DELETE` | Supported via Copy-on-Write | Not supported | Not supported |
| `MERGE INTO` | Supported via Copy-on-Write | Not supported | Supported for matched `UPDATE` and not-matched `INSERT`; matched `DELETE` is not supported |

A data-evolution row-tracking table must have both `'data-evolution.enabled' = 'true'` and `'row-tracking.enabled' = 'true'`, and must not have primary keys. Primary-key row-level `UPDATE`, `DELETE`, and `MERGE INTO` are not supported even when data evolution is enabled.

### INSERT INTO

```sql
Expand Down Expand Up @@ -279,7 +294,7 @@ For append-only tables (no primary key), updates are executed using Copy-on-Writ
UPDATE paimon.my_db.t SET name = 'a_new' WHERE id = 1;
```

For primary-key tables, `data-evolution.enabled` must be enabled to perform UPDATE.
For data-evolution row-tracking tables without primary keys, updates are executed with row-id-based partial-column writes. Primary-key tables are not supported for `UPDATE`.

### DELETE

Expand All @@ -289,6 +304,8 @@ For append-only tables, deletes are executed using Copy-on-Write:
DELETE FROM paimon.my_db.t WHERE name = 'b';
```

`DELETE` is not supported on primary-key tables or data-evolution tables.

### MERGE INTO

Standard SQL MERGE INTO syntax is supported, allowing INSERT, UPDATE, and DELETE in a single statement:
Expand Down Expand Up @@ -326,7 +343,7 @@ ON target.id = source.id
WHEN MATCHED THEN UPDATE SET name = source.name;
```

For data-evolution tables, MERGE INTO uses the `_ROW_ID` virtual column for row-level tracking. For append-only tables, it uses Copy-on-Write file rewriting.
For append-only tables, `MERGE INTO` uses Copy-on-Write file rewriting and supports matched `UPDATE`, matched `DELETE`, and not-matched `INSERT`. For data-evolution row-tracking tables without primary keys, `MERGE INTO` uses the `_ROW_ID` virtual column for row-level tracking and supports matched `UPDATE` plus not-matched `INSERT`; matched `DELETE` is not yet supported. Primary-key tables are not supported for `MERGE INTO`.

### TRUNCATE TABLE

Expand Down
Loading