diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp index f55289b..0ae12a8 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/configuration_manager.hpp @@ -120,6 +120,12 @@ class ConfigurationManager { /// Key: node_name, Value: map of param_name -> Parameter mutable std::mutex defaults_mutex_; std::map> default_values_; + + /// Mutex to serialize all parameter operations. + /// SyncParametersClient internally spins param_node_ via spin_node_until_future_complete(), + /// which is not thread-safe. Concurrent HTTP requests would cause: + /// "Node '/_param_client_node' has already been added to an executor" + mutable std::mutex param_operations_mutex_; }; } // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/configuration_manager.cpp b/src/ros2_medkit_gateway/src/configuration_manager.cpp index a0332a4..0263580 100644 --- a/src/ros2_medkit_gateway/src/configuration_manager.cpp +++ b/src/ros2_medkit_gateway/src/configuration_manager.cpp @@ -59,6 +59,7 @@ std::shared_ptr ConfigurationManager::get_param_cl } ParameterResult ConfigurationManager::list_parameters(const std::string & node_name) { + std::lock_guard op_lock(param_operations_mutex_); ParameterResult result; RCLCPP_DEBUG(node_->get_logger(), "list_parameters called for node: '%s'", node_name.c_str()); @@ -138,6 +139,7 @@ ParameterResult ConfigurationManager::list_parameters(const std::string & node_n } ParameterResult ConfigurationManager::get_parameter(const std::string & node_name, const std::string & param_name) { + std::lock_guard op_lock(param_operations_mutex_); ParameterResult result; try { @@ -196,6 +198,7 @@ ParameterResult ConfigurationManager::get_parameter(const std::string & node_nam ParameterResult ConfigurationManager::set_parameter(const std::string & node_name, const std::string & param_name, const json & value) { + std::lock_guard op_lock(param_operations_mutex_); ParameterResult result; try { @@ -476,6 +479,7 @@ void ConfigurationManager::cache_default_values(const std::string & node_name) { } ParameterResult ConfigurationManager::reset_parameter(const std::string & node_name, const std::string & param_name) { + std::lock_guard op_lock(param_operations_mutex_); ParameterResult result; try { @@ -543,6 +547,7 @@ ParameterResult ConfigurationManager::reset_parameter(const std::string & node_n } ParameterResult ConfigurationManager::reset_all_parameters(const std::string & node_name) { + std::lock_guard op_lock(param_operations_mutex_); ParameterResult result; try { diff --git a/src/ros2_medkit_gateway/src/http/rest_server.cpp b/src/ros2_medkit_gateway/src/http/rest_server.cpp index 830ed2f..5e9c522 100644 --- a/src/ros2_medkit_gateway/src/http/rest_server.cpp +++ b/src/ros2_medkit_gateway/src/http/rest_server.cpp @@ -284,19 +284,21 @@ void RESTServer::setup_routes() { }); // App configurations - get specific - srv->Get((api_path("/apps") + R"(/([^/]+)/configurations/([^/]+)$)"), + // Use (.+) for config_id to accept slashes from percent-encoded URLs (%2F -> /) + // ROS2 parameters like qos_overrides./parameter_events.publisher.depth contain slashes + srv->Get((api_path("/apps") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_get_configuration(req, res); }); // App configurations - set - srv->Put((api_path("/apps") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Put((api_path("/apps") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_set_configuration(req, res); }); // App configurations - delete single - srv->Delete((api_path("/apps") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Delete((api_path("/apps") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_delete_configuration(req, res); }); @@ -391,17 +393,17 @@ void RESTServer::setup_routes() { config_handlers_->handle_list_configurations(req, res); }); - srv->Get((api_path("/functions") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Get((api_path("/functions") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_get_configuration(req, res); }); - srv->Put((api_path("/functions") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Put((api_path("/functions") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_set_configuration(req, res); }); - srv->Delete((api_path("/functions") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Delete((api_path("/functions") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_delete_configuration(req, res); }); @@ -526,17 +528,17 @@ void RESTServer::setup_routes() { config_handlers_->handle_list_configurations(req, res); }); - srv->Get((api_path("/areas") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Get((api_path("/areas") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_get_configuration(req, res); }); - srv->Put((api_path("/areas") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Put((api_path("/areas") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_set_configuration(req, res); }); - srv->Delete((api_path("/areas") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Delete((api_path("/areas") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_delete_configuration(req, res); }); @@ -670,19 +672,19 @@ void RESTServer::setup_routes() { }); // Get specific configuration (parameter) - register before general route - srv->Get((api_path("/components") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Get((api_path("/components") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_get_configuration(req, res); }); // Set configuration (parameter) - srv->Put((api_path("/components") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Put((api_path("/components") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_set_configuration(req, res); }); // Delete (reset) single configuration to default value - srv->Delete((api_path("/components") + R"(/([^/]+)/configurations/([^/]+)$)"), + srv->Delete((api_path("/components") + R"(/([^/]+)/configurations/(.+)$)"), [this](const httplib::Request & req, httplib::Response & res) { config_handlers_->handle_delete_configuration(req, res); }); diff --git a/src/ros2_medkit_gateway/test/test_configuration_manager.cpp b/src/ros2_medkit_gateway/test/test_configuration_manager.cpp index d7f3e17..196b35c 100644 --- a/src/ros2_medkit_gateway/test/test_configuration_manager.cpp +++ b/src/ros2_medkit_gateway/test/test_configuration_manager.cpp @@ -345,6 +345,111 @@ TEST_F(TestConfigurationManager, test_concurrent_parameter_access) { EXPECT_GE(success_count.load(), 1); } +TEST_F(TestConfigurationManager, test_concurrent_parameter_operations_no_executor_error) { + // Regression test: concurrent parameter operations must not cause + // "Node has already been added to an executor" error. + // SyncParametersClient internally spins param_node_ - without proper + // serialization, concurrent calls would cause executor conflicts. + + node_->declare_parameter("concurrent_test_int", 0); + node_->declare_parameter("concurrent_test_str", std::string("init")); + + // Warm-up: ensure parameter service is discoverable before starting concurrent operations. + // This prevents flaky failures on slow CI due to service discovery timing. + auto warmup_result = config_manager_->list_parameters("/test_config_manager_node"); + ASSERT_TRUE(warmup_result.success) << "Warm-up failed: " << warmup_result.error_message; + + constexpr int kNumThreads = 10; + constexpr int kOpsPerThread = 5; + + std::vector threads; + std::atomic success_count{0}; + std::atomic exception_count{0}; + std::atomic start_flag{false}; + + // Spawn threads that will all start simultaneously + for (int i = 0; i < kNumThreads; ++i) { + threads.emplace_back([this, i, &success_count, &exception_count, &start_flag]() { + // Wait for all threads to be ready + while (!start_flag.load()) { + std::this_thread::yield(); + } + + for (int op = 0; op < kOpsPerThread; ++op) { + try { + // Mix different operations to stress test serialization + switch ((i + op) % 6) { + case 0: { + auto result = config_manager_->list_parameters("/test_config_manager_node"); + if (result.success) { + success_count++; + } + break; + } + case 1: { + auto result = config_manager_->get_parameter("/test_config_manager_node", "concurrent_test_int"); + if (result.success) { + success_count++; + } + break; + } + case 2: { + auto result = + config_manager_->set_parameter("/test_config_manager_node", "concurrent_test_int", nlohmann::json(i)); + if (result.success) { + success_count++; + } + break; + } + case 3: { + auto result = config_manager_->get_parameter("/test_config_manager_node", "concurrent_test_str"); + if (result.success) { + success_count++; + } + break; + } + case 4: { + auto result = config_manager_->reset_parameter("/test_config_manager_node", "concurrent_test_int"); + if (result.success) { + success_count++; + } + break; + } + case 5: { + auto result = config_manager_->reset_all_parameters("/test_config_manager_node"); + // reset_all_parameters returns success=false if some params are read-only, + // but data is still populated - count as success if no exception + if (result.success || result.data.contains("reset_count")) { + success_count++; + } + break; + } + } + } catch (const std::exception & e) { + // This should NOT happen - executor conflicts would throw here + exception_count++; + RCLCPP_ERROR(rclcpp::get_logger("test"), "Exception in concurrent test: %s", e.what()); + } + } + }); + } + + // Start all threads simultaneously + start_flag.store(true); + + for (auto & t : threads) { + t.join(); + } + + // No exceptions should have occurred (especially executor errors) + // This is the main assertion - if the mutex serialization is broken, + // we'd get "Node has already been added to an executor" exceptions here. + EXPECT_EQ(exception_count.load(), 0) << "Concurrent access caused exceptions (likely executor conflict)"; + + // All operations should succeed - we're operating on our own node which is always available. + EXPECT_EQ(success_count.load(), kNumThreads * kOpsPerThread); +} + int main(int argc, char ** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();