Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 15 additions & 28 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,12 +772,22 @@ impl MetadataLoadedParquetOpen {
prepared.physical_file_schema = Arc::clone(&physical_file_schema);

// Build predicates for this specific file
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
let pruning_predicate = build_pruning_predicates(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to delay this so they are only created after we have loaded the page index; that avoids doing the work for files without a page index (and also avoids it in short-circuit scenarios / query cancellation).

prepared.predicate.as_ref(),
&physical_file_schema,
&prepared.predicate_creation_errors,
);

// Only build page pruning predicate if page index is enabled
let page_pruning_predicate = if prepared.enable_page_index {
prepared.predicate.as_ref().and_then(|predicate| {
let p = build_page_pruning_predicate(predicate, &physical_file_schema);
(p.filter_number() > 0).then_some(p)
})
} else {
None
};

Ok(FiltersPreparedParquetOpen {
loaded: MetadataLoadedParquetOpen {
prepared,
Expand All @@ -797,10 +807,7 @@ impl FiltersPreparedParquetOpen {
// metadata load above may not have read the page index structures yet.
// If we need them for reading and they aren't yet loaded, we need to
// load them now.
if should_enable_page_index(
self.loaded.prepared.enable_page_index,
&self.page_pruning_predicate,
) {
if self.page_pruning_predicate.is_some() {
self.loaded.reader_metadata = load_page_index(
self.loaded.reader_metadata,
&mut self.loaded.prepared.async_file_reader,
Expand Down Expand Up @@ -1513,20 +1520,11 @@ pub(crate) fn build_pruning_predicates(
predicate: Option<&Arc<dyn PhysicalExpr>>,
file_schema: &SchemaRef,
predicate_creation_errors: &Count,
) -> (
Option<Arc<PruningPredicate>>,
Option<Arc<PagePruningAccessPlanFilter>>,
) {
) -> Option<Arc<PruningPredicate>> {
let Some(predicate) = predicate.as_ref() else {
return (None, None);
return None;
};
let pruning_predicate = build_pruning_predicate(
Arc::clone(predicate),
file_schema,
predicate_creation_errors,
);
let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
(pruning_predicate, Some(page_pruning_predicate))
build_pruning_predicate(Arc::clone(predicate), file_schema, predicate_creation_errors)
}

/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
Expand Down Expand Up @@ -1560,17 +1558,6 @@ async fn load_page_index<T: AsyncFileReader>(
}
}

fn should_enable_page_index(
enable_page_index: bool,
page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
) -> bool {
enable_page_index
&& page_pruning_predicate.is_some()
&& page_pruning_predicate
.as_ref()
.map(|p| p.filter_number() > 0)
.unwrap_or(false)
}

#[cfg(test)]
mod test {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,17 +627,17 @@ impl FileSource for ParquetSource {
write!(f, ", reverse_row_groups=true")?;
}

// Try to build a the pruning predicates.
// Try to build the pruning predicates.
// These are only generated here because it's useful to have *some*
// idea of what pushdown is happening when viewing plans.
// However it is important to note that these predicates are *not*
// However, it is important to note that these predicates are *not*
// necessarily the predicates that are actually evaluated:
// the actual predicates are built in reference to the physical schema of
// each file, which we do not have at this point and hence cannot use.
// Instead we use the logical schema of the file (the table schema without partition columns).
// Instead, we use the logical schema of the file (the table schema without partition columns).
if let Some(predicate) = &self.predicate {
let predicate_creation_errors = Count::new();
if let (Some(pruning_predicate), _) = build_pruning_predicates(
if let Some(pruning_predicate) = build_pruning_predicates(
Some(predicate),
self.table_schema.table_schema(),
&predicate_creation_errors,
Expand Down
Loading