Skip to content

Commit 6dcaf46

Browse files
committed
Add dedicated prefetch reservation
1 parent 9711351 commit 6dcaf46

2 files changed

Lines changed: 31 additions & 12 deletions

File tree

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -726,6 +726,11 @@ impl ExecutionPlan for GraceHashJoinExec {
726726
MemoryConsumer::new(format!("GraceHashJoinStream[{partition}]"))
727727
.with_can_spill(true)
728728
.register(context.memory_pool());
729+
// Separate reservation for prefetch to avoid blocking the main join reservation.
730+
let prefetch_reservation =
731+
MemoryConsumer::new(format!("GraceHashJoinPrefetch[{partition}]"))
732+
.with_can_spill(true)
733+
.register(context.memory_pool());
729734

730735
let max_partition_passes = context
731736
.session_config()
@@ -748,6 +753,7 @@ impl ExecutionPlan for GraceHashJoinExec {
748753
join_metrics,
749754
Arc::clone(&context),
750755
reservation,
756+
prefetch_reservation,
751757
self.random_state.clone(),
752758
partition_batch_size,
753759
base_partition_budget_bytes,
@@ -1039,11 +1045,11 @@ async fn partition_and_spill_one_side(
10391045
// Calculate dynamic buffer size threshold to keep total overhead under control.
10401046
// Scale write buffering based on the caller-provided budget but clamp it to
10411047
// reasonable min/max bounds so we avoid both tiny spill files and runaway memory.
1042-
const MIN_TOTAL_TARGET_BUFFER_BYTES: usize = 128 * 1024 * 1024;
1043-
const MAX_TOTAL_TARGET_BUFFER_BYTES: usize = 1024 * 1024 * 1024;
1044-
const MIN_FLUSH_BYTES: usize = 8 * 1024 * 1024;
1045-
const MAX_FLUSH_BYTES: usize = 128 * 1024 * 1024;
1046-
const MAX_SPILL_FILES_PER_SIDE: usize = 2048;
1048+
const MIN_TOTAL_TARGET_BUFFER_BYTES: usize = 256 * 1024 * 1024;
1049+
const MAX_TOTAL_TARGET_BUFFER_BYTES: usize = 2 * 1024 * 1024 * 1024;
1050+
const MIN_FLUSH_BYTES: usize = 16 * 1024 * 1024;
1051+
const MAX_FLUSH_BYTES: usize = 256 * 1024 * 1024;
1052+
const MAX_SPILL_FILES_PER_SIDE: usize = 4096;
10471053

10481054
let total_target_buffer = partition_write_buffer_bytes
10491055
.clamp(MIN_TOTAL_TARGET_BUFFER_BYTES, MAX_TOTAL_TARGET_BUFFER_BYTES);

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -94,6 +94,8 @@ enum GraceJoinState {
9494
/// Bytes reserved in the memory pool for the current partition's
9595
/// loaded right batches
9696
right_bytes: Arc<Mutex<usize>>,
97+
/// Reservation used to track memory for the current partition's loaded batches
98+
reservation: Arc<Mutex<MemoryReservation>>,
9799
current_join_start: Option<Instant>,
98100
repartition_fut: Option<OnceFut<Vec<PartitionWorkItem>>>,
99101
/// Prefetch for the next partition (at most one in-flight)
@@ -456,6 +458,8 @@ pub struct GraceHashJoinStream {
456458
context: Arc<TaskContext>,
457459
/// Memory reservation tracking in-memory buffers used by the join stream
458460
reservation: Arc<Mutex<MemoryReservation>>,
461+
/// Memory reservation dedicated to prefetching the next partition
462+
prefetch_reservation: Arc<Mutex<MemoryReservation>>,
459463
random_state: RandomState,
460464
partition_batch_size: usize,
461465
adaptive_budget: AdaptivePartitionBudget,
@@ -494,6 +498,7 @@ struct PrefetchState {
494498
right_fut: OnceFut<LoadedPartitionBatches>,
495499
left_bytes: Arc<Mutex<usize>>,
496500
right_bytes: Arc<Mutex<usize>>,
501+
reservation: Arc<Mutex<MemoryReservation>>,
497502
}
498503

499504
impl RecordBatchStream for GraceHashJoinStream {
@@ -519,6 +524,7 @@ impl GraceHashJoinStream {
519524
join_metrics: Arc<BuildProbeJoinMetrics>,
520525
context: Arc<TaskContext>,
521526
reservation: MemoryReservation,
527+
prefetch_reservation: MemoryReservation,
522528
random_state: RandomState,
523529
partition_batch_size: usize,
524530
base_partition_budget_bytes: usize,
@@ -549,6 +555,7 @@ impl GraceHashJoinStream {
549555
join_metrics,
550556
context,
551557
reservation: Arc::new(Mutex::new(reservation)),
558+
prefetch_reservation: Arc::new(Mutex::new(prefetch_reservation)),
552559
random_state,
553560
partition_batch_size,
554561
adaptive_budget,
@@ -623,6 +630,7 @@ impl GraceHashJoinStream {
623630
right_fut: None,
624631
left_bytes,
625632
right_bytes,
633+
reservation: Arc::clone(&self.reservation),
626634
current_join_start: None,
627635
repartition_fut: None,
628636
prefetch: None,
@@ -638,6 +646,7 @@ impl GraceHashJoinStream {
638646
right_fut,
639647
left_bytes,
640648
right_bytes,
649+
reservation,
641650
current_join_start,
642651
repartition_fut,
643652
prefetch,
@@ -649,6 +658,7 @@ impl GraceHashJoinStream {
649658
*current_work = Some(work);
650659
*left_bytes.lock() = 0;
651660
*right_bytes.lock() = 0;
661+
*reservation = Arc::clone(&self.reservation);
652662
self.adaptive_budget.update_active_partitions(1);
653663
}
654664
None => {
@@ -670,6 +680,7 @@ impl GraceHashJoinStream {
670680
*right_fut = Some(pref.right_fut);
671681
*left_bytes = pref.left_bytes;
672682
*right_bytes = pref.right_bytes;
683+
*reservation = pref.reservation;
673684
} else {
674685
// Not the same work, keep prefetch for later
675686
*prefetch = Some(pref);
@@ -727,13 +738,13 @@ impl GraceHashJoinStream {
727738
*left_fut = Some(load_partition_async(
728739
Arc::clone(&self.spill_left),
729740
work.left.clone(),
730-
Arc::clone(&self.reservation),
741+
Arc::clone(reservation),
731742
Arc::clone(left_bytes),
732743
));
733744
*right_fut = Some(load_partition_async(
734745
Arc::clone(&self.spill_right),
735746
work.right.clone(),
736-
Arc::clone(&self.reservation),
747+
Arc::clone(reservation),
737748
Arc::clone(right_bytes),
738749
));
739750
} else if skip_load || force_compute_repartition {
@@ -978,7 +989,7 @@ impl GraceHashJoinStream {
978989
total
979990
};
980991
if bytes_to_free > 0 {
981-
let mut res = self.reservation.lock();
992+
let mut res = reservation.lock();
982993
res.shrink(bytes_to_free);
983994
}
984995
*left_fut = None;
@@ -1077,13 +1088,13 @@ impl GraceHashJoinStream {
10771088
*left_fut = Some(load_partition_async(
10781089
Arc::clone(&self.spill_left),
10791090
work.left.clone(),
1080-
Arc::clone(&self.reservation),
1091+
Arc::clone(reservation),
10811092
Arc::clone(left_bytes),
10821093
));
10831094
*right_fut = Some(load_partition_async(
10841095
Arc::clone(&self.spill_right),
10851096
work.right.clone(),
1086-
Arc::clone(&self.reservation),
1097+
Arc::clone(reservation),
10871098
Arc::clone(right_bytes),
10881099
));
10891100
continue;
@@ -1132,13 +1143,13 @@ impl GraceHashJoinStream {
11321143
let left_fut_pf = load_partition_async(
11331144
Arc::clone(&self.spill_left),
11341145
next_work.left.clone(),
1135-
Arc::clone(&self.reservation),
1146+
Arc::clone(&self.prefetch_reservation),
11361147
Arc::clone(&left_bytes_pf),
11371148
);
11381149
let right_fut_pf = load_partition_async(
11391150
Arc::clone(&self.spill_right),
11401151
next_work.right.clone(),
1141-
Arc::clone(&self.reservation),
1152+
Arc::clone(&self.prefetch_reservation),
11421153
Arc::clone(&right_bytes_pf),
11431154
);
11441155
debug!(
@@ -1153,6 +1164,7 @@ impl GraceHashJoinStream {
11531164
right_fut: right_fut_pf,
11541165
left_bytes: left_bytes_pf,
11551166
right_bytes: right_bytes_pf,
1167+
reservation: Arc::clone(&self.prefetch_reservation),
11561168
});
11571169
} else {
11581170
let key = (next_work.partition_id, next_work.pass);
@@ -1213,6 +1225,7 @@ impl GraceHashJoinStream {
12131225
*current_stream = None;
12141226
*current_work = None;
12151227
*last_prefetch_skip = None;
1228+
*reservation = Arc::clone(&self.reservation);
12161229
self.adaptive_budget.update_active_partitions(1);
12171230
continue;
12181231
}

0 commit comments

Comments
 (0)