Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ class ConfigurationManager {
/// Key: node_name, Value: map of param_name -> Parameter
mutable std::mutex defaults_mutex_;
std::map<std::string, std::map<std::string, rclcpp::Parameter>> default_values_;

/// Mutex to serialize all parameter operations.
/// SyncParametersClient internally spins param_node_ via spin_node_until_future_complete(),
/// which is not thread-safe. Concurrent HTTP requests would cause:
/// "Node '/_param_client_node' has already been added to an executor"
mutable std::mutex param_operations_mutex_;
};

} // namespace ros2_medkit_gateway
5 changes: 5 additions & 0 deletions src/ros2_medkit_gateway/src/configuration_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ std::shared_ptr<rclcpp::SyncParametersClient> ConfigurationManager::get_param_cl
}

ParameterResult ConfigurationManager::list_parameters(const std::string & node_name) {
std::lock_guard<std::mutex> op_lock(param_operations_mutex_);
ParameterResult result;

RCLCPP_DEBUG(node_->get_logger(), "list_parameters called for node: '%s'", node_name.c_str());
Expand Down Expand Up @@ -138,6 +139,7 @@ ParameterResult ConfigurationManager::list_parameters(const std::string & node_n
}

ParameterResult ConfigurationManager::get_parameter(const std::string & node_name, const std::string & param_name) {
std::lock_guard<std::mutex> op_lock(param_operations_mutex_);
ParameterResult result;

try {
Expand Down Expand Up @@ -196,6 +198,7 @@ ParameterResult ConfigurationManager::get_parameter(const std::string & node_nam

ParameterResult ConfigurationManager::set_parameter(const std::string & node_name, const std::string & param_name,
const json & value) {
std::lock_guard<std::mutex> op_lock(param_operations_mutex_);
ParameterResult result;

try {
Expand Down Expand Up @@ -476,6 +479,7 @@ void ConfigurationManager::cache_default_values(const std::string & node_name) {
}

ParameterResult ConfigurationManager::reset_parameter(const std::string & node_name, const std::string & param_name) {
std::lock_guard<std::mutex> op_lock(param_operations_mutex_);
ParameterResult result;

try {
Expand Down Expand Up @@ -543,6 +547,7 @@ ParameterResult ConfigurationManager::reset_parameter(const std::string & node_n
}

ParameterResult ConfigurationManager::reset_all_parameters(const std::string & node_name) {
std::lock_guard<std::mutex> op_lock(param_operations_mutex_);
ParameterResult result;

try {
Expand Down
26 changes: 14 additions & 12 deletions src/ros2_medkit_gateway/src/http/rest_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,19 +284,21 @@ void RESTServer::setup_routes() {
});

// App configurations - get specific
srv->Get((api_path("/apps") + R"(/([^/]+)/configurations/([^/]+)$)"),
// Use (.+) for config_id to accept slashes from percent-encoded URLs (%2F -> /)
// ROS2 parameters like qos_overrides./parameter_events.publisher.depth contain slashes
srv->Get((api_path("/apps") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_get_configuration(req, res);
});

// App configurations - set
srv->Put((api_path("/apps") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Put((api_path("/apps") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_set_configuration(req, res);
});

// App configurations - delete single
srv->Delete((api_path("/apps") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Delete((api_path("/apps") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_delete_configuration(req, res);
});
Expand Down Expand Up @@ -391,17 +393,17 @@ void RESTServer::setup_routes() {
config_handlers_->handle_list_configurations(req, res);
});

srv->Get((api_path("/functions") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Get((api_path("/functions") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_get_configuration(req, res);
});

srv->Put((api_path("/functions") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Put((api_path("/functions") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_set_configuration(req, res);
});

srv->Delete((api_path("/functions") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Delete((api_path("/functions") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_delete_configuration(req, res);
});
Expand Down Expand Up @@ -526,17 +528,17 @@ void RESTServer::setup_routes() {
config_handlers_->handle_list_configurations(req, res);
});

srv->Get((api_path("/areas") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Get((api_path("/areas") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_get_configuration(req, res);
});

srv->Put((api_path("/areas") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Put((api_path("/areas") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_set_configuration(req, res);
});

srv->Delete((api_path("/areas") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Delete((api_path("/areas") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_delete_configuration(req, res);
});
Expand Down Expand Up @@ -670,19 +672,19 @@ void RESTServer::setup_routes() {
});

// Get specific configuration (parameter) - register before general route
srv->Get((api_path("/components") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Get((api_path("/components") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_get_configuration(req, res);
});

// Set configuration (parameter)
srv->Put((api_path("/components") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Put((api_path("/components") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_set_configuration(req, res);
});

// Delete (reset) single configuration to default value
srv->Delete((api_path("/components") + R"(/([^/]+)/configurations/([^/]+)$)"),
srv->Delete((api_path("/components") + R"(/([^/]+)/configurations/(.+)$)"),
[this](const httplib::Request & req, httplib::Response & res) {
config_handlers_->handle_delete_configuration(req, res);
});
Expand Down
105 changes: 105 additions & 0 deletions src/ros2_medkit_gateway/test/test_configuration_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,111 @@ TEST_F(TestConfigurationManager, test_concurrent_parameter_access) {
EXPECT_GE(success_count.load(), 1);
}

TEST_F(TestConfigurationManager, test_concurrent_parameter_operations_no_executor_error) {
// Regression test: concurrent parameter operations must not cause
// "Node has already been added to an executor" error.
// SyncParametersClient internally spins param_node_ - without proper
// serialization, concurrent calls would cause executor conflicts.

node_->declare_parameter("concurrent_test_int", 0);
node_->declare_parameter("concurrent_test_str", std::string("init"));

// Warm-up: ensure parameter service is discoverable before starting concurrent operations.
// This prevents flaky failures on slow CI due to service discovery timing.
auto warmup_result = config_manager_->list_parameters("/test_config_manager_node");
ASSERT_TRUE(warmup_result.success) << "Warm-up failed: " << warmup_result.error_message;

constexpr int kNumThreads = 10;
constexpr int kOpsPerThread = 5;

std::vector<std::thread> threads;
std::atomic<int> success_count{0};
std::atomic<int> exception_count{0};
std::atomic<bool> start_flag{false};

// Spawn threads that will all start simultaneously
for (int i = 0; i < kNumThreads; ++i) {
threads.emplace_back([this, i, &success_count, &exception_count, &start_flag]() {
// Wait for all threads to be ready
while (!start_flag.load()) {
std::this_thread::yield();
}

for (int op = 0; op < kOpsPerThread; ++op) {
try {
// Mix different operations to stress test serialization
switch ((i + op) % 6) {
case 0: {
auto result = config_manager_->list_parameters("/test_config_manager_node");
if (result.success) {
success_count++;
}
break;
}
case 1: {
auto result = config_manager_->get_parameter("/test_config_manager_node", "concurrent_test_int");
if (result.success) {
success_count++;
}
break;
}
case 2: {
auto result =
config_manager_->set_parameter("/test_config_manager_node", "concurrent_test_int", nlohmann::json(i));
if (result.success) {
success_count++;
}
break;
}
case 3: {
auto result = config_manager_->get_parameter("/test_config_manager_node", "concurrent_test_str");
if (result.success) {
success_count++;
}
break;
}
case 4: {
auto result = config_manager_->reset_parameter("/test_config_manager_node", "concurrent_test_int");
if (result.success) {
success_count++;
}
break;
}
case 5: {
auto result = config_manager_->reset_all_parameters("/test_config_manager_node");
// reset_all_parameters returns success=false if some params are read-only,
// but data is still populated - count as success if no exception
if (result.success || result.data.contains("reset_count")) {
success_count++;
}
break;
}
}
} catch (const std::exception & e) {
// This should NOT happen - executor conflicts would throw here
exception_count++;
RCLCPP_ERROR(rclcpp::get_logger("test"), "Exception in concurrent test: %s", e.what());
}
}
});
}

// Start all threads simultaneously
start_flag.store(true);

for (auto & t : threads) {
t.join();
}

// No exceptions should have occurred (especially executor errors)
// This is the main assertion - if the mutex serialization is broken,
// we'd get "Node has already been added to an executor" exceptions here.
EXPECT_EQ(exception_count.load(), 0) << "Concurrent access caused exceptions (likely executor conflict)";

// All operations should succeed - we're operating on our own node which is always available.
EXPECT_EQ(success_count.load(), kNumThreads * kOpsPerThread);
}

int main(int argc, char ** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down