From 635757ae32789561857843078242f25443de0850 Mon Sep 17 00:00:00 2001 From: hazemawadalla Date: Thu, 26 Mar 2026 00:54:10 -0700 Subject: [PATCH] KV cache benchmark: multi-turn full context reload, block-layer tracing & fio workload distiller Problem: The multi-turn conversation path in process_requests() only read the immediately previous turn (turn N-1) when resuming a conversation. In a real inference serving environment with KV cache offloading, resuming a conversation requires reloading the full prior context from storage; every previous turn that survived LRU eviction needs to be read back. Reading only N-1 understated the read I/O by a factor of N/2 for deep conversations. Additionally, the decode probe read latency in step 5 was silently dropped from the per-request storage_latency accumulator; every other read in process_requests() accumulated correctly except this one, and decode_latencies only recorded the probe read without the batched decode reads that follow it. Separately, we had no way to decompose what was happening at the block layer during benchmark runs. The L4 "device" latency measures the time to read an entire .npy file through np.load(); these files can reach 500 MB to 2 GB depending on context length and model, and the kernel splits each read into hundreds of NVMe commands at the MDTS boundary. The P95 device read latency reflects the total time to load a large KV cache entry. This adds world class telemetry with minimal overhead on the storage block layer metrics enabled via a single flag --enable-latency-tracing. Fixes (benchmark.py): - Multi-turn step 2 now calls get_all_previous_turn_keys() and reads ALL previous turns via access_cache(). Entries that were evicted by the LRU waterfall return (None, 0.0) immediately with zero I/O and zero memory allocation. Surviving entries get real np.load reads with measured latency. The multi-turn hit rate in the output now reflects the true conversation cache survival rate under eviction pressure; we saw 45% hit rate with 10 users on DC3000ME which tells you exactly how much prior context the storage tier can sustain. - Steps 2+3 moved inside if not self.decode_only guard; in decode-only mode writes are skipped so multi-turn reads always miss. - storage_latency += read_latency added after the step 5 decode probe read, matching every other read in the method. - decode_latencies now accumulates probe read + all batched decode reads per request, not just the probe. - max_turns_per_conv hard cap enforced in user_worker; previously the config value was read but never checked, so conversations could grow unbounded regardless of the setting. - Memory safety check at startup: estimates peak RAM from the formula peak = (workers x 2 x mean_entry_bytes) + baseline and warns with safe --num-users / --max-concurrent-allocs values if the estimate exceeds 85% of available RAM. Block-layer tracing (--enable-latency-tracing): - Spawns bpftrace as a sudo subprocess before the benchmark run, sends SIGINT after, parses the histogram output into structured data. - 15 histograms captured: D2C read/write (actual NVMe hardware time per command), Q2D read/write (I/O scheduler queue), VFS read/write (application visible), fsync, write-to-fsync serialization gap, fadvise-to-read gap, block size distribution (bssplit) read/write, in-flight I/O count at dispatch read/write, and LBA heatmap read/write (10 GB linear buckets via lhist). - bpftrace 0.14 compatible: uses comm == instead of str(comm) ==, END block removed (bpftrace auto-prints maps on SIGINT), D2C measured unconditionally at block_rq_issue (not gated on block_rq_insert which NVMe blk-mq direct dispatch bypasses). - Results flow to stdout (P50/P95/P99 per histogram with raw bars), JSON (full bucket data under device_latency_tracing key), and XLSX (Device Tracing summary sheet + Trace Histograms raw data sheet). fio workload distiller: - _generate_fio_workload() in benchmark.py distills the traced bssplit, read/write ratio, queue depth, and thinktime into a standalone fio .ini file saved as fio_kv_cache_workload_{timestamp}.ini. - utils/distill_fio.py is a standalone Python script that parses raw bpftrace output and generates the same fio config without needing the benchmark installed. Point it at vLLM, llm-d, or any process and get a representative fio workload for bare-metal drive comparison. - utils/storage_latency_stack.sh updated with --fio flag that captures trace output and pipes through distill_fio.py automatically. - bssplit derived from block size histograms with separate read/write splits per fio spec. rwmixread from I/O count ratio. iodepth from in-flight I/O histogram P50. thinktime from write-to-fsync gap P50. thinktime_iotime commented out for fio <3.28 compatibility. - Generated config includes D2C latency summary and LBA hot zone in the header comments for reference. New method (cache.py): - check_cache_exists(): metadata-only existence check on cache_entries. Returns (location, size) or (None, 0). No np.load, no LRU update, no memory allocation. Tests: - test_part3c updated: shows the triangular read pattern where turn 4 reads turns 1, 2, 3. Asserts 10 total decode_reads (6 multi-turn reads + 4 decode reads). - test_part3d added: 6-turn conversation on NVMe with capacity for 3.5 entries. Proves eviction works correctly; later turns show a mix of hits (recent turns still cached) and misses (old turns evicted). Documents both miss counters: cache.stats['cache_misses'] (tier-level, all types) vs results['multi_turn_cache_misses'] (benchmark-level, step 2 only). Documentation: - README.md: Added Block-Layer Latency Tracing section with usage examples, histogram reference table, fio distiller docs, and standalone vLLM/llm-d tracing instructions. Updated Unit Testing section with TestVisualizeUserRequestFlow examples and flags. - Proposal document: Added section 8.5 covering tracing motivation, histogram reference, fio distiller, and standalone usage. Tested on Kingston DC3000ME PCIe Gen5 NVMe (447 GB), llama3.1-8b, 1-100 users, seed 42, with bpftrace block-layer validation confirming D2C read P50 of 1-2 ms per NVMe command and bssplit dominated by 1 MB blocks (99% of read I/Os, matching MDTS splits of the large KV cache .npy files). --- kv_cache_benchmark/README.md | 136 ++++- .../docs/MLperf v3 KV cache proposal.md | 86 +++ kv_cache_benchmark/kv_cache/benchmark.py | 494 +++++++++++++++++- kv_cache_benchmark/kv_cache/cache.py | 16 + kv_cache_benchmark/kv_cache/cli.py | 86 ++- kv_cache_benchmark/tests/test_kv_cache.py | 209 ++++++-- kv_cache_benchmark/utils/distill_fio.py | 276 ++++++++++ .../utils/storage_latency_stack.bt | 317 +++++++++++ .../utils/storage_latency_stack.sh | 107 ++++ 9 files changed, 1656 insertions(+), 71 deletions(-) create mode 100755 kv_cache_benchmark/utils/distill_fio.py create mode 100644 kv_cache_benchmark/utils/storage_latency_stack.bt create mode 100755 kv_cache_benchmark/utils/storage_latency_stack.sh diff --git a/kv_cache_benchmark/README.md b/kv_cache_benchmark/README.md index b6757782..cfe986b0 100644 --- a/kv_cache_benchmark/README.md +++ b/kv_cache_benchmark/README.md @@ -25,7 +25,8 @@ A storage benchmarking tool for Large Language Model inference systems. This ben 12. [Understanding Results](#understanding-results) 13. [Unit Testing](#unit-testing) 14. [Excel Export](#excel-export) -15. [MLPerf Submission Guidelines](#mlperf-submission-guidelines) +15. [Block-Layer Latency Tracing](#block-layer-latency-tracing) +16. [MLPerf Submission Guidelines](#mlperf-submission-guidelines) 16. [Troubleshooting](#troubleshooting) --- @@ -1498,10 +1499,39 @@ The test suite covers 23 component categories with ~170+ individual tests: | `TestPerTierPhaseMetrics` | 7 | Per-tier (GPU/CPU/Storage) KV bytes read/written tracking during prefill/decode phases | | `TestPerTierPhaseMetricsWithGPU` | 4 | GPU tier metrics tracking, phase-aware read/write separation (skipped without GPU) | +### Visualize User Request Flow + +The `TestVisualizeUserRequestFlow` test class traces the complete I/O path of real requests through the benchmark; these are the tests to run when you want to understand exactly what the benchmark does at each step: + +```bash +# Part 3: The 4 latency levels (L1-L4) with real NVMe timing +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3_four_latency_levels -v -s + +# Part 3b: How requests become .npy files on disk +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3b_request_to_npy_file_mapping -v -s + +# Part 3c: Multi-turn conversation I/O (triangular read pattern) +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3c_multi_turn_prefill_decode_file_io -v -s + +# Part 3d: Multi-turn with eviction pressure (hits vs misses under LRU) +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part3d_multi_turn_with_eviction -v -s + +# Part 4: 3-tier waterfall LRU eviction cascade (GPU -> CPU -> NVMe -> DELETE) +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part4_three_tier_waterfall_eviction -v -s + +# Part 5: NVMe-only eviction (what happens when the drive fills up) +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow::test_part5_one_tier_nvme_only_eviction -v -s + +# Run all visualization tests at once +pytest tests/test_kv_cache.py::TestVisualizeUserRequestFlow -v -s +``` + +Use `-s` to see the printed I/O traces; without it pytest captures the output and you lose the visualization. + ### Expected Runtime -- **Without GPU**: ~5-10 seconds -- **With GPU**: ~10-15 seconds +- **Without GPU**: ~4-5 minutes (211 tests) +- **With GPU**: ~5-6 minutes GPU tests are automatically skipped if CUDA is not available. @@ -1553,6 +1583,106 @@ The Excel file contains a single row with all key metrics: --- +## Block-Layer Latency Tracing + +The `--enable-latency-tracing` flag adds block-layer visibility to the benchmark with a single flag; no code changes, no separate tooling, minimal overhead. It spawns bpftrace as a sudo subprocess, attaches to the kernel block layer tracepoints during the benchmark run, and on completion distills the I/O profile into structured telemetry across stdout, JSON, and XLSX. + +This is the same class of telemetry that storage engineers use when characterizing production workloads; the difference is that it is fully integrated into the benchmark and the results are machine-readable. + +### What It Captures + +15 histograms across the full I/O stack: + +| Category | Histograms | What It Tells You | +|----------|-----------|-------------------| +| Device hardware | D2C read/write | Per-NVMe-command completion time; this is what the SSD controller actually took | +| I/O scheduler | Q2D read/write | Time sitting in the Linux I/O scheduler queue before dispatch | +| Application visible | VFS read/write | Full syscall latency from the application's perspective | +| Serialization | write-to-fsync gap, fsync, fadvise-to-read gap | CPU vs device bottleneck decomposition | +| Block sizes | bssplit read/write | I/O size distribution at the kernel layer (matches MDTS splits) | +| Queue depth | In-flight at dispatch read/write | Instantaneous I/O concurrency at the moment of dispatch | +| Spatial | LBA heatmap read/write | Where on the device the I/O lands (10 GB linear buckets) | + +### Usage + +```bash +# Run benchmark with tracing (requires sudo for bpftrace) +kv-cache --config config.yaml --model llama3.1-8b \ + --num-users 10 --duration 30 \ + --gpu-mem-gb 0 --cpu-mem-gb 0 \ + --max-concurrent-allocs 1 \ + --generation-mode none \ + --cache-dir /mnt/nvme --seed 42 \ + --enable-latency-tracing \ + --xlsx-output results_traced.xlsx +``` + +The tracing output appears at the end of the benchmark results. The XLSX gets two additional sheets: **Device Tracing** (P50/P95/P99 summary per histogram) and **Trace Histograms** (raw bucket data for charting). + +### Standalone Tracing Against vLLM / llm-d + +The bpftrace scripts work independently of the benchmark. Point them at any inference engine process: + +```bash +# Trace vLLM and generate a fio workload +sudo ./utils/storage_latency_stack.sh vllm --fio + +# Trace llm-d +sudo ./utils/storage_latency_stack.sh llm-d --fio + +# Trace any process +sudo ./utils/storage_latency_stack.sh python3 + +# Manual distill from saved trace +python3 utils/distill_fio.py -i trace_output.txt --process vllm -o vllm_workload.ini +``` + +The `--fio` flag captures the bpftrace output and pipes it through `distill_fio.py` to generate a standalone fio workload file. This means you can trace vLLM on a production node, take the generated .ini file, and replay the exact I/O pattern on a bare-metal test rig with fio to compare drives without running the inference stack. + +### fio Workload Distiller + +When `--enable-latency-tracing` is used with the benchmark, or when `--fio` is passed to the shell wrapper, a fio .ini file is generated automatically. The fio config includes: + +- **bssplit** from the traced block size distribution (separate read/write splits) +- **rwmixread** from the read/write I/O count ratio +- **iodepth** from the in-flight I/O histogram P50 +- **thinktime** from the write-to-fsync serialization gap (idle time between I/O bursts) +- D2C latency summary and LBA hot zone in the header comments + +Example generated config: +```ini +[kv-cache-traced] +ioengine=libaio +direct=1 +time_based +runtime=300 +rw=randrw +rwmixread=87 +bssplit=4k/1:8k/1:16k/1:32k/1:64k/1:128k/100,4k/7:8k/1:16k/1:32k/4:64k/4:128k/83 +iodepth=2048 +iodepth_batch_submit=2048 +iodepth_batch_complete_min=1 +size=100G +thinktime=32 +thinktime_blocks=2048 +thinktime_iotime=1s +refill_buffers=1 +norandommap=1 +randrepeat=0 +numjobs=1 +group_reporting +percentile_list=50:95:99:99.9:99.99 +``` + +### Requirements + +- Linux 5.x+ with BTF support +- bpftrace 0.14+ (`sudo apt install bpftrace`) +- sudo or CAP_BPF privileges +- If bpftrace is not available, the flag degrades gracefully; the benchmark runs normally without tracing. + +--- + ## MLPerf Submission Guidelines For official MLPerf v3.0 storage submissions, use these standardized commands. **These invocations have been validated through extensive discovery testing** (1,411 Fast system tests, 268 Slow system tests comparing 14,000 MB/s vs 3,000 MB/s storage). diff --git a/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md b/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md index 0e632e59..d5ae8fc7 100644 --- a/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md +++ b/kv_cache_benchmark/docs/MLperf v3 KV cache proposal.md @@ -1981,6 +1981,92 @@ data = buffer[start:start+size].reshape(kv_shape) --- +## 8.5 Block-Layer Latency Tracing & fio Workload Distiller + +The benchmark includes an integrated block-layer tracing capability that decomposes storage I/O across every layer of the Linux I/O stack; from the application (VFS) down to the NVMe controller (D2C). This is enabled with a single flag and requires no code changes, no separate tooling, and adds minimal overhead to the benchmark run. + +### Motivation + +The L4 "device" latency reported by the benchmark measures the time to read or write an entire .npy file through NumPy. For large KV cache entries (500 MB to 2 GB), the kernel splits each file I/O into hundreds of NVMe commands at the MDTS boundary. The resulting P95 device latency reflects the total time to load a large entry; it includes both the actual NVMe hardware time and the numpy deserialization overhead within that single np.load() call. Without block-layer visibility, there is no way to distinguish how much of that latency is the drive vs the host. + +### Enabling Tracing + +```bash +kv-cache --config config.yaml --model llama3.1-8b \ + --num-users 10 --duration 30 \ + --gpu-mem-gb 0 --cpu-mem-gb 0 \ + --max-concurrent-allocs 1 \ + --generation-mode none \ + --cache-dir /mnt/nvme --seed 42 \ + --enable-latency-tracing \ + --xlsx-output results_traced.xlsx +``` + +The benchmark spawns bpftrace as a sudo subprocess before the run, attaches to 16 kernel tracepoints, and on completion sends SIGINT to collect the histogram data. The tracing subprocess runs in its own process group; the benchmark itself does not require root. + +### Histograms Captured + +| Histogram | Layer | What It Measures | +|-----------|-------|-----------------| +| D2C read/write | Device | Per-NVMe-command completion time (actual hardware latency) | +| Q2D read/write | I/O Scheduler | Time in the scheduler queue before dispatch to the NVMe driver | +| VFS read/write | Application | Full syscall time including page cache, filesystem, and block I/O | +| fsync | Device | Actual device flush latency after buffered writes | +| write-to-fsync gap | Serialization | CPU idle time between write() return and fsync() entry | +| fadvise-to-read gap | Cache mgmt | Overhead of page cache invalidation before reads | +| bssplit read/write | Block sizes | I/O size distribution at the kernel layer | +| Queue depth read/write | Concurrency | Instantaneous in-flight I/O count at the moment of dispatch | +| LBA heatmap read/write | Spatial | Where on the device the I/O lands (10 GB linear buckets) | + +### fio Workload Distiller + +When tracing is enabled, the benchmark automatically generates a standalone fio .ini file that reproduces the observed I/O pattern. The distiller extracts bssplit (block size distribution with separate read/write splits), rwmixread (from the I/O count ratio), iodepth (from the in-flight I/O histogram), and thinktime (from the write-to-fsync serialization gap) and writes them into a fio config that can be run independently against any device. + +Example output from a traced benchmark run on Kingston DC3000ME: + +```ini +[kv-cache-traced] +ioengine=libaio +direct=1 +time_based +runtime=300 +rw=randrw +rwmixread=87 +bssplit=4k/1:8k/1:16k/1:32k/1:64k/1:128k/100,4k/7:8k/1:16k/1:32k/4:64k/4:128k/83 +iodepth=2048 +iodepth_batch_submit=2048 +iodepth_batch_complete_min=1 +size=100G +thinktime=32 +thinktime_blocks=2048 +thinktime_iotime=1s +refill_buffers=1 +norandommap=1 +randrepeat=0 +numjobs=1 +group_reporting +percentile_list=50:95:99:99.9:99.99 +``` + +### Standalone Usage Against Inference Engines + +The tracing tools work independently of the benchmark. The shell wrapper and Python distiller can be pointed at any process: + +```bash +# Trace vLLM and generate fio workload +sudo ./utils/storage_latency_stack.sh vllm --fio + +# Trace llm-d +sudo ./utils/storage_latency_stack.sh llm-d --fio + +# Manual distill from saved trace output +python3 utils/distill_fio.py -i trace_output.txt --process vllm -o vllm_workload.ini +``` + +This means you can characterize the I/O profile of a real inference engine on a production node, take the generated fio .ini file to a test bench, and run it against multiple drives with fio to compare storage performance without deploying the full inference stack. + +--- + ## 9. Common Issues & Troubleshooting ### Issue: High Host Latency diff --git a/kv_cache_benchmark/kv_cache/benchmark.py b/kv_cache_benchmark/kv_cache/benchmark.py index 8219019b..a69790e0 100755 --- a/kv_cache_benchmark/kv_cache/benchmark.py +++ b/kv_cache_benchmark/kv_cache/benchmark.py @@ -12,9 +12,12 @@ import glob import time import queue +import signal import random import logging import threading +import subprocess +import re from typing import Dict, List, Optional, Tuple from datetime import datetime from concurrent.futures import ThreadPoolExecutor @@ -73,7 +76,8 @@ def __init__(self, trace_speedup: float = 1.0, replay_cycles: int = 0, prefill_only: bool = False, - decode_only: bool = False): + decode_only: bool = False, + enable_latency_tracing: bool = False): self.model_config = model_config self.num_users = num_users @@ -81,6 +85,8 @@ def __init__(self, self.duration = duration_seconds self.enable_autoscaling = enable_autoscaling self.enable_multi_turn = enable_multi_turn + self.enable_latency_tracing = enable_latency_tracing + self._trace_proc = None self.generation_mode = generation_mode self.ms_per_token = GENERATION_TIMING[generation_mode] * 1000 self.enable_prefix_caching = enable_prefix_caching @@ -419,6 +425,15 @@ def user_worker(user: UserProfile): self.user_conversations.pop(user.user_id, None) local_conv_id = None + # Enforce max_turns_per_conv hard cap + if local_conv_id: + with self.conversation_manager.lock: + state = self.conversation_manager.conversations.get(local_conv_id) + if state and state.turn_number >= self.conversation_manager.max_turns_per_conv: + with self.user_conversations_lock: + self.user_conversations.pop(user.user_id, None) + local_conv_id = None + if local_conv_id is None: local_conv_id = self.conversation_manager.start_conversation(user.user_id) with self.user_conversations_lock: @@ -514,17 +529,24 @@ def process_requests(self, stop_event: threading.Event): storage_latency += read_lat request.context_tokens = remaining_tokens - # Skip if decode_only mode (disaggregated decode node) + # Skip steps 2+3 entirely in decode_only mode: + # - Step 2 reads always miss (step 3 writes are skipped, so no entries exist) + # - Step 3 prefill writes don't apply to decode-only nodes if not self.decode_only: - # 2. For multi-turn conversations, access cache from previous turn. + # 2. For multi-turn conversations, reload all previous turns' KV cache. + # Reads every previous turn via access_cache (real I/O for entries + # that survived eviction; immediate (None, 0.0) for evicted entries). if self.conversation_manager and request.turn_number > 1: - prev_turn_key = f"{request.conversation_id}_turn_{request.turn_number - 1}" - location, read_latency = self.cache.access_cache(prev_turn_key, InferencePhase.DECODE, 'multi_turn') - if location is not None: - storage_latency += read_latency - with self.results_lock: self.results['multi_turn_cache_hits'] += 1 - else: - with self.results_lock: self.results['multi_turn_cache_misses'] += 1 + prev_keys = self.conversation_manager.get_all_previous_turn_keys( + request.conversation_id, request.turn_number + ) + for prev_turn_key in prev_keys: + location, read_latency = self.cache.access_cache(prev_turn_key, InferencePhase.DECODE, 'multi_turn') + if location is not None: + storage_latency += read_latency + with self.results_lock: self.results['multi_turn_cache_hits'] += 1 + else: + with self.results_lock: self.results['multi_turn_cache_misses'] += 1 # 3. Perform the main PREFILL operation (a cache WRITE). if request.phase == InferencePhase.PREFILL or request.phase == InferencePhase.PREFILL_DECODE: @@ -556,6 +578,8 @@ def process_requests(self, stop_event: threading.Event): decode_key = request.cache_key location, read_latency = self.cache.access_cache(decode_key, InferencePhase.DECODE, cache_type) + storage_latency += read_latency + decode_total_latency = read_latency if location is None: # Cache miss during decode - need to allocate (unless decode_only) @@ -572,8 +596,9 @@ def process_requests(self, stop_event: threading.Event): for _ in range(num_batched_reads): _, batch_read_latency = self.cache.access_cache(decode_key, InferencePhase.DECODE, cache_type) storage_latency += batch_read_latency + decode_total_latency += batch_read_latency - with self.results_lock: self.results['decode_latencies'].append(read_latency) + with self.results_lock: self.results['decode_latencies'].append(decode_total_latency) # 6. Simulate token generation time. generation_latency = request.generate_tokens * GENERATION_TIMING[self.generation_mode] @@ -668,6 +693,432 @@ def monitor_stats(self, stop_event: threading.Event): f"Throughput: {throughput:.2f} tok/s") last_log_time = now + def _start_latency_tracing(self): + """Spawn bpftrace as a sudo subprocess to trace block-layer device latency.""" + script_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'utils') + script_path = os.path.join(script_dir, 'storage_latency_stack.sh') + + if not os.path.exists(script_path): + logger.warning(f"Tracing script not found: {script_path}") + print(f" WARNING: {script_path} not found. Skipping latency tracing.") + return + + # Determine the process name to filter on + comm = os.path.basename(sys.argv[0]) if sys.argv[0] else 'python3' + + print(f"\n### LATENCY TRACING ###") + print(f" Script: {script_path}") + print(f" Filter: {comm}") + print(f" Spawning sudo bpftrace (you may be prompted for password)...") + + try: + self._trace_proc = subprocess.Popen( + ['sudo', script_path, comm], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid # own process group for clean SIGINT + ) + # Give bpftrace time to attach probes + time.sleep(2) + if self._trace_proc.poll() is not None: + stderr = self._trace_proc.stderr.read().decode('utf-8', errors='replace') + logger.warning(f"bpftrace exited early: {stderr[:200]}") + print(f" WARNING: bpftrace failed to start. Continuing without tracing.") + self._trace_proc = None + else: + print(f" bpftrace attached (pid {self._trace_proc.pid}). Tracing active.") + except FileNotFoundError: + logger.warning("sudo or bpftrace not found") + print(f" WARNING: sudo/bpftrace not available. Skipping latency tracing.") + self._trace_proc = None + except Exception as e: + logger.warning(f"Failed to start tracing: {e}") + print(f" WARNING: Tracing failed: {e}. Continuing without tracing.") + self._trace_proc = None + + def _stop_latency_tracing(self) -> Optional[Dict]: + """Send SIGINT to bpftrace, capture histograms, parse into dict.""" + if not self._trace_proc: + return None + + print(f"\n### COLLECTING TRACE DATA ###") + try: + # SIGINT to the process group triggers bpftrace's END block + os.killpg(os.getpgid(self._trace_proc.pid), signal.SIGINT) + stdout, stderr = self._trace_proc.communicate(timeout=10) + except subprocess.TimeoutExpired: + os.killpg(os.getpgid(self._trace_proc.pid), signal.SIGKILL) + stdout, stderr = self._trace_proc.communicate() + except Exception as e: + logger.warning(f"Error stopping bpftrace: {e}") + return None + + # bpftrace may print histograms to stdout or stderr depending on version + stdout_text = stdout.decode('utf-8', errors='replace') + stderr_text = stderr.decode('utf-8', errors='replace') + output = stdout_text + '\n' + stderr_text + + if '@' not in output: + logger.warning(f"No histogram data in bpftrace output ({len(output)} bytes)") + logger.warning(f"Raw output: {repr(output[:500])}") + return {} + + return self._parse_bpftrace_output(output) + + def _parse_bpftrace_output(self, output: str) -> Dict: + """Parse bpftrace histogram output into structured dict.""" + result = {} + current_hist = None + lines = output.split('\n') + + for line in lines: + # Match histogram name like "@d2c_read_us:" or "@vfs_write_us:" + # bpftrace may prefix with newlines or spaces + hist_match = re.match(r'^\s*@(\w+):\s*$', line) + if hist_match: + current_hist = hist_match.group(1) + result[current_hist] = {'buckets': [], 'raw': []} + continue + + # Match histogram bucket like "[128, 256) 5 |@@@@ |" + bucket_match = re.match(r'^\[(\d+),\s*(\d+)\)\s+(\d+)\s+\|', line) + if bucket_match and current_hist: + low = int(bucket_match.group(1)) + high = int(bucket_match.group(2)) + count = int(bucket_match.group(3)) + result[current_hist]['buckets'].append({ + 'range_us': [low, high], + 'count': count + }) + result[current_hist]['raw'].append(line.rstrip()) + continue + + # Match single-value bucket like "[1M, 2M) 1 |@@@@ |" + bucket_match_k = re.match(r'^\[(\d+)([KM]),\s*(\d+)([KM])\)\s+(\d+)\s+\|', line) + if bucket_match_k and current_hist: + def parse_val(num, suffix): + v = int(num) + return v * 1024 if suffix == 'K' else v * 1048576 + low = parse_val(bucket_match_k.group(1), bucket_match_k.group(2)) + high = parse_val(bucket_match_k.group(3), bucket_match_k.group(4)) + count = int(bucket_match_k.group(5)) + result[current_hist]['buckets'].append({ + 'range_us': [low, high], + 'count': count + }) + result[current_hist]['raw'].append(line.rstrip()) + + return result + + @staticmethod + def _hist_percentile(buckets: List[Dict], pct: float) -> Dict: + """Compute a percentile bucket from a parsed histogram.""" + total = sum(b['count'] for b in buckets) + if total == 0: + return buckets[0] if buckets else {'range_us': [0, 0], 'count': 0} + target = total * pct / 100.0 + cumulative = 0 + for b in buckets: + cumulative += b['count'] + if cumulative >= target: + return b + return buckets[-1] + + def _print_trace_results(self, trace_data: Dict): + """Print parsed bpftrace histograms.""" + if not trace_data: + return + + print(f"\n### DEVICE LATENCY TRACING (bpftrace) ###") + + # ── Latency histograms ── + latency_histograms = [ + ('d2c_read_us', 'D2C Read (device hardware time)', 'us'), + ('d2c_write_us', 'D2C Write (device hardware time)', 'us'), + ('q2d_read_us', 'Q2D Read (I/O scheduler queue)', 'us'), + ('q2d_write_us', 'Q2D Write (I/O scheduler queue)', 'us'), + ('vfs_read_us', 'VFS Read (application-visible)', 'us'), + ('vfs_write_us', 'VFS Write (application-visible)', 'us'), + ('fsync_us', 'fsync (device flush)', 'us'), + ('write_to_fsync_us', 'Write-to-fsync gap (CPU serialization)', 'us'), + ('fadvise_to_read_us', 'fadvise-to-read gap (cache drop)', 'us'), + ] + + for key, label, unit in latency_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + p50 = self._hist_percentile(buckets, 50) + p95 = self._hist_percentile(buckets, 95) + p99 = self._hist_percentile(buckets, 99) + + print(f"\n {label}:") + print(f" Samples: {total_count}") + print(f" P50: [{p50['range_us'][0]:,}, {p50['range_us'][1]:,}) {unit}") + print(f" P95: [{p95['range_us'][0]:,}, {p95['range_us'][1]:,}) {unit}") + print(f" P99: [{p99['range_us'][0]:,}, {p99['range_us'][1]:,}) {unit}") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + # ── Block size distribution ── + bssplit_histograms = [ + ('bssplit_read_kb', 'Block Size Distribution (Reads)'), + ('bssplit_write_kb', 'Block Size Distribution (Writes)'), + ] + + for key, label in bssplit_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + p50 = self._hist_percentile(buckets, 50) + + print(f"\n {label}:") + print(f" I/O count: {total_count}") + print(f" P50: [{p50['range_us'][0]:,}, {p50['range_us'][1]:,}) KB") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + # ── Queue depth distribution ── + qd_histograms = [ + ('qd_read', 'Queue Depth at Dispatch (Reads)'), + ('qd_write', 'Queue Depth at Dispatch (Writes)'), + ] + + for key, label in qd_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + p50 = self._hist_percentile(buckets, 50) + p95 = self._hist_percentile(buckets, 95) + + print(f"\n {label}:") + print(f" Samples: {total_count}") + print(f" P50: [{p50['range_us'][0]}, {p50['range_us'][1]})") + print(f" P95: [{p95['range_us'][0]}, {p95['range_us'][1]})") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + # ── LBA heatmap ── + lba_histograms = [ + ('lba_read_gb', 'LBA Heatmap (Reads, 10 GB buckets)'), + ('lba_write_gb', 'LBA Heatmap (Writes, 10 GB buckets)'), + ] + + for key, label in lba_histograms: + if key not in trace_data or not trace_data[key]['buckets']: + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + + # Find the hot zone (buckets with > 1% of I/O) + hot_zones = [b for b in buckets if b['count'] > total_count * 0.01] + if hot_zones: + hot_start = hot_zones[0]['range_us'][0] + hot_end = hot_zones[-1]['range_us'][1] + hot_pct = sum(b['count'] for b in hot_zones) * 100.0 / total_count + else: + hot_start = hot_end = hot_pct = 0 + + print(f"\n {label}:") + print(f" I/O count: {total_count}") + if hot_zones: + print(f" Hot zone: {hot_start}-{hot_end} GB ({hot_pct:.0f}% of I/O)") + for raw_line in trace_data[key]['raw']: + print(f" {raw_line}") + + def _generate_fio_workload(self, trace_data: Dict) -> Optional[str]: + """Generate a fio workload .ini file from bpftrace trace data. + + Distills the traced block-layer I/O pattern into a standalone fio config + that reproduces the same bssplit, read/write ratio, queue depth, and + idle time characteristics observed during the benchmark run. + """ + # ── Validate minimum required histograms ── + required = ['bssplit_read_kb', 'bssplit_write_kb'] + for key in required: + if key not in trace_data or not trace_data[key].get('buckets'): + logger.warning(f"Missing {key} histogram; cannot generate fio workload") + return None + + # ── bssplit: convert histogram buckets to fio format ── + def hist_to_bssplit(buckets: List[Dict]) -> str: + total = sum(b['count'] for b in buckets) + if total == 0: + return "4k/100" + parts = [] + for b in buckets: + if b['count'] == 0: + continue + size_kb = b['range_us'][0] # lower bound of bucket + pct = int(round(b['count'] * 100.0 / total)) + if pct == 0 and b['count'] > 0: + pct = 1 # don't drop non-zero buckets + # Format size: use k for < 1024, m for >= 1024 + if size_kb >= 1024: + size_str = f"{size_kb // 1024}m" + else: + size_str = f"{size_kb}k" + parts.append(f"{size_str}/{pct}") + return ":".join(parts) if parts else "4k/100" + + read_bssplit = hist_to_bssplit(trace_data['bssplit_read_kb']['buckets']) + write_bssplit = hist_to_bssplit(trace_data['bssplit_write_kb']['buckets']) + bssplit_line = f"{read_bssplit},{write_bssplit}" + + # ── rwmixread: from I/O count ratio ── + read_count = sum(b['count'] for b in trace_data['bssplit_read_kb']['buckets']) + write_count = sum(b['count'] for b in trace_data['bssplit_write_kb']['buckets']) + total_io = read_count + write_count + rwmixread = int(round(read_count * 100.0 / total_io)) if total_io > 0 else 50 + + # ── iodepth: from QD histogram P50 ── + iodepth = 32 # default + for qd_key in ('qd_read', 'qd_write'): + if qd_key in trace_data and trace_data[qd_key].get('buckets'): + p50 = self._hist_percentile(trace_data[qd_key]['buckets'], 50) + candidate = max(1, p50['range_us'][0]) + iodepth = max(iodepth, candidate) + + # ── thinktime: from write_to_fsync gap (CPU idle between I/O bursts) ── + thinktime_us = 0 + if 'write_to_fsync_us' in trace_data and trace_data['write_to_fsync_us'].get('buckets'): + buckets = trace_data['write_to_fsync_us']['buckets'] + total_samples = sum(b['count'] for b in buckets) + if total_samples >= 4: + p50 = self._hist_percentile(buckets, 50) + thinktime_us = p50['range_us'][0] + + # ── thinktime_iotime: from fsync latency (active I/O burst duration) ── + thinktime_iotime_us = 0 + if 'fsync_us' in trace_data and trace_data['fsync_us'].get('buckets'): + buckets = trace_data['fsync_us']['buckets'] + total_samples = sum(b['count'] for b in buckets) + if total_samples >= 4: + p50 = self._hist_percentile(buckets, 50) + thinktime_iotime_us = p50['range_us'][0] + + # ── Build the fio config ── + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + model_name = self.model_config.name + bpt = self.model_config.kv_cache_size_per_token + + lines = [ + f"# KV Cache Benchmark; Traced Workload", + f"# Generated: {timestamp}", + f"# Model: {model_name}, Users: {self.num_users}, Duration: {self.duration}s", + f"# KV bytes/token: {bpt:,} bytes ({bpt/1024:.0f} KB)", + f"#", + f"# Distilled from bpftrace block-layer tracing during benchmark run.", + f"# Total traced I/Os: {total_io:,} ({read_count:,} reads, {write_count:,} writes)", + f"#", + f"# Usage:", + f"# fio --filename=/dev/nvmeXn1", + f"# fio --filename=/mnt/nvme/fio_test --size=100G", + f"", + f"[kv-cache-traced]", + f"ioengine=libaio", + f"direct=1", + f"time_based", + f"runtime=300", + f"rw=randrw", + f"rwmixread={rwmixread}", + f"bssplit={bssplit_line}", + f"iodepth={iodepth}", + f"iodepth_batch_submit={iodepth}", + f"iodepth_batch_complete_min=1", + f"size=100%", + ] + + if thinktime_us > 0: + lines.extend([ + f"thinktime={thinktime_us}", + f"thinktime_blocks={iodepth}", + ]) + if thinktime_iotime_us > 0: + # thinktime_iotime requires fio 3.28+ + # Converts active I/O period from microseconds to seconds + thinktime_iotime_s = max(1, thinktime_iotime_us // 1000000) + lines.append(f"# thinktime_iotime={thinktime_iotime_s}s # uncomment for fio 3.28+ (active I/O burst before idle)") + + lines.extend([ + f"refill_buffers=1", + f"norandommap=1", + f"randrepeat=0", + f"numjobs=1", + f"group_reporting", + f"percentile_list=50:95:99:99.9:99.99", + ]) + + return "\n".join(lines) + "\n" + + def _check_memory_safety(self): + """Estimate peak memory usage and warn if OOM is likely. + + Peak memory per worker thread comes from access_cache -> np.load + np.array + which briefly holds 2x the entry size in RAM. With N worker threads running + concurrently, peak = N * 2 * mean_entry_bytes + baseline (precomputed buffer, + Python/torch overhead, OS). + """ + try: + import psutil + available_ram = psutil.virtual_memory().available + except ImportError: + try: + with open('/proc/meminfo', 'r') as f: + for line in f: + if line.startswith('MemAvailable:'): + available_ram = int(line.split()[1]) * 1024 # kB to bytes + break + else: + return # can't determine + except (FileNotFoundError, ValueError): + return # non-Linux or parse error + + bpt = self.model_config.kv_cache_size_per_token + # Mean context from user templates: midpoint of chatbot + coding + document ranges + mean_context_tokens = 8000 # conservative estimate + mean_entry_bytes = mean_context_tokens * bpt + # np.load + np.array = 2x entry size per concurrent read + per_thread_peak = mean_entry_bytes * 2 + # Worker threads = min(num_users, 500); all can read concurrently + num_workers = min(self.num_users, 500) + # Baseline: precomputed buffer (~256 MB) + Python/torch overhead (~2 GB) + baseline = 2.5 * 1024**3 + estimated_peak = (num_workers * per_thread_peak) + baseline + + safe_workers = max(1, int((available_ram - baseline) / per_thread_peak)) if per_thread_peak > 0 else num_workers + + print(f"\n### MEMORY SAFETY CHECK ###") + print(f" Formula: peak = (workers x 2 x mean_entry_bytes) + baseline") + print(f" = ({num_workers} x 2 x {mean_entry_bytes / 1024**2:.0f} MB) + {baseline / 1024**3:.1f} GB") + print(f" = {estimated_peak / 1024**3:.1f} GB") + print(f" Available RAM: {available_ram / 1024**3:.1f} GB") + print(f" Mean entry size: {mean_entry_bytes / 1024**2:.0f} MB ({mean_context_tokens} tok x {bpt:,} B/tok)") + print(f" Peak per thread: {per_thread_peak / 1024**2:.0f} MB (np.load + np.array copy)") + print(f" Worker threads: {num_workers}") + print(f" Safe concurrent readers: ~{safe_workers} = (available - baseline) / peak_per_thread") + + if estimated_peak > available_ram * 0.85: + print(f" WARNING: Estimated peak memory ({estimated_peak / 1024**3:.1f} GB) exceeds 85% of") + print(f" available RAM ({available_ram / 1024**3:.1f} GB). Risk of OOM with {num_workers} workers.") + print(f" Consider: --num-users {min(safe_workers, self.num_users)} or --max-concurrent-allocs {max(1, safe_workers // 2)}") + else: + print(f" Status: OK") + def run(self) -> Dict: """The main entry point to start the benchmark execution.""" print(f"\nIntegrated Multi-User KV Cache Benchmark - MLPerf Edition") @@ -714,6 +1165,8 @@ def run(self) -> Dict: print(f" Conversations: {self.sharegpt_loader.token_stats.get('total_conversations', 0)}") print(f" Total Turns: {self.sharegpt_loader.token_stats.get('total_turns', 0)}") + self._check_memory_safety() + if self.precondition: self._run_preconditioning() @@ -727,6 +1180,9 @@ def run(self) -> Dict: mode_str = "PREFILL-ONLY (write-heavy, disaggregated prefill node)" elif self.decode_only: mode_str = "DECODE-ONLY (read-heavy, assumes KV cache pre-populated)" + if self.enable_latency_tracing: + self._start_latency_tracing() + print(f"\nStarting benchmark... Mode: {mode_str}") print("-" * 80) @@ -763,8 +1219,24 @@ def run(self) -> Dict: for thread in threads: thread.join(timeout=2.0) + # Stop tracing and collect results before stats calculation + trace_data = None + if self.enable_latency_tracing: + trace_data = self._stop_latency_tracing() + self._calculate_stats(actual_duration) + if trace_data: + self.results['device_latency_tracing'] = trace_data + self._print_trace_results(trace_data) + + fio_config = self._generate_fio_workload(trace_data) + if fio_config: + self.results['fio_workload'] = fio_config + print(f"\n### GENERATED FIO WORKLOAD ###") + for line in fio_config.strip().split('\n'): + print(f" {line}") + if self.validator: self.results['validation'] = self.validator.validate_benchmark(self.results) diff --git a/kv_cache_benchmark/kv_cache/cache.py b/kv_cache_benchmark/kv_cache/cache.py index e1d904ae..69621f8e 100755 --- a/kv_cache_benchmark/kv_cache/cache.py +++ b/kv_cache_benchmark/kv_cache/cache.py @@ -710,6 +710,22 @@ def _allocate_cache_inner(self, key: str, num_tokens: int, phase: InferencePhase del data return False, 'none', 0.0 + def check_cache_exists(self, key: str) -> Tuple[Optional[str], int]: + """Metadata-only existence check. No I/O, no LRU update. + + Used by multi-turn virtual context checks: determines whether a + previous turn's KV cache entry survived LRU eviction without + loading data from disk (no np.load, no memory allocation). + + Returns: + (location, size_bytes) if the entry exists, (None, 0) otherwise. + """ + with self.metadata_lock: + entry = self.cache_entries.get(key) + if entry is None: + return None, 0 + return entry['location'], entry['size'] + def access_cache(self, key: str, phase: InferencePhase = InferencePhase.DECODE, cache_type: str = 'user') -> Tuple[Optional[str], float]: """Accesses an existing cached entry and records the read performance.""" diff --git a/kv_cache_benchmark/kv_cache/cli.py b/kv_cache_benchmark/kv_cache/cli.py index 03864c3b..c1f8b63f 100755 --- a/kv_cache_benchmark/kv_cache/cli.py +++ b/kv_cache_benchmark/kv_cache/cli.py @@ -208,6 +208,78 @@ def get_nested(d, keys, default=None): qos_df = pd.DataFrame(qos_rows) qos_df.to_excel(writer, sheet_name='QoS Metrics', index=False) + # Device tracing sheet (when --enable-latency-tracing is used) + trace_data = results.get('device_latency_tracing', {}) + if trace_data: + trace_rows = [] + display_order = [ + ('d2c_read_us', 'D2C Read (us)', 'Device hardware time'), + ('d2c_write_us', 'D2C Write (us)', 'Device hardware time'), + ('q2d_read_us', 'Q2D Read (us)', 'I/O scheduler queue'), + ('q2d_write_us', 'Q2D Write (us)', 'I/O scheduler queue'), + ('vfs_read_us', 'VFS Read (us)', 'Application-visible'), + ('vfs_write_us', 'VFS Write (us)', 'Application-visible'), + ('fsync_us', 'fsync (us)', 'Device flush'), + ('write_to_fsync_us', 'Write-to-fsync (us)', 'CPU serialization gap'), + ('fadvise_to_read_us', 'fadvise-to-read (us)', 'Cache drop overhead'), + ('bssplit_read_kb', 'Block Size Read (KB)', 'I/O size distribution'), + ('bssplit_write_kb', 'Block Size Write (KB)', 'I/O size distribution'), + ('qd_read', 'Queue Depth Read', 'Instantaneous QD at dispatch'), + ('qd_write', 'Queue Depth Write', 'Instantaneous QD at dispatch'), + ('lba_read_gb', 'LBA Heatmap Read (GB)', 'Spatial I/O distribution'), + ('lba_write_gb', 'LBA Heatmap Write (GB)', 'Spatial I/O distribution'), + ] + + def hist_pct(buckets, pct): + total = sum(b['count'] for b in buckets) + if total == 0: + return 0 + target = total * pct / 100.0 + cum = 0 + for b in buckets: + cum += b['count'] + if cum >= target: + return b['range_us'][0] + return buckets[-1]['range_us'][0] + + for key, label, description in display_order: + if key not in trace_data or not trace_data[key].get('buckets'): + continue + buckets = trace_data[key]['buckets'] + total_count = sum(b['count'] for b in buckets) + if total_count == 0: + continue + trace_rows.append({ + 'Metric': label, + 'Description': description, + 'Samples': total_count, + 'P50': hist_pct(buckets, 50), + 'P95': hist_pct(buckets, 95), + 'P99': hist_pct(buckets, 99), + 'Min Bucket': buckets[0]['range_us'][0], + 'Max Bucket': buckets[-1]['range_us'][1], + }) + + if trace_rows: + trace_df = pd.DataFrame(trace_rows) + trace_df.to_excel(writer, sheet_name='Device Tracing', index=False) + + # Raw histograms sheet + raw_rows = [] + for key, label, _ in display_order: + if key not in trace_data or not trace_data[key].get('buckets'): + continue + for b in trace_data[key]['buckets']: + raw_rows.append({ + 'Histogram': label, + 'Bucket Low': b['range_us'][0], + 'Bucket High': b['range_us'][1], + 'Count': b['count'], + }) + if raw_rows: + raw_df = pd.DataFrame(raw_rows) + raw_df.to_excel(writer, sheet_name='Trace Histograms', index=False) + logger.info(f"XLSX results saved to {output_path}") else: csv_path = output_path.replace('.xlsx', '.csv') if output_path.endswith('.xlsx') else output_path @@ -299,6 +371,8 @@ def main(): help='Simulate disaggregated prefill node (write-heavy, no decode reads).') parser.add_argument('--decode-only', action='store_true', help='Simulate disaggregated decode node (read-heavy, assumes KV cache exists).') + parser.add_argument('--enable-latency-tracing', action='store_true', + help='Enable bpftrace device latency tracing (requires sudo, bpftrace).') args = parser.parse_args() @@ -377,7 +451,8 @@ def main(): trace_speedup=args.trace_speedup, replay_cycles=args.replay_cycles, prefill_only=args.prefill_only, - decode_only=args.decode_only + decode_only=args.decode_only, + enable_latency_tracing=args.enable_latency_tracing ) results = benchmark.run() @@ -401,6 +476,15 @@ def convert_numpy(obj): if args.xlsx_output: export_results_to_xlsx(results, args, args.xlsx_output) + # Save fio workload file when latency tracing produced one + fio_config = results.get('fio_workload') + if fio_config: + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + fio_filename = f"fio_kv_cache_workload_{timestamp}.ini" + with open(fio_filename, 'w') as f: + f.write(fio_config) + logger.info(f"fio workload saved to {fio_filename}") + if __name__ == "__main__": main() diff --git a/kv_cache_benchmark/tests/test_kv_cache.py b/kv_cache_benchmark/tests/test_kv_cache.py index df8d7909..baf67c22 100644 --- a/kv_cache_benchmark/tests/test_kv_cache.py +++ b/kv_cache_benchmark/tests/test_kv_cache.py @@ -3341,31 +3341,21 @@ def test_part3b_request_to_npy_file_mapping(self, tiny_model): def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): """ - Shows how a multi-turn conversation creates and reads .npy files. + Shows how a multi-turn conversation reads ALL previous turns. - Conversation with 4 turns: + Each turn writes its own new tokens. On subsequent turns, ALL + previous turns are read via access_cache to reload the full + conversation context. Evicted turns return (None, 0.0) with + no I/O; surviving turns trigger real np.load reads. - Turn 1 (no previous context): - cache_key = "conv_XXX_turn_1" - PREFILL: allocate_cache() → WRITE conv_XXX_turn_1.npy (new file) - DECODE: access_cache() → READ conv_XXX_turn_1.npy + Turn 1: WRITE turn_1 + READ turn_1 (decode) + Turn 2: READ [turn_1] + WRITE turn_2 + READ turn_2 + Turn 3: READ [turn_1, turn_2] + WRITE turn_3 + READ turn_3 + Turn 4: READ [turn_1, turn_2, turn_3] + WRITE turn_4 + READ turn_4 - Turn 2 (has previous turn): - cache_key = "conv_XXX_turn_2" - MULTI-TURN READ: access_cache(turn_1) → READ conv_XXX_turn_1.npy ← reuse! - PREFILL: allocate_cache() → WRITE conv_XXX_turn_2.npy (new file) - DECODE: access_cache() → READ conv_XXX_turn_2.npy - - Turn 3: - MULTI-TURN READ: access_cache(turn_2) → READ conv_XXX_turn_2.npy ← reuse! - PREFILL: WRITE conv_XXX_turn_3.npy - DECODE: READ conv_XXX_turn_3.npy - - Each turn: - - Reads the PREVIOUS turn's .npy (multi-turn cache reuse) - - Writes a NEW .npy for this turn's KV cache - - Reads the NEW .npy during decode - - File count grows by 1 per turn (until eviction cleans old ones) + Multi-turn reads (triangular): 0+1+2+3 = 6 + Decode reads: 4 (one per turn) + Total decode_reads stat: 10 This is the exact flow from benchmark.py process_requests() steps 2, 3, 5. """ @@ -3411,25 +3401,21 @@ def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): storage_latency = 0.0 file_ops = [] - # ── Step 2: Multi-turn read (previous turn's cache) ── + # ── Step 2: Multi-turn read (ALL previous turns) ── if turn > 1: - prev_key = f"{conv_id}_turn_{turn - 1}" - prev_file = nvme_dir / f"{prev_key}.npy" - - print(f"\n Step 2: MULTI-TURN READ (reuse previous turn)") - print(f" Read: {prev_key}.npy") - print(f" Exists: {prev_file.exists()}") - - location, read_lat = cache.access_cache( - prev_key, InferencePhase.DECODE, 'multi_turn' - ) - storage_latency += read_lat - file_ops.append(f"READ {prev_key}.npy ({read_lat*1000:.3f} ms) [multi-turn reuse]") - - if location: - print(f" Hit: location={location}, latency={read_lat*1000:.3f} ms") - else: - print(f" Miss: previous turn not in cache") + prev_keys = conv_mgr.get_all_previous_turn_keys(conv_id, turn) + print(f"\n Step 2: MULTI-TURN READ ({len(prev_keys)} previous turn(s))") + for prev_key in prev_keys: + location, read_lat = cache.access_cache( + prev_key, InferencePhase.DECODE, 'multi_turn' + ) + storage_latency += read_lat + if location: + file_ops.append(f"READ {prev_key}.npy ({read_lat*1000:.3f} ms) [hit, {location}]") + print(f" {prev_key} -> Hit ({location}, {read_lat*1000:.3f} ms)") + else: + file_ops.append(f"READ {prev_key}.npy [miss, evicted]") + print(f" {prev_key} -> Miss (evicted)") else: print(f"\n Step 2: MULTI-TURN READ — skipped (turn 1, no history)") @@ -3488,34 +3474,145 @@ def test_part3c_multi_turn_prefill_decode_file_io(self, tiny_model): print(f" Total write bytes: {cache.stats['total_write_bytes']/1024:.0f} KB") print(f" Total read bytes: {cache.stats['total_read_bytes']/1024:.0f} KB") - print(f"\n File-per-turn pattern:") - print(f" Turn 1: WRITE turn_1.npy + READ turn_1.npy") - print(f" Turn 2: READ turn_1.npy + WRITE turn_2.npy + READ turn_2.npy") - print(f" Turn 3: READ turn_2.npy + WRITE turn_3.npy + READ turn_3.npy") - print(f" Turn N: READ turn_(N-1).npy + WRITE turn_N.npy + READ turn_N.npy") + multi_turn_reads = (num_turns * (num_turns - 1)) // 2 + + print(f"\n Full-context reload pattern:") + print(f" Turn 1: WRITE turn_1 + READ turn_1") + print(f" Turn 2: READ [turn_1] + WRITE turn_2 + READ turn_2") + print(f" Turn 3: READ [turn_1, turn_2] + WRITE turn_3 + READ turn_3") + print(f" Turn N: READ [turns 1..N-1] + WRITE turn_N + READ turn_N") print(f"") print(f" I/O per turn:") print(f" Turn 1: 1 write + 1 read = 2 I/O ops") - print(f" Turn 2+: 1 write + 2 reads = 3 I/O ops (extra read = multi-turn reuse)") - print(f"") - print(f" Write amplification over {num_turns} turns:") - total_data = num_turns * context_per_turn * bpt - total_written = cache.stats['total_write_bytes'] - print(f" Unique KV data: {total_data/1024:.0f} KB " - f"({num_turns} turns × {context_per_turn} tok × {bpt:,d} B)") - print(f" Bytes written: {total_written/1024:.0f} KB") - print(f" Ratio: {total_written / total_data:.2f}x") + print(f" Turn N: 1 write + N reads = {num_turns + 1} I/O ops (turn {num_turns})") + print(f" Multi-turn reads (triangular): {multi_turn_reads}") # Assertions assert len(all_npy) == num_turns, \ f"Should have {num_turns} .npy files (one per turn), got {len(all_npy)}" assert cache.stats['prefill_writes'] == num_turns, \ f"Should have {num_turns} prefill writes" - # decode_reads: turn 1 has 1, turns 2-4 have 2 each (multi-turn + decode) - expected_reads = 1 + (num_turns - 1) * 2 + # decode_reads = multi-turn reads (triangular) + decode reads (one per turn) + expected_reads = num_turns + multi_turn_reads assert cache.stats['decode_reads'] == expected_reads, \ f"Expected {expected_reads} decode reads, got {cache.stats['decode_reads']}" + # ------------------------------------------------------------------ + # Part 3d: Multi-turn with eviction pressure + # ------------------------------------------------------------------ + + def test_part3d_multi_turn_with_eviction(self, tiny_model): + """ + Shows what happens when previous turns get evicted by the LRU + waterfall before the next turn reads them. + + Setup: NVMe capacity fits only 3 entries. A 6-turn conversation + forces turns 1-3 to be evicted as turns 4-6 are written. The + multi-turn reads for later turns show a mix of hits and misses. + + This validates that access_cache returns (None, 0.0) cleanly for + evicted entries, and that the benchmark correctly counts misses. + """ + print("\n" + "=" * 72) + print(" PART 3d: MULTI-TURN WITH EVICTION PRESSURE") + print("=" * 72) + + bpt = tiny_model.kv_cache_size_per_token + context_per_turn = 200 + entry_bytes = context_per_turn * bpt + # Capacity for ~3 entries; turns 4+ force eviction of oldest + capacity_bytes = entry_bytes * 3.5 + capacity_gb = capacity_bytes / (1024 ** 3) + + cache = MultiTierCache( + model_config=tiny_model, + gpu_memory_gb=0, + cpu_memory_gb=0, + seed=42, + storage_capacity_gb=capacity_gb, + ) + + nvme_dir = cache.backends['nvme'].base_path + conv_mgr = ConversationManager(max_conversations=10) + conv_id = conv_mgr.start_conversation("bob") + + num_turns = 6 + print(f"\n Entry size: {entry_bytes/1024:.0f} KB") + print(f" NVMe capacity: {capacity_bytes/1024:.0f} KB (~{capacity_bytes/entry_bytes:.1f} entries)") + print(f" Turns: {num_turns} (will force eviction after turn 3)") + + total_hits = 0 + total_misses = 0 + + for turn in range(1, num_turns + 1): + turn_num, cache_key = conv_mgr.add_turn(conv_id, context_per_turn, 50) + + print(f"\n Turn {turn}:") + + # Step 2: read all previous turns + if turn > 1: + prev_keys = conv_mgr.get_all_previous_turn_keys(conv_id, turn) + hits = 0 + misses = 0 + for prev_key in prev_keys: + location, _ = cache.access_cache(prev_key, InferencePhase.DECODE, 'multi_turn') + if location is not None: + hits += 1 + else: + misses += 1 + total_hits += hits + total_misses += misses + print(f" Step 2: read {len(prev_keys)} previous turns -> {hits} hits, {misses} misses") + + # Step 3: write this turn + success, tier, _ = cache.allocate_cache( + cache_key, num_tokens=context_per_turn, phase=InferencePhase.PREFILL + ) + npy_count = len(list(nvme_dir.glob("*.npy"))) + print(f" Step 3: write {cache_key} -> {tier}, files on disk: {npy_count}") + + npy_files = sorted(nvme_dir.glob("*.npy")) + print(f"\n {'=' * 64}") + print(f" EVICTION SUMMARY") + print(f" {'=' * 64}") + print(f" Multi-turn hits: {total_hits}") + print(f" Multi-turn misses: {total_misses}") + print(f" Files on disk: {len(npy_files)} (capacity held ~3)") + print(f" Evictions: {cache.stats['evictions']}") + print(f" Cache misses: {cache.stats['cache_misses']} (tier-level, all types)") + for f in npy_files: + print(f" {f.name}") + + print(f"\n HOW CACHE MISSES INCREMENT:") + print(f" ────────────────────────────") + print(f" Two separate counters track misses:") + print(f"") + print(f" 1. cache.stats['cache_misses'] (tier-level, inside access_cache):") + print(f" Increments when ANY access_cache() call finds the key missing") + print(f" from cache_entries. Covers all miss types: multi-turn, decode,") + print(f" prefix, RAG. Returns (None, 0.0) immediately; no I/O, no np.load.") + print(f"") + print(f" 2. results['multi_turn_cache_misses'] (benchmark-level, step 2 only):") + print(f" Increments when a multi-turn access_cache() returns None.") + print(f" This is a SUBSET of cache_misses; it counts only the misses") + print(f" from the step 2 previous-turn read loop.") + print(f"") + print(f" In this test: {cache.stats['cache_misses']} tier-level misses total,") + print(f" {total_misses} of which are multi-turn misses from evicted turns.") + print(f"") + print(f" A high multi-turn miss rate ({total_misses}/{total_hits + total_misses}" + f" = {total_misses*100/max(total_hits+total_misses,1):.0f}%) means the") + print(f" NVMe tier is too small to hold the full conversation history.") + print(f" The LRU waterfall evicts old turns before they can be reused.") + + # Assertions + assert total_misses > 0, "Eviction should cause some multi-turn misses" + assert total_hits > 0, "Recent turns should still be cached" + assert cache.stats['evictions'] > 0, "Eviction should have occurred" + assert cache.stats['cache_misses'] >= total_misses, \ + "Tier-level misses should be >= multi-turn misses (superset)" + assert len(npy_files) <= 4, "NVMe should hold ~3 entries, not all 6" + # ------------------------------------------------------------------ # Part 4: 3-tier waterfall LRU eviction # ------------------------------------------------------------------ diff --git a/kv_cache_benchmark/utils/distill_fio.py b/kv_cache_benchmark/utils/distill_fio.py new file mode 100755 index 00000000..09dd50e4 --- /dev/null +++ b/kv_cache_benchmark/utils/distill_fio.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +""" +distill_fio.py — Convert bpftrace histogram output to a fio workload file. + +Parses the output of storage_latency_stack.bt and generates a standalone +fio .ini that reproduces the same I/O pattern: bssplit, read/write ratio, +queue depth, and idle time. + +Usage: + # Pipe from bpftrace: + sudo bpftrace storage_latency_stack.bt "vllm" 2>&1 | python3 distill_fio.py + + # From saved file: + python3 distill_fio.py < trace_output.txt + + # With custom output name: + python3 distill_fio.py -o vllm_workload.ini < trace_output.txt + + # Standalone (called by storage_latency_stack.sh --fio): + python3 distill_fio.py -i /tmp/trace_raw.txt -o fio_traced.ini --process vllm + +Author: Hazem Awadallah, Kingston Digital +License: Apache-2.0, donated to MLCommons +""" + +import re +import sys +import argparse +from datetime import datetime +from typing import Dict, List, Optional, Tuple + + +def parse_bpftrace_output(text: str) -> Dict: + """Parse bpftrace histogram output into structured dict.""" + result = {} + current_hist = None + + for line in text.split('\n'): + # Match histogram name: @histogram_name: + hist_match = re.match(r'^\s*@(\w+):\s*$', line) + if hist_match: + current_hist = hist_match.group(1) + result[current_hist] = [] + continue + + if current_hist is None: + continue + + # Match numeric bucket: [low, high) count |@@@@| + m = re.match(r'^\[(\d+),\s*(\d+)\)\s+(\d+)\s+\|', line) + if m: + result[current_hist].append({ + 'low': int(m.group(1)), + 'high': int(m.group(2)), + 'count': int(m.group(3)), + }) + continue + + # Match K/M bucket: [1K, 2K) count |@@@@| + m = re.match(r'^\[(\d+)([KM]),\s*(\d+)([KM])\)\s+(\d+)\s+\|', line) + if m: + def to_val(n, s): + v = int(n) + return v * 1024 if s == 'K' else v * 1048576 + result[current_hist].append({ + 'low': to_val(m.group(1), m.group(2)), + 'high': to_val(m.group(3), m.group(4)), + 'count': int(m.group(5)), + }) + + return result + + +def hist_percentile(buckets: List[Dict], pct: float) -> int: + """Return the lower bound of the bucket containing the given percentile.""" + total = sum(b['count'] for b in buckets) + if total == 0: + return 0 + target = total * pct / 100.0 + cumulative = 0 + for b in buckets: + cumulative += b['count'] + if cumulative >= target: + return b['low'] + return buckets[-1]['low'] if buckets else 0 + + +def hist_to_bssplit(buckets: List[Dict]) -> str: + """Convert a bpftrace size histogram (KB) to fio bssplit format.""" + total = sum(b['count'] for b in buckets) + if total == 0: + return "4k/100" + parts = [] + for b in buckets: + if b['count'] == 0: + continue + size_kb = b['low'] + pct = int(round(b['count'] * 100.0 / total)) + if pct == 0 and b['count'] > 0: + pct = 1 + if size_kb >= 1024: + size_str = f"{size_kb // 1024}m" + elif size_kb == 0: + continue # skip zero-size buckets + else: + size_str = f"{size_kb}k" + parts.append(f"{size_str}/{pct}") + return ":".join(parts) if parts else "4k/100" + + +def generate_fio(histograms: Dict, process_name: str = "") -> str: + """Generate a fio .ini config from parsed histograms.""" + + # ── bssplit ── + read_bs = histograms.get('bssplit_read_kb', []) + write_bs = histograms.get('bssplit_write_kb', []) + read_bssplit = hist_to_bssplit(read_bs) + write_bssplit = hist_to_bssplit(write_bs) + + # ── rwmixread ── + read_count = sum(b['count'] for b in read_bs) + write_count = sum(b['count'] for b in write_bs) + total_io = read_count + write_count + rwmixread = int(round(read_count * 100.0 / total_io)) if total_io > 0 else 50 + + # ── iodepth from QD histogram P50 ── + iodepth = 32 + for qd_key in ('qd_read', 'qd_write'): + buckets = histograms.get(qd_key, []) + if buckets: + candidate = max(1, hist_percentile(buckets, 50)) + iodepth = max(iodepth, candidate) + + # ── thinktime from write_to_fsync gap ── + thinktime_us = 0 + wt_buckets = histograms.get('write_to_fsync_us', []) + if sum(b['count'] for b in wt_buckets) >= 4: + thinktime_us = hist_percentile(wt_buckets, 50) + + # ── thinktime_iotime from fsync latency ── + thinktime_iotime_us = 0 + fs_buckets = histograms.get('fsync_us', []) + if sum(b['count'] for b in fs_buckets) >= 4: + thinktime_iotime_us = hist_percentile(fs_buckets, 50) + + # ── LBA summary ── + lba_summary = "" + for lba_key, direction in [('lba_read_gb', 'Read'), ('lba_write_gb', 'Write')]: + buckets = histograms.get(lba_key, []) + total = sum(b['count'] for b in buckets) + if total > 0: + hot = [b for b in buckets if b['count'] > total * 0.01] + if hot: + lba_summary += f"# {direction} hot zone: {hot[0]['low']}-{hot[-1]['high']} GB ({sum(b['count'] for b in hot)*100//total}% of I/O)\n" + + # ── D2C summary ── + d2c_summary = "" + for key, direction in [('d2c_read_us', 'Read'), ('d2c_write_us', 'Write')]: + buckets = histograms.get(key, []) + total = sum(b['count'] for b in buckets) + if total > 0: + p50 = hist_percentile(buckets, 50) + p99 = hist_percentile(buckets, 99) + d2c_summary += f"# D2C {direction}: P50={p50} us, P99={p99} us ({total} samples)\n" + + # ── Build config ── + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + job_name = process_name.replace('-', '_') if process_name else "traced" + + lines = [ + f"# Storage Latency Stack; Distilled fio Workload", + f"# Generated: {timestamp}", + f"# Source process: {process_name or '(all)'}", + f"# Total traced I/Os: {total_io:,} ({read_count:,} reads, {write_count:,} writes)", + f"#", + ] + if d2c_summary: + lines.append(f"# Device latency (D2C):") + lines.append(d2c_summary.rstrip()) + if lba_summary: + lines.append(f"# LBA spatial distribution:") + lines.append(lba_summary.rstrip()) + lines.extend([ + f"#", + f"# Usage:", + f"# fio --filename=/dev/nvmeXn1", + f"# fio --filename=/mnt/nvme/fio_test --size=100G", + f"", + f"[{job_name}_workload]", + f"ioengine=libaio", + f"direct=1", + f"time_based", + f"runtime=300", + f"rw=randrw", + f"rwmixread={rwmixread}", + f"bssplit={read_bssplit},{write_bssplit}", + f"iodepth={iodepth}", + f"iodepth_batch_submit={iodepth}", + f"iodepth_batch_complete_min=1", + f"size=100%", + ]) + + if thinktime_us > 0: + lines.append(f"thinktime={thinktime_us}") + lines.append(f"thinktime_blocks={iodepth}") + if thinktime_iotime_us > 0: + iotime_s = max(1, thinktime_iotime_us // 1000000) + lines.append(f"# thinktime_iotime={iotime_s}s # uncomment for fio 3.28+") + + lines.extend([ + f"refill_buffers=1", + f"norandommap=1", + f"randrepeat=0", + f"numjobs=1", + f"group_reporting", + f"percentile_list=50:95:99:99.9:99.99", + ]) + + return "\n".join(lines) + "\n" + + +def main(): + parser = argparse.ArgumentParser( + description='Distill bpftrace trace output into a fio workload file.' + ) + parser.add_argument('-i', '--input', default=None, + help='Input file (bpftrace output). Default: stdin') + parser.add_argument('-o', '--output', default=None, + help='Output fio .ini file. Default: fio_traced_TIMESTAMP.ini') + parser.add_argument('--process', default='', + help='Process name (for fio job naming and comments)') + parser.add_argument('--stdout', action='store_true', + help='Print fio config to stdout instead of file') + args = parser.parse_args() + + # Read input + if args.input: + with open(args.input, 'r') as f: + text = f.read() + else: + text = sys.stdin.read() + + if not text.strip(): + print("Error: empty input", file=sys.stderr) + sys.exit(1) + + # Parse + histograms = parse_bpftrace_output(text) + if not histograms: + print("Error: no histograms found in input", file=sys.stderr) + sys.exit(1) + + # Check for required histograms + has_bssplit = 'bssplit_read_kb' in histograms or 'bssplit_write_kb' in histograms + if not has_bssplit: + print("Error: no bssplit histograms found. Was storage_latency_stack.bt used?", file=sys.stderr) + sys.exit(1) + + # Generate + fio_config = generate_fio(histograms, args.process) + + if args.stdout: + print(fio_config) + else: + if args.output: + outpath = args.output + else: + ts = datetime.now().strftime('%Y%m%d_%H%M%S') + outpath = f"fio_traced_{ts}.ini" + with open(outpath, 'w') as f: + f.write(fio_config) + print(f"fio workload saved to {outpath}") + + +if __name__ == '__main__': + main() diff --git a/kv_cache_benchmark/utils/storage_latency_stack.bt b/kv_cache_benchmark/utils/storage_latency_stack.bt new file mode 100644 index 00000000..681be5cd --- /dev/null +++ b/kv_cache_benchmark/utils/storage_latency_stack.bt @@ -0,0 +1,317 @@ +#!/usr/bin/env bpftrace +/* + * storage_latency_stack.bt — Full-Stack Storage Latency Diagnostic Tool + * + * Decomposes storage I/O latency across every layer of the Linux I/O stack: + * + * Application (VFS) + * └── Filesystem / Page Cache + * └── Block Layer (I/O Scheduler) + * └── NVMe / SCSI Driver + * └── Physical Device + * + * Ten histograms produced on Ctrl-C: + * + * @q2d_read_us — Queue-to-Dispatch (I/O scheduler wait) for reads + * @q2d_write_us — Queue-to-Dispatch (I/O scheduler wait) for writes + * @d2c_read_us — Dispatch-to-Complete (actual device hardware time) for reads + * @d2c_write_us — Dispatch-to-Complete (actual device hardware time) for writes + * @vfs_read_us — Full VFS read latency (application-visible) + * @vfs_write_us — Full VFS write latency (application-visible) + * + * Plus block size, queue depth, and LBA distribution: + * + * @bssplit_read_kb — Block size distribution for reads (KB) + * @bssplit_write_kb — Block size distribution for writes (KB) + * @qd_read — Instantaneous queue depth at dispatch (reads) + * @qd_write — Instantaneous queue depth at dispatch (writes) + * @lba_read_gb — LBA heatmap for reads (10 GB linear buckets) + * @lba_write_gb — LBA heatmap for writes (10 GB linear buckets) + * + * Plus the write-path serialization gap: + * + * @write_to_fsync_us — Time between write() return and fsync() entry + * (captures CPU serialization, locks, GIL overhead) + * @fsync_us — Actual fsync() latency (device flush) + * + * Interpreting results: + * + * If D2C >> Q2D: Device is the bottleneck (slow NAND, MDTS splitting) + * If Q2D >> D2C: I/O scheduler contention (too many threads, bad scheduler) + * If VFS >> Q2D + D2C: Filesystem/page cache/serialization overhead dominates + * If write_to_fsync >> fsync: CPU serialization is the bottleneck, not storage + * + * Bimodal VFS patterns: + * + * Fast (µs) + Slow (ms) reads = page cache hits + device misses + * Fast (µs) + Slow (ms) writes = buffered writes + fsync flushes + * If ALL reads are fast (µs) = page cache absorbing everything — NOT testing storage! + * + * Usage: + * + * # Trace all I/O to all devices, filter VFS to specific process: + * sudo BPFTRACE_COMM="kv-cache" bpftrace storage_latency_stack.bt + * + * # Trace a Python script: + * sudo BPFTRACE_COMM="python3" bpftrace storage_latency_stack.bt + * + * # Trace fio: + * sudo BPFTRACE_COMM="fio" bpftrace storage_latency_stack.bt + * + * # Trace all processes (caution: noisy): + * sudo BPFTRACE_COMM="" bpftrace storage_latency_stack.bt + * + * Optional device filter (block layer only): + * + * # Get device number: lsblk -no MAJ:MIN /dev/nvme5n1 → 259:5 + * # Convert: python3 -c "print(hex(259 << 20 | 5))" → 0x10300005 + * sudo BPFTRACE_COMM="kv-cache" BPFTRACE_DEV=0x10300005 bpftrace storage_latency_stack.bt + * + * Requirements: + * - Linux 5.x+ with BTF support + * - bpftrace 0.16+ (sudo apt install bpftrace) + * - CAP_BPF or root + * + * Author: Hazem Awadallah, Kingston Digital + * License: Apache-2.0, donated to MLCommons + */ + +/* ==================================================================== + * BLOCK LAYER: Queue → Dispatch → Complete + * + * These tracepoints fire for ALL block I/O on the system (or filtered + * by device). They measure the kernel block layer, below the filesystem + * and above the device driver. + * + * Timeline per I/O request: + * + * block_rq_insert ─── Q2D ─── block_rq_issue ─── D2C ─── block_rq_complete + * (entered I/O (dispatched to (device finished) + * scheduler queue) NVMe/SCSI driver) + * + * Q2D = time waiting in I/O scheduler (noop/mq-deadline/kyber/bfq) + * D2C = time the physical device took to complete the command + * + * For NVMe, D2C includes: + * - NVMe submission queue → controller processing → completion queue + * - For PRP drives: host PRP list construction overhead + * - For drives with small MDTS: each split command measured individually + * + * ==================================================================== */ + +tracepoint:block:block_rq_insert +{ + @q[args->dev, args->sector] = nsecs; +} + +/* Handle device-mapper and md remaps — follow the I/O to its final device */ +tracepoint:block:block_rq_remap +{ + if (@q[args->old_dev, args->old_sector]) { + @q[args->dev, args->sector] = @q[args->old_dev, args->old_sector]; + delete(@q[args->old_dev, args->old_sector]); + } +} + +/* I/O dispatched to device driver — start of D2C. + * + * NVMe with blk-mq and 'none' scheduler uses direct dispatch: + * I/Os go straight to block_rq_issue without block_rq_insert. + * D2C, bssplit, and QD are measured unconditionally. + * Q2D is measured separately only when block_rq_insert fired. + * + * Split into two probes to stay within bpftrace 0.14 verifier limits. */ + +/* Q2D: only fires when block_rq_insert populated @q */ +tracepoint:block:block_rq_issue +/@q[args->dev, args->sector]/ +{ + $lat = (nsecs - @q[args->dev, args->sector]) / 1000; + if (((uint8 *)args->rwbs)[0] == 82) { + @q2d_read_us = hist($lat); + } else if (((uint8 *)args->rwbs)[0] == 87) { + @q2d_write_us = hist($lat); + } + delete(@q[args->dev, args->sector]); +} + +/* D2C start + bssplit + QD: fires on ALL dispatched I/Os */ +tracepoint:block:block_rq_issue +{ + @d[args->dev, args->sector] = nsecs; + + if (((uint8 *)args->rwbs)[0] == 82) { + @bssplit_read_kb = hist(args->bytes / 1024); + @inflight_read++; + @qd_read = hist(@inflight_read); + } else if (((uint8 *)args->rwbs)[0] == 87) { + @bssplit_write_kb = hist(args->bytes / 1024); + @inflight_write++; + @qd_write = hist(@inflight_write); + } +} + +/* LBA heatmap: where on the device is I/O landing? + * Sector >> 21 converts 512-byte sectors to GB offset. + * lhist gives linear 10 GB buckets up to 8 TB — shows spatial I/O pattern. + * Clustered = narrow band; random = spread; sequential = progression. */ +tracepoint:block:block_rq_issue +{ + $gb = args->sector >> 21; + if (((uint8 *)args->rwbs)[0] == 82) { + @lba_read_gb = lhist($gb, 0, 8000, 10); + } else if (((uint8 *)args->rwbs)[0] == 87) { + @lba_write_gb = lhist($gb, 0, 8000, 10); + } +} + +/* I/O completed by device — end of D2C, decrement in-flight counters */ +tracepoint:block:block_rq_complete +{ + if (@d[args->dev, args->sector]) { + $lat = (nsecs - @d[args->dev, args->sector]) / 1000; + + if (((uint8 *)args->rwbs)[0] == 82) { + @d2c_read_us = hist($lat); + @inflight_read--; + } else if (((uint8 *)args->rwbs)[0] == 87) { + @d2c_write_us = hist($lat); + @inflight_write--; + } + + delete(@d[args->dev, args->sector]); + } +} + + +/* ==================================================================== + * VFS LAYER: Application-visible read/write latency + * + * Measures the full syscall time from the application's perspective. + * Includes: page cache lookup, filesystem journaling, block I/O, + * and data copy to/from userspace. + * + * Bimodal pattern expected under correct benchmark behavior: + * - Fast peak (µs): page cache hits (buffered writes, cached reads) + * - Slow peak (ms): actual device I/O (fsync flushes, cache misses) + * + * If the slow peak is MISSING, the benchmark is not hitting storage! + * Check that posix_fadvise(DONTNEED) or O_DIRECT is being used. + * + * Filtered by process name via BPFTRACE_COMM environment variable. + * ==================================================================== */ + +kprobe:vfs_read +/comm == str($1) || str($1) == ""/ +{ + @rs[tid] = nsecs; +} + +kretprobe:vfs_read +/@rs[tid]/ +{ + @vfs_read_us = hist((nsecs - @rs[tid]) / 1000); + delete(@rs[tid]); +} + +kprobe:vfs_write +/comm == str($1) || str($1) == ""/ +{ + @ws[tid] = nsecs; +} + +kretprobe:vfs_write +/@ws[tid]/ +{ + $lat = (nsecs - @ws[tid]) / 1000; + @vfs_write_us = hist($lat); + delete(@ws[tid]); +} + + +/* ==================================================================== + * SERIALIZATION GAP: write() → fsync() + * + * Measures the CPU time between a write() returning and the next + * fsync() call on the same thread. This gap captures: + * + * - NumPy/Python serialization overhead + * - Lock acquisition and GIL contention + * - Memory allocation and buffer management + * - Application-level bookkeeping (stats, logging) + * + * In KV cache workloads, this is "host serialization latency" — + * typically 30-50% of total per-request time. + * + * Also measures fsync() itself — the actual device flush. + * + * Combined interpretation: + * write_to_fsync + fsync ≈ total per-entry write cost + * If write_to_fsync >> fsync: CPU-bound, faster storage won't help + * If fsync >> write_to_fsync: device-bound, storage upgrade helps + * ==================================================================== */ + +tracepoint:syscalls:sys_enter_write +/comm == str($1) || str($1) == ""/ +{ + @write_enter[tid] = nsecs; +} + +tracepoint:syscalls:sys_exit_write +/@write_enter[tid]/ +{ + @write_done[tid] = nsecs; + delete(@write_enter[tid]); +} + +tracepoint:syscalls:sys_enter_fsync +/comm == str($1) || str($1) == ""/ +{ + /* Measure gap from last write() completion to this fsync() entry */ + if (@write_done[tid]) { + @write_to_fsync_us = hist((nsecs - @write_done[tid]) / 1000); + delete(@write_done[tid]); + } + @fsync_enter[tid] = nsecs; +} + +tracepoint:syscalls:sys_exit_fsync +/@fsync_enter[tid]/ +{ + @fsync_us = hist((nsecs - @fsync_enter[tid]) / 1000); + delete(@fsync_enter[tid]); +} + + +/* ==================================================================== + * READ PATH: fadvise → read gap (optional, for cache invalidation) + * + * If the benchmark uses posix_fadvise(DONTNEED) to drop page cache + * before reads, this measures the gap. A non-zero gap means the + * application is doing work between cache invalidation and read. + * ==================================================================== */ + +tracepoint:syscalls:sys_exit_fadvise64 +/comm == str($1) || str($1) == ""/ +{ + @fadvise_done[tid] = nsecs; +} + +tracepoint:syscalls:sys_enter_read +/comm == str($1) || str($1) == "" && @fadvise_done[tid]/ +{ + if (@fadvise_done[tid]) { + @fadvise_to_read_us = hist((nsecs - @fadvise_done[tid]) / 1000); + delete(@fadvise_done[tid]); + } +} + + +/* ==================================================================== + * CLEANUP: Remove incomplete entries on exit + * ==================================================================== */ + +/* END block removed for bpftrace 0.14 compatibility. + * bpftrace automatically prints all maps on SIGINT. + * The @q, @d, @rs etc. maps will appear as empty hashes in output; + * the parser ignores them (no histogram buckets). */ diff --git a/kv_cache_benchmark/utils/storage_latency_stack.sh b/kv_cache_benchmark/utils/storage_latency_stack.sh new file mode 100755 index 00000000..c04aeea4 --- /dev/null +++ b/kv_cache_benchmark/utils/storage_latency_stack.sh @@ -0,0 +1,107 @@ +#!/bin/bash +# storage_latency_stack.sh — Full-Stack Storage Latency Diagnostic +# Author: Hazem Awadallah, Kingston Digital +# License: Apache-2.0, donated to MLCommons +# +# Decomposes I/O latency across every layer of the Linux storage stack: +# VFS → Filesystem → Block Layer (Q2D) → Device (D2C) +# +# Plus serialization gap analysis (write→fsync) for CPU vs device bottleneck, +# block size distribution (bssplit), queue depth, and LBA heatmap. +# +# Usage: +# sudo ./storage_latency_stack.sh kv-cache # trace kv-cache process +# sudo ./storage_latency_stack.sh vllm # trace vllm +# sudo ./storage_latency_stack.sh python3 # trace python3 +# sudo ./storage_latency_stack.sh "" # trace all (noisy) +# +# sudo ./storage_latency_stack.sh vllm --fio # trace + generate fio workload +# sudo ./storage_latency_stack.sh llm-d --fio # trace llm-d + generate fio +# +# Output (on Ctrl-C): +# Histograms printed to stdout. +# With --fio: also generates a fio .ini workload file from the trace data. +# +# Diagnosing: +# D2C >> Q2D → Device bottleneck (NAND, MDTS, interface) +# Q2D >> D2C → I/O scheduler contention +# VFS >> Q2D + D2C → Filesystem/serialization overhead +# write_to_fsync >> fsync → CPU-bound, faster storage won't help +# fsync >> write_to_fsync → Device-bound, storage upgrade helps +# All VFS reads in us → Page cache hit, NOT testing storage! + +set -euo pipefail + +COMM="" +GEN_FIO=0 + +# Parse arguments +for arg in "$@"; do + if [ "$arg" = "--fio" ]; then + GEN_FIO=1 + elif [ -z "$COMM" ]; then + COMM="$arg" + fi +done + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +BT_SCRIPT="${SCRIPT_DIR}/storage_latency_stack.bt" +DISTILL_SCRIPT="${SCRIPT_DIR}/distill_fio.py" + +if [ ! -f "$BT_SCRIPT" ]; then + echo "Error: $BT_SCRIPT not found" >&2 + exit 1 +fi + +if [ "$(id -u)" -ne 0 ]; then + echo "Error: must run as root (sudo)" >&2 + exit 1 +fi + +if ! command -v bpftrace &>/dev/null; then + echo "Error: bpftrace not found. Install: sudo apt install bpftrace" >&2 + exit 1 +fi + +if [ -n "$COMM" ]; then + echo "Tracing process: $COMM" + echo "Block layer: all devices (unfiltered)" +else + echo "Tracing ALL processes (block + VFS)" + echo "Warning: this will be noisy. Consider filtering by process name." +fi + +if [ "$GEN_FIO" -eq 1 ]; then + echo "fio generation: enabled (will save .ini after Ctrl-C)" +fi + +echo "Press Ctrl-C to stop and print histograms." +echo "---" + +if [ "$GEN_FIO" -eq 1 ]; then + # Capture bpftrace output to a temp file + TRACE_OUTPUT=$(mktemp /tmp/bpftrace_trace_XXXXXXXX.txt) + + # Run bpftrace; redirect all output to file + # bpftrace prints histograms on SIGINT before exiting + bpftrace "$BT_SCRIPT" "$COMM" > "$TRACE_OUTPUT" 2>&1 || true + + # Display the histograms + cat "$TRACE_OUTPUT" + + echo "" + echo "=== Distilling fio workload from trace data ===" + + if [ -f "$DISTILL_SCRIPT" ] && command -v python3 &>/dev/null; then + python3 "$DISTILL_SCRIPT" --input "$TRACE_OUTPUT" --process "$COMM" + else + echo "Warning: python3 or distill_fio.py not found. Skipping fio generation." + echo "Trace data saved to: $TRACE_OUTPUT" + echo "Manual: python3 distill_fio.py -i $TRACE_OUTPUT --process $COMM" + exit 0 + fi + + rm -f "$TRACE_OUTPUT" +else + exec bpftrace "$BT_SCRIPT" "$COMM" +fi