diff --git a/crates/integrations/datafusion/src/full_text_search.rs b/crates/integrations/datafusion/src/full_text_search.rs index 20ff38ff..03d208bb 100644 --- a/crates/integrations/datafusion/src/full_text_search.rs +++ b/crates/integrations/datafusion/src/full_text_search.rs @@ -188,6 +188,7 @@ impl TableProvider for FullTextSearchTableProvider { table, schema: &self.schema(), plan: &plan, + scan_trace: None, projection, pushed_predicate: None, limit, diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index a7387f4a..68a9c42f 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -29,7 +29,7 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; use futures::{StreamExt, TryStreamExt}; use paimon::spec::Predicate; -use paimon::table::Table; +use paimon::table::{ScanTrace, Table}; use paimon::DataSplit; use crate::error::to_datafusion_error; @@ -57,9 +57,12 @@ pub struct PaimonTableScan { /// Whether the pushed predicate is exact (no residual filtering needed). /// When true and all splits have known merged_row_count, statistics can be exact. filter_exact: bool, + /// Metadata-pruning trace captured during eager scan planning. + scan_trace: Option, } impl PaimonTableScan { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( schema: ArrowSchemaRef, table: Table, @@ -68,6 +71,7 @@ impl PaimonTableScan { planned_partitions: Vec>, limit: Option, filter_exact: bool, + scan_trace: Option, ) -> Self { let plan_properties = Arc::new(PlanProperties::new( EquivalenceProperties::new(schema.clone()), @@ -83,6 +87,7 @@ impl PaimonTableScan { plan_properties, limit, filter_exact, + scan_trace, } } @@ -245,6 +250,9 @@ impl DisplayAs for PaimonTableScan { if let Some(limit) = self.limit { write!(f, ", limit={limit}")?; } + if let Some(ref trace) = self.scan_trace { + write!(f, ", trace={trace}")?; + } Ok(()) } } @@ -290,6 +298,7 @@ mod tests { vec![Arc::from(Vec::new())], None, false, + None, ); assert_eq!(scan.properties().output_partitioning().partition_count(), 1); } @@ -310,6 +319,7 @@ mod tests { planned_partitions, None, false, + None, ); assert_eq!(scan.properties().output_partitioning().partition_count(), 3); } @@ -387,6 +397,7 @@ mod tests { vec![Arc::from(vec![split])], None, false, + None, ); let ctx = SessionContext::new(); diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 3508362a..761d2516 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -103,6 +103,7 @@ pub(crate) struct PaimonScanBuilder<'a> { pub(crate) table: &'a Table, pub(crate) schema: &'a ArrowSchemaRef, pub(crate) plan: &'a paimon::table::Plan, + pub(crate) scan_trace: Option, pub(crate) projection: Option<&'a Vec>, pub(crate) pushed_predicate: Option, pub(crate) limit: Option, @@ -149,6 +150,7 @@ impl PaimonScanBuilder<'_> { planned_partitions, self.limit, self.filter_exact, + self.scan_trace, ))) } } @@ -189,7 +191,7 @@ impl TableProvider for PaimonTableProvider { // Tokio runtime. `scan.plan()` can reach OpenDAL/Tokio filesystem calls while // reading Paimon metadata, so we must provide a runtime here instead of // assuming the caller already entered one. - let plan = await_with_runtime(scan.plan()) + let (plan, scan_trace) = await_with_runtime(scan.plan_with_trace()) .await .map_err(to_datafusion_error)?; @@ -203,6 +205,7 @@ impl TableProvider for PaimonTableProvider { table: &self.table, schema: &self.schema, plan: &plan, + scan_trace: Some(scan_trace), projection, pushed_predicate: filter_analysis.pushed_predicate, limit: pushed_limit, diff --git a/crates/integrations/datafusion/src/vector_search.rs b/crates/integrations/datafusion/src/vector_search.rs index 34daadc8..5b53d146 100644 --- a/crates/integrations/datafusion/src/vector_search.rs +++ b/crates/integrations/datafusion/src/vector_search.rs @@ -188,6 +188,7 @@ impl TableProvider for VectorSearchTableProvider { table, schema: &self.schema(), plan: &plan, + scan_trace: None, projection, pushed_predicate: None, limit, diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 396e9514..84dc32ef 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -478,7 +478,7 @@ async fn test_residual_filter_limit_keeps_connector_limit_and_correctness() { assert!( scan_lines .iter() - .all(|line| !line.contains("limit=")), + .all(|line| !line.contains(", limit=")), "Residual filter queries should not push a scan limit hint when residual filters stay above the scan, plan:\n{plan_text}" ); diff --git a/crates/integrations/datafusion/tests/scan_pruning_trace.rs b/crates/integrations/datafusion/tests/scan_pruning_trace.rs new file mode 100644 index 00000000..c92ee8ad --- /dev/null +++ b/crates/integrations/datafusion/tests/scan_pruning_trace.rs @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Self-contained baselines for scan-pruning trace counters. + +mod common; + +use std::collections::HashMap; + +use datafusion::arrow::array::Int64Array; +use datafusion::physical_plan::displayable; +use paimon::catalog::Identifier; +use paimon::spec::{Datum, PredicateBuilder}; +use paimon::{Catalog, Table}; + +async fn setup_trace_table() -> (tempfile::TempDir, std::sync::Arc) { + let (tmp, catalog) = common::create_test_env(); + let sql_context = common::create_sql_context(catalog.clone()).await; + sql_context + .sql("CREATE SCHEMA paimon.test_db") + .await + .unwrap() + .collect() + .await + .unwrap(); + sql_context + .sql( + "CREATE TABLE paimon.test_db.trace_append ( + id INT, value INT, dt STRING + ) PARTITIONED BY (dt)", + ) + .await + .unwrap() + .collect() + .await + .unwrap(); + + common::exec( + &sql_context, + "INSERT INTO paimon.test_db.trace_append VALUES + (1, 10, '2024-01-01'), (2, 20, '2024-01-01')", + ) + .await; + common::exec( + &sql_context, + "INSERT INTO paimon.test_db.trace_append VALUES + (100, 1000, '2024-01-02'), (101, 1010, '2024-01-02')", + ) + .await; + common::exec( + &sql_context, + "INSERT INTO paimon.test_db.trace_append VALUES + (200, 2000, '2024-01-03'), (201, 2010, '2024-01-03')", + ) + .await; + + (tmp, catalog) +} + +async fn load_table( + catalog: &std::sync::Arc, + table_name: &str, +) -> Table { + catalog + .get_table(&Identifier::new("test_db", table_name)) + .await + .unwrap() +} + +fn trace_counter(plan_text: &str, key: &str) -> usize { + let needle = format!("{key}="); + let start = plan_text + .find(&needle) + .unwrap_or_else(|| panic!("trace counter {key} not found in plan:\n{plan_text}")) + + needle.len(); + let end = plan_text[start..] + .find(|c: char| !c.is_ascii_digit()) + .map(|offset| start + offset) + .unwrap_or(plan_text.len()); + plan_text[start..end].parse().unwrap() +} + +fn trace_manifest_counts(plan_text: &str) -> (usize, usize) { + let needle = "manifests="; + let start = plan_text + .find(needle) + .unwrap_or_else(|| panic!("manifest counts not found in plan:\n{plan_text}")) + + needle.len(); + let rest = &plan_text[start..]; + let slash = rest + .find('/') + .unwrap_or_else(|| panic!("manifest counts missing slash in plan:\n{plan_text}")); + let after: usize = rest[..slash].parse().unwrap(); + let before_len = rest[slash + 1..] + .find(|c: char| !c.is_ascii_digit()) + .unwrap_or(rest.len() - slash - 1); + let before: usize = rest[slash + 1..slash + 1 + before_len].parse().unwrap(); + (after, before) +} + +#[tokio::test] +async fn test_scan_trace_records_partition_pruning() { + let (_tmp, catalog) = setup_trace_table().await; + let table = load_table(&catalog, "trace_append").await; + + let (_all_plan, all_trace) = table + .new_read_builder() + .new_scan() + .plan_with_trace() + .await + .unwrap(); + assert_eq!(all_trace.snapshot_id, Some(3)); + assert_eq!(all_trace.final_files, 3); + + let fields = table.schema().fields(); + let pb = PredicateBuilder::new(fields); + let mut partition_reader = table.new_read_builder(); + partition_reader.with_filter( + pb.equal("dt", Datum::String("2024-01-01".to_string())) + .unwrap(), + ); + let (_partition_plan, partition_trace) = + partition_reader.new_scan().plan_with_trace().await.unwrap(); + assert!( + partition_trace.manifest_files_after_partition_pruning + < partition_trace.manifest_files_before_partition_pruning + || partition_trace.manifest_entries_pruned_by_partition > 0, + "partition predicate should prune at manifest or entry level: {partition_trace:?}" + ); + assert!( + partition_trace.final_files < all_trace.final_files, + "partition pruning should reduce final files: all={all_trace:?}, filtered={partition_trace:?}" + ); +} + +#[tokio::test] +async fn test_sql_between_records_partition_pruning_trace() { + let (_tmp, catalog) = setup_trace_table().await; + let sql_context = common::create_sql_context(catalog).await; + let sql = "SELECT id, value FROM paimon.test_db.trace_append + WHERE dt BETWEEN '2024-01-01' AND '2024-01-02'"; + + let plan = sql_context + .sql(sql) + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let plan_text = displayable(plan.as_ref()).indent(true).to_string(); + let (manifests_after, manifests_before) = trace_manifest_counts(&plan_text); + let partition_pruned = trace_counter(&plan_text, "partition_pruned"); + assert!( + manifests_after < manifests_before || partition_pruned > 0, + "SQL BETWEEN over partition column should prune at manifest or entry level:\n{plan_text}" + ); + + let rows = common::collect_id_value(&sql_context, sql).await; + assert_eq!(rows, vec![(1, 10), (2, 20), (100, 1000), (101, 1010)]); +} + +#[tokio::test] +async fn test_count_star_uses_statistics_without_scan_trace() { + let (_tmp, catalog) = setup_trace_table().await; + let sql_context = common::create_sql_context(catalog).await; + let sql = "SELECT COUNT(*) FROM paimon.test_db.trace_append"; + + let df = sql_context.sql(sql).await.unwrap(); + let plan = df.create_physical_plan().await.unwrap(); + let plan_text = displayable(plan.as_ref()).indent(true).to_string(); + assert!( + !plan_text.contains("PaimonTableScan"), + "COUNT(*) should be satisfied from exact scan statistics without data scan:\n{plan_text}" + ); + assert!( + !plan_text.contains("trace="), + "COUNT(*) statistics rewrite should remove the scan node before trace display:\n{plan_text}" + ); + + let batches = sql_context.sql(sql).await.unwrap().collect().await.unwrap(); + let count = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0); + assert_eq!(count, 6); +} + +#[tokio::test] +async fn test_scan_trace_records_bucket_limit_and_time_travel() { + let (tmp, catalog) = common::create_test_env(); + let sql_context = common::create_sql_context(catalog.clone()).await; + common::exec(&sql_context, "CREATE SCHEMA paimon.test_db").await; + common::exec( + &sql_context, + "CREATE TABLE paimon.test_db.trace_pk ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '4')", + ) + .await; + common::exec( + &sql_context, + "INSERT INTO paimon.test_db.trace_pk VALUES + (1, 10), (2, 20), (3, 30), (4, 40)", + ) + .await; + common::exec( + &sql_context, + "INSERT INTO paimon.test_db.trace_pk VALUES + (5, 50), (6, 60), (7, 70), (8, 80)", + ) + .await; + + let table = load_table(&catalog, "trace_pk").await; + let fields = table.schema().fields(); + let pb = PredicateBuilder::new(fields); + let mut bucket_reader = table.new_read_builder(); + bucket_reader.with_filter(pb.equal("id", Datum::Int(1)).unwrap()); + let (_bucket_plan, bucket_trace) = bucket_reader.new_scan().plan_with_trace().await.unwrap(); + assert!( + bucket_trace.manifest_entries_pruned_by_bucket > 0, + "bucket-key predicate should prune manifest entries by bucket: {bucket_trace:?}" + ); + + let mut limit_reader = table.new_read_builder(); + limit_reader.with_limit(1); + let (_limit_plan, limit_trace) = limit_reader.new_scan().plan_with_trace().await.unwrap(); + assert!( + limit_trace.splits_after_limit < limit_trace.splits_before_limit, + "LIMIT should reduce planned splits when no data residual exists: {limit_trace:?}" + ); + + let snapshot_one_table = table.copy_with_options(HashMap::from([( + "scan.version".to_string(), + "1".to_string(), + )])); + let (_snapshot_one_plan, snapshot_one_trace) = snapshot_one_table + .new_read_builder() + .new_scan() + .plan_with_trace() + .await + .unwrap(); + assert_eq!(snapshot_one_trace.snapshot_id, Some(1)); + + let (_latest_plan, latest_trace) = table + .new_read_builder() + .new_scan() + .plan_with_trace() + .await + .unwrap(); + assert_eq!(latest_trace.snapshot_id, Some(2)); + assert!( + snapshot_one_trace.final_files < latest_trace.final_files, + "time travel should plan from the selected snapshot: snapshot1={snapshot_one_trace:?}, latest={latest_trace:?}" + ); + + drop(tmp); +} + +#[tokio::test] +async fn test_physical_plan_displays_scan_trace_summary() { + let (_tmp, catalog) = setup_trace_table().await; + let sql_context = common::create_sql_context(catalog).await; + let plan = sql_context + .sql("SELECT id FROM paimon.test_db.trace_append LIMIT 1") + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let plan_text = displayable(plan.as_ref()).indent(true).to_string(); + + assert!( + plan_text.contains("trace=") && plan_text.contains("splits_before_limit="), + "physical plan should include scan trace summary:\n{plan_text}" + ); +} diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 15931227..031f4a21 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -44,7 +44,7 @@ pub use catalog::FileSystemCatalog; pub use table::{ CommitMessage, DataEvolutionWriter, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, - Plan, RESTEnv, RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, + Plan, RESTEnv, RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, ScanTrace, SnapshotCommit, SnapshotManager, Table, TableCommit, TableRead, TableScan, TableUpdate, TableWrite, TagManager, WriteBuilder, }; diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 04c25017..5cbe4393 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -46,6 +46,7 @@ mod read_builder; pub mod referenced_files; pub(crate) mod rest_env; pub(crate) mod row_id_predicate; +mod scan_trace; pub(crate) mod schema_manager; pub(crate) mod snapshot_commit; mod snapshot_manager; @@ -73,6 +74,7 @@ pub use full_text_search_builder::FullTextSearchBuilder; use futures::stream::BoxStream; pub use read_builder::ReadBuilder; pub use rest_env::RESTEnv; +pub use scan_trace::ScanTrace; pub use schema_manager::SchemaManager; pub use snapshot_commit::{RESTSnapshotCommit, RenamingSnapshotCommit, SnapshotCommit}; pub use snapshot_manager::SnapshotManager; diff --git a/crates/paimon/src/table/scan_trace.rs b/crates/paimon/src/table/scan_trace.rs new file mode 100644 index 00000000..d6e9e464 --- /dev/null +++ b/crates/paimon/src/table/scan_trace.rs @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Scan planning trace counters. + +use std::fmt; + +/// Stage counters collected while planning a table scan. +/// +/// The counters are intended for explain output and regression tests. They +/// describe pruning at metadata planning time only; reader-side Parquet row +/// group pruning and DataFusion residual filters are outside this trace. +#[non_exhaustive] +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct ScanTrace { + pub snapshot_id: Option, + pub base_manifest_files: usize, + pub delta_manifest_files: usize, + pub manifest_files_before_partition_pruning: usize, + pub manifest_files_after_partition_pruning: usize, + pub manifest_entries_read: usize, + pub manifest_entries_pruned_by_bucket: usize, + pub manifest_entries_pruned_by_partition: usize, + pub manifest_entries_after_entry_pruning: usize, + pub manifest_entries_pruned_by_level: usize, + pub manifest_entries_pruned_by_data_stats: usize, + pub manifest_entries_after_manifest_filters: usize, + pub manifest_entries_after_merge: usize, + pub manifest_entries_pruned_by_cross_schema_stats: usize, + pub manifest_entries_after_cross_schema_stats: usize, + pub data_evolution_groups_before_stats: usize, + pub data_evolution_groups_pruned_by_stats: usize, + pub data_evolution_groups_pruned_by_row_ranges: usize, + pub splits_before_limit: usize, + pub splits_after_limit: usize, + pub final_splits: usize, + pub final_files: usize, + pub limit: Option, +} + +impl ScanTrace { + pub(crate) fn record_manifest_lists(&mut self, base_count: usize, delta_count: usize) { + self.base_manifest_files = base_count; + self.delta_manifest_files = delta_count; + self.manifest_files_before_partition_pruning = base_count + delta_count; + } + + pub(crate) fn record_final_plan( + &mut self, + splits_before_limit: usize, + splits: usize, + files: usize, + ) { + self.splits_before_limit = splits_before_limit; + self.splits_after_limit = splits; + self.final_splits = splits; + self.final_files = files; + } +} + +impl fmt::Display for ScanTrace { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "snapshot={:?}, manifests={}/{}, entries_read={}, bucket_pruned={}, partition_pruned={}, data_stats_pruned={}, cross_schema_pruned={}, splits_before_limit={}, splits_after_limit={}, files={}", + self.snapshot_id, + self.manifest_files_after_partition_pruning, + self.manifest_files_before_partition_pruning, + self.manifest_entries_read, + self.manifest_entries_pruned_by_bucket, + self.manifest_entries_pruned_by_partition, + self.manifest_entries_pruned_by_data_stats, + self.manifest_entries_pruned_by_cross_schema_stats, + self.splits_before_limit, + self.splits_after_limit, + self.final_files + ) + } +} diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 170405b6..78ecc684 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -41,6 +41,7 @@ use crate::table::source::{ any_range_overlaps_file, intersect_ranges_with_file, merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, }; +use crate::table::ScanTrace; use crate::table::SnapshotManager; use futures::{StreamExt, TryStreamExt}; use std::collections::{HashMap, HashSet}; @@ -51,6 +52,29 @@ const MANIFEST_DIR: &str = "manifest"; /// Path segment for index directory under table. const INDEX_DIR: &str = "index"; +#[derive(Debug, Default)] +struct ManifestReadCounters { + entries_read: usize, + pruned_by_bucket: usize, + pruned_by_partition: usize, + after_entry_pruning: usize, + pruned_by_level: usize, + pruned_by_data_stats: usize, + after_manifest_filters: usize, +} + +impl ManifestReadCounters { + fn merge(&mut self, other: Self) { + self.entries_read += other.entries_read; + self.pruned_by_bucket += other.pruned_by_bucket; + self.pruned_by_partition += other.pruned_by_partition; + self.after_entry_pruning += other.after_entry_pruning; + self.pruned_by_level += other.pruned_by_level; + self.pruned_by_data_stats += other.pruned_by_data_stats; + self.after_manifest_filters += other.after_manifest_filters; + } +} + /// Reads a manifest list file (Avro) and returns manifest file metas. async fn read_manifest_list( file_io: &FileIO, @@ -92,15 +116,21 @@ async fn read_all_manifest_entries( schema_fields: &[DataField], bucket_predicate: Option<&Predicate>, bucket_key_fields: &[DataField], + trace: Option<&mut ScanTrace>, ) -> crate::Result> { let (mut manifest_files, delta) = futures::try_join!( read_manifest_list(file_io, table_path, snapshot.base_manifest_list()), read_manifest_list(file_io, table_path, snapshot.delta_manifest_list()), )?; + let mut trace = trace; + if let Some(trace) = trace.as_deref_mut() { + trace.record_manifest_lists(manifest_files.len(), delta.len()); + } manifest_files.extend(delta); // Manifest-file-level partition stats pruning: skip entire manifest files // whose partition range doesn't overlap the partition predicate. + let manifest_files_before_partition_pruning = manifest_files.len(); if let Some(pf) = partition_filter { if !partition_fields.is_empty() { manifest_files.retain(|meta| { @@ -118,58 +148,76 @@ async fn read_all_manifest_entries( }); } } + if let Some(trace) = trace.as_deref_mut() { + trace.manifest_files_before_partition_pruning = manifest_files_before_partition_pruning; + trace.manifest_files_after_partition_pruning = manifest_files.len(); + } let manifest_path_prefix = format!("{}/{}", table_path.trim_end_matches('/'), MANIFEST_DIR); let shared_cache = SharedSchemaCache::new(); - let all_entries: Vec = futures::stream::iter(manifest_files) - .map(|meta| { - let path = format!("{}/{}", manifest_path_prefix, meta.file_name()); - let cache = shared_cache.clone(); - async move { - let input_file = file_io.new_input(&path)?; - let content = input_file.read().await?; - - // Per-task bucket cache (few distinct total_buckets values per manifest). - let mut bucket_cache: HashMap>> = HashMap::new(); - - let entries = crate::spec::avro::from_manifest_bytes_filtered_shared( - &content, - &cache, - &mut |_kind, partition_bytes, bucket, total_buckets| { - // Bucket filter (negative bucket = unassigned) - if has_primary_keys && !scan_all_files && bucket < 0 { - return false; - } - if let Some(pred) = bucket_predicate { - let targets = bucket_cache.entry(total_buckets).or_insert_with(|| { - compute_target_buckets(pred, bucket_key_fields, total_buckets) - }); - if let Some(targets) = targets { - if !targets.contains(&bucket) { - return false; + let manifest_results: Vec<(Vec, ManifestReadCounters)> = + futures::stream::iter(manifest_files) + .map(|meta| { + let path = format!("{}/{}", manifest_path_prefix, meta.file_name()); + let cache = shared_cache.clone(); + async move { + let input_file = file_io.new_input(&path)?; + let content = input_file.read().await?; + + // Per-task bucket cache (few distinct total_buckets values per manifest). + let mut bucket_cache: HashMap>> = HashMap::new(); + let mut counters = ManifestReadCounters::default(); + + let entries = crate::spec::avro::from_manifest_bytes_filtered_shared( + &content, + &cache, + &mut |_kind, partition_bytes, bucket, total_buckets| { + counters.entries_read += 1; + // Bucket filter (negative bucket = unassigned) + if has_primary_keys && !scan_all_files && bucket < 0 { + counters.pruned_by_bucket += 1; + return false; + } + if let Some(pred) = bucket_predicate { + let targets = + bucket_cache.entry(total_buckets).or_insert_with(|| { + compute_target_buckets( + pred, + bucket_key_fields, + total_buckets, + ) + }); + if let Some(targets) = targets { + if !targets.contains(&bucket) { + counters.pruned_by_bucket += 1; + return false; + } } } - } - // Partition filter - if let Some(pf) = partition_filter { - match pf.matches_entry(partition_bytes) { - Ok(false) => return false, - Ok(true) => {} - Err(_) => {} + // Partition filter + if let Some(pf) = partition_filter { + match pf.matches_entry(partition_bytes) { + Ok(false) => { + counters.pruned_by_partition += 1; + return false; + } + Ok(true) => {} + Err(_) => {} + } } - } - true - }, - )?; + true + }, + )?; + counters.after_entry_pruning = entries.len(); - // Post-filter: level-0 and data predicates (need DataFileMeta) - let filtered: Vec = entries - .into_iter() - .filter(|entry| { + // Post-filter: level-0 and data predicates (need DataFileMeta) + let mut filtered = Vec::with_capacity(entries.len()); + for entry in entries { if skip_level_zero && has_primary_keys && entry.file().level == 0 { - return false; + counters.pruned_by_level += 1; + continue; } if !data_predicates.is_empty() && !data_file_matches_predicates( @@ -179,20 +227,34 @@ async fn read_all_manifest_entries( schema_fields, ) { - return false; + counters.pruned_by_data_stats += 1; + continue; } - true - }) - .collect(); - Ok::<_, crate::Error>(filtered) - } - }) - .buffered(64) - .try_collect::>() - .await? - .into_iter() - .flatten() - .collect(); + filtered.push(entry); + } + counters.after_manifest_filters = filtered.len(); + Ok::<_, crate::Error>((filtered, counters)) + } + }) + .buffered(64) + .try_collect::>() + .await?; + + let mut counters = ManifestReadCounters::default(); + let mut all_entries = Vec::new(); + for (entries, manifest_counters) in manifest_results { + counters.merge(manifest_counters); + all_entries.extend(entries); + } + if let Some(trace) = trace { + trace.manifest_entries_read = counters.entries_read; + trace.manifest_entries_pruned_by_bucket = counters.pruned_by_bucket; + trace.manifest_entries_pruned_by_partition = counters.pruned_by_partition; + trace.manifest_entries_after_entry_pruning = counters.after_entry_pruning; + trace.manifest_entries_pruned_by_level = counters.pruned_by_level; + trace.manifest_entries_pruned_by_data_stats = counters.pruned_by_data_stats; + trace.manifest_entries_after_manifest_filters = counters.after_manifest_filters; + } Ok(all_entries) } @@ -368,7 +430,22 @@ impl<'a> TableScan<'a> { Some(snapshot) => snapshot, None => return Ok(Plan::new(Vec::new())), }; - self.plan_snapshot(snapshot).await + self.plan_snapshot(snapshot, None).await + } + + /// Plan the full scan and return metadata-pruning trace counters. + pub async fn plan_with_trace(&self) -> crate::Result<(Plan, ScanTrace)> { + let mut trace = ScanTrace { + limit: self.limit, + ..Default::default() + }; + let snapshot = match self.resolve_snapshot().await? { + Some(snapshot) => snapshot, + None => return Ok((Plan::new(Vec::new()), trace)), + }; + trace.snapshot_id = Some(snapshot.id()); + let plan = self.plan_snapshot(snapshot, Some(&mut trace)).await?; + Ok((plan, trace)) } async fn resolve_snapshot(&self) -> crate::Result> { @@ -456,6 +533,14 @@ impl<'a> TableScan<'a> { pub(crate) async fn plan_manifest_entries( &self, snapshot: &Snapshot, + ) -> crate::Result> { + self.plan_manifest_entries_with_trace(snapshot, None).await + } + + async fn plan_manifest_entries_with_trace( + &self, + snapshot: &Snapshot, + mut trace: Option<&mut ScanTrace>, ) -> crate::Result> { let file_io = self.table.file_io(); let table_path = self.table.location(); @@ -527,16 +612,25 @@ impl<'a> TableScan<'a> { self.table.schema().fields(), self.bucket_predicate.as_ref(), &bucket_key_fields, + trace.as_deref_mut(), ) .await?; - Ok(merge_manifest_entries(entries)) + let merged = merge_manifest_entries(entries); + if let Some(trace) = trace { + trace.manifest_entries_after_merge = merged.len(); + } + Ok(merged) } fn can_push_down_limit_hint(&self, row_ranges: Option<&[RowRange]>) -> bool { can_push_down_limit_hint_for_scan(&self.data_predicates, row_ranges) } - async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { + async fn plan_snapshot( + &self, + snapshot: Snapshot, + mut trace: Option<&mut ScanTrace>, + ) -> crate::Result { let file_io = self.table.file_io(); let table_path = self.table.location(); let core_options = CoreOptions::new(self.table.schema().options()); @@ -545,8 +639,13 @@ impl<'a> TableScan<'a> { let open_file_cost = core_options.source_split_open_file_cost(); let partition_keys = self.table.schema().partition_keys(); - let entries = self.plan_manifest_entries(&snapshot).await?; + let entries = self + .plan_manifest_entries_with_trace(&snapshot, trace.as_deref_mut()) + .await?; if entries.is_empty() { + if let Some(trace) = trace { + trace.record_final_plan(0, 0, 0); + } return Ok(Plan::new(Vec::new())); } @@ -560,8 +659,12 @@ impl<'a> TableScan<'a> { .iter() .any(|e| e.file().schema_id != current_schema_id); if !has_cross_schema { + if let Some(trace) = trace.as_deref_mut() { + trace.manifest_entries_after_cross_schema_stats = entries.len(); + } entries } else { + let before = entries.len(); let mut kept = Vec::with_capacity(entries.len()); let mut schema_cache: HashMap>> = HashMap::new(); @@ -578,11 +681,22 @@ impl<'a> TableScan<'a> { kept.push(entry); } } + if let Some(trace) = trace.as_deref_mut() { + trace.manifest_entries_pruned_by_cross_schema_stats += before - kept.len(); + trace.manifest_entries_after_cross_schema_stats = kept.len(); + } kept } }; if entries.is_empty() { + if let Some(trace) = trace { + trace.record_final_plan(0, 0, 0); + } return Ok(Plan::new(Vec::new())); + } else if let Some(trace) = trace.as_deref_mut() { + if trace.manifest_entries_after_cross_schema_stats == 0 { + trace.manifest_entries_after_cross_schema_stats = entries.len(); + } } // Group by (partition, bucket), decomposing entries to avoid cloning partition. @@ -681,12 +795,16 @@ impl<'a> TableScan<'a> { // Apply group-level predicate filtering after grouping by row_id range. let file_groups: Vec = if data_evolution_enabled { let row_id_groups = group_by_overlapping_row_id(data_files); + if let Some(trace) = trace.as_deref_mut() { + trace.data_evolution_groups_before_stats += row_id_groups.len(); + } // Filter groups by merged stats before splitting. let row_id_groups: Vec> = if self.data_predicates.is_empty() { row_id_groups } else { - row_id_groups + let before = row_id_groups.len(); + let groups = row_id_groups .into_iter() .filter(|group| { data_evolution_group_matches_predicates( @@ -695,15 +813,24 @@ impl<'a> TableScan<'a> { self.table.schema().fields(), ) }) - .collect() + .collect::>(); + if let Some(trace) = trace.as_deref_mut() { + trace.data_evolution_groups_pruned_by_stats += before - groups.len(); + } + groups }; // Filter groups by row ID ranges. let row_id_groups = if let Some(ref ranges) = effective_row_ranges { - row_id_groups + let before = row_id_groups.len(); + let groups = row_id_groups .into_iter() .filter(|group| group.iter().any(|f| any_range_overlaps_file(ranges, f))) - .collect() + .collect::>(); + if let Some(trace) = trace.as_deref_mut() { + trace.data_evolution_groups_pruned_by_row_ranges += before - groups.len(); + } + groups } else { row_id_groups }; @@ -811,11 +938,16 @@ impl<'a> TableScan<'a> { // With data predicates or row_ranges, merged_row_count() reflects pre-filter // row counts, so stopping early could return fewer rows than the limit. + let splits_before_limit = splits.len(); let splits = if self.can_push_down_limit_hint(effective_row_ranges.as_deref()) { self.apply_limit_pushdown(splits) } else { splits }; + if let Some(trace) = trace { + let final_files = splits.iter().map(|split| split.data_files().len()).sum(); + trace.record_final_plan(splits_before_limit, splits.len(), final_files); + } Ok(Plan::new(splits)) } @@ -836,7 +968,7 @@ mod tests { use crate::table::partition_filter::PartitionFilter; use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile}; use crate::table::stats_filter::{data_file_matches_predicates, group_by_overlapping_row_id}; - use crate::table::Table; + use crate::table::{CommitMessage, Table, TableCommit}; use crate::Error; use chrono::{DateTime, Utc}; @@ -985,6 +1117,46 @@ mod tests { } } + fn scan_trace_test_table(table_path: &str) -> Table { + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let schema = PaimonSchema::builder() + .column("id", DataType::Int(IntType::new())) + .build() + .unwrap(); + let table_schema = TableSchema::new(0, &schema); + Table::new( + file_io, + Identifier::new("test_db", "scan_trace"), + table_path.to_string(), + table_schema, + None, + ) + } + + async fn setup_scan_trace_dirs(table: &Table) { + table + .file_io() + .mkdirs(&format!("{}/snapshot/", table.location())) + .await + .unwrap(); + table + .file_io() + .mkdirs(&format!("{}/manifest/", table.location())) + .await + .unwrap(); + } + + fn stats_trace_file(name: &str, min_id: i32, max_id: i32) -> DataFileMeta { + let mut file = test_data_file_meta( + int_stats_row(Some(min_id)), + int_stats_row(Some(max_id)), + vec![Some(0)], + 2, + ); + file.file_name = name.to_string(); + file + } + fn limit_test_table() -> Table { let file_io = FileIOBuilder::new("file").build().unwrap(); let schema = PaimonSchema::builder().build().unwrap(); @@ -1284,6 +1456,45 @@ mod tests { )); } + #[tokio::test] + async fn test_plan_with_trace_records_between_data_stats_pruning() { + let table_path = "memory:/test_plan_with_trace_records_between_data_stats_pruning"; + let table = scan_trace_test_table(table_path); + setup_scan_trace_dirs(&table).await; + + TableCommit::new(table.clone(), "scan-trace-test".to_string()) + .commit(vec![CommitMessage::new( + Vec::new(), + 0, + vec![ + stats_trace_file("stats-1.parquet", 1, 2), + stats_trace_file("stats-2.parquet", 10, 20), + stats_trace_file("stats-3.parquet", 100, 101), + ], + )]) + .await + .unwrap(); + + let fields = int_field(); + let pb = PredicateBuilder::new(&fields); + let between = Predicate::and(vec![ + pb.greater_or_equal("id", Datum::Int(10)).unwrap(), + pb.less_or_equal("id", Datum::Int(20)).unwrap(), + ]); + let mut reader = table.new_read_builder(); + reader.with_filter(between); + let (_plan, trace) = reader.new_scan().plan_with_trace().await.unwrap(); + + assert_eq!( + trace.final_files, 1, + "BETWEEN should keep only the overlapping stats range: {trace:?}" + ); + assert!( + trace.manifest_entries_pruned_by_data_stats >= 2, + "BETWEEN should prune files outside the min/max range: {trace:?}" + ); + } + #[test] fn test_data_file_matches_is_null_prunes_when_null_count_is_zero() { let fields = int_field();