diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs index e5a1e4613b3d..5835e47bbada 100644 --- a/datafusion/datasource/src/statistics.rs +++ b/datafusion/datasource/src/statistics.rs @@ -284,6 +284,80 @@ fn sort_columns_from_physical_sort_exprs( .collect() } +fn seed_summary_statistics(summary_statistics: &mut Statistics, file_stats: &Statistics) { + summary_statistics.num_rows = file_stats.num_rows; + summary_statistics.total_byte_size = file_stats.total_byte_size; + + for (summary_col_stats, file_col_stats) in summary_statistics + .column_statistics + .iter_mut() + .zip(file_stats.column_statistics.iter()) + { + summary_col_stats.null_count = file_col_stats.null_count; + summary_col_stats.max_value = file_col_stats.max_value.clone(); + summary_col_stats.min_value = file_col_stats.min_value.clone(); + summary_col_stats.sum_value = file_col_stats.sum_value.cast_to_sum_type(); + summary_col_stats.byte_size = file_col_stats.byte_size; + } +} + +fn merge_summary_statistics( + summary_statistics: &mut Statistics, + file_stats: &Statistics, +) { + summary_statistics.num_rows = summary_statistics.num_rows.add(&file_stats.num_rows); + summary_statistics.total_byte_size = summary_statistics + .total_byte_size + .add(&file_stats.total_byte_size); + + for (summary_col_stats, file_col_stats) in summary_statistics + .column_statistics + .iter_mut() + .zip(file_stats.column_statistics.iter()) + { + let ColumnStatistics { + null_count: file_nc, + max_value: file_max, + min_value: file_min, + sum_value: file_sum, + distinct_count: _, + byte_size: file_sbs, + } = file_col_stats; + + summary_col_stats.null_count = summary_col_stats.null_count.add(file_nc); + summary_col_stats.max_value = summary_col_stats.max_value.max(file_max); + summary_col_stats.min_value = summary_col_stats.min_value.min(file_min); + summary_col_stats.sum_value = summary_col_stats.sum_value.add_for_sum(file_sum); + summary_col_stats.byte_size = 
summary_col_stats.byte_size.add(file_sbs);
+    }
+}
+
+fn seed_first_file_statistics(
+    limit_num_rows: &mut Precision<usize>,
+    summary_statistics: &mut Statistics,
+    file_stats: &Statistics,
+    collect_stats: bool,
+) {
+    *limit_num_rows = file_stats.num_rows;
+
+    if collect_stats {
+        seed_summary_statistics(summary_statistics, file_stats);
+    }
+}
+
+fn merge_file_statistics(
+    limit_num_rows: &mut Precision<usize>,
+    summary_statistics: &mut Statistics,
+    file_stats: &Statistics,
+    collect_stats: bool,
+) {
+    *limit_num_rows = limit_num_rows.add(&file_stats.num_rows);
+
+    if collect_stats {
+        merge_summary_statistics(summary_statistics, file_stats);
+    }
+}
+
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
 /// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
