From 3ce07d561c12b46e59f3fe50db9d2fd107b31ef0 Mon Sep 17 00:00:00 2001 From: Charles-Antoine Leger Date: Thu, 11 Jun 2026 10:50:14 +0200 Subject: [PATCH 1/4] feat(datafusion): reconstruct IcebergTableScan from a predicate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a `new_with_tasks_from_predicate` constructor plus `table_schema()` and `projection_indices()` getters so a downstream consumer can rebuild the node without the original DataFusion `Expr`s — e.g. a distributed-plan codec that ships a serialized `Predicate` across workers. The node now keeps the pre-projection schema and the numeric projection indices verbatim: the provider caches its schema while reloading the table separately, so re-deriving them would be incorrect. All struct construction still funnels through `new_inner`, now parameterized by `Option`; `new`/`new_with_tasks` keep their `&[Expr]` signatures and pre-convert. The catalog-backed provider passes its already-computed predicate straight to the constructor, dropping the duplicate `filters -> Predicate` conversion. --- .../datafusion/src/physical_plan/scan.rs | 194 +++++++++++++++++- .../integrations/datafusion/src/table/mod.rs | 8 +- 2 files changed, 190 insertions(+), 12 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 3926de8acd..e134946b41 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -39,9 +39,12 @@ use crate::to_datafusion_error; /// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`]. /// -/// Has two construction modes: [`IcebergTableScan::new`] for a lazy -/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an -/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets. +/// Has three construction modes: [`new`][Self::new] (lazy, single-partition), +/// [`new_with_tasks`][Self::new_with_tasks] (eager, over pre-planned +/// [`FileScanTask`] buckets), and +/// [`new_with_tasks_from_predicate`][Self::new_with_tasks_from_predicate] +/// (eager, from a [`Predicate`]) — the last lets a node be rebuilt from its +/// getters, e.g. by a distributed-plan codec. /// /// Note: in eager mode the underlying `TableScan` is rebuilt on every /// `execute(partition)` call. The per-build cost is bounded (no I/O) and @@ -57,6 +60,12 @@ pub struct IcebergTableScan { plan_properties: Arc, /// Projection column names, None means all columns. projection: Option>, + /// Full schema before projection. Kept verbatim, not re-derived: the + /// provider caches it while reloading the table, so it can diverge from the + /// table's current metadata. + table_schema: ArrowSchemaRef, + /// Projection indices as received by `scan`; `projection` keeps only names. + projection_indices: Option>, /// Filters to apply to the table scan. predicates: Option, /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode). @@ -84,7 +93,7 @@ impl IcebergTableScan { snapshot_id, schema, projection, - filters, + convert_filters_to_predicate(filters), limit, Partitioning::UnknownPartitioning(1), None, @@ -111,7 +120,34 @@ impl IcebergTableScan { snapshot_id, schema, projection, - filters, + convert_filters_to_predicate(filters), + limit, + partitioning, + Some(buckets), + ) + } + + /// Eager variant taking a [`Predicate`] instead of [`Expr`] filters, so a + /// node can be rebuilt from its getters. The predicate is unbound; the scan + /// builder binds it at `execute` time. + // Arity mirrors `new_with_tasks`; an args struct is deferred. + #[allow(clippy::too_many_arguments)] + pub fn new_with_tasks_from_predicate( + table: Table, + snapshot_id: Option, + schema: ArrowSchemaRef, + projection: Option<&Vec>, + predicate: Option, + limit: Option, + buckets: Vec>, + partitioning: Partitioning, + ) -> Self { + Self::new_inner( + table, + snapshot_id, + schema, + projection, + predicate, limit, partitioning, Some(buckets), @@ -124,7 +160,7 @@ impl IcebergTableScan { snapshot_id: Option, schema: ArrowSchemaRef, projection: Option<&Vec>, - filters: &[Expr], + predicate: Option, limit: Option, partitioning: Partitioning, buckets: Option>>, @@ -139,15 +175,18 @@ impl IcebergTableScan { EmissionType::Incremental, Boundedness::Bounded, )); + let table_schema = Arc::clone(&schema); + let projection_indices = projection.cloned(); let projection = get_column_names(schema, projection); - let predicates = convert_filters_to_predicate(filters); Self { table, snapshot_id, plan_properties, projection, - predicates, + table_schema, + projection_indices, + predicates: predicate, buckets, partition_keys_kind: None, limit, @@ -166,6 +205,10 @@ impl IcebergTableScan { &self.table } + pub fn table_schema(&self) -> &ArrowSchemaRef { + &self.table_schema + } + pub fn snapshot_id(&self) -> Option { self.snapshot_id } @@ -174,6 +217,10 @@ impl IcebergTableScan { self.projection.as_deref() } + pub fn projection_indices(&self) -> Option<&[usize]> { + self.projection_indices.as_deref() + } + pub fn predicates(&self) -> Option<&Predicate> { self.predicates.as_ref() } @@ -383,3 +430,134 @@ pub(super) fn get_column_names( .collect::>() }) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion::physical_plan::ExecutionPlan; + use datafusion::prelude::{Expr, col, lit}; + use iceberg::TableIdent; + use iceberg::expr::Reference; + use iceberg::io::FileIO; + use iceberg::spec::Datum; + use iceberg::table::{StaticTable, Table}; + + use super::*; + + async fn test_table() -> Table { + let metadata_path = format!( + "{}/tests/test_data/TableMetadataV2Valid.json", + env!("CARGO_MANIFEST_DIR") + ); + let ident = TableIdent::from_strs(["ns", "scan_table"]).unwrap(); + StaticTable::from_metadata_file(&metadata_path, ident, FileIO::new_with_fs()) + .await + .unwrap() + .into_table() + } + + fn arrow_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![ + Field::new("x", DataType::Int64, false), + Field::new("y", DataType::Int64, false), + Field::new("z", DataType::Int64, false), + ])) + } + + fn filters() -> Vec { + vec![col("x").gt(lit(5i64))] + } + + // The Expr and predicate constructors must agree: `new_with_tasks` only + // pre-converts the filters that `new_with_tasks_from_predicate` takes raw. + #[tokio::test] + async fn expr_and_predicate_constructors_agree() { + let table = test_table().await; + let projection = vec![0usize, 2]; + let from_filters = IcebergTableScan::new_with_tasks( + table.clone(), + None, + arrow_schema(), + Some(&projection), + &filters(), + Some(100), + vec![vec![], vec![]], + Partitioning::UnknownPartitioning(2), + ); + let from_predicate = IcebergTableScan::new_with_tasks_from_predicate( + table, + None, + arrow_schema(), + Some(&projection), + convert_filters_to_predicate(&filters()), + Some(100), + vec![vec![], vec![]], + Partitioning::UnknownPartitioning(2), + ); + + assert_eq!(from_filters.predicates(), from_predicate.predicates()); + assert_eq!(from_filters.projection(), from_predicate.projection()); + assert_eq!( + from_filters.schema().fields(), + from_predicate.schema().fields() + ); + assert_eq!( + format!("{:?}", from_filters.properties().output_partitioning()), + format!("{:?}", from_predicate.properties().output_partitioning()), + ); + } + + // table_schema() exposes the full pre-projection schema; schema() is projected. + #[tokio::test] + async fn getters_expose_full_schema_and_indices() { + let projection = vec![0usize, 2]; + let scan = IcebergTableScan::new( + test_table().await, + None, + arrow_schema(), + Some(&projection), + &[], + None, + ); + + assert_eq!(scan.table_schema().fields(), arrow_schema().fields()); + assert_eq!(scan.table_schema().fields().len(), 3); + assert_eq!(scan.schema().fields().len(), 2); + assert_ne!(scan.schema().fields(), scan.table_schema().fields()); + + assert_eq!(scan.projection_indices(), Some(projection.as_slice())); + let names = ["x".to_string(), "z".to_string()]; + assert_eq!(scan.projection(), Some(names.as_slice())); + } + + // Without a projection, indices/names are None and the full schema is kept. + #[tokio::test] + async fn no_projection_keeps_full_schema() { + let scan = IcebergTableScan::new(test_table().await, None, arrow_schema(), None, &[], None); + + assert_eq!(scan.projection_indices(), None); + assert_eq!(scan.projection(), None); + assert_eq!(scan.schema().fields(), scan.table_schema().fields()); + assert_eq!(scan.table_schema().fields(), arrow_schema().fields()); + } + + // A predicate passed to the new constructor round-trips unchanged. + #[tokio::test] + async fn predicate_round_trips() { + let predicate = Reference::new("x").greater_than(Datum::long(5)); + let scan = IcebergTableScan::new_with_tasks_from_predicate( + test_table().await, + None, + arrow_schema(), + None, + Some(predicate.clone()), + None, + vec![vec![]], + Partitioning::UnknownPartitioning(1), + ); + + assert_eq!(scan.predicates(), Some(&predicate)); + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 56faab8075..2e1c664eb5 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -208,8 +208,8 @@ impl TableProvider for IcebergTableProvider { Some(names) => builder.select(names), None => builder.select_all(), }; - if let Some(pred) = predicate { - builder = builder.with_filter(pred); + if let Some(pred) = &predicate { + builder = builder.with_filter(pred.clone()); } let tasks: Vec = builder @@ -258,12 +258,12 @@ impl TableProvider for IcebergTableProvider { }; Ok(Arc::new( - IcebergTableScan::new_with_tasks( + IcebergTableScan::new_with_tasks_from_predicate( table, None, // Always use current snapshot for catalog-backed provider self.schema.clone(), projection, - filters, + predicate, limit, buckets, partitioning, From 12cc86ff7d2b091fffc81bb77377c6550672c0c4 Mon Sep 17 00:00:00 2001 From: Charles-Antoine Leger Date: Thu, 11 Jun 2026 11:02:58 +0200 Subject: [PATCH 2/4] test(datafusion): trim scan constructor coverage --- .../datafusion/src/physical_plan/scan.rs | 108 ++++++------------ 1 file changed, 33 insertions(+), 75 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index e134946b41..386c9497c5 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -437,7 +437,6 @@ mod tests { use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::physical_plan::ExecutionPlan; - use datafusion::prelude::{Expr, col, lit}; use iceberg::TableIdent; use iceberg::expr::Reference; use iceberg::io::FileIO; @@ -446,19 +445,21 @@ mod tests { use super::*; - async fn test_table() -> Table { - let metadata_path = format!( - "{}/tests/test_data/TableMetadataV2Valid.json", - env!("CARGO_MANIFEST_DIR") + async fn get_test_table_from_metadata_file() -> Table { + let metadata_file_name = "TableMetadataV2Valid.json"; + let metadata_file_path = format!( + "{}/tests/test_data/{}", + env!("CARGO_MANIFEST_DIR"), + metadata_file_name ); let ident = TableIdent::from_strs(["ns", "scan_table"]).unwrap(); - StaticTable::from_metadata_file(&metadata_path, ident, FileIO::new_with_fs()) + StaticTable::from_metadata_file(&metadata_file_path, ident, FileIO::new_with_fs()) .await .unwrap() .into_table() } - fn arrow_schema() -> ArrowSchemaRef { + fn create_test_arrow_schema() -> ArrowSchemaRef { Arc::new(ArrowSchema::new(vec![ Field::new("x", DataType::Int64, false), Field::new("y", DataType::Int64, false), @@ -466,98 +467,55 @@ mod tests { ])) } - fn filters() -> Vec { - vec![col("x").gt(lit(5i64))] - } - - // The Expr and predicate constructors must agree: `new_with_tasks` only - // pre-converts the filters that `new_with_tasks_from_predicate` takes raw. #[tokio::test] - async fn expr_and_predicate_constructors_agree() { - let table = test_table().await; + async fn test_predicate_constructor_exposes_rebuild_inputs() { + let schema = create_test_arrow_schema(); let projection = vec![0usize, 2]; - let from_filters = IcebergTableScan::new_with_tasks( - table.clone(), - None, - arrow_schema(), - Some(&projection), - &filters(), - Some(100), - vec![vec![], vec![]], - Partitioning::UnknownPartitioning(2), - ); - let from_predicate = IcebergTableScan::new_with_tasks_from_predicate( - table, + let predicate = Reference::new("x").greater_than(Datum::long(5)); + let scan = IcebergTableScan::new_with_tasks_from_predicate( + get_test_table_from_metadata_file().await, None, - arrow_schema(), + schema.clone(), Some(&projection), - convert_filters_to_predicate(&filters()), + Some(predicate.clone()), Some(100), vec![vec![], vec![]], Partitioning::UnknownPartitioning(2), ); - assert_eq!(from_filters.predicates(), from_predicate.predicates()); - assert_eq!(from_filters.projection(), from_predicate.projection()); - assert_eq!( - from_filters.schema().fields(), - from_predicate.schema().fields() - ); - assert_eq!( - format!("{:?}", from_filters.properties().output_partitioning()), - format!("{:?}", from_predicate.properties().output_partitioning()), - ); - } - - // table_schema() exposes the full pre-projection schema; schema() is projected. - #[tokio::test] - async fn getters_expose_full_schema_and_indices() { - let projection = vec![0usize, 2]; - let scan = IcebergTableScan::new( - test_table().await, - None, - arrow_schema(), - Some(&projection), - &[], - None, - ); - - assert_eq!(scan.table_schema().fields(), arrow_schema().fields()); + assert_eq!(scan.predicates(), Some(&predicate)); + assert_eq!(scan.table_schema().fields(), schema.fields()); assert_eq!(scan.table_schema().fields().len(), 3); assert_eq!(scan.schema().fields().len(), 2); assert_ne!(scan.schema().fields(), scan.table_schema().fields()); assert_eq!(scan.projection_indices(), Some(projection.as_slice())); - let names = ["x".to_string(), "z".to_string()]; - assert_eq!(scan.projection(), Some(names.as_slice())); - } - - // Without a projection, indices/names are None and the full schema is kept. - #[tokio::test] - async fn no_projection_keeps_full_schema() { - let scan = IcebergTableScan::new(test_table().await, None, arrow_schema(), None, &[], None); - - assert_eq!(scan.projection_indices(), None); - assert_eq!(scan.projection(), None); - assert_eq!(scan.schema().fields(), scan.table_schema().fields()); - assert_eq!(scan.table_schema().fields(), arrow_schema().fields()); + let expected_projection = vec!["x".to_string(), "z".to_string()]; + assert_eq!(scan.projection(), Some(expected_projection.as_slice())); + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(2) + )); } - // A predicate passed to the new constructor round-trips unchanged. #[tokio::test] - async fn predicate_round_trips() { - let predicate = Reference::new("x").greater_than(Datum::long(5)); + async fn test_no_projection_keeps_full_schema() { + let schema = create_test_arrow_schema(); let scan = IcebergTableScan::new_with_tasks_from_predicate( - test_table().await, + get_test_table_from_metadata_file().await, + None, + schema.clone(), None, - arrow_schema(), None, - Some(predicate.clone()), None, vec![vec![]], Partitioning::UnknownPartitioning(1), ); - assert_eq!(scan.predicates(), Some(&predicate)); + assert_eq!(scan.projection_indices(), None); + assert_eq!(scan.projection(), None); + assert_eq!(scan.predicates(), None); + assert_eq!(scan.schema().fields(), scan.table_schema().fields()); + assert_eq!(scan.table_schema().fields(), schema.fields()); } } From 729eca72ff9fb17eef190232d3819f5e37092860 Mon Sep 17 00:00:00 2001 From: Charles-Antoine Leger Date: Thu, 11 Jun 2026 11:03:57 +0200 Subject: [PATCH 3/4] style(datafusion): tighten scan comments --- .../datafusion/src/physical_plan/scan.rs | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 386c9497c5..1fb178c62c 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -39,12 +39,11 @@ use crate::to_datafusion_error; /// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`]. /// -/// Has three construction modes: [`new`][Self::new] (lazy, single-partition), -/// [`new_with_tasks`][Self::new_with_tasks] (eager, over pre-planned -/// [`FileScanTask`] buckets), and -/// [`new_with_tasks_from_predicate`][Self::new_with_tasks_from_predicate] -/// (eager, from a [`Predicate`]) — the last lets a node be rebuilt from its -/// getters, e.g. by a distributed-plan codec. +/// Has three construction modes: [`new`][Self::new] for a lazy +/// single-partition scan, [`new_with_tasks`][Self::new_with_tasks] for an +/// eager scan over pre-planned [`FileScanTask`] buckets, and +/// [`new_with_tasks_from_predicate`][Self::new_with_tasks_from_predicate] for +/// rebuilding an eager scan from a [`Predicate`]. /// /// Note: in eager mode the underlying `TableScan` is rebuilt on every /// `execute(partition)` call. The per-build cost is bounded (no I/O) and @@ -60,11 +59,9 @@ pub struct IcebergTableScan { plan_properties: Arc, /// Projection column names, None means all columns. projection: Option>, - /// Full schema before projection. Kept verbatim, not re-derived: the - /// provider caches it while reloading the table, so it can diverge from the - /// table's current metadata. + /// Full schema before projection. table_schema: ArrowSchemaRef, - /// Projection indices as received by `scan`; `projection` keeps only names. + /// Projection indices as received by `scan`. projection_indices: Option>, /// Filters to apply to the table scan. predicates: Option, @@ -127,10 +124,7 @@ impl IcebergTableScan { ) } - /// Eager variant taking a [`Predicate`] instead of [`Expr`] filters, so a - /// node can be rebuilt from its getters. The predicate is unbound; the scan - /// builder binds it at `execute` time. - // Arity mirrors `new_with_tasks`; an args struct is deferred. + /// Creates an eager multi-partition scan from a [`Predicate`]. #[allow(clippy::too_many_arguments)] pub fn new_with_tasks_from_predicate( table: Table, From 85b94e3dfd9cc78d41ca3f01dc88a49da91a1696 Mon Sep 17 00:00:00 2001 From: Charles-Antoine Leger Date: Thu, 11 Jun 2026 11:09:28 +0200 Subject: [PATCH 4/4] test(datafusion): keep one scan constructor test --- .../datafusion/src/physical_plan/scan.rs | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 1fb178c62c..bc85fdb425 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -491,25 +491,4 @@ mod tests { Partitioning::UnknownPartitioning(2) )); } - - #[tokio::test] - async fn test_no_projection_keeps_full_schema() { - let schema = create_test_arrow_schema(); - let scan = IcebergTableScan::new_with_tasks_from_predicate( - get_test_table_from_metadata_file().await, - None, - schema.clone(), - None, - None, - None, - vec![vec![]], - Partitioning::UnknownPartitioning(1), - ); - - assert_eq!(scan.projection_indices(), None); - assert_eq!(scan.projection(), None); - assert_eq!(scan.predicates(), None); - assert_eq!(scan.schema().fields(), scan.table_schema().fields()); - assert_eq!(scan.table_schema().fields(), schema.fields()); - } }