diff --git a/include/concurrency/thread_pool.hpp b/include/concurrency/thread_pool.hpp
index 6ecf881f..9972a219 100644
--- a/include/concurrency/thread_pool.hpp
+++ b/include/concurrency/thread_pool.hpp
@@ -127,6 +127,7 @@ class ThreadPool {
   std::queue work_queue_;
   std::mutex queue_mutex_;
   std::condition_variable condition_;
+  std::condition_variable drain_cv_;
   std::atomic shutdown_requested_{false};
 };
 
diff --git a/src/concurrency/thread_pool.cpp b/src/concurrency/thread_pool.cpp
index 51b7efcd..c97ebd02 100644
--- a/src/concurrency/thread_pool.cpp
+++ b/src/concurrency/thread_pool.cpp
@@ -54,7 +54,18 @@ void ThreadPool::shutdown() {
     shutdown_requested_.store(true);
   }
 
-  // Wake up all threads
+  // Wake workers so they process remaining items (drain phase).
+  condition_.notify_all();
+
+  // Explicitly wait for the queue to drain before signalling workers to exit.
+  // This makes the "all pending work completes before shutdown" guarantee
+  // explicit and by-design, not merely incidental.
+  {
+    std::unique_lock lock(queue_mutex_);
+    drain_cv_.wait(lock, [this]() { return work_queue_.empty(); });
+  }
+
+  // Queue is now empty — wake all workers so they see the exit condition.
   condition_.notify_all();
 
   // Join all worker threads
@@ -90,6 +101,11 @@ void ThreadPool::worker_loop() {
       } else {
         continue; // Spurious wakeup
       }
+
+      // Notify drain waiters when the queue becomes empty after dequeueing.
+      if (work_queue_.empty()) {
+        drain_cv_.notify_all();
+      }
     }
 
     // Execute work item outside the lock
diff --git a/tests/unit/test_thread_pool.cpp b/tests/unit/test_thread_pool.cpp
index ad96a804..ba968de7 100644
--- a/tests/unit/test_thread_pool.cpp
+++ b/tests/unit/test_thread_pool.cpp
@@ -202,6 +202,25 @@ TEST(ThreadPoolTest, ConcurrentSubmissions) {
   EXPECT_EQ(counter.load(), 100);
 }
 
+// Test: GracefulShutdown drains queue by design, not incidentally.
+// Submits bursts of work and then calls shutdown() immediately; all submitted
+// work must be counted even though shutdown() races with the workers.
+TEST(ThreadPoolTest, GracefulShutdownDrainsQueueExplicitly) {
+  ThreadPool pool(4);
+  std::atomic counter{0};
+
+  // Submit a burst of short-lived tasks before calling shutdown().
+  for (int32_t i = 0; i < 20; ++i) {
+    pool.submit([&]() { counter.fetch_add(1); });
+  }
+
+  // shutdown() must not return until every submitted task has executed.
+  pool.shutdown();
+
+  EXPECT_EQ(counter.load(), 20);
+  EXPECT_TRUE(pool.is_shutting_down());
+}
+
 // Test: Destructor calls shutdown
 TEST(ThreadPoolTest, DestructorShutdown) {
   std::atomic counter{0};