Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 37 additions & 49 deletions src/YgorThreadPool.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//Thread_Pool.h.
//YgorThreadPool.h.

#pragma once

#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <algorithm>
#include <list>
#include <type_traits>
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YgorThreadPool.h uses std::chrono::{nanoseconds,seconds,milliseconds} but does not include <chrono>. The header should be self-contained; relying on indirect includes from <condition_variable> is non-portable. Add #include <chrono> here.

Suggested change
#include <type_traits>
#include <type_traits>
#include <chrono>

Copilot uses AI. Check for mistakes.


// Multi-threaded work queue for offloading processing tasks.
Expand All @@ -33,7 +33,7 @@ class work_queue {
std::unique_lock<std::mutex> lock(this->queue_mutex);

// Exercise the condition variables and mutexes, ensuring they are initialized by the implementation.
// This should efectively evaluate to a no-op, but also helps suppress false-positive warning messages in
// This should effectively evaluate to a no-op, but also helps suppress false-positive warning messages in
// Valgrind's DRD tool, i.e., 'not a condition variable', and other tools.
this->new_task_notifier.notify_all(); // No threads waiting, so nothing to notify.
this->end_task_notifier.notify_all(); // No threads waiting, so nothing to notify.
Expand All @@ -49,58 +49,54 @@ class work_queue {
this->worker_threads.emplace_back(
[this](){
// Continually check the queue and wait on the condition variable.
bool l_should_quit = false;
while(!l_should_quit){
while(true){

std::unique_lock<std::mutex> lock(this->queue_mutex);
while( !(l_should_quit = this->should_quit.load())
while( !this->should_quit.load()
&& this->queue.empty() ){

// Waiting releases the lock, which allows for work to be submitted.
//
// Note: spurious notifications are OK, since the queue will be empty and the worker will return to
// waiting on the condition variable.
// this->new_task_notifier.wait(lock);
this->new_task_notifier.wait_for(lock, std::chrono::seconds(2) ); // No notifiers, so no signal to receive.
this->new_task_notifier.wait_for(lock, std::chrono::seconds(2) );
}

if(this->queue.empty()){
return;
}

// Assume ownership of only the first item in the queue (FIFO).
std::list<T> l_queue;
if(!this->queue.empty()) l_queue.splice( std::end(l_queue), this->queue, std::begin(this->queue) );

//// Assume ownership of all available items in the queue.
//std::list<T> l_queue;
//l_queue.swap( this->queue );

// Perform the work in FIFO order.
auto task = std::move(this->queue.front());
this->queue.pop_front();

lock.unlock();
for(const auto &user_f : l_queue){
try{
if(user_f){
user_f();
}
}catch(const std::exception &){};

lock.lock();
this->end_task_notifier.notify_all();
lock.unlock();
}

// Perform the work.
try{
if constexpr(std::is_constructible_v<bool, const T &>){
if(task) task();
}else{
task();
}
}catch(const std::exception &){}
catch(...){};

Comment on lines +82 to +84
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a stray null statement after the catch(...) block (catch(...){ };). It’s harmless but inconsistent with the preceding catch and makes the exception handling look accidental. Remove the extra ; (and consider formatting the two catch clauses consistently).

Suggested change
}catch(const std::exception &){}
catch(...){};
} catch (const std::exception &) {
} catch (...) {
}

Copilot uses AI. Check for mistakes.
this->end_task_notifier.notify_one();
}
}
);
}
}

void submit_task(T f){
std::lock_guard<std::mutex> lock(this->queue_mutex);
this->queue.push_back(std::move(f));

// Note: it's not strictly necessary to lock the mutex before notifying, but it's possible it could lead to a data
// race or use-after-free. If nothing else, locking suppresses warnings of a 'possible' data race in thread sanitizers.
// See discussion and some links at
// https://stackoverflow.com/questions/17101922/do-i-have-to-acquire-lock-before-calling-condition-variable-notify-one
// Also note that this can potentially lead to a performance downgrade; in practice, most pthread
// implementations will detect and mitigate the issue.
{
std::lock_guard<std::mutex> lock(this->queue_mutex);
this->queue.push_back(std::move(f));
}
// Note: notifying without the mutex held avoids the situation where a notified thread wakes and immediately
// blocks waiting for the mutex to be released. This is safe; condition_variable::notify_one does not require
// the associated mutex to be held.
this->new_task_notifier.notify_one();
return;
}
Expand All @@ -119,12 +115,9 @@ class work_queue {
//
// We rely on a condition variable to signal when tasks are completed, but fallback on occasional polling in
// case there are any races to avoid waiting forever.
bool l_should_quit = false;
while(!l_should_quit){

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
while( !(l_should_quit = this->should_quit.load())
&& !this->queue.empty() ){
while( !this->queue.empty() ){

// Waiting releases the lock while waiting, which still allows for outstanding work to be completed.
//
Expand All @@ -133,15 +126,10 @@ class work_queue {
this->end_task_notifier.wait_for(lock, std::chrono::milliseconds(2000) );
}

if(!l_should_quit){
this->should_quit.store(true);
this->new_task_notifier.notify_all(); // notify threads to wake up and 'notice' they need to terminate.
lock.unlock();
for(auto &wt : this->worker_threads) wt.join();
}
break;
this->should_quit.store(true);
}
this->new_task_notifier.notify_all(); // notify threads to wake up and 'notice' they need to terminate.
for(auto &wt : this->worker_threads) wt.join();
}
};


143 changes: 143 additions & 0 deletions tests2/YgorThreadPool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#include <atomic>
#include <chrono>
#include <functional>
#include <stdexcept>
#include <thread>
#include <vector>
#include <mutex>

#include <YgorThreadPool.h>

#include "doctest/doctest.h"


TEST_CASE( "work_queue" ){

SUBCASE("basic task execution"){
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool(2);
pool.submit_task([&counter](){ ++counter; });
pool.submit_task([&counter](){ ++counter; });
pool.submit_task([&counter](){ ++counter; });
}
REQUIRE(counter.load() == 3);
}

SUBCASE("single worker sequential FIFO order"){
std::vector<int> results;
std::mutex results_mutex;
{
work_queue<std::function<void()>> pool(1);
for(int i = 0; i < 10; ++i){
pool.submit_task([i, &results, &results_mutex](){
std::lock_guard<std::mutex> lock(results_mutex);
results.push_back(i);
});
}
}
REQUIRE(results.size() == 10);
for(int i = 0; i < 10; ++i){
REQUIRE(results[static_cast<size_t>(i)] == i);
}
}

SUBCASE("handles std::exception without crashing"){
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool(2);
pool.submit_task([](){ throw std::runtime_error("test"); });
pool.submit_task([&counter](){ ++counter; });
}
REQUIRE(counter.load() == 1);
}

