From 9953e3b7882843ed9f26b31b8c667ff5629f3f37 Mon Sep 17 00:00:00 2001 From: Micah Villmow <4211002+mvillmow@users.noreply.github.com> Date: Sat, 25 Apr 2026 17:42:55 -0700 Subject: [PATCH] fix(concurrency): drain queue by design before shutdown flag signals worker exit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shutdown() method previously set shutdown_requested_ immediately, relying on the workers' exit-condition check (queue.empty()) to drain remaining items — correct but incidental. This commit makes the drain guarantee explicit and by-design: * Add drain_cv_ condition variable to ThreadPool. * Workers notify drain_cv_ under the queue lock immediately after dequeueing the last item. * shutdown() waits on drain_cv_ until the queue is empty before issuing the final condition_.notify_all() that causes workers to exit. * Add GracefulShutdownDrainsQueueExplicitly test that submits a burst of 20 tasks and asserts all complete after shutdown(), with no sleep(). Closes #322. Co-Authored-By: Claude Sonnet 4.6 --- include/concurrency/thread_pool.hpp | 1 + src/concurrency/thread_pool.cpp | 18 +++++++++++++++++- tests/unit/test_thread_pool.cpp | 19 +++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/include/concurrency/thread_pool.hpp b/include/concurrency/thread_pool.hpp index 6ecf881f..9972a219 100644 --- a/include/concurrency/thread_pool.hpp +++ b/include/concurrency/thread_pool.hpp @@ -127,6 +127,7 @@ class ThreadPool { std::queue work_queue_; std::mutex queue_mutex_; std::condition_variable condition_; + std::condition_variable drain_cv_; std::atomic shutdown_requested_{false}; }; diff --git a/src/concurrency/thread_pool.cpp b/src/concurrency/thread_pool.cpp index 51b7efcd..c97ebd02 100644 --- a/src/concurrency/thread_pool.cpp +++ b/src/concurrency/thread_pool.cpp @@ -54,7 +54,18 @@ void ThreadPool::shutdown() { shutdown_requested_.store(true); } - // Wake up all threads + // Wake workers so they process remaining items (drain phase). + condition_.notify_all(); + + // Explicitly wait for the queue to drain before signalling workers to exit. + // This makes the "all pending work completes before shutdown" guarantee + // explicit and by-design, not merely incidental. + { + std::unique_lock lock(queue_mutex_); + drain_cv_.wait(lock, [this]() { return work_queue_.empty(); }); + } + + // Queue is now empty — wake all workers so they see the exit condition. condition_.notify_all(); // Join all worker threads @@ -90,6 +101,11 @@ void ThreadPool::worker_loop() { } else { continue; // Spurious wakeup } + + // Notify drain waiters when the queue becomes empty after dequeueing. + if (work_queue_.empty()) { + drain_cv_.notify_all(); + } } // Execute work item outside the lock diff --git a/tests/unit/test_thread_pool.cpp b/tests/unit/test_thread_pool.cpp index ad96a804..ba968de7 100644 --- a/tests/unit/test_thread_pool.cpp +++ b/tests/unit/test_thread_pool.cpp @@ -202,6 +202,25 @@ TEST(ThreadPoolTest, ConcurrentSubmissions) { EXPECT_EQ(counter.load(), 100); } +// Test: GracefulShutdown drains queue by design, not incidentally. +// Submits bursts of work and then calls shutdown() immediately; all submitted +// work must be counted even though shutdown() races with the workers. +TEST(ThreadPoolTest, GracefulShutdownDrainsQueueExplicitly) { + ThreadPool pool(4); + std::atomic counter{0}; + + // Submit a burst of short-lived tasks before calling shutdown(). + for (int32_t i = 0; i < 20; ++i) { + pool.submit([&]() { counter.fetch_add(1); }); + } + + // shutdown() must not return until every submitted task has executed. + pool.shutdown(); + + EXPECT_EQ(counter.load(), 20); + EXPECT_TRUE(pool.is_shutting_down()); +} + // Test: Destructor calls shutdown TEST(ThreadPoolTest, DestructorShutdown) { std::atomic counter{0};