Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rclcpp_async/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ if(BUILD_TESTING)
find_package(std_srvs REQUIRED)
find_package(example_interfaces REQUIRED)

# Level 0: Pure C++ build test (no rclcpp on include path)
ament_add_gtest(test_pure_headers test/test_pure_headers.cpp)
target_include_directories(test_pure_headers PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_features(test_pure_headers PRIVATE cxx_std_20)
if(CMAKE_COMPILER_IS_GNUCXX)
target_compile_options(test_pure_headers PRIVATE -fcoroutines)
endif()

# Level 1: Pure unit tests (no ROS2 runtime)
ament_add_gtest(test_result test/test_result.cpp)
target_link_libraries(test_result ${PROJECT_NAME})
Expand Down
74 changes: 68 additions & 6 deletions rclcpp_async/include/rclcpp_async/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,22 @@
#include <utility>

#include "rclcpp_async/cancelled_exception.hpp"
#include "rclcpp_async/executor.hpp"

namespace rclcpp_async
{

class CoContext;

using StopCb = std::stop_callback<std::function<void()>>;

template <typename T>
class Channel
{
CoContext & ctx_;
Executor & ctx_;
std::mutex mutex_;
std::queue<T> queue_;
std::coroutine_handle<> waiter_;
bool closed_ = false;

public:
explicit Channel(CoContext & ctx) : ctx_(ctx) {}
explicit Channel(Executor & ctx) : ctx_(ctx) {}

void send(T value);
void close();
Expand Down Expand Up @@ -87,4 +84,69 @@ class Channel
NextAwaiter next() { return NextAwaiter{*this, {}, {}, false}; }
};

template <typename T>
bool Channel<T>::NextAwaiter::await_suspend(std::coroutine_handle<> h)
{
{
std::lock_guard lock(ch.mutex_);
if (!ch.queue_.empty() || ch.closed_) {
return false; // don't suspend, data already available
}
ch.waiter_ = h;
} // release ch.mutex_ before stop_callback to avoid deadlock
cancel_cb_ = std::make_shared<StopCb>(token, [this, h, &cb = cancel_cb_]() {
std::coroutine_handle<> w;
{
std::lock_guard lock(ch.mutex_);
if (ch.waiter_ != h) {
return;
}
w = ch.waiter_;
ch.waiter_ = nullptr;
}
if (w) {
cancelled = true;
ch.ctx_.post([w, weak = std::weak_ptr(cb)]() {
if (weak.lock()) {
w.resume();
}
});
}
});
return true;
}

template <typename T>
void Channel<T>::send(T value)
{
std::coroutine_handle<> w;
{
std::lock_guard lock(mutex_);
if (closed_) {
return;
}
queue_.push(std::move(value));
w = waiter_;
waiter_ = nullptr;
}
if (w) {
ctx_.post([w]() { w.resume(); });
}
}

template <typename T>
void Channel<T>::close()
{
std::coroutine_handle<> w;
{
std::lock_guard lock(mutex_);
closed_ = true;
w = waiter_;
waiter_ = nullptr;
}
if (w) {
ctx_.post([w]() { w.resume(); });
}
}

} // namespace rclcpp_async
137 changes: 4 additions & 133 deletions rclcpp_async/include/rclcpp_async/co_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "rclcpp_async/cancelled_exception.hpp"
#include "rclcpp_async/channel.hpp"
#include "rclcpp_async/event.hpp"
#include "rclcpp_async/executor.hpp"
#include "rclcpp_async/goal_context.hpp"
#include "rclcpp_async/goal_stream.hpp"
#include "rclcpp_async/mutex.hpp"
Expand All @@ -51,8 +52,6 @@ namespace rclcpp_async

using namespace std::chrono_literals; // NOLINT(build/namespaces)

using StopCb = std::stop_callback<std::function<void()>>;

class DrainWaitable : public rclcpp::Waitable
{
rclcpp::GuardCondition gc_;
Expand Down Expand Up @@ -113,7 +112,7 @@ class DrainWaitable : public rclcpp::Waitable
}
};

class CoContext
class CoContext : public Executor
{
std::shared_ptr<DrainWaitable> drain_;
rclcpp::Node & node_;
Expand All @@ -129,10 +128,10 @@ class CoContext
CoContext & operator=(const CoContext &) = delete;

// Post a callback to be executed on the executor thread (thread-safe).
void post(std::function<void()> fn) { drain_->post(std::move(fn)); }
void post(std::function<void()> fn) override { drain_->post(std::move(fn)); }

// Resume a coroutine directly. Call only from the executor thread.
void resume(std::coroutine_handle<> h) { h.resume(); }
void resume(std::coroutine_handle<> h) override { h.resume(); }

rclcpp::Node & node() { return node_; }

Expand Down Expand Up @@ -336,24 +335,6 @@ class CoContext
}
};

