Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 61 additions & 38 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ static constexpr uint64_t CQE_TAG_CANCEL = 0;
static constexpr uint64_t CQE_TAG_TIMEOUT = 1;
static constexpr uint64_t CQE_TAG_DOORBELL = 2;

// Hard cap on CPU index (largest known socket: 384 cores).
static constexpr uint16_t MAX_PROCESSOR_NUMBER = UINT16_MAX;

// Table size must be a power of two.
static_assert((WAITER_TABLE_SIZE & (WAITER_TABLE_SIZE - 1)) == 0);

// clang-format off
Expand Down Expand Up @@ -134,7 +138,10 @@ class Fiber
// Fiber entry point. Called once when the fiber is first activated.
static void fiberContextMain(boost::context::detail::transfer_t transfer) noexcept;

// Cache line 0: scheduling hot path.
// Cache line 0: scheduling + per-suspend hot path. Touched on every
// dispatch and every suspension. runFiber's full read/write set lives on
// this single line, so dispatch never pulls a second cache line on the
// common path.
struct alignas(CACHELINE_SIZE)
{
// Intrusive node for pool free-list and WaitStack membership.
Expand All @@ -144,20 +151,23 @@ class Fiber
// unconditional exchange (changeState) to coordinate scheduler and waiters.
std::atomic<FiberState> state;

// True for proxy fibers created by getCurrentFiber() on non-fiber threads.
// These use semaphores rather than context switching in suspend()/schedule().
// True for proxy fibers created by getCurrentFiber on non-fiber threads.
// These use semaphores rather than context switching in suspend/schedule.
bool isProxyFiber = false;

// True while the fiber is running on the thread worker pool.
bool inThreadMode = false;

// CPU this fiber is assigned to; UINT32_MAX means unassigned.
uint32_t processorNumber = UINT32_MAX;
// CPU this fiber is assigned to.
uint16_t processorNumber = MAX_PROCESSOR_NUMBER;

// mmap'd stack and Boost.Context fcontext handles for cooperative switching.
void * stack = nullptr;
boost::context::detail::fcontext_t fiberContext = nullptr;
boost::context::detail::fcontext_t threadContext = nullptr;
// Processor whose suspendedList this fiber is currently in.
uint16_t suspendedProcessorNumber = MAX_PROCESSOR_NUMBER;

// Suspend callback set by suspend, invoked by runFiber after the
// context switch back to the scheduler or thread worker.
FiberScheduler::SuspendCallback * suspendCallback = nullptr;
void * suspendContext = nullptr;

// Suspended-list membership; linked while the fiber is SUSPENDED.
ListEntry suspendedEntry;
Expand All @@ -166,26 +176,30 @@ class Fiber
QueueBase::QueueNode * reservedNode = nullptr;
};

// Cache line 1: suspend/join state.
// Cache line 1: context-switch state + per-fiber-once start/stop state.
// The context fields are touched only inside switchToFiberContext and
// switchToThreadContext; fiberMain/parametersDtor are read exactly twice
// per fiber lifetime (entry and STOPPED); result/waitingFuture fire
// exactly once at STOPPED. Co-locating them avoids a second cacheline
// miss on fiber start/stop.
struct alignas(CACHELINE_SIZE)
{
// Suspend callback set by suspend(), invoked by runFiber() after the
// context switch back to the scheduler or thread worker.
FiberScheduler::SuspendCallback * suspendCallback = nullptr;
void * suspendContext = nullptr;

// Processor whose suspendedList this fiber is currently in;
// UINT32_MAX when not linked.
uint32_t suspendedProcessorNumber = UINT32_MAX;
// mmap'd stack and Boost.Context fcontext handles for cooperative switching.
void * stack = nullptr;
boost::context::detail::fcontext_t fiberContext = nullptr;
boost::context::detail::fcontext_t threadContext = nullptr;

// Return value of fiberMain; valid after the fiber reaches STOPPED state.
int result = 0;
// Entry point and optional parameters destructor. parametersDtor is set
// by run for non-trivially-destructible T and called by
// fiberContextMain immediately after fiberMain returns.
FiberMain * fiberMain = nullptr;
ParametersDtor * parametersDtor = nullptr;

// Set by run() to the FiberFuture to notify on completion.
// Set by run to the FiberFuture to notify on completion.
FiberFuture * waitingFuture = nullptr;

// Used by proxy fibers for blocking in suspend() and unblocking in schedule().
sem_t threadSemaphore{};
// Return value of fiberMain; valid after the fiber reaches STOPPED state.
int result = 0;
};

// Embedded node for the shared ready queue. Its memory is always valid
Expand All @@ -194,11 +208,15 @@ class Fiber
// dummy is stored here so it can be reused.
QueueBase::QueueNode queueNode;

