Skip to content

Commit 709de51

Browse files
committed
fix(gateway): #153 serialize parameter operations to prevent executor conflicts
SyncParametersClient internally spins param_node_ via spin_node_until_future_complete(), which is not thread-safe. Concurrent HTTP requests would cause the error: "Node '/_param_client_node' has already been added to an executor". Add a recursive_mutex to serialize all parameter operations (list, get, set, reset). A recursive_mutex is used because list_parameters() calls cache_default_values() internally.
1 parent 9310af2 commit 709de51

3 files changed

Lines changed: 96 additions & 0 deletions

File tree

src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,13 @@ class ConfigurationManager {
120120
/// Key: node_name, Value: map of param_name -> Parameter
121121
mutable std::mutex defaults_mutex_;
122122
std::map<std::string, std::map<std::string, rclcpp::Parameter>> default_values_;
123+
124+
/// Mutex to serialize all parameter operations.
125+
/// SyncParametersClient internally spins param_node_ via spin_node_until_future_complete(),
126+
/// which is not thread-safe. Concurrent HTTP requests would cause:
127+
/// "Node '/_param_client_node' has already been added to an executor"
128+
/// Using recursive_mutex because list_parameters() calls cache_default_values() internally.
129+
mutable std::recursive_mutex param_operations_mutex_;
123130
};
124131

125132
} // namespace ros2_medkit_gateway

src/ros2_medkit_gateway/src/configuration_manager.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ std::shared_ptr<rclcpp::SyncParametersClient> ConfigurationManager::get_param_cl
5959
}
6060

6161
ParameterResult ConfigurationManager::list_parameters(const std::string & node_name) {
62+
std::lock_guard<std::recursive_mutex> op_lock(param_operations_mutex_);
6263
ParameterResult result;
6364

6465
RCLCPP_DEBUG(node_->get_logger(), "list_parameters called for node: '%s'", node_name.c_str());
@@ -138,6 +139,7 @@ ParameterResult ConfigurationManager::list_parameters(const std::string & node_n
138139
}
139140

140141
ParameterResult ConfigurationManager::get_parameter(const std::string & node_name, const std::string & param_name) {
142+
std::lock_guard<std::recursive_mutex> op_lock(param_operations_mutex_);
141143
ParameterResult result;
142144

143145
try {
@@ -196,6 +198,7 @@ ParameterResult ConfigurationManager::get_parameter(const std::string & node_nam
196198

197199
ParameterResult ConfigurationManager::set_parameter(const std::string & node_name, const std::string & param_name,
198200
const json & value) {
201+
std::lock_guard<std::recursive_mutex> op_lock(param_operations_mutex_);
199202
ParameterResult result;
200203

201204
try {
@@ -476,6 +479,7 @@ void ConfigurationManager::cache_default_values(const std::string & node_name) {
476479
}
477480

478481
ParameterResult ConfigurationManager::reset_parameter(const std::string & node_name, const std::string & param_name) {
482+
std::lock_guard<std::recursive_mutex> op_lock(param_operations_mutex_);
479483
ParameterResult result;
480484

481485
try {
@@ -543,6 +547,7 @@ ParameterResult ConfigurationManager::reset_parameter(const std::string & node_n
543547
}
544548

545549
ParameterResult ConfigurationManager::reset_all_parameters(const std::string & node_name) {
550+
std::lock_guard<std::recursive_mutex> op_lock(param_operations_mutex_);
546551
ParameterResult result;
547552

548553
try {

src/ros2_medkit_gateway/test/test_configuration_manager.cpp

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,90 @@ TEST_F(TestConfigurationManager, test_concurrent_parameter_access) {
345345
EXPECT_GE(success_count.load(), 1);
346346
}
347347

348+
/// Regression test: concurrent parameter operations must not raise
/// "Node has already been added to an executor".
///
/// Spawns kNumThreads worker threads that each perform kOpsPerThread mixed
/// list/get/set calls against "/test_config_manager_node" through config_manager_.
/// The test passes only if no call throws and every call reports success.
TEST_F(TestConfigurationManager, test_concurrent_parameter_operations_no_executor_error) {
  // SyncParametersClient internally spins param_node_ - without proper
  // serialization, concurrent calls would cause executor conflicts.

  node_->declare_parameter("concurrent_test_int", 0);
  node_->declare_parameter("concurrent_test_str", std::string("init"));

  constexpr int kNumThreads = 10;
  constexpr int kOpsPerThread = 5;

  std::vector<std::thread> threads;
  threads.reserve(kNumThreads);
  std::atomic<int> success_count{0};
  std::atomic<int> exception_count{0};
  std::atomic<bool> start_flag{false};

  // Spawn the workers; they idle on start_flag until the main thread releases them.
  for (int i = 0; i < kNumThreads; ++i) {
    threads.emplace_back([this, i, &success_count, &exception_count, &start_flag]() {
      // Spin until the main thread opens the start gate so the workers begin together.
      while (!start_flag.load()) {
        std::this_thread::yield();
      }

      for (int op = 0; op < kOpsPerThread; ++op) {
        try {
          // Mix different operations to stress test serialization.
          // (i + op) is non-negative, so the selector is always in [0, 3].
          switch ((i + op) % 4) {
            case 0: {
              auto result = config_manager_->list_parameters("/test_config_manager_node");
              if (result.success) {
                success_count++;
              }
              break;
            }
            case 1: {
              auto result = config_manager_->get_parameter("/test_config_manager_node", "concurrent_test_int");
              if (result.success) {
                success_count++;
              }
              break;
            }
            case 2: {
              auto result =
                  config_manager_->set_parameter("/test_config_manager_node", "concurrent_test_int", nlohmann::json(i));
              if (result.success) {
                success_count++;
              }
              break;
            }
            case 3: {
              auto result = config_manager_->get_parameter("/test_config_manager_node", "concurrent_test_str");
              if (result.success) {
                success_count++;
              }
              break;
            }
          }
        } catch (const std::exception & e) {
          // This should NOT happen - executor conflicts would throw here
          exception_count++;
          RCLCPP_ERROR(rclcpp::get_logger("test"), "Exception in concurrent test: %s", e.what());
        } catch (...) {
          // An exception escaping a std::thread entry point calls std::terminate and
          // would abort the whole test binary; record it as a test failure instead.
          exception_count++;
          RCLCPP_ERROR(rclcpp::get_logger("test"), "Unknown exception in concurrent test");
        }
      }
    });
  }

  // Release all workers at once
  start_flag.store(true);

  for (auto & t : threads) {
    t.join();
  }

  // No exceptions should have occurred (especially executor errors)
  // This is the main assertion - if the mutex serialization is broken,
  // we'd get "Node has already been added to an executor" exceptions here.
  EXPECT_EQ(exception_count.load(), 0) << "Concurrent access caused exceptions (likely executor conflict)";

  // All operations should succeed - we're operating on our own node which is always available.
  EXPECT_EQ(success_count.load(), kNumThreads * kOpsPerThread);
}
431+
348432
int main(int argc, char ** argv) {
349433
::testing::InitGoogleTest(&argc, argv);
350434
return RUN_ALL_TESTS();

0 commit comments

Comments
 (0)