
⚡ Bolt: Implement morsel-driven execution for ParquetExec#313

Open
Dandandan wants to merge 119 commits into main from
parquet-morsel-driven-execution-237164415184908839

Conversation

@Dandandan
Owner

Implement morsel-driven execution for ParquetExec to mitigate data skew. Files are split into individual row groups, which are shared across partitions via a central work queue. Includes metadata caching and row-group pruning during morselization.


PR created automatically by Jules for task 237164415184908839 started by @Dandandan

This PR implements morsel-driven execution for Parquet files in DataFusion, enabling row-group level work sharing across partitions to mitigate data skew.

Key changes:
- Introduced `WorkQueue` in `datafusion/datasource/src/file_stream.rs` for shared pool of work.
- Added `morselize` method to `FileOpener` trait to allow dynamic splitting of files into morsels.
- Implemented `morselize` for `ParquetOpener` to split files into individual row groups.
- Cached `ParquetMetaData` in `ParquetMorsel` extensions to avoid redundant I/O.
- Modified `FileStream` to support work stealing from the shared queue.
- Implemented `Weak` pointer pattern for `WorkQueue` in `FileScanConfig` to support plan re-executability.
- Added `MorselizingGuard` to ensure shared state consistency on cancellation.
- Added `allow_morsel_driven` configuration option (enabled by default for Parquet).
- Implemented row-group pruning during the morselization phase for better efficiency.
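The central work-queue idea can be sketched in a few lines of stdlib Rust. All names and types here (`WorkQueue`, `Morsel`) are illustrative stand-ins, not DataFusion's actual API:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Illustrative stand-in for one unit of work (e.g. a single row group).
#[derive(Debug, Clone, PartialEq)]
pub struct Morsel {
    pub file: String,
    pub row_group: usize,
}

// Minimal central work pool: every partition pulls from one shared
// queue instead of owning a fixed file list, so a partition stuck on a
// skewed file cannot strand work that idle partitions could take over.
#[derive(Default)]
pub struct WorkQueue {
    queue: Mutex<VecDeque<Morsel>>,
}

impl WorkQueue {
    pub fn push(&self, m: Morsel) {
        self.queue.lock().unwrap().push_back(m);
    }

    // A partition's stream calls `pull` whenever it finishes a morsel;
    // `None` means the pool is drained (assuming morselization is done).
    pub fn pull(&self) -> Option<Morsel> {
        self.queue.lock().unwrap().pop_front()
    }
}
```

In the PR the queue is additionally held behind a `Weak` pointer in `FileScanConfig` so a plan can be re-executed with a fresh queue; this sketch omits that.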

Tests:
- Added `parquet_morsel_driven_execution` test to verify work distribution and re-executability.
- Added `parquet_morsel_driven_enabled_by_default` to verify the default configuration.

Co-authored-by: Dandandan <163737+Dandandan@users.noreply.github.com>
@google-labs-jules

👋 Jules, reporting for duty! I'm here to lend a hand with this pull request.

When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down.

I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job!

For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with @jules. You can find this option in the Pull Request section of your global Jules UI settings. You can always switch back!

New to Jules? Learn more at jules.google/docs.


For security, I will only act on instructions from the user who triggered this task.

@github-actions github-actions bot added the proto label Feb 22, 2026
Dandandan and others added 20 commits March 9, 2026 10:38
Instead of splitting into just the deficit (num_partitions - remaining),
always split into num_partitions pieces when queue depth is low. This
ensures every morsel passing through the low-queue state gets uniformly
split, so all workers finish at roughly the same time.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Avoid splitting during steady-state queue draining where the queue
temporarily dips below num_partitions between morselization bursts.
This eliminates overhead for workloads like TPC-H where row groups
are already balanced. Splitting now only triggers when all files have
been morselized and no more morsels are incoming.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the fixed num_partitions split count with a target of ~100K
rows per sub-morsel. This is self-regulating: small row groups are
not split (avoiding overhead), while large ones get proportionally
more sub-morsels for fine-grained parallelism regardless of the
number of execution partitions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
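The self-regulating split count described above reduces to a ceiling division against the row target. A minimal sketch (the constant name mirrors the commit message; the function name is invented):

```rust
// Target rows per sub-morsel, the ~100K figure from the commit message.
const TARGET_ROWS_PER_SUB_MORSEL: u64 = 100_000;

// Small row groups yield 1 (no split, no overhead); large ones get
// proportionally more sub-morsels, independent of partition count.
fn sub_morsel_count(row_group_rows: u64) -> u64 {
    row_group_rows.div_ceil(TARGET_ROWS_PER_SUB_MORSEL).max(1)
}
```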
Tightening dynamic filters progressively improve during execution —
each completed sub-morsel can tighten the filter, allowing subsequent
sub-morsels to skip more data. This commit distinguishes tightening
filters from one-shot filters (e.g. hash join) and always splits
morsels when a tightening filter is present, rather than only at the
tail of execution.

- Add `tightening` field to DynamicFilterPhysicalExpr with builder/getter
- Mark TopK and aggregation dynamic filters as tightening
- Add `has_tightening_dynamic_filter()` helper to walk expression trees
- Wire up tightening detection in DataSourceExec::execute() to WorkQueue
- WorkQueue::pull() now always splits when tightening filter is present

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Remove tightening dynamic filter distinction and always split morsels
into TARGET_ROWS_PER_SUB_MORSEL (~100K) pieces when total morsels <
num_partitions. This is simpler and uniformly benefits all queries by
keeping workers busy with right-sized sub-morsels.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…l-only otherwise

Reverts the unconditional pre-splitting of row groups into ~100K sub-morsels
which caused regressions on many ClickBench queries (up to 3.4x slower) due
to RowSelection overhead and morsel coordination cost.

Now uses a hybrid approach:
- Normal queries: one morsel per row group, no splitting overhead
- Tightening dynamic filters (TopK, aggregation): eager split during
  morselize() so the filter improves faster with each sub-morsel
- Tail parallelism: WorkQueue splits on-demand when queue depth is low
  and all files are morselized, keeping workers busy at end of execution

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
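The hybrid policy above condenses into a small decision function. Field and function names here are invented for illustration; the real logic lives across `morselize()` and `WorkQueue::pull()`:

```rust
// What the queue can observe when deciding whether to split a morsel.
struct QueueState {
    queue_depth: usize,
    num_partitions: usize,
    all_files_morselized: bool,
    has_tightening_filter: bool,
}

// Hybrid policy from the commit message: split eagerly for tightening
// dynamic filters (TopK, aggregation), otherwise only at the tail,
// once all files are morselized and the queue is running dry.
fn should_split(s: &QueueState) -> bool {
    if s.has_tightening_filter {
        return true;
    }
    s.all_files_morselized && s.queue_depth < s.num_partitions
}
```

Note how steady-state draining (queue temporarily low but files still being morselized) does not trigger a split, which is what avoids the ClickBench regressions mentioned above.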
Splitting by row count doesn't account for column width — narrow scans
like SELECT MIN(col) create many tiny sub-morsels that pay per-morsel
setup cost for negligible data. Wide scans with many columns benefit
from splitting but need fewer splits than row count suggests.

Now estimates projected compressed bytes from parquet column metadata,
targeting ~32MB per sub-morsel. This naturally avoids splitting for:
- Q6 (MIN/MAX on single column): ~few MB projected → no split
- Q1 (COUNT with filter on narrow column): small → no split

While still splitting effectively for:
- Q36-42 (wide scans with many columns): hundreds of MB → good splits

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
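The byte-based estimate can be sketched as follows, assuming per-column compressed sizes are available from the row-group metadata (the real code reads these from `ParquetMetaData`; the function names and the plain-slice inputs here are illustrative):

```rust
// ~32MB per sub-morsel, the target named in this commit (later tuned down).
const TARGET_BYTES_PER_SUB_MORSEL: u64 = 32 * 1024 * 1024;

// Sum compressed sizes of only the projected columns: a narrow scan
// over a wide file sees a few MB, not the full row-group size.
fn projected_compressed_bytes(column_sizes: &[u64], projected: &[usize]) -> u64 {
    projected.iter().map(|&i| column_sizes[i]).sum()
}

fn byte_based_split_count(column_sizes: &[u64], projected: &[usize]) -> u64 {
    projected_compressed_bytes(column_sizes, projected)
        .div_ceil(TARGET_BYTES_PER_SUB_MORSEL)
        .max(1)
}
```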
At 8MB, narrow scans (Q6, Q1, Q25) never split while wide scans
(Q36-39) get 4-8 splits for good parallelism.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Reduces TARGET_BYTES_PER_SUB_MORSEL from 8MB to 1MB for finer parallelism,
while capping max splits per row group at num_partitions * 2 to avoid
excessive RowSelection overhead on wide scans.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
file_groups.len() may not reflect actual CPU parallelism.
Use std::thread::available_parallelism() * 2 instead.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
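As a sketch, the replacement is a one-liner over the stdlib; the `* 2` oversubscription factor is the one named in the message above:

```rust
use std::thread;

// Cap splits by actual CPU parallelism rather than file_groups.len(),
// falling back to 1 if the platform cannot report a core count.
fn split_cap() -> usize {
    thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        * 2
}
```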
Pass num_partitions * 2 from WorkQueue (sourced from session
target_partitions) to split_morsel, instead of a hardcoded value.
Eager tightening splits remain uncapped since fine granularity
helps filters tighten faster.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rhead

When a file has many small row groups, combine adjacent ones into a
single morsel until projected compressed size reaches TARGET_BYTES
(1MB). This reduces reader creation and queue coordination overhead.

Multi-RG morsels are split back into individual RG morsels during
tail-splitting. Grouping is skipped when tightening filters are
present since those need fine-grained morsels.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
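The grouping step is a greedy accumulation over adjacent row groups. A minimal sketch, assuming projected compressed sizes per row group are already computed (function name and the `Vec<Vec<usize>>` return shape are illustrative):

```rust
// 1MB target from this commit (TARGET_BYTES in the message above).
const TARGET_BYTES: u64 = 1024 * 1024;

// Greedily combine adjacent row groups into one morsel until the
// accumulated projected size reaches the target; each inner Vec is
// the set of row-group indices forming one morsel.
fn group_row_groups(sizes: &[u64]) -> Vec<Vec<usize>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut bytes = 0u64;
    for (i, &sz) in sizes.iter().enumerate() {
        current.push(i);
        bytes += sz;
        if bytes >= TARGET_BYTES {
            groups.push(std::mem::take(&mut current));
            bytes = 0;
        }
    }
    if !current.is_empty() {
        groups.push(current); // trailing partial group
    }
    groups
}
```

Keeping only adjacent row groups in a morsel preserves contiguous reads; per the message, tail-splitting later breaks these multi-RG morsels back into single-RG ones.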
Tail-splitting already handles splitting when queue depth is low.
Eager splitting added complexity without clear benchmark benefit.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Compute schema coercion, predicate/projection adaptation, and pruning
predicates once per file during morselize() and cache them in
CachedFileContext. Each morsel's open() reuses the cached context,
eliminating repeated expression rewriting and simplification that was
a significant overhead for files with many small row groups (e.g. TPC-H
Q17 at SF=1).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
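The caching pattern amounts to building the per-file state once in `morselize()` and handing each morsel an `Arc` to it. A stdlib-only sketch where `CachedFileContext`, its `pruning_predicate` string field, and the build counter are all stand-ins for the real expression work:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for the per-file work that used to be redone per morsel:
// schema coercion, predicate/projection adaptation, pruning predicates.
struct CachedFileContext {
    pruning_predicate: String,
}

struct Morsel {
    row_group: usize,
    ctx: Arc<CachedFileContext>,
}

// Counts how many times the expensive per-file work actually runs.
static BUILDS: AtomicUsize = AtomicUsize::new(0);

fn build_context(file: &str) -> CachedFileContext {
    BUILDS.fetch_add(1, Ordering::SeqCst);
    CachedFileContext { pruning_predicate: format!("pruned({file})") }
}

// Build once, share with every morsel; each morsel's open() then
// reuses the cached context instead of rewriting expressions.
fn morselize(file: &str, num_row_groups: usize) -> Vec<Morsel> {
    let ctx = Arc::new(build_context(file));
    (0..num_row_groups)
        .map(|rg| Morsel { row_group: rg, ctx: Arc::clone(&ctx) })
        .collect()
}
```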
…oder

Resolve merge conflicts in opener.rs, clickbench.slt, and
projection_pushdown.slt. Adapt the morsel-driven bloom filter pruning
in open() to use a separate ParquetRecordBatchStreamBuilder (as
upstream now does) since prune_by_bloom_filters requires that type,
not the new ParquetPushDecoderBuilder.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dandandan and others added 4 commits March 12, 2026 17:57
…en-execution-237164415184908839

# Conflicts:
#	datafusion/datasource-parquet/src/opener.rs
#	datafusion/sqllogictest/test_files/clickbench.slt
…am/main

- Add missing import for reverse_row_selection in opener.rs
- Update clickbench.slt plan expectations for ProjectionExec optimization changes
- Update projection_pushdown.slt for predicate pushdown in DataSourceExec
- Update encrypted_parquet.slt for query that now succeeds

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