@@ -308,9 +382,14 @@ pub async fn get_statistics_with_limit(
     // - zero for summations, and
     // - neutral element for extreme points.
     let size = file_schema.fields().len();
-    let mut col_stats_set = vec![ColumnStatistics::default(); size];
-    let mut num_rows = Precision::<usize>::Absent;
-    let mut total_byte_size = Precision::<usize>::Absent;
+    let mut summary_statistics = Statistics {
+        num_rows: Precision::Absent,
+        total_byte_size: Precision::Absent,
+        column_statistics: vec![ColumnStatistics::default(); size],
+    };
+    // Keep limit pruning separate from the returned summary so `collect_stats=false`
+    // can still stop early using known file row counts.
+    let mut limit_num_rows = Precision::<usize>::Absent;
 
     // Fusing the stream allows us to call next safely even once it is finished.
     let mut all_files = Box::pin(all_files.fuse());
@@ -320,23 +399,18 @@ pub async fn get_statistics_with_limit(
         file.statistics = Some(Arc::clone(&file_stats));
         result_files.push(file);
 
-        // First file, we set them directly from the file statistics.
- num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in - file_stats.column_statistics.clone().into_iter().enumerate() - { - col_stats_set[index].null_count = file_column.null_count; - col_stats_set[index].max_value = file_column.max_value; - col_stats_set[index].min_value = file_column.min_value; - col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type(); - } + seed_first_file_statistics( + &mut limit_num_rows, + &mut summary_statistics, + &file_stats, + collect_stats, + ); // If the number of rows exceeds the limit, we can stop processing // files. This only applies when we know the number of rows. It also // currently ignores tables that have no statistics regarding the // number of rows. - let conservative_num_rows = match num_rows { + let conservative_num_rows = match limit_num_rows { Precision::Exact(nr) => nr, _ => usize::MIN, }; @@ -345,44 +419,18 @@ pub async fn get_statistics_with_limit( let (mut file, file_stats) = current?; file.statistics = Some(Arc::clone(&file_stats)); result_files.push(file); - if !collect_stats { - continue; - } - - // We accumulate the number of rows, total byte size and null - // counts across all the files in question. If any file does not - // provide any information or provides an inexact value, we demote - // the statistic precision to inexact. 
- num_rows = num_rows.add(&file_stats.num_rows); - - total_byte_size = total_byte_size.add(&file_stats.total_byte_size); - - for (file_col_stats, col_stats) in file_stats - .column_statistics - .iter() - .zip(col_stats_set.iter_mut()) - { - let ColumnStatistics { - null_count: file_nc, - max_value: file_max, - min_value: file_min, - sum_value: file_sum, - distinct_count: _, - byte_size: file_sbs, - } = file_col_stats; - - col_stats.null_count = col_stats.null_count.add(file_nc); - col_stats.max_value = col_stats.max_value.max(file_max); - col_stats.min_value = col_stats.min_value.min(file_min); - col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum); - col_stats.byte_size = col_stats.byte_size.add(file_sbs); - } + merge_file_statistics( + &mut limit_num_rows, + &mut summary_statistics, + &file_stats, + collect_stats, + ); // If the number of rows exceeds the limit, we can stop processing // files. This only applies when we know the number of rows. It also // currently ignores tables that have no statistics regarding the // number of rows. 
- if num_rows.get_value().unwrap_or(&usize::MIN) + if limit_num_rows.get_value().unwrap_or(&usize::MIN) > &limit.unwrap_or(usize::MAX) { break; @@ -391,11 +439,7 @@ pub async fn get_statistics_with_limit( } }; - let mut statistics = Statistics { - num_rows, - total_byte_size, - column_statistics: col_stats_set, - }; + let mut statistics = summary_statistics; if all_files.next().await.is_some() { // If we still have files in the stream, it means that the limit kicked // in, and the statistic could have been different had we processed the @@ -520,6 +564,39 @@ mod tests { } } + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)])) + } + + fn make_file_stats( + num_rows: usize, + total_byte_size: usize, + col_stats: ColumnStatistics, + ) -> Arc { + Arc::new(Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Exact(total_byte_size), + column_statistics: vec![col_stats], + }) + } + + fn rich_col_stats( + null_count: usize, + min: i64, + max: i64, + sum: i64, + byte_size: usize, + ) -> ColumnStatistics { + ColumnStatistics { + null_count: Precision::Exact(null_count), + max_value: Precision::Exact(ScalarValue::Int64(Some(max))), + min_value: Precision::Exact(ScalarValue::Int64(Some(min))), + distinct_count: Precision::Absent, + sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))), + byte_size: Precision::Exact(byte_size), + } + } + #[tokio::test] #[expect(deprecated)] async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type() @@ -533,7 +610,7 @@ mod tests { ))]); let (_group, stats) = - get_statistics_with_limit(files, schema, None, false).await?; + get_statistics_with_limit(files, schema, None, true).await?; assert_eq!( stats.column_statistics[0].sum_value, @@ -571,4 +648,151 @@ mod tests { Ok(()) } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() { + let all_files = stream::iter(vec![ + Ok(( + 
PartitionedFile::new("first.parquet", 10), + make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), + )), + ]); + + let (_files, statistics) = + get_statistics_with_limit(all_files, test_schema(), None, false) + .await + .unwrap(); + + assert_eq!(statistics.num_rows, Precision::Absent); + assert_eq!(statistics.total_byte_size, Precision::Absent); + assert_eq!(statistics.column_statistics.len(), 1); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Absent + ); + assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent); + assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent); + assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent); + assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent); + } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() { + let all_files = stream::iter(vec![ + Ok(( + PartitionedFile::new("first.parquet", 10), + make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)), + )), + Ok(( + PartitionedFile::new("third.parquet", 30), + make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)), + )), + ]); + + let (files, statistics) = + get_statistics_with_limit(all_files, test_schema(), Some(4), false) + .await + .unwrap(); + + assert_eq!(files.len(), 2); + assert_eq!(statistics.num_rows, Precision::Absent); + assert_eq!(statistics.total_byte_size, Precision::Absent); + } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() { + let all_files = stream::iter(vec![ + Ok(( + PartitionedFile::new("first.parquet", 10), + make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 
64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)), + )), + ]); + + let (_files, statistics) = + get_statistics_with_limit(all_files, test_schema(), None, true) + .await + .unwrap(); + + assert_eq!(statistics.num_rows, Precision::Exact(15)); + assert_eq!(statistics.total_byte_size, Precision::Exact(150)); + assert_eq!( + statistics.column_statistics[0].null_count, + Precision::Exact(3) + ); + assert_eq!( + statistics.column_statistics[0].min_value, + Precision::Exact(ScalarValue::Int64(Some(1))) + ); + assert_eq!( + statistics.column_statistics[0].max_value, + Precision::Exact(ScalarValue::Int64(Some(99))) + ); + assert_eq!( + statistics.column_statistics[0].sum_value, + Precision::Exact(ScalarValue::Int64(Some(315))) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Exact(192) + ); + } + + #[tokio::test] + #[expect(deprecated)] + async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() { + let all_files = stream::iter(vec![ + Ok(( + PartitionedFile::new("first.parquet", 10), + make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)), + )), + Ok(( + PartitionedFile::new("second.parquet", 20), + make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)), + )), + Ok(( + PartitionedFile::new("third.parquet", 20), + make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)), + )), + ]); + + let (files, statistics) = + get_statistics_with_limit(all_files, test_schema(), Some(8), true) + .await + .unwrap(); + + assert_eq!(files.len(), 2); + assert_eq!(statistics.num_rows, Precision::Inexact(10)); + assert_eq!(statistics.total_byte_size, Precision::Inexact(100)); + assert_eq!( + statistics.column_statistics[0].min_value, + Precision::Inexact(ScalarValue::Int64(Some(1))) + ); + assert_eq!( + statistics.column_statistics[0].max_value, + Precision::Inexact(ScalarValue::Int64(Some(10))) + ); + assert_eq!( + statistics.column_statistics[0].sum_value, + 
Precision::Inexact(ScalarValue::Int64(Some(55))) + ); + assert_eq!( + statistics.column_statistics[0].byte_size, + Precision::Inexact(128) + ); + } } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 81088e5f89fd..afe2b0ae810a 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -807,15 +807,23 @@ fn collect_new_statistics( }; }; let (lower, upper) = interval.into_bounds(); - let is_exact = !lower.is_null() && !upper.is_null() && lower == upper; - let min_value = interval_bound_to_precision(lower, is_exact); - let max_value = interval_bound_to_precision(upper, is_exact); + let is_single_value = + !lower.is_null() && !upper.is_null() && lower == upper; + let min_value = interval_bound_to_precision(lower, is_single_value); + let max_value = interval_bound_to_precision(upper, is_single_value); + // When the interval collapses to a single value (equality + // predicate), the column has exactly 1 distinct value + let capped_distinct_count = if is_single_value { + Precision::Exact(1) + } else { + distinct_count.to_inexact() + }; ColumnStatistics { null_count: input_column_stats[idx].null_count.to_inexact(), max_value, min_value, sum_value: Precision::Absent, - distinct_count: distinct_count.to_inexact(), + distinct_count: capped_distinct_count, byte_size: input_column_stats[idx].byte_size, } }, @@ -2241,4 +2249,327 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_equality_sets_ndv_to_one() -> Result<()> { + // a: min=1, max=100, ndv=80 + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: 
Precision::Inexact(80),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        // a = 42 collapses interval to a single value
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
+        ));
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+        let statistics = filter.partition_statistics(None)?;
+        assert_eq!(
+            statistics.column_statistics[0].distinct_count,
+            Precision::Exact(1)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_or_equality_preserves_ndv() -> Result<()> {
+        // a: min=1, max=100, ndv=80
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(400),
+                column_statistics: vec![ColumnStatistics {
+                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                    max_value: Precision::Inexact(ScalarValue::Int32(Some(100))),
+                    distinct_count: Precision::Inexact(80),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        // a = 42 OR a = 22: interval stays [1, 100], not a single value
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(BinaryExpr::new(
+                Arc::new(Column::new("a", 0)),
+                Operator::Eq,
+                Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
+            )),
+            Operator::Or,
+            Arc::new(BinaryExpr::new(
+                Arc::new(Column::new("a", 0)),
+                Operator::Eq,
+                Arc::new(Literal::new(ScalarValue::Int32(Some(22)))),
+            )),
+        ));
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+        let statistics = filter.partition_statistics(None)?;
+        assert_eq!(
+            statistics.column_statistics[0].distinct_count,
+            Precision::Inexact(80)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_and_equality_ndv() -> Result<()> {
+        // a: min=1, max=100, ndv=80
+        // b: min=1, max=50, ndv=40
+        // c: min=1, max=200, ndv=150
+        let schema = Schema::new(vec![
+            
Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(1200), + column_statistics: vec![ + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(50))), + distinct_count: Precision::Inexact(40), + ..Default::default() + }, + ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(200))), + distinct_count: Precision::Inexact(150), + ..Default::default() + }, + ], + }, + schema.clone(), + )); + + // a = 42 AND b > 10 AND c = 7 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("b", 1)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::new(Column::new("c", 2)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(7)))), + )), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + // a = 42 collapses to single value + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + // b > 10 narrows to [11, 50] but doesn't collapse + assert_eq!( + statistics.column_statistics[1].distinct_count, + Precision::Inexact(40) + ); + // c = 7 collapses to single value + 
assert_eq!(
+            statistics.column_statistics[2].distinct_count,
+            Precision::Exact(1)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_equality_absent_bounds_ndv() -> Result<()> {
+        // a: ndv=80, no min/max
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(400),
+                column_statistics: vec![ColumnStatistics {
+                    distinct_count: Precision::Inexact(80),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        // a = 42: even without known bounds, interval analysis resolves
+        // the equality to [42, 42], so NDV is correctly set to Exact(1)
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Int32(Some(42)))),
+        ));
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+        let statistics = filter.partition_statistics(None)?;
+        assert_eq!(
+            statistics.column_statistics[0].distinct_count,
+            Precision::Exact(1)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_equality_int8_ndv() -> Result<()> {
+        // a: min=-100, max=100, ndv=50
+        let schema = Schema::new(vec![Field::new("a", DataType::Int8, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(100),
+                column_statistics: vec![ColumnStatistics {
+                    min_value: Precision::Inexact(ScalarValue::Int8(Some(-100))),
+                    max_value: Precision::Inexact(ScalarValue::Int8(Some(100))),
+                    distinct_count: Precision::Inexact(50),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Int8(Some(42)))),
+        ));
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+        let statistics = 
filter.partition_statistics(None)?;
+        assert_eq!(
+            statistics.column_statistics[0].distinct_count,
+            Precision::Exact(1)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_equality_int64_ndv() -> Result<()> {
+        // a: min=0, max=1_000_000, ndv=100_000
+        let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(100_000),
+                total_byte_size: Precision::Inexact(800_000),
+                column_statistics: vec![ColumnStatistics {
+                    min_value: Precision::Inexact(ScalarValue::Int64(Some(0))),
+                    max_value: Precision::Inexact(ScalarValue::Int64(Some(1_000_000))),
+                    distinct_count: Precision::Inexact(100_000),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Int64(Some(42)))),
+        ));
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input)?);
+        let statistics = filter.partition_statistics(None)?;
+        assert_eq!(
+            statistics.column_statistics[0].distinct_count,
+            Precision::Exact(1)
+        );
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_filter_statistics_equality_float32_ndv() -> Result<()> {
+        // a: min=0.0, max=100.0, ndv=50
+        let schema = Schema::new(vec![Field::new("a", DataType::Float32, false)]);
+        let input = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(400),
+                column_statistics: vec![ColumnStatistics {
+                    min_value: Precision::Inexact(ScalarValue::Float32(Some(0.0))),
+                    max_value: Precision::Inexact(ScalarValue::Float32(Some(100.0))),
+                    distinct_count: Precision::Inexact(50),
+                    ..Default::default()
+                }],
+            },
+            schema.clone(),
+        ));
+
+        let predicate = Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a", 0)),
+            Operator::Eq,
+            Arc::new(Literal::new(ScalarValue::Float32(Some(42.5)))),
+        ));
+        let filter: Arc<dyn ExecutionPlan> =
+            
Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } + + #[tokio::test] + async fn test_filter_statistics_equality_reversed_ndv() -> Result<()> { + // a: min=1, max=100, ndv=80 + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // 42 = a (literal on the left) + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + Operator::Eq, + Arc::new(Column::new("a", 0)), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let statistics = filter.partition_statistics(None)?; + assert_eq!( + statistics.column_statistics[0].distinct_count, + Precision::Exact(1) + ); + Ok(()) + } } diff --git a/datafusion/sqllogictest/test_files/parquet_statistics.slt b/datafusion/sqllogictest/test_files/parquet_statistics.slt index 1a0e004a6379..1073f60a0fef 100644 --- a/datafusion/sqllogictest/test_files/parquet_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_statistics.slt @@ -59,7 +59,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] 
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -84,7 +84,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) ScanBytes=Inexact(40))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(10), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), 
[(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0) ScanBytes=Inexact(40))]] @@ -109,7 +109,7 @@ query TT EXPLAIN SELECT * FROM test_table WHERE column1 = 1; ---- physical_plan -01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]] +01)FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Distinct=Exact(1))]] 02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] 03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]] @@ -117,6 +117,75 @@ physical_plan statement ok DROP TABLE test_table; +###### +# Equality filter NDV=Exact(1) for different numeric types +###### + +statement ok +set datafusion.execution.collect_statistics = true; + +query I +COPY ( + SELECT + arrow_cast(v, 'Int8') AS i8, + arrow_cast(v, 'Int64') AS i64, + arrow_cast(v + 0.5, 'Float32') AS f32, + arrow_cast(v + 0.5, 'Float64') AS f64 + FROM (VALUES (1), (2), (3), (4), (5)) AS t(v) +) +TO 'test_files/scratch/parquet_statistics/typed_table.parquet' +STORED AS PARQUET; +---- +5 + +statement ok +CREATE EXTERNAL TABLE typed_table ( + i8 TINYINT, + i64 BIGINT, + f32 FLOAT, + f64 DOUBLE +) STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_statistics/typed_table.parquet'; + +# Int8 equality +query TT +EXPLAIN SELECT i8 FROM typed_table WHERE i8 = 2; +---- +physical_plan +01)FilterExec: i8@0 = 2, statistics=[Rows=Inexact(1), Bytes=Inexact(1), 
[(Col[0]: Min=Exact(Int8(2)) Max=Exact(Int8(2)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(5))]] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(5), [(Col[0]: Min=Inexact(Int8(1)) Max=Inexact(Int8(5)) Null=Inexact(0) ScanBytes=Inexact(5))]] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[i8], file_type=parquet, predicate=i8@0 = 2, pruning_predicate=i8_null_count@2 != row_count@3 AND i8_min@0 <= 2 AND 2 <= i8_max@1, required_guarantees=[i8 in (2)], statistics=[Rows=Inexact(5), Bytes=Inexact(5), [(Col[0]: Min=Inexact(Int8(1)) Max=Inexact(Int8(5)) Null=Inexact(0) ScanBytes=Inexact(5))]] + +# Int64 equality +query TT +EXPLAIN SELECT i64 FROM typed_table WHERE i64 = 2; +---- +physical_plan +01)FilterExec: i64@0 = 2, statistics=[Rows=Inexact(1), Bytes=Inexact(8), [(Col[0]: Min=Exact(Int64(2)) Max=Exact(Int64(2)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(5)) Null=Inexact(0) ScanBytes=Inexact(40))]] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[i64], file_type=parquet, predicate=i64@1 = 2, pruning_predicate=i64_null_count@2 != row_count@3 AND i64_min@0 <= 2 AND 2 <= i64_max@1, required_guarantees=[i64 in (2)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(5)) Null=Inexact(0) ScanBytes=Inexact(40))]] + +# Float32 equality +query TT +EXPLAIN SELECT f32 FROM typed_table WHERE f32 = 2.5; +---- +physical_plan +01)FilterExec: CAST(f32@0 AS Float64) = 2.5, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Float32(2.5)) 
Max=Exact(Float32(2.5)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(20))]] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(20), [(Col[0]: Min=Inexact(Float32(1.5)) Max=Inexact(Float32(5.5)) Null=Inexact(0) ScanBytes=Inexact(20))]] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[f32], file_type=parquet, predicate=CAST(f32@2 AS Float64) = 2.5, pruning_predicate=f32_null_count@2 != row_count@3 AND CAST(f32_min@0 AS Float64) <= 2.5 AND 2.5 <= CAST(f32_max@1 AS Float64), required_guarantees=[], statistics=[Rows=Inexact(5), Bytes=Inexact(20), [(Col[0]: Min=Inexact(Float32(1.5)) Max=Inexact(Float32(5.5)) Null=Inexact(0) ScanBytes=Inexact(20))]] + +# Reversed operand order: literal = column (Float64) +query TT +EXPLAIN SELECT f64 FROM typed_table WHERE 2.5 = f64; +---- +physical_plan +01)FilterExec: f64@0 = 2.5, statistics=[Rows=Inexact(1), Bytes=Inexact(1), [(Col[0]: Min=Exact(Float64(2.5)) Max=Exact(Float64(2.5)) Null=Inexact(0) Distinct=Exact(1) ScanBytes=Inexact(40))]] +02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Float64(1.5)) Max=Inexact(Float64(5.5)) Null=Inexact(0) ScanBytes=Inexact(40))]] +03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/typed_table.parquet]]}, projection=[f64], file_type=parquet, predicate=f64@3 = 2.5, pruning_predicate=f64_null_count@2 != row_count@3 AND f64_min@0 <= 2.5 AND 2.5 <= f64_max@1, required_guarantees=[f64 in (2.5)], statistics=[Rows=Inexact(5), Bytes=Inexact(40), [(Col[0]: Min=Inexact(Float64(1.5)) Max=Inexact(Float64(5.5)) Null=Inexact(0) ScanBytes=Inexact(40))]] + +statement ok +DROP TABLE typed_table; + # Config reset statement ok RESET 
datafusion.execution.collect_statistics;