template <typename DonePred, typename CancelAction>
inline void register_cancel(
std::shared_ptr<StopCb> & out, std::stop_token token, CoContext & ctx, std::coroutine_handle<> h,
DonePred is_done, CancelAction action)
{
out = std::make_shared<StopCb>(token, [&ctx, h, is_done, action, &out]() {
ctx.post([&ctx, h, is_done, action, weak = std::weak_ptr(out)]() {
if (is_done()) {
return;
}
action();
if (weak.lock()) {
ctx.resume(h);
}
});
});
}

template <typename ServiceT>
void SendRequestAwaiter<ServiceT>::await_suspend(std::coroutine_handle<> h)
{
Expand Down Expand Up @@ -505,51 +486,6 @@ void SendGoalAwaiter<ActionT>::await_suspend(std::coroutine_handle<> h)
});
}

inline void Event::WaitAwaiter::await_suspend(std::coroutine_handle<> h)
{
active = std::make_shared<bool>(true);
event.waiters_.push({h, active});
auto a = active;
register_cancel(cancel_cb_, token, event.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; });
}

inline void Event::set()
{
set_ = true;
std::queue<Waiter> pending;
pending.swap(waiters_);
while (!pending.empty()) {
auto w = pending.front();
pending.pop();
if (*w.active) {
*w.active = false;
ctx_.resume(w.handle);
}
}
}

inline void Mutex::LockAwaiter::await_suspend(std::coroutine_handle<> h)
{
active = std::make_shared<bool>(true);
mutex.waiters_.push({h, active});
auto a = active;
register_cancel(cancel_cb_, token, mutex.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; });
}

inline void Mutex::unlock()
{
while (!waiters_.empty()) {
auto w = waiters_.front();
waiters_.pop();
if (*w.active) {
*w.active = false;
ctx_.resume(w.handle);
return;
}
}
locked_ = false;
}

inline void TimerStream::NextAwaiter::await_suspend(std::coroutine_handle<> h)
{
stream.waiter_ = h;
Expand All @@ -567,69 +503,4 @@ inline void TimerStream::NextAwaiter::await_suspend(std::coroutine_handle<> h)
});
}

template <typename T>
bool Channel<T>::NextAwaiter::await_suspend(std::coroutine_handle<> h)
{
{
std::lock_guard lock(ch.mutex_);
if (!ch.queue_.empty() || ch.closed_) {
return false; // don't suspend, data already available
}
ch.waiter_ = h;
} // release ch.mutex_ before stop_callback to avoid deadlock
cancel_cb_ = std::make_shared<StopCb>(token, [this, h, &cb = cancel_cb_]() {
std::coroutine_handle<> w;
{
std::lock_guard lock(ch.mutex_);
if (ch.waiter_ != h) {
return;
}
w = ch.waiter_;
ch.waiter_ = nullptr;
}
if (w) {
cancelled = true;
ch.ctx_.post([w, weak = std::weak_ptr(cb)]() {
if (weak.lock()) {
w.resume();
}
});
}
});
return true;
}

template <typename T>
void Channel<T>::send(T value)
{
std::coroutine_handle<> w;
{
std::lock_guard lock(mutex_);
if (closed_) {
return;
}
queue_.push(std::move(value));
w = waiter_;
waiter_ = nullptr;
}
if (w) {
ctx_.post([w]() { w.resume(); });
}
}

template <typename T>
void Channel<T>::close()
{
std::coroutine_handle<> w;
{
std::lock_guard lock(mutex_);
closed_ = true;
w = waiter_;
waiter_ = nullptr;
}
if (w) {
ctx_.post([w]() { w.resume(); });
}
}

} // namespace rclcpp_async
32 changes: 26 additions & 6 deletions rclcpp_async/include/rclcpp_async/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@
#include <utility>

#include "rclcpp_async/cancelled_exception.hpp"
#include "rclcpp_async/executor.hpp"

namespace rclcpp_async
{

class CoContext;

using StopCb = std::stop_callback<std::function<void()>>;

class Event
{
CoContext & ctx_;
Executor & ctx_;
bool set_ = false;

struct Waiter
Expand All @@ -44,7 +41,7 @@ class Event
std::queue<Waiter> waiters_;

public:
explicit Event(CoContext & ctx) : ctx_(ctx) {}
explicit Event(Executor & ctx) : ctx_(ctx) {}

bool is_set() const { return set_; }

Expand Down Expand Up @@ -83,4 +80,27 @@ class Event
WaitAwaiter wait() { return WaitAwaiter{*this, {}, {}, nullptr}; }
};

inline void Event::WaitAwaiter::await_suspend(std::coroutine_handle<> h)
{
active = std::make_shared<bool>(true);
event.waiters_.push({h, active});
auto a = active;
register_cancel(cancel_cb_, token, event.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; });
}

inline void Event::set()
{
set_ = true;
std::queue<Waiter> pending;
pending.swap(waiters_);
while (!pending.empty()) {
auto w = pending.front();
pending.pop();
if (*w.active) {
*w.active = false;
ctx_.resume(w.handle);
}
}
}

} // namespace rclcpp_async
Loading
Loading