332 changes: 278 additions & 54 deletions datafusion/datasource/src/statistics.rs
@@ -284,6 +284,80 @@ fn sort_columns_from_physical_sort_exprs(
.collect()
}

fn seed_summary_statistics(summary_statistics: &mut Statistics, file_stats: &Statistics) {
summary_statistics.num_rows = file_stats.num_rows;
summary_statistics.total_byte_size = file_stats.total_byte_size;

for (summary_col_stats, file_col_stats) in summary_statistics
.column_statistics
.iter_mut()
.zip(file_stats.column_statistics.iter())
{
summary_col_stats.null_count = file_col_stats.null_count;
summary_col_stats.max_value = file_col_stats.max_value.clone();
summary_col_stats.min_value = file_col_stats.min_value.clone();
summary_col_stats.sum_value = file_col_stats.sum_value.cast_to_sum_type();
summary_col_stats.byte_size = file_col_stats.byte_size;
}
}

fn merge_summary_statistics(
summary_statistics: &mut Statistics,
file_stats: &Statistics,
) {
summary_statistics.num_rows = summary_statistics.num_rows.add(&file_stats.num_rows);
summary_statistics.total_byte_size = summary_statistics
.total_byte_size
.add(&file_stats.total_byte_size);

for (summary_col_stats, file_col_stats) in summary_statistics
.column_statistics
.iter_mut()
.zip(file_stats.column_statistics.iter())
{
let ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
sum_value: file_sum,
distinct_count: _,
byte_size: file_sbs,
} = file_col_stats;

summary_col_stats.null_count = summary_col_stats.null_count.add(file_nc);
summary_col_stats.max_value = summary_col_stats.max_value.max(file_max);
summary_col_stats.min_value = summary_col_stats.min_value.min(file_min);
summary_col_stats.sum_value = summary_col_stats.sum_value.add_for_sum(file_sum);
summary_col_stats.byte_size = summary_col_stats.byte_size.add(file_sbs);
}
}
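
The merge step leans on the `Precision` lattice in `datafusion_common`: `add`, `max`, and `min` stay `Exact` only while both operands are exact, demote to `Inexact` when either side is inexact, and collapse to `Absent` when a value is missing, which is why a single file without statistics degrades the whole summary. A minimal sketch of that behavior, assuming only the public `datafusion_common::stats::Precision` API:

```rust
use datafusion_common::stats::Precision;

fn main() {
    // Exact + Exact stays exact.
    assert_eq!(
        Precision::Exact(10_usize).add(&Precision::Exact(5)),
        Precision::Exact(15)
    );
    // One inexact operand demotes the result to inexact.
    assert_eq!(
        Precision::Exact(10_usize).add(&Precision::Inexact(5)),
        Precision::Inexact(15)
    );
    // A missing operand makes the result unknown.
    assert_eq!(
        Precision::Exact(10_usize).add(&Precision::Absent),
        Precision::Absent
    );
}
```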

fn seed_first_file_statistics(
limit_num_rows: &mut Precision<usize>,
summary_statistics: &mut Statistics,
file_stats: &Statistics,
collect_stats: bool,
) {
*limit_num_rows = file_stats.num_rows;

if collect_stats {
seed_summary_statistics(summary_statistics, file_stats);
}
}

fn merge_file_statistics(
limit_num_rows: &mut Precision<usize>,
summary_statistics: &mut Statistics,
file_stats: &Statistics,
collect_stats: bool,
) {
*limit_num_rows = limit_num_rows.add(&file_stats.num_rows);

if collect_stats {
merge_summary_statistics(summary_statistics, file_stats);
}
}
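
Taken together, the helpers are meant to be called in sequence: seed from the first file, then fold every later file into the running summary. A hypothetical driver (`summarize_files` is illustrative only, not part of this change) that mirrors the streaming loop in `get_statistics_with_limit` below:

```rust
use datafusion_common::{stats::Precision, ColumnStatistics, Statistics};

// Hypothetical, for illustration: fold a slice of per-file statistics
// the same way the streaming loop does.
fn summarize_files(
    files: &[Statistics],
    collect_stats: bool,
) -> (Precision<usize>, Statistics) {
    let width = files.first().map_or(0, |s| s.column_statistics.len());
    let mut summary = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics::default(); width],
    };
    let mut limit_num_rows = Precision::<usize>::Absent;

    let mut iter = files.iter();
    if let Some(first) = iter.next() {
        // First file: copy its statistics wholesale.
        seed_first_file_statistics(&mut limit_num_rows, &mut summary, first, collect_stats);
        // Later files: accumulate, demoting precision where needed.
        for file in iter {
            merge_file_statistics(&mut limit_num_rows, &mut summary, file, collect_stats);
        }
    }
    (limit_num_rows, summary)
}
```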

/// Get all files as well as the file-level summary statistics (no statistics for partition columns).
/// If the optional `limit` is provided, includes only as many files as are needed to read up to
/// `limit` rows. `collect_stats` is passed down from the configuration parameter on
@@ -308,9 +382,14 @@ pub async fn get_statistics_with_limit(
// - zero for summations, and
// - neutral element for extreme points.
let size = file_schema.fields().len();
let mut col_stats_set = vec![ColumnStatistics::default(); size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;
let mut summary_statistics = Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: vec![ColumnStatistics::default(); size],
};
// Keep limit pruning separate from the returned summary so `collect_stats=false`
// can still stop early using known file row counts.
let mut limit_num_rows = Precision::<usize>::Absent;

// Fusing the stream allows us to call next safely even once it is finished.
let mut all_files = Box::pin(all_files.fuse());
@@ -320,23 +399,18 @@ pub async fn get_statistics_with_limit(
file.statistics = Some(Arc::clone(&file_stats));
result_files.push(file);

// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in
file_stats.column_statistics.clone().into_iter().enumerate()
{
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
col_stats_set[index].sum_value = file_column.sum_value.cast_to_sum_type();
}
seed_first_file_statistics(
&mut limit_num_rows,
&mut summary_statistics,
&file_stats,
collect_stats,
);

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
let conservative_num_rows = match num_rows {
let conservative_num_rows = match limit_num_rows {
Precision::Exact(nr) => nr,
_ => usize::MIN,
};
@@ -345,44 +419,18 @@ pub async fn get_statistics_with_limit(
let (mut file, file_stats) = current?;
file.statistics = Some(Arc::clone(&file_stats));
result_files.push(file);
if !collect_stats {
continue;
}

// We accumulate the number of rows, total byte size and null
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
num_rows = num_rows.add(&file_stats.num_rows);

total_byte_size = total_byte_size.add(&file_stats.total_byte_size);

for (file_col_stats, col_stats) in file_stats
.column_statistics
.iter()
.zip(col_stats_set.iter_mut())
{
let ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
sum_value: file_sum,
distinct_count: _,
byte_size: file_sbs,
} = file_col_stats;

col_stats.null_count = col_stats.null_count.add(file_nc);
col_stats.max_value = col_stats.max_value.max(file_max);
col_stats.min_value = col_stats.min_value.min(file_min);
col_stats.sum_value = col_stats.sum_value.add_for_sum(file_sum);
col_stats.byte_size = col_stats.byte_size.add(file_sbs);
}
merge_file_statistics(
&mut limit_num_rows,
&mut summary_statistics,
&file_stats,
collect_stats,
);

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
if num_rows.get_value().unwrap_or(&usize::MIN)
if limit_num_rows.get_value().unwrap_or(&usize::MIN)
> &limit.unwrap_or(usize::MAX)
{
break;
@@ -391,11 +439,7 @@ pub async fn get_statistics_with_limit(
}
};

let mut statistics = Statistics {
num_rows,
total_byte_size,
column_statistics: col_stats_set,
};
let mut statistics = summary_statistics;
if all_files.next().await.is_some() {
// If we still have files in the stream, it means that the limit kicked
// in, and the statistic could have been different had we processed the
@@ -520,6 +564,39 @@ mod tests {
}
}

fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]))
}

fn make_file_stats(
num_rows: usize,
total_byte_size: usize,
col_stats: ColumnStatistics,
) -> Arc<Statistics> {
Arc::new(Statistics {
num_rows: Precision::Exact(num_rows),
total_byte_size: Precision::Exact(total_byte_size),
column_statistics: vec![col_stats],
})
}

fn rich_col_stats(
null_count: usize,
min: i64,
max: i64,
sum: i64,
byte_size: usize,
) -> ColumnStatistics {
ColumnStatistics {
null_count: Precision::Exact(null_count),
max_value: Precision::Exact(ScalarValue::Int64(Some(max))),
min_value: Precision::Exact(ScalarValue::Int64(Some(min))),
distinct_count: Precision::Absent,
sum_value: Precision::Exact(ScalarValue::Int64(Some(sum))),
byte_size: Precision::Exact(byte_size),
}
}

#[tokio::test]
#[expect(deprecated)]
async fn test_get_statistics_with_limit_casts_first_file_sum_to_sum_type()
@@ -533,7 +610,7 @@ mod tests {
))]);

let (_group, stats) =
get_statistics_with_limit(files, schema, None, false).await?;
get_statistics_with_limit(files, schema, None, true).await?;

assert_eq!(
stats.column_statistics[0].sum_value,
@@ -571,4 +648,151 @@ mod tests {

Ok(())
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_false_returns_bare_statistics() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(0, 0, rich_col_stats(1, 1, 9, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
)),
]);

let (_files, statistics) =
get_statistics_with_limit(all_files, test_schema(), None, false)
.await
.unwrap();

assert_eq!(statistics.num_rows, Precision::Absent);
assert_eq!(statistics.total_byte_size, Precision::Absent);
assert_eq!(statistics.column_statistics.len(), 1);
assert_eq!(
statistics.column_statistics[0].null_count,
Precision::Absent
);
assert_eq!(statistics.column_statistics[0].max_value, Precision::Absent);
assert_eq!(statistics.column_statistics[0].min_value, Precision::Absent);
assert_eq!(statistics.column_statistics[0].sum_value, Precision::Absent);
assert_eq!(statistics.column_statistics[0].byte_size, Precision::Absent);
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_false_uses_row_counts_for_limit() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(3, 30, rich_col_stats(1, 1, 9, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(3, 30, rich_col_stats(2, 10, 99, 300, 128)),
)),
Ok((
PartitionedFile::new("third.parquet", 30),
make_file_stats(3, 30, rich_col_stats(0, 100, 199, 450, 256)),
)),
]);

let (files, statistics) =
get_statistics_with_limit(all_files, test_schema(), Some(4), false)
.await
.unwrap();

assert_eq!(files.len(), 2);
assert_eq!(statistics.num_rows, Precision::Absent);
assert_eq!(statistics.total_byte_size, Precision::Absent);
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_true_aggregates_statistics() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(5, 50, rich_col_stats(1, 1, 9, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(10, 100, rich_col_stats(2, 10, 99, 300, 128)),
)),
]);

let (_files, statistics) =
get_statistics_with_limit(all_files, test_schema(), None, true)
.await
.unwrap();

assert_eq!(statistics.num_rows, Precision::Exact(15));
assert_eq!(statistics.total_byte_size, Precision::Exact(150));
assert_eq!(
statistics.column_statistics[0].null_count,
Precision::Exact(3)
);
assert_eq!(
statistics.column_statistics[0].min_value,
Precision::Exact(ScalarValue::Int64(Some(1)))
);
assert_eq!(
statistics.column_statistics[0].max_value,
Precision::Exact(ScalarValue::Int64(Some(99)))
);
assert_eq!(
statistics.column_statistics[0].sum_value,
Precision::Exact(ScalarValue::Int64(Some(315)))
);
assert_eq!(
statistics.column_statistics[0].byte_size,
Precision::Exact(192)
);
}

#[tokio::test]
#[expect(deprecated)]
async fn get_statistics_with_limit_collect_stats_true_limit_marks_inexact() {
let all_files = stream::iter(vec![
Ok((
PartitionedFile::new("first.parquet", 10),
make_file_stats(5, 50, rich_col_stats(0, 1, 5, 15, 64)),
)),
Ok((
PartitionedFile::new("second.parquet", 20),
make_file_stats(5, 50, rich_col_stats(1, 6, 10, 40, 64)),
)),
Ok((
PartitionedFile::new("third.parquet", 20),
make_file_stats(5, 50, rich_col_stats(2, 11, 15, 65, 64)),
)),
]);

let (files, statistics) =
get_statistics_with_limit(all_files, test_schema(), Some(8), true)
.await
.unwrap();

assert_eq!(files.len(), 2);
assert_eq!(statistics.num_rows, Precision::Inexact(10));
assert_eq!(statistics.total_byte_size, Precision::Inexact(100));
assert_eq!(
statistics.column_statistics[0].min_value,
Precision::Inexact(ScalarValue::Int64(Some(1)))
);
assert_eq!(
statistics.column_statistics[0].max_value,
Precision::Inexact(ScalarValue::Int64(Some(10)))
);
assert_eq!(
statistics.column_statistics[0].sum_value,
Precision::Inexact(ScalarValue::Int64(Some(55)))
);
assert_eq!(
statistics.column_statistics[0].byte_size,
Precision::Inexact(128)
);
}
}
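
For reference, the `Inexact` values the last test observes come from the tail of `get_statistics_with_limit`: when files remain in the stream after the limit prunes, the accumulated summary is demoted wholesale via `Statistics::to_inexact`. A rough sketch of that conversion's effect, assuming the `datafusion_common` API:

```rust
use datafusion_common::{stats::Precision, ColumnStatistics, Statistics};

fn main() {
    let stats = Statistics {
        num_rows: Precision::Exact(10),
        total_byte_size: Precision::Exact(100),
        column_statistics: vec![ColumnStatistics::default()],
    };
    // Every Exact value becomes Inexact; Absent values stay Absent.
    let demoted = stats.to_inexact();
    assert_eq!(demoted.num_rows, Precision::Inexact(10));
    assert_eq!(demoted.total_byte_size, Precision::Inexact(100));
}
```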