From 7db81df10f1f8e8ac162567507219e7be73ca501 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 12 Jun 2026 00:48:51 +0800 Subject: [PATCH 1/2] feat(table/write): merge partial-update rows at flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The partial-update writer kept every row of a key in the flushed file, deferring all merging to the read side. Java's MergeTreeWriter runs the merge function over the write buffer before flushing, so a data file never holds two rows of one key — an invariant split planning and statistics can rely on (a file's physical row count equals its logical row count). Mirror that: at flush, group sorted rows by primary key and emit one row per group with the same semantics as the read-side PartialUpdateMergeFunction — every column keeps its latest non-null value ordered by (sequence fields, system sequence), the merged row carries the group's highest sequence number, and retract rows are rejected. Changelog files (input producer) still record the pre-merge rows, matching Java's rawConsumer. Cross-commit merging is unchanged: files from different commits still overlap on key range and go through the sort-merge reader. --- .../datafusion/tests/pk_tables.rs | 63 ++++ crates/paimon/src/table/kv_file_reader.rs | 5 +- crates/paimon/src/table/kv_file_writer.rs | 303 +++++++++++++++--- 3 files changed, 329 insertions(+), 42 deletions(-) diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index 71058558..deed4887 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -177,6 +177,69 @@ async fn test_pk_partial_update_fixed_bucket_e2e() { ); } +/// Partial updates of one key within a single INSERT are merged at flush +/// (mirrors Java MergeTreeWriter#flushWriteBuffer): the flushed file holds +/// one row per key, so SELECT and COUNT(*) agree. +#[tokio::test] +async fn test_pk_partial_update_merges_within_single_commit() { + let (_tmp, sql_context) = setup_sql_context().await; + + sql_context + .sql( + "CREATE TABLE paimon.test_db.t_pu_flush_merge ( + id INT NOT NULL, v_int INT, v_str STRING, + PRIMARY KEY (id) + ) WITH ('bucket' = '1', 'merge-engine' = 'partial-update')", + ) + .await + .unwrap(); + + // Three partial updates of key 1 plus key 2 in ONE commit: the writer + // must merge key 1 down to a single physical row. + sql_context + .sql( + "INSERT INTO paimon.test_db.t_pu_flush_merge VALUES + (1, 10, CAST(NULL AS STRING)), + (1, CAST(NULL AS INT), 'hello'), + (1, 100, CAST(NULL AS STRING)), + (2, 200, 'world')", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + // Field-wise merge result: latest non-null per column. + let batches = sql_context + .sql("SELECT id, v_int, v_str FROM paimon.test_db.t_pu_flush_merge ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!( + collect_int_int_str(&batches), + vec![(1, 100, "hello".to_string()), (2, 200, "world".to_string())] + ); + + // COUNT(*) must agree with SELECT: physical rows now equal merged rows. + let batches = sql_context + .sql("SELECT COUNT(*) FROM paimon.test_db.t_pu_flush_merge") + .await + .unwrap() + .collect() + .await + .unwrap(); + let count = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert_eq!(count, 2, "COUNT(*) must count merged rows"); +} + // ======================= Dedup Within Single Commit ======================= /// Duplicate keys in a single INSERT — last value wins (Deduplicate engine). diff --git a/crates/paimon/src/table/kv_file_reader.rs b/crates/paimon/src/table/kv_file_reader.rs index 154d4b80..6d3e4e5a 100644 --- a/crates/paimon/src/table/kv_file_reader.rs +++ b/crates/paimon/src/table/kv_file_reader.rs @@ -330,8 +330,9 @@ impl KeyValueFileReader { continue; } - // Always go through sort-merge even for single file, - // because a single file may contain duplicate keys. + // Always go through sort-merge even for a single file: files + // written before the writer merged key groups at flush may + // still contain duplicate keys. let mut merge_stream = SortMergeReaderBuilder::new( file_streams, internal_schema.clone(), diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs index 6b882f45..f3a96b2d 100644 --- a/crates/paimon/src/table/kv_file_writer.rs +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -236,14 +236,33 @@ impl KeyValueFileWriter { source: None, })?; - // After sorting by PK + seq fields + auto-seq (all ascending): + // After sorting by PK + seq fields + auto-seq (all ascending), merge + // each key group down to one row, mirroring Java's + // MergeTreeWriter#flushWriteBuffer (the write buffer runs the merge + // function before any file is written, so a flushed file never holds + // two rows of one key): // Deduplicate → keep last row per key group (highest seq) // FirstRow → keep first row per key group (lowest seq) - // PartialUpdate → keep all rows for read-side field-wise merge - let selected_indices = self.select_flush_indices(&combined, &sorted_indices)?; - let selected_u32 = UInt32Array::from(selected_indices); + // PartialUpdate → per column, keep the latest non-null value + let (data_batch, data_seq, data_indices) = match self.config.merge_engine { + MergeEngine::PartialUpdate => { + let (merged, merged_seq) = + self.merge_partial_update_rows(&combined, seq_array.as_ref(), &sorted_indices)?; + let identity = + UInt32Array::from_iter_values(0..u32::try_from(merged.num_rows()).unwrap()); + (merged, merged_seq, identity) + } + MergeEngine::Deduplicate | MergeEngine::FirstRow => { + let selected = self.select_flush_indices(&combined, &sorted_indices)?; + ( + combined.clone(), + seq_array.clone(), + UInt32Array::from(selected), + ) + } + }; - let data_delete_row_count = Self::indexed_delete_row_count(&combined, &selected_u32)?; + let data_delete_row_count = Self::indexed_delete_row_count(&data_batch, &data_indices)?; let changelog_delete_row_count = if self.config.input_changelog { Some(Self::indexed_delete_row_count(&combined, &sorted_indices)?) } else { @@ -252,9 +271,9 @@ impl KeyValueFileWriter { let data_file = self .write_indexed_file( - &combined, - seq_array.as_ref(), - &selected_u32, + &data_batch, + data_seq.as_ref(), + &data_indices, IndexedFileWrite { file_prefix: "data-", file_ordinal: self.written_files.len(), @@ -512,29 +531,131 @@ impl KeyValueFileWriter { MergeEngine::Deduplicate | MergeEngine::FirstRow => { self.dedup_sorted_indices(batch, sorted_indices) } - // Aggregation, like PartialUpdate, keeps every row on flush and - // performs the per-field merge on the read side. - MergeEngine::PartialUpdate | MergeEngine::Aggregation => Ok((0..sorted_indices.len()) + MergeEngine::PartialUpdate => { + unreachable!("partial-update merges rows at flush via merge_partial_update_rows") + } + // Aggregation keeps every row on flush and performs the per-field + // merge on the read side. + MergeEngine::Aggregation => Ok((0..sorted_indices.len()) .map(|idx| sorted_indices.value(idx)) .collect()), } } - /// Deduplicate sorted indices by primary key for Deduplicate / FirstRow engines. + /// Merge same-key rows at flush for the partial-update engine, mirroring + /// Java `MergeTreeWriter#flushWriteBuffer` (the write buffer applies the + /// merge function before any file is written) with the same semantics as + /// the read-side `PartialUpdateMergeFunction`: rows are visited in + /// ascending (sequence fields, auto-seq) order and every column keeps its + /// latest non-null value; a column that is null in every row stays null. + /// DELETE / UPDATE_BEFORE rows are rejected, matching the read side. /// - /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all ascending). - /// Output: a Vec of original row indices to keep, in sorted PK order. - fn dedup_sorted_indices( + /// Returns the merged batch (user schema, in primary-key order) and its + /// `_SEQUENCE_NUMBER` column; each merged row keeps the highest sequence + /// number of its key group, so cross-file merge ordering is preserved. + fn merge_partial_update_rows( &self, batch: &RecordBatch, + seq_array: &dyn Array, sorted_indices: &arrow_array::UInt32Array, - ) -> Result> { + ) -> Result<(RecordBatch, Arc)> { + // Reject retract rows up front, mirroring the read-side error. + let vk_idx = batch + .schema() + .fields() + .iter() + .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME); + if let Some(vk_idx) = vk_idx { + let kinds = batch + .column(vk_idx) + .as_any() + .downcast_ref::() + .ok_or_else(|| crate::Error::DataInvalid { + message: "_VALUE_KIND column must be Int8".to_string(), + source: None, + })?; + for row in 0..kinds.len() { + if !RowKind::from_value(kinds.value(row))?.is_add() { + return Err(crate::Error::Unsupported { + message: "merge-engine=partial-update basic mode does not support DELETE or UPDATE_BEFORE rows".to_string(), + }); + } + } + } + + let key_rows = self.convert_key_rows(batch)?; + let n = sorted_indices.len(); - if n == 0 { - return Ok(vec![]); + let num_cols = batch.num_columns(); + // Per output column: the source row chosen for each key group. + let mut col_indices: Vec> = vec![Vec::new(); num_cols]; + // Per key group: the last (highest-sequence) source row, for `_SEQUENCE_NUMBER`. + let mut last_indices: Vec = Vec::new(); + + let mut group_start = 0; + while group_start < n { + let mut group_end = group_start + 1; + let first = sorted_indices.value(group_start) as usize; + while group_end < n + && key_rows.row(sorted_indices.value(group_end) as usize) == key_rows.row(first) + { + group_end += 1; + } + + let last = sorted_indices.value(group_end - 1); + last_indices.push(last); + for (col_idx, chosen_per_group) in col_indices.iter_mut().enumerate() { + let column = batch.column(col_idx); + // Latest non-null wins; an all-null group keeps the (null) + // value of the last row. + let mut chosen = last; + for pos in (group_start..group_end).rev() { + let row = sorted_indices.value(pos); + if column.is_valid(row as usize) { + chosen = row; + break; + } + } + chosen_per_group.push(chosen); + } + + group_start = group_end; } - // Convert PK columns to arrow-row Rows for efficient comparison. + let merged_columns: Vec> = col_indices + .iter() + .enumerate() + .map(|(col_idx, indices)| { + arrow_select::take::take( + batch.column(col_idx).as_ref(), + &UInt32Array::from(indices.clone()), + None, + ) + .map_err(|e| crate::Error::DataInvalid { + message: format!("Failed to take merged partial-update column: {e}"), + source: None, + }) + }) + .collect::>>()?; + let merged = RecordBatch::try_new(batch.schema(), merged_columns).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to build merged partial-update batch: {e}"), + source: None, + } + })?; + let merged_seq = + arrow_select::take::take(seq_array, &UInt32Array::from(last_indices), None).map_err( + |e| crate::Error::DataInvalid { + message: format!("Failed to take merged sequence numbers: {e}"), + source: None, + }, + )?; + Ok((merged, merged_seq)) + } + + /// Convert the primary-key columns into arrow-row `Rows` so same-key rows + /// can be compared cheaply. + fn convert_key_rows(&self, batch: &RecordBatch) -> Result { let sort_fields: Vec = self .config .primary_key_indices @@ -543,7 +664,7 @@ impl KeyValueFileWriter { .collect(); let converter = RowConverter::new(sort_fields).map_err(|e| crate::Error::UnexpectedError { - message: format!("Failed to create RowConverter for dedup: {e}"), + message: format!("Failed to create RowConverter for key grouping: {e}"), source: Some(Box::new(e)), })?; let key_columns: Vec> = self @@ -552,13 +673,29 @@ impl KeyValueFileWriter { .iter() .map(|&idx| batch.column(idx).clone()) .collect(); - let rows = - converter - .convert_columns(&key_columns) - .map_err(|e| crate::Error::UnexpectedError { - message: format!("Failed to convert key columns for dedup: {e}"), - source: Some(Box::new(e)), - })?; + converter + .convert_columns(&key_columns) + .map_err(|e| crate::Error::UnexpectedError { + message: format!("Failed to convert key columns for key grouping: {e}"), + source: Some(Box::new(e)), + }) + } + + /// Deduplicate sorted indices by primary key for Deduplicate / FirstRow engines. + /// + /// Input: `sorted_indices` ordered by PK + seq fields + auto-seq (all ascending). + /// Output: a Vec of original row indices to keep, in sorted PK order. + fn dedup_sorted_indices( + &self, + batch: &RecordBatch, + sorted_indices: &arrow_array::UInt32Array, + ) -> Result> { + let n = sorted_indices.len(); + if n == 0 { + return Ok(vec![]); + } + + let rows = self.convert_key_rows(batch)?; let mut result: Vec = Vec::with_capacity(n); // Track the start of the current key group and the candidate winner. @@ -717,33 +854,119 @@ mod tests { assert_eq!(deduped, vec![0, 2]); } + fn partial_update_writer() -> KeyValueFileWriter { + KeyValueFileWriter::new( + FileIOBuilder::new("memory").build().unwrap(), + test_write_config(MergeEngine::PartialUpdate), + 0, + ) + .unwrap() + } + + /// Partial-update merges each key group down to one row at flush: every + /// column keeps its latest non-null value (different columns may come + /// from different source rows) and the merged row carries the group's + /// highest sequence number. #[test] - fn test_select_flush_indices_keeps_all_rows_for_partial_update_engine() { + fn test_merge_partial_update_rows_latest_non_null_per_column() { let schema = Arc::new(ArrowSchema::new(vec![ Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)), Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)), + Arc::new(ArrowField::new("v1", ArrowDataType::Int32, true)), + Arc::new(ArrowField::new("v2", ArrowDataType::Int32, true)), ])); let batch = RecordBatch::try_new( schema, vec![ - Arc::new(Int32Array::from(vec![1, 1])) as Arc, - Arc::new(Int64Array::from(vec![10, 20])) as Arc, + Arc::new(Int32Array::from(vec![1, 1, 1, 2])) as Arc, + Arc::new(Int64Array::from(vec![10, 20, 30, 5])) as Arc, + Arc::new(Int32Array::from(vec![Some(100), None, None, Some(9)])) + as Arc, + Arc::new(Int32Array::from(vec![None, Some(200), None, None])) + as Arc, ], ) .unwrap(); - let sorted_indices = UInt32Array::from(vec![0, 1]); - let writer = KeyValueFileWriter::new( - FileIOBuilder::new("memory").build().unwrap(), - test_write_config(MergeEngine::PartialUpdate), - 0, - ) - .unwrap(); + let sorted_indices = UInt32Array::from(vec![0, 1, 2, 3]); + let seq_array = Int64Array::from(vec![1000, 1001, 1002, 1003]); - let selected = writer - .select_flush_indices(&batch, &sorted_indices) + let (merged, merged_seq) = partial_update_writer() + .merge_partial_update_rows(&batch, &seq_array, &sorted_indices) .unwrap(); - assert_eq!(selected, vec![0, 1]); + assert_eq!(merged.num_rows(), 2); + let ids = merged + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let seqs = merged + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let v1 = merged + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + let v2 = merged + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + // Key 1: v1 from the first row (only non-null), v2 from the second, + // user seq column from the third (latest non-null). + assert_eq!((ids.value(0), seqs.value(0)), (1, 30)); + assert_eq!((v1.value(0), v2.value(0)), (100, 200)); + // v2 of key 2 is null in every row and stays null. + assert_eq!((ids.value(1), v1.value(1)), (2, 9)); + assert!(v2.is_null(1)); + + // The merged _SEQUENCE_NUMBER is the highest of each group. + let merged_seq = merged_seq + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert_eq!(merged_seq, vec![1002, 1003]); + } + + /// Retract rows are rejected at flush, matching the read-side + /// PartialUpdateMergeFunction error. + #[test] + fn test_merge_partial_update_rows_rejects_retract() { + let schema = Arc::new(ArrowSchema::new(vec![ + Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)), + Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)), + Arc::new(ArrowField::new( + VALUE_KIND_FIELD_NAME, + ArrowDataType::Int8, + false, + )), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(vec![1])) as Arc, + Arc::new(Int64Array::from(vec![10])) as Arc, + // RowKind::Delete + Arc::new(Int8Array::from(vec![3])) as Arc, + ], + ) + .unwrap(); + let sorted_indices = UInt32Array::from(vec![0]); + let seq_array = Int64Array::from(vec![1000]); + + let err = partial_update_writer() + .merge_partial_update_rows(&batch, &seq_array, &sorted_indices) + .unwrap_err(); + assert!( + matches!(err, crate::Error::Unsupported { ref message } + if message.contains("does not support DELETE or UPDATE_BEFORE")), + "got {err:?}" + ); } #[test] From e20e2290cfb045aedee752c8ebd956f2817f3dc7 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 12 Jun 2026 01:14:55 +0800 Subject: [PATCH 2/2] test(table/write): lock flush merge to read-side partial-update semantics Java reuses one MergeFunction across write flush, compaction, and reads, giving engine semantics a single source of truth. The Rust write side is a vectorized re-implementation of the read-side streaming merge, so feed the same key groups through merge_partial_update_rows and the read-side PartialUpdateMergeFunction and assert identical output, preventing the two implementations from drifting. --- crates/paimon/src/table/kv_file_writer.rs | 129 ++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs index f3a96b2d..814a994f 100644 --- a/crates/paimon/src/table/kv_file_writer.rs +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -933,6 +933,135 @@ mod tests { assert_eq!(merged_seq, vec![1002, 1003]); } + /// Lock the flush-time merge to the read-side `PartialUpdateMergeFunction`. + /// + /// Java uses one `MergeFunction` for write flush, compaction, and reads, + /// so engine semantics have a single source of truth. The Rust write side + /// is a vectorized re-implementation (per-column take) of the read side's + /// streaming merge; this test feeds the same key groups through both and + /// asserts identical output, so the two implementations cannot drift. + #[test] + fn test_flush_merge_matches_read_side_partial_update_merge() { + use crate::table::sort_merge::{ + BufferedBatch, MergeFunction, MergeResult, MergeRow, PartialUpdateMergeFunction, + }; + use arrow_array::StringArray; + + // Arrival order; auto-seq = 1000 + row index. The `seq` column is the + // user sequence field (test_write_config: sequence_field_indices=[1]). + // + // Key 1 ordering by (user seq, auto-seq): r2(10) < r0(20,@1000) < r3(20,@1003) + // v1: latest non-null = r3 (7); v2: latest non-null = r0 ("b"). + // Key 2 ordering: r1(5,@1001) < r4(5,@1004) + // v1: latest non-null = r1 (9); v2: null in every row. + let schema = Arc::new(ArrowSchema::new(vec![ + Arc::new(ArrowField::new("id", ArrowDataType::Int32, false)), + Arc::new(ArrowField::new("seq", ArrowDataType::Int64, false)), + Arc::new(ArrowField::new("v1", ArrowDataType::Int32, true)), + Arc::new(ArrowField::new("v2", ArrowDataType::Utf8, true)), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 1, 1, 2])) as Arc, + Arc::new(Int64Array::from(vec![20, 5, 10, 20, 5])) as Arc, + Arc::new(Int32Array::from(vec![ + None, + Some(9), + Some(100), + Some(7), + None, + ])) as Arc, + Arc::new(StringArray::from(vec![ + Some("b"), + None, + Some("a"), + None, + None, + ])) as Arc, + ], + ) + .unwrap(); + let seq_values: Vec = (1000..1005).collect(); + let seq_array = Int64Array::from(seq_values.clone()); + + // Write side: replicate the flush sort (PK + sequence field + auto-seq). + let sort_columns = vec![ + SortColumn { + values: batch.column(0).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + SortColumn { + values: batch.column(1).clone(), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + SortColumn { + values: Arc::new(seq_array.clone()), + options: Some(SortOptions { + descending: false, + nulls_first: true, + }), + }, + ]; + let sorted_indices = lexsort_to_indices(&sort_columns, None).unwrap(); + let (merged, merged_seq) = partial_update_writer() + .merge_partial_update_rows(&batch, &seq_array, &sorted_indices) + .unwrap(); + assert_eq!(merged.num_rows(), 2, "two keys, one merged row each"); + assert_eq!( + merged_seq + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(), + vec![1003, 1004], + "merged rows carry each group's highest sequence number" + ); + + // Read side: feed the same key groups (in arrival order — the merge + // function orders rows itself) through PartialUpdateMergeFunction. + let table_options = + HashMap::from([("merge-engine".to_string(), "partial-update".to_string())]); + let merge_fn = + PartialUpdateMergeFunction::new(&table_options, "default.test_table").unwrap(); + let buffer = [BufferedBatch::Source(batch.clone())]; + let identity: Vec = (0..batch.num_columns()).collect(); + let seq_col = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + for (group_idx, group_rows) in [vec![0usize, 2, 3], vec![1usize, 4]].iter().enumerate() { + let rows: Vec = group_rows + .iter() + .map(|&row_idx| MergeRow { + batch_idx: 0, + row_idx, + sequence_number: seq_values[row_idx], + value_kind: 0, + user_sequences: vec![Some(seq_col.value(row_idx) as i128)], + }) + .collect(); + let result = merge_fn.merge(&rows, &buffer, &identity, &schema).unwrap(); + let MergeResult::MaterializedRow(read_row) = result else { + panic!("partial-update merge must materialize a row"); + }; + assert_eq!( + merged.slice(group_idx, 1), + read_row, + "flush merge and read-side merge must agree for group {group_idx}" + ); + } + } + /// Retract rows are rejected at flush, matching the read-side /// PartialUpdateMergeFunction error. #[test]