diff --git a/docs/sync.md b/docs/sync.md index dec477c..157e437 100644 --- a/docs/sync.md +++ b/docs/sync.md @@ -37,10 +37,11 @@ int err = future.wait(); // suspends until set `reset()` clears the future so it can be reused for the next operation. -**State** -- packed `uint64_t`: `{waiter:62, multipleWait:1, isSet:1}`. The -`waiter` field holds a `Fiber*` (normal wait) or a `MultipleWaitState*` -(multiple wait). The `multipleWait` bit distinguishes the two cases in -`signal()`. +**State** -- packed `uint64_t`: `{waiter:61, multipleWait:1, hasCallback:1, isSet:1}`. +The `waiter` field holds a `Fiber*` (normal wait), a `MultipleWaitState*` +(multiple wait), or a `SubscribeCallback*` (subscribe). The `multipleWait` +and `hasCallback` bits select the dispatch path in `signal()`; only one is +ever set on a given future. **suspendCallback race** -- the waiter pointer is installed inside `suspendCallback`, which runs after the fiber is parked. If `signal()` arrives diff --git a/include/silk/fibers/blocking-queue.h b/include/silk/fibers/blocking-queue.h index be92e6e..68ba08f 100644 --- a/include/silk/fibers/blocking-queue.h +++ b/include/silk/fibers/blocking-queue.h @@ -3,7 +3,6 @@ #include #include -#include #include #include @@ -32,7 +31,7 @@ class FiberBlockingQueue */ [[nodiscard]] int enqueue(T value) noexcept { - while (!stopping.load(std::memory_order_relaxed)) + while (!spaceAvailable.stopped()) { uint64_t token = spaceAvailable.get(); if (queue.enqueue(value)) @@ -40,12 +39,14 @@ class FiberBlockingQueue itemAvailable.post(); return 0; } - if (stopping.load(std::memory_order_relaxed)) + + int r = spaceAvailable.wait(token + 1); + if (r) { - break; + return r; } - spaceAvailable.wait(token + 1); } + return ECANCELED; } @@ -55,7 +56,7 @@ class FiberBlockingQueue */ [[nodiscard]] int dequeue(T * value) noexcept { - while (!stopping.load(std::memory_order_relaxed)) + while (!itemAvailable.stopped()) { uint64_t token = itemAvailable.get(); if (queue.dequeue(value)) @@ -63,19 +64,21 @@ class FiberBlockingQueue spaceAvailable.post(); return 0; } - if (stopping.load(std::memory_order_relaxed)) + + int r = itemAvailable.wait(token + 1); + if (r) { - break; + return r; } - itemAvailable.wait(token + 1); } + return ECANCELED; } - /** Append a value without suspending. Returns false if the queue is full. */ + /** Append a value without suspending. Returns false if the queue is full or torn down. */ [[nodiscard]] bool tryEnqueue(T value) noexcept { - if (!stopping.load(std::memory_order_relaxed) && queue.enqueue(value)) + if (!spaceAvailable.stopped() && queue.enqueue(value)) { itemAvailable.post(); return true; @@ -83,10 +86,10 @@ class FiberBlockingQueue return false; } - /** Write the head value into @p value without suspending. Returns false if empty. */ + /** Write the head value into @p value without suspending. Returns false if empty or torn down. */ [[nodiscard]] bool tryDequeue(T * value) noexcept { - if (!stopping.load(std::memory_order_relaxed) && queue.dequeue(value)) + if (!itemAvailable.stopped() && queue.dequeue(value)) { spaceAvailable.post(); return true; @@ -97,16 +100,14 @@ class FiberBlockingQueue /** Unblock all current and future enqueue/dequeue callers with ECANCELED. */ void teardown() noexcept { - stopping.store(true, std::memory_order_relaxed); - spaceAvailable.post(); - itemAvailable.post(); + spaceAvailable.stop(); + itemAvailable.stop(); } private: BoundedQueue queue; FiberFutex spaceAvailable; FiberFutex itemAvailable; - std::atomic stopping{}; }; } // namespace silk diff --git a/include/silk/fibers/futex.h b/include/silk/fibers/futex.h index 71d882f..c492082 100644 --- a/include/silk/fibers/futex.h +++ b/include/silk/fibers/futex.h @@ -17,6 +17,9 @@ class Fiber; * as Linux futex. * * post has release semantics; get and wait have acquire semantics. + * + * stop transitions the futex into a stopped state: every current waiter is woken + * and every subsequent wait returns ECANCELED without suspending. */ class FiberFutex { @@ -29,29 +32,48 @@ class FiberFutex return currentState.counter; } - /** Wait until at least one post fires after this call. */ - void wait() noexcept { wait(get() + 1); } + /** Wait until at least one post fires after this call, or stop() is called. */ + int wait() noexcept { return wait(get() + 1); } /** - * Wait until the counter reaches @p token. - * Returns immediately if the counter is already >= @p token. + * Wait until the counter reaches @p token, or stop() is called. + * Returns 0 on normal wakeup, ECANCELED if the futex has been stopped. + * Returns 0 immediately if the counter is already >= @p token (even after stop, + * the satisfied-token case takes precedence so callers do not lose state changes). * @p token is typically obtained as get() + 1 to wait for the next post. */ - void wait(uint64_t token) noexcept; + [[nodiscard]] int wait(uint64_t token) noexcept; - /** Increment the counter and wake all waiting fibers. */ + /** Increment the counter and wake all waiting fibers. No-op once stopped. */ void post() noexcept; + /** + * Transition into the stopped state and wake all current waiters. After this, + * every wait returns ECANCELED until the object is destroyed. Idempotent. + */ + void stop() noexcept; + + /** Returns true if stop has been called. */ + bool stopped() const noexcept + { + State currentState; + currentState.raw = state.load(std::memory_order_acquire); + return currentState.stopped; + } + private: /** - * Packed event state. + * Packed event state. counter occupies the low 62 bits, the high two bits are + * hasWaiters and stopped. Layout is chosen so the (counter, hasWaiters, stopped) + * triple updates atomically with a single 64-bit CAS. */ union State { struct { - uint64_t counter : 63; + uint64_t counter : 62; uint64_t hasWaiters : 1; + uint64_t stopped : 1; }; uint64_t raw = 0; }; diff --git a/src/fibers/benchmarks/futex-bench.cpp b/src/fibers/benchmarks/futex-bench.cpp index 6522d8c..18cf8fc 100644 --- a/src/fibers/benchmarks/futex-bench.cpp +++ b/src/fibers/benchmarks/futex-bench.cpp @@ -30,7 +30,11 @@ BENCHMARK_F(FiberFutexBench, RoundTrip)(benchmark::State & state) { for (uint64_t i = 1; !p->stop->load(std::memory_order_relaxed); ++i) { - p->request->wait(i); + int r = p->request->wait(i); + if (r) + { + return r; + } if (p->stop->load(std::memory_order_relaxed)) { break; @@ -53,7 +57,11 @@ BENCHMARK_F(FiberFutexBench, RoundTrip)(benchmark::State & state) for (auto _ : *p->state) { p->request->post(); - p->reply->wait(replyToken); + int r = p->reply->wait(replyToken); + if (r) + { + return r; + } ++replyToken; } return 0; diff --git a/src/fibers/futex.cpp b/src/fibers/futex.cpp index b955b43..36712f8 100644 --- a/src/fibers/futex.cpp +++ b/src/fibers/futex.cpp @@ -4,11 +4,12 @@ #include #include +#include namespace silk { -void FiberFutex::wait(uint64_t token) noexcept +int FiberFutex::wait(uint64_t token) noexcept { // Spin for ~500 ns (16 x ~35 ns PAUSE on Skylake) before suspending. static constexpr uint32_t SPIN_COUNT = 16; @@ -18,15 +19,15 @@ void FiberFutex::wait(uint64_t token) noexcept // Spin briefly before suspending: if the poster is on another CPU and fires // within ~500 ns, we avoid the full scheduler wakeup path. - // Skip if there are already waiters in the queue. - if (!currentState.hasWaiters) + // Skip if there are already waiters in the queue or we are already stopped. + if (!currentState.hasWaiters && !currentState.stopped) { spinWait( [this, token] { State s; s.raw = state.load(std::memory_order_relaxed); - return s.counter >= token || s.hasWaiters; + return s.counter >= token || s.hasWaiters || s.stopped; }, SPIN_COUNT); } @@ -36,6 +37,16 @@ void FiberFutex::wait(uint64_t token) noexcept SuspendCtx ctx{this, token}; FiberScheduler::suspend(reinterpret_cast(suspendCallback), &ctx); } + + // Re-read state for the cancellation decision. The token-satisfied case + // takes precedence: a waiter that got the post it asked for sees a + // successful wakeup even if stop fired afterwards. + currentState.raw = state.load(std::memory_order_acquire); + if (currentState.counter >= token) + { + return 0; + } + return ECANCELED; } void FiberFutex::post() noexcept @@ -44,6 +55,14 @@ void FiberFutex::post() noexcept currentState.raw = state.load(std::memory_order_relaxed); for (;;) { + if (currentState.stopped) + { + // No waiters can be queued (stop wakes them all and bars further + // suspension); skip the increment to leave the counter stable + // for any concurrent get() call. + return; + } + State newState; newState.counter = currentState.counter + 1; newState.hasWaiters = 0; @@ -59,13 +78,39 @@ void FiberFutex::post() noexcept } } +void FiberFutex::stop() noexcept +{ + State currentState; + currentState.raw = state.load(std::memory_order_relaxed); + for (;;) + { + if (currentState.stopped) + { + return; + } + + State newState(currentState); + newState.hasWaiters = 0; + newState.stopped = 1; + if (state.compare_exchange_weak(currentState.raw, newState.raw, std::memory_order_release, std::memory_order_relaxed)) + { + break; + } + } + + if (currentState.hasWaiters) + { + FiberScheduler::releaseWaiters(reinterpret_cast(this)); + } +} + bool FiberFutex::waitHelper(uint64_t token) noexcept { State currentState; currentState.raw = state.load(std::memory_order_acquire); for (;;) { - if (currentState.counter >= token) + if (currentState.counter >= token || currentState.stopped) { return true; } @@ -91,7 +136,7 @@ void FiberFutex::suspendCallback(Fiber * fiber, SuspendCtx * ctx) noexcept State currentState; currentState.raw = event->state.load(std::memory_order_acquire); - if (currentState.counter >= ctx->token) + if (currentState.counter >= ctx->token || currentState.stopped) { FiberScheduler::schedule(fiber); return; @@ -99,10 +144,11 @@ void FiberFutex::suspendCallback(Fiber * fiber, SuspendCtx * ctx) noexcept FiberScheduler::enqueueWaiter(reinterpret_cast(event), fiber); - // Re-check after enqueue. If hasWaiters is false, post() already ran its - // releaseWaiters but missed us (we were not yet in the table). We must - // release now to avoid a missed wakeup. If the counter already satisfies - // the token, the fibers we wake will return immediately from waitHelper. + // Re-check after enqueue. If hasWaiters is false, post() or stop() already + // ran their releaseWaiters but missed us (we were not yet in the table). + // We must release now to avoid a missed wakeup. If the counter already + // satisfies the token, or the futex is stopped, the fibers we wake will + // return immediately from waitHelper. currentState.raw = event->state.load(std::memory_order_acquire); if (!currentState.hasWaiters) { diff --git a/src/fibers/tests/futex-test.cpp b/src/fibers/tests/futex-test.cpp index b9242e8..602bac9 100644 --- a/src/fibers/tests/futex-test.cpp +++ b/src/fibers/tests/futex-test.cpp @@ -32,8 +32,8 @@ TEST(FiberFutex, waitNoArg) // between set() and wait(), causing wait() to block on token+1 forever. uint64_t token = p->event->get() + 1; p->waiting->set(0); - p->event->wait(token); - return 0; + int r = p->event->wait(token); + return r; } }; @@ -44,7 +44,7 @@ TEST(FiberFutex, waitNoArg) waiting.wait(); event.post(); - future.wait(); + EXPECT_EQ(future.wait(), 0); } TEST(FiberFutex, waitAlreadyFired) @@ -53,7 +53,7 @@ TEST(FiberFutex, waitAlreadyFired) event.post(); uint64_t token = event.get(); - event.wait(token); + EXPECT_EQ(event.wait(token), 0); } TEST(FiberFutex, waitSuspended) @@ -68,9 +68,9 @@ TEST(FiberFutex, waitSuspended) static int fiberMain(WaiterParams * p) noexcept { p->waiting->set(0); - p->event->wait(p->token); + int r = p->event->wait(p->token); p->done->set(0); - return 0; + return r; } }; @@ -102,9 +102,9 @@ TEST(FiberFutex, multipleWaiters) { uint64_t token = p->event->get() + 1; p->ready->set(0); - p->event->wait(token); + int r = p->event->wait(token); p->done->set(0); - return 0; + return r; } }; @@ -144,7 +144,11 @@ TEST(FiberFutex, multiplePost) { for (uint64_t i = 0; i < N_ITER; ++i) { - p->event->wait(i + 1); + int r = p->event->wait(i + 1); + if (r) + { + return r; + } } p->done->set(0); return 0; @@ -166,6 +170,112 @@ TEST(FiberFutex, multiplePost) future.wait(); } +// stop() makes wait return ECANCELED for currently-suspended waiters. +TEST(FiberFutex, stopWakesSuspendedWaiter) +{ + struct Params + { + FiberFutex * event; + FiberFuture * waiting; + + static int fiberMain(Params * p) noexcept + { + uint64_t token = p->event->get() + 1; + p->waiting->set(0); + return p->event->wait(token); + } + }; + + FiberFutex event; + FiberFuture future, waiting; + int r = FiberScheduler::run(Params::fiberMain, {&event, &waiting}, &future); + ASSERT_FALSE(r); + + waiting.wait(); + event.stop(); + EXPECT_EQ(future.wait(), ECANCELED); + EXPECT_TRUE(event.stopped()); +} + +// stop() takes effect immediately; subsequent wait calls do not suspend. +TEST(FiberFutex, stopBeforeWaitReturnsImmediately) +{ + FiberFutex event; + event.stop(); + + EXPECT_TRUE(event.stopped()); + EXPECT_EQ(event.wait(event.get() + 1), ECANCELED); +} + +// Calling stop twice is a no-op. +TEST(FiberFutex, stopIdempotent) +{ + FiberFutex event; + event.stop(); + event.stop(); + EXPECT_TRUE(event.stopped()); + EXPECT_EQ(event.wait(event.get() + 1), ECANCELED); +} + +// post() after stop() is inert: counter does not advance, wait still returns ECANCELED. +TEST(FiberFutex, postAfterStopIsNoOp) +{ + FiberFutex event; + uint64_t before = event.get(); + event.stop(); + event.post(); + EXPECT_EQ(event.get(), before); + EXPECT_EQ(event.wait(before + 1), ECANCELED); +} + +// A waiter satisfied by post() right before stop() must observe success, not +// ECANCELED -- the token-satisfied case wins so callers do not lose work. +TEST(FiberFutex, satisfiedTokenBeatsStop) +{ + FiberFutex event; + event.post(); + event.stop(); + EXPECT_EQ(event.wait(1), 0); +} + +// stop() wakes every suspended fiber, not just one. +TEST(FiberFutex, stopWakesAllWaiters) +{ + static constexpr int N = 4; + + struct Params + { + FiberFutex * event; + FiberFuture * ready; + + static int fiberMain(Params * p) noexcept + { + uint64_t token = p->event->get() + 1; + p->ready->set(0); + return p->event->wait(token); + } + }; + + FiberFutex event; + FiberFuture futures[N], ready[N]; + for (int i = 0; i < N; ++i) + { + int r = FiberScheduler::run(Params::fiberMain, {&event, &ready[i]}, &futures[i]); + ASSERT_FALSE(r); + } + for (int i = 0; i < N; ++i) + { + ready[i].wait(); + } + + event.stop(); + + for (int i = 0; i < N; ++i) + { + EXPECT_EQ(futures[i].wait(), ECANCELED); + } +} + // Stress: N fibers post concurrently; verify no increment is lost. // The test thread waits for each token in sequence; if any post were // dropped the final wait would hang. @@ -200,7 +310,7 @@ TEST(FiberFutex, concurrentPostStress) // bursts so multiple waits may return immediately in a row. for (int i = 1; i <= N * ITER; ++i) { - event.wait(uint64_t(i)); + EXPECT_EQ(event.wait(uint64_t(i)), 0); } for (int i = 0; i < N; ++i)