SUBCASE("handles non-std exceptions without crashing"){
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool(2);
pool.submit_task([](){ throw 42; });
pool.submit_task([&counter](){ ++counter; });
}
REQUIRE(counter.load() == 1);
}

SUBCASE("clear_tasks removes pending tasks"){
std::atomic<int> counter{0};
work_queue<std::function<void()>> pool(1);

// Submit a blocking task to hold the single worker.
std::atomic<bool> blocker{true};
std::atomic<bool> started{false};
pool.submit_task([&blocker, &started](){
started.store(true);
while(blocker.load()) std::this_thread::sleep_for(std::chrono::milliseconds(10));
});

// Wait for the blocking task to start.
while(!started.load()) std::this_thread::sleep_for(std::chrono::milliseconds(1));
Comment on lines +74 to +78
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clear_tasks subcase uses unbounded busy-wait loops (while(blocker.load()) ... and while(!started.load()) ...). If something regresses (e.g., worker never starts), the test can hang indefinitely and stall CI. Add a timeout/deadline (failing the test if exceeded) or use a condition_variable to wait for started/blocker transitions.

Suggested change
while(blocker.load()) std::this_thread::sleep_for(std::chrono::milliseconds(10));
});
// Wait for the blocking task to start.
while(!started.load()) std::this_thread::sleep_for(std::chrono::milliseconds(1));
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
while (blocker.load() && std::chrono::steady_clock::now() < deadline) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
// Wait for the blocking task to start (with timeout to avoid hanging).
auto start_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
while (!started.load() && std::chrono::steady_clock::now() < start_deadline) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
REQUIRE(started.load());

Copilot uses AI. Check for mistakes.

// Submit tasks that should be clearable (worker is busy).
pool.submit_task([&counter](){ ++counter; });
pool.submit_task([&counter](){ ++counter; });
pool.submit_task([&counter](){ ++counter; });

auto cleared = pool.clear_tasks();
REQUIRE(cleared.size() == 3);

// Release the blocker so the destructor can join.
blocker.store(false);
}

SUBCASE("many tasks with many workers"){
const int N = 1000;
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool(8);
for(int i = 0; i < N; ++i){
pool.submit_task([&counter](){ ++counter; });
}
}
REQUIRE(counter.load() == N);
}

SUBCASE("handles empty function gracefully"){
{
work_queue<std::function<void()>> pool(2);
std::function<void()> empty_fn;
pool.submit_task(empty_fn);
}
// Should not crash.
REQUIRE(true);
}

SUBCASE("default constructor uses hardware concurrency"){
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool;
pool.submit_task([&counter](){ ++counter; });
}
REQUIRE(counter.load() == 1);
}

SUBCASE("zero workers defaults to hardware concurrency"){
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool(0);
pool.submit_task([&counter](){ ++counter; });
}
REQUIRE(counter.load() == 1);
}

SUBCASE("tasks submitted from within tasks"){
std::atomic<int> counter{0};
{
work_queue<std::function<void()>> pool(2);
pool.submit_task([&pool, &counter](){
++counter;
pool.submit_task([&counter](){ ++counter; });
});
}
REQUIRE(counter.load() == 2);
}
}
1 change: 1 addition & 0 deletions tests2/compile_and_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ g++ \
YgorStatsConditionalForests.cc \
YgorStatsStochasticForests.cc \
YgorString.cc \
YgorThreadPool.cc \
YgorTime/*.cc \
\
-o run_tests \
Expand Down
Loading