From 46c5e623174f3faa509deb270f215f0e4a58bf3e Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Mon, 8 Jun 2026 13:36:45 -0400 Subject: [PATCH 1/2] . --- .../benches/hash_join_semi_anti.rs | 629 +++++++++++------- .../physical-plan/src/joins/array_map.rs | 185 +++++- .../physical-plan/src/joins/hash_join/exec.rs | 356 +++++++++- .../src/joins/hash_join/stream.rs | 424 +++++++++++- .../physical-plan/src/joins/join_hash_map.rs | 274 ++++++++ .../sort_merge_join/materializing_stream.rs | 26 +- datafusion/physical-plan/src/joins/utils.rs | 88 ++- 7 files changed, 1668 insertions(+), 314 deletions(-) diff --git a/datafusion/physical-plan/benches/hash_join_semi_anti.rs b/datafusion/physical-plan/benches/hash_join_semi_anti.rs index 1e11da36be73c..0aed83514b984 100644 --- a/datafusion/physical-plan/benches/hash_join_semi_anti.rs +++ b/datafusion/physical-plan/benches/hash_join_semi_anti.rs @@ -15,7 +15,8 @@ // specific language governing permissions and limitations // under the License. -//! Criterion benchmarks for Hash Join with RightSemi/RightAnti joins with Int32 keys. +//! Criterion benchmarks for Hash Join with RightSemi/RightAnti joins with +//! Int32 and Utf8 keys. //! //! ## Key Benchmark Axes //! @@ -39,18 +40,24 @@ //! //! Semi/anti joins can short-circuit after finding the first match, so these //! benchmarks help evaluate optimization strategies for existence checks. +//! +//! Each scenario below describes a build/probe data shape and is benchmarked +//! under both RightSemi and RightAnti. 
use std::sync::Arc; use arrow::array::{Int32Array, RecordBatch, StringArray}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use criterion::measurement::WallTime; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, +}; use datafusion_common::{JoinType, NullEquality}; use datafusion_execution::TaskContext; use datafusion_physical_expr::expressions::col; -use datafusion_physical_plan::collect; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, utils::JoinOn}; use datafusion_physical_plan::test::TestMemoryExec; +use datafusion_physical_plan::{ExecutionPlan, collect}; use tokio::runtime::Runtime; /// Build RecordBatches with Int32 keys. @@ -65,37 +72,10 @@ fn build_batches( key_offset: i32, schema: &SchemaRef, ) -> Vec { - let keys: Vec = (0..num_rows) - .map(|i| ((i % key_mod) as i32) + key_offset) - .collect(); - let data: Vec = (0..num_rows).map(|i| i as i32).collect(); - let payload: Vec = data.iter().map(|d| format!("val_{d}")).collect(); - - let batch = RecordBatch::try_new( - Arc::clone(schema), - vec![ - Arc::new(Int32Array::from(keys)), - Arc::new(Int32Array::from(data)), - Arc::new(StringArray::from(payload)), - ], - ) - .unwrap(); - - let batch_size = 8192; - let mut batches = Vec::new(); - let mut offset = 0; - while offset < batch.num_rows() { - let len = (batch.num_rows() - offset).min(batch_size); - batches.push(batch.slice(offset, len)); - offset += len; - } - batches + build_batches_from_key_fn(num_rows, schema, |i| ((i % key_mod) as i32) + key_offset) } -fn make_exec( - batches: &[RecordBatch], - schema: &SchemaRef, -) -> Arc { +fn make_exec(batches: &[RecordBatch], schema: &SchemaRef) -> Arc { TestMemoryExec::try_new_exec(&[batches.to_vec()], Arc::clone(schema), None).unwrap() } @@ -107,34 +87,117 @@ fn schema() -> SchemaRef { ])) } +fn utf8_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + 
Field::new("key", DataType::Utf8, false), + Field::new("data", DataType::Int32, false), + Field::new("payload", DataType::Utf8, false), + ])) +} + fn do_hash_join( - left: Arc, - right: Arc, + left: Arc, + right: Arc, join_type: JoinType, rt: &Runtime, -) -> usize { +) -> HashJoinRun { let on: JoinOn = vec![( col("key", &left.schema()).unwrap(), col("key", &right.schema()).unwrap(), )]; - let join = HashJoinExec::try_new( - left, - right, - on, - None, - &join_type, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - false, - ) - .unwrap(); + let join = Arc::new( + HashJoinExec::try_new( + left, + right, + on, + None, + &join_type, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); let task_ctx = Arc::new(TaskContext::default()); - rt.block_on(async { - let batches = collect(Arc::new(join), task_ctx).await.unwrap(); - batches.iter().map(|b| b.num_rows()).sum() - }) + let output_rows = rt.block_on(async { + let batches = collect(join.clone(), task_ctx).await.unwrap(); + batches.iter().map(|b| b.num_rows()).sum::() + }); + let metrics = join.metrics().unwrap(); + let array_map_created_count = metrics + .sum_by_name(HashJoinExec::ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .map(|v| v.as_usize()) + .unwrap_or(0); + + HashJoinRun { + output_rows, + array_map_created_count, + } +} + +struct HashJoinRun { + output_rows: usize, + array_map_created_count: usize, +} + +/// A build/probe data shape benchmarked under both RightSemi and RightAnti. +struct BenchScenario { + /// Name suffix shared by the `right_semi_*` / `right_anti_*` variants. + suffix: &'static str, + probe_rows: usize, + left_batches: Vec, + right_batches: Vec, + /// Expected RightSemi output row count. The expected RightAnti output is + /// the complement (`probe_rows - semi_output_rows`), since every probe + /// row is emitted by exactly one of the two join types. 
+ semi_output_rows: usize, + /// Whether the build side is expected to select the ArrayMap strategy. + uses_array_map: bool, +} + +fn bench_scenario( + group: &mut BenchmarkGroup<'_, WallTime>, + schema: &SchemaRef, + rt: &Runtime, + scenario: &BenchScenario, +) { + for join_type in [JoinType::RightSemi, JoinType::RightAnti] { + let (prefix, expected_output_rows) = match join_type { + JoinType::RightSemi => ("right_semi", scenario.semi_output_rows), + _ => ( + "right_anti", + scenario.probe_rows - scenario.semi_output_rows, + ), + }; + let name = format!("{prefix}_{}", scenario.suffix); + + // Validate the output row count and map strategy once before measuring. + let run = do_hash_join( + make_exec(&scenario.left_batches, schema), + make_exec(&scenario.right_batches, schema), + join_type, + rt, + ); + assert_eq!( + run.output_rows, expected_output_rows, + "unexpected output row count for {name}" + ); + assert_eq!( + run.array_map_created_count > 0, + scenario.uses_array_map, + "unexpected map strategy for {name}" + ); + + group.bench_function(BenchmarkId::new(name, scenario.probe_rows), |b| { + b.iter(|| { + let left = make_exec(&scenario.left_batches, schema); + let right = make_exec(&scenario.right_batches, schema); + do_hash_join(left, right, join_type, rt).output_rows + }) + }); + } } /// Build batches with sparse keys (key = row_index % key_mod * multiplier + key_offset). @@ -146,9 +209,21 @@ fn build_batches_sparse( multiplier: i32, schema: &SchemaRef, ) -> Vec { - let keys: Vec = (0..num_rows) - .map(|i| ((i % key_mod) as i32) * multiplier + key_offset) - .collect(); + build_batches_from_key_fn(num_rows, schema, |i| { + ((i % key_mod) as i32) * multiplier + key_offset + }) +} + +/// Build batches from an arbitrary deterministic key generator. 
+fn build_batches_from_key_fn( + num_rows: usize, + schema: &SchemaRef, + mut key_fn: F, +) -> Vec +where + F: FnMut(usize) -> i32, +{ + let keys: Vec = (0..num_rows).map(&mut key_fn).collect(); let data: Vec = (0..num_rows).map(|i| i as i32).collect(); let payload: Vec = data.iter().map(|d| format!("val_{d}")).collect(); @@ -162,6 +237,37 @@ fn build_batches_sparse( ) .unwrap(); + slice_batches(&batch) +} + +/// Build batches with Utf8 keys: `key = "key_"`, zero-padded so +/// every key has the same length. +fn build_utf8_batches( + num_rows: usize, + key_mod: usize, + schema: &SchemaRef, +) -> Vec { + let keys: Vec = (0..num_rows) + .map(|i| format!("key_{:08}", i % key_mod)) + .collect(); + let data: Vec = (0..num_rows).map(|i| i as i32).collect(); + let payload: Vec = data.iter().map(|d| format!("val_{d}")).collect(); + + let batch = RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(StringArray::from(keys)), + Arc::new(Int32Array::from(data)), + Arc::new(StringArray::from(payload)), + ], + ) + .unwrap(); + + slice_batches(&batch) +} + +/// Split `batch` into 8192-row slices. +fn slice_batches(batch: &RecordBatch) -> Vec { let batch_size = 8192; let mut batches = Vec::new(); let mut offset = 0; @@ -173,6 +279,22 @@ fn build_batches_sparse( batches } +/// Skewed 50%-hit key generator for a sparse (HashMap) build side whose keys are +/// spaced by 10. Matching keys land on multiples of 10; misses are offset by 1. 
+fn skewed_h50_hashmap_key(row: usize, build_rows: usize, hot_keys: usize) -> i32 { + let logical_row = row / 2; + if row.is_multiple_of(2) { + let key = if logical_row % 10 < 8 { + logical_row % hot_keys + } else { + hot_keys + logical_row % (build_rows - hot_keys) + }; + (key as i32) * 10 + } else { + ((logical_row % build_rows) as i32) * 10 + 1 + } +} + fn bench_hash_join_semi_anti(c: &mut Criterion) { let rt = Runtime::new().unwrap(); let s = schema(); @@ -184,200 +306,257 @@ fn bench_hash_join_semi_anti(c: &mut Criterion) { let build_rows = 100_000; let probe_rows = 1_000_000; - // ========================================================================= - // RightSemi Join benchmarks - // ========================================================================= - - // RightSemi - 100% Density, 100% hit rate + // 100% Density, 100% hit rate // Keys: 0..100K contiguous, all probe rows find a match - { - let left_batches = build_batches(build_rows, build_rows, 0, &s); - let right_batches = build_batches(probe_rows, build_rows, 0, &s); - group.bench_function(BenchmarkId::new("right_semi_d100_h100", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) - }); - } - - // RightSemi - 100% Density, 10% hit rate + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d100_h100", + probe_rows, + left_batches: build_batches(build_rows, build_rows, 0, &s), + right_batches: build_batches(probe_rows, build_rows, 0, &s), + semi_output_rows: probe_rows, + uses_array_map: true, + }, + ); + + // 100% Density, 10% hit rate // Keys: 0..100K contiguous, only 10% of probe rows find a match - { - let left_batches = build_batches(build_rows, build_rows, 0, &s); - let right_batches = build_batches(probe_rows, build_rows * 10, 0, &s); - group.bench_function(BenchmarkId::new("right_semi_d100_h10", probe_rows), |b| { - b.iter(|| { - let left = 
make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) - }); - } - - // RightSemi - 50% Density, 100% hit rate + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d100_h10", + probe_rows, + left_batches: build_batches(build_rows, build_rows, 0, &s), + right_batches: build_batches(probe_rows, build_rows * 10, 0, &s), + semi_output_rows: build_rows, + uses_array_map: true, + }, + ); + + // 100% Density, 50% hit rate + // Keys: 0..100K contiguous, half of probe rows find a match. + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d100_h50", + probe_rows, + left_batches: build_batches(build_rows, build_rows, 0, &s), + right_batches: build_batches(probe_rows, build_rows * 2, 0, &s), + semi_output_rows: probe_rows / 2, + uses_array_map: true, + }, + ); + + // 50% Density, 100% hit rate // Keys: 0, 2, 4, ... (sparse, multiplier=2), all probe rows find a match - { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 2, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows, 0, 2, &s); - group.bench_function(BenchmarkId::new("right_semi_d50_h100", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) - }); - } - - // RightSemi - 50% Density, 10% hit rate + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d50_h100", + probe_rows, + left_batches: build_batches_sparse(build_rows, build_rows, 0, 2, &s), + right_batches: build_batches_sparse(probe_rows, build_rows, 0, 2, &s), + semi_output_rows: probe_rows, + uses_array_map: true, + }, + ); + + // 50% Density, 10% hit rate // Keys: 0, 2, 4, ... 
(sparse), only 10% of probe rows find a match - { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 2, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows * 10, 0, 2, &s); - group.bench_function(BenchmarkId::new("right_semi_d50_h10", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) - }); - } - - // RightSemi - 10% Density, 100% hit rate + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d50_h10", + probe_rows, + left_batches: build_batches_sparse(build_rows, build_rows, 0, 2, &s), + right_batches: build_batches_sparse(probe_rows, build_rows * 10, 0, 2, &s), + semi_output_rows: build_rows, + uses_array_map: true, + }, + ); + + // 10% Density, 100% hit rate // Keys: 0, 10, 20, ... (very sparse, multiplier=10), all probe rows find a match - { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 10, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows, 0, 10, &s); - group.bench_function(BenchmarkId::new("right_semi_d10_h100", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) - }); - } - - // RightSemi - 10% Density, 10% hit rate + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d10_h100", + probe_rows, + left_batches: build_batches_sparse(build_rows, build_rows, 0, 10, &s), + right_batches: build_batches_sparse(probe_rows, build_rows, 0, 10, &s), + semi_output_rows: probe_rows, + uses_array_map: false, + }, + ); + + // 10% Density, 10% hit rate // Keys: 0, 10, 20, ... 
(very sparse), only 10% of probe rows find a match - { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 10, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows * 10, 0, 10, &s); - group.bench_function(BenchmarkId::new("right_semi_d10_h10", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) - }); - } - - // RightSemi - 100% Density, ~1% hit rate, fanout ~100 - // Build keys are duplicated: 100K rows over 1K distinct keys. Matching - // probe rows produce many duplicate probe indices before RightSemi - // deduplication. + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d10_h10", + probe_rows, + left_batches: build_batches_sparse(build_rows, build_rows, 0, 10, &s), + right_batches: build_batches_sparse(probe_rows, build_rows * 10, 0, 10, &s), + semi_output_rows: build_rows, + uses_array_map: false, + }, + ); + + // 10% Density, 50% hit rate + // Sparse build keys force the HashMap path while avoiding all-match/no-match extremes. + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "d10_h50", + probe_rows, + left_batches: build_batches_sparse(build_rows, build_rows, 0, 10, &s), + right_batches: build_batches_sparse(probe_rows, build_rows * 2, 0, 10, &s), + semi_output_rows: probe_rows / 2, + uses_array_map: false, + }, + ); + + // 100% Density, ~1% hit rate, fanout ~100 + // Build keys are duplicated: 100K rows over 1K distinct keys, so each + // matching key has fanout ~100. 
{ let fanout_keys = 1_000; - let left_batches = build_batches(build_rows, fanout_keys, 0, &s); - let right_batches = build_batches(probe_rows, build_rows, 0, &s); - group.bench_function( - BenchmarkId::new("right_semi_fanout100_h1", probe_rows), - |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightSemi, &rt) - }) + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "fanout100_h1", + probe_rows, + left_batches: build_batches(build_rows, fanout_keys, 0, &s), + right_batches: build_batches(probe_rows, build_rows, 0, &s), + semi_output_rows: probe_rows / 100, + uses_array_map: true, }, ); } - // ========================================================================= - // RightAnti Join benchmarks - // ========================================================================= - - // RightAnti - 100% Density, 100% hit rate (no output) - // Keys: 0..100K contiguous, all probe rows find a match -> no output + // 100% Density, 50% hit rate, fanout ~10 + // This is a moderate duplicate-heavy case: enough fanout to stress pair + // materialization, without relying on a tiny match rate. 
{ - let left_batches = build_batches(build_rows, build_rows, 0, &s); - let right_batches = build_batches(probe_rows, build_rows, 0, &s); - group.bench_function(BenchmarkId::new("right_anti_d100_h100", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightAnti, &rt) - }) - }); - } - - // RightAnti - 100% Density, 10% hit rate (90% output) - // Keys: 0..100K contiguous, only 10% of probe rows find a match -> 90% output - { - let left_batches = build_batches(build_rows, build_rows, 0, &s); - let right_batches = build_batches(probe_rows, build_rows * 10, 0, &s); - group.bench_function(BenchmarkId::new("right_anti_d100_h10", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightAnti, &rt) - }) - }); + let fanout_keys = 10_000; + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "fanout10_h50", + probe_rows, + left_batches: build_batches(build_rows, fanout_keys, 0, &s), + right_batches: build_batches(probe_rows, fanout_keys * 2, 0, &s), + semi_output_rows: probe_rows / 2, + uses_array_map: true, + }, + ); } - // RightAnti - 50% Density, 100% hit rate (no output) - // Keys: 0, 2, 4, ... (sparse), all probe rows find a match -> no output + // HashMap path, 50% hit rate, fanout ~10 + // A large sparse key range prevents ArrayMap selection while preserving the + // same moderate fanout and hit-rate shape. 
{ - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 2, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows, 0, 2, &s); - group.bench_function(BenchmarkId::new("right_anti_d50_h100", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightAnti, &rt) - }) - }); + let fanout_keys = 10_000; + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "fanout10_h50_hashmap", + probe_rows, + left_batches: build_batches_sparse(build_rows, fanout_keys, 0, 1000, &s), + right_batches: build_batches_sparse( + probe_rows, + fanout_keys * 2, + 0, + 1000, + &s, + ), + semi_output_rows: probe_rows / 2, + uses_array_map: false, + }, + ); } - // RightAnti - 50% Density, 10% hit rate (90% output) - // Keys: 0, 2, 4, ... (sparse), only 10% of probe rows find a match -> 90% output + // HashMap path, skewed 50% hit rate { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 2, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows * 10, 0, 2, &s); - group.bench_function(BenchmarkId::new("right_anti_d50_h10", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightAnti, &rt) - }) - }); + let hot_keys = 1_000; + bench_scenario( + &mut group, + &s, + &rt, + &BenchScenario { + suffix: "skewed_h50_hashmap", + probe_rows, + left_batches: build_batches_sparse(build_rows, build_rows, 0, 10, &s), + right_batches: build_batches_from_key_fn(probe_rows, &s, |row| { + skewed_h50_hashmap_key(row, build_rows, hot_keys) + }), + semi_output_rows: probe_rows / 2, + uses_array_map: false, + }, + ); } - // RightAnti - 10% Density, 100% hit rate (no output) - // Keys: 0, 10, 20, ... 
(very sparse), all probe rows find a match -> no output + // Utf8 keys, unique build keys, 50% hit rate + // String keys always take the HashMap path (ArrayMap supports only + // integer keys), and key equality is substantially more expensive than + // for Int32. { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 10, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows, 0, 10, &s); - group.bench_function(BenchmarkId::new("right_anti_d10_h100", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightAnti, &rt) - }) - }); - } + let utf8_s = utf8_schema(); + bench_scenario( + &mut group, + &utf8_s, + &rt, + &BenchScenario { + suffix: "utf8_h50", + probe_rows, + left_batches: build_utf8_batches(build_rows, build_rows, &utf8_s), + right_batches: build_utf8_batches(probe_rows, build_rows * 2, &utf8_s), + semi_output_rows: probe_rows / 2, + uses_array_map: false, + }, + ); - // RightAnti - 10% Density, 10% hit rate (90% output) - // Keys: 0, 10, 20, ... (very sparse), only 10% of probe rows find a match -> 90% output - { - let left_batches = build_batches_sparse(build_rows, build_rows, 0, 10, &s); - let right_batches = build_batches_sparse(probe_rows, build_rows * 10, 0, 10, &s); - group.bench_function(BenchmarkId::new("right_anti_d10_h10", probe_rows), |b| { - b.iter(|| { - let left = make_exec(&left_batches, &s); - let right = make_exec(&right_batches, &s); - do_hash_join(left, right, JoinType::RightAnti, &rt) - }) - }); + // Utf8 keys, fanout ~10, 50% hit rate + // Duplicate build keys exercise the hash-chain walk with expensive + // key comparisons. 
+ let fanout_keys = 10_000; + bench_scenario( + &mut group, + &utf8_s, + &rt, + &BenchScenario { + suffix: "utf8_fanout10_h50", + probe_rows, + left_batches: build_utf8_batches(build_rows, fanout_keys, &utf8_s), + right_batches: build_utf8_batches(probe_rows, fanout_keys * 2, &utf8_s), + semi_output_rows: probe_rows / 2, + uses_array_map: false, + }, + ); } group.finish(); diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index ad40d6776df4f..d6df9c9e03877 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -18,6 +18,7 @@ use arrow_schema::DataType; use num_traits::AsPrimitive; use std::mem::size_of; +use std::ops::Range; use crate::joins::MapOffset; use crate::joins::chain::traverse_chain; @@ -142,7 +143,6 @@ impl ArrayMap { } /// Estimates the maximum memory usage for an `ArrayMap` with the given parameters. - /// pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize) -> usize { let range = Self::calculate_range(min_val, max_val); if range >= usize::MAX as u64 { @@ -157,6 +157,16 @@ impl ArrayMap { max_val.wrapping_sub(min_val) } + #[inline] + fn key_to_index(key: u64, offset: u64, data_len: usize) -> Option { + let idx = key.wrapping_sub(offset); + if idx < data_len as u64 { + Some(idx as usize) + } else { + None + } + } + /// Creates a new [`ArrayMap`] from the given array of join keys. 
/// /// Note: This function processes only the non-null values in the input `array`, @@ -207,10 +217,9 @@ impl ArrayMap { for (i, val) in arr.iter().enumerate().rev() { if let Some(val) = val { let key: u64 = val.as_(); - let idx = key.wrapping_sub(offset_val) as usize; - if idx >= data.len() { + let Some(idx) = Self::key_to_index(key, offset_val, data.len()) else { return internal_err!("failed build Array idx >= data.len()"); - } + }; if data[idx] != 0 { if next.is_empty() { @@ -264,6 +273,76 @@ impl ArrayMap { ) } + /// Returns whether `key` (a raw probe value cast to `u64`) is present in the + /// build side, i.e. maps to a non-empty bucket. + #[inline] + fn key_present(&self, key: u64) -> bool { + let Some(idx) = Self::key_to_index(key, self.offset, self.data.len()) else { + return false; + }; + self.data[idx] != 0 + } + + pub fn get_probe_indices_with_any_match( + &self, + prob_side_keys: &[ArrayRef], + range: Range, + probe_indices: &mut Vec, + ) -> Result<()> { + if prob_side_keys.len() != 1 { + return internal_err!( + "ArrayMap expects 1 join key, but got {}", + prob_side_keys.len() + ); + } + let array = &prob_side_keys[0]; + + downcast_supported_integer!( + array.data_type() => ( + lookup_and_get_probe_indices, + self, + array, + range, + probe_indices + ) + ) + } + + fn lookup_and_get_probe_indices( + &self, + array: &ArrayRef, + range: Range, + probe_indices: &mut Vec, + ) -> Result<()> + where + T::Native: Copy + AsPrimitive, + { + probe_indices.clear(); + + let arr = array.as_primitive::(); + let end = range.end.min(arr.len()); + if range.start >= end { + return Ok(()); + } + + let have_null = arr.null_count() > 0; + for probe_idx in range.start..end { + if have_null && arr.is_null(probe_idx) { + // Skip NULLs: under NullEqualsNothing, they will never match a + // build row, and ArrayMap is not used with NullEqualsNull. + continue; + } + + // SAFETY: probe_idx is guaranteed to be within bounds by the loop range. 
+ let prob_val: u64 = unsafe { arr.value_unchecked(probe_idx) }.as_(); + if self.key_present(prob_val) { + probe_indices.push(probe_idx as u32); + } + } + + Ok(()) + } + fn lookup_and_get_indices( &self, array: &ArrayRef, @@ -294,11 +373,13 @@ impl ArrayMap { } // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_(); - let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; + let Some(idx_in_build_side) = + Self::key_to_index(prob_val, self.offset, self.data.len()) + else { + continue; + }; - if idx_in_build_side >= self.data.len() - || self.data[idx_in_build_side] == 0 - { + if self.data[idx_in_build_side] == 0 { continue; } build_indices.push((self.data[idx_in_build_side] - 1) as u64); @@ -345,10 +426,12 @@ impl ArrayMap { // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_(); - let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; - if idx_in_build_side >= self.data.len() - || self.data[idx_in_build_side] == 0 - { + let Some(idx_in_build_side) = + Self::key_to_index(prob_val, self.offset, self.data.len()) + else { + continue; + }; + if self.data[idx_in_build_side] == 0 { continue; } @@ -402,8 +485,7 @@ impl ArrayMap { } // SAFETY: i is within bounds [0, arr.len()) let key: u64 = unsafe { arr.value_unchecked(i) }.as_(); - let idx = key.wrapping_sub(self.offset) as usize; - idx < self.data.len() && self.data[idx] != 0 + self.key_present(key) }); Ok(BooleanArray::new(buffer, None)) } @@ -414,6 +496,7 @@ mod tests { use super::*; use arrow::array::Int32Array; use arrow::array::Int64Array; + use arrow::array::UInt64Array; use std::sync::Arc; #[test] @@ -506,6 +589,80 @@ mod tests { Ok(()) } + #[test] + fn test_array_map_probe_indices_with_any_match() -> Result<()> { + let build: ArrayRef = Arc::new(Int32Array::from(vec![1, 1, 3, 5, 5])); + let map = 
ArrayMap::try_new(&build, 1, 5)?; + let probe = [Arc::new(Int32Array::from(vec![ + Some(0), + Some(1), + Some(1), + Some(2), + Some(3), + None, + Some(4), + Some(5), + ])) as ArrayRef]; + + let mut probe_indices = vec![99]; + map.get_probe_indices_with_any_match(&probe, 2..8, &mut probe_indices)?; + + assert_eq!(probe_indices, vec![2, 4, 7]); + Ok(()) + } + + #[test] + fn test_array_map_rejects_large_out_of_range_probe_key() -> Result<()> { + let build: ArrayRef = + Arc::new(UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + let map = ArrayMap::try_new(&build, 0, 10)?; + + assert_eq!(ArrayMap::key_to_index(3, 0, 11), Some(3)); + + // Pick a key for which the computed bucket offset is larger than + // u32::MAX but has low 32 bits equal to 3. It must be bounds-checked + // before casting to usize, otherwise 32-bit targets can truncate it + // into range. + let out_of_range_key = (1_u64 << 32) + 3; + assert_eq!(ArrayMap::key_to_index(out_of_range_key, 0, 11), None); + + let probe = [Arc::new(UInt64Array::from(vec![ + Some(3), + Some(out_of_range_key), + Some(11), + None, + ])) as ArrayRef]; + + let mut probe_indices = vec![]; + map.get_probe_indices_with_any_match( + &probe, + 0..probe[0].len(), + &mut probe_indices, + )?; + assert_eq!(probe_indices, vec![0]); + + let mut matched_probe_indices = vec![]; + let mut matched_build_indices = vec![]; + let next = map.get_matched_indices_with_limit_offset( + &probe, + 10, + (0, None), + &mut matched_probe_indices, + &mut matched_build_indices, + )?; + assert_eq!(matched_probe_indices, vec![0]); + assert_eq!(matched_build_indices, vec![3]); + assert!(next.is_none()); + + let contains = map.contain_keys(&probe)?; + assert!(contains.value(0)); + assert!(!contains.value(1)); + assert!(!contains.value(2)); + assert!(!contains.value(3)); + + Ok(()) + } + #[test] fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> { // Build array with a mix of negative and positive i64 values, no duplicates diff 
--git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 3774a300209d0..6afd5d5835684 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -100,8 +100,6 @@ use super::partitioned_hash_eval::SeededRandomState; pub(crate) const HASH_JOIN_SEED: SeededRandomState = SeededRandomState::with_seed(12210250226015887276); -const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; - #[expect(clippy::too_many_arguments)] fn try_create_array_map( bounds: &Option, @@ -797,6 +795,11 @@ impl EmbeddedProjection for HashJoinExec { } impl HashJoinExec { + /// Name of the metric counting how many partitions selected the + /// [`ArrayMap`]-based build-side strategy instead of a hash table. + pub const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &'static str = + "array_map_created_count"; + /// Tries to create a new [`HashJoinExec`]. /// /// # Error @@ -1309,7 +1312,7 @@ impl ExecutionPlan for HashJoinExec { let array_map_created_count = MetricBuilder::new(&self.metrics) .with_category(MetricCategory::Rows) - .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); + .counter(Self::ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); // Initialize build_accumulator lazily with runtime partition counts (only if enabled) // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing @@ -2096,7 +2099,7 @@ mod tests { if use_phj { assert!( metrics - .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .sum_by_name(HashJoinExec::ARRAY_MAP_CREATED_COUNT_METRIC_NAME) .expect("should have array_map_created_count metrics") .as_usize() >= 1 @@ -2104,7 +2107,7 @@ mod tests { } else { assert_eq!( metrics - .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .sum_by_name(HashJoinExec::ARRAY_MAP_CREATED_COUNT_METRIC_NAME) .map(|v| v.as_usize()) .unwrap_or(0), 0 @@ -3401,6 +3404,22 @@ mod tests { ) } + fn build_high_fanout_left_table() -> Arc 
{ + build_table( + ("a1", &vec![10, 11, 12, 13, 14, 15, 16, 17, 18]), + ("b1", &vec![1, 1, 1, 1, 3, 3, 5, 5, 5]), + ("c1", &vec![100, 110, 120, 130, 140, 150, 160, 170, 180]), + ) + } + + fn build_high_fanout_right_table() -> Arc { + build_table( + ("a2", &vec![0, 1, 2, 3, 4, 5, 6, 7]), + ("b2", &vec![0, 1, 1, 2, 3, 1, 4, 5]), + ("c2", &vec![10, 20, 30, 40, 50, 60, 70, 80]), + ) + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn join_left_semi( @@ -3606,6 +3625,85 @@ mod tests { Ok(()) } + #[tokio::test] + async fn join_right_semi_high_fanout_hashmap_preserves_probe_order() -> Result<()> { + let task_ctx = prepare_task_ctx(2, false); + let left = build_high_fanout_left_table(); + let right = build_high_fanout_right_table(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, + )]; + + let (columns, batches, metrics) = join_collect( + left, + right, + on, + &JoinType::RightSemi, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2"]); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r" + +----+----+----+ + | a2 | b2 | c2 | + +----+----+----+ + | 1 | 1 | 20 | + | 2 | 1 | 30 | + | 4 | 3 | 50 | + | 5 | 1 | 60 | + | 7 | 5 | 80 | + +----+----+----+ + "); + } + assert_join_metrics!(metrics, 5); + assert_phj_used(&metrics, false); + + Ok(()) + } + + #[tokio::test] + async fn join_right_semi_high_fanout_partitioned_hashmap() -> Result<()> { + let task_ctx = prepare_task_ctx(2, false); + let left = build_high_fanout_left_table(); + let right = build_high_fanout_right_table(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) 
as _, + )]; + + let (columns, batches, metrics) = partitioned_join_collect( + left, + right, + on, + &JoinType::RightSemi, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2"]); + let expected = [ + "+----+----+----+", + "| a2 | b2 | c2 |", + "+----+----+----+", + "| 1 | 1 | 20 |", + "| 2 | 1 | 30 |", + "| 4 | 3 | 50 |", + "| 5 | 1 | 60 |", + "| 7 | 5 | 80 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + assert_join_metrics!(metrics, 5); + assert_phj_used(&metrics, false); + + Ok(()) + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn join_right_semi_with_filter( @@ -3924,6 +4022,254 @@ mod tests { Ok(()) } + #[tokio::test] + async fn join_right_anti_high_fanout_hashmap_preserves_probe_order() -> Result<()> { + let task_ctx = prepare_task_ctx(2, false); + let left = build_high_fanout_left_table(); + let right = build_high_fanout_right_table(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) as _, + )]; + + let (columns, batches, metrics) = join_collect( + left, + right, + on, + &JoinType::RightAnti, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2"]); + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches), @r" + +----+----+----+ + | a2 | b2 | c2 | + +----+----+----+ + | 0 | 0 | 10 | + | 3 | 2 | 40 | + | 6 | 4 | 70 | + +----+----+----+ + "); + } + assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, false); + + Ok(()) + } + + #[tokio::test] + async fn join_right_anti_high_fanout_partitioned_hashmap() -> Result<()> { + let task_ctx = prepare_task_ctx(2, false); + let left = build_high_fanout_left_table(); + let right = build_high_fanout_right_table(); + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b2", &right.schema())?) 
as _, + )]; + + let (columns, batches, metrics) = partitioned_join_collect( + left, + right, + on, + &JoinType::RightAnti, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a2", "b2", "c2"]); + let expected = [ + "+----+----+----+", + "| a2 | b2 | c2 |", + "+----+----+----+", + "| 0 | 0 | 10 |", + "| 3 | 2 | 40 |", + "| 6 | 4 | 70 |", + "+----+----+----+", + ]; + assert_batches_sorted_eq!(expected, &batches); + assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, false); + + Ok(()) + } + + #[tokio::test] + async fn join_right_semi_anti_hashmap_null_equality() -> Result<()> { + let task_ctx = prepare_task_ctx(2, false); + let left = build_table_two_cols( + ("a1", &vec![Some(1), None, Some(3)]), + ("b1", &vec![Some(10), Some(20), Some(30)]), + ); + let right = build_table_two_cols( + ("a2", &vec![Some(1), None, Some(2), None]), + ("b2", &vec![Some(10), Some(20), Some(30), Some(40)]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("a1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("a2", &right.schema())?) as _, + )]; + + let (_, batches_null_eq, _) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightSemi, + NullEquality::NullEqualsNull, + Arc::clone(&task_ctx), + ) + .await?; + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches_null_eq), @r" + +----+----+ + | a2 | b2 | + +----+----+ + | 1 | 10 | + | | 20 | + | | 40 | + +----+----+ + "); + } + + let (_, batches_null_neq, _) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightSemi, + NullEquality::NullEqualsNothing, + Arc::clone(&task_ctx), + ) + .await?; + allow_duplicates! 
{ + assert_snapshot!(batches_to_string(&batches_null_neq), @r" + +----+----+ + | a2 | b2 | + +----+----+ + | 1 | 10 | + +----+----+ + "); + } + + let (_, batches_null_eq, _) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightAnti, + NullEquality::NullEqualsNull, + Arc::clone(&task_ctx), + ) + .await?; + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches_null_eq), @r" + +----+----+ + | a2 | b2 | + +----+----+ + | 2 | 30 | + +----+----+ + "); + } + + let (_, batches_null_neq, _) = join_collect( + left, + right, + on, + &JoinType::RightAnti, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + allow_duplicates! { + assert_snapshot!(batches_to_string(&batches_null_neq), @r" + +----+----+ + | a2 | b2 | + +----+----+ + | | 20 | + | 2 | 30 | + | | 40 | + +----+----+ + "); + } + + Ok(()) + } + + #[tokio::test] + async fn join_right_semi_anti_phj_null_equality() -> Result<()> { + let task_ctx = prepare_task_ctx(2, true); + let left = build_table_two_cols( + ("a1", &vec![Some(1), None, Some(3)]), + ("b1", &vec![Some(10), Some(20), Some(30)]), + ); + let right = build_table_two_cols( + ("a2", &vec![Some(1), None, Some(2), None]), + ("b2", &vec![Some(10), Some(20), Some(30), Some(40)]), + ); + let on = vec![( + Arc::new(Column::new_with_schema("a1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("a2", &right.schema())?) 
as _, + )]; + + let (_, _, metrics) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightSemi, + NullEquality::NullEqualsNull, + Arc::clone(&task_ctx), + ) + .await?; + assert_phj_used(&metrics, false); + + let (_, batches_null_neq, metrics) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::RightSemi, + NullEquality::NullEqualsNothing, + Arc::clone(&task_ctx), + ) + .await?; + assert_batches_eq!( + [ + "+----+----+", + "| a2 | b2 |", + "+----+----+", + "| 1 | 10 |", + "+----+----+", + ], + &batches_null_neq + ); + assert_phj_used(&metrics, true); + + let (_, batches_null_neq, metrics) = join_collect( + left, + right, + on, + &JoinType::RightAnti, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + assert_batches_eq!( + [ + "+----+----+", + "| a2 | b2 |", + "+----+----+", + "| | 20 |", + "| 2 | 30 |", + "| | 40 |", + "+----+----+", + ], + &batches_null_neq + ); + assert_phj_used(&metrics, true); + + Ok(()) + } + #[apply(hash_join_exec_configs)] #[tokio::test] async fn join_right_anti_with_filter( diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index d403fa43cda4b..dff67e45a7d36 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -20,6 +20,7 @@ //! This module implements [`HashJoinStream`], the streaming engine for //! [`super::HashJoinExec`]. See comments in [`HashJoinStream`] for more details. 
+use std::ops::Range; use std::sync::Arc; use std::sync::atomic::Ordering; use std::task::Poll; @@ -33,21 +34,25 @@ use crate::joins::hash_join::shared_bounds::{ PartitionBounds, PartitionBuildData, SharedBuildAccumulator, }; use crate::joins::utils::{ - OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, + OnceFut, equal_rows_arr, equal_rows_mask, get_anti_indices, + get_final_indices_from_shared_bitmap, is_contiguous_range, }; use crate::stream::EmptyRecordBatchStream; use crate::{ RecordBatchStream, SendableRecordBatchStream, handle_state, hash_utils::create_hashes, joins::utils::{ - BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, + BuildProbeJoinMetrics, ColumnIndex, ExistenceProbe, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_empty_build_side, build_batch_from_indices, need_produce_result_in_final, }, }; -use arrow::array::{Array, ArrayRef, UInt32Array, UInt64Array}; +use arrow::array::{ + Array, ArrayRef, BooleanArray, UInt32Array, UInt64Array, downcast_array, +}; +use arrow::compute::FilterBuilder; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use datafusion_common::{ @@ -97,15 +102,6 @@ impl BuildSide { _ => internal_err!("Expected build side in ready state"), } } - - /// Tries to extract BuildSideReadyState from BuildSide enum. - /// Returns an error if state is not Ready. 
- fn try_as_ready_mut(&mut self) -> Result<&mut BuildSideReadyState> { - match self { - BuildSide::Ready(state) => Ok(state), - _ => internal_err!("Expected build side in ready state"), - } - } } /// Represents state of HashJoinStream @@ -316,10 +312,8 @@ pub(super) struct HashJoinStream { batch_size: usize, /// Scratch space for computing hashes hashes_buffer: Vec, - /// Scratch space for probe indices during hash lookup - probe_indices_buffer: Vec, - /// Scratch space for build indices during hash lookup - build_indices_buffer: Vec, + /// Reusable scratch space for probe-side lookups + probe_scratch: ProbeScratch, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, /// Owns this partition's build-data report lifecycle. @@ -387,6 +381,37 @@ impl RecordBatchStream for HashJoinStream { /// Build indices: 4, 5, 6, 6 /// Probe indices: 3, 3, 4, 5 /// ``` +/// Reusable scratch buffers for probe-side lookups, allocated once per stream +/// and recycled across probe chunks. +/// +/// `probe_indices` and `build_indices` are shared by the pair-producing and +/// existence-probe paths; the candidate-window buffers belong to the +/// existence probe's chain walk and stay empty on the pair-producing path. 
+struct ProbeScratch { + /// Probe-side row indices produced by map lookups + probe_indices: Vec, + /// Build-side row indices produced by map lookups + build_indices: Vec, + /// Original candidate position of each entry in the current window + candidate_positions: Vec, + /// Original candidate position of each entry in the next window + next_candidate_positions: Vec, + /// Per-candidate match verdicts, indexed by original candidate position + matched_candidates: Vec, +} + +impl ProbeScratch { + fn with_capacity(capacity: usize) -> Self { + Self { + probe_indices: Vec::with_capacity(capacity), + build_indices: Vec::with_capacity(capacity), + candidate_positions: Vec::new(), + next_candidate_positions: Vec::new(), + matched_candidates: Vec::new(), + } + } +} + #[expect(clippy::too_many_arguments)] pub(super) fn lookup_join_hashmap( build_hashmap: &dyn JoinHashMapType, @@ -429,6 +454,182 @@ pub(super) fn lookup_join_hashmap( Ok((build_indices, probe_indices, next_offset)) } +/// Returns the sorted, deduplicated indices of the probe rows in +/// `probe_range` that have at least one key-equal row on the build side. +/// +/// This is the chained-hash-map implementation of the existence probe used by +/// `HashJoinStream::try_existence_probe_batch`; the dense-key `ArrayMap` +/// strategy provides an equivalent inherent method of the same name. +/// +/// # Algorithm +/// +/// Walking each probe row's hash chain to completion would do work +/// proportional to fanout, and comparing keys row-by-row during the walk +/// would lose vectorization. Instead, all chains are walked in lockstep, one +/// link per round, comparing keys for a whole round with one vectorized +/// [`equal_rows_mask`] call: +/// +/// 1. Gather the initial candidate window: each probe row's first chain +/// entry. +/// 2. Compare build/probe join keys for the entire window at once. +/// 3. 
Window entries whose keys matched are finished; the rest advance to +/// their next chain entry, dropping out when their chain is exhausted. +/// Repeat from step 2 until the window is empty. +/// +/// A hash chain groups build rows with identical hash values: absent collisions, +/// a chain holds duplicates of a single key, and a probe row either matches its +/// first candidate or will match none of them. In practice, nearly every row +/// will resolve in the first round; later rounds handle hash collisions. +/// +/// `scratch` is reused across calls: its index vectors back the per-round +/// index arrays and are returned (emptied) on exit, and its candidate-window +/// buffers retain their allocations across rounds and chunks. +fn get_probe_indices_with_any_match( + existence_probe: &dyn ExistenceProbe, + build_side_values: &[ArrayRef], + probe_side_values: &[ArrayRef], + null_equality: NullEquality, + hashes_buffer: &[u64], + probe_range: Range, + scratch: &mut ProbeScratch, +) -> Result { + // Gather the initial candidates: a (build index, probe index) pair for + // each probe row in `probe_range` whose hash is present in the map. + existence_probe.get_probe_first_candidates( + hashes_buffer, + probe_range, + &mut scratch.probe_indices, + &mut scratch.build_indices, + ); + + let mut current_build_indices: UInt64Array = + std::mem::take(&mut scratch.build_indices).into(); + let initial_probe_indices: UInt32Array = + std::mem::take(&mut scratch.probe_indices).into(); + + let equal_mask = equal_rows_mask( + &current_build_indices, + &initial_probe_indices, + build_side_values, + probe_side_values, + null_equality, + )?; + + // The equality check passed for every candidate, so the matched set is all + // of `initial_probe_indices`: return it as-is instead of filtering by an + // all-true mask. 
+ if equal_mask.null_count() == 0 && equal_mask.true_count() == equal_mask.len() { + scratch.build_indices = current_build_indices.into_parts().1.into(); + let capacity = initial_probe_indices.len(); + scratch.probe_indices = Vec::with_capacity(capacity); + return Ok(initial_probe_indices); + } + + // Unique hashes mean every chain has length 1, so the chain walk below + // is unnecessary. + if existence_probe.has_unique_hashes() { + let filter_builder = FilterBuilder::new(&equal_mask).optimize().build(); + let matched_probe_indices = filter_builder.filter(&initial_probe_indices)?; + + scratch.build_indices = current_build_indices.into_parts().1.into(); + scratch.probe_indices = initial_probe_indices.into_parts().1.into(); + + return Ok(downcast_array(matched_probe_indices.as_ref())); + } + + /// Marks the candidates of the current window whose join keys compared equal. + fn mark_matched(equal_mask: &BooleanArray, positions: &[u32], matched: &mut [bool]) { + if equal_mask.null_count() == 0 && equal_mask.true_count() == equal_mask.len() { + for &position in positions { + matched[position as usize] = true; + } + } else { + for (i, &position) in positions.iter().enumerate() { + if !equal_mask.is_null(i) && equal_mask.value(i) { + matched[position as usize] = true; + } + } + } + } + + // General case: walk the chains round by round. `matched_candidates[i]` + // records the verdict for the i-th initial candidate (one per probe + // row); because the window shrinks as rows resolve, each window entry + // carries its original candidate position in `candidate_positions` + // (initially the identity mapping) so later rounds can record verdicts + // in the right slot. 
+ let candidate_count = initial_probe_indices.len(); + scratch.candidate_positions.clear(); + scratch + .candidate_positions + .extend(0..candidate_count as u32); + scratch.matched_candidates.clear(); + scratch.matched_candidates.resize(candidate_count, false); + // Cheap Arc clone; reassigned (releasing the alias) on the first chain advance. + let mut current_probe_indices = initial_probe_indices.clone(); + let mut equal_mask = equal_mask; + + loop { + mark_matched( + &equal_mask, + &scratch.candidate_positions, + &mut scratch.matched_candidates, + ); + + // Advance each unmatched window entry one link down its hash chain; + // matched and chain-exhausted entries drop out of the window. + existence_probe.get_next_probe_candidates( + current_build_indices.values().as_ref(), + current_probe_indices.values().as_ref(), + &scratch.candidate_positions, + &scratch.matched_candidates, + &mut scratch.next_candidate_positions, + &mut scratch.build_indices, + &mut scratch.probe_indices, + ); + + if scratch.build_indices.is_empty() { + break; + } + + std::mem::swap( + &mut scratch.candidate_positions, + &mut scratch.next_candidate_positions, + ); + current_build_indices = std::mem::take(&mut scratch.build_indices).into(); + current_probe_indices = std::mem::take(&mut scratch.probe_indices).into(); + + equal_mask = equal_rows_mask( + &current_build_indices, + &current_probe_indices, + build_side_values, + probe_side_values, + null_equality, + )?; + } + + // Release the potential alias of `initial_probe_indices` so the buffer + // recycling below stays zero-copy. + drop(current_probe_indices); + + // Collect the probe index of every candidate position that matched in + // some round; ascending position order keeps the result sorted. 
+ scratch.probe_indices.clear(); + for (position, &is_matched) in scratch.matched_candidates.iter().enumerate() { + if is_matched { + scratch + .probe_indices + .push(initial_probe_indices.value(position)); + } + } + + scratch.build_indices = current_build_indices.into_parts().1.into(); + let matched_probe_indices = std::mem::take(&mut scratch.probe_indices).into(); + scratch.probe_indices = initial_probe_indices.into_parts().1.into(); + + Ok(matched_probe_indices) +} + /// Counts the number of distinct elements in the input array. /// /// The input array must be sorted (e.g., `[0, 1, 1, 2, 2, ...]`) and contain no null values. @@ -458,6 +659,38 @@ fn count_distinct_sorted_indices(indices: &UInt32Array) -> usize { count } +/// Builds an output batch by slicing `probe_batch` instead of `take`-ing it, +/// when `probe_indices` form a contiguous ascending run and every output +/// column is sourced from the probe side. +/// +/// Returns `Ok(None)` when the fast path does not apply and the caller should +/// fall back to [`build_batch_from_indices`]. 
+fn try_build_contiguous_probe_output( + schema: &Arc, + column_indices: &[ColumnIndex], + probe_batch: &RecordBatch, + probe_indices: &UInt32Array, +) -> Result> { + if schema.fields().is_empty() { + return Ok(None); + } + + let Some(range) = is_contiguous_range(probe_indices) else { + return Ok(None); + }; + + let probe_slice = probe_batch.slice(range.start, range.len()); + let mut columns = Vec::with_capacity(column_indices.len()); + for column_index in column_indices { + if column_index.side != JoinSide::Right { + return Ok(None); + } + columns.push(Arc::clone(probe_slice.column(column_index.index))); + } + + Ok(Some(RecordBatch::try_new(Arc::clone(schema), columns)?)) +} + impl HashJoinStream { #[expect(clippy::too_many_arguments)] pub(super) fn new( @@ -500,8 +733,7 @@ impl HashJoinStream { build_side, batch_size, hashes_buffer, - probe_indices_buffer: Vec::with_capacity(batch_size), - build_indices_buffer: Vec::with_capacity(batch_size), + probe_scratch: ProbeScratch::with_capacity(batch_size), right_side_ordered, build_report: BuildReportHandle::new(partition, mode, build_accumulator), mode, @@ -510,6 +742,133 @@ impl HashJoinStream { } } + /// Try the existence-probe fast path for a probe-batch chunk. + /// + /// RightSemi and RightAnti joins only need to know whether each probe row + /// has at least one equal build row. For those joins, when there is no + /// join filter, the build side is non-empty, and the build-side map + /// supports [`ExistenceProbe`], this path scans a range of probe rows and + /// emits each probe row at most once, stopping each row's hash-chain + /// search at its first key-equal match. + /// + /// Returns false when the path is ineligible and the caller should fall + /// back to the general pair-producing path. 
+ fn try_existence_probe_batch(&mut self) -> Result { + let state = self.state.try_as_process_probe_batch_mut()?; + let build_side = self.build_side.try_as_ready()?; + let probe_batch_num_rows = state.batch.num_rows(); + + // An empty build side falls through to the general path, which emits + // whole probe batches at once via `build_batch_empty_build_side` + // instead of probing the empty map row by row. + if !matches!(self.join_type, JoinType::RightSemi | JoinType::RightAnti) + || self.filter.is_some() + || build_side.left_data.map().is_empty() + { + return Ok(false); + } + + let timer = self.join_metrics.join_time.timer(); + + let probe_range_start = state.offset.0; + let probe_range_end = + (probe_range_start + self.batch_size).min(probe_batch_num_rows); + let probe_range = probe_range_start..probe_range_end; + + let matched_probe_indices = match build_side.left_data.map() { + Map::HashMap(map) => { + let Some(existence_probe) = map.existence_probe() else { + timer.done(); + return Ok(false); + }; + get_probe_indices_with_any_match( + existence_probe, + build_side.left_data.values(), + &state.values, + self.null_equality, + &self.hashes_buffer, + probe_range.clone(), + &mut self.probe_scratch, + )? + } + Map::ArrayMap(array_map) => { + array_map.get_probe_indices_with_any_match( + &state.values, + probe_range.clone(), + &mut self.probe_scratch.probe_indices, + )?; + UInt32Array::from(std::mem::take(&mut self.probe_scratch.probe_indices)) + } + }; + let matched_probe_count = matched_probe_indices.len(); + + // The existence path stops after the first equal build row for each + // probe row, so exact fanout cannot be determined. Leave avg_fanout as + // N/A for this path. 
+ self.join_metrics + .probe_hit_rate + .add_total(probe_range.len()); + self.join_metrics + .probe_hit_rate + .add_part(matched_probe_count); + + let right_indices = match self.join_type { + JoinType::RightSemi => matched_probe_indices.clone(), + JoinType::RightAnti => { + get_anti_indices(probe_range.clone(), &matched_probe_indices) + } + _ => { + unreachable!("existence probe only runs for right semi/anti") + } + }; + + let push_status = if right_indices.is_empty() { + self.probe_scratch.probe_indices = + matched_probe_indices.into_parts().1.into(); + PushBatchStatus::Continue + } else { + let batch = if let Some(batch) = try_build_contiguous_probe_output( + &self.schema, + &self.column_indices, + &state.batch, + &right_indices, + )? { + batch + } else { + let left_indices = UInt64Array::from(Vec::::new()); + build_batch_from_indices( + &self.schema, + build_side.left_data.batch(), + &state.batch, + &left_indices, + &right_indices, + &self.column_indices, + JoinSide::Left, + self.join_type, + )? + }; + + let push_status = self.output_buffer.push_batch(batch)?; + self.probe_scratch.probe_indices = right_indices.into_parts().1.into(); + push_status + }; + + timer.done(); + + if probe_range_end == probe_batch_num_rows { + self.state = HashJoinStreamState::FetchProbeBatch; + } else { + state.advance((probe_range_end, None), None); + } + + if push_status == PushBatchStatus::LimitReached { + self.output_buffer.finish()?; + self.state = HashJoinStreamState::Completed; + } + + Ok(true) + } + /// Returns the next state after the build side has been fully collected /// and any required build-side coordination has completed. fn state_after_build_ready( @@ -712,8 +1071,13 @@ impl HashJoinStream { fn process_probe_batch( &mut self, ) -> Result>> { + if self.try_existence_probe_batch()? 
{ + return Ok(StatefulStreamResult::Continue); + } + let state = self.state.try_as_process_probe_batch_mut()?; - let build_side = self.build_side.try_as_ready_mut()?; + let build_side = self.build_side.try_as_ready()?; + let is_empty = build_side.left_data.map().is_empty(); self.join_metrics .probe_hit_rate @@ -759,10 +1123,6 @@ impl HashJoinStream { } } - // If the build side is empty, this stream only reaches ProcessProbeBatch for - // join types whose output still depends on probe rows. - let is_empty = build_side.left_data.map().is_empty(); - if is_empty { // Invariant: state_after_build_ready should have already completed // join types whose result is fixed to empty when the build side is empty. @@ -781,7 +1141,9 @@ impl HashJoinStream { return Ok(StatefulStreamResult::Continue); } - // get the matched by join keys indices + // Find build/probe row pairs with equal join keys, bounded by + // `batch_size`. `next_offset` identifies where to resume if the + // candidate scan needs another output batch. 
let (left_indices, right_indices, next_offset) = match build_side.left_data.map() { Map::HashMap(map) => lookup_join_hashmap( @@ -792,20 +1154,20 @@ impl HashJoinStream { &self.hashes_buffer, self.batch_size, state.offset, - &mut self.probe_indices_buffer, - &mut self.build_indices_buffer, + &mut self.probe_scratch.probe_indices, + &mut self.probe_scratch.build_indices, )?, Map::ArrayMap(array_map) => { let next_offset = array_map.get_matched_indices_with_limit_offset( &state.values, self.batch_size, state.offset, - &mut self.probe_indices_buffer, - &mut self.build_indices_buffer, + &mut self.probe_scratch.probe_indices, + &mut self.probe_scratch.build_indices, )?; ( - UInt64Array::from(self.build_indices_buffer.clone()), - UInt32Array::from(self.probe_indices_buffer.clone()), + UInt64Array::from(self.probe_scratch.build_indices.clone()), + UInt32Array::from(self.probe_scratch.probe_indices.clone()), next_offset, ) } @@ -823,7 +1185,7 @@ impl HashJoinStream { .avg_fanout .add_total(distinct_right_indices_count); - // apply join filter if exists + // apply join filter, if any let (left_indices, right_indices) = if let Some(filter) = &self.filter { apply_join_filter_to_indices( build_side.left_data.batch(), diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index 8f0fb66b64fbf..2119046d5d4a3 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -20,6 +20,7 @@ //! ["on" values] to a list of indices with this key's value. use std::fmt::{self, Debug}; +use std::ops::Range; use std::ops::Sub; use arrow::array::BooleanArray; @@ -134,6 +135,59 @@ pub trait JoinHashMapType: Send + Sync { /// Returns the number of entries in the join hash map. fn len(&self) -> usize; + + /// Returns the existence-probe interface if this map supports first-match + /// probing, `None` otherwise. 
+ /// + /// Callers that receive `None` must use the general pair-producing path + /// ([`Self::get_matched_indices_with_limit_offset`]) instead. + fn existence_probe(&self) -> Option<&dyn ExistenceProbe> { + None + } +} + +/// First-match probing protocol used by the hash join's semi/anti existence +/// fast path. +/// +/// The protocol walks build-side hash chains one link at a time across a +/// window of probe rows: [`get_probe_first_candidates`] gathers each probe +/// row's chain head as its initial candidate, the caller checks join-key +/// equality for the window in a vectorized fashion, and +/// [`get_next_probe_candidates`] advances only the rows that have not +/// matched yet. +/// +/// [`get_probe_first_candidates`]: Self::get_probe_first_candidates +/// [`get_next_probe_candidates`]: Self::get_next_probe_candidates +pub trait ExistenceProbe { + /// Collects the first build candidate for each probe row with a matching hash. + /// + /// The returned indices preserve ascending probe order. + fn get_probe_first_candidates( + &self, + hash_values: &[u64], + range: Range, + probe_indices: &mut Vec, + build_indices: &mut Vec, + ); + + /// Collects the next hash-chain candidate for each unmatched probe row. + /// + /// `matched_candidates` is indexed by the original candidate position, not + /// the current candidate window. + #[expect(clippy::too_many_arguments)] + fn get_next_probe_candidates( + &self, + build_indices: &[u64], + probe_indices: &[u32], + candidate_positions: &[u32], + matched_candidates: &[bool], + next_candidate_positions: &mut Vec, + next_build_indices: &mut Vec, + next_probe_indices: &mut Vec, + ); + + /// Returns `true` if every hash table entry has exactly one build row. 
+ fn has_unique_hashes(&self) -> bool; } pub struct JoinHashMapU32 { @@ -212,6 +266,54 @@ impl JoinHashMapType for JoinHashMapU32 { fn len(&self) -> usize { self.map.len() } + + fn existence_probe(&self) -> Option<&dyn ExistenceProbe> { + Some(self) + } +} + +impl ExistenceProbe for JoinHashMapU32 { + fn get_probe_first_candidates( + &self, + hash_values: &[u64], + range: Range, + probe_indices: &mut Vec, + build_indices: &mut Vec, + ) { + get_probe_first_candidates( + &self.map, + hash_values, + range, + probe_indices, + build_indices, + ) + } + + fn get_next_probe_candidates( + &self, + build_indices: &[u64], + probe_indices: &[u32], + candidate_positions: &[u32], + matched_candidates: &[bool], + next_candidate_positions: &mut Vec, + next_build_indices: &mut Vec, + next_probe_indices: &mut Vec, + ) { + get_next_probe_candidates( + &self.next, + build_indices, + probe_indices, + candidate_positions, + matched_candidates, + next_candidate_positions, + next_build_indices, + next_probe_indices, + ) + } + + fn has_unique_hashes(&self) -> bool { + self.map.len() == self.next.len() + } } pub struct JoinHashMapU64 { @@ -290,6 +392,54 @@ impl JoinHashMapType for JoinHashMapU64 { fn len(&self) -> usize { self.map.len() } + + fn existence_probe(&self) -> Option<&dyn ExistenceProbe> { + Some(self) + } +} + +impl ExistenceProbe for JoinHashMapU64 { + fn get_probe_first_candidates( + &self, + hash_values: &[u64], + range: Range, + probe_indices: &mut Vec, + build_indices: &mut Vec, + ) { + get_probe_first_candidates( + &self.map, + hash_values, + range, + probe_indices, + build_indices, + ) + } + + fn get_next_probe_candidates( + &self, + build_indices: &[u64], + probe_indices: &[u32], + candidate_positions: &[u32], + matched_candidates: &[bool], + next_candidate_positions: &mut Vec, + next_build_indices: &mut Vec, + next_probe_indices: &mut Vec, + ) { + get_next_probe_candidates( + &self.next, + build_indices, + probe_indices, + candidate_positions, + matched_candidates, + 
next_candidate_positions, + next_build_indices, + next_probe_indices, + ) + } + + fn has_unique_hashes(&self) -> bool { + self.map.len() == self.next.len() + } } use crate::joins::MapOffset; @@ -464,6 +614,82 @@ where None } +pub fn get_probe_first_candidates( + map: &HashTable<(u64, T)>, + hash_values: &[u64], + range: Range, + probe_indices: &mut Vec, + build_indices: &mut Vec, +) where + T: Copy + TryFrom + Into + Sub, + >::Error: Debug, +{ + probe_indices.clear(); + build_indices.clear(); + + let end = range.end.min(hash_values.len()); + if range.start >= end { + return; + } + + let one = T::try_from(1).unwrap(); + + for (offset, &hash) in hash_values[range.start..end].iter().enumerate() { + let Some((_, index)) = map.find(hash, |(h, _)| hash == *h) else { + continue; + }; + + let build_idx = *index - one; + let build_idx_u64 = build_idx.into(); + probe_indices.push((range.start + offset) as u32); + build_indices.push(build_idx_u64); + } +} + +#[expect(clippy::too_many_arguments)] +pub fn get_next_probe_candidates( + next_chain: &[T], + build_indices: &[u64], + probe_indices: &[u32], + candidate_positions: &[u32], + matched_candidates: &[bool], + next_candidate_positions: &mut Vec, + next_build_indices: &mut Vec, + next_probe_indices: &mut Vec, +) where + T: Copy + TryFrom + PartialEq + Into + Sub, + >::Error: Debug, +{ + debug_assert_eq!(probe_indices.len(), build_indices.len()); + debug_assert_eq!(probe_indices.len(), candidate_positions.len()); + + next_candidate_positions.clear(); + next_build_indices.clear(); + next_probe_indices.clear(); + + let zero = T::try_from(0usize).unwrap(); + let one = T::try_from(1usize).unwrap(); + + for ((&position, &build_idx), &probe_idx) in candidate_positions + .iter() + .zip(build_indices.iter()) + .zip(probe_indices.iter()) + { + if matched_candidates[position as usize] { + continue; + } + + let next = next_chain[build_idx as usize]; + if next == zero { + continue; + } + + next_candidate_positions.push(position); + 
next_build_indices.push((next - one).into()); + next_probe_indices.push(probe_idx); + } +} + pub fn contain_hashes(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> BooleanArray { let buffer = BooleanBuffer::collect_bool(hash_values.len(), |i| { let hash = hash_values[i]; @@ -476,6 +702,8 @@ pub fn contain_hashes(map: &HashTable<(u64, T)>, hash_values: &[u64]) -> Bool mod tests { use super::*; + use datafusion_common::Result; + #[test] fn test_contain_hashes() { let mut hash_map = JoinHashMapU32::with_capacity(10); @@ -494,4 +722,50 @@ mod tests { } } } + + #[test] + fn probe_first_candidates_preserve_order() -> Result<()> { + let mut hash_map = JoinHashMapU32::with_capacity(3); + hash_map.update_from_iter(Box::new([10u64, 10, 20].iter().enumerate()), 0); + + let probe_hashes = vec![30, 10, 20, 10]; + let mut probe_indices = vec![]; + let mut build_indices = vec![]; + get_probe_first_candidates( + &hash_map.map, + &probe_hashes, + 0..probe_hashes.len(), + &mut probe_indices, + &mut build_indices, + ); + + assert_eq!(probe_indices, vec![1, 2, 3]); + assert_eq!(build_indices, vec![1, 2, 1]); + Ok(()) + } + + #[test] + fn next_probe_candidates_skips_matched_and_exhausted_candidates() -> Result<()> { + let mut hash_map = JoinHashMapU32::with_capacity(2); + hash_map.update_from_iter(Box::new([10u64, 10].iter().enumerate()), 0); + + let mut next_positions = vec![99]; + let mut next_build_indices = vec![99]; + let mut next_probe_indices = vec![99]; + get_next_probe_candidates( + &hash_map.next, + &[1, 0], + &[3, 4], + &[0, 1], + &[false, true], + &mut next_positions, + &mut next_build_indices, + &mut next_probe_indices, + ); + + assert_eq!(next_positions, vec![0]); + assert_eq!(next_build_indices, vec![0]); + assert_eq!(next_probe_indices, vec![3]); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs index 9bcc749c23dce..712b3517e2d78 100644 --- 
a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs @@ -38,7 +38,7 @@ use crate::joins::sort_merge_join::filter::{ get_filter_columns, needs_deferred_filtering, }; use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; -use crate::joins::utils::{JoinFilter, JoinKeyComparator}; +use crate::joins::utils::{JoinFilter, JoinKeyComparator, is_contiguous_range}; use crate::metrics::RecordOutput; use crate::spill::spill_manager::SpillManager; use crate::stream::EmptyRecordBatchStream; @@ -1795,30 +1795,6 @@ fn produce_buffered_null_batch( )?)) } -/// Checks if a `UInt64Array` contains a contiguous ascending range (e.g. \[3,4,5,6\]). -/// Returns `Some(start..start+len)` if so, `None` otherwise. -/// This allows replacing an O(n) `take` with an O(1) `slice`. -#[inline] -fn is_contiguous_range(indices: &UInt64Array) -> Option> { - if indices.is_empty() || indices.null_count() > 0 { - return None; - } - let values = indices.values(); - let start = values[0]; - let len = values.len() as u64; - // Quick rejection: if last element doesn't match expected, not contiguous - if values[values.len() - 1] != start + len - 1 { - return None; - } - // Verify every element is sequential (handles duplicates and gaps) - for i in 1..values.len() { - if values[i] != start + i as u64 { - return None; - } - } - Some(start as usize..(start + len) as usize) -} - /// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]` by specific column indices #[inline(always)] fn fetch_right_columns_by_idxs( diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 5687be04ad867..7898303c67032 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -37,7 +37,7 @@ use crate::{ }; // compatibility pub use super::join_filter::JoinFilter; -pub use 
super::join_hash_map::JoinHashMapType; +pub use super::join_hash_map::{ExistenceProbe, JoinHashMapType}; pub use crate::joins::{JoinOn, JoinOnRef}; use arrow::array::{ @@ -68,7 +68,7 @@ use datafusion_common::stats::Precision; use datafusion_common::utils::normalize_float_zero; use datafusion_common::{ DataFusionError, JoinSide, JoinType, NullEquality, Result, SharedResult, - not_impl_err, plan_err, + internal_err, not_impl_err, plan_err, }; use datafusion_expr::Operator; use datafusion_expr::interval_arithmetic::Interval; @@ -1464,8 +1464,14 @@ pub(crate) fn adjust_indices_by_join_type( Ok((left_indices, right_indices)) } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { - // matched or unmatched left row will be produced in the end of loop - // When visit the right batch, we can output the matched left row and don't need to wait the end of loop + // These join types output build-side rows. Instead of emitting + // during probing, each matched build row is recorded in the shared + // visited bitmap; the output is produced from that bitmap once the + // probe side is exhausted. Deferring this way emits each build row + // exactly once across all probe partitions. It is also necessary + // for anti rows, whose unmatched status is only known after the + // whole probe side has been seen. The probe phase contributes no + // rows here, so empty indices are returned. Ok(( UInt64Array::from_iter_values(vec![]), UInt32Array::from_iter_values(vec![]), @@ -1608,6 +1614,36 @@ where PrimitiveArray::::new(output.into(), None) } +/// Checks if an index array contains a contiguous ascending range (e.g. \[3,4,5,6\]). +/// Returns `Some(start..start+len)` if so, `None` otherwise (including for +/// empty or null-containing input). +/// This allows replacing an O(n) `take` with an O(1) `slice`. 
+#[inline] +pub(crate) fn is_contiguous_range( + indices: &PrimitiveArray, +) -> Option> +where + T::Native: Into, +{ + if indices.is_empty() || indices.null_count() > 0 { + return None; + } + let values = indices.values(); + let start: u64 = values[0].into(); + let len = values.len() as u64; + // Quick rejection: if last element doesn't match expected, not contiguous + if values[values.len() - 1].into() != start + len - 1 { + return None; + } + // Verify every element is sequential (handles duplicates and gaps) + for (i, &v) in values.iter().enumerate().skip(1) { + if v.into() != start + i as u64 { + return None; + } + } + Some(start as usize..(start + len) as usize) +} + /// Returns the intersection of `range` and `input_indices`, omitting duplicates. /// /// `input_indices` must be sorted ascending and contain no nulls. @@ -2146,10 +2182,42 @@ pub(super) fn equal_rows_arr( right_arrays: &[ArrayRef], null_equality: NullEquality, ) -> Result<(UInt64Array, UInt32Array)> { + if left_arrays.is_empty() { + return Ok((Vec::::new().into(), Vec::::new().into())); + }; + + let equal_mask = equal_rows_mask( + indices_left, + indices_right, + left_arrays, + right_arrays, + null_equality, + )?; + + let filter_builder = FilterBuilder::new(&equal_mask).optimize().build(); + + let left_filtered = filter_builder.filter(indices_left)?; + let right_filtered = filter_builder.filter(indices_right)?; + + Ok(( + downcast_array(left_filtered.as_ref()), + downcast_array(right_filtered.as_ref()), + )) +} + +pub(super) fn equal_rows_mask( + indices_left: &UInt64Array, + indices_right: &UInt32Array, + left_arrays: &[ArrayRef], + right_arrays: &[ArrayRef], + null_equality: NullEquality, +) -> Result { + debug_assert_eq!(indices_left.len(), indices_right.len()); + let mut iter = left_arrays.iter().zip(right_arrays.iter()); let Some((first_left, first_right)) = iter.next() else { - return Ok((Vec::::new().into(), Vec::::new().into())); + return internal_err!("equal_rows_mask requires at least 
one join key column"); }; let arr_left = take(first_left.as_ref(), indices_left, None)?; @@ -2168,15 +2236,7 @@ pub(super) fn equal_rows_arr( }) .try_fold(equal, |acc, equal2| and(&acc, &equal2?))?; - let filter_builder = FilterBuilder::new(&equal).optimize().build(); - - let left_filtered = filter_builder.filter(indices_left)?; - let right_filtered = filter_builder.filter(indices_right)?; - - Ok(( - downcast_array(left_filtered.as_ref()), - downcast_array(right_filtered.as_ref()), - )) + Ok(equal) } // version of eq_dyn supporting equality on null arrays From 262c59c1298737027c41e47848933696446a203e Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 11 Jun 2026 12:31:40 -0400 Subject: [PATCH 2/2] Fix docs CI --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 6afd5d5835684..c241ec90f2ed3 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -796,7 +796,7 @@ impl EmbeddedProjection for HashJoinExec { impl HashJoinExec { /// Name of the metric counting how many partitions selected the - /// [`ArrayMap`]-based build-side strategy instead of a hash table. + /// `ArrayMap`-based build-side strategy instead of a hash table. pub const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &'static str = "array_map_created_count";