diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index a212122401f98..254e437b69cc0 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -594,6 +594,10 @@ impl Statistics { } Precision::Absent => Precision::Absent, }; + // NDV can never exceed the number of rows + if let Some(&rows) = self.num_rows.get_value() { + cs.distinct_count = cs.distinct_count.min(&Precision::Inexact(rows)); + } cs }) .collect(); @@ -2169,7 +2173,8 @@ mod tests { result_col_stats.sum_value, Precision::Inexact(ScalarValue::Int32(Some(123456))) ); - assert_eq!(result_col_stats.distinct_count, Precision::Inexact(789)); + // NDV is capped at the new row count (250) since 789 > 250 + assert_eq!(result_col_stats.distinct_count, Precision::Inexact(250)); } #[test] @@ -2280,6 +2285,90 @@ mod tests { assert_eq!(result.total_byte_size, Precision::Inexact(800)); } + #[test] + fn test_with_fetch_caps_ndv_at_row_count() { + // NDV=500 but after LIMIT 10, NDV should be capped at 10 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(10), 0, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(10)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(10) + ); + } + + #[test] + fn test_with_fetch_caps_ndv_with_skip() { + // 1000 rows, NDV=500, OFFSET 5 LIMIT 10 + // with_fetch computes num_rows = min(1000 - 5, 10) = 10 + // NDV should be capped at 10 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(10), 5, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(10)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(10) + ); + } + + #[test] + fn test_with_fetch_caps_ndv_with_large_skip() { + // 1000 rows, NDV=500, OFFSET 995 LIMIT 100 + // with_fetch computes num_rows = min(1000 - 995, 100) = 5 + // NDV should be capped at 5 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(500), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(100), 995, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(5)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + } + + #[test] + fn test_with_fetch_ndv_below_row_count_unchanged() { + // NDV=5 and LIMIT 10: NDV should stay at 5 + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Exact(8000), + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Inexact(5), + ..Default::default() + }], + }; + + let result = stats.with_fetch(Some(10), 0, 1).unwrap(); + assert_eq!(result.num_rows, Precision::Exact(10)); + assert_eq!( + result.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + } + #[test] fn test_try_merge_iter_basic() { let schema = Arc::new(Schema::new(vec![ diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index e38238f861739..22e0db663a3d4 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -281,17 +281,18 @@ async fn sql_limit() -> Result<()> { let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").await.unwrap(); let physical_plan = df.create_physical_plan().await.unwrap(); // when the limit is smaller than the original number of lines we mark the statistics as inexact + // and cap NDV at the new row count + let limit_stats = physical_plan.partition_statistics(None)?; + assert_eq!(limit_stats.num_rows, Precision::Exact(5)); + // c1: NDV=2 stays at 2 (already below limit of 5) assert_eq!( - Statistics { - num_rows: Precision::Exact(5), - column_statistics: stats - .column_statistics - .iter() - .map(|c| c.clone().to_inexact()) - .collect(), - total_byte_size: Precision::Absent - }, - *physical_plan.partition_statistics(None)? + limit_stats.column_statistics[0].distinct_count, + Precision::Inexact(2) + ); + // c2: NDV=13 capped to 5 (the limit row count) + assert_eq!( + limit_stats.column_statistics[1].distinct_count, + Precision::Inexact(5) ); let df = ctx diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 8720d5f7d223b..f8bb9a141bde3 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -341,6 +341,10 @@ impl FilterExec { schema, &input_stats.column_statistics, analysis_ctx.boundaries, + match &num_rows { + Precision::Absent => None, + p => Some(*p), + }, ); Ok(Statistics { num_rows, @@ -781,6 +785,7 @@ fn collect_new_statistics( schema: &SchemaRef, input_column_stats: &[ColumnStatistics], analysis_boundaries: Vec, + filtered_num_rows: Option>, ) -> Vec { analysis_boundaries .into_iter() @@ -816,11 +821,15 @@ fn collect_new_statistics( let min_value = interval_bound_to_precision(lower, is_single_value); let max_value = interval_bound_to_precision(upper, is_single_value); // When the interval collapses to a single value (equality - // predicate), the column has exactly 1 distinct value + // predicate), the column has exactly 1 distinct value. + // Otherwise, cap NDV at the filtered row count. let capped_distinct_count = if is_single_value { Precision::Exact(1) } else { - distinct_count.to_inexact() + match filtered_num_rows { + Some(rows) => distinct_count.to_inexact().min(&rows), + None => distinct_count.to_inexact(), + } }; ColumnStatistics { null_count: input_column_stats[idx].null_count.to_inexact(), @@ -2398,10 +2407,12 @@ mod tests { statistics.column_statistics[0].distinct_count, Precision::Exact(1) ); - // b > 10 narrows to [11, 50] but doesn't collapse + // b > 10 narrows to [11, 50] but doesn't collapse to a single value. + // The combined selectivity of a=42 (1/80) and c=7 (1/150) on 100 rows + // computes num_rows = 1, so NDV is capped at the row count: min(40, 1) = 1. assert_eq!( statistics.column_statistics[1].distinct_count, - Precision::Inexact(40) + Precision::Inexact(1) ); // c = 7 collapses to single value assert_eq!( @@ -2639,4 +2650,41 @@ mod tests { assert_eq!(out_schema.field(1).name(), "tokens"); Ok(()) } + + #[tokio::test] + async fn test_filter_statistics_ndv_capped_at_row_count() -> Result<()> { + // Table: a: min=1, max=100, distinct_count=80, 100 rows + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics { + num_rows: Precision::Inexact(100), + total_byte_size: Precision::Inexact(400), + column_statistics: vec![ColumnStatistics { + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + max_value: Precision::Inexact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Inexact(80), + ..Default::default() + }], + }, + schema.clone(), + )); + + // a <= 10 => ~10 rows out of 100 + let predicate: Arc = + binary(col("a", &schema)?, Operator::LtEq, lit(10i32), &schema)?; + + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + + let statistics = filter.partition_statistics(None)?; + // Filter estimates ~10 rows (selectivity = 10/100) + assert_eq!(statistics.num_rows, Precision::Inexact(10)); + // NDV should be capped at the filtered row count (10), not the original 80 + let ndv = &statistics.column_statistics[0].distinct_count; + assert!( + ndv.get_value().copied() <= Some(10), + "Expected NDV <= 10 (filtered row count), got {ndv:?}" + ); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 39a380eed9d41..b0ecf7b789f6b 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -709,7 +709,19 @@ fn max_distinct_count( stats: &ColumnStatistics, ) -> Precision { match &stats.distinct_count { - &dc @ (Precision::Exact(_) | Precision::Inexact(_)) => dc, + &dc @ (Precision::Exact(_) | Precision::Inexact(_)) => { + // NDV can never exceed the number of rows + match num_rows { + Precision::Absent => dc, + _ => { + if dc.get_value() <= num_rows.get_value() { + dc + } else { + num_rows.to_inexact() + } + } + } + } _ => { // The number can never be greater than the number of rows we have // minus the nulls (since they don't count as distinct values). @@ -2392,6 +2404,22 @@ mod tests { (10, Inexact(1), Inexact(10), Absent, Absent), Some(Inexact(0)), ), + // NDV > num_rows: distinct count should be capped at row count + ( + (5, Inexact(1), Inexact(100), Inexact(50), Absent), + (10, Inexact(1), Inexact(100), Inexact(50), Absent), + // max_distinct_count caps: left NDV=min(50,5)=5, right NDV=min(50,10)=10 + // cardinality = (5 * 10) / max(5, 10) = 50 / 10 = 5 + Some(Inexact(5)), + ), + // NDV > num_rows on one side only + ( + (3, Inexact(1), Inexact(100), Inexact(100), Absent), + (10, Inexact(1), Inexact(100), Inexact(5), Absent), + // max_distinct_count caps: left NDV=min(100,3)=3, right NDV=min(5,10)=5 + // cardinality = (3 * 10) / max(3, 5) = 30 / 5 = 6 + Some(Inexact(6)), + ), ]; for (left_info, right_info, expected_cardinality) in cases { @@ -2531,11 +2559,14 @@ mod tests { // y: min=0, max=100, distinct=None // // Join on a=c, b=d (ignore x/y) + // Right column d has NDV=2500 but only 2000 rows, so NDV is capped + // to 2000. join_selectivity = max(500, 2000) = 2000. + // Inner cardinality = (1000 * 2000) / 2000 = 1000 let cases = vec![ - (JoinType::Inner, 800), + (JoinType::Inner, 1000), (JoinType::Left, 1000), (JoinType::Right, 2000), - (JoinType::Full, 2200), + (JoinType::Full, 2000), ]; let left_col_stats = vec![ @@ -3170,4 +3201,76 @@ mod tests { assert_eq!(cmp_nl.compare(0, 0), Ordering::Greater); assert_eq!(cmp_nl.compare(1, 1), Ordering::Less); } + + #[test] + fn test_max_distinct_count_preserves_precision_when_not_capped() { + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Exact(5), + ..Default::default() + } + ), + Exact(5) + ); + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Inexact(5), + ..Default::default() + } + ), + Inexact(5) + ); + // Inexact num_rows does not affect an exact NDV that is within bounds + assert_eq!( + max_distinct_count( + &Inexact(10), + &ColumnStatistics { + distinct_count: Exact(5), + ..Default::default() + } + ), + Exact(5) + ); + } + + #[test] + fn test_max_distinct_count_demotes_to_inexact_when_capped() { + // Exact NDV > Exact num_rows is an illegal state (NDV <= num_rows is a + // mathematical invariant), but the code handles it defensively by + // capping and demoting to inexact + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Exact(15), + ..Default::default() + } + ), + Inexact(10) + ); + assert_eq!( + max_distinct_count( + &Inexact(10), + &ColumnStatistics { + distinct_count: Exact(15), + ..Default::default() + } + ), + Inexact(10) + ); + assert_eq!( + max_distinct_count( + &Exact(10), + &ColumnStatistics { + distinct_count: Inexact(15), + ..Default::default() + } + ), + Inexact(10) + ); + } }