Skip to content

Commit e4a9bc2

Browse files
davidlattimoreNthTensor
authored andcommitted
fix: Avoid deadlock on downsizing of threadpool
1 parent 2b52a16 commit e4a9bc2

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

src/thread_pool.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -440,25 +440,31 @@ impl ThreadPool {
440440
}
441441
// The size decreased
442442
cmp::Ordering::Less => {
443+
let mut heartbeat_handle = None;
444+
443445
// Halt the heartbeat thread when scaling to zero.
444-
if let Some(control) = state.managed_threads.heartbeat.take() {
446+
if new_size == 0
447+
&& let Some(control) = state.managed_threads.heartbeat.take()
448+
{
445449
control.halt.store(true, Ordering::Relaxed);
446-
let _ = control.handle.join();
450+
heartbeat_handle = Some(control.handle);
447451
}
448452

449453
// Pull the workers we intend to halt out of the thread manager.
450454
let terminating_workers = state.managed_threads.workers.split_off(new_size);
451455

452-
drop(state);
453-
454456
// Terminate the workers.
455457
for worker in &terminating_workers {
456458
// Tell the worker to halt.
457459
worker.control.halt.store(true, Ordering::Relaxed);
458460
}
459461

460462
// Wake any sleeping workers to ensure they will eventually see the termination notice.
461-
// self.job_is_ready.notify_all();
463+
for seat in &state.seats {
464+
seat.data.sleep_controller.wake();
465+
}
466+
467+
drop(state);
462468

463469
let own_lease = Worker::map_current(|worker| worker.lease.index);
464470

@@ -470,6 +476,10 @@ impl ThreadPool {
470476
let _ = worker.control.handle.join();
471477
}
472478
}
479+
480+
if let Some(handle) = heartbeat_handle {
481+
let _ = handle.join();
482+
}
473483
}
474484
}
475485

@@ -906,7 +916,7 @@ impl Worker {
906916
/// Cooperatively yields execution to the threadpool, allowing it to execute
907917
/// some work.
908918
///
909-
/// Tis function may execute either local or shared work: work already
919+
/// This function may execute either local or shared work: work already
910920
/// queued on the worker, or work off-loaded by a different worker. If there
911921
/// is no work on the pool, this will lock the thread-pool mutex, so it
912922
/// should not be called within a hot loop. Consider using
@@ -1134,7 +1144,7 @@ impl Worker {
11341144
// the queue must point to `stack_job`, implying that
11351145
// `stack_job` cannot have been executed yet.
11361146
let a = unsafe { stack_job.unwrap() };
1137-
// Execute the closure directly and return the results. This is
1147+
// Execute the closure directly and return the results. This
11381148
// allows the compiler to inline and optimize `a`.
11391149
let result_a = a(self);
11401150
return (result_a, result_b);
@@ -1553,7 +1563,7 @@ fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc<AtomicBool>) {
15531563
queued_to_heartbeat = (seat_index + 1) % num_seats;
15541564
}
15551565

1556-
// Count every occupied slot, even if we didn't sent them a heartbeat.
1566+
// Count every occupied slot, even if we didn't send them a heartbeat.
15571567
num_occupied += 1;
15581568
}
15591569
}

0 commit comments

Comments
 (0)