From 06345c4f35d3d65a8e5f2ab54ab59f63a7df67d3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 25 Mar 2026 04:27:35 +0000 Subject: [PATCH 1/3] Initial plan From 4632ecf54126d67a833decbefd9fdde0b1067788 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 25 Mar 2026 04:41:58 +0000 Subject: [PATCH 2/3] Improve work_queue thread pool: correctness fixes, performance optimizations, and tests Correctness: - Catch all exception types (catch(...)) in worker threads, not just std::exception, preventing std::terminate on non-std exceptions - Use if constexpr to conditionally check callable validity, supporting callable types that are not contextually convertible to bool Performance: - Move notify_one() outside lock in submit_task to avoid waking a thread that immediately blocks on the still-held mutex - Remove unnecessary lock re-acquisition for end_task_notifier in worker - Use notify_one instead of notify_all for end_task_notifier since only the destructor waits on it Simplification: - Restructure worker loop: use std::move for single-task extraction instead of splicing into a temporary list - Clean up destructor: remove confusing while/break pattern, use a scoped block to release lock before join - Fix header comment to match filename (YgorThreadPool.h) - Replace unused include with needed Co-authored-by: hdclark <934858+hdclark@users.noreply.github.com> Agent-Logs-Url: https://github.com/hdclark/Ygor/sessions/99a8d908-5ba2-4cd4-9674-5fdc2c8d68ac --- src/YgorThreadPool.h | 86 ++++++++++------------- tests2/YgorThreadPool.cc | 144 ++++++++++++++++++++++++++++++++++++++ tests2/compile_and_run.sh | 1 + 3 files changed, 182 insertions(+), 49 deletions(-) create mode 100644 tests2/YgorThreadPool.cc diff --git a/src/YgorThreadPool.h b/src/YgorThreadPool.h index 5431561..a92c8ba 100644 --- a/src/YgorThreadPool.h +++ b/src/YgorThreadPool.h @@ -1,4 +1,4 @@ -//Thread_Pool.h. +//YgorThreadPool.h. #pragma once @@ -6,8 +6,8 @@ #include #include #include -#include #include +#include // Multi-threaded work queue for offloading processing tasks. @@ -33,7 +33,7 @@ class work_queue { std::unique_lock lock(this->queue_mutex); // Exercise the condition variables and mutexes, ensuring they are initialized by the implementation. - // This should efectively evaluate to a no-op, but also helps suppress false-positive warning messages in + // This should effectively evaluate to a no-op, but also helps suppress false-positive warning messages in // Valgrind's DRD tool, i.e., 'not a condition variable', and other tools. this->new_task_notifier.notify_all(); // No threads waiting, so nothing to notify. this->end_task_notifier.notify_all(); // No threads waiting, so nothing to notify. @@ -49,42 +49,40 @@ class work_queue { this->worker_threads.emplace_back( [this](){ // Continually check the queue and wait on the condition variable. - bool l_should_quit = false; - while(!l_should_quit){ + while(true){ std::unique_lock lock(this->queue_mutex); - while( !(l_should_quit = this->should_quit.load()) + while( !this->should_quit.load() && this->queue.empty() ){ // Waiting releases the lock, which allows for work to be submitted. // // Note: spurious notifications are OK, since the queue will be empty and the worker will return to // waiting on the condition variable. -// this->new_task_notifier.wait(lock); - this->new_task_notifier.wait_for(lock, std::chrono::seconds(2) ); // No notifiers, so no signal to receive. + this->new_task_notifier.wait_for(lock, std::chrono::seconds(2) ); + } + + if(this->queue.empty()){ + return; } // Assume ownership of only the first item in the queue (FIFO). - std::list l_queue; - if(!this->queue.empty()) l_queue.splice( std::end(l_queue), this->queue, std::begin(this->queue) ); - - //// Assume ownership of all available items in the queue. - //std::list l_queue; - //l_queue.swap( this->queue ); - - // Perform the work in FIFO order. + auto task = std::move(this->queue.front()); + this->queue.pop_front(); + lock.unlock(); - for(const auto &user_f : l_queue){ - try{ - if(user_f){ - user_f(); - } - }catch(const std::exception &){}; - - lock.lock(); - this->end_task_notifier.notify_all(); - lock.unlock(); - } + + // Perform the work. + try{ + if constexpr(std::is_constructible_v){ + if(task) task(); + }else{ + task(); + } + }catch(const std::exception &){} + catch(...){}; + + this->end_task_notifier.notify_one(); } } ); @@ -92,15 +90,13 @@ class work_queue { } void submit_task(T f){ - std::lock_guard lock(this->queue_mutex); - this->queue.push_back(std::move(f)); - - // Note: it's not strictly necessary to lock the mutex before notifying, but it's possible it could lead to a data - // race or use-after-free. If nothing else, locking suppresses warnings of a 'possible' data race in thread sanitizers. - // See discussion and some links at - // https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one - // Also note that this can potentially lead to a performance downgrade; in practice, most pthread - // implementations will detect and mitigate the issue. + { + std::lock_guard lock(this->queue_mutex); + this->queue.push_back(std::move(f)); + } + // Note: notifying without the mutex held avoids the situation where a notified thread wakes and immediately + // blocks waiting for the mutex to be released. This is safe; condition_variable::notify_one does not require + // the associated mutex to be held. this->new_task_notifier.notify_one(); return; } @@ -119,12 +115,9 @@ class work_queue { // // We rely on a condition variable to signal when tasks are completed, but fallback on occasional polling in // case there are any races to avoid waiting forever. - bool l_should_quit = false; - while(!l_should_quit){ - + { std::unique_lock lock(this->queue_mutex); - while( !(l_should_quit = this->should_quit.load()) - && !this->queue.empty() ){ + while( !this->queue.empty() ){ // Waiting releases the lock while waiting, which still allows for outstanding work to be completed. // @@ -133,15 +126,10 @@ class work_queue { this->end_task_notifier.wait_for(lock, std::chrono::milliseconds(2000) ); } - if(!l_should_quit){ - this->should_quit.store(true); - this->new_task_notifier.notify_all(); // notify threads to wake up and 'notice' they need to terminate. - lock.unlock(); - for(auto &wt : this->worker_threads) wt.join(); - } - break; + this->should_quit.store(true); } + this->new_task_notifier.notify_all(); // notify threads to wake up and 'notice' they need to terminate. + for(auto &wt : this->worker_threads) wt.join(); } }; - diff --git a/tests2/YgorThreadPool.cc b/tests2/YgorThreadPool.cc new file mode 100644 index 0000000..179976b --- /dev/null +++ b/tests2/YgorThreadPool.cc @@ -0,0 +1,144 @@ + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "doctest/doctest.h" + + +TEST_CASE( "work_queue" ){ + + SUBCASE("basic task execution"){ + std::atomic counter{0}; + { + work_queue> pool(2); + pool.submit_task([&counter](){ ++counter; }); + pool.submit_task([&counter](){ ++counter; }); + pool.submit_task([&counter](){ ++counter; }); + } + REQUIRE(counter.load() == 3); + } + + SUBCASE("single worker sequential FIFO order"){ + std::vector results; + std::mutex results_mutex; + { + work_queue> pool(1); + for(int i = 0; i < 10; ++i){ + pool.submit_task([i, &results, &results_mutex](){ + std::lock_guard lock(results_mutex); + results.push_back(i); + }); + } + } + REQUIRE(results.size() == 10); + for(int i = 0; i < 10; ++i){ + REQUIRE(results[static_cast(i)] == i); + } + } + + SUBCASE("handles std::exception without crashing"){ + std::atomic counter{0}; + { + work_queue> pool(2); + pool.submit_task([](){ throw std::runtime_error("test"); }); + pool.submit_task([&counter](){ ++counter; }); + } + REQUIRE(counter.load() == 1); + } + + SUBCASE("handles non-std exceptions without crashing"){ + std::atomic counter{0}; + { + work_queue> pool(2); + pool.submit_task([](){ throw 42; }); + pool.submit_task([&counter](){ ++counter; }); + } + REQUIRE(counter.load() == 1); + } + + SUBCASE("clear_tasks removes pending tasks"){ + std::atomic counter{0}; + work_queue> pool(1); + + // Submit a blocking task to hold the single worker. + std::atomic blocker{true}; + std::atomic started{false}; + pool.submit_task([&blocker, &started](){ + started.store(true); + while(blocker.load()) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }); + + // Wait for the blocking task to start. + while(!started.load()) std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + // Submit tasks that should be clearable (worker is busy). + pool.submit_task([&counter](){ ++counter; }); + pool.submit_task([&counter](){ ++counter; }); + pool.submit_task([&counter](){ ++counter; }); + + auto cleared = pool.clear_tasks(); + REQUIRE(cleared.size() == 3); + + // Release the blocker so the destructor can join. + blocker.store(false); + } + + SUBCASE("many tasks with many workers"){ + const int N = 1000; + std::atomic counter{0}; + { + work_queue> pool(8); + for(int i = 0; i < N; ++i){ + pool.submit_task([&counter](){ ++counter; }); + } + } + REQUIRE(counter.load() == N); + } + + SUBCASE("handles empty function gracefully"){ + { + work_queue> pool(2); + std::function empty_fn; + pool.submit_task(empty_fn); + } + // Should not crash. + REQUIRE(true); + } + + SUBCASE("default constructor uses hardware concurrency"){ + std::atomic counter{0}; + { + work_queue> pool; + pool.submit_task([&counter](){ ++counter; }); + } + REQUIRE(counter.load() == 1); + } + + SUBCASE("zero workers defaults to hardware concurrency"){ + std::atomic counter{0}; + { + work_queue> pool(0); + pool.submit_task([&counter](){ ++counter; }); + } + REQUIRE(counter.load() == 1); + } + + SUBCASE("tasks submitted from within tasks"){ + std::atomic counter{0}; + { + work_queue> pool(2); + pool.submit_task([&pool, &counter](){ + ++counter; + pool.submit_task([&counter](){ ++counter; }); + }); + } + REQUIRE(counter.load() == 2); + } +} diff --git a/tests2/compile_and_run.sh b/tests2/compile_and_run.sh index c85e684..acf3e9a 100755 --- a/tests2/compile_and_run.sh +++ b/tests2/compile_and_run.sh @@ -60,6 +60,7 @@ g++ \ YgorStatsConditionalForests.cc \ YgorStatsStochasticForests.cc \ YgorString.cc \ + YgorThreadPool.cc \ YgorTime/*.cc \ \ -o run_tests \ From 785151dd5ceb994bcc909f7de9ad48deff881e40 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 25 Mar 2026 04:45:35 +0000 Subject: [PATCH 3/3] Remove leading blank line in test file per code review Co-authored-by: hdclark <934858+hdclark@users.noreply.github.com> Agent-Logs-Url: https://github.com/hdclark/Ygor/sessions/99a8d908-5ba2-4cd4-9674-5fdc2c8d68ac --- tests2/YgorThreadPool.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/tests2/YgorThreadPool.cc b/tests2/YgorThreadPool.cc index 179976b..7b803b7 100644 --- a/tests2/YgorThreadPool.cc +++ b/tests2/YgorThreadPool.cc @@ -1,4 +1,3 @@ - #include #include #include