Skip to content

Commit 46e828e

Browse files
authored
Replace io_service with io_context for boost >=1.87 compatibility (dials#2997)
Also add some unit tests of ThreadPool
1 parent 1c9cf61 commit 46e828e

6 files changed

Lines changed: 192 additions & 8 deletions

File tree

newsfragments/2997.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Stop using a removed boost::asio feature

src/dials/util/thread_pool.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,19 @@ namespace dials { namespace util {
2727
* Instantiate with the number of required threads
2828
* @param N The number of threads
2929
*/
30-
ThreadPool(std::size_t N) : work_(io_service_), started_(0), finished_(0) {
30+
ThreadPool(std::size_t N)
31+
: work_(boost::asio::make_work_guard(io_context_)), started_(0), finished_(0) {
3132
for (std::size_t i = 0; i < N; ++i) {
3233
threads_.create_thread(
33-
boost::bind(&boost::asio::io_service::run, &io_service_));
34+
boost::bind(&boost::asio::io_context::run, &io_context_));
3435
}
3536
}
3637

3738
/**
3839
* Destroy the thread pool and join all threads
3940
*/
4041
~ThreadPool() {
41-
io_service_.stop();
42+
io_context_.stop();
4243
try {
4344
threads_.join_all();
4445
} catch (const std::exception &) {
@@ -53,7 +54,7 @@ namespace dials { namespace util {
5354
template <typename Function>
5455
void post(Function function) {
5556
started_++;
56-
io_service_.post(FunctionRunner<Function>(function, finished_));
57+
boost::asio::post(io_context_, FunctionRunner<Function>(function, finished_));
5758
}
5859

5960
/**
@@ -92,8 +93,8 @@ namespace dials { namespace util {
9293
std::atomic<std::size_t> &counter_;
9394
};
9495

95-
boost::asio::io_service io_service_;
96-
boost::asio::io_service::work work_;
96+
boost::asio::io_context io_context_;
97+
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_;
9798
boost::thread_group threads_;
9899
std::size_t started_;
99100
std::atomic<std::size_t> finished_;

tests/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,7 @@ target_link_libraries( tst_reeke_model PUBLIC Boost::boost CCTBX::scitbx )
88
add_executable( tst_collision_detection algorithms/spatial_indexing/tst_collision_detection.cc )
99
set_target_properties( tst_collision_detection PROPERTIES RUNTIME_OUTPUT_DIRECTORY "algorithms/spatial_indexing")
1010
target_link_libraries( tst_collision_detection PUBLIC Boost::boost CCTBX::scitbx )
11+
12+
add_executable( tst_thread_pool util/tst_thread_pool.cc )
13+
set_target_properties( tst_thread_pool PROPERTIES RUNTIME_OUTPUT_DIRECTORY "util")
14+
target_link_libraries( tst_thread_pool PUBLIC Boost::thread CCTBX::scitbx )

tests/SConscript

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
Import("env")
22

3-
env.Replace(LIBS=[])
4-
53
env.Program(
64
target="algorithms/spot_prediction/tst_reeke_model",
75
source="algorithms/spot_prediction/tst_reeke_model.cc",
@@ -10,3 +8,8 @@ env.Program(
108
target="algorithms/spatial_indexing/tst_collision_detection",
119
source="algorithms/spatial_indexing/tst_collision_detection.cc",
1210
)
11+
env.Program(
12+
target="util/tst_thread_pool",
13+
source="util/tst_thread_pool.cc",
14+
LIBS=env["LIBS"],
15+
)

tests/test_cpp_components.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
# Paths are under /build/
1010
"tests/algorithms/spatial_indexing/tst_collision_detection",
1111
"tests/algorithms/spot_prediction/tst_reeke_model",
12+
"tests/util/tst_thread_pool",
1213
]
1314

1415

tests/util/tst_thread_pool.cc

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
#include <cassert>
2+
#include <vector>
3+
#include <iostream>
4+
#include <atomic>
5+
#include <dials/util/thread_pool.h>
6+
#include <cstdlib>
7+
#include <cstdio>
8+
9+
#ifndef _WIN32
10+
#include <sys/wait.h>
11+
#include <unistd.h>
12+
#endif
13+
14+
using dials::util::ThreadPool;
15+
16+
void tst_basic_functionality() {
17+
ThreadPool pool(4);
18+
std::atomic<int> counter(0);
19+
20+
// Post some simple tasks
21+
for (int i = 0; i < 100; ++i) {
22+
pool.post([&counter]() { counter++; });
23+
}
24+
25+
// Wait for all tasks to complete
26+
pool.wait();
27+
28+
// Check that all tasks executed
29+
assert(counter == 100);
30+
31+
// Test passed
32+
std::cout << "OK" << std::endl;
33+
}
34+
35+
void tst_concurrent_execution() {
36+
ThreadPool pool(4);
37+
std::atomic<int> concurrent_count(0);
38+
std::atomic<int> max_concurrent(0);
39+
std::atomic<int> completed(0);
40+
41+
// Post tasks that simulate work and track concurrency
42+
for (int i = 0; i < 20; ++i) {
43+
pool.post([&]() {
44+
int current = ++concurrent_count;
45+
46+
// Update max concurrent if needed
47+
int expected = max_concurrent.load();
48+
while (expected < current
49+
&& !max_concurrent.compare_exchange_weak(expected, current)) {
50+
// Keep trying
51+
}
52+
53+
// Simulate some work
54+
for (volatile int j = 0; j < 10000; ++j) {
55+
}
56+
57+
--concurrent_count;
58+
++completed;
59+
});
60+
}
61+
62+
pool.wait();
63+
64+
// Should have completed all tasks
65+
assert(completed == 20);
66+
// Should have achieved some concurrency (more than 1 thread working)
67+
assert(max_concurrent > 1);
68+
// No tasks should be running now
69+
assert(concurrent_count == 0);
70+
71+
// Test passed
72+
std::cout << "OK" << std::endl;
73+
}
74+
75+
void tst_multiple_post_wait_cycles() {
76+
ThreadPool pool(2);
77+
std::atomic<int> total_executed(0);
78+
79+
// Do multiple cycles of post/wait
80+
for (int cycle = 0; cycle < 5; ++cycle) {
81+
std::atomic<int> cycle_count(0);
82+
83+
// Post tasks for this cycle
84+
for (int i = 0; i < 10; ++i) {
85+
pool.post([&cycle_count, &total_executed]() {
86+
cycle_count++;
87+
total_executed++;
88+
});
89+
}
90+
91+
// Wait for this cycle to complete
92+
pool.wait();
93+
94+
// Check this cycle completed
95+
assert(cycle_count == 10);
96+
}
97+
98+
// Check total
99+
assert(total_executed == 50);
100+
101+
// Test passed
102+
std::cout << "OK" << std::endl;
103+
}
104+
105+
void tst_exception_handling() {
106+
// Just document the current behavior: ThreadPool does not attempt to handle
107+
// exceptions, instead it just crashes.
108+
109+
#ifdef _WIN32
110+
std::cout << "Exception handling test skipped on Windows" << std::endl;
111+
std::cout << "Note: Exceptions thrown in thread pool tasks will terminate the program"
112+
<< std::endl;
113+
std::cout << "OK" << std::endl;
114+
#else
115+
// Fork a child process to test exception behavior
116+
pid_t pid = fork();
117+
if (pid == 0) {
118+
// Child process - this should crash
119+
// Don't print stderr
120+
freopen("/dev/null", "w", stderr);
121+
ThreadPool pool(1);
122+
pool.post([]() { throw std::runtime_error("Raise an exception"); });
123+
pool.wait();
124+
exit(0); // Should never reach here
125+
} else {
126+
// Parent process - wait for child to crash
127+
int status;
128+
waitpid(pid, &status, 0);
129+
assert(!WIFEXITED(status) || WEXITSTATUS(status) != 0); // Should not exit normally
130+
std::cout << "OK" << std::endl;
131+
}
132+
#endif
133+
}
134+
135+
void tst_single_thread_pool() {
136+
ThreadPool pool(1);
137+
std::vector<int> execution_order;
138+
std::atomic<int> counter(0);
139+
140+
// Post tasks that should execute in order (single thread)
141+
for (int i = 0; i < 10; ++i) {
142+
pool.post([&execution_order, &counter, i]() {
143+
// Simple work simulation
144+
for (volatile int j = 0; j < 1000; ++j) {
145+
}
146+
execution_order.push_back(i);
147+
counter++;
148+
});
149+
}
150+
151+
pool.wait();
152+
153+
// All tasks should have executed
154+
assert(counter == 10);
155+
assert(execution_order.size() == 10);
156+
157+
// With single thread, should execute in order
158+
for (int i = 0; i < 10; ++i) {
159+
assert(execution_order[i] == i);
160+
}
161+
162+
// Test passed
163+
std::cout << "OK" << std::endl;
164+
}
165+
166+
int main(int argc, char const *argv[]) {
167+
tst_basic_functionality();
168+
tst_concurrent_execution();
169+
tst_multiple_post_wait_cycles();
170+
tst_exception_handling();
171+
tst_single_thread_pool();
172+
173+
return 0;
174+
}

0 commit comments

Comments
 (0)