Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions docs/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ int err = future.wait(); // suspends until set

`reset()` clears the future so it can be reused for the next operation.

**State** -- packed `uint64_t`: `{waiter:62, multipleWait:1, isSet:1}`. The
`waiter` field holds a `Fiber*` (normal wait) or a `MultipleWaitState*`
(multiple wait). The `multipleWait` bit distinguishes the two cases in
`signal()`.
**State** -- packed `uint64_t`: `{waiter:61, multipleWait:1, hasCallback:1, isSet:1}`.
The `waiter` field holds a `Fiber*` (normal wait), a `MultipleWaitState*`
(multiple wait), or a `SubscribeCallback*` (subscribe). The `multipleWait`
and `hasCallback` bits select the dispatch path in `signal()`; only one is
ever set on a given future.

**suspendCallback race** -- the waiter pointer is installed inside
`suspendCallback`, which runs after the fiber is parked. If `signal()` arrives
Expand Down
35 changes: 18 additions & 17 deletions include/silk/fibers/blocking-queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <silk/fibers/futex.h>
#include <silk/util/bounded-queue.h>

#include <atomic>
#include <cerrno>
#include <cstdint>

Expand Down Expand Up @@ -32,20 +31,22 @@ class FiberBlockingQueue
*/
[[nodiscard]] int enqueue(T value) noexcept
{
while (!stopping.load(std::memory_order_relaxed))
while (!spaceAvailable.stopped())
{
uint64_t token = spaceAvailable.get();
if (queue.enqueue(value))
{
itemAvailable.post();
return 0;
}
if (stopping.load(std::memory_order_relaxed))

int r = spaceAvailable.wait(token + 1);
if (r)
{
break;
return r;
}
spaceAvailable.wait(token + 1);
}

return ECANCELED;
}

Expand All @@ -55,38 +56,40 @@ class FiberBlockingQueue
*/
[[nodiscard]] int dequeue(T * value) noexcept
{
while (!stopping.load(std::memory_order_relaxed))
while (!itemAvailable.stopped())
{
uint64_t token = itemAvailable.get();
if (queue.dequeue(value))
{
spaceAvailable.post();
return 0;
}
if (stopping.load(std::memory_order_relaxed))

int r = itemAvailable.wait(token + 1);
if (r)
{
break;
return r;
}
itemAvailable.wait(token + 1);
}

return ECANCELED;
}

/** Append a value without suspending. Returns false if the queue is full. */
/** Append a value without suspending. Returns false if the queue is full or torn down. */
[[nodiscard]] bool tryEnqueue(T value) noexcept
{
if (!stopping.load(std::memory_order_relaxed) && queue.enqueue(value))
if (!spaceAvailable.stopped() && queue.enqueue(value))
{
itemAvailable.post();
return true;
}
return false;
}

/** Write the head value into @p value without suspending. Returns false if empty. */
/** Write the head value into @p value without suspending. Returns false if empty or torn down. */
[[nodiscard]] bool tryDequeue(T * value) noexcept
{
if (!stopping.load(std::memory_order_relaxed) && queue.dequeue(value))
if (!itemAvailable.stopped() && queue.dequeue(value))
{
spaceAvailable.post();
return true;
Expand All @@ -97,16 +100,14 @@ class FiberBlockingQueue
/** Unblock all current and future enqueue/dequeue callers with ECANCELED. */
void teardown() noexcept
{
stopping.store(true, std::memory_order_relaxed);
spaceAvailable.post();
itemAvailable.post();
spaceAvailable.stop();
itemAvailable.stop();
}

private:
BoundedQueue<T> queue;
FiberFutex spaceAvailable;
FiberFutex itemAvailable;
std::atomic<bool> stopping{};
};

} // namespace silk
38 changes: 30 additions & 8 deletions include/silk/fibers/futex.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class Fiber;
* as Linux futex.
*
* post has release semantics; get and wait have acquire semantics.
*
* stop transitions the futex into a stopped state: every current waiter is woken
* and every subsequent wait returns ECANCELED without suspending.
*/
class FiberFutex
{
Expand All @@ -29,29 +32,48 @@ class FiberFutex
return currentState.counter;
}

/** Wait until at least one post fires after this call. */
void wait() noexcept { wait(get() + 1); }
/** Wait until at least one post fires after this call, or stop() is called. */
int wait() noexcept { return wait(get() + 1); }

/**
* Wait until the counter reaches @p token.
* Returns immediately if the counter is already >= @p token.
* Wait until the counter reaches @p token, or stop() is called.
* Returns 0 on normal wakeup, ECANCELED if the futex has been stopped.
* Returns 0 immediately if the counter is already >= @p token (even after stop,
* the satisfied-token case takes precedence so callers do not lose state changes).
* @p token is typically obtained as get() + 1 to wait for the next post.
*/
void wait(uint64_t token) noexcept;
[[nodiscard]] int wait(uint64_t token) noexcept;

/** Increment the counter and wake all waiting fibers. */
/** Increment the counter and wake all waiting fibers. No-op once stopped. */
void post() noexcept;

/**
* Transition into the stopped state and wake all current waiters. After this,
* every wait returns ECANCELED until the object is destroyed. Idempotent.
*/
void stop() noexcept;

/** Returns true if stop has been called. */
bool stopped() const noexcept
{
State currentState;
currentState.raw = state.load(std::memory_order_acquire);
return currentState.stopped;
}

