diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 3926de8acd..bc85fdb425 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -39,9 +39,11 @@ use crate::to_datafusion_error; /// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`]. /// -/// Has two construction modes: [`IcebergTableScan::new`] for a lazy -/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an -/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets. +/// Has three construction modes: [`new`][Self::new] for a lazy +/// single-partition scan, [`new_with_tasks`][Self::new_with_tasks] for an +/// eager scan over pre-planned [`FileScanTask`] buckets, and +/// [`new_with_tasks_from_predicate`][Self::new_with_tasks_from_predicate] for +/// rebuilding an eager scan from a [`Predicate`]. /// /// Note: in eager mode the underlying `TableScan` is rebuilt on every /// `execute(partition)` call. The per-build cost is bounded (no I/O) and @@ -57,6 +59,10 @@ pub struct IcebergTableScan { plan_properties: Arc, /// Projection column names, None means all columns. projection: Option>, + /// Full schema before projection. + table_schema: ArrowSchemaRef, + /// Projection indices as received by `scan`. + projection_indices: Option>, /// Filters to apply to the table scan. predicates: Option, /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode). @@ -84,7 +90,7 @@ impl IcebergTableScan { snapshot_id, schema, projection, - filters, + convert_filters_to_predicate(filters), limit, Partitioning::UnknownPartitioning(1), None, @@ -111,7 +117,31 @@ impl IcebergTableScan { snapshot_id, schema, projection, - filters, + convert_filters_to_predicate(filters), + limit, + partitioning, + Some(buckets), + ) + } + + /// Creates an eager multi-partition scan from a [`Predicate`]. + #[allow(clippy::too_many_arguments)] + pub fn new_with_tasks_from_predicate( + table: Table, + snapshot_id: Option, + schema: ArrowSchemaRef, + projection: Option<&Vec>, + predicate: Option, + limit: Option, + buckets: Vec>, + partitioning: Partitioning, + ) -> Self { + Self::new_inner( + table, + snapshot_id, + schema, + projection, + predicate, limit, partitioning, Some(buckets), @@ -124,7 +154,7 @@ impl IcebergTableScan { snapshot_id: Option, schema: ArrowSchemaRef, projection: Option<&Vec>, - filters: &[Expr], + predicate: Option, limit: Option, partitioning: Partitioning, buckets: Option>>, @@ -139,15 +169,18 @@ impl IcebergTableScan { EmissionType::Incremental, Boundedness::Bounded, )); + let table_schema = Arc::clone(&schema); + let projection_indices = projection.cloned(); let projection = get_column_names(schema, projection); - let predicates = convert_filters_to_predicate(filters); Self { table, snapshot_id, plan_properties, projection, - predicates, + table_schema, + projection_indices, + predicates: predicate, buckets, partition_keys_kind: None, limit, @@ -166,6 +199,10 @@ impl IcebergTableScan { &self.table } + pub fn table_schema(&self) -> &ArrowSchemaRef { + &self.table_schema + } + pub fn snapshot_id(&self) -> Option { self.snapshot_id } @@ -174,6 +211,10 @@ impl IcebergTableScan { self.projection.as_deref() } + pub fn projection_indices(&self) -> Option<&[usize]> { + self.projection_indices.as_deref() + } + pub fn predicates(&self) -> Option<&Predicate> { self.predicates.as_ref() } @@ -383,3 +424,71 @@ pub(super) fn get_column_names( .collect::>() }) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion::physical_plan::ExecutionPlan; + use iceberg::TableIdent; + use iceberg::expr::Reference; + use iceberg::io::FileIO; + use iceberg::spec::Datum; + use iceberg::table::{StaticTable, Table}; + + use super::*; + + async fn get_test_table_from_metadata_file() -> Table { + let metadata_file_name = "TableMetadataV2Valid.json"; + let metadata_file_path = format!( + "{}/tests/test_data/{}", + env!("CARGO_MANIFEST_DIR"), + metadata_file_name + ); + let ident = TableIdent::from_strs(["ns", "scan_table"]).unwrap(); + StaticTable::from_metadata_file(&metadata_file_path, ident, FileIO::new_with_fs()) + .await + .unwrap() + .into_table() + } + + fn create_test_arrow_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![ + Field::new("x", DataType::Int64, false), + Field::new("y", DataType::Int64, false), + Field::new("z", DataType::Int64, false), + ])) + } + + #[tokio::test] + async fn test_predicate_constructor_exposes_rebuild_inputs() { + let schema = create_test_arrow_schema(); + let projection = vec![0usize, 2]; + let predicate = Reference::new("x").greater_than(Datum::long(5)); + let scan = IcebergTableScan::new_with_tasks_from_predicate( + get_test_table_from_metadata_file().await, + None, + schema.clone(), + Some(&projection), + Some(predicate.clone()), + Some(100), + vec![vec![], vec![]], + Partitioning::UnknownPartitioning(2), + ); + + assert_eq!(scan.predicates(), Some(&predicate)); + assert_eq!(scan.table_schema().fields(), schema.fields()); + assert_eq!(scan.table_schema().fields().len(), 3); + assert_eq!(scan.schema().fields().len(), 2); + assert_ne!(scan.schema().fields(), scan.table_schema().fields()); + + assert_eq!(scan.projection_indices(), Some(projection.as_slice())); + let expected_projection = vec!["x".to_string(), "z".to_string()]; + assert_eq!(scan.projection(), Some(expected_projection.as_slice())); + assert!(matches!( + scan.properties().partitioning, + Partitioning::UnknownPartitioning(2) + )); + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 56faab8075..2e1c664eb5 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -208,8 +208,8 @@ impl TableProvider for IcebergTableProvider { Some(names) => builder.select(names), None => builder.select_all(), }; - if let Some(pred) = predicate { - builder = builder.with_filter(pred); + if let Some(pred) = &predicate { + builder = builder.with_filter(pred.clone()); } let tasks: Vec = builder @@ -258,12 +258,12 @@ impl TableProvider for IcebergTableProvider { }; Ok(Arc::new( - IcebergTableScan::new_with_tasks( + IcebergTableScan::new_with_tasks_from_predicate( table, None, // Always use current snapshot for catalog-backed provider self.schema.clone(), projection, - filters, + predicate, limit, buckets, partitioning,