Skip to content

Commit 8132e06

Browse files
committed
Add use_statistics_registry config and registry-aware join selection
Adds a pluggable statistics path for JoinSelection that uses the StatisticsRegistry instead of each operator's built-in partition_statistics. - Add optimizer.use_statistics_registry config flag (default=false) - Override optimize_with_context in JoinSelection to pass the registry to should_swap_join_order when the flag is enabled; if no registry is set on SessionState the built-in default is constructed lazily - Add statistics_registry.slt demonstrating how the registry produces more conservative join estimates for skewed data (10*10=100 cartesian fallback vs 10*10/3=33 range-NDV estimate), triggering the correct build-side swap that the built-in estimator misses style: fix prettier formatting in configs.md fix: cargo fmt and broken rustdoc link in config docstring
1 parent 17b5653 commit 8132e06

6 files changed

Lines changed: 380 additions & 130 deletions

File tree

datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,6 +1231,12 @@ config_namespace! {
12311231
/// query is used.
12321232
pub join_reordering: bool, default = true
12331233

1234+
/// When set to true, the physical plan optimizer uses the pluggable
1235+
/// `StatisticsRegistry` for statistics propagation across operators.
1236+
/// This enables more accurate cardinality estimates compared to each
1237+
/// operator's built-in `partition_statistics`.
1238+
pub use_statistics_registry: bool, default = false
1239+
12341240
/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
12351241
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
12361242
pub prefer_hash_join: bool, default = true

datafusion/physical-optimizer/src/join_selection.rs

Lines changed: 68 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
//! `PartitionMode` and the build side using the available statistics for hash joins.
2525
2626
use crate::PhysicalOptimizerRule;
27+
use crate::optimizer::{ConfigOnlyContext, PhysicalOptimizerContext};
28+
use datafusion_common::Statistics;
2729
use datafusion_common::config::ConfigOptions;
2830
use datafusion_common::error::Result;
2931
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -37,6 +39,7 @@ use datafusion_physical_plan::joins::{
3739
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
3840
StreamJoinPartitionMode, SymmetricHashJoinExec,
3941
};
42+
use datafusion_physical_plan::operator_statistics::StatisticsRegistry;
4043
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
4144
use std::sync::Arc;
4245

@@ -53,36 +56,49 @@ impl JoinSelection {
5356
}
5457
}
5558

