From b097e32d09448c8f3dec4b1c161457bbfc78f204 Mon Sep 17 00:00:00 2001
From: Vadim Skipin
Date: Thu, 7 May 2026 16:10:49 +0000
Subject: [PATCH 1/2] Add fiber unique identifier

---
 include/silk/fibers/fiber.h     | 38 ++++++++++++--
 src/fibers/fiber.cpp            | 32 ++++++++++--
 src/fibers/tests/fiber-test.cpp | 88 +++++++++++++++++++++++++++++++++
 3 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/include/silk/fibers/fiber.h b/include/silk/fibers/fiber.h
index fed7b0a..83a30eb 100644
--- a/include/silk/fibers/fiber.h
+++ b/include/silk/fibers/fiber.h
@@ -61,10 +61,25 @@ class FiberScheduler
      */
     template <typename T>
     [[nodiscard]] static int run(int (*fiberMain)(T *) noexcept, T && parameters, FiberFuture * future) noexcept
+    {
+        return run(fiberMain, std::forward<T>(parameters), 0, future);
+    }
+
+    /**
+     * Start a fiber whose result will be delivered to a FiberFuture and stamp
+     * the fiber's identity with @p category (high 8 bits of getCurrentFiberId).
+     * The runtime treats the category as opaque; profilers and tracers use it
+     * to group samples by fiber role.
+     */
+    template <typename T>
+    [[nodiscard]] static int run(int (*fiberMain)(T *) noexcept, T && parameters, uint8_t category, FiberFuture * future) noexcept
     {
         static_assert(sizeof(T) <= FIBER_PARAMETERS_SIZE);
         Fiber * fiber = allocateFiber(
-            reinterpret_cast<FiberMain *>(fiberMain), std::is_trivially_destructible_v<T> ? nullptr : destroyParameters<T>, future);
+            reinterpret_cast<FiberMain *>(fiberMain),
+            std::is_trivially_destructible_v<T> ? nullptr : destroyParameters<T>,
+            category,
+            future);
         if (fiber)
         {
             std::construct_at(static_cast<T *>(getFiberParameters(fiber)), std::forward<T>(parameters));
@@ -84,10 +99,10 @@ class FiberScheduler
      * @return The fiber's integer result code.
      */
     template <typename T>
-    static int run(int (*fiberMain)(T *) noexcept, T && parameters) noexcept
+    static int run(int (*fiberMain)(T *) noexcept, T && parameters, uint8_t category = 0) noexcept
     {
         FiberFuture future;
-        int r = run(fiberMain, std::forward<T>(parameters), &future);
+        int r = run(fiberMain, std::forward<T>(parameters), category, &future);
         return r ? r : future.wait();
     }
 
@@ -97,6 +112,21 @@ class FiberScheduler
      */
     static Fiber * getCurrentFiber() noexcept;
 
+    /**
+     * Return the current fiber's identity, or 0 if the calling thread is not
+     * currently inside a fiber's context (e.g. proxy fiber thread, or scheduler
+     * thread between fibers).
+     *
+     * Identity is packed as [category:8 | cpu:8 | serial:48]:
+     *   category: byte passed to run(); the runtime treats it as opaque.
+     *   cpu:      CPU on which the fiber's id was minted at allocation time.
+     *   serial:   per-CPU monotonic counter, fresh on every Fiber init/reuse.
+     *
+     * Profilers and tracers can use the id as a stack-key prefix to group
+     * samples by fiber rather than by OS thread.
+     */
+    static uint64_t getCurrentFiberId() noexcept;
+
     /**
      * Return true if fiber is currently running.
*/ @@ -370,7 +400,7 @@ class FiberScheduler static void buildStealCandidates() noexcept; static void * getFiberParameters(Fiber * fiber) noexcept; - static Fiber * allocateFiber(FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * future) noexcept; + static Fiber * allocateFiber(FiberMain * fiberMain, ParametersDtor * parametersDtor, uint8_t category, FiberFuture * future) noexcept; static void freeFiber(Fiber * fiber) noexcept; static void enqueueReady(Fiber * fiber) noexcept; static void yieldSuspendCallback(Fiber * fiber, void * context) noexcept; diff --git a/src/fibers/fiber.cpp b/src/fibers/fiber.cpp index 581c1ca..0bbaa7f 100644 --- a/src/fibers/fiber.cpp +++ b/src/fibers/fiber.cpp @@ -122,7 +122,7 @@ class Fiber Fiber(bool isProxyFiber = false) noexcept; ~Fiber() noexcept; - bool initialize(FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * waitingFuture) noexcept; + bool initialize(uint64_t fiberId, FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * waitingFuture) noexcept; void deinitialize() noexcept; void switchToFiberContext() noexcept; @@ -174,6 +174,9 @@ class Fiber // Node ready for the next enqueue to the shared ready queue. QueueBase::QueueNode * reservedNode = nullptr; + + // Fiber identity packed as [category:8 | cpu:8 | serial:48]. + uint64_t fiberId = 0; }; // Cache line 1: context-switch state + per-fiber-once start/stop state. @@ -272,7 +275,7 @@ Fiber::~Fiber() noexcept } } -bool Fiber::initialize(FiberMain * fiberMain_, ParametersDtor * parametersDtor_, FiberFuture * waitingFuture_) noexcept +bool Fiber::initialize(uint64_t fiberId_, FiberMain * fiberMain_, ParametersDtor * parametersDtor_, FiberFuture * waitingFuture_) noexcept { state.store(FiberState::SUSPENDED, std::memory_order_relaxed); @@ -281,6 +284,7 @@ bool Fiber::initialize(FiberMain * fiberMain_, ParametersDtor * parametersDtor_, suspendedProcessorNumber = MAX_PROCESSOR_NUMBER; suspendCallback = nullptr; suspendContext = nullptr; + fiberId = fiberId_; result = 0; waitingFuture = waitingFuture_; @@ -595,6 +599,13 @@ struct FiberScheduler::ProcessorState // Must be a member: the SQPOLL thread reads the pointer from the SQE // asynchronously after enqueueWakeup() returns. __kernel_timespec wakeupTs{}; + + // Per-CPU monotonic counter feeding the serial bits of Fiber::fiberId. + // Initialized to 1 so the first allocated fiber (cpu=0, serial=0) does + // not collide with the all-zero sentinel that getCurrentFiberId returns + // for "no fiber". Incremented relaxed; uniqueness across the (cpu, serial) + // pair holds because each CPU advances only its own counter. + std::atomic fiberSerial{1}; }; }; @@ -1060,6 +1071,11 @@ Fiber * FiberScheduler::getCurrentFiber() noexcept return proxyFiber.get(); } +uint64_t FiberScheduler::getCurrentFiberId() noexcept +{ + return threadFiber ? 
threadFiber->fiberId : 0;
+}
+
 bool FiberScheduler::isFiberRunning(Fiber * fiber) noexcept
 {
     return fiber->state.load(std::memory_order_acquire) == FiberState::RUNNING;
@@ -1070,12 +1086,20 @@ void * FiberScheduler::getFiberParameters(Fiber * fiber) noexcept
 {
     return fiber->parameters;
 }
 
-Fiber * FiberScheduler::allocateFiber(FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * future) noexcept
+Fiber *
+FiberScheduler::allocateFiber(FiberMain * fiberMain, ParametersDtor * parametersDtor, uint8_t category, FiberFuture * future) noexcept
 {
     Fiber * fiber = scheduler->fiberPool.allocate();
     if (fiber)
     {
-        if (fiber->initialize(fiberMain, parametersDtor, future))
+        uint32_t cpu = getCurrentProcessor();
+        ASSERT_DEBUG(cpu < 256, "FiberId reserves only 8 bits for cpu index");
+
+        ProcessorState * processor = &scheduler->processorState[cpu];
+        uint64_t serial = processor->fiberSerial.fetch_add(1, std::memory_order_relaxed);
+
+        uint64_t fiberId = (uint64_t(category) << 56) | (uint64_t(cpu) << 48) | (serial & ((uint64_t(1) << 48) - 1));
+        if (fiber->initialize(fiberId, fiberMain, parametersDtor, future))
         {
             Perf::getSimpleCounter(simpleCounters[FIBER_STARTED]).increment();
             return fiber;
diff --git a/src/fibers/tests/fiber-test.cpp b/src/fibers/tests/fiber-test.cpp
index ef7542b..942f1de 100644
--- a/src/fibers/tests/fiber-test.cpp
+++ b/src/fibers/tests/fiber-test.cpp
@@ -128,6 +128,94 @@ TEST(Fiber, getCurrent)
     ASSERT_EQ(r, 0);
 }
 
+// getCurrentFiberId returns 0 outside a fiber context (proxy fiber thread).
+TEST(Fiber, getCurrentFiberIdOutsideFiber)
+{
+    EXPECT_EQ(FiberScheduler::getCurrentFiberId(), 0u);
+}
+
+// getCurrentFiberId returns a non-zero id inside a fiber, encoding cpu+serial.
+TEST(Fiber, getCurrentFiberIdInsideFiber)
+{
+    struct Params
+    {
+        static int fiberMain(Params * p) noexcept
+        {
+            UNUSED(p);
+            uint64_t id = FiberScheduler::getCurrentFiberId();
+            // serial is allocated monotonically per cpu; cpu byte is the
+            // allocating CPU. Both serial and cpu must be representable.
+            uint8_t category = static_cast<uint8_t>(id >> 56);
+            EXPECT_EQ(category, 0u) << "category byte starts at 0";
+            return id != 0 ? 0 : 1;
+        }
+    };
+
+    int r = FiberScheduler::run(Params::fiberMain, {});
+    ASSERT_EQ(r, 0);
+}
+
+// Two concurrent fibers must observe distinct fiber ids -- the per-CPU serial
+// counter increments on every allocation and is never reused.
+TEST(Fiber, fiberIdIsUniquePerInvocation)
+{
+    static constexpr int N = 64;
+
+    struct Params
+    {
+        std::atomic<uint64_t> * out;
+        int index;
+
+        static int fiberMain(Params * p) noexcept
+        {
+            p->out[p->index].store(FiberScheduler::getCurrentFiberId(), std::memory_order_relaxed);
+            return 0;
+        }
+    };
+
+    std::atomic<uint64_t> ids[N];
+    FiberFuture futures[N];
+    for (int i = 0; i < N; ++i)
+    {
+        ids[i].store(0, std::memory_order_relaxed);
+        int r = FiberScheduler::run(Params::fiberMain, {ids, i}, &futures[i]);
+        ASSERT_FALSE(r);
+    }
+    for (int i = 0; i < N; ++i)
+    {
+        futures[i].wait();
+    }
+
+    std::set<uint64_t> unique;
+    for (int i = 0; i < N; ++i)
+    {
+        uint64_t id = ids[i].load(std::memory_order_relaxed);
+        ASSERT_NE(id, 0u);
+        EXPECT_TRUE(unique.insert(id).second) << "fiber id " << id << " appeared twice";
+    }
+}
+
+// run() with explicit category stamps the byte into the high 8 bits of fiberId.
+TEST(Fiber, runWithCategoryStampsUpperByte) +{ + struct Params + { + uint64_t * out; + + static int fiberMain(Params * p) noexcept + { + *p->out = FiberScheduler::getCurrentFiberId(); + return 0; + } + }; + + uint64_t id = 0; + int r = FiberScheduler::run(Params::fiberMain, {&id}, uint8_t{0xAB}); + ASSERT_EQ(r, 0); + EXPECT_EQ(id >> 56, 0xABu); + EXPECT_NE(id & ((uint64_t(1) << 56) - 1), 0u) << "cpu+serial must be non-zero"; +} + // Async IO from a non-fiber thread (proxy fiber): enqueueIo must call // submitIo immediately since there is no runFiber to flush the SQE. TEST(Fiber, asyncIoFromThread) From 8f9c0d9250119d7465fd2a578b09b7441df5bdbf Mon Sep 17 00:00:00 2001 From: Vadim Skipin Date: Thu, 7 May 2026 17:26:20 +0000 Subject: [PATCH 2/2] Use USDT probes to collect latency histogram --- bb | 231 +++++++++++++++++++------------ src/fibers/CMakeLists.txt | 2 +- src/fibers/fiber.cpp | 24 +++- src/profiler/main.cpp | 37 +++-- src/profiler/profiler.bpf.c | 184 ++++++++++++++++++++++++- src/profiler/profiler.cpp | 261 +++++++++++++++++++++++++++++++----- src/profiler/profiler.h | 28 +++- 7 files changed, 623 insertions(+), 144 deletions(-) diff --git a/bb b/bb index 16d3e6d..c562b7a 100755 --- a/bb +++ b/bb @@ -403,26 +403,30 @@ def _render_flamegraph(folded_file: str, out_svg: str, title: str) -> None: ) -def _run_flamegraph(preset: str, name: str, client_args: list[str]) -> None: +def _run_profiler( + preset: str, + name: str, + client_args: list[str], + profiler_args: list[str], + out_suffix: str, +) -> str: cmd_build(preset, ["profiler"]) profiler_bin = os.path.join(ROOT, f"build/{preset}/bin/profiler") - folded_stacks = os.path.join(ROOT, f"build/{preset}/{name}.flamegraph.folded") - out_svg = os.path.join(ROOT, f"build/{preset}/{name}.flamegraph.svg") + out_file = os.path.join(ROOT, f"build/{preset}/{name}.{out_suffix}") verbose_flag = ["--verbose"] if log.isEnabledFor(logging.DEBUG) else [] - log.info("profiling %s -> %s", name, out_svg) + log.info("profiling %s -> %s", name, out_file) client = start_process(*client_args, stdout=subprocess.DEVNULL) try: - with open(folded_stacks, "w") as f: + with open(out_file, "w") as f: profiler = start_process( profiler_bin, "--pid", str(client.pid), - "--off-cpu", - "--kernel-stacks", + *profiler_args, *verbose_flag, stdout=f, ) @@ -439,12 +443,35 @@ def _run_flamegraph(preset: str, name: str, client_args: list[str]) -> None: client.kill() raise + return out_file + + +def _run_flamegraph(preset: str, name: str, client_args: list[str]) -> None: + folded_stacks = _run_profiler( + preset, + name, + client_args, + ["--on-cpu", "--off-cpu", "--kernel-stacks"], + "flamegraph.folded", + ) + + out_svg = os.path.join(ROOT, f"build/{preset}/{name}.flamegraph.svg") _render_flamegraph(folded_stacks, out_svg, f"{name} on-CPU + off-CPU") log.info("folded stacks: %s", folded_stacks) log.info("flamegraph: %s", out_svg) +def _run_latency(preset: str, name: str, client_args: list[str]) -> None: + latency_report = _run_profiler(preset, name, client_args, ["--usdt"], "latency.txt") + + # Echo the latency table to the user in addition to leaving it on disk. 
+ with open(latency_report) as f: + sys.stdout.write(f.read()) + + log.info("latency report: %s", latency_report) + + def _print_counters(data: dict[str, Any]) -> None: counters = data.get("counters", {}) if not counters: @@ -516,6 +543,7 @@ class NetPerfParams: connections: list[int] = field(default_factory=lambda: [1000]) delay: str = "0" flamegraph: bool = False + latency: bool = False print_counters: bool = False timeout: int = 180 @@ -565,31 +593,31 @@ def _cmd_net_perf_impl(preset: str, params: NetPerfParams, binary: str) -> None: ) try: - if params.flamegraph: - _run_flamegraph( - preset, - binary, - [ - "taskset", - "-c", - client_cpus, - net_perf, - "client", - "--host", - params.host, - "--port", - str(params.port), - "--connections", - str(params.connections[0]), - "--msg-size", - str(params.msg_size), - "--duration", - str(params.duration), - "--warmup", - str(params.warmup), - *verbose_flag, - ], - ) + if params.flamegraph or params.latency: + client_cmd = [ + "taskset", + "-c", + client_cpus, + net_perf, + "client", + "--host", + params.host, + "--port", + str(params.port), + "--connections", + str(params.connections[0]), + "--msg-size", + str(params.msg_size), + "--duration", + str(params.duration), + "--warmup", + str(params.warmup), + *verbose_flag, + ] + if params.flamegraph: + _run_flamegraph(preset, binary, client_cmd) + else: + _run_latency(preset, binary, client_cmd) else: print(_perf_row(_NP_HEADERS, _NP_WIDTH)) print(_perf_sep(_NP_WIDTH)) @@ -657,6 +685,7 @@ class FilePerfParams: iodepth: list[int] = field(default_factory=lambda: [16]) rw: list[str] = field(default_factory=lambda: ["randread"]) flamegraph: bool = False + latency: bool = False print_counters: bool = False timeout: int = 180 @@ -695,32 +724,32 @@ def cmd_file_perf(preset: str, params: FilePerfParams) -> None: verbose_flag = ["--verbose"] if log.isEnabledFor(logging.DEBUG) else [] try: - if params.flamegraph: + if params.flamegraph or params.latency: jobs, depth, mode = configs[0] - _run_flamegraph( - preset, - "file-perf", - [ - file_perf, - "--numjobs", - str(jobs), - "--iodepth", - str(depth), - "--bs", - params.bs, - "--rw", - mode, - "--size", - params.size, - "--runtime", - str(params.duration), - "--warmup", - str(params.warmup), - "--filename", - params.file, - *verbose_flag, - ], - ) + client_cmd = [ + file_perf, + "--numjobs", + str(jobs), + "--iodepth", + str(depth), + "--bs", + params.bs, + "--rw", + mode, + "--size", + params.size, + "--runtime", + str(params.duration), + "--warmup", + str(params.warmup), + "--filename", + params.file, + *verbose_flag, + ] + if params.flamegraph: + _run_flamegraph(preset, "file-perf", client_cmd) + else: + _run_latency(preset, "file-perf", client_cmd) else: print(_perf_row(_FP_HEADERS, _FP_WIDTHS)) print(_perf_sep(_FP_WIDTHS)) @@ -972,6 +1001,7 @@ class HttpPerfParams: delay: str = "0" threads: bool = False flamegraph: bool = False + latency: bool = False print_counters: bool = False timeout: int = 180 nginx: bool = False @@ -1089,30 +1119,31 @@ def cmd_http_perf(preset: str, params: HttpPerfParams) -> None: verbose_flag = ["--verbose"] if log.isEnabledFor(logging.DEBUG) else [] try: - if params.flamegraph: - _run_flamegraph( - preset, - "http-perf-" + mode, - [ - "taskset", - "-c", - client_cpus, - http_perf, - "client", - "--host", - params.host, - "--port", - str(params.port), - "--connections", - str(params.connections[0]), - "--duration", - str(params.duration), - "--warmup", - str(params.warmup), - *threads_flag, - *verbose_flag, - ], - ) + if 
params.flamegraph or params.latency: + client_cmd = [ + "taskset", + "-c", + client_cpus, + http_perf, + "client", + "--host", + params.host, + "--port", + str(params.port), + "--connections", + str(params.connections[0]), + "--duration", + str(params.duration), + "--warmup", + str(params.warmup), + *threads_flag, + *verbose_flag, + ] + tag = "http-perf-" + mode + if params.flamegraph: + _run_flamegraph(preset, tag, client_cmd) + else: + _run_latency(preset, tag, client_cmd) else: print(_perf_row(_HP_HEADERS, _HP_WIDTHS)) print(_perf_sep(_HP_WIDTHS)) @@ -1202,6 +1233,7 @@ class S3PerfParams: rw: list[str] = field(default_factory=lambda: ["read"]) threads: bool = False flamegraph: bool = False + latency: bool = False data_dir: str = "/dev/shm/minio-data" print_counters: bool = False timeout: int = 180 @@ -1346,13 +1378,14 @@ def cmd_s3_perf(preset: str, params: S3PerfParams) -> None: executor = "threads" if params.threads else "fibers" - if params.flamegraph: + if params.flamegraph or params.latency: jobs, depth, mode = configs[0] - _run_flamegraph( - preset, - f"s3-perf-{mode}-{executor}", - ["taskset", "-c", client_cpus] + make_cmd(jobs, depth, mode), - ) + client_cmd = ["taskset", "-c", client_cpus] + make_cmd(jobs, depth, mode) + tag = f"s3-perf-{mode}-{executor}" + if params.flamegraph: + _run_flamegraph(preset, tag, client_cmd) + else: + _run_latency(preset, tag, client_cmd) else: print(_perf_row(_S3P_HEADERS, _S3P_WIDTHS)) print(_perf_sep(_S3P_WIDTHS)) @@ -1602,6 +1635,12 @@ def _build_parser() -> argparse.ArgumentParser: action="store_true", help="profile process and generate flamegraph SVG", ) + file_perf_parser.add_argument( + "--latency", + dest="file_latency", + action="store_true", + help="profile process with USDT probes and print fiber latency breakdown", + ) file_perf_parser.add_argument( "--print-counters", dest="file_print_counters", @@ -1731,6 +1770,12 @@ def _build_parser() -> argparse.ArgumentParser: action="store_true", help="profile client and generate flamegraph SVG", ) + parser.add_argument( + "--latency", + dest="net_latency", + action="store_true", + help="profile client with USDT probes and print fiber latency breakdown", + ) parser.add_argument( "--timeout", dest="net_timeout", @@ -1853,6 +1898,12 @@ def _build_parser() -> argparse.ArgumentParser: action="store_true", help="profile client and generate flamegraph SVG", ) + http_perf_parser.add_argument( + "--latency", + dest="http_latency", + action="store_true", + help="profile client with USDT probes and print fiber latency breakdown", + ) http_perf_parser.add_argument( "--print-counters", dest="http_print_counters", @@ -1936,6 +1987,12 @@ def _build_parser() -> argparse.ArgumentParser: action="store_true", help="profile first config and generate flamegraph SVG", ) + s3_perf_parser.add_argument( + "--latency", + dest="s3_latency", + action="store_true", + help="profile first config with USDT probes and print fiber latency breakdown", + ) s3_perf_parser.add_argument( "--data-dir", dest="s3_data_dir", diff --git a/src/fibers/CMakeLists.txt b/src/fibers/CMakeLists.txt index 07edcc8..4a37f47 100644 --- a/src/fibers/CMakeLists.txt +++ b/src/fibers/CMakeLists.txt @@ -2,7 +2,7 @@ file(GLOB FIBERS_SOURCES CONFIGURE_DEPENDS *.cpp) add_library(silk-fibers ${FIBERS_SOURCES}) target_link_libraries(silk-fibers PUBLIC silk-util) -target_link_libraries(silk-fibers PRIVATE Boost::context Liburing::Liburing) +target_link_libraries(silk-fibers PRIVATE Boost::context Liburing::Liburing SystemTap::Sdt) add_subdirectory(tests) 
add_subdirectory(benchmarks) diff --git a/src/fibers/fiber.cpp b/src/fibers/fiber.cpp index 0bbaa7f..4c465f7 100644 --- a/src/fibers/fiber.cpp +++ b/src/fibers/fiber.cpp @@ -37,6 +37,7 @@ #include #include +#include #include namespace silk @@ -1101,6 +1102,10 @@ FiberScheduler::allocateFiber(FiberMain * fiberMain, ParametersDtor * parameters uint64_t fiberId = (uint64_t(category) << 56) | (uint64_t(cpu) << 48) | (serial & ((uint64_t(1) << 48) - 1)); if (fiber->initialize(fiberId, fiberMain, parametersDtor, future)) { + // USDT lifecycle probe: fiber created. + // Fires once per fiber init, before it has been scheduled to run. + DTRACE_PROBE2(silk, fiber_start, fiber, fiberId); + Perf::getSimpleCounter(simpleCounters[FIBER_STARTED]).increment(); return fiber; } @@ -1113,6 +1118,10 @@ FiberScheduler::allocateFiber(FiberMain * fiberMain, ParametersDtor * parameters void FiberScheduler::freeFiber(Fiber * fiber) noexcept { + // USDT lifecycle probe: fiber destroyed. + // Fires once per fiber as it heads back to the pool. Pair with silk:fiber_start. + DTRACE_PROBE2(silk, fiber_stop, fiber, fiber->fiberId); + Perf::getSimpleCounter(simpleCounters[FIBER_STOPPED], fiber->processorNumber).increment(); fiber->deinitialize(); @@ -1134,6 +1143,10 @@ void FiberScheduler::enqueueReady(Fiber * fiber) noexcept { if (!fiber->isProxyFiber) { + // USDT lifecycle probe: fiber became runnable. + // Useful for tracers that want to measure wakeup latency. + DTRACE_PROBE2(silk, fiber_schedule, fiber, fiber->fiberId); + if (!fiber->inThreadMode) { if (fiber->processorNumber == MAX_PROCESSOR_NUMBER) @@ -1684,10 +1697,15 @@ void FiberScheduler::runFiber(Fiber * fiber, CpuTimer * timer) noexcept fiber->suspendedProcessorNumber = MAX_PROCESSOR_NUMBER; } - threadFiber = fiber; - fiber->changeState(FiberState::READY, FiberState::RUNNING); + // USDT probes for fiber-aware profilers (see src/profiler). + // fiber_enter publishes the fiber identity for the calling thread. + // fiber_exit clears it so the per-tid map tracks "currently running fiber". + DTRACE_PROBE2(silk, fiber_enter, fiber, fiber->fiberId); + + threadFiber = fiber; + if (timer) { timer->reset(simpleCounters[SCHEDULER_USER_TIME], fiber->processorNumber); @@ -1702,6 +1720,8 @@ void FiberScheduler::runFiber(Fiber * fiber, CpuTimer * timer) noexcept threadFiber = nullptr; + DTRACE_PROBE2(silk, fiber_exit, fiber, fiber->fiberId); + // Submit any SQEs the fiber enqueued during this run. 
ProcessorState * processor = &scheduler->processorState[fiber->processorNumber]; processor->submitIo(); diff --git a/src/profiler/main.cpp b/src/profiler/main.cpp index dc155f6..52121c0 100644 --- a/src/profiler/main.cpp +++ b/src/profiler/main.cpp @@ -64,11 +64,13 @@ static bool sigwaitFor(const sigset_t & mask, uint64_t ns) noexcept int main(int argc, char ** argv) { - uint32_t targetPid; - uint32_t sampleHz; - uint32_t durationSec; - bool kernelStacks; - bool offcpu; + uint32_t targetPid = 0; + uint32_t sampleHz = 99; + uint32_t durationSec = 0; + bool kernelStacks = false; + bool oncpu = false; + bool offcpu = false; + bool usdt = false; bool verbose = false; namespace po = boost::program_options; @@ -78,10 +80,12 @@ int main(int argc, char ** argv) desc.add_options() ("help,h", "show this help") ("pid", po::value(&targetPid)->required(), "target process ID") - ("hz", po::value(&sampleHz)->default_value(99), "on-CPU sampling frequency") - ("duration", po::value(&durationSec)->default_value(0), "run for N seconds (0 = until Ctrl+C)") - ("kernel-stacks", po::bool_switch(&kernelStacks), "capture kernel stack frames") + ("hz", po::value(&sampleHz)->default_value(sampleHz), "on-CPU sampling frequency") + ("duration", po::value(&durationSec)->default_value(durationSec), "run for N seconds (0 = until Ctrl+C)") + ("kernel-stacks", po::bool_switch(&kernelStacks), "capture kernel stack frames (only with --on-cpu/--off-cpu)") + ("on-cpu", po::bool_switch(&oncpu), "capture on-CPU stack samples") ("off-cpu", po::bool_switch(&offcpu), "capture off-CPU blocking time") + ("usdt", po::bool_switch(&usdt), "attach silk USDT probes for fiber latency breakdown") ("verbose,v", po::bool_switch(&verbose), "enable debug logging") ; // clang-format on @@ -107,6 +111,12 @@ int main(int argc, char ** argv) return 1; } + if (!oncpu && !offcpu && !usdt) + { + LOG_ERROR("nothing to capture; pass at least one of --on-cpu, --off-cpu, --usdt"); + return 1; + } + bool hasBpf = hasCapability(CAP_BPF); bool hasPerfmon = hasCapability(CAP_PERFMON); @@ -151,7 +161,7 @@ int main(int argc, char ** argv) } } - Profiler profiler(targetPid, sampleHz, kernelStacks, offcpu); + Profiler profiler(targetPid, sampleHz, kernelStacks, oncpu, offcpu, usdt); r = profiler.start(); if (r) @@ -177,7 +187,14 @@ int main(int argc, char ** argv) LOG_INFO("collecting results..."); try { - profiler.collect(&symbolizer); + if (oncpu || offcpu) + { + profiler.emitFoldedStacks(&symbolizer); + } + if (usdt) + { + profiler.emitLatencyBreakdown(); + } } catch (const std::exception & ex) { diff --git a/src/profiler/profiler.bpf.c b/src/profiler/profiler.bpf.c index 7738e5d..63e8285 100644 --- a/src/profiler/profiler.bpf.c +++ b/src/profiler/profiler.bpf.c @@ -3,6 +3,7 @@ #include #include #include +#include const volatile u32 target_tgid = 0; const volatile u8 kernel_stacks = 0; @@ -15,8 +16,7 @@ struct __uint(max_entries, 65536); } stack_map SEC(".maps"); -// on-CPU: sample count keyed by combined stack ID -// key = ((u32)userSid << 32) | (u32)kernelSid; (u32)-1 means absent +// on-CPU: sample count keyed by combined stack key - see makeStackKey struct { __uint(type, BPF_MAP_TYPE_PERCPU_HASH); @@ -52,6 +52,44 @@ struct __type(value, u64); } sleep_stack SEC(".maps"); +// Per-fiber lifecycle state, keyed by Fiber * (passed as arg0 of every +// silk:fiber_* USDT probe). Each entry tracks the fiber's current phase and +// when it entered that phase. Inserted at fiber_start, removed at fiber_stop. 
+// +// Phases: +// 1 = WAITING -- between fiber_start and fiber_schedule, or between +// fiber_exit and the next fiber_schedule (blocked on IO/sync) +// 2 = WAKEUP -- between fiber_schedule and fiber_enter (on a ready queue) +// 3 = RUNNING -- between fiber_enter and fiber_exit (executing) +#define PHASE_WAITING 1 +#define PHASE_WAKEUP 2 +#define PHASE_RUNNING 3 + +struct fiber_state_t +{ + u64 last_ts; + u8 last_phase; +}; + +struct +{ + __uint(type, BPF_MAP_TYPE_HASH); + __uint(max_entries, 65536); + __type(key, u64); + __type(value, struct fiber_state_t); +} fiber_state SEC(".maps"); + +// Latency histogram, per-CPU to avoid contention on the probe hot path. +// Key encoding: bits 0..7 bucket, 8..15 phase, 16..23 fiber category. +// Bucket = clamped log2(ns); covers 1ns..2^31 ns (~2.1s). +struct +{ + __uint(type, BPF_MAP_TYPE_PERCPU_HASH); + __uint(max_entries, 4096); + __type(key, u64); + __type(value, u64); +} latency_hist SEC(".maps"); + static __always_inline u64 makeStackKey(s32 userSid, s32 kernelSid) { return ((u64)(u32)userSid << 32) | (u32)kernelSid; @@ -68,6 +106,56 @@ static __always_inline void incrementMap(void * map, u64 key, u64 delta) } } +// Clamped log2 in 0..31. Used to bucket nanosecond intervals. +static __always_inline u8 log2Bucket(u64 ns) +{ + if (ns == 0) + { + return 0; + } + u8 b = 0; + if (ns >> 32) + { + b += 32; + ns >>= 32; + } + if (ns >> 16) + { + b += 16; + ns >>= 16; + } + if (ns >> 8) + { + b += 8; + ns >>= 8; + } + if (ns >> 4) + { + b += 4; + ns >>= 4; + } + if (ns >> 2) + { + b += 2; + ns >>= 2; + } + if (ns >> 1) + { + b += 1; + } + if (b > 31) + b = 31; + return b; +} + +static __always_inline void bumpLatency(u64 fiber_id, u8 phase, u64 ns) +{ + u8 category = (u8)(fiber_id >> 56); + u8 bucket = log2Bucket(ns); + u64 key = ((u64)category << 16) | ((u64)phase << 8) | bucket; + incrementMap(&latency_hist, key, 1); +} + SEC("perf_event") int on_cpu_sample(struct bpf_perf_event_data * ctx) { @@ -157,4 +245,96 @@ int BPF_PROG(on_sched_switch, bool preempt, struct task_struct * prev, struct ta return 0; } +// silk:fiber_start - fiber created. Seed the per-fiber state in WAITING phase +// (interval until the first fiber_schedule is the initial dispatch latency). +SEC("usdt") +int BPF_USDT(on_fiber_start, void * fiber_ptr, u64 fiber_id) +{ + u64 fkey = (u64)fiber_ptr; + struct fiber_state_t state = {.last_ts = bpf_ktime_get_ns(), .last_phase = PHASE_WAITING}; + bpf_map_update_elem(&fiber_state, &fkey, &state, BPF_ANY); + return 0; +} + +// silk:fiber_schedule - fiber became READY. Close the WAITING interval and +// open a WAKEUP interval. Tolerates the start-of-trace case (no prior state) +// by silently skipping the histogram bump. +SEC("usdt") +int BPF_USDT(on_fiber_schedule, void * fiber_ptr, u64 fiber_id) +{ + u64 fkey = (u64)fiber_ptr; + u64 now = bpf_ktime_get_ns(); + struct fiber_state_t * state = bpf_map_lookup_elem(&fiber_state, &fkey); + if (state) + { + if (state->last_phase == PHASE_WAITING) + { + bumpLatency(fiber_id, PHASE_WAITING, now - state->last_ts); + } + state->last_ts = now; + state->last_phase = PHASE_WAKEUP; + } + else + { + // Profiler attached mid-flight: seed state. + struct fiber_state_t fresh = {.last_ts = now, .last_phase = PHASE_WAKEUP}; + bpf_map_update_elem(&fiber_state, &fkey, &fresh, BPF_ANY); + } + return 0; +} + +// silk:fiber_enter - fiber starts running. Close WAKEUP interval, open RUNNING. 
+SEC("usdt") +int BPF_USDT(on_fiber_enter, void * fiber_ptr, u64 fiber_id) +{ + u64 fkey = (u64)fiber_ptr; + u64 now = bpf_ktime_get_ns(); + struct fiber_state_t * state = bpf_map_lookup_elem(&fiber_state, &fkey); + if (state) + { + if (state->last_phase == PHASE_WAKEUP) + { + bumpLatency(fiber_id, PHASE_WAKEUP, now - state->last_ts); + } + state->last_ts = now; + state->last_phase = PHASE_RUNNING; + } + else + { + struct fiber_state_t fresh = {.last_ts = now, .last_phase = PHASE_RUNNING}; + bpf_map_update_elem(&fiber_state, &fkey, &fresh, BPF_ANY); + } + return 0; +} + +// silk:fiber_exit - fiber stops running (either suspending or about to stop). +// Close RUNNING interval; reopen WAITING (until the next schedule, or until +// fiber_stop tears the entry down). +SEC("usdt") +int BPF_USDT(on_fiber_exit, void * fiber_ptr, u64 fiber_id) +{ + u64 fkey = (u64)fiber_ptr; + u64 now = bpf_ktime_get_ns(); + struct fiber_state_t * state = bpf_map_lookup_elem(&fiber_state, &fkey); + if (state) + { + if (state->last_phase == PHASE_RUNNING) + { + bumpLatency(fiber_id, PHASE_RUNNING, now - state->last_ts); + } + state->last_ts = now; + state->last_phase = PHASE_WAITING; + } + return 0; +} + +// silk:fiber_stop - fiber destroyed. Drop its state entry. +SEC("usdt") +int BPF_USDT(on_fiber_stop, void * fiber_ptr, u64 fiber_id) +{ + u64 fkey = (u64)fiber_ptr; + bpf_map_delete_elem(&fiber_state, &fkey); + return 0; +} + char LICENSE[] SEC("license") = "GPL"; diff --git a/src/profiler/profiler.cpp b/src/profiler/profiler.cpp index dc69c21..79ebfda 100644 --- a/src/profiler/profiler.cpp +++ b/src/profiler/profiler.cpp @@ -4,13 +4,18 @@ #include #include +#include #include #include #include +#include #include +#include #include +#include +#include #include #include #include @@ -20,9 +25,8 @@ typedef __u8 u8; typedef __u16 u16; typedef __u32 u32; typedef __u64 u64; +typedef bool _Bool; -#include -#include #include static int perfEventOpen(perf_event_attr * attr, pid_t pid, int cpu, int groupFd, unsigned long flags) @@ -71,11 +75,13 @@ static int libbpfPrint(libbpf_print_level level, const char * format, va_list ar return 0; } -Profiler::Profiler(uint32_t targetTgid, uint32_t sampleHz, bool kernelStacks, bool offcpu) noexcept +Profiler::Profiler(uint32_t targetTgid, uint32_t sampleHz, bool kernelStacks, bool oncpu, bool offcpu, bool usdt) noexcept : targetTgid(targetTgid) , sampleHz(sampleHz) , kernelStacks(kernelStacks) + , oncpu(oncpu) , offcpu(offcpu) + , usdt(usdt) { } @@ -104,10 +110,22 @@ int Profiler::start() noexcept skel->rodata->target_tgid = targetTgid; skel->rodata->kernel_stacks = kernelStacks ? 1 : 0; + if (!oncpu) + { + bpf_program__set_autoload(skel->progs.on_cpu_sample, false); + } if (!offcpu) { bpf_program__set_autoload(skel->progs.on_sched_switch, false); } + if (!usdt) + { + bpf_program__set_autoload(skel->progs.on_fiber_start, false); + bpf_program__set_autoload(skel->progs.on_fiber_schedule, false); + bpf_program__set_autoload(skel->progs.on_fiber_enter, false); + bpf_program__set_autoload(skel->progs.on_fiber_exit, false); + bpf_program__set_autoload(skel->progs.on_fiber_stop, false); + } if (profiler_bpf__load(skel)) { @@ -123,45 +141,83 @@ int Profiler::start() noexcept return EINVAL; } - perf_event_attr attr = {}; - attr.type = PERF_TYPE_SOFTWARE; - attr.config = PERF_COUNT_SW_CPU_CLOCK; - attr.freq = 1; - attr.sample_freq = sampleHz; + // Attach all five lifecycle USDT probes to the target's executable when + // --usdt is on. 
Probes live in libsilk-fibers statically linked into the + // target, so the binary path is /proc//exe. + if (usdt) + { + char exePath[64]; + std::snprintf(exePath, sizeof(exePath), "/proc/%u/exe", targetTgid); - int numCpus = libbpf_num_possible_cpus(); - for (int cpu = 0; cpu < numCpus; cpu++) + struct + { + bpf_program * prog; + const char * name; + } probes[] = { + {skel->progs.on_fiber_start, "fiber_start"}, + {skel->progs.on_fiber_schedule, "fiber_schedule"}, + {skel->progs.on_fiber_enter, "fiber_enter"}, + {skel->progs.on_fiber_exit, "fiber_exit"}, + {skel->progs.on_fiber_stop, "fiber_stop"}, + }; + + for (auto & probe : probes) + { + bpf_link * link = bpf_program__attach_usdt(probe.prog, targetTgid, exePath, "silk", probe.name, nullptr); + if (!link) + { + int r = errno; + LOG_WARN("attach silk:{} to {}: {} (latency breakdown disabled)", probe.name, exePath, strerror(r)); + } + else + { + usdtLinks.push_back(link); + } + } + } + + if (oncpu) { - int fd = perfEventOpen(&attr, -1, cpu, -1, 0); - if (fd < 0) + perf_event_attr attr = {}; + attr.type = PERF_TYPE_SOFTWARE; + attr.config = PERF_COUNT_SW_CPU_CLOCK; + attr.freq = 1; + attr.sample_freq = sampleHz; + + int numCpus = libbpf_num_possible_cpus(); + for (int cpu = 0; cpu < numCpus; cpu++) { - int r = errno; - if (r == ENODEV) + int fd = perfEventOpen(&attr, -1, cpu, -1, 0); + if (fd < 0) { - // CPU offline + int r = errno; + if (r == ENODEV) + { + // CPU offline + continue; + } + LOG_WARN("perf_event_open cpu {}: {}", cpu, strerror(r)); continue; } - LOG_WARN("perf_event_open cpu {}: {}", cpu, strerror(r)); - continue; + + bpf_link * link = bpf_program__attach_perf_event(skel->progs.on_cpu_sample, fd); + if (!link) + { + int r = errno; + LOG_WARN("attach perf_event cpu {}: {}", cpu, strerror(r)); + ::close(fd); + continue; + } + + perfLinks.push_back(link); } - bpf_link * link = bpf_program__attach_perf_event(skel->progs.on_cpu_sample, fd); - if (!link) + if (perfLinks.empty()) { - int r = errno; - LOG_WARN("attach perf_event cpu {}: {}", cpu, strerror(r)); - ::close(fd); - continue; + LOG_ERROR("failed to attach to any CPU"); + stop(); + return ENODEV; } - - perfLinks.push_back(link); - } - - if (perfLinks.empty()) - { - LOG_ERROR("failed to attach to any CPU"); - stop(); - return ENODEV; } return 0; @@ -174,9 +230,15 @@ void Profiler::stop() noexcept bpf_link__destroy(link); } perfLinks.clear(); + + for (bpf_link * link : usdtLinks) + { + bpf_link__destroy(link); + } + usdtLinks.clear(); } -void Profiler::collect(Symbolizer * symbolizer) +void Profiler::emitFoldedStacks(Symbolizer * symbolizer) { ASSERT(skel); @@ -189,8 +251,8 @@ void Profiler::collect(Symbolizer * symbolizer) // Merge both maps into {stack_key → (on_ns, off_ns)}. 
std::unordered_map> merged; - drainMap(oncpuFd, merged, true, sampleNs); - drainMap(offcpuFd, merged, false, 1); + drainStacks(oncpuFd, merged, true, sampleNs); + drainStacks(offcpuFd, merged, false, 1); std::string folded; @@ -275,7 +337,7 @@ void Profiler::collect(Symbolizer * symbolizer) } } -void Profiler::drainMap(int fd, StackMap & merged, bool isOnCpu, uint64_t weightFactor) +void Profiler::drainStacks(int fd, StackMap & merged, bool isOnCpu, uint64_t weightFactor) { int numCpus = libbpf_num_possible_cpus(); std::vector perCpuValues(numCpus); @@ -309,7 +371,7 @@ void Profiler::drainMap(int fd, StackMap & merged, bool isOnCpu, uint64_t weight } while (bpf_map_get_next_key(fd, &key, &nextKey) == 0); } -int Profiler::countFrames(const uint64_t * addrs) +int Profiler::countFrames(const uint64_t * addrs) noexcept { int n = 0; while (n < MAX_FRAMES && addrs[n]) @@ -318,3 +380,128 @@ int Profiler::countFrames(const uint64_t * addrs) } return n; } + +void Profiler::emitLatencyBreakdown() +{ + LatencyHist hist; + drainHistogram(bpf_map__fd(skel->maps.latency_hist), hist); + + if (hist.empty()) + { + return; + } + + std::printf("\n=== fiber latency breakdown (ns, log2 buckets) ===\n"); + std::printf("%-24s%12s%12s%12s%12s%12s\n", "category / phase", "count", "p50", "p90", "p99", "max"); + for (const auto & [phaseKey, buckets] : hist) + { + uint64_t total = 0; + for (uint64_t c : buckets) + { + total += c; + } + if (total == 0) + { + continue; + } + + char label[32]; + std::snprintf(label, sizeof(label), "fiber-%02x %s", phaseKey.first, phaseName(phaseKey.second)); + std::printf( + "%-24s%12lu%12lu%12lu%12lu%12lu\n", + label, + total, + percentileNs(buckets, 0.50), + percentileNs(buckets, 0.90), + percentileNs(buckets, 0.99), + maxNs(buckets)); + } +} + +void Profiler::drainHistogram(int fd, LatencyHist & hist) +{ + int numCpus = libbpf_num_possible_cpus(); + std::vector perCpuValues(numCpus); + + uint64_t key, nextKey; + if (bpf_map_get_next_key(fd, nullptr, &nextKey) != 0) + { + return; + } + + do + { + key = nextKey; + bpf_map_lookup_elem(fd, &key, perCpuValues.data()); + + uint64_t total = 0; + for (int i = 0; i < numCpus; i++) + { + total += perCpuValues[i]; + } + if (total == 0) + { + continue; + } + + // Key encoding from profiler.bpf.c::bumpLatency: + // bits 0..7 bucket (clamped log2 ns) + // bits 8..15 phase + // bits 16..23 fiber category + uint8_t category = static_cast(key >> 16); + uint8_t phase = static_cast(key >> 8); + uint8_t bucket = static_cast(key); + if (bucket < HIST_BUCKETS) + { + hist[{category, phase}][bucket] += total; + } + } while (bpf_map_get_next_key(fd, &key, &nextKey) == 0); +} + +uint64_t Profiler::percentileNs(const HistBuckets & buckets, double p) noexcept +{ + uint64_t total = 0; + for (uint64_t c : buckets) + { + total += c; + } + + uint64_t target = static_cast(total * p); + uint64_t cum = 0; + for (int b = 0; b < HIST_BUCKETS; b++) + { + cum += buckets[b]; + if (cum >= target) + { + return 1ULL << b; + } + } + return 1ULL << (HIST_BUCKETS - 1); +} + +uint64_t Profiler::maxNs(const HistBuckets & buckets) noexcept +{ + for (int b = HIST_BUCKETS - 1; b >= 0; b--) + { + if (buckets[b]) + { + return 1ULL << b; + } + } + return 0; +} + +const char * Profiler::phaseName(uint8_t phase) noexcept +{ + switch (phase) + { + case 1: + return "waiting"; + case 2: + return "wakeup"; + case 3: + return "running"; + default: + return "?"; + } +} diff --git a/src/profiler/profiler.h b/src/profiler/profiler.h index e24ed5c..b786b0e 100644 --- a/src/profiler/profiler.h +++ 
b/src/profiler/profiler.h
@@ -1,6 +1,8 @@
 #pragma once
 
+#include <array>
 #include <cstdint>
+#include <map>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -12,7 +14,7 @@ class Symbolizer;
 class Profiler
 {
 public:
-    Profiler(uint32_t targetTgid, uint32_t sampleHz, bool kernelStacks, bool offcpu) noexcept;
+    Profiler(uint32_t targetTgid, uint32_t sampleHz, bool kernelStacks, bool oncpu, bool offcpu, bool usdt) noexcept;
     ~Profiler() noexcept;
 
     // opens, loads, and attaches all BPF programs; returns 0 on success, errno on failure
@@ -22,19 +24,35 @@ class Profiler
     void stop() noexcept;
 
     // Emits merged folded stacks to stdout: "frame1;frame2 on_ns off_ns"
-    void collect(Symbolizer * symbolizer);
+    void emitFoldedStacks(Symbolizer * symbolizer);
+
+    // Per-category fiber-lifecycle latency breakdown. Reads the latency_hist
+    // BPF map populated by the silk:fiber_* USDT handlers. Only meaningful
+    // when the profiler was started with usdt=true; otherwise the map is
+    // empty and the table prints no rows.
+    void emitLatencyBreakdown();
 
 private:
     static constexpr int MAX_FRAMES = 127;
     using StackMap = std::unordered_map<uint64_t, std::pair<uint64_t, uint64_t>>;
-    static void drainMap(int fd, StackMap & merged, bool isOnCpu, uint64_t weightFactor);
-    static int countFrames(const uint64_t * addrs);
+    static void drainStacks(int fd, StackMap & merged, bool isOnCpu, uint64_t weightFactor);
+    static int countFrames(const uint64_t * addrs) noexcept;
+
+    static constexpr int HIST_BUCKETS = 32;
+    using HistBuckets = std::array<uint64_t, HIST_BUCKETS>;
+    using LatencyHist = std::map<std::pair<uint8_t, uint8_t>, HistBuckets>;
+    static void drainHistogram(int fd, LatencyHist & hist);
+    static uint64_t percentileNs(const HistBuckets & buckets, double p) noexcept;
+    static uint64_t maxNs(const HistBuckets & buckets) noexcept;
+    static const char * phaseName(uint8_t phase) noexcept;
 
     uint32_t targetTgid;
    uint32_t sampleHz;
     bool kernelStacks;
+    bool oncpu;
     bool offcpu;
+    bool usdt;
 
     profiler_bpf * skel = nullptr;
     std::vector<bpf_link *> perfLinks;
+    std::vector<bpf_link *> usdtLinks;
 };
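
Usage sketch (illustrative only, not part of either patch): tooling that consumes
getCurrentFiberId() can split the packed id back into its documented fields. Only the
[category:8 | cpu:8 | serial:48] layout and FiberScheduler::getCurrentFiberId() come from
patch 1; the helper names below (FiberIdParts, decodeFiberId, logCurrentFiber) are invented
for the example, and it assumes the header is visible as <silk/fibers/fiber.h> with
FiberScheduler in namespace silk, as in src/fibers/fiber.cpp.

    #include <cstdint>
    #include <cstdio>

    #include <silk/fibers/fiber.h>

    // Mirror of the encoding in FiberScheduler::allocateFiber:
    // [category:8 | cpu:8 | serial:48], with 0 reserved for "no fiber".
    struct FiberIdParts
    {
        uint8_t category;
        uint8_t cpu;
        uint64_t serial;
    };

    static FiberIdParts decodeFiberId(uint64_t id) noexcept
    {
        FiberIdParts parts;
        parts.category = static_cast<uint8_t>(id >> 56);
        parts.cpu = static_cast<uint8_t>(id >> 48);
        parts.serial = id & ((uint64_t(1) << 48) - 1);
        return parts;
    }

    void logCurrentFiber() noexcept
    {
        uint64_t id = silk::FiberScheduler::getCurrentFiberId();
        if (id == 0)
        {
            std::puts("not running inside a fiber");
            return;
        }
        FiberIdParts parts = decodeFiberId(id);
        std::printf(
            "fiber category=%u cpu=%u serial=%llu\n",
            unsigned(parts.category),
            unsigned(parts.cpu),
            static_cast<unsigned long long>(parts.serial));
    }

On the reporting side, note that percentileNs() returns the upper bound of a log2 bucket
(1 << bucket nanoseconds), so the p50/p90/p99 and max columns printed by
emitLatencyBreakdown() are accurate only to within a factor of two.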