Fix shared TopK early exit with global prefix threshold #1

Open
geoffreyclaude wants to merge 1 commit into ajegou:arnaud.jegou/topk-early-exit-filter-rejection from geoffreyclaude:codex/topk-global-prefix-early-exit

@geoffreyclaude

Which issue does this PR close?

Supplements apache#22852 for apache#22849.

Rationale for this change

PR apache#22852 fixes the local all-filtered-batch path by calling attempt_early_completion before returning. The remaining regression is in partitioned SortExec: every local TopK shares one TopKDynamicFilters. One partition can establish a global threshold before another partition has filled its local heap. The second partition then sees fully rejected batches, but heap.max() is still None locally, so it cannot prove completion and keeps draining sorted input.
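The stall described above can be reproduced in a toy model. The sketch below is not DataFusion code: `LocalTopK`, `insert`, and `local_can_complete` are hypothetical names, the sort prefix is reduced to a single `i64`, and input is assumed sorted ascending on that prefix.

```rust
use std::collections::BinaryHeap;

/// Toy stand-in for one partition's TopK state (hypothetical, not the DataFusion type).
struct LocalTopK {
    k: usize,
    /// Max-heap holding the k smallest prefix values seen so far.
    heap: BinaryHeap<i64>,
}

impl LocalTopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::new() }
    }

    /// Keep only the k smallest values by evicting the largest on overflow.
    fn insert(&mut self, prefix: i64) {
        self.heap.push(prefix);
        if self.heap.len() > self.k {
            self.heap.pop();
        }
    }

    /// Worst value currently kept, available only once the heap holds k rows.
    fn max(&self) -> Option<i64> {
        if self.heap.len() == self.k {
            self.heap.peek().copied()
        } else {
            None
        }
    }

    /// Local-only check: with ascending input, no later row can qualify once the
    /// last prefix seen is strictly greater than the local heap max.
    fn local_can_complete(&self, last_seen_prefix: i64) -> bool {
        match self.max() {
            Some(worst) => last_seen_prefix > worst,
            // Heap not full: nothing local to compare against, so keep reading.
            None => false,
        }
    }
}
```

In this model, a partition that has inserted 1, 2, 3 with `k = 2` can complete at prefix 3, while a sibling whose rows were all rejected by the shared filter still has an empty heap and `local_can_complete(11)` returns `false`, so it keeps draining input.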

This patch stores the common-prefix row for the shared global threshold, and each local TopK checks that shared prefix before falling back to its local heap prefix. It also prevents local partition TopKs from marking a shared dynamic filter complete while sibling partitions can still tighten it. Single-partition behavior is unchanged.
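A minimal sketch of the check order, under simplifying assumptions: the prefix is a single `i64`, input is sorted ascending, and `SharedThreshold`, `tighten`, and `can_complete` are hypothetical names rather than the actual `TopKDynamicFilters` API.

```rust
use std::sync::RwLock;

/// Hypothetical shared state: prefix value of the current global threshold row.
#[derive(Default)]
struct SharedThreshold {
    prefix: RwLock<Option<i64>>,
}

impl SharedThreshold {
    /// Tighten the shared prefix. With ascending order, smaller is tighter;
    /// a looser candidate is ignored.
    fn tighten(&self, candidate: i64) {
        let mut guard = self.prefix.write().unwrap();
        match *guard {
            Some(current) if current <= candidate => {}
            _ => *guard = Some(candidate),
        }
    }
}

/// Consult the shared global prefix first, then fall back to the local heap prefix.
/// A missing threshold never proves completion, and the comparison is strict.
fn can_complete(
    shared: &SharedThreshold,
    local_heap_max: Option<i64>,
    last_seen_prefix: i64,
) -> bool {
    let global = *shared.prefix.read().unwrap();
    let beyond = |t: Option<i64>| t.map_or(false, |t| last_seen_prefix > t);
    beyond(global) || beyond(local_heap_max)
}
```

With a shared prefix of 5, a partition whose local heap is still empty can stop at prefix 11 via `can_complete(&shared, None, 11)`, which is the case the local-only check could not prove.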

What changes are included in this PR?

  • Store a global common-prefix threshold row in TopKDynamicFilters.
  • Check that global prefix threshold in attempt_early_completion before local heap fallback.
  • Keep the all-filtered-batch completion call from apache/datafusion#22852 (fix(topk): call attempt_early_completion when filter rejects entire batch).
  • Also check completion on batches that pass the filter but produce zero heap replacements.
  • Avoid marking shared partitioned TopK filters complete from individual local partitions.
  • Add tests for shared-filter completion before local heap fill, equal-prefix non-completion, missing-prefix non-completion, and DESC/null prefix ordering.
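The equal-prefix and DESC/null cases in the test list can be illustrated with a small comparator. This is a sketch under assumptions, not the code in this PR: `SortOpts`, `cmp_prefix`, and `past_threshold` are hypothetical, and the prefix is a single nullable `i64` column whose `descending` and `nulls_first` flags are modeled after Arrow's sort options.

```rust
use std::cmp::Ordering;

/// Hypothetical sort options for a single prefix column.
#[derive(Clone, Copy)]
struct SortOpts {
    descending: bool,
    nulls_first: bool,
}

/// Compare two nullable prefix values in output order (Less = emitted earlier).
fn cmp_prefix(a: Option<i64>, b: Option<i64>, opts: SortOpts) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => {
            if opts.nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if opts.nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => {
            if opts.descending { y.cmp(&x) } else { x.cmp(&y) }
        }
    }
}

/// Early exit is treated as safe only when the current prefix sorts strictly
/// after the threshold prefix. An equal prefix does not complete, since rows
/// sharing the prefix may still differ on the remaining sort columns.
fn past_threshold(current: Option<i64>, threshold: Option<i64>, opts: SortOpts) -> bool {
    cmp_prefix(current, threshold, opts) == Ordering::Greater
}
```

Under `DESC NULLS FIRST`, for example, a current prefix of 5 is past a threshold of 9, a current prefix of 9 is not, and a NULL prefix sorts before any threshold value and therefore never triggers completion.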

Are these changes tested?

Commands run on the final rebased branch:

  • cargo fmt --all
  • cargo test -p datafusion-physical-plan topk --lib
  • cargo test -p datafusion-physical-plan sort --lib
  • cargo clippy -p datafusion-physical-plan --lib -- -D warnings
  • cargo clippy --all-targets --all-features -- -D warnings
  • cargo build --release --bin dfbench

Benchmark command:

target/release/dfbench sort-tpch --sorted --limit 10 --iterations 5 --path /tmp/df-topk-bench-data/tpch_sf1 -o /tmp/df-patched-rerun2-top10_sorted_tpch.json

Results, using the clean reruns and ignoring the earlier noisy iteration:

| run | total | Q1 | Q4 | Q8 | Q9 | Q10 |
| --- | --- | --- | --- | --- | --- | --- |
| pre_15770_v48_rerun | 358.40 | 56.63 | 5.00 | 5.60 | 7.77 | 10.20 |
| post_15770_v48_rerun | 418.88 | 26.51 | 16.44 | 27.42 | 67.61 | 60.39 |
| pr22852_base_v53_rerun | 358.72 | 5.20 | 18.69 | 28.99 | 35.77 | 59.52 |
| pr22852_fix_v53_rerun | 323.65 | 3.84 | 13.38 | 28.48 | 39.47 | 57.70 |
| patched_v53_run3 | 167.19 | 3.10 | 7.10 | 3.98 | 5.89 | 8.16 |

Key comparisons:

Debug proof for the bounded-read shape after this patch:

  • Q8: DataSourceExec output_rows=81.92K, output_batches=10, files_processed=0, bytes_scanned=15.79M.
  • Q9: DataSourceExec output_rows=81.92K, output_batches=10, files_processed=0, bytes_scanned=20.89M.
  • Q10: DataSourceExec output_rows=81.92K, output_batches=10, files_processed=0, bytes_scanned=34.69M.

Those Q8/Q9/Q10 debug runs show the scan returns to one batch per partition instead of draining millions of rows across the remaining file ranges.

Are there any user-facing changes?

No. This is an internal physical execution optimization fix.

@geoffreyclaude

@codex review