59+
/// Get statistics for a plan node, using the registry if available.
60+
fn get_stats(
61+
plan: &dyn ExecutionPlan,
62+
registry: Option<&StatisticsRegistry>,
63+
) -> Result<Arc<Statistics>> {
64+
if let Some(reg) = registry {
65+
reg.compute(plan)
66+
.map(|s| Arc::<Statistics>::clone(s.base_arc()))
67+
} else {
68+
plan.partition_statistics(None)
69+
}
70+
}
71+
5672
// TODO: We need some performance test for Right Semi/Right Join swap to Left Semi/Left Join in case that the right side is smaller but not much smaller.
5773
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is much smaller than the other by more than SIZE_DIFFERENCE_THRESHOLD times, by default is 8 times.
5874
/// Checks whether join inputs should be swapped using available statistics.
5975
///
6076
/// It follows these steps:
61-
/// 1. Compare the in-memory sizes of both sides, and place the smaller side on
77+
/// 1. If a [`StatisticsRegistry`] is provided, use it for cross-operator estimates
78+
/// (e.g., intermediate join outputs that would otherwise have `Absent` statistics).
79+
/// 2. Compare the in-memory sizes of both sides, and place the smaller side on
6280
/// the left (build) side.
63-
/// 2. If in-memory byte sizes are unavailable, fall back to row counts.
64-
/// 3. Do not reorder the join if neither statistic is available, or if
81+
/// 3. If in-memory byte sizes are unavailable, fall back to row counts.
82+
/// 4. Do not reorder the join if neither statistic is available, or if
6583
/// `datafusion.optimizer.join_reordering` is disabled.
6684
///
67-
///
6885
/// Used configurations inside arg `config`
6986
/// - `config.optimizer.join_reordering`: allows or forbids statistics-driven join swapping
7087
pub(crate) fn should_swap_join_order(
7188
left: &dyn ExecutionPlan,
7289
right: &dyn ExecutionPlan,
7390
config: &ConfigOptions,
91+
registry: Option<&StatisticsRegistry>,
7492
) -> Result<bool> {
7593
if !config.optimizer.join_reordering {
7694
return Ok(false);
7795
}
7896

79-
// Get the left and right table's total bytes
80-
// If both the left and right tables contain total_byte_size statistics,
81-
// use `total_byte_size` to determine `should_swap_join_order`, else use `num_rows`
82-
let left_stats = left.partition_statistics(None)?;
83-
let right_stats = right.partition_statistics(None)?;
84-
// First compare `total_byte_size` of left and right side,
85-
// if information in this field is insufficient fallback to the `num_rows`
97+
let left_stats = get_stats(left, registry)?;
98+
let right_stats = get_stats(right, registry)?;
99+
100+
// First compare total_byte_size, then fall back to num_rows if byte
101+
// sizes are unavailable.
86102
match (
87103
left_stats.total_byte_size.get_value(),
88104
right_stats.total_byte_size.get_value(),
@@ -102,8 +118,9 @@ fn supports_collect_by_thresholds(
102118
plan: &dyn ExecutionPlan,
103119
threshold_byte_size: usize,
104120
threshold_num_rows: usize,
121+
registry: Option<&StatisticsRegistry>,
105122
) -> bool {
106-
let Ok(stats) = plan.partition_statistics(None) else {
123+
let Ok(stats) = get_stats(plan, registry) else {
107124
return false;
108125
};
109126

@@ -126,31 +143,36 @@ impl PhysicalOptimizerRule for JoinSelection {
126143
plan: Arc<dyn ExecutionPlan>,
127144
config: &ConfigOptions,
128145
) -> Result<Arc<dyn ExecutionPlan>> {
129-
// First, we make pipeline-fixing modifications to joins so as to accommodate
130-
// unbounded inputs. Each pipeline-fixing subrule, which is a function
131-
// of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
132-
// argument storing state variables that indicate the unboundedness status
133-
// of the current [`ExecutionPlan`] as we traverse the plan tree.
146+
self.optimize_with_context(plan, &ConfigOnlyContext::new(config))
147+
}
148+
149+
fn optimize_with_context(
150+
&self,
151+
plan: Arc<dyn ExecutionPlan>,
152+
context: &dyn PhysicalOptimizerContext,
153+
) -> Result<Arc<dyn ExecutionPlan>> {
154+
let config = context.config_options();
155+
let mut default_registry = None;
156+
let registry: Option<&StatisticsRegistry> =
157+
if config.optimizer.use_statistics_registry {
158+
Some(context.statistics_registry().unwrap_or_else(|| {
159+
default_registry
160+
.insert(StatisticsRegistry::default_with_builtin_providers())
161+
}))
162+
} else {
163+
None
164+
};
134165
let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
135166
Box::new(hash_join_convert_symmetric_subrule),
136167
Box::new(hash_join_swap_subrule),
137168
];
138169
let new_plan = plan
139170
.transform_up(|p| apply_subrules(p, &subrules, config))
140171
.data()?;
141-
// Next, we apply another subrule that tries to optimize joins using any
142-
// statistics their inputs might have.
143-
// - For a hash join with partition mode [`PartitionMode::Auto`], we will
144-
// make a cost-based decision to select which `PartitionMode` mode
145-
// (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
146-
// is not available, we will fall back to [`PartitionMode::Partitioned`].
147-
// - We optimize/swap join sides so that the left (build) side of the join
148-
// is the small side. If the statistics information is not available, we
149-
// do not modify join sides.
150-
// - We will also swap left and right sides for cross joins so that the left
151-
// side is the small side.
152172
new_plan
153-
.transform_up(|plan| statistical_join_selection_subrule(plan, config))
173+
.transform_up(|plan| {
174+
statistical_join_selection_subrule(plan, config, registry)
175+
})
154176
.data()
155177
}
156178

@@ -178,6 +200,7 @@ pub(crate) fn try_collect_left(
178200
hash_join: &HashJoinExec,
179201
ignore_threshold: bool,
180202
config: &ConfigOptions,
203+
registry: Option<&StatisticsRegistry>,
181204
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
182205
let left = hash_join.left();
183206
let right = hash_join.right();
@@ -188,20 +211,22 @@ pub(crate) fn try_collect_left(
188211
&**left,
189212
optimizer_config.hash_join_single_partition_threshold,
190213
optimizer_config.hash_join_single_partition_threshold_rows,
214+
registry,
191215
);
192216
let right_can_collect = ignore_threshold
193217
|| supports_collect_by_thresholds(
194218
&**right,
195219
optimizer_config.hash_join_single_partition_threshold,
196220
optimizer_config.hash_join_single_partition_threshold_rows,
221+
registry,
197222
);
198223

199224
match (left_can_collect, right_can_collect) {
200225
(true, true) => {
201226
// Don't swap null-aware anti joins as they have specific side requirements
202227
if hash_join.join_type().supports_swap()
203228
&& !hash_join.null_aware
204-
&& should_swap_join_order(&**left, &**right, config)?
229+
&& should_swap_join_order(&**left, &**right, config, registry)?
205230
{
206231
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
207232
} else {
@@ -245,13 +270,14 @@ pub(crate) fn try_collect_left(
245270
pub(crate) fn partitioned_hash_join(
246271
hash_join: &HashJoinExec,
247272
config: &ConfigOptions,
273+
registry: Option<&StatisticsRegistry>,
248274
) -> Result<Arc<dyn ExecutionPlan>> {
249275
let left = hash_join.left();
250276
let right = hash_join.right();
251277
// Don't swap null-aware anti joins as they have specific side requirements
252278
if hash_join.join_type().supports_swap()
253279
&& !hash_join.null_aware
254-
&& should_swap_join_order(&**left, &**right, config)?
280+
&& should_swap_join_order(&**left, &**right, config, registry)?
255281
{
256282
hash_join.swap_inputs(PartitionMode::Partitioned)
257283
} else {
@@ -285,26 +311,28 @@ pub(crate) fn partitioned_hash_join(
285311
fn statistical_join_selection_subrule(
286312
plan: Arc<dyn ExecutionPlan>,
287313
config: &ConfigOptions,
314+
registry: Option<&StatisticsRegistry>,
288315
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
289316
let transformed = if let Some(hash_join) = plan.downcast_ref::<HashJoinExec>() {
290317
match hash_join.partition_mode() {
291-
PartitionMode::Auto => try_collect_left(hash_join, false, config)?
318+
PartitionMode::Auto => try_collect_left(hash_join, false, config, registry)?
292319
.map_or_else(
293-
|| partitioned_hash_join(hash_join, config).map(Some),
320+
|| partitioned_hash_join(hash_join, config, registry).map(Some),
294321
|v| Ok(Some(v)),
295322
)?,
296-
PartitionMode::CollectLeft => try_collect_left(hash_join, true, config)?
297-
.map_or_else(
298-
|| partitioned_hash_join(hash_join, config).map(Some),
323+
PartitionMode::CollectLeft => {
324+
try_collect_left(hash_join, true, config, registry)?.map_or_else(
325+
|| partitioned_hash_join(hash_join, config, registry).map(Some),
299326
|v| Ok(Some(v)),
300-
)?,
327+
)?
328+
}
301329
PartitionMode::Partitioned => {
302330
let left = hash_join.left();
303331
let right = hash_join.right();
304332
// Don't swap null-aware anti joins as they have specific side requirements
305333
if hash_join.join_type().supports_swap()
306334
&& !hash_join.null_aware
307-
&& should_swap_join_order(&**left, &**right, config)?
335+
&& should_swap_join_order(&**left, &**right, config, registry)?
308336
{
309337
hash_join
310338
.swap_inputs(PartitionMode::Partitioned)
@@ -317,7 +345,7 @@ fn statistical_join_selection_subrule(
317345
} else if let Some(cross_join) = plan.downcast_ref::<CrossJoinExec>() {
318346
let left = cross_join.left();
319347
let right = cross_join.right();
320-
if should_swap_join_order(&**left, &**right, config)? {
348+
if should_swap_join_order(&**left, &**right, config, registry)? {
321349
cross_join.swap_inputs().map(Some)?
322350
} else {
323351
None
@@ -326,7 +354,7 @@ fn statistical_join_selection_subrule(
326354
let left = nl_join.left();
327355
let right = nl_join.right();
328356
if nl_join.join_type().supports_swap()
329-
&& should_swap_join_order(&**left, &**right, config)?
357+
&& should_swap_join_order(&**left, &**right, config, registry)?
330358
{
331359
nl_join.swap_inputs().map(Some)?
332360
} else {

0 commit comments

Comments
 (0)