diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 801ac275..0741a50c 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -42,7 +42,7 @@ storage-all = [ "storage-hdfs", ] fulltext = ["tantivy", "tempfile"] -vortex = ["dep:vortex", "dep:kanal"] +vortex = ["dep:vortex"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] @@ -102,11 +102,10 @@ urlencoding = "2.1" tantivy = { version = "0.22", optional = true } tempfile = { version = "3", optional = true } vortex = { version = "0.68", features = ["tokio"], optional = true } -kanal = { version = "0.1.1", optional = true } libloading = "0.9" # Keep CI on the dependency set that passed before unicode-segmentation 1.13.3. # The 1.13.3 resolver update correlates with Linux Vortex tests hanging. -unicode-segmentation = "=1.13.3" +unicode-segmentation = "=1.13.2" [dev-dependencies] axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] } diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs index 6dfce9c4..2a77e130 100644 --- a/crates/paimon/src/arrow/format/vortex.rs +++ b/crates/paimon/src/arrow/format/vortex.rs @@ -17,131 +17,47 @@ use super::{FilePredicates, FormatFileReader, FormatFileWriter}; use crate::io::{FileRead, OutputFile}; -use crate::spec::{DataField, Datum, Predicate, PredicateOperator}; +use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; -use arrow_array::RecordBatch; -use arrow_schema::{DataType as ArrowDataType, SchemaRef}; +use arrow_array::{ + Array, ArrayRef as ArrowArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, Scalar, + StringArray, Time32MillisecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, +}; +use arrow_ord::cmp::{ + eq as arrow_eq, gt as arrow_gt, gt_eq as arrow_gt_eq, lt as arrow_lt, lt_eq as arrow_lt_eq, + neq as arrow_neq, +}; +use arrow_schema::{ArrowError, DataType as ArrowDataType, SchemaRef}; use async_trait::async_trait; -use futures::future::BoxFuture; -use futures::Stream; -use futures::StreamExt; -use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; +use std::sync::{Arc, OnceLock}; use vortex::array::arrow::{FromArrowArray, IntoArrowArray}; use vortex::array::dtype::arrow::FromArrowType; use vortex::array::dtype::DType; -use vortex::array::expr::{ - and_collect, col, eq, gt, gt_eq, is_null, lit, lt, lt_eq, not, not_eq, or_collect, Expression, -}; -use vortex::array::stream::{ArrayStreamAdapter, ArrayStreamExt}; use vortex::array::ArrayRef; -use vortex::buffer::{Alignment, ByteBuffer}; -use vortex::error::VortexResult; +use vortex::buffer::ByteBuffer; use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt}; -use vortex::io::runtime::tokio::TokioRuntime; +use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::io::runtime::BlockingRuntime; use vortex::io::session::RuntimeSessionExt; -use vortex::io::{IoBuf, VortexReadAt, VortexWrite}; +use vortex::layout::scan::split_by::SplitBy; use vortex::scan::selection::Selection; use vortex::session::VortexSession; use vortex::VortexSessionDefault; -// --------------------------------------------------------------------------- -// Constants -// --------------------------------------------------------------------------- - -/// Maximum number of concurrent read requests for Vortex file IO. -const DEFAULT_READ_CONCURRENCY: usize = 10; - -// --------------------------------------------------------------------------- -// Vortex Runtime -// --------------------------------------------------------------------------- - -struct PaimonVortexRuntime { - runtime: TokioRuntime, -} - -impl PaimonVortexRuntime { - fn new() -> crate::Result> { - let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::DataInvalid { - message: format!("Vortex requires an active Tokio runtime: {e}"), +async fn acquire_vortex_io_permit() -> crate::Result> { + static SEMAPHORE: OnceLock = OnceLock::new(); + SEMAPHORE + .get_or_init(|| tokio::sync::Semaphore::new(1)) + .acquire() + .await + .map_err(|e| Error::DataInvalid { + message: format!("Failed to acquire Vortex I/O permit: {e}"), source: None, - })?; - Ok(Arc::new(Self { - runtime: TokioRuntime::new(handle), - })) - } - - fn session(&self) -> VortexSession { - VortexSession::default().with_handle(self.runtime.handle()) - } -} - -fn new_vortex_session() -> crate::Result<(VortexSession, Arc)> { - let runtime = PaimonVortexRuntime::new()?; - let session = runtime.session(); - Ok((session, runtime)) -} - -struct VortexRecordBatchStream { - inner: ArrowRecordBatchStream, - _runtime: Arc, -} - -impl Unpin for VortexRecordBatchStream {} - -impl Stream for VortexRecordBatchStream { - type Item = crate::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().inner.as_mut().poll_next(cx) - } -} - -// --------------------------------------------------------------------------- -// IO Adapters -// --------------------------------------------------------------------------- - -/// Adapts paimon's `FileRead` to Vortex's `VortexReadAt`. -struct PaimonVortexReadAt { - file_size: u64, - reader: Arc, -} - -impl VortexReadAt for PaimonVortexReadAt { - fn uri(&self) -> Option<&Arc> { - None - } - - fn concurrency(&self) -> usize { - DEFAULT_READ_CONCURRENCY - } - - fn size(&self) -> futures::future::BoxFuture<'static, VortexResult> { - let size = self.file_size; - Box::pin(async move { Ok(size) }) - } - - fn read_at( - &self, - offset: u64, - length: usize, - alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { - let reader = Arc::clone(&self.reader); - Box::pin(async move { - let bytes = reader - .read(offset..offset + length as u64) - .await - .map_err(|e| vortex::error::vortex_err!("paimon read error: {e}"))?; - // Zero-copy when the Bytes pointer is already aligned; falls back to copy otherwise. - let buffer = ByteBuffer::from(bytes).aligned(alignment); - Ok(vortex::array::buffer::BufferHandle::new_host(buffer)) }) - } } // --------------------------------------------------------------------------- @@ -161,101 +77,203 @@ impl FormatFileReader for VortexFormatReader { _batch_size: Option, row_selection: Option>, ) -> crate::Result { - let (session, runtime) = new_vortex_session()?; - - let source = Arc::new(PaimonVortexReadAt { - file_size, - reader: Arc::from(reader), + let bytes = reader.read(0..file_size).await?; + let target_schema = crate::arrow::build_target_arrow_schema(read_fields)?; + let read_fields = read_fields.to_vec(); + let predicates = predicates.map(|fp| FilePredicates { + predicates: fp.predicates.clone(), + file_fields: fp.file_fields.clone(), }); + let scan_fields = build_vortex_scan_fields(&read_fields, predicates.as_ref()); + let scan_schema = crate::arrow::build_target_arrow_schema(&scan_fields)?; + let _permit = acquire_vortex_io_permit().await?; + + let target_schema_for_scan = target_schema.clone(); + let plan = VortexReadPlan { + target_schema: target_schema_for_scan, + read_fields, + scan_schema, + scan_fields, + predicates, + row_selection, + }; + let batches = + tokio::task::spawn_blocking(move || read_vortex_batches_blocking(bytes, plan)) + .await + .map_err(|e| Error::DataInvalid { + message: format!("Vortex read task failed: {e}"), + source: None, + })??; + + Ok(Box::pin(futures::stream::iter(batches.into_iter().map(Ok)))) + } +} + +struct VortexReadPlan { + target_schema: SchemaRef, + read_fields: Vec, + scan_schema: SchemaRef, + scan_fields: Vec, + predicates: Option, + row_selection: Option>, +} - let vortex_file = session +fn read_vortex_batches_blocking( + bytes: bytes::Bytes, + plan: VortexReadPlan, +) -> crate::Result> { + run_vortex_on_thread("paimon-vortex-read", move || { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_handle(runtime.handle()); + read_vortex_batches(&runtime, session, ByteBuffer::from(bytes), plan) + }) +} + +fn read_vortex_batches( + runtime: &CurrentThreadRuntime, + session: VortexSession, + bytes: ByteBuffer, + plan: VortexReadPlan, +) -> crate::Result> { + let VortexReadPlan { + target_schema, + read_fields, + scan_schema, + scan_fields, + predicates, + row_selection, + } = plan; + + let vortex_file = + session .open_options() - .with_file_size(file_size) - .open(source) - .await + .open_buffer(bytes) .map_err(|e| Error::DataInvalid { message: format!("Failed to open Vortex file: {e}"), source: None, })?; - // Build the target Arrow schema for the projected fields. - let target_schema = crate::arrow::build_target_arrow_schema(read_fields)?; - - if read_fields.is_empty() { - let row_count = match &row_selection { + if scan_fields.is_empty() { + let row_count = if constant_predicates_match(predicates.as_ref()) { + match &row_selection { Some(ranges) => ranges.iter().map(|r| r.count() as usize).sum(), None => vortex_file.row_count() as usize, - }; - let batch = RecordBatch::try_new_with_options( - target_schema, - vec![], - &arrow_array::RecordBatchOptions::new().with_row_count(Some(row_count)), - ) - .map_err(|e| Error::DataInvalid { - message: format!("Failed to build empty RecordBatch: {e}"), - source: None, - })?; - return Ok(Box::pin(futures::stream::once(async { Ok(batch) }))); - } + } + } else { + 0 + }; + let batch = RecordBatch::try_new_with_options( + target_schema, + vec![], + &arrow_array::RecordBatchOptions::new().with_row_count(Some(row_count)), + ) + .map_err(|e| Error::DataInvalid { + message: format!("Failed to build empty RecordBatch: {e}"), + source: None, + })?; + return Ok(vec![batch]); + } - // Build projection expression for requested fields. - let projected_names: Vec<&str> = read_fields.iter().map(|f| f.name()).collect(); + // Build projection expression for requested fields. + let projected_names: Vec<&str> = scan_fields.iter().map(|f| f.name()).collect(); - let mut scan_builder = vortex_file.scan().map_err(|e| Error::DataInvalid { - message: format!("Failed to create Vortex scan: {e}"), + let mut scan_builder = vortex_file.scan().map_err(|e| Error::DataInvalid { + message: format!("Failed to create Vortex scan: {e}"), + source: None, + })?; + + // Apply column projection. + { + use vortex::array::expr::{root, select}; + scan_builder = scan_builder.with_projection(select(projected_names, root())); + } + + // Vortex 0.68 filtered scans can block indefinitely on some runtimes. + // Decode predicate columns and apply the same filter with Arrow kernels below. + + // Push row selection down to Vortex. + // For a single contiguous range, use with_row_range directly (avoids roaring bitmap overhead). + // For multiple ranges, build a Selection::IncludeRoaring bitmap. + if let Some(ref ranges) = row_selection { + let total_rows = vortex_file.row_count(); + if let Some(range) = as_single_row_range(ranges, total_rows) { + scan_builder = scan_builder.with_row_range(range); + } else { + let selection = row_ranges_to_selection(ranges, total_rows); + scan_builder = scan_builder.with_selection(selection); + } + } + + let rows_per_split = usize::try_from(vortex_file.row_count()) + .unwrap_or(usize::MAX) + .max(1); + let vortex_tasks = scan_builder + .with_concurrency(1) + .with_split_by(SplitBy::RowCount(rows_per_split)) + .build() + .map_err(|e| Error::DataInvalid { + message: format!("Failed to build Vortex scan tasks: {e}"), source: None, })?; - // Apply column projection. - { - use vortex::array::expr::{root, select}; - scan_builder = scan_builder.with_projection(select(projected_names, root())); - } + let mut batches = Vec::with_capacity(vortex_tasks.len()); + for task in vortex_tasks { + let Some(vortex_array) = runtime.block_on(task).map_err(|e| Error::DataInvalid { + message: format!("Vortex read error: {e}"), + source: None, + })? + else { + continue; + }; + let batch = vortex_array_to_record_batch(vortex_array, &scan_schema)?; + batches.push(filter_and_project_batch( + batch, + &target_schema, + &read_fields, + &scan_fields, + predicates.as_ref(), + )?); + } + + Ok(batches) +} + +fn constant_predicates_match(predicates: Option<&FilePredicates>) -> bool { + predicates.is_none_or(|fp| { + fp.predicates + .iter() + .all(|predicate| constant_predicate_value(predicate).unwrap_or(true)) + }) +} - // Push predicate filter down to Vortex. - if let Some(fp) = predicates { - if let Some(filter_expr) = predicates_to_vortex_expr(&fp.predicates, &fp.file_fields) { - scan_builder = scan_builder.with_filter(filter_expr); +fn constant_predicate_value(predicate: &Predicate) -> Option { + match predicate { + Predicate::AlwaysTrue => Some(true), + Predicate::AlwaysFalse => Some(false), + Predicate::And(children) => { + let mut saw_unknown = false; + for child in children { + match constant_predicate_value(child) { + Some(true) => {} + Some(false) => return Some(false), + None => saw_unknown = true, + } } + (!saw_unknown).then_some(true) } - - // Push row selection down to Vortex. - // For a single contiguous range, use with_row_range directly (avoids roaring bitmap overhead). - // For multiple ranges, build a Selection::IncludeRoaring bitmap. - if let Some(ref ranges) = row_selection { - let total_rows = vortex_file.row_count(); - if let Some(range) = as_single_row_range(ranges, total_rows) { - scan_builder = scan_builder.with_row_range(range); - } else { - let selection = row_ranges_to_selection(ranges, total_rows); - scan_builder = scan_builder.with_selection(selection); + Predicate::Or(children) => { + let mut saw_unknown = false; + for child in children { + match constant_predicate_value(child) { + Some(true) => return Some(true), + Some(false) => {} + None => saw_unknown = true, + } } + (!saw_unknown).then_some(false) } - - let vortex_stream = scan_builder - .into_array_stream() - .map_err(|e| Error::DataInvalid { - message: format!("Failed to build Vortex array stream: {e}"), - source: None, - })?; - - // Convert Vortex stream to Arrow RecordBatch stream. - let stream = vortex_stream - .map(move |result| { - result.map_err(|e| Error::DataInvalid { - message: format!("Vortex read error: {e}"), - source: None, - }) - }) - .map(move |result| { - let schema = target_schema.clone(); - result.and_then(|vortex_array| vortex_array_to_record_batch(vortex_array, &schema)) - }); - - Ok(Box::pin(VortexRecordBatchStream { - inner: Box::pin(stream), - _runtime: runtime, - })) + Predicate::Not(inner) => constant_predicate_value(inner).map(|value| !value), + Predicate::Leaf { .. } => None, } } @@ -278,224 +296,524 @@ fn as_single_row_range(ranges: &[RowRange], total_rows: u64) -> Option, +) -> Vec { + let mut fields = read_fields.to_vec(); + + if let Some(fp) = predicates { + let mut predicate_indices = Vec::new(); + for predicate in &fp.predicates { + collect_predicate_field_indices(predicate, &mut predicate_indices); + } + for index in predicate_indices { + if let Some(field) = fp.file_fields.get(index) { + push_unique_scan_field(&mut fields, field); + } + } + } + + fields +} + +fn collect_predicate_field_indices(predicate: &Predicate, indices: &mut Vec) { + match predicate { + Predicate::Leaf { index, .. } => indices.push(*index), + Predicate::And(children) | Predicate::Or(children) => { + for child in children { + collect_predicate_field_indices(child, indices); + } + } + Predicate::Not(inner) => collect_predicate_field_indices(inner, indices), + Predicate::AlwaysTrue | Predicate::AlwaysFalse => {} + } +} + +fn push_unique_scan_field(fields: &mut Vec, field: &DataField) { + if !fields + .iter() + .any(|existing| same_data_field(existing, field)) + { + fields.push(field.clone()); + } +} -/// Convert a list of Paimon predicates (ANDed together) into a single Vortex filter expression. -fn predicates_to_vortex_expr( +fn same_data_field(left: &DataField, right: &DataField) -> bool { + left.id() == right.id() || left.name() == right.name() +} + +fn filter_and_project_batch( + batch: RecordBatch, + target_schema: &SchemaRef, + read_fields: &[DataField], + scan_fields: &[DataField], + predicates: Option<&FilePredicates>, +) -> crate::Result { + let filtered = match predicates { + Some(fp) => filter_record_batch_by_predicates(batch, fp, scan_fields)?, + None => batch, + }; + + if read_fields.is_empty() { + return RecordBatch::try_new_with_options( + target_schema.clone(), + vec![], + &arrow_array::RecordBatchOptions::new().with_row_count(Some(filtered.num_rows())), + ) + .map_err(|e| Error::DataInvalid { + message: format!("Failed to build projected empty RecordBatch: {e}"), + source: None, + }); + } + + let columns = projection_indices(read_fields, scan_fields)? + .into_iter() + .map(|index| filtered.column(index).clone()) + .collect::>(); + + RecordBatch::try_new(target_schema.clone(), columns).map_err(|e| Error::DataInvalid { + message: format!("Failed to project Vortex RecordBatch: {e}"), + source: None, + }) +} + +fn projection_indices( + read_fields: &[DataField], + scan_fields: &[DataField], +) -> crate::Result> { + read_fields + .iter() + .map(|field| { + scan_fields + .iter() + .position(|scan_field| same_data_field(scan_field, field)) + .ok_or_else(|| Error::DataInvalid { + message: format!( + "Projected Vortex field {} was not included in the scan", + field.name() + ), + source: None, + }) + }) + .collect() +} + +fn filter_record_batch_by_predicates( + batch: RecordBatch, + predicates: &FilePredicates, + scan_fields: &[DataField], +) -> crate::Result { + let Some(mask) = evaluate_predicates_mask( + &batch, + &predicates.predicates, + &predicates.file_fields, + scan_fields, + )? + else { + return Ok(batch); + }; + + arrow_select::filter::filter_record_batch(&batch, &mask).map_err(|e| Error::DataInvalid { + message: format!("Failed to filter Vortex RecordBatch: {e}"), + source: Some(Box::new(e)), + }) +} + +fn evaluate_predicates_mask( + batch: &RecordBatch, predicates: &[Predicate], file_fields: &[DataField], -) -> Option { - let exprs: Vec = predicates - .iter() - .filter_map(|p| predicate_to_vortex_expr(p, file_fields)) - .collect(); - and_collect(exprs) + scan_fields: &[DataField], +) -> crate::Result> { + let mut combined = None; + for predicate in predicates { + let Some(mask) = evaluate_predicate_mask(batch, predicate, file_fields, scan_fields)? + else { + continue; + }; + combined = Some(match combined { + Some(existing) => combine_filter_masks(&existing, &mask, false), + None => mask, + }); + } + Ok(combined) } -/// Convert a single Paimon `Predicate` tree node into a Vortex `Expression`. -fn predicate_to_vortex_expr( +fn evaluate_predicate_mask( + batch: &RecordBatch, predicate: &Predicate, file_fields: &[DataField], -) -> Option { + scan_fields: &[DataField], +) -> crate::Result> { match predicate { - Predicate::AlwaysTrue => Some(lit(true)), - Predicate::AlwaysFalse => Some(lit(false)), + Predicate::AlwaysTrue => Ok(Some(BooleanArray::from(vec![true; batch.num_rows()]))), + Predicate::AlwaysFalse => Ok(Some(BooleanArray::from(vec![false; batch.num_rows()]))), Predicate::And(children) => { - // Dropping unconvertible children is safe for AND: it makes the filter - // less restrictive, so no matching rows are incorrectly excluded. - let exprs: Vec = children - .iter() - .filter_map(|c| predicate_to_vortex_expr(c, file_fields)) - .collect(); - and_collect(exprs) + let mut combined = None; + for child in children { + let Some(mask) = evaluate_predicate_mask(batch, child, file_fields, scan_fields)? + else { + continue; + }; + combined = Some(match combined { + Some(existing) => combine_filter_masks(&existing, &mask, false), + None => mask, + }); + } + Ok(combined) } Predicate::Or(children) => { - // All children must be convertible; otherwise skip the entire OR - // to avoid incorrectly filtering out rows that match unconverted branches. - let exprs: Vec = children - .iter() - .map(|c| predicate_to_vortex_expr(c, file_fields)) - .collect::>>()?; - or_collect(exprs) + let mut combined = BooleanArray::from(vec![false; batch.num_rows()]); + for child in children { + let Some(mask) = evaluate_predicate_mask(batch, child, file_fields, scan_fields)? + else { + return Ok(None); + }; + combined = combine_filter_masks(&combined, &mask, true); + } + Ok(Some(combined)) + } + Predicate::Not(inner) => { + let Some(mask) = evaluate_predicate_mask(batch, inner, file_fields, scan_fields)? + else { + return Ok(None); + }; + Ok(Some(boolean_mask_from_predicate(mask.len(), |row_index| { + !mask.value(row_index) + }))) } - Predicate::Not(inner) => predicate_to_vortex_expr(inner, file_fields).map(not), Predicate::Leaf { - column, index, op, literals, .. - } => leaf_to_vortex_expr(column, *index, *op, literals, file_fields), + } => { + let Some(file_field) = file_fields.get(*index) else { + return Ok(None); + }; + let Some(scan_index) = scan_fields + .iter() + .position(|scan_field| same_data_field(scan_field, file_field)) + else { + return Ok(None); + }; + evaluate_arrow_leaf_predicate( + batch.column(scan_index), + file_field.data_type(), + *op, + literals, + ) + } } } -/// Convert a leaf predicate to a Vortex expression. -fn leaf_to_vortex_expr( - _column: &str, - index: usize, +fn evaluate_arrow_leaf_predicate( + array: &ArrowArrayRef, + data_type: &DataType, op: PredicateOperator, literals: &[Datum], - file_fields: &[DataField], -) -> Option { - let file_field = file_fields.get(index)?; - // Use the file-level column name for the Vortex expression. - let column_expr = col(file_field.name()); - +) -> crate::Result> { match op { - PredicateOperator::IsNull => Some(is_null(column_expr)), - PredicateOperator::IsNotNull => Some(not(is_null(column_expr))), - PredicateOperator::Eq => { - let v = datum_to_vortex_lit(literals.first()?, file_field)?; - Some(eq(column_expr, v)) - } - PredicateOperator::NotEq => { - let v = datum_to_vortex_lit(literals.first()?, file_field)?; - Some(not_eq(column_expr, v)) - } - PredicateOperator::Lt => { - let v = datum_to_vortex_lit(literals.first()?, file_field)?; - Some(lt(column_expr, v)) - } - PredicateOperator::LtEq => { - let v = datum_to_vortex_lit(literals.first()?, file_field)?; - Some(lt_eq(column_expr, v)) + PredicateOperator::IsNull => Ok(Some(boolean_mask_from_predicate( + array.len(), + |row_index| array.is_null(row_index), + ))), + PredicateOperator::IsNotNull => Ok(Some(boolean_mask_from_predicate( + array.len(), + |row_index| array.is_valid(row_index), + ))), + PredicateOperator::In | PredicateOperator::NotIn => { + evaluate_set_membership_predicate(array, data_type, op, literals) } - PredicateOperator::Gt => { - let v = datum_to_vortex_lit(literals.first()?, file_field)?; - Some(gt(column_expr, v)) - } - PredicateOperator::GtEq => { - let v = datum_to_vortex_lit(literals.first()?, file_field)?; - Some(gt_eq(column_expr, v)) - } - PredicateOperator::In => { - // OR of eq for each literal value. - // All literals must be convertible; otherwise skip the entire predicate - // to avoid incorrectly filtering out rows that match unconverted literals. - let exprs: Vec = literals - .iter() - .map(|d| datum_to_vortex_lit(d, file_field).map(|v| eq(col(file_field.name()), v))) - .collect::>>()?; - or_collect(exprs) + PredicateOperator::Eq + | PredicateOperator::NotEq + | PredicateOperator::Lt + | PredicateOperator::LtEq + | PredicateOperator::Gt + | PredicateOperator::GtEq => { + let Some(literal) = literals.first() else { + return Ok(None); + }; + let Some(scalar) = literal_scalar_for_arrow_filter(literal, data_type)? else { + return Ok(None); + }; + let mask = + evaluate_column_predicate(array, &scalar, op).map_err(|e| Error::DataInvalid { + message: format!("Failed to evaluate Vortex predicate: {e}"), + source: Some(Box::new(e)), + })?; + Ok(Some(sanitize_filter_mask(mask))) } + } +} + +fn evaluate_set_membership_predicate( + array: &ArrowArrayRef, + data_type: &DataType, + op: PredicateOperator, + literals: &[Datum], +) -> crate::Result> { + if literals.is_empty() { + return Ok(Some(match op { + PredicateOperator::In => BooleanArray::from(vec![false; array.len()]), + PredicateOperator::NotIn => { + boolean_mask_from_predicate(array.len(), |row_index| array.is_valid(row_index)) + } + _ => unreachable!(), + })); + } + + let mut combined = match op { + PredicateOperator::In => BooleanArray::from(vec![false; array.len()]), PredicateOperator::NotIn => { - // AND of not_eq for each literal value. - // All literals must be convertible; otherwise skip the entire predicate - // to avoid incorrectly keeping rows that match unconverted literals. - let exprs: Vec = literals - .iter() - .map(|d| { - datum_to_vortex_lit(d, file_field).map(|v| not_eq(col(file_field.name()), v)) - }) - .collect::>>()?; - and_collect(exprs) + boolean_mask_from_predicate(array.len(), |row_index| array.is_valid(row_index)) } + _ => unreachable!(), + }; + + for literal in literals { + let Some(scalar) = literal_scalar_for_arrow_filter(literal, data_type)? else { + return Ok(None); + }; + let comparison_op = match op { + PredicateOperator::In => PredicateOperator::Eq, + PredicateOperator::NotIn => PredicateOperator::NotEq, + _ => unreachable!(), + }; + let mask = evaluate_column_predicate(array, &scalar, comparison_op).map_err(|e| { + Error::DataInvalid { + message: format!("Failed to evaluate Vortex set predicate: {e}"), + source: Some(Box::new(e)), + } + })?; + let mask = sanitize_filter_mask(mask); + combined = combine_filter_masks(&combined, &mask, matches!(op, PredicateOperator::In)); + } + + Ok(Some(combined)) +} + +fn evaluate_column_predicate( + column: &ArrowArrayRef, + scalar: &Scalar, + op: PredicateOperator, +) -> Result { + match op { + PredicateOperator::Eq => arrow_eq(column, scalar), + PredicateOperator::NotEq => arrow_neq(column, scalar), + PredicateOperator::Lt => arrow_lt(column, scalar), + PredicateOperator::LtEq => arrow_lt_eq(column, scalar), + PredicateOperator::Gt => arrow_gt(column, scalar), + PredicateOperator::GtEq => arrow_gt_eq(column, scalar), + PredicateOperator::IsNull + | PredicateOperator::IsNotNull + | PredicateOperator::In + | PredicateOperator::NotIn => Ok(BooleanArray::new_null(column.len())), + } +} + +fn sanitize_filter_mask(mask: BooleanArray) -> BooleanArray { + if mask.null_count() == 0 { + return mask; } + + boolean_mask_from_predicate(mask.len(), |row_index| { + mask.is_valid(row_index) && mask.value(row_index) + }) } -/// Convert a Paimon `Datum` to a Vortex literal `Expression`. -/// Returns `None` for types not yet supported by this conversion. -fn datum_to_vortex_lit(datum: &Datum, file_field: &DataField) -> Option { - use crate::spec::DataType as PaimonDataType; - use vortex::array::dtype::Nullability; - use vortex::array::scalar::{PValue, Scalar, ScalarValue}; - match datum { - Datum::Bool(v) => Some(lit(*v)), - Datum::TinyInt(v) => Some(lit(*v)), - Datum::SmallInt(v) => Some(lit(*v)), - Datum::Int(v) => Some(lit(*v)), - Datum::Long(v) => Some(lit(*v)), - Datum::Float(v) => Some(lit(*v)), - Datum::Double(v) => Some(lit(*v)), - Datum::String(v) => Some(lit(v.as_str())), - Datum::Bytes(v) => Some(lit(v.as_slice())), - // Date: stored as days since epoch (i32) in both Paimon and Vortex. - Datum::Date(v) => { - use vortex::extension::datetime::{Date, TimeUnit}; - let dtype = - DType::Extension(Date::new(TimeUnit::Days, Nullability::NonNullable).erased()); - let scalar = - Scalar::try_new(dtype, Some(ScalarValue::Primitive(PValue::I32(*v)))).ok()?; - Some(lit(scalar)) +fn combine_filter_masks(left: &BooleanArray, right: &BooleanArray, use_or: bool) -> BooleanArray { + debug_assert_eq!(left.len(), right.len()); + boolean_mask_from_predicate(left.len(), |row_index| { + if use_or { + left.value(row_index) || right.value(row_index) + } else { + left.value(row_index) && right.value(row_index) } - // Time: stored as milliseconds since midnight (i32) in Paimon. - Datum::Time(v) => { - use vortex::extension::datetime::{Time, TimeUnit}; - let dtype = DType::Extension( - Time::new(TimeUnit::Milliseconds, Nullability::NonNullable).erased(), - ); - let scalar = - Scalar::try_new(dtype, Some(ScalarValue::Primitive(PValue::I32(*v)))).ok()?; - Some(lit(scalar)) + }) +} + +fn boolean_mask_from_predicate( + len: usize, + mut predicate: impl FnMut(usize) -> bool, +) -> BooleanArray { + BooleanArray::from((0..len).map(&mut predicate).collect::>()) +} + +fn literal_scalar_for_arrow_filter( + literal: &Datum, + file_data_type: &DataType, +) -> crate::Result>> { + let array: ArrowArrayRef = match file_data_type { + DataType::Boolean(_) => match literal { + Datum::Bool(value) => Arc::new(BooleanArray::new_scalar(*value).into_inner()), + _ => return Ok(None), + }, + DataType::TinyInt(_) => { + match integer_literal(literal).and_then(|value| i8::try_from(value).ok()) { + Some(value) => Arc::new(Int8Array::new_scalar(value).into_inner()), + None => return Ok(None), + } } - // Timestamp: convert (millis, nanos) to the unit matching the field precision. - // precision 0-3 → milliseconds, 4-6 → microseconds, 7-9 → nanoseconds. - Datum::Timestamp { millis, nanos } => { - use vortex::extension::datetime::Timestamp; - let precision = match file_field.data_type() { - PaimonDataType::Timestamp(ts) => ts.precision(), - _ => return None, - }; - let (time_unit, value) = precision_to_time_unit_and_value(*millis, *nanos, precision); - let dtype = - DType::Extension(Timestamp::new(time_unit, Nullability::NonNullable).erased()); - let scalar = - Scalar::try_new(dtype, Some(ScalarValue::Primitive(PValue::I64(value)))).ok()?; - Some(lit(scalar)) + DataType::SmallInt(_) => { + match integer_literal(literal).and_then(|value| i16::try_from(value).ok()) { + Some(value) => Arc::new(Int16Array::new_scalar(value).into_inner()), + None => return Ok(None), + } } - Datum::LocalZonedTimestamp { millis, nanos } => { - use vortex::extension::datetime::Timestamp; - let precision = match file_field.data_type() { - PaimonDataType::LocalZonedTimestamp(ts) => ts.precision(), - _ => return None, - }; - let (time_unit, value) = precision_to_time_unit_and_value(*millis, *nanos, precision); - let dtype = DType::Extension( - Timestamp::new_with_tz(time_unit, Some(Arc::from("UTC")), Nullability::NonNullable) - .erased(), - ); - let scalar = - Scalar::try_new(dtype, Some(ScalarValue::Primitive(PValue::I64(value)))).ok()?; - Some(lit(scalar)) + DataType::Int(_) => { + match integer_literal(literal).and_then(|value| i32::try_from(value).ok()) { + Some(value) => Arc::new(Int32Array::new_scalar(value).into_inner()), + None => return Ok(None), + } } - // Decimal: construct a Vortex Scalar with the correct precision and scale. - Datum::Decimal { - unscaled, - precision, - scale, - } => { - use vortex::array::dtype::DecimalDType; - use vortex::array::scalar::{DecimalValue, ScalarValue as SV}; - let precision = u8::try_from(*precision).ok()?; - let scale = i8::try_from(*scale).ok()?; - let dtype = DType::Decimal( - DecimalDType::new(precision, scale), - Nullability::NonNullable, - ); - let scalar = - Scalar::try_new(dtype, Some(SV::Decimal(DecimalValue::I128(*unscaled)))).ok()?; - Some(lit(scalar)) + DataType::BigInt(_) => { + match integer_literal(literal).and_then(|value| i64::try_from(value).ok()) { + Some(value) => Arc::new(Int64Array::new_scalar(value).into_inner()), + None => return Ok(None), + } } - } + DataType::Float(_) => match float32_literal(literal) { + Some(value) => Arc::new(Float32Array::new_scalar(value).into_inner()), + None => return Ok(None), + }, + DataType::Double(_) => match float64_literal(literal) { + Some(value) => Arc::new(Float64Array::new_scalar(value).into_inner()), + None => return Ok(None), + }, + DataType::Char(_) | DataType::VarChar(_) => match literal { + Datum::String(value) => Arc::new(StringArray::new_scalar(value.as_str()).into_inner()), + _ => return Ok(None), + }, + DataType::Binary(_) | DataType::VarBinary(_) | DataType::Blob(_) => match literal { + Datum::Bytes(value) => Arc::new(BinaryArray::new_scalar(value.as_slice()).into_inner()), + _ => return Ok(None), + }, + DataType::Date(_) => match literal { + Datum::Date(value) => Arc::new(Date32Array::new_scalar(*value).into_inner()), + _ => return Ok(None), + }, + DataType::Time(_) => match literal { + Datum::Time(value) => Arc::new(Time32MillisecondArray::new_scalar(*value).into_inner()), + _ => return Ok(None), + }, + DataType::Timestamp(ts) => match literal { + Datum::Timestamp { millis, nanos } => { + let Some(array) = timestamp_scalar(*millis, *nanos, ts.precision(), None)? else { + return Ok(None); + }; + array + } + _ => return Ok(None), + }, + DataType::LocalZonedTimestamp(ts) => match literal { + Datum::LocalZonedTimestamp { millis, nanos } => { + let Some(array) = timestamp_scalar(*millis, *nanos, ts.precision(), Some("UTC"))? + else { + return Ok(None); + }; + array + } + _ => return Ok(None), + }, + DataType::Decimal(decimal) => match literal { + Datum::Decimal { + unscaled, + precision, + scale, + } if *precision <= decimal.precision() && *scale == decimal.scale() => { + let precision = + u8::try_from(decimal.precision()).map_err(|_| Error::Unsupported { + message: "Decimal precision exceeds Arrow decimal128 range".to_string(), + })?; + let scale = + i8::try_from(decimal.scale() as i32).map_err(|_| Error::Unsupported { + message: "Decimal scale exceeds Arrow decimal128 range".to_string(), + })?; + Arc::new( + Decimal128Array::new_scalar(*unscaled) + .into_inner() + .with_precision_and_scale(precision, scale) + .map_err(|e| Error::UnexpectedError { + message: format!( + "Failed to build decimal scalar for Vortex row filter: {e}" + ), + source: Some(Box::new(e)), + })?, + ) + } + _ => return Ok(None), + }, + DataType::Array(_) | DataType::Map(_) | DataType::Multiset(_) | DataType::Row(_) => { + return Ok(None); + } + }; + + Ok(Some(Scalar::new(array))) } -/// Convert Paimon's (millis, sub-millis nanos) pair to the Vortex TimeUnit and i64 storage value -/// for the given timestamp precision. -fn precision_to_time_unit_and_value( +fn timestamp_scalar( millis: i64, nanos: i32, precision: u32, -) -> (vortex::extension::datetime::TimeUnit, i64) { - use vortex::extension::datetime::TimeUnit; - match precision { - 0..=3 => (TimeUnit::Milliseconds, millis), - 4..=6 => ( - TimeUnit::Microseconds, - millis * 1_000 + (nanos as i64) / 1_000, - ), - _ => (TimeUnit::Nanoseconds, millis * 1_000_000 + (nanos as i64)), + timezone: Option<&'static str>, +) -> crate::Result> { + let array: ArrowArrayRef = match precision { + 0..=3 => { + let array = TimestampMillisecondArray::new_scalar(millis).into_inner(); + match timezone { + Some(tz) => Arc::new(array.with_timezone(tz)), + None => Arc::new(array), + } + } + 4..=6 => { + let value = millis * 1_000 + (nanos as i64) / 1_000; + let array = TimestampMicrosecondArray::new_scalar(value).into_inner(); + match timezone { + Some(tz) => Arc::new(array.with_timezone(tz)), + None => Arc::new(array), + } + } + 7..=9 => { + let value = millis * 1_000_000 + (nanos as i64); + let array = TimestampNanosecondArray::new_scalar(value).into_inner(); + match timezone { + Some(tz) => Arc::new(array.with_timezone(tz)), + None => Arc::new(array), + } + } + _ => return Ok(None), + }; + Ok(Some(array)) +} + +fn integer_literal(literal: &Datum) -> Option { + match literal { + Datum::TinyInt(value) => Some(i128::from(*value)), + Datum::SmallInt(value) => Some(i128::from(*value)), + Datum::Int(value) => Some(i128::from(*value)), + Datum::Long(value) => Some(i128::from(*value)), + _ => None, + } +} + +fn float32_literal(literal: &Datum) -> Option { + match literal { + Datum::Float(value) => Some(*value), + Datum::Double(value) => { + let casted = *value as f32; + ((casted as f64) == *value).then_some(casted) + } + _ => None, + } +} + +fn float64_literal(literal: &Datum) -> Option { + match literal { + Datum::Float(value) => Some(f64::from(*value)), + Datum::Double(value) => Some(*value), + _ => None, } } @@ -559,97 +877,39 @@ fn vortex_array_to_record_batch( }) } -// --------------------------------------------------------------------------- -// VortexWrite adapter -// --------------------------------------------------------------------------- - -/// Adapts paimon's `AsyncFileWrite` (tokio AsyncWrite) to Vortex's `VortexWrite`, -/// with an `AtomicU64` counter tracking bytes flushed to storage. -struct CountingPaimonWrite { - inner: Box, - bytes_written: Arc, -} - -impl VortexWrite for CountingPaimonWrite { - async fn write_all(&mut self, buffer: B) -> std::io::Result { - let len = buffer.as_slice().len() as u64; - tokio::io::AsyncWriteExt::write_all(&mut self.inner, buffer.as_slice()).await?; - self.bytes_written.fetch_add(len, Ordering::Relaxed); - Ok(buffer) - } - - async fn flush(&mut self) -> std::io::Result<()> { - tokio::io::AsyncWriteExt::flush(&mut self.inner).await - } - - async fn shutdown(&mut self) -> std::io::Result<()> { - tokio::io::AsyncWriteExt::shutdown(&mut self.inner).await - } -} - // --------------------------------------------------------------------------- // VortexFormatWriter // --------------------------------------------------------------------------- /// Vortex implementation of [`FormatFileWriter`]. /// -/// Uses a background task with a channel for streaming writes: -/// - `write()` converts each RecordBatch to a Vortex ArrayRef and sends it through a channel -/// - A background `tokio::spawn` task runs `VortexWriteOptions::write()` consuming the channel -/// - `close()` drops the sender (signaling EOF) and awaits the background task -/// -/// This avoids buffering all data in memory and provides accurate `num_bytes()`. +/// `write()` converts each RecordBatch to a Vortex ArrayRef. `close()` then +/// writes all accumulated arrays through Vortex into an in-memory buffer before +/// flushing that buffer to Paimon's output file. pub(crate) struct VortexFormatWriter { - /// Channel sender for pushing arrays to the background write task. - sender: Option>>, - /// Background write task handle. - write_task: Option>>, - /// Bytes already flushed to storage (updated by the background task). + /// Vortex dtype derived from the target Arrow schema. + dtype: DType, + /// Converted arrays pending final Vortex write. + arrays: Vec, + /// Paimon output file receiving the finalized Vortex buffer. + output: OutputFile, + /// Bytes already flushed to storage. bytes_written: Arc, - /// Keeps the Vortex runtime handle alive while the write task uses the session. - _runtime: Arc, + /// Estimated bytes staged in `arrays` before Vortex finalizes the file. + staged_bytes: usize, } impl VortexFormatWriter { pub(crate) async fn new(output: &OutputFile, schema: SchemaRef) -> crate::Result { - let (session, runtime) = new_vortex_session()?; let dtype = DType::from_arrow(schema); - - // Create channel for streaming arrays to the background writer. - let (sender, receiver) = kanal::bounded_async::>(1); - - // Wrap receiver as an ArrayStream. - use vortex::io::kanal_ext::KanalExt; - let array_stream = ArrayStreamAdapter::new(dtype, receiver.into_stream()); - let sendable_stream = ArrayStreamExt::boxed(array_stream); - - // Create the counting VortexWrite sink. - let async_writer = output.async_writer().await?; let bytes_written = Arc::new(AtomicU64::new(0)); - let sink = CountingPaimonWrite { - inner: async_writer, - bytes_written: Arc::clone(&bytes_written), - }; - - // Spawn the background write task. - let write_task = tokio::spawn(async move { - let mut sink = sink; - let result = session - .write_options() - .write(&mut sink, sendable_stream) - .await; - // Vortex only calls flush(), but opendal needs shutdown() to finalize the file. - sink.shutdown() - .await - .map_err(|e| vortex::error::vortex_err!("shutdown error: {e}"))?; - result - }); Ok(Self { - sender: Some(sender), - write_task: Some(write_task), + dtype, + arrays: Vec::new(), + output: output.clone(), bytes_written, - _runtime: runtime, + staged_bytes: 0, }) } } @@ -657,90 +917,102 @@ impl VortexFormatWriter { #[async_trait] impl FormatFileWriter for VortexFormatWriter { async fn write(&mut self, batch: &RecordBatch) -> crate::Result<()> { + let staged_bytes = batch.get_array_memory_size(); let vortex_arr = ArrayRef::from_arrow(batch.clone(), false).map_err(|e| Error::DataInvalid { message: format!("Failed to convert RecordBatch to Vortex: {e}"), source: None, })?; - let sender = self.sender.as_ref().ok_or_else(|| Error::DataInvalid { - message: "VortexFormatWriter already closed".to_string(), - source: None, - })?; - - if sender.send(Ok(vortex_arr)).await.is_err() { - // Channel closed — the background task has exited. Try to retrieve the real error. - if let Some(task) = self.write_task.take() { - match task.await { - Ok(Err(e)) => { - return Err(Error::DataInvalid { - message: format!("Vortex background write task failed: {e}"), - source: None, - }); - } - Err(e) => { - return Err(Error::DataInvalid { - message: format!("Vortex background write task panicked: {e}"), - source: None, - }); - } - Ok(Ok(_)) => {} - } - } - return Err(Error::DataInvalid { - message: "Vortex background write task exited unexpectedly".to_string(), - source: None, - }); - } + self.arrays.push(vortex_arr); + self.staged_bytes = self.staged_bytes.saturating_add(staged_bytes); Ok(()) } fn num_bytes(&self) -> usize { - self.bytes_written.load(Ordering::Relaxed) as usize + let bytes_written = self.bytes_written.load(Ordering::Relaxed) as usize; + bytes_written.max(self.staged_bytes) } fn in_progress_size(&self) -> usize { - // Vortex manages its own internal buffering in the background task; - // we have no visibility into it, so report 0. - 0 + self.staged_bytes } async fn flush(&mut self) -> crate::Result<()> { - // Vortex handles flushing internally in the background task. + // Vortex writes are finalized in close(). Ok(()) } - async fn close(mut self: Box) -> crate::Result { - // Drop the sender to signal EOF to the background stream. - drop(self.sender.take()); + async fn close(self: Box) -> crate::Result { + let this = *self; + let VortexFormatWriter { + dtype, + arrays, + output, + bytes_written, + staged_bytes: _, + } = this; - // Await the background write task. - let task = self.write_task.take().ok_or_else(|| Error::DataInvalid { - message: "VortexFormatWriter already closed".to_string(), - source: None, - })?; + let (size, buffer) = { + let _permit = acquire_vortex_io_permit().await?; + tokio::task::spawn_blocking(move || write_vortex_buffer_blocking(dtype, arrays)) + .await + .map_err(|e| Error::DataInvalid { + message: format!("Vortex write task failed: {e}"), + source: None, + })?? + }; + output.write(bytes::Bytes::from(buffer)).await?; + bytes_written.store(size, Ordering::Relaxed); - let summary = task - .await - .map_err(|e| Error::DataInvalid { - message: format!("Vortex write task panicked: {e}"), - source: None, - })? + Ok(size) + } +} + +fn write_vortex_buffer_blocking( + dtype: DType, + arrays: Vec, +) -> crate::Result<(u64, Vec)> { + run_vortex_on_thread("paimon-vortex-write", move || { + let runtime = CurrentThreadRuntime::new(); + let session = VortexSession::default().with_handle(runtime.handle()); + let mut buffer = Vec::new(); + let summary = runtime + .block_on(async { + let mut writer = session.write_options().writer(&mut buffer, dtype); + for array in arrays { + writer.push(array).await?; + } + writer.finish().await + }) .map_err(|e| Error::DataInvalid { message: format!("Failed to write Vortex file: {e}"), source: None, })?; - Ok(summary.size()) - } + Ok((summary.size(), buffer)) + }) } -impl Drop for VortexFormatWriter { - fn drop(&mut self) { - if let Some(task) = self.write_task.take() { - task.abort(); - } - } +fn run_vortex_on_thread( + name: &'static str, + f: impl FnOnce() -> crate::Result + Send + 'static, +) -> crate::Result +where + T: Send + 'static, +{ + let join = std::thread::Builder::new() + .name(name.to_string()) + .spawn(f) + .map_err(|e| Error::DataInvalid { + message: format!("Failed to spawn Vortex worker thread: {e}"), + source: None, + })?; + + join.join().map_err(|_| Error::DataInvalid { + message: "Vortex worker thread panicked".to_string(), + source: None, + })? } #[cfg(test)] @@ -751,6 +1023,9 @@ mod tests { use crate::spec::{DataField, DataType, VarCharType}; use arrow_array::{Int32Array, StringArray}; use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; + use bytes::Bytes; + use futures::StreamExt; + use std::ops::Range; fn test_arrow_schema() -> Arc { Arc::new(ArrowSchema::new(vec![ @@ -770,6 +1045,110 @@ mod tests { .unwrap() } + struct WholeFileOnlyRead { + bytes: Bytes, + } + + #[async_trait] + impl FileRead for WholeFileOnlyRead { + async fn read(&self, range: Range) -> crate::Result { + let file_size = self.bytes.len() as u64; + if range != (0..file_size) { + return Err(Error::DataInvalid { + message: format!( + "expected a whole-file read, got {}..{}", + range.start, range.end + ), + source: None, + }); + } + Ok(self.bytes.clone()) + } + } + + #[test] + fn test_vortex_writer_outlives_calling_tokio_runtime() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let path = "memory:/test_vortex_writer_runtime.vortex"; + let output = file_io.new_output(path).unwrap(); + let schema = test_arrow_schema(); + + let caller_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let writer = caller_runtime.block_on(async { + let mut writer = VortexFormatWriter::new(&output, schema.clone()) + .await + .unwrap(); + let batch = test_batch(&schema, vec![1, 2, 3], vec![10, 20, 30]); + writer.write(&batch).await.unwrap(); + writer + }); + drop(caller_runtime); + + let verifier_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let bytes = verifier_runtime + .block_on(async { Box::new(writer).close().await }) + .unwrap(); + assert!(bytes > 0); + } + + #[test] + fn test_vortex_reader_stream_outlives_calling_tokio_runtime() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let path = "memory:/test_vortex_reader_runtime.vortex"; + let output = file_io.new_output(path).unwrap(); + let schema = test_arrow_schema(); + + let caller_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let stream = caller_runtime.block_on(async { + let mut writer = VortexFormatWriter::new(&output, schema.clone()) + .await + .unwrap(); + let batch = test_batch(&schema, vec![1, 2, 3], vec![10, 20, 30]); + writer.write(&batch).await.unwrap(); + Box::new(writer).close().await.unwrap(); + + let input = file_io.new_input(path).unwrap(); + let file_reader = input.reader().await.unwrap(); + let metadata = input.metadata().await.unwrap(); + let reader = VortexFormatReader; + reader + .read_batch_stream( + Box::new(file_reader), + metadata.size, + &test_file_fields(), + None, + None, + None, + ) + .await + .unwrap() + }); + drop(caller_runtime); + + let verifier_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let rows = verifier_runtime.block_on(async { + let mut stream = stream; + let mut rows = 0; + while let Some(result) = stream.next().await { + rows += result.unwrap().num_rows(); + } + rows + }); + assert_eq!(rows, 3); + } + #[tokio::test] async fn test_vortex_writer_write_and_read() { let file_io = FileIOBuilder::new("memory").build().unwrap(); @@ -1077,182 +1456,86 @@ mod tests { assert_eq!(total_rows, 5); } - // ----------------------------------------------------------------------- - // Predicate conversion unit tests - // ----------------------------------------------------------------------- - - use crate::spec::{DataType as PaimonDataType, IntType, PredicateBuilder}; - - fn test_file_fields() -> Vec { - vec![ - DataField::new(0, "id".to_string(), PaimonDataType::Int(IntType::new())), - DataField::new(1, "value".to_string(), PaimonDataType::Int(IntType::new())), - ] - } - - #[test] - fn test_predicate_eq_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let pred = builder.equal("id", Datum::Int(3)).unwrap(); - let expr = predicates_to_vortex_expr(&[pred], &fields); - assert!(expr.is_some()); - } - - #[test] - fn test_predicate_not_eq_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let pred = builder.not_equal("value", Datum::Int(10)).unwrap(); - let expr = predicates_to_vortex_expr(&[pred], &fields); - assert!(expr.is_some()); - } - - #[test] - fn test_predicate_lt_gt_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let lt_pred = builder.less_than("id", Datum::Int(5)).unwrap(); - let gt_pred = builder.greater_than("value", Datum::Int(20)).unwrap(); - let expr = predicates_to_vortex_expr(&[lt_pred, gt_pred], &fields); - assert!(expr.is_some(), "AND of Lt and Gt should convert"); - } - - #[test] - fn test_predicate_is_null_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let pred = builder.is_null("id").unwrap(); - let expr = predicates_to_vortex_expr(&[pred], &fields); - assert!(expr.is_some()); - } - - #[test] - fn test_predicate_is_not_null_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let pred = builder.is_not_null("value").unwrap(); - let expr = predicates_to_vortex_expr(&[pred], &fields); - assert!(expr.is_some()); - } - - #[test] - fn test_predicate_in_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let pred = builder - .is_in("id", vec![Datum::Int(1), Datum::Int(3)]) - .unwrap(); - let expr = predicates_to_vortex_expr(&[pred], &fields); - assert!(expr.is_some()); - } - - #[test] - fn test_predicate_not_in_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let pred = builder - .is_not_in("id", vec![Datum::Int(2), Datum::Int(4)]) - .unwrap(); - let expr = predicates_to_vortex_expr(&[pred], &fields); - assert!(expr.is_some()); - } + #[tokio::test] + async fn test_vortex_reader_opens_from_whole_file_buffer() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let path = "memory:/test_vortex_whole_file_buffer.vortex"; + let output = file_io.new_output(path).unwrap(); + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("name", ArrowDataType::Utf8, false), + ])); + let ids: Vec = (0..10_000).collect(); + let names: Vec = ids + .iter() + .map(|id| format!("row-{id:05}-abcdefghijklmnopqrstuvwxyz0123456789")) + .collect(); - #[test] - fn test_predicate_in_with_unsupported_literal_skips_entirely() { - let fields = test_file_fields(); - // Manually build an In predicate with a Decimal literal whose precision - // exceeds u8 range, making it unconvertible to Vortex. - let pred = Predicate::Leaf { - column: "id".to_string(), - index: 0, - data_type: PaimonDataType::Int(IntType::new()), - op: PredicateOperator::In, - literals: vec![ - Datum::Int(1), - Datum::Decimal { - unscaled: 100, - precision: 256, - scale: 2, - }, + let mut writer: Box = Box::new( + VortexFormatWriter::new(&output, schema.clone()) + .await + .unwrap(), + ); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids.clone())), + Arc::new(StringArray::from(names)), ], - }; - // The entire In should be skipped (None) because one literal can't convert. - let expr = predicate_to_vortex_expr(&pred, &fields); - assert!(expr.is_none()); - } + ) + .unwrap(); + writer.write(&batch).await.unwrap(); + writer.close().await.unwrap(); - #[test] - fn test_predicate_or_with_unsupported_branch_skips_entirely() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let convertible = builder.equal("id", Datum::Int(1)).unwrap(); - // Build an unconvertible leaf (Decimal with precision > u8::MAX). - let unconvertible = Predicate::Leaf { - column: "id".to_string(), - index: 0, - data_type: PaimonDataType::Int(IntType::new()), - op: PredicateOperator::Eq, - literals: vec![Datum::Decimal { - unscaled: 100, - precision: 256, - scale: 2, - }], - }; - let or_pred = Predicate::Or(vec![convertible, unconvertible]); - // The entire OR should be skipped because one branch can't convert. - let expr = predicate_to_vortex_expr(&or_pred, &fields); - assert!(expr.is_none()); - } + let input = file_io.new_input(path).unwrap(); + let file_bytes = input.read().await.unwrap(); + let metadata = input.metadata().await.unwrap(); + assert!(metadata.size > 65_535); + let reader = VortexFormatReader; - #[test] - fn test_predicate_and_with_unsupported_branch_keeps_convertible() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let convertible = builder.equal("id", Datum::Int(1)).unwrap(); - let unconvertible = Predicate::Leaf { - column: "id".to_string(), - index: 0, - data_type: PaimonDataType::Int(IntType::new()), - op: PredicateOperator::Eq, - literals: vec![Datum::Decimal { - unscaled: 100, - precision: 256, - scale: 2, - }], - }; - let and_pred = Predicate::And(vec![convertible, unconvertible]); - // AND should still produce an expression from the convertible branch. - let expr = predicate_to_vortex_expr(&and_pred, &fields); - assert!(expr.is_some()); - } + let mut stream = reader + .read_batch_stream( + Box::new(WholeFileOnlyRead { bytes: file_bytes }), + metadata.size, + &[ + DataField::new(0, "id".to_string(), DataType::Int(IntType::new())), + DataField::new( + 1, + "name".to_string(), + DataType::VarChar(VarCharType::string_type()), + ), + ], + None, + None, + None, + ) + .await + .unwrap(); - #[test] - fn test_predicate_always_true_false() { - let fields = test_file_fields(); - assert!(predicate_to_vortex_expr(&Predicate::AlwaysTrue, &fields).is_some()); - assert!(predicate_to_vortex_expr(&Predicate::AlwaysFalse, &fields).is_some()); + let mut all_ids = Vec::new(); + while let Some(result) = stream.next().await { + let batch = result.unwrap(); + let id_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + all_ids.extend(id_col.values().iter().copied()); + } + assert_eq!(all_ids, ids); } - #[test] - fn test_predicate_not_converts() { - let fields = test_file_fields(); - let builder = PredicateBuilder::new(&fields); - let inner = builder.equal("id", Datum::Int(3)).unwrap(); - let pred = Predicate::Not(Box::new(inner)); - let expr = predicate_to_vortex_expr(&pred, &fields); - assert!(expr.is_some()); - } + use crate::spec::{DataType as PaimonDataType, IntType, PredicateBuilder}; - #[test] - fn test_empty_predicates_returns_none() { - let fields = test_file_fields(); - let expr = predicates_to_vortex_expr(&[], &fields); - assert!(expr.is_none()); + fn test_file_fields() -> Vec { + vec![ + DataField::new(0, "id".to_string(), PaimonDataType::Int(IntType::new())), + DataField::new(1, "value".to_string(), PaimonDataType::Int(IntType::new())), + ] } // ----------------------------------------------------------------------- - // Integration tests: predicate pushdown through VortexFormatReader + // Integration tests: Arrow-side predicate filtering through VortexFormatReader // ----------------------------------------------------------------------- /// Helper: write test data and read back with given predicates, return collected id values. @@ -1389,121 +1672,171 @@ mod tests { assert!(ids.is_empty()); } - // ----------------------------------------------------------------------- - // Timestamp precision tests - // ----------------------------------------------------------------------- - - use crate::spec::{LocalZonedTimestampType, TimestampType}; + #[tokio::test] + async fn test_vortex_read_filter_column_not_projected() { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let path = "memory:/test_vortex_pred_unprojected.vortex"; + let output = file_io.new_output(path).unwrap(); + let schema = test_arrow_schema(); - #[test] - fn test_precision_to_time_unit_and_value_millis() { - use vortex::extension::datetime::TimeUnit; - // precision 0-3 → millis, nanos ignored - assert_eq!( - precision_to_time_unit_and_value(1000, 500_000, 0), - (TimeUnit::Milliseconds, 1000) - ); - assert_eq!( - precision_to_time_unit_and_value(1000, 500_000, 3), - (TimeUnit::Milliseconds, 1000) + let mut writer: Box = Box::new( + VortexFormatWriter::new(&output, schema.clone()) + .await + .unwrap(), ); - } + writer + .write(&test_batch( + &schema, + vec![1, 2, 3, 4, 5], + vec![10, 20, 30, 40, 50], + )) + .await + .unwrap(); + writer.close().await.unwrap(); - #[test] - fn test_precision_to_time_unit_and_value_micros() { - use vortex::extension::datetime::TimeUnit; - // precision 4-6 → micros = millis * 1000 + nanos / 1000 - // 1000ms, 500_000ns (= 500µs) → 1_000_500µs - assert_eq!( - precision_to_time_unit_and_value(1000, 500_000, 6), - (TimeUnit::Microseconds, 1_000_500) - ); - assert_eq!( - precision_to_time_unit_and_value(1000, 0, 4), - (TimeUnit::Microseconds, 1_000_000) - ); - } + let fields = test_file_fields(); + let builder = PredicateBuilder::new(&fields); + let pred = builder.greater_than("value", Datum::Int(30)).unwrap(); + let fp = FilePredicates { + predicates: vec![pred], + file_fields: fields.clone(), + }; + let read_fields = vec![fields[0].clone()]; - #[test] - fn test_precision_to_time_unit_and_value_nanos() { - use vortex::extension::datetime::TimeUnit; - // precision 7-9 → nanos = millis * 1_000_000 + nanos - // 1000ms, 500_000ns → 1_000_500_000ns - assert_eq!( - precision_to_time_unit_and_value(1000, 500_000, 9), - (TimeUnit::Nanoseconds, 1_000_500_000) - ); - assert_eq!( - precision_to_time_unit_and_value(1000, 0, 7), - (TimeUnit::Nanoseconds, 1_000_000_000) - ); - } + let input = file_io.new_input(path).unwrap(); + let file_reader = input.reader().await.unwrap(); + let metadata = input.metadata().await.unwrap(); + let reader = VortexFormatReader; + let mut stream = reader + .read_batch_stream( + Box::new(file_reader), + metadata.size, + &read_fields, + Some(&fp), + None, + None, + ) + .await + .unwrap(); - #[test] - fn test_datum_to_vortex_lit_timestamp_precision() { - let ts_field_millis = DataField::new( - 0, - "ts".to_string(), - PaimonDataType::Timestamp(TimestampType::new(3).unwrap()), - ); - let ts_field_micros = DataField::new( - 0, - "ts".to_string(), - PaimonDataType::Timestamp(TimestampType::new(6).unwrap()), - ); - let ts_field_nanos = DataField::new( - 0, - "ts".to_string(), - PaimonDataType::Timestamp(TimestampType::new(9).unwrap()), - ); + let mut all_ids = Vec::new(); + while let Some(result) = stream.next().await { + let batch = result.unwrap(); + assert_eq!(batch.num_columns(), 1); + let id_col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + all_ids.extend(id_col.values().iter().copied()); + } + assert_eq!(all_ids, vec![4, 5]); + } - let datum = Datum::Timestamp { - millis: 1000, - nanos: 500_000, + #[tokio::test] + async fn test_vortex_empty_projection_with_predicate_returns_filtered_count() { + let fields = test_file_fields(); + let builder = PredicateBuilder::new(&fields); + let pred = builder.greater_than("id", Datum::Int(3)).unwrap(); + let fp = FilePredicates { + predicates: vec![pred], + file_fields: fields, }; - // All should produce Some - assert!(datum_to_vortex_lit(&datum, &ts_field_millis).is_some()); - assert!(datum_to_vortex_lit(&datum, &ts_field_micros).is_some()); - assert!(datum_to_vortex_lit(&datum, &ts_field_nanos).is_some()); + let count = write_and_read_empty_projection_with_predicates( + "memory:/test_vortex_empty_proj_pred.vortex", + Some(fp), + ) + .await; + assert_eq!(count, 2); } - #[test] - fn test_datum_to_vortex_lit_local_zoned_timestamp_precision() { - let field_millis = DataField::new( - 0, - "ts".to_string(), - PaimonDataType::LocalZonedTimestamp(LocalZonedTimestampType::new(3).unwrap()), - ); - let field_micros = DataField::new( - 0, - "ts".to_string(), - PaimonDataType::LocalZonedTimestamp(LocalZonedTimestampType::new(6).unwrap()), + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_vortex_predicate_reads_do_not_block_each_other() { + let fields = test_file_fields(); + let builder = PredicateBuilder::new(&fields); + let eq = FilePredicates { + predicates: vec![builder.equal("id", Datum::Int(3)).unwrap()], + file_fields: fields.clone(), + }; + let gt = FilePredicates { + predicates: vec![builder.greater_than("id", Datum::Int(3)).unwrap()], + file_fields: fields.clone(), + }; + let combined = FilePredicates { + predicates: vec![ + builder.greater_or_equal("id", Datum::Int(2)).unwrap(), + builder.less_than("value", Datum::Int(50)).unwrap(), + ], + file_fields: fields, + }; + + let (empty, eq, gt, combined) = tokio::join!( + write_and_read_empty_projection("memory:/test_vortex_concurrent_empty.vortex"), + write_and_read_with_predicates("memory:/test_vortex_concurrent_eq.vortex", Some(eq)), + write_and_read_with_predicates("memory:/test_vortex_concurrent_gt.vortex", Some(gt)), + write_and_read_with_predicates( + "memory:/test_vortex_concurrent_combined.vortex", + Some(combined) + ), ); - let datum = Datum::LocalZonedTimestamp { - millis: 2000, - nanos: 123_456, - }; + assert_eq!(empty, 5); + assert_eq!(eq, vec![3]); + assert_eq!(gt, vec![4, 5]); + assert_eq!(combined, vec![2, 3, 4]); + } - assert!(datum_to_vortex_lit(&datum, &field_millis).is_some()); - assert!(datum_to_vortex_lit(&datum, &field_micros).is_some()); + async fn write_and_read_empty_projection(path: &str) -> usize { + write_and_read_empty_projection_with_predicates(path, None).await } - #[test] - fn test_datum_to_vortex_lit_timestamp_wrong_field_type_returns_none() { - // Timestamp datum with an Int field should return None - let int_field = DataField::new(0, "id".to_string(), PaimonDataType::Int(IntType::new())); - let datum = Datum::Timestamp { - millis: 1000, - nanos: 0, - }; - assert!(datum_to_vortex_lit(&datum, &int_field).is_none()); + async fn write_and_read_empty_projection_with_predicates( + path: &str, + predicates: Option, + ) -> usize { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let output = file_io.new_output(path).unwrap(); + let schema = test_arrow_schema(); - let datum_lz = Datum::LocalZonedTimestamp { - millis: 1000, - nanos: 0, - }; - assert!(datum_to_vortex_lit(&datum_lz, &int_field).is_none()); + let mut writer: Box = Box::new( + VortexFormatWriter::new(&output, schema.clone()) + .await + .unwrap(), + ); + writer + .write(&test_batch( + &schema, + vec![1, 2, 3, 4, 5], + vec![10, 20, 30, 40, 50], + )) + .await + .unwrap(); + writer.close().await.unwrap(); + + let input = file_io.new_input(path).unwrap(); + let file_reader = input.reader().await.unwrap(); + let metadata = input.metadata().await.unwrap(); + + let reader = VortexFormatReader; + let mut stream = reader + .read_batch_stream( + Box::new(file_reader), + metadata.size, + &[], + predicates.as_ref(), + None, + None, + ) + .await + .unwrap(); + + let mut total_rows = 0; + while let Some(result) = stream.next().await { + let batch = result.unwrap(); + assert_eq!(batch.num_columns(), 0); + total_rows += batch.num_rows(); + } + total_rows } } diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index d8add25f..b752904f 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -1539,6 +1539,48 @@ mod tests { assert_eq!(total_rows, 4); } + #[cfg(feature = "vortex")] + #[tokio::test] + async fn test_vortex_write_rolling_on_target_file_size() { + let file_io = test_file_io(); + let table_path = "memory:/test_vortex_table_write_rolling"; + setup_dirs(&file_io, table_path).await; + + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .option("target-file-size", "1b") + .option("file.format", "vortex") + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + let table = Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + table_schema, + None, + ); + + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + + table_write + .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + table_write + .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40])) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].new_files.len(), 2); + + let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum(); + assert_eq!(total_rows, 4); + } + // ----------------------------------------------------------------------- // Primary-key table write tests // -----------------------------------------------------------------------