Skip to content

Commit 7515310

Browse files
committed
Extract Executor base class for rclcpp-free coroutine primitives
Introduce Executor base class with resume() and post() so that Event, Mutex, and Channel depend only on pure C++ headers. Move their implementations out of co_context.hpp into their own headers. CoContext inherits from Executor — existing API unchanged. Add test_pure_headers that compiles without rclcpp on include path.
1 parent 9387a61 commit 7515310

14 files changed

Lines changed: 334 additions & 163 deletions

rclcpp_async/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ if(BUILD_TESTING)
6363
find_package(std_srvs REQUIRED)
6464
find_package(example_interfaces REQUIRED)
6565

66+
# Level 0: Pure C++ build test (no rclcpp on include path)
67+
ament_add_gtest(test_pure_headers test/test_pure_headers.cpp)
68+
target_include_directories(test_pure_headers PRIVATE
69+
${CMAKE_CURRENT_SOURCE_DIR}/include)
70+
target_compile_features(test_pure_headers PRIVATE cxx_std_20)
71+
if(CMAKE_COMPILER_IS_GNUCXX)
72+
target_compile_options(test_pure_headers PRIVATE -fcoroutines)
73+
endif()
74+
6675
# Level 1: Pure unit tests (no ROS2 runtime)
6776
ament_add_gtest(test_result test/test_result.cpp)
6877
target_link_libraries(test_result ${PROJECT_NAME})

rclcpp_async/include/rclcpp_async/channel.hpp

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,22 @@
2424
#include <utility>
2525

2626
#include "rclcpp_async/cancelled_exception.hpp"
27+
#include "rclcpp_async/executor.hpp"
2728

2829
namespace rclcpp_async
2930
{
3031

31-
class CoContext;
32-
33-
using StopCb = std::stop_callback<std::function<void()>>;
34-
3532
template <typename T>
3633
class Channel
3734
{
38-
CoContext & ctx_;
35+
Executor & ctx_;
3936
std::mutex mutex_;
4037
std::queue<T> queue_;
4138
std::coroutine_handle<> waiter_;
4239
bool closed_ = false;
4340

4441
public:
45-
explicit Channel(CoContext & ctx) : ctx_(ctx) {}
42+
explicit Channel(Executor & ctx) : ctx_(ctx) {}
4643

4744
void send(T value);
4845
void close();
@@ -87,4 +84,69 @@ class Channel
8784
NextAwaiter next() { return NextAwaiter{*this, {}, {}, false}; }
8885
};
8986

87+
template <typename T>
88+
bool Channel<T>::NextAwaiter::await_suspend(std::coroutine_handle<> h)
89+
{
90+
{
91+
std::lock_guard lock(ch.mutex_);
92+
if (!ch.queue_.empty() || ch.closed_) {
93+
return false; // don't suspend, data already available
94+
}
95+
ch.waiter_ = h;
96+
} // release ch.mutex_ before stop_callback to avoid deadlock
97+
cancel_cb_ = std::make_shared<StopCb>(token, [this, h, &cb = cancel_cb_]() {
98+
std::coroutine_handle<> w;
99+
{
100+
std::lock_guard lock(ch.mutex_);
101+
if (ch.waiter_ != h) {
102+
return;
103+
}
104+
w = ch.waiter_;
105+
ch.waiter_ = nullptr;
106+
}
107+
if (w) {
108+
cancelled = true;
109+
ch.ctx_.post([w, weak = std::weak_ptr(cb)]() {
110+
if (weak.lock()) {
111+
w.resume();
112+
}
113+
});
114+
}
115+
});
116+
return true;
117+
}
118+
119+
template <typename T>
120+
void Channel<T>::send(T value)
121+
{
122+
std::coroutine_handle<> w;
123+
{
124+
std::lock_guard lock(mutex_);
125+
if (closed_) {
126+
return;
127+
}
128+
queue_.push(std::move(value));
129+
w = waiter_;
130+
waiter_ = nullptr;
131+
}
132+
if (w) {
133+
ctx_.post([w]() { w.resume(); });
134+
}
135+
}
136+
137+
template <typename T>
138+
void Channel<T>::close()
139+
{
140+
std::coroutine_handle<> w;
141+
{
142+
std::lock_guard lock(mutex_);
143+
closed_ = true;
144+
w = waiter_;
145+
waiter_ = nullptr;
146+
}
147+
if (w) {
148+
ctx_.post([w]() { w.resume(); });
149+
}
150+
}
151+
90152
} // namespace rclcpp_async

