Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions crates/integrations/datafusion/tests/pk_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,69 @@ async fn test_pk_partial_update_fixed_bucket_e2e() {
);
}

/// Several partial updates to the same primary key issued by one INSERT
/// must be merged when the write buffer is flushed (mirrors Java
/// MergeTreeWriter#flushWriteBuffer).
///
/// The flushed file is expected to hold exactly one row per key, so the
/// row set returned by SELECT and the number reported by COUNT(*) agree.
#[tokio::test]
async fn test_pk_partial_update_merges_within_single_commit() {
    let (_tmp, sql_context) = setup_sql_context().await;

    // Single-bucket primary-key table using the partial-update merge engine.
    let create_table = "CREATE TABLE paimon.test_db.t_pu_flush_merge (
id INT NOT NULL, v_int INT, v_str STRING,
PRIMARY KEY (id)
) WITH ('bucket' = '1', 'merge-engine' = 'partial-update')";
    sql_context.sql(create_table).await.unwrap();

    // One commit carrying three partial updates for key 1 and a single row
    // for key 2; the writer has to collapse key 1 into one physical row.
    let insert_rows = "INSERT INTO paimon.test_db.t_pu_flush_merge VALUES
(1, 10, CAST(NULL AS STRING)),
(1, CAST(NULL AS INT), 'hello'),
(1, 100, CAST(NULL AS STRING)),
(2, 200, 'world')";
    let insert_df = sql_context.sql(insert_rows).await.unwrap();
    insert_df.collect().await.unwrap();

    // Each column of the merged row carries its latest non-null value.
    let merged_df = sql_context
        .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_pu_flush_merge ORDER BY id")
        .await
        .unwrap();
    let merged_batches = merged_df.collect().await.unwrap();
    assert_eq!(
        collect_int_int_str(&merged_batches),
        vec![(1, 100, "hello".to_string()), (2, 200, "world".to_string())]
    );

    // COUNT(*) has to report the merged row count as well, i.e. the number
    // of physical rows equals the number of merged rows.
    let count_df = sql_context
        .sql("SELECT COUNT(*) FROM paimon.test_db.t_pu_flush_merge")
        .await
        .unwrap();
    let count_batches = count_df.collect().await.unwrap();
    // The aggregate is read from the first value of the first column of the
    // first batch, downcast to a 64-bit integer array.
    let count_column = count_batches[0].column(0);
    let count_values = count_column
        .as_any()
        .downcast_ref::<datafusion::arrow::array::Int64Array>()
        .unwrap();
    let count = count_values.value(0);
    assert_eq!(count, 2, "COUNT(*) must count merged rows");
}

// ======================= Dedup Within Single Commit =======================

/// Duplicate keys in a single INSERT — last value wins (Deduplicate engine).
Expand Down
5 changes: 3 additions & 2 deletions crates/paimon/src/table/kv_file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ impl KeyValueFileReader {
continue;
}

// Always go through sort-merge even for single file,
// because a single file may contain duplicate keys.
// Always go through sort-merge even for a single file: files
// written before the writer merged key groups at flush may
// still contain duplicate keys.
let mut merge_stream = SortMergeReaderBuilder::new(
file_streams,
internal_schema.clone(),
Expand Down
Loading
Loading