diff --git a/rclcpp_async/CMakeLists.txt b/rclcpp_async/CMakeLists.txt index 3800a7a..4d80d55 100644 --- a/rclcpp_async/CMakeLists.txt +++ b/rclcpp_async/CMakeLists.txt @@ -63,6 +63,15 @@ if(BUILD_TESTING) find_package(std_srvs REQUIRED) find_package(example_interfaces REQUIRED) + # Level 0: Pure C++ build test (no rclcpp on include path) + ament_add_gtest(test_pure_headers test/test_pure_headers.cpp) + target_include_directories(test_pure_headers PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/include) + target_compile_features(test_pure_headers PRIVATE cxx_std_20) + if(CMAKE_COMPILER_IS_GNUCXX) + target_compile_options(test_pure_headers PRIVATE -fcoroutines) + endif() + # Level 1: Pure unit tests (no ROS2 runtime) ament_add_gtest(test_result test/test_result.cpp) target_link_libraries(test_result ${PROJECT_NAME}) diff --git a/rclcpp_async/include/rclcpp_async/channel.hpp b/rclcpp_async/include/rclcpp_async/channel.hpp index 24f8fc8..db4815a 100644 --- a/rclcpp_async/include/rclcpp_async/channel.hpp +++ b/rclcpp_async/include/rclcpp_async/channel.hpp @@ -24,25 +24,22 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { -class CoContext; - -using StopCb = std::stop_callback>; - template class Channel { - CoContext & ctx_; + Executor & ctx_; std::mutex mutex_; std::queue queue_; std::coroutine_handle<> waiter_; bool closed_ = false; public: - explicit Channel(CoContext & ctx) : ctx_(ctx) {} + explicit Channel(Executor & ctx) : ctx_(ctx) {} void send(T value); void close(); @@ -87,4 +84,69 @@ class Channel NextAwaiter next() { return NextAwaiter{*this, {}, {}, false}; } }; +template +bool Channel::NextAwaiter::await_suspend(std::coroutine_handle<> h) +{ + { + std::lock_guard lock(ch.mutex_); + if (!ch.queue_.empty() || ch.closed_) { + return false; // don't suspend, data already available + } + ch.waiter_ = h; + } // release ch.mutex_ before stop_callback to avoid deadlock + cancel_cb_ = std::make_shared(token, [this, h, &cb = cancel_cb_]() { + std::coroutine_handle<> w; + { + std::lock_guard lock(ch.mutex_); + if (ch.waiter_ != h) { + return; + } + w = ch.waiter_; + ch.waiter_ = nullptr; + } + if (w) { + cancelled = true; + ch.ctx_.post([w, weak = std::weak_ptr(cb)]() { + if (weak.lock()) { + w.resume(); + } + }); + } + }); + return true; +} + +template +void Channel::send(T value) +{ + std::coroutine_handle<> w; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + queue_.push(std::move(value)); + w = waiter_; + waiter_ = nullptr; + } + if (w) { + ctx_.post([w]() { w.resume(); }); + } +} + +template +void Channel::close() +{ + std::coroutine_handle<> w; + { + std::lock_guard lock(mutex_); + closed_ = true; + w = waiter_; + waiter_ = nullptr; + } + if (w) { + ctx_.post([w]() { w.resume(); }); + } +} + } // namespace rclcpp_async diff --git a/rclcpp_async/include/rclcpp_async/co_context.hpp b/rclcpp_async/include/rclcpp_async/co_context.hpp index 6f90f4c..85b0073 100644 --- a/rclcpp_async/include/rclcpp_async/co_context.hpp +++ b/rclcpp_async/include/rclcpp_async/co_context.hpp @@ -35,6 +35,7 @@ #include "rclcpp_async/cancelled_exception.hpp" #include "rclcpp_async/channel.hpp" #include "rclcpp_async/event.hpp" +#include "rclcpp_async/executor.hpp" #include "rclcpp_async/goal_context.hpp" #include "rclcpp_async/goal_stream.hpp" #include "rclcpp_async/mutex.hpp" @@ -51,8 +52,6 @@ namespace rclcpp_async using namespace std::chrono_literals; // NOLINT(build/namespaces) -using StopCb = std::stop_callback>; - class DrainWaitable : public rclcpp::Waitable { rclcpp::GuardCondition gc_; @@ -113,7 +112,7 @@ class DrainWaitable : public rclcpp::Waitable } }; -class CoContext +class CoContext : public Executor { std::shared_ptr drain_; rclcpp::Node & node_; @@ -129,10 +128,10 @@ class CoContext CoContext & operator=(const CoContext &) = delete; // Post a callback to be executed on the executor thread (thread-safe). - void post(std::function fn) { drain_->post(std::move(fn)); } + void post(std::function fn) override { drain_->post(std::move(fn)); } // Resume a coroutine directly. Call only from the executor thread. - void resume(std::coroutine_handle<> h) { h.resume(); } + void resume(std::coroutine_handle<> h) override { h.resume(); } rclcpp::Node & node() { return node_; } @@ -336,24 +335,6 @@ class CoContext } }; -template -inline void register_cancel( - std::shared_ptr & out, std::stop_token token, CoContext & ctx, std::coroutine_handle<> h, - DonePred is_done, CancelAction action) -{ - out = std::make_shared(token, [&ctx, h, is_done, action, &out]() { - ctx.post([&ctx, h, is_done, action, weak = std::weak_ptr(out)]() { - if (is_done()) { - return; - } - action(); - if (weak.lock()) { - ctx.resume(h); - } - }); - }); -} - template void SendRequestAwaiter::await_suspend(std::coroutine_handle<> h) { @@ -505,51 +486,6 @@ void SendGoalAwaiter::await_suspend(std::coroutine_handle<> h) }); } -inline void Event::WaitAwaiter::await_suspend(std::coroutine_handle<> h) -{ - active = std::make_shared(true); - event.waiters_.push({h, active}); - auto a = active; - register_cancel(cancel_cb_, token, event.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; }); -} - -inline void Event::set() -{ - set_ = true; - std::queue pending; - pending.swap(waiters_); - while (!pending.empty()) { - auto w = pending.front(); - pending.pop(); - if (*w.active) { - *w.active = false; - ctx_.resume(w.handle); - } - } -} - -inline void Mutex::LockAwaiter::await_suspend(std::coroutine_handle<> h) -{ - active = std::make_shared(true); - mutex.waiters_.push({h, active}); - auto a = active; - register_cancel(cancel_cb_, token, mutex.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; }); -} - -inline void Mutex::unlock() -{ - while (!waiters_.empty()) { - auto w = waiters_.front(); - waiters_.pop(); - if (*w.active) { - *w.active = false; - ctx_.resume(w.handle); - return; - } - } - locked_ = false; -} - inline void TimerStream::NextAwaiter::await_suspend(std::coroutine_handle<> h) { stream.waiter_ = h; @@ -567,69 +503,4 @@ inline void TimerStream::NextAwaiter::await_suspend(std::coroutine_handle<> h) }); } -template -bool Channel::NextAwaiter::await_suspend(std::coroutine_handle<> h) -{ - { - std::lock_guard lock(ch.mutex_); - if (!ch.queue_.empty() || ch.closed_) { - return false; // don't suspend, data already available - } - ch.waiter_ = h; - } // release ch.mutex_ before stop_callback to avoid deadlock - cancel_cb_ = std::make_shared(token, [this, h, &cb = cancel_cb_]() { - std::coroutine_handle<> w; - { - std::lock_guard lock(ch.mutex_); - if (ch.waiter_ != h) { - return; - } - w = ch.waiter_; - ch.waiter_ = nullptr; - } - if (w) { - cancelled = true; - ch.ctx_.post([w, weak = std::weak_ptr(cb)]() { - if (weak.lock()) { - w.resume(); - } - }); - } - }); - return true; -} - -template -void Channel::send(T value) -{ - std::coroutine_handle<> w; - { - std::lock_guard lock(mutex_); - if (closed_) { - return; - } - queue_.push(std::move(value)); - w = waiter_; - waiter_ = nullptr; - } - if (w) { - ctx_.post([w]() { w.resume(); }); - } -} - -template -void Channel::close() -{ - std::coroutine_handle<> w; - { - std::lock_guard lock(mutex_); - closed_ = true; - w = waiter_; - waiter_ = nullptr; - } - if (w) { - ctx_.post([w]() { w.resume(); }); - } -} - } // namespace rclcpp_async diff --git a/rclcpp_async/include/rclcpp_async/event.hpp b/rclcpp_async/include/rclcpp_async/event.hpp index 0e51115..11f1135 100644 --- a/rclcpp_async/include/rclcpp_async/event.hpp +++ b/rclcpp_async/include/rclcpp_async/event.hpp @@ -23,17 +23,14 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { -class CoContext; - -using StopCb = std::stop_callback>; - class Event { - CoContext & ctx_; + Executor & ctx_; bool set_ = false; struct Waiter @@ -44,7 +41,7 @@ class Event std::queue waiters_; public: - explicit Event(CoContext & ctx) : ctx_(ctx) {} + explicit Event(Executor & ctx) : ctx_(ctx) {} bool is_set() const { return set_; } @@ -83,4 +80,27 @@ class Event WaitAwaiter wait() { return WaitAwaiter{*this, {}, {}, nullptr}; } }; +inline void Event::WaitAwaiter::await_suspend(std::coroutine_handle<> h) +{ + active = std::make_shared(true); + event.waiters_.push({h, active}); + auto a = active; + register_cancel(cancel_cb_, token, event.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; }); +} + +inline void Event::set() +{ + set_ = true; + std::queue pending; + pending.swap(waiters_); + while (!pending.empty()) { + auto w = pending.front(); + pending.pop(); + if (*w.active) { + *w.active = false; + ctx_.resume(w.handle); + } + } +} + } // namespace rclcpp_async diff --git a/rclcpp_async/include/rclcpp_async/executor.hpp b/rclcpp_async/include/rclcpp_async/executor.hpp new file mode 100644 index 0000000..43ca93a --- /dev/null +++ b/rclcpp_async/include/rclcpp_async/executor.hpp @@ -0,0 +1,57 @@ +// Copyright 2026 Tamaki Nishino +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace rclcpp_async +{ + +using StopCb = std::stop_callback>; + +class Executor +{ +public: + virtual ~Executor() = default; + + // Post a callback to be executed on the executor thread (thread-safe). + virtual void post(std::function fn) = 0; + + // Resume a coroutine directly. Call only from the executor thread. + virtual void resume(std::coroutine_handle<> h) { h.resume(); } +}; + +template +inline void register_cancel( + std::shared_ptr & out, std::stop_token token, Executor & ctx, std::coroutine_handle<> h, + DonePred is_done, CancelAction action) +{ + out = std::make_shared(token, [&ctx, h, is_done, action, &out]() { + ctx.post([&ctx, h, is_done, action, weak = std::weak_ptr(out)]() { + if (is_done()) { + return; + } + action(); + if (weak.lock()) { + ctx.resume(h); + } + }); + }); +} + +} // namespace rclcpp_async diff --git a/rclcpp_async/include/rclcpp_async/goal_stream.hpp b/rclcpp_async/include/rclcpp_async/goal_stream.hpp index 428c09d..117e875 100644 --- a/rclcpp_async/include/rclcpp_async/goal_stream.hpp +++ b/rclcpp_async/include/rclcpp_async/goal_stream.hpp @@ -25,6 +25,7 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" #include "rclcpp_async/result.hpp" namespace rclcpp_async @@ -32,8 +33,6 @@ namespace rclcpp_async class CoContext; -using StopCb = std::stop_callback>; - template struct GoalEvent { diff --git a/rclcpp_async/include/rclcpp_async/mutex.hpp b/rclcpp_async/include/rclcpp_async/mutex.hpp index 1321e0b..d3e8779 100644 --- a/rclcpp_async/include/rclcpp_async/mutex.hpp +++ b/rclcpp_async/include/rclcpp_async/mutex.hpp @@ -23,17 +23,14 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { -class CoContext; - -using StopCb = std::stop_callback>; - class Mutex { - CoContext & ctx_; + Executor & ctx_; bool locked_ = false; struct Waiter @@ -44,7 +41,7 @@ class Mutex std::queue waiters_; public: - explicit Mutex(CoContext & ctx) : ctx_(ctx) {} + explicit Mutex(Executor & ctx) : ctx_(ctx) {} bool is_locked() const { return locked_; } @@ -85,4 +82,26 @@ class Mutex void unlock(); }; +inline void Mutex::LockAwaiter::await_suspend(std::coroutine_handle<> h) +{ + active = std::make_shared(true); + mutex.waiters_.push({h, active}); + auto a = active; + register_cancel(cancel_cb_, token, mutex.ctx_, h, [a]() { return !*a; }, [a]() { *a = false; }); +} + +inline void Mutex::unlock() +{ + while (!waiters_.empty()) { + auto w = waiters_.front(); + waiters_.pop(); + if (*w.active) { + *w.active = false; + ctx_.resume(w.handle); + return; + } + } + locked_ = false; +} + } // namespace rclcpp_async diff --git a/rclcpp_async/include/rclcpp_async/rclcpp_async.hpp b/rclcpp_async/include/rclcpp_async/rclcpp_async.hpp index 512708b..8672582 100644 --- a/rclcpp_async/include/rclcpp_async/rclcpp_async.hpp +++ b/rclcpp_async/include/rclcpp_async/rclcpp_async.hpp @@ -18,6 +18,7 @@ #include "rclcpp_async/channel.hpp" #include "rclcpp_async/co_context.hpp" #include "rclcpp_async/event.hpp" +#include "rclcpp_async/executor.hpp" #include "rclcpp_async/goal_context.hpp" #include "rclcpp_async/goal_stream.hpp" #include "rclcpp_async/mutex.hpp" diff --git a/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp b/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp index a5b0e66..067b909 100644 --- a/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp +++ b/rclcpp_async/include/rclcpp_async/send_request_awaiter.hpp @@ -23,14 +23,13 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { class CoContext; -using StopCb = std::stop_callback>; - template struct SendRequestAwaiter { diff --git a/rclcpp_async/include/rclcpp_async/sleep_awaiter.hpp b/rclcpp_async/include/rclcpp_async/sleep_awaiter.hpp index 6425f8e..86132cb 100644 --- a/rclcpp_async/include/rclcpp_async/sleep_awaiter.hpp +++ b/rclcpp_async/include/rclcpp_async/sleep_awaiter.hpp @@ -24,14 +24,13 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { class CoContext; -using StopCb = std::stop_callback>; - struct SleepAwaiter { CoContext & ctx; diff --git a/rclcpp_async/include/rclcpp_async/timer_stream.hpp b/rclcpp_async/include/rclcpp_async/timer_stream.hpp index 1c8bbe1..cca136d 100644 --- a/rclcpp_async/include/rclcpp_async/timer_stream.hpp +++ b/rclcpp_async/include/rclcpp_async/timer_stream.hpp @@ -23,14 +23,13 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { class CoContext; -using StopCb = std::stop_callback>; - class TimerStream { CoContext & ctx_; diff --git a/rclcpp_async/include/rclcpp_async/topic_stream.hpp b/rclcpp_async/include/rclcpp_async/topic_stream.hpp index ba7b2fa..b6ef741 100644 --- a/rclcpp_async/include/rclcpp_async/topic_stream.hpp +++ b/rclcpp_async/include/rclcpp_async/topic_stream.hpp @@ -25,14 +25,13 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" namespace rclcpp_async { class CoContext; -using StopCb = std::stop_callback>; - template class TopicStream { diff --git a/rclcpp_async/include/rclcpp_async/when_all.hpp b/rclcpp_async/include/rclcpp_async/when_all.hpp index 1cc9115..4f67306 100644 --- a/rclcpp_async/include/rclcpp_async/when_all.hpp +++ b/rclcpp_async/include/rclcpp_async/when_all.hpp @@ -29,13 +29,12 @@ #include #include "rclcpp_async/cancelled_exception.hpp" +#include "rclcpp_async/executor.hpp" #include "rclcpp_async/task.hpp" namespace rclcpp_async { -using StopCb = std::stop_callback>; - // Map Task -> std::monostate, Task -> T template using when_all_value_t = std::conditional_t, std::monostate, T>; diff --git a/rclcpp_async/test/test_pure_headers.cpp b/rclcpp_async/test/test_pure_headers.cpp new file mode 100644 index 0000000..31384e7 --- /dev/null +++ b/rclcpp_async/test/test_pure_headers.cpp @@ -0,0 +1,136 @@ +// Copyright 2026 Tamaki Nishino +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This test verifies that pure C++ headers build without rclcpp. +// It is compiled with only the project include directory — +// no rclcpp, rclcpp_action, or tf2_ros on the include path. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace rclcpp_async; // NOLINT(build/namespaces) + +struct SimpleExecutor : Executor +{ + void post(std::function fn) override { fn(); } +}; + +TEST(PureHeaders, TaskReturnsValue) +{ + auto coro = []() -> Task { co_return 42; }; + auto task = coro(); + task.started_ = true; + task.handle.resume(); + ASSERT_TRUE(task.handle.done()); + EXPECT_EQ(task.result(), 42); +} + +TEST(PureHeaders, EventSetBeforeWait) +{ + SimpleExecutor exec; + Event event(exec); + event.set(); + + bool reached = false; + auto coro = [&]() -> Task { + co_await event.wait(); + reached = true; + }; + auto task = coro(); + task.started_ = true; + task.handle.resume(); + + ASSERT_TRUE(task.handle.done()); + EXPECT_TRUE(reached); +} + +TEST(PureHeaders, EventWaitThenSet) +{ + SimpleExecutor exec; + Event event(exec); + + bool reached = false; + auto coro = [&]() -> Task { + co_await event.wait(); + reached = true; + }; + auto task = coro(); + task.started_ = true; + task.handle.resume(); + + EXPECT_FALSE(reached); + + event.set(); + + ASSERT_TRUE(task.handle.done()); + EXPECT_TRUE(reached); +} + +TEST(PureHeaders, MutexLockUnlock) +{ + SimpleExecutor exec; + Mutex mutex(exec); + + bool reached = false; + auto coro = [&]() -> Task { + co_await mutex.lock(); + reached = true; + mutex.unlock(); + }; + auto task = coro(); + task.started_ = true; + task.handle.resume(); + + ASSERT_TRUE(task.handle.done()); + EXPECT_TRUE(reached); + EXPECT_FALSE(mutex.is_locked()); +} + +TEST(PureHeaders, ChannelSendReceive) +{ + SimpleExecutor exec; + Channel ch(exec); + + ch.send(42); + + std::optional received; + auto coro = [&]() -> Task { received = co_await ch.next(); }; + auto task = coro(); + task.started_ = true; + task.handle.resume(); + + ASSERT_TRUE(task.handle.done()); + ASSERT_TRUE(received.has_value()); + EXPECT_EQ(*received, 42); +} + +TEST(PureHeaders, ResultOkAndTimeout) +{ + auto ok = Result::Ok(42); + EXPECT_TRUE(ok.ok()); + EXPECT_EQ(*ok.value, 42); + + auto timeout = Result::Timeout(); + EXPECT_TRUE(timeout.timeout()); +}