Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/integrations/datafusion/src/full_text_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl TableProvider for FullTextSearchTableProvider {
table,
schema: &self.schema(),
plan: &plan,
scan_trace: None,
projection,
pushed_predicate: None,
limit,
Expand Down
13 changes: 12 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use futures::{StreamExt, TryStreamExt};
use paimon::spec::Predicate;
use paimon::table::Table;
use paimon::table::{ScanTrace, Table};
use paimon::DataSplit;

use crate::error::to_datafusion_error;
Expand Down Expand Up @@ -57,9 +57,12 @@ pub struct PaimonTableScan {
/// Whether the pushed predicate is exact (no residual filtering needed).
/// When true and all splits have known merged_row_count, statistics can be exact.
filter_exact: bool,
/// Metadata-pruning trace captured during eager scan planning.
scan_trace: Option<ScanTrace>,
}

impl PaimonTableScan {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
schema: ArrowSchemaRef,
table: Table,
Expand All @@ -68,6 +71,7 @@ impl PaimonTableScan {
planned_partitions: Vec<Arc<[DataSplit]>>,
limit: Option<usize>,
filter_exact: bool,
scan_trace: Option<ScanTrace>,
) -> Self {
let plan_properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(schema.clone()),
Expand All @@ -83,6 +87,7 @@ impl PaimonTableScan {
plan_properties,
limit,
filter_exact,
scan_trace,
}
}

Expand Down Expand Up @@ -245,6 +250,9 @@ impl DisplayAs for PaimonTableScan {
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
if let Some(ref trace) = self.scan_trace {
write!(f, ", trace={trace}")?;
}
Ok(())
}
}
Expand Down Expand Up @@ -290,6 +298,7 @@ mod tests {
vec![Arc::from(Vec::new())],
None,
false,
None,
);
assert_eq!(scan.properties().output_partitioning().partition_count(), 1);
}
Expand All @@ -310,6 +319,7 @@ mod tests {
planned_partitions,
None,
false,
None,
);
assert_eq!(scan.properties().output_partitioning().partition_count(), 3);
}
Expand Down Expand Up @@ -387,6 +397,7 @@ mod tests {
vec![Arc::from(vec![split])],
None,
false,
None,
);

let ctx = SessionContext::new();
Expand Down
5 changes: 4 additions & 1 deletion crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ pub(crate) struct PaimonScanBuilder<'a> {
pub(crate) table: &'a Table,
pub(crate) schema: &'a ArrowSchemaRef,
pub(crate) plan: &'a paimon::table::Plan,
pub(crate) scan_trace: Option<paimon::table::ScanTrace>,
pub(crate) projection: Option<&'a Vec<usize>>,
pub(crate) pushed_predicate: Option<paimon::spec::Predicate>,
pub(crate) limit: Option<usize>,
Expand Down Expand Up @@ -149,6 +150,7 @@ impl PaimonScanBuilder<'_> {
planned_partitions,
self.limit,
self.filter_exact,
self.scan_trace,
)))
}
}
Expand Down Expand Up @@ -189,7 +191,7 @@ impl TableProvider for PaimonTableProvider {
// Tokio runtime. `scan.plan()` can reach OpenDAL/Tokio filesystem calls while
// reading Paimon metadata, so we must provide a runtime here instead of
// assuming the caller already entered one.
let plan = await_with_runtime(scan.plan())
let (plan, scan_trace) = await_with_runtime(scan.plan_with_trace())
.await
.map_err(to_datafusion_error)?;

Expand All @@ -203,6 +205,7 @@ impl TableProvider for PaimonTableProvider {
table: &self.table,
schema: &self.schema,
plan: &plan,
scan_trace: Some(scan_trace),
projection,
pushed_predicate: filter_analysis.pushed_predicate,
limit: pushed_limit,
Expand Down
1 change: 1 addition & 0 deletions crates/integrations/datafusion/src/vector_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl TableProvider for VectorSearchTableProvider {
table,
schema: &self.schema(),
plan: &plan,
scan_trace: None,
projection,
pushed_predicate: None,
limit,
Expand Down
2 changes: 1 addition & 1 deletion crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ async fn test_residual_filter_limit_keeps_connector_limit_and_correctness() {
assert!(
scan_lines
.iter()
.all(|line| !line.contains("limit=")),
.all(|line| !line.contains(", limit=")),
"Residual filter queries should not push a scan limit hint when residual filters stay above the scan, plan:\n{plan_text}"
);

Expand Down
Loading
Loading