Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 131 additions & 54 deletions include/tmc/detail/awaitable_customizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,54 +68,17 @@ struct awaitable_customizer_base {
awaitable_customizer_base() noexcept
: continuation{nullptr},
continuation_executor{tmc::detail::this_thread::executor()},
done_count{nullptr}, flags{tmc::detail::this_thread::this_task().prio} {}

// Either returns the awaiting coroutine (continuation) to be resumed
// directly, or submits that awaiting coroutine to the continuation executor
// to be resumed. This should be called exactly once, after the awaitable is
// complete and any results are ready.
//
// The overload taking a Handle destroys the coroutine via `self.destroy()`
// BEFORE performing any atomic operation that could allow a parent task to
// resume. This is required when this task is HALO'd into a parent task's
// allocation: after the atomic (e.g., done_count.fetch_sub), another child
// could resume the parent, which could then complete and destroy its
// allocation (including this task's HALO'd frame). By copying all needed data
// to stack locals and destroying ourselves first, we avoid use-after-free.
TMC_FORCE_INLINE inline std::coroutine_handle<>
resume_continuation(std::coroutine_handle<> self) noexcept {
// Copy all needed fields to stack locals FIRST, before destroying self.
void* lContinuationExecutor = continuation_executor;
void* lContinuation = continuation;
void* lDoneCount = done_count;
size_t lFlags = flags;
// Destroy the coroutine BEFORE the atomic that could allow parent
// destruction. After this point, `this` is INVALID - use only locals.
self.destroy();
return resume_continuation_impl(
lContinuationExecutor, lContinuation, lDoneCount, lFlags
);
}

// The no-argument overload is for non-coroutine awaitables that don't need
// to destroy themselves.
TMC_FORCE_INLINE inline std::coroutine_handle<>
resume_continuation() noexcept {
// There's no risk of use-after-free here, so just pass the fields directly.
return resume_continuation_impl(
continuation_executor, continuation, done_count, flags
);
done_count{nullptr}, flags{tmc::detail::this_thread::this_task().prio} {
}

private:
// Implementation that works only with stack locals - no access to `this`.
TMC_FORCE_INLINE inline static std::coroutine_handle<>
resume_continuation_impl(
void* ContinuationExecutor, void* Continuation, void* DoneCount,
// Return the raw continuation, and if the ContinuationExecutor should be
// indirected (it's dispatched through a group), updates the parameter with
// the final value, ready to be cast to a tmc::ex_any*.
// May return nullptr.
TMC_FORCE_INLINE inline static std::coroutine_handle<> get_continuation(
void*& ContinuationExecutor, void* Continuation, void* DoneCount,
size_t Flags
) noexcept {
tmc::ex_any* continuationExecutor =
static_cast<tmc::ex_any*>(ContinuationExecutor);
std::coroutine_handle<> finalContinuation;
if (DoneCount == nullptr) {
// being awaited alone, or detached
Expand Down Expand Up @@ -150,7 +113,7 @@ struct awaitable_customizer_base {
) == 0;
}
if (shouldResume) {
continuationExecutor =
ContinuationExecutor =
*static_cast<tmc::ex_any**>(ContinuationExecutor);
finalContinuation =
*(static_cast<std::coroutine_handle<>*>(Continuation));
Expand All @@ -159,25 +122,139 @@ struct awaitable_customizer_base {
}
}

// Common submission and continuation logic
// Single return to satisfy NRVO
return finalContinuation;
}

// Gets the awaiting coroutine (continuation) and then posts it to the
// continuation executor - no symmetric transfer. This should be called
// exactly once, after the awaitable is complete and any results are ready.
//
// The overload taking a Handle destroys the coroutine via `self.destroy()`
// BEFORE performing any atomic operation that could allow a parent task to
// resume. This is required when this task is HALO'd into a parent task's
// allocation: after the atomic (e.g., done_count.fetch_sub), another child
// could resume the parent, which could then complete and destroy its
// allocation (including this task's HALO'd frame). By copying all needed data
// to stack locals and destroying ourselves first, we avoid use-after-free.
TMC_FORCE_INLINE inline void
post_continuation(std::coroutine_handle<> self) noexcept {
// Copy all needed fields to stack locals FIRST, before destroying self.
void* lContinuationExecutor = continuation_executor;
void* lContinuation = continuation;
void* lDoneCount = done_count;
size_t lFlags = flags;

// Destroy the coroutine BEFORE the atomic that could allow parent
// destruction. After this point, `this` is INVALID - use only locals.
self.destroy();

post_continuation_impl(
lContinuationExecutor, lContinuation, lDoneCount, lFlags
);
}

// Gets the awaiting coroutine (continuation) and then posts it to the
// continuation executor - no symmetric transfer. This should be called
// exactly once, after the awaitable is complete and any results are ready.
//
// The no-argument overload is for non-coroutine awaitables that don't need
// to destroy themselves.
TMC_FORCE_INLINE inline void post_continuation() noexcept {
// There's no risk of use-after-free here, so just pass the fields directly.
post_continuation_impl(
continuation_executor, continuation, done_count, flags
);
}

// Either returns the awaiting coroutine (continuation) to be resumed
// directly, or submits that awaiting coroutine to the continuation executor
// to be resumed. This should be called exactly once, after the awaitable is
// complete and any results are ready.
//
// The overload taking a Handle destroys the coroutine via `self.destroy()`
// BEFORE performing any atomic operation that could allow a parent task to
// resume. This is required when this task is HALO'd into a parent task's
// allocation: after the atomic (e.g., done_count.fetch_sub), another child
// could resume the parent, which could then complete and destroy its
// allocation (including this task's HALO'd frame). By copying all needed data
// to stack locals and destroying ourselves first, we avoid use-after-free.
TMC_FORCE_INLINE inline std::coroutine_handle<>
resume_continuation(std::coroutine_handle<> self) noexcept {
// Copy all needed fields to stack locals FIRST, before destroying self.
void* lContinuationExecutor = continuation_executor;
void* lContinuation = continuation;
void* lDoneCount = done_count;
size_t lFlags = flags;

// Destroy the coroutine BEFORE the atomic that could allow parent
// destruction. After this point, `this` is INVALID - use only locals.
self.destroy();

return resume_continuation_impl(
lContinuationExecutor, lContinuation, lDoneCount, lFlags
);
}

// Either returns the awaiting coroutine (continuation) to be resumed
// directly, or submits that awaiting coroutine to the continuation executor
// to be resumed. This should be called exactly once, after the awaitable is
// complete and any results are ready.
//
// The no-argument overload is for non-coroutine awaitables that don't need
// to destroy themselves.
TMC_FORCE_INLINE inline std::coroutine_handle<>
resume_continuation() noexcept {
// There's no risk of use-after-free here, so just pass the fields directly.
return resume_continuation_impl(
continuation_executor, continuation, done_count, flags
);
}

private:
// Implementation that works only with stack locals - no access to `this`.
TMC_FORCE_INLINE inline static void post_continuation_impl(
void* ContinuationExecutor, void* Continuation, void* DoneCount,
size_t Flags
) noexcept {
auto finalContinuation =
get_continuation(ContinuationExecutor, Continuation, DoneCount, Flags);

if (finalContinuation == nullptr) {
return;
}

size_t continuationPriority = Flags & task_flags::PRIORITY_MASK;
auto exec = static_cast<tmc::ex_any*>(ContinuationExecutor);
tmc::detail::post_checked(
exec, std::move(finalContinuation), continuationPriority
);
}

// Implementation that works only with stack locals - no access to `this`.
TMC_FORCE_INLINE inline static std::coroutine_handle<>
resume_continuation_impl(
void* ContinuationExecutor, void* Continuation, void* DoneCount,
size_t Flags
) noexcept {
auto finalContinuation =
get_continuation(ContinuationExecutor, Continuation, DoneCount, Flags);

// Determine if we are allowed to symmetric transfer to the continuation
if (finalContinuation == nullptr) {
finalContinuation = std::noop_coroutine();
} else {
size_t continuationPriority = Flags & task_flags::PRIORITY_MASK;
if (continuationExecutor != nullptr &&
!tmc::detail::this_thread::exec_prio_is(
continuationExecutor, continuationPriority
)) {
auto exec = static_cast<tmc::ex_any*>(ContinuationExecutor);
if (exec != nullptr &&
!tmc::detail::this_thread::exec_prio_is(exec, continuationPriority)) {
// post_checked is redundant with the prior check at the moment
tmc::detail::post_checked(
continuationExecutor, std::move(finalContinuation),
continuationPriority
exec, std::move(finalContinuation), continuationPriority
);
finalContinuation = std::noop_coroutine();
}
}

// Single return to satisfy NRVO
return finalContinuation;
}
};
Expand Down
13 changes: 13 additions & 0 deletions include/tmc/detail/waiter_list.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ struct waiter_list_waiter {
try_symmetric_transfer(std::coroutine_handle<> Outer) noexcept;
};

/// 1. Checks ToWake's executor and priority for symmetric transfer eligibility.
/// If eligible, returns ToWake and posts Continuation to its executor.
/// 2. Checks Continuation's executor and priority for symmetric transfer
/// eligibility. If eligible, returns Continuation and posts ToWake to its
/// executor.
/// 3. If neither is eligible, posts them both to their executors and returns
/// std::noop_coroutine().
/// Also checks both for null (counts as ineligible).
[[nodiscard]] TMC_DECL std::coroutine_handle<> try_symmetric_transfer2_waiter(
waiter_list_waiter* ToWake, std::coroutine_handle<> Continuation,
tmc::ex_any* Executor, size_t Priority
) noexcept;

struct waiter_data_base;

struct waiter_list_node {
Expand Down
35 changes: 35 additions & 0 deletions include/tmc/detail/waiter_list.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "tmc/detail/impl.hpp" // IWYU pragma: keep

#include "tmc/current.hpp"
#include "tmc/detail/thread_locals.hpp"
#include "tmc/detail/waiter_list.hpp"

Expand Down Expand Up @@ -83,6 +84,40 @@ reverse_chain(tmc::detail::waiter_list_node* curr) noexcept {
}
} // namespace

std::coroutine_handle<> try_symmetric_transfer2_waiter(
waiter_list_waiter* ToWake, std::coroutine_handle<> Continuation,
tmc::ex_any* Executor, size_t Priority
) noexcept {
if (ToWake != nullptr) {
std::coroutine_handle<> toContinuation = ToWake->continuation;
tmc::ex_any* toExecutor = ToWake->continuation_executor;
size_t toPriority = ToWake->continuation_priority;
// If we can transfer to primary, then do so, and post backup.
if (tmc::detail::this_thread::exec_prio_is(toExecutor, toPriority)) {
if (Continuation != nullptr) {
tmc::detail::post_checked(Executor, std::move(Continuation), Priority);
}
return toContinuation;
}

// Transfer to primary disallowed
tmc::detail::post_checked(
toExecutor, std::move(toContinuation), toPriority
);
}

if (Continuation != nullptr) {
// Try to transfer to backup
if (tmc::detail::this_thread::exec_prio_is(Executor, Priority)) {
return Continuation;
}

// Transfer to backup disallowed
tmc::detail::post_checked(Executor, std::move(Continuation), Priority);
}
return std::noop_coroutine();
}

void waiter_list::add_waiter(tmc::detail::waiter_list_node& w) noexcept {
auto h = input.load(std::memory_order_acquire);
do {
Expand Down
Loading
Loading