rclcpp_async/include/rclcpp_async/co_context.hpp

Lines changed: 4 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "rclcpp_async/cancelled_exception.hpp"
3636
#include "rclcpp_async/channel.hpp"
3737
#include "rclcpp_async/event.hpp"
38+
#include "rclcpp_async/executor.hpp"
3839
#include "rclcpp_async/goal_context.hpp"
3940
#include "rclcpp_async/goal_stream.hpp"
4041
#include "rclcpp_async/mutex.hpp"
@@ -51,8 +52,6 @@ namespace rclcpp_async
5152

5253
using namespace std::chrono_literals; // NOLINT(build/namespaces)
5354

54-
using StopCb = std::stop_callback<std::function<void()>>;
55-
5655
class DrainWaitable : public rclcpp::Waitable
5756
{
5857
rclcpp::GuardCondition gc_;
@@ -113,7 +112,7 @@ class DrainWaitable : public rclcpp::Waitable
113112
}
114113
};
115114

116-
class CoContext
115+
class CoContext : public Executor
117116
{
118117
std::shared_ptr<DrainWaitable> drain_;
119118
rclcpp::Node & node_;
@@ -129,10 +128,10 @@ class CoContext
129128
CoContext & operator=(const CoContext &) = delete;
130129

131130
// Post a callback to be executed on the executor thread (thread-safe).
132-
void post(std::function<void()> fn) { drain_->post(std::move(fn)); }
131+
void post(std::function<void()> fn) override { drain_->post(std::move(fn)); }
133132

134133
// Resume a coroutine directly. Call only from the executor thread.
135-
void resume(std::coroutine_handle<> h) { h.resume(); }
134+
void resume(std::coroutine_handle<> h) override { h.resume(); }
136135

137136
rclcpp::Node & node() { return node_; }
138137

@@ -336,24 +335,6 @@ class CoContext
336335
}
337336
};
338337

339-
template <typename DonePred, typename CancelAction>
340-
inline void register_cancel(
341-
std::shared_ptr<StopCb> & out, std::stop_token token, CoContext & ctx, std::coroutine_handle<> h,
342-
DonePred is_done, CancelAction action)
343-
{
344-
out = std::make_shared<StopCb>(token, [&ctx, h, is_done, action, &out]() {
345-
ctx.post([&ctx, h, is_done, action, weak = std::weak_ptr(out)]() {
346-
if (is_done()) {
347-
return;
348-
}
349-
action();
350-
if (weak.lock()) {
351-
ctx.resume(h);
352-
}
353-
});
354-
});
355-
}
356-
357338
template <typename ServiceT>
358339
void SendRequestAwaiter<ServiceT>::await_suspend(std::coroutine_handle<> h)
359340
{
@@ -505,51 +486,6 @@ void SendGoalAwaiter<ActionT>::await_suspend(std::coroutine_handle<> h)
505486
});
506487
}
507488

508-
inline void Event::WaitAwaiter::await_suspend(std::coroutine_handle<> h)
509-
{
510-
active = std::make_shared<bool>(true);
511-
event.waiters_.push({h, active});
512-
auto a = active;
513-
register_cancel(cancel_cb_, token, event.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; });
514-
}
515-
516-
inline void Event::set()
517-
{
518-
set_ = true;
519-
std::queue<Waiter> pending;
520-
pending.swap(waiters_);
521-
while (!pending.empty()) {
522-
auto w = pending.front();
523-
pending.pop();
524-
if (*w.active) {
525-
*w.active = false;
526-
ctx_.resume(w.handle);
527-
}
528-
}
529-
}
530-
531-
inline void Mutex::LockAwaiter::await_suspend(std::coroutine_handle<> h)
532-
{
533-
active = std::make_shared<bool>(true);
534-
mutex.waiters_.push({h, active});
535-
auto a = active;
536-
register_cancel(cancel_cb_, token, mutex.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; });
537-
}
538-
539-
inline void Mutex::unlock()
540-
{
541-
while (!waiters_.empty()) {
542-
auto w = waiters_.front();
543-
waiters_.pop();
544-
if (*w.active) {
545-
*w.active = false;
546-
ctx_.resume(w.handle);
547-
return;
548-
}
549-
}
550-
locked_ = false;
551-
}
552-
553489
inline void TimerStream::NextAwaiter::await_suspend(std::coroutine_handle<> h)
554490
{
555491
stream.waiter_ = h;
@@ -567,69 +503,4 @@ inline void TimerStream::NextAwaiter::await_suspend(std::coroutine_handle<> h)
567503
});
568504
}
569505

