diff --git a/cpp/mrc/benchmarks/bench_coroutines.cpp b/cpp/mrc/benchmarks/bench_coroutines.cpp index 82a2aa437..ead732a2f 100644 --- a/cpp/mrc/benchmarks/bench_coroutines.cpp +++ b/cpp/mrc/benchmarks/bench_coroutines.cpp @@ -15,8 +15,10 @@ * limitations under the License. */ +#include "mrc/channel/v2/immediate_channel.hpp" #include "mrc/coroutines/sync_wait.hpp" #include "mrc/coroutines/task.hpp" +#include "mrc/coroutines/when_all.hpp" #include @@ -130,9 +132,55 @@ static void mrc_coro_await_incrementing_awaitable_baseline(benchmark::State& sta coroutines::sync_wait(task()); } +static void mrc_coro_immediate_channel(benchmark::State& state) +{ + channel::v2::ImmediateChannel immediate_channel; + + auto src = [&]() -> coroutines::Task<> { + for (auto _ : state) + { + co_await immediate_channel.async_write(42); + } + immediate_channel.close(); + co_return; + }; + + auto sink = [&]() -> coroutines::Task<> { + while (auto val = co_await immediate_channel.async_read()) {} + co_return; + }; + + coroutines::sync_wait(coroutines::when_all(sink(), src())); +} + +static auto bar(std::size_t i) -> std::size_t +{ + return i += 5; +} + +static void foo(std::size_t i) +{ + benchmark::DoNotOptimize(bar(i)); +} + +static void mrc_coro_immedate_channel_composite_fn_baseline(benchmark::State& state) +{ + auto task = [&]() -> coroutines::Task<> { + for (auto _ : state) + { + foo(42); + } + co_return; + }; + + coroutines::sync_wait(task()); +} + BENCHMARK(mrc_coro_create_single_task_and_sync); BENCHMARK(mrc_coro_create_single_task_and_sync_on_when_all); BENCHMARK(mrc_coro_create_two_tasks_and_sync_on_when_all); BENCHMARK(mrc_coro_await_suspend_never); BENCHMARK(mrc_coro_await_incrementing_awaitable_baseline); BENCHMARK(mrc_coro_await_incrementing_awaitable); +BENCHMARK(mrc_coro_immediate_channel); +BENCHMARK(mrc_coro_immedate_channel_composite_fn_baseline); diff --git a/cpp/mrc/include/mrc/channel/v2/immediate_channel.hpp b/cpp/mrc/include/mrc/channel/v2/immediate_channel.hpp new file mode 100644 index 000000000..c980a680f --- /dev/null +++ b/cpp/mrc/include/mrc/channel/v2/immediate_channel.hpp @@ -0,0 +1,259 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "mrc/channel/status.hpp" +#include "mrc/core/error.hpp" +#include "mrc/core/expected.hpp" + +#include + +#include +#include +#include + +namespace mrc::channel::v2 { + +template +class ImmediateChannel +{ + public: + using mutex_type = std::mutex; + + // mrc: hotpath + struct WriteOperation + { + WriteOperation(ImmediateChannel& parent, T&& data) : m_parent(parent), m_data(std::move(data)) {} + + // writes always suspend + constexpr static auto await_ready() noexcept -> bool + { + return false; + } + + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> std::coroutine_handle<> + { + auto lock = std::unique_lock{m_parent.m_mutex}; + m_awaiting_coroutine = awaiting_coroutine; + + // if the channel was closed, resume immediate and throw an error in the await_resume method + if (m_parent.m_closed.load(std::memory_order::acquire)) [[unlikely]] + { + m_channel_closed = true; + return awaiting_coroutine; + } + + // if there are no readers to resume, we insert ourself into the lifo queue of writers with data and yield + if (m_parent.m_read_waiters == nullptr) + { + m_next = m_parent.m_write_waiters; + m_parent.m_write_waiters = this; + return std::noop_coroutine(); + } + + // otherwise we prepare the reader for resumption + auto* reader = m_parent.m_read_waiters; + m_parent.m_read_waiters = reader->m_next; + reader->m_data = std::move(m_data); + + // then we insert ourself at the end of the fifo queue of writers without data awaiting to be resumed + if (m_parent.m_write_resumers == nullptr) + { + m_parent.m_write_resumers = this; + } + else + { + // put current writer at the end of the fifo writer resumer list + auto* write_resumer = m_parent.m_write_resumers; + while (write_resumer->m_next != nullptr) + { + write_resumer = write_resumer->m_next; + } + write_resumer->m_next = this; + } + + // resume the reader via symmetric transfer + return reader->m_awaiting_coroutine; + } + + auto await_resume() -> void + { + if (m_channel_closed) [[unlikely]] + { + auto error = Error::create(ErrorCode::ChannelClosed, "write failed on closed channel"); + // LOG(ERROR) << error.value().message(); + throw error.value(); + } + } + + ImmediateChannel& m_parent; + std::coroutine_handle<> m_awaiting_coroutine; + WriteOperation* m_next{nullptr}; + bool m_channel_closed{false}; + T m_data; + std::unique_lock m_lock; + }; + + // mrc: hotpath + struct ReadOperation + { + bool await_ready() + { + m_lock = std::unique_lock(m_parent.m_mutex); + return m_parent.try_read_with_lock(this, m_lock); + } + + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void + { + DCHECK(m_lock.owns_lock()); + auto lock = std::move(m_lock); + + m_awaiting_coroutine = awaiting_coroutine; + m_next = m_parent.m_read_waiters; + m_parent.m_read_waiters = this; + } + + auto await_resume() noexcept -> mrc::expected + { + if (m_channel_closed) [[unlikely]] + { + return mrc::unexpected(Status::closed); + } + + return {std::move(m_data)}; + } + + ImmediateChannel& m_parent; + std::coroutine_handle<> m_awaiting_coroutine; + ReadOperation* m_next{nullptr}; + T m_data; + bool m_channel_closed{false}; + std::unique_lock m_lock; + }; + + [[nodiscard]] WriteOperation async_write(T&& data) + { + // mrc: hotpath + return WriteOperation{*this, std::move(data)}; + } + + [[nodiscard]] ReadOperation async_read() + { + // mrc: hotpath + return ReadOperation{*this}; + } + + void close() + { + // Only wake up waiters once. + if (m_closed.load(std::memory_order::acquire)) + { + return; + } + + std::unique_lock lock{m_mutex}; + auto first_closer = !m_closed.exchange(true, std::memory_order::release); + + // only the first caller of close should continue + if (first_closer) + { + // the readers flush the writers, then after all writers are finished, + // the readers will see the channel is closed and resume with the closed status + while (m_read_waiters != nullptr) + { + auto* to_resume = m_read_waiters; + m_read_waiters = m_read_waiters->m_next; + lock.unlock(); + to_resume->m_channel_closed = true; + to_resume->m_awaiting_coroutine.resume(); + lock.lock(); + } + } + } + + private: + // mrc: hotpath + bool try_read_with_lock(ReadOperation* read_op, std::unique_lock& lock) + { + // if there are any writers in any state, we will resume them + while (m_write_waiters != nullptr || m_write_resumers) + { + // first process any writer that still holds data + if (m_write_waiters != nullptr) + { + // pop writer off the lifo writers queue + auto resume_in_future = m_write_waiters; + m_write_waiters = m_write_waiters->m_next; + resume_in_future->m_next = nullptr; + + // transfer the data object to this ReadOperation + read_op->m_data = std::move(resume_in_future->m_data); + + // the writer we pulled off the writers queue we push to the end of waiters fifo queue + if (m_write_resumers == nullptr) + { + m_write_resumers = resume_in_future; + } + else + { + auto last = m_write_resumers; + while (last->m_next != nullptr) + { + last = last->m_next; + } + last->m_next = resume_in_future; + } + + lock.unlock(); + return true; + } + + // there were no writers with data, so there must be at least one waiting to be resumed + DCHECK(m_write_resumers != nullptr); + + // pop off the first resumer + auto* to_resume = m_write_resumers; + m_write_resumers = to_resume->m_next; + + // resume the writer + lock.unlock(); + to_resume->m_awaiting_coroutine.resume(); + lock.lock(); + } + + // if there are no readers and the channel is closed, we should resume immediately + if (m_closed.load(std::memory_order::acquire)) [[unlikely]] + { + read_op->m_channel_closed = true; + lock.unlock(); + return true; + } + + // there are no writers present and the channel is still open ==> this reader must suspend + // the await_suspend method is responsible for unlocking + return false; + } + + mutex_type m_mutex; + WriteOperation* m_write_waiters{nullptr}; + WriteOperation* m_write_resumers{nullptr}; + ReadOperation* m_read_waiters{nullptr}; + std::atomic m_closed{false}; +}; + +} // namespace mrc::channel::v2 diff --git a/cpp/mrc/src/internal/expected.hpp b/cpp/mrc/include/mrc/core/error.hpp similarity index 86% rename from cpp/mrc/src/internal/expected.hpp rename to cpp/mrc/include/mrc/core/error.hpp index c23439a09..c6ead312a 100644 --- a/cpp/mrc/src/internal/expected.hpp +++ b/cpp/mrc/include/mrc/core/error.hpp @@ -17,23 +17,19 @@ #pragma once -#include "mrc/core/std23_expected.hpp" // IWYU pragma: export +#include "mrc/core/expected.hpp" // IWYU pragma: export #include "mrc/utils/macros.hpp" #include "mrc/utils/string_utils.hpp" // IWYU pragma: export -namespace mrc::internal { +namespace mrc { enum class ErrorCode { Internal, Fatal, + ChannelClosed, }; -class Error; - -// todo(#219) - update tidy to allow the following typedef -using UnexpectedError = std23::unexpected; // NOLINT - class Error final : public std::exception { Error(ErrorCode type) : m_code(type) {} @@ -42,9 +38,9 @@ class Error final : public std::exception public: template - static UnexpectedError create(ArgsT&&... args) + static auto create(ArgsT&&... args) -> decltype(auto) { - return UnexpectedError(Error(std::forward(args)...)); + return unexpected(Error(std::forward(args)...)); } DEFAULT_MOVEABILITY(Error); @@ -70,7 +66,7 @@ class Error final : public std::exception }; template -using Expected = std23::expected; // NOLINT +using Expected = expected; // NOLINT #define MRC_CHECK(condition) \ if (!(condition)) \ @@ -92,4 +88,4 @@ using Expected = std23::expected; // NOLINT throw expected.error(); \ } -} // namespace mrc::internal +} // namespace mrc diff --git a/cpp/mrc/include/mrc/core/std23_expected.hpp b/cpp/mrc/include/mrc/core/expected.hpp similarity index 99% rename from cpp/mrc/include/mrc/core/std23_expected.hpp rename to cpp/mrc/include/mrc/core/expected.hpp index 20580da29..2c20f58bf 100644 --- a/cpp/mrc/include/mrc/core/std23_expected.hpp +++ b/cpp/mrc/include/mrc/core/expected.hpp @@ -55,7 +55,7 @@ #include #include -namespace std23 { +namespace mrc { // clang-format off // NOLINTBEGIN(*) @@ -1478,4 +1478,4 @@ class expected { // NOLINTEND(*) // clang-format on -} // namespace std23 +} // namespace mrc diff --git a/cpp/mrc/include/mrc/coroutines/ring_buffer.hpp b/cpp/mrc/include/mrc/coroutines/ring_buffer.hpp index 94625ff4b..de4b8b3d9 100644 --- a/cpp/mrc/include/mrc/coroutines/ring_buffer.hpp +++ b/cpp/mrc/include/mrc/coroutines/ring_buffer.hpp @@ -38,7 +38,7 @@ #pragma once -#include "mrc/core/std23_expected.hpp" +#include "mrc/core/expected.hpp" #include "mrc/coroutines/schedule_policy.hpp" #include "mrc/coroutines/thread_local_context.hpp" #include "mrc/coroutines/thread_pool.hpp" @@ -243,13 +243,13 @@ class RingBuffer /** * @return The consumed element or std::nullopt if the read has failed. */ - auto await_resume() -> std23::expected + auto await_resume() -> mrc::expected { ThreadLocalContext::resume_thread_local_context(); if (m_stopped) { - return std23::unexpected(RingBufferOpStatus::Stopped); + return mrc::unexpected(RingBufferOpStatus::Stopped); } return std::move(m_e); diff --git a/cpp/mrc/src/internal/control_plane/client.hpp b/cpp/mrc/src/internal/control_plane/client.hpp index a640fef34..b9a211dbc 100644 --- a/cpp/mrc/src/internal/control_plane/client.hpp +++ b/cpp/mrc/src/internal/control_plane/client.hpp @@ -18,12 +18,12 @@ #pragma once #include "internal/control_plane/client/instance.hpp" // IWYU pragma: keep -#include "internal/expected.hpp" #include "internal/grpc/client_streaming.hpp" #include "internal/grpc/stream_writer.hpp" #include "internal/resources/partition_resources_base.hpp" #include "internal/service.hpp" +#include "mrc/core/error.hpp" #include "mrc/node/source_channel.hpp" #include "mrc/protos/architect.grpc.pb.h" #include "mrc/protos/architect.pb.h" diff --git a/cpp/mrc/src/internal/control_plane/client/connections_manager.cpp b/cpp/mrc/src/internal/control_plane/client/connections_manager.cpp index fb216e970..3dd97a11d 100644 --- a/cpp/mrc/src/internal/control_plane/client/connections_manager.cpp +++ b/cpp/mrc/src/internal/control_plane/client/connections_manager.cpp @@ -19,12 +19,12 @@ #include "internal/control_plane/client.hpp" #include "internal/control_plane/client/instance.hpp" -#include "internal/expected.hpp" #include "internal/runnable/resources.hpp" #include "internal/ucx/resources.hpp" #include "internal/ucx/worker.hpp" #include "internal/utils/contains.hpp" +#include "mrc/core/error.hpp" #include "mrc/core/task_queue.hpp" #include "mrc/protos/architect.pb.h" diff --git a/cpp/mrc/src/internal/control_plane/client/state_manager.cpp b/cpp/mrc/src/internal/control_plane/client/state_manager.cpp index 0b8c0d9e0..5130f8f0b 100644 --- a/cpp/mrc/src/internal/control_plane/client/state_manager.cpp +++ b/cpp/mrc/src/internal/control_plane/client/state_manager.cpp @@ -18,9 +18,9 @@ #include "internal/control_plane/client/state_manager.hpp" #include "internal/control_plane/client.hpp" -#include "internal/expected.hpp" #include "internal/runnable/resources.hpp" +#include "mrc/core/error.hpp" #include "mrc/node/edge_builder.hpp" #include "mrc/node/rx_sink.hpp" #include "mrc/node/source_channel.hpp" diff --git a/cpp/mrc/src/internal/control_plane/client/subscription_service.cpp b/cpp/mrc/src/internal/control_plane/client/subscription_service.cpp index 5e810883e..6defbd462 100644 --- a/cpp/mrc/src/internal/control_plane/client/subscription_service.cpp +++ b/cpp/mrc/src/internal/control_plane/client/subscription_service.cpp @@ -19,10 +19,10 @@ #include "internal/control_plane/client.hpp" #include "internal/control_plane/client/instance.hpp" -#include "internal/expected.hpp" #include "internal/service.hpp" #include "internal/utils/contains.hpp" +#include "mrc/core/error.hpp" #include "mrc/protos/architect.pb.h" #include diff --git a/cpp/mrc/src/internal/control_plane/proto_helpers.hpp b/cpp/mrc/src/internal/control_plane/proto_helpers.hpp index f802c7b15..d0c72d8c8 100644 --- a/cpp/mrc/src/internal/control_plane/proto_helpers.hpp +++ b/cpp/mrc/src/internal/control_plane/proto_helpers.hpp @@ -17,7 +17,7 @@ #pragma once -#include "internal/expected.hpp" +#include "mrc/core/error.hpp" #include diff --git a/cpp/mrc/src/internal/control_plane/server.cpp b/cpp/mrc/src/internal/control_plane/server.cpp index ecc0e3fd9..0cb37058f 100644 --- a/cpp/mrc/src/internal/control_plane/server.cpp +++ b/cpp/mrc/src/internal/control_plane/server.cpp @@ -309,11 +309,11 @@ void Server::do_handle_event(event_t&& event) DVLOG(10) << "event.ok failed; close stream"; drop_stream(event.stream); } - } catch (const std23::bad_expected_access& e) + } catch (const mrc::bad_expected_access& e) { LOG(ERROR) << "bad_expected_access: " << e.error().message(); on_fatal_exception(); - } catch (const UnexpectedError& e) + } catch (const mrc::unexpected& e) { LOG(ERROR) << "unexpected: " << e.value().message(); on_fatal_exception(); diff --git a/cpp/mrc/src/internal/control_plane/server.hpp b/cpp/mrc/src/internal/control_plane/server.hpp index 0b93e1718..263c5dffe 100644 --- a/cpp/mrc/src/internal/control_plane/server.hpp +++ b/cpp/mrc/src/internal/control_plane/server.hpp @@ -18,11 +18,11 @@ #pragma once #include "internal/control_plane/server/connection_manager.hpp" -#include "internal/expected.hpp" #include "internal/grpc/server.hpp" #include "internal/grpc/server_streaming.hpp" #include "internal/service.hpp" +#include "mrc/core/error.hpp" #include "mrc/node/queue.hpp" #include "mrc/protos/architect.grpc.pb.h" diff --git a/cpp/mrc/src/internal/control_plane/server/connection_manager.hpp b/cpp/mrc/src/internal/control_plane/server/connection_manager.hpp index 685843a6e..8add0553a 100644 --- a/cpp/mrc/src/internal/control_plane/server/connection_manager.hpp +++ b/cpp/mrc/src/internal/control_plane/server/connection_manager.hpp @@ -18,9 +18,9 @@ #pragma once #include "internal/control_plane/server/versioned_issuer.hpp" -#include "internal/expected.hpp" #include "internal/grpc/server_streaming.hpp" +#include "mrc/core/error.hpp" #include "mrc/types.hpp" #include diff --git a/cpp/mrc/src/internal/control_plane/server/subscription_manager.hpp b/cpp/mrc/src/internal/control_plane/server/subscription_manager.hpp index 35809eb8f..d1f0c6eae 100644 --- a/cpp/mrc/src/internal/control_plane/server/subscription_manager.hpp +++ b/cpp/mrc/src/internal/control_plane/server/subscription_manager.hpp @@ -19,8 +19,8 @@ #include "internal/control_plane/server/tagged_issuer.hpp" #include "internal/control_plane/server/versioned_issuer.hpp" -#include "internal/expected.hpp" +#include "mrc/core/error.hpp" #include "mrc/types.hpp" #include diff --git a/cpp/mrc/src/tests/test_expected.cpp b/cpp/mrc/src/tests/test_expected.cpp index 366bfccae..663fbb0d0 100644 --- a/cpp/mrc/src/tests/test_expected.cpp +++ b/cpp/mrc/src/tests/test_expected.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "internal/expected.hpp" +#include "mrc/core/error.hpp" #include #include @@ -26,7 +26,6 @@ #include using namespace mrc; -using namespace mrc::internal; class TestExpected : public ::testing::Test {}; diff --git a/cpp/mrc/tests/CMakeLists.txt b/cpp/mrc/tests/CMakeLists.txt index 366afed8e..a50a767b6 100644 --- a/cpp/mrc/tests/CMakeLists.txt +++ b/cpp/mrc/tests/CMakeLists.txt @@ -15,6 +15,7 @@ # Keep all source files sorted!!! add_executable(test_mrc + channels/test_immediate_channel.cpp coroutines/test_event.cpp coroutines/test_latch.cpp coroutines/test_ring_buffer.cpp diff --git a/cpp/mrc/tests/channels/test_immediate_channel.cpp b/cpp/mrc/tests/channels/test_immediate_channel.cpp new file mode 100644 index 000000000..88bfd7dcf --- /dev/null +++ b/cpp/mrc/tests/channels/test_immediate_channel.cpp @@ -0,0 +1,141 @@ +/** + * SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mrc/channel/v2/immediate_channel.hpp" +#include "mrc/coroutines/latch.hpp" +#include "mrc/coroutines/sync_wait.hpp" +#include "mrc/coroutines/task.hpp" +#include "mrc/coroutines/when_all.hpp" + +#include +#include + +using namespace mrc; +using namespace mrc::channel; +using namespace mrc::channel::v2; + +class TestChannelV2 : public ::testing::Test +{ + protected: + void SetUp() override {} + + void TearDown() override {} + + ImmediateChannel m_channel; + + coroutines::Task int_writer(int iterations, coroutines::Latch& latch) + { + for (int i = 0; i < iterations; i++) + { + co_await m_channel.async_write(std::move(i)); + } + latch.count_down(); + co_return; + } + + coroutines::Task<> close_on_latch(coroutines::Latch& latch) + { + co_await latch; + m_channel.close(); + co_return; + } + + coroutines::Task int_reader(int iterations) + { + int i = 0; + while (true) + { + auto data = co_await m_channel.async_read(); + if (!data) + { + break; + } + i++; + } + EXPECT_EQ(i, iterations); + co_return; + } +}; + +TEST_F(TestChannelV2, ChannelClosed) +{ + ImmediateChannel channel; + channel.close(); + + auto test = [&]() -> coroutines::Task { + // write should throw + EXPECT_ANY_THROW(co_await channel.async_write(42)); + + // read should return unexpected + auto data = co_await channel.async_read(); + EXPECT_FALSE(data); + + // task throws + co_await channel.async_write(42); + co_return; + }; + + EXPECT_ANY_THROW(coroutines::sync_wait(test())); +} + +TEST_F(TestChannelV2, SingleWriterSingleReader) +{ + coroutines::Latch latch{1}; + coroutines::sync_wait(coroutines::when_all(close_on_latch(latch), int_writer(3, latch), int_reader(3))); +} + +TEST_F(TestChannelV2, Readerx1_Writer_x1) +{ + coroutines::Latch latch{1}; + coroutines::sync_wait(coroutines::when_all(int_reader(3), int_writer(3, latch), close_on_latch(latch))); +} + +TEST_F(TestChannelV2, Readerx2_Writer_x1) +{ + coroutines::Latch latch{1}; + coroutines::sync_wait( + coroutines::when_all(int_reader(2), int_reader(1), int_writer(3, latch), close_on_latch(latch))); +} + +TEST_F(TestChannelV2, Readerx3_Writer_x1) +{ + coroutines::Latch latch{1}; + coroutines::sync_wait( + coroutines::when_all(close_on_latch(latch), int_reader(1), int_reader(1), int_reader(1), int_writer(3, latch))); +} + +TEST_F(TestChannelV2, Readerx4_Writer_x1) +{ + // reader are a lifo, so the first reader in the task list will not get a data entry + coroutines::Latch latch{1}; + coroutines::sync_wait(coroutines::when_all( + close_on_latch(latch), int_reader(0), int_reader(1), int_reader(1), int_reader(1), int_writer(3, latch))); +} + +TEST_F(TestChannelV2, Readerx3_Writer_x1_Reader_x1) +{ + coroutines::Latch latch{1}; + coroutines::sync_wait(coroutines::when_all( + int_reader(1), int_reader(1), close_on_latch(latch), int_reader(1), int_writer(3, latch), int_reader(0))); +} + +TEST_F(TestChannelV2, Writer_2_Reader_x2) +{ + coroutines::Latch latch{1}; + coroutines::sync_wait(coroutines::when_all( + int_writer(2, latch), int_writer(2, latch), close_on_latch(latch), int_reader(4), int_reader(0))); +}