diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 3926de8acd..0a386bded7 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -34,7 +34,6 @@ use iceberg::scan::{FileScanTask, TableScan}; use iceberg::table::Table; use super::expr_to_predicate::convert_filters_to_predicate; -use crate::table::PartitionKeysKind; use crate::to_datafusion_error; /// Iceberg [`Table`] scan as a DataFusion [`ExecutionPlan`]. @@ -61,8 +60,6 @@ pub struct IcebergTableScan { predicates: Option, /// Pre-planned file scan tasks per partition (eager mode), or `None` (lazy mode). buckets: Option>>, - /// `None` when partitioning is `UnknownPartitioning`. - partition_keys_kind: Option, /// Optional limit on the number of rows to return. limit: Option, } @@ -149,19 +146,10 @@ impl IcebergTableScan { projection, predicates, buckets, - partition_keys_kind: None, limit, } } - pub(crate) fn with_partition_keys_kind( - mut self, - partition_keys_kind: Option, - ) -> Self { - self.partition_keys_kind = partition_keys_kind; - self - } - pub fn table(&self) -> &Table { &self.table } @@ -183,12 +171,6 @@ impl IcebergTableScan { self.buckets.as_deref().unwrap_or(&[]) } - /// Returns the transform family behind the `Partitioning::Hash` declaration, - /// or `None` when the scan declares `UnknownPartitioning`. - pub fn partition_keys_kind(&self) -> Option { - self.partition_keys_kind - } - pub fn limit(&self) -> Option { self.limit } diff --git a/crates/integrations/datafusion/src/table/bucketing.rs b/crates/integrations/datafusion/src/table/bucketing.rs index 633a732b31..579752108c 100644 --- a/crates/integrations/datafusion/src/table/bucketing.rs +++ b/crates/integrations/datafusion/src/table/bucketing.rs @@ -152,17 +152,6 @@ pub(super) fn compute_bucket_cols( }]) } -/// Identifies the transform family behind a `Partitioning::Hash` declaration -/// on an [`IcebergTableScan`][crate::physical_plan::scan::IcebergTableScan]. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[non_exhaustive] -pub enum PartitionKeysKind { - /// Keys come from `Transform::Identity` fields. - Identity, - /// Keys come from `Transform::Bucket(_)` fields. - Bucket, -} - /// Single-entry partition-key descriptor used by [`bucket_tasks`] and /// `IcebergTableProvider::scan` to drive both task distribution and the /// `Partitioning::Hash` declaration. @@ -186,13 +175,6 @@ impl PartitionKeys { .collect(), } } - - pub(super) fn kind(&self) -> PartitionKeysKind { - match self { - PartitionKeys::Identity(_) => PartitionKeysKind::Identity, - PartitionKeys::Bucket(_) => PartitionKeysKind::Bucket, - } - } } /// Return the partition keys that drive a `Partitioning::Hash` declaration, diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 56faab8075..42fe833e5e 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -26,7 +26,6 @@ //! table snapshot. Use for consistent analytical queries or time-travel scenarios. mod bucketing; -pub use bucketing::PartitionKeysKind; pub mod metadata_table; pub mod table_provider_factory; @@ -249,27 +248,23 @@ impl TableProvider for IcebergTableProvider { let (buckets, all_had_full_key) = bucketing::bucket_tasks(tasks, n_partitions, keys.as_ref()); - let (partitioning, partition_keys_kind) = match &keys { - Some(keys) if all_had_full_key && n_partitions > 0 => ( - Partitioning::Hash(keys.column_exprs(), n_partitions), - Some(keys.kind()), - ), - _ => (Partitioning::UnknownPartitioning(n_partitions), None), + let partitioning = match &keys { + Some(keys) if all_had_full_key && n_partitions > 0 => { + Partitioning::Hash(keys.column_exprs(), n_partitions) + } + _ => Partitioning::UnknownPartitioning(n_partitions), }; - Ok(Arc::new( - IcebergTableScan::new_with_tasks( - table, - None, // Always use current snapshot for catalog-backed provider - self.schema.clone(), - projection, - filters, - limit, - buckets, - partitioning, - ) - .with_partition_keys_kind(partition_keys_kind), - )) + Ok(Arc::new(IcebergTableScan::new_with_tasks( + table, + None, // Always use current snapshot for catalog-backed provider + self.schema.clone(), + projection, + filters, + limit, + buckets, + partitioning, + ))) } fn supports_filters_pushdown( @@ -1334,10 +1329,6 @@ mod tests { } other => panic!("expected Partitioning::Hash, got {other:?}"), } - assert_eq!( - scan.partition_keys_kind(), - Some(super::PartitionKeysKind::Identity), - ); } /// A projection that omits the partition source column drops @@ -1370,7 +1361,6 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); - assert_eq!(scan.partition_keys_kind(), None); } // ── Bucket-transform partitioning tests ───────────────────────────────── @@ -1529,10 +1519,6 @@ mod tests { } other => panic!("expected Partitioning::Hash, got {other:?}"), } - assert_eq!( - scan.partition_keys_kind(), - Some(super::PartitionKeysKind::Bucket), - ); } /// Single-column bucket spec where the projection excludes the *only* @@ -1573,7 +1559,6 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); - assert_eq!(scan.partition_keys_kind(), None); } /// A `None` partition slot makes `bucket_hash` return `None`, so the @@ -1605,7 +1590,6 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); - assert_eq!(scan.partition_keys_kind(), None); } /// Mixed `Bucket[N] + Truncate(_)` spec: `compute_bucket_cols` rejects @@ -1712,7 +1696,6 @@ mod tests { scan.properties().partitioning, Partitioning::UnknownPartitioning(_) )); - assert_eq!(scan.partition_keys_kind(), None); } /// Mixed `Identity + Bucket` spec must keep the existing behaviour: @@ -1845,10 +1828,6 @@ mod tests { } other => panic!("expected Partitioning::Hash, got {other:?}"), } - assert_eq!( - scan.partition_keys_kind(), - Some(super::PartitionKeysKind::Identity), - ); } /// Pure `Bucket[N]` with `target_partitions == N`: tasks must land