diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index cfd7ea81fd..27f9378e34 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -550,6 +550,16 @@ class Executor AnyExecutable & any_executable, std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /** + * This function triggers a recollect of all entities + * that are registered to the executor. + * + * Calling this function is thread safe. + * + * @param notify if true will execute a trigger that will wake up a waiting executor + */ + void trigger_entity_recollect(bool notify); + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; diff --git a/rclcpp/include/rclcpp/wait_result.hpp b/rclcpp/include/rclcpp/wait_result.hpp index 74d3d2183c..3384a7846a 100644 --- a/rclcpp/include/rclcpp/wait_result.hpp +++ b/rclcpp/include/rclcpp/wait_result.hpp @@ -274,7 +274,7 @@ class WaitResult final if (this->kind() == WaitResultKind::Ready) { auto & wait_set = this->get_wait_set(); - auto rcl_wait_set = wait_set.get_rcl_wait_set(); + auto & rcl_wait_set = wait_set.get_rcl_wait_set(); while (next_waitable_index_ < wait_set.size_of_waitables()) { auto cur_waitable = wait_set.waitables(next_waitable_index_++); if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) { diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index a54d71e21b..ca094be96f 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -72,13 +73,10 @@ Executor::Executor(const rclcpp::ExecutorOptions & options) } }); - notify_waitable_->set_on_ready_callback( - [this](auto, auto) { - this->entities_need_rebuild_.store(true); - }); - notify_waitable_->add_guard_condition(interrupt_guard_condition_); notify_waitable_->add_guard_condition(shutdown_guard_condition_); + + wait_set_.add_waitable(notify_waitable_); } Executor::~Executor() @@ -122,6 +120,20 @@ Executor::~Executor() } } +void Executor::trigger_entity_recollect(bool notify) +{ + this->entities_need_rebuild_.store(true); + + if (!spinning.load() && entities_need_rebuild_.exchange(false)) { + std::lock_guard guard(mutex_); + this->collect_entities(); + } + + if(notify) { + interrupt_guard_condition_->trigger(); + } +} + std::vector Executor::get_all_callback_groups() { @@ -152,19 +164,12 @@ Executor::add_callback_group( (void) node_ptr; this->collector_.add_callback_group(group_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( - std::string( - "Failed to trigger guard condition on callback group add: ") + ex.what()); - } + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); } } @@ -173,19 +178,12 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt { this->collector_.add_node(node_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on node add: ") + ex.what()); - } } } @@ -196,18 +194,12 @@ Executor::remove_callback_group( { this->collector_.remove_callback_group(group_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on callback group remove: ") + ex.what()); - } } } @@ -222,19 +214,12 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node { this->collector_.remove_node(node_ptr); - if (!spinning.load()) { - std::lock_guard guard(mutex_); - this->collect_entities(); - } - - if (notify) { - try { - interrupt_guard_condition_->trigger(); - } catch (const rclcpp::exceptions::RCLError & ex) { - throw std::runtime_error( + try { + this->trigger_entity_recollect(notify); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( std::string( "Failed to trigger guard condition on node remove: ") + ex.what()); - } } } @@ -379,6 +364,9 @@ Executor::execute_any_executable(AnyExecutable & any_exec) return; } + assert((void("Internal error, tried to execute a AnyExecutable without a valid callback group"), + any_exec.callback_group)); + if (any_exec.timer) { TRACETOOLS_TRACEPOINT( rclcpp_executor_execute, @@ -403,9 +391,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec) } // Reset the callback_group, regardless of type - if (any_exec.callback_group) { - any_exec.callback_group->can_be_taken_from().store(true); - } + any_exec.callback_group->can_be_taken_from().store(true); } template @@ -642,7 +628,6 @@ Executor::collect_entities() // In the case that an entity already has an expired weak pointer // before being removed from the waitset, additionally prune the waitset. this->wait_set_.prune_deleted_entities(); - this->entities_need_rebuild_.store(false); } void @@ -655,7 +640,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) { std::lock_guard guard(mutex_); - if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } } @@ -664,6 +649,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) RCUTILS_LOG_WARN_NAMED( "rclcpp", "empty wait set received in wait(). This should never happen."); + } else { + if(this->wait_result_->kind() == WaitResultKind::Ready && notify_waitable_) { + auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set(); + if(notify_waitable_->is_ready(rcl_wait_set)) { + notify_waitable_->execute(notify_waitable_->take_data()); + } + } } } @@ -689,7 +681,8 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get()); if (entity_iter != current_collection_.timers.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { + current_timer_index++; continue; } // At this point the timer is either ready for execution or was perhaps @@ -699,6 +692,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) wait_result_->clear_timer_with_index(current_timer_index); // Check that the timer should be called still, i.e. it wasn't canceled. if (!timer->call()) { + current_timer_index++; continue; } any_executable.timer = timer; @@ -706,6 +700,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) valid_executable = true; break; } + current_timer_index++; } } @@ -715,7 +710,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) subscription->get_subscription_handle().get()); if (entity_iter != current_collection_.subscriptions.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.subscription = subscription; @@ -731,7 +726,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.services.find(service->get_service_handle().get()); if (entity_iter != current_collection_.services.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.service = service; @@ -747,7 +742,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.clients.find(client->get_client_handle().get()); if (entity_iter != current_collection_.clients.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.client = client; @@ -763,7 +758,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable) auto entity_iter = current_collection_.waitables.find(waitable.get()); if (entity_iter != current_collection_.waitables.end()) { auto callback_group = entity_iter->second.callback_group.lock(); - if (callback_group && !callback_group->can_be_taken_from()) { + if (!callback_group || !callback_group->can_be_taken_from()) { continue; } any_executable.waitable = waitable; diff --git a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp index 765002b2ef..68ac56b656 100644 --- a/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp +++ b/rclcpp/src/rclcpp/executors/executor_entities_collection.cpp @@ -153,7 +153,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } if (!entity->call()) { @@ -176,7 +176,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -196,7 +196,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -216,7 +216,7 @@ ready_executables( continue; } auto group_info = group_cache(entity_iter->second.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; @@ -236,7 +236,7 @@ ready_executables( continue; } auto group_info = group_cache(entry.callback_group); - if (group_info && !group_info->can_be_taken_from().load()) { + if (!group_info || !group_info->can_be_taken_from().load()) { continue; } rclcpp::AnyExecutable exec; diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index 831076c61c..b8fe92cb09 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -110,7 +110,7 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout) std::optional> StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout) { - if (current_collection_.empty() || this->entities_need_rebuild_.load()) { + if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) { this->collect_entities(); } auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout)); diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d52c94b1d2..ea29f7a288 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -473,6 +473,15 @@ if(TARGET test_executors) target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS}) endif() +ament_add_gtest( + test_executors_callback_group_behavior + executors/test_executors_callback_group_behavior.cpp + APPEND_LIBRARY_DIRS "${append_library_dirs}" + TIMEOUT 180) +if(TARGET test_executors) + target_link_libraries(test_executors_callback_group_behavior ${PROJECT_NAME}) +endif() + ament_add_gtest( test_executors_intraprocess executors/test_executors_intraprocess.cpp diff --git a/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp new file mode 100644 index 0000000000..f5b189c2af --- /dev/null +++ b/rclcpp/test/rclcpp/executors/test_executors_callback_group_behavior.cpp @@ -0,0 +1,139 @@ +// Copyright 2024 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This test checks that when callback groups go out of scope, that their associated executable + * entities should not be returned as valid executables. + * + * The test makes use of a bit of executor internals, but is meant to prevent regressions of behavior. + * Ref: https://github.com/ros2/rclcpp/issues/2474 + */ + +#include + +#include +#include +#include +#include + +class CustomExecutor: public rclcpp::Executor +{ +public: + explicit CustomExecutor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions()) + : rclcpp::Executor(options) + { + } + + ~CustomExecutor() override = default; + + void spin() override {}; + + void collect() { + this->collect_entities(); + } + + void wait() { + this->wait_for_work(std::chrono::milliseconds(10)); + } + + size_t collected_timers() const { + return this->current_collection_.timers.size(); + } + + rclcpp::AnyExecutable next() { + rclcpp::AnyExecutable ret; + this->get_next_ready_executable(ret); + return ret; + } +}; + + +TEST(TestCallbackGroup, valid_callback_group) +{ + rclcpp::init(0, nullptr); + + // Create a timer associated with a callback group + auto node = std::make_shared("node"); + + // Add the callback group to the executor + auto executor = CustomExecutor(); + executor.add_node(node); + executor.spin_all(std::chrono::milliseconds(10)); + + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, true); + auto timer = node->create_wall_timer(std::chrono::milliseconds(1), [](){}, cbg); + executor.add_callback_group(cbg, node->get_node_base_interface()); + + executor.spin_all(std::chrono::milliseconds(10)); + + // Collect the entities + executor.collect(); + EXPECT_EQ(1u, executor.collected_timers()); + + executor.wait(); + auto next_executable = executor.next(); + EXPECT_EQ(timer, next_executable.timer); + EXPECT_EQ(cbg, next_executable.callback_group); + + EXPECT_EQ(nullptr, next_executable.client); + EXPECT_EQ(nullptr, next_executable.service); + EXPECT_EQ(nullptr, next_executable.subscription); + EXPECT_EQ(nullptr, next_executable.waitable); + EXPECT_EQ(nullptr, next_executable.data); + + rclcpp::shutdown(); +} + +TEST(TestCallbackGroup, invalid_callback_group) +{ + rclcpp::init(0, nullptr); + + // Create a timer associated with a callback group + auto node = std::make_shared("node"); + + // Add the callback group to the executor + auto executor = CustomExecutor(); + executor.add_node(node); + executor.spin_all(std::chrono::milliseconds(10)); + + auto cbg = node->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false); + auto timer = node->create_wall_timer(std::chrono::milliseconds(1), [](){}, cbg); + executor.add_callback_group(cbg, nullptr); + + executor.spin_all(std::chrono::milliseconds(10)); + + // Collect the entities + executor.collect(); + EXPECT_EQ(1u, executor.collected_timers()); + + executor.wait(); + + cbg.reset(); + + // Since the callback group has been reset, this should not be allowed to + // be a valid executable (timer and cbg should both be nullptr). + // In the regression, timer == next_executable.timer whil + // next_executable.callback_group == nullptr, which was incorrect. + auto next_executable = executor.next(); + EXPECT_EQ(nullptr, next_executable.timer); + EXPECT_EQ(nullptr, next_executable.callback_group); + + EXPECT_EQ(nullptr, next_executable.client); + EXPECT_EQ(nullptr, next_executable.service); + EXPECT_EQ(nullptr, next_executable.subscription); + EXPECT_EQ(nullptr, next_executable.waitable); + EXPECT_EQ(nullptr, next_executable.data); + + rclcpp::shutdown(); +}