diff --git a/src/fibers/fiber.cpp b/src/fibers/fiber.cpp index 361ff0a..581c1ca 100644 --- a/src/fibers/fiber.cpp +++ b/src/fibers/fiber.cpp @@ -57,6 +57,10 @@ static constexpr uint64_t CQE_TAG_CANCEL = 0; static constexpr uint64_t CQE_TAG_TIMEOUT = 1; static constexpr uint64_t CQE_TAG_DOORBELL = 2; +// Hard cap on CPU index (largest known socket: 384 cores). +static constexpr uint16_t MAX_PROCESSOR_NUMBER = UINT16_MAX; + +// Table size must be a power of two. static_assert((WAITER_TABLE_SIZE & (WAITER_TABLE_SIZE - 1)) == 0); // clang-format off @@ -134,7 +138,10 @@ class Fiber // Fiber entry point. Called once when the fiber is first activated. static void fiberContextMain(boost::context::detail::transfer_t transfer) noexcept; - // Cache line 0: scheduling hot path. + // Cache line 0: scheduling + per-suspend hot path. Touched on every + // dispatch and every suspension. runFiber's full read/write set lives on + // this single line, so dispatch never pulls a second cache line on the + // common path. struct alignas(CACHELINE_SIZE) { // Intrusive node for pool free-list and WaitStack membership. @@ -144,20 +151,23 @@ class Fiber // unconditional exchange (changeState) to coordinate scheduler and waiters. std::atomic state; - // True for proxy fibers created by getCurrentFiber() on non-fiber threads. - // These use semaphores rather than context switching in suspend()/schedule(). + // True for proxy fibers created by getCurrentFiber on non-fiber threads. + // These use semaphores rather than context switching in suspend/schedule. bool isProxyFiber = false; // True while the fiber is running on the thread worker pool. bool inThreadMode = false; - // CPU this fiber is assigned to; UINT32_MAX means unassigned. - uint32_t processorNumber = UINT32_MAX; + // CPU this fiber is assigned to. + uint16_t processorNumber = MAX_PROCESSOR_NUMBER; - // mmap'd stack and Boost.Context fcontext handles for cooperative switching. - void * stack = nullptr; - boost::context::detail::fcontext_t fiberContext = nullptr; - boost::context::detail::fcontext_t threadContext = nullptr; + // Processor whose suspendedList this fiber is currently in. + uint16_t suspendedProcessorNumber = MAX_PROCESSOR_NUMBER; + + // Suspend callback set by suspend, invoked by runFiber after the + // context switch back to the scheduler or thread worker. + FiberScheduler::SuspendCallback * suspendCallback = nullptr; + void * suspendContext = nullptr; // Suspended-list membership; linked while the fiber is SUSPENDED. ListEntry suspendedEntry; @@ -166,26 +176,30 @@ class Fiber QueueBase::QueueNode * reservedNode = nullptr; }; - // Cache line 1: suspend/join state. + // Cache line 1: context-switch state + per-fiber-once start/stop state. + // The context fields are touched only inside switchToFiberContext and + // switchToThreadContext; fiberMain/parametersDtor are read exactly twice + // per fiber lifetime (entry and STOPPED); result/waitingFuture fire + // exactly once at STOPPED. Co-locating them avoids a second cacheline + // miss on fiber start/stop. struct alignas(CACHELINE_SIZE) { - // Suspend callback set by suspend(), invoked by runFiber() after the - // context switch back to the scheduler or thread worker. - FiberScheduler::SuspendCallback * suspendCallback = nullptr; - void * suspendContext = nullptr; - - // Processor whose suspendedList this fiber is currently in; - // UINT32_MAX when not linked. - uint32_t suspendedProcessorNumber = UINT32_MAX; + // mmap'd stack and Boost.Context fcontext handles for cooperative switching. + void * stack = nullptr; + boost::context::detail::fcontext_t fiberContext = nullptr; + boost::context::detail::fcontext_t threadContext = nullptr; - // Return value of fiberMain; valid after the fiber reaches STOPPED state. - int result = 0; + // Entry point and optional parameters destructor. parametersDtor is set + // by run for non-trivially-destructible T and called by + // fiberContextMain immediately after fiberMain returns. + FiberMain * fiberMain = nullptr; + ParametersDtor * parametersDtor = nullptr; - // Set by run() to the FiberFuture to notify on completion. + // Set by run to the FiberFuture to notify on completion. FiberFuture * waitingFuture = nullptr; - // Used by proxy fibers for blocking in suspend() and unblocking in schedule(). - sem_t threadSemaphore{}; + // Return value of fiberMain; valid after the fiber reaches STOPPED state. + int result = 0; }; // Embedded node for the shared ready queue. Its memory is always valid @@ -194,11 +208,15 @@ class Fiber // dummy is stored here so it can be reused. QueueBase::QueueNode queueNode; - // Entry point, parameters buffer, and optional parameters destructor. - // parametersDtor is set by run for non-trivially-destructible T and - // called by fiberContextMain immediately after fiberMain returns. - FiberMain * fiberMain = nullptr; - ParametersDtor * parametersDtor = nullptr; + // Proxy-fiber semaphore. Cross-thread sem_post/sem_wait; + // only touched by proxy fibers, so regular fibers leave this line cold + // for the lifetime of the pool slot. Keeping it cacheline-isolated + // prevents the cross-CPU bounce from poisoning a more useful line. + sem_t threadSemaphore{}; + + // Parameters buffer for the fiber's entry point. Sits on its own + // cacheline (FIBER_PARAMETERS_SIZE = 64) and is touched alongside + // fiberMain at first activation. uint8_t parameters[FIBER_PARAMETERS_SIZE]; #if defined(__SANITIZE_ADDRESS__) @@ -259,10 +277,10 @@ bool Fiber::initialize(FiberMain * fiberMain_, ParametersDtor * parametersDtor_, state.store(FiberState::SUSPENDED, std::memory_order_relaxed); inThreadMode = false; - processorNumber = UINT32_MAX; + processorNumber = MAX_PROCESSOR_NUMBER; + suspendedProcessorNumber = MAX_PROCESSOR_NUMBER; suspendCallback = nullptr; suspendContext = nullptr; - suspendedProcessorNumber = UINT32_MAX; result = 0; waitingFuture = waitingFuture_; @@ -520,8 +538,8 @@ struct FiberScheduler::ProcessorState // Neighboring CPUs sorted by estimated steal cost (topology-aware). std::unique_ptr stealCandidates; - // CPU index this processor is pinned to; UINT32_MAX means inactive. - uint32_t number = UINT32_MAX; + // CPU index this processor is pinned to. + uint32_t number = MAX_PROCESSOR_NUMBER; // eventfd used as a wakeup doorbell. wakeThread() writes to it; // a persistent IORING_OP_POLL_ADD_MULTI SQE delivers a CQE to the @@ -884,6 +902,11 @@ void FiberScheduler::initialize() noexcept scheduler = new SchedulerState(); scheduler->processorCount = getProcessorCount(); + ASSERT( + scheduler->processorCount < MAX_PROCESSOR_NUMBER, + "system has {} CPUs; silk caps at {}", + scheduler->processorCount, + MAX_PROCESSOR_NUMBER); scheduler->processorState = std::make_unique(scheduler->processorCount); cpu_set_t processCpuSet; @@ -944,7 +967,7 @@ void FiberScheduler::buildStealCandidates() noexcept for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu) { ProcessorState * processor = &scheduler->processorState[cpu]; - if (processor->number == UINT32_MAX) + if (processor->number == MAX_PROCESSOR_NUMBER) { continue; } @@ -960,7 +983,7 @@ void FiberScheduler::buildStealCandidates() noexcept continue; } uint64_t cost = UINT64_MAX; - if (scheduler->processorState[other].number != UINT32_MAX) + if (scheduler->processorState[other].number != MAX_PROCESSOR_NUMBER) { cost = topologyCostCycles(topologies[cpu], topologies[other]); } @@ -994,7 +1017,7 @@ void FiberScheduler::destroy() noexcept for (uint32_t cpu = 0; cpu < scheduler->processorCount; ++cpu) { ProcessorState * processor = &scheduler->processorState[cpu]; - if (processor->number != UINT32_MAX) + if (processor->number != MAX_PROCESSOR_NUMBER) { processor->wakeThread(); } @@ -1089,7 +1112,7 @@ void FiberScheduler::enqueueReady(Fiber * fiber) noexcept { if (!fiber->inThreadMode) { - if (fiber->processorNumber == UINT32_MAX) + if (fiber->processorNumber == MAX_PROCESSOR_NUMBER) { fiber->processorNumber = getCurrentProcessor(); } @@ -1630,11 +1653,11 @@ void FiberScheduler::runFiber(Fiber * fiber, CpuTimer * timer) noexcept // two lock round-trips keep that line warm for the rest of the scheduling // hot path. Benchmarking showed no net cost; the cache warming effect // outweighs the lock overhead on uncontested paths. - if (fiber->suspendedProcessorNumber != UINT32_MAX) + if (fiber->suspendedProcessorNumber != MAX_PROCESSOR_NUMBER) { ProcessorState * processor = &scheduler->processorState[fiber->suspendedProcessorNumber]; processor->removeSuspended(fiber); - fiber->suspendedProcessorNumber = UINT32_MAX; + fiber->suspendedProcessorNumber = MAX_PROCESSOR_NUMBER; } threadFiber = fiber;