Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 117 additions & 8 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ use crate::to_datafusion_error;

/// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`].
///
/// Has two construction modes: [`IcebergTableScan::new`] for a lazy
/// single-partition scan, and [`IcebergTableScan::new_with_tasks`] for an
/// eager multi-partition scan over pre-planned [`FileScanTask`] buckets.
/// Has three construction modes: [`new`][Self::new] for a lazy
/// single-partition scan, [`new_with_tasks`][Self::new_with_tasks] for an
/// eager scan over pre-planned [`FileScanTask`] buckets, and
/// [`new_with_tasks_from_predicate`][Self::new_with_tasks_from_predicate] for
/// rebuilding an eager scan from a [`Predicate`].
///
/// Note: in eager mode the underlying `TableScan` is rebuilt on every
/// `execute(partition)` call. The per-build cost is bounded (no I/O) and
Expand All @@ -57,6 +59,10 @@ pub struct IcebergTableScan {
plan_properties: Arc<PlanProperties>,
/// Projection column names, None means all columns.
projection: Option<Vec<String>>,
/// Full schema before projection.
table_schema: ArrowSchemaRef,
/// Projection indices as received by `scan`.
projection_indices: Option<Vec<usize>>,
/// Filters to apply to the table scan.
predicates: Option<Predicate>,
/// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode).
Expand Down Expand Up @@ -84,7 +90,7 @@ impl IcebergTableScan {
snapshot_id,
schema,
projection,
filters,
convert_filters_to_predicate(filters),
limit,
Partitioning::UnknownPartitioning(1),
None,
Expand All @@ -111,7 +117,31 @@ impl IcebergTableScan {
snapshot_id,
schema,
projection,
filters,
convert_filters_to_predicate(filters),
limit,
partitioning,
Some(buckets),
)
}

/// Creates an eager multi-partition scan from a [`Predicate`].
#[allow(clippy::too_many_arguments)]
pub fn new_with_tasks_from_predicate(
table: Table,
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
predicate: Option<Predicate>,
limit: Option<usize>,
buckets: Vec<Vec<FileScanTask>>,
partitioning: Partitioning,
) -> Self {
Self::new_inner(
table,
snapshot_id,
schema,
projection,
predicate,
limit,
partitioning,
Some(buckets),
Expand All @@ -124,7 +154,7 @@ impl IcebergTableScan {
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
filters: &[Expr],
predicate: Option<Predicate>,
limit: Option<usize>,
partitioning: Partitioning,
buckets: Option<Vec<Vec<FileScanTask>>>,
Expand All @@ -139,15 +169,18 @@ impl IcebergTableScan {
EmissionType::Incremental,
Boundedness::Bounded,
));
let table_schema = Arc::clone(&schema);
let projection_indices = projection.cloned();
let projection = get_column_names(schema, projection);
let predicates = convert_filters_to_predicate(filters);

Self {
table,
snapshot_id,
plan_properties,
projection,
predicates,
table_schema,
projection_indices,
predicates: predicate,
buckets,
partition_keys_kind: None,
limit,
Expand All @@ -166,6 +199,10 @@ impl IcebergTableScan {
&self.table
}

/// Returns the full Arrow schema of the table, i.e. the schema this scan
/// was constructed with before any projection was applied.
pub fn table_schema(&self) -> &ArrowSchemaRef {
&self.table_schema
}

/// Returns the snapshot id this scan was constructed with, if any.
///
/// NOTE(review): `None` presumably means "use the table's current
/// snapshot" — confirm against the scan-building code.
pub fn snapshot_id(&self) -> Option<i64> {
self.snapshot_id
}
Expand All @@ -174,6 +211,10 @@ impl IcebergTableScan {
self.projection.as_deref()
}

/// Returns the projection column indices exactly as they were passed to
/// the constructor, or `None` when no projection was supplied.
///
/// These index into [`table_schema`][Self::table_schema].
pub fn projection_indices(&self) -> Option<&[usize]> {
self.projection_indices.as_deref()
}

/// Returns the [`Predicate`] stored on this scan, or `None` when the scan
/// was constructed without one.
pub fn predicates(&self) -> Option<&Predicate> {
self.predicates.as_ref()
}
Expand Down Expand Up @@ -383,3 +424,71 @@ pub(super) fn get_column_names(
.collect::<Vec<String>>()
})
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::physical_plan::ExecutionPlan;
use iceberg::TableIdent;
use iceberg::expr::Reference;
use iceberg::io::FileIO;
use iceberg::spec::Datum;
use iceberg::table::{StaticTable, Table};

use super::*;

/// Loads the `TableMetadataV2Valid.json` fixture from this crate's
/// `tests/test_data` directory and wraps it in a [`Table`] identified as
/// `ns.scan_table`.
///
/// Panics if the identifier cannot be built or the metadata file cannot be
/// loaded, which is the desired failure mode inside a test.
async fn get_test_table_from_metadata_file() -> Table {
    let fixture = "TableMetadataV2Valid.json";
    let fixture_path = format!(
        "{}/tests/test_data/{}",
        env!("CARGO_MANIFEST_DIR"),
        fixture
    );
    let table_ident = TableIdent::from_strs(["ns", "scan_table"]).unwrap();

    let static_table =
        StaticTable::from_metadata_file(&fixture_path, table_ident, FileIO::new_with_fs())
            .await
            .unwrap();
    static_table.into_table()
}

fn create_test_arrow_schema() -> ArrowSchemaRef {
Arc::new(ArrowSchema::new(vec![
Field::new("x", DataType::Int64, false),
Field::new("y", DataType::Int64, false),
Field::new("z", DataType::Int64, false),
]))
}

#[tokio::test]
async fn test_predicate_constructor_exposes_rebuild_inputs() {
let schema = create_test_arrow_schema();
let projection = vec![0usize, 2];
let predicate = Reference::new("x").greater_than(Datum::long(5));
let scan = IcebergTableScan::new_with_tasks_from_predicate(
get_test_table_from_metadata_file().await,
None,
schema.clone(),
Some(&projection),
Some(predicate.clone()),
Some(100),
vec![vec![], vec![]],
Partitioning::UnknownPartitioning(2),
);

assert_eq!(scan.predicates(), Some(&predicate));
assert_eq!(scan.table_schema().fields(), schema.fields());
assert_eq!(scan.table_schema().fields().len(), 3);
assert_eq!(scan.schema().fields().len(), 2);
assert_ne!(scan.schema().fields(), scan.table_schema().fields());

assert_eq!(scan.projection_indices(), Some(projection.as_slice()));
let expected_projection = vec!["x".to_string(), "z".to_string()];
assert_eq!(scan.projection(), Some(expected_projection.as_slice()));
assert!(matches!(
scan.properties().partitioning,
Partitioning::UnknownPartitioning(2)
));
}
}
8 changes: 4 additions & 4 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ impl TableProvider for IcebergTableProvider {
Some(names) => builder.select(names),
None => builder.select_all(),
};
if let Some(pred) = predicate {
builder = builder.with_filter(pred);
if let Some(pred) = &predicate {
builder = builder.with_filter(pred.clone());
}

let tasks: Vec<FileScanTask> = builder
Expand Down Expand Up @@ -258,12 +258,12 @@ impl TableProvider for IcebergTableProvider {
};

Ok(Arc::new(
IcebergTableScan::new_with_tasks(
IcebergTableScan::new_with_tasks_from_predicate(
table,
None, // Always use current snapshot for catalog-backed provider
self.schema.clone(),
projection,
filters,
predicate,
limit,
buckets,
partitioning,
Expand Down
Loading