diff --git a/crates/integrations/datafusion/src/merge_into.rs b/crates/integrations/datafusion/src/merge_into.rs index 9961aea2..79a8a0b0 100644 --- a/crates/integrations/datafusion/src/merge_into.rs +++ b/crates/integrations/datafusion/src/merge_into.rs @@ -37,7 +37,7 @@ use datafusion::sql::sqlparser::ast::{ use futures::TryStreamExt; use paimon::spec::{datums_to_binary_row, extract_datum_from_arrow, CoreOptions}; -use paimon::table::{CopyOnWriteMergeWriter, DataSplitBuilder, Table}; +use paimon::table::{CopyOnWriteMergeWriter, DataSplitBuilder, Table, WriteBuilder}; use crate::error::to_datafusion_error; use crate::sql_context::SQLContext; @@ -361,6 +361,7 @@ async fn execute_cow_merge_once( let mut temp_tracker = TempTableTracker::new(ctx); let (has_target_data, cow_table_name) = register_cow_target_table(ctx, table, &writer, &mut temp_tracker).await?; + let wb = table.new_write_builder(); let merge_ctx = CowMergeContext { source_ref: &source_ref, @@ -376,6 +377,7 @@ async fn execute_cow_merge_once( ctx, &clauses, &mut writer, + &wb, table, &merge_ctx, &mut temp_tracker, @@ -391,8 +393,7 @@ async fn execute_cow_merge_once( all_messages.extend(insert_messages); if !all_messages.is_empty() { - let commit = table.new_write_builder().new_commit(); - commit + wb.new_commit() .commit(all_messages) .await .map_err(to_datafusion_error)?; @@ -418,6 +419,7 @@ async fn execute_cow_merge_inner( ctx: &SQLContext, clauses: &CowMergeClauses, writer: &mut CopyOnWriteMergeWriter, + wb: &WriteBuilder<'_>, table: &Table, merge_ctx: &CowMergeContext<'_>, temp_tracker: &mut TempTableTracker<'_>, @@ -596,10 +598,7 @@ async fn execute_cow_merge_inner( let insert_count: usize = insert_batches.iter().map(|b| b.num_rows()).sum(); if insert_count > 0 { - let mut table_write = table - .new_write_builder() - .new_write() - .map_err(to_datafusion_error)?; + let mut table_write = wb.new_write().map_err(to_datafusion_error)?; for batch in &insert_batches { table_write 
.write_arrow_batch(batch) diff --git a/crates/integrations/datafusion/tests/common/mod.rs b/crates/integrations/datafusion/tests/common/mod.rs index bcd0cd75..c01201d1 100644 --- a/crates/integrations/datafusion/tests/common/mod.rs +++ b/crates/integrations/datafusion/tests/common/mod.rs @@ -138,6 +138,21 @@ pub async fn dml_count(sql_context: &SQLContext, sql_str: &str) -> u64 { .value(0) } +#[allow(dead_code)] +pub async fn assert_sql_error(sql_context: &SQLContext, sql: &str, expected_substring: &str) { + let err_msg = match sql_context.sql(sql).await { + Ok(df) => match df.collect().await { + Ok(_) => panic!("Expected error containing '{expected_substring}', but got Ok"), + Err(err) => err.to_string(), + }, + Err(err) => err.to_string(), + }; + assert!( + err_msg.contains(expected_substring), + "Error message '{err_msg}' does not contain '{expected_substring}'" + ); +} + /// Collect (i32, i32, String) rows from batches, sorted by (col0, col1). #[allow(dead_code)] pub fn collect_int_int_str(batches: &[RecordBatch]) -> Vec<(i32, i32, String)> { diff --git a/crates/integrations/datafusion/tests/delete_tests.rs b/crates/integrations/datafusion/tests/delete_tests.rs index 0ed3205c..ac9ef95f 100644 --- a/crates/integrations/datafusion/tests/delete_tests.rs +++ b/crates/integrations/datafusion/tests/delete_tests.rs @@ -23,7 +23,9 @@ mod common; use paimon_datafusion::SQLContext; -use common::{create_sql_context, create_test_env, dml_count, exec, query_int_str_int}; +use common::{ + assert_sql_error, create_sql_context, create_test_env, dml_count, exec, query_int_str_int, +}; // ======================= Helpers ======================= @@ -449,3 +451,72 @@ async fn test_delete_multiple_rows_from_single_commit() { assert_eq!(query(&sql_context).await, vec![(1, "a".into(), 10)]); } + +// ======================= Unsupported cases ======================= + +#[tokio::test] +async fn test_delete_rejects_primary_key_table() { + let (tmp, catalog) = create_test_env(); + let 
sql_context = create_sql_context(catalog).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .unwrap(); + sql_context + .sql( + "CREATE TABLE paimon.test_db.pk_t (\ + id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)\ + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + assert_sql_error( + &sql_context, + "DELETE FROM paimon.test_db.pk_t WHERE id = 1", + "DELETE on primary-key tables is not yet supported", + ) + .await; + drop(tmp); +} + +#[tokio::test] +async fn test_delete_rejects_data_evolution_table() { + let (tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .unwrap(); + sql_context + .sql( + "CREATE TABLE paimon.test_db.de_t (\ + id INT NOT NULL, name VARCHAR\ + ) WITH (\ + 'row-tracking.enabled' = 'true',\ + 'data-evolution.enabled' = 'true'\ + )", + ) + .await + .unwrap(); + + assert_sql_error( + &sql_context, + "DELETE FROM paimon.test_db.de_t WHERE id = 1", + "DELETE on data-evolution tables is not yet supported", + ) + .await; + drop(tmp); +} + +#[tokio::test] +async fn test_delete_rejects_table_alias() { + let (_tmp, sql_context) = setup().await; + + assert_sql_error( + &sql_context, + "DELETE FROM paimon.test_db.t AS target WHERE id = 1", + "Table alias 'target' in DELETE is not yet supported", + ) + .await; +} diff --git a/crates/integrations/datafusion/tests/merge_into_tests.rs b/crates/integrations/datafusion/tests/merge_into_tests.rs index 0969c09f..c6271ed6 100644 --- a/crates/integrations/datafusion/tests/merge_into_tests.rs +++ b/crates/integrations/datafusion/tests/merge_into_tests.rs @@ -1289,3 +1289,36 @@ async fn test_rejects_table_with_primary_keys() { ) .await; } + +#[tokio::test] +async fn test_rejects_primary_key_table_without_data_evolution() { + let (_tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog.clone()).await; + + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await 
+ .unwrap(); + sql_context + .sql( + "CREATE TABLE paimon.test_db.pk_plain_target (\ + id INT NOT NULL, name STRING, PRIMARY KEY (id)\ + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + register_source( + &sql_context, + "CREATE TEMPORARY TABLE paimon.test_db.src_pk_plain AS SELECT * FROM (VALUES (1, 'ALICE')) AS t(id, name)", + ) + .await; + + assert_merge_error( + &sql_context, + "MERGE INTO paimon.test_db.pk_plain_target t USING paimon.test_db.src_pk_plain s ON t.id = s.id \ + WHEN MATCHED THEN UPDATE SET name = s.name", + "primary-key tables without data-evolution", + ) + .await; +} diff --git a/crates/integrations/datafusion/tests/update_tests.rs b/crates/integrations/datafusion/tests/update_tests.rs index 676e41ca..edc39260 100644 --- a/crates/integrations/datafusion/tests/update_tests.rs +++ b/crates/integrations/datafusion/tests/update_tests.rs @@ -23,7 +23,9 @@ mod common; use paimon_datafusion::SQLContext; -use common::{create_sql_context, create_test_env, dml_count, exec, query_int_str_int}; +use common::{ + assert_sql_error, create_sql_context, create_test_env, dml_count, exec, query_int_str_int, +}; // ======================= Helpers ======================= @@ -475,3 +477,73 @@ async fn test_update_empty_table() { assert_eq!(cnt, 0); drop(tmp); } + +// ======================= Unsupported cases ======================= + +#[tokio::test] +async fn test_update_rejects_primary_key_table_without_data_evolution() { + let (tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .unwrap(); + sql_context + .sql( + "CREATE TABLE paimon.test_db.pk_t (\ + id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)\ + ) WITH ('bucket' = '1')", + ) + .await + .unwrap(); + + assert_sql_error( + &sql_context, + "UPDATE paimon.test_db.pk_t SET name = 'x' WHERE id = 1", + "primary-key tables without data-evolution", + ) + .await; + drop(tmp); +} + +#[tokio::test] +async fn 
test_update_rejects_primary_key_table_with_data_evolution() { + let (tmp, catalog) = create_test_env(); + let sql_context = create_sql_context(catalog).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .unwrap(); + sql_context + .sql( + "CREATE TABLE paimon.test_db.pk_de_t (\ + id INT NOT NULL, name VARCHAR, PRIMARY KEY (id)\ + ) WITH (\ + 'bucket' = '1',\ + 'row-tracking.enabled' = 'true',\ + 'data-evolution.enabled' = 'true'\ + )", + ) + .await + .unwrap(); + + assert_sql_error( + &sql_context, + "UPDATE paimon.test_db.pk_de_t SET name = 'x' WHERE id = 1", + "does not support primary keys", + ) + .await; + drop(tmp); +} + +#[tokio::test] +async fn test_update_rejects_table_alias() { + let (_tmp, sql_context) = setup().await; + + assert_sql_error( + &sql_context, + "UPDATE paimon.test_db.t AS target SET name = 'x' WHERE id = 1", + "Table alias 'target' in UPDATE is not yet supported", + ) + .await; +} diff --git a/docs/src/sql.md b/docs/src/sql.md index 4a8dc4f7..6705578f 100644 --- a/docs/src/sql.md +++ b/docs/src/sql.md @@ -227,6 +227,21 @@ ALTER TABLE IF EXISTS paimon.my_db.users ADD COLUMN age INT; ## DML +The table type determines which row-level DML operations are supported: + +| Operation | Append-only table | Primary-key table | Data-evolution row-tracking table (no primary key) | +|---|---|---|---| +| `INSERT INTO` | Supported | Supported | Supported | +| `INSERT OVERWRITE` | Supported | Supported | Supported | +| `INSERT OVERWRITE ... PARTITION` | Supported for partitioned tables | Supported for partitioned tables | Supported for partitioned tables | +| `TRUNCATE TABLE` | Supported | Supported | Supported | +| `ALTER TABLE ... 
DROP PARTITION` | Supported for partitioned tables | Supported for partitioned tables | Supported for partitioned tables | +| `UPDATE` | Supported via Copy-on-Write | Not supported | Supported via row-id update | +| `DELETE` | Supported via Copy-on-Write | Not supported | Not supported | +| `MERGE INTO` | Supported via Copy-on-Write | Not supported | Supported for matched `UPDATE` and not-matched `INSERT`; matched `DELETE` is not supported | + +A data-evolution row-tracking table must have both `'data-evolution.enabled' = 'true'` and `'row-tracking.enabled' = 'true'`, and must not have primary keys. Row-level `UPDATE`, `DELETE`, and `MERGE INTO` on primary-key tables are not supported, even when data evolution is enabled. + ### INSERT INTO ```sql @@ -279,7 +294,7 @@ For append-only tables (no primary key), updates are executed using Copy-on-Writ UPDATE paimon.my_db.t SET name = 'a_new' WHERE id = 1; ``` -For primary-key tables, `data-evolution.enabled` must be enabled to perform UPDATE. +For data-evolution row-tracking tables without primary keys, updates are executed with row-id-based partial-column writes. Primary-key tables are not supported for `UPDATE`. ### DELETE @@ -289,6 +304,8 @@ For append-only tables, deletes are executed using Copy-on-Write: DELETE FROM paimon.my_db.t WHERE name = 'b'; ``` +`DELETE` is not supported on primary-key tables or data-evolution tables. + ### MERGE INTO Standard SQL MERGE INTO syntax is supported, allowing INSERT, UPDATE, and DELETE in a single statement: @@ -326,7 +343,7 @@ ON target.id = source.id WHEN MATCHED THEN UPDATE SET name = source.name; ``` -For data-evolution tables, MERGE INTO uses the `_ROW_ID` virtual column for row-level tracking. For append-only tables, it uses Copy-on-Write file rewriting. +For append-only tables, `MERGE INTO` uses Copy-on-Write file rewriting and supports matched `UPDATE`, matched `DELETE`, and not-matched `INSERT`. 
For data-evolution row-tracking tables without primary keys, `MERGE INTO` uses the `_ROW_ID` virtual column for row-level tracking and supports matched `UPDATE` plus not-matched `INSERT`; matched `DELETE` is not yet supported. Primary-key tables are not supported for `MERGE INTO`. ### TRUNCATE TABLE