private:
/**
* Packed event state.
* Packed event state. counter occupies the low 62 bits, the high two bits are
* hasWaiters and stopped. Layout is chosen so the (counter, hasWaiters, stopped)
* triple updates atomically with a single 64-bit CAS.
*/
union State
{
struct
{
uint64_t counter : 63;
uint64_t counter : 62;
uint64_t hasWaiters : 1;
uint64_t stopped : 1;
};
uint64_t raw = 0;
};
Expand Down
12 changes: 10 additions & 2 deletions src/fibers/benchmarks/futex-bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ BENCHMARK_F(FiberFutexBench, RoundTrip)(benchmark::State & state)
{
for (uint64_t i = 1; !p->stop->load(std::memory_order_relaxed); ++i)
{
p->request->wait(i);
int r = p->request->wait(i);
if (r)
{
return r;
}
if (p->stop->load(std::memory_order_relaxed))
{
break;
Expand All @@ -53,7 +57,11 @@ BENCHMARK_F(FiberFutexBench, RoundTrip)(benchmark::State & state)
for (auto _ : *p->state)
{
p->request->post();
p->reply->wait(replyToken);
int r = p->reply->wait(replyToken);
if (r)
{
return r;
}
++replyToken;
}
return 0;
Expand Down
66 changes: 56 additions & 10 deletions src/fibers/futex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
#include <silk/util/spinlock.h>

#include <atomic>
#include <cerrno>

namespace silk
{

void FiberFutex::wait(uint64_t token) noexcept
int FiberFutex::wait(uint64_t token) noexcept
{
// Spin for ~500 ns (16 x ~35 ns PAUSE on Skylake) before suspending.
static constexpr uint32_t SPIN_COUNT = 16;
Expand All @@ -18,15 +19,15 @@ void FiberFutex::wait(uint64_t token) noexcept

// Spin briefly before suspending: if the poster is on another CPU and fires
// within ~500 ns, we avoid the full scheduler wakeup path.
// Skip if there are already waiters in the queue.
if (!currentState.hasWaiters)
// Skip if there are already waiters in the queue or we are already stopped.
if (!currentState.hasWaiters && !currentState.stopped)
{
spinWait(
[this, token]
{
State s;
s.raw = state.load(std::memory_order_relaxed);
return s.counter >= token || s.hasWaiters;
return s.counter >= token || s.hasWaiters || s.stopped;
},
SPIN_COUNT);
}
Expand All @@ -36,6 +37,16 @@ void FiberFutex::wait(uint64_t token) noexcept
SuspendCtx ctx{this, token};
FiberScheduler::suspend(reinterpret_cast<FiberScheduler::SuspendCallback *>(suspendCallback), &ctx);
}

// Re-read state for the cancellation decision. The token-satisfied case
// takes precedence: a waiter that got the post it asked for sees a
// successful wakeup even if stop fired afterwards.
currentState.raw = state.load(std::memory_order_acquire);
if (currentState.counter >= token)
{
return 0;
}
return ECANCELED;
}

void FiberFutex::post() noexcept
Expand All @@ -44,6 +55,14 @@ void FiberFutex::post() noexcept
currentState.raw = state.load(std::memory_order_relaxed);
for (;;)
{
if (currentState.stopped)
{
// No waiters can be queued (stop wakes them all and bars further
// suspension); skip the increment to leave the counter stable
// for any concurrent get() call.
return;
}

State newState;
newState.counter = currentState.counter + 1;
newState.hasWaiters = 0;
Expand All @@ -59,13 +78,39 @@ void FiberFutex::post() noexcept
}
}

void FiberFutex::stop() noexcept
{
State currentState;
currentState.raw = state.load(std::memory_order_relaxed);
for (;;)
{
if (currentState.stopped)
{
return;
}

State newState(currentState);
newState.hasWaiters = 0;
newState.stopped = 1;
if (state.compare_exchange_weak(currentState.raw, newState.raw, std::memory_order_release, std::memory_order_relaxed))
{
break;
}
}

if (currentState.hasWaiters)
{
FiberScheduler::releaseWaiters(reinterpret_cast<uint64_t>(this));
}
}

bool FiberFutex::waitHelper(uint64_t token) noexcept
{
State currentState;
currentState.raw = state.load(std::memory_order_acquire);
for (;;)
{
if (currentState.counter >= token)
if (currentState.counter >= token || currentState.stopped)
{
return true;
}
Expand All @@ -91,18 +136,19 @@ void FiberFutex::suspendCallback(Fiber * fiber, SuspendCtx * ctx) noexcept

State currentState;
currentState.raw = event->state.load(std::memory_order_acquire);
if (currentState.counter >= ctx->token)
if (currentState.counter >= ctx->token || currentState.stopped)
{
FiberScheduler::schedule(fiber);
return;
}

FiberScheduler::enqueueWaiter(reinterpret_cast<uint64_t>(event), fiber);

// Re-check after enqueue. If hasWaiters is false, post() already ran its
// releaseWaiters but missed us (we were not yet in the table). We must
// release now to avoid a missed wakeup. If the counter already satisfies
// the token, the fibers we wake will return immediately from waitHelper.
// Re-check after enqueue. If hasWaiters is false, post() or stop() already
// ran their releaseWaiters but missed us (we were not yet in the table).
// We must release now to avoid a missed wakeup. If the counter already
// satisfies the token, or the futex is stopped, the fibers we wake will
// return immediately from waitHelper.
currentState.raw = event->state.load(std::memory_order_acquire);
if (!currentState.hasWaiters)
{
Expand Down
Loading
Loading