diff --git a/benchmarks/meson.build b/benchmarks/meson.build index 42a2598..fb7d7c6 100644 --- a/benchmarks/meson.build +++ b/benchmarks/meson.build @@ -6,8 +6,6 @@ if meson.get_compiler('cpp').has_argument('-fcoroutines') elif meson.get_compiler('cpp').has_argument('-fcoroutines-ts') cpp_args += [ '-fcoroutines-ts', '-DLIBASYNC_FORCE_USE_EXPERIMENTAL', '-fsized-deallocation' ] deps += subproject('cxxshim').get_variable('clang_coroutine_dep') -else - error('Unsupported compiler') endif deps += subproject('frigg').get_variable('frigg_dep') diff --git a/docs/src/headers/basic/any_receiver.md b/docs/src/headers/basic/any_receiver.md index 004273d..d08adf3 100644 --- a/docs/src/headers/basic/any_receiver.md +++ b/docs/src/headers/basic/any_receiver.md @@ -12,7 +12,6 @@ struct any_receiver { any_receiver(R receiver); // (1) void set_value(T); // (2) - void set_value_noinline(T); // (2) } ``` diff --git a/docs/src/headers/basic/operation.md b/docs/src/headers/basic/operation.md index a347895..d87ad18 100644 --- a/docs/src/headers/basic/operation.md +++ b/docs/src/headers/basic/operation.md @@ -27,7 +27,7 @@ struct write_operation { write_operation(write_operation &&) = delete; write_operation &operator=(write_operation &&) = delete; - bool start_inline() { /* omitted for brevity */ } + void start() { /* omitted for brevity */ } private: uv_write_t req_; diff --git a/docs/src/headers/basic/receives.md b/docs/src/headers/basic/receives.md index 4529127..c0fa4a7 100644 --- a/docs/src/headers/basic/receives.md +++ b/docs/src/headers/basic/receives.md @@ -13,26 +13,17 @@ concept Receives = ...; ### Requirements -A `set_value_inline` and `set_value_noinline` members, which can be called with -a `T&&` value, or no parameters, if `T` is `void`. +A `set_value` member which can be called with a `T&&` value, or no parameters, if `T` is `void`. ## Examples ```cpp struct discard_receiver { template - void set_value_inline(T) { + void set_value(T) { assert(std::is_constant_evaluated()); } - void set_value_inline() { - assert(std::is_constant_evaluated()); - } - - template - void set_value_noinline(T) { - assert(std::is_constant_evaluated()); - } - void set_value_noinline() { + void set_value() { assert(std::is_constant_evaluated()); } }; diff --git a/docs/src/headers/basic/spawn.md b/docs/src/headers/basic/spawn.md index 6bf1553..c4b3140 100644 --- a/docs/src/headers/basic/spawn.md +++ b/docs/src/headers/basic/spawn.md @@ -28,11 +28,7 @@ This function doesn't return any value. ```cpp struct my_receiver { - void set_value_inline(int value) { - std::cout << "Value: " << value << std::endl; - } - - void set_value_noinline(int value) { + void set_value(int value) { std::cout << "Value: " << value << std::endl; } }; diff --git a/docs/src/headers/execution.md b/docs/src/headers/execution.md index 856ea16..f234e74 100644 --- a/docs/src/headers/execution.md +++ b/docs/src/headers/execution.md @@ -10,10 +10,7 @@ This header contains customization point objects (CPOs) for the following methods/functions: - `connect` (as a member or function), - `start` (as a member or function), - - `start_inline` (as a member), - `set_value` (as a member), - - `set_value_inline` (as a member), - - `set_value_noinline` (as a member). In addition to that, it provides a convenience type definition for working with operations: ```cpp @@ -25,5 +22,5 @@ using operation_t = std::invoke_result_t; ```cpp auto op = async::execution::connect(my_sender, my_receiver); -bool finished_inline = async::execution::start_inline(op); +async::execution::start(op); ``` diff --git a/docs/src/io-service.md b/docs/src/io-service.md index 4ed2306..7b37c6f 100644 --- a/docs/src/io-service.md +++ b/docs/src/io-service.md @@ -18,18 +18,17 @@ The following example shows the approximate call graph executing an event-loop-d - `async::run(my_sender, my_io_service)` - `my_operation = async::execution::connect(my_sender, internal_receiver)` - - `async::execution::start_inline(my_operation)` + - `async::execution::start(my_operation)` - `my_operation` starts running... - `co_await some_ev` - `some_ev` operation is started - `my_io_service.add_waiter(this)` - - (`async::execution::start_inline` returns `false`) - `my_io_service.wait()` - IO service waits for event to happen... - `waiters_.front()->complete()` - `some_ev` operation completes - `my_operation` resumes - `co_return 2` - - `async::execution::set_value_noinline(internal_receiver, 2)` + - `async::execution::set_value(internal_receiver, 2)` - `return internal_receiver.value` - (`async::run` returns `2`) diff --git a/docs/src/sender-receiver.md b/docs/src/sender-receiver.md index 6926f08..25a92c1 100644 --- a/docs/src/sender-receiver.md +++ b/docs/src/sender-receiver.md @@ -22,17 +22,8 @@ asynchronous operation. It is immovable, and as such pointers to it will remain valid for as long as the operation exists. When the operation is finished, it notifies the receiver and optionally passes it a result value. -Every operation must either have a `void start()` or a `bool start_inline()` method -that is invoked when the operation is first started. `void start()` is equivalent to -`bool start_inline()` with `return false;` at the end. - -#### Inline and no-inline completion - -Operations that complete synchronously can signal inline completion. If an operation -completes inline, it sets the value using `set_value_inline`, and returns `true` from -`start_inline` (Operations that have a `start` method cannot complete inline). Inline -completion allows for certain optimizations, like avoiding suspending the coroutine -if the operation completed synchronously. +Every operation must either have a `void start()` method +that is invoked when the operation is first started. ### Receiver @@ -40,5 +31,5 @@ A receiver is an object that knows what to do after an operation finishes (e.g. resume the coroutine). It optionally receives a result value from the operation. It is moveable. -Every receiver must have `void set_value_inline(...)` and `void set_value_noinline(...)` -methods that are invoked by the operation when it completes. +Every receiver must have a `void set_value(...)` +method that is invoked by the operation when it completes. diff --git a/docs/src/your-own-sender.md b/docs/src/your-own-sender.md index 30a510d..6a2f043 100644 --- a/docs/src/your-own-sender.md +++ b/docs/src/your-own-sender.md @@ -114,24 +114,20 @@ type in order to support any receiver type. The operation is also made immovable and non-copyable so that pointers to it can safely be taken without worrying that they may become invalid at some point. -Next, we add a `start_inline` method: +Next, we add a `start` method: ```cpp -bool start_inline() { +void start() { auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) { /* TODO */ }); - if (result < 0) { - async::execution::set_value_inline(r_, result); - return true; // Completed inline - } - - return false; // Did not complete inline + if (result < 0) + async::execution::set_value(r_, result); // Completed inline } ``` -We use `start_inline` here in order to notify the user of any immediate errors -synchronously. We use functions defined inside of `async::execution` to set the +Here, `start` completes synchronously if any errors happen immediately. +We use functions defined inside of `async::execution` to set the value, because they properly detect which method should be called on the receiver. Now, let's implement the actual asynchronous completion: @@ -160,11 +156,11 @@ Finally, we add our `complete` method: ```cpp private: void complete(int status) { - async::execution::set_value_noinline(r_, status); + async::execution::set_value(r_, status); } ``` -On `complete`, we use `async::execution::set_value_noinline` to set the result +On `complete`, we use `async::execution::set_value` to set the result value and notify the receiver that the operation is complete (so that it can for example resume the suspended coroutine, like the `async::sender_awaiter` receiver). @@ -212,24 +208,20 @@ struct write_operation { write_operation(write_operation &&) = delete; write_operation &operator=(write_operation &&) = delete; - bool start_inline() { + void start() { handle_->data = this; auto result = uv_write(&req_, handle_, bufs_, nbufs_, [] (uv_write_t *req, int status) { auto op = static_cast(req->handle->data); op->complete(status); }); - if (result < 0) { - async::execution::set_value_inline(r_, result); - return true; // Completed inline - } - - return false; // Did not complete inline + if (result < 0) + async::execution::set_value(r_, result); } private: void complete(int status) { - async::execution::set_value_noinline(r_, status); + async::execution::set_value(r_, status); } uv_write_t req_; diff --git a/include/async/algorithm.hpp b/include/async/algorithm.hpp index e95b6a6..1e7710a 100644 --- a/include/async/algorithm.hpp +++ b/include/async/algorithm.hpp @@ -40,14 +40,13 @@ struct [[nodiscard]] invocable_operation { invocable_operation &operator= (const invocable_operation &) = delete; - bool start_inline() { + void start() { if constexpr (std::is_same_v, void>) { f_(); - execution::set_value_inline(r_); + return execution::set_value(r_); }else{ - execution::set_value_inline(r_, f_()); + return execution::set_value(r_, f_()); } - return true; } private: @@ -86,22 +85,12 @@ struct value_transform_receiver { : dr_{std::move(dr)}, f_{std::move(f)} { } template - void set_value_inline(X value) { - if constexpr (std::is_same_v, void>) { - f_(std::move(value)); - execution::set_value_inline(dr_); - }else{ - execution::set_value_inline(dr_, f_(std::move(value))); - } - } - - template - void set_value_noinline(X value) { + void set_value(X value) { if constexpr (std::is_same_v, void>) { f_(std::move(value)); - execution::set_value_noinline(dr_); + execution::set_value(dr_); }else{ - execution::set_value_noinline(dr_, f_(std::move(value))); + execution::set_value(dr_, f_(std::move(value))); } } @@ -115,21 +104,12 @@ struct void_transform_receiver { void_transform_receiver(Receiver dr, F f) : dr_{std::move(dr)}, f_{std::move(f)} { } - void set_value_inline() { + void set_value() { if constexpr (std::is_same_v, void>) { f_(); - execution::set_value_inline(dr_); + execution::set_value(dr_); }else{ - execution::set_value_inline(dr_, f_()); - } - } - - void set_value_noinline() { - if constexpr (std::is_same_v, void>) { - f_(); - execution::set_value_noinline(dr_); - }else{ - execution::set_value_noinline(dr_, f_()); + execution::set_value(dr_, f_()); } } @@ -205,17 +185,17 @@ struct [[nodiscard]] ite_operation { else_op_.destruct(); } - bool start_inline() { + void start() { if(cond_()) { then_op_.construct_with([&] { return execution::connect(std::move(then_s_), std::move(dr_)); }); - return execution::start_inline(*then_op_); + return execution::start(*then_op_); }else{ else_op_.construct_with([&] { return execution::connect(std::move(else_s_), std::move(dr_)); }); - return execution::start_inline(*else_op_); + return execution::start(*else_op_); } } @@ -270,13 +250,9 @@ struct [[nodiscard]] repeat_while_operation { repeat_while_operation &operator=(const repeat_while_operation &) = delete; - bool start_inline() { - if(loop_()) { - execution::set_value_inline(dr_); - return true; - } - - return false; + void start() { + if(loop_()) + return execution::set_value(dr_); } private: @@ -298,15 +274,11 @@ struct [[nodiscard]] repeat_while_operation { receiver(repeat_while_operation *self) : self_{self} { } - void set_value_inline() { - // Do nothing. - } - - void set_value_noinline() { + void set_value() { auto s = self_; // box_.destruct() will destruct this. s->box_.destruct(); if(s->loop_()) - execution::set_value_noinline(s->dr_); + execution::set_value(s->dr_); } private: @@ -376,10 +348,7 @@ struct race_and_cancel_operation, std::index_s internal_receiver(race_and_cancel_operation *self) : self_{self} { } - void set_value_inline() { - } - - void set_value_noinline() { + void set_value() { auto n = self_->n_done_.fetch_add(1, std::memory_order_acq_rel); if(!n) { for(unsigned int j = 0; j < sizeof...(Is); ++j) @@ -387,7 +356,7 @@ struct race_and_cancel_operation, std::index_s self_->cs_[j].cancel(); } if(n + 1 == sizeof...(Is)) - execution::set_value_noinline(self_->r_); + execution::set_value(self_->r_); } private: @@ -418,7 +387,7 @@ struct race_and_cancel_operation, std::index_s race_and_cancel_operation(race_and_cancel_sender s, Receiver r) : r_{std::move(r)}, ops_{make_operations_tuple(std::move(s))}, n_sync_{0}, n_done_{0} { } - bool start_inline() { + void start() { unsigned int n_sync = 0; ((execution::start_inline(ops_.template get()) @@ -432,13 +401,9 @@ struct race_and_cancel_operation, std::index_s cs_[j].cancel(); } - if ((n + n_sync) == sizeof...(Is)) { - execution::set_value_inline(r_); - return true; - } + if ((n + n_sync) == sizeof...(Is)) + return execution::set_value(r_); } - - return false; } private: @@ -482,10 +447,10 @@ struct let_operation { } public: - bool start_inline() { + void start() { imm_ = std::move(pred_()); op_.construct_with([&]{ return execution::connect(func_(imm_), std::move(r_)); }); - return execution::start_inline(*op_); + return execution::start(*op_); } private: @@ -538,7 +503,7 @@ struct [[nodiscard]] sequence_operation { sequence_operation &operator=(const sequence_operation &) = delete; - bool start_inline() { + void start() { return do_step<0, true>(); } @@ -548,7 +513,7 @@ struct [[nodiscard]] sequence_operation { template requires (InlinePath) - bool do_step() { + void do_step() { using operation_type = execution::operation_t, receiver>; @@ -559,12 +524,11 @@ struct [[nodiscard]] sequence_operation { if(execution::start_inline(*op)) { if constexpr (Index == sizeof...(Senders) - 1) { - return true; + return; }else{ return do_step(); } } - return false; } // Same as above but since we are not on the InlinePath, we do not care about the return value. @@ -594,16 +558,7 @@ struct [[nodiscard]] sequence_operation { receiver(sequence_operation *self) : self_{self} { } - void set_value_inline() requires (Index < sizeof...(Senders) - 1) { - using operation_type = execution::operation_t, - receiver>; - auto op = std::launder(reinterpret_cast(self_->box_.buffer)); - op->~operation_type(); - - // Do nothing: execution continues in do_step(). - } - - void set_value_noinline() requires (Index < sizeof...(Senders) - 1) { + void set_value() requires (Index < sizeof...(Senders) - 1) { using operation_type = execution::operation_t, receiver>; auto s = self_; // following lines will destruct this. @@ -614,7 +569,7 @@ struct [[nodiscard]] sequence_operation { s->template do_step(); } - void set_value_inline() + void set_value() requires ((Index == sizeof...(Senders) - 1) && (std::is_same_v)) { using operation_type = execution::operation_t, @@ -623,27 +578,11 @@ struct [[nodiscard]] sequence_operation { auto op = std::launder(reinterpret_cast(s->box_.buffer)); op->~operation_type(); - if(InlinePath) { - execution::set_value_inline(s->dr_); - }else{ - execution::set_value_noinline(s->dr_); - } - } - - void set_value_noinline() - requires ((Index == sizeof...(Senders) - 1) - && (std::is_same_v)) { - using operation_type = execution::operation_t, - receiver>; - auto s = self_; // following lines will destruct this. - auto op = std::launder(reinterpret_cast(s->box_.buffer)); - op->~operation_type(); - - execution::set_value_noinline(s->dr_); + execution::set_value(s->dr_); } template - void set_value_inline(T value) + void set_value(T value) requires ((Index == sizeof...(Senders) - 1) && (!std::is_same_v) && (std::is_same_v)) { @@ -653,25 +592,7 @@ struct [[nodiscard]] sequence_operation { auto op = std::launder(reinterpret_cast(s->box_.buffer)); op->~operation_type(); - if(InlinePath) { - execution::set_value_inline(s->dr_, std::move(value)); - }else{ - execution::set_value_noinline(s->dr_, std::move(value)); - } - } - - template - void set_value_noinline(T value) - requires ((Index == sizeof...(Senders) - 1) - && (!std::is_same_v) - && (std::is_same_v)) { - using operation_type = execution::operation_t, - receiver>; - auto s = self_; // following lines will destruct this. - auto op = std::launder(reinterpret_cast(s->box_.buffer)); - op->~operation_type(); - - execution::set_value_noinline(s->dr_, std::move(value)); + execution::set_value(s->dr_, std::move(value)); } private: @@ -729,15 +650,11 @@ struct when_all_operation { receiver(when_all_operation *self) : self_{self} { } - void set_value_inline() { - // Simply do nothing. - } - - void set_value_noinline() { + void set_value() { auto c = self_->ctr_.fetch_sub(1, std::memory_order_acq_rel); assert(c > 0); if(c == 1) - execution::set_value_noinline(self_->dr_); + execution::set_value(self_->dr_); } private: @@ -760,7 +677,7 @@ struct when_all_operation { ops_{make_operations_tuple(std::index_sequence_for{}, std::move(senders))}, ctr_{sizeof...(Senders)} { } - bool start_inline() { + void start() { int n_fast = 0; [&] (std::index_sequence) { ([&] () { @@ -771,11 +688,8 @@ struct when_all_operation { auto c = ctr_.fetch_sub(n_fast, std::memory_order_acq_rel); assert(c > 0); - if(c == n_fast) { - execution::set_value_inline(dr_); - return true; - } - return false; + if(c == n_fast) + return execution::set_value(dr_); } Receiver dr_; // Downstream receiver. @@ -817,8 +731,8 @@ struct lambda_operation { lambda_operation(R receiver, Fn fn, std::tuple args) : fn_{std::move(fn)}, op_{execution::connect(std::apply(fn_, args), std::move(receiver))} { } - bool start_inline() { - return execution::start_inline(op_); + void start() { + return execution::start(op_); } Fn fn_; diff --git a/include/async/basic.hpp b/include/async/basic.hpp index 4313839..899790b 100644 --- a/include/async/basic.hpp +++ b/include/async/basic.hpp @@ -40,12 +40,14 @@ namespace async { template concept Receives = std::movable && (std::same_as ? -requires(T t) { - { t.set_value_inline() } -> std::same_as; +(requires(T t) { + { t.set_value() } -> std::same_as; +} || requires(T t) { { t.set_value_noinline() } -> std::same_as; -} -: requires(T t) { - { t.set_value_inline(std::declval()) } -> std::same_as; +}) +: (requires(T t) { + { t.set_value(std::declval()) } -> std::same_as; +}) || requires(T t) { { t.set_value_noinline(std::declval()) } -> std::same_as; }); @@ -54,19 +56,10 @@ template struct dummy_receiver { template requires (!std::same_as) - void set_value_inline(T) { - assert(std::is_constant_evaluated()); - } - void set_value_inline() { - assert(std::is_constant_evaluated()); - } - - template - requires (!std::same_as) - void set_value_noinline(T) { + void set_value(T) { assert(std::is_constant_evaluated()); } - void set_value_noinline() { + void set_value() { assert(std::is_constant_evaluated()); } }; @@ -127,11 +120,7 @@ template struct [[nodiscard]] sender_awaiter { private: struct receiver { - void set_value_inline(T result) { - p_->result_.emplace(std::move(result)); - } - - void set_value_noinline(T result) { + void set_value(T result) { p_->result_.emplace(std::move(result)); p_->h_.resume(); } @@ -167,11 +156,7 @@ template struct [[nodiscard]] sender_awaiter { private: struct receiver { - void set_value_inline() { - // Do nothing. - } - - void set_value_noinline() { + void set_value() { p_->h_.resume(); } @@ -220,7 +205,7 @@ struct any_receiver { new (stor_) R(receiver); set_value_fptr_ = [] (void *p, T value) { auto *rp = static_cast(p); - execution::set_value_noinline(*rp, std::move(value)); + execution::set_value(*rp, std::move(value)); }; } @@ -228,10 +213,6 @@ struct any_receiver { set_value_fptr_(stor_, std::move(value)); } - void set_value_noinline(T value) { - set_value_fptr_(stor_, std::move(value)); - } - private: alignas(alignof(void *)) char stor_[sizeof(void *)]; void (*set_value_fptr_) (void *, T); @@ -249,7 +230,7 @@ struct any_receiver { new (stor_) R(receiver); set_value_fptr_ = [] (void *p) { auto *rp = static_cast(p); - execution::set_value_noinline(*rp); + execution::set_value(*rp); }; } @@ -257,10 +238,6 @@ struct any_receiver { set_value_fptr_(stor_); } - void set_value_noinline() { - set_value_fptr_(stor_); - } - private: alignas(alignof(void *)) char stor_[sizeof(void *)]; void (*set_value_fptr_) (void *); @@ -378,65 +355,19 @@ struct run_queue { // Top-level execution functions. // ---------------------------------------------------------------------------- +// TODO: It makes more sense to demand a run() method. template concept Waitable = requires (T t) { t.wait(); }; -template -void run_forever(IoService ios) { - while(true) { - ios.wait(); +struct dummy_io_service { + void wait() { + // TODO: dummy_io_service could use a futex to wait. + platform::panic("dummy_io_service does not know how to wait"); } -} - -template -requires std::same_as -void run(Sender s) { - struct receiver { - void set_value_inline() { } - - void set_value_noinline() { } - }; - - auto operation = execution::connect(std::move(s), receiver{}); - if(execution::start_inline(operation)) - return; - - platform::panic("libasync: Operation hasn't completed and we don't know how to wait"); -} - -template -requires (!std::same_as) -typename Sender::value_type run(Sender s) { - struct state { - frg::optional value; - }; - - struct receiver { - receiver(state *stp) - : stp_{stp} { } - - void set_value_inline(typename Sender::value_type value) { - stp_->value.emplace(std::move(value)); - } - - void set_value_noinline(typename Sender::value_type value) { - stp_->value.emplace(std::move(value)); - } - - private: - state *stp_; - }; - - state st; - - auto operation = execution::connect(std::move(s), receiver{&st}); - if (execution::start_inline(operation)) - return std::move(*st.value); - - platform::panic("libasync: Operation hasn't completed and we don't know how to wait"); -} +}; +static_assert(Waitable); template requires std::same_as @@ -449,11 +380,7 @@ void run(Sender s, IoService ios) { receiver(state *stp) : stp_{stp} { } - void set_value_inline() { - stp_->done = true; - } - - void set_value_noinline() { + void set_value() { stp_->done = true; } @@ -464,8 +391,7 @@ void run(Sender s, IoService ios) { state st; auto operation = execution::connect(std::move(s), receiver{&st}); - if(execution::start_inline(operation)) - return; + execution::start(operation); while(!st.done) { ios.wait(); @@ -484,12 +410,7 @@ typename Sender::value_type run(Sender s, IoService ios) { receiver(state *stp) : stp_{stp} { } - void set_value_inline(typename Sender::value_type value) { - stp_->value.emplace(std::move(value)); - stp_->done = true; - } - - void set_value_noinline(typename Sender::value_type value) { + void set_value(typename Sender::value_type value) { stp_->value.emplace(std::move(value)); stp_->done = true; } @@ -501,8 +422,7 @@ typename Sender::value_type run(Sender s, IoService ios) { state st; auto operation = execution::connect(std::move(s), receiver{&st}); - if(execution::start_inline(operation)) - return std::move(*st.value); + execution::start(operation); while(!st.done) { ios.wait(); @@ -511,6 +431,35 @@ typename Sender::value_type run(Sender s, IoService ios) { return std::move(*st.value); } +template +auto run(Sender s) { + return run(std::move(s), dummy_io_service{}); +} + +template R> +struct forever_operation { + void start() { + // Do nothing. + } + + R receiver; +}; + +struct forever_sender { + using value_type = void; + + template R> + forever_operation connect(R &&receiver) { + return {std::move(receiver)}; + } +}; +static_assert(Sender); + +template +void run_forever(IoService ios) { + return run(forever_sender{}, std::move(ios)); +} + // ---------------------------------------------------------------------------- // Detached coroutines. // ---------------------------------------------------------------------------- @@ -551,11 +500,7 @@ namespace detach_details_ { final_receiver(control_block *cb) : cb_{cb} { } - void set_value_inline() { - finalize(cb_); - } - - void set_value_noinline() { + void set_value() { finalize(cb_); } @@ -621,14 +566,8 @@ namespace spawn_details_ { : cb_{cb} { } template - void set_value_inline(Args &&... args) { - cb_->dr.set_value_inline(std::forward(args)...); - finalize(cb_); - } - - template - void set_value_noinline(Args &&... args) { - cb_->dr.set_value_noinline(std::forward(args)...); + void set_value(Args &&... args) { + execution::set_value(cb_->dr, std::forward(args)...); finalize(cb_); } diff --git a/include/async/execution.hpp b/include/async/execution.hpp index 528bd3b..99596f0 100644 --- a/include/async/execution.hpp +++ b/include/async/execution.hpp @@ -8,6 +8,7 @@ namespace async { namespace cpo_types { + template concept member_connect = requires (Sender &&s, Receiver &&r) { std::forward(s).connect(std::forward(r)); @@ -35,11 +36,6 @@ struct connect_cpo { } }; -template -concept inline_startable_operation = requires (Operation &&op) { - { std::forward(op).start_inline() } -> std::convertible_to; -}; - template concept member_start = requires (Operation &&op) { std::forward(op).start(); @@ -50,12 +46,24 @@ concept global_start = requires (Operation &&op) { start(std::forward(op)); }; +struct start_cpo { + template + void operator() (Operation &&op) const { + if constexpr (member_start) { + std::forward(op).start(); + }else if constexpr (global_start) { + start(std::forward(op)); + }else{ + static_assert(frg::dependent_false_t, + "No start() customization defined for operation type"); + } + } +}; + struct start_inline_cpo { template bool operator() (Operation &&op) const { - if constexpr (inline_startable_operation) { - return op.start_inline(); - }else if constexpr (member_start) { + if constexpr (member_start) { std::forward(op).start(); return false; }else if constexpr (global_start) { @@ -71,40 +79,21 @@ struct start_inline_cpo { struct set_value_cpo { template requires requires(Receiver &&r, T &&value) { - std::forward(r).set_value_noinline(std::forward(value)); + std::forward(r).set_value(std::forward(value)); } void operator() (Receiver &&r, T &&value) { - std::forward(r).set_value_noinline(std::forward(value)); + std::forward(r).set_value(std::forward(value)); } template requires requires(Receiver &&r) { - std::forward(r).set_value_noinline(); + std::forward(r).set_value(); } void operator() (Receiver &&r) { - std::forward(r).set_value_noinline(); + std::forward(r).set_value(); } -}; -struct set_value_inline_cpo { - template - requires requires(Receiver &&r, T &&value) { - std::forward(r).set_value_inline(std::forward(value)); - } - void operator() (Receiver &&r, T &&value) { - std::forward(r).set_value_inline(std::forward(value)); - } - - template - requires requires(Receiver &&r) { - std::forward(r).set_value_inline(); - } - void operator() (Receiver &&r) { - std::forward(r).set_value_inline(); - } -}; - -struct set_value_noinline_cpo { + // Obsolete set_value_noinline() functions. template requires requires(Receiver &&r, T &&value) { std::forward(r).set_value_noinline(std::forward(value)); @@ -121,17 +110,17 @@ struct set_value_noinline_cpo { std::forward(r).set_value_noinline(); } }; -} + +} // namespace cpo_types namespace execution { template using operation_t = std::invoke_result_t; inline cpo_types::connect_cpo connect; + inline cpo_types::start_cpo start; inline cpo_types::start_inline_cpo start_inline; inline cpo_types::set_value_cpo set_value; - inline cpo_types::set_value_inline_cpo set_value_inline; - inline cpo_types::set_value_noinline_cpo set_value_noinline; } } // namespace async diff --git a/include/async/mutex.hpp b/include/async/mutex.hpp index 7e22299..ca051e1 100644 --- a/include/async/mutex.hpp +++ b/include/async/mutex.hpp @@ -35,12 +35,10 @@ namespace detail { lock_operation(mutex *self, R receiver) : self_{self}, receiver_{std::move(receiver)} { } - bool start_inline() { + void start() { // Avoid taking mutex_ if possible. - if (self_->try_lock()) { - execution::set_value_inline(receiver_); - return true; - } + if (self_->try_lock()) + return execution::set_value(receiver_); { frg::unique_lock lock(self_->mutex_); @@ -66,24 +64,23 @@ namespace detail { ); if (success) { self_->waiters_.push_back(this); - return false; + return; } } else { // mutex_ protects against concurrent transitions from state::contended. assert(st == state::contended); self_->waiters_.push_back(this); - return false; + return; } } } - execution::set_value_inline(receiver_); - return true; + return execution::set_value(receiver_); } private: void complete() override { - execution::set_value_noinline(receiver_); + execution::set_value(receiver_); } mutex *self_; @@ -233,11 +230,9 @@ namespace detail { exclusive = true; } - bool start_inline() { - if (self_->try_lock()) { - execution::set_value_inline(receiver_); - return true; - } + void start() { + if (self_->try_lock()) + return execution::set_value(receiver_); { frg::unique_lock lock(self_->mutex_); @@ -263,24 +258,23 @@ namespace detail { ); if (success) { self_->waiters_.push_back(this); - return false; + return; } } else { // mutex_ protects against concurrent transitions from contention::contended. assert(st.c == contention::contended); self_->waiters_.push_back(this); - return false; + return; } } } - execution::set_value_inline(receiver_); - return true; + return execution::set_value(receiver_); } private: void complete() override { - execution::set_value_noinline(receiver_); + execution::set_value(receiver_); } shared_mutex *self_; @@ -322,11 +316,9 @@ namespace detail { exclusive = false; } - bool start_inline() { - if (self_->try_lock_shared()) { - execution::set_value_inline(receiver_); - return true; - } + void start() { + if (self_->try_lock_shared()) + return execution::set_value(receiver_); { frg::unique_lock lock(self_->mutex_); @@ -364,7 +356,7 @@ namespace detail { ); if (success) { self_->waiters_.push_back(this); - return false; + return; } } else { // mutex_ protects against concurrent transitions from contention::contended. @@ -374,19 +366,18 @@ namespace detail { state{.c = contention::contended, .shared_cnt = st.shared_cnt}, std::memory_order_relaxed ); - return false; + return; } } } } - execution::set_value_inline(receiver_); - return true; + return execution::set_value(receiver_); } private: void complete() override { - execution::set_value_noinline(receiver_); + execution::set_value(receiver_); } shared_mutex *self_; diff --git a/include/async/oneshot-event.hpp b/include/async/oneshot-event.hpp index 7dba08d..e5fcdb0 100644 --- a/include/async/oneshot-event.hpp +++ b/include/async/oneshot-event.hpp @@ -71,13 +71,11 @@ struct oneshot_primitive { wait_operation(oneshot_primitive *evt, Receiver r) : node{&complete}, evt_{evt}, r_{std::move(r)} { } - bool start_inline() { + void start() { node *current = evt_->state_.load(std::memory_order_acquire); while (true) { - if (current == fired()) { - execution::set_value_inline(r_); - return true; - } + if (current == fired()) + return execution::set_value(r_); next_ = current; auto success = evt_->state_.compare_exchange_weak( current, @@ -86,14 +84,14 @@ struct oneshot_primitive { std::memory_order_acquire ); if (success) - return false; + return; } } private: static void complete(node *base) { auto self = static_cast(base); - execution::set_value_noinline(self->r_); + execution::set_value(self->r_); } oneshot_primitive *evt_; diff --git a/include/async/post-ack.hpp b/include/async/post-ack.hpp index 935c3e8..f375272 100644 --- a/include/async/post-ack.hpp +++ b/include/async/post-ack.hpp @@ -62,7 +62,7 @@ struct post_ack_mechanism { node::object = std::move(object); } - bool start_inline() { + void start() { auto fast_path = [&] { frg::intrusive_list< poll_node, @@ -101,16 +101,13 @@ struct post_ack_mechanism { return false; }(); // Immediately invoked. - if(fast_path) { - execution::set_value_inline(receiver_); - return true; - } - return false; + if(fast_path) + return execution::set_value(receiver_); } private: void complete() override { - execution::set_value_noinline(receiver_); + execution::set_value(receiver_); } post_ack_mechanism *mech_; diff --git a/include/async/promise.hpp b/include/async/promise.hpp index 8689fbb..7a54d32 100644 --- a/include/async/promise.hpp +++ b/include/async/promise.hpp @@ -235,7 +235,7 @@ struct future { get_operation(detail::promise_state *state, cancellation_token ct, Receiver r) : state_{state}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { } - bool start_inline() { + void start() { bool cancelled = false; { frg::unique_lock lock{state_->mutex_}; @@ -245,21 +245,19 @@ struct future { cancelled = true; } else { state_->queue_.push_back(this); - return false; + return; } } } if constexpr (std::is_same_v) - execution::set_value_inline(r_, !cancelled); + return execution::set_value(r_, !cancelled); else { if (cancelled) - execution::set_value_inline(r_, frg::optional{frg::null_opt}); + return execution::set_value(r_, frg::optional{frg::null_opt}); else - execution::set_value_inline(r_, frg::optional{&state_->get()}); + return execution::set_value(r_, frg::optional{&state_->get()}); } - - return true; } private: @@ -276,21 +274,21 @@ struct future { } if constexpr (std::is_same_v) - execution::set_value_noinline(r_, !cancelled); + execution::set_value(r_, !cancelled); else { if (cancelled) - execution::set_value_noinline(r_, frg::optional{frg::null_opt}); + execution::set_value(r_, frg::optional{frg::null_opt}); else - execution::set_value_noinline(r_, frg::optional{&state_->get()}); + execution::set_value(r_, frg::optional{&state_->get()}); } } void complete() override { if (cobs_.try_reset()) { if constexpr (std::is_same_v) - execution::set_value_noinline(r_, true); + execution::set_value(r_, true); else - execution::set_value_noinline(r_, frg::optional{&state_->get()}); + execution::set_value(r_, frg::optional{&state_->get()}); } } diff --git a/include/async/queue.hpp b/include/async/queue.hpp index 5fd0929..af12624 100644 --- a/include/async/queue.hpp +++ b/include/async/queue.hpp @@ -67,7 +67,7 @@ struct queue { get_operation(queue *q, cancellation_token ct, Receiver r) : q_{q}, ct_{std::move(ct)}, r_{std::move(r)} { } - bool start_inline() { + void start() { bool retire = false; { frg::unique_lock lock{q_->mutex_}; @@ -86,11 +86,8 @@ struct queue { } } - if(retire) { - execution::set_value_inline(r_, std::move(value)); - return true; - } - return false; + if(retire) + return execution::set_value(r_, std::move(value)); } private: @@ -108,11 +105,11 @@ struct queue { } } - execution::set_value_noinline(r_, std::move(value)); + execution::set_value(r_, std::move(value)); } void complete() override { - execution::set_value_noinline(r_, std::move(value)); + execution::set_value(r_, std::move(value)); } queue *q_; diff --git a/include/async/recurring-event.hpp b/include/async/recurring-event.hpp index cc1c8ec..23681df 100644 --- a/include/async/recurring-event.hpp +++ b/include/async/recurring-event.hpp @@ -78,7 +78,7 @@ struct recurring_event { wait_if_operation(recurring_event *evt, C cond, cancellation_token ct, Receiver r) : evt_{evt}, cond_{std::move(cond)}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { } - bool start_inline() { + void start() { assert(st_ == state::none); bool retire_condfail = false; @@ -101,14 +101,11 @@ struct recurring_event { if(retire_condfail) { st_ = state::retired; - execution::set_value_inline(r_, maybe_awaited::condition_failed); - return true; + return execution::set_value(r_, maybe_awaited::condition_failed); }else if(retire_cancelled) { st_ = state::retired; - execution::set_value_inline(r_, maybe_cancelled::cancelled); - return true; + return execution::set_value(r_, maybe_cancelled::cancelled); } - return false; } private: @@ -127,13 +124,13 @@ struct recurring_event { } st_ = state::retired; - execution::set_value_noinline(r_, maybe_cancelled::cancelled); + execution::set_value(r_, maybe_cancelled::cancelled); } void complete() override { if(cobs_.try_reset()) { st_ = state::retired; - execution::set_value_noinline(r_, maybe_awaited::awaited); + execution::set_value(r_, maybe_awaited::awaited); } } diff --git a/include/async/result.hpp b/include/async/result.hpp index ace0a38..0a32a99 100644 --- a/include/async/result.hpp +++ b/include/async/result.hpp @@ -52,7 +52,7 @@ struct result_continuation { // On past_start -> past_suspend transitions, we call resume(). enum class coroutine_cfp { indeterminate, - past_start, // We are past start_inline(). + past_start, // We are past start(). past_suspend // We are past final_suspend. }; @@ -289,7 +289,7 @@ struct result_operation final : private result_continuation { result_operation &operator= (const result_operation &) = delete; - bool start_inline() { + void start() { auto h = s_.h_; auto promise = &h.promise(); promise->cont_ = this; @@ -298,15 +298,13 @@ struct result_operation final : private result_continuation { if(cfp == coroutine_cfp::past_suspend) { // Synchronize with the thread that complete the coroutine. std::atomic_thread_fence(std::memory_order_acquire); - async::execution::set_value_inline(receiver_, std::move(value())); - return true; + return async::execution::set_value(receiver_, std::move(value())); } - return false; } private: void resume() override { - async::execution::set_value_noinline(receiver_, std::move(value())); + async::execution::set_value(receiver_, std::move(value())); } private: @@ -326,7 +324,7 @@ struct result_operation final : private result_continuation { result_operation &operator= (const result_operation &) = delete; - bool start_inline() { + void start() { auto h = s_.h_; auto promise = &h.promise(); promise->cont_ = this; @@ -335,15 +333,13 @@ struct result_operation final : private result_continuation { if(cfp == coroutine_cfp::past_suspend) { // Synchronize with the thread that complete the coroutine. std::atomic_thread_fence(std::memory_order_acquire); - async::execution::set_value_inline(receiver_); - return true; + return async::execution::set_value(receiver_); } - return false; } private: void resume() override { - async::execution::set_value_noinline(receiver_); + async::execution::set_value(receiver_); } private: diff --git a/include/async/wait-group.hpp b/include/async/wait-group.hpp index 11dae8a..10fe219 100644 --- a/include/async/wait-group.hpp +++ b/include/async/wait-group.hpp @@ -84,7 +84,7 @@ struct wait_group { wait_operation(wait_group *wg, cancellation_token ct, Receiver r) : wg_{wg}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { } - bool start_inline() { + void start() { bool cancelled = false; { frg::unique_lock lock(wg_->mutex_); @@ -94,13 +94,12 @@ struct wait_group { cancelled = true; }else{ wg_->queue_.push_back(this); - return false; + return; } } } - execution::set_value_inline(r_, !cancelled); - return true; + return execution::set_value(r_, !cancelled); } private: @@ -116,12 +115,12 @@ struct wait_group { } } - execution::set_value_noinline(r_, !cancelled); + execution::set_value(r_, !cancelled); } void complete() override { if(cobs_.try_reset()) - execution::set_value_noinline(r_, true); + execution::set_value(r_, true); } wait_group *wg_; @@ -189,22 +188,11 @@ struct [[nodiscard]] sender_ { struct receiver_ { operation_ &op_; - /* receiver bits */ template requires(sizeof...(Ts) <= 1) - void set_value_inline(Ts &&...ts) { + void set_value(Ts &&...ts) { op_.wg_.done(); - execution::set_value_inline( - op_.originalr_, - std::forward(ts)... - ); - } - - template - requires(sizeof...(Ts) <= 1) - void set_value_noinline(Ts &&...ts) { - op_.wg_.done(); - execution::set_value_noinline( + execution::set_value( op_.originalr_, std::forward(ts)... ); @@ -219,9 +207,9 @@ struct [[nodiscard]] sender_ { , originalop_(execution::connect(std::move(s.originals_), receiver_{*this})) {} - bool start_inline() { + void start() { wg_.add(1); - return execution::start_inline(originalop_); + return execution::start(originalop_); } operation_(const operation_&) = delete; diff --git a/tests/meson.build b/tests/meson.build index 09433f3..42fa799 100644 --- a/tests/meson.build +++ b/tests/meson.build @@ -6,8 +6,6 @@ if meson.get_compiler('cpp').has_argument('-fcoroutines') elif meson.get_compiler('cpp').has_argument('-fcoroutines-ts') cpp_args += [ '-fcoroutines-ts', '-DLIBASYNC_FORCE_USE_EXPERIMENTAL', '-fsized-deallocation' ] deps += subproject('cxxshim').get_variable('clang_coroutine_dep') -else - error('Unsupported compiler') endif deps += subproject('frigg').get_variable('frigg_dep')