570-
template <typename T>
571-
bool Channel<T>::NextAwaiter::await_suspend(std::coroutine_handle<> h)
572-
{
573-
{
574-
std::lock_guard lock(ch.mutex_);
575-
if (!ch.queue_.empty() || ch.closed_) {
576-
return false; // don't suspend, data already available
577-
}
578-
ch.waiter_ = h;
579-
} // release ch.mutex_ before stop_callback to avoid deadlock
580-
cancel_cb_ = std::make_shared<StopCb>(token, [this, h, &cb = cancel_cb_]() {
581-
std::coroutine_handle<> w;
582-
{
583-
std::lock_guard lock(ch.mutex_);
584-
if (ch.waiter_ != h) {
585-
return;
586-
}
587-
w = ch.waiter_;
588-
ch.waiter_ = nullptr;
589-
}
590-
if (w) {
591-
cancelled = true;
592-
ch.ctx_.post([w, weak = std::weak_ptr(cb)]() {
593-
if (weak.lock()) {
594-
w.resume();
595-
}
596-
});
597-
}
598-
});
599-
return true;
600-
}
601-
602-
template <typename T>
603-
void Channel<T>::send(T value)
604-
{
605-
std::coroutine_handle<> w;
606-
{
607-
std::lock_guard lock(mutex_);
608-
if (closed_) {
609-
return;
610-
}
611-
queue_.push(std::move(value));
612-
w = waiter_;
613-
waiter_ = nullptr;
614-
}
615-
if (w) {
616-
ctx_.post([w]() { w.resume(); });
617-
}
618-
}
619-
620-
template <typename T>
621-
void Channel<T>::close()
622-
{
623-
std::coroutine_handle<> w;
624-
{
625-
std::lock_guard lock(mutex_);
626-
closed_ = true;
627-
w = waiter_;
628-
waiter_ = nullptr;
629-
}
630-
if (w) {
631-
ctx_.post([w]() { w.resume(); });
632-
}
633-
}
634-
635506
} // namespace rclcpp_async

rclcpp_async/include/rclcpp_async/event.hpp

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,14 @@
2323
#include <utility>
2424

2525
#include "rclcpp_async/cancelled_exception.hpp"
26+
#include "rclcpp_async/executor.hpp"
2627

2728
namespace rclcpp_async
2829
{
2930

30-
class CoContext;
31-
32-
using StopCb = std::stop_callback<std::function<void()>>;
33-
3431
class Event
3532
{
36-
CoContext & ctx_;
33+
Executor & ctx_;
3734
bool set_ = false;
3835

3936
struct Waiter
@@ -44,7 +41,7 @@ class Event
4441
std::queue<Waiter> waiters_;
4542

4643
public:
47-
explicit Event(CoContext & ctx) : ctx_(ctx) {}
44+
explicit Event(Executor & ctx) : ctx_(ctx) {}
4845

4946
bool is_set() const { return set_; }
5047

@@ -83,4 +80,27 @@ class Event
8380
WaitAwaiter wait() { return WaitAwaiter{*this, {}, {}, nullptr}; }
8481
};
8582

83+
inline void Event::WaitAwaiter::await_suspend(std::coroutine_handle<> h)
84+
{
85+
active = std::make_shared<bool>(true);
86+
event.waiters_.push({h, active});
87+
auto a = active;
88+
register_cancel(cancel_cb_, token, event.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; });
89+
}
90+
91+
inline void Event::set()
92+
{
93+
set_ = true;
94+
std::queue<Waiter> pending;
95+
pending.swap(waiters_);
96+
while (!pending.empty()) {
97+
auto w = pending.front();
98+
pending.pop();
99+
if (*w.active) {
100+
*w.active = false;
101+
ctx_.resume(w.handle);
102+
}
103+
}
104+
}
105+
86106
} // namespace rclcpp_async

0 commit comments

Comments
 (0)