// Entry point, parameters buffer, and optional parameters destructor.
// parametersDtor is set by run<T> for non-trivially-destructible T and
// called by fiberContextMain immediately after fiberMain returns.
FiberMain * fiberMain = nullptr;
ParametersDtor * parametersDtor = nullptr;
// Proxy-fiber semaphore. Cross-thread sem_post/sem_wait;
// only touched by proxy fibers, so regular fibers leave this line cold
// for the lifetime of the pool slot. Keeping it cacheline-isolated
// prevents the cross-CPU bounce from poisoning a more useful line.
sem_t threadSemaphore{};

// Parameters buffer for the fiber's entry point. Sits on its own
// cacheline (FIBER_PARAMETERS_SIZE = 64) and is touched alongside
// fiberMain at first activation.
uint8_t parameters[FIBER_PARAMETERS_SIZE];

#if defined(__SANITIZE_ADDRESS__)
Expand Down Expand Up @@ -259,10 +277,10 @@ bool Fiber::initialize(FiberMain * fiberMain_, ParametersDtor * parametersDtor_,
state.store(FiberState::SUSPENDED, std::memory_order_relaxed);

inThreadMode = false;
processorNumber = UINT32_MAX;
processorNumber = MAX_PROCESSOR_NUMBER;
suspendedProcessorNumber = MAX_PROCESSOR_NUMBER;
suspendCallback = nullptr;
suspendContext = nullptr;
suspendedProcessorNumber = UINT32_MAX;
result = 0;
waitingFuture = waitingFuture_;

Expand Down Expand Up @@ -520,8 +538,8 @@ struct FiberScheduler::ProcessorState
// Neighboring CPUs sorted by estimated steal cost (topology-aware).
std::unique_ptr<StealCandidate[]> stealCandidates;

// CPU index this processor is pinned to; UINT32_MAX means inactive.
uint32_t number = UINT32_MAX;
// CPU index this processor is pinned to.
uint32_t number = MAX_PROCESSOR_NUMBER;

// eventfd used as a wakeup doorbell. wakeThread() writes to it;
// a persistent IORING_OP_POLL_ADD_MULTI SQE delivers a CQE to the
Expand Down Expand Up @@ -884,6 +902,11 @@ void FiberScheduler::initialize() noexcept
scheduler = new SchedulerState();

scheduler->processorCount = getProcessorCount();
ASSERT(
scheduler->processorCount < MAX_PROCESSOR_NUMBER,
"system has {} CPUs; silk caps at {}",
scheduler->processorCount,
MAX_PROCESSOR_NUMBER);
scheduler->processorState = std::make_unique<ProcessorState[]>(scheduler->processorCount);

cpu_set_t processCpuSet;
Expand Down Expand Up @@ -944,7 +967,7 @@ void FiberScheduler::buildStealCandidates() noexcept
for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number == UINT32_MAX)
if (processor->number == MAX_PROCESSOR_NUMBER)
{
continue;
}
Expand All @@ -960,7 +983,7 @@ void FiberScheduler::buildStealCandidates() noexcept
continue;
}
uint64_t cost = UINT64_MAX;
if (scheduler->processorState[other].number != UINT32_MAX)
if (scheduler->processorState[other].number != MAX_PROCESSOR_NUMBER)
{
cost = topologyCostCycles(topologies[cpu], topologies[other]);
}
Expand Down Expand Up @@ -994,7 +1017,7 @@ void FiberScheduler::destroy() noexcept
for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu)
{
ProcessorState * processor = &scheduler->processorState[cpu];
if (processor->number != UINT32_MAX)
if (processor->number != MAX_PROCESSOR_NUMBER)
{
processor->wakeThread();
}
Expand Down Expand Up @@ -1089,7 +1112,7 @@ void FiberScheduler::enqueueReady(Fiber * fiber) noexcept
{
if (!fiber->inThreadMode)
{
if (fiber->processorNumber == UINT32_MAX)
if (fiber->processorNumber == MAX_PROCESSOR_NUMBER)
{
fiber->processorNumber = getCurrentProcessor();
}
Expand Down Expand Up @@ -1630,11 +1653,11 @@ void FiberScheduler::runFiber(Fiber * fiber, CpuTimer * timer) noexcept
// two lock round-trips keep that line warm for the rest of the scheduling
// hot path. Benchmarking showed no net cost; the cache warming effect
// outweighs the lock overhead on uncontested paths.
if (fiber->suspendedProcessorNumber != UINT32_MAX)
if (fiber->suspendedProcessorNumber != MAX_PROCESSOR_NUMBER)
{
ProcessorState * processor = &scheduler->processorState[fiber->suspendedProcessorNumber];
processor->removeSuspended(fiber);
fiber->suspendedProcessorNumber = UINT32_MAX;
fiber->suspendedProcessorNumber = MAX_PROCESSOR_NUMBER;
}

threadFiber = fiber;
Expand Down
Loading